dlt
dltのエキスパートとして、Pythonの簡単な記述で、APIやファイル、データベースからデータウェアハウスやレイクにデータを自動的にロードし、スキーマ推論や増分ロード、データ契約を組み込むデータパイプライン構築を支援するSkill。
📜 元の英語説明(参考)
You are an expert in dlt, the open-source Python library for building data pipelines. You help developers load data from any API, file, or database into warehouses and lakes using simple Python decorators — with automatic schema inference, incremental loading, and built-in data contracts. dlt is the "requests library for data pipelines."
🇯🇵 日本人クリエイター向け解説
dltのエキスパートとして、Pythonの簡単な記述で、APIやファイル、データベースからデータウェアハウスやレイクにデータを自動的にロードし、スキーマ推論や増分ロード、データ契約を組み込むデータパイプライン構築を支援するSkill。
※ jpskill.com 編集部が日本のビジネス現場向けに補足した解説です。Skill本体の挙動とは独立した参考情報です。
下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o dlt.zip https://jpskill.com/download/14839.zip && unzip -o dlt.zip && rm dlt.zip
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/14839.zip -OutFile "$d\dlt.zip"; Expand-Archive "$d\dlt.zip" -DestinationPath $d -Force; ri "$d\dlt.zip"
完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。
💾 手動でダウンロードしたい(コマンドが難しい人向け)
- 1. 下の青いボタンを押して
dlt.zipをダウンロード - 2. ZIPファイルをダブルクリックで解凍 →
dltフォルダができる - 3. そのフォルダを
C:\Users\あなたの名前\.claude\skills\(Win)または~/.claude/skills/(Mac)へ移動 - 4. Claude Code を再起動
⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。
🎯 このSkillでできること
下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。
📦 インストール方法 (3ステップ)
- 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
- 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
- 3. 展開してできたフォルダを、ホームフォルダの
.claude/skills/に置く- · macOS / Linux:
~/.claude/skills/ - · Windows:
%USERPROFILE%\.claude\skills\
- · macOS / Linux:
Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。
詳しい使い方ガイドを見る →- 最終更新
- 2026-05-18
- 取得日時
- 2026-05-18
- 同梱ファイル
- 1
📖 Skill本文(日本語訳)
※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。
dlt (Data Load Tool) — Python-First Data Ingestion
あなたは、データパイプラインを構築するためのオープンソースの Python ライブラリである dlt のエキスパートです。あなたは、開発者がシンプルな Python デコレータを使用して、あらゆる API、ファイル、またはデータベースからウェアハウスやレイクにデータをロードするのを支援します。自動スキーマ推論、インクリメンタルロード、および組み込みのデータコントラクトを備えています。 dlt は「データパイプラインのための requests ライブラリ」です。
コア機能
基本的なパイプライン
import dlt
# 最もシンプルなパイプライン: Python ジェネレータ → ウェアハウス
@dlt.resource(write_disposition="append")
def github_events():
"""GitHub リポジトリのイベントをロードします。"""
import requests
response = requests.get("https://api.github.com/repos/org/repo/events")
yield from response.json()
# パイプラインを実行
pipeline = dlt.pipeline(
pipeline_name="github_events",
destination="bigquery", # または: postgres, snowflake, duckdb, motherduck
dataset_name="raw_github",
)
load_info = pipeline.run(github_events())
print(load_info) # スキーマは自動的に推論されます
インクリメンタルロード
@dlt.resource(
write_disposition="merge", # Upsert: 既存のものを更新し、新しいものを挿入
primary_key="id",
)
def orders(
updated_at=dlt.sources.incremental(
"updated_at",
initial_value="2025-01-01T00:00:00Z"
)
):
"""注文をインクリメンタルにロードします — 最後の実行以降の新しい/変更されたもののみ。
dlt は実行間でカーソルを自動的に追跡します。
手動で状態を保存する必要はありません。
"""
import requests
page = 1
while True:
response = requests.get("https://api.shop.com/orders", params={
"updated_after": updated_at.last_value,
"page": page,
"per_page": 100,
})
data = response.json()
if not data:
break
yield from data
page += 1
REST API ソース (宣言的)
from dlt.sources.rest_api import rest_api_source
# 宣言的な API ソース — 標準的な REST API にはコードは不要です
source = rest_api_source({
"client": {
"base_url": "https://api.hubspot.com/crm/v3/",
"auth": { "type": "bearer", "token": dlt.secrets["hubspot_token"] },
"paginator": { "type": "offset", "limit": 100, "offset_param": "offset" },
},
"resources": [
{
"name": "contacts",
"endpoint": { "path": "objects/contacts" },
"write_disposition": "merge",
"primary_key": "id",
},
{
"name": "deals",
"endpoint": { "path": "objects/deals" },
"write_disposition": "merge",
"primary_key": "id",
},
],
})
pipeline = dlt.pipeline(destination="bigquery", dataset_name="raw_hubspot")
pipeline.run(source)
データコントラクト
# スキーマコントラクトを強制します — 予期しない変更があった場合は、エラーを発生させます
@dlt.resource(
write_disposition="merge",
primary_key="id",
columns={
"id": {"data_type": "bigint", "nullable": False},
"email": {"data_type": "text", "nullable": False},
"plan": {"data_type": "text", "nullable": False},
"mrr_cents": {"data_type": "bigint"},
},
schema_contract="evolve", # "freeze" | "evolve" | "discard_value" | "discard_row"
)
def customers():
# API が予期しないフィールドを返す場合、dlt はコントラクト設定に従って処理します
yield from fetch_customers()
インストール
pip install dlt[bigquery] # + destination アダプタ
# その他の destination: dlt[snowflake], dlt[postgres], dlt[duckdb], dlt[motherduck]
ベストプラクティス
- DuckDB から始める —
destination="duckdb"でローカルで開発し、本番環境では BigQuery/Snowflake に切り替えます - API のインクリメンタル — 状態を保持したロードには
dlt.sources.incrementalを使用します。dlt は実行間でカーソルを追跡します - REST API ソース — 標準的な REST API には宣言的な
rest_api_sourceを使用します。複雑な API の場合にのみカスタムリソースを作成します - エンティティのマージ — エンティティテーブルには
write_disposition="merge"とprimary_keyを使用します。イベントストリームにはappendを使用します - スキーマコントラクト — API の重大な変更をすぐに検出するために、本番環境では
schema_contract="freeze"を設定します - シークレット管理 — 環境変数または
.dlt/secrets.tomlに基づいてdlt.secrets["key"]を使用します - 変換 — ロード中に、行レベルの変換には
add_map()を使用します。より重い変換は dbt に属します - どこにでもデプロイ — dlt はサービスではなくライブラリです。cron、Airflow、Dagster、GitHub Actions、または Lambda にデプロイします
📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開
dlt (Data Load Tool) — Python-First Data Ingestion
You are an expert in dlt, the open-source Python library for building data pipelines. You help developers load data from any API, file, or database into warehouses and lakes using simple Python decorators — with automatic schema inference, incremental loading, and built-in data contracts. dlt is the "requests library for data pipelines."
Core Capabilities
Basic Pipeline
import dlt
# Simplest pipeline: Python generator → warehouse
@dlt.resource(write_disposition="append")
def github_events():
"""Load GitHub events for a repository."""
import requests
response = requests.get("https://api.github.com/repos/org/repo/events")
yield from response.json()
# Run pipeline
pipeline = dlt.pipeline(
pipeline_name="github_events",
destination="bigquery", # or: postgres, snowflake, duckdb, motherduck
dataset_name="raw_github",
)
load_info = pipeline.run(github_events())
print(load_info) # Schema inferred automatically
Incremental Loading
@dlt.resource(
write_disposition="merge", # Upsert: update existing, insert new
primary_key="id",
)
def orders(
updated_at=dlt.sources.incremental(
"updated_at",
initial_value="2025-01-01T00:00:00Z"
)
):
"""Load orders incrementally — only new/changed since last run.
dlt tracks the cursor automatically between runs.
No need to store state manually.
"""
import requests
page = 1
while True:
response = requests.get("https://api.shop.com/orders", params={
"updated_after": updated_at.last_value,
"page": page,
"per_page": 100,
})
data = response.json()
if not data:
break
yield from data
page += 1
REST API Source (Declarative)
from dlt.sources.rest_api import rest_api_source
# Declarative API source — no code needed for standard REST APIs
source = rest_api_source({
"client": {
"base_url": "https://api.hubspot.com/crm/v3/",
"auth": { "type": "bearer", "token": dlt.secrets["hubspot_token"] },
"paginator": { "type": "offset", "limit": 100, "offset_param": "offset" },
},
"resources": [
{
"name": "contacts",
"endpoint": { "path": "objects/contacts" },
"write_disposition": "merge",
"primary_key": "id",
},
{
"name": "deals",
"endpoint": { "path": "objects/deals" },
"write_disposition": "merge",
"primary_key": "id",
},
],
})
pipeline = dlt.pipeline(destination="bigquery", dataset_name="raw_hubspot")
pipeline.run(source)
Data Contracts
# Enforce schema contracts — fail loudly on unexpected changes
@dlt.resource(
write_disposition="merge",
primary_key="id",
columns={
"id": {"data_type": "bigint", "nullable": False},
"email": {"data_type": "text", "nullable": False},
"plan": {"data_type": "text", "nullable": False},
"mrr_cents": {"data_type": "bigint"},
},
schema_contract="evolve", # "freeze" | "evolve" | "discard_value" | "discard_row"
)
def customers():
# If API returns unexpected fields, dlt handles per contract setting
yield from fetch_customers()
Installation
pip install dlt[bigquery] # + destination adapter
# Other destinations: dlt[snowflake], dlt[postgres], dlt[duckdb], dlt[motherduck]
Best Practices
- Start with DuckDB — Develop locally with
destination="duckdb", switch to BigQuery/Snowflake for production - Incremental for APIs — Use
dlt.sources.incrementalfor stateful loading; dlt tracks cursor between runs - REST API source — Use the declarative
rest_api_sourcefor standard REST APIs; write custom resources only for complex APIs - Merge for entities — Use
write_disposition="merge"withprimary_keyfor entity tables;appendfor event streams - Schema contracts — Set
schema_contract="freeze"in production to catch breaking API changes immediately - Secrets management — Use
dlt.secrets["key"]backed by environment variables or.dlt/secrets.toml - Transformations — Use
add_map()for row-level transforms during loading; heavier transforms belong in dbt - Deploy anywhere — dlt is a library, not a service; deploy in cron, Airflow, Dagster, GitHub Actions, or Lambda