jpskill.com
💼 ビジネス コミュニティ

py-async-patterns

FastAPIやSQLAlchemyで非同期処理を行う際に、データベースセッション管理や並行処理、デバッグなどを効率的に行うためのasync/awaitパターンを活用するSkill。

📜 元の英語説明(参考)

Async/await patterns for FastAPI and SQLAlchemy. Use when working with async code, database sessions, concurrent operations, or debugging async issues in Python.

🇯🇵 日本人クリエイター向け解説

一言でいうと

FastAPIやSQLAlchemyで非同期処理を行う際に、データベースセッション管理や並行処理、デバッグなどを効率的に行うためのasync/awaitパターンを活用するSkill。

※ jpskill.com 編集部が日本のビジネス現場向けに補足した解説です。Skill本体の挙動とは独立した参考情報です。

⚡ おすすめ: コマンド1行でインストール(60秒)

下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。

🍎 Mac / 🐧 Linux
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o py-async-patterns.zip https://jpskill.com/download/17843.zip && unzip -o py-async-patterns.zip && rm py-async-patterns.zip
🪟 Windows (PowerShell)
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/17843.zip -OutFile "$d\py-async-patterns.zip"; Expand-Archive "$d\py-async-patterns.zip" -DestinationPath $d -Force; ri "$d\py-async-patterns.zip"

完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。

💾 手動でダウンロードしたい(コマンドが難しい人向け)
  1. 1. 下の青いボタンを押して py-async-patterns.zip をダウンロード
  2. 2. ZIPファイルをダブルクリックで解凍 → py-async-patterns フォルダができる
  3. 3. そのフォルダを C:\Users\あなたの名前\.claude\skills\(Win)または ~/.claude/skills/(Mac)へ移動
  4. 4. Claude Code を再起動

⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。

🎯 このSkillでできること

下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。

📦 インストール方法 (3ステップ)

  1. 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
  2. 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
  3. 3. 展開してできたフォルダを、ホームフォルダの .claude/skills/ に置く
    • · macOS / Linux: ~/.claude/skills/
    • · Windows: %USERPROFILE%\.claude\skills\

Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。

詳しい使い方ガイドを見る →
最終更新
2026-05-18
取得日時
2026-05-18
同梱ファイル
1

📖 Skill本文(日本語訳)

※ 原文(英語/中国語)を Gemini で日本語化したものです。Claude 自身は原文を読みます。誤訳がある場合は原文をご確認ください。

Python Async パターン

問題提起

Async Python は強力ですが、エラーが発生しやすいです。競合状態、セッションリーク、および接続プールに関する問題は、async コードベースにおける一般的な落とし穴です。


パターン: AsyncSession のライフサイクル

問題: セッションはリクエストのスコープに限定される必要があります。セッションをリークさせると、データが古くなったり、接続が枯渇したりします。

# ✅ 正しい: セッションは依存関係によってリクエストのスコープに限定される
async def get_session() -> AsyncGenerator[AsyncSession, None]:
    async with async_session() as session:
        yield session
        # セッションはリクエスト後に自動的に閉じられる

# エンドポイントでの使用例
@router.get("/users/{user_id}")
async def get_user(
    user_id: UUID,
    session: AsyncSession = Depends(get_session),
) -> UserRead:
    result = await session.execute(select(User).where(User.id == user_id))
    return result.scalar_one()

# ❌ 間違い: グローバルセッション (古いデータ、接続リーク)
_global_session = None  # 絶対にしないでください

async def get_user(user_id: UUID):
    result = await _global_session.execute(...)  # 古い、共有状態

なぜ重要なのか: 各リクエストは、分離されたデータベース状態を必要とします。共有セッションは古いデータを参照し、安全にコミットできません。


パターン: 並行クエリ vs 逐次クエリ

問題: 独立したクエリを逐次的に実行すると、時間が無駄になります。ただし、依存するクエリは逐次的である必要があります。

# ✅ 正しい: 並行して実行できる独立したクエリ
async def get_dashboard_data(user_id: UUID, session: AsyncSession):
    # これらは互いに依存していないため、並行して実行する
    user_result, stats_result, recent_result = await asyncio.gather(
        session.execute(select(User).where(User.id == user_id)),
        session.execute(select(UserStats).where(UserStats.user_id == user_id)),
        session.execute(
            select(Activity)
            .where(Activity.user_id == user_id)
            .order_by(Activity.created_at.desc())
            .limit(10)
        ),
    )

    return {
        "user": user_result.scalar_one(),
        "stats": stats_result.scalar_one_or_none(),
        "recent": recent_result.scalars().all(),
    }

