| name | alembic |
| version | 1.0.0 |
| description | Comprehensive Alembic database migration management for customer support systems |
| tags | alembic, database, migrations, sqlalchemy, postgresql, customer-support, schema-evolution, data-migrations |
| categories | database, backend, devops |
| context | customer support operations, ticket management systems, user data management, schema versioning, production deployments |
| dependencies | alembic>=1.13.0, sqlalchemy>=2.0.0, psycopg2-binary>=2.9.0, pytest>=7.0.0 |
| author | Customer Support Tech Enablement Team |
Alembic Database Migration Management Skill
Overview
This skill provides comprehensive guidance for managing database migrations using Alembic in customer support environments. It covers everything from initial setup through complex production deployment scenarios, with a focus on maintaining data integrity and minimizing downtime for support operations.
Core Concepts
What is Alembic?
Alembic is a lightweight database migration tool for use with SQLAlchemy. It provides a way to manage changes to your database schema over time through version-controlled migration scripts. For customer support systems, this means:
- Version Control: Track all schema changes in your support database
- Reproducibility: Apply the same migrations across dev, staging, and production
- Rollback Capability: Safely revert problematic changes
- Team Collaboration: Merge schema changes from multiple developers
- Data Preservation: Migrate data during schema transformations
Migration Lifecycle in Support Systems
- Development: Create migrations locally while developing new features
- Testing: Validate migrations in staging environment
- Review: Code review migration scripts before production
- Deployment: Apply migrations to production with minimal downtime
- Monitoring: Track migration status and handle failures
- Rollback: Revert if issues arise in production
Installation and Initial Setup
Installing Alembic
# Install Alembic with PostgreSQL support
pip install alembic psycopg2-binary sqlalchemy
# Or add to requirements.txt
alembic>=1.13.0
sqlalchemy>=2.0.0
psycopg2-binary>=2.9.0
Initialize Alembic in Your Project
# Initialize Alembic (creates alembic/ directory and alembic.ini)
alembic init alembic
# For multiple database support
alembic init --template multidb alembic
This creates:
alembic/: Directory containing migration scriptsalembic/versions/: Where individual migration files livealembic/env.py: Migration environment configurationalembic.ini: Alembic configuration file
Configure Database Connection
Edit alembic.ini to set your database URL:
# For development
sqlalchemy.url = postgresql://user:password@localhost/support_dev
# For production (use environment variables)
sqlalchemy.url = postgresql://%(DB_USER)s:%(DB_PASSWORD)s@%(DB_HOST)s/%(DB_NAME)s
Better approach - use environment variables in env.py:
import os
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context
# Import your models
from myapp.models import Base
# This is the Alembic Config object
config = context.config
# Override sqlalchemy.url from environment
db_url = os.getenv('DATABASE_URL', 'postgresql://localhost/support_dev')
config.set_main_option('sqlalchemy.url', db_url)
# Set up target metadata for autogenerate
target_metadata = Base.metadata
Creating Migrations
Manual Migration Creation
Create a migration manually when you need precise control:
# Create empty migration file
alembic revision -m "add ticket priority column"
This generates a file like versions/abc123_add_ticket_priority_column.py:
"""add ticket priority column
Revision ID: abc123
Revises: def456
Create Date: 2025-01-15 10:30:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers
revision = 'abc123'
down_revision = 'def456'
branch_labels = None
depends_on = None
def upgrade() -> None:
# Add priority column to tickets table
op.add_column('tickets',
sa.Column('priority', sa.String(20), nullable=True, server_default='normal')
)
# Create index for performance
op.create_index('ix_tickets_priority', 'tickets', ['priority'])
def downgrade() -> None:
# Remove index first
op.drop_index('ix_tickets_priority', 'tickets')
# Remove column
op.drop_column('tickets', 'priority')
Autogenerate Migrations
Let Alembic detect schema changes automatically:
# Generate migration by comparing models to database
alembic revision --autogenerate -m "add customer satisfaction table"
Important: Always review autogenerated migrations! They may miss:
- Renamed columns (appears as drop + add)
- Changed column types requiring data conversion
- Complex constraints
- Data migrations
Example autogenerated migration:
"""add customer satisfaction table
Revision ID: xyz789
Revises: abc123
Create Date: 2025-01-15 11:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
revision = 'xyz789'
down_revision = 'abc123'
branch_labels = None
depends_on = None
def upgrade() -> None:
# Auto-generated - review before running!
op.create_table(
'customer_satisfaction',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('ticket_id', sa.Integer(), nullable=False),
sa.Column('rating', sa.Integer(), nullable=False),
sa.Column('feedback', sa.Text(), nullable=True),
sa.Column('created_at', sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(['ticket_id'], ['tickets.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id')
)
op.create_index('ix_satisfaction_ticket_id', 'customer_satisfaction', ['ticket_id'])
op.create_index('ix_satisfaction_created_at', 'customer_satisfaction', ['created_at'])
def downgrade() -> None:
op.drop_index('ix_satisfaction_created_at', 'customer_satisfaction')
op.drop_index('ix_satisfaction_ticket_id', 'customer_satisfaction')
op.drop_table('customer_satisfaction')
Data Migrations
Migrating Data During Schema Changes
When you need to transform existing data:
"""convert ticket status to new enum
Revision ID: data001
Revises: xyz789
Create Date: 2025-01-15 12:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column
revision = 'data001'
down_revision = 'xyz789'
def upgrade() -> None:
# Create new status column
op.add_column('tickets',
sa.Column('status_new', sa.String(50), nullable=True)
)
# Migrate data using bulk update
tickets = table('tickets',
column('status', sa.String),
column('status_new', sa.String)
)
# Map old statuses to new ones
status_mapping = {
'open': 'OPEN',
'in_progress': 'IN_PROGRESS',
'pending': 'WAITING_ON_CUSTOMER',
'resolved': 'RESOLVED',
'closed': 'CLOSED'
}
connection = op.get_bind()
for old_status, new_status in status_mapping.items():
connection.execute(
tickets.update().where(
tickets.c.status == old_status
).values(status_new=new_status)
)
# Make new column non-nullable now that data is migrated
op.alter_column('tickets', 'status_new', nullable=False)
# Drop old column and rename new one
op.drop_column('tickets', 'status')
op.alter_column('tickets', 'status_new', new_column_name='status')
def downgrade() -> None:
# Reverse the migration
op.add_column('tickets',
sa.Column('status_old', sa.String(50), nullable=True)
)
tickets = table('tickets',
column('status', sa.String),
column('status_old', sa.String)
)
# Reverse mapping
reverse_mapping = {
'OPEN': 'open',
'IN_PROGRESS': 'in_progress',
'WAITING_ON_CUSTOMER': 'pending',
'RESOLVED': 'resolved',
'CLOSED': 'closed'
}
connection = op.get_bind()
for new_status, old_status in reverse_mapping.items():
connection.execute(
tickets.update().where(
tickets.c.status == new_status
).values(status_old=old_status)
)
op.alter_column('tickets', 'status_old', nullable=False)
op.drop_column('tickets', 'status')
op.alter_column('tickets', 'status_old', new_column_name='status')
Large Data Migrations with Batching
For large tables, process data in batches:
"""add computed resolution time to tickets
Revision ID: data002
Revises: data001
Create Date: 2025-01-15 13:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column, select
revision = 'data002'
down_revision = 'data001'
def upgrade() -> None:
# Add new column
op.add_column('tickets',
sa.Column('resolution_time_seconds', sa.Integer(), nullable=True)
)
connection = op.get_bind()
tickets = table('tickets',
column('id', sa.Integer),
column('created_at', sa.DateTime),
column('resolved_at', sa.DateTime),
column('resolution_time_seconds', sa.Integer)
)
# Process in batches to avoid memory issues
batch_size = 1000
offset = 0
while True:
# Get batch of tickets that need processing
batch = connection.execute(
select(
tickets.c.id,
tickets.c.created_at,
tickets.c.resolved_at
).where(
sa.and_(
tickets.c.resolved_at.isnot(None),
tickets.c.resolution_time_seconds.is_(None)
)
).limit(batch_size).offset(offset)
).fetchall()
if not batch:
break
# Update batch
for row in batch:
if row.resolved_at and row.created_at:
resolution_time = (row.resolved_at - row.created_at).total_seconds()
connection.execute(
tickets.update().where(
tickets.c.id == row.id
).values(resolution_time_seconds=int(resolution_time))
)
offset += batch_size
# Now make column non-nullable for future rows
op.alter_column('tickets', 'resolution_time_seconds',
nullable=False, server_default='0')
def downgrade() -> None:
op.drop_column('tickets', 'resolution_time_seconds')
Running Migrations
Upgrade Database to Latest
# Upgrade to latest revision (head)
alembic upgrade head
# See what would be executed (SQL only, don't run)
alembic upgrade head --sql
# Upgrade one step at a time
alembic upgrade +1
# Upgrade to specific revision
alembic upgrade abc123
Downgrade Database
# Downgrade one revision
alembic downgrade -1
# Downgrade to specific revision
alembic downgrade abc123
# Downgrade to base (empty database)
alembic downgrade base
# Generate SQL for downgrade without executing
alembic downgrade -1 --sql
Check Current Status
# Show current database revision
alembic current
# Show current revision with details
alembic current --verbose
# Show migration history
alembic history
# Show history with current revision marked
alembic history --indicate-current
# Show specific revision range
alembic history -r base:head
Branching and Merging
Why Branch Migrations?
In customer support systems, you might have:
- Feature branches: New features developed in parallel
- Hotfix branches: Urgent fixes that can't wait for feature completion
- Team branches: Multiple teams working on different modules
Creating a Branch
# Create base for new branch
alembic revision -m "create reporting branch" \
--head=base \
--branch-label=reporting \
--version-path=alembic/versions/reporting
# Add migration to specific branch
alembic revision -m "add report tables" \
--head=reporting@head
Example branch structure:
base
├── main branch
│ ├── abc123: initial schema
│ ├── def456: add tickets
│ └── ghi789: add users
└── reporting branch
├── rep001: create reports table
└── rep002: add scheduled reports
Working with Multiple Branches
# Show all branch heads
alembic heads
# Show branch points
alembic branches
# Upgrade specific branch
alembic upgrade reporting@head
# Upgrade all branches
alembic upgrade heads
Merging Branches
When features are ready to merge:
# Merge two branches
alembic merge -m "merge reporting into main" \
main@head reporting@head
Generated merge migration:
"""merge reporting into main
Revision ID: merge001
Revises: ghi789, rep002
Create Date: 2025-01-15 14:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
revision = 'merge001'
down_revision = ('ghi789', 'rep002') # Multiple parents
branch_labels = None
depends_on = None
def upgrade() -> None:
# Usually empty for simple merges
# Add code if you need to reconcile conflicting changes
pass
def downgrade() -> None:
pass
Cross-Branch Dependencies
When one branch depends on another:
# Create migration that depends on specific revision from another branch
alembic revision -m "reporting needs user table" \
--head=reporting@head \
--depends-on=def456 # Revision from main branch
Testing Migrations
Unit Testing Migrations
# tests/test_migrations.py
import pytest
from alembic import command
from alembic.config import Config
from sqlalchemy import create_engine, inspect
from sqlalchemy.orm import sessionmaker
@pytest.fixture
def alembic_config():
"""Provide Alembic configuration for testing"""
config = Config("alembic.ini")
config.set_main_option(
"sqlalchemy.url",
"postgresql://localhost/support_test"
)
return config
@pytest.fixture
def test_db(alembic_config):
"""Create test database and apply migrations"""
# Create engine
engine = create_engine(
alembic_config.get_main_option("sqlalchemy.url")
)
# Run migrations to head
command.upgrade(alembic_config, "head")
yield engine
# Cleanup - downgrade to base
command.downgrade(alembic_config, "base")
engine.dispose()
def test_migration_creates_tickets_table(test_db):
"""Test that migrations create expected tables"""
inspector = inspect(test_db)
tables = inspector.get_table_names()
assert 'tickets' in tables
assert 'users' in tables
assert 'customer_satisfaction' in tables
def test_tickets_table_structure(test_db):
"""Test ticket table has correct columns"""
inspector = inspect(test_db)
columns = {col['name']: col for col in inspector.get_columns('tickets')}
assert 'id' in columns
assert 'priority' in columns
assert 'status' in columns
assert 'created_at' in columns
assert 'resolution_time_seconds' in columns
# Check column types
assert columns['priority']['type'].python_type == str
assert columns['status']['type'].python_type == str
def test_migration_upgrade_downgrade_cycle(alembic_config):
"""Test that upgrade -> downgrade -> upgrade works"""
# Start at base
command.downgrade(alembic_config, "base")
# Upgrade to head
command.upgrade(alembic_config, "head")
# Downgrade one step
command.downgrade(alembic_config, "-1")
# Upgrade back to head
command.upgrade(alembic_config, "head")
# Should complete without errors
def test_data_migration_preserves_data(test_db):
"""Test that data migrations don't lose data"""
from sqlalchemy.orm import sessionmaker
from myapp.models import Ticket
Session = sessionmaker(bind=test_db)
session = Session()
# Insert test data
ticket = Ticket(
title="Test ticket",
status="OPEN",
priority="high"
)
session.add(ticket)
session.commit()
ticket_id = ticket.id
session.close()
# Run a migration that modifies tickets table
# (This would be a specific revision)
# command.upgrade(alembic_config, "specific_revision")
# Verify data still exists
session = Session()
retrieved = session.query(Ticket).filter_by(id=ticket_id).first()
assert retrieved is not None
assert retrieved.title == "Test ticket"
session.close()
Integration Testing
# tests/test_migration_integration.py
import pytest
from alembic import command
from alembic.config import Config
from alembic.script import ScriptDirectory
from alembic.runtime.migration import MigrationContext
def test_no_pending_migrations(alembic_config, test_db):
"""Ensure all migrations are applied in test environment"""
script = ScriptDirectory.from_config(alembic_config)
with test_db.connect() as connection:
context = MigrationContext.configure(connection)
current_heads = set(context.get_current_heads())
script_heads = set(script.get_heads())
assert current_heads == script_heads, \
f"Database has pending migrations. Current: {current_heads}, Expected: {script_heads}"
def test_migration_order_is_valid(alembic_config):
"""Verify migration chain has no gaps or conflicts"""
script = ScriptDirectory.from_config(alembic_config)
# Get all revisions
revisions = list(script.walk_revisions())
# Check each revision has valid down_revision
for revision in revisions:
if revision.down_revision is not None:
if isinstance(revision.down_revision, tuple):
# Merge point
for down_rev in revision.down_revision:
assert script.get_revision(down_rev) is not None
else:
assert script.get_revision(revision.down_revision) is not None
def test_check_command_detects_drift(alembic_config, test_db):
"""Test that check command detects schema drift"""
# This test verifies that `alembic check` works correctly
try:
command.check(alembic_config)
# If no exception, database matches models
assert True
except Exception as e:
# If exception, there's drift between DB and models
pytest.fail(f"Schema drift detected: {e}")
Testing Migration Performance
# tests/test_migration_performance.py
import time
import pytest
from alembic import command
def test_migration_completes_within_time_limit(alembic_config):
"""Ensure migrations complete within acceptable time"""
# Downgrade to base
command.downgrade(alembic_config, "base")
# Time the upgrade
start = time.time()
command.upgrade(alembic_config, "head")
duration = time.time() - start
# Assert completes within 60 seconds
assert duration < 60, f"Migration took {duration}s, exceeds 60s limit"
@pytest.mark.slow
def test_data_migration_with_large_dataset(alembic_config, test_db):
"""Test data migration performance with realistic data volume"""
from sqlalchemy.orm import sessionmaker
from myapp.models import Ticket
Session = sessionmaker(bind=test_db)
session = Session()
# Create 10,000 test tickets
tickets = [
Ticket(
title=f"Test ticket {i}",
status="OPEN",
priority="normal"
)
for i in range(10000)
]
session.bulk_save_objects(tickets)
session.commit()
session.close()
# Run data migration and measure time
start = time.time()
command.upgrade(alembic_config, "data002") # Specific data migration
duration = time.time() - start
# Should process 10k records in reasonable time
assert duration < 30, f"Data migration took {duration}s for 10k records"
CI/CD Integration
GitHub Actions Workflow
# .github/workflows/migrations.yml
name: Database Migrations
on:
pull_request:
paths:
- 'alembic/versions/**'
- 'myapp/models/**'
- 'alembic.ini'
- 'alembic/env.py'
push:
branches:
- main
- develop
jobs:
test-migrations:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:15
env:
POSTGRES_PASSWORD: postgres
POSTGRES_DB: support_test
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install pytest pytest-cov
- name: Run migration tests
env:
DATABASE_URL: postgresql://postgres:postgres@localhost/support_test
run: |
# Test upgrade to head
alembic upgrade head
# Test downgrade to base
alembic downgrade base
# Test upgrade again
alembic upgrade head
# Run pytest for migration tests
pytest tests/test_migrations.py -v
- name: Check for schema drift
env:
DATABASE_URL: postgresql://postgres:postgres@localhost/support_test
run: |
alembic check
- name: Validate migration history
run: |
# Check for multiple heads (should be only one)
HEADS_COUNT=$(alembic heads | wc -l)
if [ "$HEADS_COUNT" -gt 1 ]; then
echo "ERROR: Multiple heads detected. Please merge branches."
alembic heads
exit 1
fi
review-migration-sql:
runs-on: ubuntu-latest
if: github.event_name == 'pull_request'
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Generate SQL for review
run: |
# Generate SQL without executing
alembic upgrade head --sql > migration.sql
- name: Upload SQL artifact
uses: actions/upload-artifact@v3
with:
name: migration-sql
path: migration.sql
- name: Comment PR with SQL
uses: actions/github-script@v6
with:
script: |
const fs = require('fs');
const sql = fs.readFileSync('migration.sql', 'utf8');
github.rest.issues.createComment({
issue_number: context.issue.number,
owner: context.repo.owner,
repo: context.repo.repo,
body: `## Migration SQL\n\n\`\`\`sql\n${sql}\n\`\`\``
});
Deployment Script
#!/bin/bash
# scripts/deploy_migrations.sh
set -e # Exit on error
echo "Starting database migration deployment..."
# Environment variables
DB_HOST="${DB_HOST:-localhost}"
DB_NAME="${DB_NAME:-support_prod}"
DB_USER="${DB_USER:-postgres}"
DATABASE_URL="postgresql://${DB_USER}:${DB_PASSWORD}@${DB_HOST}/${DB_NAME}"
# Configuration
BACKUP_DIR="./backups"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="${BACKUP_DIR}/pre_migration_${TIMESTAMP}.sql"
# Create backup directory
mkdir -p "$BACKUP_DIR"
# 1. Backup database before migration
echo "Creating database backup..."
pg_dump "$DATABASE_URL" > "$BACKUP_FILE"
echo "Backup created: $BACKUP_FILE"
# 2. Check current migration status
echo "Current migration status:"
alembic current
# 3. Show pending migrations
echo "Pending migrations:"
alembic history --verbose | grep -A 5 "head"
# 4. Run migrations with timeout
echo "Running migrations..."
timeout 300 alembic upgrade head || {
echo "ERROR: Migration failed or timed out!"
echo "Restoring from backup..."
psql "$DATABASE_URL" < "$BACKUP_FILE"
exit 1
}
# 5. Verify migration success
echo "Verifying migration status..."
CURRENT_REV=$(alembic current | grep "Rev:" | awk '{print $2}')
HEAD_REV=$(alembic heads | awk '{print $1}')
if [ "$CURRENT_REV" != "$HEAD_REV" ]; then
echo "ERROR: Migration incomplete. Current: $CURRENT_REV, Expected: $HEAD_REV"
echo "Restoring from backup..."
psql "$DATABASE_URL" < "$BACKUP_FILE"
exit 1
fi
echo "Migration completed successfully!"
echo "Current revision: $CURRENT_REV"
# 6. Cleanup old backups (keep last 10)
echo "Cleaning up old backups..."
ls -t "$BACKUP_DIR"/*.sql | tail -n +11 | xargs -r rm
echo "Deployment complete!"
Production Best Practices
Pre-Deployment Checklist
- Migration tested in development environment
- Migration tested in staging with production-like data
- Migration reviewed by at least one team member
- Downgrade path tested and verified
- Performance impact assessed for large tables
- Database backup plan in place
- Rollback procedure documented
- Maintenance window scheduled (if needed)
- Team notified of deployment
- Monitoring alerts configured
Zero-Downtime Migrations
For critical support systems that can't go offline:
Phase 1: Additive Changes
"""add new column (phase 1)
Revision ID: zd001
"""
def upgrade() -> None:
# Add new column as nullable
op.add_column('tickets',
sa.Column('new_field', sa.String(100), nullable=True)
)
def downgrade() -> None:
op.drop_column('tickets', 'new_field')
Phase 2: Data Migration (Background)
"""populate new column (phase 2)
Revision ID: zd002
"""
def upgrade() -> None:
# Update in small batches during low-traffic periods
connection = op.get_bind()
batch_size = 100
while True:
result = connection.execute(
"""
UPDATE tickets
SET new_field = calculate_value(old_field)
WHERE new_field IS NULL
LIMIT {batch_size}
""".format(batch_size=batch_size)
)
if result.rowcount == 0:
break
# Small delay to reduce database load
import time
time.sleep(0.1)
def downgrade() -> None:
connection = op.get_bind()
connection.execute("UPDATE tickets SET new_field = NULL")
Phase 3: Make Required
"""make new column required (phase 3)
Revision ID: zd003
"""
def upgrade() -> None:
# Now that all rows have values, make it non-nullable
op.alter_column('tickets', 'new_field',
nullable=False,
server_default='default_value'
)
def downgrade() -> None:
op.alter_column('tickets', 'new_field',
nullable=True,
server_default=None
)
Phase 4: Remove Old Column (Optional)
"""remove old column (phase 4)
Revision ID: zd004
"""
def upgrade() -> None:
op.drop_column('tickets', 'old_field')
def downgrade() -> None:
op.add_column('tickets',
sa.Column('old_field', sa.String(100), nullable=True)
)
Handling Migration Failures
# alembic/env.py additions for error handling
from alembic import context
import logging
logger = logging.getLogger('alembic.env')
def run_migrations_online():
"""Run migrations in 'online' mode with error handling"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix='sqlalchemy.',
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
transaction_per_migration=True, # Rollback individual migrations
compare_type=True,
compare_server_default=True
)
try:
with context.begin_transaction():
context.run_migrations()
except Exception as e:
logger.error(f"Migration failed: {e}")
logger.error("Rolling back transaction...")
# Transaction automatically rolled back
raise
else:
logger.info("Migration completed successfully")
Advanced Configuration
Custom Migration Template
Create custom template for your organization:
# alembic/script.py.mako
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
Author: ${author if author else 'Support Team'}
Jira: ${jira_ticket if jira_ticket else 'N/A'}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade() -> None:
"""Apply migration changes"""
${upgrades if upgrades else "pass"}
def downgrade() -> None:
"""Revert migration changes"""
${downgrades if downgrades else "pass"}
Multi-Database Support
For systems with separate databases (e.g., main DB + analytics):
# alembic/env.py for multiple databases
def run_migrations_online():
"""Run migrations for multiple databases"""
# Configuration for each database
engines = {
'main': {
'url': os.getenv('MAIN_DB_URL'),
'target_metadata': main_metadata
},
'analytics': {
'url': os.getenv('ANALYTICS_DB_URL'),
'target_metadata': analytics_metadata
}
}
for name, config in engines.items():
logger.info(f"Running migrations for {name} database")
engine = create_engine(config['url'])
with engine.connect() as connection:
context.configure(
connection=connection,
target_metadata=config['target_metadata'],
upgrade_token=f"{name}_upgrade",
downgrade_token=f"{name}_downgrade"
)
with context.begin_transaction():
context.run_migrations(engine_name=name)
Troubleshooting
Common Issues and Solutions
Multiple Heads Error
# Problem: "Multiple heads exist"
# Solution: Merge the branches
alembic merge heads -m "merge branches"
Migration Out of Sync
# Problem: Database revision doesn't match migration history
# Solution: Stamp database to specific revision
alembic stamp head
# Or stamp to specific revision
alembic stamp abc123
Failed Migration Cleanup
# Problem: Migration failed midway
# Solution: Manual cleanup
# 1. Check current state
alembic current
# 2. Manually fix database issues
psql $DATABASE_URL
# 3. Stamp to correct revision
alembic stamp previous_working_revision
# 4. Try migration again
alembic upgrade head
Circular Dependencies
# Problem: "Circular dependency detected"
# Solution: Use depends_on instead of down_revision
alembic revision -m "fix circular dependency" \
--head=branch_a@head \
--depends-on=branch_b_revision
Summary
This skill covered comprehensive Alembic usage for customer support systems:
- Setup: Installation, configuration, and initialization
- Creating Migrations: Manual and autogenerated approaches
- Data Migrations: Transforming data during schema changes
- Running Migrations: Upgrade, downgrade, and status commands
- Branching: Managing parallel development streams
- Testing: Unit, integration, and performance testing
- CI/CD: Automation and deployment strategies
- Production: Zero-downtime migrations and best practices
- Advanced: Custom templates and multi-database support
- Troubleshooting: Common issues and solutions
Always remember:
- Review autogenerated migrations
- Test migrations thoroughly before production
- Keep backups before major migrations
- Plan for rollback scenarios
- Monitor migration performance
- Document complex migrations
For more examples, see EXAMPLES.md in this skill package.