| name | building-rag-systems |
| description | Build production RAG systems with semantic chunking, incremental indexing, and filtered retrieval. Use when implementing document ingestion pipelines, vector search with Qdrant, or context-aware retrieval. Covers chunking strategies, change detection, payload indexing, and context expansion. NOT when doing simple similarity search without production requirements. |
Building RAG Systems
Production-grade RAG with semantic chunking, incremental updates, and filtered retrieval.
Quick Start
# Dependencies
pip install qdrant-client openai pydantic python-frontmatter
# Core components
# 1. Crawler → discovers files, extracts path metadata
# 2. Parser → extracts frontmatter, computes file hash
# 3. Chunker → semantic split on ## headers, 400 tokens, 15% overlap
# 4. Embedder → batched OpenAI embeddings
# 5. Uploader → Qdrant upsert with indexed payloads
Ingestion Pipeline
Architecture
┌──────────┐    ┌────────┐    ┌─────────┐    ┌──────────┐    ┌──────────┐
│ Crawler  │ -> │ Parser │ -> │ Chunker │ -> │ Embedder │ -> │ Uploader │
└──────────┘    └────────┘    └─────────┘    └──────────┘    └──────────┘
     │              │              │              │               │
 Discovers       Extracts      Splits by      Generates       Upserts to
   files       frontmatter     semantic        vectors          Qdrant
               + file hash    boundaries      (batched)       (batched)
Semantic Chunking (NOT Fixed-Size)
class SemanticChunker:
    """
    Production chunking:
    - Split on ## headers (semantic boundaries)
    - Target 400 tokens (NVIDIA benchmark optimal)
    - 15% overlap for context continuity
    - Track prev/next for context expansion

    NOTE: chunk() as shown here emits exactly one chunk per "## " section.
    The word budgets derived in __init__ (target_words, overlap_words) and
    max_tokens are stored on the instance but are not applied by this class.
    """

    # Zero-width lookahead: split *before* every line that starts with "## "
    # so each header stays attached to the section it introduces.
    SECTION_PATTERN = re.compile(r"(?=^## )", re.MULTILINE)
    # Conversion factor used to turn token budgets into word budgets.
    TOKENS_PER_WORD = 1.3

    def __init__(
        self,
        target_tokens: int = 400,
        max_tokens: int = 512,
        overlap_percent: float = 0.15,
    ):
        """Convert the token budgets into word counts.

        Args:
            target_tokens: Desired chunk size in tokens.
            max_tokens: Upper bound on chunk size in tokens (kept on the
                instance; previously this argument was discarded).
            overlap_percent: Fraction of target_words shared between chunks.
        """
        self.max_tokens = max_tokens
        self.target_words = int(target_tokens / self.TOKENS_PER_WORD)
        self.overlap_words = int(self.target_words * overlap_percent)

    def _split_sections(self, content: str) -> list[str]:
        """Split content at "## " headers and drop blank sections."""
        # The lookahead also matches at position 0, so content that begins
        # with a header (or is empty) produces an empty leading piece.
        return [part for part in self.SECTION_PATTERN.split(content) if part.strip()]

    def chunk(self, content: str, file_hash: str) -> "list[Chunk]":
        """Turn a document into a doubly linked list of section chunks.

        Args:
            content: Full document text.
            file_hash: Hash of the source file; its first 8 characters
                prefix every chunk id and the full value is stored on
                each chunk as source_file_hash.

        Returns:
            One Chunk per non-blank section, in document order. Empty or
            whitespace-only content yields an empty list.
        """
        sections = self._split_sections(content)
        total = len(sections)
        chunks = []
        for idx, section in enumerate(sections):
            content_hash = hashlib.sha256(section.encode()).hexdigest()[:16]
            chunk_id = f"{file_hash[:8]}_{content_hash}_{idx}"
            current = Chunk(
                id=chunk_id,
                text=section,
                chunk_index=idx,
                total_chunks=total,
                prev_chunk_id=chunks[-1].id if chunks else None,
                content_hash=content_hash,
                source_file_hash=file_hash,
            )
            # Back-fill the forward link on the chunk emitted just before.
            if chunks:
                chunks[-1].next_chunk_id = chunk_id
            chunks.append(current)
        return chunks
Change Detection (Incremental Updates)
def compute_file_hash(file_path: str, chunk_size: int = 1 << 20) -> str:
    """SHA-256 for change detection.

    The file is fed to the hash in fixed-size blocks so that large files
    are never loaded into memory in one piece.

    Args:
        file_path: Path of the file to hash.
        chunk_size: Number of bytes read per iteration (default 1 MiB).

    Returns:
        The hex-encoded SHA-256 digest of the file's bytes.
    """
    digest = hashlib.sha256()
    with open(file_path, 'rb') as f:
        while block := f.read(chunk_size):
            digest.update(block)
    return digest.hexdigest()
