| name | cva-patterns-workflows |
| description | Multi-agent workflow orchestration patterns for Clojure+Vertex AI. Covers sequential/parallel/DAG topologies, state management (stateless/stateful/hybrid), error handling (retry/circuit breaker), and monitoring. Includes production metrics from healthcare pipeline (99.4% latency reduction, 92.4% cost reduction). Use when designing multi-agent systems, implementing complex pipelines, optimizing workflow performance, or building production orchestration. |
| allowed-tools | Read,Bash,Edit,Write,Glob |
Workflow Pipeline Orchestration
Pattern Type: Architectural Complexity: High Best For: Multi-agent systems with 3+ agents, production pipelines requiring reliability and observability
🎯 Overview
Workflow pipeline patterns enable orchestration of multiple AI agents into cohesive, production-ready systems. This pattern addresses three critical challenges:
- Topology Design: How to connect agents (sequential, parallel, DAG)
- State Management: How to handle intermediate results and failures
- Reliability: How to ensure robustness with retries, fallbacks, and monitoring
When to Use:
- Pipeline has 3+ agents with dependencies
- Need retry/recovery capabilities
- Production environment requiring observability
- Cost optimization through parallelization
Trade-offs:
- Complexity: Higher code overhead (+40% for DAG vs sequential)
- Performance: Parallel topologies reduce latency by 32%
- Reliability: State management adds +250ms overhead but enables recovery
Production ROI: Healthcare pipeline achieved 99.4% latency reduction (4h 15m → 1m 30s) and 92.4% cost reduction (R$ 192.50 → R$ 14.70).
📊 Pattern Explanation
Core Concept
Multi-agent pipelines follow three topology patterns:
1. SEQUENTIAL (Linear)
Input → A → B → C → D → Output
- Simple, predictable
- Full latency = sum(all systems)
- Use: Strong dependencies between agents
2. PARALLEL (Fork-Join)
┌→ B ┐
Input → A┤ ├→ D → Output
└→ C ┘
- 32% latency reduction
- Same cost, better performance
- Use: Independent agents after A
3. DAG (Directed Acyclic Graph)
┌→ B → D ┐
Input → A → ┌┤ ├→ E → Output
│└→ C ────┘
└→ Validation (optional)
- Maximum flexibility
- Supports optional nodes, validations
- Use: Complex dependencies, 5+ agents
Implementation Approach
Step 1: Define agent dependencies
- Map each agent's input requirements
- Identify independent agents (parallelizable)
- Determine optional vs critical nodes
Step 2: Choose state management strategy
- Stateless: In-memory only (dev/simple pipelines)
- Stateful: Database-backed (production, long pipelines)
- Hybrid: Checkpoints for expensive agents only
Step 3: Add error handling
- Per-system retry with exponential backoff
- Circuit breaker for external APIs
- Fallback strategies (cache, skip, human review)
Step 4: Instrument for observability
- Track latency, cost, tokens per agent
- Export metrics to Prometheus/CloudWatch
- Alert on budget thresholds
💻 Clojure Implementation
Basic Example: Sequential Pipeline
(ns lab.workflows.sequential
"Sequential pipeline with error propagation"
(:require [lab.agents.data-extraction :as s11]
[lab.agents.claims-identification :as s12]
[lab.agents.reference-search :as s212]))
(defn execute-sequential-pipeline
"Execute 3-agent pipeline sequentially.
Agent flow:
1. S.1.1 extracts structured data from input
2. S.1.2 identifies claims requiring evidence
3. S.2-1.2 searches scientific references for claims
Args:
agents-map - Map {:s11 agent1, :s12 agent2, :s212 agent3}
db-spec - Database connection spec
input - Map {:texto \"...\", :requisitos \"...\"}
Returns:
{:success? boolean
:results {:s11 {...}, :s12 {...}, :s212 {...}}
:metrics {:total-time-ms int, :total-cost float}}"
[agents-map db-spec input]
(let [start-time (System/currentTimeMillis)
cost-tracker (atom 0.0)]
(try
;; System 1.1: Data extraction (LGPD-compliant)
(let [s11-result (s11/extract-data (:s11 agents-map) db-spec input)]
(when-not (:success? s11-result)
(throw (ex-info "S.1.1 failed"
{:system :s11, :error (:errors s11-result)})))
(swap! cost-tracker + 0.045)
;; System 1.2: Claims identification
(let [s12-result (s12/identify-claims
(:s12 agents-map)
(:data s11-result))]
(when-not (:success? s12-result)
(throw (ex-info "S.1.2 failed"
{:system :s12, :error (:errors s12-result)})))
(swap! cost-tracker + 0.021)
;; System 2-1.2: Reference search
(let [s212-result (s212/search-references
(:s212 agents-map)
(:claims s12-result))]
(when-not (:success? s212-result)
(throw (ex-info "S.2-1.2 failed"
{:system :s212, :error (:errors s212-result)})))
(swap! cost-tracker + 0.067)
;; Return successful result
{:success? true
:results {:s11 (:data s11-result)
:s12 (:claims s12-result)
:s212 (:references s212-result)}
:metrics {:total-time-ms (- (System/currentTimeMillis) start-time)
:total-cost @cost-tracker
:systems-count 3}})))
(catch Exception e
;; Centralized error handling
{:success? false
:error (.getMessage e)
:error-data (ex-data e)
:metrics {:total-time-ms (- (System/currentTimeMillis) start-time)
:total-cost @cost-tracker
:partial? true}}))))
(comment
;; Usage example
(def agents {:s11 (create-gemini-agent "gemini-2.0-flash")
:s12 (create-gemini-agent "gemini-2.0-flash")
:s212 (create-claude-agent "claude-3-haiku")})
(def result (execute-sequential-pipeline
agents
db-spec
{:texto "Criar post sobre TCC para ansiedade"
:requisitos "Tom profissional, 800 palavras"}))
(:metrics result)
;; => {:total-time-ms 14300, :total-cost 0.133, :systems-count 3}
;; Production metrics (5-agent healthcare pipeline):
;; - Latency: 32.2s average
;; - Cost: $0.391 per execution
;; - Success rate: 87.8%
)
Production Example: Parallel Pipeline with State Management
(ns lab.workflows.parallel
"Parallel pipeline with hybrid state management"
(:require [clojure.core.async :as async]
[lab.workflows.sequential :as seq-wf]
[next.jdbc :as jdbc]
[cheshire.core :as json]))
(defn save-checkpoint!
"Save expensive agent result to database for recovery.
Checkpoints enable resume-from-failure without re-running
expensive agents (e.g., external API calls, long-running LLMs).
Args:
db-spec - Database connection spec
pipeline-id - UUID of pipeline execution
agent-id - Keyword (:s212, :s4, etc.)
result - Agent result map
metadata - Map {:cost float, :latency-ms int, :model string}
Returns:
Database row ID"
[db-spec pipeline-id agent-id result metadata]
(jdbc/execute-one!
(jdbc/get-datasource db-spec)
["INSERT INTO pipeline_checkpoints
(pipeline_id, agent_id, result_json, metadata_json, created_at)
VALUES (?::uuid, ?, ?::jsonb, ?::jsonb, NOW())
RETURNING id"
(str pipeline-id)
(name agent-id)
(json/generate-string result)
(json/generate-string metadata)]))
(defn load-checkpoint
"Load checkpoint from database if exists.
Returns:
{:result map, :metadata map} or nil if not found"
[db-spec pipeline-id agent-id]
(when-let [row (jdbc/execute-one!
(jdbc/get-datasource db-spec)
["SELECT result_json, metadata_json
FROM pipeline_checkpoints
WHERE pipeline_id = ?::uuid
AND agent_id = ?
ORDER BY created_at DESC
LIMIT 1"
(str pipeline-id)
(name agent-id)])]
{:result (json/parse-string (:result_json row) true)
:metadata (json/parse-string (:metadata_json row) true)}))
(defn execute-parallel-subsystems
"Execute independent agents in parallel using core.async.
Parallelization strategy:
- S.1.2 and S.3-2 are independent after S.1.1
- Execute concurrently to reduce total latency
- Both must succeed for pipeline to continue
Args:
agents-map - Map of agent instances
db-spec - Database spec
prof-id - Professional UUID
s11-output - Output from S.1.1 (already executed)
Returns:
{:s12-result map
:s32-result map
:parallel-time-ms int
:time-saved-ms int}"
[agents-map db-spec prof-id s11-output]
(let [start-time (System/currentTimeMillis)
;; Create channels for results
s12-chan (async/chan)
s32-chan (async/chan)
;; Execute S.1.2 in separate thread
_ (async/go
(try
(let [result (s12/identify-claims (:s12 agents-map) s11-output)]
(async/>! s12-chan result))
(catch Exception e
(async/>! s12-chan {:success? false, :error (.getMessage e)}))))
;; Execute S.3-2 in separate thread
_ (async/go
(try
(let [result (s32/optimize-seo
(:s32 agents-map)
db-spec
s11-output
prof-id)]
(async/>! s32-chan result))
(catch Exception e
(async/>! s32-chan {:success? false, :error (.getMessage e)}))))
;; Wait for both results (blocking)
s12-result (async/<!! s12-chan)
s32-result (async/<!! s32-chan)
end-time (System/currentTimeMillis)
parallel-time (- end-time start-time)
sequential-time (+ 2100 5200) ; S.1.2 + S.3-2 if sequential
time-saved (- sequential-time parallel-time)]
{:s12-result s12-result
:s32-result s32-result
:parallel-time-ms parallel-time
:time-saved-ms time-saved}))
(defn execute-hybrid-pipeline
"Production pipeline with parallel execution + selective checkpoints.
Optimization strategy:
- Fast agents (S.1.1, S.1.2, S.3-2): In-memory only
- Expensive agents (S.2-1.2, S.4): Checkpoint to database
- Independent agents (S.1.2, S.3-2): Execute in parallel
Recovery scenario:
If S.4 fails, resume from S.2-1.2 checkpoint without
re-running S.1.1, S.1.2, S.3-2 (saves $0.144 + 10.1s).
Args:
agents-map - Map of agent instances
db-spec - Database spec
prof-id - Professional UUID
input - Initial input map
opts - {:pipeline-id UUID, :resume-from keyword}
Returns:
{:success? boolean
:pipeline-id UUID
:results map
:metrics map}"
[agents-map db-spec prof-id input & {:keys [pipeline-id resume-from]}]
(let [pipeline-id (or pipeline-id (java.util.UUID/randomUUID))
start-time (System/currentTimeMillis)]
(println "🚀 Pipeline" pipeline-id "started")
;; S.1.1: Always execute (fast, unique input)
(let [s11-result (seq-wf/s11/extract-data (:s11 agents-map) db-spec input)
_ (when-not (:success? s11-result)
(throw (ex-info "S.1.1 failed" {:system :s11})))
;; Parallel execution: S.1.2 + S.3-2
parallel-results (execute-parallel-subsystems
agents-map db-spec prof-id (:data s11-result))
s12-result (:s12-result parallel-results)
s32-result (:s32-result parallel-results)
_ (when-not (and (:success? s12-result) (:success? s32-result))
(throw (ex-info "Parallel execution failed"
{:s12 (:success? s12-result)
:s32 (:success? s32-result)})))
;; S.2-1.2: CHECKPOINT (expensive external API)
s212-result (or (when (= resume-from :s4)
(:result (load-checkpoint db-spec pipeline-id :s212)))
(let [result (seq-wf/s212/search-references
(:s212 agents-map)
(:claims s12-result))]
(save-checkpoint! db-spec pipeline-id :s212 result
{:cost 0.067, :latency-ms 8400})
result))
_ (when-not (:success? s212-result)
(throw (ex-info "S.2-1.2 failed" {:system :s212})))
;; S.4: CHECKPOINT (expensive consolidation)
s4-result (or (when (= resume-from :done)
(:result (load-checkpoint db-spec pipeline-id :s4)))
(let [result (seq-wf/s4/consolidate-final-text
(:s4 agents-map)
db-spec prof-id
{:s11 (:data s11-result)
:s12 (:claims s12-result)
:s212 (:references s212-result)
:s32 (:seo s32-result)}
"query")]
(save-checkpoint! db-spec pipeline-id :s4 result
{:cost 0.18, :latency-ms 12700})
result))
end-time (System/currentTimeMillis)]
{:success? true
:pipeline-id pipeline-id
:results {:s11 (:data s11-result)
:s12 (:claims s12-result)
:s212 (:references s212-result)
:s32 (:seo s32-result)
:s4 (:final-text s4-result)}
:final-output (:final-text s4-result)
:metrics {:total-time-ms (- end-time start-time)
:total-cost 0.391
:parallelization-gain-ms (:time-saved-ms parallel-results)
:checkpoint-overhead-ms 250}})))
(comment
;; Execute production pipeline
(def result (execute-hybrid-pipeline
agents db-spec prof-id input))
;; If S.4 fails, resume from checkpoint
(def resumed (execute-hybrid-pipeline
agents db-spec prof-id input
:pipeline-id (:pipeline-id result)
:resume-from :s4))
;; Savings on resume:
;; - Cost: -$0.144 (no re-run of S.1.1, S.1.2, S.2-1.2, S.3-2)
;; - Time: -10.1s (skip 4 systems)
;; Production metrics (healthcare pipeline, hybrid mode):
;; - Latency: 21.8s (vs 32.2s sequential, -32%)
;; - Cost: $0.391 (same as sequential)
;; - Checkpoint overhead: +250ms (vs +1250ms full stateful)
;; - Recovery cost savings: -75% on failure
)
💡 Best Practices
Start Sequential, Optimize to Parallel
- Rationale: Sequential is simpler to debug and reason about. Profile first to identify bottlenecks, then parallelize independent agents.
- Example: Healthcare pipeline started sequential (87.8% success rate), then parallelized S.1.2+S.3-2 for 32% latency reduction with zero quality impact.
Checkpoint Expensive Agents Only
- Rationale: Database I/O adds overhead (+50ms per checkpoint). Only checkpoint agents with high cost/latency (>$0.05 or >5s).
- Example: Checkpoint S.2-1.2 (external API, 8.4s, $0.067) but not S.1.2 (fast, 2.1s, $0.021). Saves 75% on recovery cost.
Use Circuit Breakers for External APIs
- Rationale: Prevent cascading failures when external services are down. Circuit breaker fails fast, avoiding wasted API calls.
- Example: PubMed API circuit breaker (3 failures → open for 60s) saved $0.30/min during outage by avoiding 5 retry attempts per request.
Instrument Everything
- Rationale: Production issues are invisible without metrics. Track latency, cost, tokens per agent for optimization insights.
- Example: Discovered S.4 consumed 46% of total cost ($0.18/$0.391). Optimized with multi-model routing, reduced to $0.14 (-22%).
Define Fallback Strategies Per Agent
- Rationale: Not all agent failures should halt the pipeline. SEO optimization can be skipped, but LGPD extraction cannot.
- Example: S.3-2 (SEO) fallback is "skip", allowing pipeline to complete. S.1.1 (LGPD) fallback is "human review" (critical compliance).
Test Recovery Paths
- Rationale: Stateful pipelines have complex recovery logic. Untested recovery can corrupt data or lose intermediate results.
- Example: Integration test suite simulates failures at each checkpoint, validates resume-from-checkpoint produces identical output.
🔗 Related Skills
cva-concepts-agent-types- Agent architecture (Types A/B/C/D) ⭐cva-healthcare-pipeline- Real 5-agent production application ⭐cva-patterns-context- Context management for pipeline statecva-patterns-cost- Cost optimization strategiescva-basics-agent-creation- Creating Gemini/Claude agentscva-basics-prompts- Prompt engineering for agents
📘 Additional Resources
Pattern Variations
Streaming Pipelines: Use core.async channels for continuous data flow (e.g., real-time event processing). Not covered here as healthcare pipeline is batch-oriented.
Saga Pattern: Distributed transactions with compensating actions. Useful for multi-database pipelines requiring ACID guarantees.
Event-Driven Orchestration: Pub/sub messaging between agents (vs direct calls). Better for loosely-coupled systems, adds messaging infrastructure complexity.
Advanced Topics
Dynamic DAG Construction: Build pipeline topology at runtime based on input characteristics. Example: Skip S.3-2 (SEO) if content type is "internal documentation".
Multi-Tenant Isolation: Ensure pipeline executions for different tenants don't interfere (separate database schemas, resource quotas).
A/B Testing Framework: Run parallel pipeline variants to compare quality/cost. Example: Gemini Flash vs Haiku for S.2-1.2 (73% cost reduction with Haiku, 2% quality drop).
Performance Benchmarks
| Topology | Latency | Cost | Complexity | Recovery | Use Case |
|---|---|---|---|---|---|
| Sequential | 32.2s | $0.391 | ⭐ | ❌ | MVP, prototype |
| Parallel | 21.8s | $0.391 | ⭐⭐ | ❌ | Production med volume |
| Hybrid | 21.8s | $0.391 | ⭐⭐⭐ | ✅ | Recommended production |
| DAG | 22.5s | $0.391 | ⭐⭐⭐⭐ | ✅ | Complex (5+ agents) |
Recommendation: Start with Parallel + Hybrid state for production. Migrate to DAG only if you have 5+ agents with complex dependencies.