Event Store Design
Comprehensive guide to designing event stores for event-sourced applications.
When to Use This Skill
- Designing event sourcing infrastructure
- Choosing between event store technologies
- Implementing custom event stores
- Optimizing event storage and retrieval
- Setting up event store schemas
- Planning for event store scaling
Core Concepts
1. Event Store Architecture
┌─────────────────────────────────────────────────────┐
│ Event Store │
├─────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Stream 1 │ │ Stream 2 │ │ Stream 3 │ │
│ │ (Aggregate) │ │ (Aggregate) │ │ (Aggregate) │ │
│ ├─────────────┤ ├─────────────┤ ├─────────────┤ │
│ │ Event 1 │ │ Event 1 │ │ Event 1 │ │
│ │ Event 2 │ │ Event 2 │ │ Event 2 │ │
│ │ Event 3 │ │ ... │ │ Event 3 │ │
│ │ ... │ │ │ │ Event 4 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────┤
│ Global Position: 1 → 2 → 3 → 4 → 5 → 6 → ... │
└─────────────────────────────────────────────────────┘
2. Event Store Requirements
| Requirement | Description |
| --- | --- |
| Append-only | Events are immutable; the store only supports appends |
| Ordered | Per-stream and global ordering |
| Versioned | Optimistic concurrency control |
| Subscriptions | Real-time event notifications |
| Idempotent | Handle duplicate writes safely |
Technology Comparison
| Technology | Best For | Limitations |
| --- | --- | --- |
| EventStoreDB | Pure event sourcing | Single-purpose |
| PostgreSQL | Existing Postgres stack | Manual implementation |
| Kafka | High-throughput streaming | Not ideal for per-stream queries |
| DynamoDB | Serverless, AWS-native | Query limitations |
| Marten | .NET ecosystems | .NET specific |
Templates
Template 1: PostgreSQL Event Store Schema
-- Events table: the append-only log, one row per event.
-- gen_random_uuid() is built in from PostgreSQL 13; older servers need the
-- pgcrypto extension.
CREATE TABLE events (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    stream_id       VARCHAR(255) NOT NULL,
    stream_type     VARCHAR(255) NOT NULL,
    event_type      VARCHAR(255) NOT NULL,
    event_data      JSONB NOT NULL,
    -- NOT NULL so readers can always decode the column as JSON.
    metadata        JSONB NOT NULL DEFAULT '{}',
    -- Position of the event inside its stream (1, 2, 3, ...).
    version         BIGINT NOT NULL,
    -- Position of the event across all streams; drives global subscriptions.
    global_position BIGSERIAL,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- Optimistic concurrency guard: two writers can never claim the same
    -- version of a stream.
    CONSTRAINT unique_stream_version UNIQUE (stream_id, version)
);

-- Stream queries (WHERE stream_id = ... ORDER BY version) are served by the
-- unique index that PostgreSQL creates automatically for
-- unique_stream_version, so no separate (stream_id, version) index is needed;
-- a second one would only duplicate it and slow down every append.

-- Index for global subscription (unique: a position identifies one event)
CREATE UNIQUE INDEX idx_events_global_position ON events(global_position);

-- Index for event type queries
CREATE INDEX idx_events_event_type ON events(event_type);

-- Index for time-based queries
CREATE INDEX idx_events_created_at ON events(created_at);