class QdrantStateTracker:
    """Read indexing state straight from Qdrant payloads - no external state DB needed."""

    def get_indexed_files(self, book_id: str) -> dict[str, str]:
        """Collect {source_file: source_file_hash} for every point of one book.

        Pages through the collection 100 points at a time, fetching only the
        two payload keys that are needed and no vectors.
        """
        file_hashes = {}
        cursor = None
        finished = False
        while not finished:
            page, cursor = self.client.scroll(
                collection_name=self.collection,
                scroll_filter=Filter(must=[
                    FieldCondition(key="book_id", match=MatchValue(value=book_id))
                ]),
                limit=100,
                offset=cursor,
                with_payload=["source_file", "source_file_hash"],
                with_vectors=False,
            )
            for point in page:
                stored_hash = point.payload["source_file_hash"]
                file_hashes[point.payload["source_file"]] = stored_hash
            # scroll hands back None as the next offset after the last page.
            finished = cursor is None
        return file_hashes

    def detect_changes(self, current: dict[str, str], indexed: dict[str, str]):
        """Diff the filesystem view against the index view.

        Args:
            current: {file_path: file_hash} as found on disk.
            indexed: {file_path: file_hash} as stored in the index.

        Returns:
            A (new, modified, deleted) tuple of path lists. new and modified
            follow the order of current; deleted follows the order of indexed.
        """
        new = []
        modified = []
        for path, file_hash in current.items():
            if path not in indexed:
                new.append(path)
            elif file_hash != indexed[path]:
                modified.append(path)
        deleted = [path for path in indexed if path not in current]
        return new, modified, deleted
Batched Embeddings
class OpenAIEmbedder:
    """Embed chunk texts through the OpenAI embeddings endpoint in batches."""

    def __init__(self, model: str = "text-embedding-3-small", batch_size: int = 20):
        """Create the API client and remember the model and batch size.

        Args:
            model: Embedding model name passed to every request.
            batch_size: Number of chunk texts sent per request.
        """
        self.client = OpenAI()
        self.model = model
        self.batch_size = batch_size

    def embed_chunks(self, chunks: "list[Chunk]") -> "list[EmbeddedChunk]":
        """Attach an embedding vector to every chunk.

        Args:
            chunks: Chunks whose .text is embedded.

        Returns:
            One EmbeddedChunk per input chunk, in input order. An empty
            input returns an empty list without calling the API.

        Raises:
            ValueError: If a response carries a different number of
                embeddings than the batch that was sent.
        """
        embedded = []
        for start in range(0, len(chunks), self.batch_size):
            batch = chunks[start:start + self.batch_size]
            response = self.client.embeddings.create(
                input=[c.text for c in batch],
                model=self.model,
            )
            # zip() would silently drop chunks on a short response; fail loudly.
            if len(response.data) != len(batch):
                raise ValueError(
                    f"Embedding count mismatch: sent {len(batch)} inputs, "
                    f"received {len(response.data)} embeddings"
                )
            # Pair by the item's index field rather than trusting list order.
            ordered = sorted(response.data, key=lambda item: item.index)
            for chunk, data in zip(batch, ordered):
                embedded.append(EmbeddedChunk(**chunk.dict(), embedding=data.embedding))
        return embedded
Qdrant Collection with Payload Indexes
def create_collection(self, recreate: bool = False, vector_size: int = 1536):
    """Create collection with proper indexes for filtered retrieval.

    Args:
        recreate: When True, an existing collection of the same name is
            deleted first so the collection is rebuilt from scratch. When
            False, creation is attempted as-is and the client's error for
            an already existing collection propagates.
        vector_size: Dimensionality of the stored vectors; must match the
            embedding model in use (default 1536).
    """
    if recreate and self.client.collection_exists(collection_name=self.collection):
        self.client.delete_collection(collection_name=self.collection)

    self.client.create_collection(
        collection_name=self.collection,
        vectors_config=VectorParams(size=vector_size, distance=Distance.COSINE),
    )

    # Index ALL fields you filter by
    payload_indexes = [
        ("book_id", PayloadSchemaType.KEYWORD),            # Tenant isolation
        ("module", PayloadSchemaType.KEYWORD),             # Content filter
        ("chapter", PayloadSchemaType.INTEGER),            # Range filter
        ("hardware_tier", PayloadSchemaType.INTEGER),      # Personalization
        ("proficiency_level", PayloadSchemaType.KEYWORD),
        ("parent_doc_id", PayloadSchemaType.KEYWORD),      # Context expansion
        ("source_file_hash", PayloadSchemaType.KEYWORD),   # Change detection
    ]
    for field, schema in payload_indexes:
        self.client.create_payload_index(
            collection_name=self.collection,
            field_name=field,
            field_schema=schema,
        )
