prefect
Prefectは、Pythonで作られたデータ処理の流れを効率的に管理するツールで、処理の定義や実行、監視などを簡単に行えるようにし、データパイプラインの構築と運用を支援するSkill。
📜 元の英語説明(参考)
Prefect is a modern workflow orchestration framework for Python data pipelines. Learn to define flows and tasks with decorators, handle retries and scheduling, create deployments, and monitor via the Prefect UI.
🇯🇵 日本人クリエイター向け解説
Prefectは、Pythonで作られたデータ処理の流れを効率的に管理するツールで、処理の定義や実行、監視などを簡単に行えるようにし、データパイプラインの構築と運用を支援するSkill。
※ jpskill.com 編集部が日本のビジネス現場向けに補足した解説です。Skill本体の挙動とは独立した参考情報です。
下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o prefect.zip https://jpskill.com/download/15283.zip && unzip -o prefect.zip && rm prefect.zip
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/15283.zip -OutFile "$d\prefect.zip"; Expand-Archive "$d\prefect.zip" -DestinationPath $d -Force; ri "$d\prefect.zip"
完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。
💾 手動でダウンロードしたい(コマンドが難しい人向け)
- 1. 下の青いボタンを押して
prefect.zipをダウンロード - 2. ZIPファイルをダブルクリックで解凍 →
prefectフォルダができる - 3. そのフォルダを
C:\Users\あなたの名前\.claude\skills\(Win)または~/.claude/skills/(Mac)へ移動 - 4. Claude Code を再起動
⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。
🎯 このSkillでできること
下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。
📦 インストール方法 (3ステップ)
- 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
- 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
- 3. 展開してできたフォルダを、ホームフォルダの
.claude/skills/に置く- · macOS / Linux:
~/.claude/skills/ - · Windows:
%USERPROFILE%\.claude\skills\
- · macOS / Linux:
Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。
詳しい使い方ガイドを見る →- 最終更新
- 2026-05-18
- 取得日時
- 2026-05-18
- 同梱ファイル
- 1
📖 Skill本文(日本語訳)
※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。
Prefect
Prefect は、最小限のボイラープレートで Python 関数を監視可能でスケジュール可能なワークフローに変えます。@flow および @task デコレータを追加すると、リトライ、ロギング、キャッシング、および監視 UI が利用可能になります。
インストール
# Prefect をインストール
pip install prefect
# ローカルの Prefect サーバー (UI + API) を起動
prefect server start
# UI は http://localhost:4200 で利用可能
# または、Prefect Cloud (マネージド) を使用
prefect cloud login
基本的な Flow
# flows/hello.py: タスクを含むシンプルな flow
from prefect import flow, task, get_run_logger
from datetime import timedelta
@task(retries=3, retry_delay_seconds=10)
def fetch_data(url: str) -> dict:
import httpx
logger = get_run_logger()
logger.info(f"Fetching {url}")
response = httpx.get(url)
response.raise_for_status()
return response.json()
@task(cache_expiration=timedelta(hours=1))
def transform(data: dict) -> list:
return [
{"id": item["id"], "value": item["amount"] * 100}
for item in data["results"]
]
@task
def load(records: list) -> int:
logger = get_run_logger()
logger.info(f"Loading {len(records)} records")
# データベースに挿入...
return len(records)
@flow(name="etl-pipeline", log_prints=True)
def etl_pipeline(api_url: str = "https://api.example.com/data"):
raw = fetch_data(api_url)
cleaned = transform(raw)
count = load(cleaned)
print(f"Processed {count} records")
return count
if __name__ == "__main__":
etl_pipeline()
スケジューリングとデプロイメント
# flows/deploy.py: スケジュールによるデプロイメントの作成
from prefect import flow
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
@flow
def daily_report():
print("Generating daily report...")
if __name__ == "__main__":
# Python 経由でデプロイ
daily_report.serve(
name="daily-report-deployment",
cron="0 8 * * *", # 毎日午前8時
tags=["reporting"],
parameters={"param1": "value1"},
)
# deploy.sh: CLI 経由でデプロイおよび管理
# flow ファイルからデプロイメントを作成
prefect deploy flows/hello.py:etl_pipeline \
--name etl-prod \
--pool default-agent-pool \
--cron "*/30 * * * *"
# デプロイメントを実行するワーカーを起動
prefect worker start --pool default-agent-pool
# デプロイメントの実行をトリガー
prefect deployment run "etl-pipeline/etl-prod" --param api_url=https://api.example.com
エラー処理と並行処理
# flows/advanced.py: 並行タスク、エラー処理、およびサブ flow
from prefect import flow, task
from prefect.tasks import task_input_hash
import asyncio
@task(
retries=2,
retry_delay_seconds=[10, 60], # 指数バックオフ
cache_key_fn=task_input_hash,
timeout_seconds=300,
)
def process_item(item_id: int) -> dict:
# 単一のアイテムを処理
return {"id": item_id, "status": "done"}
@flow
def batch_process(item_ids: list[int]):
# タスクを並行して送信
futures = [process_item.submit(id) for id in item_ids]
results = [f.result() for f in futures]
succeeded = [r for r in results if r["status"] == "done"]
print(f"Processed {len(succeeded)}/{len(item_ids)} items")
@flow
async def async_pipeline():
# I/O バウンドの作業のための非同期 flow
results = await asyncio.gather(
fetch_from_api("source_a"),
fetch_from_api("source_b"),
)
return results
Blocks とインフラストラクチャ
# flows/blocks.py: 再利用可能な構成のために blocks を使用
from prefect.blocks.system import Secret, JSON
from prefect_sqlalchemy import SqlAlchemyConnector
# シークレットを保存 (UI または CLI 経由で設定)
# prefect block register -m prefect_sqlalchemy
# 次に、http://localhost:4200/blocks の UI で構成
# flow で使用
@flow
def db_flow():
api_key = Secret.load("my-api-key").get()
config = JSON.load("pipeline-config").value
with SqlAlchemyConnector.load("prod-db") as conn:
result = conn.fetch_all("SELECT count(*) FROM users")
print(result)
通知
# flows/notifications.py: 失敗時にアラートを送信
from prefect import flow
from prefect.blocks.notifications import SlackWebhook
@flow
def monitored_flow():
try:
# ... 作業を実行
pass
except Exception as e:
slack = SlackWebhook.load("alerts-channel")
slack.notify(f"❌ Pipeline failed: {e}")
raise
# または、Prefect UI で自動化を使用:
# Automations → Create → Trigger: Flow run failed → Action: Send Slack notification
CLI リファレンス
# cli.sh: 一般的な Prefect CLI コマンド
# 接続を確認
prefect version
prefect config view
# flow とデプロイメントをリスト
prefect flow-run ls
prefect deployment ls
# ログを表示
prefect flow-run logs <flow-run-id>
# ワークプールを管理
prefect work-pool create my-pool --type process
prefect work-pool ls 📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開
Prefect
Prefect turns Python functions into observable, schedulable workflows with minimal boilerplate. Add @flow and @task decorators to get retries, logging, caching, and a monitoring UI.
Installation
# Install Prefect
pip install prefect
# Start the local Prefect server (UI + API)
prefect server start
# UI at http://localhost:4200
# Or use Prefect Cloud (managed)
prefect cloud login
Basic Flow
# flows/hello.py: Simple flow with tasks
from prefect import flow, task, get_run_logger
from datetime import timedelta
@task(retries=3, retry_delay_seconds=10)
def fetch_data(url: str) -> dict:
import httpx
logger = get_run_logger()
logger.info(f"Fetching {url}")
response = httpx.get(url)
response.raise_for_status()
return response.json()
@task(cache_expiration=timedelta(hours=1))
def transform(data: dict) -> list:
return [
{"id": item["id"], "value": item["amount"] * 100}
for item in data["results"]
]
@task
def load(records: list) -> int:
logger = get_run_logger()
logger.info(f"Loading {len(records)} records")
# Insert into database...
return len(records)
@flow(name="etl-pipeline", log_prints=True)
def etl_pipeline(api_url: str = "https://api.example.com/data"):
raw = fetch_data(api_url)
cleaned = transform(raw)
count = load(cleaned)
print(f"Processed {count} records")
return count
if __name__ == "__main__":
etl_pipeline()
Scheduling and Deployments
# flows/deploy.py: Create a deployment with schedule
from prefect import flow
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
@flow
def daily_report():
print("Generating daily report...")
if __name__ == "__main__":
# Deploy via Python
daily_report.serve(
name="daily-report-deployment",
cron="0 8 * * *", # Every day at 8 AM
tags=["reporting"],
parameters={"param1": "value1"},
)
# deploy.sh: Deploy and manage via CLI
# Create deployment from flow file
prefect deploy flows/hello.py:etl_pipeline \
--name etl-prod \
--pool default-agent-pool \
--cron "*/30 * * * *"
# Start a worker to execute deployments
prefect worker start --pool default-agent-pool
# Trigger a deployment run
prefect deployment run "etl-pipeline/etl-prod" --param api_url=https://api.example.com
Error Handling and Concurrency
# flows/advanced.py: Concurrent tasks, error handling, and sub-flows
from prefect import flow, task
from prefect.tasks import task_input_hash
import asyncio
@task(
retries=2,
retry_delay_seconds=[10, 60], # Exponential backoff
cache_key_fn=task_input_hash,
timeout_seconds=300,
)
def process_item(item_id: int) -> dict:
# Process a single item
return {"id": item_id, "status": "done"}
@flow
def batch_process(item_ids: list[int]):
# Submit tasks concurrently
futures = [process_item.submit(id) for id in item_ids]
results = [f.result() for f in futures]
succeeded = [r for r in results if r["status"] == "done"]
print(f"Processed {len(succeeded)}/{len(item_ids)} items")
@flow
async def async_pipeline():
# Async flow for I/O-bound work
results = await asyncio.gather(
fetch_from_api("source_a"),
fetch_from_api("source_b"),
)
return results
Blocks and Infrastructure
# flows/blocks.py: Use blocks for reusable configuration
from prefect.blocks.system import Secret, JSON
from prefect_sqlalchemy import SqlAlchemyConnector
# Store secrets (set via UI or CLI)
# prefect block register -m prefect_sqlalchemy
# Then configure in UI at http://localhost:4200/blocks
# Use in flows
@flow
def db_flow():
api_key = Secret.load("my-api-key").get()
config = JSON.load("pipeline-config").value
with SqlAlchemyConnector.load("prod-db") as conn:
result = conn.fetch_all("SELECT count(*) FROM users")
print(result)
Notifications
# flows/notifications.py: Send alerts on failure
from prefect import flow
from prefect.blocks.notifications import SlackWebhook
@flow
def monitored_flow():
try:
# ... do work
pass
except Exception as e:
slack = SlackWebhook.load("alerts-channel")
slack.notify(f"❌ Pipeline failed: {e}")
raise
# Or use automations in Prefect UI:
# Automations → Create → Trigger: Flow run failed → Action: Send Slack notification
CLI Reference
# cli.sh: Common Prefect CLI commands
# Check connection
prefect version
prefect config view
# List flows and deployments
prefect flow-run ls
prefect deployment ls
# View logs
prefect flow-run logs <flow-run-id>
# Manage work pools
prefect work-pool create my-pool --type process
prefect work-pool ls