airflow-hitl
Use when the user needs human-in-the-loop workflows in Airflow (approval/reject, form input, or human-driven branching). Covers ApprovalOperator, HITLOperator, HITLBranchOperator, HITLEntryOperator, HITLTrigger. Requires Airflow 3.1+. Does not cover AI/LLM calls (see airflow-ai).
Copy the command below and paste it into a terminal (Mac/Linux) or PowerShell (Windows). It downloads, unzips, and installs the skill automatically.
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o airflow-hitl.zip https://jpskill.com/download/23192.zip && unzip -o airflow-hitl.zip && rm airflow-hitl.zip
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/23192.zip -OutFile "$d\airflow-hitl.zip"; Expand-Archive "$d\airflow-hitl.zip" -DestinationPath $d -Force; ri "$d\airflow-hitl.zip"
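When it finishes, restart Claude Code. The skill then activates automatically whenever you make a relevant request in plain language, for example asking for an approval step in an Airflow DAG.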
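💾 Manual download (for anyone who prefers not to use the command line)
1. Download airflow-hitl.zip with the download button on this page.
2. Double-click the ZIP file to extract it; this creates an airflow-hitl folder.
3. Move that folder to C:\Users\<your name>\.claude\skills\ (Windows) or ~/.claude/skills/ (Mac).
4. Restart Claude Code.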
⚠️ Download and use at your own risk. This site accepts no responsibility for the content, behavior, or safety of the skill.
- Last updated: 2026-05-18
- Retrieved: 2026-05-18
- Bundled files: 1
Airflow Human-in-the-Loop Operators
Pause a DAG until a human responds via the Airflow UI or REST API. HITL operators are deferrable — they release their worker slot while waiting.
Requires Airflow 3.1+ (check with af config version).
UI location: Browse → Required Actions. Respond from the task instance page's Required Actions tab.
Cross-references: airflow-ai for AI/LLM task decorators; airflow for registry and API discovery commands used below.
Step 1 — Pick the capability you need
| Capability | Class (verify in Step 2) |
|---|---|
| Approve or reject; downstream skips on reject | ApprovalOperator |
| Present N options and return which were chosen | HITLOperator |
| Branch to one or more downstream tasks based on a choice | HITLBranchOperator |
| Collect a form (no approve/select step) | HITLEntryOperator |
| Use the HITL trigger directly (advanced / custom operators) | HITLTrigger |
This is the only place class names are hardcoded. The provider adds, renames, and removes params across releases — do not copy parameter lists from memory. Fetch the current signature before writing code.
Step 2 — Discover the current signatures from the Airflow Registry
Before writing HITL code, run these to see the live roster and constructor params (see the airflow skill for the full af registry reference):
# Every HITL-related module in the standard provider
af registry modules standard \
| jq '.modules[] | select(.import_path | test("\\.hitl\\.")) | {name, type, import_path, short_description, docs_url}'
# Constructor signatures: name, type, default, required, description
af registry parameters standard \
| jq '.classes | to_entries[] | select(.key | test("\\.hitl\\.")) | {fqn: .key, parameters: .value.parameters}'
# Pin to the exact installed provider version
af config providers \
| jq '.providers[] | select(.package_name == "apache-airflow-providers-standard") | .version'
# then: af registry parameters standard --version <VERSION>
If the registry shows a param that this skill does not mention, prefer the registry. If the registry shows a class that is not in Step 1, treat it as additive — the decision table above may be stale.
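Where jq is not available, the first filter above can be reproduced in Python. This is a minimal sketch, assuming the command emits JSON with a top-level modules list whose entries carry an import_path field, as the jq filter implies. The function name and the sample payload are hypothetical and exist only to exercise the filter.

```python
import json
import re

# Same pattern as the jq test("\\.hitl\\.") filter.
HITL_PATTERN = re.compile(r"\.hitl\.")


def select_hitl_modules(registry_json: str) -> list:
    """Return the module entries whose import_path contains '.hitl.'."""
    payload = json.loads(registry_json)
    return [
        module
        for module in payload.get("modules", [])
        if HITL_PATTERN.search(module.get("import_path", ""))
    ]


# Hypothetical sample, shaped the way the jq filter expects the registry output.
SAMPLE = json.dumps(
    {
        "modules": [
            {
                "name": "ApprovalOperator",
                "import_path": "airflow.providers.standard.operators.hitl.ApprovalOperator",
            },
            {
                "name": "BashOperator",
                "import_path": "airflow.providers.standard.operators.bash.BashOperator",
            },
        ]
    }
)

if __name__ == "__main__":
    for module in select_hitl_modules(SAMPLE):
        print(module["name"])
```

In real use, pass the text captured from af registry modules standard instead of SAMPLE.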
Step 3 — Canonical example (approval gate)
Starting point for any HITL task. Adapt by swapping the class name and params per Step 2.
from airflow.providers.standard.operators.hitl import ApprovalOperator
from airflow.sdk import dag, task, chain, Param
from pendulum import datetime


@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def approval_example():
    @task
    def prepare():
        return "Review quarterly report"

    # Pauses the run until someone approves or rejects in the UI or via the API
    approval = ApprovalOperator(
        task_id="approve_report",
        subject="Report Approval",
        body="{{ ti.xcom_pull(task_ids='prepare') }}",
        defaults="Approve",  # Auto-selected on timeout
        params={"comments": Param("", type="string")},
    )

    @task
    def after_approval(result):
        print(f"Decision: {result['chosen_options']}")

    # prepare runs first; after_approval receives the operator's XCom output
    chain(prepare(), approval)
    after_approval(approval.output)


approval_example()
For the other classes in Step 1, the shape is the same (task_id, subject, plus class-specific params). Verify each constructor through Step 2 — for example, HITLBranchOperator requires every option either to match a downstream task id directly or to be resolved via a mapping param surfaced in the registry.
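The branching rule can be checked before the DAG is deployed. This is a minimal sketch in plain Python, with no Airflow import. The function name and the mapping argument are hypothetical stand-ins; look up the operator's real mapping kwarg in Step 2.

```python
from __future__ import annotations


def unresolved_branch_options(
    options: list[str],
    downstream_task_ids: set[str],
    mapping: dict[str, str] | None = None,
) -> list[str]:
    """Return the options that do not resolve to a downstream task id.

    An option resolves if it is itself a downstream task id, or if the
    (hypothetical) mapping translates it to one.
    """
    mapping = mapping or {}
    return [
        option
        for option in options
        if mapping.get(option, option) not in downstream_task_ids
    ]


if __name__ == "__main__":
    # "Ship it" resolves through the mapping; "Hold" resolves to nothing.
    print(unresolved_branch_options(["Ship it", "Hold"], {"deploy"}, {"Ship it": "deploy"}))
```

An empty result means every option has a target. Anything returned is an option a responder could pick that would lead nowhere.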
Step 4 — Behavior contracts (stable across versions)
Timeout
- With defaults set: the task succeeds on timeout, with the default option(s) selected.
- Without defaults: the task fails on timeout.
Markdown + Jinja in body
body supports Markdown and is Jinja-templatable. Render XCom context directly:
body = """**Total Budget:** {{ ti.xcom_pull(task_ids='get_budget') }}
| Category | Amount |
|----------|--------|
| Marketing | $1M |
"""
Callbacks
All HITL operators accept the standard Airflow callback kwargs (on_success_callback, on_failure_callback, etc.).
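A failure callback is one way to surface a HITL task that failed, for instance after timing out with no defaults. This is a minimal sketch, assuming the callback receives the usual task context with a ti entry exposing dag_id, task_id, and run_id. Both function names are hypothetical, and SimpleNamespace stands in for the real task instance so the example runs without Airflow.

```python
import logging
from types import SimpleNamespace

log = logging.getLogger(__name__)


def describe_hitl_failure(context: dict) -> str:
    """Build a one-line summary of a failed HITL task from the callback context."""
    ti = context["ti"]
    return (
        f"HITL task {ti.dag_id}.{ti.task_id} (run {ti.run_id}) failed; "
        "check whether it timed out without defaults"
    )


def on_hitl_failure(context: dict) -> None:
    """Callback shape: a callable that takes the task context."""
    log.error(describe_hitl_failure(context))


if __name__ == "__main__":
    # Stand-in context for a local dry run; Airflow supplies the real one.
    fake_context = {
        "ti": SimpleNamespace(
            dag_id="approval_example", task_id="approve_report", run_id="manual__1"
        )
    }
    print(describe_hitl_failure(fake_context))
```

Pass it as on_failure_callback=on_hitl_failure when constructing the operator.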
Notifiers
HITL operators accept a notifiers list. Inside a notifier's notify(context) method, build a link to the pending task with HITLOperator.generate_link_to_ui_from_context(context, base_url=...).
Restricting who can respond
The parameter name and accepted identifier format depend on the active auth manager. Do not hardcode — check which one is active and which kwarg the current provider exposes:
af config show | jq '.auth_manager // .core.auth_manager'
Then look up the current kwarg in Step 2 (at the time of writing it is assigned_users, accepting identifiers in whatever format the active auth manager uses — Astro uses the Astro user ID, FabAuthManager uses email, SimpleAuthManager uses username).
Step 5 — Responding from external integrations
For Slack bots, custom apps, or scripts. Discover the live endpoint rather than hardcoding a path:
af api ls --filter hitl # live endpoint list
af api spec \
| jq '.paths | to_entries[] | select(.key | test("hitl"))' # request/response schemas
The PATCH-to-respond pattern is stable; the exact path is discovered. Typical shape:
import os

import requests

HOST = os.environ["AIRFLOW_HOST"]
TOKEN = os.environ["AIRFLOW_API_TOKEN"]
HEADERS = {"Authorization": f"Bearer {TOKEN}"}

# Placeholders: identify the task instance that is waiting for a response
dag_id, run_id, task_id = "<dag_id>", "<run_id>", "<task_id>"

# List pending: use the path from `af api ls --filter hitl`
pending = requests.get(f"{HOST}/<path>", headers=HEADERS, params={"state": "pending"})
pending.raise_for_status()

# Respond: same discovered path family, PATCH
response = requests.patch(
    f"{HOST}/<path>/{dag_id}/{run_id}/{task_id}",
    headers=HEADERS,
    json={"chosen_options": ["Approve"], "params_input": {"comments": "ok"}},
)
response.raise_for_status()
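Run ids often contain characters such as ":" and "+", so it can help to build the respond URL and payload with small helpers rather than raw f-strings. This is a minimal sketch using only the standard library. Both function names are hypothetical, base_path must be the path discovered with af api ls --filter hitl, and the payload fields are the ones in the example above.

```python
from __future__ import annotations

from urllib.parse import quote


def hitl_task_url(host: str, base_path: str, dag_id: str, run_id: str, task_id: str) -> str:
    """Join the discovered base path with URL-encoded task instance identifiers."""
    parts = [quote(part, safe="") for part in (dag_id, run_id, task_id)]
    return "/".join([host.rstrip("/"), base_path.strip("/"), *parts])


def hitl_response_payload(chosen_options: list[str], params_input: dict | None = None) -> dict:
    """Build the JSON body for the PATCH request."""
    return {
        "chosen_options": list(chosen_options),
        "params_input": dict(params_input or {}),
    }


if __name__ == "__main__":
    # "/hypothetical/hitl" is a placeholder, not a real Airflow endpoint.
    print(
        hitl_task_url(
            "https://airflow.example.com/",
            "/hypothetical/hitl",
            "approval_example",
            "manual__2025-01-01T00:00:00+00:00",
            "approve_report",
        )
    )
    print(hitl_response_payload(["Approve"], {"comments": "ok"}))
```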
Step 6 — Safety checks
- [ ] Airflow version ≥ 3.1 (af config version).
- [ ] Constructor kwargs match the current registry output from Step 2 (no respondents vs. assigned_users style drift).
- [ ] For branching: every option resolves to a downstream task id (directly or via the mapping kwarg from Step 2).
- [ ] Every value in defaults is also in options.
- [ ] execution_timeout set; defaults configured if timeout should succeed rather than fail.
- [ ] API token configured if external responders are part of the flow.
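Two of these checks, defaults being a subset of options and execution_timeout being set, are mechanical and can be automated. This is a minimal sketch in plain Python. The function name is hypothetical, and it inspects plain values rather than a live operator, so it runs without Airflow installed.

```python
from __future__ import annotations

from datetime import timedelta


def hitl_config_problems(
    options: list[str],
    defaults: str | list[str] | None = None,
    execution_timeout: timedelta | None = None,
) -> list[str]:
    """Return human-readable problems; an empty list means both checks pass."""
    problems = []
    # Accept a single default as a bare string or several as a list.
    default_list = [defaults] if isinstance(defaults, str) else list(defaults or [])
    missing = [value for value in default_list if value not in options]
    if missing:
        problems.append(f"defaults not in options: {missing}")
    if execution_timeout is None:
        problems.append("execution_timeout is not set")
    return problems


if __name__ == "__main__":
    print(hitl_config_problems(["Approve", "Reject"], "Approve", timedelta(hours=4)))
    print(hitl_config_problems(["Approve", "Reject"], ["Escalate"]))
```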
References
The upstream docs URL is surfaced per-module by the registry — do not hardcode:
af registry modules standard \
| jq '.modules[] | select(.import_path | test("\\.hitl\\.")) | {name, docs_url}'
Related skills
- airflow: af registry, af api, af config command reference.
- airflow-ai: AI/LLM task decorators and GenAI patterns.
- authoring-dags: general DAG writing best practices.
- testing-dags: iterative test → debug → fix cycles.