Claude Code Plugins

Community-maintained marketplace

Feedback

bloodbank-event-publisher

@delorenj/bloodbank
0
0

Complete guide for creating, publishing, and consuming events in the DeLoNET home network's 33GOD agentic developer pipeline. Built on RabbitMQ with strict type safety via Pydantic, async Python (aio-pika), FastAPI, and Redis-backed correlation tracking. This skill is REQUIRED for any work involving the home network event bus.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name bloodbank-event-publisher
description Complete guide for creating, publishing, and consuming events in the DeLoNET home network's 33GOD agentic developer pipeline. Built on RabbitMQ with strict type safety via Pydantic, async Python (aio-pika), FastAPI, and Redis-backed correlation tracking. This skill is REQUIRED for any work involving the home network event bus.
version 2.0.0

Bloodbank Event Publishing Guide

When to Use This Skill

ALWAYS use this skill when:

  • Working with any component in the DeLoNET home network
  • Building or modifying agentic workflows in 33GOD
  • Integrating external services (webhooks, APIs) with the home network
  • Creating tools that need to communicate across services
  • Debugging event flows or troubleshooting message routing

This skill is NOT needed for:

  • Standalone scripts with no home network integration
  • Work outside the DeLoNET ecosystem

What is Bloodbank?

Bloodbank is the event bus infrastructure that powers 33GOD (the agentic developer pipeline). It provides:

  • Type-safe event publishing/consuming via Pydantic models
  • RabbitMQ topic-based routing for flexible message distribution
  • Multiple access patterns: HTTP API, CLI, Python SDK, MCP server
  • Rich context capture: Agent state, git context, file references
  • Causation tracking: Correlation IDs link related events
  • Durable messaging: Events survive broker restarts

Architecture Components

┌─────────────────────────────────────────────────────────────┐
│                     Event Producers                          │
├─────────────┬──────────────┬────────────┬──────────────────┤
│   CLI       │  HTTP API    │  MCP       │  File Watcher    │
│  (Typer)    │  (FastAPI)   │  Server    │  (watchdog)      │
└──────┬──────┴──────┬───────┴─────┬──────┴────────┬─────────┘
       │             │             │               │
       └─────────────┴─────────────┴───────────────┘
                          │
                          ▼
              ┌───────────────────────┐
              │   RabbitMQ Broker     │
              │  (Topic Exchange)     │
              │  amq.topic            │
              └───────────┬───────────┘
                          │
         ┌────────────────┼────────────────┐
         ▼                ▼                ▼
    ┌─────────┐    ┌──────────┐    ┌──────────┐
    │  n8n    │    │   RAG    │    │  Agents  │
    │Workflows│    │ Ingester │    │(Consumers)│
    └─────────┘    └──────────┘    └──────────┘

Core Concepts

1. Event Structure

All events follow this structure:

EventEnvelope[T](
    event_id: UUID,              # Unique identifier for THIS event
    event_type: str,             # Routing key (e.g., "fireflies.transcript.ready")
    timestamp: datetime,         # When event was created
    version: str,                # Envelope schema version
    source: Source,              # WHO/WHAT triggered this event
    correlation_id: UUID | None, # Links to previous related event
    agent_context: AgentContext | None,  # Rich agent metadata (if applicable)
    payload: T                   # Your typed event data
)

2. Source Metadata

Source identifies WHO or WHAT triggered the event:

class TriggerType(str, Enum):
    MANUAL = "manual"           # Human-initiated
    AGENT = "agent"             # AI agent triggered
    SCHEDULED = "scheduled"     # Cron/timer triggered
    FILE_WATCH = "file_watch"   # File system event
    HOOK = "hook"               # External webhook

class Source(BaseModel):
    host: str                   # Machine that generated event (e.g., "big-chungus")
    type: TriggerType           # How was this triggered?
    app: Optional[str]          # Application name (e.g., "n8n", "claude-code")
    meta: Optional[Dict[str, Any]]  # Additional context

Examples:

# Manual CLI invocation
Source(host="workstation", type=TriggerType.MANUAL, app="bb-cli")

# Claude Code agent
Source(host="workstation", type=TriggerType.AGENT, app="claude-code")

# n8n workflow responding to file change
Source(host="big-chungus", type=TriggerType.FILE_WATCH, app="n8n")

# Fireflies webhook
Source(host="fireflies.ai", type=TriggerType.HOOK, app="fireflies")

3. Agent Context (Optional)

When to include: Only when source.type == TriggerType.AGENT

AgentContext provides rich metadata about the AI agent:

