
SKILL.md

name: temporal
description: Temporal.io workflow orchestration for SignalRoom. Use when designing workflows, debugging activities, managing schedules, or troubleshooting stuck workflows.

Temporal Workflow Orchestration

Architecture

Temporal Cloud (signalroom-713.nzg5u)
    │
    ├── Schedules (cron triggers)
    │
    └── Workflows (orchestration)
            │
            └── Activities (actual work)
                    │
                    └── dlt Pipelines / Reports / Notifications

Key Concepts

Concept   Purpose                                     Location
Workflow  Orchestration logic (no I/O)                temporal/workflows.py
Activity  Retryable unit of work                      temporal/activities.py
Worker    Process that executes workflows/activities  workers/main.py
Schedule  Cron-like trigger                           Temporal Cloud UI

Workflows vs Activities

Workflows (pure orchestration):

  • No I/O, no network calls, no file access
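  • Must be deterministic: replaying the same input and event history must produce the same sequence of decisions (no random values or wall-clock reads in workflow code)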
  • Can call activities and wait for results

Activities (actual work):

  • API calls, database writes, file operations
  • Retried automatically on failure
  • Can be long-running
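
The determinism rule exists because Temporal rebuilds workflow state by re-running workflow code against the recorded event history. The toy sketch below is not Temporal's implementation. `stable_plan`, `unstable_plan`, and `replay_matches` are hypothetical names, used only to show why a workflow whose decisions depend on anything other than its input cannot be replayed safely.

```python
from itertools import count

# Hypothetical stand-in for hidden state such as a clock or random source.
_hidden_state = count()

def stable_plan(sources):
    """Decide which activities to run using only the workflow input."""
    return [f"sync:{name}" for name in sources]

def unstable_plan(sources):
    """Decide using hidden state, so two runs of the same input can disagree."""
    offset = next(_hidden_state) % len(sources)
    rotated = sources[offset:] + sources[:offset]
    return [f"sync:{name}" for name in rotated]

def replay_matches(plan, sources):
    """Run the plan once to record its decisions, then replay and compare."""
    recorded = plan(sources)
    replayed = plan(sources)
    return recorded == replayed

sources = ["everflow", "redtrack"]
print(replay_matches(stable_plan, sources))    # True
print(replay_matches(unstable_plan, sources))  # False
```

Anything like `unstable_plan` (I/O, randomness, clock reads) belongs in an activity, whose recorded result is what the workflow sees on replay.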

SignalRoom Workflows

# Sync a single source
SyncSourceWorkflow(source_name, resources, notify_on_success, notify_on_failure)

# Sync multiple sources sequentially
ScheduledSyncWorkflow(sources)

# Run and send a report
RunReportWorkflow(report_name, channel, send)

Triggering Workflows

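# Via script (recommended)
python scripts/trigger_workflow.py everflow -w

# Programmatically (must run inside an async function)
from signalroom.temporal.config import get_temporal_client
# Import path assumed from the temporal/workflows.py location listed under Key Concepts
from signalroom.temporal.workflows import SyncSourceWorkflow

async def trigger_sync():
    client = await get_temporal_client()
    # start_workflow returns a handle as soon as the workflow is accepted
    return await client.start_workflow(
        SyncSourceWorkflow.run,
        args=[...],
        id="sync-everflow-manual",
        task_queue="api-tasks",
    )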

Schedules

Active Schedules

Schedule ID                    Cron                          Workflow
hourly-sync-everflow-redtrack  0 12-23 * * * (7am-6pm ET)    ScheduledSyncWorkflow
daily-sync-s3                  0 11 * * * (6am ET)           SyncSourceWorkflow
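
Cron hours are easiest to sanity-check by converting them. The sketch below assumes the schedules are evaluated in UTC, which is Temporal's default when a schedule sets no time zone (whether SignalRoom overrides this is not stated here). It uses fixed offsets for Eastern Standard and Eastern Daylight Time, and `utc_hour_to_local` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone

EST = timezone(timedelta(hours=-5), "EST")  # Eastern Standard Time
EDT = timezone(timedelta(hours=-4), "EDT")  # Eastern Daylight Time

def utc_hour_to_local(hour, tz):
    """Return the local hour (0-23) at which a UTC cron hour fires."""
    fired_at = datetime(2024, 1, 1, hour, tzinfo=timezone.utc)
    return fired_at.astimezone(tz).hour

# daily-sync-s3 fires at cron hour 11
print(utc_hour_to_local(11, EST))  # 6
print(utc_hour_to_local(11, EDT))  # 7
```

Under that assumption, a fixed UTC cron drifts by one local hour when daylight saving time starts or ends.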

Managing Schedules

# Setup/update schedules
python scripts/setup_schedules.py

# Delete all schedules
python scripts/setup_schedules.py --delete

Temporal Cloud UI

https://cloud.temporal.io/namespaces/signalroom-713.nzg5u/schedules

Retry Policy

Defined in temporal/config.py:

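from datetime import timedelta

from temporalio.common import RetryPolicy

RETRY_POLICY = RetryPolicy(
    initial_interval=timedelta(seconds=1),   # wait before the first retry
    maximum_interval=timedelta(minutes=5),   # cap on the wait between retries
    backoff_coefficient=2.0,                 # wait doubles after each failure
    maximum_attempts=5,                      # total attempts, including the first
    non_retryable_error_types=["ValueError", "KeyError"],  # fail immediately
)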
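
To see what these values mean in practice, the waits between attempts can be computed by hand. The sketch below reproduces the exponential backoff arithmetic in plain Python. `retry_delays` is a hypothetical helper written for illustration and does not call the Temporal SDK.

```python
from datetime import timedelta

def retry_delays(initial_interval, backoff_coefficient, maximum_interval, maximum_attempts):
    """Return the wait before each retry (attempts 2..maximum_attempts)."""
    delays = []
    delay = initial_interval
    for _ in range(maximum_attempts - 1):
        # Each wait is capped at maximum_interval
        delays.append(min(delay, maximum_interval))
        delay = delay * backoff_coefficient
    return delays

delays = retry_delays(
    initial_interval=timedelta(seconds=1),
    backoff_coefficient=2.0,
    maximum_interval=timedelta(minutes=5),
    maximum_attempts=5,
)
print([d.total_seconds() for d in delays])  # [1.0, 2.0, 4.0, 8.0]
```

With five attempts the 5-minute cap is never reached, so the policy adds only 15 seconds of waiting in total.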

Debugging Stuck Workflows

1. Check Temporal UI

https://cloud.temporal.io/namespaces/signalroom-713.nzg5u/workflows

Look for:

  • Workflow status (Running, Completed, Failed)
  • Pending activities
  • Event history

2. Check Worker Logs

# Local
make logs-worker

# Fly.io
fly logs

3. Common Issues

"Activity timed out"

  • Pipeline ran longer than the 30-minute activity timeout
  • Worker crashed mid-activity

"No worker available"

  • Worker not running
  • Wrong task queue

"Workflow stuck in Running"

  • Activity repeatedly failing and retrying
  • Check activity error in event history
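
As a rough guide to how long a workflow can sit in Running before a failing activity gives up, the sketch below adds up the worst case for one activity. It assumes the 30 minutes mentioned under "Activity timed out" is a per-attempt timeout and that the RETRY_POLICY values from temporal/config.py apply. `worst_case_duration` is a hypothetical helper, not part of SignalRoom.

```python
from datetime import timedelta

def worst_case_duration(attempt_timeout, initial_interval, backoff_coefficient,
                        maximum_interval, maximum_attempts):
    """Upper bound: every attempt times out, plus the backoff waits between them."""
    total = attempt_timeout * maximum_attempts
    delay = initial_interval
    for _ in range(maximum_attempts - 1):
        total += min(delay, maximum_interval)
        delay = delay * backoff_coefficient
    return total

bound = worst_case_duration(
    attempt_timeout=timedelta(minutes=30),  # assumed per-attempt timeout
    initial_interval=timedelta(seconds=1),
    backoff_coefficient=2.0,
    maximum_interval=timedelta(minutes=5),
    maximum_attempts=5,
)
print(bound)  # 2:30:15
```

If a workflow has been Running for much longer than this bound, a missing worker or wrong task queue is a more likely cause than retries.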

UnsandboxedWorkflowRunner

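The Temporal Python SDK runs workflow code in a sandbox by default. If an import such as structlog raises RestrictedWorkflowAccessError, run the worker without the sandbox. This also removes the sandbox's checks for non-deterministic calls, so keep workflow code strictly deterministic.

from temporalio.worker import UnsandboxedWorkflowRunner, Worker

worker = Worker(
    client,
    task_queue=settings.temporal_task_queue,
    workflows=[...],
    activities=[...],
    workflow_runner=UnsandboxedWorkflowRunner(),  # Disables the workflow sandbox
)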

Activity Patterns

Async Context

Async activities run inside the worker's event loop, so any code that calls asyncio.run() from within one fails with a RuntimeError:

# WRONG - nested event loop
@activity.defn
async def my_activity():
    result = some_sync_function_that_uses_asyncio_run()  # Fails!

# RIGHT - await directly
@activity.defn
async def my_activity():
    result = await some_async_function()
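
The failure can be reproduced with the standard library alone. The sketch below uses hypothetical names (`fetch_rows`, `sync_helper`) and no Temporal code. It shows the nested-loop error and one possible workaround for helpers that cannot be rewritten as async: running them in a worker thread with `asyncio.to_thread`.

```python
import asyncio

async def fetch_rows():
    await asyncio.sleep(0)  # stands in for real async I/O
    return 3

def sync_helper():
    """Blocking helper that starts its own event loop internally."""
    return asyncio.run(fetch_rows())

async def call_directly():
    """Mimics calling the helper from inside an async activity."""
    try:
        return sync_helper()
    except RuntimeError as exc:
        # asyncio.run() refuses to start while another loop is running
        return f"failed: {exc}"

async def call_in_thread():
    """Runs the helper in a worker thread, which has no running loop."""
    return await asyncio.to_thread(sync_helper)

print(asyncio.run(call_directly()))   # prints a "failed: ..." message
print(asyncio.run(call_in_thread()))  # 3
```

The direct call may also emit a "coroutine was never awaited" warning, which is a side effect of the same mistake. Awaiting the async function directly remains the simpler fix whenever one is available.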

Heartbeats (Long-Running)

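For activities that run longer than about a minute, send a heartbeat on each iteration:

from temporalio import activity

@activity.defn
async def long_activity():
    # batches and process() are placeholders for the real work
    for batch in batches:
        activity.heartbeat()  # Signals liveness so a stalled worker is detected quickly
        await process(batch)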
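
Heartbeats only have an effect when the activity call sets `heartbeat_timeout`. Because the loop above heartbeats once per batch, that timeout needs to be longer than the slowest batch. The sketch below checks this with made-up batch durations. `first_late_batch` is a hypothetical helper and the numbers are illustrative only.

```python
from datetime import timedelta

def first_late_batch(batch_durations, heartbeat_timeout):
    """Return the index of the first batch that outlasts the timeout, or None."""
    for index, duration in enumerate(batch_durations):
        if duration > heartbeat_timeout:
            return index
    return None

# Hypothetical batch durations
batches = [timedelta(seconds=20), timedelta(seconds=90), timedelta(seconds=15)]

print(first_late_batch(batches, timedelta(seconds=60)))  # 1
print(first_late_batch(batches, timedelta(minutes=2)))   # None
```

If some batches are unavoidably slow, heartbeating inside `process` as well as between batches keeps the timeout short without false alarms.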

Resources