Embedding Strategies
Guide to selecting and optimizing embedding models for vector search applications.
When to Use This Skill
- Choosing embedding models for RAG
- Optimizing chunking strategies
- Fine-tuning embeddings for domains
- Comparing embedding model performance
- Reducing embedding dimensions
- Handling multilingual content
Core Concepts
1. Embedding Model Comparison
| Model | Dimensions | Max Tokens | Best For |
|-------|------------|------------|----------|
| text-embedding-3-large | 3072 | 8191 | High accuracy |
| text-embedding-3-small | 1536 | 8191 | Cost-effective |
| voyage-2 | 1024 | 4000 | Code, legal |
| bge-large-en-v1.5 | 1024 | 512 | Open source |
| all-MiniLM-L6-v2 | 384 | 256 | Fast, lightweight |
| multilingual-e5-large | 1024 | 512 | Multi-language |
2. Embedding Pipeline
Document → Chunking → Preprocessing → Embedding Model → Vector
               ↓
        [Overlap, Size] [Clean, Normalize] [API/Local]
Templates
Template 1: OpenAI Embeddings
from openai import OpenAI
from typing import List
import numpy as np
# Shared module-level client used by get_embeddings below.
# NOTE(review): presumably picks up credentials from the environment
# (e.g. OPENAI_API_KEY) -- confirm against the OpenAI SDK documentation.
client = OpenAI()
def get_embeddings(
    texts: List[str],
    model: str = "text-embedding-3-small",
    dimensions: int = None,
    batch_size: int = 100
) -> List[List[float]]:
    """Get embeddings from OpenAI, sending the inputs in batches.

    Args:
        texts: Strings to embed. An empty list returns [] without any
            API call.
        model: Name of the embedding model passed to the API.
        dimensions: Optional output size; forwarded to the API only when
            it is set to a truthy value.
        batch_size: Maximum number of inputs sent per request.

    Returns:
        The embeddings of all batches concatenated, each batch in the
        order the API returned it.

    Raises:
        ValueError: If batch_size is not a positive integer.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")

    # Arguments that are identical for every batch are built once.
    request_args = {"model": model}
    if dimensions:
        request_args["dimensions"] = dimensions

    all_embeddings = []
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        response = client.embeddings.create(input=batch, **request_args)
        all_embeddings.extend(item.embedding for item in response.data)
    return all_embeddings
def get_embedding(text: str, **kwargs) -> List[float]:
    """Embed one string and return its vector.

    Any extra keyword arguments are forwarded unchanged to
    get_embeddings.
    """
    vectors = get_embeddings([text], **kwargs)
    return vectors[0]
# Dimension reduction with OpenAI
def get_reduced_embedding(text: str, dimensions: int = 512) -> List[float]:
    """Embed one string at a reduced output size (Matryoshka).

    Always uses the "text-embedding-3-small" model and passes the
    requested number of dimensions through to get_embedding.
    """
    options = {"model": "text-embedding-3-small", "dimensions": dimensions}
    return get_embedding(text, **options)
Template 2: Local Embeddings with Sentence Transformers
from sentence_transformers import SentenceTransformer
from typing import List, Optional
import numpy as np
class LocalEmbedder:
    """Local embedding with sentence-transformers.

    Wraps a SentenceTransformer model and exposes separate helpers for
    embedding search queries and documents.
    """

    # Instruction prepended to queries when the model is a BGE model.
    BGE_QUERY_PREFIX = "Represent this sentence for searching relevant passages: "

    def __init__(
        self,
        model_name: str = "BAAI/bge-large-en-v1.5",
        device: str = "cuda"
    ):
        """Load the model.

        Args:
            model_name: Name passed to SentenceTransformer.
            device: Device string passed to SentenceTransformer.
        """
        # Kept so embed_query can decide whether the BGE prefix applies.
        self.model_name = model_name
        self.model = SentenceTransformer(model_name, device=device)

    def embed(
        self,
        texts: List[str],
        normalize: bool = True,
        show_progress: bool = False
    ) -> np.ndarray:
        """Embed texts with optional normalization.

        Args:
            texts: Strings to encode.
            normalize: Forwarded as normalize_embeddings.
            show_progress: Forwarded as show_progress_bar.

        Returns:
            Whatever the model's encode() returns for these texts
            (requested as a NumPy array via convert_to_numpy=True).
        """
        return self.model.encode(
            texts,
            normalize_embeddings=normalize,
            show_progress_bar=show_progress,
            convert_to_numpy=True
        )

    def embed_query(self, query: str) -> np.ndarray:
        """Embed a single query, adding the BGE prefix for BGE models."""
        # The check is on the model *name*; the embedding dimension is an
        # int and cannot be searched for a substring.
        if "bge" in self.model_name.lower():
            query = f"{self.BGE_QUERY_PREFIX}{query}"
        return self.embed([query])[0]

    def embed_documents(self, documents: List[str]) -> np.ndarray:
        """Embed documents for indexing (no prefix is added)."""
        return self.embed(documents)
# E5 model with instructions
class E5Embedder:
    """Embedder that prepends "query: " / "passage: " before encoding."""

    def __init__(self, model_name: str = "intfloat/multilingual-e5-large"):
        """Load the SentenceTransformer model with the given name."""
        self.model = SentenceTransformer(model_name)

    def embed_query(self, query: str) -> np.ndarray:
        """Encode a search query with the "query: " prefix."""
        prefixed = f"query: {query}"
        return self.model.encode(prefixed)

    def embed_document(self, document: str) -> np.ndarray:
        """Encode a document with the "passage: " prefix."""
        prefixed = f"passage: {document}"
        return self.model.encode(prefixed)
Template 3: Chunking Strategies
from typing import List, Tuple
import re
def chunk_by_tokens(
    text: str,
    chunk_size: int = 512,
    chunk_overlap: int = 50,
    tokenizer=None
) -> List[str]:
    """Chunk text by token count.

    Args:
        text: Text to split.
        chunk_size: Maximum number of tokens per chunk.
        chunk_overlap: Number of tokens shared between consecutive chunks.
        tokenizer: Object with encode(text) and decode(tokens) methods.
            When None, tiktoken's "cl100k_base" encoding is used.

    Returns:
        Decoded chunks in document order; [] when the text has no tokens.

    Raises:
        ValueError: If chunk_size is not positive, or chunk_overlap is
            negative or not smaller than chunk_size (the window would
            never advance).
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be >= 0 and smaller than chunk_size")

    if tokenizer is None:
        # Imported lazily so a caller-supplied tokenizer needs no tiktoken.
        import tiktoken
        tokenizer = tiktoken.get_encoding("cl100k_base")

    tokens = tokenizer.encode(text)
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(tokens), step):
        end = start + chunk_size
        chunks.append(tokenizer.decode(tokens[start:end]))
        # Stop once the window reaches the end; another pass would only
        # repeat tokens that are already in this chunk.
        if end >= len(tokens):
            break
    return chunks
