| name | migrations-api-reference |
| description | Use when working with database migrations in pgdbm - provides complete AsyncMigrationManager API with all methods and migration file format |
pgdbm Migrations API Reference
Overview
Complete API reference for AsyncMigrationManager and migration file format.
All signatures, parameters, return types. No documentation lookup needed.
AsyncMigrationManager
Initialization
from pgdbm import AsyncMigrationManager
migrations = AsyncMigrationManager(
db_manager: AsyncDatabaseManager,
migrations_path: str = "./migrations",
migrations_table: str = "schema_migrations",
module_name: Optional[str] = None,
)
Parameters:
db_manager: AsyncDatabaseManager instance (schema already set)migrations_path: Directory containing .sql migration filesmigrations_table: Table name for tracking migrations (default: "schema_migrations")module_name: CRITICAL - Unique identifier for this module's migrations
IMPORTANT:
- DO NOT pass
schemaparameter (doesn't exist, schema comes from db_manager) - ALWAYS specify
module_nameto prevent conflicts - For dual-mode libraries:
module_name=f"mylib_{schema}"(include schema)
Core Methods
# Apply all pending migrations
result = await migrations.apply_pending_migrations(
dry_run: bool = False
) -> dict[str, Any]
# Returns: {
# "status": "success" | "up_to_date" | "dry_run" | "error",
# "applied": [{"filename": "001_...", "execution_time_ms": 123.4}, ...],
# "skipped": ["001_already_applied.sql", ...],
# "total": 5,
# "total_time_ms": 456.7 # Only if status="success"
# }
# Get applied migrations
applied = await migrations.get_applied_migrations() -> dict[str, Migration]
# Returns: {"001_users.sql": Migration(...), ...}
# Get pending migrations
pending = await migrations.get_pending_migrations() -> list[Migration]
# Returns: [Migration(...), Migration(...)]
# Find migration files on disk
files = await migrations.find_migration_files() -> list[Migration]
# Returns: All .sql files in migrations_path
# Apply single migration
execution_time = await migrations.apply_migration(
migration: Migration
) -> float
# Returns: Execution time in milliseconds
# Get migration history (recent migrations)
history = await migrations.get_migration_history(
limit: int = 10
) -> list[dict[str, Any]]
# Returns: Recent migrations with timestamps, checksums
# Ensure migrations table exists
await migrations.ensure_migrations_table() -> None
# Creates schema_migrations table if doesn't exist
Development Methods
# Create new migration file
filepath = await migrations.create_migration(
name: str,
content: str,
auto_transaction: bool = True
) -> str
# Returns: Path to created file
# Example
path = await migrations.create_migration(
name="add_users_table",
content="CREATE TABLE {{tables.users}} (id SERIAL PRIMARY KEY)",
auto_transaction=True # Wraps in BEGIN/COMMIT
)
# Creates: migrations/20251025_120000_add_users_table.sql
# Rollback migration (removes from tracking, doesn't undo changes)
await migrations.rollback_migration(filename: str) -> None
# Marks migration as not applied
# WARNING: Doesn't undo the migration, just removes tracking
Migration File Format
File Naming
Must match one of these patterns:
001_description.sql(numeric prefix)V1__description.sql(Flyway pattern)20251025120000_description.sql(timestamp)
Examples:
migrations/
├── 001_create_users.sql
├── 002_add_profiles.sql
├── 003_create_sessions.sql
└── 004_add_indexes.sql
Template Syntax
Always use templates for schema portability:
-- All template placeholders
{{tables.tablename}} -- Table with schema qualification
{{schema}} -- Schema name only
Example migration:
-- migrations/001_create_users.sql
CREATE TABLE IF NOT EXISTS {{tables.users}} (
id SERIAL PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
full_name VARCHAR(255),
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS users_email
ON {{tables.users}} (email);
CREATE INDEX IF NOT EXISTS users_created
ON {{tables.users}} (created_at DESC);
-- Trigger using schema placeholder
CREATE OR REPLACE FUNCTION {{schema}}.update_updated_at()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER users_updated_at
BEFORE UPDATE ON {{tables.users}}
FOR EACH ROW
EXECUTE FUNCTION {{schema}}.update_updated_at();
Transaction Handling
Automatic: Migrations run in transactions by default
Manual control:
-- migrations/002_manual_transaction.sql
BEGIN;
CREATE TABLE {{tables.posts}} (
id SERIAL PRIMARY KEY,
title TEXT NOT NULL
);
-- If this fails, whole migration rolls back
CREATE INDEX posts_title ON {{tables.posts}} (title);
COMMIT;
Non-transactional operations:
-- migrations/003_create_index_concurrently.sql
-- CREATE INDEX CONCURRENTLY cannot run in transaction
CREATE INDEX CONCURRENTLY users_email_concurrent
ON {{tables.users}} (email);
Complete Usage Examples
Basic Setup
from pgdbm import AsyncDatabaseManager, AsyncMigrationManager, DatabaseConfig
# Create database manager
config = DatabaseConfig(connection_string="postgresql://localhost/myapp")
db = AsyncDatabaseManager(config)
await db.connect()
# Create migration manager
migrations = AsyncMigrationManager(
db,
migrations_path="./migrations",
module_name="myapp" # REQUIRED
)
# Apply all pending
result = await migrations.apply_pending_migrations()
if result["status"] == "success":
print(f"Applied {len(result['applied'])} migrations")
for mig in result["applied"]:
print(f" - {mig['filename']} ({mig['execution_time_ms']:.1f}ms)")
Dry Run
# Check what would be applied without applying
result = await migrations.apply_pending_migrations(dry_run=True)
if result["status"] == "dry_run":
print(f"Would apply {len(result['pending'])} migrations:")
for filename in result["pending"]:
print(f" - {filename}")
Check Migration Status
# Get applied migrations
applied = await migrations.get_applied_migrations()
print(f"Applied: {list(applied.keys())}")
# Get pending migrations
pending = await migrations.get_pending_migrations()
print(f"Pending: {[m.filename for m in pending]}")
# Get history
history = await migrations.get_migration_history(limit=5)
for entry in history:
print(f"{entry['applied_at']}: {entry['filename']} ({entry['execution_time_ms']:.1f}ms)")
Creating Migrations Programmatically
# Create new migration
path = await migrations.create_migration(
name="add_users_avatar",
content="""
ALTER TABLE {{tables.users}}
ADD COLUMN avatar_url VARCHAR(500);
""",
auto_transaction=True # Wraps in BEGIN/COMMIT
)
print(f"Created migration: {path}")
# Output: migrations/20251025_143022_add_users_avatar.sql
Development: Rollback Migration Record
# Remove migration from tracking (doesn't undo changes!)
await migrations.rollback_migration("003_add_column.sql")
# Migration can now be re-applied
result = await migrations.apply_pending_migrations()
# Will apply 003_add_column.sql again
WARNING: rollback_migration only removes tracking record. It does NOT undo the migration's database changes. For true rollback, write a down migration.
Migration Class
Used internally, but can be accessed:
class Migration:
filename: str
checksum: str
content: str
applied_at: Optional[datetime]
module_name: Optional[str]
@property
def is_applied(self) -> bool
@property
def version(self) -> str
Complete Method Summary
| Method | Parameters | Returns | Use Case |
|---|---|---|---|
apply_pending_migrations |
dry_run=False | dict | Apply all pending |
get_applied_migrations |
- | dict[str, Migration] | Check what's applied |
get_pending_migrations |
- | list[Migration] | Check what's pending |
find_migration_files |
- | list[Migration] | List files on disk |
apply_migration |
migration | float | Apply single migration |
ensure_migrations_table |
- | None | Create tracking table |
create_migration |
name, content, auto_transaction | str | Create new file |
rollback_migration |
filename | None | Remove tracking (dev only) |
get_migration_history |
limit=10 | list[dict] | Recent migrations |
Migration Best Practices
1. Always Use Templates
-- ✅ CORRECT
CREATE TABLE {{tables.users}} (...);
CREATE INDEX users_email ON {{tables.users}} (email);
-- ❌ WRONG
CREATE TABLE users (...);
CREATE TABLE myschema.users (...);
2. Make Migrations Idempotent
-- ✅ CORRECT - Can run multiple times safely
CREATE TABLE IF NOT EXISTS {{tables.users}} (...);
CREATE INDEX IF NOT EXISTS users_email ON {{tables.users}} (email);
-- ❌ WRONG - Fails if already exists
CREATE TABLE {{tables.users}} (...);
3. Use Unique module_name
# ✅ CORRECT - Unique per module
AsyncMigrationManager(db, "migrations", module_name="users")
AsyncMigrationManager(db, "migrations", module_name="orders")
# ❌ WRONG - Default module name causes conflicts
AsyncMigrationManager(db, "migrations") # module_name="default"
AsyncMigrationManager(db, "migrations") # Same default - conflict!
4. For Dual-Mode Libraries: Include Schema in module_name
# ✅ CORRECT - Can use same library multiple times
module_name = f"mylib_{schema}" # "mylib_tenant1", "mylib_tenant2"
# ❌ WRONG - Conflicts if library used twice
module_name = "mylib" # Always the same
Migration File Examples
Basic Table Creation
-- migrations/001_initial_schema.sql
CREATE TABLE IF NOT EXISTS {{tables.users}} (
id SERIAL PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS {{tables.posts}} (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES {{tables.users}}(id) ON DELETE CASCADE,
title VARCHAR(500) NOT NULL,
content TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Adding Indexes
-- migrations/002_add_indexes.sql
CREATE INDEX IF NOT EXISTS users_email
ON {{tables.users}} (email);
CREATE INDEX IF NOT EXISTS posts_user_id
ON {{tables.posts}} (user_id);
CREATE INDEX IF NOT EXISTS posts_created
ON {{tables.posts}} (created_at DESC);
Schema-Qualified Functions
-- migrations/003_add_triggers.sql
CREATE OR REPLACE FUNCTION {{schema}}.update_timestamp()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER update_users_timestamp
BEFORE UPDATE ON {{tables.users}}
FOR EACH ROW
EXECUTE FUNCTION {{schema}}.update_timestamp();
Data Migrations
-- migrations/004_seed_data.sql
INSERT INTO {{tables.users}} (email, full_name)
VALUES
('admin@example.com', 'Admin User'),
('support@example.com', 'Support User')
ON CONFLICT (email) DO NOTHING;
Checksum Validation
Migrations are checksummed to detect modifications:
# If migration file changes after being applied
result = await migrations.apply_pending_migrations()
# Raises: MigrationError(
# "Migration '001_users.sql' has been modified after being applied!"
# "Expected checksum: abc123..."
# "Current checksum: def456..."
# )
Why: Prevents silent schema divergence. Applied migrations must not change.
If you need to modify: Create a new migration instead.
Migration Table Schema
Migrations tracked in schema_migrations table:
CREATE TABLE schema_migrations (
id SERIAL PRIMARY KEY,
filename VARCHAR(255) NOT NULL,
checksum VARCHAR(64) NOT NULL,
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
execution_time_ms REAL,
module_name VARCHAR(255),
UNIQUE(filename, module_name)
);
Key points:
filename: Migration file namechecksum: SHA256 of file contentsmodule_name: Isolates migrations per module- UNIQUE constraint on (filename, module_name)
This allows:
- Same filename in different modules (e.g., both "001_initial.sql")
- Each module tracks its own migrations
- No conflicts when modules share database
Error Handling
result = await migrations.apply_pending_migrations()
if result["status"] == "error":
print(f"Failed on: {result['failed_migration']}")
print(f"Error: {result['error']}")
print(f"Successfully applied: {result['applied']}")
elif result["status"] == "success":
print(f"Applied {len(result['applied'])} migrations")
elif result["status"] == "up_to_date":
print("No pending migrations")
Common Patterns
Standard Application Setup
# In application startup (FastAPI lifespan, etc.)
from pgdbm import AsyncDatabaseManager, AsyncMigrationManager, DatabaseConfig
config = DatabaseConfig(connection_string="postgresql://localhost/myapp")
db = AsyncDatabaseManager(config)
await db.connect()
migrations = AsyncMigrationManager(
db,
migrations_path="./migrations",
module_name="myapp"
)
result = await migrations.apply_pending_migrations()
if result["status"] != "success" and result["status"] != "up_to_date":
raise RuntimeError(f"Migration failed: {result}")
Multi-Service Setup
# Each service runs its own migrations
services = [
(users_db, "migrations/users", "users"),
(orders_db, "migrations/orders", "orders"),
(payments_db, "migrations/payments", "payments"),
]
for db, path, name in services:
migrations = AsyncMigrationManager(db, path, module_name=name)
result = await migrations.apply_pending_migrations()
if result["status"] == "success":
print(f"{name}: Applied {len(result['applied'])} migrations")
Dual-Mode Library
class MyLibrary:
async def initialize(self):
# Library ALWAYS runs its own migrations
migrations = AsyncMigrationManager(
self.db,
migrations_path=str(Path(__file__).parent / "migrations"),
module_name=f"mylib_{self.db.schema}" # Include schema!
)
result = await migrations.apply_pending_migrations()
Advanced Usage
Custom Migrations Table
# Use different table name (e.g., for multi-tenant)
migrations = AsyncMigrationManager(
db,
migrations_path="tenant_migrations",
migrations_table="tenant_migrations", # Custom table name
module_name=f"tenant_{tenant_id}"
)
Pre-Flight Checks
# Check pending before applying
pending = await migrations.get_pending_migrations()
if pending:
print(f"Found {len(pending)} pending migrations:")
for mig in pending:
print(f" - {mig.filename}")
# Ask user confirmation
if input("Apply? (y/n): ") == "y":
result = await migrations.apply_pending_migrations()
else:
print("Schema is up to date")
Development Workflow
# 1. Create migration
path = await migrations.create_migration(
name="add_user_roles",
content="""
CREATE TABLE {{tables.roles}} (
id SERIAL PRIMARY KEY,
name VARCHAR(100) UNIQUE NOT NULL
);
ALTER TABLE {{tables.users}}
ADD COLUMN role_id INTEGER REFERENCES {{tables.roles}}(id);
"""
)
# 2. Apply it
result = await migrations.apply_pending_migrations()
# 3. If something wrong, rollback the record
await migrations.rollback_migration("20251025_143022_add_user_roles.sql")
# 4. Fix the file, re-apply
result = await migrations.apply_pending_migrations()
Common Mistakes
❌ Passing schema Parameter
# WRONG - schema parameter doesn't exist
migrations = AsyncMigrationManager(
db,
"migrations",
schema="myschema" # TypeError!
)
Fix: Schema comes from db_manager:
db = AsyncDatabaseManager(pool=pool, schema="myschema")
migrations = AsyncMigrationManager(db, "migrations", module_name="myapp")
❌ Not Specifying module_name
# WRONG - Uses "default" module name
migrations = AsyncMigrationManager(db, "migrations")
Fix: Always specify unique module_name:
migrations = AsyncMigrationManager(db, "migrations", module_name="myservice")
❌ Same module_name for Different Schemas
# WRONG - Conflict if library used twice
migrations = AsyncMigrationManager(db1, "migrations", module_name="mylib")
migrations = AsyncMigrationManager(db2, "migrations", module_name="mylib")
Fix: Include schema in module_name:
migrations = AsyncMigrationManager(db1, "migrations", module_name=f"mylib_{schema1}")
migrations = AsyncMigrationManager(db2, "migrations", module_name=f"mylib_{schema2}")
❌ Hardcoding Table Names
-- WRONG
CREATE TABLE users (...);
CREATE TABLE myschema.users (...);
Fix: Use templates:
-- CORRECT
CREATE TABLE {{tables.users}} (...);
❌ Modifying Applied Migrations
-- You applied 001_users.sql yesterday
-- Today you edit it and add a column
-- Next deploy: ERROR - checksum mismatch!
Fix: Never modify applied migrations. Create new migration:
-- migrations/002_add_user_column.sql
ALTER TABLE {{tables.users}} ADD COLUMN new_field VARCHAR(255);
❌ Not Making Migrations Idempotent
-- WRONG - Fails if run twice
CREATE TABLE {{tables.users}} (...);
Fix: Use IF NOT EXISTS:
-- CORRECT
CREATE TABLE IF NOT EXISTS {{tables.users}} (...);
CREATE INDEX IF NOT EXISTS users_email ON {{tables.users}} (email);
Migration History
# Get recent migration history
history = await migrations.get_migration_history(limit=10)
for entry in history:
print(f"""
Migration: {entry['filename']}
Module: {entry['module_name']}
Applied: {entry['applied_at']}
Time: {entry['execution_time_ms']}ms
Checksum: {entry['checksum']}
""")
Quick Checklist
Before running migrations:
- All migrations use
{{tables.}}syntax - All migrations are idempotent (IF NOT EXISTS)
- Unique
module_namespecified - Migration files follow naming pattern (001_name.sql)
- No modifications to already-applied migrations
- For dual-mode libraries: module_name includes schema
Related Skills
- For patterns:
pgdbm:using-pgdbm,pgdbm:choosing-pattern - For implementation:
pgdbm:shared-pool-pattern,pgdbm:dual-mode-library - For complete API:
pgdbm:core-api-reference