Claude Code Plugins

Community-maintained marketplace

Feedback

database-integration

@tachyon-beep/skillpacks
2
0

Use when working with SQLAlchemy, database connection pooling, N+1 queries, migrations, transactions, or ORM vs raw SQL decisions - covers production patterns for connection management, query optimization, zero-downtime migrations, and testing strategies

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name database-integration
description Use when working with SQLAlchemy, database connection pooling, N+1 queries, migrations, transactions, or ORM vs raw SQL decisions - covers production patterns for connection management, query optimization, zero-downtime migrations, and testing strategies

Database Integration

Overview

Database integration specialist covering SQLAlchemy, connection pooling, query optimization, migrations, transactions, and production patterns.

Core principle: Databases are stateful, high-latency external systems requiring careful connection management, query optimization, and migration strategies to maintain performance and reliability at scale.

When to Use This Skill

Use when encountering:

  • Connection pooling: Pool exhaustion, "too many connections" errors, pool configuration
  • Query optimization: N+1 queries, slow endpoints, eager loading strategies
  • Migrations: Schema changes, zero-downtime deployments, data backfills
  • Transactions: Multi-step operations, rollback strategies, isolation levels
  • ORM vs Raw SQL: Complex queries, performance optimization, query readability
  • Testing: Database test strategies, fixtures, test isolation
  • Monitoring: Query performance tracking, connection pool health

Do NOT use for:

  • Database selection (PostgreSQL vs MySQL vs MongoDB)
  • Database administration (backup, replication, sharding)
  • Schema design principles (see general architecture resources)

Connection Pool Configuration

Pool Sizing Formula

Calculate pool size based on deployment architecture:

# Formula: pool_size × num_workers ≤ (postgres_max_connections - reserved)
# Example: 10 workers × 5 connections = 50 total ≤ (100 - 10) reserved

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

DATABASE_URL = "postgresql://user:pass@host/db"

engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    pool_size=5,              # Connections per worker
    max_overflow=10,          # Additional connections during spikes
    pool_pre_ping=True,       # CRITICAL: Verify connection before use
    pool_recycle=3600,        # Recycle after 1 hour (prevent stale connections)
    pool_timeout=30,          # Wait max 30s for connection from pool
    echo_pool=False,          # Enable for debugging pool issues
    connect_args={
        "connect_timeout": 10,
        "options": "-c statement_timeout=30000"  # 30s query timeout
    }
)

Environment-based configuration:

import os
from pydantic import BaseSettings

class DatabaseSettings(BaseSettings):
    database_url: str
    pool_size: int = 5
    max_overflow: int = 10
    pool_pre_ping: bool = True
    pool_recycle: int = 3600

    class Config:
        env_file = ".env"

settings = DatabaseSettings()

engine = create_engine(
    settings.database_url,
    pool_size=settings.pool_size,
    max_overflow=settings.max_overflow,
    pool_pre_ping=settings.pool_pre_ping,
    pool_recycle=settings.pool_recycle
)

Async configuration (asyncpg):

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@host/db",
    pool_size=20,           # Async handles more concurrent connections
    max_overflow=0,         # No overflow - fail fast
    pool_pre_ping=False,    # asyncpg handles internally
    pool_recycle=3600
)

async_session = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

Pool Health Monitoring

Health check endpoint:

from fastapi import FastAPI, HTTPException
from sqlalchemy import text

app = FastAPI()

@app.get("/health/database")
async def database_health(db: Session = Depends(get_db)):
    """Check database connectivity and pool status"""
    try:
        # Simple query to verify connection
        result = db.execute(text("SELECT 1"))

        # Check pool statistics
        pool = db.get_bind().pool
        pool_status = {
            "size": pool.size(),
            "checked_in": pool.checkedin(),
            "checked_out": pool.checkedout(),
            "overflow": pool.overflow(),
            "total_connections": pool.size() + pool.overflow()
        }

        return {
            "status": "healthy",
            "pool": pool_status
        }
    except Exception as e:
        raise HTTPException(status_code=503, detail=f"Database unhealthy: {e}")

