Claude Code Plugins

Community-maintained marketplace

Feedback

building-rag-systems

@mjunaidca/mjs-agent-skills
8
0

|

Install Skill

1. Download skill
2. Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3. Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify the skill by reading through its instructions before using it.

SKILL.md

name: building-rag-systems
description: Build production RAG systems with semantic chunking, incremental indexing, and filtered retrieval. Use when implementing document ingestion pipelines, vector search with Qdrant, or context-aware retrieval. Covers chunking strategies, change detection, payload indexing, and context expansion. Do NOT use for simple similarity search without production requirements.

Building RAG Systems

Production-grade RAG with semantic chunking, incremental updates, and filtered retrieval.

Quick Start

# Dependencies
pip install qdrant-client openai pydantic python-frontmatter

# Core components
# 1. Crawler → discovers files, extracts path metadata
# 2. Parser → extracts frontmatter, computes file hash
# 3. Chunker → semantic split on ## headers, 400 tokens, 15% overlap
# 4. Embedder → batched OpenAI embeddings
# 5. Uploader → Qdrant upsert with indexed payloads

Ingestion Pipeline

Architecture

┌──────────┐    ┌────────┐    ┌─────────┐    ┌──────────┐    ┌──────────┐
│ Crawler  │ -> │ Parser │ -> │ Chunker │ -> │ Embedder │ -> │ Uploader │
└──────────┘    └────────┘    └─────────┘    └──────────┘    └──────────┘
     │              │              │              │              │
Discovers      Extracts       Splits by     Generates      Upserts to
files          frontmatter    semantic      vectors        Qdrant
               + file hash    boundaries    (batched)      (batched)

Semantic Chunking (NOT Fixed-Size)

class SemanticChunker:
    """
    Split markdown content into chunks on ``## `` headers.

    What ``chunk()`` does:
    - Split on ## headers (semantic boundaries)
    - Skip sections that contain only whitespace
    - Link each chunk to its neighbours (prev/next) for context expansion

    The sizing parameters (default target 400 tokens, max 512 tokens,
    15% overlap) are converted to word counts and stored on the instance.
    ``chunk()`` itself does not split or overlap sections by size.
    """
    # Zero-width lookahead: the split point sits just before each "## "
    # at the start of a line, so the header stays with its section.
    SECTION_PATTERN = re.compile(r"(?=^## )", re.MULTILINE)
    TOKENS_PER_WORD = 1.3

    def __init__(
        self,
        target_tokens: int = 400,
        max_tokens: int = 512,
        overlap_percent: float = 0.15,
    ):
        """Convert the token budgets into word counts.

        Args:
            target_tokens: Preferred chunk size in tokens.
            max_tokens: Upper bound on chunk size in tokens.
            overlap_percent: Fraction of the target size shared between
                adjacent chunks.
        """
        self.target_words = int(target_tokens / self.TOKENS_PER_WORD)
        # Previously accepted but discarded; kept so size-aware callers
        # can read the limit back.
        self.max_words = int(max_tokens / self.TOKENS_PER_WORD)
        self.overlap_words = int(self.target_words * overlap_percent)

    def _split_sections(self, content: str) -> list[str]:
        """Return the non-blank sections of ``content``, split on ## headers."""
        # The lookahead split yields an empty first piece when the content
        # starts with a header (and [''] for empty content); drop those.
        return [
            section
            for section in self.SECTION_PATTERN.split(content)
            if section.strip()
        ]

    def chunk(self, content: str, file_hash: str) -> "list[Chunk]":
        """Build one linked ``Chunk`` per non-blank ``## `` section.

        Args:
            content: Document text to split.
            file_hash: Hash of the source file; its first 8 characters
                prefix every chunk ID and it is stored on each chunk.

        Returns:
            Chunks in document order. Each chunk carries its index, the
            total chunk count, a 16-character SHA-256 prefix of its text,
            and prev/next chunk IDs. Empty content returns an empty list.
        """
        sections = self._split_sections(content)
        chunks = []

        for idx, section in enumerate(sections):
            content_hash = hashlib.sha256(section.encode()).hexdigest()[:16]
            chunk_id = f"{file_hash[:8]}_{content_hash}_{idx}"

            chunks.append(Chunk(
                id=chunk_id,
                text=section,
                chunk_index=idx,
                total_chunks=len(sections),
                prev_chunk_id=chunks[-1].id if chunks else None,
                content_hash=content_hash,
                source_file_hash=file_hash,
            ))

            # Back-fill the forward link on the chunk appended just before
            # this one.
            if len(chunks) > 1:
                chunks[-2].next_chunk_id = chunk_id

        return chunks

Change Detection (Incremental Updates)

def compute_file_hash(file_path: str) -> str:
    """Return the SHA-256 hex digest of a file's bytes, for change detection.

    The file is read in fixed-size blocks, so a large file is never held
    in memory in one piece.

    Args:
        file_path: Path of the file to hash.

    Returns:
        The 64-character lowercase hexadecimal SHA-256 digest.
    """
    digest = hashlib.sha256()
    with open(file_path, 'rb') as f:
        # read() returns b"" at end of file, which ends the loop.
        while block := f.read(65536):
            digest.update(block)
    return digest.hexdigest()

