jpskill.com
🛠️ 開発・MCP コミュニティ

rabbitmq

RabbitMQを活用し、タスクキューやバックグラウンド処理、サービス間連携、メッセージの確実な配信などを実現するメッセージングシステムを構築するSkill。

📜 元の英語説明(参考)

Build messaging systems with RabbitMQ — queues, exchanges, routing, dead-letter queues, delayed messages, priority queues, and RPC patterns. Use when tasks involve task queues, background job processing, inter-service communication, request-reply patterns, or reliable message delivery with acknowledgments.

🇯🇵 日本人クリエイター向け解説

一言でいうと

RabbitMQを活用し、タスクキューやバックグラウンド処理、サービス間連携、メッセージの確実な配信などを実現するメッセージングシステムを構築するSkill。

※ jpskill.com 編集部が日本のビジネス現場向けに補足した解説です。Skill本体の挙動とは独立した参考情報です。

⚡ おすすめ: コマンド1行でインストール(60秒)

下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。

🍎 Mac / 🐧 Linux
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o rabbitmq.zip https://jpskill.com/download/15311.zip && unzip -o rabbitmq.zip && rm rabbitmq.zip
🪟 Windows (PowerShell)
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/15311.zip -OutFile "$d\rabbitmq.zip"; Expand-Archive "$d\rabbitmq.zip" -DestinationPath $d -Force; ri "$d\rabbitmq.zip"

完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。

💾 手動でダウンロードしたい(コマンドが難しい人向け)
  1. 1. 下の青いボタンを押して rabbitmq.zip をダウンロード
  2. 2. ZIPファイルをダブルクリックで解凍 → rabbitmq フォルダができる
  3. 3. そのフォルダを C:\Users\あなたの名前\.claude\skills\(Win)または ~/.claude/skills/(Mac)へ移動
  4. 4. Claude Code を再起動

⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。

🎯 このSkillでできること

下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。

📦 インストール方法 (3ステップ)

  1. 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
  2. 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
  3. 3. 展開してできたフォルダを、ホームフォルダの .claude/skills/ に置く
    • · macOS / Linux: ~/.claude/skills/
    • · Windows: %USERPROFILE%\.claude\skills\

Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。

詳しい使い方ガイドを見る →
最終更新
2026-05-18
取得日時
2026-05-18
同梱ファイル
1

📖 Skill本文(日本語訳)

※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。

RabbitMQ

マイクロサービスアーキテクチャとバックグラウンド処理のための、信頼性の高いメッセージングおよびタスクキューシステムを構築します。

セットアップ

# docker-compose.yml — 管理UI付きのRabbitMQ
services:
  rabbitmq:
    image: rabbitmq:3.13-management
    ports:
      - "5672:5672"    # AMQP
      - "15672:15672"  # Management UI
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: secret
    volumes:
      - rabbitmq-data:/var/lib/rabbitmq

volumes:
  rabbitmq-data:

管理UIは http://localhost:15672 (admin/secret) で利用できます。

コアコンセプト

