Claude Code Plugins

Community-maintained marketplace

Feedback

event-driven-architecture

@Azeem-2/HackthonII
0
0

Generic Event-Driven Architecture patterns with Kafka, Dapr, and modern messaging systems. Provides reusable patterns for building scalable, resilient event-driven microservices. Framework-agnostic implementation supporting multiple message brokers, state stores, and event patterns. Follows 2025 best practices for distributed systems.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name event-driven-architecture
description Generic Event-Driven Architecture patterns with Kafka, Dapr, and modern messaging systems. Provides reusable patterns for building scalable, resilient event-driven microservices. Framework-agnostic implementation supporting multiple message brokers, state stores, and event patterns. Follows 2025 best practices for distributed systems.
license MIT

Event-Driven Architecture with Modern Patterns

This skill provides comprehensive patterns for implementing event-driven microservices using modern messaging systems, distributed runtimes, and cloud-native patterns. It's designed to be framework-agnostic and applicable to any domain requiring event-driven capabilities.

When to Use This Skill

Use this skill when you need to:

  • Build event-driven microservices architecture
  • Implement pub/sub patterns with Kafka, RabbitMQ, or cloud services
  • Use Dapr for distributed application patterns
  • Implement real-time notifications and workflows
  • Build recurring task and reminder systems
  • Create audit trails and activity logs
  • Implement distributed state management
  • Build serverless event workflows
  • Handle event sourcing and CQRS patterns

1. Core Event Architecture

Base Event Schema

# events/core.py
from pydantic import BaseModel, Field, validator
from datetime import datetime
from typing import Optional, Dict, Any, Union, List
from enum import Enum
from abc import ABC, abstractmethod
import uuid
import json

class EventVersion(str, Enum):
    """Event versioning support"""
    V1_0 = "1.0"
    V1_1 = "1.1"
    V2_0 = "2.0"

class EventPriority(str, Enum):
    """Event priority levels"""
    LOW = "low"
    NORMAL = "normal"
    HIGH = "high"
    CRITICAL = "critical"

class BaseEvent(BaseModel, ABC):
    """Base event schema with common fields"""

    # Core identification
    event_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    event_type: str = Field(..., description="Type of the event")
    event_version: EventVersion = Field(default=EventVersion.V1_0)

    # Metadata
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    correlation_id: Optional[str] = None
    causation_id: Optional[str] = None  # Event that caused this one
    source: str = Field(..., description="Source service/identifier")

    # Context
    user_id: Optional[str] = None
    tenant_id: Optional[str] = None  # Multi-tenant support
    session_id: Optional[str] = None

    # Processing metadata
    priority: EventPriority = Field(default=EventPriority.NORMAL)
    retry_count: int = Field(default=0)
    max_retries: int = Field(default=3)
    delay_until: Optional[datetime] = None  # For delayed processing

    # Schema versioning
    schema_version: str = Field(default="1.0")

    class Config:
        # Allow additional fields for extensibility
        extra = "allow"
        # Use enum values
        use_enum_values = True

    @validator('event_type')
    def validate_event_type(cls, v):
        """Validate event type format"""
        if not v or '.' not in v:
            raise ValueError('event_type must be in format: domain.event_name')
        return v.lower()

    @validator('correlation_id')
    def validate_correlation_id(cls, v, values):
        """Set correlation_id from causation_id if not provided"""
        if not v and 'causation_id' in values:
            return values['causation_id']
        return v

    def to_dict(self) -> Dict[str, Any]:
        """Convert event to dictionary for serialization"""
        return self.model_dump()

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'BaseEvent':
        """Create event from dictionary"""
        return cls(**data)

    def with_context(self, **context) -> 'BaseEvent':
        """Add context to event"""
        for key, value in context.items():
            if hasattr(self, key):
                setattr(self, key, value)
        return self

class DomainEvent(BaseEvent):
    """Domain-specific event"""

    aggregate_id: str = Field(..., description="Aggregate root ID")
    aggregate_type: str = Field(..., description="Aggregate type")
    event_data: Dict[str, Any] = Field(default_factory=dict)

    @validator('aggregate_type')
    def validate_aggregate_type(cls, v):
        """Validate aggregate type"""
        if not v:
            raise ValueError('aggregate_type is required')
        return v.lower()

class IntegrationEvent(BaseEvent):
    """Integration event for cross-service communication"""

    target_services: List[str] = Field(default_factory=list)
    routing_key: Optional[str] = None
    message_format: str = Field(default="json")

    @validator('routing_key')
    def validate_routing_key(cls, v, values):
        """Generate routing key from event_type if not provided"""
        if not v and 'event_type' in values:
            return values['event_type'].replace('.', '/')
        return v

class CommandEvent(BaseEvent):
    """Command event representing an intent"""

    command_type: str = Field(..., description="Type of command")
    command_data: Dict[str, Any] = Field(default_factory=dict)
    expected_version: Optional[int] = None  # For optimistic concurrency

    @validator('command_type')
    def validate_command_type(cls, v):
        """Validate command type"""
        if not v.endswith('.command'):
            v = f"{v}.command"
        return v.lower()

class QueryEvent(BaseEvent):
    """Query event for data retrieval"""

    query_type: str = Field(..., description="Type of query")
    query_params: Dict[str, Any] = Field(default_factory=dict)
    result_topic: Optional[str] = None  # Where to send results

    @validator('query_type')
    def validate_query_type(cls, v):
        """Validate query type"""
        if not v.endswith('.query'):
            v = f"{v}.query"
        return v.lower()

Event Store Pattern

# events/store.py
import asyncio
from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any, AsyncIterator
from datetime import datetime, timedelta
import json
import uuid

class EventStore(ABC):
    """Abstract event store interface"""

    @abstractmethod
    async def save_event(self, event: BaseEvent, stream_id: str) -> None:
        """Save event to a stream"""
        pass

    @abstractmethod
    async def get_events(
        self,
        stream_id: str,
        from_version: Optional[int] = None,
        to_version: Optional[int] = None,
        limit: Optional[int] = None
    ) -> AsyncIterator[BaseEvent]:
        """Get events from a stream"""
        pass

    @abstractmethod
    async def get_event_by_id(self, event_id: str) -> Optional[BaseEvent]:
        """Get a specific event by ID"""
        pass

    @abstractmethod
    async def get_events_by_type(
        self,
        event_type: str,
        from_timestamp: Optional[datetime] = None,
        to_timestamp: Optional[datetime] = None
    ) -> AsyncIterator[BaseEvent]:
        """Get events by type"""
        pass

    @abstractmethod
    async def get_aggregate_snapshot(
        self,
        aggregate_id: str,
        aggregate_type: str
    ) -> Optional[Dict[str, Any]]:
        """Get aggregate snapshot"""
        pass

    @abstractmethod
    async def save_aggregate_snapshot(
        self,
        aggregate_id: str,
        aggregate_type: str,
        data: Dict[str, Any],
        version: int
    ) -> None:
        """Save aggregate snapshot"""
        pass

