Similarity Search Patterns
Patterns for implementing efficient similarity search in production systems.
When to Use This Skill
- Building semantic search systems
- Implementing RAG retrieval
- Creating recommendation engines
- Optimizing search latency
- Scaling to millions of vectors
- Combining semantic and keyword search
Core Concepts
1. Distance Metrics
| Metric | Formula | Best For |
|--------|---------|----------|
| Cosine | 1 - (A·B)/(‖A‖‖B‖) | Normalized embeddings |
| Euclidean (L2) | √Σ(a-b)² | Raw embeddings |
| Dot Product | A·B | Magnitude matters |
| Manhattan (L1) | Σ\|a-b\| | Sparse data |
2. Index Types
┌─────────────────────────────────────────────────┐
│ Index Types │
├─────────────┬───────────────┬───────────────────┤
│ Flat │ HNSW │ IVF+PQ │
│ (Exact) │ (Graph-based) │ (Quantized) │
├─────────────┼───────────────┼───────────────────┤
│ O(n) search │ O(log n) │ O(√n) │
│ 100% recall │ ~95-99% │ ~90-95% │
│ Small data │ Medium-Large │ Very Large │
└─────────────┴───────────────┴───────────────────┘
Templates
Template 1: Pinecone Implementation
from pinecone import Pinecone, ServerlessSpec
from typing import List, Dict, Optional
import hashlib
class PineconeVectorStore:
    """Wrapper around a single Pinecone index: upsert, search, rerank, delete."""

    def __init__(
        self,
        api_key: str,
        index_name: str,
        dimension: int = 1536,
        metric: str = "cosine"
    ):
        """Connect to Pinecone and open ``index_name``, creating it if absent.

        A missing index is created with ``dimension`` and ``metric`` using a
        serverless spec on AWS ``us-east-1``.
        """
        self.pc = Pinecone(api_key=api_key)
        # Cross-encoder used by _rerank(); loaded on first use and then reused.
        self._cross_encoder = None

        # Create index if not exists
        if index_name not in self.pc.list_indexes().names():
            self.pc.create_index(
                name=index_name,
                dimension=dimension,
                metric=metric,
                spec=ServerlessSpec(cloud="aws", region="us-east-1")
            )
        self.index = self.pc.Index(index_name)

    def upsert(
        self,
        vectors: List[Dict],
        namespace: str = "",
        batch_size: int = 100
    ) -> int:
        """Upsert vectors in batches and return how many were sent.

        Args:
            vectors: ``[{"id": str, "values": List[float], "metadata": dict}]``
            namespace: Namespace passed through to the index.
            batch_size: Number of vectors per upsert request (must be > 0).

        Raises:
            ValueError: If ``batch_size`` is not positive.
        """
        if batch_size <= 0:
            # A negative step would make range() empty and silently upsert nothing.
            raise ValueError("batch_size must be a positive integer")

        total = 0
        for start in range(0, len(vectors), batch_size):
            batch = vectors[start:start + batch_size]
            self.index.upsert(vectors=batch, namespace=namespace)
            total += len(batch)
        return total

    def search(
        self,
        query_vector: List[float],
        top_k: int = 10,
        namespace: str = "",
        filter: Optional[Dict] = None,
        include_metadata: bool = True
    ) -> List[Dict]:
        """Query the index and return ``[{"id", "score", "metadata"}]`` per match."""
        results = self.index.query(
            vector=query_vector,
            top_k=top_k,
            namespace=namespace,
            filter=filter,
            include_metadata=include_metadata
        )
        return [
            {
                "id": match.id,
                "score": match.score,
                "metadata": match.metadata
            }
            for match in results.matches
        ]

    def search_with_rerank(
        self,
        query: str,
        query_vector: List[float],
        top_k: int = 10,
        rerank_top_n: int = 50,
        namespace: str = ""
    ) -> List[Dict]:
        """Over-fetch candidates, rerank them against ``query``, return ``top_k``.

        At least ``top_k`` candidates are always fetched, even when
        ``rerank_top_n`` is smaller, so the caller is not short-changed.
        """
        candidates = self.search(
            query_vector,
            top_k=max(rerank_top_n, top_k),
            namespace=namespace
        )
        reranked = self._rerank(query, candidates)
        return reranked[:top_k]

    def _rerank(self, query: str, results: List[Dict]) -> List[Dict]:
        """Score each result against ``query`` with a cross-encoder and sort.

        Adds a ``"rerank_score"`` float to every result dict (in place) and
        returns the results sorted by that score, highest first. Results whose
        metadata has no ``"text"`` entry are scored against an empty string.
        """
        if not results:
            return []

        # getattr: tolerate instances whose __init__ did not run.
        model = getattr(self, "_cross_encoder", None)
        if model is None:
            # Imported lazily so the dependency is only needed when reranking.
            from sentence_transformers import CrossEncoder
            model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
            self._cross_encoder = model

        pairs = [
            (query, (r.get("metadata") or {}).get("text", ""))
            for r in results
        ]
        scores = model.predict(pairs)
        for result, score in zip(results, scores):
            result["rerank_score"] = float(score)
        return sorted(results, key=lambda r: r["rerank_score"], reverse=True)

    def delete(self, ids: List[str], namespace: str = ""):
        """Delete vectors by ID."""
        self.index.delete(ids=ids, namespace=namespace)

    def delete_by_filter(self, filter: Dict, namespace: str = ""):
        """Delete vectors matching a metadata filter."""
        self.index.delete(filter=filter, namespace=namespace)
