Claude Code Plugins

Community-maintained marketplace

Feedback

Database migration management for SQLAlchemy projects using Alembic

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name alembic
description Database migration management for SQLAlchemy projects using Alembic
when_to_use When you need to create, apply, or manage database schema changes in SQLAlchemy projects

Alembic Database Migrations

Alembic is a database migration tool for SQLAlchemy projects that provides version control for your database schema.

Quick Start

Create Migration (Autogenerate)

# Generate migration from model changes
alembic revision --autogenerate -m "Add user table"

# Check if there are pending changes
alembic check

Apply Migrations

# Upgrade to latest version
alembic upgrade head

# Upgrade to specific revision
alembic upgrade ae1027a6acf

# Downgrade one revision
alembic downgrade -1

# Downgrade to base (empty schema)
alembic downgrade base

Check Status

# Show current database revision
alembic current

# Show all revision history
alembic history

# Show revision details
alembic show ae1027a6acf

Common Patterns

Autogenerate Configuration

env.py setup for async SQLAlchemy:

import asyncio
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context

# Import your models
from app.models import Base
from app.config import get_settings

config = context.config
settings = get_settings()

# Configure database URL for async
database_url = settings.database_url.replace("postgresql://", "postgresql+asyncpg://")
config.set_main_option("sqlalchemy.url", database_url)

target_metadata = Base.metadata

async def run_async_migrations():
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)

    await connectable.dispose()

def do_run_migrations(connection):
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        compare_type=True,
        compare_server_default=True,
        render_as_batch=False,  # Set to True for SQLite
    )

    with context.begin_transaction():
        context.run_migrations()

def run_migrations_online():
    asyncio.run(run_async_migrations())

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

Manual Migration Operations

Common schema changes:

from alembic import op
import sqlalchemy as sa

def upgrade():
    # Add column
    op.add_column('users', sa.Column('email', sa.String(255), nullable=True))

    # Rename table
    op.rename_table('old_table', 'new_table')

    # Create index
    op.create_index('ix_users_email', 'users', ['email'])

    # Add constraint
    op.create_check_constraint('ck_age_positive', 'users', 'age > 0')

def downgrade():
    # Reverse operations
    op.drop_constraint('ck_age_positive', 'users')
    op.drop_index('ix_users_email')
    op.rename_table('new_table', 'old_table')
    op.drop_column('users', 'email')

Batch Mode (for SQLite)

Configure batch mode in env.py:

context.configure(
    connection=connection,
    target_metadata=target_metadata,
    render_as_batch=True  # Required for SQLite migrations
)

Generated batch migration:

def upgrade():
    with op.batch_alter_table('users', schema=None) as batch_op:
        batch_op.add_column(sa.Column('email', sa.String(length=255), nullable=True))
        batch_op.create_index('ix_users_email', ['email'], unique=False)

Filtering Objects

Skip certain objects in autogenerate:

def include_object(object, name, type_, reflected, compare_to):
    # Skip temporary tables
    if type_ == "table" and name.startswith("temp_"):
        return False

    # Skip columns with skip_autogenerate flag
    if type_ == "column" and not reflected:
        if object.info.get("skip_autogenerate", False):
            return False

    return True

context.configure(
    connection=connection,
    target_metadata=target_metadata,
    include_object=include_object
)

Filter by schema:

def include_name(name, type_, parent_names):
    if type_ == "schema":
        return name in [None, "public", "auth"]  # Include default + specific schemas
    elif type_ == "table":
        return parent_names["schema_qualified_table_name"] in target_metadata.tables
    return True

context.configure(
    connection=connection,
    target_metadata=target_metadata,
    include_name=include_name,
    include_schemas=True
)

Custom Migration Processing

Modify generated migrations:

def process_revision_directives(context, revision, directives):
    script = directives[0]

    # Skip empty migrations
    if config.cmd_opts.autogenerate and script.upgrade_ops.is_empty():
        directives[:] = []
        return

    # Remove downgrade operations for one-way migrations
    script.downgrade_ops.ops[:] = []

context.configure(
    connection=connection,
    target_metadata=target_metadata,
    process_revision_directives=process_revision_directives
)

Data Migrations

Migrate data during schema change:

def upgrade():
    # Add new column
    op.add_column('users', sa.Column('full_name', sa.String(255), nullable=True))

    # Migrate data
    connection = op.get_bind()
    connection.execute(
        sa.text("UPDATE users SET full_name = first_name || ' ' || last_name")
    )

    # Make column required after data migration
    op.alter_column('users', 'full_name', nullable=False)

def downgrade():
    op.drop_column('users', 'full_name')

Branch Migrations

Work with multiple branches:

# Create branch
alembic revision -m "Create feature branch" --head=base --branch-label=feature_x

# Upgrade specific branch
alembic upgrade feature_x@head

# Merge branches
alembic merge -m "Merge feature_x into main" feature_x@head main@head

Practical Code Snippets

Check if Database is Up-to-Date

from alembic import config, script
from alembic.runtime import migration
from sqlalchemy import create_engine

def is_database_up_to_date(alembic_cfg_path, database_url):
    """Check if database schema matches latest migrations"""
    cfg = config.Config(alembic_cfg_path)
    directory = script.ScriptDirectory.from_config(cfg)

    engine = create_engine(database_url)
    with engine.begin() as connection:
        context = migration.MigrationContext.configure(connection)
        current_heads = set(context.get_current_heads())
        latest_heads = set(directory.get_heads())
        return current_heads == latest_heads

Programmatically Run Migrations

from alembic import command
from alembic.config import Config

def run_migrations(alembic_ini_path):
    """Run all pending migrations"""
    alembic_cfg = Config(alembic_ini_path)
    command.upgrade(alembic_cfg, "head")

def create_migration(alembic_ini_path, message, autogenerate=True):
    """Create new migration"""
    alembic_cfg = Config(alembic_ini_path)
    command.revision(alembic_cfg, message=message, autogenerate=autogenerate)

Custom Migration Operations

from alembic.autogenerate import rewriter
from alembic.operations import ops

writer = rewriter.Rewriter()

@writer.rewrites(ops.AddColumnOp)
def add_column_non_nullable(context, revision, op):
    """Add non-nullable columns in two steps"""
    if not op.column.nullable:
        op.column.nullable = True
        return [
            op,
            ops.AlterColumnOp(
                op.table_name,
                op.column.name,
                nullable=False,
                existing_type=op.column.type,
                schema=op.schema
            )
        ]
    return op

# Use in env.py
context.configure(
    connection=connection,
    target_metadata=target_metadata,
    process_revision_directives=writer
)

Requirements

  • Python 3.8+: Required for async support
  • SQLAlchemy 2.0+: For modern async patterns
  • PostgreSQL/MySQL/SQLite: Supported databases
  • Alembic 1.8+: Migration tooling

Common Dependencies

# Core dependencies
pip install alembic sqlalchemy

# For PostgreSQL with async
pip install asyncpg

# For MySQL with async
pip install aiomysql

# For SQLite (built-in)
# No additional packages needed

Development Setup

# Initialize Alembic in existing project
alembic init alembic

# Configure env.py for your models
# Edit alembic.ini for database URL

# First migration
alembic revision --autogenerate -m "Initial schema"
alembic upgrade head