class InMemoryEventStore(EventStore):
    """In-memory event store for testing and development"""

    def __init__(self):
        self._events: Dict[str, List[BaseEvent]] = {}
        self._snapshots: Dict[str, Dict[str, Any]] = {}
        self._type_index: Dict[str, List[str]] = {}

    async def save_event(self, event: BaseEvent, stream_id: str) -> None:
        """Save event to memory"""
        if stream_id not in self._events:
            self._events[stream_id] = []

        self._events[stream_id].append(event)

        # Update type index
        if event.event_type not in self._type_index:
            self._type_index[event.event_type] = []
        self._type_index[event.event_type].append(event.event_id)

    async def get_events(
        self,
        stream_id: str,
        from_version: Optional[int] = None,
        to_version: Optional[int] = None,
        limit: Optional[int] = None
    ) -> AsyncIterator[BaseEvent]:
        """Get events from memory"""
        events = self._events.get(stream_id, [])

        # Apply filters
        if from_version is not None:
            events = events[from_version:]
        if to_version is not None:
            events = events[:to_version]
        if limit is not None:
            events = events[:limit]

        for event in events:
            yield event

    async def get_event_by_id(self, event_id: str) -> Optional[BaseEvent]:
        """Get event by ID from memory"""
        for events in self._events.values():
            for event in events:
                if event.event_id == event_id:
                    return event
        return None

    async def get_events_by_type(
        self,
        event_type: str,
        from_timestamp: Optional[datetime] = None,
        to_timestamp: Optional[datetime] = None
    ) -> AsyncIterator[BaseEvent]:
        """Get events by type from memory"""
        event_ids = self._type_index.get(event_type, [])

        for event_id in event_ids:
            event = await self.get_event_by_id(event_id)
            if event:
                # Apply time filters
                if from_timestamp and event.timestamp < from_timestamp:
                    continue
                if to_timestamp and event.timestamp > to_timestamp:
                    continue
                yield event

    async def get_aggregate_snapshot(
        self,
        aggregate_id: str,
        aggregate_type: str
    ) -> Optional[Dict[str, Any]]:
        """Get snapshot from memory"""
        snapshot_key = f"{aggregate_type}:{aggregate_id}"
        return self._snapshots.get(snapshot_key)

    async def save_aggregate_snapshot(
        self,
        aggregate_id: str,
        aggregate_type: str,
        data: Dict[str, Any],
        version: int
    ) -> None:
        """Save snapshot to memory"""
        snapshot_key = f"{aggregate_type}:{aggregate_id}"
        self._snapshots[snapshot_key] = {
            "data": data,
            "version": version,
            "timestamp": datetime.utcnow()
        }

# PostgreSQL Event Store Implementation
class PostgreSQLEventStore(EventStore):
    """PostgreSQL-based event store"""

    def __init__(self, db_pool):
        self.db_pool = db_pool

    async def initialize_schema(self):
        """Initialize database schema"""
        async with self.db_pool.acquire() as conn:
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS event_store (
                    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                    stream_id VARCHAR(255) NOT NULL,
                    stream_version INTEGER NOT NULL,
                    event_id VARCHAR(255) UNIQUE NOT NULL,
                    event_type VARCHAR(255) NOT NULL,
                    event_data JSONB NOT NULL,
                    metadata JSONB,
                    timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
                    INDEX (stream_id, stream_version),
                    INDEX (event_type),
                    INDEX (timestamp)
                );

                CREATE TABLE IF NOT EXISTS snapshots (
                    aggregate_id VARCHAR(255) NOT NULL,
                    aggregate_type VARCHAR(255) NOT NULL,
                    data JSONB NOT NULL,
                    version INTEGER NOT NULL,
                    timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
                    PRIMARY KEY (aggregate_type, aggregate_id)
                );
            """)

    async def save_event(self, event: BaseEvent, stream_id: str) -> None:
        """Save event to PostgreSQL"""
        async with self.db_pool.acquire() as conn:
            # Get next version
            result = await conn.fetchval(
                "SELECT COALESCE(MAX(stream_version), 0) + 1 FROM event_store WHERE stream_id = $1",
                stream_id
            )

            await conn.execute(
                """
                INSERT INTO event_store (
                    stream_id, stream_version, event_id, event_type,
                    event_data, metadata, timestamp
                ) VALUES ($1, $2, $3, $4, $5, $6, $7)
                """,
                stream_id,
                result,
                event.event_id,
                event.event_type,
                event.to_dict(),
                {
                    "correlation_id": event.correlation_id,
                    "causation_id": event.causation_id,
                    "source": event.source,
                    "user_id": event.user_id,
                    "tenant_id": event.tenant_id
                },
                event.timestamp
            )

    async def get_events(
        self,
        stream_id: str,
        from_version: Optional[int] = None,
        to_version: Optional[int] = None,
        limit: Optional[int] = None
    ) -> AsyncIterator[BaseEvent]:
        """Get events from PostgreSQL"""
        query = "SELECT event_data FROM event_store WHERE stream_id = $1"
        params = [stream_id]

        if from_version is not None:
            query += " AND stream_version >= $2"
            params.append(from_version)
            version_offset = 2
        else:
            version_offset = 1

        if to_version is not None:
            query += f" AND stream_version <= ${version_offset + 1}"
            params.append(to_version)
            version_offset += 1

        query += " ORDER BY stream_version"

        if limit is not None:
            query += f" LIMIT ${version_offset + 1}"
            params.append(limit)

        async with self.db_pool.acquire() as conn:
            async for row in conn.cursor(query, *params):
                yield BaseEvent.from_dict(row["event_data"])

    async def get_aggregate_snapshot(
        self,
        aggregate_id: str,
        aggregate_type: str
    ) -> Optional[Dict[str, Any]]:
        """Get snapshot from PostgreSQL"""
        async with self.db_pool.acquire() as conn:
            row = await conn.fetchrow(
                """
                SELECT data, version FROM snapshots
                WHERE aggregate_id = $1 AND aggregate_type = $2
                """,
                aggregate_id,
                aggregate_type
            )

            if row:
                return {
                    "data": row["data"],
                    "version": row["version"]
                }
            return None

    async def save_aggregate_snapshot(
        self,
        aggregate_id: str,
        aggregate_type: str,
        data: Dict[str, Any],
        version: int
    ) -> None:
        """Save snapshot to PostgreSQL"""
        async with self.db_pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO snapshots (aggregate_id, aggregate_type, data, version, timestamp)
                VALUES ($1, $2, $3, $4, NOW())
                ON CONFLICT (aggregate_type, aggregate_id)
                DO UPDATE SET data = $3, version = $4, timestamp = NOW()
                """,
                aggregate_id,
                aggregate_type,
                data,
                version
            )

