jpskill.com
💼 ビジネス コミュニティ

realtime-analytics

イベント発生時のデータ収集から、ClickHouseでのデータ分析、そしてダッシュボード表示まで、リアルタイム分析基盤の構築をゼロから支援し、ビジネス状況を可視化するSkill。

📜 元の英語説明(参考)

Build real-time analytics pipelines from scratch. Use when someone asks to "set up analytics", "build a dashboard", "track events in real time", "ClickHouse analytics", "event ingestion pipeline", or "live metrics". Covers event schema design, ingestion services with batching, ClickHouse table optimization, aggregation queries, and dashboard wiring.

🇯🇵 日本人クリエイター向け解説

一言でいうと

イベント発生時のデータ収集から、ClickHouseでのデータ分析、そしてダッシュボード表示まで、リアルタイム分析基盤の構築をゼロから支援し、ビジネス状況を可視化するSkill。

※ jpskill.com 編集部が日本のビジネス現場向けに補足した解説です。Skill本体の挙動とは独立した参考情報です。

⚡ おすすめ: コマンド1行でインストール(60秒)

下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。

🍎 Mac / 🐧 Linux
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o realtime-analytics.zip https://jpskill.com/download/15325.zip && unzip -o realtime-analytics.zip && rm realtime-analytics.zip
🪟 Windows (PowerShell)
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/15325.zip -OutFile "$d\realtime-analytics.zip"; Expand-Archive "$d\realtime-analytics.zip" -DestinationPath $d -Force; ri "$d\realtime-analytics.zip"

完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。

💾 手動でダウンロードしたい(コマンドが難しい人向け)
  1. 1. 下の青いボタンを押して realtime-analytics.zip をダウンロード
  2. 2. ZIPファイルをダブルクリックで解凍 → realtime-analytics フォルダができる
  3. 3. そのフォルダを C:\Users\あなたの名前\.claude\skills\(Win)または ~/.claude/skills/(Mac)へ移動
  4. 4. Claude Code を再起動

⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。

🎯 このSkillでできること

下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。

📦 インストール方法 (3ステップ)

  1. 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
  2. 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
  3. 3. 展開してできたフォルダを、ホームフォルダの .claude/skills/ に置く
    • · macOS / Linux: ~/.claude/skills/
    • · Windows: %USERPROFILE%\.claude\skills\

Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。

詳しい使い方ガイドを見る →
最終更新
2026-05-18
取得日時
2026-05-18
同梱ファイル
1

📖 Skill本文(日本語訳)

※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。

リアルタイム分析

概要

このスキルは、AIエージェントが自己ホスト型のリアルタイム分析システムを構築できるようにします。イベントの取り込みから、ストレージ、クエリ、可視化まで、パイプライン全体をカバーし、ClickHouse を分析データベースとして使用して、大規模な環境で1秒未満のクエリパフォーマンスを実現します。

手順

イベントスキーマの設計

  1. すべてのイベントは、以下の基本フィールドを持つ必要があります。

    • event_name — 効率的なストレージのための LowCardinality(String)
    • timestamp — ミリ秒精度のための DateTime64(3)
    • session_id — クライアントが生成する UUID の String
    • user_id — 匿名トラッキングのための Nullable(String)
    • device_typeLowCardinality(String): desktop, mobile, tablet
    • country_codeLowCardinality(FixedString(2))
    • properties — イベント固有のデータのための JSON を含む String
  2. ClickHouse テーブルの最適化ルール:

    • MergeTree() エンジンを使用し、toYYYYMM(date) でパーティション分割します。
    • ORDER BY は、最もフィルタリングされるカラム(通常は event_name)から始める必要があります。
    • 自動データ削除のために TTL を追加します(デフォルトは90日)。
    • 10,000未満の異なる値を持つ文字列カラムには LowCardinality() を使用します。

取り込みサービス

  1. JSON配列のボディを持つ POST /events を受け入れるステートレスな HTTP サービスとして構築します。
  2. 受信イベントを検証します: event_name または timestamp が欠落している場合は拒否します。
  3. イベントをメモリにバッファリングします。次のいずれかの条件が満たされた場合にフラッシュします。
    • バッファが 1,000 イベントに達した場合
    • 最後のフラッシュから2秒経過した場合
  4. バッチ挿入には ClickHouse の INSERT ... FORMAT JSONEachRow を使用します。
  5. フラッシュに失敗した場合は、指数バックオフで3回リトライし、その後、デッドレターファイルに書き込みます。
  6. { "buffer_size": N, "last_flush": "ISO timestamp", "status": "ok" } を返す GET /health を公開します。

集計クエリ

クエリを名前付きの .sql ファイルとして記述します。一般的なダッシュボードパネル:

アクティブユーザー数(過去N分間):

SELECT count(DISTINCT session_id) AS active_users
FROM events
WHERE timestamp > now() - INTERVAL 5 MINUTE;

ファネル分析(2段階):

