streaming-api-patterns
Server-Sent EventsやWebSocketなどの技術を使い、リアルタイムなデータ配信を実現し、通信負荷の調整や再接続、大規模言語モデルのストリーミングなど、最新の応用技術を使いこなせるようにするSkill。
📜 元の英語説明(参考)
Implement real-time data streaming with Server-Sent Events (SSE), WebSockets, and ReadableStream APIs. Master backpressure handling, reconnection strategies, and LLM streaming for 2025+ real-time applications.
🇯🇵 日本人クリエイター向け解説
Server-Sent EventsやWebSocketなどの技術を使い、リアルタイムなデータ配信を実現し、通信負荷の調整や再接続、大規模言語モデルのストリーミングなど、最新の応用技術を使いこなせるようにするSkill。
※ jpskill.com 編集部が日本のビジネス現場向けに補足した解説です。Skill本体の挙動とは独立した参考情報です。
下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o streaming-api-patterns.zip https://jpskill.com/download/17255.zip && unzip -o streaming-api-patterns.zip && rm streaming-api-patterns.zip
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/17255.zip -OutFile "$d\streaming-api-patterns.zip"; Expand-Archive "$d\streaming-api-patterns.zip" -DestinationPath $d -Force; ri "$d\streaming-api-patterns.zip"
完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。
💾 手動でダウンロードしたい(コマンドが難しい人向け)
- 1. 下の青いボタンを押して
streaming-api-patterns.zipをダウンロード - 2. ZIPファイルをダブルクリックで解凍 →
streaming-api-patternsフォルダができる - 3. そのフォルダを
C:\Users\あなたの名前\.claude\skills\(Win)または~/.claude/skills/(Mac)へ移動 - 4. Claude Code を再起動
⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。
🎯 このSkillでできること
下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。
📦 インストール方法 (3ステップ)
- 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
- 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
- 3. 展開してできたフォルダを、ホームフォルダの
.claude/skills/に置く- · macOS / Linux:
~/.claude/skills/ - · Windows:
%USERPROFILE%\.claude\skills\
- · macOS / Linux:
Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。
詳しい使い方ガイドを見る →- 最終更新
- 2026-05-18
- 取得日時
- 2026-05-18
- 同梱ファイル
- 1
📖 Skill本文(日本語訳)
※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。
Streaming API パターン
概要
現代のアプリケーションは、リアルタイムなデータ配信を必要とします。このスキルでは、サーバーからクライアントへのストリーミングのための Server-Sent Events (SSE)、双方向通信のための WebSockets、そしてバックプレッシャーの処理と効率的なデータフローのための Streams API を扱います。
このスキルを使用するべき場合:
- LLM レスポンスのストリーミング (ChatGPT スタイルのインターフェース)
- リアルタイムな通知とアップデート
- ライブデータフィード (株価、分析)
- チャットアプリケーション
- 長時間実行タスクの進捗アップデート
- 共同編集機能
コアテクノロジー
1. Server-Sent Events (SSE)
最適な用途: サーバーからクライアントへのストリーミング (LLM レスポンス、通知)
// Next.js Route Handler
export async function GET(req: Request) {
const encoder = new TextEncoder()
const stream = new ReadableStream({
async start(controller) {
// Send data
controller.enqueue(encoder.encode('data: Hello\n\n'))
// Keep connection alive
const interval = setInterval(() => {
controller.enqueue(encoder.encode(': keepalive\n\n'))
}, 30000)
// Cleanup
req.signal.addEventListener('abort', () => {
clearInterval(interval)
controller.close()
})
}
})
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
}
})
}
// Client
const eventSource = new EventSource('/api/stream')
eventSource.onmessage = (event) => {
console.log(event.data)
}
2. WebSockets
最適な用途: 双方向リアルタイム通信 (チャット、コラボレーション)
// WebSocket Server (Next.js with ws)
import { WebSocketServer } from 'ws'
const wss = new WebSocketServer({ port: 8080 })
wss.on('connection', (ws) => {
ws.on('message', (data) => {
// Broadcast to all clients
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(data)
}
})
})
})
// Client
const ws = new WebSocket('ws://localhost:8080')
ws.onmessage = (event) => console.log(event.data)
ws.send(JSON.stringify({ type: 'message', text: 'Hello' }))
3. ReadableStream API
最適な用途: バックプレッシャーを伴う大規模なデータストリームの処理
async function* generateData() {
for (let i = 0; i < 1000; i++) {
await new Promise(resolve => setTimeout(resolve, 100))
yield `data-${i}`
}
}
const stream = new ReadableStream({
async start(controller) {
for await (const chunk of generateData()) {
controller.enqueue(new TextEncoder().encode(chunk + '\n'))
}
controller.close()
}
})
LLM ストリーミングパターン
// Server
import OpenAI from 'openai'
const openai = new OpenAI()
export async function POST(req: Request) {
const { messages } = await req.json()
const stream = await openai.chat.completions.create({
model: 'gpt-4-turbo-preview',
messages,
stream: true
})
const encoder = new TextEncoder()
return new Response(
new ReadableStream({
async start(controller) {
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content
if (content) {
controller.enqueue(encoder.encode(`data: ${JSON.stringify({ content })}\n\n`))
}
}
controller.enqueue(encoder.encode('data: [DONE]\n\n'))
controller.close()
}
}),
{
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache'
}
}
)
}
// Client
async function streamChat(messages) {
const response = await fetch('/api/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ messages })
})
const reader = response.body.getReader()
const decoder = new TextDecoder()
while (true) {
const { done, value } = await reader.read()
if (done) break
const chunk = decoder.decode(value)
const lines = chunk.split('\n')
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6)
if (data === '[DONE]') return
const json = JSON.parse(data)
console.log(json.content) // Stream token
}
}
}
}
再接続戦略
class ReconnectingEventSource {
private eventSource: EventSource | null = null
private reconnectDelay = 1000
private maxReconnectDelay = 30000
constructor(private url: string, private onMessage: (data: string) => void) {
this.connect()
}
private connect() {
this.eventSource = new EventSource(this.url)
this.eventSource.onmessage = (event) => {
this.reconnectDelay = 1000 // Reset on success
this.onMessage(event.data)
}
this.eventSource.onerror = () => {
this.eventSource?.close()
// Exponential backoff
setTimeout(() => this.connect(), this.reconnectDelay)
this.reconnectDelay = Math.min(this.reconnectDelay * 2, this.maxReconnectDelay)
}
}
close() {
this.eventSource?.close()
}
}
ベストプラクティス
SSE
- ✅ 一方向のサーバーからクライアントへのストリーミングに使用します
- ✅ 自動再接続を実装します
- ✅ 30秒ごとにキープアライブメッセージを送信します
- ✅ ブラウザの接続制限 (ドメインごとに6つ) を処理します
- ✅ より良いパフォーマンスのために HTTP/2 を使用します
WebSockets
- ✅ 双方向のリアルタイム通信に使用します
- ✅ ハートビート/ping-pong を実装します
- ✅ 指数バックオフで再接続を処理します
- ✅ メッセージを検証およびサニタイズします
- ✅ オフライン期間のメッセージキューイングを実装します
バックプレッシャー
- ✅ 適切なフロー制御で
ReadableStreamを使用します - ✅ バッファサイズを監視します
- ✅ コンシューマーが遅い場合は、プロダクションを一時停止します
- ✅ 低速なコンシューマーに対するタイムアウトを実装します
パフォーマンス
- ✅ データを圧縮します (gzip/brotli)
- ✅ 小さなメッセージをバッチ処理します
- ✅ 大量のデータにはバイナリ形式 (MessagePack, Protobuf) を使用します
- ✅ クライアント側のバッファリングを実装します
- ✅ 接続数とリソース使用量を監視します
リソース
- [Server-Sent Events
📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開
Streaming API Patterns
Overview
Modern applications require real-time data delivery. This skill covers Server-Sent Events (SSE) for server-to-client streaming, WebSockets for bidirectional communication, and the Streams API for handling backpressure and efficient data flow.
When to use this skill:
- Streaming LLM responses (ChatGPT-style interfaces)
- Real-time notifications and updates
- Live data feeds (stock prices, analytics)
- Chat applications
- Progress updates for long-running tasks
- Collaborative editing features
Core Technologies
1. Server-Sent Events (SSE)
Best for: Server-to-client streaming (LLM responses, notifications)
// Next.js Route Handler
export async function GET(req: Request) {
const encoder = new TextEncoder()
const stream = new ReadableStream({
async start(controller) {
// Send data
controller.enqueue(encoder.encode('data: Hello\n\n'))
// Keep connection alive
const interval = setInterval(() => {
controller.enqueue(encoder.encode(': keepalive\n\n'))
}, 30000)
// Cleanup
req.signal.addEventListener('abort', () => {
clearInterval(interval)
controller.close()
})
}
})
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
}
})
}
// Client
const eventSource = new EventSource('/api/stream')
eventSource.onmessage = (event) => {
console.log(event.data)
}
2. WebSockets
Best for: Bidirectional real-time communication (chat, collaboration)
// WebSocket Server (Next.js with ws)
import { WebSocketServer } from 'ws'
const wss = new WebSocketServer({ port: 8080 })
wss.on('connection', (ws) => {
ws.on('message', (data) => {
// Broadcast to all clients
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(data)
}
})
})
})
// Client
const ws = new WebSocket('ws://localhost:8080')
ws.onmessage = (event) => console.log(event.data)
ws.send(JSON.stringify({ type: 'message', text: 'Hello' }))
3. ReadableStream API
Best for: Processing large data streams with backpressure
async function* generateData() {
for (let i = 0; i < 1000; i++) {
await new Promise(resolve => setTimeout(resolve, 100))
yield `data-${i}`
}
}
const stream = new ReadableStream({
async start(controller) {
for await (const chunk of generateData()) {
controller.enqueue(new TextEncoder().encode(chunk + '\n'))
}
controller.close()
}
})
LLM Streaming Pattern
// Server
import OpenAI from 'openai'
const openai = new OpenAI()
export async function POST(req: Request) {
const { messages } = await req.json()
const stream = await openai.chat.completions.create({
model: 'gpt-4-turbo-preview',
messages,
stream: true
})
const encoder = new TextEncoder()
return new Response(
new ReadableStream({
async start(controller) {
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content
if (content) {
controller.enqueue(encoder.encode(`data: ${JSON.stringify({ content })}\n\n`))
}
}
controller.enqueue(encoder.encode('data: [DONE]\n\n'))
controller.close()
}
}),
{
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache'
}
}
)
}
// Client
async function streamChat(messages) {
const response = await fetch('/api/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ messages })
})
const reader = response.body.getReader()
const decoder = new TextDecoder()
while (true) {
const { done, value } = await reader.read()
if (done) break
const chunk = decoder.decode(value)
const lines = chunk.split('\n')
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6)
if (data === '[DONE]') return
const json = JSON.parse(data)
console.log(json.content) // Stream token
}
}
}
}
Reconnection Strategy
class ReconnectingEventSource {
private eventSource: EventSource | null = null
private reconnectDelay = 1000
private maxReconnectDelay = 30000
constructor(private url: string, private onMessage: (data: string) => void) {
this.connect()
}
private connect() {
this.eventSource = new EventSource(this.url)
this.eventSource.onmessage = (event) => {
this.reconnectDelay = 1000 // Reset on success
this.onMessage(event.data)
}
this.eventSource.onerror = () => {
this.eventSource?.close()
// Exponential backoff
setTimeout(() => this.connect(), this.reconnectDelay)
this.reconnectDelay = Math.min(this.reconnectDelay * 2, this.maxReconnectDelay)
}
}
close() {
this.eventSource?.close()
}
}
Best Practices
SSE
- ✅ Use for one-way server-to-client streaming
- ✅ Implement automatic reconnection
- ✅ Send keepalive messages every 30s
- ✅ Handle browser connection limits (6 per domain)
- ✅ Use HTTP/2 for better performance
WebSockets
- ✅ Use for bidirectional real-time communication
- ✅ Implement heartbeat/ping-pong
- ✅ Handle reconnection with exponential backoff
- ✅ Validate and sanitize messages
- ✅ Implement message queuing for offline periods
Backpressure
- ✅ Use ReadableStream with proper flow control
- ✅ Monitor buffer sizes
- ✅ Pause production when consumer is slow
- ✅ Implement timeouts for slow consumers
Performance
- ✅ Compress data (gzip/brotli)
- ✅ Batch small messages
- ✅ Use binary formats (MessagePack, Protobuf) for large data
- ✅ Implement client-side buffering
- ✅ Monitor connection count and resource usage