2. Message Broker Abstraction

Generic Message Broker Interface

# messaging/broker.py
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Callable, List, AsyncIterator
from dataclasses import dataclass
from enum import Enum
import json
import logging

logger = logging.getLogger(__name__)

class MessageBrokerType(str, Enum):
    """Supported message broker types"""
    KAFKA = "kafka"
    RABBITMQ = "rabbitmq"
    REDIS = "redis"
    AWS_SQS = "aws_sqs"
    AZURE_SB = "azure_service_bus"
    GCP_PUBSUB = "gcp_pubsub"
    NATS = "nats"
    EMQX = "emqx"

@dataclass
class Message:
    """Generic message representation"""
    topic: str
    key: Optional[str]
    value: Any
    headers: Optional[Dict[str, str]] = None
    partition: Optional[int] = None
    timestamp: Optional[int] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert message to dictionary"""
        return {
            "topic": self.topic,
            "key": self.key,
            "value": self.value,
            "headers": self.headers or {},
            "partition": self.partition,
            "timestamp": self.timestamp
        }

class MessageBroker(ABC):
    """Abstract message broker interface"""

    @abstractmethod
    async def connect(self) -> None:
        """Establish connection to broker"""
        pass

    @abstractmethod
    async def disconnect(self) -> None:
        """Close connection to broker"""
        pass

    @abstractmethod
    async def publish(self, message: Message) -> None:
        """Publish a message"""
        pass

    @abstractmethod
    async def publish_batch(self, messages: List[Message]) -> None:
        """Publish multiple messages"""
        pass

    @abstractmethod
    async def subscribe(
        self,
        topic: str,
        handler: Callable[[Message], None],
        group_id: Optional[str] = None
    ) -> None:
        """Subscribe to a topic"""
        pass

    @abstractmethod
    async def create_topic(self, topic: str, **kwargs) -> None:
        """Create a topic if it doesn't exist"""
        pass

class KafkaBroker(MessageBroker):
    """Kafka message broker implementation"""

    def __init__(
        self,
        bootstrap_servers: str,
        producer_config: Optional[Dict[str, Any]] = None,
        consumer_config: Optional[Dict[str, Any]] = None
    ):
        self.bootstrap_servers = bootstrap_servers
        self.producer_config = producer_config or {}
        self.consumer_config = consumer_config or {}
        self.producer = None
        self.consumers: List[Any] = []

        # Default configurations
        self.producer_config.update({
            'bootstrap_servers': bootstrap_servers,
            'value_serializer': lambda v: json.dumps(v).encode('utf-8'),
            'key_serializer': lambda k: k.encode('utf-8') if k else None,
            'acks': 'all',
            'retries': 3,
            'retry_backoff_ms': 100
        })

        self.consumer_config.update({
            'bootstrap_servers': bootstrap_servers,
            'value_deserializer': lambda v: json.loads(v.decode('utf-8')),
            'key_deserializer': lambda k: k.decode('utf-8') if k else None,
            'auto_offset_reset': 'earliest',
            'enable_auto_commit': True
        })

    async def connect(self) -> None:
        """Connect to Kafka"""
        from aiokafka import AIOKafkaProducer

        self.producer = AIOKafkaProducer(**self.producer_config)
        await self.producer.start()
        logger.info("Connected to Kafka broker")

    async def disconnect(self) -> None:
        """Disconnect from Kafka"""
        if self.producer:
            await self.producer.stop()

        for consumer in self.consumers:
            await consumer.stop()

        logger.info("Disconnected from Kafka broker")

    async def publish(self, message: Message) -> None:
        """Publish message to Kafka"""
        if not self.producer:
            raise RuntimeError("Producer not initialized")

        await self.producer.send_and_wait(
            topic=message.topic,
            value=message.value,
            key=message.key,
            headers=message.headers,
            partition=message.partition
        )

    async def publish_batch(self, messages: List[Message]) -> None:
        """Publish batch of messages to Kafka"""
        if not self.producer:
            raise RuntimeError("Producer not initialized")

        # Group messages by topic
        topics = {}
        for msg in messages:
            if msg.topic not in topics:
                topics[msg.topic] = []
            topics[msg.topic].append(msg)

        # Send batched messages
        for topic, topic_messages in topics.items():
            for msg in topic_messages:
                await self.producer.send_and_wait(
                    topic=topic,
                    value=msg.value,
                    key=msg.key,
                    headers=msg.headers,
                    partition=msg.partition
                )

    async def subscribe(
        self,
        topic: str,
        handler: Callable[[Message], None],
        group_id: Optional[str] = None
    ) -> None:
        """Subscribe to Kafka topic"""
        from aiokafka import AIOKafkaConsumer

        consumer_config = self.consumer_config.copy()
        if group_id:
            consumer_config['group_id'] = group_id

        consumer = AIOKafkaConsumer(topic, **consumer_config)
        await consumer.start()
        self.consumers.append(consumer)

        # Start consuming task
        asyncio.create_task(self._consume_messages(consumer, handler))

    async def _consume_messages(
        self,
        consumer: Any,
        handler: Callable[[Message], None]
    ) -> None:
        """Consume messages from Kafka"""
        try:
            async for msg in consumer:
                message = Message(
                    topic=msg.topic,
                    key=msg.key,
                    value=msg.value,
                    headers=dict(msg.headers) if msg.headers else {},
                    partition=msg.partition,
                    timestamp=msg.timestamp
                )

                try:
                    await self._handle_message(handler, message)
                except Exception as e:
                    logger.error(f"Error handling message: {e}", exc_info=True)
        except Exception as e:
            logger.error(f"Consumer error: {e}", exc_info=True)

    async def _handle_message(
        self,
        handler: Callable[[Message], None],
        message: Message
    ) -> None:
        """Handle a single message"""
        if asyncio.iscoroutinefunction(handler):
            await handler(message)
        else:
            # Run sync handler in thread pool
            loop = asyncio.get_event_loop()
            await loop.run_in_executor(None, handler, message)

    async def create_topic(self, topic: str, **kwargs) -> None:
        """Create Kafka topic"""
        from aiokafka.admin import AIOKafkaAdminClient, NewTopic

        admin_client = AIOKafkaAdminClient(
            bootstrap_servers=self.bootstrap_servers
        )

        try:
            await admin_client.start()

            topic_metadata = NewTopic(
                name=topic,
                num_partitions=kwargs.get('num_partitions', 3),
                replication_factor=kwargs.get('replication_factor', 1)
            )

            await admin_client.create_topics([topic_metadata])
            logger.info(f"Created topic: {topic}")
        except Exception as e:
            logger.warning(f"Failed to create topic {topic}: {e}")
        finally:
            await admin_client.close()

