SKILL.md

name: alembic
version: 1.0.0
description: Comprehensive Alembic database migration management for customer support systems
tags: alembic, database, migrations, sqlalchemy, postgresql, customer-support, schema-evolution, data-migrations
categories: database, backend, devops
context: customer support operations, ticket management systems, user data management, schema versioning, production deployments
dependencies: alembic>=1.13.0, sqlalchemy>=2.0.0, psycopg2-binary>=2.9.0, pytest>=7.0.0
author: Customer Support Tech Enablement Team

Alembic Database Migration Management Skill

Overview

This skill provides comprehensive guidance for managing database migrations using Alembic in customer support environments. It covers everything from initial setup through complex production deployment scenarios, with a focus on maintaining data integrity and minimizing downtime for support operations.

Core Concepts

What is Alembic?

Alembic is a lightweight database migration tool for use with SQLAlchemy. It provides a way to manage changes to your database schema over time through version-controlled migration scripts. For customer support systems, this means:

  • Version Control: Track all schema changes in your support database
  • Reproducibility: Apply the same migrations across dev, staging, and production
  • Rollback Capability: Safely revert problematic changes
  • Team Collaboration: Merge schema changes from multiple developers
  • Data Preservation: Migrate data during schema transformations

Migration Lifecycle in Support Systems

  1. Development: Create migrations locally while developing new features
  2. Testing: Validate migrations in staging environment
  3. Review: Code review migration scripts before production
  4. Deployment: Apply migrations to production with minimal downtime
  5. Monitoring: Track migration status and handle failures
  6. Rollback: Revert if issues arise in production

Installation and Initial Setup

Installing Alembic

# Install Alembic with PostgreSQL support
pip install alembic psycopg2-binary sqlalchemy

# Or add to requirements.txt
alembic>=1.13.0
sqlalchemy>=2.0.0
psycopg2-binary>=2.9.0

Initialize Alembic in Your Project

# Initialize Alembic (creates alembic/ directory and alembic.ini)
alembic init alembic

# For multiple database support
alembic init --template multidb alembic

This creates:

  • alembic/: Directory containing migration scripts
  • alembic/versions/: Where individual migration files live
  • alembic/env.py: Migration environment configuration
  • alembic.ini: Alembic configuration file

Configure Database Connection

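Edit alembic.ini to set your database URL:

# For development
sqlalchemy.url = postgresql://user:password@localhost/support_dev

# Note: %(name)s in alembic.ini is ConfigParser interpolation, not environment
# variable expansion. It resolves only keys defined in the ini file (plus the
# built-in %(here)s), so a line such as
#   sqlalchemy.url = postgresql://%(DB_USER)s:%(DB_PASSWORD)s@%(DB_HOST)s/%(DB_NAME)s
# fails unless DB_USER, DB_PASSWORD, DB_HOST and DB_NAME are set in the file itself.

Better approach: read the URL from an environment variable in env.py, which keeps credentials out of version control: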

import os
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context

# Import your models
from myapp.models import Base

# This is the Alembic Config object
config = context.config

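# Override sqlalchemy.url from environment.
# set_main_option() goes through ConfigParser interpolation, so a literal '%'
# (for example in a URL-encoded password) must be doubled.
db_url = os.getenv('DATABASE_URL', 'postgresql://localhost/support_dev')
config.set_main_option('sqlalchemy.url', db_url.replace('%', '%%'))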

# Set up target metadata for autogenerate
target_metadata = Base.metadata
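
The percent-escaping detail is easy to miss, so here is a minimal standard-library sketch of it. It assumes the URL is assembled from separate DB_USER, DB_PASSWORD, DB_HOST and DB_NAME values; the helper names `build_database_url` and `escape_for_ini` are hypothetical, and a plain `ConfigParser` stands in for the parser behind alembic.ini.

```python
import configparser
from urllib.parse import quote_plus


def build_database_url(env: dict) -> str:
    """Assemble a PostgreSQL URL from DB_* values (hypothetical variable names)."""
    user = quote_plus(env["DB_USER"])
    password = quote_plus(env["DB_PASSWORD"])  # URL-encode special characters
    host = env.get("DB_HOST", "localhost")
    name = env["DB_NAME"]
    return f"postgresql://{user}:{password}@{host}/{name}"


def escape_for_ini(url: str) -> str:
    """Double every '%' so ConfigParser interpolation leaves the URL intact."""
    return url.replace("%", "%%")


url = build_database_url(
    {"DB_USER": "support", "DB_PASSWORD": "p@ss%word", "DB_NAME": "support_dev"}
)

# Simulate the value passing through ConfigParser interpolation.
parser = configparser.ConfigParser()
parser.set("DEFAULT", "sqlalchemy.url", escape_for_ini(url))
round_tripped = parser.get("DEFAULT", "sqlalchemy.url")

print(url)
print(round_tripped == url)
```

Without the escaping step, `parser.set` rejects the URL-encoded password because `%40` looks like a malformed interpolation token.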

Creating Migrations

Manual Migration Creation

Create a migration manually when you need precise control:

# Create empty migration file
alembic revision -m "add ticket priority column"

This generates a file like versions/abc123_add_ticket_priority_column.py:

"""add ticket priority column

Revision ID: abc123
Revises: def456
Create Date: 2025-01-15 10:30:00.000000
"""

from alembic import op
import sqlalchemy as sa

# revision identifiers
revision = 'abc123'
down_revision = 'def456'
branch_labels = None
depends_on = None

def upgrade() -> None:
    # Add priority column to tickets table
    op.add_column('tickets',
        sa.Column('priority', sa.String(20), nullable=True, server_default='normal')
    )

    # Create index for performance
    op.create_index('ix_tickets_priority', 'tickets', ['priority'])

def downgrade() -> None:
    # Remove index first
    op.drop_index('ix_tickets_priority', 'tickets')

    # Remove column
    op.drop_column('tickets', 'priority')

Autogenerate Migrations

Let Alembic detect schema changes automatically:

# Generate migration by comparing models to database
alembic revision --autogenerate -m "add customer satisfaction table"

Important: Always review autogenerated migrations! They may miss:

  • Renamed columns (appears as drop + add)
  • Changed column types requiring data conversion
  • Complex constraints
  • Data migrations
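
As a review aid for the first item in that list, the sketch below flags a drop and an add on the same table as a possible rename. It is a hypothetical helper, not part of Alembic: it only pattern-matches the text of a generated `upgrade()` body and assumes the quoting style that autogenerate typically emits.

```python
import re

# Match op.drop_column('table', 'column') and
# op.add_column('table', sa.Column('column', ...)) in migration source text.
DROP = re.compile(r"""op\.drop_column\(\s*['"](\w+)['"]\s*,\s*['"](\w+)['"]""")
ADD = re.compile(
    r"""op\.add_column\(\s*['"](\w+)['"]\s*,\s*sa\.Column\(\s*['"](\w+)['"]"""
)


def possible_renames(upgrade_source: str) -> list:
    """Return (table, dropped_column, added_column) triples worth a manual look."""
    dropped = DROP.findall(upgrade_source)
    added = ADD.findall(upgrade_source)
    return [
        (table, old, new)
        for table, old in dropped
        for added_table, new in added
        if table == added_table
    ]


sample = '''
def upgrade() -> None:
    op.add_column('tickets', sa.Column('assignee_id', sa.Integer(), nullable=True))
    op.drop_column('tickets', 'agent_id')
'''

print(possible_renames(sample))
```

A hit does not prove a rename; it only marks a migration where the drop would discard data that a hand-written `op.alter_column(..., new_column_name=...)` would keep.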

Example autogenerated migration:

"""add customer satisfaction table

Revision ID: xyz789
Revises: abc123
Create Date: 2025-01-15 11:00:00.000000
"""

from alembic import op
import sqlalchemy as sa

revision = 'xyz789'
down_revision = 'abc123'
branch_labels = None
depends_on = None

def upgrade() -> None:
    # Auto-generated - review before running!
    op.create_table(
        'customer_satisfaction',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('ticket_id', sa.Integer(), nullable=False),
        sa.Column('rating', sa.Integer(), nullable=False),
        sa.Column('feedback', sa.Text(), nullable=True),
        sa.Column('created_at', sa.DateTime(), nullable=False),
        sa.ForeignKeyConstraint(['ticket_id'], ['tickets.id'], ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('id')
    )
    op.create_index('ix_satisfaction_ticket_id', 'customer_satisfaction', ['ticket_id'])
    op.create_index('ix_satisfaction_created_at', 'customer_satisfaction', ['created_at'])

def downgrade() -> None:
    op.drop_index('ix_satisfaction_created_at', 'customer_satisfaction')
    op.drop_index('ix_satisfaction_ticket_id', 'customer_satisfaction')
    op.drop_table('customer_satisfaction')

Data Migrations

Migrating Data During Schema Changes

When you need to transform existing data:

"""convert ticket status to new enum

Revision ID: data001
Revises: xyz789
Create Date: 2025-01-15 12:00:00.000000
"""

from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column

revision = 'data001'
down_revision = 'xyz789'

def upgrade() -> None:
    # Create new status column
    op.add_column('tickets',
        sa.Column('status_new', sa.String(50), nullable=True)
    )

    # Migrate data using bulk update
    tickets = table('tickets',
        column('status', sa.String),
        column('status_new', sa.String)
    )

    # Map old statuses to new ones
    status_mapping = {
        'open': 'OPEN',
        'in_progress': 'IN_PROGRESS',
        'pending': 'WAITING_ON_CUSTOMER',
        'resolved': 'RESOLVED',
        'closed': 'CLOSED'
    }

    connection = op.get_bind()
    for old_status, new_status in status_mapping.items():
        connection.execute(
            tickets.update().where(
                tickets.c.status == old_status
            ).values(status_new=new_status)
        )

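    # Make new column non-nullable now that data is migrated.
    # This fails if any row holds a status missing from status_mapping,
    # because its status_new is still NULL; extend the mapping first.
    op.alter_column('tickets', 'status_new', nullable=False)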

    # Drop old column and rename new one
    op.drop_column('tickets', 'status')
    op.alter_column('tickets', 'status_new', new_column_name='status')

def downgrade() -> None:
    # Reverse the migration
    op.add_column('tickets',
        sa.Column('status_old', sa.String(50), nullable=True)
    )

    tickets = table('tickets',
        column('status', sa.String),
        column('status_old', sa.String)
    )

    # Reverse mapping
    reverse_mapping = {
        'OPEN': 'open',
        'IN_PROGRESS': 'in_progress',
        'WAITING_ON_CUSTOMER': 'pending',
        'RESOLVED': 'resolved',
        'CLOSED': 'closed'
    }

    connection = op.get_bind()
    for new_status, old_status in reverse_mapping.items():
        connection.execute(
            tickets.update().where(
                tickets.c.status == new_status
            ).values(status_old=old_status)
        )

    op.alter_column('tickets', 'status_old', nullable=False)
    op.drop_column('tickets', 'status')
    op.alter_column('tickets', 'status_old', new_column_name='status')
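
Before running a mapping-based migration like this one, it can help to check for values the mapping does not cover, since those rows end up NULL and block the NOT NULL step. The following is a rough sketch using an in-memory SQLite table as a stand-in for the real tickets table; `unmapped_statuses` is a hypothetical helper name.

```python
import sqlite3

STATUS_MAPPING = {
    "open": "OPEN",
    "in_progress": "IN_PROGRESS",
    "pending": "WAITING_ON_CUSTOMER",
    "resolved": "RESOLVED",
    "closed": "CLOSED",
}


def unmapped_statuses(conn, mapping) -> set:
    """Return status values present in tickets that the mapping does not cover."""
    rows = conn.execute("SELECT DISTINCT status FROM tickets").fetchall()
    return {row[0] for row in rows} - set(mapping)


# Stand-in data: two mapped values, one unknown value, one NULL.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tickets (id INTEGER PRIMARY KEY, status TEXT)")
conn.executemany(
    "INSERT INTO tickets (status) VALUES (?)",
    [("open",), ("closed",), ("escalated",), (None,)],
)

missing = unmapped_statuses(conn, STATUS_MAPPING)
print(missing)
```

An empty result means every existing row will receive a new status; anything else needs a mapping entry or an explicit default first.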

Large Data Migrations with Batching

For large tables, process data in batches:

"""add computed resolution time to tickets

Revision ID: data002
Revises: data001
Create Date: 2025-01-15 13:00:00.000000
"""

from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column, select

revision = 'data002'
down_revision = 'data001'

def upgrade() -> None:
    # Add new column
    op.add_column('tickets',
        sa.Column('resolution_time_seconds', sa.Integer(), nullable=True)
    )

    connection = op.get_bind()
    tickets = table('tickets',
        column('id', sa.Integer),
        column('created_at', sa.DateTime),
        column('resolved_at', sa.DateTime),
        column('resolution_time_seconds', sa.Integer)
    )

    # Process in batches to avoid memory issues.
    # Page by primary key (keyset pagination) instead of OFFSET: updated rows
    # drop out of the IS NULL filter, so a growing OFFSET would skip rows.
    # Assumes positive integer ids.
    batch_size = 1000
    last_id = 0

    while True:
        # Get the next batch of resolved tickets that still need a value
        batch = connection.execute(
            select(
                tickets.c.id,
                tickets.c.created_at,
                tickets.c.resolved_at
            ).where(
                sa.and_(
                    tickets.c.id > last_id,
                    tickets.c.resolved_at.isnot(None),
                    tickets.c.resolution_time_seconds.is_(None)
                )
            ).order_by(tickets.c.id).limit(batch_size)
        ).fetchall()

        if not batch:
            break

        # Update batch
        for row in batch:
            if row.resolved_at and row.created_at:
                resolution_time = (row.resolved_at - row.created_at).total_seconds()
                connection.execute(
                    tickets.update().where(
                        tickets.c.id == row.id
                    ).values(resolution_time_seconds=int(resolution_time))
                )

        last_id = batch[-1].id

    # Leave the column nullable: unresolved tickets have no resolution time,
    # so setting nullable=False here would fail on their NULL values.

def downgrade() -> None:
    op.drop_column('tickets', 'resolution_time_seconds')
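
When the batch query filters on the same column the loop updates, paging with a growing OFFSET can skip rows, because processed rows leave the result set. A keyset loop that remembers the last seen id avoids this. The sketch below shows the idea with SQLite and integer timestamps as stand-ins; the function name and sample data are assumptions for illustration only.

```python
import sqlite3


def backfill_resolution_time(conn, batch_size=1000) -> int:
    """Fill resolution_time_seconds in id order, one batch at a time."""
    last_id = 0
    updated = 0
    while True:
        batch = conn.execute(
            "SELECT id, created_at, resolved_at FROM tickets "
            "WHERE id > ? AND resolved_at IS NOT NULL "
            "AND resolution_time_seconds IS NULL "
            "ORDER BY id LIMIT ?",
            (last_id, batch_size),
        ).fetchall()
        if not batch:
            break
        conn.executemany(
            "UPDATE tickets SET resolution_time_seconds = ? WHERE id = ?",
            [(resolved - created, ticket_id) for ticket_id, created, resolved in batch],
        )
        conn.commit()
        updated += len(batch)
        last_id = batch[-1][0]  # continue after the last id in this batch
    return updated


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tickets (id INTEGER PRIMARY KEY, created_at INTEGER, "
    "resolved_at INTEGER, resolution_time_seconds INTEGER)"
)
# 25 tickets; every fifth one is still unresolved.
rows = [(i, 1000, None if i % 5 == 0 else 1000 + i * 60) for i in range(1, 26)]
conn.executemany(
    "INSERT INTO tickets (id, created_at, resolved_at) VALUES (?, ?, ?)", rows
)

updated = backfill_resolution_time(conn, batch_size=10)
remaining = conn.execute(
    "SELECT COUNT(*) FROM tickets "
    "WHERE resolved_at IS NOT NULL AND resolution_time_seconds IS NULL"
).fetchone()[0]
print(updated, remaining)
```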

Running Migrations

Upgrade Database to Latest

# Upgrade to latest revision (head)
alembic upgrade head

# See what would be executed (SQL only, don't run)
alembic upgrade head --sql

# Upgrade one step at a time
alembic upgrade +1

# Upgrade to specific revision
alembic upgrade abc123

Downgrade Database

# Downgrade one revision
alembic downgrade -1

# Downgrade to specific revision
alembic downgrade abc123

# Downgrade to base (empty database)
alembic downgrade base

# Generate SQL for a downgrade without executing it
# (offline mode needs an explicit <from>:<to> range)
alembic downgrade xyz789:abc123 --sql

Check Current Status

# Show current database revision
alembic current

# Show current revision with details
alembic current --verbose

# Show migration history
alembic history

# Show history with current revision marked
alembic history --indicate-current

# Show specific revision range
alembic history -r base:head

Branching and Merging

Why Branch Migrations?

In customer support systems, you might have:

  • Feature branches: New features developed in parallel
  • Hotfix branches: Urgent fixes that can't wait for feature completion
  • Team branches: Multiple teams working on different modules

Creating a Branch

# Create base for new branch
# (the --version-path directory must also be listed under
# version_locations in alembic.ini)
alembic revision -m "create reporting branch" \
    --head=base \
    --branch-label=reporting \
    --version-path=alembic/versions/reporting

# Add migration to specific branch
alembic revision -m "add report tables" \
    --head=reporting@head

Example branch structure:

base
├── main branch
│   ├── abc123: initial schema
│   ├── def456: add tickets
│   └── ghi789: add users
└── reporting branch
    ├── rep001: create reports table
    └── rep002: add scheduled reports

Working with Multiple Branches

# Show all branch heads
alembic heads

# Show branch points
alembic branches

# Upgrade specific branch
alembic upgrade reporting@head

# Upgrade all branches
alembic upgrade heads
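
To make the notion of a head concrete, here is a small sketch that models the example branch structure as a plain dictionary and finds the revisions that no other revision lists as a parent. This is an illustration of the concept only, not how Alembic implements `alembic heads`; `find_heads` is a hypothetical name.

```python
def find_heads(revisions: dict) -> set:
    """Return revision ids that are not the down_revision of any other revision."""
    parents = set()
    for down_revision in revisions.values():
        if down_revision is None:
            continue  # a base revision has no parent
        if isinstance(down_revision, tuple):
            parents.update(down_revision)  # merge revision with several parents
        else:
            parents.add(down_revision)
    return set(revisions) - parents


# revision id -> down_revision, mirroring the branch structure shown earlier
graph = {
    "abc123": None,
    "def456": "abc123",
    "ghi789": "def456",
    "rep001": None,
    "rep002": "rep001",
}
heads_before_merge = find_heads(graph)

# A merge revision lists both heads as parents and becomes the single head.
graph["merge001"] = ("ghi789", "rep002")
heads_after_merge = find_heads(graph)

print(heads_before_merge, heads_after_merge)
```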

Merging Branches

When features are ready to merge:

# Merge two branches (the main line has no branch label in this example,
# so reference its head by revision ID)
alembic merge -m "merge reporting into main" \
    ghi789 reporting@head

Generated merge migration:

"""merge reporting into main

Revision ID: merge001
Revises: ghi789, rep002
Create Date: 2025-01-15 14:00:00.000000
"""

from alembic import op
import sqlalchemy as sa

revision = 'merge001'
down_revision = ('ghi789', 'rep002')  # Multiple parents
branch_labels = None
depends_on = None

def upgrade() -> None:
    # Usually empty for simple merges
    # Add code if you need to reconcile conflicting changes
    pass

def downgrade() -> None:
    pass

Cross-Branch Dependencies

When one branch depends on another:

# Create migration that depends on specific revision from another branch
alembic revision -m "reporting needs user table" \
    --head=reporting@head \
    --depends-on=def456  # Revision from main branch

Testing Migrations

Unit Testing Migrations

# tests/test_migrations.py
import pytest
from alembic import command
from alembic.config import Config
from sqlalchemy import create_engine, inspect
from sqlalchemy.orm import sessionmaker

@pytest.fixture
def alembic_config():
    """Provide Alembic configuration for testing"""
    config = Config("alembic.ini")
    config.set_main_option(
        "sqlalchemy.url",
        "postgresql://localhost/support_test"
    )
    return config

@pytest.fixture
def test_db(alembic_config):
    """Create test database and apply migrations"""
    # Create engine
    engine = create_engine(
        alembic_config.get_main_option("sqlalchemy.url")
    )

    # Run migrations to head
    command.upgrade(alembic_config, "head")

    yield engine

    # Cleanup - downgrade to base
    command.downgrade(alembic_config, "base")
    engine.dispose()

def test_migration_creates_tickets_table(test_db):
    """Test that migrations create expected tables"""
    inspector = inspect(test_db)
    tables = inspector.get_table_names()

    assert 'tickets' in tables
    assert 'users' in tables
    assert 'customer_satisfaction' in tables

def test_tickets_table_structure(test_db):
    """Test ticket table has correct columns"""
    inspector = inspect(test_db)
    columns = {col['name']: col for col in inspector.get_columns('tickets')}

    assert 'id' in columns
    assert 'priority' in columns
    assert 'status' in columns
    assert 'created_at' in columns
    assert 'resolution_time_seconds' in columns

    # Check column types
    assert columns['priority']['type'].python_type == str
    assert columns['status']['type'].python_type == str

def test_migration_upgrade_downgrade_cycle(alembic_config):
    """Test that upgrade -> downgrade -> upgrade works"""
    # Start at base
    command.downgrade(alembic_config, "base")

    # Upgrade to head
    command.upgrade(alembic_config, "head")

    # Downgrade one step
    command.downgrade(alembic_config, "-1")

    # Upgrade back to head
    command.upgrade(alembic_config, "head")

    # Should complete without errors

def test_data_migration_preserves_data(test_db):
    """Test that data migrations don't lose data"""
    from sqlalchemy.orm import sessionmaker
    from myapp.models import Ticket

    Session = sessionmaker(bind=test_db)
    session = Session()

    # Insert test data
    ticket = Ticket(
        title="Test ticket",
        status="OPEN",
        priority="high"
    )
    session.add(ticket)
    session.commit()
    ticket_id = ticket.id
    session.close()

    # Run a migration that modifies tickets table
    # (This would be a specific revision)
    # command.upgrade(alembic_config, "specific_revision")

    # Verify data still exists
    session = Session()
    retrieved = session.query(Ticket).filter_by(id=ticket_id).first()
    assert retrieved is not None
    assert retrieved.title == "Test ticket"
    session.close()

Integration Testing

# tests/test_migration_integration.py
import pytest
from alembic import command
from alembic.config import Config
from alembic.script import ScriptDirectory
from alembic.runtime.migration import MigrationContext

def test_no_pending_migrations(alembic_config, test_db):
    """Ensure all migrations are applied in test environment"""
    script = ScriptDirectory.from_config(alembic_config)

    with test_db.connect() as connection:
        context = MigrationContext.configure(connection)
        current_heads = set(context.get_current_heads())
        script_heads = set(script.get_heads())

        assert current_heads == script_heads, \
            f"Database has pending migrations. Current: {current_heads}, Expected: {script_heads}"

def test_migration_order_is_valid(alembic_config):
    """Verify migration chain has no gaps or conflicts"""
    script = ScriptDirectory.from_config(alembic_config)

    # Get all revisions
    revisions = list(script.walk_revisions())

    # Check each revision has valid down_revision
    for revision in revisions:
        if revision.down_revision is not None:
            if isinstance(revision.down_revision, tuple):
                # Merge point
                for down_rev in revision.down_revision:
                    assert script.get_revision(down_rev) is not None
            else:
                assert script.get_revision(revision.down_revision) is not None

def test_check_command_detects_drift(alembic_config, test_db):
    """Fail if the models and the migrated database have drifted apart"""
    from alembic.util.exc import AutogenerateDiffsDetected

    # `alembic check` (Alembic 1.9+) raises when autogenerate would
    # produce new operations, i.e. the models differ from the database
    try:
        command.check(alembic_config)
    except AutogenerateDiffsDetected as e:
        pytest.fail(f"Schema drift detected: {e}")

Testing Migration Performance

# tests/test_migration_performance.py
import time
import pytest
from alembic import command

def test_migration_completes_within_time_limit(alembic_config):
    """Ensure migrations complete within acceptable time"""
    # Downgrade to base
    command.downgrade(alembic_config, "base")

    # Time the upgrade
    start = time.time()
    command.upgrade(alembic_config, "head")
    duration = time.time() - start

    # Assert completes within 60 seconds
    assert duration < 60, f"Migration took {duration}s, exceeds 60s limit"

@pytest.mark.slow
def test_data_migration_with_large_dataset(alembic_config, test_db):
    """Test data migration performance with realistic data volume"""
    from sqlalchemy.orm import sessionmaker
    from myapp.models import Ticket

    Session = sessionmaker(bind=test_db)
    session = Session()

    # Create 10,000 test tickets
    tickets = [
        Ticket(
            title=f"Test ticket {i}",
            status="OPEN",
            priority="normal"
        )
        for i in range(10000)
    ]
    session.bulk_save_objects(tickets)
    session.commit()
    session.close()

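    # The test_db fixture is already at head, so step back to the revision
    # before the data migration, then time the upgrade through data002
    command.downgrade(alembic_config, "data001")
    start = time.time()
    command.upgrade(alembic_config, "data002")  # Specific data migration
    duration = time.time() - start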

    # Should process 10k records in reasonable time
    assert duration < 30, f"Data migration took {duration}s for 10k records"

CI/CD Integration

GitHub Actions Workflow

# .github/workflows/migrations.yml
name: Database Migrations

on:
  pull_request:
    paths:
      - 'alembic/versions/**'
      - 'myapp/models/**'
      - 'alembic.ini'
      - 'alembic/env.py'
  push:
    branches:
      - main
      - develop

jobs:
  test-migrations:
    runs-on: ubuntu-latest

    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_PASSWORD: postgres
          POSTGRES_DB: support_test
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
        ports:
          - 5432:5432

    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov

      - name: Run migration tests
        env:
          DATABASE_URL: postgresql://postgres:postgres@localhost/support_test
        run: |
          # Test upgrade to head
          alembic upgrade head

          # Test downgrade to base
          alembic downgrade base

          # Test upgrade again
          alembic upgrade head

          # Run pytest for migration tests
          pytest tests/test_migrations.py -v

      - name: Check for schema drift
        env:
          DATABASE_URL: postgresql://postgres:postgres@localhost/support_test
        run: |
          alembic check

      - name: Validate migration history
        run: |
          # Check for multiple heads (should be only one)
          HEADS_COUNT=$(alembic heads | wc -l)
          if [ "$HEADS_COUNT" -gt 1 ]; then
            echo "ERROR: Multiple heads detected. Please merge branches."
            alembic heads
            exit 1
          fi

  review-migration-sql:
    runs-on: ubuntu-latest
    if: github.event_name == 'pull_request'

    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install -r requirements.txt

      - name: Generate SQL for review
        run: |
          # Generate SQL without executing
          alembic upgrade head --sql > migration.sql

      - name: Upload SQL artifact
        uses: actions/upload-artifact@v4
        with:
          name: migration-sql
          path: migration.sql

      - name: Comment PR with SQL
        uses: actions/github-script@v6
        with:
          script: |
            const fs = require('fs');
            const sql = fs.readFileSync('migration.sql', 'utf8');

            github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: `## Migration SQL\n\n\`\`\`sql\n${sql}\n\`\`\``
            });

Deployment Script

#!/bin/bash
# scripts/deploy_migrations.sh

set -e  # Exit on error

echo "Starting database migration deployment..."

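# Environment variables
DB_HOST="${DB_HOST:-localhost}"
DB_NAME="${DB_NAME:-support_prod}"
DB_USER="${DB_USER:-postgres}"
# Fail early if the password is missing; export the URL so env.py can read it
: "${DB_PASSWORD:?DB_PASSWORD must be set}"
export DATABASE_URL="postgresql://${DB_USER}:${DB_PASSWORD}@${DB_HOST}/${DB_NAME}"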

# Configuration
BACKUP_DIR="./backups"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="${BACKUP_DIR}/pre_migration_${TIMESTAMP}.sql"

# Create backup directory
mkdir -p "$BACKUP_DIR"

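# 1. Backup database before migration
# --clean --if-exists adds DROP statements so the dump can be replayed
# over a partially migrated database during restore
echo "Creating database backup..."
pg_dump --clean --if-exists "$DATABASE_URL" > "$BACKUP_FILE"
echo "Backup created: $BACKUP_FILE"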

# 2. Check current migration status
echo "Current migration status:"
alembic current

# 3. Show pending migrations (from the current revision up to head)
echo "Pending migrations:"
alembic history -r current:head

# 4. Run migrations with timeout
echo "Running migrations..."
timeout 300 alembic upgrade head || {
    echo "ERROR: Migration failed or timed out!"
    echo "Restoring from backup..."
    psql "$DATABASE_URL" < "$BACKUP_FILE"
    exit 1
}

# 5. Verify migration success
echo "Verifying migration status..."
# Both commands print lines such as "abc123 (head)"; the first field is the ID
CURRENT_REV=$(alembic current 2>/dev/null | awk '{print $1}')
HEAD_REV=$(alembic heads 2>/dev/null | awk '{print $1}')

if [ "$CURRENT_REV" != "$HEAD_REV" ]; then
    echo "ERROR: Migration incomplete. Current: $CURRENT_REV, Expected: $HEAD_REV"
    echo "Restoring from backup..."
    psql "$DATABASE_URL" < "$BACKUP_FILE"
    exit 1
fi

echo "Migration completed successfully!"
echo "Current revision: $CURRENT_REV"

# 6. Cleanup old backups (keep last 10)
echo "Cleaning up old backups..."
ls -t "$BACKUP_DIR"/*.sql | tail -n +11 | xargs -r rm

echo "Deployment complete!"
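
The verification step compares the revision reported for the database with the head of the migration scripts. The sketch below does the same comparison in Python on captured command output, and also copes with more than one head. The sample strings are illustrative assumptions about the plain (non-verbose) output format, where each line starts with a revision id.

```python
def revision_ids(cli_output: str) -> set:
    """Collect the first token of each non-empty line as a revision id."""
    return {line.split()[0] for line in cli_output.splitlines() if line.strip()}


def is_up_to_date(current_output: str, heads_output: str) -> bool:
    """True when the database revisions equal the script heads (and heads exist)."""
    heads = revision_ids(heads_output)
    return bool(heads) and revision_ids(current_output) == heads


# Illustrative output samples, not captured from a real run.
single_head = is_up_to_date("ghi789 (head)\n", "ghi789 (head)\n")
behind = is_up_to_date("def456\n", "ghi789 (head)\n")
two_heads = is_up_to_date(
    "ghi789 (head)\nrep002 (reporting) (head)\n",
    "rep002 (reporting) (head)\nghi789 (head)\n",
)
print(single_head, behind, two_heads)
```

In a deployment script the two strings would come from `alembic current` and `alembic heads`, for example via `subprocess.run(..., capture_output=True, text=True).stdout`.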

Production Best Practices

Pre-Deployment Checklist

  • Migration tested in development environment
  • Migration tested in staging with production-like data
  • Migration reviewed by at least one team member
  • Downgrade path tested and verified
  • Performance impact assessed for large tables
  • Database backup plan in place
  • Rollback procedure documented
  • Maintenance window scheduled (if needed)
  • Team notified of deployment
  • Monitoring alerts configured

Zero-Downtime Migrations

For critical support systems that can't go offline:

Phase 1: Additive Changes

"""add new column (phase 1)

Revision ID: zd001
"""

def upgrade() -> None:
    # Add new column as nullable
    op.add_column('tickets',
        sa.Column('new_field', sa.String(100), nullable=True)
    )

def downgrade() -> None:
    op.drop_column('tickets', 'new_field')

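Phase 2: Data Migration (Background)

"""populate new column (phase 2)

Revision ID: zd002
"""

import time

from alembic import op
import sqlalchemy as sa

def upgrade() -> None:
    # Update in small batches during low-traffic periods.
    # calculate_value() stands in for your own SQL expression or function.
    connection = op.get_bind()

    # PostgreSQL has no UPDATE ... LIMIT, so pick each batch with a subquery.
    # SQLAlchemy 2.0 requires text() for raw SQL strings.
    update_batch = sa.text(
        """
        UPDATE tickets
        SET new_field = calculate_value(old_field)
        WHERE id IN (
            SELECT id FROM tickets
            WHERE new_field IS NULL
            LIMIT :batch_size
        )
        """
    )

    batch_size = 100
    while True:
        result = connection.execute(update_batch, {"batch_size": batch_size})
        if result.rowcount == 0:
            break

        # Small delay to reduce database load
        time.sleep(0.1)

def downgrade() -> None:
    connection = op.get_bind()
    connection.execute(sa.text("UPDATE tickets SET new_field = NULL"))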
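
The backfill loop and the check that gates Phase 3 can be tried out end to end with SQLite. This is a sketch under stated assumptions: `upper(old_field)` is a placeholder for the real transformation, and the table is a stand-in for tickets.

```python
import sqlite3


def backfill_in_batches(conn, batch_size=100) -> int:
    """Populate new_field batch by batch until no NULL rows remain."""
    batches = 0
    while True:
        cursor = conn.execute(
            "UPDATE tickets SET new_field = upper(old_field) "
            "WHERE id IN (SELECT id FROM tickets WHERE new_field IS NULL LIMIT ?)",
            (batch_size,),
        )
        conn.commit()  # each batch is committed on its own
        if cursor.rowcount == 0:
            break
        batches += 1
    return batches


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tickets (id INTEGER PRIMARY KEY, old_field TEXT, new_field TEXT)"
)
conn.executemany(
    "INSERT INTO tickets (old_field) VALUES (?)",
    [(f"value {i}",) for i in range(250)],
)

batches = backfill_in_batches(conn, batch_size=100)

# Phase 3 is only safe once this count is zero.
null_rows = conn.execute(
    "SELECT COUNT(*) FROM tickets WHERE new_field IS NULL"
).fetchone()[0]
print(batches, null_rows)
```

The loop ends only if every updated row receives a non-NULL value; a transformation that can return NULL needs a different stop condition.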

Phase 3: Make Required

"""make new column required (phase 3)

Revision ID: zd003
"""

def upgrade() -> None:
    # Now that all rows have values, make it non-nullable
    op.alter_column('tickets', 'new_field',
        nullable=False,
        server_default='default_value'
    )

def downgrade() -> None:
    op.alter_column('tickets', 'new_field',
        nullable=True,
        server_default=None
    )

Phase 4: Remove Old Column (Optional)

"""remove old column (phase 4)

Revision ID: zd004
"""

def upgrade() -> None:
    op.drop_column('tickets', 'old_field')

def downgrade() -> None:
    op.add_column('tickets',
        sa.Column('old_field', sa.String(100), nullable=True)
    )

Handling Migration Failures

# alembic/env.py additions for error handling

from alembic import context
import logging

logger = logging.getLogger('alembic.env')

def run_migrations_online():
    """Run migrations in 'online' mode with error handling"""

    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix='sqlalchemy.',
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            transaction_per_migration=True,  # Rollback individual migrations
            compare_type=True,
            compare_server_default=True
        )

        try:
            with context.begin_transaction():
                context.run_migrations()

        except Exception as e:
            logger.error(f"Migration failed: {e}")
            logger.error("Rolling back transaction...")
            # Transaction automatically rolled back
            raise

        else:
            logger.info("Migration completed successfully")
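
The effect of giving each migration its own transaction can be illustrated with SQLite, which also supports transactional DDL. The sketch below is not Alembic code: `run_migrations` is a hypothetical runner that stops at the first failure instead of re-raising, and the migration names are made up.

```python
import sqlite3


def run_migrations(conn, migrations) -> list:
    """Apply each migration in its own transaction; stop at the first failure."""
    applied = []
    for name, statements in migrations:
        conn.execute("BEGIN")
        try:
            for statement in statements:
                conn.execute(statement)
        except sqlite3.Error:
            conn.execute("ROLLBACK")  # undo only the failing migration
            break
        else:
            conn.execute("COMMIT")
            applied.append(name)
    return applied


# isolation_level=None lets the code issue BEGIN/COMMIT/ROLLBACK explicitly.
conn = sqlite3.connect(":memory:", isolation_level=None)
migrations = [
    ("m1", ["CREATE TABLE tickets (id INTEGER PRIMARY KEY)"]),
    (
        "m2",
        [
            "CREATE TABLE reports (id INTEGER PRIMARY KEY)",
            "INSERT INTO missing_table VALUES (1)",  # fails: no such table
        ],
    ),
    ("m3", ["CREATE TABLE users (id INTEGER PRIMARY KEY)"]),
]

applied = run_migrations(conn, migrations)
tables = {
    row[0]
    for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
}
print(applied, tables)
```

The first migration stays committed, while the table created by the failing one disappears with its rollback, which is the state a retry would start from.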

Advanced Configuration

Custom Migration Template

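Create a custom template for your organization. Note that author and jira_ticket are not built-in template variables: supply them through template_args in context.configure() in env.py (and set revision_environment = true in alembic.ini so env.py also runs for revisions created without --autogenerate). When they are not supplied, the fallback values below are used: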

# alembic/script.py.mako
"""${message}

Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}

Author: ${author if author else 'Support Team'}
Jira: ${jira_ticket if jira_ticket else 'N/A'}
"""

from alembic import op
import sqlalchemy as sa
${imports if imports else ""}

# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}


def upgrade() -> None:
    """Apply migration changes"""
    ${upgrades if upgrades else "pass"}


def downgrade() -> None:
    """Revert migration changes"""
    ${downgrades if downgrades else "pass"}

Multi-Database Support

For systems with separate databases (e.g., main DB + analytics):

# alembic/env.py for multiple databases

def run_migrations_online():
    """Run migrations for multiple databases"""

    # Configuration for each database
    engines = {
        'main': {
            'url': os.getenv('MAIN_DB_URL'),
            'target_metadata': main_metadata
        },
        'analytics': {
            'url': os.getenv('ANALYTICS_DB_URL'),
            'target_metadata': analytics_metadata
        }
    }

    for name, db in engines.items():
        logger.info(f"Running migrations for {name} database")

        # The loop variable is named `db` so it does not shadow the
        # module-level Alembic `config` object; create_engine needs
        # `from sqlalchemy import create_engine` at the top of env.py
        engine = create_engine(db['url'])

        with engine.connect() as connection:
            context.configure(
                connection=connection,
                target_metadata=db['target_metadata'],
                upgrade_token=f"{name}_upgrade",
                downgrade_token=f"{name}_downgrade"
            )

            with context.begin_transaction():
                context.run_migrations(engine_name=name)

Troubleshooting

Common Issues and Solutions

Multiple Heads Error

# Problem: "Multiple heads exist"
# Solution: Merge the branches
alembic merge heads -m "merge branches"

Migration Out of Sync

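# Problem: Database revision doesn't match migration history
# Solution: Stamp database to specific revision.
# stamp only rewrites the alembic_version table and runs no migrations,
# so use it only when the schema already matches the target revision.
alembic stamp head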

# Or stamp to specific revision
alembic stamp abc123

Failed Migration Cleanup

# Problem: Migration failed midway
# Solution: Manual cleanup

# 1. Check current state
alembic current

# 2. Manually fix database issues
psql $DATABASE_URL

# 3. Stamp to correct revision
alembic stamp previous_working_revision

# 4. Try migration again
alembic upgrade head

Circular Dependencies

# Problem: "Circular dependency detected"
# Solution: Use depends_on instead of down_revision
alembic revision -m "fix circular dependency" \
    --head=branch_a@head \
    --depends-on=branch_b_revision

Summary

This skill covered comprehensive Alembic usage for customer support systems:

  1. Setup: Installation, configuration, and initialization
  2. Creating Migrations: Manual and autogenerated approaches
  3. Data Migrations: Transforming data during schema changes
  4. Running Migrations: Upgrade, downgrade, and status commands
  5. Branching: Managing parallel development streams
  6. Testing: Unit, integration, and performance testing
  7. CI/CD: Automation and deployment strategies
  8. Production: Zero-downtime migrations and best practices
  9. Advanced: Custom templates and multi-database support
  10. Troubleshooting: Common issues and solutions

Always remember:

  • Review autogenerated migrations
  • Test migrations thoroughly before production
  • Keep backups before major migrations
  • Plan for rollback scenarios
  • Monitor migration performance
  • Document complex migrations

For more examples, see EXAMPLES.md in this skill package.