dbt
dbtは、SQLのSELECT文を使ってデータウェアハウス内のデータを変換し、プロジェクトの構築からモデル、テスト、ドキュメント作成、増分マテリアライズ、PostgreSQL、BigQuery、Snowflakeなどのデータウェアハウスとの連携までを学習するSkill。
📜 元の英語説明(参考)
dbt (data build tool) transforms data in your warehouse using SQL SELECT statements. Learn project setup, models, tests, documentation, incremental materializations, and integration with data warehouses like PostgreSQL, BigQuery, and Snowflake.
🇯🇵 日本人クリエイター向け解説
dbtは、SQLのSELECT文を使ってデータウェアハウス内のデータを変換し、プロジェクトの構築からモデル、テスト、ドキュメント作成、増分マテリアライズ、PostgreSQL、BigQuery、Snowflakeなどのデータウェアハウスとの連携までを学習するSkill。
※ jpskill.com 編集部が日本のビジネス現場向けに補足した解説です。Skill本体の挙動とは独立した参考情報です。
下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o dbt.zip https://jpskill.com/download/14823.zip && unzip -o dbt.zip && rm dbt.zip
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/14823.zip -OutFile "$d\dbt.zip"; Expand-Archive "$d\dbt.zip" -DestinationPath $d -Force; ri "$d\dbt.zip"
完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。
💾 手動でダウンロードしたい(コマンドが難しい人向け)
- 1. 下の青いボタンを押して
dbt.zipをダウンロード - 2. ZIPファイルをダブルクリックで解凍 →
dbtフォルダができる - 3. そのフォルダを
C:\Users\あなたの名前\.claude\skills\(Win)または~/.claude/skills/(Mac)へ移動 - 4. Claude Code を再起動
⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。
🎯 このSkillでできること
下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。
📦 インストール方法 (3ステップ)
- 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
- 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
- 3. 展開してできたフォルダを、ホームフォルダの
.claude/skills/に置く- · macOS / Linux:
~/.claude/skills/ - · Windows:
%USERPROFILE%\.claude\skills\
- · macOS / Linux:
Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。
詳しい使い方ガイドを見る →- 最終更新
- 2026-05-18
- 取得日時
- 2026-05-18
- 同梱ファイル
- 1
📖 Skill本文(日本語訳)
※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。
dbt
dbtを使用すると、分析エンジニアはSQLのSELECTステートメントを記述してデータを変換できます。dbtは、マテリアライゼーション(テーブル、ビュー、増分)、テスト、ドキュメント、およびリネージ追跡を処理します。
インストール
# PostgreSQLアダプターを使用してdbtをインストール
pip install dbt-postgres
# または他のアダプターを使用
pip install dbt-bigquery
pip install dbt-snowflake
# 新しいプロジェクトを初期化
dbt init my_project
cd my_project
プロジェクト構造
my_project/
├── dbt_project.yml # プロジェクト構成
├── profiles.yml # 接続プロファイル(通常は ~/.dbt/ にあります)
├── models/
│ ├── staging/ # 生データのクレンジング
│ │ ├── _staging.yml # スキーマ + ステージングモデルのテスト
│ │ ├── stg_users.sql
│ │ └── stg_orders.sql
│ └── marts/ # ビジネスロジック
│ ├── _marts.yml
│ └── fct_revenue.sql
├── tests/ # カスタムデータテスト
├── macros/ # 再利用可能なSQLマクロ
└── seeds/ # ロードするCSVファイル
構成
# dbt_project.yml: プロジェクト構成
name: my_project
version: '1.0.0'
profile: my_project
models:
my_project:
staging:
+materialized: view
+schema: staging
marts:
+materialized: table
+schema: analytics
# profiles.yml: データベース接続 (~/.dbt/profiles.yml)
my_project:
target: dev
outputs:
dev:
type: postgres
host: localhost
port: 5432
user: analyst
password: "{{ env_var('DBT_PASSWORD') }}"
dbname: analytics
schema: dev
threads: 4
prod:
type: postgres
host: prod-db.example.com
port: 5432
user: dbt_prod
password: "{{ env_var('DBT_PROD_PASSWORD') }}"
dbname: analytics
schema: public
threads: 8
ステージングモデル
-- models/staging/stg_users.sql: 生のユーザーデータをクレンジング
WITH source AS (
SELECT * FROM {{ source('raw', 'users') }}
),
cleaned AS (
SELECT
id AS user_id,
LOWER(TRIM(email)) AS email,
name,
created_at::timestamp AS signed_up_at,
CASE WHEN status = 'active' THEN TRUE ELSE FALSE END AS is_active
FROM source
WHERE email IS NOT NULL
)
SELECT * FROM cleaned
-- models/staging/stg_orders.sql: 生の注文データをクレンジング
SELECT
id AS order_id,
user_id,
amount_cents / 100.0 AS amount,
status,
created_at::timestamp AS ordered_at
FROM {{ source('raw', 'orders') }}
WHERE status != 'test'
マートモデル
-- models/marts/fct_revenue.sql: 収益ファクトテーブル
{{
config(
materialized='incremental',
unique_key='order_date',
on_schema_change='sync_all_columns'
)
}}
WITH orders AS (
SELECT * FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
WHERE ordered_at > (SELECT MAX(order_date) FROM {{ this }})
{% endif %}
),
daily AS (
SELECT
DATE_TRUNC('day', ordered_at)::date AS order_date,
COUNT(*) AS total_orders,
COUNT(DISTINCT user_id) AS unique_customers,
SUM(amount) AS total_revenue,
AVG(amount) AS avg_order_value
FROM orders
WHERE status = 'completed'
GROUP BY 1
)
SELECT * FROM daily
スキーマとテスト
# models/staging/_staging.yml: ソース、カラム、およびテストを定義
version: 2
sources:
- name: raw
schema: public
tables:
- name: users
loaded_at_field: created_at
freshness:
warn_after: {count: 12, period: hour}
error_after: {count: 24, period: hour}
- name: orders
models:
- name: stg_users
description: Cleaned user data
columns:
- name: user_id
tests: [unique, not_null]
- name: email
tests: [unique, not_null]
- name: stg_orders
columns:
- name: order_id
tests: [unique, not_null]
- name: status
tests:
- accepted_values:
values: ['pending', 'completed', 'cancelled', 'refunded']
CLIコマンド
# commands.sh: 一般的なdbt CLIコマンド
# すべてのモデルを実行
dbt run
# 特定のモデルとその上流の依存関係を実行
dbt run --select +fct_revenue
# テストを実行
dbt test
# ドキュメントを生成して提供
dbt docs generate
dbt docs serve --port 8081
# ソースの鮮度を確認
dbt source freshness
# フルビルド(run + test + snapshot)
dbt build
# 本番環境に対して実行
dbt run --target prod
マクロ
-- macros/cents_to_dollars.sql: 通貨換算のための再利用可能なマクロ
{% macro cents_to_dollars(column_name) %}
({{ column_name }} / 100.0)::numeric(10,2)
{% endmacro %}
-- モデルでの使用例: SELECT {{ cents_to_dollars('amount_cents') }} AS amount 📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開
dbt
dbt lets analytics engineers transform data by writing SQL SELECT statements. It handles materialization (tables, views, incremental), testing, documentation, and lineage tracking.
Installation
# Install dbt with PostgreSQL adapter
pip install dbt-postgres
# Or with other adapters
pip install dbt-bigquery
pip install dbt-snowflake
# Initialize a new project
dbt init my_project
cd my_project
Project Structure
my_project/
├── dbt_project.yml # Project configuration
├── profiles.yml # Connection profiles (usually in ~/.dbt/)
├── models/
│ ├── staging/ # Raw data cleaning
│ │ ├── _staging.yml # Schema + tests for staging models
│ │ ├── stg_users.sql
│ │ └── stg_orders.sql
│ └── marts/ # Business logic
│ ├── _marts.yml
│ └── fct_revenue.sql
├── tests/ # Custom data tests
├── macros/ # Reusable SQL macros
└── seeds/ # CSV files to load
Configuration
# dbt_project.yml: Project configuration
name: my_project
version: '1.0.0'
profile: my_project
models:
my_project:
staging:
+materialized: view
+schema: staging
marts:
+materialized: table
+schema: analytics
# profiles.yml: Database connection (~/.dbt/profiles.yml)
my_project:
target: dev
outputs:
dev:
type: postgres
host: localhost
port: 5432
user: analyst
password: "{{ env_var('DBT_PASSWORD') }}"
dbname: analytics
schema: dev
threads: 4
prod:
type: postgres
host: prod-db.example.com
port: 5432
user: dbt_prod
password: "{{ env_var('DBT_PROD_PASSWORD') }}"
dbname: analytics
schema: public
threads: 8
Staging Models
-- models/staging/stg_users.sql: Clean raw user data
WITH source AS (
SELECT * FROM {{ source('raw', 'users') }}
),
cleaned AS (
SELECT
id AS user_id,
LOWER(TRIM(email)) AS email,
name,
created_at::timestamp AS signed_up_at,
CASE WHEN status = 'active' THEN TRUE ELSE FALSE END AS is_active
FROM source
WHERE email IS NOT NULL
)
SELECT * FROM cleaned
-- models/staging/stg_orders.sql: Clean raw order data
SELECT
id AS order_id,
user_id,
amount_cents / 100.0 AS amount,
status,
created_at::timestamp AS ordered_at
FROM {{ source('raw', 'orders') }}
WHERE status != 'test'
Mart Models
-- models/marts/fct_revenue.sql: Revenue fact table
{{
config(
materialized='incremental',
unique_key='order_date',
on_schema_change='sync_all_columns'
)
}}
WITH orders AS (
SELECT * FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
WHERE ordered_at > (SELECT MAX(order_date) FROM {{ this }})
{% endif %}
),
daily AS (
SELECT
DATE_TRUNC('day', ordered_at)::date AS order_date,
COUNT(*) AS total_orders,
COUNT(DISTINCT user_id) AS unique_customers,
SUM(amount) AS total_revenue,
AVG(amount) AS avg_order_value
FROM orders
WHERE status = 'completed'
GROUP BY 1
)
SELECT * FROM daily
Schema and Tests
# models/staging/_staging.yml: Define sources, columns, and tests
version: 2
sources:
- name: raw
schema: public
tables:
- name: users
loaded_at_field: created_at
freshness:
warn_after: {count: 12, period: hour}
error_after: {count: 24, period: hour}
- name: orders
models:
- name: stg_users
description: Cleaned user data
columns:
- name: user_id
tests: [unique, not_null]
- name: email
tests: [unique, not_null]
- name: stg_orders
columns:
- name: order_id
tests: [unique, not_null]
- name: status
tests:
- accepted_values:
values: ['pending', 'completed', 'cancelled', 'refunded']
CLI Commands
# commands.sh: Common dbt CLI commands
# Run all models
dbt run
# Run specific model and its upstream dependencies
dbt run --select +fct_revenue
# Run tests
dbt test
# Generate and serve documentation
dbt docs generate
dbt docs serve --port 8081
# Check source freshness
dbt source freshness
# Full build (run + test + snapshot)
dbt build
# Run against production
dbt run --target prod
Macros
-- macros/cents_to_dollars.sql: Reusable macro for currency conversion
{% macro cents_to_dollars(column_name) %}
({{ column_name }} / 100.0)::numeric(10,2)
{% endmacro %}
-- Usage in a model: SELECT {{ cents_to_dollars('amount_cents') }} AS amount