
configuring-dapr-pubsub

Configures Dapr pub/sub components for event-driven microservices with Kafka or Redis. Use when wiring agent-to-agent communication, setting up event subscriptions, or integrating Dapr sidecars. Covers component configuration, subscription patterns, publishing events, and Kubernetes deployment. NOT when using direct Kafka clients or non-Dapr messaging patterns.

⚡ Recommended: install with a single command (60 seconds)

Copy the command below and paste it into a terminal (Mac/Linux) or PowerShell (Windows). Download, extraction, and placement are fully automatic.

🍎 Mac / 🐧 Linux
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o configuring-dapr-pubsub.zip https://jpskill.com/download/17291.zip && unzip -o configuring-dapr-pubsub.zip && rm configuring-dapr-pubsub.zip
🪟 Windows (PowerShell)
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/17291.zip -OutFile "$d\configuring-dapr-pubsub.zip"; Expand-Archive "$d\configuring-dapr-pubsub.zip" -DestinationPath $d -Force; ri "$d\configuring-dapr-pubsub.zip"

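When the command finishes, restart Claude Code. The skill then activates automatically whenever you make a related request in ordinary language.

💾 Manual download (if you prefer not to use the command line)
  1. Download configuring-dapr-pubsub.zip with the blue download button
  2. Double-click the ZIP file to extract it; this creates a configuring-dapr-pubsub folder
  3. Move that folder to C:\Users\<your name>\.claude\skills\ (Windows) or ~/.claude/skills/ (Mac)
  4. Restart Claude Code

⚠️ Download and use at your own risk. This site accepts no responsibility for the content, behavior, or safety of the skill.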

Last updated: 2026-05-18
Retrieved: 2026-05-18
Bundled files: 2

Configuring Dapr Pub/Sub

Wire event-driven microservices using Dapr pub/sub with Kafka or Redis backends.

Quick Start

# components/pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    - name: brokers
      value: "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
    - name: authType
      value: "none"
    - name: disableTls
      value: "true"

# Apply component
kubectl apply -f components/pubsub.yaml

# Test with Dapr CLI (dapr publish needs the ID of the app whose sidecar sends the event)
dapr run --app-id publisher -- dapr publish --publish-app-id publisher --pubsub pubsub --topic test --data '{"msg":"hello"}'

Component Configurations

Kafka (Production)

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    # Required
    - name: brokers
      value: "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
    - name: authType
      value: "none"

    # Consumer settings
    - name: consumerGroup
      value: "{namespace}-{appID}"  # Templated per deployment; Dapr expands {namespace} and {appID}
    - name: consumeRetryInterval
      value: "100ms"
    - name: heartbeatInterval
      value: "3s"
    - name: sessionTimeout
      value: "10s"

    # Performance
    - name: maxMessageBytes
      value: "1048576"  # 1MB
    - name: channelBufferSize
      value: "256"
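
The consumer timing values above interact. Kafka's general guidance is to keep the heartbeat interval at no more than one third of the session timeout, so a consumer can miss a heartbeat or two before the broker evicts it from the group. The sketch below is one way to check that before deploying. `parse_duration` and `check_consumer_timing` are hypothetical helper names, and the parser only understands the simple `ms`, `s`, and `m` suffixes used in this document.

```python
import re

# Milliseconds per supported unit suffix.
_UNITS_MS = {"ms": 1, "s": 1000, "m": 60_000}


def parse_duration(text: str) -> int:
    """Convert a simple duration such as "100ms" or "3s" to milliseconds."""
    match = re.fullmatch(r"(\d+)(ms|s|m)", text.strip())
    if match is None:
        raise ValueError(f"unsupported duration: {text!r}")
    value, unit = match.groups()
    return int(value) * _UNITS_MS[unit]


def check_consumer_timing(heartbeat: str, session: str) -> bool:
    """Return True when the heartbeat interval is at most a third of the session timeout."""
    return parse_duration(heartbeat) * 3 <= parse_duration(session)


# The values from the component above: 3s heartbeat, 10s session timeout.
print(check_consumer_timing("3s", "10s"))
```

With the values shown in the component (3s and 10s) the check passes. Raising `heartbeatInterval` to 5s without also raising `sessionTimeout` would fail it.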

Kafka with SASL Authentication

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-secure
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    - name: brokers
      value: "kafka.example.com:9093"
    - name: authType
      value: "password"
    - name: saslUsername
      value: "dapr-user"
    - name: saslPassword
      secretKeyRef:
        name: kafka-secrets
        key: password
    - name: saslMechanism
      value: "SCRAM-SHA-256"

Redis (Development/Simple)

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: redis-pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
    - name: redisHost
      value: "redis-master.redis.svc.cluster.local:6379"
    - name: redisPassword
      secretKeyRef:
        name: redis-secrets
        key: password

Subscription Patterns

Declarative Subscription (Recommended)

# subscriptions/task-events.yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: task-created-subscription
spec:
  pubsubname: pubsub
  topic: task-created
  routes:
    default: /dapr/task-created
scopes:
  - triage-agent
  - concepts-agent

Programmatic Subscription (FastAPI)

from fastapi import FastAPI, Request

app = FastAPI()

@app.get("/dapr/subscribe")
async def subscribe():
    """Dapr calls this to discover subscriptions."""
    return [
        {
            "pubsubname": "pubsub",
            "topic": "task-created",
            "route": "/dapr/task-created"
        },
        {
            "pubsubname": "pubsub",
            "topic": "task-completed",
            "route": "/dapr/task-completed"
        }
    ]

@app.post("/dapr/task-created")
async def handle_task_created(request: Request):
    """Handle incoming CloudEvent."""
    event = await request.json()

    # CloudEvent wrapper - data is nested
    task_data = event.get("data", event)
    task_id = task_data.get("task_id")

    # Process event
    print(f"Task created: {task_id}")

    return {"status": "SUCCESS"}
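
The handler above always reports `SUCCESS`. Dapr also accepts `RETRY` (redeliver later) and `DROP` (discard the message) as status values, so a handler can signal how a failure should be treated. The sketch below is one possible mapping. `run_handler` is a hypothetical helper, and the choice of which exceptions count as permanent is an assumption to adapt to your own code.

```python
from typing import Any, Callable


def run_handler(event: dict[str, Any], process: Callable[[Any], None]) -> dict[str, str]:
    """Call process(payload) and translate the outcome into a Dapr status."""
    # Unwrap the CloudEvent envelope if present, as in the handler above.
    payload = event.get("data", event)
    try:
        process(payload)
    except (KeyError, TypeError, ValueError):
        # Malformed payload: a retry would fail the same way, so discard it.
        return {"status": "DROP"}
    except Exception:
        # Assumption: anything else is transient, so ask Dapr to redeliver.
        return {"status": "RETRY"}
    return {"status": "SUCCESS"}


def require_task_id(payload: dict[str, Any]) -> None:
    """Example processor that raises KeyError when task_id is missing."""
    print(f"Task created: {payload['task_id']}")


print(run_handler({"data": {"task_id": "123"}}, require_task_id))
print(run_handler({"data": {}}, require_task_id))
```

Inside a FastAPI route, the return value of `run_handler` can be returned directly as the response body.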

Publishing Events

From FastAPI Service

import httpx

DAPR_URL = "http://localhost:3500"

async def publish_event(topic: str, data: dict):
    """Publish event through Dapr sidecar."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
            json=data,
            headers={"Content-Type": "application/json"}
        )
        response.raise_for_status()

# Usage (call from inside an async function, e.g. a route handler)
await publish_event("task-created", {
    "task_id": "123",
    "title": "Learn Python",
    "user_id": "user-456"
})
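
The publish URL above hardcodes port 3500. When several apps run on one machine, each sidecar listens on its own port, and the `DAPR_HTTP_PORT` environment variable (set explicitly in the Deployment later in this document) tells the app which one is its own. The sketch below builds the publish URL from that variable. `publish_url` is a hypothetical helper name, and the `env` parameter exists only to make the function easy to test.

```python
import os
from typing import Mapping, Optional
from urllib.parse import quote


def publish_url(pubsub: str, topic: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Build the sidecar publish URL, reading the port from DAPR_HTTP_PORT."""
    env = os.environ if env is None else env
    port = env.get("DAPR_HTTP_PORT", "3500")  # fall back to the default used above
    # Percent-encode both path segments so unusual names cannot break the path.
    return (
        f"http://localhost:{port}/v1.0/publish/"
        f"{quote(pubsub, safe='')}/{quote(topic, safe='')}"
    )


print(publish_url("pubsub", "task-created", env={"DAPR_HTTP_PORT": "3501"}))
```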

With CloudEvent Metadata

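import uuid

async def publish_cloudevent(topic: str, data: dict, event_type: str):
    """Publish a caller-built CloudEvent with explicit fields."""
    # With Content-Type application/cloudevents+json, Dapr expects the body
    # itself to be the CloudEvent envelope and passes it through unwrapped.
    event = {
        "specversion": "1.0",
        "type": event_type,
        "source": "triage-agent",
        "id": str(uuid.uuid4()),
        "datacontenttype": "application/json",
        "data": data,
    }
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
            json=event,
            headers={"Content-Type": "application/cloudevents+json"}
        )
        response.raise_for_status()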

Kubernetes Deployment

Component Scoping

Limit component access to specific apps:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    - name: brokers
      value: "kafka:9092"
scopes:
  - triage-agent
  - concepts-agent
  - debug-agent
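
Scoping works as an allow list: a component with a `scopes` list is loaded only by the listed app IDs, while a component without `scopes` is available to every app in its namespace. The sketch below models that rule on already-parsed component definitions, which can help when debugging a `component not found` error. `visible_components` is a hypothetical helper, and `billing-agent` is an invented app ID used only for illustration.

```python
def visible_components(components: list[dict], app_id: str) -> list[str]:
    """Names of the components that the given app ID would load."""
    names = []
    for component in components:
        scopes = component.get("scopes") or []
        # No scopes means the component is not restricted to particular apps.
        if not scopes or app_id in scopes:
            names.append(component["metadata"]["name"])
    return names


components = [
    {"metadata": {"name": "pubsub"},
     "scopes": ["triage-agent", "concepts-agent", "debug-agent"]},
    {"metadata": {"name": "redis-pubsub"}},  # unscoped
]

print(visible_components(components, "triage-agent"))   # ['pubsub', 'redis-pubsub']
print(visible_components(components, "billing-agent"))  # ['redis-pubsub']
```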

App Deployment with Dapr Sidecar

apiVersion: apps/v1
kind: Deployment
metadata:
  name: triage-agent
spec:
  replicas: 2
  selector:
    matchLabels:
      app: triage-agent
  template:
    metadata:
      labels:
        app: triage-agent
      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "triage-agent"
        dapr.io/app-port: "8000"
        dapr.io/enable-api-logging: "true"
    spec:
      containers:
        - name: triage-agent
          image: myapp/triage-agent:latest
          ports:
            - containerPort: 8000
          env:
            - name: DAPR_HTTP_PORT
              value: "3500"

Multi-Agent Routing Pattern

Triage Agent → Specialist Agents

# triage_agent.py
from fastapi import FastAPI, Request
import httpx

app = FastAPI()
DAPR_URL = "http://localhost:3500"

@app.post("/api/question")
async def handle_question(request: Request):
    data = await request.json()
    question = data["question"]

    # Route based on content
    if "python" in question.lower() or "code" in question.lower():
        topic = "concepts-request"
    elif "error" in question.lower() or "bug" in question.lower():
        topic = "debug-request"
    else:
        topic = "concepts-request"  # Default

    # Publish to appropriate agent; fail loudly if the sidecar rejects the event
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
            json={
                "question": question,
                "user_id": data["user_id"],
                "session_id": data["session_id"]
            }
        )
        response.raise_for_status()

    return {"status": "routed", "topic": topic}
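
The routing rules in the handler are easier to test when pulled out into a pure function. The sketch below reproduces the same keyword checks in the same order. `choose_topic` is a hypothetical name, not part of the original skill.

```python
def choose_topic(question: str) -> str:
    """Pick the pub/sub topic for a question using the keyword rules above."""
    text = question.lower()
    if "python" in text or "code" in text:
        return "concepts-request"
    if "error" in text or "bug" in text:
        return "debug-request"
    return "concepts-request"  # Default


print(choose_topic("How do decorators work in Python?"))    # concepts-request
print(choose_topic("I keep getting a timeout error"))       # debug-request
print(choose_topic("Why does my Python code raise a bug?")) # concepts-request
```

The first matching rule wins, so a question that mentions both "Python" and "bug" goes to the concepts agent. If debugging questions should take priority, swap the order of the two checks.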

Specialist Agent Handler

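# concepts_agent.py
import os

from fastapi import FastAPI, Request
import httpx

app = FastAPI()
# Talk to this app's own sidecar; DAPR_HTTP_PORT differs per app when run locally
DAPR_URL = f"http://localhost:{os.getenv('DAPR_HTTP_PORT', '3500')}"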

@app.get("/dapr/subscribe")
async def subscribe():
    return [{"pubsubname": "pubsub", "topic": "concepts-request", "route": "/dapr/handle"}]

@app.post("/dapr/handle")
async def handle_concepts_request(request: Request):
    event = await request.json()
    data = event.get("data", event)

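    # Process with LLM. process_with_llm is not defined in this snippet;
    # supply your own async function that takes the question and returns text.
    response = await process_with_llm(data["question"])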

    # Publish response
    async with httpx.AsyncClient() as client:
        await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/response-ready",
            json={
                "session_id": data["session_id"],
                "response": response,
                "agent": "concepts"
            }
        )

    return {"status": "SUCCESS"}
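
Dapr pub/sub delivers messages at least once, so a handler like the one above may see the same event twice, for example after a `RETRY` or a consumer group rebalance. One common mitigation is to remember recently seen CloudEvent `id` values and skip repeats. The sketch below keeps a bounded, in-memory record. `RecentIds` is a hypothetical class, and because the record lives in one process, a deployment with several replicas would need a shared store instead.

```python
from collections import OrderedDict


class RecentIds:
    """Bounded record of recently seen event IDs (oldest entries evicted first)."""

    def __init__(self, capacity: int = 1000) -> None:
        self._capacity = capacity
        self._seen: "OrderedDict[str, None]" = OrderedDict()

    def first_time(self, event_id: str) -> bool:
        """Return True the first time an ID is seen, False for a repeat."""
        if event_id in self._seen:
            self._seen.move_to_end(event_id)  # refresh so it is evicted later
            return False
        self._seen[event_id] = None
        if len(self._seen) > self._capacity:
            self._seen.popitem(last=False)  # drop the oldest entry
        return True


recent = RecentIds(capacity=2)
print(recent.first_time("evt-1"))  # True
print(recent.first_time("evt-1"))  # False
```

In the handler, a check such as `if not recent.first_time(event["id"]): return {"status": "SUCCESS"}` placed before the LLM call would acknowledge a duplicate without processing it again.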

Local Development

Run with Dapr CLI

# Start subscriber first (module name matches concepts_agent.py above)
dapr run --app-id concepts-agent --app-port 8001 --dapr-http-port 3501 \
  --resources-path ./components -- uvicorn concepts_agent:app --port 8001

# Start publisher (module name matches triage_agent.py above)
dapr run --app-id triage-agent --app-port 8000 --dapr-http-port 3500 \
  --resources-path ./components -- uvicorn triage_agent:app --port 8000

Docker Compose with Dapr

version: "3.8"
services:
  triage-agent:
    build: ./services/triage
    ports:
      - "8000:8000"

  triage-agent-dapr:
    image: daprio/daprd:latest
    command: ["./daprd",
      "--app-id", "triage-agent",
      "--app-port", "8000",
      "--dapr-http-port", "3500",
      "--resources-path", "/components"
    ]
    volumes:
      - ./components:/components
    network_mode: "service:triage-agent"
    depends_on:
      - triage-agent

  kafka:
    image: confluentinc/cp-kafka:latest
    # ... kafka config

Troubleshooting

Check Dapr Sidecar

# View sidecar logs
kubectl logs deploy/triage-agent -c daprd

# Check component registration
curl http://localhost:3500/v1.0/metadata
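
The metadata endpoint returns JSON, so the registration check can be scripted. The sketch below assumes the response carries a `components` list whose entries have `name` and `type` fields. `pubsub_components` is a hypothetical helper, and the sample body is illustrative, not output captured from a real sidecar.

```python
def pubsub_components(metadata: dict) -> list[str]:
    """Names of loaded components whose type starts with "pubsub."."""
    return [
        item["name"]
        for item in metadata.get("components", [])
        if item.get("type", "").startswith("pubsub.")
    ]


# Illustrative response body; "statestore" is an invented extra component.
sample = {
    "id": "triage-agent",
    "components": [
        {"name": "pubsub", "type": "pubsub.kafka", "version": "v1"},
        {"name": "statestore", "type": "state.redis", "version": "v1"},
    ],
}

print(pubsub_components(sample))  # ['pubsub']
```

An empty result for an app that should have a pub/sub component points to the first row of the Common Issues table: the component was not loaded.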

Common Issues

| Error | Cause | Fix |
|---|---|---|
| component not found | Component not loaded | Check --resources-path or K8s namespace |
| connection refused | Kafka not reachable | Verify broker address in component |
| consumer group rebalance | Multiple instances | Use unique consumerGroup per app |
| event not received | Wrong topic/route | Check subscription config |

Debug Event Flow

# Publish test event through a running app's sidecar
dapr publish --publish-app-id triage-agent --pubsub pubsub --topic test --data '{"test": true}'

# Check consumer logs
kubectl logs deploy/my-app -c daprd | grep -i subscribe

Verification

Run: python scripts/verify.py

Related Skills

  • deploying-kafka-k8s - Kafka cluster setup with Strimzi
  • scaffolding-fastapi-dapr - FastAPI services with Dapr
  • scaffolding-openai-agents - Agent orchestration patterns
