| name | langgraph-python-expert |
| description | Expert guidance for LangGraph Python library. Build stateful, multi-actor applications with LLMs using nodes, edges, and state management. Use when working with LangGraph, building agent workflows, state machines, or complex multi-step LLM applications. Requires langgraph, langchain-core packages. |
LangGraph Python Expert
Comprehensive expert for building sophisticated stateful applications with LangGraph, focusing on production-ready workflows, state management, and agent orchestration.
📚 Official Source Documentation
This skill includes access to the official LangGraph source code through the source/langgraph/ directory (managed as a git submodule with sparse-checkout), which contains:
- Core Libraries: libs/langgraph/, libs/prebuilt/, libs/checkpoint*/
- Official Examples: examples/ - up-to-date examples and tutorials
- Complete Documentation: docs/docs/ - latest documentation and API references
Source Structure (66MB with sparse-checkout)
source/langgraph/
├── libs/
│ ├── langgraph/ # Core StateGraph, nodes, edges
│ ├── prebuilt/ # create_react_agent, ToolNode
│ ├── checkpoint/ # Base checkpoint classes
│ ├── checkpoint-sqlite/ # SQLite persistence
│ └── checkpoint-postgres/# PostgreSQL persistence
├── examples/ # Official examples and tutorials
├── docs/docs/ # Documentation (concepts, how-tos, reference)
├── README.md # Project overview
├── CLAUDE.md # Claude Code instructions
└── AGENTS.md # Agent development guide
Updating Source Code
cd source/langgraph
git pull origin main
For detailed structure, see SOURCE_STRUCTURE.md.
Quick Start
Installation
pip install langgraph langchain-core langchain-openai
Basic Concepts
- StateGraph: the core builder for workflows with state persistence
- Nodes: functions that process the state and return updates
- Edges: the flow between nodes (direct or conditional)
- State: a TypedDict that holds conversation/application state
- Persistence: checkpointing for memory and conversation history
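A minimal sketch tying these pieces together (the state and node here are illustrative):

from typing import TypedDict
from langgraph.graph import StateGraph, END

class EchoState(TypedDict):
    text: str

def echo_node(state: EchoState) -> EchoState:
    # Nodes return a dict containing only the keys they update
    return {"text": state["text"] + "!"}

workflow = StateGraph(EchoState)
workflow.add_node("echo", echo_node)
workflow.set_entry_point("echo")
workflow.add_edge("echo", END)
app = workflow.compile()
print(app.invoke({"text": "hello"}))  # {'text': 'hello!'}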
Core Components
1. State Definition
from typing import TypedDict, List, Optional
from langchain_core.messages import BaseMessage
class AgentState(TypedDict):
messages: List[BaseMessage]
current_user: Optional[str]
step_count: int
requires_action: bool
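When several nodes append to the same message list, a common refinement is to attach a reducer via Annotated so updates accumulate instead of overwriting; a sketch using LangGraph's built-in add_messages reducer:

from typing import Annotated, TypedDict
from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages

class ReducerState(TypedDict):
    # add_messages merges returned messages into the existing list by ID
    messages: Annotated[list[BaseMessage], add_messages]
    step_count: int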
2. Node Functions
from langchain_core.messages import HumanMessage, AIMessage

# `llm` is assumed to be a chat model instance (e.g., ChatOpenAI) defined elsewhere
def llm_node(state: AgentState) -> AgentState:
"""Process messages with LLM and return updated state"""
messages = state["messages"]
response = llm.invoke(messages)
return {
"messages": messages + [response],
"step_count": state["step_count"] + 1
}
def router_node(state: AgentState) -> str:
    """Routing function for conditional edges: returns the next node's name"""
    last_message = state["messages"][-1]
    if getattr(last_message, "tool_calls", None):
        return "tool_executor"
    return "end"
3. Graph Construction
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
# Create graph
workflow = StateGraph(AgentState)
# Add nodes (tool_node can be a prebuilt ToolNode instance; router_node is a
# routing function used in the conditional edges below, not a graph node)
workflow.add_node("agent", llm_node)
workflow.add_node("tool_executor", tool_node)
# Add edges
workflow.set_entry_point("agent")
workflow.add_conditional_edges(
"agent",
router_node,
{
"tool_executor": "tool_executor",
"end": END
}
)
workflow.add_edge("tool_executor", "agent")
# Memory
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
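Because a checkpointer is attached, each run should carry a thread_id so state can be saved and restored per conversation; a usage sketch:

config = {"configurable": {"thread_id": "conversation-1"}}
result = app.invoke(
    {
        "messages": [HumanMessage(content="Hi there")],
        "current_user": None,
        "step_count": 0,
        "requires_action": False,
    },
    config,
)
print(result["messages"][-1].content)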
Advanced Patterns
1. Multi-Agent Collaboration
from langgraph.graph import StateGraph, MessagesState
from langgraph.prebuilt import create_react_agent
class MultiAgentState(MessagesState):
researcher_notes: str
writer_content: str
reviewer_feedback: List[str]
def researcher_node(state: MultiAgentState) -> MultiAgentState:
"""Research agent that gathers information"""
researcher_agent = create_react_agent(llm, research_tools)
result = researcher_agent.invoke({
"messages": state["messages"][-2:] # Last two messages
})
    return {
        "researcher_notes": result["messages"][-1].content,
        # MessagesState's add_messages reducer merges these into the history
        "messages": result["messages"]
    }
def writer_node(state: MultiAgentState) -> MultiAgentState:
"""Writer agent that creates content based on research"""
writer_agent = create_react_agent(llm, writing_tools)
prompt = f"Research notes: {state['researcher_notes']}"
result = writer_agent.invoke({
"messages": [HumanMessage(content=prompt)]
})
    return {
        "writer_content": result["messages"][-1].content,
        # The reducer appends the writer's messages to the shared history
        "messages": result["messages"]
    }
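One way to wire these agents together is a fixed pipeline; a sketch (a supervisor/router node could replace the static edges):

workflow = StateGraph(MultiAgentState)
workflow.add_node("researcher", researcher_node)
workflow.add_node("writer", writer_node)
workflow.set_entry_point("researcher")
workflow.add_edge("researcher", "writer")
workflow.add_edge("writer", END)
multi_agent_app = workflow.compile()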
2. Dynamic Tool Selection
from typing import Dict, Any
from langchain_core.messages import ToolMessage
from langchain_core.tools import BaseTool
class DynamicToolNode:
def __init__(self, tool_registry: Dict[str, BaseTool]):
self.tool_registry = tool_registry
def __call__(self, state: AgentState) -> AgentState:
last_message = state["messages"][-1]
        if not getattr(last_message, "tool_calls", None):
return state
# Dynamically select tools based on context
selected_tools = self.select_tools_by_context(state)
# Execute tool calls
tool_messages = []
for tool_call in last_message.tool_calls:
if tool_call["name"] in selected_tools:
tool = selected_tools[tool_call["name"]]
result = tool.invoke(tool_call["args"])
tool_messages.append(
ToolMessage(
tool_call_id=tool_call["id"],
content=str(result)
)
)
return {
"messages": state["messages"] + tool_messages
}
def select_tools_by_context(self, state: AgentState) -> Dict[str, BaseTool]:
"""Intelligently select tools based on conversation context"""
context = " ".join([msg.content for msg in state["messages"][-5:]])
available_tools = {}
if "code" in context.lower():
available_tools.update({"code_executor": code_tool})
if "search" in context.lower():
available_tools.update({"web_search": search_tool})
if "math" in context.lower():
available_tools.update({"calculator": math_tool})
return available_tools
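Usage sketch (code_tool, search_tool, and math_tool are assumed to be BaseTool instances defined elsewhere):

tool_registry = {
    "code_executor": code_tool,
    "web_search": search_tool,
    "calculator": math_tool,
}
workflow.add_node("tool_executor", DynamicToolNode(tool_registry))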
3. State Persistence and Recovery
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.postgres import PostgresSaver
# Production-ready persistence
def create_production_app():
    # Use PostgreSQL for production. In recent releases,
    # PostgresSaver.from_conn_string() is a context manager that closes its
    # connection on exit, so a long-lived app should own a connection pool.
    from psycopg_pool import ConnectionPool

    pool = ConnectionPool(conninfo="postgresql://user:pass@localhost/langgraph")
    checkpointer = PostgresSaver(pool)
    checkpointer.setup()  # Create the checkpoint tables on first run
    # Build workflow
    workflow = StateGraph(AgentState)
    # ... add nodes and edges
    # Compile with persistence
    app = workflow.compile(checkpointer=checkpointer)
    return app
# Thread-based conversation management
def manage_conversation(app, thread_id: str):
"""Manage persistent conversations across sessions"""
config = {"configurable": {"thread_id": thread_id}}
# Continue existing conversation
result = app.invoke({
"messages": [HumanMessage(content="Continue our discussion")]
}, config)
return result
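Checkpointers also make past state inspectable; a sketch using the compiled graph's get_state and get_state_history:

def inspect_conversation(app, thread_id: str):
    config = {"configurable": {"thread_id": thread_id}}
    snapshot = app.get_state(config)  # Latest checkpoint for this thread
    print(f"Current messages: {len(snapshot.values.get('messages', []))}")
    # Walk back through every saved checkpoint, newest first
    for checkpoint in app.get_state_history(config):
        print(checkpoint.created_at, list(checkpoint.values.keys()))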
4. Error Handling and Retry Logic
from langgraph.graph import StateGraph
import time
class RobustAgentState(TypedDict):
messages: List[BaseMessage]
retry_count: int
max_retries: int
error_history: List[str]
def error_handling_node(state: RobustAgentState) -> RobustAgentState:
    """Node with retry bookkeeping; routing decisions live in an edge function"""
    try:
        # Attempt the primary operation
        result = perform_operation(state)
        # Reset retry bookkeeping on success
        return {
            **result,
            "retry_count": 0,
            "error_history": []
        }
    except Exception as e:
        new_retry_count = state["retry_count"] + 1
        # Exponential backoff before the graph loops back to this node
        time.sleep(2 ** new_retry_count)
        return {
            "retry_count": new_retry_count,
            "error_history": state["error_history"] + [str(e)]
        }

def retry_router(state: RobustAgentState) -> str:
    """Conditional-edge function; nodes return state updates, not route names"""
    if state["retry_count"] == 0:
        return "continue"
    if state["retry_count"] >= state["max_retries"]:
        return "error_handler"
    return "retry"
def fallback_node(state: RobustAgentState) -> RobustAgentState:
"""Fallback strategy when primary operation fails"""
last_error = state["error_history"][-1] if state["error_history"] else "Unknown error"
fallback_message = AIMessage(
content=f"I encountered an error: {last_error}. "
f"Let me try a different approach."
)
return {
"messages": state["messages"] + [fallback_message],
"retry_count": 0
}
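A wiring sketch for the retry loop above (node names are illustrative):

workflow = StateGraph(RobustAgentState)
workflow.add_node("operation", error_handling_node)
workflow.add_node("error_handler", fallback_node)
workflow.set_entry_point("operation")
workflow.add_conditional_edges(
    "operation",
    retry_router,
    {
        "continue": END,        # Success: leave the graph
        "retry": "operation",   # Loop back for another attempt
        "error_handler": "error_handler",
    },
)
workflow.add_edge("error_handler", END)
robust_app = workflow.compile()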
Integration Examples
1. RAG with LangGraph
def create_rag_graph():
class RAGState(TypedDict):
question: str
context: List[str]
answer: str
sources: List[str]
def retrieve_node(state: RAGState) -> RAGState:
# Retrieve relevant documents
docs = retriever.invoke(state["question"])
return {
"context": [doc.page_content for doc in docs],
"sources": [doc.metadata.get("source", "unknown") for doc in docs]
}
def generate_node(state: RAGState) -> RAGState:
# Generate answer using retrieved context
prompt = f"""
Question: {state['question']}
Context: {state['context']}
Generate a comprehensive answer based on the context.
"""
response = llm.invoke([HumanMessage(content=prompt)])
return {
"answer": response.content
}
# Build RAG workflow
workflow = StateGraph(RAGState)
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("generate", generate_node)
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", END)
return workflow.compile()
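Usage sketch (retriever and llm are assumed to be defined as above):

rag_app = create_rag_graph()
result = rag_app.invoke({
    "question": "What is LangGraph?",
    "context": [],
    "answer": "",
    "sources": [],
})
print(result["answer"])
print(result["sources"])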
2. Sequential Task Processing
def create_sequential_processor():
class TaskState(TypedDict):
tasks: List[Dict[str, Any]]
current_task_index: int
results: List[Any]
status: str
def task_executor(state: TaskState) -> TaskState:
idx = state["current_task_index"]
if idx >= len(state["tasks"]):
return {"status": "completed"}
current_task = state["tasks"][idx]
result = execute_task(current_task)
return {
"current_task_index": idx + 1,
"results": state["results"] + [result],
"status": "processing" if idx + 1 < len(state["tasks"]) else "completed"
}
    def task_router(state: TaskState) -> str:
        if state["status"] == "completed":
            return END
        return "execute_task"

    workflow = StateGraph(TaskState)
    workflow.add_node("execute_task", task_executor)
    workflow.set_entry_point("execute_task")
    # Router return values must be node names (or END); loop until done
    workflow.add_conditional_edges("execute_task", task_router)
    return workflow.compile()
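Usage sketch (execute_task is assumed to be defined elsewhere):

processor = create_sequential_processor()
result = processor.invoke({
    "tasks": [{"type": "fetch"}, {"type": "summarize"}],
    "current_task_index": 0,
    "results": [],
    "status": "processing",
})
print(result["results"])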
Best Practices
1. State Design
- Keep state minimal and focused
- Use TypedDict for type safety
- Avoid storing large objects in state
- Use references/IDs instead of full objects when possible
2. Node Design
- Make nodes pure functions when possible
- Handle errors gracefully
- Return only the state keys that need updating
- Use descriptive names for clarity
3. Graph Architecture
- Break complex workflows into smaller, reusable subgraphs
- Use conditional edges for intelligent routing
- Implement proper error handling paths
- Design for testability and debugging
4. Performance Optimization
- Use streaming for long-running operations (see the sketch after this list)
- Implement proper caching strategies
- Consider async/await for I/O operations
- Monitor and optimize checkpoint sizes
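A streaming sketch using the compiled graph's stream method, assuming the app and state from the Quick Start; stream_mode="updates" yields each node's state delta as it completes:

config = {"configurable": {"thread_id": "stream-demo"}}
for update in app.stream(
    {"messages": [HumanMessage(content="Summarize this document...")]},
    config,
    stream_mode="updates",
):
    # Each update maps a node name to the keys that node just wrote
    for node_name, delta in update.items():
        print(node_name, list(delta.keys()))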
Testing and Debugging
1. Unit Testing Nodes
import pytest
from unittest.mock import patch
from langchain_core.messages import HumanMessage, AIMessage
def test_llm_node():
# Mock state
test_state = {
"messages": [HumanMessage(content="Test message")],
"step_count": 0
}
# Mock LLM
with patch('your_module.llm') as mock_llm:
mock_llm.invoke.return_value = AIMessage(content="Test response")
result = llm_node(test_state)
assert result["step_count"] == 1
assert len(result["messages"]) == 2
mock_llm.invoke.assert_called_once()
2. Integration Testing
def test_full_workflow():
app = create_test_workflow()
initial_state = {
"messages": [HumanMessage(content="Hello")],
"step_count": 0
}
result = app.invoke(initial_state)
assert "messages" in result
assert result["messages"][-1].type == "ai"
3. Debugging Tools
# Enable verbose step-by-step logs when compiling the graph
app = workflow.compile(debug=True)
# Print state transitions
def debug_node(state: AgentState) -> AgentState:
print(f"Node input: {state}")
result = your_node_logic(state)
print(f"Node output: {result}")
return result
# Use with context manager
from langgraph.graph import StateGraph
def create_debug_workflow():
workflow = StateGraph(AgentState)
workflow.add_node("debug_step", debug_node)
# ... rest of workflow
return workflow.compile()
Common Patterns and Solutions
1. Human-in-the-Loop
def human_approval_node(state: AgentState) -> AgentState:
"""Wait for human approval before proceeding"""
last_message = state["messages"][-1]
if state.get("awaiting_approval"):
# Check if approval was received
user_input = input(f"Approve this action? {last_message.content} (y/n): ")
if user_input.lower() == 'y':
return {
"awaiting_approval": False,
"messages": state["messages"] + [
AIMessage(content="Action approved by human")
]
}
else:
return {
"awaiting_approval": False,
"messages": state["messages"] + [
AIMessage(content="Action rejected by human")
]
}
else:
# Request approval
return {
"awaiting_approval": True,
"messages": state["messages"]
}
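The input()-based sketch above only works in a local script, because it blocks the process. Recent LangGraph releases provide a first-class pause/resume mechanism; a hedged sketch, assuming interrupt and Command from langgraph.types (available in newer 0.2.x versions) and a compiled graph with a checkpointer:

from langgraph.types import Command, interrupt

def approval_node(state: AgentState) -> AgentState:
    # Pauses the graph at this point and surfaces the payload to the caller
    decision = interrupt({"question": "Approve this action?"})
    verdict = "approved" if decision == "y" else "rejected"
    return {"messages": state["messages"] + [AIMessage(content=f"Action {verdict} by human")]}

# First invocation runs until the interrupt and checkpoints there
config = {"configurable": {"thread_id": "hitl-1"}}
app.invoke({"messages": [HumanMessage(content="Delete all records?")]}, config)
# Resuming with Command(resume=...) feeds the human's answer back in
app.invoke(Command(resume="y"), config)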
2. Parallel Processing
from concurrent.futures import ThreadPoolExecutor
def parallel_processor(state: Dict[str, Any]) -> Dict[str, Any]:
"""Process multiple items in parallel"""
input_data = state["input_items"]
# Define parallel tasks
def task_1(data):
return process_type_1(data)
def task_2(data):
return process_type_2(data)
# Execute in parallel (using threading or async)
with ThreadPoolExecutor(max_workers=2) as executor:
future_1 = executor.submit(task_1, input_data)
future_2 = executor.submit(task_2, input_data)
result_1 = future_1.result()
result_2 = future_2.result()
return {
"result_1": result_1,
"result_2": result_2
}
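LangGraph can also fan out natively: nodes reached by separate edges from the same source run in the same superstep, provided concurrently written keys carry a reducer. A sketch:

import operator
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END

class FanOutState(TypedDict):
    input_items: list
    # operator.add concatenates the lists written by parallel branches
    results: Annotated[list, operator.add]

def branch_a(state: FanOutState) -> dict:
    return {"results": [f"a processed {len(state['input_items'])} items"]}

def branch_b(state: FanOutState) -> dict:
    return {"results": [f"b processed {len(state['input_items'])} items"]}

workflow = StateGraph(FanOutState)
workflow.add_node("branch_a", branch_a)
workflow.add_node("branch_b", branch_b)
workflow.add_edge(START, "branch_a")
workflow.add_edge(START, "branch_b")  # Both branches run in parallel
workflow.add_edge("branch_a", END)
workflow.add_edge("branch_b", END)
print(workflow.compile().invoke({"input_items": [1, 2], "results": []}))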
Production Deployment
1. Environment Setup
import os
from langgraph.graph import StateGraph
from langgraph.checkpoint.postgres import PostgresSaver
def create_production_app():
# Load configuration
db_url = os.getenv("DATABASE_URL")
openai_api_key = os.getenv("OPENAI_API_KEY")
    # Initialize components (from_conn_string is a context manager in recent
    # releases, so a long-lived service should own a connection pool instead)
    from psycopg_pool import ConnectionPool
    checkpointer = PostgresSaver(ConnectionPool(conninfo=db_url))
    checkpointer.setup()  # Create checkpoint tables on first run
# Build workflow with production settings
workflow = StateGraph(ProductionState)
# ... add nodes and edges
app = workflow.compile(
checkpointer=checkpointer,
# Enable interrupts for human-in-the-loop
interrupt_before=["human_approval"],
interrupt_after=["critical_action"]
)
return app
2. Monitoring and Logging
import functools
import logging
from datetime import datetime

class LoggingMiddleware:
    def __init__(self, logger_name="langgraph"):
        self.logger = logging.getLogger(logger_name)

    def __call__(self, func):
        @functools.wraps(func)  # Keep the wrapped node's name for logs and tracing
        def wrapper(state):
start_time = datetime.now()
self.logger.info(f"Starting {func.__name__} at {start_time}")
try:
result = func(state)
duration = datetime.now() - start_time
self.logger.info(
f"Completed {func.__name__} in {duration.total_seconds():.2f}s"
)
return result
except Exception as e:
self.logger.error(f"Error in {func.__name__}: {str(e)}")
raise
return wrapper
# Apply to nodes
@LoggingMiddleware()
def production_node(state: AgentState) -> AgentState:
# Your node logic here
pass
Troubleshooting
Common Issues and Solutions
State Size Too Large
- Problem: Checkpoint files become too large
- Solution: Store large data externally and keep references in state (see the sketch after this list)
Memory Leaks
- Problem: Memory usage increases over time
- Solution: Clean up unused state, use proper object disposal
Concurrency Issues
- Problem: Race conditions in multi-threaded execution
- Solution: Use proper locking mechanisms, avoid shared mutable state
Tool Execution Failures
- Problem: Tools fail or timeout
- Solution: Implement proper error handling and retry logic
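A sketch of the external-storage pattern for the state-size issue above (document_store and large_document are placeholders; document_store is an assumed key-value client, e.g. a Redis or S3 wrapper):

import uuid

def store_document_node(state: AgentState) -> dict:
    doc_id = str(uuid.uuid4())
    document_store.put(doc_id, large_document)  # Persist the payload outside the graph
    # Only the small reference travels through checkpoints
    return {"document_ids": state.get("document_ids", []) + [doc_id]}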
Requirements
Ensure these packages are installed in your environment:
pip install "langgraph>=0.2.0"
pip install "langchain-core>=0.3.0"
pip install "langchain-openai>=0.1.0"
pip install "langchain-anthropic>=0.1.0"
pip install langgraph-checkpoint-sqlite                 # SqliteSaver persistence
pip install langgraph-checkpoint-postgres psycopg-pool  # PostgresSaver persistence

Note the quotes around version specifiers: without them, the shell treats >= as a redirection.
Source Code Access
The LangGraph source code is managed as a git submodule with sparse-checkout to keep the footprint small (66MB vs. the full repository):
# Update to latest version
cd source/langgraph
git pull origin main
# View sparse-checkout configuration
git sparse-checkout list
# Temporarily access full repo (if needed)
git sparse-checkout disable
# ... do work ...
git sparse-checkout reapply
Key locations:
- source/langgraph/libs/langgraph/langgraph/ - Core API (StateGraph, nodes, edges)
- source/langgraph/libs/prebuilt/langgraph/ - Prebuilt components (create_react_agent)
- source/langgraph/examples/ - Official examples and tutorials
- source/langgraph/docs/docs/ - Documentation (concepts, how-tos, reference)
See SOURCE_STRUCTURE.md for detailed navigation guide.
Performance Tips
- Use streaming for long-running operations
- Optimize state size - avoid storing large objects
- Cache effectively - implement proper caching strategies
- Monitor checkpoints - keep checkpoint sizes reasonable
- Use async/await for I/O-bound operations
- Batch operations when possible to reduce overhead