jpskill.com
🛠️ 開発・MCP コミュニティ

amqplib

RabbitMQとAMQP 0-9-1プロトコルに対応したNode.js用ライブラリamqplibのエキスパートとして、非同期なマイクロサービス間の連携を、メッセージキュー、パブリッシュ/サブスクライブ、ルーティング、RPC、デッドレターキューなどを活用して実現するSkill。

📜 元の英語説明(参考)

You are an expert in amqplib, the Node.js client for RabbitMQ and AMQP 0-9-1 protocol. You help developers implement reliable message queuing with work queues, pub/sub fanout, topic routing, RPC patterns, dead letter queues, and message acknowledgment — building decoupled microservices that communicate asynchronously through RabbitMQ.

🇯🇵 日本人クリエイター向け解説

一言でいうと

RabbitMQとAMQP 0-9-1プロトコルに対応したNode.js用ライブラリamqplibのエキスパートとして、非同期なマイクロサービス間の連携を、メッセージキュー、パブリッシュ/サブスクライブ、ルーティング、RPC、デッドレターキューなどを活用して実現するSkill。

※ jpskill.com 編集部が日本のビジネス現場向けに補足した解説です。Skill本体の挙動とは独立した参考情報です。

⚡ おすすめ: コマンド1行でインストール(60秒)

下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。

🍎 Mac / 🐧 Linux
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o amqplib.zip https://jpskill.com/download/14628.zip && unzip -o amqplib.zip && rm amqplib.zip
🪟 Windows (PowerShell)
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/14628.zip -OutFile "$d\amqplib.zip"; Expand-Archive "$d\amqplib.zip" -DestinationPath $d -Force; ri "$d\amqplib.zip"

完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。

💾 手動でダウンロードしたい(コマンドが難しい人向け)
  1. 1. 下の青いボタンを押して amqplib.zip をダウンロード
  2. 2. ZIPファイルをダブルクリックで解凍 → amqplib フォルダができる
  3. 3. そのフォルダを C:\Users\あなたの名前\.claude\skills\(Win)または ~/.claude/skills/(Mac)へ移動
  4. 4. Claude Code を再起動

⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。

🎯 このSkillでできること

下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。

📦 インストール方法 (3ステップ)

  1. 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
  2. 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
  3. 3. 展開してできたフォルダを、ホームフォルダの .claude/skills/ に置く
    • · macOS / Linux: ~/.claude/skills/
    • · Windows: %USERPROFILE%\.claude\skills\

Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。

詳しい使い方ガイドを見る →
最終更新
2026-05-18
取得日時
2026-05-18
同梱ファイル
1

📖 Skill本文(日本語訳)

※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。

amqplib — Node.js 用 RabbitMQ クライアント

あなたは、RabbitMQ および AMQP 0-9-1 プロトコル用の Node.js クライアントである amqplib のエキスパートです。あなたは、開発者がワークキュー、pub/sub fanout、トピックルーティング、RPC パターン、デッドレターキュー、およびメッセージ確認応答を使用して、信頼性の高いメッセージキューイングを実装するのを支援します。これにより、RabbitMQ を介して非同期的に通信する疎結合マイクロサービスを構築できます。

主要な機能

Producer と Consumer

import amqp from "amqplib";

// Producer — キューにメッセージを送信する
async function sendToQueue(queue: string, message: any) {
  const connection = await amqp.connect(process.env.RABBITMQ_URL!);
  const channel = await connection.createChannel();

  await channel.assertQueue(queue, {
    durable: true,                         // ブローカーの再起動後も存続する
    arguments: {
      "x-dead-letter-exchange": "dlx",     // 失敗したメッセージは DLX に送られる
      "x-message-ttl": 86400000,           // 24 時間の TTL
    },
  });

  channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)), {
    persistent: true,                      // ブローカーの再起動後も存続する
    contentType: "application/json",
    messageId: crypto.randomUUID(),
    timestamp: Date.now(),
  });

  await channel.close();
  await connection.close();
}