class AgentType(str, Enum):
    CLAUDE_CODE = "claude-code"
    CLAUDE_CHAT = "claude-chat"
    GEMINI_CLI = "gemini-cli"
    LETTA = "letta"
    AGNO = "agno"
    SMOLAGENT = "smolagent"
    ATOMIC_AGENT = "atomic-agent"
    # ... others

class CodeState(BaseModel):
    """Git context for agent's working environment"""
    repo_url: Optional[str]
    branch: Optional[str]
    working_diff: Optional[str]      # Unstaged changes
    branch_diff: Optional[str]       # Diff vs main
    last_commit_hash: Optional[str]

class AgentContext(BaseModel):
    type: AgentType
    name: Optional[str]              # Agent's persona/name (e.g., "Tonny")
    system_prompt: Optional[str]     # Initial system prompt
    instance_id: Optional[str]       # Unique session identifier
    mcp_servers: Optional[List[str]] # Connected MCP servers
    file_references: Optional[List[str]]  # Files in context
    url_references: Optional[List[str]]   # URLs in context
    code_state: Optional[CodeState]  # Git state snapshot
    checkpoint_id: Optional[str]     # For checkpoint-based agents
    meta: Optional[Dict[str, Any]]   # Extensibility

Example:

AgentContext(
    type=AgentType.CLAUDE_CODE,
    name="Tonny",
    system_prompt="You are Tonny, a helpful code assistant...",
    instance_id="session_abc123",
    mcp_servers=["filesystem", "git", "web-search"],
    file_references=[
        "/home/jarad/code/bloodbank/event_producers/events.py",
        "/home/jarad/code/bloodbank/rabbit.py"
    ],
    code_state=CodeState(
        repo_url="https://github.com/delorenj/bloodbank",
        branch="feature/fireflies-integration",
        working_diff="+ Added FirefliesTranscriptReadyPayload\n- ...",
        last_commit_hash="a7b3c9d"
    )
)

4. Correlation Tracking with Redis

NEW in v2.0: Bloodbank now automatically tracks event causation chains using Redis!

Features:

  • Deterministic Event IDs: Generate the same UUID for identical events (idempotency)
  • Automatic Correlation Tracking: Redis stores parent→child relationships
  • Multiple Parents: Events can be caused by multiple parent events
  • Queryable Chains: Trace full ancestry or descendancy of any event
  • 30-day TTL: Correlation data expires automatically

How it works:

from rabbit import Publisher

publisher = Publisher(enable_correlation_tracking=True)
await publisher.start()

# 1. Generate deterministic event ID
upload_event_id = publisher.generate_event_id(
    "fireflies.transcript.upload",
    meeting_id="abc123",
    user_id="jarad"
)
# ☝️ Same inputs ALWAYS generate the same UUID (idempotency!)

# 2. Publish upload event
await publisher.publish(
    routing_key="fireflies.transcript.upload",
    body=envelope.model_dump(mode="json"),
    event_id=upload_event_id
)

# 3. Later, when Fireflies webhook fires...
ready_event_id = uuid4()
await publisher.publish(
    routing_key="fireflies.transcript.ready",
    body=envelope.model_dump(mode="json"),
    event_id=ready_event_id,
    parent_event_ids=[upload_event_id]  # ← Links back automatically!
)

# 4. Query the correlation chain
chain = publisher.get_correlation_chain(ready_event_id, "ancestors")
# Returns: [upload_event_id, ready_event_id]

Why Deterministic IDs?

Consumers can dedupe based on event_id. If an event is published twice (e.g., webhook retry), consumers see the same UUID and can skip reprocessing.

Redis Schema:

# Forward mapping: child → parents
bloodbank:correlation:forward:{child_uuid}
  → {"parent_event_ids": ["uuid1", "uuid2"], "created_at": "...", "metadata": {...}}

# Reverse mapping: parent → children (for querying)
bloodbank:correlation:reverse:{parent_uuid}
  → Set{"child1_uuid", "child2_uuid", ...}

Configuration:

# .env or environment variables
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=optional
CORRELATION_TTL_DAYS=30

5. Multiple Correlation IDs

NEW in v2.0: correlation_ids is now a List[UUID] (not Optional[UUID])!

Events can be caused by multiple parent events:

# Example: Transcript combines audio from two recordings
processed_envelope = EventEnvelope(
    event_type="transcript.processed",
    correlation_ids=[
        recording1_event_id,
        recording2_event_id
    ],  # ← Multiple parents!
    payload=...
)

Use cases:

  • Merging multiple recordings into one transcript
  • Aggregating data from multiple sources
  • Fan-in patterns (multiple events → one result)

6. Correlation ID Pattern (Legacy Note)

In v1.0: We used correlation_id (singular) and manually set it.

In v2.0: Use correlation_ids (plural, list) + automatic Redis tracking via parent_event_ids parameter in publish().

Example Flow:

