| name | rag-pipeline-builder |
| description | Build hybrid RAG pipelines combining vector search, graph traversal, and keyword matching for accurate context retrieval |
RAG Pipeline Builder
Build production-ready hybrid RAG pipelines combining vector, graph, and keyword search for the SRS AI Systems project.
What This Skill Provides
Core Capabilities
- Qdrant Vector Store Integration: Setup, configuration, and optimization for production workloads
- Hybrid Retrieval Strategies: Combine vector similarity, graph traversal, and keyword matching
- Embedding Pipeline: sentence-transformers integration with chunking and batching
- Context Assembly: Intelligent merging of multi-source retrieval results
- Evaluation Framework: Metrics for recall, precision, relevance, and latency
- Abstraction Patterns: Repository and Strategy patterns for easy migration between vector stores
Technical Stack
- Vector Store: Qdrant (with abstraction for Milvus/ChromaDB migration)
- Graph Database: Neo4j for relationship-based retrieval
- Embeddings: sentence-transformers (all-MiniLM-L6-v2, all-mpnet-base-v2)
- Language: Python 3.9+ with type hints
- Patterns: Repository, Strategy, Factory for swappability
When to Use
Perfect For
- Building semantic search over technical documentation (SRS project)
- Implementing multi-hop reasoning with graph relationships
- Creating hybrid retrieval that combines multiple search strategies
- Evaluating and optimizing RAG pipeline performance
- Migrating between vector stores (Qdrant → Milvus → ChromaDB)
Not Ideal For
- Simple keyword search (use Elasticsearch directly)
- Real-time streaming data (add streaming layer first)
- Sub-10ms latency requirements (consider caching layer)
- Non-semantic workloads (structured queries)
Decision Points
Use Vector Search When: Semantic similarity is primary concern (queries like "how to configure error handling")
Add Graph Search When: Relationships matter (queries like "all components connected to authentication service")
Include Keyword Search When: Exact matches needed (queries like "API endpoint /v1/users/create")
Use Hybrid When: Production systems requiring high recall and precision across diverse query types
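These decision points can be encoded as a simple query router. A minimal sketch, assuming rule-based dispatch (the regex heuristics and strategy names are illustrative, not part of this skill's scripts):

```python
import re

def pick_strategy(query: str) -> str:
    """Route a query to a retrieval strategy using the decision points above.

    The patterns below are illustrative assumptions; tune them against
    your own query logs or replace them with a learned classifier.
    """
    # Exact-match signals: endpoint paths, error codes, function names
    if re.search(r"(/v\d+/|\berror code\b|\w+\(\))", query):
        return "keyword"
    # Relationship signals: traversal language
    if re.search(r"\b(connected to|depends on|dependencies of|components using)\b", query):
        return "graph"
    # Default: semantic similarity (or "hybrid" for production systems)
    return "vector"
```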
Quick Start Workflows
Workflow 1: New RAG Pipeline (30 minutes)
```bash
# Step 1: Set up the vector store
cd scripts/
./setup_vector_store.py --store qdrant --collection srs_docs --dimension 384

# Step 2: Generate embeddings for documents
./generate_embeddings.py --input ../data/srs_docs/ --model all-MiniLM-L6-v2 --chunk-size 512

# Step 3: Test retrieval
./hybrid_retrieval.py --query "authentication flow" --top-k 5 --strategy hybrid

# Step 4: Evaluate performance
./evaluate_rag.py --test-queries queries.json --metrics recall,precision,mrr
```
Workflow 2: Migrate Vector Store (15 minutes)
```bash
# Export from Qdrant
./export_vectors.py --source qdrant --collection srs_docs --output vectors.parquet

# Import to Milvus
./import_vectors.py --target milvus --collection srs_docs --input vectors.parquet

# Validate migration
./validate_migration.py --source qdrant --target milvus --sample-queries 100
```
Workflow 3: Optimize Retrieval (20 minutes)
```bash
# Benchmark different strategies
./benchmark_retrieval.py --strategies vector,graph,keyword,hybrid --iterations 100

# Tune hyperparameters
./tune_hyperparameters.py --param chunk_size --range 256,512,1024 --metric mrr

# A/B test embedding models
./compare_embeddings.py --models all-MiniLM-L6-v2,all-mpnet-base-v2 --test-set queries.json
```
Decision Trees
Which Embedding Model?
```
START: What's your priority?
├─ Speed + Low Memory (< 100MB)
│  └─ all-MiniLM-L6-v2 (384 dim, 80MB)
│     Use cases: Real-time search, edge deployment
├─ Balanced Performance
│  └─ all-mpnet-base-v2 (768 dim, 420MB)
│     Use cases: General-purpose RAG, high accuracy needed
├─ Maximum Accuracy
│  └─ e5-large-v2 (1024 dim, 1.3GB)
│     Use cases: Critical retrieval, quality > speed
└─ Domain-Specific (Technical Docs)
   └─ Fine-tuned CodeBERT or specialized model
      Use cases: Code search, API documentation
```
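Whichever branch you land on, verify the model loads and that its output dimension matches your collection. A minimal check using the sentence-transformers API (dimension 384 assumes all-MiniLM-L6-v2, as in Workflow 1):

```python
from sentence_transformers import SentenceTransformer

# Loads from the Hugging Face hub on first use, then from local cache
model = SentenceTransformer("all-MiniLM-L6-v2")

# Must match the --dimension used when creating the collection
assert model.get_sentence_embedding_dimension() == 384

# Encode in batches for throughput; returns an array of shape (n, 384)
embeddings = model.encode(["authentication flow", "error handling"], batch_size=32)
```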
Which Retrieval Strategy?
```
START: Analyze your queries
├─ Mostly semantic similarity ("how to X", "similar to Y")
│  └─ Pure Vector Search (90%+ of queries)
│     Config: top_k=10, score_threshold=0.7
├─ Relationship traversal ("components using X", "dependencies of Y")
│  └─ Graph-First with Vector Fallback
│     Config: neo4j_depth=2, vector_fallback_k=5
├─ Exact term matching ("error code 404", "function name")
│  └─ Keyword-First with Vector Expansion
│     Config: bm25_top_k=20, vector_rerank_k=10
└─ Mixed query types (production systems)
   └─ Hybrid with Learned Fusion
      Config: vector_weight=0.5, graph_weight=0.3, keyword_weight=0.2
      Tune weights using an evaluation set
```
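The hybrid branch combines normalized scores from each retriever using the weights above. A minimal sketch of weighted fusion, assuming each retriever is a callable returning (doc_id, score) pairs (the retriever callables themselves are placeholders):

```python
from typing import Callable, Dict, List, Tuple

Retriever = Callable[[str], List[Tuple[str, float]]]

def weighted_fusion(
    query: str,
    retrievers: Dict[str, Retriever],
    weights: Dict[str, float],
    top_k: int = 10,
) -> List[Tuple[str, float]]:
    """Fuse per-retriever scores with fixed weights.

    Scores are min-max normalized per retriever so vector, graph, and
    BM25 scores sit on a comparable [0, 1] scale before weighting.
    """
    fused: Dict[str, float] = {}
    for name, retrieve in retrievers.items():
        results = retrieve(query)
        if not results:
            continue
        scores = [score for _, score in results]
        lo, hi = min(scores), max(scores)
        span = (hi - lo) or 1.0  # avoid division by zero on uniform scores
        for doc_id, score in results:
            fused[doc_id] = fused.get(doc_id, 0.0) + weights[name] * (score - lo) / span
    return sorted(fused.items(), key=lambda x: x[1], reverse=True)[:top_k]
```

Start with vector_weight=0.5, graph_weight=0.3, keyword_weight=0.2 and tune against the evaluation set from Workflow 1.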
Chunk Size Selection
```
START: What type of content?
├─ Short-form (FAQ, API docs, error messages)
│  └─ 256-512 tokens, 50 token overlap
│     Reason: Precise matching, low noise
├─ Medium-form (How-to guides, tutorials)
│  └─ 512-1024 tokens, 100 token overlap
│     Reason: Balance context and specificity
├─ Long-form (Architecture docs, specifications)
│  └─ 1024-2048 tokens, 200 token overlap
│     Reason: Preserve document structure
└─ Mixed content types
   └─ Semantic chunking (split on headers, sections)
      Use: langchain.text_splitter.MarkdownHeaderTextSplitter
```
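A minimal sketch of the semantic-chunking branch using langchain's MarkdownHeaderTextSplitter (the file path and header levels are illustrative):

```python
from pathlib import Path

from langchain.text_splitter import MarkdownHeaderTextSplitter

# Split on document structure instead of a fixed token count
splitter = MarkdownHeaderTextSplitter(
    headers_to_split_on=[("#", "h1"), ("##", "h2"), ("###", "h3")]
)

markdown_text = Path("srs_architecture.md").read_text()  # illustrative path
docs = splitter.split_text(markdown_text)

for doc in docs:
    # Each chunk carries its heading path as metadata, e.g. {"h1": ..., "h2": ...}
    print(doc.metadata, doc.page_content[:80])
```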
Quality Checklist
Before Deployment
- Embedding Coverage: 100% of corpus embedded without errors
- Index Health: Vector store index built and optimized (HNSW/IVF parameters tuned)
- Retrieval Metrics: MRR > 0.7, Recall@10 > 0.85 on test set
- Latency: p95 < 200ms for single query (vector + rerank)
- Fallback Handling: Graceful degradation when vector store unavailable (see the sketch after this checklist)
- Monitoring: Logging for query latency, retrieval scores, cache hit rates
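A minimal sketch of graceful degradation, assuming a BM25 index is available as the degraded path (vector_store and bm25_index are placeholders for your own clients, and the exception type should match your client library):

```python
import logging

def search_with_fallback(query: str, top_k: int = 10):
    """Fall back to keyword search when the vector store is unreachable."""
    try:
        return vector_store.search(query, top_k=top_k)
    except ConnectionError as exc:  # narrow to your client's exception type
        logging.warning("Vector store unavailable, falling back to BM25: %s", exc)
        return bm25_index.search(query, k=top_k)
```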
Code Quality
- Type Hints: All functions have complete type annotations
- Docstrings: Google-style docstrings with examples
- Error Handling: Try-except blocks with specific exceptions
- Configuration: Environment variables for secrets (API keys, DB passwords)
- Testing: Unit tests for core functions, integration tests for pipelines
- Abstraction: Repository pattern implemented for vector store swappability
Performance
- Batch Processing: Embeddings generated in batches (32-128 documents)
- Connection Pooling: Reuse connections to vector store and graph DB
- Caching: Cache embeddings for repeated queries
- Async Operations: Use asyncio for concurrent retrieval from multiple sources (see the sketch after this checklist)
- Resource Limits: Memory usage < 2GB per worker, CPU < 80%
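A minimal sketch of the async item above, assuming each backend client exposes an async search method (the three client objects are placeholders):

```python
import asyncio

async def retrieve_all(query: str, top_k: int = 10):
    """Query vector, graph, and keyword backends concurrently."""
    results = await asyncio.gather(
        vector_client.search(query, top_k=top_k),
        graph_client.traverse(query, depth=2),
        keyword_client.search(query, k=top_k),
    )
    return results  # [vector_results, graph_results, keyword_results]

# Usage: vector_hits, graph_hits, keyword_hits = asyncio.run(retrieve_all("auth flow"))
```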
Common Pitfalls
1. Poor Chunk Size
Problem: Chunks too large (low precision) or too small (missing context)
Solution:
- Start with 512 tokens, 50 token overlap
- Evaluate on sample queries
- Use semantic chunking for structured docs (Markdown, HTML)
Example:
```python
# Bad: fixed 1000-char chunks (ignores semantic boundaries)
chunks = [text[i:i + 1000] for i in range(0, len(text), 1000)]

# Good: recursive splitting with overlap
# Note: chunk_size counts characters by default; pass a token-based
# length_function if you size chunks in tokens, as recommended above
from langchain.text_splitter import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,
    chunk_overlap=50,
    separators=["\n\n", "\n", ". ", " ", ""],
)
chunks = splitter.split_text(text)
```
2. Ignoring Metadata
Problem: Losing context after chunking (which document, section, timestamp)
Solution: Store rich metadata with each chunk
```python
from qdrant_client.models import PointStruct

# Store rich metadata in the point payload alongside the vector
metadata = {
    "source": "srs_architecture.md",
    "section": "Authentication Flow",
    "chunk_index": 12,
    "timestamp": "2025-01-15T10:30:00Z",
    "doc_type": "technical_spec",
}
qdrant_client.upsert(
    collection_name="srs",
    points=[PointStruct(id=chunk_id, vector=embedding, payload=metadata)],
)
```
3. No Evaluation Strategy
Problem: Can't measure if changes improve retrieval quality
Solution: Build evaluation dataset early
```python
# Create a test set of (query, relevant_doc_ids) pairs
eval_queries = [
    {
        "query": "How does authentication work?",
        "relevant_docs": ["doc_123", "doc_456"],
        "relevance_scores": [1.0, 0.8],
    }
]

# Calculate metrics (implementations sketched below)
mrr = calculate_mrr(eval_queries, retrieval_results)
recall_at_k = calculate_recall(eval_queries, retrieval_results, k=10)
```
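calculate_mrr and calculate_recall are referenced above but not defined in this document. A minimal sketch of both, assuming retrieval_results maps each query string to a ranked list of doc ids:

```python
def calculate_mrr(eval_queries, retrieval_results) -> float:
    """Mean Reciprocal Rank: average of 1/rank of the first relevant hit."""
    reciprocal_ranks = []
    for item in eval_queries:
        ranked = retrieval_results[item["query"]]
        relevant = set(item["relevant_docs"])
        rr = 0.0
        for rank, doc_id in enumerate(ranked, start=1):
            if doc_id in relevant:
                rr = 1.0 / rank
                break
        reciprocal_ranks.append(rr)
    return sum(reciprocal_ranks) / len(reciprocal_ranks)

def calculate_recall(eval_queries, retrieval_results, k: int = 10) -> float:
    """Recall@k: fraction of relevant docs that appear in the top-k results."""
    recalls = []
    for item in eval_queries:
        top_k = set(retrieval_results[item["query"]][:k])
        relevant = set(item["relevant_docs"])
        recalls.append(len(top_k & relevant) / len(relevant))
    return sum(recalls) / len(recalls)
```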
4. Single Retrieval Strategy
Problem: Pure vector search fails on exact matches, keyword search fails on semantic queries
Solution: Implement hybrid retrieval with fusion
```python
from collections import defaultdict

def hybrid_search(query, top_k=10):
    """Reciprocal Rank Fusion over vector and keyword result lists."""
    vector_results = vector_search(query, k=top_k * 2)
    keyword_results = bm25_search(query, k=top_k * 2)

    # RRF: each list contributes 1/(rank + 60); the constant 60 dampens
    # top-rank dominance and is the standard value from the RRF paper
    scores = defaultdict(float)
    for rank, doc in enumerate(vector_results):
        scores[doc.id] += 1.0 / (rank + 60)
    for rank, doc in enumerate(keyword_results):
        scores[doc.id] += 1.0 / (rank + 60)
    return sorted(scores.items(), key=lambda x: x[1], reverse=True)[:top_k]
```
5. Hardcoded Vector Store
Problem: Tight coupling to Qdrant makes migration painful
Solution: Use Repository pattern
```python
from abc import ABC, abstractmethod
from typing import List

class VectorStoreRepository(ABC):
    @abstractmethod
    def upsert(self, vectors: List[Vector]) -> None: ...

    @abstractmethod
    def search(self, query_vector: List[float], top_k: int) -> List[SearchResult]: ...

# Swap implementations without touching pipeline code
store = QdrantRepository()  # or MilvusRepository() / ChromaDBRepository()
store.upsert(vectors)
```
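A minimal concrete sketch of the Qdrant side of the pattern (the Vector and SearchResult dataclasses are illustrative stand-ins for the types referenced above; note Qdrant point ids must be unsigned ints or UUID strings):

```python
from dataclasses import dataclass
from typing import List

from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct

@dataclass
class Vector:
    id: str  # must be a UUID string (or use an unsigned int)
    values: List[float]
    payload: dict

@dataclass
class SearchResult:
    id: str
    score: float
    payload: dict

class QdrantRepository(VectorStoreRepository):
    def __init__(self, url: str = "http://localhost:6333", collection: str = "srs"):
        self.client = QdrantClient(url=url)
        self.collection = collection

    def upsert(self, vectors: List[Vector]) -> None:
        points = [PointStruct(id=v.id, vector=v.values, payload=v.payload) for v in vectors]
        self.client.upsert(collection_name=self.collection, points=points)

    def search(self, query_vector: List[float], top_k: int) -> List[SearchResult]:
        hits = self.client.search(
            collection_name=self.collection, query_vector=query_vector, limit=top_k
        )
        return [SearchResult(id=str(h.id), score=h.score, payload=h.payload or {}) for h in hits]
```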
Pro Tips
1. Cache Query Embeddings
```python
import hashlib
import json
from functools import lru_cache
from typing import List

# embedding_model: a loaded SentenceTransformer (see the decision tree above)

@lru_cache(maxsize=1000)
def get_cached_embedding(query: str) -> List[float]:
    # In-process cache: good enough for a single worker
    return embedding_model.encode(query).tolist()

# Or use Redis for distributed caching across workers
import redis

r = redis.Redis()

def get_query_embedding(query: str) -> List[float]:
    cache_key = f"emb:{hashlib.md5(query.encode()).hexdigest()}"
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)
    embedding = embedding_model.encode(query).tolist()
    r.setex(cache_key, 3600, json.dumps(embedding))  # 1 hour TTL
    return embedding
```
2. Pre-filter with Metadata
```python
# Filter by metadata before vector search (cheaper than post-filtering)
from datetime import datetime, timezone

from qdrant_client.models import DatetimeRange, FieldCondition, Filter, MatchValue

results = qdrant_client.search(
    collection_name="srs",
    query_vector=query_embedding,
    query_filter=Filter(
        must=[
            FieldCondition(
                key="doc_type",
                match=MatchValue(value="technical_spec"),
            ),
            FieldCondition(
                key="timestamp",
                # DatetimeRange assumes the field is indexed as a datetime payload
                range=DatetimeRange(gte=datetime(2024, 1, 1, tzinfo=timezone.utc)),
            ),
        ]
    ),
    limit=10,
)
```
3. Use Query Expansion
```python
# Expand the query with synonyms/related terms before embedding
import json

import numpy as np

def expand_query(query: str, expansion_count: int = 3) -> np.ndarray:
    """Average the embeddings of the query and LLM-generated paraphrases."""
    expansion_prompt = f"""
    Given the query: "{query}"
    Generate {expansion_count} alternative phrasings that capture the same intent.
    Return as a JSON array of strings.
    """
    # llm_client is a placeholder for your LLM wrapper; it is assumed
    # to return the raw completion text
    expanded = json.loads(llm_client.generate(expansion_prompt))

    # Embed all variations and average the vectors for broader recall
    embeddings = [embedding_model.encode(q) for q in [query] + expanded]
    return np.mean(embeddings, axis=0)
```
4. Monitor Retrieval Quality
```python
import logging
from datetime import datetime

def search_with_logging(query: str, top_k: int = 10):
    start = datetime.now()
    results = vector_store.search(query, top_k)
    latency = (datetime.now() - start).total_seconds()

    # Log structured fields for offline analysis
    logging.info({
        "query": query,
        "top_score": results[0].score if results else 0,
        "result_count": len(results),
        "latency_ms": latency * 1000,
        "timestamp": datetime.now().isoformat(),
    })

    # Alert on quality issues
    if results and results[0].score < 0.5:
        logging.warning(f"Low relevance score for query: {query}")
    return results
```
5. Implement Semantic Caching
```python
# Cache results for semantically similar queries
def semantic_cache_search(query: str, similarity_threshold: float = 0.95):
    """Check the cache for semantically similar queries before searching."""
    query_embedding = get_embedding(query)

    # The cache is a separate Qdrant collection keyed by query embeddings
    cache_results = cache_store.search(
        query_vector=query_embedding,
        limit=1,
        score_threshold=similarity_threshold,
    )
    if cache_results:
        logging.info(f"Cache hit for query: {query}")
        return cache_results[0].payload["results"]

    # Cache miss: perform the actual search, then store the results
    results = vector_store.search(query, top_k=10)
    cache_store.upsert([{
        "vector": query_embedding,
        "payload": {
            "query": query,
            "results": results,
            "timestamp": datetime.now().isoformat(),
        },
    }])
    return results
```
6. Progressive Retrieval
```python
# Start with strict filters, progressively relax if results are insufficient
def progressive_search(query: str, min_results: int = 5):
    """Progressively relax search constraints to ensure sufficient results."""
    strategies = [
        # Strategy 1: recent docs only, high threshold
        {"score_threshold": 0.8, "filters": {"timestamp": {"gte": "2024-01-01"}}},
        # Strategy 2: all docs, high threshold
        {"score_threshold": 0.8, "filters": {}},
        # Strategy 3: lower threshold
        {"score_threshold": 0.6, "filters": {}},
        # Strategy 4: no threshold
        {"score_threshold": 0.0, "filters": {}},
    ]
    for strategy in strategies:
        results = vector_store.search(
            query=query,
            score_threshold=strategy["score_threshold"],
            filters=strategy["filters"],
            limit=20,
        )
        if len(results) >= min_results:
            return results
    return results  # Return whatever the last strategy found
```
Reference Documentation
See /references/ for deep technical knowledge:
- patterns.md - RAG architecture patterns and design principles
- best_practices.md - Chunking strategies, embedding model selection, evaluation metrics
- advanced_topics.md - GraphRAG, multi-hop reasoning, query decomposition
- troubleshooting.md - Debugging low recall/precision, latency optimization
- vector_stores_comparison.md - Qdrant vs Milvus vs ChromaDB feature comparison
Script Reference
See /scripts/ for executable tools:
- setup_vector_store.py - Initialize and configure Qdrant/Milvus/ChromaDB
- generate_embeddings.py - Batch embedding generation with chunking
- hybrid_retrieval.py - Multi-strategy retrieval with fusion
- evaluate_rag.py - Calculate retrieval metrics (MRR, recall, precision)
- export_vectors.py - Export vectors for migration
- import_vectors.py - Import vectors to new store
Template Reference
See /templates/ for copy-paste boilerplate:
- vector_store_setup.py - Production-ready Qdrant setup with error handling
- embedding_pipeline.py - Full pipeline: load → chunk → embed → store
- retrieval_pipeline.py - Hybrid retrieval with Repository pattern
- context_assembly.py - Intelligent context merging from multi-source results
- rag_evaluation.py - Complete evaluation framework with multiple metrics
Related Skills
- llm-integration: For LLM-based query expansion and reranking
- data-pipeline: For ETL of source documents
- testing-framework: For integration testing of RAG pipelines
- monitoring: For production observability of retrieval quality
Version
1.0.0 - Initial release for SRS AI Systems project