def chunk_by_sentences(
    text: str,
    max_chunk_size: int = 1000,
    min_chunk_size: int = 100
) -> List[str]:
    """Group consecutive sentences into chunks bounded by character count.

    Sentences come from nltk.sent_tokenize. A chunk is closed as soon as
    adding the next sentence would push the summed sentence lengths past
    max_chunk_size; the joining spaces are not counted. A single sentence
    longer than max_chunk_size still becomes a chunk of its own.

    Note: min_chunk_size is accepted but not used by this implementation.

    Returns:
        Chunks in document order, sentences joined with single spaces.
    """
    import nltk

    chunks = []
    pending = []
    pending_chars = 0
    for sentence in nltk.sent_tokenize(text):
        length = len(sentence)
        would_overflow = pending_chars + length > max_chunk_size
        if pending and would_overflow:
            chunks.append(" ".join(pending))
            pending, pending_chars = [], 0
        pending.append(sentence)
        pending_chars += length

    if pending:
        chunks.append(" ".join(pending))
    return chunks
def chunk_by_semantic_sections(
    text: str,
    headers_pattern: str = r'^#{1,3}\s+.+$'
) -> List[Tuple[str, str]]:
    """Split markdown into (header, body) pairs at header lines.

    Each line matching headers_pattern starts a new section. Lines before
    the first header are returned under an empty header string. A section
    is only emitted when at least one body line was collected, so a
    header directly followed by another header produces no entry.

    Returns:
        (header_line, body_text) tuples in document order; body lines are
        joined with newlines.
    """
    sections = []
    heading = ""
    body = []

    for line in text.split('\n'):
        is_header = re.match(headers_pattern, line, re.MULTILINE) is not None
        if not is_header:
            body.append(line)
            continue
        # A header closes the section collected so far.
        if body:
            sections.append((heading, '\n'.join(body)))
        heading = line
        body = []

    if body:
        sections.append((heading, '\n'.join(body)))
    return sections