# Event 1: Upload request
upload_event_id = publisher.generate_event_id(
    "fireflies.transcript.upload",
    meeting_id="abc123"
)

upload_envelope = EventEnvelope[FirefliesTranscriptUploadPayload](
    event_id=upload_event_id,  # ← Deterministic!
    event_type="fireflies.transcript.upload",
    correlation_ids=[],  # First in chain
    payload=FirefliesTranscriptUploadPayload(...)
)

await publisher.publish(
    routing_key="fireflies.transcript.upload",
    body=upload_envelope.model_dump(mode="json"),
    event_id=upload_event_id
)

# Event 2: Transcription complete (webhook from Fireflies)
ready_event_id = uuid4()
ready_envelope = EventEnvelope[FirefliesTranscriptReadyPayload](
    event_id=ready_event_id,
    event_type="fireflies.transcript.ready",
    correlation_ids=[upload_event_id],  # ← Links back!
    payload=FirefliesTranscriptReadyPayload(...)
)

await publisher.publish(
    routing_key="fireflies.transcript.ready",
    body=ready_envelope.model_dump(mode="json"),
    event_id=ready_event_id,
    parent_event_ids=[upload_event_id]  # ← Automatic Redis tracking!
)

# Event 3: RAG ingestion complete
processed_event_id = uuid4()
processed_envelope = EventEnvelope[FirefliesTranscriptProcessedPayload](
    event_id=processed_event_id,
    event_type="fireflies.transcript.processed",
    correlation_ids=[ready_event_id],  # ← Links to ready event
    payload=FirefliesTranscriptProcessedPayload(...)
)

await publisher.publish(
    routing_key="fireflies.transcript.processed",
    body=processed_envelope.model_dump(mode="json"),
    event_id=processed_event_id,
    parent_event_ids=[ready_event_id]
)

# Query the full chain!
chain = publisher.get_correlation_chain(processed_event_id, "ancestors")
# Returns: [upload_event_id, ready_event_id, processed_event_id]

Routing Key Conventions

Format: <namespace>.<entity>.<action>

Examples:

  • fireflies.transcript.upload - Fireflies namespace, transcript entity, upload action
  • fireflies.transcript.ready - Same namespace/entity, ready action
  • fireflies.transcript.processed - Same namespace/entity, processed action
  • logjangler.thread.prompt - Logjangler namespace, thread entity, prompt action
  • logjangler.thread.response - Same namespace/entity, response action
  • artifact.created - Global artifact namespace, created action
  • artifact.updated - Same namespace, updated action

Wildcard Subscription Patterns:

  • fireflies.* - All events in fireflies namespace (any entity.action)
  • *.transcript.* - All transcript events across namespaces
  • *.*.ready - All "ready" events across all namespaces/entities
  • # - Everything (use for debugging only)

Error Event Patterns

NEW in v2.0: Standardized error events for failure tracking!

Convention: Use .failed or .error suffix on the base event type.

Examples:

  • fireflies.transcript.failed - Transcription failed at any stage
  • llm.error - LLM call failed
  • artifact.ingestion.failed - RAG ingestion failed

Error Event Structure:

All error events should include:

class SomeErrorPayload(BaseModel):
    failed_stage: str              # Where did it fail?
    error_message: str             # Human-readable error
    error_code: Optional[str]      # Machine-readable code
    is_retryable: bool             # Can we retry?
    retry_count: int = 0           # How many times have we tried?
    metadata: Dict[str, Any]       # Additional context

Publishing Error Events:

# Example: Fireflies transcription failed
error_event = EventEnvelope[FirefliesTranscriptFailedPayload](
    event_type="fireflies.transcript.failed",
    correlation_ids=[original_upload_event_id],  # ← Link to failed attempt!
    source=Source(host="fireflies.ai", type=TriggerType.HOOK, app="fireflies"),
    payload=FirefliesTranscriptFailedPayload(
        failed_stage="transcription",
        error_message="Audio quality too low",
        error_code="AUDIO_QUALITY_LOW",
        is_retryable=False,
        retry_count=0,
        metadata={"meeting_id": "abc123"}
    )
)

await publisher.publish(
    routing_key="fireflies.transcript.failed",
    body=error_event.model_dump(mode="json"),
    event_id=error_event.event_id,
    parent_event_ids=[original_upload_event_id]
)

Consumer Pattern for Errors:

# Subscribe to all error events
await queue.bind(exchange, routing_key="*.failed")
await queue.bind(exchange, routing_key="*.error")

# Handle retry logic
if error_payload.is_retryable and error_payload.retry_count < 3:
    # Republish original event with incremented retry count
    await retry_logic(error_payload)
else:
    # Send to dead letter queue or alert humans
    await send_alert(error_payload)