class RedisBroker(MessageBroker):
    """Redis-based message broker using streams"""

    def __init__(self, redis_url: str):
        self.redis_url = redis_url
        self.redis = None
        self.consumer_groups: Dict[str, str] = {}

    async def connect(self) -> None:
        """Connect to Redis"""
        import aioredis

        self.redis = aioredis.from_url(self.redis_url)
        await self.redis.ping()
        logger.info("Connected to Redis broker")

    async def disconnect(self) -> None:
        """Disconnect from Redis"""
        if self.redis:
            await self.redis.close()

    async def publish(self, message: Message) -> None:
        """Publish message to Redis stream"""
        if not self.redis:
            raise RuntimeError("Redis not initialized")

        message_data = {
            'key': message.key,
            'value': json.dumps(message.value),
            'headers': json.dumps(message.headers or {}),
            'timestamp': message.timestamp or int(time.time() * 1000)
        }

        await self.redis.xadd(message.topic, message_data)

    async def publish_batch(self, messages: List[Message]) -> None:
        """Publish batch to Redis streams"""
        # Redis doesn't have native batch xadd, so we use pipeline
        pipe = self.redis.pipeline()

        for message in messages:
            message_data = {
                'key': message.key,
                'value': json.dumps(message.value),
                'headers': json.dumps(message.headers or {}),
                'timestamp': message.timestamp or int(time.time() * 1000)
            }
            pipe.xadd(message.topic, message_data)

        await pipe.execute()

    async def subscribe(
        self,
        topic: str,
        handler: Callable[[Message], None],
        group_id: Optional[str] = None
    ) -> None:
        """Subscribe to Redis stream"""
        if group_id:
            self.consumer_groups[topic] = group_id
            try:
                await self.redis.xgroup_create(topic, group_id, id='0', mkstream=True)
            except Exception:
                pass  # Group might already exist

        # Start consuming task
        asyncio.create_task(self._consume_stream(topic, handler, group_id))

    async def _consume_stream(
        self,
        topic: str,
        handler: Callable[[Message], None],
        group_id: Optional[str]
    ) -> None:
        """Consume messages from Redis stream"""
        while True:
            try:
                if group_id:
                    # Consumer group mode
                    messages = await self.redis.xreadgroup(
                        group_id,
                        'consumer',
                        {topic: '>'},
                        count=10,
                        block=1000
                    )
                else:
                    # Independent consumer mode
                    messages = await self.redis.xread(
                        {topic: '$'},
                        count=10,
                        block=1000
                    )

                for stream, msgs in messages:
                    for msg_id, fields in msgs:
                        message = Message(
                            topic=topic,
                            key=fields.get('key'),
                            value=json.loads(fields['value']),
                            headers=json.loads(fields.get('headers', '{}')),
                            timestamp=int(fields.get('timestamp', 0))
                        )

                        try:
                            await self._handle_message(handler, message)

                            if group_id:
                                await self.redis.xack(topic, group_id, msg_id)
                        except Exception as e:
                            logger.error(f"Error handling message: {e}")
            except Exception as e:
                logger.error(f"Stream consumer error: {e}")
                await asyncio.sleep(1)

class MessageBrokerFactory:
    """Factory for creating message brokers"""

    @staticmethod
    def create(broker_type: MessageBrokerType, **kwargs) -> MessageBroker:
        """Create a message broker instance"""
        if broker_type == MessageBrokerType.KAFKA:
            return KafkaBroker(
                bootstrap_servers=kwargs['bootstrap_servers'],
                producer_config=kwargs.get('producer_config'),
                consumer_config=kwargs.get('consumer_config')
            )
        elif broker_type == MessageBrokerType.REDIS:
            return RedisBroker(redis_url=kwargs['redis_url'])
        else:
            raise ValueError(f"Unsupported broker type: {broker_type}")

3. Event Processing Patterns

Saga Pattern Implementation

# patterns/saga.py
import asyncio
from typing import Dict, List, Optional, Any, Callable
from enum import Enum
from dataclasses import dataclass, field
import uuid
import logging

logger = logging.getLogger(__name__)

class SagaStatus(str, Enum):
    """Saga status enumeration"""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    COMPENSATING = "compensating"
    COMPENSATED = "compensated"
    FAILED = "failed"

@dataclass
class SagaStep:
    """Single step in a saga"""
    step_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    action: Optional[Callable] = None
    compensation: Optional[Callable] = None
    params: Dict[str, Any] = field(default_factory=dict)
    retry_count: int = 0
    max_retries: int = 3
    timeout: float = 30.0
    completed: bool = False
    result: Any = None

