Claude Code Plugins

Community-maintained marketplace

Feedback

cva-patterns-workflows

@joaopelegrino/hello-word-closure
0
0

Multi-agent workflow orchestration patterns for Clojure+Vertex AI. Covers sequential/parallel/DAG topologies, state management (stateless/stateful/hybrid), error handling (retry/circuit breaker), and monitoring. Includes production metrics from healthcare pipeline (99.4% latency reduction, 92.4% cost reduction). Use when designing multi-agent systems, implementing complex pipelines, optimizing workflow performance, or building production orchestration.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name cva-patterns-workflows
description Multi-agent workflow orchestration patterns for Clojure+Vertex AI. Covers sequential/parallel/DAG topologies, state management (stateless/stateful/hybrid), error handling (retry/circuit breaker), and monitoring. Includes production metrics from healthcare pipeline (99.4% latency reduction, 92.4% cost reduction). Use when designing multi-agent systems, implementing complex pipelines, optimizing workflow performance, or building production orchestration.
allowed-tools Read,Bash,Edit,Write,Glob

Workflow Pipeline Orchestration

Pattern Type: Architectural Complexity: High Best For: Multi-agent systems with 3+ agents, production pipelines requiring reliability and observability

🎯 Overview

Workflow pipeline patterns enable orchestration of multiple AI agents into cohesive, production-ready systems. This pattern addresses three critical challenges:

  1. Topology Design: How to connect agents (sequential, parallel, DAG)
  2. State Management: How to handle intermediate results and failures
  3. Reliability: How to ensure robustness with retries, fallbacks, and monitoring

When to Use:

  • Pipeline has 3+ agents with dependencies
  • Need retry/recovery capabilities
  • Production environment requiring observability
  • Cost optimization through parallelization

Trade-offs:

  • Complexity: Higher code overhead (+40% for DAG vs sequential)
  • Performance: Parallel topologies reduce latency by 32%
  • Reliability: State management adds +250ms overhead but enables recovery

Production ROI: Healthcare pipeline achieved 99.4% latency reduction (4h 15m → 1m 30s) and 92.4% cost reduction (R$ 192.50 → R$ 14.70).

📊 Pattern Explanation

Core Concept

Multi-agent pipelines follow three topology patterns:

1. SEQUENTIAL (Linear)
   Input → A → B → C → D → Output
   - Simple, predictable
   - Full latency = sum(all systems)
   - Use: Strong dependencies between agents

2. PARALLEL (Fork-Join)
            ┌→ B ┐
   Input → A┤    ├→ D → Output
            └→ C ┘
   - 32% latency reduction
   - Same cost, better performance
   - Use: Independent agents after A

3. DAG (Directed Acyclic Graph)
                 ┌→ B → D ┐
   Input → A → ┌┤        ├→ E → Output
               │└→ C ────┘
               └→ Validation (optional)
   - Maximum flexibility
   - Supports optional nodes, validations
   - Use: Complex dependencies, 5+ agents

Implementation Approach

Step 1: Define agent dependencies

  • Map each agent's input requirements
  • Identify independent agents (parallelizable)
  • Determine optional vs critical nodes

Step 2: Choose state management strategy

  • Stateless: In-memory only (dev/simple pipelines)
  • Stateful: Database-backed (production, long pipelines)
  • Hybrid: Checkpoints for expensive agents only

Step 3: Add error handling

  • Per-system retry with exponential backoff
  • Circuit breaker for external APIs
  • Fallback strategies (cache, skip, human review)

Step 4: Instrument for observability

  • Track latency, cost, tokens per agent
  • Export metrics to Prometheus/CloudWatch
  • Alert on budget thresholds

💻 Clojure Implementation

Basic Example: Sequential Pipeline

(ns lab.workflows.sequential
  "Sequential pipeline with error propagation"
  (:require [lab.agents.data-extraction :as s11]
            [lab.agents.claims-identification :as s12]
            [lab.agents.reference-search :as s212]))

