CQRS Implementation
Comprehensive guide to implementing CQRS (Command Query Responsibility Segregation) patterns.
When to Use This Skill
- Separating read and write concerns
- Scaling reads independently from writes
- Building event-sourced systems
- Optimizing complex query scenarios
- Different read/write data models needed
- High-performance reporting requirements
Core Concepts
1. CQRS Architecture
                ┌─────────────┐
                │   Client    │
                └──────┬──────┘
                       │
          ┌────────────┴────────────┐
          │                         │
          ▼                         ▼
   ┌─────────────┐           ┌─────────────┐
   │  Commands   │           │   Queries   │
   │     API     │           │     API     │
   └──────┬──────┘           └──────┬──────┘
          │                         │
          ▼                         ▼
   ┌─────────────┐           ┌─────────────┐
   │   Command   │           │    Query    │
   │  Handlers   │           │  Handlers   │
   └──────┬──────┘           └──────┬──────┘
          │                         │
          ▼                         ▼
   ┌─────────────┐           ┌─────────────┐
   │    Write    │──────────►│    Read     │
   │    Model    │  Events   │    Model    │
   └─────────────┘           └─────────────┘
2. Key Components
| Component | Responsibility |
|-----------|----------------|
| Command | Intent to change state |
| Command Handler | Validates and executes commands |
| Event | Record of state change |
| Query | Request for data |
| Query Handler | Retrieves data from read model |
| Projector | Updates read model from events |
Templates
Template 1: Command Infrastructure
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TypeVar, Generic, Dict, Any, Type
from datetime import datetime
import uuid
# Command base
@dataclass
class Command:
command_id: str = None
timestamp: datetime = None
def __post_init__(self):
self.command_id = self.command_id or str(uuid.uuid4())
self.timestamp = self.timestamp or datetime.utcnow()
# Concrete commands
@dataclass
class CreateOrder(Command):
customer_id: str
items: list
shipping_address: dict
@dataclass
class AddOrderItem(Command):
order_id: str
product_id: str
quantity: int
price: float
@dataclass
class CancelOrder(Command):
order_id: str
reason: str
# Command handler base
T = TypeVar("T", bound=Command)


class CommandHandler(ABC, Generic[T]):
    """Abstract base for a handler of one command type ``T``."""

    @abstractmethod
    async def handle(self, command: T) -> Any:
        """Process ``command``; concrete handlers must override this coroutine."""
        ...
# Command bus
class CommandBus:
    """Routes each command to the handler registered for its exact type."""

    def __init__(self):
        # Command class -> handler instance
        self._handlers: Dict[Type[Command], CommandHandler] = {}

    def register(self, command_type: Type[Command], handler: CommandHandler):
        """Register ``handler`` for ``command_type``, replacing any earlier one."""
        self._handlers[command_type] = handler

    async def dispatch(self, command: Command) -> Any:
        """Pass ``command`` to its handler and return the handler's result.

        The lookup key is ``type(command)``, so a handler registered for a
        base class is not used for instances of a subclass.

        Raises:
            ValueError: If no handler is registered for ``type(command)``.
        """
        handler = self._handlers.get(type(command))
        # Test for None explicitly: a registered handler object that happens
        # to be falsy (e.g. it defines __len__ or __bool__) must still be used.
        if handler is None:
            raise ValueError(f"No handler for {type(command).__name__}")
        return await handler.handle(command)
# Command handler implementation
class CreateOrderHandler(CommandHandler[CreateOrder]):
    """Handles ``CreateOrder``: builds the order aggregate and stores its events."""

    def __init__(self, order_repository, event_store):
        self.order_repository = order_repository
        self.event_store = event_store

    async def handle(self, command: CreateOrder) -> str:
        """Validate the command, create the order and append its events.

        Returns:
            The id of the newly created order.

        Raises:
            ValueError: If the command carries no items.
        """
        # Guard clause: an order without items is rejected before anything else
        if not command.items:
            raise ValueError("Order must have at least one item")

        # Build the aggregate from the command payload
        new_order = Order.create(
            customer_id=command.customer_id,
            items=command.items,
            shipping_address=command.shipping_address,
        )

        # Store the events the aggregate produced in the order's own stream
        stream = f"Order-{new_order.id}"
        pending_events = new_order.uncommitted_events
        await self.event_store.append_events(
            stream_id=stream,
            stream_type="Order",
            events=pending_events,
        )
        return new_order.id
