| name | langchain-orchestration |
| description | Comprehensive guide for building production-grade LLM applications using LangChain's chains, agents, memory systems, RAG patterns, and advanced orchestration |
| version | 1.0.0 |
| category | AI/ML |
| tags | langchain, llm, chains, agents, rag, memory, retrieval, orchestration, streaming, callbacks, python |
| prerequisites | Python 3.8+, langchain>=0.1.0, langchain-core>=0.1.0, langchain-community>=0.0.20, OpenAI API key or other LLM provider credentials, Vector store setup (FAISS, Chroma, Pinecone, etc.) |
LangChain Orchestration Skill
Complete guide for building production-grade LLM applications with LangChain, covering chains, agents, memory, RAG patterns, and advanced orchestration techniques.
Table of Contents
- Core Concepts
- Chains
- Agents
- Memory Systems
- RAG Patterns
- LLM Integrations
- Callbacks & Monitoring
- Retrieval Strategies
- Streaming
- Error Handling
- Production Best Practices
Core Concepts
LangChain Expression Language (LCEL)
LCEL is the declarative way to compose chains in LangChain, enabling streaming, async, and parallel execution.
from langchain_core.runnables import RunnablePassthrough
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
# Basic LCEL chain
prompt = ChatPromptTemplate.from_template("Tell me about {topic}")
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
output_parser = StrOutputParser()
chain = prompt | llm | output_parser
result = chain.invoke({"topic": "quantum computing"})
Runnable Interface
Every component in LangChain implements the Runnable interface with standard methods:
from langchain_core.runnables import RunnablePassthrough
# Key methods: invoke, stream, batch, ainvoke, astream, abatch
chain = prompt | llm | output_parser
# Synchronous invoke
result = chain.invoke({"topic": "AI"})
# Streaming
for chunk in chain.stream({"topic": "AI"}):
print(chunk, end="", flush=True)
# Batch processing
results = chain.batch([{"topic": "AI"}, {"topic": "ML"}])
# Async variants (must be awaited inside an async function)
result = await chain.ainvoke({"topic": "AI"})
RunnablePassthrough
Pass inputs directly through or apply transformations:
from langchain_core.runnables import RunnablePassthrough
# Pass through unchanged
chain = RunnablePassthrough() | llm | output_parser
# With transformation: add a derived key to the input dict
def add_context(x):
    return {"text": x["input"], "context": "important"}
enrich = RunnablePassthrough.assign(processed=add_context)
result = enrich.invoke({"input": "hello"})
# {'input': 'hello', 'processed': {'text': 'hello', 'context': 'important'}}
Chains
Sequential Chains
Process data through multiple steps sequentially.
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(temperature=0)
# Step 1: Generate ideas
idea_prompt = ChatPromptTemplate.from_template(
"Generate 3 creative ideas for: {topic}"
)
idea_chain = idea_prompt | llm | StrOutputParser()
# Step 2: Evaluate ideas
eval_prompt = ChatPromptTemplate.from_template(
"Evaluate these ideas and pick the best one:\n{ideas}"
)
eval_chain = eval_prompt | llm | StrOutputParser()
# Combine into sequential chain
sequential_chain = (
{"ideas": idea_chain}
| RunnablePassthrough.assign(evaluation=eval_chain)
)
result = sequential_chain.invoke({"topic": "mobile app"})
Map-Reduce Chains
Process multiple inputs in parallel and combine results.
from langchain_core.runnables import RunnableParallel
from langchain_core.prompts import ChatPromptTemplate
# Define parallel processing
summary_prompt = ChatPromptTemplate.from_template(
"Summarize this text in one sentence: {text}"
)
keywords_prompt = ChatPromptTemplate.from_template(
"Extract 3 keywords from: {text}"
)
sentiment_prompt = ChatPromptTemplate.from_template(
"Analyze sentiment (positive/negative/neutral): {text}"
)
# Map: Process in parallel
map_chain = RunnableParallel(
summary=summary_prompt | llm | StrOutputParser(),
keywords=keywords_prompt | llm | StrOutputParser(),
sentiment=sentiment_prompt | llm | StrOutputParser()
)
# Reduce: Combine results
reduce_prompt = ChatPromptTemplate.from_template(
"""Combine the analysis:
Summary: {summary}
Keywords: {keywords}
Sentiment: {sentiment}
Provide a comprehensive report:"""
)
map_reduce_chain = map_chain | reduce_prompt | llm | StrOutputParser()
result = map_reduce_chain.invoke({
"text": "LangChain is an amazing framework for building LLM applications."
})
Router Chains
Route inputs to different chains based on conditions.
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# Define specialized chains
technical_prompt = ChatPromptTemplate.from_template(
"Provide a technical explanation of: {query}"
)
simple_prompt = ChatPromptTemplate.from_template(
"Explain in simple terms: {query}"
)
technical_chain = technical_prompt | llm | StrOutputParser()
simple_chain = simple_prompt | llm | StrOutputParser()
# Router function
def route_query(input_dict):
query = input_dict["query"]
complexity = input_dict.get("complexity", "simple")
if complexity == "technical":
return technical_chain
return simple_chain
# Create router chain
from langchain_core.runnables import RunnableLambda
router_chain = RunnableLambda(route_query)
# Use the router
result = router_chain.invoke({
"query": "quantum entanglement",
"complexity": "technical"
})
Conditional Chains
Execute chains based on conditions.
from langchain_core.runnables import RunnableBranch
# Define condition-based routing
classification_prompt = ChatPromptTemplate.from_template(
"Classify this as 'question', 'statement', or 'command': {text}"
)
question_handler = ChatPromptTemplate.from_template(
"Answer this question: {text}"
) | llm | StrOutputParser()
statement_handler = ChatPromptTemplate.from_template(
"Acknowledge this statement: {text}"
) | llm | StrOutputParser()
command_handler = ChatPromptTemplate.from_template(
"Execute this command: {text}"
) | llm | StrOutputParser()
# Create conditional branch
branch = RunnableBranch(
(lambda x: "question" in x["type"].lower(), question_handler),
(lambda x: "statement" in x["type"].lower(), statement_handler),
command_handler # default
)
# Full chain with classification (input: {"text": ...})
from operator import itemgetter
full_chain = (
    {"text": itemgetter("text"), "type": classification_prompt | llm | StrOutputParser()}
    | branch
)
result = full_chain.invoke({"text": "What is LangChain?"})
LLMChain (Legacy)
Legacy chain format, deprecated in recent releases but still functional; prefer LCEL for new code:
from langchain.chains import LLMChain
from langchain_core.prompts import PromptTemplate
prompt = PromptTemplate(
input_variables=["product"],
template="What is a good name for a company that makes {product}?"
)
chain = LLMChain(llm=llm, prompt=prompt)
result = chain.run(product="eco-friendly water bottles")
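For comparison, the LCEL equivalent of the legacy chain above:
# LCEL equivalent of the LLMChain above
name_chain = prompt | llm | StrOutputParser()
result = name_chain.invoke({"product": "eco-friendly water bottles"})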
Stuff Documents Chain
Combine documents into a single context:
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.documents import Document
prompt = ChatPromptTemplate.from_template(
"""Answer based on the following context:
<context>
{context}
</context>
Question: {input}"""
)
document_chain = create_stuff_documents_chain(llm, prompt)
docs = [
Document(page_content="LangChain supports multiple LLM providers."),
Document(page_content="Chains can be composed using LCEL.")
]
result = document_chain.invoke({
"input": "What does LangChain support?",
"context": docs
})
Agents
ReAct Agents
Reasoning and Acting agents that use tools iteratively.
from langchain.agents import create_react_agent, AgentExecutor
from langchain_core.tools import Tool
from langchain import hub
# Define tools
def search_tool(query: str) -> str:
"""Search for information"""
return f"Search results for: {query}"
def calculator_tool(expression: str) -> str:
    """Calculate mathematical expressions"""
    try:
        # NOTE: eval is unsafe on untrusted input; use a proper expression parser in production
        return str(eval(expression))
    except Exception:
        return "Invalid expression"
tools = [
Tool(
name="Search",
func=search_tool,
description="Useful for searching information"
),
Tool(
name="Calculator",
func=calculator_tool,
description="Useful for math calculations"
)
]
# Create ReAct agent
prompt = hub.pull("hwchase17/react")
agent = create_react_agent(llm, tools, prompt)
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True,
max_iterations=5
)
result = agent_executor.invoke({
"input": "What is 25 * 4, and then search for that number's significance"
})
LangGraph ReAct Agent
Modern approach using LangGraph for better control:
from langgraph.prebuilt import create_react_agent
from langchain_core.tools import tool
from langgraph.checkpoint.memory import MemorySaver
@tool
def retrieve(query: str) -> str:
"""Retrieve relevant information from the knowledge base"""
# Your retrieval logic here
return f"Retrieved information for: {query}"
@tool
def analyze(text: str) -> str:
"""Analyze text and provide insights"""
return f"Analysis of: {text}"
# Create agent with memory
memory = MemorySaver()
agent_executor = create_react_agent(
llm,
[retrieve, analyze],
checkpointer=memory
)
# Use with configuration
config = {"configurable": {"thread_id": "abc123"}}
for chunk in agent_executor.stream(
{"messages": [("user", "Find information about LangChain")]},
config=config
):
print(chunk)
Conversational ReAct Agent
Agent with built-in conversation memory:
from langchain.agents import create_conversational_retrieval_agent
from langchain_core.tools import Tool
tools = [
Tool(
name="Knowledge Base",
func=lambda q: f"KB result: {q}",
description="Search the knowledge base"
)
]
conversational_agent = create_conversational_retrieval_agent(
llm,
tools,
verbose=True
)
# Maintains conversation context
result1 = conversational_agent.invoke({
"input": "What is LangChain?"
})
result2 = conversational_agent.invoke({
"input": "Tell me more about its features"
})
Zero-Shot React Agent
Agent that works without examples, built on the legacy initialize_agent API (deprecated in newer releases):
from langchain.agents import AgentType, initialize_agent, load_tools
# Load pre-built tools
tools = load_tools(["serpapi", "llm-math"], llm=llm)
agent = initialize_agent(
tools,
llm,
agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
verbose=True,
max_iterations=3
)
result = agent.run(
"What is the population of Tokyo and what is that number divided by 2?"
)
Structured Chat Agent
Agent that uses structured input/output:
from langchain.agents import create_structured_chat_agent
# Define tools with structured schemas
from pydantic import BaseModel, Field
class SearchInput(BaseModel):
query: str = Field(description="The search query")
max_results: int = Field(default=5, description="Maximum results")
@tool(args_schema=SearchInput)
def structured_search(query: str, max_results: int = 5) -> str:
"""Search with structured parameters"""
return f"Found {max_results} results for: {query}"
tools = [structured_search]
prompt = hub.pull("hwchase17/structured-chat-agent")
agent = create_structured_chat_agent(llm, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
Tool Calling Agent
Modern agent using native tool calling:
from langchain_core.tools import tool
@tool
def multiply(a: int, b: int) -> int:
"""Multiply two numbers"""
return a * b
@tool
def search_database(query: str, limit: int = 10) -> str:
"""Search the database"""
return f"Found {limit} results for {query}"
# Bind tools to LLM
llm_with_tools = llm.bind_tools([multiply, search_database])
# Create simple tool chain: extract the first tool call's args and run the tool
tool_chain = llm_with_tools | (lambda x: x.tool_calls[0]["args"]) | multiply
result = tool_chain.invoke("What's four times 23")
Memory Systems
ConversationBufferMemory
Store complete conversation history:
from langchain.memory import ConversationBufferMemory
from langchain.chains import LLMChain
memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True
)
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant."),
("placeholder", "{chat_history}"),
("human", "{input}")
])
chain = LLMChain(llm=llm, prompt=prompt, memory=memory)
# Conversation is automatically stored
response1 = chain.run(input="Hi, I'm Alice")
response2 = chain.run(input="What's my name?") # Will remember Alice
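Recent releases deprecate the langchain.memory classes in favor of wrapping a chain with RunnableWithMessageHistory. A minimal sketch of the equivalent setup (the in-memory session store and key names here are illustrative):
from langchain_core.chat_history import InMemoryChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory

session_store = {}  # session_id -> chat history (in-memory, for illustration)

def get_session_history(session_id: str):
    if session_id not in session_store:
        session_store[session_id] = InMemoryChatMessageHistory()
    return session_store[session_id]

chat_chain = RunnableWithMessageHistory(
    prompt | llm | StrOutputParser(),
    get_session_history,
    input_messages_key="input",
    history_messages_key="chat_history",
)
response = chat_chain.invoke(
    {"input": "Hi, I'm Alice"},
    config={"configurable": {"session_id": "alice-1"}},
)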
ConversationBufferWindowMemory
Keep only the most recent k interactions:
from langchain.memory import ConversationBufferWindowMemory
memory = ConversationBufferWindowMemory(
k=5, # Keep last 5 interactions
memory_key="chat_history",
return_messages=True
)
chain = LLMChain(llm=llm, prompt=prompt, memory=memory)
ConversationSummaryMemory
Summarize conversation history:
from langchain.memory import ConversationSummaryMemory
memory = ConversationSummaryMemory(
llm=llm,
memory_key="chat_history",
return_messages=True
)
chain = LLMChain(llm=llm, prompt=prompt, memory=memory)
# Long conversations are automatically summarized
for i in range(20):
chain.run(input=f"Tell me fact {i} about AI")
ConversationSummaryBufferMemory
Hybrid approach: recent messages + summary:
from langchain.memory import ConversationSummaryBufferMemory
memory = ConversationSummaryBufferMemory(
llm=llm,
max_token_limit=100, # When to trigger summarization
memory_key="chat_history",
return_messages=True
)
Vector Store Memory
Semantic search over conversation history:
from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
# FAISS cannot be initialized from an empty text list; build it around an explicit index
import faiss
from langchain_community.docstore.in_memory import InMemoryDocstore
index = faiss.IndexFlatL2(len(embeddings.embed_query("init")))
vectorstore = FAISS(
    embedding_function=embeddings,
    index=index,
    docstore=InMemoryDocstore({}),
    index_to_docstore_id={},
)
memory = VectorStoreRetrieverMemory(
retriever=vectorstore.as_retriever(search_kwargs={"k": 5})
)
# Save context
memory.save_context(
{"input": "My favorite color is blue"},
{"output": "That's great!"}
)
# Retrieve relevant context
relevant = memory.load_memory_variables({"input": "What's my favorite color?"})
Recall Memories (LangGraph)
Structured memory with save and search:
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_openai import OpenAIEmbeddings
from langchain_core.tools import tool
recall_vector_store = InMemoryVectorStore(OpenAIEmbeddings())
@tool
def save_recall_memory(memory: str) -> str:
"""Save important information to long-term memory"""
recall_vector_store.add_texts([memory])
return f"Saved memory: {memory}"
@tool
def search_recall_memories(query: str) -> str:
"""Search long-term memories"""
docs = recall_vector_store.similarity_search(query, k=3)
return "\n".join([doc.page_content for doc in docs])
# Use with agent
from langgraph.prebuilt import create_react_agent
agent = create_react_agent(
llm,
[save_recall_memory, search_recall_memories]
)
Custom Memory with LangGraph State
Define custom state for memory:
from typing import List
from langgraph.graph import MessagesState, StateGraph, START, END
class State(MessagesState):
recall_memories: List[str]
def load_memories(state: State):
"""Load relevant memories before agent processes input"""
messages = state["messages"]
last_message = messages[-1].content if messages else ""
# Search for relevant memories
docs = recall_vector_store.similarity_search(last_message, k=3)
memories = [doc.page_content for doc in docs]
return {"recall_memories": memories}
# Add to graph
builder = StateGraph(State)
builder.add_node(load_memories)
builder.add_edge(START, "load_memories")
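The graph above still needs an agent node and a terminal edge before it can run; one way to finish it, as a sketch (the node name and memory-injection strategy are illustrative):
def agent(state: State):
    # Surface recalled memories to the model as system context
    memory_context = "\n".join(state["recall_memories"])
    system_msg = ("system", f"Relevant memories:\n{memory_context}")
    response = llm.invoke([system_msg] + state["messages"])
    return {"messages": [response]}

builder.add_node(agent)
builder.add_edge("load_memories", "agent")
builder.add_edge("agent", END)
graph = builder.compile()

result = graph.invoke({
    "messages": [("user", "What do you remember about me?")],
    "recall_memories": [],
})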
RAG Patterns
Basic RAG Chain
Fundamental retrieval-augmented generation:
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
# Setup vector store
embeddings = OpenAIEmbeddings()
vectorstore = FAISS.from_texts(
[
"LangChain supports multiple LLM providers including OpenAI, Anthropic, and more.",
"Chains can be composed using LangChain Expression Language (LCEL).",
"Agents can use tools to interact with external systems."
],
embedding=embeddings
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
# RAG prompt
template = """Answer the question based only on the following context:
{context}
Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)
def format_docs(docs):
return "\n\n".join(doc.page_content for doc in docs)
# Build RAG chain
rag_chain = (
{"context": retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
result = rag_chain.invoke("What does LangChain support?")
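A common variant also returns the retrieved documents so callers can cite sources; a sketch following the documented assign pattern (key names are illustrative):
from langchain_core.runnables import RunnableParallel

rag_chain_from_docs = (
    RunnablePassthrough.assign(context=lambda x: format_docs(x["context"]))
    | prompt
    | llm
    | StrOutputParser()
)
rag_chain_with_source = RunnableParallel(
    {"context": retriever, "question": RunnablePassthrough()}
).assign(answer=rag_chain_from_docs)

result = rag_chain_with_source.invoke("What does LangChain support?")
# result["answer"] holds the response; result["context"] holds the source Documents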
RAG with Retrieval Chain
Using built-in retrieval chain constructor:
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
prompt = ChatPromptTemplate.from_template(
"""Answer based on the context:
<context>
{context}
</context>
Question: {input}"""
)
document_chain = create_stuff_documents_chain(llm, prompt)
retrieval_chain = create_retrieval_chain(retriever, document_chain)
response = retrieval_chain.invoke({
"input": "What is LCEL?"
})
# Returns: {"input": "...", "context": [...], "answer": "..."}
RAG with Chat History
Conversational RAG with context:
from langchain.chains import create_history_aware_retriever
from langchain_core.prompts import MessagesPlaceholder
contextualize_prompt = ChatPromptTemplate.from_messages([
("system", "Given a chat history and the latest user question, "
"formulate a standalone question which can be understood "
"without the chat history."),
MessagesPlaceholder("chat_history"),
("human", "{input}")
])
history_aware_retriever = create_history_aware_retriever(
llm,
retriever,
contextualize_prompt
)
# Use in RAG chain
qa_chain = create_retrieval_chain(
history_aware_retriever,
document_chain
)
# First question
result1 = qa_chain.invoke({
"input": "What is LangChain?",
"chat_history": []
})
# Follow-up with context
result2 = qa_chain.invoke({
"input": "What are its main features?",
"chat_history": [
("human", "What is LangChain?"),
("ai", result1["answer"])
]
})
Multi-Query RAG
Generate multiple search queries for better retrieval:
from langchain.retrievers.multi_query import MultiQueryRetriever
multi_query_retriever = MultiQueryRetriever.from_llm(
retriever=vectorstore.as_retriever(),
llm=llm
)
# Automatically generates multiple query variations
rag_chain = (
{"context": multi_query_retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
RAG with Reranking
Improve relevance with reranking:
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import FlashrankRerank
# Setup reranker (requires the flashrank package)
compressor = FlashrankRerank()
compression_retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=retriever
)
# Use in RAG chain
rag_chain = (
{"context": compression_retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
Parent Document Retrieval
Retrieve larger parent documents for full context:
from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore
from langchain_text_splitters import RecursiveCharacterTextSplitter
# Storage for parent documents
store = InMemoryStore()
# Splitters
child_splitter = RecursiveCharacterTextSplitter(chunk_size=400)
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000)
parent_retriever = ParentDocumentRetriever(
vectorstore=vectorstore,
docstore=store,
child_splitter=child_splitter,
parent_splitter=parent_splitter,
)
# Add documents (`documents` is your list of Document objects)
parent_retriever.add_documents(documents)
Self-Query Retrieval
Natural language to structured queries:
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain.chains.query_constructor.base import AttributeInfo
metadata_field_info = [
AttributeInfo(
name="source",
description="The document source",
type="string",
),
AttributeInfo(
name="page",
description="The page number",
type="integer",
),
]
document_content_description = "Technical documentation"
self_query_retriever = SelfQueryRetriever.from_llm(
llm,
vectorstore,
document_content_description,
metadata_field_info,
)
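Usage sketch (query translation requires the lark package; the metadata filter below is inferred by the LLM, not hand-written):
# "page 3" is translated into a metadata filter; the rest becomes a semantic query
docs = self_query_retriever.invoke("Find documentation about embeddings on page 3")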
LLM Integrations
OpenAI Integration
from langchain_openai import ChatOpenAI, OpenAI
# Chat model
chat_model = ChatOpenAI(
model="gpt-4o-mini",
temperature=0.7,
max_tokens=500,
api_key="your-api-key"
)
# Completion model
completion_model = OpenAI(
model="gpt-3.5-turbo-instruct",
temperature=0.9
)
Anthropic Claude Integration
from langchain_anthropic import ChatAnthropic
claude = ChatAnthropic(
model="claude-3-5-sonnet-20241022",
temperature=0,
max_tokens=1024,
api_key="your-api-key"
)
HuggingFace Integration
from langchain_huggingface import HuggingFaceEndpoint
llm = HuggingFaceEndpoint(
repo_id="meta-llama/Llama-2-7b-chat-hf",
huggingfacehub_api_token="your-token",
task="text-generation",
temperature=0.7
)
Google Vertex AI Integration
from langchain_google_vertexai import ChatVertexAI, VertexAI
# Chat model (the older chat-bison/PaLM models are retired; use Gemini)
chat_model = ChatVertexAI(
    model_name="gemini-1.5-pro",
    temperature=0
)
# Completion model
completion_model = VertexAI(
model_name="gemini-1.0-pro-002"
)
Ollama Local Models
from langchain_community.llms import Ollama  # newer releases: from langchain_ollama import OllamaLLM
llm = Ollama(
model="llama2",
temperature=0.8
)
Binding Tools to LLMs
from langchain_core.tools import tool
@tool
def multiply(a: int, b: int) -> int:
"""Multiply two numbers together"""
return a * b
# Bind tools to model
llm_with_tools = llm.bind_tools([multiply])
# Model will return tool calls
response = llm_with_tools.invoke("What is 3 times 4?")
print(response.tool_calls)
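bind_tools only produces tool-call requests; executing them and returning results to the model is up to the caller. A minimal round trip following the standard tool-message protocol:
from langchain_core.messages import HumanMessage, ToolMessage

messages = [HumanMessage("What is 3 times 4?")]
ai_msg = llm_with_tools.invoke(messages)
messages.append(ai_msg)

# Execute each requested tool call and feed the result back
for tool_call in ai_msg.tool_calls:
    tool_output = multiply.invoke(tool_call["args"])
    messages.append(ToolMessage(str(tool_output), tool_call_id=tool_call["id"]))

final_response = llm_with_tools.invoke(messages)
print(final_response.content)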
Callbacks & Monitoring
Standard Callbacks
Track chain execution:
from langchain_core.callbacks import StdOutCallbackHandler
from langchain_community.callbacks import get_openai_callback
# Standard output callback
callbacks = [StdOutCallbackHandler()]
chain = prompt | llm | StrOutputParser()
result = chain.invoke(
{"topic": "AI"},
config={"callbacks": callbacks}
)
# OpenAI cost tracking
with get_openai_callback() as cb:
result = chain.invoke({"topic": "AI"})
print(f"Total Tokens: {cb.total_tokens}")
print(f"Total Cost: ${cb.total_cost}")
Custom Callbacks
Create custom callback handlers:
from langchain_core.callbacks import BaseCallbackHandler
from typing import Any, Dict, List
class MyCustomCallback(BaseCallbackHandler):
    def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs):
print(f"LLM started with prompts: {prompts}")
def on_llm_end(self, response, **kwargs):
print(f"LLM finished with response: {response}")
def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs):
print(f"Chain started with inputs: {inputs}")
def on_chain_end(self, outputs: Dict[str, Any], **kwargs):
print(f"Chain ended with outputs: {outputs}")
def on_tool_start(self, serialized: Dict[str, Any], input_str: str, **kwargs):
print(f"Tool started with input: {input_str}")
def on_tool_end(self, output: str, **kwargs):
print(f"Tool ended with output: {output}")
# Use custom callback
custom_callback = MyCustomCallback()
result = chain.invoke(
{"topic": "AI"},
config={"callbacks": [custom_callback]}
)
Argilla Callback
Track and log to Argilla:
from langchain_community.callbacks import ArgillaCallbackHandler
argilla_callback = ArgillaCallbackHandler(
dataset_name="langchain-dataset",
api_url="http://localhost:6900",
api_key="your-api-key"
)
callbacks = [argilla_callback]
agent = initialize_agent(
tools,
llm,
agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
callbacks=callbacks
)
agent.run("Who was the first president of the United States?")
UpTrain Callback
RAG evaluation and monitoring:
from langchain_community.callbacks import UpTrainCallbackHandler
uptrain_callback = UpTrainCallbackHandler(
key_type="uptrain",
api_key="your-api-key"
)
config = {"callbacks": [uptrain_callback]}
# Automatically evaluates context relevance, factual accuracy, completeness
result = rag_chain.invoke("What is LangChain?", config=config)
LangSmith Integration
Production monitoring and debugging:
import os
# Set environment variables
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-key"
os.environ["LANGCHAIN_PROJECT"] = "my-project"
# All chains automatically traced
result = chain.invoke({"topic": "AI"})
# View traces at smith.langchain.com
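Per-run names, tags, and metadata make traces easier to filter in LangSmith; all three are standard RunnableConfig keys:
result = chain.invoke(
    {"topic": "AI"},
    config={
        "run_name": "topic-explainer",         # display name for the trace
        "tags": ["production", "v2"],          # filterable tags
        "metadata": {"user_id": "user-123"},   # arbitrary key-value metadata
    },
)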
Retrieval Strategies
Vector Store Retrievers
Basic similarity search:
from langchain_community.vectorstores import FAISS  # Chroma and Pinecone now live in the langchain-chroma and langchain-pinecone packages
# FAISS
faiss_retriever = vectorstore.as_retriever(
search_type="similarity",
search_kwargs={"k": 5}
)
# Maximum Marginal Relevance (MMR)
mmr_retriever = vectorstore.as_retriever(
search_type="mmr",
search_kwargs={"k": 5, "fetch_k": 20, "lambda_mult": 0.5}
)
# Similarity with threshold
threshold_retriever = vectorstore.as_retriever(
search_type="similarity_score_threshold",
search_kwargs={"score_threshold": 0.8, "k": 5}
)
Ensemble Retriever
Combine multiple retrievers:
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
# BM25 for keyword search (`texts` is your list of raw strings)
bm25_retriever = BM25Retriever.from_texts(texts)
bm25_retriever.k = 5
# Combine with vector search
ensemble_retriever = EnsembleRetriever(
retrievers=[bm25_retriever, faiss_retriever],
weights=[0.5, 0.5]
)
docs = ensemble_retriever.invoke("LangChain features")  # get_relevant_documents is deprecated
Time-Weighted Retriever
Prioritize recent documents:
from langchain.retrievers import TimeWeightedVectorStoreRetriever
retriever = TimeWeightedVectorStoreRetriever(
vectorstore=vectorstore,
decay_rate=0.01, # Decay factor for older docs
k=5
)
Multi-Vector Retriever
Multiple vectors per document:
from langchain.retrievers.multi_vector import MultiVectorRetriever
from langchain.storage import InMemoryByteStore
store = InMemoryByteStore()
retriever = MultiVectorRetriever(
vectorstore=vectorstore,
byte_store=store,
id_key="doc_id"
)
# Unlike ParentDocumentRetriever, MultiVectorRetriever has no add_documents
# helper; populate the vectorstore and docstore directly (see below)
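A typical setup embeds alternative representations (e.g., LLM-written summaries) while returning the full parent documents; a sketch of the doc_id linkage (summary generation is illustrative, and `documents` is your list of Document objects):
import uuid
from langchain_core.documents import Document

doc_ids = [str(uuid.uuid4()) for _ in documents]

# Embed a summary per document, keyed back to its parent via doc_id
summarize = ChatPromptTemplate.from_template("Summarize:\n{doc}") | llm | StrOutputParser()
summaries = [
    Document(page_content=summarize.invoke({"doc": d.page_content}),
             metadata={"doc_id": doc_ids[i]})
    for i, d in enumerate(documents)
]

retriever.vectorstore.add_documents(summaries)          # searched
retriever.docstore.mset(list(zip(doc_ids, documents)))  # returned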
Streaming
Stream Chain Output
Stream tokens as they're generated:
from langchain_core.output_parsers import StrOutputParser
chain = prompt | llm | StrOutputParser()
# Stream method
for chunk in chain.stream({"topic": "AI"}):
print(chunk, end="", flush=True)
Stream with Callbacks
Handle streaming events:
from langchain_core.callbacks import StreamingStdOutCallbackHandler
streaming_llm = ChatOpenAI(
streaming=True,
callbacks=[StreamingStdOutCallbackHandler()]
)
chain = prompt | streaming_llm | StrOutputParser()
result = chain.invoke({"topic": "AI"}) # Streams to stdout
Async Streaming
Stream asynchronously:
async def stream_async():
async for chunk in chain.astream({"topic": "AI"}):
print(chunk, end="", flush=True)
# Run async
import asyncio
asyncio.run(stream_async())
Stream Agent Responses
Stream agent execution:
from langgraph.prebuilt import create_react_agent
agent = create_react_agent(llm, tools)
for chunk in agent.stream(
{"messages": [("user", "Search for LangChain information")]},
stream_mode="values"
):
chunk["messages"][-1].pretty_print()
Streaming RAG
Stream RAG responses:
retrieval_chain = (
{
"context": retriever.with_config(run_name="Docs"),
"question": RunnablePassthrough(),
}
| prompt
| llm
| StrOutputParser()
)
# Stream the response
for chunk in retrieval_chain.stream("What is LangChain?"):
print(chunk, end="", flush=True)
Error Handling
Retry Logic
Automatic retries on failure:
# Add retry with exponential backoff and jitter (with_retry is built into every Runnable)
chain_with_retry = (prompt | llm | StrOutputParser()).with_retry(
stop_after_attempt=3,
wait_exponential_jitter=True
)
result = chain_with_retry.invoke({"topic": "AI"})
Fallback Chains
Use fallback on errors:
primary_llm = ChatOpenAI(model="gpt-4")
fallback_llm = ChatOpenAI(model="gpt-3.5-turbo")
chain_with_fallback = (prompt | primary_llm).with_fallbacks(
[prompt | fallback_llm]
)
result = chain_with_fallback.invoke({"topic": "AI"})
Try-Except Patterns
Manual error handling:
from langchain_core.exceptions import OutputParserException
try:
result = chain.invoke({"topic": "AI"})
except OutputParserException as e:
print(f"Parsing failed: {e}")
result = chain.invoke({"topic": "AI"}) # Retry
except Exception as e:
print(f"Chain execution failed: {e}")
result = None
Timeout Handling
Set execution timeouts:
import asyncio
# RunnableConfig has no timeout field; set a per-request timeout on the model client
llm_with_timeout = ChatOpenAI(model="gpt-4o-mini", timeout=10.0)  # seconds
# Or enforce a deadline on the whole chain via asyncio
async def invoke_with_deadline():
    try:
        return await asyncio.wait_for(chain.ainvoke({"topic": "AI"}), timeout=10.0)
    except asyncio.TimeoutError:
        print("Chain execution timed out")
Validation
Validate inputs and outputs:
from pydantic import BaseModel, Field, field_validator  # pydantic v2; `validator` is deprecated
class QueryInput(BaseModel):
    topic: str = Field(..., min_length=1, max_length=100)

    @field_validator("topic")
    @classmethod
    def topic_must_be_valid(cls, v):
        if not v.strip():
            raise ValueError("Topic cannot be empty")
        return v.strip()
# Use with chain
def validate_and_invoke(topic: str):
try:
validated = QueryInput(topic=topic)
return chain.invoke({"topic": validated.topic})
except ValueError as e:
return f"Validation error: {e}"
Production Best Practices
Environment Configuration
Manage secrets securely:
import os
from dotenv import load_dotenv
load_dotenv()
# Use environment variables
llm = ChatOpenAI(
api_key=os.getenv("OPENAI_API_KEY"),
model=os.getenv("MODEL_NAME", "gpt-4o-mini")
)
# Vector store configuration
VECTOR_STORE_TYPE = os.getenv("VECTOR_STORE", "faiss")
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "text-embedding-3-small")
Caching
Cache LLM responses:
from langchain_core.caches import InMemoryCache
from langchain_community.cache import SQLiteCache
from langchain.globals import set_llm_cache
# In-memory cache
set_llm_cache(InMemoryCache())
# Persistent cache
set_llm_cache(SQLiteCache(database_path=".langchain.db"))
# Responses are cached automatically
result1 = llm.invoke("What is AI?") # Calls API
result2 = llm.invoke("What is AI?") # Uses cache
Rate Limiting
Control API usage:
from langchain_core.rate_limiters import InMemoryRateLimiter
rate_limiter = InMemoryRateLimiter(
requests_per_second=1,
check_every_n_seconds=0.1,
max_bucket_size=10
)
llm = ChatOpenAI(rate_limiter=rate_limiter)
Batch Processing
Process multiple inputs efficiently:
# Batch invoke
inputs = [{"topic": f"Topic {i}"} for i in range(10)]
results = chain.batch(inputs, config={"max_concurrency": 5})
# Async batch
import asyncio
async def batch_process():
    return await chain.abatch(inputs, config={"max_concurrency": 5})
async_results = asyncio.run(batch_process())
Monitoring and Logging
Production monitoring:
import logging
from langchain_core.callbacks import BaseCallbackHandler
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ProductionCallback(BaseCallbackHandler):
def on_chain_start(self, serialized, inputs, **kwargs):
logger.info(f"Chain started: {serialized.get('name', 'unknown')}")
def on_chain_end(self, outputs, **kwargs):
logger.info(f"Chain completed successfully")
def on_chain_error(self, error, **kwargs):
logger.error(f"Chain error: {error}")
# Use in production
production_callback = ProductionCallback()
config = {"callbacks": [production_callback]}
Testing Chains
Unit test your chains:
import pytest
from langchain_core.messages import HumanMessage, AIMessage
def test_basic_chain():
chain = prompt | llm | StrOutputParser()
result = chain.invoke({"topic": "testing"})
assert isinstance(result, str)
assert len(result) > 0
def test_rag_chain():
result = rag_chain.invoke("What is LangChain?")
assert "LangChain" in result
assert len(result) > 50
@pytest.mark.asyncio  # requires the pytest-asyncio plugin
async def test_async_chain():
result = await chain.ainvoke({"topic": "async"})
assert isinstance(result, str)
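For deterministic tests without network calls, LangChain ships fake models; a sketch with canned responses (assumes FakeListChatModel is exported by your langchain-core version):
from langchain_core.language_models import FakeListChatModel

def test_chain_without_api():
    fake_llm = FakeListChatModel(responses=["LangChain is a framework."])
    test_chain = prompt | fake_llm | StrOutputParser()
    result = test_chain.invoke({"topic": "anything"})
    assert result == "LangChain is a framework."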
Performance Optimization
Optimize chain execution:
# Use appropriate chunk sizes for text splitting
from langchain_text_splitters import RecursiveCharacterTextSplitter
splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
length_function=len
)
# Limit retrieval results
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
# Use smaller, faster models where appropriate
fast_llm = ChatOpenAI(model="gpt-4o-mini")
# Enable streaming for better UX (there is no .with_streaming(); consume via .stream())
streaming_chain = prompt | fast_llm | StrOutputParser()
for chunk in streaming_chain.stream({"topic": "AI"}):
    print(chunk, end="", flush=True)
Documentation
Document your chains:
from typing import Optional
from langchain_core.runnables import RunnableConfig
class DocumentedChain:
"""
Production RAG chain for technical documentation.
Features:
- Multi-query retrieval for better coverage
- Reranking for improved relevance
- Streaming support
- Error handling with fallbacks
Usage:
chain = DocumentedChain()
result = chain.invoke("Your question here")
"""
def __init__(self):
self.llm = ChatOpenAI(model="gpt-4o-mini")
self.retriever = self._setup_retriever()
self.chain = self._build_chain()
def _setup_retriever(self):
# Setup logic
pass
def _build_chain(self):
# Chain construction
pass
    def invoke(self, query: str, config: Optional[RunnableConfig] = None):
"""Execute the chain with error handling"""
try:
return self.chain.invoke(query, config=config)
except Exception as e:
logger.error(f"Chain execution failed: {e}")
raise
Summary
This skill covers comprehensive LangChain orchestration patterns:
- Chains: Sequential, map-reduce, router, conditional chains
- Agents: ReAct, conversational, zero-shot, structured agents
- Memory: Buffer, window, summary, vector store memory
- RAG: Basic, multi-query, reranking, parent document retrieval
- LLM Integration: OpenAI, Anthropic, HuggingFace, Vertex AI, Ollama
- Callbacks: Standard, custom, Argilla, UpTrain, LangSmith
- Retrieval: Vector store, ensemble, time-weighted, multi-vector
- Streaming: Chain, agent, async streaming
- Error Handling: Retry, fallback, timeout, validation
- Production: Configuration, caching, rate limiting, monitoring, testing
For more examples and patterns, see EXAMPLES.md.