jpskill.com
🛠️ 開発・MCP コミュニティ

dagster

Dagsterは、ソフトウェア定義アセットという考え方に基づいたデータパイプラインのオーケストレーターであり、保守しやすいデータプラットフォームを構築するために、アセット、オペレーション、ジョブなどを定義するSkill。

📜 元の英語説明(参考)

Dagster is a data pipeline orchestrator built around the concept of software-defined assets. Learn to define assets, ops, jobs, schedules, sensors, and resources for building maintainable data platforms.

🇯🇵 日本人クリエイター向け解説

一言でいうと

Dagsterは、ソフトウェア定義アセットという考え方に基づいたデータパイプラインのオーケストレーターであり、保守しやすいデータプラットフォームを構築するために、アセット、オペレーション、ジョブなどを定義するSkill。

※ jpskill.com 編集部が日本のビジネス現場向けに補足した解説です。Skill本体の挙動とは独立した参考情報です。

⚡ おすすめ: コマンド1行でインストール(60秒)

下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。

🍎 Mac / 🐧 Linux
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o dagster.zip https://jpskill.com/download/14812.zip && unzip -o dagster.zip && rm dagster.zip
🪟 Windows (PowerShell)
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/14812.zip -OutFile "$d\dagster.zip"; Expand-Archive "$d\dagster.zip" -DestinationPath $d -Force; ri "$d\dagster.zip"

完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。

💾 手動でダウンロードしたい(コマンドが難しい人向け)
  1. 1. 下の青いボタンを押して dagster.zip をダウンロード
  2. 2. ZIPファイルをダブルクリックで解凍 → dagster フォルダができる
  3. 3. そのフォルダを C:\Users\あなたの名前\.claude\skills\(Win)または ~/.claude/skills/(Mac)へ移動
  4. 4. Claude Code を再起動

⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。

🎯 このSkillでできること

下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。

📦 インストール方法 (3ステップ)

  1. 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
  2. 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
  3. 3. 展開してできたフォルダを、ホームフォルダの .claude/skills/ に置く
    • · macOS / Linux: ~/.claude/skills/
    • · Windows: %USERPROFILE%\.claude\skills\

Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。

詳しい使い方ガイドを見る →
最終更新
2026-05-18
取得日時
2026-05-18
同梱ファイル
1

📖 Skill本文(日本語訳)

※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。

Dagster

Dagster は、ソフトウェア定義アセットを中心にデータパイプラインを構成します。ソフトウェア定義アセットとは、パイプラインが生成するデータアーティファクトの宣言のことです。アセットは、リネージを追跡し、インクリメンタルな計算を可能にし、Dagster UI と統合します。

インストール

# Dagster と UI をインストール
pip install dagster dagster-webserver

# 新しいプロジェクトを作成
dagster project scaffold --name my_pipeline
cd my_pipeline
pip install -e ".[dev]"

# 開発サーバーを起動
dagster dev
# UI は http://localhost:3000 で利用可能

ソフトウェア定義アセット

# my_pipeline/assets.py: データを生成するアセットを定義
from dagster import asset, AssetExecutionContext
import pandas as pd

@asset(group_name="raw")
def raw_users(context: AssetExecutionContext) -> pd.DataFrame:
    """API から生のユーザーデータを取得します。"""
    import httpx
    response = httpx.get("https://api.example.com/users")
    df = pd.DataFrame(response.json())
    context.log.info(f"Fetched {len(df)} users")
    return df

@asset(group_name="raw")
def raw_orders(context: AssetExecutionContext) -> pd.DataFrame:
    """API から生の注文データを取得します。"""
    import httpx
    response = httpx.get("https://api.example.com/orders")
    return pd.DataFrame(response.json())

@asset(group_name="analytics", deps=[raw_users, raw_orders])
def revenue_by_user(raw_users: pd.DataFrame, raw_orders: pd.DataFrame) -> pd.DataFrame:
    """ユーザーごとの総収益を計算します。"""
    merged = raw_orders.merge(raw_users, left_on="user_id", right_on="id")
    result = (
        merged.groupby(["user_id", "name"])
        .agg(total_revenue=("amount", "sum"), order_count=("id_x", "count"))
        .reset_index()
    )
    return result

リソース