class QdrantStateTracker:
    """Derive indexing state from the payloads already stored in Qdrant.

    Reads ``self.client`` and ``self.collection``, which are expected to be
    set on the instance elsewhere.
    """

    def get_indexed_files(self, book_id: str) -> dict[str, str]:
        """Map each indexed source file to its stored file hash.

        Scrolls through every point whose ``book_id`` payload matches,
        100 points per page, fetching only the ``source_file`` and
        ``source_file_hash`` payload fields.
        """
        book_filter = Filter(must=[
            FieldCondition(key="book_id", match=MatchValue(value=book_id))
        ])
        file_hashes = {}
        cursor = None

        while True:
            page, cursor = self.client.scroll(
                collection_name=self.collection,
                scroll_filter=book_filter,
                limit=100,
                offset=cursor,
                with_payload=["source_file", "source_file_hash"],
                with_vectors=False,
            )

            # Points of the same file carry the same path; later points
            # simply overwrite the entry.
            for record in page:
                payload = record.payload
                file_hashes[payload["source_file"]] = payload["source_file_hash"]

            # A missing next-page offset marks the final page.
            if cursor is None:
                return file_hashes

    def detect_changes(self, current: dict[str, str], indexed: dict[str, str]):
        """Diff the filesystem view against the index view.

        Both arguments map file path to file hash. Returns a
        ``(new, modified, deleted)`` tuple of path lists: paths only in
        ``current``, paths in both with differing hashes, and paths only
        in ``indexed``.
        """
        new = []
        modified = []
        for path in current:
            if path not in indexed:
                new.append(path)
            elif current[path] != indexed[path]:
                modified.append(path)

        deleted = []
        for path in indexed:
            if path not in current:
                deleted.append(path)

        return new, modified, deleted

Batched Embeddings

class OpenAIEmbedder:
    """Turn chunks into embedded chunks, several texts per API request."""

    def __init__(self, model: str = "text-embedding-3-small", batch_size: int = 20):
        """Create the OpenAI client and remember the model and batch size."""
        self.client = OpenAI()
        self.model = model
        # Number of chunk texts sent in a single embeddings request.
        self.batch_size = batch_size

    def embed_chunks(self, chunks: list[Chunk]) -> list[EmbeddedChunk]:
        """Embed ``chunks`` in slices of ``batch_size`` and return them in order.

        Each result is an ``EmbeddedChunk`` built from the chunk's own
        fields plus the embedding at the same position in the response.
        """
        results = []
        step = self.batch_size
        for start in range(0, len(chunks), step):
            window = chunks[start:start + step]
            texts = [item.text for item in window]
            reply = self.client.embeddings.create(
                input=texts,
                model=self.model,
            )
            # Pair each chunk with the response entry at the same position.
            results.extend(
                EmbeddedChunk(**item.dict(), embedding=entry.embedding)
                for item, entry in zip(window, reply.data)
            )
        return results

Qdrant Collection with Payload Indexes

def create_collection(self, recreate: bool = False):
    """Create the collection and the payload indexes used for filtering.

    Args:
        recreate: When true, delete any existing collection of the same
            name first, so the collection is rebuilt from scratch.
            When false, creation is attempted as-is; behaviour when the
            collection already exists is left to the client.
    """
    # Previously this flag was accepted but ignored.
    if recreate:
        self.client.delete_collection(collection_name=self.collection)

    self.client.create_collection(
        collection_name=self.collection,
        # NOTE(review): 1536 must equal the embedding model's output
        # size - presumably text-embedding-3-small; confirm if the model
        # changes.
        vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
    )

    # Index ALL fields you filter by
    indexes = [
        ("book_id", PayloadSchemaType.KEYWORD),      # Tenant isolation
        ("module", PayloadSchemaType.KEYWORD),       # Content filter
        ("chapter", PayloadSchemaType.INTEGER),      # Range filter
        ("hardware_tier", PayloadSchemaType.INTEGER),# Personalization
        ("proficiency_level", PayloadSchemaType.KEYWORD),
        ("parent_doc_id", PayloadSchemaType.KEYWORD),# Context expansion
        ("source_file_hash", PayloadSchemaType.KEYWORD),  # Change detection
    ]

    for field, schema in indexes:
        self.client.create_payload_index(
            collection_name=self.collection,
            field_name=field,
            field_schema=schema,
        )

Retrieval Patterns

Comprehensive Filter Builder