class Saga:
    """Saga orchestrator for distributed transactions"""

    def __init__(self, saga_id: Optional[str] = None):
        self.saga_id = saga_id or str(uuid.uuid4())
        self.status = SagaStatus.PENDING
        self.steps: List[SagaStep] = []
        self.completed_steps: List[SagaStep] = []
        self.current_step_index = 0
        self.error: Optional[Exception] = None
        self.created_at = datetime.utcnow()
        self.updated_at = datetime.utcnow()

    def add_step(
        self,
        action: Callable,
        compensation: Callable,
        params: Dict[str, Any] = None,
        max_retries: int = 3,
        timeout: float = 30.0
    ) -> 'Saga':
        """Add a step to the saga"""
        step = SagaStep(
            action=action,
            compensation=compensation,
            params=params or {},
            max_retries=max_retries,
            timeout=timeout
        )
        self.steps.append(step)
        return self

    async def execute(self) -> bool:
        """Execute the saga"""
        try:
            self.status = SagaStatus.RUNNING
            self.updated_at = datetime.utcnow()

            logger.info(f"Starting saga {self.saga_id} with {len(self.steps)} steps")

            # Execute all steps
            for i, step in enumerate(self.steps):
                self.current_step_index = i

                if not await self._execute_step(step):
                    # Step failed, start compensation
                    await self._compensate()
                    return False

                self.completed_steps.append(step)

            # All steps completed successfully
            self.status = SagaStatus.COMPLETED
            self.updated_at = datetime.utcnow()
            logger.info(f"Saga {self.saga_id} completed successfully")
            return True

        except Exception as e:
            self.error = e
            logger.error(f"Saga {self.saga_id} failed: {e}")
            await self._compensate()
            return False

    async def _execute_step(self, step: SagaStep) -> bool:
        """Execute a single step"""
        logger.info(f"Executing step {step.step_id}")

        while step.retry_count <= step.max_retries:
            try:
                # Execute with timeout
                step.result = await asyncio.wait_for(
                    self._run_action(step.action, step.params),
                    timeout=step.timeout
                )

                step.completed = True
                logger.info(f"Step {step.step_id} completed successfully")
                return True

            except asyncio.TimeoutError:
                step.retry_count += 1
                logger.warning(f"Step {step.step_id} timed out, retry {step.retry_count}")
                await asyncio.sleep(2 ** step.retry_count)  # Exponential backoff

            except Exception as e:
                step.retry_count += 1
                logger.error(f"Step {step.step_id} failed: {e}")

                if step.retry_count > step.max_retries:
                    logger.error(f"Step {step.step_id} failed after {step.max_retries} retries")
                    self.error = e
                    return False

                await asyncio.sleep(2 ** step.retry_count)  # Exponential backoff

        return False

    async def _run_action(self, action: Callable, params: Dict[str, Any]) -> Any:
        """Run action with parameters"""
        if asyncio.iscoroutinefunction(action):
            return await action(**params)
        else:
            # Run sync action in thread pool
            loop = asyncio.get_event_loop()
            return await loop.run_in_executor(None, action, **params)

    async def _compensate(self) -> None:
        """Compensate completed steps"""
        self.status = SagaStatus.COMPENSATING
        self.updated_at = datetime.utcnow()

        logger.info(f"Starting compensation for saga {self.saga_id}")

        # Compensate in reverse order
        for step in reversed(self.completed_steps):
            try:
                await self._compensate_step(step)
            except Exception as e:
                logger.error(f"Compensation failed for step {step.step_id}: {e}")
                # Continue with other compensations

        self.status = SagaStatus.COMPENSATED if self.error else SagaStatus.FAILED
        self.updated_at = datetime.utcnow()

        status_str = "compensated" if self.error else "failed"
        logger.info(f"Saga {self.saga_id} {status_str}")

    async def _compensate_step(self, step: SagaStep) -> None:
        """Compensate a single step"""
        logger.info(f"Compensating step {step.step_id}")

        compensation_params = step.params.copy()
        compensation_params['result'] = step.result

        if step.compensation:
            if asyncio.iscoroutinefunction(step.compensation):
                await step.compensation(**compensation_params)
            else:
                loop = asyncio.get_event_loop()
                await loop.run_in_executor(
                    None,
                    step.compensation,
                    **compensation_params
                )

class SagaManager:
    """Manages multiple sagas"""

    def __init__(self, event_store: EventStore, message_broker: MessageBroker):
        self.event_store = event_store
        self.message_broker = message_broker
        self.active_sagas: Dict[str, Saga] = {}

    def create_saga(self, saga_id: Optional[str] = None) -> Saga:
        """Create a new saga"""
        saga = Saga(saga_id)
        self.active_sagas[saga.saga_id] = saga
        return saga

    async def execute_saga(self, saga: Saga) -> bool:
        """Execute a saga and persist its state"""
        # Save saga started event
        await self._save_saga_event(saga, "saga.started")

        # Execute saga
        result = await saga.execute()

        # Save saga completed/failed event
        event_type = "saga.completed" if result else "saga.failed"
        await self._save_saga_event(saga, event_type)

        # Remove from active sagas
        if saga.saga_id in self.active_sagas:
            del self.active_sagas[saga.saga_id]

        return result

    async def _save_saga_event(self, saga: Saga, event_type: str) -> None:
        """Save saga lifecycle event"""
        event = DomainEvent(
            event_type=event_type,
            event_data={
                "saga_id": saga.saga_id,
                "status": saga.status.value,
                "total_steps": len(saga.steps),
                "completed_steps": len(saga.completed_steps),
                "current_step": saga.current_step_index,
                "error": str(saga.error) if saga.error else None
            },
            aggregate_id=saga.saga_id,
            aggregate_type="saga",
            source="saga_manager"
        )

        await self.event_store.save_event(event, f"saga-{saga.saga_id}")

        # Also publish to message broker
        message = Message(
            topic="saga-events",
            key=saga.saga_id,
            value=event.to_dict()
        )
        await self.message_broker.publish(message)

CQRS Pattern Implementation

# patterns/cqrs.py
import asyncio
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Any, Type, TypeVar, Generic
from dataclasses import dataclass, field
from datetime import datetime
import logging

logger = logging.getLogger(__name__)

T = TypeVar('T')

@dataclass
class CommandResponse:
    """Response from command execution"""
    success: bool
    message: Optional[str] = None
    data: Optional[Dict[str, Any]] = None
    errors: List[str] = field(default_factory=list)

class Command(ABC):
    """Base command class"""

    def __init__(self, aggregate_id: str):
        self.aggregate_id = aggregate_id
        self.id = str(uuid.uuid4())
        self.timestamp = datetime.utcnow()

class CommandHandler(ABC, Generic[T]):
    """Base command handler"""

    @abstractmethod
    async def handle(self, command: T) -> CommandResponse:
        """Handle the command"""
        pass

class Query(ABC):
    """Base query class"""

    def __init__(self):
        self.id = str(uuid.uuid4())
        self.timestamp = datetime.utcnow()

class QueryResult(ABC):
    """Base query result"""
    pass

class QueryHandler(ABC, Generic[T]):
    """Base query handler"""

    @abstractmethod
    async def handle(self, query: T) -> QueryResult:
        """Handle the query"""
        pass

