Claude Code Plugins

Community-maintained marketplace


Design and implement database schemas using SQLModel with sync and async patterns. Use this skill when creating database models, setting up Neon PostgreSQL connections, defining relationships (one-to-many, many-to-many), implementing FastAPI dependency injection, or migrating schemas. Covers both sync Session and async AsyncSession patterns.

Install Skill

1. Download skill
2. Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3. Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Verify the skill by reading through its instructions before using it.

SKILL.md

name: sqlmodel-database
description: Design and implement database schemas using SQLModel with sync and async patterns. Use this skill when creating database models, setting up Neon PostgreSQL connections, defining relationships (one-to-many, many-to-many), implementing FastAPI dependency injection, or migrating schemas. Covers both sync Session and async AsyncSession patterns.

SQLModel Database

Design and implement database schemas using SQLModel - the library that combines SQLAlchemy's power with Pydantic's validation.

When to Use

  • Creating database models for FastAPI applications
  • Setting up PostgreSQL/Neon database connections
  • Defining model relationships (one-to-many, many-to-many)
  • Implementing async database operations
  • Creating API request/response schemas from database models
  • FastAPI dependency injection for sessions

Quick Start

# Sync only
uv add sqlmodel

# Async support (recommended for FastAPI)
uv add sqlmodel "sqlalchemy[asyncio]" asyncpg  # PostgreSQL
# or
uv add sqlmodel "sqlalchemy[asyncio]" aiosqlite  # SQLite

Core Patterns

1. Model Hierarchy (Base → Table → API)

from sqlmodel import SQLModel, Field
from datetime import datetime
from typing import Optional, Literal

# Base model - shared fields, validation, NO table
class TaskBase(SQLModel):
    title: str = Field(max_length=200, index=True)
    description: Optional[str] = None
    status: Literal["pending", "in_progress", "completed"] = "pending"
    priority: Literal["low", "medium", "high"] = "medium"
    progress_percent: int = Field(default=0, ge=0, le=100)

# Table model - has table=True, adds id and timestamps
class Task(TaskBase, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    created_at: datetime = Field(default_factory=datetime.now)
    updated_at: datetime = Field(default_factory=datetime.now)

# Create model - for POST requests (inherits validation)
class TaskCreate(TaskBase):
    pass

# Update model - all fields optional for PATCH
class TaskUpdate(SQLModel):
    title: Optional[str] = None
    description: Optional[str] = None
    status: Optional[str] = None
    priority: Optional[str] = None
    progress_percent: Optional[int] = Field(default=None, ge=0, le=100)

# Read model - for responses (includes id)
class TaskRead(TaskBase):
    id: int
    created_at: datetime
    updated_at: datetime

2. Sync Database Connection

from sqlmodel import create_engine, Session, SQLModel
import os

DATABASE_URL = os.getenv("DATABASE_URL")
# PostgreSQL: postgresql://user:pass@host/db?sslmode=require
# SQLite: sqlite:///database.db

engine = create_engine(DATABASE_URL, echo=True)

def create_db_and_tables():
    SQLModel.metadata.create_all(engine)

def get_session():
    with Session(engine) as session:
        yield session
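
For quick scripts outside FastAPI, the engine and session can be used directly. A minimal sketch, assuming the Task model from pattern 1:

from sqlmodel import select

create_db_and_tables()

with Session(engine) as session:
    session.add(Task(title="Write docs"))
    session.commit()

    tasks = session.exec(select(Task)).all()
    print([t.title for t in tasks])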

3. Async Database Connection (Recommended)

from sqlmodel import SQLModel
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
import os

# Async URLs use different drivers
DATABASE_URL = os.getenv("DATABASE_URL")
# PostgreSQL async: postgresql+asyncpg://user:pass@host/db
# SQLite async: sqlite+aiosqlite:///database.db

async_engine = create_async_engine(DATABASE_URL, echo=True)

async def create_db_and_tables():
    async with async_engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.create_all)

async def get_session() -> AsyncSession:
    async with AsyncSession(async_engine) as session:
        yield session

4. FastAPI Integration (Async)

from fastapi import FastAPI, Depends, HTTPException
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession

app = FastAPI()

@app.on_event("startup")
async def on_startup():
    await create_db_and_tables()

@app.post("/tasks", response_model=TaskRead, status_code=201)
async def create_task(
    task: TaskCreate,
    session: AsyncSession = Depends(get_session),
):
    db_task = Task.model_validate(task)
    session.add(db_task)
    await session.commit()
    await session.refresh(db_task)
    return db_task

@app.get("/tasks", response_model=list[TaskRead])
async def list_tasks(
    session: AsyncSession = Depends(get_session),
    offset: int = 0,
    limit: int = 100,
):
    statement = select(Task).offset(offset).limit(limit)
    results = await session.exec(statement)
    return results.all()

@app.get("/tasks/{task_id}", response_model=TaskRead)
async def get_task(
    task_id: int,
    session: AsyncSession = Depends(get_session),
):
    task = await session.get(Task, task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    return task

@app.patch("/tasks/{task_id}", response_model=TaskRead)
async def update_task(
    task_id: int,
    task_update: TaskUpdate,
    session: AsyncSession = Depends(get_session),
):
    task = await session.get(Task, task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    # Only update fields that were explicitly set
    update_data = task_update.model_dump(exclude_unset=True)
    task.sqlmodel_update(update_data)

    session.add(task)
    await session.commit()
    await session.refresh(task)
    return task

@app.delete("/tasks/{task_id}")
async def delete_task(
    task_id: int,
    session: AsyncSession = Depends(get_session),
):
    task = await session.get(Task, task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    await session.delete(task)
    await session.commit()
    return {"ok": True}
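
Note: @app.on_event("startup") still works but is deprecated in recent FastAPI releases in favor of lifespan handlers. A minimal sketch of the equivalent wiring, assuming the async create_db_and_tables and async_engine from pattern 3:

from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: create tables once
    await create_db_and_tables()
    yield
    # Shutdown: release pooled connections
    await async_engine.dispose()

app = FastAPI(lifespan=lifespan)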

5. Relationships

One-to-Many (Task belongs to Project)

from sqlmodel import Field, Relationship
from typing import Optional, List

class Project(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str = Field(index=True)
    slug: str = Field(unique=True)

    # One project has many tasks
    tasks: List["Task"] = Relationship(back_populates="project")

class Task(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    title: str

    # Foreign key
    project_id: Optional[int] = Field(default=None, foreign_key="project.id")

    # Relationship back to project
    project: Optional[Project] = Relationship(back_populates="tasks")
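
A usage sketch for the one-to-many link, assuming an async session from pattern 3:

from sqlalchemy.orm import selectinload
from sqlmodel import select

# Assigning the relationship sets project_id automatically
project = Project(name="Website", slug="website")
task = Task(title="Design homepage", project=project)
session.add(task)            # project is saved too (save-update cascade)
await session.commit()

# Load a project together with its tasks (avoids async lazy loading)
statement = (
    select(Project)
    .where(Project.slug == "website")
    .options(selectinload(Project.tasks))
)
result = await session.exec(statement)
project = result.one()
print([t.title for t in project.tasks])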

Many-to-Many (Task has many Workers, Worker has many Tasks)

# Link table (no extra fields)
class TaskWorkerLink(SQLModel, table=True):
    task_id: Optional[int] = Field(
        default=None, foreign_key="task.id", primary_key=True
    )
    worker_id: Optional[str] = Field(
        default=None, foreign_key="worker.id", primary_key=True
    )

class Worker(SQLModel, table=True):
    id: str = Field(primary_key=True)  # e.g., "@claude-code"
    name: str
    type: Literal["human", "agent"]

    tasks: List["Task"] = Relationship(
        back_populates="workers",
        link_model=TaskWorkerLink
    )

class Task(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    title: str

    workers: List[Worker] = Relationship(
        back_populates="tasks",
        link_model=TaskWorkerLink
    )
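
Rows are linked by appending to either side of the relationship; SQLAlchemy writes the TaskWorkerLink row at commit time. A sketch, assuming an async session:

worker = Worker(id="@claude-code", name="Claude Code", type="agent")
task = Task(title="Review PR")
task.workers.append(worker)   # link row is inserted on commit
session.add(task)
await session.commit()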

Many-to-Many with Extra Fields

# Link table WITH extra fields
class TaskAssignment(SQLModel, table=True):
    task_id: Optional[int] = Field(
        default=None, foreign_key="task.id", primary_key=True
    )
    worker_id: Optional[str] = Field(
        default=None, foreign_key="worker.id", primary_key=True
    )
    assigned_at: datetime = Field(default_factory=datetime.now)
    role: str = "assignee"  # assignee, reviewer, etc.

    # Relationships to both sides
    task: "Task" = Relationship(back_populates="assignments")
    worker: "Worker" = Relationship(back_populates="assignments")

class Task(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    title: str
    assignments: List[TaskAssignment] = Relationship(back_populates="task")

class Worker(SQLModel, table=True):
    id: str = Field(primary_key=True)
    name: str
    assignments: List[TaskAssignment] = Relationship(back_populates="worker")
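
With an association model, the link row is created explicitly so its extra fields can be set. A sketch, assuming task and worker instances are already loaded in the session:

assignment = TaskAssignment(task=task, worker=worker, role="reviewer")
session.add(assignment)
await session.commit()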

6. Self-Referential (Parent-Child Tasks)

class Task(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    title: str

    # Self-referential foreign key
    parent_id: Optional[int] = Field(default=None, foreign_key="task.id")

    # Parent relationship
    parent: Optional["Task"] = Relationship(
        back_populates="subtasks",
        sa_relationship_kwargs={"remote_side": "Task.id"}
    )

    # Children relationship
    subtasks: List["Task"] = Relationship(back_populates="parent")
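
In async code, read a task together with its subtasks by eager-loading the self-referential relationship. A sketch, assuming an async session and a known task_id:

from sqlalchemy.orm import selectinload
from sqlmodel import select

statement = (
    select(Task)
    .where(Task.id == task_id)
    .options(selectinload(Task.subtasks))
)
result = await session.exec(statement)
task = result.one()
print([sub.title for sub in task.subtasks])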

7. Partial Updates with exclude_unset

# The key pattern for PATCH endpoints
update_data = task_update.model_dump(exclude_unset=True)
db_task.sqlmodel_update(update_data)

# This ensures:
# - Fields not in request body are NOT overwritten
# - Fields explicitly set to null ARE updated to null
# - Only provided fields are modified

8. Query Patterns

from sqlmodel import select, or_, and_, col

# Basic select
statement = select(Task)
results = await session.exec(statement)
tasks = results.all()

# With filters
statement = select(Task).where(Task.status == "pending")

# Multiple conditions (AND)
statement = select(Task).where(
    Task.status == "in_progress",
    Task.priority == "high"
)

# OR conditions
statement = select(Task).where(
    or_(Task.status == "pending", Task.status == "in_progress")
)

# With ordering
statement = select(Task).order_by(Task.created_at.desc())

# With pagination
statement = select(Task).offset(20).limit(10)

# Get single by ID
task = await session.get(Task, task_id)

# Get first match
statement = select(Task).where(Task.title == "Example")
result = await session.exec(statement)
task = result.first()

# Count
from sqlalchemy import func
statement = select(func.count()).select_from(Task)
result = await session.exec(statement)
count = result.one()

# Eager-load related objects (selectinload issues a second SELECT; avoids async lazy loads)
from sqlalchemy.orm import selectinload
statement = select(Task).options(selectinload(Task.project))

9. Neon PostgreSQL Connection

import os
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine

# Neon connection string (async)
# Original: postgresql://user:pass@host/db?sslmode=require
# Async: postgresql+asyncpg://user:pass@host/db?ssl=require

DATABASE_URL = os.getenv("DATABASE_URL")

# Convert sync URL to async if needed
if DATABASE_URL.startswith("postgresql://"):
    DATABASE_URL = DATABASE_URL.replace(
        "postgresql://",
        "postgresql+asyncpg://",
        1
    ).replace("sslmode=", "ssl=")

async_engine = create_async_engine(
    DATABASE_URL,
    echo=True,
    pool_size=5,
    max_overflow=10,
)
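
Neon's serverless endpoints can close idle connections, so connection health checks are a common addition. An optional sketch using standard SQLAlchemy pool settings (values are illustrative):

async_engine = create_async_engine(
    DATABASE_URL,
    echo=False,           # keep logs quiet outside development
    pool_size=5,
    max_overflow=10,
    pool_pre_ping=True,   # test each connection before reuse, discard stale ones
    pool_recycle=300,     # recycle connections older than 5 minutes
)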

Testing Patterns

The fixtures below follow the official SQLModel sync testing pattern (in-memory SQLite with a sync Session) and pair with the sync get_session from pattern 2. For the async endpoints in pattern 4 you need an async engine and session override; see the sketch after these examples.

import pytest
from sqlmodel import SQLModel, create_engine, Session
from sqlmodel.pool import StaticPool
from fastapi.testclient import TestClient

@pytest.fixture
def session():
    """In-memory SQLite for fast tests."""
    engine = create_engine(
        "sqlite://",  # In-memory
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    SQLModel.metadata.create_all(engine)

    with Session(engine) as session:
        yield session

@pytest.fixture
def client(session):
    """Test client with overridden session."""
    def get_session_override():
        return session

    app.dependency_overrides[get_session] = get_session_override
    client = TestClient(app)
    yield client
    app.dependency_overrides.clear()

def test_create_task(client):
    response = client.post("/tasks", json={"title": "Test"})
    assert response.status_code == 201
    assert response.json()["title"] == "Test"

def test_update_partial(client, session):
    # Create task
    task = Task(title="Original", priority="low")
    session.add(task)
    session.commit()

    # Partial update - only change priority
    response = client.patch(
        f"/tasks/{task.id}",
        json={"priority": "high"}
    )

    assert response.status_code == 200
    assert response.json()["title"] == "Original"  # Unchanged
    assert response.json()["priority"] == "high"   # Updated

Critical: Async Session Patterns (MissingGreenlet Prevention)

After session.commit(), SQLAlchemy expires loaded objects by default (and they become detached once the session closes). Accessing an expired or detached attribute triggers a synchronous refresh or lazy load, which fails in an async context with a MissingGreenlet error.

The Pattern: Extract → Flush → Commit

@router.post("/entities", response_model=EntityRead, status_code=201)
async def create_entity(
    data: EntityCreate,
    session: AsyncSession = Depends(get_session),
    user: CurrentUser = Depends(get_current_user),
) -> EntityRead:
    # 1. Get related objects and EXTRACT primitives immediately
    worker = await get_worker(session, user.id)
    worker_id = worker.id        # Extract BEFORE any commit
    worker_type = worker.type    # Extract BEFORE any commit

    # 2. Create entity
    entity = Entity(**data.model_dump())
    session.add(entity)

    # 3. Use flush() to get generated ID without committing
    await session.flush()
    entity_id = entity.id        # Extract immediately after flush

    # 4. Do related operations (audit logging, etc.)
    # Pass primitives, NOT objects
    await log_action(
        session,
        entity_id=entity_id,
        actor_id=worker_id,
        actor_type=worker_type,  # String, not object
    )

    # 5. Single commit at the end
    await session.commit()

    # 6. Refresh if you need to return the object
    await session.refresh(entity)
    return entity

Service Functions: Never Commit Internally

# WRONG - service commits, caller loses transaction control
async def log_action(session: AsyncSession, ...):
    log = AuditLog(...)
    session.add(log)
    await session.commit()  # ❌ Don't do this!

# CORRECT - caller owns the transaction
async def log_action(session: AsyncSession, ...):
    log = AuditLog(...)
    session.add(log)
    # No commit - caller will commit
    return log

The Problem: Object Detachment After Commit

# ❌ WRONG - causes MissingGreenlet
worker = await session.get(Worker, 1)
await session.commit()           # Worker is now DETACHED
print(worker.name)               # ERROR! Triggers lazy load in async

# ✅ CORRECT - extract before commit
worker = await session.get(Worker, 1)
worker_name = worker.name        # Extract while attached
await session.commit()
print(worker_name)               # Safe - it's a string

When Passing Objects Between Functions

# ❌ WRONG - passing ORM object that may be detached
async def do_work(entity: Entity):
    print(entity.name)  # May fail if entity was detached

# ✅ CORRECT - pass primitive IDs
async def do_work(entity_id: int, session: AsyncSession):
    entity = await session.get(Entity, entity_id)  # Fresh fetch
    print(entity.name)

Common Pitfalls

1. Forgetting table=True

# WRONG - no table created
class Task(SQLModel):
    id: int

# CORRECT
class Task(SQLModel, table=True):
    id: int

2. Using sync Session with async engine

# WRONG
from sqlmodel import Session
async with Session(async_engine) as session:  # Error!

# CORRECT
from sqlmodel.ext.asyncio.session import AsyncSession
async with AsyncSession(async_engine) as session:

3. Missing await on async operations

# WRONG
task = session.get(Task, 1)  # Returns coroutine, not Task!

# CORRECT
task = await session.get(Task, 1)

4. Relationship without back_populates

# WRONG - without back_populates, the two sides of the relationship won't stay in sync
tasks: List["Task"] = Relationship()

# CORRECT
tasks: List["Task"] = Relationship(back_populates="project")

5. MissingGreenlet after commit (see "Critical: Async Session Patterns" above)

# WRONG - accessing detached object
await session.commit()
print(entity.name)  # MissingGreenlet error!

# CORRECT - extract before commit or refresh after
name = entity.name  # Before commit
await session.commit()
# OR
await session.commit()
await session.refresh(entity)  # Reattach
print(entity.name)

6. MissingGreenlet in log statements (sneaky!)

This is a common trap - log statements that access ORM attributes after commit:

# WRONG - logger.info triggers attribute access after commit!
session.add(notification)
await session.commit()
logger.info("Created notification %d for %s", notification.id, notification.user_id)
#                                              ^^^^^^^^^^^^^^^ MissingGreenlet!

# CORRECT - refresh after commit before accessing any attributes
session.add(notification)
await session.commit()
await session.refresh(notification)  # Reattach to session
logger.info("Created notification %d for %s", notification.id, notification.user_id)

# ALTERNATIVE - extract values before commit
session.add(notification)
await session.flush()  # Get ID without committing
notif_id = notification.id  # Extract while attached
user_id = notification.user_id
await session.commit()
logger.info("Created notification %d for %s", notif_id, user_id)

Why this is sneaky: the commit succeeds and the data is saved, but the log line itself crashes. The notification exists in the database, yet the request still ends with an error in your logs.


Input Validation Patterns

Converting 0 to None for nullable foreign keys

Swagger UI and some clients send 0 as the default value for optional integer fields, and 0 is rarely a valid ID, so the insert violates the foreign key constraint.

from pydantic import field_validator

class TaskCreate(SQLModel):
    assignee_id: int | None = None
    parent_task_id: int | None = None

    @field_validator("assignee_id", "parent_task_id", mode="after")
    @classmethod
    def zero_to_none(cls, v: int | None) -> int | None:
        """Convert 0 to None (0 is not a valid FK)."""
        return None if v == 0 else v

Normalizing timezone-aware datetimes

The database stores naive UTC datetimes, but clients may send ISO 8601 timestamps with a timezone offset.

from datetime import UTC, datetime
from pydantic import field_validator

class TaskCreate(SQLModel):
    due_date: datetime | None = None

    @field_validator("due_date", mode="after")
    @classmethod
    def normalize_datetime(cls, v: datetime | None) -> datetime | None:
        """Strip timezone after converting to UTC."""
        if v is None:
            return None
        if v.tzinfo is not None:
            v = v.astimezone(UTC).replace(tzinfo=None)
        return v

References

For additional documentation, use Context7 MCP:

mcp__context7__resolve-library-id with libraryName="sqlmodel"
mcp__context7__get-library-docs with topic="relationships" or "async"

See also: references/migrations.md for schema migration patterns.