Template 2: Qdrant Implementation
from qdrant_client import QdrantClient
from qdrant_client.http import models
from typing import List, Dict, Optional
class QdrantVectorStore:
    """Wrapper around a single Qdrant collection: upsert and search helpers."""

    def __init__(
        self,
        url: str = "localhost",
        port: int = 6333,
        collection_name: str = "documents",
        vector_size: int = 1536
    ):
        """Connect to Qdrant and create ``collection_name`` if it is missing.

        A new collection uses cosine distance, vectors of ``vector_size``
        dimensions, and INT8 scalar quantization kept in RAM.
        """
        self.client = QdrantClient(url=url, port=port)
        self.collection_name = collection_name

        # Create collection if not exists
        existing = {c.name for c in self.client.get_collections().collections}
        if collection_name not in existing:
            self.client.create_collection(
                collection_name=collection_name,
                vectors_config=models.VectorParams(
                    size=vector_size,
                    distance=models.Distance.COSINE
                ),
                # Optional: enable quantization for memory efficiency
                quantization_config=models.ScalarQuantization(
                    scalar=models.ScalarQuantizationConfig(
                        type=models.ScalarType.INT8,
                        quantile=0.99,
                        always_ram=True
                    )
                )
            )

    def upsert(self, points: List[Dict]) -> int:
        """Upsert points and return how many were sent.

        Args:
            points: ``[{"id": str/int, "vector": List[float], "payload": dict}]``;
                a missing ``"payload"`` defaults to ``{}``.
        """
        if not points:
            # Nothing to send; skip the request entirely.
            return 0

        qdrant_points = [
            models.PointStruct(
                id=p["id"],
                vector=p["vector"],
                payload=p.get("payload", {})
            )
            for p in points
        ]
        self.client.upsert(
            collection_name=self.collection_name,
            points=qdrant_points
        )
        return len(points)

    def search(
        self,
        query_vector: List[float],
        limit: int = 10,
        # Quoted so the annotation is not evaluated when the class is defined.
        filter: Optional["models.Filter"] = None,
        score_threshold: Optional[float] = None
    ) -> List[Dict]:
        """Search the collection and return ``[{"id", "score", "payload"}]``."""
        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_vector,
            limit=limit,
            query_filter=filter,
            score_threshold=score_threshold
        )
        return [
            {
                "id": r.id,
                "score": r.score,
                "payload": r.payload
            }
            for r in results
        ]

    @staticmethod
    def _field_conditions(conditions: Optional[List[Dict]]) -> list:
        """Turn ``[{"key": ..., "value": ...}]`` into exact-match conditions."""
        return [
            models.FieldCondition(
                key=c["key"],
                match=models.MatchValue(value=c["value"])
            )
            for c in conditions or []
        ]

    def search_with_filter(
        self,
        query_vector: List[float],
        must_conditions: Optional[List[Dict]] = None,
        should_conditions: Optional[List[Dict]] = None,
        must_not_conditions: Optional[List[Dict]] = None,
        limit: int = 10
    ) -> List[Dict]:
        """Search with ``must`` / ``should`` / ``must_not`` exact-match filters.

        Each condition is a dict ``{"key": <payload field>, "value": <value>}``.
        All three groups are applied; when none is given the search runs
        unfiltered.
        """
        must = self._field_conditions(must_conditions)
        should = self._field_conditions(should_conditions)
        must_not = self._field_conditions(must_not_conditions)

        query_filter = None
        if must or should or must_not:
            # Empty groups are passed as None so they impose no constraint.
            query_filter = models.Filter(
                must=must or None,
                should=should or None,
                must_not=must_not or None
            )
        return self.search(query_vector, limit=limit, filter=query_filter)

    def search_with_sparse(
        self,
        dense_vector: List[float],
        sparse_vector: Dict[int, float],
        limit: int = 10,
        dense_weight: float = 0.7
    ) -> List[Dict]:
        """Search the named vector ``"dense"`` and return matching points.

        NOTE(review): ``sparse_vector`` and ``dense_weight`` are accepted but
        not used; only the dense query is executed, so this is not yet a true
        hybrid search. The collection must define a named vector ``"dense"``;
        the collection created by ``__init__`` uses a single unnamed vector.
        """
        # Requires collection with named vectors
        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=models.NamedVector(
                name="dense",
                vector=dense_vector
            ),
            limit=limit
        )
        return [{"id": r.id, "score": r.score, "payload": r.payload} for r in results]