class CommandBus:
    """Command bus for dispatching commands"""

    def __init__(self):
        self.handlers: Dict[Type[Command], CommandHandler] = {}
        self.middleware: List[Callable] = []

    def register(
        self,
        command_type: Type[Command],
        handler: CommandHandler
    ) -> None:
        """Register a command handler"""
        self.handlers[command_type] = handler
        logger.info(f"Registered handler for {command_type.__name__}")

    def add_middleware(self, middleware: Callable) -> None:
        """Add middleware to the command bus"""
        self.middleware.append(middleware)

    async def dispatch(self, command: Command) -> CommandResponse:
        """Dispatch a command to its handler"""
        command_type = type(command)

        if command_type not in self.handlers:
            return CommandResponse(
                success=False,
                message=f"No handler registered for {command_type.__name__}"
            )

        handler = self.handlers[command_type]

        # Apply middleware
        for middleware in self.middleware:
            command = await middleware(command)

        try:
            # Save command event
            await self._save_command_event(command)

            # Handle command
            response = await handler.handle(command)

            # Save response event
            await self._save_response_event(command, response)

            return response

        except Exception as e:
            logger.error(f"Error handling command {command_type.__name__}: {e}")
            return CommandResponse(
                success=False,
                message=str(e),
                errors=[str(e)]
            )

    async def _save_command_event(self, command: Command) -> None:
        """Save command event to event store"""
        # Implementation would save to event store
        pass

    async def _save_response_event(
        self,
        command: Command,
        response: CommandResponse
    ) -> None:
        """Save response event to event store"""
        # Implementation would save to event store
        pass

class QueryBus:
    """Query bus for dispatching queries"""

    def __init__(self):
        self.handlers: Dict[Type[Query], QueryHandler] = {}
        self.middleware: List[Callable] = []

    def register(
        self,
        query_type: Type[Query],
        handler: QueryHandler
    ) -> None:
        """Register a query handler"""
        self.handlers[query_type] = handler
        logger.info(f"Registered handler for {query_type.__name__}")

    def add_middleware(self, middleware: Callable) -> None:
        """Add middleware to the query bus"""
        self.middleware.append(middleware)

    async def dispatch(self, query: Query) -> QueryResult:
        """Dispatch a query to its handler"""
        query_type = type(query)

        if query_type not in self.handlers:
            raise ValueError(f"No handler registered for {query_type.__name__}")

        handler = self.handlers[query_type]

        # Apply middleware
        for middleware in self.middleware:
            query = await middleware(query)

        # Handle query
        return await handler.handle(query)

class EventProjector(ABC):
    """Base event projector for building read models"""

    def __init__(self, event_store: EventStore):
        self.event_store = event_store

    @abstractmethod
    async def project(self, event: BaseEvent) -> None:
        """Project an event to the read model"""
        pass

    async def project_from_stream(
        self,
        stream_id: str,
        from_version: Optional[int] = None
    ) -> None:
        """Project events from a stream"""
        async for event in self.event_store.get_events(
            stream_id,
            from_version=from_version
        ):
            await self.project(event)

    async def project_by_type(
        self,
        event_type: str,
        from_timestamp: Optional[datetime] = None
    ) -> None:
        """Project events by type"""
        async for event in self.event_store.get_events_by_type(
            event_type,
            from_timestamp=from_timestamp
        ):
            await self.project(event)

# Example CQRS Implementation
class CreateTaskCommand(Command):
    """Create task command"""

    def __init__(self, aggregate_id: str, title: str, description: str = None):
        super().__init__(aggregate_id)
        self.title = title
        self.description = description

class TaskCreatedEvent(DomainEvent):
    """Task created event"""

    def __init__(self, aggregate_id: str, title: str, description: str = None):
        super().__init__(
            event_type="task.created",
            aggregate_id=aggregate_id,
            aggregate_type="task",
            event_data={
                "title": title,
                "description": description
            }
        )

