Claude Code Plugins

Community-maintained marketplace


rag-pipeline-builder

@jennifer-mckinney/my-skills

Build hybrid RAG pipelines combining vector search, graph traversal, and keyword matching for accurate context retrieval

Install Skill

1. Download skill

2. Enable skills in Claude

   Open claude.ai/settings/capabilities and find the "Skills" section

3. Upload to Claude

   Click "Upload skill" and select the downloaded ZIP file

Note: Please verify the skill by reviewing its instructions before using it.

SKILL.md

name: rag-pipeline-builder
description: Build hybrid RAG pipelines combining vector search, graph traversal, and keyword matching for accurate context retrieval

RAG Pipeline Builder

Build production-ready hybrid RAG pipelines combining vector, graph, and keyword search for the SRS AI Systems project.

What This Skill Provides

Core Capabilities

  • Qdrant Vector Store Integration: Setup, configuration, and optimization for production workloads
  • Hybrid Retrieval Strategies: Combine vector similarity, graph traversal, and keyword matching
  • Embedding Pipeline: sentence-transformers integration with chunking and batching
  • Context Assembly: Intelligent merging of multi-source retrieval results
  • Evaluation Framework: Metrics for recall, precision, relevance, and latency
  • Abstraction Patterns: Repository and Strategy patterns for easy migration between vector stores

Technical Stack

  • Vector Store: Qdrant (with abstraction for Milvus/ChromaDB migration)
  • Graph Database: Neo4j for relationship-based retrieval
  • Embeddings: sentence-transformers (all-MiniLM-L6-v2, all-mpnet-base-v2)
  • Language: Python 3.9+ with type hints
  • Patterns: Repository, Strategy, Factory for swappability

When to Use

Perfect For

  • Building semantic search over technical documentation (SRS project)
  • Implementing multi-hop reasoning with graph relationships
  • Creating hybrid retrieval that combines multiple search strategies
  • Evaluating and optimizing RAG pipeline performance
  • Migrating between vector stores (Qdrant → Milvus → ChromaDB)

Not Ideal For

  • Simple keyword search (use Elasticsearch directly)
  • Real-time streaming data (add streaming layer first)
  • Sub-10ms latency requirements (consider caching layer)
  • Non-semantic workloads (structured queries)

Decision Points

Use Vector Search When: Semantic similarity is the primary concern (queries like "how to configure error handling")

Add Graph Search When: Relationships matter (queries like "all components connected to the authentication service")

Include Keyword Search When: Exact matches are needed (queries like "API endpoint /v1/users/create")

Use Hybrid When: A production system must deliver high recall and precision across diverse query types
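
A minimal routing sketch that applies these decision points; the heuristic classifier is illustrative only, and the search functions are placeholders for the strategies described above:

# Sketch: route a query to a retrieval strategy based on the decision points above.
# keyword_search, graph_search, and vector_search are hypothetical placeholders.
def classify_query(query: str) -> str:
    """Very rough heuristic; replace with real intent classification."""
    q = query.lower()
    if q.startswith("/") or "endpoint" in q or "error code" in q:
        return "exact"
    if "connected to" in q or "depends on" in q or "components using" in q:
        return "relational"
    return "semantic"

def route_query(query: str, top_k: int = 10):
    query_type = classify_query(query)
    if query_type == "exact":
        return keyword_search(query, k=top_k)
    if query_type == "relational":
        return graph_search(query, depth=2, k=top_k)
    # Semantic or ambiguous queries: fall back to vector (or full hybrid) retrieval
    return vector_search(query, k=top_k)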

Quick Start Workflows

Workflow 1: New RAG Pipeline (30 minutes)

# Step 1: Setup vector store
cd scripts/
./setup_vector_store.py --store qdrant --collection srs_docs --dimension 384

# Step 2: Generate embeddings for documents
./generate_embeddings.py --input ../data/srs_docs/ --model all-MiniLM-L6-v2 --chunk-size 512

# Step 3: Test retrieval
./hybrid_retrieval.py --query "authentication flow" --top-k 5 --strategy hybrid

# Step 4: Evaluate performance
./evaluate_rag.py --test-queries queries.json --metrics recall,precision,mrr

Workflow 2: Migrate Vector Store (15 minutes)

# Export from Qdrant
./export_vectors.py --source qdrant --collection srs_docs --output vectors.parquet

# Import to Milvus
./import_vectors.py --target milvus --collection srs_docs --input vectors.parquet

# Validate migration
./validate_migration.py --source qdrant --target milvus --sample-queries 100

Workflow 3: Optimize Retrieval (20 minutes)

# Benchmark different strategies
./benchmark_retrieval.py --strategies vector,graph,keyword,hybrid --iterations 100

# Tune hyperparameters
./tune_hyperparameters.py --param chunk_size --range 256,512,1024 --metric mrr

# A/B test embedding models
./compare_embeddings.py --models all-MiniLM-L6-v2,all-mpnet-base-v2 --test-set queries.json

Decision Trees

Which Embedding Model?

START: What's your priority?

├─ Speed + Low Memory (< 100MB)
│  └─ all-MiniLM-L6-v2 (384 dim, 80MB)
│     Use cases: Real-time search, edge deployment

├─ Balanced Performance
│  └─ all-mpnet-base-v2 (768 dim, 420MB)
│     Use cases: General-purpose RAG, high accuracy needed

├─ Maximum Accuracy
│  └─ e5-large-v2 (1024 dim, 1.3GB)
│     Use cases: Critical retrieval, quality > speed

└─ Domain-Specific (Technical Docs)
   └─ Fine-tuned CodeBERT or specialized model
      Use cases: Code search, API documentation
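
Whichever model is chosen, loading and swapping it via sentence-transformers is a one-line change. A minimal sketch:

# Sketch: load a sentence-transformers model and embed a batch of chunks.
# Swapping models only changes the name and the resulting vector dimension.
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")  # 384-dim; "all-mpnet-base-v2" gives 768-dim
embeddings = model.encode(
    ["How does the authentication flow work?", "Configure error handling for the API"],
    batch_size=32,
    normalize_embeddings=True,  # cosine similarity then reduces to a dot product
)
print(embeddings.shape)  # (2, 384)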

Which Retrieval Strategy?

START: Analyze your queries

├─ Mostly semantic similarity ("how to X", "similar to Y")
│  └─ Pure Vector Search (90%+ of queries)
│     Config: top_k=10, score_threshold=0.7

├─ Relationship traversal ("components using X", "dependencies of Y")
│  └─ Graph-First with Vector Fallback
│     Config: neo4j_depth=2, vector_fallback_k=5

├─ Exact term matching ("error code 404", "function name")
│  └─ Keyword-First with Vector Expansion
│     Config: bm25_top_k=20, vector_rerank_k=10

└─ Mixed query types (production systems)
   └─ Hybrid with Learned Fusion
      Config: vector_weight=0.5, graph_weight=0.3, keyword_weight=0.2
      Tune weights using evaluation set
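
A minimal sketch of the weighted-fusion branch above; scores are min-max normalized per strategy so the weights stay comparable, and the three search functions plus the .id/.score result attributes are placeholders for the project's own types:

# Sketch: weighted score fusion across vector, graph, and keyword results.
def fuse_scores(query, weights=None, top_k=10):
    # Tune these weights on an evaluation set
    weights = weights or {"vector": 0.5, "graph": 0.3, "keyword": 0.2}
    results = {
        "vector": vector_search(query, k=top_k * 2),
        "graph": graph_search(query, k=top_k * 2),
        "keyword": keyword_search(query, k=top_k * 2),
    }

    fused = {}
    for strategy, hits in results.items():
        if not hits:
            continue
        low = min(h.score for h in hits)
        high = max(h.score for h in hits)
        for h in hits:
            # Min-max normalize within each strategy, then apply the strategy weight
            norm = (h.score - low) / (high - low) if high > low else 1.0
            fused[h.id] = fused.get(h.id, 0.0) + weights[strategy] * norm

    return sorted(fused.items(), key=lambda x: x[1], reverse=True)[:top_k]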

Chunk Size Selection

START: What type of content?

├─ Short-form (FAQ, API docs, error messages)
│  └─ 256-512 tokens, 50 token overlap
│     Reason: Precise matching, low noise

├─ Medium-form (How-to guides, tutorials)
│  └─ 512-1024 tokens, 100 token overlap
│     Reason: Balance context and specificity

├─ Long-form (Architecture docs, specifications)
│  └─ 1024-2048 tokens, 200 token overlap
│     Reason: Preserve document structure

└─ Mixed content types
   └─ Semantic chunking (split on headers, sections)
      Use: langchain.text_splitter.MarkdownHeaderTextSplitter
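
A minimal sketch of the semantic-chunking branch, using langchain's MarkdownHeaderTextSplitter (the header levels shown are an assumption; match them to your documents):

# Sketch: split Markdown on headers so chunks follow document structure.
from langchain.text_splitter import MarkdownHeaderTextSplitter

markdown_text = "# Auth\nOverview of the flow...\n## Token Exchange\nDetails about tokens..."
splitter = MarkdownHeaderTextSplitter(
    headers_to_split_on=[("#", "title"), ("##", "section")]
)
docs = splitter.split_text(markdown_text)
for doc in docs:
    print(doc.metadata, doc.page_content[:40])  # header metadata travels with each chunk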

Quality Checklist

Before Deployment

  • Embedding Coverage: 100% of corpus embedded without errors
  • Index Health: Vector store index built and optimized (HNSW/IVF parameters tuned)
  • Retrieval Metrics: MRR > 0.7, Recall@10 > 0.85 on test set
  • Latency: p95 < 200ms for single query (vector + rerank)
  • Fallback Handling: Graceful degradation when the vector store is unavailable (see the sketch after this list)
  • Monitoring: Logging for query latency, retrieval scores, cache hit rates
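
A minimal sketch of the fallback-handling item above, assuming a BM25 keyword index (bm25_search) remains available and the vector store client raises a project-defined VectorStoreUnavailableError:

# Sketch: degrade to keyword search when the vector store is unreachable.
# vector_store, bm25_search, and VectorStoreUnavailableError are assumed
# project abstractions, not library APIs.
import logging

def search_with_fallback(query: str, top_k: int = 10):
    try:
        return vector_store.search(query, top_k=top_k)
    except VectorStoreUnavailableError:
        logging.warning("Vector store unavailable; falling back to keyword search")
        return bm25_search(query, k=top_k)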

Code Quality

  • Type Hints: All functions have complete type annotations
  • Docstrings: Google-style docstrings with examples
  • Error Handling: Try-except blocks with specific exceptions
  • Configuration: Environment variables for secrets (API keys, DB passwords)
  • Testing: Unit tests for core functions, integration tests for pipelines
  • Abstraction: Repository pattern implemented for vector store swappability

Performance

  • Batch Processing: Embeddings generated in batches (32-128 documents)
  • Connection Pooling: Reuse connections to vector store and graph DB
  • Caching: Cache embeddings for repeated queries
  • Async Operations: Use asyncio for concurrent retrieval from multiple sources (see the sketch after this list)
  • Resource Limits: Memory usage < 2GB per worker, CPU < 80%
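
A minimal asyncio sketch of the concurrent-retrieval item above; the three async search coroutines are hypothetical placeholders for the project's vector, graph, and keyword backends:

# Sketch: query all three backends concurrently and collect the results.
import asyncio

async def retrieve_all(query: str, top_k: int = 10):
    vector_hits, graph_hits, keyword_hits = await asyncio.gather(
        vector_search_async(query, k=top_k),
        graph_search_async(query, k=top_k),
        keyword_search_async(query, k=top_k),
    )
    return {"vector": vector_hits, "graph": graph_hits, "keyword": keyword_hits}

# results = asyncio.run(retrieve_all("authentication flow"))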

Common Pitfalls

1. Poor Chunk Size

Problem: Chunks too large (low precision) or too small (missing context)

Solution:

  • Start with 512 tokens, 50 token overlap
  • Evaluate on sample queries
  • Use semantic chunking for structured docs (Markdown, HTML)

Example:

# Bad: Fixed 1000-char chunks (ignores semantic boundaries)
chunks = [text[i:i+1000] for i in range(0, len(text), 1000)]

# Good: Semantic chunking with overlap
from langchain.text_splitter import RecursiveCharacterTextSplitter
splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,
    chunk_overlap=50,
    separators=["\n\n", "\n", ". ", " ", ""]
)
chunks = splitter.split_text(text)

2. Ignoring Metadata

Problem: Losing context after chunking (which document, section, timestamp)

Solution: Store rich metadata with each chunk

# Store each chunk's metadata as the point payload alongside its vector
from qdrant_client.models import PointStruct

metadata = {
    "source": "srs_architecture.md",
    "section": "Authentication Flow",
    "chunk_index": 12,
    "timestamp": "2025-01-15T10:30:00Z",
    "doc_type": "technical_spec"
}
qdrant_client.upsert(
    collection_name="srs",
    points=[PointStruct(id=chunk_id, vector=embedding, payload=metadata)]
)

3. No Evaluation Strategy

Problem: Can't measure if changes improve retrieval quality

Solution: Build evaluation dataset early

# Create test set: (query, relevant_doc_ids)
eval_queries = [
    {
        "query": "How does authentication work?",
        "relevant_docs": ["doc_123", "doc_456"],
        "relevance_scores": [1.0, 0.8]
    }
]

# Calculate metrics
mrr = calculate_mrr(eval_queries, retrieval_results)
recall_at_k = calculate_recall(eval_queries, retrieval_results, k=10)
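
The calculate_mrr and calculate_recall helpers above are not defined in this skill; below are minimal sketches, assuming retrieval_results maps each query string to a ranked list of document IDs:

# Sketch implementations of the metric helpers used above.
def calculate_mrr(eval_queries, retrieval_results):
    """Mean Reciprocal Rank of the first relevant document per query."""
    reciprocal_ranks = []
    for item in eval_queries:
        ranked = retrieval_results[item["query"]]  # ranked list of doc ids
        relevant = set(item["relevant_docs"])
        rr = 0.0
        for rank, doc_id in enumerate(ranked, start=1):
            if doc_id in relevant:
                rr = 1.0 / rank
                break
        reciprocal_ranks.append(rr)
    return sum(reciprocal_ranks) / len(reciprocal_ranks)

def calculate_recall(eval_queries, retrieval_results, k=10):
    """Average fraction of relevant documents found in the top-k results."""
    recalls = []
    for item in eval_queries:
        top_k = set(retrieval_results[item["query"]][:k])
        relevant = set(item["relevant_docs"])
        recalls.append(len(top_k & relevant) / len(relevant))
    return sum(recalls) / len(recalls)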

4. Single Retrieval Strategy

Problem: Pure vector search fails on exact matches, keyword search fails on semantic queries

Solution: Implement hybrid retrieval with fusion

# Reciprocal Rank Fusion (60 is the standard RRF rank offset)
from collections import defaultdict

def hybrid_search(query, top_k=10):
    vector_results = vector_search(query, k=top_k*2)
    keyword_results = bm25_search(query, k=top_k*2)

    # RRF fusion
    scores = defaultdict(float)
    for rank, doc in enumerate(vector_results):
        scores[doc.id] += 1.0 / (rank + 60)
    for rank, doc in enumerate(keyword_results):
        scores[doc.id] += 1.0 / (rank + 60)

    return sorted(scores.items(), key=lambda x: x[1], reverse=True)[:top_k]

5. Hardcoded Vector Store

Problem: Tight coupling to Qdrant makes migration painful

Solution: Use Repository pattern

from abc import ABC, abstractmethod
from typing import List

# Vector and SearchResult are the project's own data classes for points and hits
class VectorStoreRepository(ABC):
    @abstractmethod
    def upsert(self, vectors: List[Vector]) -> None: pass

    @abstractmethod
    def search(self, query_vector: List[float], top_k: int) -> List[SearchResult]: pass

# Swap implementations easily
store = QdrantRepository()  # or MilvusRepository() or ChromaDBRepository()
store.upsert(vectors)
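
A minimal QdrantRepository sketch against this interface (the Vector and SearchResult shapes, the default URL, and the collection name are assumptions; adapt them to the project's own types):

# Sketch: one concrete repository wrapping qdrant-client behind the interface above.
# Vector(id, values, metadata) and SearchResult(id, score, metadata) are assumed shapes.
from typing import List

from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct

class QdrantRepository(VectorStoreRepository):
    def __init__(self, url: str = "http://localhost:6333", collection: str = "srs"):
        self.client = QdrantClient(url=url)
        self.collection = collection

    def upsert(self, vectors: List[Vector]) -> None:
        points = [PointStruct(id=v.id, vector=v.values, payload=v.metadata) for v in vectors]
        self.client.upsert(collection_name=self.collection, points=points)

    def search(self, query_vector: List[float], top_k: int) -> List[SearchResult]:
        hits = self.client.search(
            collection_name=self.collection, query_vector=query_vector, limit=top_k
        )
        return [SearchResult(id=h.id, score=h.score, metadata=h.payload) for h in hits]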

Pro Tips

1. Cache Query Embeddings

from functools import lru_cache
from typing import List
import hashlib
import json

@lru_cache(maxsize=1000)
def get_cached_embedding(query: str) -> List[float]:
    return embedding_model.encode(query).tolist()

# Or use Redis for distributed caching
import redis
r = redis.Redis()

def get_query_embedding(query: str) -> List[float]:
    cache_key = f"emb:{hashlib.md5(query.encode()).hexdigest()}"
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)

    embedding = embedding_model.encode(query).tolist()
    r.setex(cache_key, 3600, json.dumps(embedding))  # 1 hour TTL
    return embedding

2. Pre-filter with Metadata

# Filter by metadata before vector search (more efficient than post-filtering)
from qdrant_client.models import Filter, FieldCondition, MatchValue, DatetimeRange

results = qdrant_client.search(
    collection_name="srs",
    query_vector=query_embedding,
    query_filter=Filter(
        must=[
            FieldCondition(
                key="doc_type",
                match=MatchValue(value="technical_spec")
            ),
            FieldCondition(
                key="timestamp",
                # Requires qdrant-client >= 1.8 and a datetime payload index on "timestamp"
                range=DatetimeRange(gte="2024-01-01T00:00:00Z")
            )
        ]
    ),
    limit=10
)

3. Use Query Expansion

# Expand the query with related phrasings before embedding
# (llm_client and embedding_model stand in for the project's own clients)
import json
import numpy as np

def expand_query(query: str, expansion_count: int = 3) -> np.ndarray:
    """Generate query variations and return their averaged embedding for better recall."""
    # Use an LLM for query expansion
    expansion_prompt = f"""
    Given the query: "{query}"
    Generate {expansion_count} alternative phrasings that capture the same intent.
    Return as a JSON array of strings.
    """
    expanded = json.loads(llm_client.generate(expansion_prompt))

    # Embed the original query plus all variations and average the vectors
    embeddings = [embedding_model.encode(q) for q in [query] + expanded]
    return np.mean(embeddings, axis=0)

4. Monitor Retrieval Quality

import logging
from datetime import datetime

def search_with_logging(query: str, top_k: int = 10):
    start = datetime.now()
    results = vector_store.search(query, top_k)
    latency = (datetime.now() - start).total_seconds()

    # Log for analysis
    logging.info({
        "query": query,
        "top_score": results[0].score if results else 0,
        "result_count": len(results),
        "latency_ms": latency * 1000,
        "timestamp": datetime.now().isoformat()
    })

    # Alert on quality issues
    if results and results[0].score < 0.5:
        logging.warning(f"Low relevance score for query: {query}")

    return results

5. Implement Semantic Caching

# Cache results for semantically similar queries
def semantic_cache_search(query: str, similarity_threshold: float = 0.95):
    """Check cache for semantically similar queries before searching."""
    query_embedding = get_embedding(query)

    # Search cache (separate Qdrant collection)
    cache_results = cache_store.search(
        query_vector=query_embedding,
        limit=1,
        score_threshold=similarity_threshold
    )

    if cache_results:
        logging.info(f"Cache hit for query: {query}")
        return cache_results[0].payload["results"]

    # Cache miss - perform actual search
    results = vector_store.search(query, top_k=10)

    # Store in cache
    cache_store.upsert([{
        "vector": query_embedding,
        "payload": {
            "query": query,
            "results": results,
            "timestamp": datetime.now().isoformat()
        }
    }])

    return results

6. Progressive Retrieval

# Start with strict filters, progressively relax if results insufficient
def progressive_search(query: str, min_results: int = 5):
    """Progressively relax search constraints to ensure sufficient results."""
    strategies = [
        # Strategy 1: Recent docs only, high threshold
        {"score_threshold": 0.8, "filters": {"timestamp": {"gte": "2024-01-01"}}},
        # Strategy 2: All docs, high threshold
        {"score_threshold": 0.8, "filters": {}},
        # Strategy 3: Lower threshold
        {"score_threshold": 0.6, "filters": {}},
        # Strategy 4: No threshold
        {"score_threshold": 0.0, "filters": {}}
    ]

    for strategy in strategies:
        results = vector_store.search(
            query=query,
            score_threshold=strategy["score_threshold"],
            filters=strategy["filters"],
            limit=20
        )

        if len(results) >= min_results:
            return results

    return results  # Return whatever we found

Reference Documentation

See /references/ for deep technical knowledge:

  • patterns.md - RAG architecture patterns and design principles
  • best_practices.md - Chunking strategies, embedding model selection, evaluation metrics
  • advanced_topics.md - GraphRAG, multi-hop reasoning, query decomposition
  • troubleshooting.md - Debugging low recall/precision, latency optimization
  • vector_stores_comparison.md - Qdrant vs Milvus vs ChromaDB feature comparison

Script Reference

See /scripts/ for executable tools:

  • setup_vector_store.py - Initialize and configure Qdrant/Milvus/ChromaDB
  • generate_embeddings.py - Batch embedding generation with chunking
  • hybrid_retrieval.py - Multi-strategy retrieval with fusion
  • evaluate_rag.py - Calculate retrieval metrics (MRR, recall, precision)
  • export_vectors.py - Export vectors for migration
  • import_vectors.py - Import vectors to new store

Template Reference

See /templates/ for copy-paste boilerplate:

  • vector_store_setup.py - Production-ready Qdrant setup with error handling
  • embedding_pipeline.py - Full pipeline: load → chunk → embed → store
  • retrieval_pipeline.py - Hybrid retrieval with Repository pattern
  • context_assembly.py - Intelligent context merging from multi-source results
  • rag_evaluation.py - Complete evaluation framework with multiple metrics

Related Skills

  • llm-integration: For LLM-based query expansion and reranking
  • data-pipeline: For ETL of source documents
  • testing-framework: For integration testing of RAG pipelines
  • monitoring: For production observability of retrieval quality

Version

1.0.0 - Initial release for SRS AI Systems project