def recursive_character_splitter(
    text: str,
    chunk_size: int = 1000,
    chunk_overlap: int = 200,
    separators: List[str] = None
) -> List[str]:
    """LangChain-style recursive splitter.

    The text is split on the first separator and the pieces are packed
    into chunks of at most chunk_size characters. Any packed chunk that
    is still too large is split again with the remaining separators; the
    empty-string separator means a fixed-width character split.

    Args:
        text: Text to split.
        chunk_size: Target maximum chunk length in characters.
        chunk_overlap: Characters of trailing context carried into the
            next chunk.
        separators: Separators to try, coarsest first. Defaults to
            ["\\n\\n", "\\n", ". ", " ", ""].

    Returns:
        Chunks in document order; [] for empty text.

    Raises:
        ValueError: If chunk_size is not positive, or chunk_overlap is
            negative or not smaller than chunk_size.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be >= 0 and smaller than chunk_size")

    separators = separators or ["\n\n", "\n", ". ", " ", ""]

    def split_by_characters(text: str) -> List[str]:
        """Fixed-width windows that advance by chunk_size - chunk_overlap."""
        step = chunk_size - chunk_overlap
        pieces = []
        for start in range(0, len(text), step):
            pieces.append(text[start:start + chunk_size])
            # The window reached the end; a further window would only
            # repeat characters already emitted.
            if start + chunk_size >= len(text):
                break
        return pieces

    def split_text(text: str, separators: List[str]) -> List[str]:
        if not text:
            return []

        separator = separators[0]
        remaining_separators = separators[1:]

        if separator == "":
            return split_by_characters(text)

        def finalize(parts: List[str]) -> List[str]:
            """Join parts; recurse with finer separators if still too large."""
            chunk_text = separator.join(parts)
            if len(chunk_text) > chunk_size and remaining_separators:
                return split_text(chunk_text, remaining_separators)
            return [chunk_text]

        chunks = []
        current_chunk = []
        current_length = 0

        for split in text.split(separator):
            split_length = len(split) + len(separator)

            if current_length + split_length > chunk_size and current_chunk:
                chunks.extend(finalize(current_chunk))

                # Start the new chunk with as many trailing pieces as fit
                # into the overlap budget.
                overlap_splits = []
                overlap_length = 0
                for s in reversed(current_chunk):
                    if overlap_length + len(s) <= chunk_overlap:
                        overlap_splits.insert(0, s)
                        overlap_length += len(s)
                    else:
                        break
                current_chunk = overlap_splits
                current_length = overlap_length

            current_chunk.append(split)
            current_length += split_length

        # The last chunk goes through the same size check as the others;
        # otherwise text lacking this separator is returned unsplit.
        if current_chunk:
            chunks.extend(finalize(current_chunk))

        return chunks

    return split_text(text, separators)
Template 4: Domain-Specific Embedding Pipeline
class DomainEmbeddingPipeline:
    """Pipeline for domain-specific embeddings.

    Runs preprocess -> chunk_by_tokens -> get_embeddings for each
    document and returns one record per chunk.
    """

    def __init__(
        self,
        embedding_model: str = "text-embedding-3-small",
        chunk_size: int = 512,
        chunk_overlap: int = 50,
        preprocessing_fn=None
    ):
        """Store the settings; fall back to the default preprocessor."""
        self.embedding_model = embedding_model
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        if preprocessing_fn:
            self.preprocess = preprocessing_fn
        else:
            self.preprocess = self._default_preprocess

    def _default_preprocess(self, text: str) -> str:
        """Collapse whitespace runs and drop unusual characters.

        Keeps word characters, whitespace and the punctuation . , ! ? -
        and strips leading/trailing whitespace from the result.
        """
        collapsed = re.sub(r'\s+', ' ', text)
        filtered = re.sub(r'[^\w\s.,!?-]', '', collapsed)
        return filtered.strip()

    async def process_documents(
        self,
        documents: List[dict],
        id_field: str = "id",
        content_field: str = "content",
        metadata_fields: List[str] = None
    ) -> List[dict]:
        """Process documents for vector storage.

        Args:
            documents: Dicts holding at least content_field and id_field.
            id_field: Key of the document identifier.
            content_field: Key of the document text.
            metadata_fields: Keys copied from the document onto every
                chunk record when present in the document.

        Returns:
            One dict per chunk with the keys "id", "document_id",
            "chunk_index", "text" and "embedding", plus any copied
            metadata.
        """
        wanted_fields = metadata_fields or []
        records = []

        for doc in documents:
            raw_text = doc[content_field]
            doc_id = doc[id_field]

            pieces = chunk_by_tokens(
                self.preprocess(raw_text),
                self.chunk_size,
                self.chunk_overlap
            )
            vectors = get_embeddings(pieces, self.embedding_model)

            # Metadata is the same for every chunk of this document.
            extras = {name: doc[name] for name in wanted_fields if name in doc}

            for index, (piece, vector) in enumerate(zip(pieces, vectors)):
                records.append({
                    "id": f"{doc_id}_chunk_{index}",
                    "document_id": doc_id,
                    "chunk_index": index,
                    "text": piece,
                    "embedding": vector,
                    **extras,
                })

        return records
# Code-specific pipeline
class CodeEmbeddingPipeline:
    """Specialized pipeline for code embeddings.

    NOTE(review): embed_with_context delegates to get_embedding with
    self.model; confirm that the client behind get_embedding can serve
    the default "voyage-code-2" model.
    """

    def __init__(self, model: str = "voyage-code-2"):
        """Remember the embedding model name."""
        self.model = model

    def chunk_code(self, code: str, language: str) -> List[dict]:
        """Chunk code by functions/classes.

        Placeholder: imports tree_sitter and returns None; the parsing
        steps below are not implemented yet.
        """
        import tree_sitter
        # Planned steps:
        #   1. Parse with tree-sitter
        #   2. Extract functions, classes, methods
        #   3. Return chunks with context
        return None

    def embed_with_context(self, chunk: str, context: str) -> List[float]:
        """Embed a code chunk together with its surrounding context."""
        return get_embedding(
            f"Context: {context}\n\nCode:\n{chunk}",
            model=self.model
        )
Template 5: Embedding Quality Evaluation
import numpy as np
from typing import List, Tuple
def evaluate_retrieval_quality(
    queries: List[str],
    relevant_docs: List[List[str]],  # List of relevant doc IDs per query
    retrieved_docs: List[List[str]],  # List of retrieved doc IDs per query
    k: int = 10
) -> dict:
    """Evaluate embedding quality for retrieval.

    Computes precision@k, recall@k, MRR and NDCG@k (binary relevance) for
    each (relevant, retrieved) pair and averages them over all pairs.

    Args:
        queries: The evaluated queries. Not used in the computation.
        relevant_docs: Relevant document IDs for each query.
        retrieved_docs: Ranked retrieved document IDs for each query.
        k: Cut-off rank for precision, recall and NDCG.

    Returns:
        Dict with the keys "precision@{k}", "recall@{k}", "mrr" and
        "ndcg@{k}" mapped to float averages. All values are 0.0 when
        there is nothing to evaluate.

    Raises:
        ValueError: If k is not a positive integer.
    """
    if k <= 0:
        raise ValueError("k must be a positive integer")

    def precision_at_k(relevant: set, retrieved: List[str], k: int) -> float:
        retrieved_k = retrieved[:k]
        relevant_retrieved = len(set(retrieved_k) & relevant)
        return relevant_retrieved / k

    def recall_at_k(relevant: set, retrieved: List[str], k: int) -> float:
        retrieved_k = retrieved[:k]
        relevant_retrieved = len(set(retrieved_k) & relevant)
        return relevant_retrieved / len(relevant) if relevant else 0

    def mrr(relevant: set, retrieved: List[str]) -> float:
        # Reciprocal rank of the first relevant hit, 0 if there is none.
        for i, doc in enumerate(retrieved):
            if doc in relevant:
                return 1 / (i + 1)
        return 0

    def ndcg_at_k(relevant: set, retrieved: List[str], k: int) -> float:
        dcg = sum(
            1 / np.log2(i + 2) if doc in relevant else 0
            for i, doc in enumerate(retrieved[:k])
        )
        # Ideal ranking places every relevant document at the top.
        ideal_dcg = sum(1 / np.log2(i + 2) for i in range(min(len(relevant), k)))
        return dcg / ideal_dcg if ideal_dcg > 0 else 0

    metrics = {
        f"precision@{k}": [],
        f"recall@{k}": [],
        "mrr": [],
        f"ndcg@{k}": []
    }

    for relevant, retrieved in zip(relevant_docs, retrieved_docs):
        relevant_set = set(relevant)
        metrics[f"precision@{k}"].append(precision_at_k(relevant_set, retrieved, k))
        metrics[f"recall@{k}"].append(recall_at_k(relevant_set, retrieved, k))
        metrics["mrr"].append(mrr(relevant_set, retrieved))
        metrics[f"ndcg@{k}"].append(ndcg_at_k(relevant_set, retrieved, k))

    # np.mean([]) would yield NaN plus a RuntimeWarning, so an empty
    # evaluation set is reported as 0.0 instead.
    return {
        name: float(np.mean(values)) if values else 0.0
        for name, values in metrics.items()
    }
def compute_embedding_similarity(
    embeddings1: np.ndarray,
    embeddings2: np.ndarray,
    metric: str = "cosine"
) -> np.ndarray:
    """Compute similarity matrix between embedding sets.

    Args:
        embeddings1: 2-D array-like, one embedding per row.
        embeddings2: 2-D array-like, one embedding per row.
        metric: "cosine", "euclidean" (negated distance, so larger still
            means more similar) or "dot".

    Returns:
        Matrix whose entry [i, j] scores row i of embeddings1 against
        row j of embeddings2.

    Raises:
        ValueError: If metric is not one of the supported names.
    """
    embeddings1 = np.asarray(embeddings1)
    embeddings2 = np.asarray(embeddings2)

    if metric == "cosine":
        def unit_rows(matrix: np.ndarray) -> np.ndarray:
            norms = np.linalg.norm(matrix, axis=1, keepdims=True)
            # A zero vector has no direction; dividing by 1 leaves it at
            # zero so its similarity is 0 instead of NaN.
            return matrix / np.where(norms == 0, 1.0, norms)

        return unit_rows(embeddings1) @ unit_rows(embeddings2).T
    if metric == "euclidean":
        from scipy.spatial.distance import cdist
        return -cdist(embeddings1, embeddings2, metric='euclidean')
    if metric == "dot":
        return embeddings1 @ embeddings2.T

    raise ValueError(
        f"Unknown metric {metric!r}; expected 'cosine', 'euclidean' or 'dot'"
    )
Best Practices
Do's
- Match model to use case - Code vs prose vs multilingual
- Chunk thoughtfully - Preserve semantic boundaries
- Normalize embeddings - For cosine similarity
- Batch requests - More efficient than one-by-one
- Cache embeddings - Avoid recomputing
Don'ts
- Don't ignore token limits - Truncation loses info
- Don't mix embedding models - Incompatible spaces
- Don't skip preprocessing - Garbage in, garbage out
- Don't over-chunk - Lose context
Resources