Claude Code Plugins

Community-maintained marketplace

Feedback

langgraph-patterns

@rozwer/cross2B-Pro
0
0

Advanced LangGraph patterns for production applications. Use when implementing streaming (tokens, updates, custom data), parallel node execution, subgraphs for modular workflows, error handling and retries, or Command-based control flow. Covers stream modes, fan-out/fan-in, nested graphs, and production best practices.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name langgraph-patterns
description Advanced LangGraph patterns for production applications. Use when implementing streaming (tokens, updates, custom data), parallel node execution, subgraphs for modular workflows, error handling and retries, or Command-based control flow. Covers stream modes, fan-out/fan-in, nested graphs, and production best practices.

LangGraph Patterns

Advanced patterns for production LangGraph applications.

Streaming

Stream Modes

Mode Output Use Case
"values" Full state after each step Debugging
"updates" Partial updates per node Progress tracking
"messages" LLM tokens as generated Chat UX
"custom" User-defined data Custom progress

Basic Streaming

# Updates mode (most common)
for event in app.stream(inputs, stream_mode="updates"):
    for node_name, output in event.items():
        print(f"{node_name}: {output}")

# Async streaming
async for event in app.astream(inputs, stream_mode="updates"):
    print(event)

Token Streaming

Stream LLM tokens as they generate:

for msg, metadata in app.stream(inputs, stream_mode="messages"):
    if msg.content:
        print(msg.content, end="", flush=True)

Filter by node:

for msg, metadata in app.stream(inputs, stream_mode="messages"):
    if metadata["langgraph_node"] == "agent":
        print(msg.content, end="")

Custom Streaming

Send custom data from within nodes:

from langgraph.config import get_stream_writer

def my_node(state: State) -> dict:
    writer = get_stream_writer()
    writer({"progress": "Starting analysis..."})
    # ... do work ...
    writer({"progress": "50% complete"})
    return {"result": "done"}

# Receive custom data
for event in app.stream(inputs, stream_mode=["updates", "custom"]):
    print(event)

Multiple Stream Modes

for event in app.stream(inputs, stream_mode=["updates", "messages"]):
    # event contains both update and message data
    pass

Parallel Execution

Fan-Out / Fan-In

Multiple nodes execute in parallel, then merge:

from typing import Annotated
import operator

class State(TypedDict):
    input: str
    results: Annotated[list, operator.add]

def task_a(state: State) -> dict:
    return {"results": [f"A: {state['input']}"]}

def task_b(state: State) -> dict:
    return {"results": [f"B: {state['input']}"]}

def merge(state: State) -> dict:
    return {"final": " + ".join(state["results"])}

graph = StateGraph(State)
graph.add_node("task_a", task_a)
graph.add_node("task_b", task_b)
graph.add_node("merge", merge)

graph.add_edge(START, "task_a")
graph.add_edge(START, "task_b")
graph.add_edge(["task_a", "task_b"], "merge")  # Wait for both
graph.add_edge("merge", END)

Async Parallel Within Node

import asyncio

async def parallel_node(state: State) -> dict:
    tasks = [
        fetch_data_a(state["query"]),
        fetch_data_b(state["query"]),
        fetch_data_c(state["query"]),
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # Handle failures
    successful = [r for r in results if not isinstance(r, Exception)]
    return {"data": successful}

Subgraphs

Compose graphs for modularity:

# Define subgraph
class SubState(TypedDict):
    query: str
    result: str

sub_builder = StateGraph(SubState)
sub_builder.add_node("process", process_node)
sub_builder.add_edge(START, "process")
sub_builder.add_edge("process", END)
subgraph = sub_builder.compile()

# Use in parent graph
class ParentState(TypedDict):
    query: str
    result: str
    other_data: str

parent = StateGraph(ParentState)
parent.add_node("subgraph", subgraph)  # Add compiled graph as node
parent.add_node("finalize", finalize_node)
parent.add_edge(START, "subgraph")
parent.add_edge("subgraph", "finalize")
parent.add_edge("finalize", END)

State mapping (when schemas differ):

def call_subgraph(state: ParentState) -> dict:
    # Map parent state to subgraph input
    sub_input = {"query": state["query"]}
    result = subgraph.invoke(sub_input)
    # Map subgraph output to parent state
    return {"result": result["result"]}

parent.add_node("subgraph", call_subgraph)

Command-Based Control Flow

Use Command for complex routing:

from langgraph.types import Command
from typing import Literal

def router_node(state: State) -> Command[Literal["path_a", "path_b", "__end__"]]:
    if state["condition_a"]:
        return Command(
            update={"route": "a"},
            goto="path_a"
        )
    elif state["condition_b"]:
        return Command(
            update={"route": "b"},
            goto="path_b"
        )
    return Command(goto="__end__")

Jump to Parent Graph

From subgraph, return control to parent:

def subgraph_node(state: State) -> Command:
    return Command(
        update={"result": "done"},
        goto="parent_node",
        graph=Command.PARENT
    )

Error Handling

Retry Logic

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
async def resilient_node(state: State) -> dict:
    result = await flaky_api_call()
    return {"data": result}

Graceful Degradation

def safe_node(state: State) -> dict:
    try:
        result = risky_operation(state["input"])
        return {"result": result, "error": None}
    except Exception as e:
        return {"result": None, "error": str(e)}

def error_handler(state: State) -> str:
    if state["error"]:
        return "fallback"
    return "continue"

graph.add_conditional_edges("safe_node", error_handler)

Checkpoint Recovery

With checkpointing, resume from last successful step:

try:
    result = app.invoke(inputs, config)
except Exception:
    # Get last successful state
    snapshot = app.get_state(config)
    # Resume or handle error
    result = app.invoke(None, config)  # Retry from checkpoint

Production Patterns

Rate Limiting

from asyncio import Semaphore

semaphore = Semaphore(5)  # Max 5 concurrent

async def rate_limited_node(state: State) -> dict:
    async with semaphore:
        result = await api_call()
    return {"result": result}

Timeout

import asyncio

async def timed_node(state: State) -> dict:
    try:
        result = await asyncio.wait_for(
            slow_operation(),
            timeout=30.0
        )
        return {"result": result}
    except asyncio.TimeoutError:
        return {"result": None, "error": "timeout"}

Logging and Observability

import logging
from langchain.callbacks import tracing_v2_enabled

logger = logging.getLogger(__name__)

def logged_node(state: State) -> dict:
    logger.info(f"Processing: {state['input'][:50]}...")
    result = process(state)
    logger.info(f"Completed with {len(result)} items")
    return {"result": result}

# With LangSmith tracing
with tracing_v2_enabled(project_name="my-project"):
    result = app.invoke(inputs)

Key Points

  • Use stream_mode="updates" for progress, "messages" for chat
  • Fan-in with add_edge([nodes], target) waits for all
  • Subgraphs enable modular, reusable workflows
  • Command provides explicit routing control
  • Always handle errors gracefully in production
  • Use checkpointing for fault-tolerance