---
name: rag-implementation
description: Comprehensive guide to implementing RAG systems including vector database selection, chunking strategies, embedding models, and retrieval optimization. Use when building RAG systems, implementing semantic search, optimizing retrieval quality, or debugging RAG performance issues.
---

# RAG Implementation Patterns
Comprehensive guide to implementing Retrieval-Augmented Generation (RAG) systems including vector database selection, chunking strategies, embedding models, retrieval optimization, and production deployment patterns.
## Quick Reference

**When to use this skill:**
- Building RAG/semantic search systems
- Implementing document retrieval pipelines
- Optimizing vector database performance
- Debugging retrieval quality issues
- Choosing between vector database options
- Designing chunking strategies
- Implementing hybrid search
**Technologies covered:**
- Vector DBs: Qdrant, Pinecone, Chroma, Weaviate, Milvus
- Embeddings: OpenAI, Sentence Transformers, Cohere
- Frameworks: LangChain, LlamaIndex, Haystack
## Part 1: Vector Database Selection

### Database Comparison Matrix
| Database | Best For | Deployment | Performance | Cost |
|---|---|---|---|---|
| Qdrant | Self-hosted, production | Docker/K8s | Excellent (Rust) | Free (self-host) |
| Pinecone | Managed, rapid prototyping | Cloud | Excellent | Pay-per-use |
| Chroma | Local development, embedded | In-process | Good (Python) | Free |
| Weaviate | Complex schemas, GraphQL | Docker/Cloud | Excellent (Go) | Free + Cloud |
| Milvus | Large-scale, distributed | K8s | Excellent (C++) | Free (self-host) |
### Qdrant Setup (Recommended for Production)

```python
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct

# Initialize client (local or cloud)
client = QdrantClient(url="http://localhost:6333")  # or cloud URL

# Create collection
client.create_collection(
    collection_name="documents",
    vectors_config=VectorParams(
        size=1536,  # OpenAI text-embedding-3-small dimension
        distance=Distance.COSINE  # or DOT, EUCLID
    )
)

# Insert vectors with payload
client.upsert(
    collection_name="documents",
    points=[
        PointStruct(
            id=1,
            vector=[0.1, 0.2, ...],  # 1536 dimensions
            payload={
                "text": "Document content",
                "source": "doc.pdf",
                "page": 1,
                "metadata": {...}
            }
        )
    ]
)

# Search
results = client.search(
    collection_name="documents",
    query_vector=[0.1, 0.2, ...],
    limit=5,
    score_threshold=0.7  # Minimum similarity
)
```
### Pinecone Setup (Managed Service)

```python
from pinecone import Pinecone, ServerlessSpec

# Initialize
pc = Pinecone(api_key="your-key")

# Create index
pc.create_index(
    name="documents",
    dimension=1536,
    metric="cosine",
    spec=ServerlessSpec(cloud="aws", region="us-east-1")
)

# Get index
index = pc.Index("documents")

# Upsert vectors
index.upsert(vectors=[
    ("doc1", [0.1, 0.2, ...], {"text": "...", "source": "..."})
])

# Query
results = index.query(
    vector=[0.1, 0.2, ...],
    top_k=5,
    include_metadata=True
)
```
## Part 2: Chunking Strategies

### Strategy 1: Fixed-Size Chunking (Simple, Fast)

```python
def fixed_size_chunking(text: str, chunk_size: int = 512, overlap: int = 50) -> list[str]:
    """
    Split text into fixed-size chunks (measured in words) with overlap.

    Pros: Simple, predictable chunk sizes
    Cons: May break mid-sentence, poor semantic boundaries
    """
    words = text.split()
    step = max(1, chunk_size - overlap)  # guard against overlap >= chunk_size
    chunks = []
    for i in range(0, len(words), step):
        chunk = ' '.join(words[i:i + chunk_size])
        chunks.append(chunk)
    return chunks

# Usage
chunks = fixed_size_chunking(document, chunk_size=512, overlap=50)
```
**When to use:**
- Simple documents (logs, transcripts)
- Prototyping/MVP
- Consistent token budgets needed
### Strategy 2: Semantic Chunking (Better Quality)

```python
from langchain.text_splitter import RecursiveCharacterTextSplitter

def semantic_chunking(text: str, chunk_size: int = 1000, overlap: int = 200) -> list[str]:
    """
    Split on semantic boundaries (paragraphs, sentences).

    Pros: Preserves meaning, better retrieval quality
    Cons: Variable chunk sizes, slower processing
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=overlap,
        separators=["\n\n", "\n", ". ", " ", ""],  # Priority order
        length_function=len
    )
    return splitter.split_text(text)

# Usage
chunks = semantic_chunking(document, chunk_size=1000, overlap=200)
```
**When to use:**
- Long-form documents (articles, books, reports)
- Quality > speed
- Natural language content
### Strategy 3: Hierarchical Chunking (Best for Structured Docs)

```python
def hierarchical_chunking(document: dict) -> list[dict]:
    """
    Chunk based on document structure (sections, subsections).

    Pros: Preserves hierarchy, enables parent-child retrieval
    Cons: Requires structured input, more complex
    """
    chunks = []
    for section in document['sections']:
        # Parent chunk (section summary)
        chunks.append({
            'text': section['title'] + '\n' + section['summary'],
            'type': 'parent',
            'section_id': section['id']
        })
        # Child chunks (paragraphs)
        for para in section['paragraphs']:
            chunks.append({
                'text': para,
                'type': 'child',
                'parent_id': section['id']
            })
    return chunks
```
**When to use:**
- Technical documentation
- Books with TOC
- Legal documents
- Need to preserve context hierarchy (see the parent-child retrieval sketch below)
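The parent-child retrieval this structure enables, sketched minimally: search the small child chunks for precision, then return the enclosing parent sections for context. This reuses the Qdrant `client` and "documents" collection from Part 1, and the payload fields produced by `hierarchical_chunking()` above; it is a sketch, not a prescribed API.

```python
from qdrant_client.models import Filter, FieldCondition, MatchValue, MatchAny

def parent_child_search(query_vector: list[float], top_k: int = 5) -> list[dict]:
    """Match on precise child chunks, then expand to their parent sections."""
    # 1. Search only child chunks (small, so matches are precise)
    children = client.search(
        collection_name="documents",
        query_vector=query_vector,
        query_filter=Filter(must=[
            FieldCondition(key="type", match=MatchValue(value="child"))
        ]),
        limit=top_k
    )
    # 2. Fetch each hit's parent section to restore surrounding context
    parent_ids = list({c.payload['parent_id'] for c in children})
    parents, _ = client.scroll(
        collection_name="documents",
        scroll_filter=Filter(must=[
            FieldCondition(key="section_id", match=MatchAny(any=parent_ids))
        ]),
        limit=max(1, len(parent_ids))
    )
    return [p.payload for p in parents]
```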
### Strategy 4: Sliding Window (Maximum Context Preservation)

```python
def sliding_window_chunking(text: str, window_size: int = 512, stride: int = 256) -> list[str]:
    """
    Overlapping windows for maximum context.

    Pros: No information loss at boundaries
    Cons: Storage overhead (duplicate content)
    """
    words = text.split()
    if len(words) <= window_size:
        return [' '.join(words)]  # text shorter than one window
    chunks = []
    for i in range(0, len(words) - window_size + 1, stride):
        chunk = ' '.join(words[i:i + window_size])
        chunks.append(chunk)
    return chunks
```
**When to use:**
- Critical retrieval accuracy needed
- Short queries need broader context
- Storage cost not a concern
## Part 3: Embedding Models

### Model Selection Guide
| Model | Dimensions | Speed | Quality | Cost | Use Case |
|---|---|---|---|---|---|
| OpenAI text-embedding-3-small | 1536 | Fast | Excellent | $0.02/1M tokens | Production, general purpose |
| OpenAI text-embedding-3-large | 3072 | Medium | Best | $0.13/1M tokens | High-quality retrieval |
| all-MiniLM-L6-v2 | 384 | Very fast | Good | Free | Self-hosted, prototyping |
| all-mpnet-base-v2 | 768 | Fast | Very good | Free | Self-hosted, quality |
| Cohere embed-english-v3.0 | 1024 | Fast | Excellent | $0.10/1M tokens | Semantic search focus |
### OpenAI Embeddings (Recommended)

```python
from openai import OpenAI

client = OpenAI(api_key="your-key")

def get_embeddings(texts: list[str], model: str = "text-embedding-3-small") -> list[list[float]]:
    """
    Generate embeddings using OpenAI.

    Batch size: up to 2048 inputs per request
    Rate limits: check your tier limits
    """
    response = client.embeddings.create(
        model=model,
        input=texts
    )
    return [item.embedding for item in response.data]

# Usage
chunks = ["chunk 1", "chunk 2", ...]
embeddings = get_embeddings(chunks)
```
### Sentence Transformers (Self-Hosted)

```python
from sentence_transformers import SentenceTransformer

# Load model (cached after first download)
model = SentenceTransformer('all-MiniLM-L6-v2')

def get_embeddings_local(texts: list[str]) -> list[list[float]]:
    """
    Generate embeddings locally (no API costs).

    GPU recommended for batches > 100;
    CPU acceptable for small batches.
    """
    return model.encode(texts, show_progress_bar=True).tolist()

# Usage
embeddings = get_embeddings_local(chunks)
```
## Part 4: Retrieval Optimization

### Technique 1: Hybrid Search (Dense + Sparse)

```python
from qdrant_client.models import Filter, FieldCondition, MatchText

def hybrid_search(query: str, query_vector: list[float], top_k: int = 10):
    """
    Combine dense (vector) and sparse (keyword) search.

    Dense: semantic similarity
    Sparse: keyword matches via a full-text payload index
    """
    # Dense search
    dense_results = client.search(
        collection_name="documents",
        query_vector=query_vector,
        limit=top_k * 2  # Get more candidates
    )
    # Sparse search: full-text match on the payload. Requires a full-text
    # index on the "text" field; true BM25 scoring needs a sparse-vector
    # setup, so this is keyword matching, not ranked BM25.
    sparse_results, _ = client.scroll(
        collection_name="documents",
        scroll_filter=Filter(
            must=[
                FieldCondition(
                    key="text",
                    match=MatchText(text=query)
                )
            ]
        ),
        limit=top_k * 2
    )
    # Merge and re-rank (see merge_results sketch below)
    combined = merge_results(dense_results, sparse_results, weights=(0.7, 0.3))
    return combined[:top_k]
```
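`merge_results` is referenced above but not defined. A minimal weighted-fusion sketch under one stated assumption: keyword hits come from `scroll` and carry no similarity score, so each contributes a flat 1.0 before weighting.

```python
def merge_results(dense_results, sparse_results, weights=(0.7, 0.3)) -> list[dict]:
    """Blend dense and sparse hits with a weighted score per point id.

    A point found by both channels accumulates both weighted contributions,
    so agreement between channels pushes it up the ranking.
    """
    scores: dict = {}
    payloads: dict = {}
    for hit in dense_results:  # ScoredPoint: has .id, .score, .payload
        scores[hit.id] = scores.get(hit.id, 0.0) + weights[0] * hit.score
        payloads[hit.id] = hit.payload
    for record in sparse_results:  # Record: has .id, .payload, no score
        scores[record.id] = scores.get(record.id, 0.0) + weights[1] * 1.0
        payloads[record.id] = record.payload
    ranked = sorted(scores, key=scores.get, reverse=True)
    return [{'id': pid, 'score': scores[pid], **payloads[pid]} for pid in ranked]
```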
### Technique 2: Query Expansion

```python
from openai import OpenAI

def expand_query(query: str) -> list[str]:
    """
    Generate query variations for better recall.

    Techniques:
    - Synonym expansion
    - Question reformulation
    - Entity extraction
    """
    llm = OpenAI()  # local name avoids shadowing the vector DB client
    response = llm.chat.completions.create(
        model="gpt-4",
        messages=[{
            "role": "system",
            "content": "Generate 3 alternative phrasings of the user's query, one per line."
        }, {
            "role": "user",
            "content": query
        }]
    )
    expanded = [query] + response.choices[0].message.content.split('\n')
    return [q.strip() for q in expanded if q.strip()]

# Usage
queries = expand_query("How to train neural networks?")
# → ["How to train neural networks?",
#    "What are neural network training techniques?",
#    "Neural network optimization methods",
#    "Deep learning model training"]
```
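Expansion only pays off once every variant is actually searched. A minimal multi-query retrieval sketch, reusing `get_embeddings` (Part 3) and the Qdrant `client` (Part 1) — in a single module those two would need distinct names — and deduplicating hits by point id:

```python
def multi_query_search(query: str, top_k: int = 5):
    """Run every expanded query, pool the hits, keep each point's best score."""
    best: dict = {}
    for q in expand_query(query):
        q_vector = get_embeddings([q])[0]
        for hit in client.search(
            collection_name="documents",
            query_vector=q_vector,
            limit=top_k
        ):
            # Deduplicate: keep the highest score seen for each point
            if hit.id not in best or hit.score > best[hit.id].score:
                best[hit.id] = hit
    return sorted(best.values(), key=lambda h: h.score, reverse=True)[:top_k]
```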
### Technique 3: Reranking

```python
from sentence_transformers import CrossEncoder

# Load cross-encoder (better than bi-encoder for reranking)
reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

def rerank_results(query: str, results: list[dict], top_k: int = 5) -> list[dict]:
    """
    Rerank initial results using a cross-encoder.

    More accurate but slower than initial retrieval;
    use on the top 20-50 candidates only.
    """
    # Score each query-document pair
    pairs = [(query, result['text']) for result in results]
    scores = reranker.predict(pairs)
    # Attach scores to results
    for result, score in zip(results, scores):
        result['rerank_score'] = float(score)
    # Sort and return top_k
    reranked = sorted(results, key=lambda x: x['rerank_score'], reverse=True)
    return reranked[:top_k]
```
### Technique 4: Metadata Filtering

```python
from qdrant_client.models import Filter, FieldCondition, MatchValue, Range

def filtered_search(
    query_vector: list[float],
    filters: dict,
    top_k: int = 5
):
    """
    Filter search by metadata (date, category, author, etc.)

    Pre-filter (shown here): conditions applied during the vector search
    Post-filter: filter after retrieval; simpler, but can return fewer
    than top_k results
    """
    # Build filter conditions
    conditions = []
    if 'date_range' in filters:
        conditions.append(
            FieldCondition(
                key="date",
                range=Range(
                    gte=filters['date_range']['start'],
                    lte=filters['date_range']['end']
                )
            )
        )
    if 'category' in filters:
        conditions.append(
            FieldCondition(
                key="category",
                match=MatchValue(value=filters['category'])
            )
        )
    # Search with filters
    results = client.search(
        collection_name="documents",
        query_vector=query_vector,
        query_filter=Filter(must=conditions) if conditions else None,
        limit=top_k
    )
    return results
```
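For contrast, a minimal post-filtering sketch: over-fetch, then filter in application code. Simpler to write, but the final list can come up short of `top_k`; the 4x over-fetch factor is an arbitrary assumption to tune.

```python
def post_filtered_search(query_vector: list[float], category: str, top_k: int = 5):
    """Over-fetch, then filter client-side; may return fewer than top_k."""
    candidates = client.search(
        collection_name="documents",
        query_vector=query_vector,
        limit=top_k * 4  # over-fetch so some hits survive filtering
    )
    matching = [c for c in candidates if c.payload.get('category') == category]
    return matching[:top_k]
```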
## Part 5: Context Management

### Pattern 1: Retrieved Context Optimization

```python
def optimize_context(retrieved_docs: list[dict], max_tokens: int = 4000) -> str:
    """
    Fit retrieved context within the LLM context window.

    Strategies:
    1. Relevance-based truncation
    2. Extractive summarization
    3. Overlap removal
    """
    # Sort by relevance
    sorted_docs = sorted(retrieved_docs, key=lambda d: d['score'], reverse=True)
    # Build context within token budget
    context_parts = []
    total_tokens = 0
    for doc in sorted_docs:
        doc_tokens = estimate_tokens(doc['text'])
        if total_tokens + doc_tokens <= max_tokens:
            context_parts.append(f"[Source: {doc['source']}]\n{doc['text']}")
            total_tokens += doc_tokens
        else:
            # Truncate the last document to fit
            remaining = max_tokens - total_tokens
            truncated = truncate_to_tokens(doc['text'], remaining)
            context_parts.append(f"[Source: {doc['source']}]\n{truncated}")
            break
    return "\n\n---\n\n".join(context_parts)
```
### Pattern 2: Citation Tracking

```python
import re
from openai import OpenAI

def generate_with_citations(query: str, sources: list[dict]) -> dict:
    """
    Generate an answer with citation tracking.

    Returns:
    - answer: generated text
    - citations: source documents actually cited in the answer
    """
    llm = OpenAI()
    # Create source map
    source_map = {i + 1: source for i, source in enumerate(sources)}
    numbered_context = "\n\n".join([
        f"[{i + 1}] {source['text']}"
        for i, source in enumerate(sources)
    ])
    response = llm.chat.completions.create(
        model="gpt-4",
        messages=[{
            "role": "system",
            "content": "Answer using the provided sources. Cite sources as [1], [2], etc."
        }, {
            "role": "user",
            "content": f"Context:\n{numbered_context}\n\nQuestion: {query}"
        }]
    )
    answer = response.choices[0].message.content
    # Extract citations from the answer
    cited_nums = set(map(int, re.findall(r'\[(\d+)\]', answer)))
    cited_sources = [source_map[num] for num in cited_nums if num in source_map]
    return {
        'answer': answer,
        'citations': cited_sources,
        'num_sources_used': len(cited_sources)
    }
```
## Part 6: Production Best Practices

### Caching Strategy

```python
import hashlib

class EmbeddingCache:
    """Cache embeddings to avoid recomputation."""

    def __init__(self, cache_size: int = 10000):
        self.cache = {}
        self.max_size = cache_size

    def get_or_compute(self, text: str, embed_fn) -> list[float]:
        # Create cache key
        key = hashlib.sha256(text.encode()).hexdigest()
        if key in self.cache:
            return self.cache[key]
        # Compute and cache
        embedding = embed_fn(text)
        if len(self.cache) >= self.max_size:
            # Evict oldest entry (FIFO; dicts preserve insertion order)
            self.cache.pop(next(iter(self.cache)))
        self.cache[key] = embedding
        return embedding

# Usage
cache = EmbeddingCache()
embedding = cache.get_or_compute(text, lambda t: get_embeddings([t])[0])
```
### Async Processing

```python
import asyncio

async def process_documents_async(documents: list[str], batch_size: int = 100):
    """
    Process large document sets asynchronously.

    Benefits:
    - Large speedups (often 10x or more) for I/O-bound embed/upsert calls
    - Better resource utilization
    - Scales to millions of documents
    """
    async def process_batch(batch):
        embeddings = await get_embeddings_async(batch)
        await upsert_to_db_async(batch, embeddings)

    # Split into batches
    batches = [documents[i:i + batch_size] for i in range(0, len(documents), batch_size)]
    # Process batches concurrently
    await asyncio.gather(*[process_batch(batch) for batch in batches])

# Usage
asyncio.run(process_documents_async(documents))
```
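`get_embeddings_async` and `upsert_to_db_async` are placeholders above. A minimal sketch of the embedding half using OpenAI's `AsyncOpenAI` client; the semaphore bound of 8 is an arbitrary assumption to stay under rate limits, and the upsert half would wrap your vector DB client the same way.

```python
import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI()
_sem = asyncio.Semaphore(8)  # assumed concurrency cap; tune to your rate limits

async def get_embeddings_async(texts: list[str]) -> list[list[float]]:
    """Embed one batch of texts without blocking the event loop."""
    async with _sem:
        response = await async_client.embeddings.create(
            model="text-embedding-3-small",
            input=texts
        )
    return [item.embedding for item in response.data]
```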
### Monitoring & Observability

```python
from dataclasses import dataclass
from datetime import datetime

@dataclass
class RAGMetrics:
    """Track RAG system performance."""
    query_count: int = 0
    avg_retrieval_time: float = 0.0
    avg_generation_time: float = 0.0
    cache_hit_rate: float = 0.0
    avg_num_results: float = 0.0

class RAGMonitor:
    def __init__(self):
        self.metrics = RAGMetrics()
        self.query_times = []

    def log_query(self, retrieval_time: float, generation_time: float, num_results: int):
        self.metrics.query_count += 1
        self.query_times.append({
            'timestamp': datetime.now(),
            'retrieval_time': retrieval_time,
            'generation_time': generation_time,
            'num_results': num_results
        })
        # Update running averages
        n = len(self.query_times)
        self.metrics.avg_retrieval_time = sum(q['retrieval_time'] for q in self.query_times) / n
        self.metrics.avg_generation_time = sum(q['generation_time'] for q in self.query_times) / n
        self.metrics.avg_num_results = sum(q['num_results'] for q in self.query_times) / n

    @staticmethod
    def _percentile(values: list[float], pct: float) -> float:
        """Nearest-rank percentile; returns 0.0 for an empty list."""
        if not values:
            return 0.0
        ordered = sorted(values)
        index = min(len(ordered) - 1, round(pct / 100 * (len(ordered) - 1)))
        return ordered[index]

    def get_metrics(self) -> dict:
        return {
            'total_queries': self.metrics.query_count,
            'avg_retrieval_ms': self.metrics.avg_retrieval_time * 1000,
            'avg_generation_ms': self.metrics.avg_generation_time * 1000,
            'p95_retrieval_ms': self._percentile(
                [q['retrieval_time'] for q in self.query_times], 95
            ) * 1000
        }
```
## Part 7: Common Pitfalls & Solutions

### Pitfall 1: Chunk Size Too Small/Large

**Problem:** Small chunks lack context; large chunks reduce retrieval precision.

**Solution:**

```python
# Experiment with chunk sizes
chunk_sizes = [256, 512, 1024, 2048]
for size in chunk_sizes:
    chunks = semantic_chunking(document, chunk_size=size)
    # Evaluate retrieval quality
    recall = evaluate_retrieval(chunks, test_queries)
    print(f"Size {size}: Recall {recall:.2f}")

# Typical sweet spot: 512-1024 tokens
```
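`evaluate_retrieval` is assumed above. A minimal recall@k sketch under two stated assumptions: each test case pairs a query with a known-relevant passage, and a hypothetical `search_chunks` helper embeds the candidate chunks into a throwaway index and searches it.

```python
def evaluate_retrieval(chunks: list[str], test_queries: list[dict], top_k: int = 5) -> float:
    """Recall@k: fraction of queries whose known-relevant passage appears
    (as a substring of some retrieved chunk) in the top_k results.

    Assumes test_queries items look like {'query': str, 'relevant_text': str}.
    """
    hits = 0
    for case in test_queries:
        # search_chunks is a hypothetical helper: index `chunks`, run the query
        retrieved = search_chunks(chunks, case['query'], top_k=top_k)
        if any(case['relevant_text'] in chunk for chunk in retrieved):
            hits += 1
    return hits / len(test_queries)
```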
### Pitfall 2: Poor Embedding Model Choice

**Problem:** Model not suited to the domain (e.g., code search with a general-purpose model).

**Solution:**

```python
# Use domain-specific models (Hugging Face checkpoints, except where noted)
domain_models = {
    'code': 'microsoft/codebert-base',
    'medical': 'dmis-lab/biobert-v1.1',
    'legal': 'nlpaueb/legal-bert-base-uncased',
    'general': 'text-embedding-3-small'  # OpenAI API model, not a HF checkpoint
}
model = domain_models.get(your_domain, 'text-embedding-3-small')
```
### Pitfall 3: No Query Optimization

**Problem:** User queries don't match document phrasing (e.g., "how 2 train NN" vs. "neural network training methods").

**Solution:** Implement query expansion (Technique 2 above) plus LLM query rewriting, as sketched below.
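A minimal rewriter sketch, assuming the OpenAI chat API as the rewriting engine (any instruction-following LLM works); the prompt wording is illustrative, not prescribed.

```python
from openai import OpenAI

def optimize_query(raw_query: str) -> str:
    """Rewrite a poorly-formed user query into document-like phrasing."""
    llm = OpenAI()
    response = llm.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": (
                "Rewrite the user's text as a clear, well-formed search query. "
                "Return only the rewritten query."
            )},
            {"role": "user", "content": raw_query}
        ]
    )
    return response.choices[0].message.content.strip()

# Usage
optimize_query("how 2 train NN")  # → e.g. "neural network training methods"
```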
### Pitfall 4: Ignoring Metadata

**Problem:** Irrelevant results returned for lack of filtering.

**Solution:** Always store rich metadata.

```python
payload = {
    'text': chunk,
    'source': 'doc.pdf',
    'page': 5,
    'date': '2024-01-15',
    'category': 'engineering',
    'author': 'John Doe',
    'confidence': 0.95  # Document quality score
}
```
## Quick Decision Trees

**"Which vector DB should I use?"**

1. Need a managed service?
   - YES → Pinecone (easy) or Weaviate Cloud
   - NO → continue
2. Need distributed/high scale?
   - YES → Milvus or Weaviate
   - NO → continue
3. Self-hosting on Docker?
   - YES → Qdrant (best performance/features)
   - NO → Chroma (embedded, simple)

**"Which chunking strategy?"**

- Document type?
  - Structured (docs, books) → hierarchical chunking
  - Unstructured (chat, logs) → fixed-size chunking
  - Mixed → semantic chunking
- Quality requirement?
  - Critical → sliding window (50% overlap)
  - Standard → semantic (20% overlap)
  - Fast/cheap → fixed-size (10% overlap)

**"Which embedding model?"**

- Budget?
  - No limits → text-embedding-3-large
  - Cost-sensitive → all-mpnet-base-v2 (self-hosted)
- Quality requirement?
  - Best → text-embedding-3-large
  - Good → text-embedding-3-small or Cohere
  - Acceptable → all-MiniLM-L6-v2
## Example: Complete RAG Pipeline

```python
import uuid

from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct
from openai import OpenAI
from langchain.text_splitter import RecursiveCharacterTextSplitter

class RAGPipeline:
    def __init__(self):
        self.qdrant = QdrantClient(url="http://localhost:6333")
        self.openai = OpenAI()
        self.splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)

    def ingest_document(self, text: str, metadata: dict):
        """Ingest and index a document (assumes the "docs" collection exists)."""
        # 1. Chunk
        chunks = self.splitter.split_text(text)
        # 2. Embed
        embeddings = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=chunks
        ).data
        # 3. Store (UUID ids avoid collisions across documents)
        points = [
            PointStruct(
                id=str(uuid.uuid4()),
                vector=emb.embedding,
                payload={'text': chunk, **metadata}
            )
            for chunk, emb in zip(chunks, embeddings)
        ]
        self.qdrant.upsert(collection_name="docs", points=points)

    def query(self, question: str, top_k: int = 5) -> str:
        """Query with RAG."""
        # 1. Embed query
        query_emb = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=[question]
        ).data[0].embedding
        # 2. Retrieve
        results = self.qdrant.search(
            collection_name="docs",
            query_vector=query_emb,
            limit=top_k
        )
        # 3. Build context
        context = "\n\n".join([r.payload['text'] for r in results])
        # 4. Generate
        response = self.openai.chat.completions.create(
            model="gpt-4",
            messages=[{
                "role": "system",
                "content": f"Answer based on this context:\n{context}"
            }, {
                "role": "user",
                "content": question
            }]
        )
        return response.choices[0].message.content

# Usage
rag = RAGPipeline()
rag.ingest_document(document_text, {'source': 'manual.pdf'})
answer = rag.query("How do I configure the system?")
```
## Resources
- Qdrant Docs: https://qdrant.tech/documentation/
- Pinecone Docs: https://docs.pinecone.io/
- OpenAI Embeddings: https://platform.openai.com/docs/guides/embeddings
- LangChain RAG: https://python.langchain.com/docs/use_cases/question_answering/
- Sentence Transformers: https://www.sbert.net/
**Skill version:** 1.0.0 | **Last updated:** 2025-10-25 | **Maintained by:** Applied Artificial Intelligence