jpskill.com
🛠️ 開発・MCP コミュニティ

prefect

Prefectは、Pythonで作られたデータ処理の流れを効率的に管理するツールで、処理の定義や実行、監視などを簡単に行えるようにし、データパイプラインの構築と運用を支援するSkill。

📜 元の英語説明(参考)

Prefect is a modern workflow orchestration framework for Python data pipelines. Learn to define flows and tasks with decorators, handle retries and scheduling, create deployments, and monitor via the Prefect UI.

🇯🇵 日本人クリエイター向け解説

一言でいうと

Prefectは、Pythonで作られたデータ処理の流れを効率的に管理するツールで、処理の定義や実行、監視などを簡単に行えるようにし、データパイプラインの構築と運用を支援するSkill。

※ jpskill.com 編集部が日本のビジネス現場向けに補足した解説です。Skill本体の挙動とは独立した参考情報です。

⚡ おすすめ: コマンド1行でインストール(60秒)

下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。

🍎 Mac / 🐧 Linux
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o prefect.zip https://jpskill.com/download/15283.zip && unzip -o prefect.zip && rm prefect.zip
🪟 Windows (PowerShell)
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/15283.zip -OutFile "$d\prefect.zip"; Expand-Archive "$d\prefect.zip" -DestinationPath $d -Force; ri "$d\prefect.zip"

完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。

💾 手動でダウンロードしたい(コマンドが難しい人向け)
  1. 1. 下の青いボタンを押して prefect.zip をダウンロード
  2. 2. ZIPファイルをダブルクリックで解凍 → prefect フォルダができる
  3. 3. そのフォルダを C:\Users\あなたの名前\.claude\skills\(Win)または ~/.claude/skills/(Mac)へ移動
  4. 4. Claude Code を再起動

⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。

🎯 このSkillでできること

下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。

📦 インストール方法 (3ステップ)

  1. 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
  2. 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
  3. 3. 展開してできたフォルダを、ホームフォルダの .claude/skills/ に置く
    • · macOS / Linux: ~/.claude/skills/
    • · Windows: %USERPROFILE%\.claude\skills\

Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。

詳しい使い方ガイドを見る →
最終更新
2026-05-18
取得日時
2026-05-18
同梱ファイル
1

📖 Skill本文(日本語訳)

※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。

Prefect

Prefect は、最小限のボイラープレートで Python 関数を監視可能でスケジュール可能なワークフローに変えます。@flow および @task デコレータを追加すると、リトライ、ロギング、キャッシング、および監視 UI が利用可能になります。

インストール

# Prefect をインストール
pip install prefect

# ローカルの Prefect サーバー (UI + API) を起動
prefect server start
# UI は http://localhost:4200 で利用可能

# または、Prefect Cloud (マネージド) を使用
prefect cloud login

基本的な Flow

# flows/hello.py: タスクを含むシンプルな flow
from prefect import flow, task, get_run_logger
from datetime import timedelta

@task(retries=3, retry_delay_seconds=10)
def fetch_data(url: str) -> dict:
    import httpx
    logger = get_run_logger()
    logger.info(f"Fetching {url}")
    response = httpx.get(url)
    response.raise_for_status()
    return response.json()

@task(cache_expiration=timedelta(hours=1))
def transform(data: dict) -> list:
    return [
        {"id": item["id"], "value": item["amount"] * 100}
        for item in data["results"]
    ]

@task
def load(records: list) -> int:
    logger = get_run_logger()
    logger.info(f"Loading {len(records)} records")
    # データベースに挿入...
    return len(records)

@flow(name="etl-pipeline", log_prints=True)
def etl_pipeline(api_url: str = "https://api.example.com/data"):
    raw = fetch_data(api_url)
    cleaned = transform(raw)
    count = load(cleaned)
    print(f"Processed {count} records")
    return count

if __name__ == "__main__":
    etl_pipeline()

スケジューリングとデプロイメント

# flows/deploy.py: スケジュールによるデプロイメントの作成
from prefect import flow
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

