Claude Code Plugins

Community-maintained marketplace

Feedback

configuring-dapr-pubsub

@mjunaidca/mjs-agent-skills
8
0

|

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name configuring-dapr-pubsub
description Configures Dapr pub/sub components for event-driven microservices with Kafka or Redis. Use when wiring agent-to-agent communication, setting up event subscriptions, or integrating Dapr sidecars. Covers component configuration, subscription patterns, publishing events, and Kubernetes deployment. NOT when using direct Kafka clients or non-Dapr messaging patterns.

Configuring Dapr Pub/Sub

Wire event-driven microservices using Dapr pub/sub with Kafka or Redis backends.

Quick Start

# components/pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    - name: brokers
      value: "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
    - name: authType
      value: "none"
    - name: disableTls
      value: "true"
# Apply component
kubectl apply -f components/pubsub.yaml

# Test with Dapr CLI
dapr run --app-id publisher -- dapr publish --pubsub pubsub --topic test --data '{"msg":"hello"}'

Component Configurations

Kafka (Production)

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    # Required
    - name: brokers
      value: "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
    - name: authType
      value: "none"

    # Consumer settings
    - name: consumerGroup
      value: "{namespace}-{appId}"  # Templated per deployment
    - name: consumeRetryInterval
      value: "100ms"
    - name: heartbeatInterval
      value: "3s"
    - name: sessionTimeout
      value: "10s"

    # Performance
    - name: maxMessageBytes
      value: "1048576"  # 1MB
    - name: channelBufferSize
      value: "256"

Kafka with SASL Authentication

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-secure
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    - name: brokers
      value: "kafka.example.com:9093"
    - name: authType
      value: "password"
    - name: saslUsername
      value: "dapr-user"
    - name: saslPassword
      secretKeyRef:
        name: kafka-secrets
        key: password
    - name: saslMechanism
      value: "SCRAM-SHA-256"

Redis (Development/Simple)

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: redis-pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
    - name: redisHost
      value: "redis-master.redis.svc.cluster.local:6379"
    - name: redisPassword
      secretKeyRef:
        name: redis-secrets
        key: password

Subscription Patterns

Declarative Subscription (Recommended)

# subscriptions/task-events.yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: task-created-subscription
spec:
  pubsubname: pubsub
  topic: task-created
  routes:
    default: /dapr/task-created
  scopes:
    - triage-agent
    - concepts-agent

Programmatic Subscription (FastAPI)

from fastapi import FastAPI, Request

app = FastAPI()

@app.get("/dapr/subscribe")
async def subscribe():
    """Dapr calls this to discover subscriptions."""
    return [
        {
            "pubsubname": "pubsub",
            "topic": "task-created",
            "route": "/dapr/task-created"
        },
        {
            "pubsubname": "pubsub",
            "topic": "task-completed",
            "route": "/dapr/task-completed"
        }
    ]

@app.post("/dapr/task-created")
async def handle_task_created(request: Request):
    """Handle incoming CloudEvent."""
    event = await request.json()

    # CloudEvent wrapper - data is nested
    task_data = event.get("data", event)
    task_id = task_data.get("task_id")

    # Process event
    print(f"Task created: {task_id}")

    return {"status": "SUCCESS"}

Publishing Events

From FastAPI Service

import httpx

DAPR_URL = "http://localhost:3500"