-- Snapshots table: at most one (latest) snapshot per stream.
CREATE TABLE snapshots (
    stream_id     VARCHAR(255) PRIMARY KEY,
    stream_type   VARCHAR(255) NOT NULL,
    snapshot_data JSONB NOT NULL,
    -- Stream version the snapshot was taken at.
    version       BIGINT NOT NULL,
    created_at    TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Subscriptions checkpoint table: last global_position handled per subscriber.
CREATE TABLE subscription_checkpoints (
    subscription_id VARCHAR(255) PRIMARY KEY,
    last_position   BIGINT NOT NULL DEFAULT 0,
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
Template 2: Python Event Store Implementation
import asyncio
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable, List, Optional
from uuid import UUID, uuid4

import asyncpg
@dataclass
class Event:
    """A single domain event, either pending append or loaded from the store.

    ``version`` and ``global_position`` stay ``None`` until the event has been
    persisted; the store fills them in.
    """

    stream_id: str
    event_type: str
    data: dict
    metadata: dict = field(default_factory=dict)
    event_id: UUID = field(default_factory=uuid4)
    version: Optional[int] = None
    global_position: Optional[int] = None
    # Timezone-aware UTC: the column is TIMESTAMPTZ, and a naive utcnow() value
    # carries no offset, so it can be misread as local time on the way in.
    created_at: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
class EventStore:
    """PostgreSQL-backed event store on top of an asyncpg connection pool.

    Expects the ``events`` and ``subscription_checkpoints`` tables queried
    below, including the ``unique_stream_version`` constraint on
    ``events(stream_id, version)``.
    """

    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool

    async def append_events(
        self,
        stream_id: str,
        stream_type: str,
        events: List[Event],
        expected_version: Optional[int] = None
    ) -> List[Event]:
        """Append events to a stream with optimistic concurrency.

        Args:
            stream_id: Stream to append to.
            stream_type: Value stored in the ``stream_type`` column.
            events: Events to append, in order. Their ``version`` and
                ``global_position`` are filled in once the transaction has
                committed, so a failed append leaves them untouched.
            expected_version: Version the caller believes the stream is at
                (0 for a stream with no events). ``None`` skips the check and
                appends after whatever the current version is.

        Returns:
            A new list holding the same event objects, now carrying their
            assigned ``version`` and ``global_position``.

        Raises:
            ConcurrencyError: If the stream is not at ``expected_version``, or
                another writer claimed one of the versions first.
        """
        assigned = []  # (version, global_position) per event, in input order
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                # One lookup serves both the concurrency check and numbering.
                current = await conn.fetchval(
                    "SELECT COALESCE(MAX(version), 0) FROM events WHERE stream_id = $1",
                    stream_id
                )
                if expected_version is not None and current != expected_version:
                    raise ConcurrencyError(
                        f"Expected version {expected_version}, got {current}"
                    )

                try:
                    for offset, event in enumerate(events, start=1):
                        version = current + offset
                        row = await conn.fetchrow(
                            """
                            INSERT INTO events (id, stream_id, stream_type, event_type,
                                                event_data, metadata, version, created_at)
                            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                            RETURNING global_position
                            """,
                            event.event_id,
                            stream_id,
                            stream_type,
                            event.event_type,
                            json.dumps(event.data),
                            json.dumps(event.metadata),
                            version,
                            event.created_at
                        )
                        assigned.append((version, row['global_position']))
                except asyncpg.UniqueViolationError as exc:
                    # A writer that committed between our MAX(version) read and
                    # our INSERT trips unique_stream_version. Report that as a
                    # concurrency conflict; any other unique violation (e.g. a
                    # duplicate event id) is re-raised unchanged.
                    if getattr(exc, "constraint_name", None) != "unique_stream_version":
                        raise
                    raise ConcurrencyError(
                        f"Stream {stream_id} was modified concurrently"
                    ) from exc

        # Only reached after a successful commit.
        for event, (version, position) in zip(events, assigned):
            event.version = version
            event.global_position = position
        return list(events)

    async def read_stream(
        self,
        stream_id: str,
        from_version: int = 0,
        limit: int = 1000
    ) -> List[Event]:
        """Read events from a stream.

        Returns at most ``limit`` events whose version is >= ``from_version``
        (inclusive), ordered by version.
        """
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT id, stream_id, event_type, event_data, metadata,
                       version, global_position, created_at
                FROM events
                WHERE stream_id = $1 AND version >= $2
                ORDER BY version
                LIMIT $3
                """,
                stream_id, from_version, limit
            )
            return [self._row_to_event(row) for row in rows]

    async def read_all(
        self,
        from_position: int = 0,
        limit: int = 1000
    ) -> List[Event]:
        """Read all events globally.

        Returns at most ``limit`` events whose global position is strictly
        greater than ``from_position`` (exclusive), ordered by position.
        """
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT id, stream_id, event_type, event_data, metadata,
                       version, global_position, created_at
                FROM events
                WHERE global_position > $1
                ORDER BY global_position
                LIMIT $2
                """,
                from_position, limit
            )
            return [self._row_to_event(row) for row in rows]

    async def subscribe(
        self,
        subscription_id: str,
        handler: Callable[[Event], Awaitable[None]],
        from_position: int = 0,
        batch_size: int = 100,
        poll_interval: float = 1.0
    ):
        """Subscribe to all events from a position.

        Polls ``read_all`` forever and awaits ``handler`` for every event. The
        checkpoint is saved once per batch, so if ``handler`` raises part-way
        through a batch, the events already handled in that batch are delivered
        again on restart; handlers should tolerate repeats.

        Args:
            subscription_id: Key of the row in ``subscription_checkpoints``.
            handler: Coroutine function called with each event.
            from_position: Start position, used only when no checkpoint exists.
            batch_size: Maximum number of events fetched per poll.
            poll_interval: Seconds to sleep when no new events were found.
        """
        # Get checkpoint
        async with self.pool.acquire() as conn:
            checkpoint = await conn.fetchval(
                """
                SELECT last_position FROM subscription_checkpoints
                WHERE subscription_id = $1
                """,
                subscription_id
            )
        # A stored checkpoint of 0 is a valid position, so test for None rather
        # than truthiness.
        position = from_position if checkpoint is None else checkpoint

        while True:
            events = await self.read_all(position, batch_size)
            if not events:
                await asyncio.sleep(poll_interval)
                continue

            for event in events:
                await handler(event)
                position = event.global_position

            # Save checkpoint
            async with self.pool.acquire() as conn:
                await conn.execute(
                    """
                    INSERT INTO subscription_checkpoints (subscription_id, last_position)
                    VALUES ($1, $2)
                    ON CONFLICT (subscription_id)
                    DO UPDATE SET last_position = $2, updated_at = NOW()
                    """,
                    subscription_id, position
                )

    @staticmethod
    def _decode_json(value: Any) -> Any:
        """Decode a JSON column value; NULL becomes an empty dict."""
        if value is None:
            return {}
        if isinstance(value, (str, bytes, bytearray)):
            return json.loads(value)
        # Already decoded, e.g. when a JSON type codec is set on the connection.
        return value

    def _row_to_event(self, row) -> Event:
        """Build an ``Event`` from a row returned by the read queries."""
        return Event(
            event_id=row['id'],
            stream_id=row['stream_id'],
            event_type=row['event_type'],
            data=self._decode_json(row['event_data']),
            metadata=self._decode_json(row['metadata']),
            version=row['version'],
            global_position=row['global_position'],
            created_at=row['created_at']
        )
class ConcurrencyError(Exception):
    """Signals a failed optimistic concurrency check on a stream."""
Template 3: EventStoreDB Usage
from esdbclient import EventStoreDBClient, NewEvent, StreamState
import json
# Connect
# Module-level client shared by the helper functions below.
# NOTE(review): tls=false turns TLS off for this localhost URI - presumably
# meant for local development only; confirm before reusing it elsewhere.
client = EventStoreDBClient(uri="esdb://localhost:2113?tls=false")
# Append events
def append_events(stream_name: str, events: list, expected_revision=None):
    """Append dict-shaped events to ``stream_name`` via the shared client.

    Each event dict supplies ``'type'`` and ``'data'`` and may supply
    ``'metadata'``; data and metadata are JSON-encoded to bytes.

    ``expected_revision`` selects the concurrency check: ``None`` maps to
    ``StreamState.ANY``, ``-1`` maps to ``StreamState.NO_STREAM``, and any
    other value is passed through as the expected current version.
    """
    def _as_new_event(raw):
        return NewEvent(
            type=raw['type'],
            data=json.dumps(raw['data']).encode(),
            metadata=json.dumps(raw.get('metadata', {})).encode()
        )

    payload = [_as_new_event(raw) for raw in events]

    if expected_revision is None:
        expected_state = StreamState.ANY
    else:
        expected_state = (
            StreamState.NO_STREAM if expected_revision == -1 else expected_revision
        )

    return client.append_to_stream(
        stream_name=stream_name,
        events=payload,
        current_version=expected_state
    )
# Read stream
def read_stream(stream_name: str, from_revision: int = 0):
    """Return the events of ``stream_name`` from ``from_revision`` as dicts.

    Each dict has the keys ``type``, ``data``, ``metadata``,
    ``stream_position`` and ``commit_position``. Data is JSON-decoded;
    missing or empty metadata becomes ``{}``.
    """
    recorded_events = client.get_stream(
        stream_name=stream_name,
        stream_position=from_revision
    )

    decoded = []
    for recorded in recorded_events:
        raw_metadata = recorded.metadata
        decoded.append({
            'type': recorded.type,
            'data': json.loads(recorded.data),
            'metadata': json.loads(raw_metadata) if raw_metadata else {},
            'stream_position': recorded.stream_position,
            'commit_position': recorded.commit_position
        })
    return decoded
# Subscribe to all
async def subscribe_to_all(handler, from_position: int = 0):
    """Deliver every event after ``from_position`` to the async ``handler``.

    The module-level ``client`` is the synchronous ``EventStoreDBClient``; its
    catch-up subscription is a blocking iterator, which ``async for`` cannot
    consume. Each blocking ``next()`` call therefore runs in the default
    executor so the event loop stays free while waiting for new events.

    Args:
        handler: Coroutine function called with a dict holding ``type``,
            ``data``, ``stream_id`` and ``position``.
        from_position: Commit position to subscribe from.
    """
    import asyncio

    subscription = client.subscribe_to_all(commit_position=from_position)
    loop = asyncio.get_running_loop()
    exhausted = object()  # sentinel returned by next() when the iterator ends
    try:
        while True:
            event = await loop.run_in_executor(None, next, subscription, exhausted)
            if event is exhausted:
                break
            await handler({
                'type': event.type,
                'data': json.loads(event.data),
                'stream_id': event.stream_name,
                'position': event.commit_position
            })
    finally:
        # Release the subscription when the handler fails or the task is
        # cancelled; this also unblocks a pending next() in the executor.
        subscription.stop()
# Category projection ($ce-Category)
def read_category(category: str):
    """Read all events for a category using system projection.

    The ``$ce-<category>`` stream holds link events that point at the real
    events. Links are resolved here so each returned dict describes the
    original event; an unresolved link's payload is a pointer string, not JSON.

    Returns:
        A list of dicts with the keys ``type``, ``data``, ``metadata``,
        ``stream_position`` and ``commit_position``.
    """
    recorded_events = client.get_stream(
        stream_name=f"$ce-{category}",
        resolve_links=True
    )
    return [
        {
            'type': event.type,
            'data': json.loads(event.data),
            'metadata': json.loads(event.metadata) if event.metadata else {},
            'stream_position': event.stream_position,
            'commit_position': event.commit_position
        }
        for event in recorded_events
    ]
Template 4: DynamoDB Event Store
import boto3
from boto3.dynamodb.conditions import Key
from datetime import datetime
import json
import uuid
class DynamoEventStore:
    """Event store on a single DynamoDB table.

    Items are keyed by ``PK = STREAM#<stream_id>`` and
    ``SK = VERSION#<zero-padded version>``; ``GSI1PK``/``GSI1SK`` carry a
    constant partition and the write timestamp for a global index.
    """

    def __init__(self, table_name: str):
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table(table_name)

    def append_events(self, stream_id: str, events: list, expected_version: int = None):
        """Append events with conditional write for concurrency.

        All events are written in one transaction, and every put carries
        ``attribute_not_exists(PK)``. An already-occupied version therefore
        cancels the whole append instead of silently overwriting history.

        Args:
            stream_id: Stream to append to.
            events: Dicts with ``'type'`` and ``'data'`` keys. DynamoDB caps a
                transaction at 100 actions, so larger batches are rejected by
                the service.
            expected_version: Version the caller believes the stream is at
                (0 for a new stream). ``None`` looks up the current version
                and appends after it.

        Returns:
            The ``events`` list that was passed in.

        Raises:
            botocore.exceptions.ClientError: ``TransactionCanceledException``
                when one of the target versions already exists.
        """
        from datetime import timezone

        if not events:
            # An empty transaction is invalid; there is nothing to write.
            return events

        if expected_version is None:
            expected_version = self._current_version(stream_id)

        puts = []
        for offset, event in enumerate(events, start=1):
            version = expected_version + offset
            # One timestamp per event so GSI1SK and created_at always agree.
            written_at = datetime.now(timezone.utc).isoformat()
            puts.append({
                'Put': {
                    'TableName': self.table.name,
                    'Item': {
                        'PK': f"STREAM#{stream_id}",
                        'SK': f"VERSION#{version:020d}",
                        'GSI1PK': 'EVENTS',
                        'GSI1SK': written_at,
                        'event_id': str(uuid.uuid4()),
                        'stream_id': stream_id,
                        'event_type': event['type'],
                        'event_data': json.dumps(event['data']),
                        'version': version,
                        'created_at': written_at
                    },
                    'ConditionExpression': 'attribute_not_exists(PK)'
                }
            })

        # batch_writer() cannot attach conditions; a transaction can, and it
        # also makes the append all-or-nothing.
        self.dynamodb.meta.client.transact_write_items(TransactItems=puts)
        return events

    def _current_version(self, stream_id: str) -> int:
        """Return the highest stored version of a stream, or 0 if it is empty."""
        response = self.table.query(
            KeyConditionExpression=Key('PK').eq(f"STREAM#{stream_id}"),
            ScanIndexForward=False,  # newest version first
            Limit=1,
            ConsistentRead=True
        )
        items = response['Items']
        return int(items[0]['version']) if items else 0

    def read_stream(self, stream_id: str, from_version: int = 0):
        """Read events from a stream.

        Follows ``LastEvaluatedKey`` so streams larger than one query page are
        returned in full.

        Returns:
            A list of dicts with ``event_type``, ``data`` (JSON-decoded) and
            ``version`` (as ``int``), ordered by version.
        """
        query_args = {
            'KeyConditionExpression': (
                Key('PK').eq(f"STREAM#{stream_id}") &
                Key('SK').gte(f"VERSION#{from_version:020d}")
            )
        }
        found = []
        while True:
            response = self.table.query(**query_args)
            found.extend(
                {
                    'event_type': item['event_type'],
                    'data': json.loads(item['event_data']),
                    # Numbers come back as Decimal; expose a plain int.
                    'version': int(item['version'])
                }
                for item in response['Items']
            )
            next_key = response.get('LastEvaluatedKey')
            if not next_key:
                return found
            query_args['ExclusiveStartKey'] = next_key
# Table definition (CloudFormation/Terraform)
"""
DynamoDB Table:
- PK (Partition Key): String
- SK (Sort Key): String
- GSI1PK, GSI1SK for global ordering
Capacity: On-demand or provisioned based on throughput needs
"""
Best Practices
Do's
- Use stream IDs that include the aggregate type - e.g. `Order-{uuid}`
- Include correlation/causation IDs - For tracing
- Version events from day one - Plan for schema evolution
- Implement idempotency - Use event IDs for deduplication
- Index appropriately - For your query patterns
Don'ts
- Don't update or delete events - They're immutable facts
- Don't store large payloads - Keep events small
- Don't skip optimistic concurrency - Prevents data corruption
- Don't ignore backpressure - Handle slow consumers
Resources