def build_filter(self, query: SearchQuery) -> Filter:
    """Build a Qdrant filter whose conditions must all match (AND logic).

    Always filters on ``book_id`` (exact match) and ``hardware_tier``
    (less than or equal). Adds ``module``, a ``chapter`` range, and
    ``proficiency_level`` (match any of the given values) only when the
    query supplies them.

    Args:
        query: Search parameters. ``chapter_min`` / ``chapter_max`` are
            treated as unset only when they are ``None``.

    Returns:
        A ``Filter`` with every condition in its ``must`` list.
    """
    conditions = [
        # Required: Tenant isolation
        FieldCondition(key="book_id", match=MatchValue(value=query.book_id)),
        # Required: Hardware tier (lte = "tier X or lower")
        FieldCondition(key="hardware_tier", range=Range(lte=query.hardware_tier)),
    ]

    # Optional: Module exact match
    if query.module:
        conditions.append(FieldCondition(
            key="module", match=MatchValue(value=query.module)
        ))

    # Optional: Chapter range. Compare against None rather than relying on
    # truthiness, so a bound of 0 is still applied.
    # NOTE(review): assumes None (not 0) is the "unset" value on
    # SearchQuery - confirm against its definition.
    if query.chapter_min is not None or query.chapter_max is not None:
        conditions.append(FieldCondition(
            key="chapter",
            range=Range(gte=query.chapter_min, lte=query.chapter_max),
        ))

    # Optional: Proficiency OR logic
    if query.proficiency_levels:
        conditions.append(FieldCondition(
            key="proficiency_level",
            match=MatchAny(any=query.proficiency_levels),
        ))

    return Filter(must=conditions)

Context Expansion (Walk Chunk Chain)

def expand_context(self, chunk_id: str, prev: int = 1, next: int = 1) -> list[Chunk]:
    """Return a chunk together with its neighbours, in document order.

    Follows ``prev_chunk_id`` up to ``prev`` times and ``next_chunk_id`` up
    to ``next`` times, stopping early when a link is empty or a lookup
    returns nothing. Returns an empty list when ``chunk_id`` is not found.
    """
    anchor = self.get_chunk_by_id(chunk_id)
    if not anchor:
        return []

    def follow(start_id, steps, link_attr):
        # Collect up to `steps` chunks reachable through `link_attr`,
        # nearest first.
        found = []
        cursor = start_id
        for _ in range(steps):
            if not cursor:
                break
            neighbour = self.get_chunk_by_id(cursor)
            if not neighbour:
                break
            found.append(neighbour)
            cursor = getattr(neighbour, link_attr)
        return found

    # The backward walk yields nearest-first; flip it into document order.
    before = follow(anchor.prev_chunk_id, prev, "prev_chunk_id")
    before.reverse()
    after = follow(anchor.next_chunk_id, next, "next_chunk_id")

    return before + [anchor] + after

Full Document Retrieval

def get_document_chunks(self, parent_doc_id: str) -> list[Chunk]:
    """Get all chunks for a document, ordered by chunk_index.

    Scrolls through every page of matching points, so documents with more
    chunks than one page holds are returned in full.

    Args:
        parent_doc_id: Value of the ``parent_doc_id`` payload field to match.

    Returns:
        The document's chunks sorted by ``chunk_index``; empty when no
        point matches.
    """
    doc_filter = Filter(must=[
        FieldCondition(key="parent_doc_id", match=MatchValue(value=parent_doc_id))
    ])

    points = []
    offset = None
    while True:
        page, offset = self.client.scroll(
            collection_name=self.collection,
            scroll_filter=doc_filter,
            limit=100,
            offset=offset,
            with_payload=True,
            with_vectors=False,
        )
        points.extend(page)
        # A missing next-page offset marks the final page.
        if offset is None:
            break

    chunks = [self._to_chunk(p) for p in points]
    chunks.sort(key=lambda c: c.chunk_index)
    return chunks

Payload Schema

class ChunkPayload(BaseModel):
    """Complete payload for filtered retrieval and context expansion.

    Optional fields default to ``None`` explicitly, so they may be omitted
    regardless of how the installed pydantic version treats a bare
    ``Optional[...]`` annotation.
    """

    # Tenant isolation
    book_id: str

    # Content filters (lesson has no payload index in create_collection)
    module: str
    chapter: int
    lesson: int
    hardware_tier: int
    proficiency_level: str

    # Display content
    text: str
    section_title: Optional[str] = None
    source_file: str

    # Context expansion
    parent_doc_id: str
    chunk_index: int
    total_chunks: int
    # None at the first / last chunk of a document.
    prev_chunk_id: Optional[str] = None
    next_chunk_id: Optional[str] = None

    # Change detection
    content_hash: str
    source_file_hash: str

Anti-Patterns

| Don't | Do Instead |
| --- | --- |
| Fixed character chunking | Semantic boundaries (## headers) |
| Position-based chunk IDs | Content hash for stable IDs |
| No overlap between chunks | 10-20% overlap for continuity |
| Full re-index on every change | Incremental with file hash detection |
| Missing payload indexes | Index every field you filter by |
| Synchronous embedding | Batch with background jobs |
| External state database | Qdrant-native state tracking |

Verification

Run: python scripts/verify.py

Related Skills

  • scaffolding-fastapi-dapr - API patterns for search endpoints
  • streaming-llm-responses - Streaming RAG responses

References