RabbitMQは、exchange を通して queue にメッセージをルーティングします。プロデューサーは exchange に送信し、コンシューマーは queue から読み取ります。exchange のタイプによってルーティングロジックが決まります。

  • Direct — 正確なルーティングキーの一致によってルーティングします
  • Fanout — バインドされたすべての queue にブロードキャストします (pub/sub)
  • Topic — ルーティングキーのパターンマッチングによってルーティングします (order.*, #.error)
  • Headers — メッセージヘッダーの値によってルーティングします

プロデューサーとコンシューマー

Node.js (amqplib)

// connection.ts — 自動再接続による共有接続

import amqp, { Channel, Connection } from 'amqplib';

let connection: Connection;
let channel: Channel;

async function getChannel(): Promise<Channel> {
  if (channel) return channel;

  connection = await amqp.connect('amqp://admin:secret@localhost:5672');
  channel = await connection.createChannel();

  // Prefetch 10 — 各コンシューマーは最大10個の未確認応答メッセージを処理します
  // 1つの遅いコンシューマーが他のコンシューマーを枯渇させるのを防ぎます
  await channel.prefetch(10);

  connection.on('close', () => {
    console.error('RabbitMQ connection closed, reconnecting...');
    channel = null!;
    setTimeout(getChannel, 5000);
  });

  return channel;
}

export { getChannel };
// task-producer.ts — 作業キューにタスクを送信します

import { getChannel } from './connection';

/** バックグラウンドタスクを処理キューに送信します。
 *
 * @param queue - ターゲットキュー名。
 * @param task - タスクペイロード。
 * @param priority - 0 (通常) から 9 (最高)。優先度キューが必要です。
 */
async function sendTask(queue: string, task: object, priority = 0) {
  const ch = await getChannel();

  // 必要なプロパティを持つキューが存在することを確認します (冪等性)
  await ch.assertQueue(queue, {
    durable: true,           // ブローカーの再起動後も存続します
    arguments: {
      'x-max-priority': 10,  // 優先度キューを有効にします
      'x-dead-letter-exchange': 'dlx',  // 失敗したメッセージはここに送られます
      'x-message-ttl': 86400000,        // 24時間のTTL
    },
  });

  ch.sendToQueue(queue, Buffer.from(JSON.stringify(task)), {
    persistent: true,  // ディスクに書き込みます (再起動後も存続します)
    priority,
    contentType: 'application/json',
    timestamp: Date.now(),
    messageId: crypto.randomUUID(),
  });
}

// 例: メールタスクを送信します
await sendTask('email-queue', {
  to: 'user@example.com',
  template: 'welcome',
  data: { name: 'Alex', activationLink: 'https://app.example.com/activate/abc123' },
});

// 優先度高: パスワードリセット (マーケティングメールよりも優先されます)
await sendTask('email-queue', {
  to: 'user@example.com',
  template: 'password-reset',
  data: { resetLink: 'https://app.example.com/reset/xyz789' },
}, 9);
// task-consumer.ts — 確認応答付きでタスクを処理します

import { getChannel } from './connection';

/** キューからのタスクの消費を開始します。
 *
 * @param queue - 消費元のキュー。
 * @param handler - 各タスクを処理する非同期関数。
 */
async function startWorker(queue: string, handler: (task: any) => Promise<void>) {
  const ch = await getChannel();

  await ch.assertQueue(queue, { durable: true });

  ch.consume(queue, async (msg) => {
    if (!msg) return;

    const task = JSON.parse(msg.content.toString());
    const msgId = msg.properties.messageId;

    try {
      await handler(task);
      ch.ack(msg);  // 成功 — キューから削除します
    } catch (err) {
      console.error(`Task ${msgId} failed:`, err);

      // これが何度も再試行されたかどうかを確認します
      const retryCount = (msg.properties.headers?.['x-retry-count'] || 0);
      if (retryCount >= 3) {
        ch.nack(msg, false, false);  // 3回の再試行後にデッドレターキューに送信します
      } else {
        // 再試行回数を増やして再キューします
        ch.nack(msg, false, false);
        ch.sendToQueue(queue, msg.content, {
          ...msg.properties,
          headers: { ...msg.properties.headers, 'x-retry-count': retryCount + 1 },
        });
      }
    }
  });

  console.log(`Worker started on queue: ${queue}`);
}

// メール処理を開始します
startWorker('email-queue', async (task) => {
  await sendEmail(task.to, task.template, task.data);
});

Pythonの場合は、pikaBlockingConnectionbasic_qos(prefetch_count=N)、およびメッセージ処理用の basic_ack/basic_nack を使用した basic_consume と共に使用します。

Exchangeパターン

Fanout (ブロードキャスト)

バインドされたすべての queue がすべてのメッセージを取得します — 通知、キャッシュの無効化、監査ログに役立ちます。

// fanout.ts — イベントを複数のコンシューマーにブロードキャストします

const ch = await getChannel();

await ch.assertExchange('user-events', 'fanout', { durable: true });

// 各サービスは、exchange にバインドされた独自の queue を作成します
// メールサービスキュー
await ch.assertQueue('user-events-email', { durable: true });
await ch.bindQueue('user-events-email', 'user-events', '');

// アナリティクスサービスキュー
await ch.assertQueue('user-events-analytics', { durable: true });
await ch.bindQueue('user-events-analytics', 'user-events', '');

// パブリッシュ — バインドされたすべての queue がメッセージを受信します
ch.publish('user-events', '', Buffer.from(JSON.stringify({
  type: 'user.registered',
  userId: 'usr-456',
  email: 'new@example.com',
})), { persistent: true });

Topic (パターンルーティング)

パターンマッチングに基づいてメッセージを queue にルーティングします。


// topic.ts — パターンでルーティングします (例: order.created は fulfillment に、order.* は audit に送られます)

const ch = await getChannel();

await ch.assertExchange('events', 'topic', { durable:

(原文がここで切り詰められています)
📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開

RabbitMQ

Build reliable messaging and task queue systems for microservice architectures and background processing.

Setup

# docker-compose.yml — RabbitMQ with management UI
services:
  rabbitmq:
    image: rabbitmq:3.13-management
    ports:
      - "5672:5672"    # AMQP
      - "15672:15672"  # Management UI
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: secret
    volumes:
      - rabbitmq-data:/var/lib/rabbitmq

volumes:
  rabbitmq-data:

Management UI at http://localhost:15672 (admin/secret).

Core Concepts

RabbitMQ routes messages through exchanges to queues. Producers send to exchanges, consumers read from queues. The exchange type determines routing logic:

  • Direct — routes by exact routing key match
  • Fanout — broadcasts to all bound queues (pub/sub)
  • Topic — routes by pattern matching on routing key (order.*, #.error)
  • Headers — routes by message header values

Producer and Consumer

Node.js (amqplib)

// connection.ts — Shared connection with auto-reconnect

import amqp, { Channel, Connection } from 'amqplib';

let connection: Connection;
let channel: Channel;

async function getChannel(): Promise<Channel> {
  if (channel) return channel;

  connection = await amqp.connect('amqp://admin:secret@localhost:5672');
  channel = await connection.createChannel();

  // Prefetch 10 — each consumer processes up to 10 unacked messages
  // Prevents one slow consumer from starving others
  await channel.prefetch(10);

  connection.on('close', () => {
    console.error('RabbitMQ connection closed, reconnecting...');
    channel = null!;
    setTimeout(getChannel, 5000);
  });

  return channel;
}

export { getChannel };
// task-producer.ts — Send tasks to a work queue

import { getChannel } from './connection';

/** Send a background task to the processing queue.
 *
 * @param queue - Target queue name.
 * @param task - Task payload.
 * @param priority - 0 (normal) to 9 (highest). Requires priority queue.
 */
async function sendTask(queue: string, task: object, priority = 0) {
  const ch = await getChannel();

  // Assert queue exists with desired properties (idempotent)
  await ch.assertQueue(queue, {
    durable: true,           // Survives broker restart
    arguments: {
      'x-max-priority': 10,  // Enable priority queue
      'x-dead-letter-exchange': 'dlx',  // Failed messages go here
      'x-message-ttl': 86400000,        // 24h TTL
    },
  });

  ch.sendToQueue(queue, Buffer.from(JSON.stringify(task)), {
    persistent: true,  // Write to disk (survives restart)
    priority,
    contentType: 'application/json',
    timestamp: Date.now(),
    messageId: crypto.randomUUID(),
  });
}

// Example: send an email task
await sendTask('email-queue', {
  to: 'user@example.com',
  template: 'welcome',
  data: { name: 'Alex', activationLink: 'https://app.example.com/activate/abc123' },
});

// High-priority: password reset (skip ahead of marketing emails)
await sendTask('email-queue', {
  to: 'user@example.com',
  template: 'password-reset',
  data: { resetLink: 'https://app.example.com/reset/xyz789' },
}, 9);
// task-consumer.ts — Process tasks with acknowledgment

import { getChannel } from './connection';

/** Start consuming tasks from a queue.
 *
 * @param queue - Queue to consume from.
 * @param handler - Async function that processes each task.
 */
async function startWorker(queue: string, handler: (task: any) => Promise<void>) {
  const ch = await getChannel();

  await ch.assertQueue(queue, { durable: true });

  ch.consume(queue, async (msg) => {
    if (!msg) return;

    const task = JSON.parse(msg.content.toString());
    const msgId = msg.properties.messageId;

    try {
      await handler(task);
      ch.ack(msg);  // Success — remove from queue
    } catch (err) {
      console.error(`Task ${msgId} failed:`, err);

      // Check if this has been retried too many times
      const retryCount = (msg.properties.headers?.['x-retry-count'] || 0);
      if (retryCount >= 3) {
        ch.nack(msg, false, false);  // Send to dead-letter queue after 3 retries
      } else {
        // Requeue with incremented retry count
        ch.nack(msg, false, false);
        ch.sendToQueue(queue, msg.content, {
          ...msg.properties,
          headers: { ...msg.properties.headers, 'x-retry-count': retryCount + 1 },
        });
      }
    }
  });

  console.log(`Worker started on queue: ${queue}`);
}

// Start processing emails
startWorker('email-queue', async (task) => {
  await sendEmail(task.to, task.template, task.data);
});

For Python, use pika with BlockingConnection, basic_qos(prefetch_count=N), and basic_consume with basic_ack/basic_nack for message handling.

Exchange Patterns

Fanout (broadcast)

Every bound queue gets every message — useful for notifications, cache invalidation, audit logging:

// fanout.ts — Broadcast events to multiple consumers

const ch = await getChannel();

await ch.assertExchange('user-events', 'fanout', { durable: true });

// Each service creates its own queue bound to the exchange
// Email service queue
await ch.assertQueue('user-events-email', { durable: true });
await ch.bindQueue('user-events-email', 'user-events', '');

// Analytics service queue
await ch.assertQueue('user-events-analytics', { durable: true });
await ch.bindQueue('user-events-analytics', 'user-events', '');

// Publish — all bound queues receive the message
ch.publish('user-events', '', Buffer.from(JSON.stringify({
  type: 'user.registered',
  userId: 'usr-456',
  email: 'new@example.com',
})), { persistent: true });

Topic (pattern routing)

Route messages to queues based on pattern matching:

// topic.ts — Route by pattern (e.g., order.created goes to fulfillment, order.* goes to audit)

const ch = await getChannel();

await ch.assertExchange('events', 'topic', { durable: true });

// Fulfillment only cares about new orders
await ch.assertQueue('fulfillment', { durable: true });
await ch.bindQueue('fulfillment', 'events', 'order.created');

// Audit log captures everything
await ch.assertQueue('audit-log', { durable: true });
await ch.bindQueue('audit-log', 'events', '#');  // # matches all routing keys

// Payment service handles payment events
await ch.assertQueue('payment-processing', { durable: true });
await ch.bindQueue('payment-processing', 'events', 'payment.*');

// Publish with routing key
ch.publish('events', 'order.created', Buffer.from(JSON.stringify({ orderId: 'ord-123' })));
ch.publish('events', 'payment.received', Buffer.from(JSON.stringify({ amount: 89.97 })));

Dead-Letter Queue

Failed messages need a place to go for inspection and replay:

// dlq.ts — Dead-letter exchange and queue setup

const ch = await getChannel();

// Dead-letter exchange and queue
await ch.assertExchange('dlx', 'direct', { durable: true });
await ch.assertQueue('dead-letters', { durable: true });
await ch.bindQueue('dead-letters', 'dlx', '');

// Main queue — failed messages automatically route to DLX
await ch.assertQueue('orders', {
  durable: true,
  arguments: {
    'x-dead-letter-exchange': 'dlx',
    'x-dead-letter-routing-key': '',
  },
});

// Monitor dead letters
ch.consume('dead-letters', (msg) => {
  if (!msg) return;
  const reason = msg.properties.headers?.['x-death']?.[0]?.reason;
  console.error(`Dead letter (${reason}):`, msg.content.toString());
  // Alert, log to monitoring, or attempt manual replay
  ch.ack(msg);
});

Delayed Messages

Use the rabbitmq_delayed_message_exchange plugin. Assert an exchange with type x-delayed-message, then publish messages with headers: { 'x-delay': delayMs } to schedule future delivery.

RPC (Request-Reply)

For request-reply, create an exclusive auto-delete reply queue, send with correlationId and replyTo, and consume the reply queue filtering by correlationId.

Guidelines

  • Always acknowledge messages — unacked messages block the queue and prevent other consumers from getting them
  • Set prefetch count — without it, RabbitMQ sends all messages to one consumer, starving others
  • Use durable queues and persistent messages for anything that matters — in-memory queues vanish on restart
  • Dead-letter queues are not optional — without them, failed messages disappear silently
  • One queue per consumer type — don't have email and SMS services reading from the same queue
  • Idempotent consumers — messages can be delivered more than once (network issues, rebalances). Design handlers to be safe to re-run.
  • Monitor queue depth — growing queue length means consumers can't keep up. Alert on it.
  • Connection pooling — create one connection with multiple channels, not one connection per operation