File Locations

bloodbank/
├── rabbit.py                    # Publisher class with correlation tracking
├── correlation_tracker.py       # ← NEW: Redis-backed correlation tracking
├── config.py                    # Settings via Pydantic (includes Redis config)
├── pyproject.toml               # Dependencies (now includes redis)
├── kubernetes/
│   └── deploy.yaml              # K8s deployment
└── event_producers/
    ├── __init__.py
    ├── events.py                # ← ALL EVENT PAYLOADS (with error events!)
    ├── cli.py                   # Typer CLI for publishing
    ├── http.py                  # FastAPI HTTP endpoints (with debug endpoints)
    ├── mcp_server.py            # MCP server for agents
    ├── watch.py                 # File watcher → events
    ├── n8n/
    │   ├── workflows/           # n8n workflow JSON files
    │   └── docs/                # n8n integration guides
    └── scripts/
        ├── artifact_consumer.py # Example consumer
        └── rag_transcript_consumer.py  # RAG ingestion consumer

How to Define New Events

Step 1: Define Payload Model in event_producers/events.py

from pydantic import BaseModel, Field
from datetime import datetime
from typing import Optional, List

class YourEventPayload(BaseModel):
    """
    Description of what this event represents.
    
    Published when: [describe trigger condition]
    Consumed by: [list consumers]
    Routing Key: namespace.entity.action
    """
    # Required fields
    some_id: str
    content: str
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    
    # Optional fields
    metadata: Optional[dict] = None
    tags: List[str] = Field(default_factory=list)
    
    class Config:
        # Example values for documentation
        json_schema_extra = {
            "example": {
                "some_id": "abc123",
                "content": "Example content",
                "timestamp": "2025-10-18T12:00:00Z",
                "tags": ["example", "documentation"]
            }
        }

Step 2: Create Envelope Function (Helper)

def create_your_event(
    payload: YourEventPayload,
    source_host: str,
    source_app: str,
    correlation_id: Optional[UUID] = None,
    agent_context: Optional[AgentContext] = None
) -> EventEnvelope[YourEventPayload]:
    """Helper to create properly-formed event envelope"""
    return EventEnvelope[YourEventPayload](
        event_id=uuid4(),
        event_type="namespace.entity.action",
        timestamp=datetime.now(timezone.utc),
        version="1.0.0",
        source=Source(
            host=source_host,
            type=TriggerType.MANUAL,  # Adjust as needed
            app=source_app
        ),
        correlation_id=correlation_id,
        agent_context=agent_context,
        payload=payload
    )

How to Publish Events

Method 1: Python SDK with Correlation Tracking (Recommended)

from rabbit import Publisher
from event_producers.events import YourEventPayload, EventEnvelope, Source, TriggerType
from uuid import uuid4
import asyncio

async def publish_example():
    # Create publisher with correlation tracking enabled
    publisher = Publisher(enable_correlation_tracking=True)
    await publisher.start()
    
    try:
        # Option A: Generate deterministic event ID (idempotency)
        event_id = publisher.generate_event_id(
            "namespace.entity.action",
            unique_field="abc123",
            timestamp="2025-10-18"
        )
        # ☝️ Same inputs = same UUID every time!
        
        # Option B: Generate random UUID
        event_id = uuid4()
        
        # Create payload
        payload = YourEventPayload(
            some_id="abc123",
            content="Hello, Bloodbank!"
        )
        
        # Create envelope
        envelope = EventEnvelope[YourEventPayload](
            event_id=event_id,
            event_type="namespace.entity.action",
            timestamp=datetime.now(timezone.utc),
            version="1.0.0",
            source=Source(
                host="workstation",
                type=TriggerType.MANUAL,
                app="my-service"
            ),
            correlation_ids=[],  # Or [parent1_id, parent2_id] if this is a follow-up
            payload=payload
        )
        
        # Publish with automatic correlation tracking
        await publisher.publish(
            routing_key=envelope.event_type,
            body=envelope.model_dump(mode="json"),
            event_id=envelope.event_id,
            parent_event_ids=[]  # Or [parent1_id, parent2_id] for auto-tracking
        )
        
        print(f"✓ Published event {envelope.event_id}")
        
        # Query correlation chain (if needed)
        if parent_event_ids:
            chain = publisher.get_correlation_chain(envelope.event_id, "ancestors")
            print(f"Correlation chain: {chain}")
        
    finally:
        await publisher.close()

if __name__ == "__main__":
    asyncio.run(publish_example())

Method 2: HTTP API (Webhooks, External Services)

Add endpoint to event_producers/http.py:

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from .events import YourEventPayload, EventEnvelope, Source, TriggerType

