
shared-pool-pattern

@juanre/pgdbm

Use when implementing a shared connection pool for multiple services in the same application - provides complete setup steps for FastAPI/multi-service apps without needing examples

Install Skill

1. Download skill
2. Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3. Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify the skill by going through its instructions before using it.

SKILL.md

name: shared-pool-pattern
description: Use when implementing a shared connection pool for multiple services in the same application - provides complete setup steps for FastAPI/multi-service apps without needing examples

Shared Pool Pattern: Complete Implementation

Overview

Core Principle: ONE pool created at startup, multiple schema-isolated managers, and each service runs its own migrations.

This is the production-recommended pattern for applications with multiple services/modules sharing a database.

When to Use This

From the pgdbm:choosing-pattern skill - use this pattern when:

  • Multiple services in the same Python process (FastAPI with routers)
  • Need connection efficiency (avoid multiple pools)
  • Want schema isolation between services
  • Building a monolith with logical service separation

Critical Setup Steps

Step 1: Create Shared Pool (ONCE at startup)

from pgdbm import AsyncDatabaseManager, DatabaseConfig

# In your FastAPI lifespan or app startup
config = DatabaseConfig(
    connection_string="postgresql://localhost/myapp",
    min_connections=10,   # Total for ALL services
    max_connections=50,   # Shared across ALL services
)

shared_pool = await AsyncDatabaseManager.create_shared_pool(config)

CRITICAL: Only call create_shared_pool() once. Store the pool somewhere accessible (app state, a module-level global, etc.).
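
As a minimal illustration of "somewhere accessible" outside FastAPI's app state, here is a sketch of a module-level holder. The init_pool/get_pool helpers are illustrative names, not pgdbm API:

# shared/pool.py - a minimal sketch; init_pool/get_pool are illustrative helpers, not pgdbm API
from pgdbm import AsyncDatabaseManager, DatabaseConfig

_shared_pool = None

async def init_pool(connection_string: str):
    # Create the shared pool exactly once and remember it at module level
    global _shared_pool
    if _shared_pool is None:
        config = DatabaseConfig(connection_string=connection_string)
        _shared_pool = await AsyncDatabaseManager.create_shared_pool(config)
    return _shared_pool

def get_pool():
    if _shared_pool is None:
        raise RuntimeError("init_pool() has not been awaited yet")
    return _shared_pool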

Step 2: Create Schema-Isolated Managers

# Each service gets its own manager with a dedicated schema
users_db = AsyncDatabaseManager(pool=shared_pool, schema="users")
orders_db = AsyncDatabaseManager(pool=shared_pool, schema="orders")
payments_db = AsyncDatabaseManager(pool=shared_pool, schema="payments")

CRITICAL:

  • Never call .connect() on these managers (the pool already exists)
  • Each manager's schema is permanent (don't try to change it); see the query sketch below
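
To make the isolation concrete, here is a small sketch using the fetch_value call shown later in this skill. The settings table is hypothetical - the point is that the same query string resolves to a different schema depending on which manager executes it:

# Hypothetical: both the "users" and "orders" schemas contain a table named settings
users_settings = await users_db.fetch_value("SELECT COUNT(*) FROM {{tables.settings}}")
orders_settings = await orders_db.fetch_value("SELECT COUNT(*) FROM {{tables.settings}}")
# users_db resolves {{tables.settings}} inside the "users" schema;
# orders_db resolves the identical string inside the "orders" schema.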

Step 3: Run Migrations for Each Service

from pgdbm import AsyncMigrationManager

for db, path, module in [
    (users_db, "migrations/users", "users"),
    (orders_db, "migrations/orders", "orders"),
    (payments_db, "migrations/payments", "payments"),
]:
    migrations = AsyncMigrationManager(
        db,
        migrations_path=path,
        module_name=module  # MUST be unique per service
    )
    await migrations.apply_pending_migrations()

CRITICAL: Each service uses a unique module_name to prevent migration conflicts.

FastAPI Integration (Complete Example)

Pattern: Lifespan Handler

from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from pgdbm import AsyncDatabaseManager, DatabaseConfig, AsyncMigrationManager

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: Create shared pool
    config = DatabaseConfig(connection_string="postgresql://localhost/myapp")
    shared_pool = await AsyncDatabaseManager.create_shared_pool(config)

    # Create schema-isolated managers
    app.state.dbs = {
        'users': AsyncDatabaseManager(pool=shared_pool, schema="users"),
        'orders': AsyncDatabaseManager(pool=shared_pool, schema="orders"),
    }

    # Run migrations for each
    for name, db in app.state.dbs.items():
        migrations = AsyncMigrationManager(
            db,
            migrations_path=f"migrations/{name}",
            module_name=name
        )
        await migrations.apply_pending_migrations()

    yield

    # Shutdown: Close pool
    await shared_pool.close()

app = FastAPI(lifespan=lifespan)

# Use in routes
@app.post("/users")
async def create_user(email: str, request: Request):
    db = request.app.state.dbs['users']
    user_id = await db.fetch_value(
        "INSERT INTO {{tables.users}} (email) VALUES ($1) RETURNING id",
        email
    )
    return {"id": user_id}

Pattern: Dependency Injection (Cleaner)

# dependencies.py
from typing import Annotated
from fastapi import Depends, Request
from pgdbm import AsyncDatabaseManager

async def get_users_db(request: Request):
    return request.app.state.dbs['users']

async def get_orders_db(request: Request):
    return request.app.state.dbs['orders']

UsersDB = Annotated[AsyncDatabaseManager, Depends(get_users_db)]
OrdersDB = Annotated[AsyncDatabaseManager, Depends(get_orders_db)]

# routes.py
@app.post("/users")
async def create_user(email: str, db: UsersDB):
    user_id = await db.fetch_value(
        "INSERT INTO {{tables.users}} (email) VALUES ($1) RETURNING id",
        email
    )
    return {"id": user_id}

Migration File Structure

migrations/
├── users/
│   ├── 001_create_users.sql
│   └── 002_add_profiles.sql
├── orders/
│   ├── 001_create_orders.sql
│   └── 002_add_items.sql
└── payments/
    └── 001_create_payments.sql

Each migration MUST use {{tables.}} syntax:

-- migrations/users/001_create_users.sql
CREATE TABLE IF NOT EXISTS {{tables.users}} (
    id SERIAL PRIMARY KEY,
    email VARCHAR(255) UNIQUE NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX IF NOT EXISTS users_email
    ON {{tables.users}} (email);

This automatically becomes the users table inside the users schema (i.e., "users".users).

Pool Sizing Guide

Calculate based on service needs:

# Rough estimate
services = [
    ("users", 10),      # High traffic
    ("orders", 15),     # Very high traffic
    ("payments", 5),    # Low traffic
    ("analytics", 10),  # Periodic jobs
]

total_min = sum(s[1] for s in services) // 2  # ~20
total_max = sum(s[1] for s in services)       # ~40
surge = int(total_max * 0.25)                 # +10

config = DatabaseConfig(
    connection_string="...",
    min_connections=total_min,   # 20
    max_connections=total_max + surge,  # 50
)

Key insight: A shared pool uses FEWER connections than separate pools because services don't peak simultaneously.
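
A back-of-the-envelope illustration with hypothetical numbers:

# Hypothetical sizing: four services that would each want max_connections=20 on their own
separate_pools_peak = 4 * 20   # 80 connections the database must be able to serve
shared_pool_peak = 50          # one cap covering the combined (non-overlapping) peak
print(f"Shared pool reserves {separate_pools_peak - shared_pool_peak} fewer connections")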

Common Mistakes

❌ Creating Multiple Pools

# WRONG: Each service creates its own pool
users_db = AsyncDatabaseManager(DatabaseConfig(...))  # Pool 1
orders_db = AsyncDatabaseManager(DatabaseConfig(...)) # Pool 2

You'll see the warning: "⚠️ Creating another connection pool to..."

Fix: Call create_shared_pool() once and pass the resulting pool to every manager, as shown below.
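
The corrected version, mirroring Step 1 and Step 2 above:

# RIGHT: one shared pool, passed to every manager
config = DatabaseConfig(connection_string="postgresql://localhost/myapp")
shared_pool = await AsyncDatabaseManager.create_shared_pool(config)

users_db = AsyncDatabaseManager(pool=shared_pool, schema="users")
orders_db = AsyncDatabaseManager(pool=shared_pool, schema="orders")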

❌ Calling connect() on Pool-Based Managers

# WRONG
users_db = AsyncDatabaseManager(pool=shared_pool, schema="users")
await users_db.connect()  # ERROR: "Cannot call connect() when using external pool"

Fix: Don't call .connect() - the pool already exists.

❌ Forgetting module_name

# WRONG: All services use default module name
migrations = AsyncMigrationManager(users_db, "migrations/users")
migrations = AsyncMigrationManager(orders_db, "migrations/orders")
# Migration conflicts!

Fix: Always specify unique module_name:

AsyncMigrationManager(users_db, "migrations/users", module_name="users")
AsyncMigrationManager(orders_db, "migrations/orders", module_name="orders")

❌ Not Using {{tables.}} Syntax

# WRONG: Hardcoded table name
await db.execute("INSERT INTO users (email) VALUES ($1)", email)
# Goes to wrong schema or fails!

Fix: Always use templates:

await db.execute("INSERT INTO {{tables.users}} (email) VALUES ($1)", email)

❌ Trying to Change Schema at Runtime

# WRONG: Schema is permanent on manager
db.schema = "different_schema"  # Don't do this!

Fix: Create separate manager for each schema:

schema1_db = AsyncDatabaseManager(pool=shared_pool, schema="schema1")
schema2_db = AsyncDatabaseManager(pool=shared_pool, schema="schema2")

Singleton Pattern (Alternative)

For more complex apps, create a singleton manager:

# shared/database.py
from typing import Optional
import asyncpg
from pgdbm import AsyncDatabaseManager, DatabaseConfig

class SharedDatabaseManager:
    _instance: Optional["SharedDatabaseManager"] = None
    _pool: Optional[asyncpg.Pool] = None
    _initialized: bool = False

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    async def initialize(self, database_url: str):
        if self._initialized:
            return

        config = DatabaseConfig(connection_string=database_url)
        self._pool = await AsyncDatabaseManager.create_shared_pool(config)
        self._initialized = True

    def get_pool(self) -> asyncpg.Pool:
        if not self._initialized:
            raise RuntimeError("Not initialized")
        return self._pool

    async def close(self):
        if self._pool:
            await self._pool.close()

# Usage
db_mgr = SharedDatabaseManager()
await db_mgr.initialize("postgresql://...")
users_db = AsyncDatabaseManager(pool=db_mgr.get_pool(), schema="users")

Multi-Tenant Extension

For multi-tenant SaaS, create managers dynamically:

# tenant_manager.py
class TenantManager:
    def __init__(self, shared_pool):
        self.shared_pool = shared_pool
        self.tenant_dbs = {}

    async def get_tenant_db(self, tenant_id: str):
        if tenant_id not in self.tenant_dbs:
            schema = f"tenant_{tenant_id}"

            # Create schema
            admin_db = AsyncDatabaseManager(pool=self.shared_pool)
            await admin_db.execute(f'CREATE SCHEMA IF NOT EXISTS "{schema}"')

            # Create tenant manager
            tenant_db = AsyncDatabaseManager(pool=self.shared_pool, schema=schema)

            # Run migrations
            migrations = AsyncMigrationManager(
                tenant_db,
                migrations_path="tenant_migrations",
                module_name=f"tenant_{tenant_id}"
            )
            await migrations.apply_pending_migrations()

            self.tenant_dbs[tenant_id] = tenant_db

        return self.tenant_dbs[tenant_id]

# Usage
tenant_mgr = TenantManager(shared_pool)
tenant_db = await tenant_mgr.get_tenant_db("customer_123")
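
A hedged sketch of wiring this into a FastAPI route - the X-Tenant-ID header, the tenant_mgr attribute on app.state, and the get_tenant_db dependency are illustrative choices, not pgdbm API:

from fastapi import Depends, Header, Request

async def get_tenant_db(request: Request, x_tenant_id: str = Header(...)):
    # Look up (or lazily create) the caller's schema-isolated manager
    return await request.app.state.tenant_mgr.get_tenant_db(x_tenant_id)

@app.get("/orders/count")
async def count_orders(db=Depends(get_tenant_db)):
    # fetch_value is the same call used in the examples above;
    # {{tables.orders}} resolves inside the tenant's schema
    count = await db.fetch_value("SELECT COUNT(*) FROM {{tables.orders}}")
    return {"count": count}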

Monitoring Pool Usage

import logging

logger = logging.getLogger(__name__)

# Check pool health
stats = await shared_pool.get_pool_stats()

usage = stats['used_size'] / stats['size']
if usage > 0.8:
    logger.warning(f"High pool usage: {usage:.1%}")

# Metrics
print(f"Total connections: {stats['size']}")
print(f"Active: {stats['used_size']}")
print(f"Idle: {stats['free_size']}")

Quick Checklist

Before deploying shared pool pattern:

  • Create pool with create_shared_pool() ONCE at startup
  • Each service gets AsyncDatabaseManager(pool=shared_pool, schema="service_name")
  • Never call .connect() on pool-based managers
  • Each service runs migrations with unique module_name
  • All SQL uses {{tables.}} syntax
  • Pool closed in a shutdown/finally block (see the sketch after this list)
  • Pool size calculated based on total service needs
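
For completeness, a minimal sketch of the same pattern outside FastAPI (a standalone script or worker), assuming the API shown above:

import asyncio
from pgdbm import AsyncDatabaseManager, DatabaseConfig, AsyncMigrationManager

async def main():
    config = DatabaseConfig(connection_string="postgresql://localhost/myapp")
    pool = await AsyncDatabaseManager.create_shared_pool(config)
    try:
        users_db = AsyncDatabaseManager(pool=pool, schema="users")
        migrations = AsyncMigrationManager(
            users_db, migrations_path="migrations/users", module_name="users"
        )
        await migrations.apply_pending_migrations()
        # ... do work with users_db ...
    finally:
        await pool.close()  # always release the pool, even on error

asyncio.run(main())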

Complete Minimal Example

from contextlib import asynccontextmanager
from fastapi import FastAPI
from pgdbm import AsyncDatabaseManager, DatabaseConfig, AsyncMigrationManager

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Create ONE pool
    config = DatabaseConfig(connection_string="postgresql://localhost/myapp")
    pool = await AsyncDatabaseManager.create_shared_pool(config)

    # Schema-isolated managers
    app.state.users_db = AsyncDatabaseManager(pool=pool, schema="users")
    app.state.orders_db = AsyncDatabaseManager(pool=pool, schema="orders")

    # Migrations
    for db, path, name in [
        (app.state.users_db, "migrations/users", "users"),
        (app.state.orders_db, "migrations/orders", "orders"),
    ]:
        migrations = AsyncMigrationManager(db, migrations_path=path, module_name=name)
        await migrations.apply_pending_migrations()

    yield
    await pool.close()

app = FastAPI(lifespan=lifespan)

That's it. This is the complete shared pool pattern.

Related Skills

  • For pattern selection: pgdbm:choosing-pattern
  • For mental model: pgdbm:using-pgdbm
  • For complete API: pgdbm:core-api-reference
  • For testing: pgdbm:testing-database-code