Template 3: pgvector with PostgreSQL
import asyncpg
from typing import List, Dict, Optional
import numpy as np
class PgVectorStore:
    """Vector store on PostgreSQL + pgvector, accessed through asyncpg.

    Embeddings and JSON metadata are sent to the server as text (for example
    ``"[0.1,0.2]"`` and ``'{"k": 1}'``) because no custom asyncpg codecs are
    registered for the ``vector`` and ``jsonb`` types.
    """

    def __init__(self, connection_string: str, dimension: int = 1536):
        """Store connection settings; call ``await init()`` before any query.

        Args:
            connection_string: DSN handed to ``asyncpg.create_pool``.
            dimension: Size of the ``embedding`` column created by ``init()``.
        """
        self.connection_string = connection_string
        # int() keeps the value safe to interpolate into the DDL in init().
        self.dimension = int(dimension)
        self.pool = None

    @staticmethod
    def _vector_literal(embedding) -> str:
        """Render an embedding in pgvector's text form, e.g. ``"[1.0,2.5]"``."""
        if isinstance(embedding, str):
            # Already a literal; pass through untouched.
            return embedding
        values = np.asarray(embedding, dtype=float).ravel().tolist()
        return "[" + ",".join(repr(v) for v in values) + "]"

    @staticmethod
    def _json_text(value):
        """Serialize a value to JSON text; strings and None pass through."""
        import json

        if value is None or isinstance(value, str):
            return value
        return json.dumps(value)

    def _require_pool(self):
        """Return the connection pool, or raise if ``init()`` has not run."""
        pool = getattr(self, "pool", None)
        if pool is None:
            raise RuntimeError("PgVectorStore.init() must be awaited before use")
        return pool

    async def init(self):
        """Create the pool, the ``vector`` extension, the table and its index."""
        if self.pool is None:
            self.pool = await asyncpg.create_pool(self.connection_string)

        async with self.pool.acquire() as conn:
            # Enable extension
            await conn.execute("CREATE EXTENSION IF NOT EXISTS vector")

            # Create table
            await conn.execute(f"""
                CREATE TABLE IF NOT EXISTS documents (
                    id TEXT PRIMARY KEY,
                    content TEXT,
                    metadata JSONB,
                    embedding vector({self.dimension})
                )
            """)

            # Create index (HNSW for better performance)
            await conn.execute("""
                CREATE INDEX IF NOT EXISTS documents_embedding_idx
                ON documents
                USING hnsw (embedding vector_cosine_ops)
                WITH (m = 16, ef_construction = 64)
            """)

    async def close(self):
        """Close the connection pool if one was created."""
        pool = getattr(self, "pool", None)
        if pool is not None:
            await pool.close()
            self.pool = None

    async def upsert(self, documents: List[Dict]) -> int:
        """Insert or update documents by ``id`` and return how many were sent.

        Args:
            documents: ``[{"id", "content", "embedding", "metadata"?}]``; a
                missing ``"metadata"`` is stored as ``{}``.
        """
        if not documents:
            return 0

        rows = [
            (
                doc["id"],
                doc["content"],
                self._json_text(doc.get("metadata", {})),
                self._vector_literal(doc["embedding"]),
            )
            for doc in documents
        ]
        async with self._require_pool().acquire() as conn:
            await conn.executemany(
                """
                INSERT INTO documents (id, content, metadata, embedding)
                VALUES ($1, $2, $3::jsonb, $4::vector)
                ON CONFLICT (id) DO UPDATE SET
                    content = EXCLUDED.content,
                    metadata = EXCLUDED.metadata,
                    embedding = EXCLUDED.embedding
                """,
                rows
            )
        return len(rows)

    async def search(
        self,
        query_embedding: List[float],
        limit: int = 10,
        filter_metadata: Optional[Dict] = None
    ) -> List[Dict]:
        """Return the ``limit`` nearest documents by cosine distance.

        Args:
            query_embedding: Query vector.
            limit: Maximum number of rows returned.
            filter_metadata: Optional ``{key: value}`` pairs; a row matches when
                the text of ``metadata->>key`` equals the value for every pair.
                Non-string values are compared by their JSON text.

        Returns:
            ``[{"id", "content", "metadata", "score"}]`` where ``score`` is
            ``1 - cosine distance``.
        """
        params = [self._vector_literal(query_embedding)]
        query = """
            SELECT id, content, metadata,
                   1 - (embedding <=> $1::vector) as similarity
            FROM documents
        """

        if filter_metadata:
            conditions = []
            for key, value in filter_metadata.items():
                # Both key and value are bound parameters: keys must never be
                # interpolated into the SQL text.
                params.append(str(key))
                key_pos = len(params)
                params.append(self._json_text(value))
                conditions.append(
                    f"metadata ->> ${key_pos}::text = ${len(params)}::text"
                )
            query += " WHERE " + " AND ".join(conditions)

        params.append(limit)
        query += f" ORDER BY embedding <=> $1::vector LIMIT ${len(params)}"

        async with self._require_pool().acquire() as conn:
            rows = await conn.fetch(query, *params)

        return [
            {
                "id": row["id"],
                "content": row["content"],
                "metadata": row["metadata"],
                "score": row["similarity"]
            }
            for row in rows
        ]

    async def hybrid_search(
        self,
        query_embedding: List[float],
        query_text: str,
        limit: int = 10,
        vector_weight: float = 0.5
    ) -> List[Dict]:
        """Combine vector similarity and English full-text rank.

        Takes the top ``2 * limit`` rows from each of the vector and text
        searches, joins them on ``id``, and orders by
        ``vector_score * vector_weight + text_score * (1 - vector_weight)``.
        A row missing from one side contributes 0 for that side.

        Returns:
            Rows as dicts with ``id``, ``content``, ``metadata`` and
            ``combined_score``.
        """
        async with self._require_pool().acquire() as conn:
            rows = await conn.fetch(
                """
                WITH vector_results AS (
                    SELECT id, content, metadata,
                           1 - (embedding <=> $1::vector) as vector_score
                    FROM documents
                    ORDER BY embedding <=> $1::vector
                    LIMIT $3 * 2
                ),
                text_results AS (
                    SELECT id, content, metadata,
                           ts_rank(to_tsvector('english', content),
                                   plainto_tsquery('english', $2)) as text_score
                    FROM documents
                    WHERE to_tsvector('english', content) @@ plainto_tsquery('english', $2)
                    ORDER BY text_score DESC
                    LIMIT $3 * 2
                )
                SELECT
                    COALESCE(v.id, t.id) as id,
                    COALESCE(v.content, t.content) as content,
                    COALESCE(v.metadata, t.metadata) as metadata,
                    COALESCE(v.vector_score, 0) * $4 +
                    COALESCE(t.text_score, 0) * (1 - $4) as combined_score
                FROM vector_results v
                FULL OUTER JOIN text_results t ON v.id = t.id
                ORDER BY combined_score DESC
                LIMIT $3
                """,
                self._vector_literal(query_embedding),
                query_text,
                limit,
                vector_weight
            )
        return [dict(row) for row in rows]