// Consumer — メッセージを確実に処理する
async function startConsumer(queue: string, handler: (msg: any) => Promise<void>) {
  const connection = await amqp.connect(process.env.RABBITMQ_URL!);
  const channel = await connection.createChannel();

  await channel.assertQueue(queue, { durable: true });
  await channel.prefetch(10);              // 一度に 10 個処理する

  channel.consume(queue, async (msg) => {
    if (!msg) return;
    try {
      const data = JSON.parse(msg.content.toString());
      await handler(data);
      channel.ack(msg);                    // 成功 — キューから削除する
    } catch (error) {
      console.error("Processing failed:", error);
      channel.nack(msg, false, false);     // 失敗 — DLX に送信する (再キューイングしない)
    }
  });
}

// 使用例
await sendToQueue("orders", { orderId: "ORD-123", total: 99.99 });
await startConsumer("orders", async (order) => {
  await processOrder(order);
  await sendEmail(order);
});

Exchanges を使用した Pub/Sub

// Topic exchange — パターンでメッセージをルーティングする
async function setupTopicExchange() {
  const connection = await amqp.connect(process.env.RABBITMQ_URL!);
  const channel = await connection.createChannel();

  await channel.assertExchange("events", "topic", { durable: true });

  // イベントを発行する
  channel.publish("events", "order.created", Buffer.from(JSON.stringify({
    orderId: "ORD-456", items: 3,
  })));

  channel.publish("events", "order.shipped", Buffer.from(JSON.stringify({
    orderId: "ORD-456", trackingId: "TRACK-789",
  })));

  channel.publish("events", "user.signup", Buffer.from(JSON.stringify({
    userId: "usr-99", email: "new@user.com",
  })));
}

// パターンをサブスクライブする
async function subscribeToPattern(pattern: string, handler: (data: any, key: string) => void) {
  const connection = await amqp.connect(process.env.RABBITMQ_URL!);
  const channel = await connection.createChannel();

  await channel.assertExchange("events", "topic", { durable: true });
  const { queue } = await channel.assertQueue("", { exclusive: true });
  await channel.bindQueue(queue, "events", pattern);

  channel.consume(queue, (msg) => {
    if (!msg) return;
    handler(JSON.parse(msg.content.toString()), msg.fields.routingKey);
    channel.ack(msg);
  });
}

// すべての注文イベントをサブスクライブする
await subscribeToPattern("order.*", (data, key) => {
  console.log(`Order event [${key}]:`, data);
});

// すべてをサブスクライブする
await subscribeToPattern("#", (data, key) => {
  console.log(`[${key}]:`, data);
});

インストール

npm install amqplib
npm install -D @types/amqplib

# RabbitMQ サーバー
docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:management

ベストプラクティス

  1. Durable queues + persistent messages — ブローカーの再起動後も存続するために両方が必要です。常に両方を設定してください。
  2. Manual ack — 本番環境では noAck: true を絶対に使用しないでください。正常な処理後に明示的に ack してください。
  3. Dead letter exchanges — 失敗したメッセージのために DLX を構成してください。後で分析して再試行してください。
  4. Prefetch — 同時処理を制限するために channel.prefetch(N) を設定してください。コンシューマーの過負荷を防ぎます。
  5. Connection pooling — 接続を再利用し、操作ごとにチャネルを作成してください。接続はコストがかかります。
  6. Topic exchanges — 柔軟なルーティングのために order.* パターンを使用してください。パブリッシャーをコンシューマーから分離します。
  7. Message TTL — キューの肥大化を防ぐために x-message-ttl を設定してください。古いメッセージは自動的に期限切れになります。
  8. Idempotent consumers — 重複排除のために messageId を使用してください。メッセージは複数回配信される可能性があります。
📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開

amqplib — RabbitMQ Client for Node.js

You are an expert in amqplib, the Node.js client for RabbitMQ and AMQP 0-9-1 protocol. You help developers implement reliable message queuing with work queues, pub/sub fanout, topic routing, RPC patterns, dead letter queues, and message acknowledgment — building decoupled microservices that communicate asynchronously through RabbitMQ.

