kafka-engineer
Apache Kafkaやイベントストリーミングを活用し、リアルタイムデータパイプラインを構築・最適化するSkill。
📜 元の英語説明(参考)
Expert in Apache Kafka, Event Streaming, and Real-time Data Pipelines. Specializes in Kafka Connect, KSQL, and Schema Registry.
🇯🇵 日本人クリエイター向け解説
Apache Kafkaやイベントストリーミングを活用し、リアルタイムデータパイプラインを構築・最適化するSkill。
※ jpskill.com 編集部が日本のビジネス現場向けに補足した解説です。Skill本体の挙動とは独立した参考情報です。
⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。
🎯 このSkillでできること
下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。
📦 インストール方法 (3ステップ)
- 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
- 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
- 3. 展開してできたフォルダを、ホームフォルダの
.claude/skills/に置く- · macOS / Linux:
~/.claude/skills/ - · Windows:
%USERPROFILE%\.claude\skills\
- · macOS / Linux:
Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。
詳しい使い方ガイドを見る →- 最終更新
- 2026-05-17
- 取得日時
- 2026-05-17
- 同梱ファイル
- 1
📖 Skill本文(日本語訳)
※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。
Kafka Engineer
目的
Apache Kafkaとイベントストリーミングに関する専門知識を提供し、スケーラブルなイベント駆動型アーキテクチャとリアルタイムデータパイプラインに特化しています。exactly-once処理、Kafka Connect、Schema Registry管理を備えたフォールトトレラントなストリーミングプラットフォームを構築します。
使用する場面
- イベント駆動型マイクロサービスアーキテクチャの設計
- Kafka Connectパイプライン(CDC、S3 Sink)のセットアップ
- ストリーム処理アプリケーション(Kafka Streams / ksqlDB)の作成
- コンシューマーラグ、リバランスストーム、またはブローカーパフォーマンスのデバッグ
- Schema Registryを使用したスキーマ(Avro/Protobuf)の設計
- ACLとmTLSセキュリティの設定
2. 意思決定フレームワーク
アーキテクチャの選択
What is the use case?
│
├─ **Data Integration (ETL)**
│ ├─ DB to DB/Data Lake? → **Kafka Connect** (Zero code)
│ └─ Complex transformations? → **Kafka Streams**
│
├─ **Real-Time Analytics**
│ ├─ SQL-like queries? → **ksqlDB** (Quick aggregation)
│ └─ Complex stateful logic? → **Kafka Streams / Flink**
│
└─ **Microservices Comm**
├─ Event Notification? → **Standard Producer/Consumer**
└─ Event Sourcing? → **State Stores (RocksDB)**
設定チューニング(「ビッグ3」)
- スループット:
batch.size,linger.ms,compression.type=lz4。 - レイテンシー:
linger.ms=0,acks=1。 - 耐久性:
acks=all,min.insync.replicas=2,replication.factor=3。
危険信号 → sre-engineer にエスカレート:
- 「Unclean leader election」が有効になっている(データ損失のリスク)
- 新しいクラスターでのZookeeperへの依存(KRaftモードを使用)
- ブローカーのディスク使用量が80%を超えている
- コンシューマーラグが常に増加している(容量の不一致)
3. コアワークフロー
ワークフロー 1: Kafka Connect (CDC)
目標: PostgreSQLからS3に変更をストリーミングします。
手順:
-
ソース設定 (
postgres-source.json){ "name": "postgres-source", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "db-host", "database.dbname": "mydb", "database.user": "kafka", "plugin.name": "pgoutput" } } -
シンク設定 (
s3-sink.json){ "name": "s3-sink", "config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "s3.bucket.name": "my-datalake", "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat", "flush.size": "1000" } } -
デプロイ
curl -X POST -d @postgres-source.json http://connect:8083/connectors
ワークフロー 3: Schema Registry統合
目標: スキーマの互換性を強制します。
手順:
-
スキーマの定義 (
user.avsc){ "type": "record", "name": "User", "fields": [ {"name": "id", "type": "int"}, {"name": "name", "type": "string"} ] } -
プロデューサー (Java)
KafkaAvroSerializerを使用します。- レジストリURL:
http://schema-registry:8081。
5. アンチパターンと落とし穴
❌ アンチパターン 1: 大きなメッセージ
どのようなものか:
- Kafkaメッセージで10MBの画像ペイロードを送信する。
なぜ失敗するのか:
- Kafkaは小さなメッセージ(1MB未満)に最適化されています。大きなメッセージはブローカースレッドをブロックします。
正しいアプローチ:
- 画像をS3に保存します。
- Kafkaメッセージで参照URLを送信します。
❌ アンチパターン 2: パーティションが多すぎる
どのようなものか:
- 小さなクラスターで10,000個のパーティションを作成する。
なぜ失敗するのか:
- リーダー選出が遅い(Zookeeperのオーバーヘッド)。
- ファイルハンドルの使用量が多い。
正しいアプローチ:
- ブローカーあたりのパーティション数を制限します(約4000)。トピック数を減らすか、より大きなクラスターを使用します。
❌ アンチパターン 3: ブロッキングコンシューマー
どのようなものか:
- コンシューマーがメッセージごとに重いHTTP呼び出し(30秒)を実行する。
なぜ失敗するのか:
- リバランスストーム(タイムアウトによりコンシューマーがグループを離れる)。
正しいアプローチ:
- 非同期処理: 作業をスレッドプールに移動します。
- 一時停止/再開: バッファーがいっぱいの場合、
consumer.pause()を実行します。
7. 品質チェックリスト
設定:
- [ ] レプリケーション: 本番環境ではFactor 3。
- [ ] Min.ISR: 2(データ損失を防ぎます)。
- [ ] リテンション: 正しく設定されている(時間 vs サイズ)。
可観測性:
- [ ] ラグ: コンシューマーラグが監視されている(Burrow/Prometheus)。
- [ ] アンダーレプリケート: アンダーレプリケートされたパーティション(>0)についてアラート。
- [ ] JMX: メトリクスがエクスポートされている。
例
例 1: リアルタイム不正検出パイプライン
シナリオ: 金融サービス会社がKafkaストリーミングを使用してリアルタイム不正検出を必要としています。
アーキテクチャの実装:
- イベント取り込み: PostgreSQLトランザクションデータベースからのKafka Connect CDC
- ストリーム処理: リアルタイムパターン検出のためのKafka Streamsアプリケーション
- アラートシステム: 通知をトリガーするアラートトピックへのプロデューサー
- ストレージ: 履歴分析とコンプライアンスのためのS3シンク
パイプライン構成: | コンポーネント | 構成 | 目的 | |-----------|---------------|---------| | トピック | 3 (transactions, alerts, enriched) | データ整理 | | パーティション | 12 (3ブローカー × 4) | 並列処理 | | レプリケーション | 3 | 高可用性 | | 圧縮 | LZ4 | スループット最適化 |
主要ロジック:
- 速度パターン(1分間に5回以上のトランザクション)を検出
- 地理的異常(不可能な移動)を特定
- 高リスクの加盟店カテゴリにフラグを立てる
結果:
- 不正の99.7%が100ms未満で検出
- 誤検知率が5%から0.3%に減少
- コンプライアンス監査で指摘事項なし
例 2: Eコマース注文処理システム
シナリオ: 高い信頼性を持つKafkaを使用した回復力のある注文処理システムを構築します。
システム設計:
- 注文イベント: 注文ライフサイクルイベント用のトピック
- 在庫サービス: 注文を消費し、在庫を更新
- 支払いサービス: 支払いを処理し、結果を公開
- 通知サービス: メール/SMSで確認を送信
回復力パターン:
- 処理失敗時のDead Letter Queue
- べき等なプロデューサー
📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開
Kafka Engineer
Purpose
Provides Apache Kafka and event streaming expertise specializing in scalable event-driven architectures and real-time data pipelines. Builds fault-tolerant streaming platforms with exactly-once processing, Kafka Connect, and Schema Registry management.
When to Use
- Designing event-driven microservices architectures
- Setting up Kafka Connect pipelines (CDC, S3 Sink)
- Writing stream processing apps (Kafka Streams / ksqlDB)
- Debugging consumer lag, rebalancing storms, or broker performance
- Designing schemas (Avro/Protobuf) with Schema Registry
- Configuring ACLs and mTLS security
2. Decision Framework
Architecture Selection
What is the use case?
│
├─ **Data Integration (ETL)**
│ ├─ DB to DB/Data Lake? → **Kafka Connect** (Zero code)
│ └─ Complex transformations? → **Kafka Streams**
│
├─ **Real-Time Analytics**
│ ├─ SQL-like queries? → **ksqlDB** (Quick aggregation)
│ └─ Complex stateful logic? → **Kafka Streams / Flink**
│
└─ **Microservices Comm**
├─ Event Notification? → **Standard Producer/Consumer**
└─ Event Sourcing? → **State Stores (RocksDB)**
Config Tuning (The "Big 3")
- Throughput:
batch.size,linger.ms,compression.type=lz4. - Latency:
linger.ms=0,acks=1. - Durability:
acks=all,min.insync.replicas=2,replication.factor=3.
Red Flags → Escalate to sre-engineer:
- "Unclean leader election" enabled (Data loss risk)
- Zookeeper dependency in new clusters (Use KRaft mode)
- Disk usage > 80% on brokers
- Consumer lag constantly increasing (Capacity mismatch)
3. Core Workflows
Workflow 1: Kafka Connect (CDC)
Goal: Stream changes from PostgreSQL to S3.
Steps:
-
Source Config (
postgres-source.json){ "name": "postgres-source", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "db-host", "database.dbname": "mydb", "database.user": "kafka", "plugin.name": "pgoutput" } } -
Sink Config (
s3-sink.json){ "name": "s3-sink", "config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "s3.bucket.name": "my-datalake", "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat", "flush.size": "1000" } } -
Deploy
curl -X POST -d @postgres-source.json http://connect:8083/connectors
Workflow 3: Schema Registry Integration
Goal: Enforce schema compatibility.
Steps:
-
Define Schema (
user.avsc){ "type": "record", "name": "User", "fields": [ {"name": "id", "type": "int"}, {"name": "name", "type": "string"} ] } -
Producer (Java)
- Use
KafkaAvroSerializer. - Registry URL:
http://schema-registry:8081.
- Use
5. Anti-Patterns & Gotchas
❌ Anti-Pattern 1: Large Messages
What it looks like:
- Sending 10MB images payload in Kafka message.
Why it fails:
- Kafka is optimized for small messages (< 1MB). Large messages block the broker threads.
Correct approach:
- Store image in S3.
- Send Reference URL in Kafka message.
❌ Anti-Pattern 2: Too Many Partitions
What it looks like:
- Creating 10,000 partitions on a small cluster.
Why it fails:
- Slow leader election (Zookeeper overhead).
- High file handle usage.
Correct approach:
- Limit partitions per broker (~4000). Use fewer topics or larger clusters.
❌ Anti-Pattern 3: Blocking Consumer
What it looks like:
- Consumer doing heavy HTTP call (30s) for each message.
Why it fails:
- Rebalance storm (Consumer leaves group due to timeout).
Correct approach:
- Async Processing: Move work to a thread pool.
- Pause/Resume:
consumer.pause()if buffer is full.
7. Quality Checklist
Configuration:
- [ ] Replication: Factor 3 for production.
- [ ] Min.ISR: 2 (Prevents data loss).
- [ ] Retention: Configured correctly (Time vs Size).
Observability:
- [ ] Lag: Consumer Lag monitored (Burrow/Prometheus).
- [ ] Under-replicated: Alert on under-replicated partitions (>0).
- [ ] JMX: Metrics exported.
Examples
Example 1: Real-Time Fraud Detection Pipeline
Scenario: A financial services company needs real-time fraud detection using Kafka streaming.
Architecture Implementation:
- Event Ingestion: Kafka Connect CDC from PostgreSQL transaction database
- Stream Processing: Kafka Streams application for real-time pattern detection
- Alert System: Producer to alert topic triggering notifications
- Storage: S3 sink for historical analysis and compliance
Pipeline Configuration: | Component | Configuration | Purpose | |-----------|---------------|---------| | Topics | 3 (transactions, alerts, enriched) | Data organization | | Partitions | 12 (3 brokers × 4) | Parallelism | | Replication | 3 | High availability | | Compression | LZ4 | Throughput optimization |
Key Logic:
- Detects velocity patterns (5+ transactions in 1 minute)
- Identifies geographic anomalies (impossible travel)
- Flags high-risk merchant categories
Results:
- 99.7% of fraud detected in under 100ms
- False positive rate reduced from 5% to 0.3%
- Compliance audit passed with zero findings
Example 2: E-Commerce Order Processing System
Scenario: Build a resilient order processing system with Kafka for high reliability.
System Design:
- Order Events: Topic for order lifecycle events
- Inventory Service: Consumes orders, updates stock
- Payment Service: Processes payments, publishes results
- Notification Service: Sends confirmations via email/SMS
Resilience Patterns:
- Dead Letter Queue for failed processing
- Idempotent producers for exactly-once semantics
- Consumer groups with manual offset management
- Retries with exponential backoff
Configuration:
# Producer Configuration
acks: all
retries: 3
enable.idempotence: true
# Consumer Configuration
auto.offset.reset: earliest
enable.auto.commit: false
max.poll.records: 500
Results:
- 99.99% message delivery reliability
- Zero duplicate orders in 6 months
- Peak processing: 10,000 orders/second
Example 3: IoT Telemetry Platform
Scenario: Process millions of IoT device telemetry messages with Kafka.
Platform Architecture:
- Device Gateway: MQTT to Kafka proxy
- Data Enrichment: Stream processing adds device metadata
- Time-Series Storage: S3 sink partitioned by device_id/date
- Real-Time Alerts: Threshold-based alerting for anomalies
Scalability Configuration:
- 50 partitions for parallel processing
- Compression enabled for cost optimization
- Retention: 7 days hot, 1 year cold in S3
- Schema Registry for data contracts
Performance Metrics: | Metric | Value | |--------|-------| | Throughput | 500,000 messages/sec | | Latency (P99) | 50ms | | Consumer lag | < 1 second | | Storage efficiency | 60% reduction with compression |
Best Practices
Topic Design
- Naming Conventions: Use clear, hierarchical topic names (domain.entity.event)
- Partition Strategy: Plan for future growth (3x expected throughput)
- Retention Policies: Match retention to business requirements
- Cleanup Policies: Use delete for time-based, compact for state
- Schema Management: Enforce schemas via Schema Registry
Producer Optimization
- Batching: Increase batch.size and linger.ms for throughput
- Compression: Use LZ4 for balance of speed and size
- Acks Configuration: Use all for reliability, 1 for latency
- Retry Strategy: Implement retries with backoff
- Idempotence: Enable for exactly-once semantics in critical paths
Consumer Best Practices
- Offset Management: Use manual commit for critical processing
- Batch Processing: Increase max.poll.records for efficiency
- Rebalance Handling: Implement graceful shutdown
- Error Handling: Dead letter queues for poison messages
- Monitoring: Track consumer lag and processing time
Security Configuration
- Encryption: TLS for all client-broker communication
- Authentication: SASL/SCRAM or mTLS for production
- Authorization: ACLs with least privilege principle
- Quotas: Implement client quotas to prevent abuse
- Audit Logging: Log all access and configuration changes
Performance Tuning
- Broker Configuration: Optimize for workload type (throughput vs latency)
- JVM Tuning: Heap size and garbage collector selection
- OS Tuning: File descriptor limits, network settings
- Monitoring: Metrics for throughput, latency, and errors
- Capacity Planning: Regular review and scaling assessment
Security:
- [ ] Encryption: TLS enabled for Client-Broker and Inter-broker.
- [ ] Auth: SASL/SCRAM or mTLS enabled.
- [ ] ACLs: Principle of least privilege (Topic read/write).