async-sqlalchemy-patterns

@vanman2024/ai-dev-marketplace

Async SQLAlchemy 2.0+ database patterns for FastAPI including session management, connection pooling, Alembic migrations, relationship loading strategies, and query optimization. Use when implementing database models, configuring async sessions, setting up migrations, optimizing queries, managing relationships, or when user mentions SQLAlchemy, async database, ORM, Alembic, database performance, or connection pooling.

SKILL.md

name async-sqlalchemy-patterns
description Async SQLAlchemy 2.0+ database patterns for FastAPI including session management, connection pooling, Alembic migrations, relationship loading strategies, and query optimization. Use when implementing database models, configuring async sessions, setting up migrations, optimizing queries, managing relationships, or when user mentions SQLAlchemy, async database, ORM, Alembic, database performance, or connection pooling.
allowed-tools Read, Write, Edit, Grep, Glob, Bash

Async SQLAlchemy Patterns

Purpose: Implement production-ready async SQLAlchemy 2.0+ patterns in FastAPI with proper session management, migrations, and performance optimization.

Activation Triggers:

  • Database model implementation
  • Async session configuration
  • Alembic migration setup
  • Query performance optimization
  • Relationship loading issues
  • Connection pool configuration
  • Transaction management
  • Database schema migrations

Key Resources:

  • scripts/setup-alembic.sh - Initialize Alembic with async support
  • scripts/generate-migration.sh - Create migrations from model changes
  • templates/base_model.py - Base model with common patterns
  • templates/session_manager.py - Async session factory and dependency
  • templates/alembic.ini - Alembic configuration for async
  • examples/user_model.py - Complete model with relationships
  • examples/async_context_examples.py - Session usage patterns

Core Patterns

1. Async Engine and Session Setup

Database Configuration:

# app/core/database.py
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    create_async_engine,
    async_sessionmaker
)
from sqlalchemy.orm import declarative_base
from app.core.config import settings

# Create async engine
engine = create_async_engine(
    settings.DATABASE_URL,
    echo=settings.DEBUG,
    pool_pre_ping=True,  # Verify connections before using
    pool_size=5,         # Base number of connections
    max_overflow=10,     # Additional connections when pool is full
    pool_recycle=3600,   # Recycle connections after 1 hour
    pool_timeout=30,     # Wait 30s for available connection
)

# Create async session factory
AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # Keep objects usable after commit
    autoflush=False,         # Manual flush control
    autocommit=False,        # Explicit commits only
)

Base = declarative_base()

Dependency Injection Pattern:

# app/core/deps.py
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import AsyncSessionLocal

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """
    FastAPI dependency for database sessions.
    Commits on success and rolls back on error; the context
    manager closes the session on exit.
    """
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

2. Base Model Pattern

Use template from templates/base_model.py with:

  • UUID primary keys by default
  • Automatic created_at/updated_at timestamps
  • Soft delete support
  • Common query methods

Essential Mixins:

from datetime import datetime
from sqlalchemy import DateTime, Boolean, func
from sqlalchemy.orm import Mapped, mapped_column

class TimestampMixin:
    """Auto-managed timestamps"""
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        nullable=False
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
        nullable=False
    )

class SoftDeleteMixin:
    """Soft delete support"""
    is_deleted: Mapped[bool] = mapped_column(
        Boolean,
        default=False,
        nullable=False
    )
    deleted_at: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True),
        nullable=True
    )

3. Relationship Loading Strategies

Lazy Loading (Default - N+1 Problem Risk):

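# Bad: relies on implicit lazy loading. A sync Session would emit one
# extra query per user (N+1); an AsyncSession raises MissingGreenlet
# instead, because the implicit load would need IO outside an await.
result = await session.execute(select(User))
for user in result.scalars():
    print(user.posts)  # Fails unless posts was eagerly loaded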

Eager Loading (Recommended):

from sqlalchemy import select
from sqlalchemy.orm import joinedload, selectinload, subqueryload

# selectinload: second SELECT ... WHERE ... IN (...), good for one-to-many
stmt = select(User).options(selectinload(User.posts))
result = await session.execute(stmt)
users = result.scalars().all()

# joinedload: SQL JOIN, good for many-to-one
stmt = select(Post).options(joinedload(Post.author))
result = await session.execute(stmt)
# unique() is required when joinedload targets a collection
# (e.g. joinedload(User.posts)); it is harmless for many-to-one
posts = result.scalars().unique().all()

# subqueryload: legacy subquery-based loading; selectinload is usually preferred
stmt = select(User).options(subqueryload(User.posts))

Relationship Configuration:

from sqlalchemy.orm import Mapped, mapped_column, relationship

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)

    # One-to-many with cascade
    posts: Mapped[list["Post"]] = relationship(
        back_populates="author",
        cascade="all, delete-orphan",
        lazy="selectin"  # Eager-load by default for this relationship
    )

    # Many-to-many
    roles: Mapped[list["Role"]] = relationship(
        secondary="user_roles",
        back_populates="users",
        lazy="selectin"
    )

4. Query Patterns

Basic CRUD:

from sqlalchemy import select, update, delete
from sqlalchemy.ext.asyncio import AsyncSession

# Create
async def create_user(session: AsyncSession, user_data: dict):
    user = User(**user_data)
    session.add(user)
    await session.commit()
    await session.refresh(user)
    return user

# Read with filters
async def get_users(session: AsyncSession, skip: int = 0, limit: int = 100):
    stmt = (
        select(User)
        .where(User.is_deleted.is_(False))
        .order_by(User.created_at.desc())
        .offset(skip)
        .limit(limit)
    )
    result = await session.execute(stmt)
    return result.scalars().all()

# Update (RETURNING needs backend support, e.g. PostgreSQL or SQLite 3.35+)
async def update_user(session: AsyncSession, user_id: int, updates: dict):
    stmt = (
        update(User)
        .where(User.id == user_id)
        .values(**updates)
        .returning(User)
    )
    result = await session.execute(stmt)
    user = result.scalar_one()  # Read the returned row before committing
    await session.commit()
    return user

# Delete
async def delete_user(session: AsyncSession, user_id: int):
    stmt = delete(User).where(User.id == user_id)
    await session.execute(stmt)
    await session.commit()

Complex Queries:

from sqlalchemy import func, and_, or_

# Aggregations
stmt = (
    select(User.id, func.count(Post.id))
    .join(Post)
    .group_by(User.id)
    .having(func.count(Post.id) > 5)
)

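# Correlated subquery: users with at least one post above that
# user's own average view count
from sqlalchemy.orm import aliased

PostAvg = aliased(Post)
avg_views = (
    select(func.avg(PostAvg.views))
    .where(PostAvg.user_id == Post.user_id)
    .scalar_subquery()
)
stmt = select(User).where(User.id.in_(
    select(Post.user_id).where(Post.views > avg_views)
))

# Window functions: latest 10 posts per user. A window result cannot be
# filtered in the WHERE clause of the same SELECT, so rank in a subquery.
ranked = (
    select(
        Post.id,
        func.row_number().over(
            partition_by=Post.user_id,
            order_by=Post.created_at.desc()
        ).label('row_num')
    )
    .subquery()
)
stmt = (
    select(Post)
    .join(ranked, Post.id == ranked.c.id)
    .where(ranked.c.row_num <= 10)
)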

5. Transaction Management

Nested Transactions:

async def transfer_funds(
    session: AsyncSession,
    from_account: int,
    to_account: int,
    amount: float
):
    async with session.begin_nested():  # Savepoint
        # Debit
        stmt = (
            update(Account)
            .where(Account.id == from_account)
            .where(Account.balance >= amount)
            .values(balance=Account.balance - amount)
        )
        result = await session.execute(stmt)
        if result.rowcount == 0:
            raise ValueError("Insufficient funds")

        # Credit
        stmt = (
            update(Account)
            .where(Account.id == to_account)
            .values(balance=Account.balance + amount)
        )
        await session.execute(stmt)

    await session.commit()

Manual Transaction Control:

async def batch_operation(session: AsyncSession, items: list):
    try:
        for item in items:
            session.add(item)
            if len(session.new) >= 100:
                await session.flush()  # Flush but don't commit

        await session.commit()
    except Exception:
        await session.rollback()
        raise
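
An alternative to checking `len(session.new)` on every iteration is to split the input into fixed-size batches up front. The `chunked` helper below is a hypothetical, standard-library-only sketch of that idea and is not part of the skill's scripts or templates.

```python
from collections.abc import Iterable, Iterator
from itertools import islice
from typing import TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[list[T]]:
    """Yield consecutive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    # islice pulls up to `size` items; an empty list ends the loop.
    while batch := list(islice(iterator, size)):
        yield batch


batch_sizes = [len(batch) for batch in chunked(range(250), 100)]
```

Inside `batch_operation`, each batch could then be passed to `session.add_all(batch)` followed by `await session.flush()`, with a single commit after the loop.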

6. Alembic Migration Setup

# Initialize Alembic with async template
./scripts/setup-alembic.sh

# Creates:
# - alembic/ directory
# - alembic.ini configured for async
# - env.py with async support

Generate Migrations:

# Auto-generate from model changes
./scripts/generate-migration.sh "add user table"

# Review generated migration in alembic/versions/
# Always review auto-generated migrations!

Migration Best Practices:

  1. Always review auto-generated migrations
  2. Use batch operations for large tables
  3. Add indexes in separate migrations
  4. Include both upgrade and downgrade
  5. Test migrations on staging first

Manual Migration Template:

# alembic/versions/xxx_add_index.py
from alembic import op
import sqlalchemy as sa

def upgrade() -> None:
    # Use batch for SQLite compatibility
    with op.batch_alter_table('users') as batch_op:
        batch_op.create_index(
            'ix_users_email',
            ['email'],
            unique=True
        )

def downgrade() -> None:
    with op.batch_alter_table('users') as batch_op:
        batch_op.drop_index('ix_users_email')

7. Connection Pool Configuration

Production Settings:

# For web applications
web_engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,           # Concurrent requests
    max_overflow=40,        # Burst capacity
    pool_recycle=3600,      # 1 hour
    pool_pre_ping=True,     # Verify connections
    pool_timeout=30,        # Wait time
    echo_pool=False,        # Pool debug logging
)

# For background tasks
worker_engine = create_async_engine(
    DATABASE_URL,
    pool_size=5,            # Fewer connections
    max_overflow=10,
    pool_recycle=7200,      # 2 hours
)
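
Pool settings apply per engine, and each application process creates its own engine, so the database can see up to `processes * (pool_size + max_overflow)` connections. The sketch below checks that arithmetic; the function names, the figure of 4 worker processes, and the server limit of 100 (PostgreSQL's default `max_connections`) are assumptions for illustration.

```python
def max_db_connections(processes: int, pool_size: int, max_overflow: int) -> int:
    """Upper bound on connections opened by all processes combined."""
    return processes * (pool_size + max_overflow)


def fits_server_limit(
    processes: int,
    pool_size: int,
    max_overflow: int,
    server_max_connections: int,
    reserved: int = 0,
) -> bool:
    """True if the worst case stays within the server limit.

    `reserved` leaves headroom for migrations, admin sessions, etc.
    """
    worst_case = max_db_connections(processes, pool_size, max_overflow)
    return worst_case <= server_max_connections - reserved


# Web settings from above (pool_size=20, max_overflow=40) with 4 processes.
web_total = max_db_connections(processes=4, pool_size=20, max_overflow=40)
web_fits = fits_server_limit(4, 20, 40, server_max_connections=100)

# Background-task settings (pool_size=5, max_overflow=10) with 4 processes.
worker_fits = fits_server_limit(4, 5, 10, server_max_connections=100)
```

Under these assumptions the web settings could request 240 connections, well above a 100-connection server, so either the pool, the process count, or the server limit would need adjusting (or a pooler such as PgBouncer placed in front).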

Connection Health Check:

from sqlalchemy import text

async def check_database_health(session: AsyncSession) -> bool:
    try:
        await session.execute(text("SELECT 1"))
        return True
    except Exception:
        return False

8. Performance Optimization

Bulk Operations:

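from sqlalchemy import insert, update

# Bulk insert: passing the list of dicts as parameters lets SQLAlchemy
# batch the rows instead of building one large VALUES clause
async def bulk_create_users(session: AsyncSession, users: list[dict]):
    await session.execute(insert(User), users)
    await session.commit()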

# Bulk update
async def bulk_update_status(session: AsyncSession, user_ids: list[int]):
    stmt = (
        update(User)
        .where(User.id.in_(user_ids))
        .values(status="active")
    )
    await session.execute(stmt)
    await session.commit()

Query Result Streaming:

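from collections.abc import AsyncIterator, Sequence

async def stream_large_dataset(
    session: AsyncSession,
) -> AsyncIterator[Sequence[User]]:
    stmt = select(User).execution_options(yield_per=100)
    # stream_scalars() returns an async result of User objects
    result = await session.stream_scalars(stmt)

    # partitions() yields lists of up to 100 User objects
    async for users in result.partitions(100):
        # Process batch
        yield users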

Index Optimization:

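from sqlalchemy import Index, text

class User(Base):
    __tablename__ = "users"

    email: Mapped[str] = mapped_column(unique=True, index=True)

    # Assumes status, created_at, and is_deleted columns exist on this
    # model (for example via the mixins above)
    __table_args__ = (
        Index('ix_user_status_created', 'status', 'created_at'),
        # Partial index (PostgreSQL only): index only non-deleted rows
        Index('ix_user_email_active', 'email',
              postgresql_where=text('NOT is_deleted')),
    )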

Troubleshooting

Common Issues

"Object is not bound to session":

# Bad: with expire_on_commit=True, attributes expire after commit
user = await create_user(session, data)
print(user.email)  # Triggers an implicit reload, which fails under asyncio

# Fix: refresh explicitly, or disable expiry on the session factory
await session.refresh(user)
# Or: async_sessionmaker(engine, expire_on_commit=False)

"N+1 Query Problem":

# Enable SQL logging to detect
engine = create_async_engine(url, echo=True)

# Fix with eager loading
stmt = select(User).options(selectinload(User.posts))

"DetachedInstanceError":

# Bad: Accessing relationship outside session
async with AsyncSessionLocal() as session:
    user = await session.get(User, user_id)
print(user.posts)  # Error!

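# Fix: eager-load the relationship while the session is open
async with AsyncSessionLocal() as session:
    user = await session.get(
        User, user_id, options=[selectinload(User.posts)]
    )
print(user.posts)  # Already loaded, so safe after the session closes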

"Connection Pool Exhausted":

# Increase pool size or add timeout
engine = create_async_engine(
    url,
    pool_size=20,
    max_overflow=40,
    pool_timeout=30
)

Resources

Scripts: scripts/ directory contains:

  • setup-alembic.sh - Initialize Alembic with async configuration
  • generate-migration.sh - Create migrations from model changes
  • test-connection.sh - Test database connectivity
  • optimize-queries.sh - Analyze and suggest query optimizations

Templates: templates/ directory includes:

  • base_model.py - Base model with UUID, timestamps, soft delete
  • session_manager.py - Complete session factory and dependencies
  • alembic.ini - Production-ready Alembic configuration
  • env.py - Alembic async environment template

Examples: examples/ directory provides:

  • user_model.py - Full model with relationships and indexes
  • async_context_examples.py - Session patterns and best practices
  • query_patterns.py - Common query optimization examples
  • migration_examples.py - Manual migration patterns

SQLAlchemy Version: 2.0+
Database Support: PostgreSQL, MySQL, SQLite (async drivers)
Async Drivers: asyncpg (PostgreSQL), aiomysql (MySQL), aiosqlite (SQLite)
Version: 1.0.0