(defn execute-sequential-pipeline
  "Execute 3-agent pipeline sequentially.

  Agent flow:
  1. S.1.1 extracts structured data from input
  2. S.1.2 identifies claims requiring evidence
  3. S.2-1.2 searches scientific references for claims

  Args:
    agents-map - Map {:s11 agent1, :s12 agent2, :s212 agent3}
    db-spec    - Database connection spec
    input      - Map {:texto \"...\", :requisitos \"...\"}

  Returns:
    {:success? boolean
     :results  {:s11 {...}, :s12 {...}, :s212 {...}}
     :metrics  {:total-time-ms int, :total-cost float}}"
  [agents-map db-spec input]
  (let [start-time (System/currentTimeMillis)
        cost-tracker (atom 0.0)]

    (try
      ;; System 1.1: Data extraction (LGPD-compliant)
      (let [s11-result (s11/extract-data (:s11 agents-map) db-spec input)]
        (when-not (:success? s11-result)
          (throw (ex-info "S.1.1 failed"
                          {:system :s11, :error (:errors s11-result)})))
        (swap! cost-tracker + 0.045)

        ;; System 1.2: Claims identification
        (let [s12-result (s12/identify-claims
                           (:s12 agents-map)
                           (:data s11-result))]
          (when-not (:success? s12-result)
            (throw (ex-info "S.1.2 failed"
                            {:system :s12, :error (:errors s12-result)})))
          (swap! cost-tracker + 0.021)

          ;; System 2-1.2: Reference search
          (let [s212-result (s212/search-references
                              (:s212 agents-map)
                              (:claims s12-result))]
            (when-not (:success? s212-result)
              (throw (ex-info "S.2-1.2 failed"
                              {:system :s212, :error (:errors s212-result)})))
            (swap! cost-tracker + 0.067)

            ;; Return successful result
            {:success? true
             :results {:s11 (:data s11-result)
                       :s12 (:claims s12-result)
                       :s212 (:references s212-result)}
             :metrics {:total-time-ms (- (System/currentTimeMillis) start-time)
                       :total-cost @cost-tracker
                       :systems-count 3}})))

      (catch Exception e
        ;; Centralized error handling
        {:success? false
         :error (.getMessage e)
         :error-data (ex-data e)
         :metrics {:total-time-ms (- (System/currentTimeMillis) start-time)
                   :total-cost @cost-tracker
                   :partial? true}}))))

(comment
  ;; Usage example
  (def agents {:s11 (create-gemini-agent "gemini-2.0-flash")
               :s12 (create-gemini-agent "gemini-2.0-flash")
               :s212 (create-claude-agent "claude-3-haiku")})

  (def result (execute-sequential-pipeline
                agents
                db-spec
                {:texto "Criar post sobre TCC para ansiedade"
                 :requisitos "Tom profissional, 800 palavras"}))

  (:metrics result)
  ;; => {:total-time-ms 14300, :total-cost 0.133, :systems-count 3}

  ;; Production metrics (5-agent healthcare pipeline):
  ;; - Latency: 32.2s average
  ;; - Cost: $0.391 per execution
  ;; - Success rate: 87.8%
  )

Production Example: Parallel Pipeline with State Management

(ns lab.workflows.parallel
  "Parallel pipeline with hybrid state management"
  (:require [clojure.core.async :as async]
            [lab.workflows.sequential :as seq-wf]
            [next.jdbc :as jdbc]
            [cheshire.core :as json]))

(defn save-checkpoint!
  "Save expensive agent result to database for recovery.

  Checkpoints enable resume-from-failure without re-running
  expensive agents (e.g., external API calls, long-running LLMs).

  Args:
    db-spec     - Database connection spec
    pipeline-id - UUID of pipeline execution
    agent-id    - Keyword (:s212, :s4, etc.)
    result      - Agent result map
    metadata    - Map {:cost float, :latency-ms int, :model string}

  Returns:
    Database row ID"
  [db-spec pipeline-id agent-id result metadata]
  (jdbc/execute-one!
    (jdbc/get-datasource db-spec)
    ["INSERT INTO pipeline_checkpoints
      (pipeline_id, agent_id, result_json, metadata_json, created_at)
      VALUES (?::uuid, ?, ?::jsonb, ?::jsonb, NOW())
      RETURNING id"
     (str pipeline-id)
     (name agent-id)
     (json/generate-string result)
     (json/generate-string metadata)]))

(defn load-checkpoint
  "Load checkpoint from database if exists.

  Returns:
    {:result map, :metadata map} or nil if not found"
  [db-spec pipeline-id agent-id]
  (when-let [row (jdbc/execute-one!
                   (jdbc/get-datasource db-spec)
                   ["SELECT result_json, metadata_json
                     FROM pipeline_checkpoints
                     WHERE pipeline_id = ?::uuid
                       AND agent_id = ?
                     ORDER BY created_at DESC
                     LIMIT 1"
                    (str pipeline-id)
                    (name agent-id)])]
    {:result (json/parse-string (:result_json row) true)
     :metadata (json/parse-string (:metadata_json row) true)}))

(defn execute-parallel-subsystems
  "Execute independent agents in parallel using core.async.

  Parallelization strategy:
  - S.1.2 and S.3-2 are independent after S.1.1
  - Execute concurrently to reduce total latency
  - Both must succeed for pipeline to continue

  Args:
    agents-map  - Map of agent instances
    db-spec     - Database spec
    prof-id     - Professional UUID
    s11-output  - Output from S.1.1 (already executed)

  Returns:
    {:s12-result map
     :s32-result map
     :parallel-time-ms int
     :time-saved-ms int}"
  [agents-map db-spec prof-id s11-output]
  (let [start-time (System/currentTimeMillis)

        ;; Create channels for results
        s12-chan (async/chan)
        s32-chan (async/chan)

        ;; Execute S.1.2 in separate thread
        _ (async/go
            (try
              (let [result (s12/identify-claims (:s12 agents-map) s11-output)]
                (async/>! s12-chan result))
              (catch Exception e
                (async/>! s12-chan {:success? false, :error (.getMessage e)}))))

        ;; Execute S.3-2 in separate thread
        _ (async/go
            (try
              (let [result (s32/optimize-seo
                             (:s32 agents-map)
                             db-spec
                             s11-output
                             prof-id)]
                (async/>! s32-chan result))
              (catch Exception e
                (async/>! s32-chan {:success? false, :error (.getMessage e)}))))

        ;; Wait for both results (blocking)
        s12-result (async/<!! s12-chan)
        s32-result (async/<!! s32-chan)

        end-time (System/currentTimeMillis)
        parallel-time (- end-time start-time)
        sequential-time (+ 2100 5200)  ; S.1.2 + S.3-2 if sequential
        time-saved (- sequential-time parallel-time)]

    {:s12-result s12-result
     :s32-result s32-result
     :parallel-time-ms parallel-time
     :time-saved-ms time-saved}))

(defn execute-hybrid-pipeline
  "Production pipeline with parallel execution + selective checkpoints.

  Optimization strategy:
  - Fast agents (S.1.1, S.1.2, S.3-2): In-memory only
  - Expensive agents (S.2-1.2, S.4): Checkpoint to database
  - Independent agents (S.1.2, S.3-2): Execute in parallel

  Recovery scenario:
  If S.4 fails, resume from S.2-1.2 checkpoint without
  re-running S.1.1, S.1.2, S.3-2 (saves $0.144 + 10.1s).

  Args:
    agents-map  - Map of agent instances
    db-spec     - Database spec
    prof-id     - Professional UUID
    input       - Initial input map
    opts        - {:pipeline-id UUID, :resume-from keyword}

  Returns:
    {:success? boolean
     :pipeline-id UUID
     :results map
     :metrics map}"
  [agents-map db-spec prof-id input & {:keys [pipeline-id resume-from]}]
  (let [pipeline-id (or pipeline-id (java.util.UUID/randomUUID))
        start-time (System/currentTimeMillis)]

    (println "🚀 Pipeline" pipeline-id "started")

    ;; S.1.1: Always execute (fast, unique input)
    (let [s11-result (seq-wf/s11/extract-data (:s11 agents-map) db-spec input)
          _ (when-not (:success? s11-result)
              (throw (ex-info "S.1.1 failed" {:system :s11})))

          ;; Parallel execution: S.1.2 + S.3-2
          parallel-results (execute-parallel-subsystems
                             agents-map db-spec prof-id (:data s11-result))

          s12-result (:s12-result parallel-results)
          s32-result (:s32-result parallel-results)
          _ (when-not (and (:success? s12-result) (:success? s32-result))
              (throw (ex-info "Parallel execution failed"
                              {:s12 (:success? s12-result)
                               :s32 (:success? s32-result)})))

          ;; S.2-1.2: CHECKPOINT (expensive external API)
          s212-result (or (when (= resume-from :s4)
                           (:result (load-checkpoint db-spec pipeline-id :s212)))
                         (let [result (seq-wf/s212/search-references
                                        (:s212 agents-map)
                                        (:claims s12-result))]
                           (save-checkpoint! db-spec pipeline-id :s212 result
                                            {:cost 0.067, :latency-ms 8400})
                           result))
          _ (when-not (:success? s212-result)
              (throw (ex-info "S.2-1.2 failed" {:system :s212})))

          ;; S.4: CHECKPOINT (expensive consolidation)
          s4-result (or (when (= resume-from :done)
                         (:result (load-checkpoint db-spec pipeline-id :s4)))
                       (let [result (seq-wf/s4/consolidate-final-text
                                      (:s4 agents-map)
                                      db-spec prof-id
                                      {:s11 (:data s11-result)
                                       :s12 (:claims s12-result)
                                       :s212 (:references s212-result)
                                       :s32 (:seo s32-result)}
                                      "query")]
                         (save-checkpoint! db-spec pipeline-id :s4 result
                                          {:cost 0.18, :latency-ms 12700})
                         result))

          end-time (System/currentTimeMillis)]

      {:success? true
       :pipeline-id pipeline-id
       :results {:s11 (:data s11-result)
                 :s12 (:claims s12-result)
                 :s212 (:references s212-result)
                 :s32 (:seo s32-result)
                 :s4 (:final-text s4-result)}
       :final-output (:final-text s4-result)
       :metrics {:total-time-ms (- end-time start-time)
                 :total-cost 0.391
                 :parallelization-gain-ms (:time-saved-ms parallel-results)
                 :checkpoint-overhead-ms 250}})))

(comment
  ;; Execute production pipeline
  (def result (execute-hybrid-pipeline
                agents db-spec prof-id input))

  ;; If S.4 fails, resume from checkpoint
  (def resumed (execute-hybrid-pipeline
                 agents db-spec prof-id input
                 :pipeline-id (:pipeline-id result)
                 :resume-from :s4))

  ;; Savings on resume:
  ;; - Cost: -$0.144 (no re-run of S.1.1, S.1.2, S.2-1.2, S.3-2)
  ;; - Time: -10.1s (skip 4 systems)

  ;; Production metrics (healthcare pipeline, hybrid mode):
  ;; - Latency: 21.8s (vs 32.2s sequential, -32%)
  ;; - Cost: $0.391 (same as sequential)
  ;; - Checkpoint overhead: +250ms (vs +1250ms full stateful)
  ;; - Recovery cost savings: -75% on failure
  )

💡 Best Practices

  1. Start Sequential, Optimize to Parallel

    • Rationale: Sequential is simpler to debug and reason about. Profile first to identify bottlenecks, then parallelize independent agents.
    • Example: Healthcare pipeline started sequential (87.8% success rate), then parallelized S.1.2+S.3-2 for 32% latency reduction with zero quality impact.
  2. Checkpoint Expensive Agents Only

    • Rationale: Database I/O adds overhead (+50ms per checkpoint). Only checkpoint agents with high cost/latency (>$0.05 or >5s).
    • Example: Checkpoint S.2-1.2 (external API, 8.4s, $0.067) but not S.1.2 (fast, 2.1s, $0.021). Saves 75% on recovery cost.
  3. Use Circuit Breakers for External APIs

    • Rationale: Prevent cascading failures when external services are down. Circuit breaker fails fast, avoiding wasted API calls.
    • Example: PubMed API circuit breaker (3 failures → open for 60s) saved $0.30/min during outage by avoiding 5 retry attempts per request.
  4. Instrument Everything

    • Rationale: Production issues are invisible without metrics. Track latency, cost, tokens per agent for optimization insights.
    • Example: Discovered S.4 consumed 46% of total cost ($0.18/$0.391). Optimized with multi-model routing, reduced to $0.14 (-22%).
  5. Define Fallback Strategies Per Agent

    • Rationale: Not all agent failures should halt the pipeline. SEO optimization can be skipped, but LGPD extraction cannot.
    • Example: S.3-2 (SEO) fallback is "skip", allowing pipeline to complete. S.1.1 (LGPD) fallback is "human review" (critical compliance).
  6. Test Recovery Paths

    • Rationale: Stateful pipelines have complex recovery logic. Untested recovery can corrupt data or lose intermediate results.
    • Example: Integration test suite simulates failures at each checkpoint, validates resume-from-checkpoint produces identical output.

🔗 Related Skills

📘 Additional Resources

Pattern Variations

Streaming Pipelines: Use core.async channels for continuous data flow (e.g., real-time event processing). Not covered here as healthcare pipeline is batch-oriented.

Saga Pattern: Distributed transactions with compensating actions. Useful for multi-database pipelines requiring ACID guarantees.

Event-Driven Orchestration: Pub/sub messaging between agents (vs direct calls). Better for loosely-coupled systems, adds messaging infrastructure complexity.

Advanced Topics

Dynamic DAG Construction: Build pipeline topology at runtime based on input characteristics. Example: Skip S.3-2 (SEO) if content type is "internal documentation".

Multi-Tenant Isolation: Ensure pipeline executions for different tenants don't interfere (separate database schemas, resource quotas).

A/B Testing Framework: Run parallel pipeline variants to compare quality/cost. Example: Gemini Flash vs Haiku for S.2-1.2 (73% cost reduction with Haiku, 2% quality drop).

Performance Benchmarks

Topology Latency Cost Complexity Recovery Use Case
Sequential 32.2s $0.391 MVP, prototype
Parallel 21.8s $0.391 ⭐⭐ Production med volume
Hybrid 21.8s $0.391 ⭐⭐⭐ Recommended production
DAG 22.5s $0.391 ⭐⭐⭐⭐ Complex (5+ agents)

Recommendation: Start with Parallel + Hybrid state for production. Migrate to DAG only if you have 5+ agents with complex dependencies.