Claude Code Plugins

Community-maintained marketplace

Feedback
8
0

Integrate Dapr for pub/sub messaging and scheduled jobs. Use this skill when implementing event-driven architectures with Dapr, handling CloudEvent message formats, setting up pub/sub subscriptions, or scheduling jobs with Dapr Jobs API. Covers common pitfalls like CloudEvent unwrapping and callback URL patterns.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name dapr-integration
description Integrate Dapr for pub/sub messaging and scheduled jobs. Use this skill when implementing event-driven architectures with Dapr, handling CloudEvent message formats, setting up pub/sub subscriptions, or scheduling jobs with Dapr Jobs API. Covers common pitfalls like CloudEvent unwrapping and callback URL patterns.

Dapr Integration

Integrate Dapr sidecar for pub/sub messaging, state management, and scheduled jobs in Kubernetes environments.

When to Use

  • Setting up Dapr pub/sub for event-driven microservices
  • Scheduling jobs with Dapr Jobs API (v1.0-alpha1)
  • Handling CloudEvent message formats
  • Implementing subscription handlers in FastAPI
  • Debugging Dapr integration issues

Quick Start

# Install Dapr CLI
curl -fsSL https://raw.githubusercontent.com/dapr/cli/master/install/install.sh | bash

# Initialize Dapr (local)
dapr init

# Run app with Dapr sidecar
dapr run --app-id myapp --app-port 8000 -- uvicorn main:app

Core Patterns

1. Pub/Sub Subscription Handler (FastAPI)

from fastapi import APIRouter, Request
from sqlmodel.ext.asyncio.session import AsyncSession

router = APIRouter(prefix="/dapr", tags=["Dapr"])

# Topics we subscribe to
SUBSCRIPTIONS = [
    {"pubsubname": "taskflow-pubsub", "topic": "task-events", "route": "/dapr/events/task-events"},
    {"pubsubname": "taskflow-pubsub", "topic": "reminders", "route": "/dapr/events/reminders"},
]

@router.get("/subscribe")
async def get_subscriptions() -> list[dict]:
    """Dapr calls this on startup to discover subscriptions."""
    return SUBSCRIPTIONS

2. CloudEvent Handling (CRITICAL)

Dapr wraps all pub/sub messages in CloudEvent format. You MUST unwrap it.

@router.post("/events/task-events")
async def handle_task_events(
    request: Request,
    session: AsyncSession = Depends(get_session),
) -> dict:
    try:
        # Step 1: Get raw CloudEvent
        raw_event = await request.json()

        # Step 2: ALWAYS unwrap CloudEvent "data" field
        # CloudEvent structure:
        # {
        #   "data": {              <-- Your payload is HERE
        #     "event_type": "task.created",
        #     "data": {...},
        #     "timestamp": "..."
        #   },
        #   "datacontenttype": "application/json",
        #   "id": "...",
        #   "pubsubname": "taskflow-pubsub",
        #   "source": "myapp",
        #   "topic": "task-events",
        #   ...
        # }
        event = raw_event.get("data", raw_event)  # Unwrap or use as-is

        # Step 3: Now access your payload
        event_type = event.get("event_type")  # "task.created"
        data = event.get("data", {})          # Your actual data

        # Process event...

        return {"status": "SUCCESS"}

    except Exception as e:
        logger.exception("Error handling event: %s", e)
        # Return SUCCESS to prevent Dapr retries for bad events
        return {"status": "SUCCESS"}

3. Publishing Events

import httpx

DAPR_HTTP_ENDPOINT = "http://localhost:3500"
PUBSUB_NAME = "taskflow-pubsub"

async def publish_event(
    topic: str,
    event_type: str,
    data: dict,
) -> bool:
    """Publish event to Dapr pub/sub."""
    url = f"{DAPR_HTTP_ENDPOINT}/v1.0/publish/{PUBSUB_NAME}/{topic}"

    payload = {
        "event_type": event_type,
        "data": data,
        "timestamp": datetime.utcnow().isoformat(),
    }

    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.post(url, json=payload)
            return response.status_code == 204
    except Exception as e:
        logger.error("Failed to publish event: %s", e)
        return False

4. Dapr Jobs API (Scheduled Jobs)

CRITICAL: Dapr Jobs v1.0-alpha1 calls back to /job/{job_name} by default!