Template 2: Query Infrastructure
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TypeVar, Generic, List, Optional
# Query base
@dataclass
class Query:
    """Marker base class for all query messages (it declares no fields)."""


# Concrete queries
@dataclass
class GetOrderById(Query):
    """Query for a single order, identified by its id."""

    order_id: str


@dataclass
class GetCustomerOrders(Query):
    """Query for one customer's orders, optionally filtered by status, paged."""

    customer_id: str
    status: Optional[str] = None  # no status filter unless given
    page: int = 1
    page_size: int = 20


@dataclass
class SearchOrders(Query):
    """Free-text order search with optional filters and a sort specification."""

    query: str
    filters: Optional[dict] = None
    sort_by: str = "created_at"
    sort_order: str = "desc"
# Query result types
@dataclass
class OrderView:
    """Flat, read-side representation of one order as returned by queries."""

    # Required columns
    order_id: str
    customer_id: str
    status: str
    total_amount: float
    item_count: int
    created_at: datetime
    # Optional column; stays None unless a value is supplied
    shipped_at: Optional[datetime] = None
# Element type of a page.  A dedicated, unbounded TypeVar is used because the
# module-level ``T`` is bound to message types and, within this template, is
# only defined further down - so it cannot parameterise a result container.
ItemT = TypeVar("ItemT")


@dataclass
class PaginatedResult(Generic[ItemT]):
    """One page of results together with the totals needed for paging.

    Attributes:
        items: The rows of the current page.
        total: Number of rows matching the query across all pages.
        page: The page number this result represents.
        page_size: Maximum number of rows per page.
    """

    items: List[ItemT]
    total: int
    page: int
    page_size: int

    @property
    def total_pages(self) -> int:
        """Number of pages needed to show ``total`` rows (ceiling division).

        Returns 0 when ``page_size`` is not positive instead of raising
        ``ZeroDivisionError`` or yielding a negative page count.
        """
        if self.page_size <= 0:
            return 0
        return (self.total + self.page_size - 1) // self.page_size
# Query handler base
T = TypeVar("T", bound=Query)
R = TypeVar("R")


class QueryHandler(ABC, Generic[T, R]):
    """Abstract base for a handler answering queries of type ``T`` with ``R``."""

    @abstractmethod
    async def handle(self, query: T) -> R:
        """Answer ``query``; concrete handlers must override this coroutine."""
        ...
# Query bus
class QueryBus:
    """Routes each query to the handler registered for its exact type."""

    def __init__(self):
        # Query class -> handler instance
        self._handlers: Dict[Type[Query], QueryHandler] = {}

    def register(self, query_type: Type[Query], handler: QueryHandler):
        """Register ``handler`` for ``query_type``, replacing any earlier one."""
        self._handlers[query_type] = handler

    async def dispatch(self, query: Query) -> Any:
        """Pass ``query`` to its handler and return the handler's result.

        The lookup key is ``type(query)``, so a handler registered for a base
        class is not used for instances of a subclass.

        Raises:
            ValueError: If no handler is registered for ``type(query)``.
        """
        handler = self._handlers.get(type(query))
        # Test for None explicitly: a registered handler object that happens
        # to be falsy (e.g. it defines __len__ or __bool__) must still be used.
        if handler is None:
            raise ValueError(f"No handler for {type(query).__name__}")
        return await handler.handle(query)
# Query handler implementation
class GetOrderByIdHandler(QueryHandler[GetOrderById, Optional[OrderView]]):
    """Answers ``GetOrderById`` from the ``order_views`` read table."""

    def __init__(self, read_db):
        self.read_db = read_db

    async def handle(self, query: GetOrderById) -> Optional[OrderView]:
        """Return the matching ``OrderView``, or ``None`` when no row is found."""
        # Statement text is assembled once, outside the connection scope
        sql = (
            "\n"
            "SELECT order_id, customer_id, status, total_amount,\n"
            "item_count, created_at, shipped_at\n"
            "FROM order_views\n"
            "WHERE order_id = $1\n"
        )
        async with self.read_db.acquire() as conn:
            record = await conn.fetchrow(sql, query.order_id)
            # Column names match the OrderView fields, so the row maps directly
            return OrderView(**dict(record)) if record else None