@app.post("/events/your-namespace/your-entity")
async def publish_your_event(payload: YourEventPayload, request: Request):
    """Publish your custom event"""
    envelope = EventEnvelope[YourEventPayload](
        event_id=uuid4(),
        event_type="namespace.entity.action",
        timestamp=datetime.now(timezone.utc),
        version="1.0.0",
        source=Source(
            host=request.client.host,
            type=TriggerType.HOOK,
            app="http-api"
        ),
        payload=payload
    )
    
    await publisher.publish(
        routing_key=envelope.event_type,
        body=envelope.model_dump(mode="json"),
        message_id=str(envelope.event_id)
    )
    
    return JSONResponse({"event_id": str(envelope.event_id)})

Usage:

curl -X POST http://localhost:8682/events/your-namespace/your-entity \
  -H "Content-Type: application/json" \
  -d '{
    "some_id": "abc123",
    "content": "Hello from curl!"
  }'

Method 3: CLI (Manual/Scripting)

Using the bb CLI (from event_producers/cli.py):

# Publish LLM prompt
bb publish-prompt --provider claude --model sonnet-4 "Write me a haiku"

# Wrap an LLM tool to auto-publish events
bb wrap claude-code -- "Fix the bug in app.py"

Extend the CLI for your events:

# In event_producers/cli.py
@app.command()
def publish_custom(some_id: str, content: str):
    """Publish a custom event"""
    payload = YourEventPayload(some_id=some_id, content=content)
    envelope = create_your_event(
        payload=payload,
        source_host=socket.gethostname(),
        source_app="bb-cli"
    )
    
    # Sync wrapper around async publish
    asyncio.run(_publish(envelope))
    typer.echo(f"✓ Published {envelope.event_id}")

Method 4: MCP Server (For AI Agents)

The MCP server (event_producers/mcp_server.py) exposes tools to agents:

# Example MCP tool definition
@mcp.tool()
async def publish_event(
    event_type: str,
    payload: dict,
    correlation_id: Optional[str] = None
) -> str:
    """Publish an event to Bloodbank event bus"""
    # Agent can call this tool with structured data
    # Implementation handles envelope creation and publishing
    ...

Usage from Claude:

I need to publish a transcript.ready event. Let me use the MCP tool...

How to Consume Events

Pattern 1: Python Consumer (Long-Running Service)

import asyncio
import aio_pika
import json
from typing import Callable
from event_producers.events import EventEnvelope, YourEventPayload

class YourConsumer:
    def __init__(self, rabbitmq_url: str):
        self.rabbitmq_url = rabbitmq_url
        self.connection = None
        self.channel = None
    
    async def start(self):
        """Connect to RabbitMQ and start consuming"""
        self.connection = await aio_pika.connect_robust(self.rabbitmq_url)
        self.channel = await self.connection.channel()
        
        # Declare exchange (idempotent)
        exchange = await self.channel.declare_exchange(
            "amq.topic",
            aio_pika.ExchangeType.TOPIC,
            durable=True
        )
        
        # Create queue
        queue = await self.channel.declare_queue(
            "your-service-queue",
            durable=True
        )
        
        # Bind to routing keys you care about
        await queue.bind(exchange, routing_key="namespace.entity.*")
        
        # Start consuming
        await queue.consume(self._handle_message)
        print("✓ Consumer started, waiting for messages...")
    
    async def _handle_message(self, message: aio_pika.IncomingMessage):
        """Process incoming message"""
        async with message.process():
            try:
                # Parse envelope
                data = json.loads(message.body.decode())
                envelope = EventEnvelope[YourEventPayload](**data)
                
                # Handle event
                await self._process_event(envelope)
                
                # Message auto-acked via context manager
                
            except Exception as e:
                print(f"✗ Error processing message: {e}")
                # Message will be requeued or sent to DLQ
    
    async def _process_event(self, envelope: EventEnvelope[YourEventPayload]):
        """Your business logic here"""
        print(f"Processing event {envelope.event_id}")
        print(f"  Type: {envelope.event_type}")
        print(f"  Payload: {envelope.payload}")
        
        # Do something with the event
        # ...
    
    async def stop(self):
        """Clean shutdown"""
        if self.channel:
            await self.channel.close()
        if self.connection:
            await self.connection.close()

# Run the consumer
async def main():
    consumer = YourConsumer("amqp://user:pass@localhost:5673/")
    await consumer.start()
    
    # Keep running
    try:
        await asyncio.Future()  # Run forever
    except KeyboardInterrupt:
        await consumer.stop()

if __name__ == "__main__":
    asyncio.run(main())

