---
name: workflow-canvas
description: Celery canvas patterns for workflow composition including chains, groups, chords, signatures, error handling, and nested workflows. Use when building complex task workflows, parallel execution patterns, task synchronization, callback handling, or when user mentions canvas primitives, workflow composition, task chains, parallel processing, or chord patterns.
allowed-tools: Read, Write, Edit, Grep, Glob, Bash
---
# Workflow Canvas
Purpose: Implement Celery canvas patterns for composing complex, distributed task workflows.
Activation Triggers:
- Sequential task execution needed (chains)
- Parallel task processing required (groups)
- Synchronization after parallel work (chords)
- Complex workflow composition
- Task result aggregation
- Error handling in workflows
- Nested or conditional workflows
Key Resources:
- `scripts/test-workflow.sh` - Test workflow execution and validate patterns
- `scripts/validate-canvas.sh` - Validate canvas structure and dependencies
- `scripts/generate-workflow.sh` - Generate workflows from templates
- `templates/` - Complete workflow pattern implementations
- `examples/` - Real-world workflow scenarios with explanations
## Canvas Primitives

### 1. Signatures

Foundation of the canvas system - signatures encapsulate task invocation details:

```python
from celery import signature

# Named signature
signature('tasks.add', args=(2, 2), countdown=10)

# Using the task method
add.signature((2, 2), countdown=10)

# Shortcut syntax (most common)
add.s(2, 2)

# Immutable signature (prevents result forwarding)
add.si(2, 2)
```
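Signatures can be invoked like regular tasks; a quick sketch (assumes a registered `add` task and a running worker):

```python
sig = add.s(2, 2)

# Send to the broker immediately
result = sig.delay()

# Or pass execution options explicitly
result = sig.apply_async(countdown=10)
print(result.get())  # 4, once a worker has processed the task
```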
Use `templates/chain-workflow.py` for signature examples.
### 2. Chains

Sequential task execution where each task's result becomes the next task's first argument:

```python
from celery import chain

# Explicit chain
chain(add.s(2, 2), add.s(4), add.s(8))()

# Pipe syntax (recommended)
(add.s(2, 2) | add.s(4) | add.s(8))()

# With immutable tasks (independent execution)
(add.si(2, 2) | process.s() | notify.si('done'))()
```

Key Pattern: Use `.si()` when a task should NOT receive the previous result.

See `templates/chain-workflow.py` for complete patterns.
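A chain returns the `AsyncResult` of its final task; earlier results stay reachable through `.parent`. A minimal sketch using the `add` chain above (assumes a result backend and a running worker):

```python
res = (add.s(2, 2) | add.s(4) | add.s(8)).apply_async()

print(res.get())                # 16 - result of the final add
print(res.parent.get())         # 8  - result of the middle add
print(res.parent.parent.get())  # 4  - result of the first add
```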
### 3. Groups

Execute multiple tasks in parallel; applying a group returns a `GroupResult`:

```python
from celery import group

# Parallel execution
group(add.s(i, i) for i in range(10))()

# With result tracking
job = group(process.s(item) for item in batch)
result = job.apply_async()
result.get()  # Wait for all tasks
```

Requirements:
- Tasks must NOT ignore results
- Result backend required

See `templates/group-parallel.py` for implementation.
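Beyond `get()`, `GroupResult` exposes aggregate helpers; a short sketch:

```python
job = group(add.s(i, i) for i in range(10))
result = job.apply_async()

result.ready()       # True once every task has finished
result.successful()  # True only if all tasks succeeded
result.get()         # [0, 2, 4, ..., 18], ordered as submitted
```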
### 4. Chords

Group header + callback that executes after all header tasks complete:

```python
from celery import chord

# Basic chord
chord(add.s(i, i) for i in range(100))(tsum.s()).get()

# With error handling (the errback attaches to the callback signature)
chord(
    process.s(item) for item in batch
)(aggregate_results.s().on_error(handle_chord_error.s()))
```

Critical Requirements:
- Result backend MUST be enabled
- Set `task_ignore_result = False` explicitly
- Redis 2.2+ for proper operation

Error Handling: If any header task fails, the callback is not executed; instead it is marked as failed with a `ChordError` carrying the failed task's ID and exception.

See `templates/chord-pattern.py` for the complete implementation.
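The `tsum` callback above is an ordinary task that receives the list of header results; a minimal definition:

```python
from celery import shared_task

@shared_task
def tsum(numbers):
    # The chord delivers all header results as a single list
    return sum(numbers)
```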
### 5. Map & Starmap

Built-in tasks for sequence processing (single message, sequential execution):

```python
# Map: one call per element, each element passed as the only argument
tsum.map([range(10), range(100)])  # -> [45, 4950] when executed

# Starmap: unpacks each tuple as positional arguments
add.starmap(zip(range(100), range(100)))
```

Difference from groups: a single task message rather than one message per call.
### 6. Chunks

Divide large iterables into sized batches:

```python
# Process 100 items in chunks of 10
add.chunks(zip(range(100), range(100)), 10)
```

Returns: nested lists corresponding to chunk outputs.
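The chunks run sequentially inside a single worker by default; to spread the batches across workers, a chunks instance can be converted to a group:

```python
# 10 chunk tasks of 10 calls each, distributed across workers
add.chunks(zip(range(100), range(100)), 10).group().apply_async()
```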
## Advanced Patterns

### Partial Application

```python
# Incomplete signature
partial = add.s(2)

# Complete later (new arguments are prepended)
partial.delay(4)  # Executes add(4, 2)

# Kwargs merge, with call-time values taking precedence
partial = process.s(timeout=30)
partial.apply_async(kwargs={'timeout': 60})  # Uses 60
```
### Nested Workflows

Combine primitives for complex logic:

```python
# Chain of chords - use the two-argument chord(header, body) form,
# which builds a signature instead of executing immediately
workflow = chain(
    chord([task1.s(), task2.s()], callback1.s()),
    chord([task3.s(), task4.s()], callback2.s()),
)

# Groups in chains - a group followed by another task is
# automatically upgraded to a chord
workflow = (
    prepare.s() |
    group([process.s(i) for i in range(10)]) |
    finalize.s()
)
```

See `templates/nested-workflows.py` for production patterns.
### Error Callbacks

```python
# Task-level error handling
add.s(2, 2).on_error(log_error.s()).delay()

# Chord error handling (errback on the callback signature)
chord(
    header_tasks
)(callback.s().on_error(handle_error.s()))
```

Errback signature: `(request, exc, traceback)`

Execution: synchronous - the worker calls the errback directly rather than dispatching it as a separate task.
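A minimal errback matching that signature (a sketch of the `log_error` task referenced above):

```python
from celery import shared_task

@shared_task
def log_error(request, exc, traceback):
    # request.id identifies the failed task; exc is the raised exception
    print(f'Task {request.id} raised: {exc!r}')
```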
See `templates/error-handling-workflow.py` for comprehensive patterns.
## Complex Workflow Example

```python
from celery import chain, group, chord

# Data processing pipeline
workflow = chain(
    # Stage 1: Fetch and validate
    fetch_data.s(),
    validate_data.s(),

    # Stage 2: Parallel processing
    group([
        transform_batch.s(i) for i in range(num_batches)
    ]),

    # Stage 3: Aggregate results (two-argument chord form,
    # composed now, executed when the chain reaches it)
    chord([
        aggregate_batch.s(i) for i in range(num_batches)
    ], finalize_report.s()),

    # Stage 4: Notify
    send_notification.si('pipeline_complete'),
)

# Execute with error handling
result = workflow.apply_async(
    link_error=handle_pipeline_error.s()
)
```

See `templates/complex-workflow.py` for a production-ready implementation.
## Testing Workflows

### Validate Canvas Structure

```bash
# Check workflow composition
./scripts/validate-canvas.sh path/to/workflow.py

# Validates:
# - Result backend enabled
# - Task ignore_result settings
# - Proper signature usage
# - Error callback patterns
```

### Test Workflow Execution

```bash
# Run workflow with test data
./scripts/test-workflow.sh workflow_name

# Options:
#   --dry-run    : Validate without execution
#   --verbose    : Show detailed task flow
#   --timeout 30 : Set execution timeout
```
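For quick local iteration without a broker, Celery's eager mode runs tasks in-process. A minimal sketch (the `myapp` module names are hypothetical; eager mode does not exercise real chord synchronization, so keep `test-workflow.sh` for integration-level checks):

```python
from myapp.celery import app  # hypothetical app module
from myapp.tasks import add   # hypothetical task module

def test_chain_eager():
    app.conf.task_always_eager = True
    result = (add.s(2, 2) | add.s(4)).apply_async()
    assert result.get() == 8
```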
## Best Practices

DO:
- Enable result backend for groups/chords
- Use `.si()` for independent tasks in chains
- Implement error callbacks for critical workflows
- Set explicit timeouts for long-running workflows
- Use chunks for large data processing
- Add metadata with the stamping API (Celery 5.3+)
- Make error callbacks idempotent

DON'T:
- Have tasks wait synchronously for other tasks
- Ignore results for tasks in groups
- Forget to call `super()` in `after_return()` overrides
- Use chords without Redis 2.2+
- Create deeply nested workflows (more than 3 levels)
- Mix synchronous and async task calls
## Configuration Requirements

Celery Config:

```python
# Required for canvas
result_backend = 'redis://localhost:6379/0'
result_extended = True  # Store task args/kwargs

# Recommended
task_track_started = True
result_expires = 3600  # 1 hour

# For large workflows
worker_prefetch_multiplier = 1
task_acks_late = True
```
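The same settings applied to an app instance (a sketch; the broker and backend URLs are placeholders):

```python
from celery import Celery

app = Celery('proj', broker='redis://localhost:6379/0')
app.conf.update(
    result_backend='redis://localhost:6379/0',
    result_extended=True,
    task_track_started=True,
    result_expires=3600,
    worker_prefetch_multiplier=1,
    task_acks_late=True,
)
```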
## Debugging

Visualize workflow:

```python
# The dependency graph lives on the applied result; to_dot()
# writes directly to a file handle
result = workflow.apply_async()
with open('workflow.dot', 'w') as f:
    result.graph.to_dot(f)

# Convert to image:
# dot -Tpng workflow.dot -o workflow.png
```
Monitor execution:

```python
result = workflow.apply_async()
print(f"Task ID: {result.id}")
print(f"Status: {result.status}")
print(f"Children: {result.children}")
```
## Resources

Templates:
- `chain-workflow.py` - Sequential task patterns
- `group-parallel.py` - Parallel execution patterns
- `chord-pattern.py` - Synchronization patterns
- `complex-workflow.py` - Multi-stage pipelines
- `error-handling-workflow.py` - Error callback patterns
- `nested-workflows.py` - Advanced composition

Scripts:
- `test-workflow.sh` - Execute and validate workflows
- `validate-canvas.sh` - Static analysis of canvas patterns
- `generate-workflow.sh` - Generate workflows from templates

Examples:
- `examples/chain-example.md` - Real-world chain scenarios
- `examples/group-example.md` - Parallel processing use cases
- `examples/chord-example.md` - Synchronization patterns
- `examples/complex-workflows.md` - Production workflow architectures
## Security Compliance

This skill follows strict security rules:
- All code examples use placeholder values only
- No real API keys, passwords, or secrets
- Environment variable references in all code
- `.gitignore` protection documented
Version: 1.0.0 | Celery Compatibility: 5.0+ | Required Backend: Redis 2.2+ or compatible result backend