# my_pipeline/resources.py: 外部システム用の構成可能なリソース
from dagster import resource, ConfigurableResource
import sqlalchemy

class DatabaseResource(ConfigurableResource):
    connection_string: str

    def query(self, sql: str) -> list:
        engine = sqlalchemy.create_engine(self.connection_string)
        with engine.connect() as conn:
            result = conn.execute(sqlalchemy.text(sql))
            return [dict(row._mapping) for row in result]

    def execute(self, sql: str):
        engine = sqlalchemy.create_engine(self.connection_string)
        with engine.connect() as conn:
            conn.execute(sqlalchemy.text(sql))
            conn.commit()

リソースを持つアセット

# my_pipeline/db_assets.py: データベースリソースを使用するアセット
from dagster import asset, AssetExecutionContext
from .resources import DatabaseResource

@asset(group_name="warehouse")
def dim_users(context: AssetExecutionContext, database: DatabaseResource):
    """クレンジングされたユーザーディメンションテーブルをウェアハウスにロードします。"""
    users = database.query("SELECT id, name, email, created_at FROM raw_users")
    context.log.info(f"Loaded {len(users)} users into warehouse")
    return users

定義

# my_pipeline/__init__.py: すべてをまとめて接続
from dagster import Definitions, load_assets_from_modules
from . import assets, db_assets
from .resources import DatabaseResource

all_assets = load_assets_from_modules([assets, db_assets])

defs = Definitions(
    assets=all_assets,
    resources={
        "database": DatabaseResource(
            connection_string="postgresql://user:pass@localhost:5432/analytics"
        ),
    },
)

スケジュールとセンサー

# my_pipeline/schedules.py: 時間ベースおよびイベントベースのトリガー
from dagster import (
    ScheduleDefinition,
    define_asset_job,
    sensor,
    RunRequest,
    SensorEvaluationContext,
    AssetSelection,
)

# 特定のアセットを具体化するジョブ
analytics_job = define_asset_job(
    name="analytics_job",
    selection=AssetSelection.groups("analytics"),
)

# Cron スケジュール
daily_analytics = ScheduleDefinition(
    job=analytics_job,
    cron_schedule="0 6 * * *",  # 毎日午前6時
)

# センサー — 外部イベントでトリガー
@sensor(job=analytics_job, minimum_interval_seconds=60)
def new_file_sensor(context: SensorEvaluationContext):
    import os
    files = os.listdir("/data/incoming")
    new_files = [f for f in files if f.endswith(".csv")]
    if new_files:
        context.log.info(f"Found {len(new_files)} new files")
        yield RunRequest(run_key=new_files[0])

パーティション化されたアセット

# my_pipeline/partitioned.py: インクリメンタル処理のための時間パーティション化されたアセット
from dagster import asset, DailyPartitionsDefinition

daily_partitions = DailyPartitionsDefinition(start_date="2026-01-01")

@asset(partitions_def=daily_partitions, group_name="raw")
def daily_events(context):
    """特定の日付パーティションのイベントを取得します。"""
    date = context.partition_key  # 例: "2026-02-19"
    context.log.info(f"Processing events for {date}")
    # この日付のデータのみを取得
    return fetch_events(date)

CLI リファレンス

# cli.sh: 一般的な Dagster CLI コマンド
# 開発サーバー
dagster dev

# アセットを具体化
dagster asset materialize --select raw_users,raw_orders

# アセットをリスト表示
dagster asset list

# ジョブを実行
dagster job execute -j analytics_job

# 定義をチェック
dagster definitions validate
📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開

Dagster

Dagster organizes data pipelines around software-defined assets — declarations of the data artifacts your pipeline produces. Assets track lineage, enable incremental computation, and integrate with the Dagster UI.

Installation

# Install Dagster and UI
pip install dagster dagster-webserver

# Create a new project
dagster project scaffold --name my_pipeline
cd my_pipeline
pip install -e ".[dev]"

# Start the dev server
dagster dev
# UI at http://localhost:3000

Software-Defined Assets

# my_pipeline/assets.py: Define assets that produce data
from dagster import asset, AssetExecutionContext
import pandas as pd

