| name | alembic |
| description | Database migration management for SQLAlchemy projects using Alembic |
| when_to_use | When you need to create, apply, or manage database schema changes in SQLAlchemy projects |
Alembic Database Migrations
Alembic is a database migration tool for SQLAlchemy projects that provides version control for your database schema.
Quick Start
Create Migration (Autogenerate)
# Generate migration from model changes
alembic revision --autogenerate -m "Add user table"
# Check if there are pending changes
alembic check
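alembic check (Alembic 1.9+) exits non-zero when the models no longer match the migration history, so it also works as a drift gate in CI. A minimal sketch of the same check as a test, assuming alembic.ini sits at the repository root:
from alembic import command
from alembic.config import Config

def test_models_match_migrations():
    # command.check raises if autogenerate would emit new operations,
    # i.e. the models have drifted from the latest migration.
    cfg = Config("alembic.ini")  # assumed path; adjust to your layout
    command.check(cfg)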
Apply Migrations
# Upgrade to latest version
alembic upgrade head
# Upgrade to specific revision
alembic upgrade ae1027a6acf
# Downgrade one revision
alembic downgrade -1
# Downgrade to base (empty schema)
alembic downgrade base
Check Status
# Show current database revision
alembic current
# Show all revision history
alembic history
# Show revision details
alembic show ae1027a6acf
Common Patterns
Autogenerate Configuration
env.py setup for async SQLAlchemy:
import asyncio
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
# Import your models
from app.models import Base
from app.config import get_settings
config = context.config
settings = get_settings()
# Configure database URL for async
database_url = settings.database_url.replace("postgresql://", "postgresql+asyncpg://")
config.set_main_option("sqlalchemy.url", database_url)
target_metadata = Base.metadata
def do_run_migrations(connection):
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        compare_type=True,
        compare_server_default=True,
        render_as_batch=False,  # Set to True for SQLite
    )
    with context.begin_transaction():
        context.run_migrations()

async def run_async_migrations():
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()

def run_migrations_offline():
    # Offline mode renders SQL to stdout instead of connecting to the database
    context.configure(
        url=config.get_main_option("sqlalchemy.url"),
        target_metadata=target_metadata,
        literal_binds=True,
    )
    with context.begin_transaction():
        context.run_migrations()

def run_migrations_online():
    asyncio.run(run_async_migrations())

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
Manual Migration Operations
Common schema changes:
from alembic import op
import sqlalchemy as sa
def upgrade():
    # Add column
    op.add_column('users', sa.Column('email', sa.String(255), nullable=True))
    # Rename table
    op.rename_table('old_table', 'new_table')
    # Create index
    op.create_index('ix_users_email', 'users', ['email'])
    # Add constraint
    op.create_check_constraint('ck_age_positive', 'users', 'age > 0')

def downgrade():
    # Reverse operations
    op.drop_constraint('ck_age_positive', 'users', type_='check')
    op.drop_index('ix_users_email', table_name='users')
    op.rename_table('new_table', 'old_table')
    op.drop_column('users', 'email')
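Explicit names like ck_age_positive and ix_users_email are easiest to manage when the models also declare a naming convention, so autogenerate emits the same deterministic names. A sketch, not required by the setup above; the Base class mirrors the import used in env.py and should be adapted to your project:
from sqlalchemy import MetaData
from sqlalchemy.orm import DeclarativeBase

# Deterministic constraint/index names so autogenerate and manual
# drop_constraint/drop_index calls agree on naming.
NAMING_CONVENTION = {
    "ix": "ix_%(column_0_label)s",
    "uq": "uq_%(table_name)s_%(column_0_name)s",
    "ck": "ck_%(table_name)s_%(constraint_name)s",
    "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
    "pk": "pk_%(table_name)s",
}

class Base(DeclarativeBase):
    metadata = MetaData(naming_convention=NAMING_CONVENTION)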
Batch Mode (for SQLite)
Configure batch mode in env.py:
context.configure(
    connection=connection,
    target_metadata=target_metadata,
    render_as_batch=True,  # Required for SQLite migrations
)
Generated batch migration:
def upgrade():
    with op.batch_alter_table('users', schema=None) as batch_op:
        batch_op.add_column(sa.Column('email', sa.String(length=255), nullable=True))
        batch_op.create_index('ix_users_email', ['email'], unique=False)
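The corresponding downgrade reverses the same operations inside a batch context; a sketch:
def downgrade():
    with op.batch_alter_table('users', schema=None) as batch_op:
        batch_op.drop_index('ix_users_email')
        batch_op.drop_column('email')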
Filtering Objects
Skip certain objects in autogenerate:
def include_object(object, name, type_, reflected, compare_to):
    # Skip temporary tables
    if type_ == "table" and name.startswith("temp_"):
        return False
    # Skip columns with skip_autogenerate flag
    if type_ == "column" and not reflected:
        if object.info.get("skip_autogenerate", False):
            return False
    return True

context.configure(
    connection=connection,
    target_metadata=target_metadata,
    include_object=include_object,
)
Filter by schema:
def include_name(name, type_, parent_names):
    if type_ == "schema":
        # Include the default schema plus specific named schemas
        return name in [None, "public", "auth"]
    elif type_ == "table":
        return parent_names["schema_qualified_table_name"] in target_metadata.tables
    return True

context.configure(
    connection=connection,
    target_metadata=target_metadata,
    include_name=include_name,
    include_schemas=True,
)
Custom Migration Processing
Modify generated migrations:
def process_revision_directives(context, revision, directives):
    script = directives[0]
    # Skip empty migrations
    if config.cmd_opts.autogenerate and script.upgrade_ops.is_empty():
        directives[:] = []
        return
    # Remove downgrade operations for one-way migrations
    script.downgrade_ops.ops[:] = []

context.configure(
    connection=connection,
    target_metadata=target_metadata,
    process_revision_directives=process_revision_directives,
)
Data Migrations
Migrate data during schema change:
def upgrade():
    # Add new column
    op.add_column('users', sa.Column('full_name', sa.String(255), nullable=True))
    # Migrate data
    connection = op.get_bind()
    connection.execute(
        sa.text("UPDATE users SET full_name = first_name || ' ' || last_name")
    )
    # Make column required after data migration
    op.alter_column('users', 'full_name', existing_type=sa.String(255), nullable=False)

def downgrade():
    op.drop_column('users', 'full_name')
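The raw-SQL UPDATE above ties the migration to backends where || means string concatenation. A portable alternative is a lightweight table stub defined inside the migration itself, so it never drifts with the application models; a sketch using the same columns:
import sqlalchemy as sa
from alembic import op

# Minimal column set needed for the data migration; not the full model.
users = sa.table(
    "users",
    sa.column("first_name", sa.String),
    sa.column("last_name", sa.String),
    sa.column("full_name", sa.String),
)

def upgrade():
    op.add_column("users", sa.Column("full_name", sa.String(255), nullable=True))
    # Renders as ||, CONCAT, etc. depending on the dialect
    op.execute(
        users.update().values(
            full_name=users.c.first_name + " " + users.c.last_name
        )
    )
    op.alter_column("users", "full_name", existing_type=sa.String(255), nullable=False)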
Branch Migrations
Work with multiple branches:
# Create branch
alembic revision -m "Create feature branch" --head=base --branch-label=feature_x
# Upgrade specific branch
alembic upgrade feature_x@head
# Merge branches
alembic merge -m "Merge feature_x into main" feature_x@head main@head
Practical Code Snippets
Check if Database is Up-to-Date
from alembic import config, script
from alembic.runtime import migration
from sqlalchemy import create_engine
def is_database_up_to_date(alembic_cfg_path, database_url):
    """Check if database schema matches latest migrations"""
    cfg = config.Config(alembic_cfg_path)
    directory = script.ScriptDirectory.from_config(cfg)
    engine = create_engine(database_url)
    with engine.begin() as connection:
        context = migration.MigrationContext.configure(connection)
        current_heads = set(context.get_current_heads())
        latest_heads = set(directory.get_heads())
        return current_heads == latest_heads
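A typical use is a guard at application startup or in a deployment smoke test; for example (config path and URL are placeholders):
if not is_database_up_to_date("alembic.ini", "postgresql://localhost/mydb"):
    raise RuntimeError("Database schema is out of date; run `alembic upgrade head`.")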
Programmatically Run Migrations
from alembic import command
from alembic.config import Config
def run_migrations(alembic_ini_path):
    """Run all pending migrations"""
    alembic_cfg = Config(alembic_ini_path)
    command.upgrade(alembic_cfg, "head")

def create_migration(alembic_ini_path, message, autogenerate=True):
    """Create new migration"""
    alembic_cfg = Config(alembic_ini_path)
    command.revision(alembic_cfg, message=message, autogenerate=autogenerate)
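These helpers can be wired into a deployment script or an application startup hook. command.upgrade also accepts sql=True to render the SQL instead of executing it, which is useful for DBA review. A sketch, with placeholder paths:
from alembic import command
from alembic.config import Config

# Apply all pending migrations at deploy time
run_migrations("alembic.ini")

# Or render the upgrade SQL for review instead of executing it
command.upgrade(Config("alembic.ini"), "head", sql=True)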
Custom Migration Operations
from alembic.autogenerate import rewriter
from alembic.operations import ops
writer = rewriter.Rewriter()

@writer.rewrites(ops.AddColumnOp)
def add_column_non_nullable(context, revision, op):
    """Add non-nullable columns in two steps"""
    if not op.column.nullable:
        op.column.nullable = True
        return [
            op,
            ops.AlterColumnOp(
                op.table_name,
                op.column.name,
                modify_nullable=False,
                existing_type=op.column.type,
                schema=op.schema,
            ),
        ]
    return op

# Use in env.py
context.configure(
    connection=connection,
    target_metadata=target_metadata,
    process_revision_directives=writer,
)
Requirements
- Python 3.8+: Required for async support
- SQLAlchemy 2.0+: For modern async patterns
- PostgreSQL/MySQL/SQLite: Supported databases
- Alembic 1.9+: Migration tooling (the alembic check command requires 1.9 or newer)
Common Dependencies
# Core dependencies
pip install alembic sqlalchemy
# For PostgreSQL with async
pip install asyncpg
# For MySQL with async
pip install aiomysql
# For SQLite (built-in)
# No additional packages needed
Development Setup
# Initialize Alembic in existing project
alembic init alembic
# Configure env.py for your models
# Edit alembic.ini for database URL
# First migration
alembic revision --autogenerate -m "Initial schema"
alembic upgrade head
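alembic init alembic produces a small, predictable layout; the files you actually edit are alembic.ini and alembic/env.py:
alembic.ini          # script_location, sqlalchemy.url, logging config
alembic/
    env.py           # migration environment (wire up target_metadata here)
    script.py.mako   # template used for new revision files
    versions/        # generated migration scripts live here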