Pattern 2: n8n Workflow (No Code)

  1. Add RabbitMQ Trigger node

    • Mode: receiver
    • Exchange: amq.topic
    • Routing Key: namespace.entity.* (your pattern)
    • Queue: n8n-your-workflow
  2. Process the message

    // n8n Function node
    const envelope = JSON.parse($json.content);
    return {
        event_id: envelope.event_id,
        event_type: envelope.event_type,
        payload: envelope.payload
    };
    
  3. Route by event type

    • IF node: {{ $json.event_type == "namespace.entity.action" }}
    • Different branches for different event types

See event_producers/n8n/README.md for complete n8n integration guide.

Debug & Monitoring

Correlation Debug Endpoints

NEW in v2.0: HTTP API includes correlation debugging!

# Get full correlation debug info for an event
curl http://localhost:8682/debug/correlation/{event_id}

# Response:
{
  "event_id": "abc-123",
  "parents": ["xyz-789"],
  "children": ["def-456", "ghi-789"],
  "ancestors": ["root-000", "xyz-789", "abc-123"],
  "descendants": ["abc-123", "def-456", "ghi-789"],
  "metadata": {"reason": "transcription completed"}
}

# Get correlation chain
curl http://localhost:8682/debug/correlation/{event_id}/chain?direction=ancestors

# Response:
{
  "event_id": "abc-123",
  "direction": "ancestors",
  "chain": ["root-000", "xyz-789", "abc-123"]
}

Query from Python

# Debug correlation from your code
debug_data = publisher.debug_correlation(event_id)
print(f"Parents: {debug_data['parents']}")
print(f"Ancestors: {debug_data['ancestors']}")

# Get just the chain
ancestors = publisher.get_correlation_chain(event_id, "ancestors")
descendants = publisher.get_correlation_chain(event_id, "descendants")

Redis Direct Access

# View forward mapping (child → parents)
redis-cli GET "bloodbank:correlation:forward:abc-123"

# View reverse mapping (parent → children)
redis-cli SMEMBERS "bloodbank:correlation:reverse:xyz-789"

Complete Example: Fireflies Integration

Event Flow

┌─────────────────────────────────────────────────────────────┐
│ 1. File Watch detects new recording in ~/Recordings         │
└─────────────────────┬───────────────────────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────────────────────┐
│ 2. Publish: fireflies.transcript.upload                     │
│    - event_id: abc-123                                       │
│    - media_file: /path/to/recording.mp3                     │
│    - correlation_id: None (first in chain)                  │
└─────────────────────┬───────────────────────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────────────────────┐
│ 3. n8n workflow uploads to Fireflies API                    │
│    - Consumes: fireflies.transcript.upload                  │
│    - Uploads media file                                      │
│    - Fireflies processes asynchronously                      │
└─────────────────────┬───────────────────────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────────────────────┐
│ 4. Fireflies webhook fires (transcription complete)         │
│    - POST to /webhooks/fireflies                            │
└─────────────────────┬───────────────────────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────────────────────┐
│ 5. Publish: fireflies.transcript.ready                      │
│    - event_id: def-456                                       │
│    - correlation_id: abc-123 (links back!)                  │
│    - payload: Full transcript with sentences, speakers, etc │
└─────────────────────┬───────────────────────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────────────────────┐
│ 6. RAG consumer ingests transcript                          │
│    - Consumes: fireflies.transcript.ready                   │
│    - Chunks sentences into documents                         │
│    - Generates embeddings                                    │
│    - Stores in vector database                               │
└─────────────────────┬───────────────────────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────────────────────┐
│ 7. Publish: fireflies.transcript.processed                  │
│    - event_id: ghi-789                                       │
│    - correlation_id: def-456 (links back!)                  │
│    - rag_document_id: Internal ID for queries               │
└─────────────────────────────────────────────────────────────┘

Result: Complete audit trail
abc-123 → def-456 → ghi-789

Implementation

See /home/claude/fireflies_events_proposal.py for complete schemas.

Step 1: Define events in events.py (already done in proposal)

Step 2: Add webhook endpoint in http.py

@app.post("/webhooks/fireflies")
async def fireflies_webhook(req: Request):
    """Receive Fireflies completion webhook"""
    body = await req.json()
    
    # Parse webhook into our schema
    payload = FirefliesTranscriptReadyPayload(
        id=body["data"]["id"],
        title=body["data"]["title"],
        date=datetime.fromtimestamp(body["data"]["date"] / 1000),
        duration=body["data"]["duration"],
        transcript_url=body["data"]["transcript_url"],
        sentences=[
            TranscriptSentence(**s) for s in body["data"]["sentences"]
        ],
        # ... map other fields
    )
    
    # Look up original upload event to get correlation_id
    # (In practice, you'd query a state store)
    original_event_id = await get_upload_event_id(payload.id)
    
    # Create envelope
    envelope = EventEnvelope[FirefliesTranscriptReadyPayload](
        event_id=uuid4(),
        event_type="fireflies.transcript.ready",
        timestamp=datetime.now(timezone.utc),
        version="1.0.0",
        source=Source(
            host="fireflies.ai",
            type=TriggerType.HOOK,
            app="fireflies"
        ),
        correlation_id=original_event_id,  # Links back!
        payload=payload
    )
    
    # Publish
    await publisher.publish(
        routing_key=envelope.event_type,
        body=envelope.model_dump(mode="json"),
        message_id=str(envelope.event_id)
    )
    
    return {"status": "ok", "event_id": str(envelope.event_id)}

