SKILL.md

name: py-async-patterns
description: Async/await patterns for FastAPI and SQLAlchemy. Use when working with async code, database sessions, concurrent operations, or debugging async issues in Python.

Python Async Patterns

Problem Statement

Async Python is powerful but error-prone. Race conditions, session leaks, and connection pool issues are common pitfalls in async codebases.


Pattern: AsyncSession Lifecycle

Problem: A session must be scoped to a single request. A leaked session serves stale data and keeps its connection open, which eventually exhausts the available connections.

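from collections.abc import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession

# ✅ CORRECT: Session scoped to request via dependency
# async_session is the project's session factory
async def get_session() -> AsyncGenerator[AsyncSession, None]:
    async with async_session() as session:
        yield session
        # Leaving the async with block closes the session once the request finishes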

# Usage in endpoint
@router.get("/users/{user_id}")
async def get_user(
    user_id: UUID,
    session: AsyncSession = Depends(get_session),
) -> UserRead:
    result = await session.execute(select(User).where(User.id == user_id))
    return result.scalar_one()

# ❌ WRONG: Global session (stale data, connection leaks)
_global_session = None  # NEVER do this

async def get_user(user_id: UUID):
    result = await _global_session.execute(...)  # Stale, shared state

Why it matters: Each request needs isolated database state. Shared sessions see stale data and can't be safely committed.
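
The lifecycle can be illustrated without a database. The sketch below is a minimal model, assuming a stand-in `FakeSession` class and a `request_scoped_session` helper (both hypothetical, not part of SQLAlchemy or FastAPI). It shows the property the dependency relies on: the cleanup after `yield` runs when the request ends, whether the handler succeeds or raises.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


class FakeSession:
    """Stand-in for AsyncSession (hypothetical); only tracks open/closed state."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


@asynccontextmanager
async def request_scoped_session() -> AsyncIterator[FakeSession]:
    session = FakeSession()
    try:
        yield session  # the "request" runs while this generator is suspended
    finally:
        await session.close()  # runs on success and on error


async def handle_request(fail: bool) -> FakeSession:
    """Simulate one request and return its session so callers can inspect it."""
    seen = None
    try:
        async with request_scoped_session() as session:
            seen = session
            if fail:
                raise ValueError("handler failed")
    except ValueError:
        pass  # the error propagated out of the block only after cleanup ran
    return seen
```

Each call to `handle_request` gets a fresh session, and that session is closed by the time the call returns, which is the isolation the dependency-based pattern is meant to provide.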


Pattern: Concurrent vs Sequential Queries

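Problem: Running independent queries sequentially wastes time, but dependent queries must be sequential. An AsyncSession is not safe for concurrent use, so queries that run concurrently each need their own session.

# ✅ CORRECT: Concurrent independent queries, one session per query
async def run_query(stmt, extract):
    # AsyncSession is not safe for concurrent use, so each query opens
    # its own short-lived session from the async_session factory
    async with async_session() as session:
        return extract(await session.execute(stmt))

async def get_dashboard_data(user_id: UUID):
    # These don't depend on each other - run concurrently
    user, stats, recent = await asyncio.gather(
        run_query(select(User).where(User.id == user_id), lambda r: r.scalar_one()),
        run_query(
            select(UserStats).where(UserStats.user_id == user_id),
            lambda r: r.scalar_one_or_none(),
        ),
        run_query(
            select(Activity)
            .where(Activity.user_id == user_id)
            .order_by(Activity.created_at.desc())
            .limit(10),
            lambda r: r.scalars().all(),
        ),
    )

    # Objects are detached once their session closes; treat them as read-only
    return {"user": user, "stats": stats, "recent": recent}

# ❌ WRONG: asyncio.gather() on one shared session
async def get_dashboard_data_broken(user_id: UUID, session: AsyncSession):
    await asyncio.gather(
        session.execute(...),
        session.execute(...),  # Concurrent use of one AsyncSession is unsupported
    )

# ⚠️ SAFE BUT SLOW: Sequential independent queries on one session
async def get_dashboard_data_slow(user_id: UUID, session: AsyncSession):
    user = await session.execute(...)      # Wait...
    stats = await session.execute(...)     # Wait more...
    recent = await session.execute(...)    # Even more waiting
    # Total time = sum of all queries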

# ✅ CORRECT: Sequential when queries depend on each other
async def get_user_with_team(user_id: UUID, session: AsyncSession):
    # Must get user first to know team_id
    user_result = await session.execute(
        select(User).where(User.id == user_id)
    )
    user = user_result.scalar_one()
    
    # Now we can query team
    team_result = await session.execute(
        select(Team).where(Team.id == user.team_id)
    )
    return user, team_result.scalar_one()

Decision framework:

| Does one query need another's result? | Use |
| --- | --- |
| No (independent) | asyncio.gather() with one session per query |
| Yes (dependent) | Sequential await on one session |
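
The cost difference behind this decision can be measured without a database. The sketch below uses `asyncio.sleep` as a stand-in for query latency (`fake_query` and the 0.05 s delay are assumptions for illustration): sequential awaits take roughly the sum of the delays, while `asyncio.gather` takes roughly the longest single delay.

```python
import asyncio
import time


async def fake_query(name: str, delay: float) -> str:
    """Stand-in for an independent query; sleeps instead of hitting a database."""
    await asyncio.sleep(delay)
    return name


async def run_sequential() -> tuple[list[str], float]:
    start = time.perf_counter()
    results = [
        await fake_query("user", 0.05),
        await fake_query("stats", 0.05),
        await fake_query("recent", 0.05),
    ]
    return results, time.perf_counter() - start


async def run_concurrent() -> tuple[list[str], float]:
    start = time.perf_counter()
    # gather() returns results in the order the awaitables were passed in
    results = await asyncio.gather(
        fake_query("user", 0.05),
        fake_query("stats", 0.05),
        fake_query("recent", 0.05),
    )
    return list(results), time.perf_counter() - start
```

Both functions return the same results in the same order; only the elapsed time differs. With real queries the gain depends on how much of each query's time is spent waiting on the network or the database.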

Pattern: Transaction Boundaries

Problem: It is easy to get wrong when to commit, roll back, and refresh.

# ✅ CORRECT: Explicit transaction for multi-step operations
async def transfer_player(
    player_id: UUID,
    from_team_id: UUID,
    to_team_id: UUID,
    session: AsyncSession,
):
    try:
        # All operations in one transaction
        player = await session.get(Player, player_id)
        player.team_id = to_team_id
        
        from_team = await session.get(Team, from_team_id)
        from_team.player_count -= 1
        
        to_team = await session.get(Team, to_team_id)
        to_team.player_count += 1
        
        await session.commit()
    except Exception:
        await session.rollback()
        raise

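# ✅ CORRECT: Using context manager
# Call begin() before any other work on the session: it raises if the
# session has already started a transaction (for example via an earlier get())
async with session.begin():
    # All operations here are in a transaction
    # Auto-commits on success, auto-rollbacks on exception
    player = await session.get(Player, player_id)
    from_team = await session.get(Team, from_team_id)
    to_team = await session.get(Team, to_team_id)
    player.team_id = to_team_id
    from_team.player_count -= 1
    to_team.player_count += 1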

# ✅ CORRECT: Refresh after commit to get DB-generated values
await session.commit()
await session.refresh(new_entity)  # Get id, created_at, etc.
return new_entity

When to use what:

| Scenario | Pattern |
| --- | --- |
| Single create/update | session.add() + commit() at request end |
| Multi-step operation | Explicit begin() / commit() / rollback() |
| Need DB-generated values | refresh() after commit |
| Read-only query | No commit needed |
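
The commit-on-success, rollback-on-exception contract of a transaction block can be shown with a toy model. `ToyTransaction` below is a hypothetical class written only for this example. It is not how SQLAlchemy implements transactions, just a sketch of the same contract: staged changes become visible only if the block exits cleanly.

```python
import asyncio


class ToyTransaction:
    """Toy unit of work (hypothetical): stages changes, applies them on commit."""

    def __init__(self, store: dict[str, int]) -> None:
        self.store = store
        self.pending: dict[str, int] = {}

    async def __aenter__(self) -> "ToyTransaction":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        if exc_type is None:
            self.store.update(self.pending)  # commit: apply all staged changes
        self.pending.clear()  # rollback, or cleanup after a commit
        return False  # never swallow the exception

    def set(self, key: str, value: int) -> None:
        self.pending[key] = value


async def transfer(store: dict[str, int], fail: bool) -> None:
    async with ToyTransaction(store) as tx:
        tx.set("from_team", store["from_team"] - 1)
        if fail:
            raise RuntimeError("simulated failure between the two updates")
        tx.set("to_team", store["to_team"] + 1)
```

When `fail` is true, the first update is discarded along with the second, so the store never ends up with one team decremented and the other unchanged.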

Pattern: Connection Pool Management

Problem: Exhausting the connection pool (or, without a pool, the database's connection limit) causes requests to hang.

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool

# This codebase uses NullPool for async - understand why
engine = create_async_engine(
    DATABASE_URL,
    poolclass=NullPool,  # No connection pooling
)

# NullPool: Each session opens a new connection and closes it when done
# Why: Avoids issues with asyncpg + connection reuse
# Tradeoff: Every session pays the cost of opening a connection

# ✅ CORRECT: Always close sessions (handled by Depends)
async with async_session() as session:
    # Work with session
    pass  # Session closed here

# ❌ WRONG: Forgetting to close
session = async_session()
result = await session.execute(query)
# Session never closed - connection leak!
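
Why a leak makes later requests hang can be sketched with a bounded pool built on `asyncio.Semaphore`. `ToyPool` is a hypothetical stand-in, not SQLAlchemy's pool. With NullPool there is no client-side pool at all, and the analogous limit is the database's own connection cap.

```python
import asyncio


class ToyPool:
    """Hypothetical bounded pool: at most `size` connections checked out."""

    def __init__(self, size: int) -> None:
        self._slots = asyncio.Semaphore(size)

    async def acquire(self, timeout: float) -> None:
        # Waits for a free slot; raises asyncio.TimeoutError if none frees up
        await asyncio.wait_for(self._slots.acquire(), timeout)

    def release(self) -> None:
        self._slots.release()


async def leak_demo() -> str:
    pool = ToyPool(size=2)
    await pool.acquire(timeout=0.1)  # request 1: never released (leak)
    await pool.acquire(timeout=0.1)  # request 2: never released (leak)
    try:
        await pool.acquire(timeout=0.1)  # request 3: no slot left
    except asyncio.TimeoutError:
        return "timed out"
    return "acquired"


async def clean_demo() -> str:
    pool = ToyPool(size=2)
    for _ in range(3):
        await pool.acquire(timeout=0.1)
        pool.release()  # returning the slot keeps the pool usable
    return "acquired"
```

In `leak_demo` the third request waits until its timeout because nothing ever returns a slot. In `clean_demo` three requests share two slots without trouble.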

Pattern: Background Tasks

Problem: Long-running work shouldn't block the response.

from fastapi import BackgroundTasks

# ✅ CORRECT: FastAPI BackgroundTasks for request-scoped work
@router.post("/assessments/{id}/submit")
async def submit_assessment(
    id: UUID,
    background_tasks: BackgroundTasks,  # Parameters without defaults must come first
    session: AsyncSession = Depends(get_session),
) -> AssessmentResult:
    # Quick work - return response
    result = await process_submission(id, session)
    
    # Slow work - do after response
    background_tasks.add_task(send_completion_email, result.user_email)
    background_tasks.add_task(update_analytics, result)
    
    return result

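# ✅ CORRECT: asyncio.create_task for fire-and-forget
_background_tasks: set[asyncio.Task] = set()  # Strong references; the loop keeps only weak ones

async def process_with_side_effect():
    result = await main_operation()
    
    # Fire and forget - don't await, but keep a reference until the task finishes
    task = asyncio.create_task(log_to_external_service(result))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    
    return result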

# ❌ WRONG: Awaiting non-critical slow operations
async def slow_endpoint():
    result = await main_operation()
    await send_email(result)           # User waits for email...
    await update_analytics(result)     # User still waiting...
    return result

When to use what:

| Scenario | Pattern |
| --- | --- |
| Post-response cleanup | BackgroundTasks |
| Fire-and-forget logging | asyncio.create_task() |
| Must complete before response | Direct await |

Pattern: Avoiding Deadlocks

Problem: Concurrent operations that acquire the same locks in different orders can block each other forever.

# ❌ WRONG: Potential deadlock
async def transfer_both_ways():
    # Task 1: Lock A, then B
    # Task 2: Lock B, then A
    # = Deadlock if interleaved
    pass

# ✅ CORRECT: Consistent lock ordering
async def transfer_credits(
    from_id: UUID,
    to_id: UUID,
    amount: int,
    session: AsyncSession,
):
    # Always lock in a consistent order (here: ascending UUID)
    first_id, second_id = sorted([from_id, to_id])
    
    # SELECT ... FOR UPDATE takes the row locks in that order
    first = await session.get(Account, first_id, with_for_update=True)
    second = await session.get(Account, second_id, with_for_update=True)
    
    # Now safe to modify
    if from_id == first_id:
        first.balance -= amount
        second.balance += amount
    else:
        second.balance -= amount
        first.balance += amount
    
    await session.commit()
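
The same ordering rule applies to in-process locks. This sketch uses `asyncio.Lock` objects in place of row locks (the account names and the `transfer` helper are hypothetical). Two transfers in opposite directions run concurrently, and both finish because each acquires its locks in sorted order.

```python
import asyncio

balances = {"a": 100, "b": 100}
locks: dict[str, asyncio.Lock] = {}


async def transfer(from_id: str, to_id: str, amount: int) -> None:
    # Sort the keys so every task locks "a" before "b", whatever the direction
    first, second = sorted([from_id, to_id])
    async with locks[first]:
        await asyncio.sleep(0.01)  # widen the window where a deadlock could occur
        async with locks[second]:
            balances[from_id] -= amount
            balances[to_id] += amount


async def main() -> dict[str, int]:
    # Create the locks inside the running loop for compatibility with older Pythons
    locks.update({name: asyncio.Lock() for name in balances})
    await asyncio.wait_for(
        asyncio.gather(transfer("a", "b", 30), transfer("b", "a", 10)),
        timeout=1.0,  # would raise TimeoutError if the tasks deadlocked
    )
    return dict(balances)
```

If each task instead locked `from_id` first and `to_id` second, the two tasks would each hold one lock while waiting for the other, and the `wait_for` timeout would fire.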

Pattern: Post-Condition Validation

As on the frontend, verify that an async operation actually succeeded before relying on its result:

# ✅ CORRECT: Validate after async operations
async def create_assessment(data: AssessmentCreate, session: AsyncSession):
    assessment = Assessment(**data.model_dump())
    session.add(assessment)
    await session.commit()
    await session.refresh(assessment)
    
    # Validate post-condition
    if assessment.id is None:
        raise RuntimeError("Assessment creation failed - no ID assigned")
    
    return assessment

# ✅ CORRECT: Validate data was actually loaded
async def get_user_or_fail(user_id: UUID, session: AsyncSession) -> User:
    result = await session.execute(
        select(User).where(User.id == user_id)
    )
    user = result.scalar_one_or_none()
    
    if user is None:
        raise HTTPException(404, f"User {user_id} not found")
    
    return user

Pattern: Logging Async Operations

import structlog

logger = structlog.get_logger()

async def complex_operation(user_id: UUID, session: AsyncSession):
    logger.info("complex_operation.start", user_id=str(user_id))
    step = "step_one"  # Track the current step so failures are attributable
    
    try:
        result = await step_one(session)
        logger.debug("complex_operation.step_one_complete", result_count=len(result))
        
        step = "step_two"
        await step_two(result, session)
        logger.debug("complex_operation.step_two_complete")
        
        step = "commit"
        await session.commit()
        logger.info("complex_operation.success", user_id=str(user_id))
        
    except Exception as e:
        logger.error("complex_operation.failed", 
            user_id=str(user_id), 
            error=str(e),
            step=step,
        )
        raise

Common Issues

| Issue | Likely Cause | Solution |
| --- | --- | --- |
| "Session is closed" | Using session after request ends | Keep session in request scope |
| Connection timeout | Pool exhausted | Check for session leaks |
| Stale data | Shared session or missing refresh | Scope session to request, refresh after commit |
| Deadlock | Inconsistent lock ordering | Always acquire locks in same order |
| Slow endpoint | Sequential queries that could be parallel | Use asyncio.gather() with one session per query |

Detection Commands

# Find potential session leaks (global sessions)
grep -rn "async_session()" --include="*.py" | grep -v "async with\|Depends"

# Find sequential queries that might be parallelizable
grep -rn "await session.execute" --include="*.py" -A2 | grep -B1 "await session.execute"

# Find create_task() calls whose result is discarded (dangling tasks)
ruff check --select=RUF006
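
Where ruff is not available, the dangling-task check can be approximated with the standard-library `ast` module. This is a rough sketch and not a replacement for RUF006: `find_dangling_tasks` is a hypothetical helper that flags only `asyncio.create_task(...)` calls written as bare statements, and it misses aliased imports and other task-creating calls.

```python
import ast


def find_dangling_tasks(source: str) -> list[int]:
    """Return line numbers of asyncio.create_task(...) calls whose result is discarded."""
    hits = []
    for node in ast.walk(ast.parse(source)):
        # An ast.Expr statement wrapping a call means the return value is thrown away
        if not (isinstance(node, ast.Expr) and isinstance(node.value, ast.Call)):
            continue
        func = node.value.func
        if (
            isinstance(func, ast.Attribute)
            and func.attr == "create_task"
            and isinstance(func.value, ast.Name)
            and func.value.id == "asyncio"
        ):
            hits.append(node.lineno)
    return sorted(hits)


SAMPLE = """\
import asyncio

async def handler():
    asyncio.create_task(work())
    task = asyncio.create_task(work())
    await task
"""
```

Calling `find_dangling_tasks(SAMPLE)` flags the bare call on line 4 of the sample and ignores the assigned task on line 5.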