class GetCustomerOrdersHandler(QueryHandler[GetCustomerOrders, PaginatedResult[OrderView]]):
    """Answers ``GetCustomerOrders`` from ``order_views``, newest order first."""

    def __init__(self, read_db):
        self.read_db = read_db

    async def handle(self, query: GetCustomerOrders) -> PaginatedResult[OrderView]:
        """Return one page of the customer's orders plus the overall match count."""
        # Positional SQL arguments; the customer id is always $1
        args = [query.customer_id]
        condition = "customer_id = $1"
        if query.status:
            # The optional status filter becomes $2
            condition += " AND status = $2"
            args.append(query.status)

        # LIMIT/OFFSET take the next two free placeholder numbers
        limit_slot = len(args) + 1
        offset_slot = len(args) + 2
        count_sql = f"SELECT COUNT(*) FROM order_views WHERE {condition}"
        page_sql = (
            "\n"
            "SELECT order_id, customer_id, status, total_amount,\n"
            "item_count, created_at, shipped_at\n"
            "FROM order_views\n"
            f"WHERE {condition}\n"
            "ORDER BY created_at DESC\n"
            f"LIMIT ${limit_slot} OFFSET ${offset_slot}\n"
        )
        skip = (query.page - 1) * query.page_size

        async with self.read_db.acquire() as conn:
            # Total number of matching rows across all pages
            match_count = await conn.fetchval(count_sql, *args)
            # Rows of the requested page only
            records = await conn.fetch(page_sql, *args, query.page_size, skip)
            views = [OrderView(**dict(record)) for record in records]
            return PaginatedResult(
                items=views,
                total=match_count,
                page=query.page,
                page_size=query.page_size,
            )
Template 3: FastAPI CQRS Application
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import List, Optional
# Application instance. The dependency providers below read the command and
# query buses from app.state, so both must be attached to it before requests
# are served.
app = FastAPI()
# Request/Response models
class CreateOrderRequest(BaseModel):
    # Request body of POST /orders; the three fields are copied one-to-one
    # into a CreateOrder command by the endpoint.
    customer_id: str
    items: List[dict]  # each item is an unconstrained dict at this layer
    shipping_address: dict
class OrderResponse(BaseModel):
    # Response model of GET /orders/{order_id}. Declares the OrderView fields
    # except shipped_at.
    order_id: str
    customer_id: str
    status: str
    total_amount: float
    item_count: int
    created_at: datetime
# Dependency injection
def get_command_bus() -> CommandBus:
    """Dependency provider: return the command bus stored on ``app.state``."""
    bus = app.state.command_bus
    return bus
def get_query_bus() -> QueryBus:
    """Dependency provider: return the query bus stored on ``app.state``."""
    bus = app.state.query_bus
    return bus
# Command endpoints (POST, PUT, DELETE)
@app.post("/orders", response_model=dict)
async def create_order(
    request: CreateOrderRequest,
    command_bus: CommandBus = Depends(get_command_bus),
):
    # Write side: turn the request body into a CreateOrder command, dispatch
    # it, and answer with the id the handler returned.
    new_order_id = await command_bus.dispatch(
        CreateOrder(
            customer_id=request.customer_id,
            items=request.items,
            shipping_address=request.shipping_address,
        )
    )
    return {"order_id": new_order_id}
@app.post("/orders/{order_id}/items")
async def add_item(
    order_id: str,
    product_id: str,
    quantity: int,
    price: float,
    command_bus: CommandBus = Depends(get_command_bus),
):
    # Write side: wrap the request values in an AddOrderItem command and
    # dispatch it; the handler's result is not used.
    item_fields = {
        "order_id": order_id,
        "product_id": product_id,
        "quantity": quantity,
        "price": price,
    }
    await command_bus.dispatch(AddOrderItem(**item_fields))
    return {"status": "item_added"}
