| name | langgraph-architecture |
| description | Guides architectural decisions for LangGraph applications. Use when deciding between LangGraph vs alternatives, choosing state management strategies, designing multi-agent systems, or selecting persistence and streaming approaches. |
LangGraph Architecture Decisions
When to Use LangGraph
Use LangGraph When You Need:
- Stateful conversations - Multi-turn interactions with memory
- Human-in-the-loop - Approval gates, corrections, interventions
- Complex control flow - Loops, branches, conditional routing
- Multi-agent coordination - Multiple LLMs working together
- Persistence - Resume from checkpoints, time travel debugging
- Streaming - Real-time token streaming, progress updates
- Reliability - Retries, error recovery, durability guarantees
Consider Alternatives When:
| Scenario | Alternative | Why |
|---|---|---|
| Single LLM call | Direct API call | Overhead not justified |
| Linear pipeline | LangChain LCEL | Simpler abstraction |
| Stateless tool use | Function calling | No persistence needed |
| Simple RAG | LangChain retrievers | Built-in patterns |
| Batch processing | Async tasks | Different execution model |
State Schema Decisions
TypedDict vs Pydantic
| TypedDict | Pydantic |
|---|---|
| Lightweight, faster | Runtime validation |
| Dict-like access | Attribute access |
| No validation overhead | Type coercion |
| Simpler serialization | Complex nested models |
Recommendation: Use TypedDict for most cases. Use Pydantic when you need validation or complex nested structures.
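The trade-off above can be seen without building a graph. The following is a minimal sketch, assuming `pydantic` is installed; the class names `DictState` and `ModelState` are hypothetical and exist only for illustration.

```python
from typing import TypedDict

from pydantic import BaseModel, ValidationError


class DictState(TypedDict):
    query: str
    attempts: int


class ModelState(BaseModel):
    query: str
    attempts: int = 0


# TypedDict is only a type hint: nothing is checked at runtime,
# so a wrong value type is accepted silently.
loose = DictState(query="hello", attempts="three")

# Pydantic validates on construction and coerces compatible values.
coerced = ModelState(query="hello", attempts="3")

# Incompatible values are rejected with a ValidationError.
try:
    ModelState(query="hello", attempts="three")
    rejected = False
except ValidationError:
    rejected = True
```

If bad input reaching a node would be expensive to debug, the validation cost of the Pydantic variant is usually worth paying; otherwise the TypedDict variant keeps state handling cheap.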
Reducer Selection
| Use Case | Reducer | Example / Notes |
|---|---|---|
| Chat messages | add_messages | Handles IDs, RemoveMessage |
| Simple append | operator.add | Annotated[list, operator.add] |
| Keep latest | None (LastValue) | field: str |
| Custom merge | Lambda | Annotated[list, lambda a, b: ...] |
| Overwrite list | Overwrite | Bypass reducer |
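To make the reducer semantics concrete, here is a plain-Python sketch of how an annotated reducer could be applied to a state update. It is a simplified stand-in, not LangGraph's implementation; `find_reducers` and `apply_update` are hypothetical helpers written for this example.

```python
import operator
from typing import Annotated, Any, Callable, TypedDict, get_type_hints


class State(TypedDict):
    log: Annotated[list, operator.add]  # reducer: append
    status: str                         # no reducer: keep latest


def find_reducers(schema: type) -> dict[str, Callable[[Any, Any], Any]]:
    """Collect the callable attached to each Annotated field, if any."""
    reducers = {}
    for name, hint in get_type_hints(schema, include_extras=True).items():
        metadata = getattr(hint, "__metadata__", ())
        if metadata and callable(metadata[-1]):
            reducers[name] = metadata[-1]
    return reducers


def apply_update(schema: type, state: dict, update: dict) -> dict:
    """Merge an update into state: reduce where a reducer exists, else overwrite."""
    reducers = find_reducers(schema)
    merged = dict(state)
    for key, value in update.items():
        if key in reducers and key in merged:
            merged[key] = reducers[key](merged[key], value)
        else:
            merged[key] = value
    return merged


state = {"log": ["start"], "status": "new"}
state = apply_update(State, state, {"log": ["step 1"], "status": "running"})
# state == {"log": ["start", "step 1"], "status": "running"}
```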
State Size Considerations
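from typing import Annotated, TypedDict
from langgraph.graph.message import add_messages
from langgraph.store.base import BaseStore

# SMALL STATE (< 1MB) - Put in state
class State(TypedDict):
    messages: Annotated[list, add_messages]
    context: str

# LARGE DATA - Use Store
class State(TypedDict):
    messages: Annotated[list, add_messages]
    document_ref: str  # Reference to store

def node(state, *, store: BaseStore):
    namespace = ("documents",)  # Example namespace; use your own
    doc = store.get(namespace, state["document_ref"])
    # Process without bloating checkpoints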
Graph Structure Decisions
Single Graph vs Subgraphs
Single Graph when:
- All nodes share the same state schema
- Simple linear or branching flow
- < 10 nodes
Subgraphs when:
- Different state schemas needed
- Reusable components across graphs
- Team separation of concerns
- Complex hierarchical workflows
Conditional Edges vs Command
| Conditional Edges | Command |
|---|---|
| Routing based on state | Routing + state update |
| Separate router function | Decision in node |
| Clearer visualization | More flexible |
| Standard patterns | Dynamic destinations |
from typing import Literal
from langgraph.types import Command

# Conditional Edge - when routing is the focus
def router(state) -> Literal["a", "b"]:
    return "a" if state["condition"] else "b"  # "condition" is a placeholder state key

builder.add_conditional_edges("node", router)

# Command - when combining routing with updates
def node(state) -> Command[Literal["next"]]:
    return Command(goto="next", update={"step": state["step"] + 1})
Static vs Dynamic Routing
Static Edges (add_edge):
- Fixed flow known at build time
- Clearer graph visualization
- Easier to reason about
Dynamic Routing (add_conditional_edges, Command, Send):
- Runtime decisions based on state
- Agent-driven navigation
- Fan-out patterns
Persistence Strategy
Checkpointer Selection
| Checkpointer | Use Case | Characteristics |
|---|---|---|
| InMemorySaver | Testing only | Lost on restart |
| SqliteSaver | Development | Single file, local |
| PostgresSaver | Production | Scalable, concurrent |
| Custom | Special needs | Implement BaseCheckpointSaver |
Checkpointing Scope
# Full persistence (default)
graph = builder.compile(checkpointer=checkpointer)

# Subgraph options (pick one)
subgraph = sub_builder.compile(checkpointer=None)   # Inherit from parent
subgraph = sub_builder.compile(checkpointer=True)   # Independent checkpointing
subgraph = sub_builder.compile(checkpointer=False)  # No checkpointing (runs atomically)
When to Disable Checkpointing
- Short-lived subgraphs that should be atomic
- Subgraphs with incompatible state schemas
- Performance-critical paths without need for resume
Multi-Agent Architecture
Supervisor Pattern
Best for:
- Clear hierarchy
- Centralized decision making
- Different agent specializations
          ┌─────────────┐
          │ Supervisor  │
          └──────┬──────┘
    ┌────────┬───┴───┬────────┐
    ▼        ▼       ▼        ▼
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐
│Agent1│ │Agent2│ │Agent3│ │Agent4│
└──────┘ └──────┘ └──────┘ └──────┘
Peer-to-Peer Pattern
Best for:
- Collaborative agents
- No clear hierarchy
- Flexible communication
┌──────┐     ┌──────┐
│Agent1│◄───►│Agent2│
└──┬───┘     └───┬──┘
   │             │
   ▼             ▼
┌──────┐     ┌──────┐
│Agent3│◄───►│Agent4│
└──────┘     └──────┘
Handoff Pattern
Best for:
- Sequential specialization
- Clear stage transitions
- Different capabilities per stage
┌────────┐    ┌────────┐    ┌────────┐
│Research│───►│Planning│───►│Execute │
└────────┘    └────────┘    └────────┘
Streaming Strategy
Stream Mode Selection
| Mode | Use Case | Data |
|---|---|---|
| updates | UI updates | Node outputs only |
| values | State inspection | Full state each step |
| messages | Chat UX | LLM tokens |
| custom | Progress/logs | Your data via StreamWriter |
| debug | Debugging | Tasks + checkpoints |
Subgraph Streaming
# Stream from subgraphs
async for chunk in graph.astream(
    input,
    stream_mode="updates",
    subgraphs=True,  # Include subgraph events
):
    namespace, data = chunk  # namespace indicates depth
Human-in-the-Loop Design
Interrupt Placement
| Strategy | Use Case |
|---|---|
| interrupt_before | Approval before action |
| interrupt_after | Review after completion |
| interrupt() in node | Dynamic, contextual pauses |
Resume Patterns
# Simple resume (same thread)
graph.invoke(None, config)
# Resume with value
graph.invoke(Command(resume="approved"), config)
# Resume specific interrupt
graph.invoke(Command(resume={interrupt_id: value}), config)
# Modify state and resume
graph.update_state(config, {"field": "new_value"})
graph.invoke(None, config)
Error Handling Strategy
Retry Configuration
# Per-node retry
RetryPolicy(
    initial_interval=0.5,
    backoff_factor=2.0,
    max_interval=60.0,
    max_attempts=3,
    retry_on=lambda e: isinstance(e, (APIError, TimeoutError)),
)

# Multiple policies (first match wins)
builder.add_node("node", fn, retry_policy=[
    RetryPolicy(retry_on=RateLimitError, max_attempts=5),
    RetryPolicy(retry_on=Exception, max_attempts=2),
])
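To reason about how long a node may block under a given policy, it helps to compute the wait schedule. The function below is a generic exponential backoff sketch in plain Python, not LangGraph's exact implementation (which may also add jitter); `backoff_delays` is a hypothetical helper.

```python
def backoff_delays(
    initial_interval: float = 0.5,
    backoff_factor: float = 2.0,
    max_interval: float = 60.0,
    max_attempts: int = 3,
) -> list[float]:
    """Return the wait before each retry; the first attempt has no wait."""
    delays = []
    for retry in range(max_attempts - 1):
        delay = initial_interval * backoff_factor**retry
        delays.append(min(max_interval, delay))  # cap growth at max_interval
    return delays


default_schedule = backoff_delays()
# default_schedule == [0.5, 1.0]

long_schedule = backoff_delays(max_attempts=10)
worst_case_wait = sum(long_schedule)
```

With the values shown in the policy above, three attempts add at most about 1.5 seconds of waiting, excluding jitter and the time spent in the failing calls themselves.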
Fallback Patterns
def node_with_fallback(state):
    try:
        return primary_operation(state)
    except PrimaryError:
        return fallback_operation(state)

# Or use conditional edges for complex fallback routing
def route_on_error(state) -> Literal["retry", "fallback", "__end__"]:
    if state.get("error") and state["attempts"] < 3:
        return "retry"
    elif state.get("error"):
        return "fallback"
    return END
Scaling Considerations
Horizontal Scaling
- Use PostgresSaver for shared state
- Consider LangGraph Platform for managed infrastructure
- Use stores for large data outside checkpoints
Performance Optimization
- Minimize state size - Use references for large data
- Parallel nodes - Fan out when possible
- Cache expensive operations - Use CachePolicy
- Async everywhere - Use ainvoke, astream
Resource Limits
from langgraph.managed import RemainingSteps

# Set recursion limit
config = {"recursion_limit": 50}
graph.invoke(input, config)

# Track remaining steps in state
class State(TypedDict):
    remaining_steps: RemainingSteps

def check_budget(state):
    if state["remaining_steps"] < 5:
        return "wrap_up"
    return "continue"
Decision Checklist
Before implementing:
- Is LangGraph the right tool? (vs simpler alternatives)
- State schema defined with appropriate reducers?
- Persistence strategy chosen? (dev vs prod checkpointer)
- Streaming needs identified?
- Human-in-the-loop points defined?
- Error handling and retry strategy?
- Multi-agent coordination pattern? (if applicable)
- Resource limits configured?