realtime-analytics
イベント発生時のデータ収集から、ClickHouseでのデータ分析、そしてダッシュボード表示まで、リアルタイム分析基盤の構築をゼロから支援し、ビジネス状況を可視化するSkill。
📜 元の英語説明(参考)
Build real-time analytics pipelines from scratch. Use when someone asks to "set up analytics", "build a dashboard", "track events in real time", "ClickHouse analytics", "event ingestion pipeline", or "live metrics". Covers event schema design, ingestion services with batching, ClickHouse table optimization, aggregation queries, and dashboard wiring.
🇯🇵 日本人クリエイター向け解説
イベント発生時のデータ収集から、ClickHouseでのデータ分析、そしてダッシュボード表示まで、リアルタイム分析基盤の構築をゼロから支援し、ビジネス状況を可視化するSkill。
※ jpskill.com 編集部が日本のビジネス現場向けに補足した解説です。Skill本体の挙動とは独立した参考情報です。
下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o realtime-analytics.zip https://jpskill.com/download/15325.zip && unzip -o realtime-analytics.zip && rm realtime-analytics.zip
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/15325.zip -OutFile "$d\realtime-analytics.zip"; Expand-Archive "$d\realtime-analytics.zip" -DestinationPath $d -Force; ri "$d\realtime-analytics.zip"
完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。
💾 手動でダウンロードしたい(コマンドが難しい人向け)
- 1. 下の青いボタンを押して
realtime-analytics.zipをダウンロード - 2. ZIPファイルをダブルクリックで解凍 →
realtime-analyticsフォルダができる - 3. そのフォルダを
C:\Users\あなたの名前\.claude\skills\(Win)または~/.claude/skills/(Mac)へ移動 - 4. Claude Code を再起動
⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。
🎯 このSkillでできること
下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。
📦 インストール方法 (3ステップ)
- 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
- 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
- 3. 展開してできたフォルダを、ホームフォルダの
.claude/skills/に置く- · macOS / Linux:
~/.claude/skills/ - · Windows:
%USERPROFILE%\.claude\skills\
- · macOS / Linux:
Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。
詳しい使い方ガイドを見る →- 最終更新
- 2026-05-18
- 取得日時
- 2026-05-18
- 同梱ファイル
- 1
📖 Skill本文(日本語訳)
※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。
リアルタイム分析
概要
このスキルは、AIエージェントが自己ホスト型のリアルタイム分析システムを構築できるようにします。イベントの取り込みから、ストレージ、クエリ、可視化まで、パイプライン全体をカバーし、ClickHouse を分析データベースとして使用して、大規模な環境で1秒未満のクエリパフォーマンスを実現します。
手順
イベントスキーマの設計
-
すべてのイベントは、以下の基本フィールドを持つ必要があります。
event_name— 効率的なストレージのためのLowCardinality(String)timestamp— ミリ秒精度のためのDateTime64(3)session_id— クライアントが生成する UUID のStringuser_id— 匿名トラッキングのためのNullable(String)device_type—LowCardinality(String): desktop, mobile, tabletcountry_code—LowCardinality(FixedString(2))properties— イベント固有のデータのための JSON を含むString
-
ClickHouse テーブルの最適化ルール:
MergeTree()エンジンを使用し、toYYYYMM(date)でパーティション分割します。ORDER BYは、最もフィルタリングされるカラム(通常はevent_name)から始める必要があります。- 自動データ削除のために TTL を追加します(デフォルトは90日)。
- 10,000未満の異なる値を持つ文字列カラムには
LowCardinality()を使用します。
取り込みサービス
- JSON配列のボディを持つ
POST /eventsを受け入れるステートレスな HTTP サービスとして構築します。 - 受信イベントを検証します:
event_nameまたはtimestampが欠落している場合は拒否します。 - イベントをメモリにバッファリングします。次のいずれかの条件が満たされた場合にフラッシュします。
- バッファが 1,000 イベントに達した場合
- 最後のフラッシュから2秒経過した場合
- バッチ挿入には ClickHouse の
INSERT ... FORMAT JSONEachRowを使用します。 - フラッシュに失敗した場合は、指数バックオフで3回リトライし、その後、デッドレターファイルに書き込みます。
{ "buffer_size": N, "last_flush": "ISO timestamp", "status": "ok" }を返すGET /healthを公開します。
集計クエリ
クエリを名前付きの .sql ファイルとして記述します。一般的なダッシュボードパネル:
アクティブユーザー数(過去N分間):
SELECT count(DISTINCT session_id) AS active_users
FROM events
WHERE timestamp > now() - INTERVAL 5 MINUTE;
ファネル分析(2段階):
SELECT
toStartOfHour(timestamp) AS hour,
countIf(event_name = 'signup_started') AS started,
countIf(event_name = 'signup_completed') AS completed,
round(completed / started * 100, 1) AS conversion_pct
FROM events
WHERE date = today()
GROUP BY hour
ORDER BY hour;
ユニークユーザーによるトップ機能:
SELECT
JSONExtractString(properties, 'feature') AS feature,
count(DISTINCT user_id) AS unique_users
FROM events
WHERE event_name = 'feature_used' AND date = today()
GROUP BY feature
ORDER BY unique_users DESC
LIMIT 10;
1分あたりのエラー率:
SELECT
toStartOfMinute(timestamp) AS minute,
count() AS error_count
FROM events
WHERE event_name = 'error_occurred'
AND timestamp > now() - INTERVAL 1 HOUR
GROUP BY minute
ORDER BY minute;
ダッシュボード API レイヤー
- ダッシュボードパネルごとに1つの REST エンドポイントを作成します(例:
GET /api/dashboard/active-users)。 - 各エンドポイントは SQL クエリを実行し、JSON を返します。
time_rangeクエリパラメータを追加します:1h,6h,24h,7d。それに応じて WHERE 句を調整します。- 同時ダッシュボードロードで ClickHouse に負荷がかからないように、結果を3〜5秒間キャッシュします。
Docker Compose の設定
以下を含む docker-compose.yml を提供します。
- ClickHouse サービス(固定バージョン、データ用ボリューム)
- 取り込みサービス(Dockerfile からビルド)
- ダッシュボードフロントエンド(Dockerfile からビルド、または nginx 経由で提供)
- イベントテーブルを作成する init コンテナまたはエントリポイントスクリプト
例
例 1 — Eコマースアプリのスキーマ
入力: "Eコマースのチェックアウトフローの分析イベントを設計してください。"
出力:
-- コアイベント: product_viewed, add_to_cart, checkout_started,
-- payment_submitted, order_completed, order_failed
CREATE TABLE events (
event_name LowCardinality(String),
timestamp DateTime64(3),
session_id String,
user_id Nullable(String),
device_type LowCardinality(String),
country_code LowCardinality(FixedString(2)),
properties String,
date Date DEFAULT toDate(timestamp)
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (event_name, timestamp)
TTL date + INTERVAL 180 DAY;
イベントごとの Properties JSON:
product_viewed:{"product_id": "p-42", "category": "electronics", "price_cents": 4999}add_to_cart:{"product_id": "p-42", "quantity": 1}payment_submitted:{"method": "card", "amount_cents": 9998}
例 2 — 取り込みバッファロジック
入力: "Node.js でのバッファフラッシュロジックを示してください。"
出力:
const FLUSH_INTERVAL_MS = 2000;
const FLUSH_SIZE = 1000;
let buffer = [];
async function addEvents(events) {
buffer.push(...events);
if (buffer.length >= FLUSH_SIZE) await flush();
}
setInterval(async () => {
if (buffer.length > 0) await flush();
}, FLUSH_INTERVAL_MS);
async function flush() {
const batch = buffer.splice(0, buffer.length);
const rows = batch.map(e => JSON.stringify(e)).join('\n');
await clickhouse.insert({
table: 'events',
values: batch,
format: 'JSONEachRow',
});
}
ガイドライン
- 賢くパーティション分割してください。 月単位のパーティションは、ほとんどのアプリで機能します。1日に1億を超えるイベントを取り込む場合にのみ、日単位にしてください。
- 要求されない限り、Kafka を使用しないでください。 1秒あたり50K未満のイベントの場合、インメモリバッファリングによる直接 HTTP 取り込みの方がシンプルで十分です。
- 常に TTL を追加してください。 無制限の分析テーブルは急速に増加します。デフォルトは90日に設定し、ユーザーに上書きさせます。
- 現実的なボリュームでテストしてください。 本番稼働前に、合成イベントを生成して、パイプラインが予想されるスループットを処理できることを検証します。
- *SELECT を避けてください。** I/O を最小限に抑えるために、集計クエリでは常にカラムを指定してください。
📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開
Real-Time Analytics
Overview
This skill enables AI agents to build self-hosted, real-time analytics systems. It covers the full pipeline from event ingestion through storage to query and visualization, using ClickHouse as the analytical database for sub-second query performance at scale.
Instructions
Event Schema Design
-
Every event must have these base fields:
event_name— LowCardinality(String) for efficient storagetimestamp— DateTime64(3) for millisecond precisionsession_id— String, client-generated UUIDuser_id— Nullable(String) for anonymous trackingdevice_type— LowCardinality(String): desktop, mobile, tabletcountry_code— LowCardinality(FixedString(2))properties— String containing JSON for event-specific data
-
ClickHouse table optimization rules:
- Use
MergeTree()engine, partition bytoYYYYMM(date) - ORDER BY should start with the most filtered column (usually
event_name) - Add TTL for automatic data expiration (default 90 days)
- Use
LowCardinality()for any string column with fewer than 10,000 distinct values
- Use
Ingestion Service
- Build as a stateless HTTP service accepting
POST /eventswith JSON array body. - Validate incoming events: reject if
event_nameortimestampis missing. - Buffer events in memory. Flush when either condition is met:
- Buffer reaches 1,000 events
- 2 seconds have elapsed since last flush
- Use ClickHouse's
INSERT ... FORMAT JSONEachRowfor batch inserts. - On flush failure, retry 3 times with exponential backoff, then write to a dead-letter file.
- Expose
GET /healthreturning:{ "buffer_size": N, "last_flush": "ISO timestamp", "status": "ok" }.
Aggregation Queries
Write queries as named .sql files. Common dashboard panels:
Active users (last N minutes):
SELECT count(DISTINCT session_id) AS active_users
FROM events
WHERE timestamp > now() - INTERVAL 5 MINUTE;
Funnel analysis (two-step):
SELECT
toStartOfHour(timestamp) AS hour,
countIf(event_name = 'signup_started') AS started,
countIf(event_name = 'signup_completed') AS completed,
round(completed / started * 100, 1) AS conversion_pct
FROM events
WHERE date = today()
GROUP BY hour
ORDER BY hour;
Top features by unique users:
SELECT
JSONExtractString(properties, 'feature') AS feature,
count(DISTINCT user_id) AS unique_users
FROM events
WHERE event_name = 'feature_used' AND date = today()
GROUP BY feature
ORDER BY unique_users DESC
LIMIT 10;
Error rate per minute:
SELECT
toStartOfMinute(timestamp) AS minute,
count() AS error_count
FROM events
WHERE event_name = 'error_occurred'
AND timestamp > now() - INTERVAL 1 HOUR
GROUP BY minute
ORDER BY minute;
Dashboard API Layer
- Create one REST endpoint per dashboard panel (e.g.,
GET /api/dashboard/active-users). - Each endpoint runs its SQL query and returns JSON.
- Add a
time_rangequery param:1h,6h,24h,7d. Adjust the WHERE clause accordingly. - Cache results for 3-5 seconds to avoid hammering ClickHouse on concurrent dashboard loads.
Docker Compose Setup
Provide a docker-compose.yml with:
- ClickHouse service (pinned version, volume for data)
- Ingestion service (builds from Dockerfile)
- Dashboard frontend (builds from Dockerfile or served via nginx)
- An init container or entrypoint script that creates the events table
Examples
Example 1 — Schema for an e-commerce app
Input: "Design analytics events for an e-commerce checkout flow."
Output:
-- Core events: product_viewed, add_to_cart, checkout_started,
-- payment_submitted, order_completed, order_failed
CREATE TABLE events (
event_name LowCardinality(String),
timestamp DateTime64(3),
session_id String,
user_id Nullable(String),
device_type LowCardinality(String),
country_code LowCardinality(FixedString(2)),
properties String,
date Date DEFAULT toDate(timestamp)
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (event_name, timestamp)
TTL date + INTERVAL 180 DAY;
Properties JSON per event:
product_viewed:{"product_id": "p-42", "category": "electronics", "price_cents": 4999}add_to_cart:{"product_id": "p-42", "quantity": 1}payment_submitted:{"method": "card", "amount_cents": 9998}
Example 2 — Ingestion buffer logic
Input: "Show the buffer flush logic in Node.js."
Output:
const FLUSH_INTERVAL_MS = 2000;
const FLUSH_SIZE = 1000;
let buffer = [];
async function addEvents(events) {
buffer.push(...events);
if (buffer.length >= FLUSH_SIZE) await flush();
}
setInterval(async () => {
if (buffer.length > 0) await flush();
}, FLUSH_INTERVAL_MS);
async function flush() {
const batch = buffer.splice(0, buffer.length);
const rows = batch.map(e => JSON.stringify(e)).join('\n');
await clickhouse.insert({
table: 'events',
values: batch,
format: 'JSONEachRow',
});
}
Guidelines
- Partition wisely. Monthly partitions work for most apps. Daily only if you ingest > 100M events/day.
- Do not use Kafka unless asked. For under 50K events/second, direct HTTP ingestion with in-memory buffering is simpler and sufficient.
- Always add TTL. Unbounded analytical tables grow fast. Default to 90 days; let the user override.
- Test with realistic volume. Generate synthetic events to validate the pipeline handles expected throughput before going live.
- *Avoid SELECT .** Always specify columns in aggregation queries to minimize I/O.