@app.delete("/orders/{order_id}")
async def cancel_order(
    order_id: str,
    reason: str,
    command_bus: CommandBus = Depends(get_command_bus),
):
    # Write side: dispatch a CancelOrder command; the handler's result is not used.
    await command_bus.dispatch(CancelOrder(order_id=order_id, reason=reason))
    return {"status": "cancelled"}
# Query endpoints (GET)
# Route order matters: FastAPI matches paths in declaration order, so the
# fixed path "/orders/search" has to be registered before the parameterised
# "/orders/{order_id}". Declared the other way round, GET /orders/search is
# served by get_order with order_id="search" and never reaches the search.
@app.get("/orders/search")
async def search_orders(
    q: str,
    sort_by: str = "created_at",
    query_bus: QueryBus = Depends(get_query_bus)
):
    # Read side: free-text search; the remaining SearchOrders options keep
    # their defaults.
    query = SearchOrders(query=q, sort_by=sort_by)
    return await query_bus.dispatch(query)


@app.get("/orders/{order_id}", response_model=OrderResponse)
async def get_order(
    order_id: str,
    query_bus: QueryBus = Depends(get_query_bus)
):
    # Read side: look the order up in the read model; a falsy result is
    # reported as 404.
    query = GetOrderById(order_id=order_id)
    result = await query_bus.dispatch(query)
    if not result:
        raise HTTPException(status_code=404, detail="Order not found")
    return result


@app.get("/customers/{customer_id}/orders")
async def get_customer_orders(
    customer_id: str,
    status: Optional[str] = None,
    page: int = 1,
    page_size: int = 20,
    query_bus: QueryBus = Depends(get_query_bus)
):
    # Reject non-positive paging values up front; passed through, they would
    # reach the read model as a negative OFFSET or a non-positive LIMIT.
    if page < 1 or page_size < 1:
        raise HTTPException(
            status_code=422,
            detail="page and page_size must be >= 1",
        )
    # Read side: one page of the customer's orders, optionally filtered by status
    query = GetCustomerOrders(
        customer_id=customer_id,
        status=status,
        page=page,
        page_size=page_size
    )
    return await query_bus.dispatch(query)
Template 4: Read Model Synchronization
class ReadModelSynchronizer:
    """Keeps read models in sync with events.

    Every projection has its own checkpoint, the global position of the last
    event it has been moved past.  The checkpoint accessors
    ``_get_checkpoint`` and ``_save_checkpoint`` are called but not defined
    in this class; a concrete synchronizer has to supply them.
    """

    def __init__(
        self,
        event_store,
        read_db,
        projections: List[Projection],
        *,
        poll_interval: float = 0.1,
        sync_batch_size: int = 100,
        rebuild_batch_size: int = 1000,
    ):
        """Create a synchronizer for ``projections``.

        Args:
            event_store: Source of events; must offer ``read_all``.
            read_db: Read-side database handle; stored, but not used by the
                methods of this class.
            projections: Projections to keep up to date, keyed internally by
                their ``name``.
            poll_interval: Seconds to sleep between two sync passes in ``run``.
            sync_batch_size: Maximum events read per projection per pass.
            rebuild_batch_size: Maximum events read per batch while rebuilding.
        """
        self.event_store = event_store
        self.read_db = read_db
        self.projections = {p.name: p for p in projections}
        self.poll_interval = poll_interval
        self.sync_batch_size = sync_batch_size
        self.rebuild_batch_size = rebuild_batch_size

    async def run(self):
        """Continuously sync read models (never returns on its own)."""
        while True:
            for projection in self.projections.values():
                await self._sync_projection(projection)
            await asyncio.sleep(self.poll_interval)

    async def _sync_projection(self, projection: Projection):
        """Apply the next batch of events after the projection's checkpoint."""
        checkpoint = await self._get_checkpoint(projection.name)
        events = await self.event_store.read_all(
            from_position=checkpoint,
            limit=self.sync_batch_size
        )
        # Ask the projection once per batch instead of once per event
        handled_types = projection.handles()
        for event in events:
            if event.event_type in handled_types:
                try:
                    await projection.apply(event)
                except Exception as e:
                    # Best effort: log (with traceback) and move on.
                    # NOTE(review): the failed event does not advance the
                    # checkpoint itself, but any later event in this batch
                    # does, so the failed event is then skipped for good.
                    # Decide whether a retry or dead-letter policy is needed.
                    logger.exception("Projection error: %s", e)
                    continue
            # Unhandled event types advance the checkpoint too, so a batch
            # of irrelevant events cannot stall the projection.
            await self._save_checkpoint(projection.name, event.global_position)

    async def rebuild_projection(self, projection_name: str):
        """Rebuild a projection from scratch.

        Clears the projection, resets its checkpoint to 0 and replays all
        events in batches until the event store returns none.  Unlike the
        regular sync, an error raised while applying an event propagates.

        Raises:
            KeyError: If ``projection_name`` is not a known projection.
        """
        projection = self.projections[projection_name]
        # Clear existing data
        await projection.clear()
        # Reset checkpoint
        await self._save_checkpoint(projection_name, 0)
        # Rebuild
        while True:
            checkpoint = await self._get_checkpoint(projection_name)
            events = await self.event_store.read_all(
                checkpoint, self.rebuild_batch_size
            )
            if not events:
                break
            handled_types = projection.handles()
            for event in events:
                if event.event_type in handled_types:
                    await projection.apply(event)
            # One checkpoint write per batch, at the last event read
            await self._save_checkpoint(
                projection_name,
                events[-1].global_position
            )