@asset(group_name="raw")
def raw_users(context: AssetExecutionContext) -> pd.DataFrame:
    """Fetch raw user data from API."""
    import httpx
    response = httpx.get("https://api.example.com/users")
    df = pd.DataFrame(response.json())
    context.log.info(f"Fetched {len(df)} users")
    return df

@asset(group_name="raw")
def raw_orders(context: AssetExecutionContext) -> pd.DataFrame:
    """Fetch raw order data from API."""
    import httpx
    response = httpx.get("https://api.example.com/orders")
    return pd.DataFrame(response.json())

@asset(group_name="analytics", deps=[raw_users, raw_orders])
def revenue_by_user(raw_users: pd.DataFrame, raw_orders: pd.DataFrame) -> pd.DataFrame:
    """Calculate total revenue per user."""
    merged = raw_orders.merge(raw_users, left_on="user_id", right_on="id")
    result = (
        merged.groupby(["user_id", "name"])
        .agg(total_revenue=("amount", "sum"), order_count=("id_x", "count"))
        .reset_index()
    )
    return result

Resources

# my_pipeline/resources.py: Configurable resources for external systems
from dagster import resource, ConfigurableResource
import sqlalchemy

class DatabaseResource(ConfigurableResource):
    connection_string: str

    def query(self, sql: str) -> list:
        engine = sqlalchemy.create_engine(self.connection_string)
        with engine.connect() as conn:
            result = conn.execute(sqlalchemy.text(sql))
            return [dict(row._mapping) for row in result]

    def execute(self, sql: str):
        engine = sqlalchemy.create_engine(self.connection_string)
        with engine.connect() as conn:
            conn.execute(sqlalchemy.text(sql))
            conn.commit()

Assets with Resources

# my_pipeline/db_assets.py: Assets that use database resources
from dagster import asset, AssetExecutionContext
from .resources import DatabaseResource

@asset(group_name="warehouse")
def dim_users(context: AssetExecutionContext, database: DatabaseResource):
    """Load cleaned user dimension table into warehouse."""
    users = database.query("SELECT id, name, email, created_at FROM raw_users")
    context.log.info(f"Loaded {len(users)} users into warehouse")
    return users

Definitions

# my_pipeline/__init__.py: Wire everything together
from dagster import Definitions, load_assets_from_modules
from . import assets, db_assets
from .resources import DatabaseResource

all_assets = load_assets_from_modules([assets, db_assets])

defs = Definitions(
    assets=all_assets,
    resources={
        "database": DatabaseResource(
            connection_string="postgresql://user:pass@localhost:5432/analytics"
        ),
    },
)

Schedules and Sensors

# my_pipeline/schedules.py: Time-based and event-based triggers
from dagster import (
    ScheduleDefinition,
    define_asset_job,
    sensor,
    RunRequest,
    SensorEvaluationContext,
    AssetSelection,
)

# Job that materializes specific assets
analytics_job = define_asset_job(
    name="analytics_job",
    selection=AssetSelection.groups("analytics"),
)

# Cron schedule
daily_analytics = ScheduleDefinition(
    job=analytics_job,
    cron_schedule="0 6 * * *",  # 6 AM daily
)

# Sensor — trigger on external event
@sensor(job=analytics_job, minimum_interval_seconds=60)
def new_file_sensor(context: SensorEvaluationContext):
    import os
    files = os.listdir("/data/incoming")
    new_files = [f for f in files if f.endswith(".csv")]
    if new_files:
        context.log.info(f"Found {len(new_files)} new files")
        yield RunRequest(run_key=new_files[0])

Partitioned Assets

# my_pipeline/partitioned.py: Time-partitioned assets for incremental processing
from dagster import asset, DailyPartitionsDefinition

daily_partitions = DailyPartitionsDefinition(start_date="2026-01-01")

@asset(partitions_def=daily_partitions, group_name="raw")
def daily_events(context):
    """Fetch events for a specific date partition."""
    date = context.partition_key  # e.g., "2026-02-19"
    context.log.info(f"Processing events for {date}")
    # Fetch only this date's data
    return fetch_events(date)

CLI Reference

# cli.sh: Common Dagster CLI commands
# Development server
dagster dev

# Materialize assets
dagster asset materialize --select raw_users,raw_orders

# List assets
dagster asset list

# Run a job
dagster job execute -j analytics_job

# Check definitions
dagster definitions validate