SELECT
  toStartOfHour(timestamp) AS hour,
  countIf(event_name = 'signup_started') AS started,
  countIf(event_name = 'signup_completed') AS completed,
  round(completed / started * 100, 1) AS conversion_pct
FROM events
WHERE date = today()
GROUP BY hour
ORDER BY hour;

ユニークユーザーによるトップ機能:

SELECT
  JSONExtractString(properties, 'feature') AS feature,
  count(DISTINCT user_id) AS unique_users
FROM events
WHERE event_name = 'feature_used' AND date = today()
GROUP BY feature
ORDER BY unique_users DESC
LIMIT 10;

1分あたりのエラー率:

SELECT
  toStartOfMinute(timestamp) AS minute,
  count() AS error_count
FROM events
WHERE event_name = 'error_occurred'
  AND timestamp > now() - INTERVAL 1 HOUR
GROUP BY minute
ORDER BY minute;

ダッシュボード API レイヤー

  1. ダッシュボードパネルごとに1つの REST エンドポイントを作成します(例: GET /api/dashboard/active-users)。
  2. 各エンドポイントは SQL クエリを実行し、JSON を返します。
  3. time_range クエリパラメータを追加します: 1h, 6h, 24h, 7d。それに応じて WHERE 句を調整します。
  4. 同時ダッシュボードロードで ClickHouse に負荷がかからないように、結果を3〜5秒間キャッシュします。

Docker Compose の設定

以下を含む docker-compose.yml を提供します。

  • ClickHouse サービス(固定バージョン、データ用ボリューム)
  • 取り込みサービス(Dockerfile からビルド)
  • ダッシュボードフロントエンド(Dockerfile からビルド、または nginx 経由で提供)
  • イベントテーブルを作成する init コンテナまたはエントリポイントスクリプト

例 1 — Eコマースアプリのスキーマ

入力: "Eコマースのチェックアウトフローの分析イベントを設計してください。"

出力:

-- コアイベント: product_viewed, add_to_cart, checkout_started,
-- payment_submitted, order_completed, order_failed