Template 5: Eventual Consistency Handling
class ConsistentQueryHandler:
    """Query handler that can wait for consistency.

    ``execute_query`` is called but not defined in this class; a concrete
    handler has to supply it.
    """

    def __init__(self, read_db, event_store):
        self.read_db = read_db
        self.event_store = event_store

    async def query_after_command(
        self,
        query: Query,
        expected_version: int,
        stream_id: str,
        timeout: float = 5.0,
        poll_interval: float = 0.1,
    ):
        """
        Execute query, ensuring read model is at expected version.
        Used for read-your-writes consistency.

        Args:
            query: The query to execute once the read model has caught up.
            expected_version: Stream version the read model must have reached.
            stream_id: Stream whose projection progress is checked.
            timeout: Maximum number of seconds to wait for the read model.
            poll_interval: Seconds to wait between two progress checks.

        Returns:
            The plain result of ``execute_query`` when the read model caught
            up in time; otherwise a dict ``{"data": <result>, "_warning":
            "Data may be stale"}`` wrapping the possibly stale result.
        """
        # A monotonic clock keeps the timeout immune to wall-clock adjustments
        deadline = time.monotonic() + timeout
        while True:
            # Always check at least once, so timeout=0 still returns fresh
            # data when the read model is already caught up.
            projection_version = await self._get_projection_version(stream_id)
            if projection_version >= expected_version:
                return await self.execute_query(query)
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Never sleep past the deadline
            await asyncio.sleep(min(poll_interval, remaining))
        # Timeout - return stale data with warning
        return {
            "data": await self.execute_query(query),
            "_warning": "Data may be stale"
        }

    async def _get_projection_version(self, stream_id: str) -> int:
        """Get the last processed event version for a stream.

        Returns 0 when the lookup yields no (or a falsy) value.
        """
        async with self.read_db.acquire() as conn:
            return await conn.fetchval(
                "SELECT last_event_version FROM projection_state WHERE stream_id = $1",
                stream_id
            ) or 0
Best Practices
Do's
- Separate command and query models - Different needs
- Use eventual consistency - Accept propagation delay
- Validate in command handlers - Before state change
- Denormalize read models - Optimize for queries
- Version your events - For schema evolution
Don'ts
- Don't query the read model in command handlers - Keep commands write-only and load state from the write model
- Don't couple read/write schemas - Independent evolution
- Don't over-engineer - Start simple
- Don't ignore consistency SLAs - Define acceptable lag
Resources