| name | multi-agent-orchestration |
| description | Enable Claude to orchestrate complex tasks by spawning and managing specialized sub-agents for parallel or sequential decomposition. Use when tasks have clear independent subtasks, require specialized approaches for different components, benefit from parallel processing, need fault isolation, or involve complex state management across multiple steps. Best for data pipelines, code analysis workflows, content creation pipelines, and multi-stage processing tasks. |

# Multi-Agent Orchestration

Orchestrate complex tasks through specialized sub-agents with parallel or sequential execution.

## Core Principles

- **Minimal Context**: Each sub-agent receives only task-specific data
- **Clear Contracts**: Explicit input/output for every agent (see the contract sketch after this list)
- **File-Based Communication**: Use the filesystem for data exchange between agents
- **Graceful Failure**: Errors stay isolated to individual agents
- **Resource Limits**: Track concurrent operations and token usage
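
The contract below is a minimal sketch of what each agent exchanges with the orchestrator. The authoritative definitions live in `scripts/orchestrator.py`; treat names such as `AgentResult` and the `PENDING` state as assumptions for illustration.

```python
# Sketch of the task/result contract; the shipped classes may carry more fields.
from dataclasses import dataclass, field
from enum import Enum
from typing import Any

class AgentStatus(Enum):
    PENDING = "pending"   # assumed initial state
    SUCCESS = "success"
    FAILED = "failed"

@dataclass
class AgentTask:
    agent_id: str                 # unique name, referenced by other tasks' dependencies
    role: str                     # short human-readable role description
    instructions: str             # what the agent should do
    context: dict[str, Any] = field(default_factory=dict)    # only task-specific data
    dependencies: list[str] = field(default_factory=list)    # agent_ids that must finish first

@dataclass
class AgentResult:
    agent_id: str
    status: AgentStatus
    output: Any = None            # e.g. the path of the file the agent wrote
    error: str | None = None      # populated only on failure
```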

## When to Use

✅ **Use for:**

- Data pipelines with extraction → transformation → analysis stages
- Code analysis with parallel linting, security scanning, performance checks
- Content creation with research → outline → writing → editing phases
- Multi-source data aggregation requiring parallel fetches

❌ **Don't use for:**

- Simple linear tasks
- Tasks where coordination overhead exceeds the benefits
- Real-time interactions requiring constant back-and-forth

## Quick Start

### 1. Import the Orchestrator

```python
from scripts.orchestrator import Orchestrator, AgentTask, AgentStatus
```

### 2. Define Agent Tasks

```python
orchestrator = Orchestrator(max_concurrent=3)

# Parallel extraction
orchestrator.register_task(AgentTask(
    agent_id="extract_api",
    role="API Data Extractor",
    context={"endpoint": "api.example.com/data"},
    instructions="Extract data and save to data/api_data.json",
))
orchestrator.register_task(AgentTask(
    agent_id="extract_db",
    role="Database Extractor",
    context={"query": "SELECT * FROM users"},
    instructions="Extract data and save to data/db_data.json",
))

# Sequential transformation (depends on extraction)
orchestrator.register_task(AgentTask(
    agent_id="transform",
    role="Data Transformer",
    context={"input_files": ["data/api_data.json", "data/db_data.json"]},
    instructions="Merge data and save to data/transformed.json",
    dependencies=["extract_api", "extract_db"],
))
```

### 3. Execute

```python
results = await orchestrator.execute_chain()

if results["transform"].status == AgentStatus.SUCCESS:
    print(f"Output: {results['transform'].output}")
```

## Execution Patterns

### Parallel Execution

Independent tasks run simultaneously:

```python
results = await orchestrator.execute_parallel()
```

### Sequential Chain

Tasks execute in dependency order:

```python
results = await orchestrator.execute_chain()
```

### Mixed Workflow

Combine both patterns: run the tasks within a stage in parallel, and chain stages sequentially through dependencies, as in the sketch below.
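
A sketch of a mixed workflow using the same `register_task` API as the Quick Start. The stage boundary is expressed purely through `dependencies`; this assumes `execute_chain()` runs any tasks whose dependencies are already satisfied in parallel.

```python
# Stage 1: two independent fetches (can run in parallel)
for source in ("api", "db"):
    orchestrator.register_task(AgentTask(
        agent_id=f"fetch_{source}",
        role=f"{source.upper()} Fetcher",
        context={"source": source},
        instructions=f"Fetch data and save to data/{source}.json",
    ))

# Stage 2: one merge task that waits on both fetches
orchestrator.register_task(AgentTask(
    agent_id="merge",
    role="Merger",
    context={"input_files": ["data/api.json", "data/db.json"]},
    instructions="Merge inputs and save to data/merged.json",
    dependencies=["fetch_api", "fetch_db"],
))

results = await orchestrator.execute_chain()
```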

## Agent Context Isolation

Each agent receives minimal context:

```python
context = {
    "input_file": "data.csv",
    "operation": "filter_nulls",
    "output_file": "filtered.csv",
}
```

Never pass:

- Full orchestrator state
- Other agents' results (unless there is an explicit dependency; see the helper sketch below)
- Entire conversation history
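
If you assemble a dependent task's context by hand, a small helper keeps the isolation rule honest: copy only the upstream output paths, never the result objects themselves. `build_context` is a hypothetical name, not part of the shipped orchestrator.

```python
def build_context(upstream_ids, results):
    """Build a minimal downstream context from upstream results (hypothetical helper)."""
    return {
        "input_files": [results[agent_id].output for agent_id in upstream_ids],
    }

transform_context = build_context(["extract_api", "extract_db"], results)
```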

## File-Based Communication

Agents communicate via the filesystem:

```python
import json

# Agent 1 writes its output
with open("data/stage1_output.json", "w") as f:
    json.dump(results, f)

# Agent 2 reads it (via dependency)
context = {"input_file": "data/stage1_output.json"}
```

Use a workspace directory structure:

```text
/tmp/orchestration_workspace/
├── agent1/
│   ├── input.json
│   └── output.json
├── agent2/
│   └── results.json
└── shared/
    └── common_data.json
```
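
A sketch of a workspace helper matching the layout above; the function names and cleanup behaviour are illustrative, not part of `scripts/orchestrator.py`.

```python
import shutil
from pathlib import Path

WORKSPACE = Path("/tmp/orchestration_workspace")

def agent_dir(agent_id: str) -> Path:
    """Create (if needed) and return the per-agent working directory."""
    path = WORKSPACE / agent_id
    path.mkdir(parents=True, exist_ok=True)
    return path

def cleanup_workspace() -> None:
    """Remove temporary files once the run completes."""
    shutil.rmtree(WORKSPACE, ignore_errors=True)
```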

## Error Handling

```python
# Automatic retry with exponential backoff
orchestrator = Orchestrator(max_retries=3)

# Check results
for agent_id, result in results.items():
    if result.status == AgentStatus.FAILED:
        print(f"Agent {agent_id} failed: {result.error}")
```

## Resource Management

```python
# Limit concurrent agents
orchestrator = Orchestrator(max_concurrent=5)

# Track token usage
orchestrator = TokenAwareOrchestrator(token_budget=100000)
```
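
Under the hood, a concurrency cap is typically just a semaphore around the per-agent coroutine. A minimal sketch, reusing the illustrative `run_agent` callable from the previous example:

```python
import asyncio

semaphore = asyncio.Semaphore(5)   # mirrors max_concurrent=5

async def run_limited(run_agent, task):
    """Run an agent only when a concurrency slot is free."""
    async with semaphore:
        return await run_agent(task)
```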

## Common Patterns

### Data Pipeline

Extract (parallel) → Transform (sequential) → Analyze (sequential)

### Code Analysis

Lint + Security Scan + Performance Check (parallel) → Report (sequential)

### Content Creation

Research → Outline → [Intro + Body + Conclusion] (parallel) → Edit
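
For example, the code-analysis pattern maps onto the task API like this (agent IDs and file paths are illustrative):

```python
checks = ("lint", "security_scan", "perf_check")

# Three independent checks run in parallel...
for check in checks:
    orchestrator.register_task(AgentTask(
        agent_id=check,
        role=check.replace("_", " ").title(),
        context={"target": "src/"},
        instructions=f"Run the {check} and save findings to data/{check}.json",
    ))

# ...then a single report task aggregates their outputs.
orchestrator.register_task(AgentTask(
    agent_id="report",
    role="Report Writer",
    context={"input_files": [f"data/{c}.json" for c in checks]},
    instructions="Summarize all findings in data/report.md",
    dependencies=list(checks),
))
```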

## Implementation Details

See `references/implementation-guide.md` for:

- Complete class implementations
- Advanced patterns (hierarchical delegation, message bus)
- Integration with Claude Code subprocess execution
- Monitoring and debugging strategies

See `scripts/orchestrator.py` for ready-to-use orchestrator classes.

## Best Practices

- **Start small**: Test with 2-3 agents before scaling
- **Profile execution**: Log per-agent timing to identify bottlenecks
- **Use timeouts**: Set a reasonable limit per agent (see the sketch below)
- **Clean up the workspace**: Remove temporary files after completion
- **Log everything**: Maintain an execution log for debugging
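
A sketch of a per-agent timeout, assuming the same illustrative `run_agent` callable as above; `asyncio.wait_for` cancels the agent when the limit is exceeded.

```python
import asyncio

async def run_with_timeout(run_agent, task, timeout_s=120.0):
    """Fail one agent cleanly instead of letting it stall the whole chain."""
    try:
        return await asyncio.wait_for(run_agent(task), timeout=timeout_s)
    except asyncio.TimeoutError:
        raise RuntimeError(f"Agent {task.agent_id} timed out after {timeout_s}s")
```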