| name | event-driven-architecture |
| description | Generic Event-Driven Architecture patterns with Kafka, Dapr, and modern messaging systems. Provides reusable patterns for building scalable, resilient event-driven microservices. Framework-agnostic implementation supporting multiple message brokers, state stores, and event patterns. Follows 2025 best practices for distributed systems. |
| license | MIT |
Event-Driven Architecture with Modern Patterns
This skill provides comprehensive patterns for implementing event-driven microservices using modern messaging systems, distributed runtimes, and cloud-native patterns. It's designed to be framework-agnostic and applicable to any domain requiring event-driven capabilities.
When to Use This Skill
Use this skill when you need to:
- Build event-driven microservices architecture
- Implement pub/sub patterns with Kafka, RabbitMQ, or cloud services
- Use Dapr for distributed application patterns
- Implement real-time notifications and workflows
- Build recurring task and reminder systems
- Create audit trails and activity logs
- Implement distributed state management
- Build serverless event workflows
- Handle event sourcing and CQRS patterns
1. Core Event Architecture
Base Event Schema
# events/core.py
from pydantic import BaseModel, Field, validator
from datetime import datetime, timezone
from typing import Optional, Dict, Any, Union, List
from enum import Enum
from abc import ABC, abstractmethod
import uuid
import json
class EventVersion(str, Enum):
"""Event versioning support"""
V1_0 = "1.0"
V1_1 = "1.1"
V2_0 = "2.0"
class EventPriority(str, Enum):
"""Event priority levels"""
LOW = "low"
NORMAL = "normal"
HIGH = "high"
CRITICAL = "critical"
class BaseEvent(BaseModel, ABC):
"""Base event schema with common fields"""
# Core identification
event_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
event_type: str = Field(..., description="Type of the event")
event_version: EventVersion = Field(default=EventVersion.V1_0)
# Metadata
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))  # timezone-aware; datetime.utcnow() is deprecated
    causation_id: Optional[str] = None  # Event that caused this one; declared first so the correlation_id validator can see it
    correlation_id: Optional[str] = None
source: str = Field(..., description="Source service/identifier")
# Context
user_id: Optional[str] = None
tenant_id: Optional[str] = None # Multi-tenant support
session_id: Optional[str] = None
# Processing metadata
priority: EventPriority = Field(default=EventPriority.NORMAL)
retry_count: int = Field(default=0)
max_retries: int = Field(default=3)
delay_until: Optional[datetime] = None # For delayed processing
# Schema versioning
schema_version: str = Field(default="1.0")
class Config:
# Allow additional fields for extensibility
extra = "allow"
# Use enum values
use_enum_values = True
@validator('event_type')
def validate_event_type(cls, v):
"""Validate event type format"""
if not v or '.' not in v:
raise ValueError('event_type must be in format: domain.event_name')
return v.lower()
@validator('correlation_id')
def validate_correlation_id(cls, v, values):
"""Set correlation_id from causation_id if not provided"""
if not v and 'causation_id' in values:
return values['causation_id']
return v
    def to_dict(self) -> Dict[str, Any]:
        """Convert event to a JSON-serializable dictionary"""
        return self.model_dump(mode="json")
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'BaseEvent':
"""Create event from dictionary"""
return cls(**data)
def with_context(self, **context) -> 'BaseEvent':
"""Add context to event"""
for key, value in context.items():
if hasattr(self, key):
setattr(self, key, value)
return self
class DomainEvent(BaseEvent):
"""Domain-specific event"""
aggregate_id: str = Field(..., description="Aggregate root ID")
aggregate_type: str = Field(..., description="Aggregate type")
event_data: Dict[str, Any] = Field(default_factory=dict)
@validator('aggregate_type')
def validate_aggregate_type(cls, v):
"""Validate aggregate type"""
if not v:
raise ValueError('aggregate_type is required')
return v.lower()
class IntegrationEvent(BaseEvent):
"""Integration event for cross-service communication"""
target_services: List[str] = Field(default_factory=list)
routing_key: Optional[str] = None
message_format: str = Field(default="json")
@validator('routing_key')
def validate_routing_key(cls, v, values):
"""Generate routing key from event_type if not provided"""
if not v and 'event_type' in values:
return values['event_type'].replace('.', '/')
return v
class CommandEvent(BaseEvent):
"""Command event representing an intent"""
command_type: str = Field(..., description="Type of command")
command_data: Dict[str, Any] = Field(default_factory=dict)
expected_version: Optional[int] = None # For optimistic concurrency
@validator('command_type')
def validate_command_type(cls, v):
"""Validate command type"""
if not v.endswith('.command'):
v = f"{v}.command"
return v.lower()
class QueryEvent(BaseEvent):
"""Query event for data retrieval"""
query_type: str = Field(..., description="Type of query")
query_params: Dict[str, Any] = Field(default_factory=dict)
result_topic: Optional[str] = None # Where to send results
@validator('query_type')
def validate_query_type(cls, v):
"""Validate query type"""
if not v.endswith('.query'):
v = f"{v}.query"
return v.lower()
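A minimal usage sketch of these schemas; the service name and payload below are illustrative, not part of the skill:
# Example: emitting correlated events (names are illustrative)
order_created = DomainEvent(
    event_type="orders.order_created",
    source="order-service",
    aggregate_id="order-123",
    aggregate_type="order",
    event_data={"total": 42.50, "currency": "USD"},
).with_context(user_id="user-1", tenant_id="tenant-a")

notify = IntegrationEvent(
    event_type="orders.order_created",
    source="order-service",
    causation_id=order_created.event_id,  # also becomes the correlation_id
    target_services=["billing", "shipping"],
)
assert notify.routing_key == "orders/order_created"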
Event Store Pattern
# events/store.py
import asyncio
from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any, AsyncIterator
from datetime import datetime, timedelta
import json
import uuid

from events.core import BaseEvent
class EventStore(ABC):
"""Abstract event store interface"""
@abstractmethod
async def save_event(self, event: BaseEvent, stream_id: str) -> None:
"""Save event to a stream"""
pass
@abstractmethod
async def get_events(
self,
stream_id: str,
from_version: Optional[int] = None,
to_version: Optional[int] = None,
limit: Optional[int] = None
) -> AsyncIterator[BaseEvent]:
"""Get events from a stream"""
pass
@abstractmethod
async def get_event_by_id(self, event_id: str) -> Optional[BaseEvent]:
"""Get a specific event by ID"""
pass
@abstractmethod
async def get_events_by_type(
self,
event_type: str,
from_timestamp: Optional[datetime] = None,
to_timestamp: Optional[datetime] = None
) -> AsyncIterator[BaseEvent]:
"""Get events by type"""
pass
@abstractmethod
async def get_aggregate_snapshot(
self,
aggregate_id: str,
aggregate_type: str
) -> Optional[Dict[str, Any]]:
"""Get aggregate snapshot"""
pass
@abstractmethod
async def save_aggregate_snapshot(
self,
aggregate_id: str,
aggregate_type: str,
data: Dict[str, Any],
version: int
) -> None:
"""Save aggregate snapshot"""
pass
class InMemoryEventStore(EventStore):
"""In-memory event store for testing and development"""
def __init__(self):
self._events: Dict[str, List[BaseEvent]] = {}
self._snapshots: Dict[str, Dict[str, Any]] = {}
self._type_index: Dict[str, List[str]] = {}
async def save_event(self, event: BaseEvent, stream_id: str) -> None:
"""Save event to memory"""
if stream_id not in self._events:
self._events[stream_id] = []
self._events[stream_id].append(event)
# Update type index
if event.event_type not in self._type_index:
self._type_index[event.event_type] = []
self._type_index[event.event_type].append(event.event_id)
async def get_events(
self,
stream_id: str,
from_version: Optional[int] = None,
to_version: Optional[int] = None,
limit: Optional[int] = None
) -> AsyncIterator[BaseEvent]:
"""Get events from memory"""
events = self._events.get(stream_id, [])
        # Apply filters; slice once so from/to versions index absolute positions in the stream
        events = events[from_version or 0 : to_version]
        if limit is not None:
            events = events[:limit]
for event in events:
yield event
async def get_event_by_id(self, event_id: str) -> Optional[BaseEvent]:
"""Get event by ID from memory"""
for events in self._events.values():
for event in events:
if event.event_id == event_id:
return event
return None
async def get_events_by_type(
self,
event_type: str,
from_timestamp: Optional[datetime] = None,
to_timestamp: Optional[datetime] = None
) -> AsyncIterator[BaseEvent]:
"""Get events by type from memory"""
event_ids = self._type_index.get(event_type, [])
for event_id in event_ids:
event = await self.get_event_by_id(event_id)
if event:
# Apply time filters
if from_timestamp and event.timestamp < from_timestamp:
continue
if to_timestamp and event.timestamp > to_timestamp:
continue
yield event
async def get_aggregate_snapshot(
self,
aggregate_id: str,
aggregate_type: str
) -> Optional[Dict[str, Any]]:
"""Get snapshot from memory"""
snapshot_key = f"{aggregate_type}:{aggregate_id}"
return self._snapshots.get(snapshot_key)
async def save_aggregate_snapshot(
self,
aggregate_id: str,
aggregate_type: str,
data: Dict[str, Any],
version: int
) -> None:
"""Save snapshot to memory"""
snapshot_key = f"{aggregate_type}:{aggregate_id}"
self._snapshots[snapshot_key] = {
"data": data,
"version": version,
"timestamp": datetime.utcnow()
}
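A quick smoke test of the in-memory store; the stream naming here is arbitrary:
# Example: persisting and replaying a stream with the in-memory store
import asyncio

async def demo():
    store = InMemoryEventStore()
    created = DomainEvent(
        event_type="task.created",
        source="demo",
        aggregate_id="task-1",
        aggregate_type="task",
        event_data={"title": "Write docs"},
    )
    await store.save_event(created, stream_id="task-task-1")
    async for event in store.get_events("task-task-1"):
        print(event.event_type, event.event_data)

asyncio.run(demo())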
# PostgreSQL Event Store Implementation
class PostgreSQLEventStore(EventStore):
"""PostgreSQL-based event store"""
def __init__(self, db_pool):
self.db_pool = db_pool
async def initialize_schema(self):
"""Initialize database schema"""
async with self.db_pool.acquire() as conn:
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS event_store (
                    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                    stream_id VARCHAR(255) NOT NULL,
                    stream_version INTEGER NOT NULL,
                    event_id VARCHAR(255) UNIQUE NOT NULL,
                    event_type VARCHAR(255) NOT NULL,
                    event_data JSONB NOT NULL,
                    metadata JSONB,
                    timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
                    UNIQUE (stream_id, stream_version)
                );
                -- PostgreSQL has no inline INDEX clause in CREATE TABLE;
                -- secondary indexes are created separately
                CREATE INDEX IF NOT EXISTS idx_event_store_type ON event_store (event_type);
                CREATE INDEX IF NOT EXISTS idx_event_store_timestamp ON event_store (timestamp);
                CREATE TABLE IF NOT EXISTS snapshots (
                    aggregate_id VARCHAR(255) NOT NULL,
                    aggregate_type VARCHAR(255) NOT NULL,
                    data JSONB NOT NULL,
                    version INTEGER NOT NULL,
                    timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
                    PRIMARY KEY (aggregate_type, aggregate_id)
                );
            """)
    async def save_event(self, event: BaseEvent, stream_id: str) -> None:
        """Save event to PostgreSQL"""
        async with self.db_pool.acquire() as conn:
            async with conn.transaction():
                # Compute the next version inside the transaction; the UNIQUE
                # (stream_id, stream_version) constraint rejects concurrent
                # writers that computed the same version
                next_version = await conn.fetchval(
                    "SELECT COALESCE(MAX(stream_version), 0) + 1 FROM event_store WHERE stream_id = $1",
                    stream_id
                )
                await conn.execute(
                    """
                    INSERT INTO event_store (
                        stream_id, stream_version, event_id, event_type,
                        event_data, metadata, timestamp
                    ) VALUES ($1, $2, $3, $4, $5, $6, $7)
                    """,
                    stream_id,
                    next_version,
                    event.event_id,
                    event.event_type,
                    # asyncpg expects JSONB values as text unless a codec is registered
                    json.dumps(event.to_dict()),
                    json.dumps({
                        "correlation_id": event.correlation_id,
                        "causation_id": event.causation_id,
                        "source": event.source,
                        "user_id": event.user_id,
                        "tenant_id": event.tenant_id
                    }),
                    event.timestamp
                )
async def get_events(
self,
stream_id: str,
from_version: Optional[int] = None,
to_version: Optional[int] = None,
limit: Optional[int] = None
) -> AsyncIterator[BaseEvent]:
"""Get events from PostgreSQL"""
query = "SELECT event_data FROM event_store WHERE stream_id = $1"
params = [stream_id]
if from_version is not None:
query += " AND stream_version >= $2"
params.append(from_version)
version_offset = 2
else:
version_offset = 1
if to_version is not None:
query += f" AND stream_version <= ${version_offset + 1}"
params.append(to_version)
version_offset += 1
query += " ORDER BY stream_version"
if limit is not None:
query += f" LIMIT ${version_offset + 1}"
params.append(limit)
        async with self.db_pool.acquire() as conn:
            # asyncpg cursors must be used inside a transaction
            async with conn.transaction():
                async for row in conn.cursor(query, *params):
                    yield BaseEvent.from_dict(json.loads(row["event_data"]))
    async def get_event_by_id(self, event_id: str) -> Optional[BaseEvent]:
        """Get a specific event by ID from PostgreSQL"""
        async with self.db_pool.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT event_data FROM event_store WHERE event_id = $1",
                event_id
            )
            return BaseEvent.from_dict(json.loads(row["event_data"])) if row else None
    async def get_events_by_type(
        self,
        event_type: str,
        from_timestamp: Optional[datetime] = None,
        to_timestamp: Optional[datetime] = None
    ) -> AsyncIterator[BaseEvent]:
        """Get events by type from PostgreSQL"""
        query = "SELECT event_data FROM event_store WHERE event_type = $1"
        params: List[Any] = [event_type]
        if from_timestamp is not None:
            params.append(from_timestamp)
            query += f" AND timestamp >= ${len(params)}"
        if to_timestamp is not None:
            params.append(to_timestamp)
            query += f" AND timestamp <= ${len(params)}"
        query += " ORDER BY timestamp"
        async with self.db_pool.acquire() as conn:
            async with conn.transaction():
                async for row in conn.cursor(query, *params):
                    yield BaseEvent.from_dict(json.loads(row["event_data"]))
async def get_aggregate_snapshot(
self,
aggregate_id: str,
aggregate_type: str
) -> Optional[Dict[str, Any]]:
"""Get snapshot from PostgreSQL"""
async with self.db_pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT data, version FROM snapshots
WHERE aggregate_id = $1 AND aggregate_type = $2
""",
aggregate_id,
aggregate_type
)
            if row:
                return {
                    "data": json.loads(row["data"]),  # asyncpg returns JSONB as text by default
                    "version": row["version"]
                }
return None
async def save_aggregate_snapshot(
self,
aggregate_id: str,
aggregate_type: str,
data: Dict[str, Any],
version: int
) -> None:
"""Save snapshot to PostgreSQL"""
async with self.db_pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO snapshots (aggregate_id, aggregate_type, data, version, timestamp)
VALUES ($1, $2, $3, $4, NOW())
ON CONFLICT (aggregate_type, aggregate_id)
DO UPDATE SET data = $3, version = $4, timestamp = NOW()
""",
aggregate_id,
aggregate_type,
                json.dumps(data),  # asyncpg expects JSONB values as text unless a codec is registered
version
)
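Wiring the store to a connection pool might look like the sketch below. The DSN is a placeholder, and asyncpg is an assumption, inferred from the `$n` placeholders and `acquire()` calls above:
# Example: bootstrapping the PostgreSQL store (assumes asyncpg and a reachable database)
import asyncio
import asyncpg

async def main():
    pool = await asyncpg.create_pool(dsn="postgresql://app:secret@localhost:5432/events")
    store = PostgreSQLEventStore(pool)
    await store.initialize_schema()
    event = DomainEvent(
        event_type="task.created",
        source="demo",
        aggregate_id="task-1",
        aggregate_type="task",
        event_data={"title": "Write docs"},
    )
    await store.save_event(event, stream_id="task-task-1")
    async for stored in store.get_events("task-task-1"):
        print(stored.event_id)
    await pool.close()

asyncio.run(main())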
2. Message Broker Abstraction
Generic Message Broker Interface
# messaging/broker.py
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Callable, List, AsyncIterator
from dataclasses import dataclass
from enum import Enum
import json
import time
import logging
logger = logging.getLogger(__name__)
class MessageBrokerType(str, Enum):
"""Supported message broker types"""
KAFKA = "kafka"
RABBITMQ = "rabbitmq"
REDIS = "redis"
AWS_SQS = "aws_sqs"
AZURE_SB = "azure_service_bus"
GCP_PUBSUB = "gcp_pubsub"
NATS = "nats"
EMQX = "emqx"
@dataclass
class Message:
"""Generic message representation"""
topic: str
key: Optional[str]
value: Any
headers: Optional[Dict[str, str]] = None
partition: Optional[int] = None
timestamp: Optional[int] = None
def to_dict(self) -> Dict[str, Any]:
"""Convert message to dictionary"""
return {
"topic": self.topic,
"key": self.key,
"value": self.value,
"headers": self.headers or {},
"partition": self.partition,
"timestamp": self.timestamp
}
class MessageBroker(ABC):
"""Abstract message broker interface"""
@abstractmethod
async def connect(self) -> None:
"""Establish connection to broker"""
pass
@abstractmethod
async def disconnect(self) -> None:
"""Close connection to broker"""
pass
@abstractmethod
async def publish(self, message: Message) -> None:
"""Publish a message"""
pass
@abstractmethod
async def publish_batch(self, messages: List[Message]) -> None:
"""Publish multiple messages"""
pass
@abstractmethod
async def subscribe(
self,
topic: str,
handler: Callable[[Message], None],
group_id: Optional[str] = None
) -> None:
"""Subscribe to a topic"""
pass
    @abstractmethod
    async def create_topic(self, topic: str, **kwargs) -> None:
        """Create a topic if it doesn't exist"""
        pass
    async def _handle_message(
        self,
        handler: Callable[[Message], None],
        message: Message
    ) -> None:
        """Invoke a handler, supporting both async and sync callables (shared by all implementations)"""
        if asyncio.iscoroutinefunction(handler):
            await handler(message)
        else:
            # Run sync handler in the default thread pool
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, handler, message)
class KafkaBroker(MessageBroker):
"""Kafka message broker implementation"""
def __init__(
self,
bootstrap_servers: str,
producer_config: Optional[Dict[str, Any]] = None,
consumer_config: Optional[Dict[str, Any]] = None
):
self.bootstrap_servers = bootstrap_servers
self.producer_config = producer_config or {}
self.consumer_config = consumer_config or {}
self.producer = None
self.consumers: List[Any] = []
# Default configurations
        self.producer_config.update({
            'bootstrap_servers': bootstrap_servers,
            'value_serializer': lambda v: json.dumps(v).encode('utf-8'),
            'key_serializer': lambda k: k.encode('utf-8') if k else None,
            'acks': 'all',
            # aiokafka has no 'retries' option; it retries internally and is
            # tuned via request_timeout_ms / retry_backoff_ms
            'retry_backoff_ms': 100
        })
self.consumer_config.update({
'bootstrap_servers': bootstrap_servers,
'value_deserializer': lambda v: json.loads(v.decode('utf-8')),
'key_deserializer': lambda k: k.decode('utf-8') if k else None,
'auto_offset_reset': 'earliest',
'enable_auto_commit': True
})
async def connect(self) -> None:
"""Connect to Kafka"""
from aiokafka import AIOKafkaProducer
self.producer = AIOKafkaProducer(**self.producer_config)
await self.producer.start()
logger.info("Connected to Kafka broker")
async def disconnect(self) -> None:
"""Disconnect from Kafka"""
if self.producer:
await self.producer.stop()
for consumer in self.consumers:
await consumer.stop()
logger.info("Disconnected from Kafka broker")
    async def publish(self, message: Message) -> None:
        """Publish message to Kafka"""
        if not self.producer:
            raise RuntimeError("Producer not initialized")
        # aiokafka expects headers as a list of (str, bytes) tuples
        headers = [(k, v.encode('utf-8')) for k, v in (message.headers or {}).items()]
        await self.producer.send_and_wait(
            topic=message.topic,
            value=message.value,
            key=message.key,
            headers=headers or None,
            partition=message.partition
        )
    async def publish_batch(self, messages: List[Message]) -> None:
        """Publish batch of messages to Kafka"""
        if not self.producer:
            raise RuntimeError("Producer not initialized")
        # Enqueue all sends first, then await delivery together;
        # awaiting each send individually would serialize the batch
        futures = []
        for msg in messages:
            headers = [(k, v.encode('utf-8')) for k, v in (msg.headers or {}).items()]
            futures.append(await self.producer.send(
                topic=msg.topic,
                value=msg.value,
                key=msg.key,
                headers=headers or None,
                partition=msg.partition
            ))
        await asyncio.gather(*futures)
async def subscribe(
self,
topic: str,
handler: Callable[[Message], None],
group_id: Optional[str] = None
) -> None:
"""Subscribe to Kafka topic"""
from aiokafka import AIOKafkaConsumer
consumer_config = self.consumer_config.copy()
if group_id:
consumer_config['group_id'] = group_id
consumer = AIOKafkaConsumer(topic, **consumer_config)
await consumer.start()
self.consumers.append(consumer)
# Start consuming task
asyncio.create_task(self._consume_messages(consumer, handler))
async def _consume_messages(
self,
consumer: Any,
handler: Callable[[Message], None]
) -> None:
"""Consume messages from Kafka"""
try:
async for msg in consumer:
message = Message(
topic=msg.topic,
key=msg.key,
value=msg.value,
headers=dict(msg.headers) if msg.headers else {},
partition=msg.partition,
timestamp=msg.timestamp
)
try:
await self._handle_message(handler, message)
except Exception as e:
logger.error(f"Error handling message: {e}", exc_info=True)
except Exception as e:
logger.error(f"Consumer error: {e}", exc_info=True)
async def create_topic(self, topic: str, **kwargs) -> None:
"""Create Kafka topic"""
from aiokafka.admin import AIOKafkaAdminClient, NewTopic
admin_client = AIOKafkaAdminClient(
bootstrap_servers=self.bootstrap_servers
)
try:
await admin_client.start()
topic_metadata = NewTopic(
name=topic,
num_partitions=kwargs.get('num_partitions', 3),
replication_factor=kwargs.get('replication_factor', 1)
)
await admin_client.create_topics([topic_metadata])
logger.info(f"Created topic: {topic}")
except Exception as e:
logger.warning(f"Failed to create topic {topic}: {e}")
finally:
await admin_client.close()
class RedisBroker(MessageBroker):
"""Redis-based message broker using streams"""
def __init__(self, redis_url: str):
self.redis_url = redis_url
self.redis = None
self.consumer_groups: Dict[str, str] = {}
    async def connect(self) -> None:
        """Connect to Redis"""
        # redis-py's asyncio API; the standalone aioredis package is unmaintained
        import redis.asyncio as aioredis
        self.redis = aioredis.from_url(self.redis_url, decode_responses=True)
        await self.redis.ping()
        logger.info("Connected to Redis broker")
async def disconnect(self) -> None:
"""Disconnect from Redis"""
if self.redis:
await self.redis.close()
async def publish(self, message: Message) -> None:
"""Publish message to Redis stream"""
if not self.redis:
raise RuntimeError("Redis not initialized")
        message_data = {
            'key': message.key or '',  # stream field values cannot be None
            'value': json.dumps(message.value),
            'headers': json.dumps(message.headers or {}),
            'timestamp': message.timestamp or int(time.time() * 1000)
        }
        await self.redis.xadd(message.topic, message_data)
async def publish_batch(self, messages: List[Message]) -> None:
"""Publish batch to Redis streams"""
# Redis doesn't have native batch xadd, so we use pipeline
pipe = self.redis.pipeline()
for message in messages:
            message_data = {
                'key': message.key or '',  # stream field values cannot be None
                'value': json.dumps(message.value),
                'headers': json.dumps(message.headers or {}),
                'timestamp': message.timestamp or int(time.time() * 1000)
            }
            pipe.xadd(message.topic, message_data)
await pipe.execute()
async def subscribe(
self,
topic: str,
handler: Callable[[Message], None],
group_id: Optional[str] = None
) -> None:
"""Subscribe to Redis stream"""
if group_id:
self.consumer_groups[topic] = group_id
try:
await self.redis.xgroup_create(topic, group_id, id='0', mkstream=True)
except Exception:
pass # Group might already exist
# Start consuming task
asyncio.create_task(self._consume_stream(topic, handler, group_id))
async def _consume_stream(
self,
topic: str,
handler: Callable[[Message], None],
group_id: Optional[str]
) -> None:
"""Consume messages from Redis stream"""
while True:
try:
if group_id:
# Consumer group mode
messages = await self.redis.xreadgroup(
group_id,
'consumer',
{topic: '>'},
count=10,
block=1000
)
else:
# Independent consumer mode
messages = await self.redis.xread(
{topic: '$'},
count=10,
block=1000
)
for stream, msgs in messages:
for msg_id, fields in msgs:
message = Message(
topic=topic,
key=fields.get('key'),
value=json.loads(fields['value']),
headers=json.loads(fields.get('headers', '{}')),
timestamp=int(fields.get('timestamp', 0))
)
try:
await self._handle_message(handler, message)
if group_id:
await self.redis.xack(topic, group_id, msg_id)
except Exception as e:
logger.error(f"Error handling message: {e}")
            except Exception as e:
                logger.error(f"Stream consumer error: {e}")
                await asyncio.sleep(1)
    async def create_topic(self, topic: str, **kwargs) -> None:
        """No-op: Redis streams are created implicitly on first XADD"""
        pass
class MessageBrokerFactory:
"""Factory for creating message brokers"""
@staticmethod
def create(broker_type: MessageBrokerType, **kwargs) -> MessageBroker:
"""Create a message broker instance"""
if broker_type == MessageBrokerType.KAFKA:
return KafkaBroker(
bootstrap_servers=kwargs['bootstrap_servers'],
producer_config=kwargs.get('producer_config'),
consumer_config=kwargs.get('consumer_config')
)
elif broker_type == MessageBrokerType.REDIS:
return RedisBroker(redis_url=kwargs['redis_url'])
else:
raise ValueError(f"Unsupported broker type: {broker_type}")
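A sketch of publish/subscribe through the abstraction. The broker address is a placeholder, and `subscribe` returns after spawning a background consumer task, hence the sleep:
# Example: round-tripping a message through the broker abstraction
import asyncio

async def main():
    broker = MessageBrokerFactory.create(
        MessageBrokerType.KAFKA,
        bootstrap_servers="localhost:9092",
    )
    await broker.connect()
    await broker.create_topic("task-events")

    async def on_message(message: Message) -> None:
        print("received:", message.topic, message.value)

    await broker.subscribe("task-events", on_message, group_id="demo-group")
    await broker.publish(Message(topic="task-events", key="task-1", value={"title": "Write docs"}))
    await asyncio.sleep(2)  # give the background consumer a moment to deliver
    await broker.disconnect()

asyncio.run(main())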
3. Event Processing Patterns
Saga Pattern Implementation
# patterns/saga.py
import asyncio
from typing import Dict, List, Optional, Any, Callable
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime
import functools
import uuid
import logging

from events.core import DomainEvent
from events.store import EventStore
from messaging.broker import Message, MessageBroker
logger = logging.getLogger(__name__)
class SagaStatus(str, Enum):
"""Saga status enumeration"""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
COMPENSATING = "compensating"
COMPENSATED = "compensated"
FAILED = "failed"
@dataclass
class SagaStep:
"""Single step in a saga"""
step_id: str = field(default_factory=lambda: str(uuid.uuid4()))
action: Optional[Callable] = None
compensation: Optional[Callable] = None
params: Dict[str, Any] = field(default_factory=dict)
retry_count: int = 0
max_retries: int = 3
timeout: float = 30.0
completed: bool = False
result: Any = None
class Saga:
"""Saga orchestrator for distributed transactions"""
def __init__(self, saga_id: Optional[str] = None):
self.saga_id = saga_id or str(uuid.uuid4())
self.status = SagaStatus.PENDING
self.steps: List[SagaStep] = []
self.completed_steps: List[SagaStep] = []
self.current_step_index = 0
self.error: Optional[Exception] = None
self.created_at = datetime.utcnow()
self.updated_at = datetime.utcnow()
def add_step(
self,
action: Callable,
compensation: Callable,
params: Dict[str, Any] = None,
max_retries: int = 3,
timeout: float = 30.0
) -> 'Saga':
"""Add a step to the saga"""
step = SagaStep(
action=action,
compensation=compensation,
params=params or {},
max_retries=max_retries,
timeout=timeout
)
self.steps.append(step)
return self
async def execute(self) -> bool:
"""Execute the saga"""
try:
self.status = SagaStatus.RUNNING
self.updated_at = datetime.utcnow()
logger.info(f"Starting saga {self.saga_id} with {len(self.steps)} steps")
# Execute all steps
for i, step in enumerate(self.steps):
self.current_step_index = i
if not await self._execute_step(step):
# Step failed, start compensation
await self._compensate()
return False
self.completed_steps.append(step)
# All steps completed successfully
self.status = SagaStatus.COMPLETED
self.updated_at = datetime.utcnow()
logger.info(f"Saga {self.saga_id} completed successfully")
return True
except Exception as e:
self.error = e
logger.error(f"Saga {self.saga_id} failed: {e}")
await self._compensate()
return False
async def _execute_step(self, step: SagaStep) -> bool:
"""Execute a single step"""
logger.info(f"Executing step {step.step_id}")
while step.retry_count <= step.max_retries:
try:
# Execute with timeout
step.result = await asyncio.wait_for(
self._run_action(step.action, step.params),
timeout=step.timeout
)
step.completed = True
logger.info(f"Step {step.step_id} completed successfully")
return True
except asyncio.TimeoutError:
step.retry_count += 1
logger.warning(f"Step {step.step_id} timed out, retry {step.retry_count}")
await asyncio.sleep(2 ** step.retry_count) # Exponential backoff
except Exception as e:
step.retry_count += 1
logger.error(f"Step {step.step_id} failed: {e}")
if step.retry_count > step.max_retries:
logger.error(f"Step {step.step_id} failed after {step.max_retries} retries")
self.error = e
return False
await asyncio.sleep(2 ** step.retry_count) # Exponential backoff
return False
    async def _run_action(self, action: Callable, params: Dict[str, Any]) -> Any:
        """Run action with parameters"""
        if asyncio.iscoroutinefunction(action):
            return await action(**params)
        else:
            # run_in_executor only accepts positional args, so bind kwargs first
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(None, functools.partial(action, **params))
async def _compensate(self) -> None:
"""Compensate completed steps"""
self.status = SagaStatus.COMPENSATING
self.updated_at = datetime.utcnow()
logger.info(f"Starting compensation for saga {self.saga_id}")
# Compensate in reverse order
for step in reversed(self.completed_steps):
try:
await self._compensate_step(step)
except Exception as e:
logger.error(f"Compensation failed for step {step.step_id}: {e}")
# Continue with other compensations
        self.status = SagaStatus.COMPENSATED
        self.updated_at = datetime.utcnow()
        logger.info(f"Saga {self.saga_id} compensated")
    async def _compensate_step(self, step: SagaStep) -> None:
        """Compensate a single step"""
        logger.info(f"Compensating step {step.step_id}")
        compensation_params = step.params.copy()
        compensation_params['result'] = step.result
        if step.compensation:
            if asyncio.iscoroutinefunction(step.compensation):
                await step.compensation(**compensation_params)
            else:
                # run_in_executor only accepts positional args, so bind kwargs first
                loop = asyncio.get_running_loop()
                await loop.run_in_executor(
                    None,
                    functools.partial(step.compensation, **compensation_params)
                )
class SagaManager:
"""Manages multiple sagas"""
def __init__(self, event_store: EventStore, message_broker: MessageBroker):
self.event_store = event_store
self.message_broker = message_broker
self.active_sagas: Dict[str, Saga] = {}
def create_saga(self, saga_id: Optional[str] = None) -> Saga:
"""Create a new saga"""
saga = Saga(saga_id)
self.active_sagas[saga.saga_id] = saga
return saga
async def execute_saga(self, saga: Saga) -> bool:
"""Execute a saga and persist its state"""
# Save saga started event
await self._save_saga_event(saga, "saga.started")
# Execute saga
result = await saga.execute()
# Save saga completed/failed event
event_type = "saga.completed" if result else "saga.failed"
await self._save_saga_event(saga, event_type)
# Remove from active sagas
if saga.saga_id in self.active_sagas:
del self.active_sagas[saga.saga_id]
return result
async def _save_saga_event(self, saga: Saga, event_type: str) -> None:
"""Save saga lifecycle event"""
event = DomainEvent(
event_type=event_type,
event_data={
"saga_id": saga.saga_id,
"status": saga.status.value,
"total_steps": len(saga.steps),
"completed_steps": len(saga.completed_steps),
"current_step": saga.current_step_index,
"error": str(saga.error) if saga.error else None
},
aggregate_id=saga.saga_id,
aggregate_type="saga",
source="saga_manager"
)
await self.event_store.save_event(event, f"saga-{saga.saga_id}")
# Also publish to message broker
message = Message(
topic="saga-events",
key=saga.saga_id,
value=event.to_dict()
)
await self.message_broker.publish(message)
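A compensating two-step saga might be assembled as below. The handlers are illustrative stubs, the broker argument is any already-connected MessageBroker (execute_saga publishes to the "saga-events" topic), and the failing payment step uses max_retries=0 so the demo compensates immediately. Only completed steps are compensated, so refund_card never runs here:
# Example: a two-step saga whose second step fails and triggers compensation
async def reserve_stock(order_id: str):
    return {"reservation_id": f"res-{order_id}"}

async def release_stock(order_id: str, result=None):
    print("released:", result)

async def charge_card(order_id: str):
    raise RuntimeError("payment declined")

async def refund_card(order_id: str, result=None):
    print("refunded")

async def run(broker: MessageBroker):
    manager = SagaManager(InMemoryEventStore(), broker)
    saga = manager.create_saga()
    saga.add_step(reserve_stock, release_stock, {"order_id": "o-1"})
    saga.add_step(charge_card, refund_card, {"order_id": "o-1"}, max_retries=0)
    ok = await manager.execute_saga(saga)
    print(saga.status, ok)  # SagaStatus.COMPENSATED False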
CQRS Pattern Implementation
# patterns/cqrs.py
import asyncio
import uuid
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Any, Type, TypeVar, Generic, Callable
from dataclasses import dataclass, field
from datetime import datetime
import logging

from events.core import BaseEvent, DomainEvent
from events.store import EventStore
logger = logging.getLogger(__name__)
T = TypeVar('T')
@dataclass
class CommandResponse:
"""Response from command execution"""
success: bool
message: Optional[str] = None
data: Optional[Dict[str, Any]] = None
errors: List[str] = field(default_factory=list)
class Command(ABC):
"""Base command class"""
def __init__(self, aggregate_id: str):
self.aggregate_id = aggregate_id
self.id = str(uuid.uuid4())
self.timestamp = datetime.utcnow()
class CommandHandler(ABC, Generic[T]):
"""Base command handler"""
@abstractmethod
async def handle(self, command: T) -> CommandResponse:
"""Handle the command"""
pass
class Query(ABC):
"""Base query class"""
def __init__(self):
self.id = str(uuid.uuid4())
self.timestamp = datetime.utcnow()
class QueryResult(ABC):
"""Base query result"""
pass
class QueryHandler(ABC, Generic[T]):
"""Base query handler"""
@abstractmethod
async def handle(self, query: T) -> QueryResult:
"""Handle the query"""
pass
class CommandBus:
"""Command bus for dispatching commands"""
def __init__(self):
self.handlers: Dict[Type[Command], CommandHandler] = {}
self.middleware: List[Callable] = []
def register(
self,
command_type: Type[Command],
handler: CommandHandler
) -> None:
"""Register a command handler"""
self.handlers[command_type] = handler
logger.info(f"Registered handler for {command_type.__name__}")
def add_middleware(self, middleware: Callable) -> None:
"""Add middleware to the command bus"""
self.middleware.append(middleware)
async def dispatch(self, command: Command) -> CommandResponse:
"""Dispatch a command to its handler"""
command_type = type(command)
if command_type not in self.handlers:
return CommandResponse(
success=False,
message=f"No handler registered for {command_type.__name__}"
)
handler = self.handlers[command_type]
# Apply middleware
for middleware in self.middleware:
command = await middleware(command)
try:
# Save command event
await self._save_command_event(command)
# Handle command
response = await handler.handle(command)
# Save response event
await self._save_response_event(command, response)
return response
except Exception as e:
logger.error(f"Error handling command {command_type.__name__}: {e}")
return CommandResponse(
success=False,
message=str(e),
errors=[str(e)]
)
async def _save_command_event(self, command: Command) -> None:
"""Save command event to event store"""
# Implementation would save to event store
pass
async def _save_response_event(
self,
command: Command,
response: CommandResponse
) -> None:
"""Save response event to event store"""
# Implementation would save to event store
pass
class QueryBus:
"""Query bus for dispatching queries"""
def __init__(self):
self.handlers: Dict[Type[Query], QueryHandler] = {}
self.middleware: List[Callable] = []
def register(
self,
query_type: Type[Query],
handler: QueryHandler
) -> None:
"""Register a query handler"""
self.handlers[query_type] = handler
logger.info(f"Registered handler for {query_type.__name__}")
def add_middleware(self, middleware: Callable) -> None:
"""Add middleware to the query bus"""
self.middleware.append(middleware)
async def dispatch(self, query: Query) -> QueryResult:
"""Dispatch a query to its handler"""
query_type = type(query)
if query_type not in self.handlers:
raise ValueError(f"No handler registered for {query_type.__name__}")
handler = self.handlers[query_type]
# Apply middleware
for middleware in self.middleware:
query = await middleware(query)
# Handle query
return await handler.handle(query)
class EventProjector(ABC):
"""Base event projector for building read models"""
def __init__(self, event_store: EventStore):
self.event_store = event_store
@abstractmethod
async def project(self, event: BaseEvent) -> None:
"""Project an event to the read model"""
pass
async def project_from_stream(
self,
stream_id: str,
from_version: Optional[int] = None
) -> None:
"""Project events from a stream"""
async for event in self.event_store.get_events(
stream_id,
from_version=from_version
):
await self.project(event)
async def project_by_type(
self,
event_type: str,
from_timestamp: Optional[datetime] = None
) -> None:
"""Project events by type"""
async for event in self.event_store.get_events_by_type(
event_type,
from_timestamp=from_timestamp
):
await self.project(event)
# Example CQRS Implementation
class CreateTaskCommand(Command):
"""Create task command"""
def __init__(self, aggregate_id: str, title: str, description: str = None):
super().__init__(aggregate_id)
self.title = title
self.description = description
class TaskCreatedEvent(DomainEvent):
"""Task created event"""
    def __init__(self, aggregate_id: str, title: str, description: Optional[str] = None):
        super().__init__(
            event_type="task.created",
            aggregate_id=aggregate_id,
            aggregate_type="task",
            source="task-service",  # BaseEvent requires a source; substitute your service name
            event_data={
                "title": title,
                "description": description
            }
        )
class TaskReadModel:
"""Task read model for queries"""
def __init__(self):
self.tasks: Dict[str, Dict[str, Any]] = {}
async def get_task(self, task_id: str) -> Optional[Dict[str, Any]]:
"""Get task from read model"""
return self.tasks.get(task_id)
async def list_tasks(self, user_id: str = None) -> List[Dict[str, Any]]:
"""List all tasks or user's tasks"""
tasks = list(self.tasks.values())
if user_id:
tasks = [t for t in tasks if t.get("user_id") == user_id]
return tasks
class TaskProjector(EventProjector):
"""Task event projector"""
def __init__(self, event_store: EventStore, read_model: TaskReadModel):
super().__init__(event_store)
self.read_model = read_model
async def project(self, event: BaseEvent) -> None:
"""Project task events to read model"""
if event.event_type == "task.created":
await self._project_task_created(event)
elif event.event_type == "task.updated":
await self._project_task_updated(event)
elif event.event_type == "task.deleted":
await self._project_task_deleted(event)
async def _project_task_created(self, event: DomainEvent) -> None:
"""Project task created event"""
self.read_model.tasks[event.aggregate_id] = {
"id": event.aggregate_id,
"title": event.event_data["title"],
"description": event.event_data.get("description"),
"status": "todo",
"created_at": event.timestamp,
"updated_at": event.timestamp
}
async def _project_task_updated(self, event: DomainEvent) -> None:
"""Project task updated event"""
if event.aggregate_id in self.read_model.tasks:
task = self.read_model.tasks[event.aggregate_id]
task.update(event.event_data)
task["updated_at"] = event.timestamp
async def _project_task_deleted(self, event: DomainEvent) -> None:
"""Project task deleted event"""
if event.aggregate_id in self.read_model.tasks:
del self.read_model.tasks[event.aggregate_id]
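Wiring the pieces together end to end might look like this. The handler projects synchronously for simplicity; real deployments usually project asynchronously from the event stream:
# Example: command -> event -> projection -> query, end to end
import asyncio

class CreateTaskHandler(CommandHandler[CreateTaskCommand]):
    def __init__(self, event_store: EventStore, projector: TaskProjector):
        self.event_store = event_store
        self.projector = projector

    async def handle(self, command: CreateTaskCommand) -> CommandResponse:
        event = TaskCreatedEvent(command.aggregate_id, command.title, command.description)
        await self.event_store.save_event(event, f"task-{command.aggregate_id}")
        await self.projector.project(event)  # synchronous projection for the demo
        return CommandResponse(success=True, data={"task_id": command.aggregate_id})

async def main():
    store = InMemoryEventStore()
    read_model = TaskReadModel()
    bus = CommandBus()
    bus.register(CreateTaskCommand, CreateTaskHandler(store, TaskProjector(store, read_model)))
    await bus.dispatch(CreateTaskCommand("task-1", "Write docs"))
    print(await read_model.get_task("task-1"))

asyncio.run(main())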
4. Dapr Integration Patterns
Generic Dapr Client
# dapr/client.py
import asyncio
import httpx
from typing import Dict, Any, List, Optional, AsyncIterator
from datetime import datetime
import json
import logging

from events.core import BaseEvent
from events.store import EventStore
logger = logging.getLogger(__name__)
class DaprClient:
"""Generic Dapr client with comprehensive feature support"""
def __init__(
self,
dapr_port: int = 3500,
dapr_host: str = "localhost",
timeout: float = 30.0
):
self.dapr_port = dapr_port
self.dapr_host = dapr_host
self.base_url = f"http://{dapr_host}:{dapr_port}/v1.0"
self.timeout = timeout
# HTTP client for Dapr API calls
self.client = httpx.AsyncClient(timeout=timeout)
async def close(self):
"""Close the Dapr client"""
await self.client.aclose()
# State Management
async def save_state(
self,
store_name: str,
key: str,
value: Any,
metadata: Optional[Dict[str, str]] = None,
options: Optional[Dict[str, Any]] = None
) -> None:
"""Save state to Dapr state store"""
url = f"{self.base_url}/state/{store_name}"
state_item = {
"key": key,
"value": value
}
if metadata:
state_item["metadata"] = metadata
if options:
state_item["options"] = options
response = await self.client.post(url, json=[state_item])
response.raise_for_status()
async def get_state(
self,
store_name: str,
key: str,
metadata: Optional[Dict[str, str]] = None
) -> Any:
"""Get state from Dapr state store"""
url = f"{self.base_url}/state/{store_name}/{key}"
params = {}
if metadata:
params["metadata"] = json.dumps(metadata)
response = await self.client.get(url, params=params)
response.raise_for_status()
if response.status_code == 204:
return None
return response.json()
async def delete_state(
self,
store_name: str,
key: str,
metadata: Optional[Dict[str, str]] = None
) -> None:
"""Delete state from Dapr state store"""
url = f"{self.base_url}/state/{store_name}/{key}"
params = {}
if metadata:
params["metadata"] = json.dumps(metadata)
response = await self.client.delete(url, params=params)
response.raise_for_status()
async def get_bulk_state(
self,
store_name: str,
keys: List[str],
parallelism: int = 10,
metadata: Optional[Dict[str, str]] = None
) -> Dict[str, Any]:
"""Get multiple state items"""
url = f"{self.base_url}/state/{store_name}/bulk"
request_data = {
"keys": keys,
"parallelism": parallelism
}
if metadata:
request_data["metadata"] = metadata
response = await self.client.post(url, json=request_data)
response.raise_for_status()
return response.json()
# Pub/Sub
async def publish_event(
self,
pubsub_name: str,
topic: str,
data: Any,
metadata: Optional[Dict[str, str]] = None
) -> None:
"""Publish event via Dapr pub/sub"""
url = f"{self.base_url}/publish/{pubsub_name}/{topic}"
headers = {}
if metadata:
for key, value in metadata.items():
headers[f"metadata-{key}"] = value
response = await self.client.post(
url,
content=json.dumps(data),
headers=headers
)
response.raise_for_status()
# Service Invocation
async def invoke_service(
self,
app_id: str,
method_name: str,
data: Any = None,
http_verb: str = "POST",
metadata: Optional[Dict[str, str]] = None
) -> Any:
"""Invoke a service method"""
url = f"{self.base_url}/invoke/{app_id}/method/{method_name}"
headers = {}
if metadata:
for key, value in metadata.items():
headers[f"metadata-{key}"] = value
response = await self.client.request(
http_verb,
url,
            content=json.dumps(data) if data is not None else None,
headers=headers
)
response.raise_for_status()
return response.json()
# Bindings
async def invoke_binding(
self,
binding_name: str,
operation: str,
data: Any,
metadata: Optional[Dict[str, str]] = None
) -> Any:
"""Invoke an output binding"""
url = f"{self.base_url}/bindings/{binding_name}"
request_data = {
"operation": operation,
"data": data
}
if metadata:
request_data["metadata"] = metadata
response = await self.client.post(url, json=request_data)
response.raise_for_status()
return response.json()
# Secrets
async def get_secret(
self,
secret_store_name: str,
key: str,
metadata: Optional[Dict[str, str]] = None
) -> str:
"""Get secret from Dapr secret store"""
url = f"{self.base_url}/secrets/{secret_store_name}/{key}"
params = {}
if metadata:
params["metadata"] = json.dumps(metadata)
response = await self.client.get(url, params=params)
response.raise_for_status()
return response.json()[key]
# Configuration
async def get_configuration(
self,
configuration_store_name: str,
keys: List[str],
metadata: Optional[Dict[str, str]] = None
) -> Dict[str, Any]:
"""Get configuration from Dapr configuration store"""
url = f"{self.base_url}/configuration/{configuration_store_name}"
request_data = {"keys": keys}
if metadata:
request_data["metadata"] = metadata
response = await self.client.post(url, json=request_data)
response.raise_for_status()
return response.json()
# Distributed Lock
async def acquire_lock(
self,
lock_store_name: str,
resource_id: str,
lock_owner: str,
expiry_in_seconds: int = 60
) -> bool:
"""Acquire distributed lock"""
url = f"{self.base_url}/locks/{lock_store_name}/{resource_id}"
request_data = {
"lockOwner": lock_owner,
"expiryInSeconds": expiry_in_seconds
}
response = await self.client.post(url, json=request_data)
response.raise_for_status()
return response.json().get("success", False)
async def release_lock(
self,
lock_store_name: str,
resource_id: str,
lock_owner: str
) -> bool:
"""Release distributed lock"""
url = f"{self.base_url}/locks/{lock_store_name}/{resource_id}/{lock_owner}"
response = await self.client.delete(url)
response.raise_for_status()
return response.json().get("success", False)
# Health Checks
async def health_check(self) -> bool:
"""Check Dapr sidecar health"""
try:
url = f"{self.base_url}/healthz"
response = await self.client.get(url)
return response.status_code == 200
except Exception as e:
logger.error(f"Dapr health check failed: {e}")
return False
class DaprWorkflow:
"""Dapr Workflow API client"""
def __init__(self, dapr_client: DaprClient):
self.dapr_client = dapr_client
async def start_workflow(
self,
workflow_name: str,
input_data: Any = None,
instance_id: Optional[str] = None
) -> str:
"""Start a workflow instance"""
url = f"{self.dapr_client.base_url}/workflows/{workflow_name}/start"
request_data = {}
if input_data:
request_data["input"] = input_data
if instance_id:
request_data["workflowInstanceId"] = instance_id
response = await self.dapr_client.client.post(url, json=request_data)
response.raise_for_status()
return response.json().get("instanceId")
async def get_workflow_state(
self,
instance_id: str
) -> Dict[str, Any]:
"""Get workflow state"""
url = f"{self.dapr_client.base_url}/workflows/{instance_id}"
response = await self.dapr_client.client.get(url)
response.raise_for_status()
return response.json()
async def raise_workflow_event(
self,
instance_id: str,
event_name: str,
event_data: Any
) -> None:
"""Raise an event for a workflow"""
url = f"{self.dapr_client.base_url}/workflows/{instance_id}/raiseEvent/{event_name}"
response = await self.dapr_client.client.post(
url,
content=json.dumps(event_data)
)
response.raise_for_status()
async def terminate_workflow(
self,
instance_id: str
) -> None:
"""Terminate a workflow instance"""
url = f"{self.dapr_client.base_url}/workflows/{instance_id}/terminate"
response = await self.dapr_client.client.post(url)
response.raise_for_status()
# Dapr Event Store Integration
class DaprEventStore(EventStore):
"""Dapr-based event store implementation"""
def __init__(self, dapr_client: DaprClient, state_store_name: str = "eventstore"):
self.dapr_client = dapr_client
self.state_store_name = state_store_name
async def save_event(self, event: BaseEvent, stream_id: str) -> None:
"""Save event using Dapr state store"""
# Get stream events
stream_key = f"stream:{stream_id}"
events = await self.dapr_client.get_state(
self.state_store_name,
stream_key
) or []
# Add new event
events.append(event.to_dict())
# Save back to state store
await self.dapr_client.save_state(
self.state_store_name,
stream_key,
events
)
# Also save by event ID for direct lookup
event_key = f"event:{event.event_id}"
await self.dapr_client.save_state(
self.state_store_name,
event_key,
event.to_dict()
)
# Update type index
type_index_key = f"type:{event.event_type}"
type_events = await self.dapr_client.get_state(
self.state_store_name,
type_index_key
) or []
type_events.append(event.event_id)
await self.dapr_client.save_state(
self.state_store_name,
type_index_key,
type_events
)
async def get_events(
self,
stream_id: str,
from_version: Optional[int] = None,
to_version: Optional[int] = None,
limit: Optional[int] = None
) -> AsyncIterator[BaseEvent]:
"""Get events from Dapr state store"""
stream_key = f"stream:{stream_id}"
events_data = await self.dapr_client.get_state(
self.state_store_name,
stream_key
) or []
        # Apply filters; slice once so from/to versions index absolute positions
        events_data = events_data[from_version or 0 : to_version]
        if limit is not None:
            events_data = events_data[:limit]
        for event_data in events_data:
            yield BaseEvent.from_dict(event_data)
    async def get_event_by_id(self, event_id: str) -> Optional[BaseEvent]:
        """Get a specific event by ID"""
        data = await self.dapr_client.get_state(
            self.state_store_name,
            f"event:{event_id}"
        )
        return BaseEvent.from_dict(data) if data else None
    async def get_events_by_type(
        self,
        event_type: str,
        from_timestamp: Optional[datetime] = None,
        to_timestamp: Optional[datetime] = None
    ) -> AsyncIterator[BaseEvent]:
        """Get events by type via the type index"""
        event_ids = await self.dapr_client.get_state(
            self.state_store_name,
            f"type:{event_type}"
        ) or []
        for event_id in event_ids:
            event = await self.get_event_by_id(event_id)
            if not event:
                continue
            if from_timestamp and event.timestamp < from_timestamp:
                continue
            if to_timestamp and event.timestamp > to_timestamp:
                continue
            yield event
    async def get_aggregate_snapshot(
        self,
        aggregate_id: str,
        aggregate_type: str
    ) -> Optional[Dict[str, Any]]:
        """Get aggregate snapshot"""
        return await self.dapr_client.get_state(
            self.state_store_name,
            f"snapshot:{aggregate_type}:{aggregate_id}"
        )
    async def save_aggregate_snapshot(
        self,
        aggregate_id: str,
        aggregate_type: str,
        data: Dict[str, Any],
        version: int
    ) -> None:
        """Save aggregate snapshot"""
        await self.dapr_client.save_state(
            self.state_store_name,
            f"snapshot:{aggregate_type}:{aggregate_id}",
            {"data": data, "version": version}
        )
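Exercising the client against a local sidecar; this assumes components named "statestore" and "pubsub" are configured, as in the Kubernetes manifests below:
# Example: state and pub/sub through a local Dapr sidecar
import asyncio

async def main():
    dapr = DaprClient()
    if not await dapr.health_check():
        raise RuntimeError("Dapr sidecar is not reachable on localhost:3500")
    await dapr.save_state("statestore", "greeting", {"message": "hello"})
    print(await dapr.get_state("statestore", "greeting"))
    await dapr.publish_event("pubsub", "task-events", {"event_type": "task.created"})
    await dapr.close()

asyncio.run(main())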
5. Deployment Patterns
Kubernetes with Dapr
# deployments/kubernetes/dapr-config.yaml
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
name: app-config
namespace: production
spec:
tracing:
samplingRate: "1"
zipkin:
endpointAddress: "http://zipkin.default.svc.cluster.local:9411/api/v2/spans"
metrics:
enabled: true
rules: |
- name: dapr-system
description: "Dapr system metrics"
default: true
- name: app
description: "Application metrics"
default: true
features:
- name: AppHealthCheck
enabled: true
api:
allowed:
- name: state
- name: invoke
- name: publish
- name: bindings
- name: secrets
---
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
namespace: production
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers
value: "kafka-broker:9092"
- name: authRequired
value: "false"
- name: consumerID
value: "app-pubsub"
- name: compressionType
value: "snappy"
- name: batch_size
value: "100"
- name: batch_timeout
value: "10ms"
---
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
namespace: production
spec:
type: state.postgresql
version: v1
metadata:
- name: connectionString
secretKeyRef:
name: postgres-secret
key: connection-string
- name: tableName
value: "app_state"
- name: metadataTableName
value: "app_state_metadata"
- name: keyPrefix
value: "none"
---
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: secretstore
namespace: production
spec:
type: secretstores.kubernetes
version: v1
metadata:
- name: namespace
value: "production"
# deployments/kubernetes/app-with-dapr.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: event-service
namespace: production
labels:
app: event-service
spec:
replicas: 3
selector:
matchLabels:
app: event-service
template:
metadata:
labels:
app: event-service
annotations:
dapr.io/enabled: "true"
dapr.io/app-id: "event-service"
dapr.io/app-port: "8000"
dapr.io/enable-metrics: "true"
dapr.io/enable-profiling: "true"
dapr.io/config: "app-config"
dapr.io/log-as-json: "true"
dapr.io/log-level: "info"
dapr.io/app-protocol: "http"
spec:
containers:
- name: app
image: myregistry/event-service:latest
ports:
- containerPort: 8000
env:
- name: DAPR_HTTP_PORT
value: "3500"
- name: DAPR_GRPC_PORT
value: "50001"
- name: APP_PORT
value: "8000"
- name: LOG_LEVEL
value: "info"
resources:
requests:
memory: "256Mi"
cpu: "100m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /healthz
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /readyz
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
volumeMounts:
- name: config
mountPath: /app/config
readOnly: true
volumes:
- name: config
configMap:
name: app-config
---
apiVersion: v1
kind: Service
metadata:
name: event-service
namespace: production
spec:
selector:
app: event-service
ports:
- port: 80
targetPort: 8000
name: http
type: ClusterIP
This event-driven architecture skill provides comprehensive patterns for building modern, distributed systems using event sourcing, CQRS, saga patterns, and Dapr integration. It's designed to be framework-agnostic and applicable to any domain requiring robust event-driven capabilities.
★ Insight ─────────────────────────────────────
The event-driven architecture patterns shown here emphasize several key 2025 best practices:
- Schema Evolution: Events include versioning and support for backward/forward compatibility
- Broker Abstraction: Generic interfaces allow swapping Kafka, Redis, or cloud brokers without code changes
- Recovery Patterns: Sagas with compensation and retry policies ensure system resilience
- Observability: Built-in tracing, metrics, and structured logging for production monitoring
- Multi-tenancy: Native support for tenant isolation at the event level
These patterns ensure your event-driven system can evolve, scale, and maintain reliability as requirements change.
─────────────────────────────────────────────────