Step 3: Create RAG consumer

# See event_producers/scripts/rag_transcript_consumer.py for complete example

class RAGTranscriptConsumer:
    async def start(self):
        # ... connection setup ...
        await queue.bind(exchange, routing_key="fireflies.transcript.ready")
        await queue.consume(self._handle_transcript)
    
    async def _handle_transcript(self, message):
        async with message.process():
            envelope = EventEnvelope[FirefliesTranscriptReadyPayload](
                **json.loads(message.body.decode())
            )
            
            # Ingest into RAG
            doc_id = await self._ingest_to_rag(envelope.payload)
            
            # Publish processed event
            await self._publish_processed(
                transcript_id=envelope.payload.id,
                rag_doc_id=doc_id,
                correlation_id=envelope.event_id  # Links back!
            )

Best Practices

1. Event Naming

DO:

  • Use lowercase with dots: fireflies.transcript.ready
  • Follow namespace.entity.action pattern
  • Be specific: llm.thread.response not llm.response

DON'T:

  • Use camelCase: fireflies.transcriptReady
  • Mix concerns: fireflies.transcript.ready.and.processed
  • Be too generic: event or message

2. Payload Design

DO:

  • Include enough data to avoid additional API calls
  • Use ISO 8601 for timestamps
  • Provide example values in docstrings
  • Version your payload schemas

DON'T:

  • Include sensitive credentials or tokens
  • Make payloads enormous (>1MB) - use URLs/references
  • Change existing fields - add new ones instead

3. Idempotency & Deduplication

DO:

  • Use deterministic event IDs when possible:
    event_id = publisher.generate_event_id(
        "fireflies.transcript.upload",
        meeting_id="abc123"
    )
    
  • Consumers should dedupe based on event_id
  • Store processed event IDs in Redis with TTL
  • Handle duplicate events gracefully (idempotent operations)

DON'T:

  • Assume events arrive only once
  • Process the same event multiple times
  • Use random UUIDs for events that should be idempotent

Consumer Deduplication Pattern:

import redis

class IdempotentConsumer:
    def __init__(self):
        self.redis = redis.Redis()
        self.processed_ttl = 86400  # 24 hours
    
    async def _handle_message(self, message):
        envelope = json.loads(message.body)
        event_id = envelope["event_id"]
        
        # Check if already processed
        cache_key = f"processed:{event_id}"
        if self.redis.exists(cache_key):
            print(f"Skipping duplicate event {event_id}")
            await message.ack()
            return
        
        # Process event
        await self._process(envelope)
        
        # Mark as processed
        self.redis.setex(cache_key, self.processed_ttl, "1")
        await message.ack()

4. Correlation IDs

DO:

  • Always set correlation_ids when event is caused by another event
  • Use parent_event_ids parameter for automatic tracking
  • Document correlation patterns in payload docstrings
  • Query chains for debugging: publisher.get_correlation_chain()

DON'T:

  • Create circular references (A → B → A)
  • Lose the correlation chain (always link to immediate parent)
  • Forget that correlation_ids is now a list (can have multiple parents)

5. Error Handling

DO:

  • Publish error events (e.g., namespace.entity.failed)
  • Include error details in payload (failed_stage, error_message, error_code)
  • Set correlation_ids to the failed event's ID
  • Use is_retryable flag for retry logic
  • Implement exponential backoff for retries
  • Use DLQ (Dead Letter Queue) for persistent failures
  • Track retry_count to prevent infinite loops

DON'T:

  • Silently swallow errors
  • Retry indefinitely (set max retry count = 3)
  • Log errors without publishing events
  • Lose context when publishing error events

Error Event Publishing Pattern:

try:
    # Attempt operation
    result = await process_transcript(transcript_id)