async def publish_event(topic: str, data: dict):
    """Publish event through Dapr sidecar."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
            json=data,
            headers={"Content-Type": "application/json"}
        )
        response.raise_for_status()

# Usage
await publish_event("task-created", {
    "task_id": "123",
    "title": "Learn Python",
    "user_id": "user-456"
})

With CloudEvent Metadata

async def publish_cloudevent(topic: str, data: dict, event_type: str):
    """Publish with explicit CloudEvent fields."""
    async with httpx.AsyncClient() as client:
        await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
            json=data,
            headers={
                "Content-Type": "application/cloudevents+json",
                "ce-specversion": "1.0",
                "ce-type": event_type,
                "ce-source": "triage-agent",
                "ce-id": str(uuid.uuid4())
            }
        )

Kubernetes Deployment

Component Scoping

Limit component access to specific apps:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    - name: brokers
      value: "kafka:9092"
scopes:
  - triage-agent
  - concepts-agent
  - debug-agent

App Deployment with Dapr Sidecar

apiVersion: apps/v1
kind: Deployment
metadata:
  name: triage-agent
spec:
  replicas: 2
  selector:
    matchLabels:
      app: triage-agent
  template:
    metadata:
      labels:
        app: triage-agent
      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "triage-agent"
        dapr.io/app-port: "8000"
        dapr.io/enable-api-logging: "true"
    spec:
      containers:
        - name: triage-agent
          image: myapp/triage-agent:latest
          ports:
            - containerPort: 8000
          env:
            - name: DAPR_HTTP_PORT
              value: "3500"

Multi-Agent Routing Pattern

Triage Agent → Specialist Agents

# triage_agent.py
from fastapi import FastAPI, Request
import httpx

app = FastAPI()
DAPR_URL = "http://localhost:3500"

@app.post("/api/question")
async def handle_question(request: Request):
    data = await request.json()
    question = data["question"]

    # Route based on content
    if "python" in question.lower() or "code" in question.lower():
        topic = "concepts-request"
    elif "error" in question.lower() or "bug" in question.lower():
        topic = "debug-request"
    else:
        topic = "concepts-request"  # Default

    # Publish to appropriate agent
    async with httpx.AsyncClient() as client:
        await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
            json={
                "question": question,
                "user_id": data["user_id"],
                "session_id": data["session_id"]
            }
        )

    return {"status": "routed", "topic": topic}

Specialist Agent Handler

# concepts_agent.py
from fastapi import FastAPI, Request
import httpx

app = FastAPI()
DAPR_URL = "http://localhost:3500"

@app.get("/dapr/subscribe")
async def subscribe():
    return [{"pubsubname": "pubsub", "topic": "concepts-request", "route": "/dapr/handle"}]

@app.post("/dapr/handle")
async def handle_concepts_request(request: Request):
    event = await request.json()
    data = event.get("data", event)

    # Process with LLM
    response = await process_with_llm(data["question"])

    # Publish response
    async with httpx.AsyncClient() as client:
        await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/response-ready",
            json={
                "session_id": data["session_id"],
                "response": response,
                "agent": "concepts"
            }
        )

    return {"status": "SUCCESS"}

Local Development

Run with Dapr CLI

# Start subscriber first
dapr run --app-id concepts-agent --app-port 8001 --dapr-http-port 3501 \
  --resources-path ./components -- uvicorn concepts:app --port 8001

# Start publisher
dapr run --app-id triage-agent --app-port 8000 --dapr-http-port 3500 \
  --resources-path ./components -- uvicorn triage:app --port 8000

Docker Compose with Dapr

version: "3.8"
services:
  triage-agent:
    build: ./services/triage
    ports:
      - "8000:8000"

  triage-agent-dapr:
    image: daprio/daprd:latest
    command: ["./daprd",
      "--app-id", "triage-agent",
      "--app-port", "8000",
      "--dapr-http-port", "3500",
      "--resources-path", "/components"
    ]
    volumes:
      - ./components:/components
    network_mode: "service:triage-agent"
    depends_on:
      - triage-agent

  kafka:
    image: confluentinc/cp-kafka:latest
    # ... kafka config

Troubleshooting

Check Dapr Sidecar

# View sidecar logs
kubectl logs deploy/triage-agent -c daprd

# Check component registration
curl http://localhost:3500/v1.0/metadata

Common Issues

Error Cause Fix
component not found Component not loaded Check --resources-path or K8s namespace
connection refused Kafka not reachable Verify broker address in component
consumer group rebalance Multiple instances Use unique consumerGroup per app
event not received Wrong topic/route Check subscription config

Debug Event Flow

# Publish test event
dapr publish --pubsub pubsub --topic test --data '{"test": true}'

# Check consumer logs
kubectl logs deploy/my-app -c daprd | grep -i subscribe

Verification

Run: python scripts/verify.py

Related Skills

  • deploying-kafka-k8s - Kafka cluster setup with Strimzi
  • scaffolding-fastapi-dapr - FastAPI services with Dapr
  • scaffolding-openai-agents - Agent orchestration patterns