@flow
def daily_report():
    print("Generating daily report...")

if __name__ == "__main__":
    # Python 経由でデプロイ
    daily_report.serve(
        name="daily-report-deployment",
        cron="0 8 * * *",  # 毎日午前8時
        tags=["reporting"],
        parameters={"param1": "value1"},
    )
# deploy.sh: CLI 経由でデプロイおよび管理
# flow ファイルからデプロイメントを作成
prefect deploy flows/hello.py:etl_pipeline \
  --name etl-prod \
  --pool default-agent-pool \
  --cron "*/30 * * * *"

# デプロイメントを実行するワーカーを起動
prefect worker start --pool default-agent-pool

# デプロイメントの実行をトリガー
prefect deployment run "etl-pipeline/etl-prod" --param api_url=https://api.example.com

エラー処理と並行処理

# flows/advanced.py: 並行タスク、エラー処理、およびサブ flow
from prefect import flow, task
from prefect.tasks import task_input_hash
import asyncio

@task(
    retries=2,
    retry_delay_seconds=[10, 60],  # 指数バックオフ
    cache_key_fn=task_input_hash,
    timeout_seconds=300,
)
def process_item(item_id: int) -> dict:
    # 単一のアイテムを処理
    return {"id": item_id, "status": "done"}

@flow
def batch_process(item_ids: list[int]):
    # タスクを並行して送信
    futures = [process_item.submit(id) for id in item_ids]
    results = [f.result() for f in futures]

    succeeded = [r for r in results if r["status"] == "done"]
    print(f"Processed {len(succeeded)}/{len(item_ids)} items")

@flow
async def async_pipeline():
    # I/O バウンドの作業のための非同期 flow
    results = await asyncio.gather(
        fetch_from_api("source_a"),
        fetch_from_api("source_b"),
    )
    return results

Blocks とインフラストラクチャ

# flows/blocks.py: 再利用可能な構成のために blocks を使用
from prefect.blocks.system import Secret, JSON
from prefect_sqlalchemy import SqlAlchemyConnector

# シークレットを保存 (UI または CLI 経由で設定)
# prefect block register -m prefect_sqlalchemy
# 次に、http://localhost:4200/blocks の UI で構成

# flow で使用
@flow
def db_flow():
    api_key = Secret.load("my-api-key").get()
    config = JSON.load("pipeline-config").value

    with SqlAlchemyConnector.load("prod-db") as conn:
        result = conn.fetch_all("SELECT count(*) FROM users")
        print(result)

通知

# flows/notifications.py: 失敗時にアラートを送信
from prefect import flow
from prefect.blocks.notifications import SlackWebhook

@flow
def monitored_flow():
    try:
        # ... 作業を実行
        pass
    except Exception as e:
        slack = SlackWebhook.load("alerts-channel")
        slack.notify(f"❌ Pipeline failed: {e}")
        raise

# または、Prefect UI で自動化を使用:
# Automations → Create → Trigger: Flow run failed → Action: Send Slack notification

CLI リファレンス

# cli.sh: 一般的な Prefect CLI コマンド
# 接続を確認
prefect version
prefect config view

# flow とデプロイメントをリスト
prefect flow-run ls
prefect deployment ls

# ログを表示
prefect flow-run logs <flow-run-id>

# ワークプールを管理
prefect work-pool create my-pool --type process
prefect work-pool ls
📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開

Prefect

Prefect turns Python functions into observable, schedulable workflows with minimal boilerplate. Add @flow and @task decorators to get retries, logging, caching, and a monitoring UI.

Installation

# Install Prefect
pip install prefect

# Start the local Prefect server (UI + API)
prefect server start
# UI at http://localhost:4200

# Or use Prefect Cloud (managed)
prefect cloud login

Basic Flow

# flows/hello.py: Simple flow with tasks
from prefect import flow, task, get_run_logger
from datetime import timedelta

@task(retries=3, retry_delay_seconds=10)
def fetch_data(url: str) -> dict:
    import httpx
    logger = get_run_logger()
    logger.info(f"Fetching {url}")
    response = httpx.get(url)
    response.raise_for_status()
    return response.json()