Pool exhaustion debugging:

import logging

logger = logging.getLogger(__name__)

# Enable pool event logging
from sqlalchemy import event

@event.listens_for(engine, "connect")
def receive_connect(dbapi_conn, connection_record):
    logger.info(f"New connection created: {id(dbapi_conn)}")

@event.listens_for(engine, "checkout")
def receive_checkout(dbapi_conn, connection_record, connection_proxy):
    logger.debug(f"Connection checked out: {id(dbapi_conn)}")

    pool = connection_proxy._pool
    logger.debug(
        f"Pool status - size: {pool.size()}, "
        f"checked_out: {pool.checkedout()}, "
        f"overflow: {pool.overflow()}"
    )

@event.listens_for(engine, "checkin")
def receive_checkin(dbapi_conn, connection_record):
    logger.debug(f"Connection checked in: {id(dbapi_conn)}")

Testing with NullPool

Disable pooling in tests:

from sqlalchemy.pool import NullPool

# Test configuration - no connection pooling
test_engine = create_engine(
    "postgresql://user:pass@localhost/test_db",
    poolclass=NullPool,  # No pooling - each query gets new connection
    echo=True            # Log all SQL queries
)

Query Optimization

N+1 Query Detection

Automatic detection in tests:

from sqlalchemy import event
from sqlalchemy.engine import Engine
import pytest

class QueryCounter:
    """Count queries executed during test"""
    def __init__(self):
        self.queries = []

    def __enter__(self):
        event.listen(Engine, "before_cursor_execute", self._before_cursor_execute)
        return self

    def __exit__(self, *args):
        event.remove(Engine, "before_cursor_execute", self._before_cursor_execute)

    def _before_cursor_execute(self, conn, cursor, statement, *args):
        self.queries.append(statement)

    @property
    def count(self):
        return len(self.queries)

# Test usage
def test_no_n_plus_1():
    with QueryCounter() as counter:
        users = get_users_with_posts()  # Should use eager loading

        # Access posts (should not trigger additional queries)
        for user in users:
            _ = [post.title for post in user.posts]

        # Should be 1-2 queries, not 101
        assert counter.count <= 2, f"N+1 detected: {counter.count} queries"

Eager Loading Strategies

Decision matrix:

Pattern Queries Use When Example
joinedload() 1 (JOIN) One-to-one, small one-to-many User → Profile
selectinload() 2 (IN clause) One-to-many with many rows User → Posts
subqueryload() 2 (subquery) Legacy alternative Use selectinload instead
raiseload() 0 (raises error) Prevent lazy loading Production safety

joinedload() - Single query with JOIN:

from sqlalchemy.orm import joinedload

# Single query: SELECT * FROM users LEFT OUTER JOIN posts ON ...
users = db.query(User).options(
    joinedload(User.posts)
).all()

# Best for one-to-one or small one-to-many
user = db.query(User).options(
    joinedload(User.profile)  # One-to-one
).filter(User.id == user_id).first()

selectinload() - Two queries (more efficient for many rows):

from sqlalchemy.orm import selectinload

# Query 1: SELECT * FROM users
# Query 2: SELECT * FROM posts WHERE user_id IN (1, 2, 3, ...)
users = db.query(User).options(
    selectinload(User.posts)
).all()

# Best for one-to-many with many related rows

Nested eager loading:

# Load users → posts → comments (3 queries total)
users = db.query(User).options(
    selectinload(User.posts).selectinload(Post.comments)
).all()

Conditional eager loading:

from sqlalchemy.orm import selectinload, Load

# Only load published posts
users = db.query(User).options(
    selectinload(User.posts).options(
        Load(Post).filter(Post.published == True)
    )
).all()

Prevent lazy loading in production (raiseload):

from sqlalchemy.orm import raiseload

# Raise error if any relationship accessed without eager loading
users = db.query(User).options(
    raiseload('*')  # Disable all lazy loading
).all()

# This will raise an error:
# user.posts  # InvalidRequestError: 'User.posts' is not available due to lazy='raise'

