| name | configuring-dapr-pubsub |
| description | Configures Dapr pub/sub components for event-driven microservices with Kafka or Redis. Use when wiring agent-to-agent communication, setting up event subscriptions, or integrating Dapr sidecars. Covers component configuration, subscription patterns, publishing events, and Kubernetes deployment. NOT when using direct Kafka clients or non-Dapr messaging patterns. |
Configuring Dapr Pub/Sub
Wire event-driven microservices using Dapr pub/sub with Kafka or Redis backends.
Quick Start
# components/pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    - name: brokers
      value: "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
    - name: authType
      value: "none"
    - name: disableTls
      value: "true"
# Apply component
kubectl apply -f components/pubsub.yaml
# Test with Dapr CLI (self-hosted; an app with ID "publisher" must already be running with a sidecar)
dapr publish --publish-app-id publisher --pubsub pubsub --topic test --data '{"msg":"hello"}'
Component Configurations
Kafka (Production)
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    # Required
    - name: brokers
      value: "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
    - name: authType
      value: "none"
    # Consumer settings
    - name: consumerGroup
      value: "{namespace}-{appID}" # Templated per deployment
    - name: consumeRetryInterval
      value: "100ms"
    - name: heartbeatInterval
      value: "3s"
    - name: sessionTimeout
      value: "10s"
    # Performance
    - name: maxMessageBytes
      value: "1048576" # 1 MiB
    - name: channelBufferSize
      value: "256"
Kafka with SASL Authentication
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-secure
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    - name: brokers
      value: "kafka.example.com:9093"
    - name: authType
      value: "password"
    - name: saslUsername
      value: "dapr-user"
    - name: saslPassword
      secretKeyRef:
        name: kafka-secrets
        key: password
    # SCRAM-SHA-256; the Dapr Kafka component names the mechanisms SHA-256, SHA-512, and PLAINTEXT
    - name: saslMechanism
      value: "SHA-256"
Redis (Development/Simple)
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: redis-pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
    - name: redisHost
      value: "redis-master.redis.svc.cluster.local:6379"
    - name: redisPassword
      secretKeyRef:
        name: redis-secrets
        key: password
Subscription Patterns
Declarative Subscription (Recommended)
# subscriptions/task-events.yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: task-created-subscription
spec:
  pubsubname: pubsub
  topic: task-created
  routes:
    default: /dapr/task-created
scopes:
  - triage-agent
  - concepts-agent
Programmatic Subscription (FastAPI)
from fastapi import FastAPI, Request

app = FastAPI()

@app.get("/dapr/subscribe")
async def subscribe():
    """Dapr calls this to discover subscriptions."""
    return [
        {
            "pubsubname": "pubsub",
            "topic": "task-created",
            "route": "/dapr/task-created"
        },
        {
            "pubsubname": "pubsub",
            "topic": "task-completed",
            "route": "/dapr/task-completed"
        }
    ]

@app.post("/dapr/task-created")
async def handle_task_created(request: Request):
    """Handle incoming CloudEvent."""
    event = await request.json()
    # CloudEvent wrapper - data is nested
    task_data = event.get("data", event)
    task_id = task_data.get("task_id")
    # Process event
    print(f"Task created: {task_id}")
    return {"status": "SUCCESS"}
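Dapr reads the `status` field of the handler's JSON response to decide what happens to the message: `SUCCESS` acknowledges it, `RETRY` asks for redelivery, and `DROP` discards it. The following is a minimal sketch of mapping processing outcomes to those statuses. The `process_task` callable and the choice to treat `ValueError` as a malformed payload are illustrative assumptions, not part of the original service.

```python
from typing import Any, Callable


def unwrap_cloudevent(event: dict[str, Any]) -> dict[str, Any]:
    """Return the payload whether or not it arrived wrapped in a CloudEvent."""
    data = event.get("data", event)
    # Non-dict payloads (strings, lists) are wrapped so callers can use .get()
    return data if isinstance(data, dict) else {"value": data}


def handle_event(
    event: dict[str, Any],
    process_task: Callable[[dict[str, Any]], None],  # hypothetical business logic
) -> dict[str, str]:
    """Run process_task and translate the outcome into a Dapr status."""
    task = unwrap_cloudevent(event)
    if "task_id" not in task:
        # Redelivery cannot repair a payload that lacks its key field
        return {"status": "DROP"}
    try:
        process_task(task)
    except ValueError:
        # Assumption: ValueError signals bad input, so retrying is pointless
        return {"status": "DROP"}
    except Exception:
        # Anything else is treated as transient; ask Dapr to redeliver
        return {"status": "RETRY"}
    return {"status": "SUCCESS"}
```

A FastAPI route could return `handle_event(await request.json(), process_task)` directly, which keeps the acknowledgement logic testable without a running sidecar.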
Publishing Events
From FastAPI Service
import httpx

DAPR_URL = "http://localhost:3500"

async def publish_event(topic: str, data: dict):
    """Publish event through Dapr sidecar."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
            json=data,
            headers={"Content-Type": "application/json"}
        )
        response.raise_for_status()

# Usage (from inside an async function)
await publish_event("task-created", {
    "task_id": "123",
    "title": "Learn Python",
    "user_id": "user-456"
})
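The publish endpoint has the shape `/v1.0/publish/<pubsubname>/<topic>`, and the sidecar's HTTP port is exposed to the app through the `DAPR_HTTP_PORT` environment variable. The following is a small sketch of building the URL from those two pieces instead of hardcoding it. The helper name `dapr_publish_url` is hypothetical, and the fallback to port 3500 is an assumption that matches the examples in this document.

```python
import os
from urllib.parse import quote


def dapr_publish_url(topic: str, pubsub: str = "pubsub") -> str:
    """Build the sidecar publish URL for a topic on the given pub/sub component."""
    # Read the port at call time so the same code works under any --dapr-http-port
    port = os.getenv("DAPR_HTTP_PORT", "3500")
    # Percent-encode both path segments so unusual names cannot break the URL
    return (
        f"http://localhost:{port}/v1.0/publish/"
        f"{quote(pubsub, safe='')}/{quote(topic, safe='')}"
    )
```

With this helper, a service started with `--dapr-http-port 3501` publishes to the right sidecar without any code change.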
With CloudEvent Metadata
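import uuid

async def publish_cloudevent(topic: str, data: dict, event_type: str):
    """Publish with explicit CloudEvent fields."""
    # With application/cloudevents+json the body must be the full CloudEvent
    # envelope; ce-* HTTP headers alone do not set these fields.
    event = {
        "specversion": "1.0",
        "type": event_type,
        "source": "triage-agent",
        "id": str(uuid.uuid4()),
        "datacontenttype": "application/json",
        "data": data
    }
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
            json=event,
            headers={"Content-Type": "application/cloudevents+json"}
        )
        response.raise_for_status()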
Kubernetes Deployment
Component Scoping
Limit component access to specific apps:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    - name: brokers
      value: "kafka:9092"
scopes:
  - triage-agent
  - concepts-agent
  - debug-agent
App Deployment with Dapr Sidecar
apiVersion: apps/v1
kind: Deployment
metadata:
  name: triage-agent
spec:
  replicas: 2
  selector:
    matchLabels:
      app: triage-agent
  template:
    metadata:
      labels:
        app: triage-agent
      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "triage-agent"
        dapr.io/app-port: "8000"
        dapr.io/enable-api-logging: "true"
    spec:
      containers:
        - name: triage-agent
          image: myapp/triage-agent:latest
          ports:
            - containerPort: 8000
          env:
            - name: DAPR_HTTP_PORT
              value: "3500"
Multi-Agent Routing Pattern
Triage Agent → Specialist Agents
# triage_agent.py
from fastapi import FastAPI, Request
import httpx

app = FastAPI()
DAPR_URL = "http://localhost:3500"

@app.post("/api/question")
async def handle_question(request: Request):
    data = await request.json()
    question = data["question"]
    # Route based on content
    if "python" in question.lower() or "code" in question.lower():
        topic = "concepts-request"
    elif "error" in question.lower() or "bug" in question.lower():
        topic = "debug-request"
    else:
        topic = "concepts-request"  # Default
    # Publish to appropriate agent
    async with httpx.AsyncClient() as client:
        await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
            json={
                "question": question,
                "user_id": data["user_id"],
                "session_id": data["session_id"]
            }
        )
    return {"status": "routed", "topic": topic}
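The keyword routing in the handler above can also be pulled out into a pure function, which makes the rules easy to test and extend without starting FastAPI or a sidecar. The following is a sketch under that assumption. The names `TOPIC_KEYWORDS`, `DEFAULT_TOPIC`, and `pick_topic` are hypothetical, while the keywords, topics, and first-match-wins order mirror the handler.

```python
# Ordered rules: the first topic whose keyword appears in the question wins,
# matching the if/elif order of the triage handler.
TOPIC_KEYWORDS: list[tuple[str, tuple[str, ...]]] = [
    ("concepts-request", ("python", "code")),
    ("debug-request", ("error", "bug")),
]
DEFAULT_TOPIC = "concepts-request"


def pick_topic(question: str) -> str:
    """Return the pub/sub topic for a question using substring keyword matching."""
    text = question.lower()
    for topic, keywords in TOPIC_KEYWORDS:
        if any(keyword in text for keyword in keywords):
            return topic
    return DEFAULT_TOPIC
```

Because matching is by substring and order, a question that mentions both "python" and "error" goes to `concepts-request`, the same as in the handler; reorder the rules if debugging questions should take priority.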
Specialist Agent Handler
# concepts_agent.py
import os

from fastapi import FastAPI, Request
import httpx

app = FastAPI()
# Read the sidecar port from DAPR_HTTP_PORT so a non-default port
# (for example --dapr-http-port 3501) still works; fall back to 3500.
DAPR_URL = f"http://localhost:{os.getenv('DAPR_HTTP_PORT', '3500')}"

@app.get("/dapr/subscribe")
async def subscribe():
    return [{"pubsubname": "pubsub", "topic": "concepts-request", "route": "/dapr/handle"}]

@app.post("/dapr/handle")
async def handle_concepts_request(request: Request):
    event = await request.json()
    data = event.get("data", event)
    # Process with LLM (process_with_llm is not shown here)
    response = await process_with_llm(data["question"])
    # Publish response
    async with httpx.AsyncClient() as client:
        await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/response-ready",
            json={
                "session_id": data["session_id"],
                "response": response,
                "agent": "concepts"
            }
        )
    return {"status": "SUCCESS"}
Local Development
Run with Dapr CLI
# Start subscriber first (module names match concepts_agent.py and triage_agent.py)
dapr run --app-id concepts-agent --app-port 8001 --dapr-http-port 3501 \
  --resources-path ./components -- uvicorn concepts_agent:app --port 8001
# Start publisher
dapr run --app-id triage-agent --app-port 8000 --dapr-http-port 3500 \
  --resources-path ./components -- uvicorn triage_agent:app --port 8000
Docker Compose with Dapr
version: "3.8"
services:
  triage-agent:
    build: ./services/triage
    ports:
      - "8000:8000"
  triage-agent-dapr:
    image: daprio/daprd:latest
    command: ["./daprd",
      "--app-id", "triage-agent",
      "--app-port", "8000",
      "--dapr-http-port", "3500",
      "--resources-path", "/components"
      ]
    volumes:
      - ./components:/components
    network_mode: "service:triage-agent"
    depends_on:
      - triage-agent
  kafka:
    image: confluentinc/cp-kafka:latest
    # ... kafka config
Troubleshooting
Check Dapr Sidecar
# View sidecar logs
kubectl logs deploy/triage-agent -c daprd
# Check component registration
curl http://localhost:3500/v1.0/metadata
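The `/v1.0/metadata` response lists the components the sidecar has loaded in a `components` array, where each entry carries a `name`. The following is a minimal sketch of checking that list programmatically, assuming the response has already been fetched and parsed into a dict. The helper name `component_loaded` is hypothetical, and the sample payload is illustrative rather than captured output.

```python
from typing import Any


def component_loaded(metadata: dict[str, Any], name: str) -> bool:
    """Return True if the sidecar metadata lists a component with this name."""
    # "components" can be missing or null when nothing was loaded
    components = metadata.get("components") or []
    return any(component.get("name") == name for component in components)


# Illustrative payload shaped like the metadata response (values are made up)
sample_metadata = {
    "id": "triage-agent",
    "components": [
        {"name": "pubsub", "type": "pubsub.kafka", "version": "v1"},
    ],
}

if not component_loaded(sample_metadata, "pubsub"):
    print("pubsub component missing: check --resources-path or the K8s namespace")
```

If the component is absent, the usual causes are a wrong resources path, a different Kubernetes namespace, or a `scopes` list that excludes the app.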
Common Issues
| Error | Cause | Fix |
|---|---|---|
| component not found | Component not loaded | Check --resources-path or K8s namespace |
| connection refused | Kafka not reachable | Verify broker address in component |
| consumer group rebalance | Multiple instances | Use unique consumerGroup per app |
| event not received | Wrong topic/route | Check subscription config |
Debug Event Flow
# Publish test event (self-hosted; --publish-app-id names a running Dapr app)
dapr publish --publish-app-id my-app --pubsub pubsub --topic test --data '{"test": true}'
# Check consumer logs
kubectl logs deploy/my-app -c daprd | grep -i subscribe
Verification
Run: python scripts/verify.py
Related Skills
deploying-kafka-k8s - Kafka cluster setup with Strimzi
scaffolding-fastapi-dapr - FastAPI services with Dapr
scaffolding-openai-agents - Agent orchestration patterns