CREATE TABLE events (
    event_name   LowCardinality(String),
    timestamp    DateTime64(3),
    session_id   String,
    user_id      Nullable(String),
    device_type  LowCardinality(String),
    country_code LowCardinality(FixedString(2)),
    properties   String,
    date         Date DEFAULT toDate(timestamp)
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (event_name, timestamp)
TTL date + INTERVAL 180 DAY;

イベントごとの Properties JSON:

  • product_viewed: {"product_id": "p-42", "category": "electronics", "price_cents": 4999}
  • add_to_cart: {"product_id": "p-42", "quantity": 1}
  • payment_submitted: {"method": "card", "amount_cents": 9998}

例 2 — 取り込みバッファロジック

入力: "Node.js でのバッファフラッシュロジックを示してください。"

出力:

const FLUSH_INTERVAL_MS = 2000;
const FLUSH_SIZE = 1000;
let buffer = [];

async function addEvents(events) {
  buffer.push(...events);
  if (buffer.length >= FLUSH_SIZE) await flush();
}

setInterval(async () => {
  if (buffer.length > 0) await flush();
}, FLUSH_INTERVAL_MS);

async function flush() {
  const batch = buffer.splice(0, buffer.length);
  const rows = batch.map(e => JSON.stringify(e)).join('\n');
  await clickhouse.insert({
    table: 'events',
    values: batch,
    format: 'JSONEachRow',
  });
}

ガイドライン

  • 賢くパーティション分割してください。 月単位のパーティションは、ほとんどのアプリで機能します。1日に1億を超えるイベントを取り込む場合にのみ、日単位にしてください。
  • 要求されない限り、Kafka を使用しないでください。 1秒あたり50K未満のイベントの場合、インメモリバッファリングによる直接 HTTP 取り込みの方がシンプルで十分です。
  • 常に TTL を追加してください。 無制限の分析テーブルは急速に増加します。デフォルトは90日に設定し、ユーザーに上書きさせます。
  • 現実的なボリュームでテストしてください。 本番稼働前に、合成イベントを生成して、パイプラインが予想されるスループットを処理できることを検証します。
  • *SELECT を避けてください。** I/O を最小限に抑えるために、集計クエリでは常にカラムを指定してください。
📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開

Real-Time Analytics

Overview

This skill enables AI agents to build self-hosted, real-time analytics systems. It covers the full pipeline from event ingestion through storage to query and visualization, using ClickHouse as the analytical database for sub-second query performance at scale.

Instructions

Event Schema Design

  1. Every event must have these base fields:

    • event_name — LowCardinality(String) for efficient storage
    • timestamp — DateTime64(3) for millisecond precision
    • session_id — String, client-generated UUID
    • user_id — Nullable(String) for anonymous tracking
    • device_type — LowCardinality(String): desktop, mobile, tablet
    • country_code — LowCardinality(FixedString(2))
    • properties — String containing JSON for event-specific data
  2. ClickHouse table optimization rules:

    • Use MergeTree() engine, partition by toYYYYMM(date)
    • ORDER BY should start with the most filtered column (usually event_name)
    • Add TTL for automatic data expiration (default 90 days)
    • Use LowCardinality() for any string column with fewer than 10,000 distinct values

Ingestion Service

  1. Build as a stateless HTTP service accepting POST /events with JSON array body.
  2. Validate incoming events: reject if event_name or timestamp is missing.
  3. Buffer events in memory. Flush when either condition is met:
    • Buffer reaches 1,000 events
    • 2 seconds have elapsed since last flush
  4. Use ClickHouse's INSERT ... FORMAT JSONEachRow for batch inserts.
  5. On flush failure, retry 3 times with exponential backoff, then write to a dead-letter file.
  6. Expose GET /health returning: { "buffer_size": N, "last_flush": "ISO timestamp", "status": "ok" }.

Aggregation Queries

Write queries as named .sql files. Common dashboard panels:

Active users (last N minutes):

SELECT count(DISTINCT session_id) AS active_users
FROM events
WHERE timestamp > now() - INTERVAL 5 MINUTE;

Funnel analysis (two-step):

SELECT
  toStartOfHour(timestamp) AS hour,
  countIf(event_name = 'signup_started') AS started,
  countIf(event_name = 'signup_completed') AS completed,
  round(completed / started * 100, 1) AS conversion_pct
FROM events
WHERE date = today()
GROUP BY hour
ORDER BY hour;

Top features by unique users:

SELECT
  JSONExtractString(properties, 'feature') AS feature,
  count(DISTINCT user_id) AS unique_users
FROM events
WHERE event_name = 'feature_used' AND date = today()
GROUP BY feature
ORDER BY unique_users DESC
LIMIT 10;

Error rate per minute:

SELECT
  toStartOfMinute(timestamp) AS minute,
  count() AS error_count
FROM events
WHERE event_name = 'error_occurred'
  AND timestamp > now() - INTERVAL 1 HOUR
GROUP BY minute
ORDER BY minute;

Dashboard API Layer

  1. Create one REST endpoint per dashboard panel (e.g., GET /api/dashboard/active-users).
  2. Each endpoint runs its SQL query and returns JSON.
  3. Add a time_range query param: 1h, 6h, 24h, 7d. Adjust the WHERE clause accordingly.
  4. Cache results for 3-5 seconds to avoid hammering ClickHouse on concurrent dashboard loads.

Docker Compose Setup

Provide a docker-compose.yml with:

  • ClickHouse service (pinned version, volume for data)
  • Ingestion service (builds from Dockerfile)
  • Dashboard frontend (builds from Dockerfile or served via nginx)
  • An init container or entrypoint script that creates the events table

Examples

Example 1 — Schema for an e-commerce app

Input: "Design analytics events for an e-commerce checkout flow."

Output:

-- Core events: product_viewed, add_to_cart, checkout_started,
-- payment_submitted, order_completed, order_failed

CREATE TABLE events (
    event_name   LowCardinality(String),
    timestamp    DateTime64(3),
    session_id   String,
    user_id      Nullable(String),
    device_type  LowCardinality(String),
    country_code LowCardinality(FixedString(2)),
    properties   String,
    date         Date DEFAULT toDate(timestamp)
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (event_name, timestamp)
TTL date + INTERVAL 180 DAY;

Properties JSON per event:

  • product_viewed: {"product_id": "p-42", "category": "electronics", "price_cents": 4999}
  • add_to_cart: {"product_id": "p-42", "quantity": 1}
  • payment_submitted: {"method": "card", "amount_cents": 9998}

Example 2 — Ingestion buffer logic

Input: "Show the buffer flush logic in Node.js."

Output:

const FLUSH_INTERVAL_MS = 2000;
const FLUSH_SIZE = 1000;
let buffer = [];

async function addEvents(events) {
  buffer.push(...events);
  if (buffer.length >= FLUSH_SIZE) await flush();
}

setInterval(async () => {
  if (buffer.length > 0) await flush();
}, FLUSH_INTERVAL_MS);

async function flush() {
  const batch = buffer.splice(0, buffer.length);
  const rows = batch.map(e => JSON.stringify(e)).join('\n');
  await clickhouse.insert({
    table: 'events',
    values: batch,
    format: 'JSONEachRow',
  });
}

Guidelines

  • Partition wisely. Monthly partitions work for most apps. Daily only if you ingest > 100M events/day.
  • Do not use Kafka unless asked. For under 50K events/second, direct HTTP ingestion with in-memory buffering is simpler and sufficient.
  • Always add TTL. Unbounded analytical tables grow fast. Default to 90 days; let the user override.
  • Test with realistic volume. Generate synthetic events to validate the pipeline handles expected throughput before going live.
  • *Avoid SELECT .** Always specify columns in aggregation queries to minimize I/O.