---
name: langgraph-workflows
description: Design and implement multi-agent workflows with LangGraph 0.2+ - state management, supervisor-worker patterns, conditional routing, and fault-tolerant checkpointing
version: 2.0.0
author: YG Starter Template
tags: [langgraph, workflows, multi-agent, state-management, checkpointing, 2025]
---
# LangGraph Workflows

Master multi-agent workflow orchestration with LangGraph 0.2+.
## Overview
LangGraph is a library for building stateful, multi-agent workflows as directed graphs. This skill covers production patterns for building complex AI workflows with fault tolerance, checkpointing, and observability.
**Real-World Use Cases:**
- Multi-Agent Code Review: Security, performance, style, and test coverage agents
- E-commerce Product Enrichment: Image classification, attribute extraction, SEO optimization
- Customer Support Routing: Intent classification, priority scoring, agent assignment
- Document Processing Pipeline: OCR, entity extraction, summarization, QA validation
- Research Assistant: Query expansion, retrieval, synthesis, fact-checking
**When to use this skill:**
- Building multi-step AI workflows with agent coordination
- Implementing supervisor-worker patterns (one agent routes to specialists)
- Creating fault-tolerant workflows with checkpointing
- Managing complex state across multiple LLM calls
- Conditional routing based on workflow state
**When NOT to use this skill:**
- Single-agent tasks (use simple LangChain chains)
- Stateless API calls (no need for graph complexity)
- Simple sequential pipelines (LangChain LCEL is simpler)
## Core Concepts

### 1. State Management

LangGraph workflows operate on shared state passed between nodes.

**Two State Approaches:**

```python
# Approach 1: TypedDict (simple, type-safe)
from typing import TypedDict, Annotated
from operator import add

class WorkflowState(TypedDict):
    input: str
    output: str
    agent_responses: Annotated[list[dict], add]  # list accumulates across nodes
    metadata: dict

# Approach 2: Pydantic (validation, complex logic)
from pydantic import BaseModel, Field

class WorkflowState(BaseModel):
    input: str = Field(description="User input")
    output: str = ""
    agent_responses: list[dict] = Field(default_factory=list)

    def add_response(self, agent: str, result: str):
        self.agent_responses.append({"agent": agent, "result": result})
```
**Real-World Example: Code Review Pipeline**

```python
class CodeReviewState(TypedDict):
    repository: str
    pull_request_id: int
    code_diff: str

    # Agent outputs (each agent adds to these)
    security_findings: Annotated[list[SecurityIssue], add]
    performance_issues: Annotated[list[PerformanceWarning], add]
    style_violations: Annotated[list[StyleViolation], add]
    test_coverage_gaps: Annotated[list[CoverageGap], add]

    # Control flow
    current_agent: str
    agents_completed: list[str]
    quality_passed: bool
    requires_human_review: bool
```
**Key Pattern: `Annotated[list[T], add]`**

- Without `add`: each node's update *replaces* the list
- With `add`: each node's update is *appended* to the list
- Critical for multi-agent accumulation (see the sketch below)
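To see the reducer in action, here is a minimal, self-contained sketch (the state and nodes are hypothetical, for illustration only):

```python
from operator import add
from typing import Annotated, TypedDict

from langgraph.graph import StateGraph, START, END

class DemoState(TypedDict):
    items: Annotated[list[str], add]  # reducer: each update is appended
    last: str                         # no reducer: each update replaces

def node_a(state: DemoState):
    return {"items": ["from_a"], "last": "a"}

def node_b(state: DemoState):
    return {"items": ["from_b"], "last": "b"}

graph = StateGraph(DemoState)
graph.add_node("a", node_a)
graph.add_node("b", node_b)
graph.add_edge(START, "a")
graph.add_edge("a", "b")
graph.add_edge("b", END)
app = graph.compile()

print(app.invoke({"items": [], "last": ""}))
# {'items': ['from_a', 'from_b'], 'last': 'b'} -- items accumulated, last replaced
```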
### 2. Supervisor-Worker Pattern

The most common multi-agent pattern: one supervisor routes to specialized workers.

```python
from langgraph.graph import StateGraph, END

# Define nodes
def supervisor(state: WorkflowState) -> WorkflowState:
    """Route to the next worker based on state."""
    if state["needs_analysis"]:
        state["next"] = "analyzer"
    elif state["needs_validation"]:
        state["next"] = "validator"
    else:
        state["next"] = END
    return state

def analyzer(state: WorkflowState) -> WorkflowState:
    """Specialized analysis worker."""
    result = analyze(state["input"])
    state["results"].append(result)
    state["needs_analysis"] = False  # mark done so the supervisor moves on
    return state

def validator(state: WorkflowState) -> WorkflowState:
    """Specialized validation worker."""
    validate(state["results"])
    state["needs_validation"] = False
    return state

# Build graph
workflow = StateGraph(WorkflowState)
workflow.add_node("supervisor", supervisor)
workflow.add_node("analyzer", analyzer)
workflow.add_node("validator", validator)

# Supervisor routes dynamically
workflow.add_conditional_edges(
    "supervisor",
    lambda s: s["next"],  # route based on state
    {
        "analyzer": "analyzer",
        "validator": "validator",
        END: END,
    },
)

# Workers return to supervisor
workflow.add_edge("analyzer", "supervisor")
workflow.add_edge("validator", "supervisor")
workflow.set_entry_point("supervisor")

app = workflow.compile()
```
**Production Example: Code Review Supervisor**

```python
# app/workflows/code_review_workflow.py
def supervisor_node(state: CodeReviewState) -> CodeReviewState:
    """Route to the next available review agent."""
    completed = set(state["agents_completed"])
    available_agents = [a for a in REVIEW_AGENTS if a not in completed]

    if not available_agents:
        state["next"] = "quality_gate"
    else:
        # Priority-based routing (security first, then performance, etc.)
        state["next"] = available_agents[0]

    return state

# Specialist review agents, in priority order
REVIEW_AGENTS = [
    "security_scanner",      # OWASP Top 10, CVE detection
    "performance_analyzer",  # N+1 queries, algorithmic complexity
    "style_checker",         # ESLint, Prettier, PEP8
    "test_coverage",         # Missing tests, assertions
    "documentation_review",  # Docstrings, READMEs
    "dependency_audit",      # Outdated libs, license compliance
]

for agent_name in REVIEW_AGENTS:
    workflow.add_node(agent_name, create_review_agent(agent_name))
    workflow.add_edge(agent_name, "supervisor")  # return to supervisor
```
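The `create_review_agent` factory is not shown above; here is a minimal sketch of what it might look like (the `run_agent` helper and the output-key mapping are assumptions, not project code):

```python
# Map each agent to the state field it accumulates into (assumed mapping)
AGENT_OUTPUT_KEYS = {
    "security_scanner": "security_findings",
    "performance_analyzer": "performance_issues",
    "style_checker": "style_violations",
    "test_coverage": "test_coverage_gaps",
}

def create_review_agent(agent_name: str):
    """Build a node that runs one specialist and marks itself complete."""
    def review_node(state: CodeReviewState) -> dict:
        findings = run_agent(agent_name, state["code_diff"])  # hypothetical LLM call
        update: dict = {"agents_completed": state["agents_completed"] + [agent_name]}
        if agent_name in AGENT_OUTPUT_KEYS:
            # Merged by the `add` reducer, so earlier findings are preserved
            update[AGENT_OUTPUT_KEYS[agent_name]] = findings
        return update
    return review_node
```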
**Benefits:**

- Easy to add/remove agents (just modify the agent list and routing logic)
- Centralized coordination (the supervisor sees all state)
- Parallel execution possible (if agents are independent)
### 3. Conditional Routing

Conditional edges let you route dynamically based on state.

```python
def route_based_on_quality(state: WorkflowState) -> str:
    """Decide the next step based on quality score."""
    if state["quality_score"] >= 0.8:
        return "publish"
    elif state["retry_count"] < 3:
        return "retry"
    else:
        return "manual_review"

workflow.add_conditional_edges(
    "quality_check",
    route_based_on_quality,
    {
        "publish": "publish_node",
        "retry": "generator",
        "manual_review": "review_queue",
    },
)
```
**Example from this project: Quality Gate**

```python
def route_after_quality_gate(state: AnalysisState) -> str:
    """Route based on the quality gate result."""
    if state["quality_passed"]:
        return "compress_findings"  # success path
    elif state["retry_count"] < 2:
        return "supervisor"         # retry with more agents
    else:
        return END                  # failed, return partial results

workflow.add_conditional_edges(
    "quality_gate",
    route_after_quality_gate,
    {
        "compress_findings": "compress_findings",
        "supervisor": "supervisor",
        END: END,
    },
)
```
**Routing Patterns:**

- Sequential: `A -> B -> C` (simple edges)
- Branching: `A -> (B or C)` (conditional edges)
- Looping: `A -> B -> A` (retry logic)
- Convergence: `(A or B) -> C` (multiple inputs, one output)
### 4. Checkpointing & Persistence

**Problem:** If a workflow crashes mid-execution, you lose all progress.

**Solution:** LangGraph checkpointing saves state after each node.

```python
# In-memory (development)
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()
app = workflow.compile(checkpointer=memory)

# Persistent (production) - SQLite (pip install langgraph-checkpoint-sqlite)
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver

conn = sqlite3.connect("checkpoints.db", check_same_thread=False)
checkpointer = SqliteSaver(conn)
app = workflow.compile(checkpointer=checkpointer)

# Persistent (production) - PostgreSQL (pip install langgraph-checkpoint-postgres);
# from_conn_string is a context manager in recent versions
from langgraph.checkpoint.postgres import PostgresSaver

with PostgresSaver.from_conn_string("postgresql://...") as checkpointer:
    checkpointer.setup()  # create checkpoint tables on first run
    app = workflow.compile(checkpointer=checkpointer)
```
**Using Checkpoints:**

```python
# Start a new workflow
config = {"configurable": {"thread_id": "analysis-123"}}
result = app.invoke(initial_state, config=config)

# Resume an interrupted workflow
config = {"configurable": {"thread_id": "analysis-123"}}
result = app.invoke(None, config=config)  # resumes from the last checkpoint
```
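Before resuming, you can check where a thread stopped. A short sketch using the state-inspection API (requires the app to be compiled with a checkpointer):

```python
snapshot = app.get_state(config)
print(snapshot.next)    # nodes that will run when the thread resumes
print(snapshot.values)  # last saved state values
```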
**Checkpointing in this project:**

```python
# backend/app/workflows/checkpoints.py
from langgraph.checkpoint.postgres import PostgresSaver
from psycopg import Connection

def create_checkpointer() -> PostgresSaver:
    """Create a PostgreSQL checkpointer for production.

    LangGraph saves a checkpoint after every node by default, so each
    completed agent is durably recorded.
    """
    conn = Connection.connect(settings.DATABASE_URL, autocommit=True)
    return PostgresSaver(conn)

# Compile with checkpointing
app = workflow.compile(
    checkpointer=create_checkpointer(),
    interrupt_before=["quality_gate"],  # manual review point
)

# Resume after a crash
result = app.invoke(
    None,
    config={"configurable": {"thread_id": analysis_id}},
)
```
**Benefits:**

- Fault tolerance: resume after crashes
- Human-in-the-loop: pause for approval (`interrupt_before`)
- Debugging: inspect state at each checkpoint
- Cost savings: don't re-run expensive LLM calls
### 5. Integration with Langfuse

LangGraph + Langfuse = full observability.

```python
from langfuse.decorators import observe, langfuse_context
from langfuse import Langfuse

langfuse = Langfuse()

@observe()  # traces the entire workflow
def run_analysis_workflow(url: str):
    """Run a LangGraph workflow with Langfuse tracing."""
    # Set trace metadata
    langfuse_context.update_current_trace(
        name="content_analysis",
        metadata={"url": url},
        tags=["langgraph", "multi-agent"],
    )

    # Compile workflow
    app = workflow.compile(checkpointer=checkpointer)

    # Each traced node shows up as a span on this trace
    result = app.invoke(
        {"url": url},
        config={"configurable": {"thread_id": url}},  # required with a checkpointer
    )

    # Log final metrics
    langfuse_context.update_current_observation(
        output=result,
        metadata={"agents_used": len(result["agents_completed"])},
    )
    return result

# Node-level tracing
@observe(as_type="generation")  # mark as an LLM call
def security_agent_node(state: AnalysisState):
    """Security analysis agent."""
    langfuse_context.update_current_observation(
        name="security_agent",
        input=state["raw_content"][:200],  # first 200 chars
    )

    result = security_agent.analyze(state["raw_content"])

    langfuse_context.update_current_observation(
        output=result,
        usage={
            "input": result["usage"]["input_tokens"],
            "output": result["usage"]["output_tokens"],
        },
    )

    state["findings"].append(result)
    state["agents_completed"].append("security")
    return state
```
**Langfuse Dashboard Shows:**
- Full workflow execution graph
- Per-node latency and costs
- Token usage by agent
- Failed nodes and retry attempts
- State at each checkpoint
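An alternative to decorating every node is Langfuse's LangChain callback handler, which LangGraph accepts through the standard `callbacks` key in the invocation config. A minimal sketch (assumes `LANGFUSE_*` credentials are set in the environment):

```python
from langfuse.callback import CallbackHandler

langfuse_handler = CallbackHandler()  # reads LANGFUSE_* env vars

result = app.invoke(
    {"url": url},
    config={
        "configurable": {"thread_id": "analysis-123"},
        "callbacks": [langfuse_handler],  # traces every node and LLM call
    },
)
```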
## This Project's 8-Agent Analysis Pipeline

**Architecture:**

```
User Content
     ↓
[Supervisor] → Routes to 8 specialist agents
     ↓
[Security Agent]   ──┐
[Tech Comparator]  ──┤
[Implementation]   ──┤
[Tutorial]         ──┼→ [Supervisor] → [Quality Gate]
[Depth Analyzer]   ──┤                      ↓
[Prerequisites]    ──┤                 Pass: Compress
[Best Practices]   ──┤                 Fail: Retry or END
[Code Examples]    ──┘
```
**State Schema:**

```python
class Finding(BaseModel):
    agent: str
    category: str
    content: str
    confidence: float

class AnalysisState(TypedDict):
    # Input
    url: str
    raw_content: str

    # Agent outputs
    findings: Annotated[list[Finding], add]
    embeddings: Annotated[list[Embedding], add]

    # Control flow
    current_agent: str
    agents_completed: list[str]
    next: str

    # Quality control
    quality_score: float
    quality_passed: bool
    retry_count: int

    # Final output
    compressed_summary: str
    artifact: dict
```
**Key Design Decisions:**

- Supervisor pattern: centralized routing, easy to modify the agent list
- Accumulating state: `Annotated[list[T], add]` ensures all findings are preserved
- Quality gate: validates before compression to prevent bad outputs (see the sketch below)
- Checkpointing: resume expensive multi-agent workflows after failures
- Langfuse tracing: track costs and latency per agent
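For reference, a minimal sketch of what the quality gate node could look like against the schema above (the 0.7 threshold and minimum-findings rule are illustrative assumptions, not project values):

```python
def quality_gate(state: AnalysisState) -> dict:
    """Score accumulated findings and decide whether the gate passes."""
    findings = state["findings"]
    if not findings:
        return {"quality_score": 0.0, "quality_passed": False}

    avg_confidence = sum(f.confidence for f in findings) / len(findings)
    passed = avg_confidence >= 0.7 and len(findings) >= 3  # assumed thresholds

    return {"quality_score": avg_confidence, "quality_passed": passed}
```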
## Common Patterns

### Pattern 1: Map-Reduce (Parallel Agents)

True fan-out requires the `Send` API: the fan-out step returns one `Send` per task, and LangGraph runs the worker once per task in parallel.

```python
from operator import add
from typing import Annotated, TypedDict

from langgraph.constants import Send
from langgraph.graph import END, START, StateGraph

class MapReduceState(TypedDict):
    tasks: list[dict]
    results: Annotated[list, add]  # accumulates outputs from parallel workers
    final: dict

def plan(state: MapReduceState):
    """Split work into parallel tasks."""
    return {"tasks": [{"id": 1}, {"id": 2}, {"id": 3}]}

def fan_out(state: MapReduceState):
    """Dispatch one worker invocation per task."""
    return [Send("worker", {"task": t}) for t in state["tasks"]]

def worker(state: dict):
    """Process one task; all Sends execute in parallel."""
    result = process(state["task"])
    return {"results": [result]}

def fan_in(state: MapReduceState):
    """Combine parallel results."""
    return {"final": aggregate(state["results"])}

workflow = StateGraph(MapReduceState)
workflow.add_node("plan", plan)
workflow.add_node("worker", worker)
workflow.add_node("fan_in", fan_in)
workflow.add_edge(START, "plan")
workflow.add_conditional_edges("plan", fan_out, ["worker"])
workflow.add_edge("worker", "fan_in")  # waits for all workers
workflow.add_edge("fan_in", END)
```
### Pattern 2: Human-in-the-Loop

```python
from langgraph.checkpoint.memory import MemorySaver

workflow = StateGraph(State)
workflow.add_node("draft", generate_draft)
workflow.add_node("review", human_review)
workflow.add_node("publish", publish_content)
workflow.set_entry_point("draft")
workflow.add_edge("draft", "review")
workflow.add_edge("review", "publish")

# Interrupts require a checkpointer; pause before "review" for a human
app = workflow.compile(checkpointer=MemorySaver(), interrupt_before=["review"])
config = {"configurable": {"thread_id": "draft-1"}}

# Step 1: Generate the draft (execution pauses before "review")
result = app.invoke({"topic": "AI"}, config=config)

# Step 2: Human inspects the paused state, then patches it
snapshot = app.get_state(config)
app.update_state(config, {"approved": True})  # record the human decision

# Step 3: Resume the workflow
result = app.invoke(None, config=config)  # continues on to publish
```
### Pattern 3: Retry with Backoff

```python
import time

def llm_call_with_retry(state):
    """Call the LLM, backing off exponentially after failures."""
    if state.get("retry_count"):
        time.sleep(2 ** state["retry_count"])  # exponential backoff: 2s, 4s, 8s...
    try:
        result = call_llm(state["input"])
        state["output"] = result
        state["retry_count"] = 0
        return state
    except Exception as e:
        state["retry_count"] = state.get("retry_count", 0) + 1
        state["error"] = str(e)
        return state

def should_retry(state) -> str:
    if state["retry_count"] == 0:
        return "success"
    elif state["retry_count"] < 3:
        return "retry"
    else:
        return "failed"

workflow.add_conditional_edges(
    "llm_call",
    should_retry,
    {
        "success": "next_step",
        "retry": "llm_call",  # loop back
        "failed": "error_handler",
    },
)
```
## Best Practices

### 1. State Design

- Keep state flat: avoid deeply nested dicts (hard to debug)
- Use TypedDict: type safety catches errors early
- Annotated accumulators: use `Annotated[list, add]` for multi-agent outputs
- Immutable inputs: don't modify input fields (helps with checkpointing)

### 2. Node Design

- Pure functions: nodes should not have side effects (except I/O)
- Idempotent: safe to re-run (important for checkpointing)
- Single responsibility: one agent = one node
- Return new state: don't mutate in place; return an update dict or use `state.copy()` (see the sketch below)
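To illustrate the last point: return only the fields the node changes and let LangGraph merge the update (a sketch; `analyze` is a hypothetical helper):

```python
def analyzer_node(state: WorkflowState) -> dict:
    """Return a partial update; LangGraph merges it into the shared state."""
    result = analyze(state["input"])  # hypothetical helper
    # With Annotated[list, add] on agent_responses, this appends instead of replacing
    return {"agent_responses": [{"agent": "analyzer", "result": result}]}
```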
### 3. Error Handling

- Wrap nodes: try/except to prevent a workflow crash
- Dead letter queue: send failed items to an error handler
- Retry logic: exponential backoff for transient errors
- Checkpoints: enable recovery without losing progress

### 4. Performance

- Parallel execution: use the `Send` API for independent tasks
- Lazy loading: don't load heavy data until needed
- Streaming: stream LLM responses for better UX
- Caching: cache expensive operations like embeddings and API calls (see the sketch below)
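A minimal in-process caching sketch for the last point (`embed_text` is a hypothetical embedding call; multi-worker deployments would want a shared cache such as Redis instead):

```python
from functools import lru_cache

@lru_cache(maxsize=4096)
def cached_embedding(text: str) -> tuple[float, ...]:
    """Memoize embeddings so repeated inputs don't re-hit the API."""
    return tuple(embed_text(text))  # immutable tuple so cached values can't be mutated
```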
### 5. Observability

- Trace everything: use `@observe()` on all nodes
- Log state changes: capture before/after state for debugging
- Cost tracking: record token usage per node
- Alerting: set up alerts for workflow failures
## Debugging LangGraph Workflows

### Visualize the Graph

```python
from IPython.display import Image

# Generate a Mermaid PNG of the compiled graph
image = app.get_graph().draw_mermaid_png()
Image(image)
```
### Inspect Checkpoints

```python
# Get the checkpoint history for a workflow thread (newest first)
for checkpoint in app.get_state_history(config):
    print(f"Step: {checkpoint.metadata['step']}")
    print(f"Source: {checkpoint.metadata['source']}")  # "input", "loop", or "update"
    print(f"State: {checkpoint.values}")
```
### Step-by-Step Execution

```python
# Execute the graph one step at a time; each chunk maps node name -> state update
for step in app.stream(initial_state, config):
    for node, update in step.items():
        print(f"After {node}: {update}")
    input("Press Enter to continue...")
```
## Migration from LangChain Chains

**Old Way (LCEL Chain):**

```python
chain = (
    load_content
    | analyze
    | summarize
    | format_output
)

result = chain.invoke({"url": url})
```

**New Way (LangGraph):**

```python
workflow = StateGraph(State)
workflow.add_node("load", load_content)
workflow.add_node("analyze", analyze)
workflow.add_node("summarize", summarize)
workflow.add_node("format", format_output)

workflow.add_edge("load", "analyze")
workflow.add_edge("analyze", "summarize")
workflow.add_edge("summarize", "format")
workflow.set_entry_point("load")

app = workflow.compile()
result = app.invoke({"url": url})
```
**When to use LangGraph over LCEL:**
- Need state persistence (checkpointing)
- Conditional routing based on results
- Multi-agent coordination
- Human-in-the-loop approval
- Fault tolerance required
## References

- [LangGraph Documentation](https://langchain-ai.github.io/langgraph/)

### Examples from this project

- `backend/app/workflows/content_analysis_workflow.py` - Main analysis pipeline
- `backend/app/workflows/nodes/` - Individual agent nodes
- `backend/app/workflows/state.py` - State schema definitions

### Related Skills

- `ai-native-development` - LLM integration patterns
- `langfuse-observability` - Workflow tracing and monitoring
- `performance-optimization` - Optimize multi-agent execution

---

**Version**: 2.0.0 (December 2025) | **Status**: Production-ready patterns from this project's multi-agent pipeline