@task(cache_expiration=timedelta(hours=1))
def transform(data: dict) -> list:
    return [
        {"id": item["id"], "value": item["amount"] * 100}
        for item in data["results"]
    ]

@task
def load(records: list) -> int:
    logger = get_run_logger()
    logger.info(f"Loading {len(records)} records")
    # Insert into database...
    return len(records)

@flow(name="etl-pipeline", log_prints=True)
def etl_pipeline(api_url: str = "https://api.example.com/data"):
    raw = fetch_data(api_url)
    cleaned = transform(raw)
    count = load(cleaned)
    print(f"Processed {count} records")
    return count

if __name__ == "__main__":
    etl_pipeline()

Scheduling and Deployments

# flows/deploy.py: Create a deployment with schedule
from prefect import flow
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

@flow
def daily_report():
    print("Generating daily report...")

if __name__ == "__main__":
    # Deploy via Python
    daily_report.serve(
        name="daily-report-deployment",
        cron="0 8 * * *",  # Every day at 8 AM
        tags=["reporting"],
        parameters={"param1": "value1"},
    )
# deploy.sh: Deploy and manage via CLI
# Create deployment from flow file
prefect deploy flows/hello.py:etl_pipeline \
  --name etl-prod \
  --pool default-agent-pool \
  --cron "*/30 * * * *"

# Start a worker to execute deployments
prefect worker start --pool default-agent-pool

# Trigger a deployment run
prefect deployment run "etl-pipeline/etl-prod" --param api_url=https://api.example.com

Error Handling and Concurrency

# flows/advanced.py: Concurrent tasks, error handling, and sub-flows
from prefect import flow, task
from prefect.tasks import task_input_hash
import asyncio

@task(
    retries=2,
    retry_delay_seconds=[10, 60],  # Exponential backoff
    cache_key_fn=task_input_hash,
    timeout_seconds=300,
)
def process_item(item_id: int) -> dict:
    # Process a single item
    return {"id": item_id, "status": "done"}

@flow
def batch_process(item_ids: list[int]):
    # Submit tasks concurrently
    futures = [process_item.submit(id) for id in item_ids]
    results = [f.result() for f in futures]

    succeeded = [r for r in results if r["status"] == "done"]
    print(f"Processed {len(succeeded)}/{len(item_ids)} items")

@flow
async def async_pipeline():
    # Async flow for I/O-bound work
    results = await asyncio.gather(
        fetch_from_api("source_a"),
        fetch_from_api("source_b"),
    )
    return results

Blocks and Infrastructure

# flows/blocks.py: Use blocks for reusable configuration
from prefect.blocks.system import Secret, JSON
from prefect_sqlalchemy import SqlAlchemyConnector

# Store secrets (set via UI or CLI)
# prefect block register -m prefect_sqlalchemy
# Then configure in UI at http://localhost:4200/blocks

# Use in flows
@flow
def db_flow():
    api_key = Secret.load("my-api-key").get()
    config = JSON.load("pipeline-config").value

    with SqlAlchemyConnector.load("prod-db") as conn:
        result = conn.fetch_all("SELECT count(*) FROM users")
        print(result)

Notifications

# flows/notifications.py: Send alerts on failure
from prefect import flow
from prefect.blocks.notifications import SlackWebhook

@flow
def monitored_flow():
    try:
        # ... do work
        pass
    except Exception as e:
        slack = SlackWebhook.load("alerts-channel")
        slack.notify(f"❌ Pipeline failed: {e}")
        raise

# Or use automations in Prefect UI:
# Automations → Create → Trigger: Flow run failed → Action: Send Slack notification

CLI Reference

# cli.sh: Common Prefect CLI commands
# Check connection
prefect version
prefect config view

# List flows and deployments
prefect flow-run ls
prefect deployment ls

# View logs
prefect flow-run logs <flow-run-id>

# Manage work pools
prefect work-pool create my-pool --type process
prefect work-pool ls