# ❌ 間違い: 並行処理が安全な場合に逐次処理を行う
async def get_dashboard_data_slow(user_id: UUID, session: AsyncSession):
    user = await session.execute(...)      # 待機...
    stats = await session.execute(...)     # さらに待機...
    recent = await session.execute(...)    # さらに待機
    # 合計時間 = すべてのクエリの合計

# ✅ 正しい: クエリが互いに依存している場合は逐次処理を行う
async def get_user_with_team(user_id: UUID, session: AsyncSession):
    # team_id を知るには、最初にユーザーを取得する必要がある
    user_result = await session.execute(
        select(User).where(User.id == user_id)
    )
    user = user_result.scalar_one()

    # これでチームをクエリできる
    team_result = await session.execute(
        select(Team).where(Team.id == user.team_id)
    )
    return user, team_result.scalar_one()

判断の枠組み:

クエリはデータを共有しますか? 使用方法
いいえ (独立) asyncio.gather()
はい (依存) 逐次 await

パターン: トランザクションの境界

問題: いつコミット、ロールバック、およびリフレッシュするかを知ること。

# ✅ 正しい: 複数ステップの操作のための明示的なトランザクション
async def transfer_player(
    player_id: UUID,
    from_team_id: UUID,
    to_team_id: UUID,
    session: AsyncSession,
):
    try:
        # すべての操作を1つのトランザクションで実行する
        player = await session.get(Player, player_id)
        player.team_id = to_team_id

        from_team = await session.get(Team, from_team_id)
        from_team.player_count -= 1

        to_team = await session.get(Team, to_team_id)
        to_team.player_count += 1

        await session.commit()
    except Exception:
        await session.rollback()
        raise

# ✅ 正しい: コンテキストマネージャーの使用
async with session.begin():
    # ここでのすべての操作はトランザクション内にある
    # 成功時に自動コミット、例外時に自動ロールバック
    player.team_id = to_team_id
    from_team.player_count -= 1
    to_team.player_count += 1

# ✅ 正しい: DB生成の値を取得するためにコミット後にリフレッシュする
await session.commit()
await session.refresh(new_entity)  # id、created_at などを取得する
return new_entity

いつ何を使用するか:

シナリオ パターン
単一の作成/更新 session.add() + リクエスト終了時に commit()
複数ステップの操作 明示的な begin() / commit() / rollback()
DB生成の値が必要 コミット後に refresh()
読み取り専用クエリ コミットは不要

パターン: 接続プールの管理

問題: 接続プールを使い果たすと、リクエストがハングアップします。

# このコードベースは async に NullPool を使用しています - 理由を理解してください
engine = create_async_engine(
    DATABASE_URL,
    poolclass=NullPool,  # 接続プーリングなし
)

# NullPool: 各リクエストは新しい接続を取得し、その後閉じる
# 理由: asyncpg + 接続の再利用に関する問題を回避するため
# トレードオフ: 接続のオーバーヘッドがわずかに増加

# ✅ 正しい: 常にセッションを閉じる (Depends によって処理される)
async with async_session() as session:
    # セッションを操作する
    pass  # セッションはここで閉じられる

# ❌ 間違い: 閉じ忘れる
session = async_session()
result = await session.execute(query)
# セッションが閉じられない - 接続リーク!

パターン: バックグラウンドタスク

問題: 時間のかかる作業は、レスポンスをブロックすべきではありません。


from fastapi import BackgroundTasks

# ✅ 正しい: リクエストスコープの作業のための FastAPI BackgroundTasks
@router.post("/assessments/{id}/submit")
async def submit_assessment(
    id: UUID,
    session: AsyncSession = Depends(get_session),
    background_tasks: BackgroundTasks,
) -> AssessmentResult:
    # 短時間で終わる作業 - レスポンスを返す
    result = await process_submission(id, session)

    # 時間のかかる作業 - レスポンス後に実行する
    background_tasks.add_task(send_completion_email, result.user_email)
    background_tasks.add_task(update_analytics, result)

    return result

# ✅ 正しい: asyncio.

(原文がここで切り詰められています)
📜 原文 SKILL.md(Claudeが読む英語/中国語)を展開

Python Async Patterns

Problem Statement