Retrieval Patterns
Comprehensive Filter Builder
def build_filter(self, query: SearchQuery) -> Filter:
    """Build Qdrant filter with all conditions (AND logic).

    book_id and hardware_tier are always applied. module, the chapter
    range and proficiency_levels are added only when set on the query.

    Args:
        query: Search parameters; reads book_id, hardware_tier, module,
            chapter_min, chapter_max and proficiency_levels.

    Returns:
        A Filter whose `must` list holds every applicable condition.
    """
    conditions = [
        # Required: Tenant isolation
        FieldCondition(key="book_id", match=MatchValue(value=query.book_id)),
        # Required: Hardware tier (lte = "tier X or lower")
        FieldCondition(key="hardware_tier", range=Range(lte=query.hardware_tier)),
    ]

    # Optional: Module exact match
    if query.module:
        conditions.append(FieldCondition(
            key="module", match=MatchValue(value=query.module)
        ))

    # Optional: Chapter range. Compare against None rather than relying on
    # truthiness so that a bound of 0 is honoured instead of being dropped.
    if query.chapter_min is not None or query.chapter_max is not None:
        conditions.append(FieldCondition(
            key="chapter",
            range=Range(gte=query.chapter_min, lte=query.chapter_max),
        ))

    # Optional: Proficiency OR logic (match any of the listed levels)
    if query.proficiency_levels:
        conditions.append(FieldCondition(
            key="proficiency_level",
            match=MatchAny(any=query.proficiency_levels),
        ))

    return Filter(must=conditions)
Context Expansion (Walk Chunk Chain)
def expand_context(self, chunk_id: str, prev: int = 1, next: int = 1) -> list[Chunk]:
    """Return a chunk together with its neighbours along the prev/next links.

    Args:
        chunk_id: Id of the chunk to centre on.
        prev: Maximum number of preceding chunks to include.
        next: Maximum number of following chunks to include.

    Returns:
        Chunks in document order: preceding, the chunk itself, following.
        Empty when chunk_id cannot be resolved. A walk stops early when a
        link is empty or the linked chunk cannot be fetched.
    """
    anchor = self.get_chunk_by_id(chunk_id)
    if not anchor:
        return []

    def follow(start_id, link_name, steps):
        # Collect up to `steps` chunks, nearest first, following `link_name`.
        collected = []
        cursor_id = start_id
        for _ in range(steps):
            if not cursor_id:
                break
            neighbour = self.get_chunk_by_id(cursor_id)
            if not neighbour:
                break
            collected.append(neighbour)
            cursor_id = getattr(neighbour, link_name)
        return collected

    before = follow(anchor.prev_chunk_id, "prev_chunk_id", prev)
    before.reverse()  # walked nearest-first; flip into document order
    after = follow(anchor.next_chunk_id, "next_chunk_id", next)
    return before + [anchor] + after
Full Document Retrieval
def get_document_chunks(self, parent_doc_id: str) -> list[Chunk]:
    """Get all chunks for a document, ordered by chunk_index.

    Pages through the scroll results until the client reports no further
    offset, so documents with more than one page (100 points) of chunks
    are returned in full rather than truncated.

    Args:
        parent_doc_id: Value matched against the parent_doc_id payload field.

    Returns:
        Every matching chunk, sorted ascending by chunk_index.
    """
    doc_filter = Filter(must=[
        FieldCondition(key="parent_doc_id", match=MatchValue(value=parent_doc_id))
    ])
    points = []
    offset = None
    while True:
        page, offset = self.client.scroll(
            collection_name=self.collection,
            scroll_filter=doc_filter,
            limit=100,
            offset=offset,
            with_payload=True,
            with_vectors=False,
        )
        points.extend(page)
        # A None offset marks the last page.
        if offset is None:
            break
    chunks = [self._to_chunk(p) for p in points]
    chunks.sort(key=lambda c: c.chunk_index)
    return chunks
Payload Schema
class ChunkPayload(BaseModel):
    """Complete payload for filtered retrieval and context expansion.

    Optional fields default to None explicitly: under Pydantic v2 an
    Optional[...] annotation without a default is still a required field,
    which would reject the first/last chunk of a document (no prev/next id)
    and chunks without a section title unless None were passed by hand.
    """

    # Tenant isolation
    book_id: str

    # Content filters (payload-indexed in create_collection, except `lesson`)
    module: str
    chapter: int
    lesson: int
    hardware_tier: int
    proficiency_level: str

    # Display content
    text: str
    section_title: Optional[str] = None
    source_file: str

    # Context expansion
    parent_doc_id: str
    chunk_index: int
    total_chunks: int
    prev_chunk_id: Optional[str] = None  # None on the first chunk
    next_chunk_id: Optional[str] = None  # None on the last chunk

    # Change detection
    content_hash: str
    source_file_hash: str
Anti-Patterns
| Don't | Do Instead |
|---|---|
| Fixed character chunking | Semantic boundaries (## headers) |
| Position-based chunk IDs | Content hash for stable IDs |
| No overlap between chunks | 10-20% overlap for continuity |
| Full re-index on every change | Incremental with file hash detection |
| Missing payload indexes | Index every field you filter by |
| Synchronous embedding | Batch with background jobs |
| External state database | Qdrant-native state tracking |
Verification
Run: python scripts/verify.py
Related Skills
- scaffolding-fastapi-dapr - API patterns for search endpoints
- streaming-llm-responses - Streaming RAG responses
References
- references/ingestion-patterns.md - Full ingestion pipeline
- references/retrieval-patterns.md - Filter strategies, context expansion