🛠️ Azure Servicebus Py
With Python, between multiple systems and applications in the cloud...
📜 Original English description (for reference)
Azure Service Bus SDK for Python messaging. Use for queues, topics, subscriptions, and enterprise messaging patterns.
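⚠️ Download and use at your own risk. This site accepts no responsibility for the content, behavior, or safety of this Skill.
🎯 What this Skill can do
The description below explains what this Skill does for you. When you ask Claude for help in this area, the Skill activates automatically.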
📦 Installation (3 steps)
- 1. Click the "Download" button above to get the .skill file
- 2. Change the file extension from .skill to .zip and extract it (macOS can extract it automatically)
- 3. Place the extracted folder in .claude/skills/ under your home folder
  - macOS / Linux: ~/.claude/skills/
  - Windows: %USERPROFILE%\.claude\skills\
Restart Claude Code and you are done. You do not need to say "use this Skill..."; it is invoked automatically for related requests.
- Last updated: 2026-05-17
- Retrieved: 2026-05-17
- Bundled files: 1
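💬 Just ask like this: sample prompts
- Show me a minimal sample using Azure Servicebus Py
- Explain the main usage and caveats of Azure Servicebus Py
- Explain how to integrate Azure Servicebus Py into an existing project
Paste any of these into Claude Code and this Skill activates automatically.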
📖 Original SKILL.md as read by Claude
This is the original text that the AI (Claude) reads.
Azure Service Bus SDK for Python
Enterprise messaging for reliable cloud communication with queues and pub/sub topics.
Installation
pip install azure-servicebus azure-identity
Environment Variables
SERVICEBUS_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net
SERVICEBUS_QUEUE_NAME=myqueue
SERVICEBUS_TOPIC_NAME=mytopic
SERVICEBUS_SUBSCRIPTION_NAME=mysubscription
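The variables above can be read once at startup and passed on to the client. The following is a minimal sketch, assuming the four variable names listed in this section. The `ServiceBusSettings` dataclass and the `load_settings` helper are hypothetical names introduced here for illustration and are not part of the SDK.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class ServiceBusSettings:
    """Hypothetical container for the environment variables listed above."""
    fully_qualified_namespace: str
    queue_name: Optional[str] = None
    topic_name: Optional[str] = None
    subscription_name: Optional[str] = None


def load_settings(env: Mapping[str, str] = os.environ) -> ServiceBusSettings:
    """Read Service Bus settings, failing fast if the namespace is missing."""
    namespace = env.get("SERVICEBUS_FULLY_QUALIFIED_NAMESPACE")
    if not namespace:
        raise RuntimeError("SERVICEBUS_FULLY_QUALIFIED_NAMESPACE is not set")
    return ServiceBusSettings(
        fully_qualified_namespace=namespace,
        queue_name=env.get("SERVICEBUS_QUEUE_NAME"),
        topic_name=env.get("SERVICEBUS_TOPIC_NAME"),
        subscription_name=env.get("SERVICEBUS_SUBSCRIPTION_NAME"),
    )
```

Accepting any mapping rather than reading `os.environ` directly keeps the helper easy to exercise in tests with a plain dictionary.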
Authentication
from azure.identity import DefaultAzureCredential
from azure.servicebus import ServiceBusClient

credential = DefaultAzureCredential()
namespace = "<namespace>.servicebus.windows.net"
client = ServiceBusClient(
    fully_qualified_namespace=namespace,
    credential=credential
)
Client Types
| Client | Purpose | Get From |
|---|---|---|
| `ServiceBusClient` | Connection management | Direct instantiation |
| `ServiceBusSender` | Send messages | `client.get_queue_sender()` / `client.get_topic_sender()` |
| `ServiceBusReceiver` | Receive messages | `client.get_queue_receiver()` / `client.get_subscription_receiver()` |
Send Messages (Async)
import asyncio
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusMessage
from azure.identity.aio import DefaultAzureCredential

async def send_messages():
    credential = DefaultAzureCredential()
    async with ServiceBusClient(
        fully_qualified_namespace="<namespace>.servicebus.windows.net",
        credential=credential
    ) as client:
        sender = client.get_queue_sender(queue_name="myqueue")
        async with sender:
            # Single message
            message = ServiceBusMessage("Hello, Service Bus!")
            await sender.send_messages(message)

            # Batch of messages
            messages = [ServiceBusMessage(f"Message {i}") for i in range(10)]
            await sender.send_messages(messages)

            # Message batch (for size control)
            batch = await sender.create_message_batch()
            for i in range(100):
                try:
                    batch.add_message(ServiceBusMessage(f"Batch message {i}"))
                except ValueError:  # Batch full
                    await sender.send_messages(batch)
                    batch = await sender.create_message_batch()
                    batch.add_message(ServiceBusMessage(f"Batch message {i}"))
            await sender.send_messages(batch)

asyncio.run(send_messages())
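The size-controlled batching loop above can be factored into a reusable helper. This is a sketch under stated assumptions rather than an SDK feature: `send_in_batches` is a hypothetical name, `sender` is assumed to behave like an async `ServiceBusSender`, and `messages` is assumed to be an iterable of `ServiceBusMessage` objects. It relies only on the calls already shown in this section (`create_message_batch`, `add_message`, `send_messages`).

```python
from typing import Any, Iterable


async def send_in_batches(sender: Any, messages: Iterable[Any]) -> int:
    """Pack messages into size-limited batches; return how many batches were sent."""
    batches_sent = 0
    pending = 0  # messages in the current, not-yet-sent batch
    batch = await sender.create_message_batch()
    for message in messages:
        try:
            batch.add_message(message)
        except ValueError:  # current batch is full
            if pending == 0:
                raise  # a single message is too large for an empty batch
            await sender.send_messages(batch)
            batches_sent += 1
            batch = await sender.create_message_batch()
            batch.add_message(message)
            pending = 0
        pending += 1
    if pending:  # flush the last partial batch, skipping an empty one
        await sender.send_messages(batch)
        batches_sent += 1
    return batches_sent
```

Tracking `pending` avoids sending an empty final batch and surfaces an oversized single message as an error instead of retrying it forever.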
Receive Messages (Async)
import asyncio
from azure.servicebus.aio import ServiceBusClient
from azure.identity.aio import DefaultAzureCredential

async def receive_messages():
    credential = DefaultAzureCredential()
    async with ServiceBusClient(
        fully_qualified_namespace="<namespace>.servicebus.windows.net",
        credential=credential
    ) as client:
        receiver = client.get_queue_receiver(queue_name="myqueue")
        async with receiver:
            # Receive batch
            messages = await receiver.receive_messages(
                max_message_count=10,
                max_wait_time=5  # seconds
            )
            for msg in messages:
                print(f"Received: {str(msg)}")
                await receiver.complete_message(msg)  # Remove from queue

asyncio.run(receive_messages())
Receive Modes
| Mode | Behavior | Use Case |
|---|---|---|
| `PEEK_LOCK` (default) | Message locked, must complete/abandon | Reliable processing |
| `RECEIVE_AND_DELETE` | Removed immediately on receive | At-most-once delivery |
from azure.servicebus import ServiceBusReceiveMode

receiver = client.get_queue_receiver(
    queue_name="myqueue",
    receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE
)
Message Settlement
# ProcessingError and PermanentError stand in for your application's own
# exception types; they are not defined by the SDK.
async with receiver:
    messages = await receiver.receive_messages(max_message_count=1)
    for msg in messages:
        try:
            # Process message...
            await receiver.complete_message(msg)  # Success - remove from queue
        except ProcessingError:
            await receiver.abandon_message(msg)  # Release lock so the message can be redelivered
        except PermanentError:
            await receiver.dead_letter_message(
                msg,
                reason="ProcessingFailed",
                error_description="Could not process"
            )
| Action | Effect |
|---|---|
| `complete_message()` | Remove from queue (success) |
| `abandon_message()` | Release lock; message is immediately available for redelivery |
| `dead_letter_message()` | Move to dead-letter queue |
| `defer_message()` | Set aside, receive by sequence number |
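The settlement actions in the table can be tied to processing outcomes in one place. The sketch below is illustrative only: `process_and_settle`, `TransientError`, and `PermanentError` are hypothetical names, and `receiver` is assumed to behave like an async `ServiceBusReceiver` opened in `PEEK_LOCK` mode. It uses only the settlement calls shown in this section.

```python
from typing import Any, Awaitable, Callable


class TransientError(Exception):
    """Hypothetical application error: worth retrying."""


class PermanentError(Exception):
    """Hypothetical application error: retrying will not help."""


async def process_and_settle(
    receiver: Any,
    msg: Any,
    handler: Callable[[Any], Awaitable[None]],
) -> str:
    """Run handler(msg), settle the message once, and return the action taken."""
    try:
        await handler(msg)
    except TransientError:
        # Release the lock so the message can be redelivered.
        await receiver.abandon_message(msg)
        return "abandoned"
    except PermanentError as exc:
        # Park the message in the dead-letter queue with a reason.
        await receiver.dead_letter_message(
            msg, reason="ProcessingFailed", error_description=str(exc)
        )
        return "dead-lettered"
    # Handler succeeded: remove the message from the queue.
    await receiver.complete_message(msg)
    return "completed"
```

Keeping `complete_message()` outside the `try` block means a failure while completing is not mistaken for a processing failure.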
Topics and Subscriptions
# Send to topic
sender = client.get_topic_sender(topic_name="mytopic")
async with sender:
    await sender.send_messages(ServiceBusMessage("Topic message"))

# Receive from subscription
receiver = client.get_subscription_receiver(
    topic_name="mytopic",
    subscription_name="mysubscription"
)
async with receiver:
    messages = await receiver.receive_messages(max_message_count=10)
Sessions (FIFO)
# Send with session
message = ServiceBusMessage("Session message")
message.session_id = "order-123"
await sender.send_messages(message)

# Receive from specific session
receiver = client.get_queue_receiver(
    queue_name="session-queue",
    session_id="order-123"
)

# Receive from next available session
from azure.servicebus import NEXT_AVAILABLE_SESSION
receiver = client.get_queue_receiver(
    queue_name="session-queue",
    session_id=NEXT_AVAILABLE_SESSION
)
Scheduled Messages
from datetime import datetime, timedelta, timezone

message = ServiceBusMessage("Scheduled message")
scheduled_time = datetime.now(timezone.utc) + timedelta(minutes=10)

# Schedule message (returns a list of sequence numbers, one per message)
sequence_numbers = await sender.schedule_messages(message, scheduled_time)

# Cancel scheduled message
await sender.cancel_scheduled_messages(sequence_numbers)
Dead-Letter Queue
from azure.servicebus import ServiceBusSubQueue

# Receive from dead-letter queue
dlq_receiver = client.get_queue_receiver(
    queue_name="myqueue",
    sub_queue=ServiceBusSubQueue.DEAD_LETTER
)
async with dlq_receiver:
    messages = await dlq_receiver.receive_messages(max_message_count=10)
    for msg in messages:
        print(f"Dead-lettered: {msg.dead_letter_reason}")
        await dlq_receiver.complete_message(msg)
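Completing a dead-lettered message removes it permanently, so it can help to inspect the queue first. The following sketch counts dead-lettered messages by reason using `peek_messages()`, which browses without locking or removing anything. `summarize_dead_letters` is a hypothetical helper name, and `dlq_receiver` is assumed to be an async receiver opened on the dead-letter sub-queue as shown above.

```python
from collections import Counter
from typing import Any


async def summarize_dead_letters(dlq_receiver: Any, max_messages: int = 50) -> Counter:
    """Count dead-lettered messages by reason without settling them."""
    # peek_messages() only browses: no lock is taken and nothing is removed.
    peeked = await dlq_receiver.peek_messages(max_message_count=max_messages)
    # Messages without a recorded reason are grouped under "unknown".
    return Counter(msg.dead_letter_reason or "unknown" for msg in peeked)
```

A single peek returns at most `max_messages` entries, so the counts describe a sample of the dead-letter queue rather than its full contents.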
Sync Client (for simple scripts)
from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.identity import DefaultAzureCredential

with ServiceBusClient(
    fully_qualified_namespace="<namespace>.servicebus.windows.net",
    credential=DefaultAzureCredential()
) as client:
    with client.get_queue_sender("myqueue") as sender:
        sender.send_messages(ServiceBusMessage("Sync message"))

    # max_wait_time ends the iteration once no message arrives within 5 seconds;
    # without it, the loop below blocks indefinitely on an empty queue.
    with client.get_queue_receiver("myqueue", max_wait_time=5) as receiver:
        for msg in receiver:
            print(str(msg))
            receiver.complete_message(msg)
Best Practices
- Use async client for production workloads
- Use context managers (`async with`) for proper cleanup
- Complete messages after successful processing
- Use dead-letter queue for poison messages
- Use sessions for ordered, FIFO processing
- Use message batches for high-throughput scenarios
- Set `max_wait_time` to avoid infinite blocking
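Several of these practices combine naturally in a bounded receive loop. The sketch below is one possible shape, not a prescribed pattern: `drain_queue` is a hypothetical helper, `receiver` is assumed to be an async `ServiceBusReceiver` in `PEEK_LOCK` mode, and `handler` is an application coroutine. It stops as soon as a receive call returns nothing within `max_wait_time`.

```python
from typing import Any, Awaitable, Callable


async def drain_queue(
    receiver: Any,
    handler: Callable[[Any], Awaitable[None]],
    max_wait_time: float = 5.0,
    batch_size: int = 10,
) -> int:
    """Process messages until the queue stays empty for max_wait_time seconds."""
    processed = 0
    while True:
        messages = await receiver.receive_messages(
            max_message_count=batch_size,
            max_wait_time=max_wait_time,  # bounds each wait, so the loop can end
        )
        if not messages:
            return processed
        for msg in messages:
            await handler(msg)
            # Complete only after the handler has succeeded.
            await receiver.complete_message(msg)
            processed += 1
```

If `handler` raises, the exception propagates and the message is left unsettled, so it becomes available again once its lock expires.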
Reference Files
| File | Contents |
|---|---|
| references/patterns.md | Competing consumers, sessions, retry patterns, request-response, transactions |
| references/dead-letter.md | DLQ handling, poison messages, reprocessing strategies |
| scripts/setup_servicebus.py | CLI for queue/topic/subscription management and DLQ monitoring |
When to Use
Use this skill when a task involves the workflows or actions described in the overview.
Limitations
- Use this skill only when the task clearly matches the scope described above.
- Do not treat the output as a substitute for environment-specific validation, testing, or expert review.
- Stop and ask for clarification if required inputs, permissions, safety boundaries, or success criteria are missing.