Async Python is powerful but error-prone. Race conditions, session leaks, and connection pool issues are common pitfalls in async codebases.


Pattern: AsyncSession Lifecycle

Problem: Session must be scoped to request. Leaking sessions causes stale data and connection exhaustion.

# ✅ CORRECT: Session scoped to request via dependency
async def get_session() -> AsyncGenerator[AsyncSession, None]:
    async with async_session() as session:
        yield session
        # Session automatically closed after request

# Usage in endpoint
@router.get("/users/{user_id}")
async def get_user(
    user_id: UUID,
    session: AsyncSession = Depends(get_session),
) -> UserRead:
    result = await session.execute(select(User).where(User.id == user_id))
    return result.scalar_one()

# ❌ WRONG: Global session (stale data, connection leaks)
_global_session = None  # NEVER do this

async def get_user(user_id: UUID):
    result = await _global_session.execute(...)  # Stale, shared state

Why it matters: Each request needs isolated database state. Shared sessions see stale data and can't be safely committed.


Pattern: Concurrent vs Sequential Queries

Problem: Running independent queries sequentially wastes time. But dependent queries must be sequential.

# ✅ CORRECT: Concurrent independent queries
async def get_dashboard_data(user_id: UUID, session: AsyncSession):
    # These don't depend on each other - run in parallel
    user_result, stats_result, recent_result = await asyncio.gather(
        session.execute(select(User).where(User.id == user_id)),
        session.execute(select(UserStats).where(UserStats.user_id == user_id)),
        session.execute(
            select(Activity)
            .where(Activity.user_id == user_id)
            .order_by(Activity.created_at.desc())
            .limit(10)
        ),
    )

    return {
        "user": user_result.scalar_one(),
        "stats": stats_result.scalar_one_or_none(),
        "recent": recent_result.scalars().all(),
    }

# ❌ WRONG: Sequential when parallel is safe
async def get_dashboard_data_slow(user_id: UUID, session: AsyncSession):
    user = await session.execute(...)      # Wait...
    stats = await session.execute(...)     # Wait more...
    recent = await session.execute(...)    # Even more waiting
    # Total time = sum of all queries

# ✅ CORRECT: Sequential when queries depend on each other
async def get_user_with_team(user_id: UUID, session: AsyncSession):
    # Must get user first to know team_id
    user_result = await session.execute(
        select(User).where(User.id == user_id)
    )
    user = user_result.scalar_one()

    # Now we can query team
    team_result = await session.execute(
        select(Team).where(Team.id == user.team_id)
    )
    return user, team_result.scalar_one()

Decision framework:

Queries share data? Use
No (independent) asyncio.gather()
Yes (dependent) Sequential await

Pattern: Transaction Boundaries

Problem: Knowing when to commit, rollback, and refresh.

# ✅ CORRECT: Explicit transaction for multi-step operations
async def transfer_player(
    player_id: UUID,
    from_team_id: UUID,
    to_team_id: UUID,
    session: AsyncSession,
):
    try:
        # All operations in one transaction
        player = await session.get(Player, player_id)
        player.team_id = to_team_id

        from_team = await session.get(Team, from_team_id)
        from_team.player_count -= 1

        to_team = await session.get(Team, to_team_id)
        to_team.player_count += 1

        await session.commit()
    except Exception:
        await session.rollback()
        raise

# ✅ CORRECT: Using context manager
async with session.begin():
    # All operations here are in a transaction
    # Auto-commits on success, auto-rollbacks on exception
    player.team_id = to_team_id
    from_team.player_count -= 1
    to_team.player_count += 1

# ✅ CORRECT: Refresh after commit to get DB-generated values
await session.commit()
await session.refresh(new_entity)  # Get id, created_at, etc.
return new_entity

When to use what:

Scenario Pattern
Single create/update session.add() + commit() at request end
Multi-step operation Explicit begin() / commit() / rollback()
Need DB-generated values refresh() after commit
Read-only query No commit needed

Pattern: Connection Pool Management

Problem: Exhausting connection pool causes requests to hang.

# This codebase uses NullPool for async - understand why
engine = create_async_engine(
    DATABASE_URL,
    poolclass=NullPool,  # No connection pooling
)

# NullPool: Each request gets new connection, closes after
# Why: Avoids issues with asyncpg + connection reuse
# Tradeoff: Slightly more connection overhead

# ✅ CORRECT: Always close sessions (handled by Depends)
async with async_session() as session:
    # Work with session
    pass  # Session closed here