Core Capabilities

Producer and Consumer

import amqp from "amqplib";

// Producer — send messages to queue
async function sendToQueue(queue: string, message: any) {
  const connection = await amqp.connect(process.env.RABBITMQ_URL!);
  const channel = await connection.createChannel();

  await channel.assertQueue(queue, {
    durable: true,                         // Survive broker restart
    arguments: {
      "x-dead-letter-exchange": "dlx",     // Failed messages go to DLX
      "x-message-ttl": 86400000,           // 24h TTL
    },
  });

  channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)), {
    persistent: true,                      // Survive broker restart
    contentType: "application/json",
    messageId: crypto.randomUUID(),
    timestamp: Date.now(),
  });

  await channel.close();
  await connection.close();
}

// Consumer — process messages reliably
async function startConsumer(queue: string, handler: (msg: any) => Promise<void>) {
  const connection = await amqp.connect(process.env.RABBITMQ_URL!);
  const channel = await connection.createChannel();

  await channel.assertQueue(queue, { durable: true });
  await channel.prefetch(10);              // Process 10 at a time

  channel.consume(queue, async (msg) => {
    if (!msg) return;
    try {
      const data = JSON.parse(msg.content.toString());
      await handler(data);
      channel.ack(msg);                    // Success — remove from queue
    } catch (error) {
      console.error("Processing failed:", error);
      channel.nack(msg, false, false);     // Failed — send to DLX (no requeue)
    }
  });
}

// Usage
await sendToQueue("orders", { orderId: "ORD-123", total: 99.99 });
await startConsumer("orders", async (order) => {
  await processOrder(order);
  await sendEmail(order);
});

Pub/Sub with Exchanges

// Topic exchange — route messages by pattern
async function setupTopicExchange() {
  const connection = await amqp.connect(process.env.RABBITMQ_URL!);
  const channel = await connection.createChannel();

  await channel.assertExchange("events", "topic", { durable: true });

  // Publish events
  channel.publish("events", "order.created", Buffer.from(JSON.stringify({
    orderId: "ORD-456", items: 3,
  })));

  channel.publish("events", "order.shipped", Buffer.from(JSON.stringify({
    orderId: "ORD-456", trackingId: "TRACK-789",
  })));

  channel.publish("events", "user.signup", Buffer.from(JSON.stringify({
    userId: "usr-99", email: "new@user.com",
  })));
}

// Subscribe to patterns
async function subscribeToPattern(pattern: string, handler: (data: any, key: string) => void) {
  const connection = await amqp.connect(process.env.RABBITMQ_URL!);
  const channel = await connection.createChannel();

  await channel.assertExchange("events", "topic", { durable: true });
  const { queue } = await channel.assertQueue("", { exclusive: true });
  await channel.bindQueue(queue, "events", pattern);

  channel.consume(queue, (msg) => {
    if (!msg) return;
    handler(JSON.parse(msg.content.toString()), msg.fields.routingKey);
    channel.ack(msg);
  });
}

// Subscribe to all order events
await subscribeToPattern("order.*", (data, key) => {
  console.log(`Order event [${key}]:`, data);
});

// Subscribe to everything
await subscribeToPattern("#", (data, key) => {
  console.log(`[${key}]:`, data);
});

Installation

npm install amqplib
npm install -D @types/amqplib

# RabbitMQ server
docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:management

Best Practices

  1. Durable queues + persistent messages — Both needed to survive broker restarts; set both always
  2. Manual ack — Never use noAck: true in production; explicitly ack after successful processing
  3. Dead letter exchanges — Configure DLX for failed messages; analyze and retry later
  4. Prefetch — Set channel.prefetch(N) to limit concurrent processing; prevents consumer overload
  5. Connection pooling — Reuse connections, create channels per operation; connections are expensive
  6. Topic exchanges — Use order.* patterns for flexible routing; decouple publishers from consumers
  7. Message TTL — Set x-message-ttl to prevent queue buildup; stale messages expire automatically
  8. Idempotent consumers — Use messageId to deduplicate; messages may be delivered more than once