Query Performance Measurement

Log slow queries:

from sqlalchemy import event
from sqlalchemy.engine import Engine
import time
import logging

logger = logging.getLogger(__name__)

SLOW_QUERY_THRESHOLD = 1.0  # seconds

@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    conn.info.setdefault('query_start_time', []).append(time.time())

@event.listens_for(Engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    total_time = time.time() - conn.info['query_start_time'].pop()

    if total_time > SLOW_QUERY_THRESHOLD:
        logger.warning(
            f"Slow query ({total_time:.2f}s): {statement[:200]}",
            extra={
                "duration": total_time,
                "statement": statement,
                "parameters": parameters
            }
        )

EXPLAIN ANALYZE for query optimization:

from sqlalchemy import text

def explain_query(db: Session, query):
    """Get query execution plan"""
    compiled = query.statement.compile(
        compile_kwargs={"literal_binds": True}
    )

    explain_result = db.execute(
        text(f"EXPLAIN ANALYZE {compiled}")
    ).fetchall()

    return "\n".join([row[0] for row in explain_result])

# Usage
query = db.query(User).join(Post).filter(Post.published == True)
plan = explain_query(db, query)
print(plan)

Deferred Column Loading

Exclude large columns from initial query:

from sqlalchemy.orm import defer, undefer

# Don't load large 'bio' column initially
users = db.query(User).options(
    defer(User.bio),  # Skip this column
    defer(User.profile_image)  # Skip binary data
).all()

# Load specific user's bio when needed
user = db.query(User).options(
    undefer(User.bio)  # Load this column
).filter(User.id == user_id).first()

Load only specific columns:

from sqlalchemy.orm import load_only

# Only load id and name (ignore all other columns)
users = db.query(User).options(
    load_only(User.id, User.name)
).all()

Zero-Downtime Migrations

Migration Decision Matrix

Operation Locking Approach Downtime
Add nullable column None Single migration No
Add NOT NULL column Table lock Multi-phase (nullable → backfill → NOT NULL) No
Add index Share lock CREATE INDEX CONCURRENTLY No
Add foreign key Share lock NOT VALIDVALIDATE No
Drop column None Multi-phase (stop using → drop) No
Rename column None Multi-phase (add new → dual write → drop old) No
Alter column type Table lock Multi-phase or rebuild table Maybe

Multi-Phase NOT NULL Migration

Phase 1: Add nullable column:

# migrations/versions/001_add_email_verified.py
def upgrade():
    # Fast: no table rewrite
    op.add_column('users', sa.Column('email_verified', sa.Boolean(), nullable=True))

    # Set default for new rows
    op.execute("ALTER TABLE users ALTER COLUMN email_verified SET DEFAULT false")

def downgrade():
    op.drop_column('users', 'email_verified')

Phase 2: Backfill in batches:

# migrations/versions/002_backfill_email_verified.py
from alembic import op
import sqlalchemy as sa

def upgrade():
    """Backfill existing rows in batches"""
    connection = op.get_bind()

    # Process in batches to avoid long transactions
    batch_size = 10000
    total_updated = 0

    while True:
        result = connection.execute(sa.text("""
            UPDATE users
            SET email_verified = false
            WHERE email_verified IS NULL
            AND id IN (
                SELECT id FROM users
                WHERE email_verified IS NULL
                ORDER BY id
                LIMIT :batch_size
            )
        """), {"batch_size": batch_size})

        rows_updated = result.rowcount
        total_updated += rows_updated

        if rows_updated == 0:
            break

        print(f"Backfilled {total_updated} rows")

def downgrade():
    pass  # No rollback needed

Phase 3: Add NOT NULL constraint:

# migrations/versions/003_make_email_verified_not_null.py
def upgrade():
    # Verify no NULLs remain
    connection = op.get_bind()
    result = connection.execute(sa.text(
        "SELECT COUNT(*) FROM users WHERE email_verified IS NULL"
    ))
    null_count = result.scalar()

    if null_count > 0:
        raise Exception(f"Cannot add NOT NULL: {null_count} NULL values remain")

    # Add NOT NULL constraint (fast since all values are set)
    op.alter_column('users', 'email_verified', nullable=False)

def downgrade():
    op.alter_column('users', 'email_verified', nullable=True)

Concurrent Index Creation

Without CONCURRENTLY (blocks writes):

# BAD: Locks table during index creation
def upgrade():
    op.create_index('idx_users_email', 'users', ['email'])

With CONCURRENTLY (no locks):

# GOOD: No blocking, safe for production
def upgrade():
    # Requires raw SQL for CONCURRENTLY
    op.execute("""
        CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_email
        ON users (email)
    """)

def downgrade():
    op.execute("DROP INDEX CONCURRENTLY IF EXISTS idx_users_email")

Partial index for efficiency:

def upgrade():
    op.execute("""
        CREATE INDEX CONCURRENTLY idx_users_active_email
        ON users (email)
        WHERE deleted_at IS NULL
    """)

Adding Foreign Keys Without Blocking

Using NOT VALID constraint:

# migrations/versions/004_add_foreign_key.py
def upgrade():
    # Phase 1: Add constraint without validating existing rows (fast)
    op.execute("""
        ALTER TABLE posts
        ADD CONSTRAINT fk_posts_user_id
        FOREIGN KEY (user_id)
        REFERENCES users (id)
        NOT VALID
    """)

    # Phase 2: Validate constraint in background (can be canceled/restarted)
    op.execute("""
        ALTER TABLE posts
        VALIDATE CONSTRAINT fk_posts_user_id
    """)

def downgrade():
    op.drop_constraint('fk_posts_user_id', 'posts', type_='foreignkey')

Migration Monitoring

Track migration progress:

-- Check backfill progress
SELECT
    COUNT(*) FILTER (WHERE email_verified IS NULL) as null_count,
    COUNT(*) as total_count,
    ROUND(100.0 * COUNT(*) FILTER (WHERE email_verified IS NOT NULL) / COUNT(*), 2) as pct_complete
FROM users;

-- Check index creation progress (PostgreSQL 12+)
SELECT
    phase,
    ROUND(100.0 * blocks_done / NULLIF(blocks_total, 0), 2) as pct_complete
FROM pg_stat_progress_create_index
WHERE relid = 'users'::regclass;

Transaction Management

Basic Transaction Pattern

Context manager with automatic rollback:

from contextlib import contextmanager
from sqlalchemy.orm import Session

@contextmanager
def transactional_session(db: Session):
    """Context manager for automatic rollback on error"""
    try:
        yield db
        db.commit()
    except Exception as e:
        db.rollback()
        raise
    finally:
        db.close()

# Usage
with transactional_session(db) as session:
    user = User(name="Alice")
    session.add(user)
    # Automatic commit on success, rollback on exception

Savepoints for Partial Rollback

Nested transactions with savepoints:

def create_order_with_retry(db: Session, order_data: dict):
    """Use savepoints to retry failed steps without losing entire transaction"""
    # Start main transaction
    order = Order(**order_data)
    db.add(order)
    db.flush()  # Get order.id

    # Try payment with savepoint
    sp = db.begin_nested()  # Create savepoint
    try:
        payment = process_payment(order.total)
        order.payment_id = payment.id
    except PaymentError as e:
        sp.rollback()  # Rollback to savepoint (keep order)

        # Try alternative payment method
        sp = db.begin_nested()
        try:
            payment = process_backup_payment(order.total)
            order.payment_id = payment.id
        except PaymentError:
            sp.rollback()
            raise HTTPException(status_code=402, detail="All payment methods failed")

    db.commit()  # Commit entire transaction
    return order

Locking Strategies

Optimistic locking with version column:

from sqlalchemy import Column, Integer, String

class Product(Base):
    __tablename__ = "products"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    inventory = Column(Integer)
    version = Column(Integer, nullable=False, default=1)  # Version column

# Usage
def decrement_inventory(db: Session, product_id: int, quantity: int):
    product = db.query(Product).filter(Product.id == product_id).first()

    if product.inventory < quantity:
        raise ValueError("Insufficient inventory")

    # Update with version check
    rows_updated = db.execute(
        sa.update(Product)
        .where(Product.id == product_id)
        .where(Product.version == product.version)  # Check version hasn't changed
        .values(
            inventory=Product.inventory - quantity,
            version=Product.version + 1
        )
    ).rowcount

    if rows_updated == 0:
        # Version mismatch - another transaction modified this row
        raise HTTPException(status_code=409, detail="Product was modified by another transaction")

    db.commit()

Pessimistic locking with SELECT FOR UPDATE:

def decrement_inventory_with_lock(db: Session, product_id: int, quantity: int):
    """Acquire row lock to prevent concurrent modifications"""
    # Lock the row (blocks other transactions)
    product = db.query(Product).filter(
        Product.id == product_id
    ).with_for_update().first()  # SELECT ... FOR UPDATE

    if not product:
        raise HTTPException(status_code=404, detail="Product not found")

    if product.inventory < quantity:
        raise HTTPException(status_code=400, detail="Insufficient inventory")

    product.inventory -= quantity
    db.commit()
    # Lock released after commit

Lock timeout to prevent deadlocks:

from sqlalchemy import text

def with_lock_timeout(db: Session, timeout_ms: int = 5000):
    """Set lock timeout for this transaction"""
    db.execute(text(f"SET LOCAL lock_timeout = '{timeout_ms}ms'"))

# Usage
try:
    with_lock_timeout(db, 3000)  # 3 second timeout
    product = db.query(Product).with_for_update().filter(...).first()
except Exception as e:
    if "lock timeout" in str(e).lower():
        raise HTTPException(status_code=409, detail="Resource locked by another transaction")
    raise

Isolation Levels

Configure isolation level:

from sqlalchemy import create_engine

# Default: READ COMMITTED
engine = create_engine(
    DATABASE_URL,
    isolation_level="REPEATABLE READ"  # Options: READ UNCOMMITTED, READ COMMITTED, REPEATABLE READ, SERIALIZABLE
)

# Per-transaction isolation
from sqlalchemy.orm import Session

with Session(engine) as session:
    session.connection(execution_options={"isolation_level": "SERIALIZABLE"})
    # ... transaction logic ...

Raw SQL vs ORM

Decision Matrix

Use ORM When Use Raw SQL When
CRUD operations Complex CTEs (Common Table Expressions)
Simple joins (<3 tables) Window functions with PARTITION BY
Type safety critical Performance-critical queries
Database portability needed Database-specific optimizations (PostgreSQL arrays, JSONB)
Code readability with ORM is good ORM query becomes unreadable (>10 lines)

Raw SQL with Type Safety

Parameterized queries with Pydantic results:

from sqlalchemy import text
from pydantic import BaseModel
from typing import List

class CustomerReport(BaseModel):
    id: int
    name: str
    region: str
    total_spent: float
    order_count: int
    rank_in_region: int

@app.get("/reports/top-customers")
def get_top_customers(
    db: Session = Depends(get_db),
    region: str = None,
    limit: int = 100
) -> List[CustomerReport]:
    """Complex report with CTEs and window functions"""
    query = text("""
        WITH customer_totals AS (
            SELECT
                u.id,
                u.name,
                u.region,
                COUNT(o.id) as order_count,
                COALESCE(SUM(o.total), 0) as total_spent
            FROM users u
            LEFT JOIN orders o ON u.id = o.user_id
            WHERE u.deleted_at IS NULL
                AND (:region IS NULL OR u.region = :region)
            GROUP BY u.id, u.name, u.region
        ),
        ranked AS (
            SELECT
                *,
                ROW_NUMBER() OVER (
                    PARTITION BY region
                    ORDER BY total_spent DESC
                ) as rank_in_region
            FROM customer_totals
        )
        SELECT * FROM ranked
        WHERE total_spent > 0
        ORDER BY total_spent DESC
        LIMIT :limit
    """)

    result = db.execute(query, {"region": region, "limit": limit})

    # Type-safe results with Pydantic
    return [CustomerReport(**dict(row._mapping)) for row in result]

Hybrid Approach

Combine ORM and raw SQL:

def get_user_analytics(db: Session, user_id: int):
    """Use raw SQL for complex aggregation, ORM for simple queries"""

    # Complex aggregation in raw SQL
    analytics_query = text("""
        SELECT
            COUNT(*) as total_orders,
            SUM(total) as lifetime_value,
            AVG(total) as avg_order_value,
            MAX(created_at) as last_order_date,
            MIN(created_at) as first_order_date
        FROM orders
        WHERE user_id = :user_id
    """)

    analytics = db.execute(analytics_query, {"user_id": user_id}).first()

    # Simple ORM query for user details
    user = db.query(User).filter(User.id == user_id).first()

    return {
        "user": {
            "id": user.id,
            "name": user.name,
            "email": user.email
        },
        "analytics": {
            "total_orders": analytics.total_orders,
            "lifetime_value": float(analytics.lifetime_value or 0),
            "avg_order_value": float(analytics.avg_order_value or 0),
            "first_order": analytics.first_order_date,
            "last_order": analytics.last_order_date
        }
    }

Query Optimization Checklist

Before optimizing:

  1. Measure with EXPLAIN ANALYZE:

    EXPLAIN ANALYZE
    SELECT * FROM users JOIN orders ON users.id = orders.user_id;
    
  2. Look for:

    • Sequential scans on large tables → Add index
    • High loop counts → N+1 query problem
    • Hash joins on small tables → Consider nested loop
    • Sort operations → Consider index on ORDER BY columns
  3. Optimize:

    • Add indexes on foreign keys, WHERE clauses, ORDER BY columns
    • Use LIMIT for pagination
    • Use EXISTS instead of IN for large subqueries
    • Denormalize for read-heavy workloads

Index usage verification:

-- Check if index is being used
EXPLAIN SELECT * FROM users WHERE email = 'test@example.com';
-- Look for "Index Scan using idx_users_email"

-- Check index statistics
SELECT
    schemaname,
    tablename,
    indexname,
    idx_scan as index_scans,
    idx_tup_read as tuples_read,
    idx_tup_fetch as tuples_fetched
FROM pg_stat_user_indexes
WHERE tablename = 'users';

Testing Strategies

Test Database Setup

Separate test database with fixtures:

import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool

@pytest.fixture(scope="session")
def test_engine():
    """Create test database engine"""
    engine = create_engine(
        "postgresql://user:pass@localhost/test_db",
        poolclass=NullPool,  # No pooling in tests
        echo=True            # Log all queries
    )

    # Create all tables
    Base.metadata.create_all(engine)

    yield engine

    # Drop all tables after tests
    Base.metadata.drop_all(engine)

@pytest.fixture(scope="function")
def db_session(test_engine):
    """Create fresh database session for each test"""
    connection = test_engine.connect()
    transaction = connection.begin()

    Session = sessionmaker(bind=connection)
    session = Session()

    yield session

    # Rollback transaction (undo all changes)
    session.close()
    transaction.rollback()
    connection.close()

Factory Pattern for Test Data

Use factories for consistent test data:

from factory import Factory, Faker, SubFactory
from factory.alchemy import SQLAlchemyModelFactory

class UserFactory(SQLAlchemyModelFactory):
    class Meta:
        model = User
        sqlalchemy_session = db_session

    name = Faker('name')
    email = Faker('email')
    created_at = Faker('date_time')

class PostFactory(SQLAlchemyModelFactory):
    class Meta:
        model = Post
        sqlalchemy_session = db_session

    title = Faker('sentence')
    content = Faker('text')
    user = SubFactory(UserFactory)  # Auto-create related user

# Test usage
def test_get_user_posts(db_session):
    user = UserFactory.create()
    PostFactory.create_batch(5, user=user)  # Create 5 posts for user

    posts = db_session.query(Post).filter(Post.user_id == user.id).all()
    assert len(posts) == 5

Testing Transactions

Test rollback behavior:

def test_transaction_rollback(db_session):
    """Verify rollback on error"""
    user = User(name="Alice", email="alice@example.com")
    db_session.add(user)

    with pytest.raises(IntegrityError):
        # This should fail (duplicate email)
        user2 = User(name="Bob", email="alice@example.com")
        db_session.add(user2)
        db_session.commit()

    # Verify rollback occurred
    db_session.rollback()
    assert db_session.query(User).count() == 0

Testing Migrations

Test migration up and down:

from alembic import command
from alembic.config import Config

def test_migration_upgrade_downgrade():
    """Test migration can be applied and reversed"""
    alembic_cfg = Config("alembic.ini")
    alembic_cfg.set_main_option("sqlalchemy.url", TEST_DATABASE_URL)

    # Apply migration
    command.upgrade(alembic_cfg, "head")

    # Verify schema changes
    # ... assertions ...

    # Rollback migration
    command.downgrade(alembic_cfg, "-1")

    # Verify rollback
    # ... assertions ...

Monitoring and Observability

Query Performance Tracking

Track slow queries with middleware:

from fastapi import Request
import time
import logging

logger = logging.getLogger(__name__)

@app.middleware("http")
async def track_db_queries(request: Request, call_next):
    """Track database query performance per request"""
    query_count = 0
    total_query_time = 0.0

    def track_query(conn, cursor, statement, parameters, context, executemany):
        nonlocal query_count, total_query_time
        start = time.time()

        # Execute query
        cursor.execute(statement, parameters)

        duration = time.time() - start
        query_count += 1
        total_query_time += duration

        if duration > 1.0:  # Log slow queries
            logger.warning(
                f"Slow query ({duration:.2f}s): {statement[:200]}",
                extra={
                    "duration": duration,
                    "path": request.url.path
                }
            )

    # Attach listener
    event.listen(Engine, "before_cursor_execute", track_query)

    response = await call_next(request)

    # Remove listener
    event.remove(Engine, "before_cursor_execute", track_query)

    # Add headers
    response.headers["X-DB-Query-Count"] = str(query_count)
    response.headers["X-DB-Query-Time"] = f"{total_query_time:.3f}s"

    return response

Connection Pool Metrics

Expose pool metrics for monitoring:

from prometheus_client import Gauge

pool_size_gauge = Gauge('db_pool_size', 'Number of connections in pool')
pool_checked_out_gauge = Gauge('db_pool_checked_out', 'Connections currently checked out')
pool_overflow_gauge = Gauge('db_pool_overflow', 'Overflow connections')

@app.on_event("startup")
async def start_pool_metrics():
    """Collect pool metrics periodically"""
    import asyncio

    async def collect_metrics():
        while True:
            pool = engine.pool
            pool_size_gauge.set(pool.size())
            pool_checked_out_gauge.set(pool.checkedout())
            pool_overflow_gauge.set(pool.overflow())

            await asyncio.sleep(10)  # Every 10 seconds

    asyncio.create_task(collect_metrics())

Anti-Patterns

Anti-Pattern Why Bad Fix
No connection pooling Creates new connection per request (slow) Use create_engine() with pool
pool_pre_ping=False Fails on stale connections Always pool_pre_ping=True in production
Lazy loading in loops N+1 query problem Use joinedload() or selectinload()
No query timeout Slow queries block workers Set statement_timeout in connect_args
Large transactions Locks held too long, blocking Break into smaller transactions
No migration rollback Can't undo bad migrations Always test downgrade path
String interpolation in SQL SQL injection vulnerability Use parameterized queries with text()
No index on foreign keys Slow joins Add index on all foreign key columns
Blocking migrations Downtime during deployment Use CONCURRENTLY, NOT VALID patterns

Cross-References

Related skills:

  • FastAPI dependency injectionfastapi-development (database dependencies)
  • API testingapi-testing (testing database code)
  • Microservicesmicroservices-architecture (per-service databases)
  • Securityordis-security-architect (SQL injection, connection security)

Further Reading