# ❌ WRONG: Forgetting to close
session = async_session()
result = await session.execute(query)
# Session never closed - connection leak!

Pattern: Background Tasks

Problem: Long-running work shouldn't block the response.

from fastapi import BackgroundTasks

# ✅ CORRECT: FastAPI BackgroundTasks for request-scoped work
@router.post("/assessments/{id}/submit")
async def submit_assessment(
    id: UUID,
    session: AsyncSession = Depends(get_session),
    background_tasks: BackgroundTasks,
) -> AssessmentResult:
    # Quick work - return response
    result = await process_submission(id, session)

    # Slow work - do after response
    background_tasks.add_task(send_completion_email, result.user_email)
    background_tasks.add_task(update_analytics, result)

    return result

# ✅ CORRECT: asyncio.create_task for fire-and-forget
async def process_with_side_effect():
    result = await main_operation()

    # Fire and forget - don't await
    asyncio.create_task(log_to_external_service(result))

    return result

# ❌ WRONG: Awaiting non-critical slow operations
async def slow_endpoint():
    result = await main_operation()
    await send_email(result)           # User waits for email...
    await update_analytics(result)     # User still waiting...
    return result

When to use what:

Scenario Pattern
Post-response cleanup BackgroundTasks
Fire-and-forget logging asyncio.create_task()
Must complete before response Direct await

Pattern: Avoiding Deadlocks

Problem: Concurrent operations acquiring locks in different order.

# ❌ WRONG: Potential deadlock
async def transfer_both_ways():
    # Task 1: Lock A, then B
    # Task 2: Lock B, then A
    # = Deadlock if interleaved
    pass

# ✅ CORRECT: Consistent lock ordering
async def transfer_credits(
    from_id: UUID,
    to_id: UUID,
    amount: int,
    session: AsyncSession,
):
    # Always lock in consistent order (e.g., by UUID)
    first_id, second_id = sorted([from_id, to_id])

    # Lock in consistent order
    first = await session.get(Account, first_id, with_for_update=True)
    second = await session.get(Account, second_id, with_for_update=True)

    # Now safe to modify
    if from_id == first_id:
        first.balance -= amount
        second.balance += amount
    else:
        second.balance -= amount
        first.balance += amount

    await session.commit()

Pattern: Post-Condition Validation

Same principle as frontend - verify async operations succeeded:

# ✅ CORRECT: Validate after async operations
async def create_assessment(data: AssessmentCreate, session: AsyncSession):
    assessment = Assessment(**data.model_dump())
    session.add(assessment)
    await session.commit()
    await session.refresh(assessment)

    # Validate post-condition
    if assessment.id is None:
        raise RuntimeError("Assessment creation failed - no ID assigned")

    return assessment

# ✅ CORRECT: Validate data was actually loaded
async def get_user_or_fail(user_id: UUID, session: AsyncSession) -> User:
    result = await session.execute(
        select(User).where(User.id == user_id)
    )
    user = result.scalar_one_or_none()

    if user is None:
        raise HTTPException(404, f"User {user_id} not found")

    return user

Pattern: Logging Async Operations

import structlog

logger = structlog.get_logger()

async def complex_operation(user_id: UUID, session: AsyncSession):
    logger.info("complex_operation.start", user_id=str(user_id))

    try:
        result = await step_one(session)
        logger.debug("complex_operation.step_one_complete", result_count=len(result))

        await step_two(result, session)
        logger.debug("complex_operation.step_two_complete")

        await session.commit()
        logger.info("complex_operation.success", user_id=str(user_id))

    except Exception as e:
        logger.error("complex_operation.failed", 
            user_id=str(user_id), 
            error=str(e),
            step="unknown"
        )
        raise

Common Issues

Issue Likely Cause Solution
"Session is closed" Using session after request ends Keep session in request scope
Connection timeout Pool exhausted Check for session leaks
Stale data Shared session or missing refresh Scope session to request, refresh after commit
Deadlock Inconsistent lock ordering Always acquire locks in same order
Slow endpoint Sequential queries that could be parallel Use asyncio.gather()

Detection Commands

# Find potential session leaks (global sessions)
grep -rn "async_session()" --include="*.py" | grep -v "async with\|Depends"

# Find sequential queries that might be parallelizable
grep -rn "await session.execute" --include="*.py" -A2 | grep -B1 "await session.execute"

# Find missing awaits
ruff check --select=RUF006  # asyncio dangling task