except Exception as e:
    # Publish error event
    error_payload = FirefliesTranscriptFailedPayload(
        failed_stage="processing",
        error_message=str(e),
        error_code=getattr(e, 'code', None),
        transcript_id=transcript_id,
        is_retryable=isinstance(e, RetryableError),
        retry_count=current_retry_count,
        metadata={
            "original_event_id": str(original_event_id),
            "stack_trace": traceback.format_exc()
        }
    )
    
    error_envelope = create_envelope(
        event_type="fireflies.transcript.failed",
        payload=error_payload,
        source=Source(host=socket.gethostname(), type=TriggerType.AGENT, app="rag-consumer"),
        correlation_ids=[original_event_id]  # Link to failed attempt!
    )
    
    await publisher.publish(
        routing_key="fireflies.transcript.failed",
        body=error_envelope.model_dump(mode="json"),
        event_id=error_envelope.event_id,
        parent_event_ids=[original_event_id]
    )
    
    # Re-raise or handle based on retry logic
    if error_payload.is_retryable and error_payload.retry_count < 3:
        # Schedule retry
        await schedule_retry(original_event_id, delay=2 ** retry_count)
    else:
        # Send to DLQ
        await send_to_dlq(original_event_id)

6. Testing

DO:

  • Test with real RabbitMQ instance (use docker-compose)
  • Verify routing keys match event_type
  • Check correlation IDs propagate correctly
  • Monitor queue depths in production

DON'T:

  • Test only with mocks (routing is complex!)
  • Assume events arrive in order
  • Forget to test failure scenarios

Troubleshooting

Messages not being consumed

  1. Check queue bindings:

    kubectl -n messaging exec statefulset/bloodbank-server -- \
      rabbitmqctl list_bindings
    
  2. Verify routing key matches pattern:

    • Published: fireflies.transcript.ready
    • Binding: fireflies.transcript.*
    • Binding: fireflies.*
    • Binding: llm.* ❌ (won't match)
  3. Check consumer is running:

    kubectl -n messaging exec statefulset/bloodbank-server -- \
      rabbitmqctl list_consumers
    

Published events disappear

  1. Queue might not exist:

    • Consumers create queues on startup
    • If no consumer, no queue = messages dropped
    • Solution: Declare durable queue before publishing
  2. Wrong exchange:

    • Must publish to amq.topic
    • Check Publisher configuration
  3. Message not persistent:

    • Set delivery_mode=2 in BasicProperties
    • Check Publisher code uses persistent=True

Correlation IDs not linking

  1. Check event_id → correlation_id mapping:

    # When handling event A
    event_a_id = envelope.event_id  # abc-123
    
    # When publishing event B
    envelope_b.correlation_id = event_a_id  # Must match!
    
  2. Verify UUIDs are valid:

    • Use uuid4() to generate
    • Convert to string for JSON: str(uuid_obj)

Environment Configuration

Local Development (with port-forward)

# Port-forward RabbitMQ
kubectl -n messaging port-forward svc/bloodbank 15672:15672 5673:5672

# Set environment variables
export RABBIT_URL="amqp://user:pass@localhost:5673/"
export EXCHANGE_NAME="amq.topic"

# Get credentials
kubectl -n messaging get secret bloodbank-default-user \
  -o jsonpath='{.data.username}' | base64 -d; echo
kubectl -n messaging get secret bloodbank-default-user \
  -o jsonpath='{.data.password}' | base64 -d; echo

In-Cluster Deployment

# kubernetes/your-service.yaml
env:
  - name: RABBIT_URL
    valueFrom:
      secretKeyRef:
        name: bloodbank-credentials
        key: amqp-url
  - name: EXCHANGE_NAME
    value: "amq.topic"

Quick Reference

Common Commands

# View RabbitMQ UI
kubectl -n messaging port-forward svc/bloodbank 15672:15672
# Visit: http://localhost:15672

# Check queue depth
kubectl -n messaging exec statefulset/bloodbank-server -- \
  rabbitmqctl list_queues name messages

# List connections
kubectl -n messaging exec statefulset/bloodbank-server -- \
  rabbitmqctl list_connections name state

# Purge a queue (DANGER!)
kubectl -n messaging exec statefulset/bloodbank-server -- \
  rabbitmqctl purge_queue "your-queue-name"

Example Routing Keys

llm.prompt                  → LLM interaction started
llm.response                → LLM responded
fireflies.transcript.upload → Request transcription
fireflies.transcript.ready  → Transcription complete
artifact.created            → New artifact generated
artifact.updated            → Artifact modified
logjangler.thread.prompt    → LogJangler thread prompt
logjangler.thread.response  → LogJangler thread response

Subscription Patterns

fireflies.*                 → All Fireflies events
*.transcript.*              → All transcript events
llm.#                       → All LLM events (any depth)
#                           → Everything (debug only)

Additional Resources


Remember: Always include rich context (source, agent_context if applicable), use correlation IDs to link events, and publish complete data to avoid additional API calls by consumers.

Happy eventing! 🩸