# Scheduling a job
async def schedule_job(
    job_name: str,
    due_time: datetime,
    data: dict,
    dapr_http_endpoint: str = "http://localhost:3500",
) -> bool:
    """Schedule a one-time Dapr job."""
    url = f"{dapr_http_endpoint}/v1.0-alpha1/jobs/{job_name}"

    payload = {
        "dueTime": due_time.strftime("%Y-%m-%dT%H:%M:%SZ"),  # RFC3339
        "data": data,
    }

    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.post(url, json=payload)
            return response.status_code == 204
    except Exception as e:
        logger.error("Failed to schedule job: %s", e)
        return False

Handling the callback - Dapr calls /job/{job_name}, NOT a custom endpoint:

# WRONG - Dapr won't call this!
@router.post("/api/jobs/trigger")
async def handle_trigger(...):
    pass

# CORRECT - This is what Dapr actually calls
@router.post("/job/{job_name}")
async def handle_dapr_job_callback(
    job_name: str,
    request: Request,
    session: AsyncSession = Depends(get_session),
) -> dict:
    """Handle Dapr Jobs v1.0-alpha1 callback.

    Dapr calls POST /job/{job_name} when a scheduled job fires.
    """
    try:
        body = await request.json()
        job_data = body.get("data", body)  # Unwrap if needed

        task_id = job_data.get("task_id")
        job_type = job_data.get("type")

        logger.info("Job callback: job=%s, type=%s", job_name, job_type)

        if job_type == "reminder":
            return await handle_reminder(session, job_data)
        elif job_type == "spawn":
            return await handle_spawn(session, task_id)

        return {"status": "unknown_type"}

    except Exception as e:
        logger.exception("Error handling job %s: %s", job_name, e)
        return {"status": "error"}

5. Deleting Scheduled Jobs

async def delete_job(
    job_name: str,
    dapr_http_endpoint: str = "http://localhost:3500",
) -> bool:
    """Cancel a scheduled Dapr job."""
    url = f"{dapr_http_endpoint}/v1.0-alpha1/jobs/{job_name}"

    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.delete(url)
            # 204 = deleted, 500 = not found (both OK)
            return response.status_code in (204, 500)
    except Exception as e:
        logger.error("Failed to delete job: %s", e)
        return False

Kubernetes/Helm Configuration

Dapr Pub/Sub Component (Redis)

# dapr-pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: taskflow-pubsub
  namespace: taskflow
spec:
  type: pubsub.redis
  version: v1
  metadata:
    - name: redisHost
      value: "redis:6379"
    - name: redisPassword
      secretKeyRef:
        name: redis-secret
        key: password
    - name: enableTLS
      value: "true"  # Required for Upstash

Dapr Annotations for Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: taskflow-api
spec:
  template:
    metadata:
      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "taskflow-api"
        dapr.io/app-port: "8000"
        dapr.io/enable-api-logging: "true"

Common Pitfalls

1. Not Unwrapping CloudEvent

# WRONG - event_type will be None!
event = await request.json()
event_type = event.get("event_type")  # None - it's nested in "data"

# CORRECT
raw_event = await request.json()
event = raw_event.get("data", raw_event)  # Unwrap CloudEvent
event_type = event.get("event_type")  # "task.created"

2. Wrong Job Callback URL

# WRONG - Dapr calls /job/{name}, not custom endpoints
@router.post("/api/jobs/trigger")  # Dapr won't call this!

# CORRECT
@router.post("/job/{job_name}")  # Dapr WILL call this

3. Forgetting to Return SUCCESS

# WRONG - Dapr will retry on errors
@router.post("/events/task-events")
async def handle(request: Request):
    try:
        # process...
        return {"status": "SUCCESS"}
    except Exception:
        raise  # Dapr will retry!

# CORRECT - Always return SUCCESS to stop retries
@router.post("/events/task-events")
async def handle(request: Request):
    try:
        # process...
    except Exception as e:
        logger.exception("Error: %s", e)
    return {"status": "SUCCESS"}  # Always, even on error

4. Using Wrong Dapr HTTP Port

# Local development
DAPR_HTTP_ENDPOINT = "http://localhost:3500"

# In Kubernetes (sidecar)
DAPR_HTTP_ENDPOINT = "http://localhost:3500"  # Same! Sidecar is localhost

Debugging

Check Dapr Sidecar Logs

# Kubernetes
kubectl logs deploy/myapp -c daprd -n mynamespace

# Look for:
# - "Scheduler stream connected" = Jobs API working
# - "HTTP API Called" = API calls to Dapr

Verify Subscriptions

# Call your subscribe endpoint
curl http://localhost:8000/dapr/subscribe

# Should return your subscriptions list

Test Pub/Sub Locally

# Publish test event
curl -X POST http://localhost:3500/v1.0/publish/taskflow-pubsub/task-events \
  -H "Content-Type: application/json" \
  -d '{"event_type": "test", "data": {}}'

References