class TaskReadModel:
    """Task read model for queries"""

    def __init__(self):
        self.tasks: Dict[str, Dict[str, Any]] = {}

    async def get_task(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Get task from read model"""
        return self.tasks.get(task_id)

    async def list_tasks(self, user_id: str = None) -> List[Dict[str, Any]]:
        """List all tasks or user's tasks"""
        tasks = list(self.tasks.values())
        if user_id:
            tasks = [t for t in tasks if t.get("user_id") == user_id]
        return tasks

class TaskProjector(EventProjector):
    """Task event projector"""

    def __init__(self, event_store: EventStore, read_model: TaskReadModel):
        super().__init__(event_store)
        self.read_model = read_model

    async def project(self, event: BaseEvent) -> None:
        """Project task events to read model"""
        if event.event_type == "task.created":
            await self._project_task_created(event)
        elif event.event_type == "task.updated":
            await self._project_task_updated(event)
        elif event.event_type == "task.deleted":
            await self._project_task_deleted(event)

    async def _project_task_created(self, event: DomainEvent) -> None:
        """Project task created event"""
        self.read_model.tasks[event.aggregate_id] = {
            "id": event.aggregate_id,
            "title": event.event_data["title"],
            "description": event.event_data.get("description"),
            "status": "todo",
            "created_at": event.timestamp,
            "updated_at": event.timestamp
        }

    async def _project_task_updated(self, event: DomainEvent) -> None:
        """Project task updated event"""
        if event.aggregate_id in self.read_model.tasks:
            task = self.read_model.tasks[event.aggregate_id]
            task.update(event.event_data)
            task["updated_at"] = event.timestamp

    async def _project_task_deleted(self, event: DomainEvent) -> None:
        """Project task deleted event"""
        if event.aggregate_id in self.read_model.tasks:
            del self.read_model.tasks[event.aggregate_id]

4. Dapr Integration Patterns

Generic Dapr Client

# dapr/client.py
import asyncio
import httpx
from typing import Dict, Any, List, Optional
import json
import logging

logger = logging.getLogger(__name__)

class DaprClient:
    """Generic Dapr client with comprehensive feature support"""

    def __init__(
        self,
        dapr_port: int = 3500,
        dapr_host: str = "localhost",
        timeout: float = 30.0
    ):
        self.dapr_port = dapr_port
        self.dapr_host = dapr_host
        self.base_url = f"http://{dapr_host}:{dapr_port}/v1.0"
        self.timeout = timeout

        # HTTP client for Dapr API calls
        self.client = httpx.AsyncClient(timeout=timeout)

    async def close(self):
        """Close the Dapr client"""
        await self.client.aclose()

    # State Management
    async def save_state(
        self,
        store_name: str,
        key: str,
        value: Any,
        metadata: Optional[Dict[str, str]] = None,
        options: Optional[Dict[str, Any]] = None
    ) -> None:
        """Save state to Dapr state store"""
        url = f"{self.base_url}/state/{store_name}"

        state_item = {
            "key": key,
            "value": value
        }

        if metadata:
            state_item["metadata"] = metadata

        if options:
            state_item["options"] = options

        response = await self.client.post(url, json=[state_item])
        response.raise_for_status()

    async def get_state(
        self,
        store_name: str,
        key: str,
        metadata: Optional[Dict[str, str]] = None
    ) -> Any:
        """Get state from Dapr state store"""
        url = f"{self.base_url}/state/{store_name}/{key}"

        params = {}
        if metadata:
            params["metadata"] = json.dumps(metadata)

        response = await self.client.get(url, params=params)
        response.raise_for_status()

        if response.status_code == 204:
            return None

        return response.json()

    async def delete_state(
        self,
        store_name: str,
        key: str,
        metadata: Optional[Dict[str, str]] = None
    ) -> None:
        """Delete state from Dapr state store"""
        url = f"{self.base_url}/state/{store_name}/{key}"

        params = {}
        if metadata:
            params["metadata"] = json.dumps(metadata)

        response = await self.client.delete(url, params=params)
        response.raise_for_status()

    async def get_bulk_state(
        self,
        store_name: str,
        keys: List[str],
        parallelism: int = 10,
        metadata: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """Get multiple state items"""
        url = f"{self.base_url}/state/{store_name}/bulk"

        request_data = {
            "keys": keys,
            "parallelism": parallelism
        }

        if metadata:
            request_data["metadata"] = metadata

        response = await self.client.post(url, json=request_data)
        response.raise_for_status()

        return response.json()

    # Pub/Sub
    async def publish_event(
        self,
        pubsub_name: str,
        topic: str,
        data: Any,
        metadata: Optional[Dict[str, str]] = None
    ) -> None:
        """Publish event via Dapr pub/sub"""
        url = f"{self.base_url}/publish/{pubsub_name}/{topic}"

        headers = {}
        if metadata:
            for key, value in metadata.items():
                headers[f"metadata-{key}"] = value

        response = await self.client.post(
            url,
            content=json.dumps(data),
            headers=headers
        )
        response.raise_for_status()

    # Service Invocation
    async def invoke_service(
        self,
        app_id: str,
        method_name: str,
        data: Any = None,
        http_verb: str = "POST",
        metadata: Optional[Dict[str, str]] = None
    ) -> Any:
        """Invoke a service method"""
        url = f"{self.base_url}/invoke/{app_id}/method/{method_name}"

        headers = {}
        if metadata:
            for key, value in metadata.items():
                headers[f"metadata-{key}"] = value

        response = await self.client.request(
            http_verb,
            url,
            content=json.dumps(data) if data else None,
            headers=headers
        )
        response.raise_for_status()

        return response.json()

    # Bindings
    async def invoke_binding(
        self,
        binding_name: str,
        operation: str,
        data: Any,
        metadata: Optional[Dict[str, str]] = None
    ) -> Any:
        """Invoke an output binding"""
        url = f"{self.base_url}/bindings/{binding_name}"

        request_data = {
            "operation": operation,
            "data": data
        }

        if metadata:
            request_data["metadata"] = metadata

        response = await self.client.post(url, json=request_data)
        response.raise_for_status()

        return response.json()

    # Secrets
    async def get_secret(
        self,
        secret_store_name: str,
        key: str,
        metadata: Optional[Dict[str, str]] = None
    ) -> str:
        """Get secret from Dapr secret store"""
        url = f"{self.base_url}/secrets/{secret_store_name}/{key}"

        params = {}
        if metadata:
            params["metadata"] = json.dumps(metadata)

        response = await self.client.get(url, params=params)
        response.raise_for_status()

        return response.json()[key]

    # Configuration
    async def get_configuration(
        self,
        configuration_store_name: str,
        keys: List[str],
        metadata: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """Get configuration from Dapr configuration store"""
        url = f"{self.base_url}/configuration/{configuration_store_name}"

        request_data = {"keys": keys}
        if metadata:
            request_data["metadata"] = metadata

        response = await self.client.post(url, json=request_data)
        response.raise_for_status()

        return response.json()

    # Distributed Lock
    async def acquire_lock(
        self,
        lock_store_name: str,
        resource_id: str,
        lock_owner: str,
        expiry_in_seconds: int = 60
    ) -> bool:
        """Acquire distributed lock"""
        url = f"{self.base_url}/locks/{lock_store_name}/{resource_id}"

        request_data = {
            "lockOwner": lock_owner,
            "expiryInSeconds": expiry_in_seconds
        }

        response = await self.client.post(url, json=request_data)
        response.raise_for_status()

        return response.json().get("success", False)

    async def release_lock(
        self,
        lock_store_name: str,
        resource_id: str,
        lock_owner: str
    ) -> bool:
        """Release distributed lock"""
        url = f"{self.base_url}/locks/{lock_store_name}/{resource_id}/{lock_owner}"

        response = await self.client.delete(url)
        response.raise_for_status()

        return response.json().get("success", False)

    # Health Checks
    async def health_check(self) -> bool:
        """Check Dapr sidecar health"""
        try:
            url = f"{self.base_url}/healthz"
            response = await self.client.get(url)
            return response.status_code == 200
        except Exception as e:
            logger.error(f"Dapr health check failed: {e}")
            return False

class DaprWorkflow:
    """Dapr Workflow API client"""

    def __init__(self, dapr_client: DaprClient):
        self.dapr_client = dapr_client

    async def start_workflow(
        self,
        workflow_name: str,
        input_data: Any = None,
        instance_id: Optional[str] = None
    ) -> str:
        """Start a workflow instance"""
        url = f"{self.dapr_client.base_url}/workflows/{workflow_name}/start"

        request_data = {}
        if input_data:
            request_data["input"] = input_data
        if instance_id:
            request_data["workflowInstanceId"] = instance_id

        response = await self.dapr_client.client.post(url, json=request_data)
        response.raise_for_status()

        return response.json().get("instanceId")

    async def get_workflow_state(
        self,
        instance_id: str
    ) -> Dict[str, Any]:
        """Get workflow state"""
        url = f"{self.dapr_client.base_url}/workflows/{instance_id}"

        response = await self.dapr_client.client.get(url)
        response.raise_for_status()

        return response.json()

    async def raise_workflow_event(
        self,
        instance_id: str,
        event_name: str,
        event_data: Any
    ) -> None:
        """Raise an event for a workflow"""
        url = f"{self.dapr_client.base_url}/workflows/{instance_id}/raiseEvent/{event_name}"

        response = await self.dapr_client.client.post(
            url,
            content=json.dumps(event_data)
        )
        response.raise_for_status()

    async def terminate_workflow(
        self,
        instance_id: str
    ) -> None:
        """Terminate a workflow instance"""
        url = f"{self.dapr_client.base_url}/workflows/{instance_id}/terminate"

        response = await self.dapr_client.client.post(url)
        response.raise_for_status()

# Dapr Event Store Integration
class DaprEventStore(EventStore):
    """Dapr-based event store implementation"""

    def __init__(self, dapr_client: DaprClient, state_store_name: str = "eventstore"):
        self.dapr_client = dapr_client
        self.state_store_name = state_store_name

    async def save_event(self, event: BaseEvent, stream_id: str) -> None:
        """Save event using Dapr state store"""
        # Get stream events
        stream_key = f"stream:{stream_id}"
        events = await self.dapr_client.get_state(
            self.state_store_name,
            stream_key
        ) or []

        # Add new event
        events.append(event.to_dict())

        # Save back to state store
        await self.dapr_client.save_state(
            self.state_store_name,
            stream_key,
            events
        )

        # Also save by event ID for direct lookup
        event_key = f"event:{event.event_id}"
        await self.dapr_client.save_state(
            self.state_store_name,
            event_key,
            event.to_dict()
        )

        # Update type index
        type_index_key = f"type:{event.event_type}"
        type_events = await self.dapr_client.get_state(
            self.state_store_name,
            type_index_key
        ) or []
        type_events.append(event.event_id)

        await self.dapr_client.save_state(
            self.state_store_name,
            type_index_key,
            type_events
        )

    async def get_events(
        self,
        stream_id: str,
        from_version: Optional[int] = None,
        to_version: Optional[int] = None,
        limit: Optional[int] = None
    ) -> AsyncIterator[BaseEvent]:
        """Get events from Dapr state store"""
        stream_key = f"stream:{stream_id}"
        events_data = await self.dapr_client.get_state(
            self.state_store_name,
            stream_key
        ) or []

        # Apply filters
        if from_version is not None:
            events_data = events_data[from_version:]
        if to_version is not None:
            events_data = events_data[:to_version]
        if limit is not None:
            events_data = events_data[:limit]

        for event_data in events_data:
            yield BaseEvent.from_dict(event_data)

5. Deployment Patterns

Kubernetes with Dapr

# deployments/kubernetes/dapr-config.yaml
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
  name: app-config
  namespace: production
spec:
  tracing:
    samplingRate: "1"
    zipkin:
      endpointAddress: "http://zipkin.default.svc.cluster.local:9411/api/v2/spans"
  metrics:
    enabled: true
    rules: |
      - name: dapr-system
        description: "Dapr system metrics"
        default: true
      - name: app
        description: "Application metrics"
        default: true
  features:
    - name: AppHealthCheck
      enabled: true
  api:
    allowed:
      - name: state
      - name: invoke
      - name: publish
      - name: bindings
      - name: secrets
---
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
  namespace: production
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers
    value: "kafka-broker:9092"
  - name: authRequired
    value: "false"
  - name: consumerID
    value: "app-pubsub"
  - name: compressionType
    value: "snappy"
  - name: batch_size
    value: "100"
  - name: batch_timeout
    value: "10ms"
---
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
  namespace: production
spec:
  type: state.postgresql
  version: v1
  metadata:
  - name: connectionString
    secretKeyRef:
      name: postgres-secret
      key: connection-string
  - name: tableName
    value: "app_state"
  - name: metadataTableName
    value: "app_state_metadata"
  - name: keyPrefix
    value: "none"
---
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: secretstore
  namespace: production
spec:
  type: secretstores.kubernetes
  version: v1
  metadata:
  - name: namespace
    value: "production"
# deployments/kubernetes/app-with-dapr.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: event-service
  namespace: production
  labels:
    app: event-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: event-service
  template:
    metadata:
      labels:
        app: event-service
      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "event-service"
        dapr.io/app-port: "8000"
        dapr.io/enable-metrics: "true"
        dapr.io/enable-profiling: "true"
        dapr.io/config: "app-config"
        dapr.io/log-as-json: "true"
        dapr.io/log-level: "info"
        dapr.io/app-protocol: "http"
    spec:
      containers:
      - name: app
        image: myregistry/event-service:latest
        ports:
        - containerPort: 8000
        env:
        - name: DAPR_HTTP_PORT
          value: "3500"
        - name: DAPR_GRPC_PORT
          value: "50001"
        - name: APP_PORT
          value: "8000"
        - name: LOG_LEVEL
          value: "info"
        resources:
          requests:
            memory: "256Mi"
            cpu: "100m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /healthz
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /readyz
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
        volumeMounts:
        - name: config
          mountPath: /app/config
          readOnly: true
      volumes:
      - name: config
        configMap:
          name: app-config
---
apiVersion: v1
kind: Service
metadata:
  name: event-service
  namespace: production
spec:
  selector:
    app: event-service
  ports:
  - port: 80
    targetPort: 8000
    name: http
  type: ClusterIP

This event-driven architecture skill provides comprehensive patterns for building modern, distributed systems using event sourcing, CQRS, saga patterns, and Dapr integration. It's designed to be framework-agnostic and applicable to any domain requiring robust event-driven capabilities.

★ Insight ───────────────────────────────────── The event-driven architecture patterns shown here emphasize several key 2025 best practices:

  1. Schema Evolution: Events include versioning and support for backward/forward compatibility
  2. Broker Abstraction: Generic interfaces allow swapping Kafka, Redis, or cloud brokers without code changes
  3. Recovery Patterns: Sagas with compensation and retry policies ensure system resilience
  4. Observability: Built-in tracing, metrics, and structured logging for production monitoring
  5. Multi-tenancy: Native support for tenant isolation at the event level

These patterns ensure your event-driven system can evolve, scale, and maintain reliability as requirements change. ─────────────────────────────────────────────────