Template 4: Weaviate Implementation
import weaviate
from weaviate.util import generate_uuid5
from typing import List, Dict, Optional
class WeaviateVectorStore:
    """Wrapper around one Weaviate class with client-supplied vectors."""

    def __init__(
        self,
        url: str = "http://localhost:8080",
        class_name: str = "Document"
    ):
        """Connect to Weaviate at ``url`` and make sure ``class_name`` exists."""
        self.client = weaviate.Client(url=url)
        self.class_name = class_name
        self._ensure_schema()

    def _ensure_schema(self):
        """Create the class schema if the class does not exist yet."""
        if self.client.schema.exists(self.class_name):
            return

        schema = {
            "class": self.class_name,
            "vectorizer": "none",  # We provide vectors
            "properties": [
                {"name": "content", "dataType": ["text"]},
                {"name": "source", "dataType": ["string"]},
                {"name": "chunk_id", "dataType": ["int"]}
            ]
        }
        self.client.schema.create_class(schema)

    def _hits(self, results: Dict) -> List[Dict]:
        """Pull this class's result items out of a GraphQL ``Get`` response.

        Raises:
            RuntimeError: If the response carries an ``"errors"`` entry.
        """
        if results.get("errors"):
            raise RuntimeError(f"Weaviate query failed: {results['errors']}")
        # A null class entry is treated as "no results".
        return results["data"]["Get"].get(self.class_name) or []

    def upsert(self, documents: List[Dict]) -> int:
        """Batch upsert documents and return how many were queued.

        Each document needs ``"id"``, ``"content"`` and ``"embedding"``;
        ``"source"`` defaults to ``""`` and ``"chunk_id"`` to ``0``. The object
        UUID is derived from ``"id"`` via ``generate_uuid5``.
        """
        with self.client.batch as batch:
            batch.batch_size = 100
            for doc in documents:
                batch.add_data_object(
                    data_object={
                        "content": doc["content"],
                        "source": doc.get("source", ""),
                        "chunk_id": doc.get("chunk_id", 0)
                    },
                    class_name=self.class_name,
                    uuid=generate_uuid5(doc["id"]),
                    vector=doc["embedding"]
                )
        return len(documents)

    def search(
        self,
        query_vector: List[float],
        limit: int = 10,
        where_filter: Optional[Dict] = None
    ) -> List[Dict]:
        """Vector search; returns ``[{"id", "content", "source", "score"}]``.

        ``score`` is ``1 - distance`` as reported by Weaviate.
        """
        query = (
            self.client.query
            .get(self.class_name, ["content", "source", "chunk_id"])
            .with_near_vector({"vector": query_vector})
            .with_limit(limit)
            .with_additional(["distance", "id"])
        )
        if where_filter:
            query = query.with_where(where_filter)

        return [
            {
                "id": item["_additional"]["id"],
                "content": item["content"],
                "source": item["source"],
                "score": 1 - item["_additional"]["distance"]
            }
            for item in self._hits(query.do())
        ]

    def hybrid_search(
        self,
        query: str,
        query_vector: List[float],
        limit: int = 10,
        alpha: float = 0.5  # 0 = keyword, 1 = vector
    ) -> List[Dict]:
        """Hybrid BM25 + vector search.

        Returns ``[{"id", "content", "source", "score"}]`` where ``score`` is
        the value Weaviate reports in ``_additional.score``, passed through
        unchanged.
        """
        results = (
            self.client.query
            .get(self.class_name, ["content", "source"])
            .with_hybrid(query=query, vector=query_vector, alpha=alpha)
            .with_limit(limit)
            # "id" is requested so hits can be matched with search() results.
            .with_additional(["score", "id"])
            .do()
        )
        return [
            {
                "id": item["_additional"]["id"],
                "content": item["content"],
                "source": item["source"],
                "score": item["_additional"]["score"]
            }
            for item in self._hits(results)
        ]
Best Practices
Do's
- Use appropriate index - HNSW for most cases
- Tune parameters - ef_search, nprobe for recall/speed
- Implement hybrid search - Combine with keyword search
- Monitor recall - Measure search quality
- Pre-filter when possible - Reduce search space
Don'ts
- Don't skip evaluation - Measure before optimizing
- Don't over-index - Start with flat, scale up
- Don't ignore latency - P99 matters for UX
- Don't forget costs - Vector storage adds up
Resources