| name | vector-database-management |
| description | Comprehensive guide for managing vector databases including Pinecone, Weaviate, and Chroma for semantic search, RAG systems, and similarity-based applications |
| version | 1.0.0 |
| category | data-engineering |
| tags | vector-database, embeddings, semantic-search, rag, pinecone, weaviate, chroma, similarity-search, machine-learning, ai |
| prerequisites | Python 3.9+ (the examples use built-in generic type hints), Basic understanding of embeddings and vector representations, Familiarity with REST APIs, Knowledge of similarity metrics (cosine, euclidean, dot product) |
Vector Database Management
Table of Contents
- Introduction
- Vector Embeddings Fundamentals
- Database Setup & Configuration
- Index Operations
- Vector Operations
- Similarity Search
- Metadata Filtering
- Hybrid Search
- Namespace & Collection Management
- Performance & Scaling
- Production Best Practices
- Cost Optimization
Introduction
Vector databases are specialized systems designed to store, index, and query high-dimensional vector embeddings efficiently. They power modern AI applications including semantic search, recommendation systems, RAG (Retrieval Augmented Generation), and similarity-based matching.
Key Concepts
- Vector Embeddings: Numerical representations of data (text, images, audio) in high-dimensional space
- Similarity Search: Finding vectors that are "close" to a query vector using distance metrics (see the sketch after this list)
- Metadata Filtering: Combining vector similarity with structured data filtering
- Indexing: Optimization structures (HNSW, IVF, etc.) for fast approximate nearest neighbor search
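To make the distance metrics concrete, here is a minimal NumPy sketch of the three most common ones; the vectors are toy values for illustration only:
# Cosine, Euclidean, and dot-product similarity between two toy vectors
import numpy as np

a = np.array([0.1, 0.3, 0.5])
b = np.array([0.2, 0.1, 0.6])

cosine = np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))  # 1.0 = same direction
euclidean = np.linalg.norm(a - b)  # 0.0 = identical points (lower is more similar)
dot = np.dot(a, b)  # magnitude-sensitive; pairs well with normalized embeddings
print(f"cosine={cosine:.4f}, euclidean={euclidean:.4f}, dot={dot:.4f}")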
Database Comparison
| Feature | Pinecone | Weaviate | Chroma |
|---|---|---|---|
| Deployment | Fully managed | Managed or self-hosted | Self-hosted or cloud |
| Index Types | Serverless, Pods | HNSW | HNSW |
| Metadata Filtering | Advanced | GraphQL-based | Simple |
| Hybrid Search | Sparse-Dense | Built-in | Limited |
| Scale | Massive | Large | Small-Medium |
| Best For | Production RAG | Knowledge graphs | Local development |
Vector Embeddings Fundamentals
Understanding Vector Representations
Vector embeddings transform unstructured data into numerical arrays that capture semantic meaning:
# Text to embeddings using OpenAI
from openai import OpenAI
client = OpenAI(api_key="YOUR_API_KEY")
def generate_embedding(text: str, model: str = "text-embedding-3-small") -> list[float]:
"""Generate embeddings from text using OpenAI."""
response = client.embeddings.create(
input=text,
model=model
)
return response.data[0].embedding
# Example usage
text = "Vector databases enable semantic search capabilities"
embedding = generate_embedding(text)
print(f"Embedding dimension: {len(embedding)}") # 1536 dimensions
print(f"First 5 values: {embedding[:5]}")
Popular Embedding Models
# 1. OpenAI Embeddings (Production-grade)
from openai import OpenAI
def openai_embeddings(texts: list[str]) -> list[list[float]]:
"""Batch generate OpenAI embeddings."""
client = OpenAI(api_key="YOUR_API_KEY")
response = client.embeddings.create(
input=texts,
model="text-embedding-3-large" # 3072 dimensions
)
return [item.embedding for item in response.data]
# 2. Sentence Transformers (Open-source)
from sentence_transformers import SentenceTransformer
def sentence_transformer_embeddings(texts: list[str]) -> list[list[float]]:
"""Generate embeddings using Sentence Transformers."""
model = SentenceTransformer('all-MiniLM-L6-v2') # 384 dimensions
embeddings = model.encode(texts)
return embeddings.tolist()
# 3. Cohere Embeddings
import cohere
def cohere_embeddings(texts: list[str]) -> list[list[float]]:
"""Generate embeddings using Cohere."""
co = cohere.Client("YOUR_API_KEY")
response = co.embed(
texts=texts,
model="embed-english-v3.0",
input_type="search_document"
)
return response.embeddings
Embedding Dimensions & Trade-offs
# Different embedding models for different use cases
EMBEDDING_CONFIGS = {
"openai-small": {
"model": "text-embedding-3-small",
"dimensions": 1536,
"cost_per_1m": 0.02,
"use_case": "General purpose, cost-effective"
},
"openai-large": {
"model": "text-embedding-3-large",
"dimensions": 3072,
"cost_per_1m": 0.13,
"use_case": "High accuracy requirements"
},
"sentence-transformers": {
"model": "all-MiniLM-L6-v2",
"dimensions": 384,
"cost_per_1m": 0.00, # Open-source
"use_case": "Local development, privacy-sensitive"
},
"cohere-multilingual": {
"model": "embed-multilingual-v3.0",
"dimensions": 1024,
"cost_per_1m": 0.10,
"use_case": "Multi-language applications"
}
}
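As a quick way to compare these configurations, a small hypothetical helper (the function name and token counts are illustrative) can project the one-time embedding cost of a corpus, reading cost_per_1m as dollars per million tokens:
# Hypothetical helper: project embedding cost for a corpus
def estimate_embedding_cost(num_docs: int, avg_tokens_per_doc: int, config_key: str) -> float:
    """Return the estimated USD cost to embed the corpus with the chosen model."""
    config = EMBEDDING_CONFIGS[config_key]
    total_tokens = num_docs * avg_tokens_per_doc
    return (total_tokens / 1_000_000) * config["cost_per_1m"]

# 100k docs at ~500 tokens each with text-embedding-3-small ≈ $1.00
print(f"${estimate_embedding_cost(100_000, 500, 'openai-small'):.2f}")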
Database Setup & Configuration
Pinecone Setup
# Install the Pinecone SDK
# pip install pinecone  (the package formerly published as pinecone-client)
from pinecone import Pinecone, ServerlessSpec
# Initialize Pinecone client
pc = Pinecone(api_key="YOUR_API_KEY")
# List existing indexes
indexes = pc.list_indexes()
print(f"Existing indexes: {[idx.name for idx in indexes]}")
# Create serverless index (recommended for production)
index_name = "production-search"
if index_name not in [idx.name for idx in pc.list_indexes()]:
pc.create_index(
name=index_name,
dimension=1536, # Match your embedding model
metric="cosine", # cosine, dotproduct, or euclidean
spec=ServerlessSpec(
cloud="aws",
region="us-east-1"
),
deletion_protection="enabled", # Prevent accidental deletion
tags={
"environment": "production",
"team": "ml",
"project": "semantic-search"
}
)
print(f"Created index: {index_name}")
# Connect to index
index = pc.Index(index_name)
# Get index stats
stats = index.describe_index_stats()
print(f"Index stats: {stats}")
Selective Metadata Indexing (Pinecone)
# Configure which metadata fields are indexed for filtering.
# On pod-based indexes this saves memory and speeds up queries;
# serverless indexes automatically index all metadata fields.
from pinecone import Pinecone, PodSpec
pc = Pinecone(api_key="YOUR_API_KEY")
# Create a pod-based index that only indexes selected metadata fields
pc.create_index(
    name="optimized-index",
    dimension=1536,
    metric="cosine",
    spec=PodSpec(
        environment="us-east-1-aws",
        pod_type="p1.x1",
        metadata_config={
            # Only these fields can be used in query filters
            "indexed": ["document_id", "category", "created_at", "tags"]
        }
    )
)
# This configuration allows you to:
# 1. Filter by document_id, category, created_at, tags
# 2. Still store and retrieve document_title, document_url, full_content in results
# 3. Save memory by not indexing fields that are only displayed
Weaviate Setup
# Install Weaviate client
# pip install weaviate-client
import weaviate
from weaviate.classes.config import Configure, Property, DataType
# Connect to Weaviate
client = weaviate.connect_to_local()
# Or connect to Weaviate Cloud
# client = weaviate.connect_to_wcs(
# cluster_url="YOUR_WCS_URL",
# auth_credentials=weaviate.auth.AuthApiKey("YOUR_API_KEY")
# )
# Create collection (schema)
try:
collection = client.collections.create(
name="Documents",
vectorizer_config=Configure.Vectorizer.text2vec_openai(
model="text-embedding-3-small"
),
properties=[
Property(name="title", data_type=DataType.TEXT),
Property(name="content", data_type=DataType.TEXT),
Property(name="category", data_type=DataType.TEXT),
Property(name="created_at", data_type=DataType.DATE),
Property(name="tags", data_type=DataType.TEXT_ARRAY)
]
)
print(f"Created collection: Documents")
except Exception as e:
print(f"Collection exists or error: {e}")
# Get collection
documents = client.collections.get("Documents")
# Check collection info
print(documents.config.get())
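The v4 client keeps gRPC connections open, so close it when finished, or scope it with a context manager so cleanup happens automatically:
# Close the connection explicitly when done
client.close()
# Or let a context manager handle it
with weaviate.connect_to_local() as client:
    documents = client.collections.get("Documents")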
Chroma Setup
# Install Chroma
# pip install chromadb
import chromadb
from chromadb.config import Settings
# Initialize Chroma client (persistent)
client = chromadb.PersistentClient(path="./chroma_db")
# Or use ephemeral (in-memory)
# client = chromadb.EphemeralClient()
# Create or get collection
collection = client.get_or_create_collection(
name="documents",
metadata={
"description": "Document collection for semantic search",
"hnsw:space": "cosine" # cosine, l2, or ip (inner product)
}
)
# List all collections
collections = client.list_collections()
print(f"Available collections: {[c.name for c in collections]}")
# Get collection info
print(f"Collection count: {collection.count()}")
Index Operations
Creating Indexes with Different Configurations
from pinecone import Pinecone, ServerlessSpec, PodSpec
pc = Pinecone(api_key="YOUR_API_KEY")
# 1. Serverless index (auto-scaling, pay-per-use)
pc.create_index(
name="serverless-index",
dimension=1536,
metric="cosine",
spec=ServerlessSpec(
cloud="aws",
region="us-east-1"
)
)
# 2. Pod-based index (dedicated resources)
pc.create_index(
name="pod-index",
dimension=1536,
metric="dotproduct",
spec=PodSpec(
environment="us-east-1-aws",
pod_type="p1.x1", # Performance tier
pods=2, # Number of pods
replicas=2, # Replicas for high availability
shards=1
)
)
# 3. Sparse index (for BM25-like keyword search)
pc.create_index(
    name="sparse-index",
    vector_type="sparse",  # sparse indexes omit `dimension` (no fixed size)
    metric="dotproduct",  # sparse indexes require the dotproduct metric
spec=ServerlessSpec(
cloud="aws",
region="us-east-1"
)
)
Index Management Operations
from pinecone import Pinecone
pc = Pinecone(api_key="YOUR_API_KEY")
# List all indexes
indexes = pc.list_indexes()
for idx in indexes:
print(f"Name: {idx.name}, Status: {idx.status.state}, Host: {idx.host}")
# Describe specific index
index_info = pc.describe_index("production-search")
print(f"Dimension: {index_info.dimension}")
print(f"Metric: {index_info.metric}")
print(f"Status: {index_info.status}")
# Connect to index
index = pc.Index("production-search")
# Get index statistics
stats = index.describe_index_stats()
print(f"Total vectors: {stats.total_vector_count}")
print(f"Namespaces: {stats.namespaces}")
print(f"Index fullness: {stats.index_fullness}")
# Delete index (be careful!)
# pc.delete_index("test-index")
Configuring Index for Optimal Performance
# Configuration for different use cases
# 1. High-throughput search (many queries/second)
pc.create_index(
name="high-throughput",
dimension=1536,
metric="cosine",
spec=PodSpec(
environment="us-east-1-aws",
pod_type="p2.x1", # Higher performance tier
pods=4,
replicas=3 # More replicas = higher query throughput
)
)
# 2. Large-scale storage (billions of vectors)
pc.create_index(
name="large-scale",
dimension=1536,
metric="cosine",
spec=PodSpec(
environment="us-east-1-aws",
pod_type="s1.x1", # Storage-optimized
pods=8,
shards=4 # More shards = more storage capacity
)
)
# 3. Cost-optimized development
pc.create_index(
name="dev-environment",
dimension=1536,
metric="cosine",
spec=ServerlessSpec(
cloud="aws",
region="us-east-1"
) # Serverless = pay only for what you use
)
Vector Operations
Upserting Vectors (Pinecone)
from pinecone import Pinecone
import uuid
pc = Pinecone(api_key="YOUR_API_KEY")
index = pc.Index("production-search")
# 1. Single vector upsert
vector_id = str(uuid.uuid4())
index.upsert(
vectors=[
{
"id": vector_id,
"values": [0.1, 0.2, 0.3, ...], # 1536 dimensions
"metadata": {
"title": "Introduction to Vector Databases",
"category": "education",
"author": "John Doe",
"created_at": "2024-01-15",
"tags": ["ml", "ai", "databases"]
}
}
],
namespace="documents"
)
# 2. Batch upsert (efficient for large datasets)
# `documents` here is a list of (id, embedding, metadata) tuples
batch_size = 100
vectors = []
for i, (doc_id, embedding, metadata) in enumerate(documents):
vectors.append({
"id": doc_id,
"values": embedding,
"metadata": metadata
})
# Upsert in batches
if len(vectors) >= batch_size or i == len(documents) - 1:
index.upsert(vectors=vectors, namespace="documents")
print(f"Upserted batch of {len(vectors)} vectors")
vectors = []
# 3. Parallel upsert with async_req for better throughput
# With async_req=True the client returns futures that a dedicated
# thread pool (pool_threads) resolves concurrently.
index = pc.Index("production-search", pool_threads=30)
async_results = [
    index.upsert(vectors=batch, namespace="documents", async_req=True)
    for batch in batches
]
# Block until every batch completes (raises if an upsert failed)
[result.get() for result in async_results]
Sparse Vector Operations (Pinecone)
# Sparse vectors are useful for keyword-based search (like BM25)
# Combined with dense vectors for hybrid search
from pinecone import Pinecone
pc = Pinecone(api_key="YOUR_API_KEY")
index = pc.Index("hybrid-search-index")
# Upsert vector with both dense and sparse components
index.upsert(
vectors=[
{
"id": "doc1",
"values": [0.1, 0.2, ..., 0.5], # Dense vector
"sparse_values": {
"indices": [10, 45, 123, 234, 678], # Token IDs
"values": [0.8, 0.6, 0.9, 0.7, 0.5] # TF-IDF weights
},
"metadata": {"title": "Hybrid Search Document"}
}
],
namespace="hybrid"
)
# Query with hybrid search
results = index.query(
vector=[0.1, 0.2, ..., 0.5], # Dense query vector
sparse_vector={
"indices": [10, 45, 123],
"values": [0.8, 0.7, 0.9]
},
top_k=10,
namespace="hybrid",
include_metadata=True
)
Vector Operations (Weaviate)
import weaviate
from weaviate.classes.query import MetadataQuery
client = weaviate.connect_to_local()
documents = client.collections.get("Documents")
# 1. Insert single object
doc_uuid = documents.data.insert(
properties={
"title": "Vector Database Guide",
"content": "A comprehensive guide to vector databases...",
"category": "tutorial",
"created_at": "2024-01-15T10:00:00Z",
"tags": ["database", "ml", "ai"]
}
)
print(f"Inserted: {doc_uuid}")
# 2. Batch insert
with documents.batch.dynamic() as batch:
for doc in document_list:
batch.add_object(
properties={
"title": doc["title"],
"content": doc["content"],
"category": doc["category"],
"created_at": doc["created_at"],
"tags": doc["tags"]
}
)
# 3. Insert with custom vector
documents.data.insert(
properties={"title": "Custom Vector Doc", "content": "..."},
vector=[0.1, 0.2, 0.3, ...] # Your pre-computed vector
)
# 4. Update object
documents.data.update(
uuid=doc_uuid,
properties={"title": "Updated Title"}
)
# 5. Delete object
documents.data.delete_by_id(uuid=doc_uuid)
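For bulk cleanup, the v4 client also supports deleting by filter rather than by individual UUID; a sketch against the same collection (the "obsolete" category value is illustrative):
# 6. Bulk delete by filter
from weaviate.classes.query import Filter
result = documents.data.delete_many(
    where=Filter.by_property("category").equal("obsolete")
)
print(f"Deleted {result.successful} objects")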
Vector Operations (Chroma)
import chromadb
client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_collection("documents")
# 1. Add documents with auto-embedding
collection.add(
documents=[
"This is document 1",
"This is document 2",
"This is document 3"
],
metadatas=[
{"category": "tech", "author": "Alice"},
{"category": "science", "author": "Bob"},
{"category": "tech", "author": "Charlie"}
],
ids=["doc1", "doc2", "doc3"]
)
# 2. Add with custom embeddings
collection.add(
embeddings=[
[0.1, 0.2, 0.3, ...],
[0.4, 0.5, 0.6, ...]
],
metadatas=[
{"title": "Doc 1"},
{"title": "Doc 2"}
],
ids=["custom1", "custom2"]
)
# 3. Update documents
collection.update(
ids=["doc1"],
documents=["Updated document content"],
metadatas=[{"category": "tech", "updated": True}]
)
# 4. Delete documents
collection.delete(ids=["doc1", "doc2"])
# 5. Get documents by IDs
results = collection.get(
ids=["doc1", "doc2"],
include=["documents", "metadatas", "embeddings"]
)
Similarity Search
Basic Similarity Search (Pinecone)
from pinecone import Pinecone
from openai import OpenAI
# Initialize clients
pc = Pinecone(api_key="PINECONE_API_KEY")
openai_client = OpenAI(api_key="OPENAI_API_KEY")
index = pc.Index("production-search")
# 1. Generate query embedding
query_text = "What are the benefits of vector databases?"
response = openai_client.embeddings.create(
input=query_text,
model="text-embedding-3-small"
)
query_embedding = response.data[0].embedding
# 2. Search for similar vectors
results = index.query(
vector=query_embedding,
top_k=10,
namespace="documents",
include_values=False,
include_metadata=True
)
# 3. Process results
print(f"Found {len(results.matches)} results")
for match in results.matches:
print(f"ID: {match.id}")
print(f"Score: {match.score:.4f}")
print(f"Title: {match.metadata.get('title')}")
print(f"Category: {match.metadata.get('category')}")
print("---")
Search by ID (Query by Example)
# Search using an existing vector as query
results = index.query(
id="existing-doc-id", # Use this document as the query
top_k=10,
namespace="documents",
include_metadata=True
)
# Useful for "find similar items" features
print(f"Documents similar to {results.matches[0].metadata.get('title')}:")
for match in results.matches[1:]: # Skip first (self)
print(f"- {match.metadata.get('title')} (score: {match.score:.4f})")
Multi-vector Search (Pinecone)
# The v3+ SDK accepts one query vector per request (the old `queries=`
# batch parameter was removed), so run multiple queries as separate
# calls, optionally in parallel threads.
query_embeddings = [
    [0.1, 0.2, ...],  # Query 1
    [0.3, 0.4, ...],  # Query 2
    [0.5, 0.6, ...]   # Query 3
]
for i, query_embedding in enumerate(query_embeddings):
    results = index.query(
        vector=query_embedding,
        top_k=5,
        namespace="documents",
        include_metadata=True
    )
    print(f"\nResults for query {i+1}:")
    for match in results.matches:
        print(f"- {match.metadata.get('title')} (score: {match.score:.4f})")
Similarity Search (Weaviate)
import weaviate
from weaviate.classes.query import MetadataQuery
client = weaviate.connect_to_local()
documents = client.collections.get("Documents")
# 1. Near text search (semantic)
response = documents.query.near_text(
query="vector database performance optimization",
limit=10,
return_metadata=MetadataQuery(distance=True, certainty=True)
)
for obj in response.objects:
print(f"Title: {obj.properties['title']}")
print(f"Distance: {obj.metadata.distance:.4f}")
print(f"Certainty: {obj.metadata.certainty:.4f}")
print("---")
# 2. Near vector search (with custom embedding)
response = documents.query.near_vector(
near_vector=[0.1, 0.2, 0.3, ...],
limit=10
)
# 3. Near object search (find similar to existing object)
response = documents.query.near_object(
near_object="uuid-of-reference-object",
limit=10
)
Similarity Search (Chroma)
import chromadb
client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_collection("documents")
# 1. Query with text (auto-embedding)
results = collection.query(
query_texts=["What is machine learning?"],
n_results=10,
include=["documents", "metadatas", "distances"]
)
print(f"Found {len(results['ids'][0])} results")
for i, doc_id in enumerate(results['ids'][0]):
print(f"ID: {doc_id}")
print(f"Distance: {results['distances'][0][i]:.4f}")
print(f"Document: {results['documents'][0][i][:100]}...")
print(f"Metadata: {results['metadatas'][0][i]}")
print("---")
# 2. Query with custom embedding
results = collection.query(
query_embeddings=[[0.1, 0.2, 0.3, ...]],
n_results=10
)
Metadata Filtering
Pinecone Metadata Filters
from pinecone import Pinecone
pc = Pinecone(api_key="YOUR_API_KEY")
index = pc.Index("production-search")
# 1. Equality filter
results = index.query(
vector=query_embedding,
top_k=10,
filter={"category": {"$eq": "education"}},
include_metadata=True
)
# 2. Inequality filter
results = index.query(
vector=query_embedding,
top_k=10,
filter={"year": {"$ne": 2023}},
include_metadata=True
)
# 3. Range filters
results = index.query(
vector=query_embedding,
top_k=10,
filter={
"$and": [
{"year": {"$gte": 2020}},
{"year": {"$lte": 2024}}
]
},
include_metadata=True
)
# 4. In/Not-in filters
results = index.query(
vector=query_embedding,
top_k=10,
filter={
"category": {"$in": ["education", "tutorial", "guide"]}
},
include_metadata=True
)
# 5. Existence check
results = index.query(
vector=query_embedding,
top_k=10,
filter={"author": {"$exists": True}},
include_metadata=True
)
# 6. Complex AND/OR queries
results = index.query(
vector=query_embedding,
top_k=10,
filter={
"$and": [
{"category": {"$eq": "education"}},
{
"$or": [
{"year": {"$eq": 2024}},
{"featured": {"$eq": True}}
]
},
{"tags": {"$in": ["ml", "ai"]}}
]
},
include_metadata=True
)
# 7. Greater than/Less than
results = index.query(
vector=query_embedding,
top_k=10,
filter={
"view_count": {"$gt": 1000},
"rating": {"$gte": 4.5}
},
include_metadata=True
)
Production Metadata Filter Patterns
# Pattern 1: Time-based filtering (recent content)
# Pinecone's range operators ($gt/$gte/$lt/$lte) only work on numbers,
# so store created_at as a Unix timestamp rather than an ISO string.
from datetime import datetime, timedelta
def search_recent_documents(query_text: str, days: int = 30):
    """Search only documents from the last N days."""
    cutoff_ts = (datetime.now() - timedelta(days=days)).timestamp()
    results = index.query(
        vector=generate_embedding(query_text),
        top_k=10,
        filter={
            "created_at": {"$gte": cutoff_ts}
        },
        include_metadata=True
    )
    return results
# Pattern 2: User permission filtering
def search_with_permissions(query_text: str, user_id: str, user_roles: list):
"""Search only documents user has access to."""
results = index.query(
vector=generate_embedding(query_text),
top_k=10,
filter={
"$or": [
{"owner_id": {"$eq": user_id}},
{"shared_with": {"$in": [user_id]}},
{"public": {"$eq": True}},
{"required_roles": {"$in": user_roles}}
]
},
include_metadata=True
)
return results
# Pattern 3: Multi-tenant filtering
def search_tenant_documents(query_text: str, tenant_id: str, category: str = None):
"""Search within a specific tenant's data."""
filter_dict = {"tenant_id": {"$eq": tenant_id}}
if category:
filter_dict["category"] = {"$eq": category}
results = index.query(
vector=generate_embedding(query_text),
top_k=10,
filter=filter_dict,
include_metadata=True
)
return results
# Pattern 4: Faceted search
def faceted_search(query_text: str, facets: dict):
"""Search with multiple facet filters."""
filter_conditions = []
for field, values in facets.items():
if isinstance(values, list):
filter_conditions.append({field: {"$in": values}})
else:
filter_conditions.append({field: {"$eq": values}})
results = index.query(
vector=generate_embedding(query_text),
top_k=10,
filter={"$and": filter_conditions} if filter_conditions else {},
include_metadata=True
)
return results
# Usage
results = faceted_search(
"machine learning tutorials",
facets={
"category": ["education", "tutorial"],
"difficulty": "beginner",
"language": ["english", "spanish"]
}
)
Weaviate Metadata Filtering
import weaviate
from weaviate.classes.query import Filter
client = weaviate.connect_to_local()
documents = client.collections.get("Documents")
# 1. Simple equality filter
response = documents.query.near_text(
query="vector databases",
limit=10,
filters=Filter.by_property("category").equal("education")
)
# 2. Greater than filter
response = documents.query.near_text(
query="machine learning",
limit=10,
filters=Filter.by_property("year").greater_than(2020)
)
# 3. Contains any filter
response = documents.query.near_text(
query="AI tutorials",
limit=10,
filters=Filter.by_property("tags").contains_any(["ml", "ai", "deep-learning"])
)
# 4. Complex AND/OR filters
response = documents.query.near_text(
query="database optimization",
limit=10,
filters=(
Filter.by_property("category").equal("tutorial") &
(Filter.by_property("difficulty").equal("beginner") |
Filter.by_property("featured").equal(True))
)
)
Chroma Metadata Filtering
import chromadb
client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_collection("documents")
# 1. Simple equality filter
results = collection.query(
query_texts=["vector databases"],
n_results=10,
where={"category": "education"}
)
# 2. AND conditions
results = collection.query(
query_texts=["machine learning"],
n_results=10,
where={
"$and": [
{"category": "education"},
{"difficulty": "beginner"}
]
}
)
# 3. OR conditions
results = collection.query(
query_texts=["AI tutorials"],
n_results=10,
where={
"$or": [
{"category": "education"},
{"category": "tutorial"}
]
}
)
# 4. Greater than/Less than
results = collection.query(
query_texts=["recent content"],
n_results=10,
where={"year": {"$gte": 2023}}
)
# 5. In operator
results = collection.query(
query_texts=["programming guides"],
n_results=10,
where={"language": {"$in": ["python", "javascript", "go"]}}
)
Hybrid Search
Pinecone Hybrid Search (Dense + Sparse)
from pinecone import Pinecone
from typing import Dict, List
import hashlib
import re
from collections import Counter
pc = Pinecone(api_key="YOUR_API_KEY")
index = pc.Index("hybrid-search-index")
def create_sparse_vector(text: str, top_k: int = 100) -> Dict:
    """Create a sparse vector using a toy TF + hashing scheme."""
    # Tokenize
    tokens = re.findall(r'\w+', text.lower())
    # Calculate term frequencies
    tf = Counter(tokens)
    # Map terms to stable pseudo token IDs. Python's built-in hash() is
    # randomized per process, which would break index/query consistency,
    # so use a deterministic hash (collisions are still possible).
    vocab = {
        word: int(hashlib.md5(word.encode()).hexdigest(), 16) % 10000
        for word in set(tokens)
    }
    # Get top-k terms
    top_terms = tf.most_common(top_k)
    # Create sparse vector
    indices = [vocab[term] for term, _ in top_terms]
    values = [float(freq) / len(tokens) for _, freq in top_terms]
    return {
        "indices": indices,
        "values": values
    }
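# For production-grade sparse vectors, prefer a fitted BM25 encoder over the
# toy TF scheme above; the pinecone-text package provides one
# (assumes: pip install pinecone-text).
from pinecone_text.sparse import BM25Encoder
bm25 = BM25Encoder()
bm25.fit(["first corpus document", "second corpus document"])  # fit on your corpus
doc_sparse = bm25.encode_documents("first corpus document")  # {"indices": [...], "values": [...]}
query_sparse = bm25.encode_queries("corpus document")  # query-side weighting differs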
def hybrid_search(query_text: str, top_k: int = 10, alpha: float = 0.5):
    """
    Perform hybrid search combining dense and sparse vectors.
    alpha: weight for dense search (0.0 = sparse only, 1.0 = dense only),
    applied as a convex combination by scaling both vector types.
    """
    # Generate dense and sparse vectors
    dense_vector = generate_embedding(query_text)
    sparse_vector = create_sparse_vector(query_text)
    # Scale both components so alpha actually controls the blend
    scaled_dense = [v * alpha for v in dense_vector]
    scaled_sparse = {
        "indices": sparse_vector["indices"],
        "values": [v * (1 - alpha) for v in sparse_vector["values"]]
    }
    # Hybrid query
    results = index.query(
        vector=scaled_dense,
        sparse_vector=scaled_sparse,
        top_k=top_k,
        include_metadata=True
    )
    return results
# Example usage
results = hybrid_search("machine learning vector databases", top_k=10)
for match in results.matches:
print(f"{match.metadata['title']}: {match.score:.4f}")
Weaviate Hybrid Search
import weaviate
from weaviate.classes.query import Filter, HybridFusion
client = weaviate.connect_to_local()
documents = client.collections.get("Documents")
# Hybrid search (combines dense vector + BM25 keyword search)
response = documents.query.hybrid(
query="vector database performance",
limit=10,
alpha=0.5, # 0 = pure keyword, 1 = pure vector, 0.5 = balanced
fusion_type="rankedFusion" # or "relativeScore"
)
for obj in response.objects:
print(f"Title: {obj.properties['title']}")
print(f"Score: {obj.metadata.score}")
print("---")
# Hybrid search with filters
response = documents.query.hybrid(
query="machine learning tutorials",
limit=10,
alpha=0.7, # Favor semantic search
filters=Filter.by_property("category").equal("education")
)
# Hybrid search with custom vector
response = documents.query.hybrid(
query="custom query",
vector=[0.1, 0.2, 0.3, ...], # Your pre-computed vector
limit=10,
alpha=0.5
)
BM25 + Vector Hybrid (Custom Implementation)
from rank_bm25 import BM25Okapi
from typing import Dict, List
import numpy as np
class HybridSearchEngine:
"""Custom hybrid search combining BM25 and vector search."""
def __init__(self, index, documents: List[Dict]):
self.index = index
self.documents = documents
# Build BM25 index
tokenized_docs = [doc['content'].lower().split() for doc in documents]
self.bm25 = BM25Okapi(tokenized_docs)
self.doc_ids = [doc['id'] for doc in documents]
def search(self, query: str, top_k: int = 10, alpha: float = 0.5):
"""
Hybrid search with custom score fusion.
alpha: weight for vector search (1-alpha for BM25)
"""
# 1. Vector search
query_embedding = generate_embedding(query)
vector_results = self.index.query(
vector=query_embedding,
top_k=top_k * 2, # Get more candidates
include_metadata=True
)
# 2. BM25 search
tokenized_query = query.lower().split()
bm25_scores = self.bm25.get_scores(tokenized_query)
# 3. Normalize scores
vector_scores = {
m.id: m.score for m in vector_results.matches
}
        max_vec_score = max(vector_scores.values()) if vector_scores else 1.0
        peak_bm25 = max(bm25_scores) if len(bm25_scores) else 0.0
        max_bm25_score = peak_bm25 if peak_bm25 > 0 else 1.0
# 4. Combine scores
hybrid_scores = {}
all_ids = set(vector_scores.keys()) | set(self.doc_ids)
for doc_id in all_ids:
vec_score = vector_scores.get(doc_id, 0) / max_vec_score
idx = self.doc_ids.index(doc_id) if doc_id in self.doc_ids else -1
bm25_score = bm25_scores[idx] / max_bm25_score if idx >= 0 else 0
hybrid_scores[doc_id] = (alpha * vec_score) + ((1 - alpha) * bm25_score)
# 5. Rank and return top-k
ranked = sorted(hybrid_scores.items(), key=lambda x: x[1], reverse=True)
return ranked[:top_k]
# Usage
engine = HybridSearchEngine(index, documents)
results = engine.search("machine learning databases", top_k=10, alpha=0.7)
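An alternative to weighted score fusion is Reciprocal Rank Fusion (RRF), the idea behind Weaviate's ranked fusion mode: it combines ranks rather than raw scores, which sidesteps score normalization entirely. A minimal sketch, with k=60 as the conventional constant:
# Reciprocal Rank Fusion: score(d) = sum over rankings of 1 / (k + rank(d))
from typing import Dict, List, Tuple
def rrf_fuse(rankings: List[List[str]], k: int = 60, top_k: int = 10) -> List[Tuple[str, float]]:
    scores: Dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda x: x[1], reverse=True)[:top_k]

# fused = rrf_fuse([ids_ranked_by_vector, ids_ranked_by_bm25])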
Namespace & Collection Management
Pinecone Namespaces
from pinecone import Pinecone
pc = Pinecone(api_key="YOUR_API_KEY")
index = pc.Index("production-search")
# 1. Upsert to specific namespace
index.upsert(
vectors=[
{"id": "doc1", "values": [...], "metadata": {...}}
],
namespace="production"
)
# 2. Query specific namespace
results = index.query(
vector=[...],
top_k=10,
namespace="production",
include_metadata=True
)
# 3. Get namespace statistics
stats = index.describe_index_stats()
for namespace, info in stats.namespaces.items():
print(f"Namespace: {namespace}")
print(f" Vector count: {info.vector_count}")
# 4. Delete all vectors in namespace
index.delete(delete_all=True, namespace="test")
# 5. Multi-namespace architecture
NAMESPACES = {
"production": "Live user-facing data",
"staging": "Testing before production",
"development": "Development and experiments",
"archive": "Historical data"
}
def upsert_with_environment(vectors, environment="production"):
"""Upsert to appropriate namespace."""
namespace = environment if environment in NAMESPACES else "development"
index.upsert(vectors=vectors, namespace=namespace)
def search_across_namespaces(query_vector, namespaces=["production", "archive"]):
"""Search multiple namespaces and combine results."""
all_results = []
for ns in namespaces:
results = index.query(
vector=query_vector,
top_k=10,
namespace=ns,
include_metadata=True
)
for match in results.matches:
match.metadata["source_namespace"] = ns
all_results.append(match)
# Sort by score
all_results.sort(key=lambda x: x.score, reverse=True)
return all_results[:10]
Weaviate Collections
import weaviate
from weaviate.classes.config import Configure
client = weaviate.connect_to_local()
# 1. Create multiple collections
collections_config = [
{
"name": "Products",
"properties": ["name", "description", "category", "price"]
},
{
"name": "Users",
"properties": ["username", "bio", "interests"]
},
{
"name": "Reviews",
"properties": ["content", "rating", "product_id", "user_id"]
}
]
for config in collections_config:
try:
client.collections.create(
name=config["name"],
vectorizer_config=Configure.Vectorizer.text2vec_openai()
)
except Exception as e:
print(f"Collection {config['name']} exists: {e}")
# 2. Cross-collection references
client.collections.create(
name="Orders",
references=[
weaviate.classes.config.ReferenceProperty(
name="hasProduct",
target_collection="Products"
),
weaviate.classes.config.ReferenceProperty(
name="byUser",
target_collection="Users"
)
]
)
# 3. Multi-collection search
def search_all_collections(query: str):
"""Search across multiple collections."""
results = {}
for collection_name in ["Products", "Users", "Reviews"]:
collection = client.collections.get(collection_name)
response = collection.query.near_text(
query=query,
limit=5
)
results[collection_name] = response.objects
return results
# 4. Delete collection
client.collections.delete("TestCollection")
Chroma Collections
import chromadb
client = chromadb.PersistentClient(path="./chroma_db")
# 1. Create multiple collections
collections = {
"documents": {
"metadata": {"description": "Document embeddings"},
"embedding_function": None # Use default
},
"images": {
"metadata": {"description": "Image embeddings"},
"embedding_function": None
},
"code": {
"metadata": {"description": "Code snippets"},
"embedding_function": None
}
}
for name, config in collections.items():
collection = client.get_or_create_collection(
name=name,
metadata=config["metadata"]
)
# 2. List all collections
all_collections = client.list_collections()
for coll in all_collections:
print(f"Collection: {coll.name}")
print(f" Count: {coll.count()}")
print(f" Metadata: {coll.metadata}")
# 3. Collection-specific operations
docs_collection = client.get_collection("documents")
docs_collection.add(
documents=["Document text..."],
metadatas=[{"type": "article"}],
ids=["doc1"]
)
# 4. Delete collection
client.delete_collection("test_collection")
# 5. Multi-collection search
def search_all_collections(query: str, n_results: int = 5):
"""Search across all collections."""
results = {}
for collection in client.list_collections():
try:
collection_results = collection.query(
query_texts=[query],
n_results=n_results
)
results[collection.name] = collection_results
except Exception as e:
print(f"Error searching {collection.name}: {e}")
return results
Performance & Scaling
Batch Operations Best Practices
from pinecone import Pinecone
from typing import List, Dict
from concurrent.futures import ThreadPoolExecutor
import time
pc = Pinecone(api_key="YOUR_API_KEY")
index = pc.Index("production-search")
# 1. Optimal batch size
OPTIMAL_BATCH_SIZE = 100 # Pinecone recommendation
def batch_upsert(vectors: List[Dict], batch_size: int = OPTIMAL_BATCH_SIZE):
"""Efficiently upsert vectors in batches."""
total_batches = (len(vectors) + batch_size - 1) // batch_size
for i in range(0, len(vectors), batch_size):
batch = vectors[i:i + batch_size]
index.upsert(vectors=batch, namespace="documents")
if (i // batch_size + 1) % 10 == 0:
print(f"Processed {i // batch_size + 1}/{total_batches} batches")
# 2. Parallel batch upsert
def parallel_batch_upsert(vectors: List[Dict], num_workers: int = 4):
"""Parallel upsert using thread pool."""
batch_size = 100
batches = [
vectors[i:i + batch_size]
for i in range(0, len(vectors), batch_size)
]
def upsert_batch(batch):
try:
index.upsert(vectors=batch, namespace="documents")
return len(batch)
except Exception as e:
print(f"Error upserting batch: {e}")
return 0
with ThreadPoolExecutor(max_workers=num_workers) as executor:
results = list(executor.map(upsert_batch, batches))
print(f"Successfully upserted {sum(results)} vectors")
# 3. Rate limiting for API calls
class RateLimiter:
"""Simple rate limiter for API calls."""
def __init__(self, max_calls: int, time_window: float):
self.max_calls = max_calls
self.time_window = time_window
self.calls = []
def wait_if_needed(self):
"""Wait if rate limit would be exceeded."""
now = time.time()
# Remove old calls outside time window
self.calls = [call_time for call_time in self.calls
if now - call_time < self.time_window]
if len(self.calls) >= self.max_calls:
sleep_time = self.time_window - (now - self.calls[0])
if sleep_time > 0:
time.sleep(sleep_time)
self.calls = []
self.calls.append(now)
# Usage
rate_limiter = RateLimiter(max_calls=100, time_window=60) # 100 calls/minute
for batch in batches:
rate_limiter.wait_if_needed()
index.upsert(vectors=batch)
# 4. Bulk delete optimization
def bulk_delete_by_filter(filter_dict: Dict, namespace: str = "documents"):
"""Delete vectors matching filter (more efficient than individual deletes)."""
    # First, get IDs matching the filter. An all-zero query vector is
    # rejected on cosine indexes, so use a tiny non-zero dummy instead.
    results = index.query(
        vector=[1e-6] * 1536,  # Dummy vector (non-zero for cosine metric)
top_k=10000, # Max allowed
filter=filter_dict,
namespace=namespace,
include_values=False
)
ids_to_delete = [match.id for match in results.matches]
# Delete in batches
batch_size = 1000
for i in range(0, len(ids_to_delete), batch_size):
batch = ids_to_delete[i:i + batch_size]
index.delete(ids=batch, namespace=namespace)
print(f"Deleted {len(batch)} vectors")
Query Optimization
# 1. Minimize data transfer
results = index.query(
vector=query_vector,
top_k=10,
include_values=False, # Don't return vectors if not needed
include_metadata=False, # Don't return metadata if not needed
namespace="documents"
)
# 2. Use appropriate top_k
# Smaller top_k = faster queries
results_small = index.query(vector=query_vector, top_k=10) # Fast
results_large = index.query(vector=query_vector, top_k=1000) # Slower
# 3. Filter before vector search when possible
# Good: Reduces search space
results = index.query(
vector=query_vector,
top_k=10,
filter={"category": "education"}, # Reduces candidates
namespace="documents"
)
# 4. Parallelize independent queries
# The v3+ SDK takes one vector per query call, so fan independent
# queries out to a thread pool instead of running them serially.
from concurrent.futures import ThreadPoolExecutor
queries = [embedding1, embedding2, embedding3]
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(
        lambda q: index.query(vector=q, top_k=10, namespace="documents"),
        queries
    ))
# 5. Cache frequent queries
import hashlib
import json
def vector_hash(vector: List[float]) -> str:
"""Create hash of vector for caching."""
return hashlib.md5(json.dumps(vector).encode()).hexdigest()
class CachedIndex:
"""Wrapper with query caching."""
def __init__(self, index, cache_size: int = 1000):
self.index = index
self.cache = {}
self.cache_size = cache_size
def query(self, vector: List[float], top_k: int = 10, **kwargs):
"""Query with caching."""
cache_key = f"{vector_hash(vector)}_{top_k}_{json.dumps(kwargs)}"
if cache_key in self.cache:
return self.cache[cache_key]
results = self.index.query(vector=vector, top_k=top_k, **kwargs)
if len(self.cache) >= self.cache_size:
# Remove oldest entry
self.cache.pop(next(iter(self.cache)))
self.cache[cache_key] = results
return results
# Usage
cached_index = CachedIndex(index)
results = cached_index.query(query_vector, top_k=10) # Cached on subsequent calls
Scaling Strategies
# 1. Index sizing for scale
from typing import Dict
def calculate_index_requirements(
num_vectors: int,
dimension: int,
metadata_size_per_vector: int = 1024 # bytes
) -> Dict:
"""Calculate storage and cost for index."""
# Approximate calculations
vector_size = dimension * 4 # 4 bytes per float32
total_vector_storage = num_vectors * vector_size
total_metadata_storage = num_vectors * metadata_size_per_vector
total_storage = total_vector_storage + total_metadata_storage
# Pinecone pricing (approximate)
storage_cost_per_gb_month = 0.095 # Serverless pricing
total_gb = total_storage / (1024 ** 3)
monthly_storage_cost = total_gb * storage_cost_per_gb_month
return {
"num_vectors": num_vectors,
"total_storage_gb": round(total_gb, 2),
"monthly_storage_cost_usd": round(monthly_storage_cost, 2),
"recommended_pod_type": "s1.x1" if num_vectors > 10_000_000 else "p1.x1"
}
# Example
reqs = calculate_index_requirements(
num_vectors=10_000_000,
dimension=1536
)
print(f"10M vectors storage: {reqs['total_storage_gb']} GB")
print(f"Monthly cost: ${reqs['monthly_storage_cost_usd']}")
# 2. Sharding strategy for massive scale
def create_sharded_indexes(
base_name: str,
num_shards: int,
dimension: int,
metric: str = "cosine"
):
"""Create multiple indexes for horizontal scaling."""
indexes = []
for shard_id in range(num_shards):
index_name = f"{base_name}-shard-{shard_id}"
pc.create_index(
name=index_name,
dimension=dimension,
metric=metric,
spec=ServerlessSpec(cloud="aws", region="us-east-1")
)
indexes.append(index_name)
return indexes
def route_to_shard(vector_id: str, num_shards: int) -> int:
"""Determine which shard a vector belongs to."""
return hash(vector_id) % num_shards
def query_sharded_indexes(query_vector: List[float], indexes: List, top_k: int = 10):
"""Query all shards and merge results."""
all_results = []
for index_name in indexes:
idx = pc.Index(index_name)
results = idx.query(
vector=query_vector,
top_k=top_k,
include_metadata=True
)
all_results.extend(results.matches)
# Sort by score and return top_k
all_results.sort(key=lambda x: x.score, reverse=True)
return all_results[:top_k]
Production Best Practices
Error Handling & Retries
import time
from typing import Any, Callable, Optional
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class PineconeRetryHandler:
"""Robust error handling for Pinecone operations."""
def __init__(self, max_retries: int = 3, base_delay: float = 1.0):
self.max_retries = max_retries
self.base_delay = base_delay
def retry_with_backoff(
self,
operation: Callable,
*args,
**kwargs
    ) -> Optional[Any]:
"""Retry operation with exponential backoff."""
for attempt in range(self.max_retries):
try:
return operation(*args, **kwargs)
except Exception as e:
if attempt == self.max_retries - 1:
logger.error(f"Operation failed after {self.max_retries} attempts: {e}")
raise
delay = self.base_delay * (2 ** attempt)
logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s...")
time.sleep(delay)
return None
# Usage
retry_handler = PineconeRetryHandler(max_retries=3)
# Upsert with retry
def safe_upsert(vectors, namespace="documents"):
return retry_handler.retry_with_backoff(
index.upsert,
vectors=vectors,
namespace=namespace
)
# Query with retry
def safe_query(vector, top_k=10, **kwargs):
return retry_handler.retry_with_backoff(
index.query,
vector=vector,
top_k=top_k,
**kwargs
)
# Example
try:
results = safe_query(query_vector, top_k=10, include_metadata=True)
except Exception as e:
logger.error(f"Query failed permanently: {e}")
Monitoring & Observability
import time
from dataclasses import dataclass
from typing import Callable, Dict, List
from datetime import datetime
@dataclass
class QueryMetrics:
"""Track query performance metrics."""
query_time: float
result_count: int
top_score: float
timestamp: datetime
namespace: str
filter_used: bool
class VectorDBMonitor:
"""Monitor vector database operations."""
def __init__(self):
self.metrics: List[QueryMetrics] = []
def track_query(
self,
query_func: Callable,
*args,
**kwargs
):
"""Track query execution and metrics."""
start_time = time.time()
results = query_func(*args, **kwargs)
elapsed = time.time() - start_time
metrics = QueryMetrics(
query_time=elapsed,
result_count=len(results.matches),
top_score=results.matches[0].score if results.matches else 0.0,
timestamp=datetime.now(),
namespace=kwargs.get('namespace', 'default'),
filter_used='filter' in kwargs
)
self.metrics.append(metrics)
# Alert on slow queries
if elapsed > 1.0: # 1 second threshold
logger.warning(f"Slow query detected: {elapsed:.2f}s")
return results
def get_stats(self) -> Dict:
"""Get aggregate statistics."""
if not self.metrics:
return {}
query_times = [m.query_time for m in self.metrics]
return {
"total_queries": len(self.metrics),
"avg_query_time": sum(query_times) / len(query_times),
"p95_query_time": sorted(query_times)[int(len(query_times) * 0.95)],
"p99_query_time": sorted(query_times)[int(len(query_times) * 0.99)],
"avg_results": sum(m.result_count for m in self.metrics) / len(self.metrics),
"filtered_queries_pct": sum(1 for m in self.metrics if m.filter_used) / len(self.metrics) * 100
}
# Usage
monitor = VectorDBMonitor()
# Wrap queries
results = monitor.track_query(
index.query,
vector=query_vector,
top_k=10,
namespace="documents",
filter={"category": "education"}
)
# Get statistics
stats = monitor.get_stats()
print(f"Average query time: {stats['avg_query_time']:.3f}s")
print(f"P95 query time: {stats['p95_query_time']:.3f}s")
Data Validation
from typing import List, Dict
import numpy as np
class VectorValidator:
"""Validate vectors and metadata before operations."""
def __init__(self, expected_dimension: int):
self.expected_dimension = expected_dimension
def validate_vector(self, vector: List[float]) -> tuple[bool, str]:
"""Validate vector format and content."""
# Check type
if not isinstance(vector, (list, np.ndarray)):
return False, "Vector must be list or numpy array"
# Check dimension
if len(vector) != self.expected_dimension:
return False, f"Expected {self.expected_dimension} dimensions, got {len(vector)}"
# Check for NaN or Inf
if any(not np.isfinite(v) for v in vector):
return False, "Vector contains NaN or Inf values"
# Check for zero vector
if all(v == 0 for v in vector):
return False, "Zero vector not allowed"
return True, "Valid"
def validate_metadata(self, metadata: Dict) -> tuple[bool, str]:
"""Validate metadata format."""
# Check type
if not isinstance(metadata, dict):
return False, "Metadata must be dictionary"
# Check metadata size (Pinecone limit: 40KB)
metadata_str = str(metadata)
if len(metadata_str.encode('utf-8')) > 40_000:
return False, "Metadata exceeds 40KB limit"
# Check for required fields (customize as needed)
required_fields = ["title", "category"]
for field in required_fields:
if field not in metadata:
return False, f"Missing required field: {field}"
return True, "Valid"
def validate_batch(
self,
vectors: List[Dict]
) -> tuple[List[Dict], List[str]]:
"""Validate batch of vectors, return valid ones and errors."""
valid_vectors = []
errors = []
for i, item in enumerate(vectors):
# Validate vector
is_valid, error = self.validate_vector(item.get('values', []))
if not is_valid:
errors.append(f"Vector {i} ({item.get('id', 'unknown')}): {error}")
continue
# Validate metadata
if 'metadata' in item:
is_valid, error = self.validate_metadata(item['metadata'])
if not is_valid:
errors.append(f"Metadata {i} ({item.get('id', 'unknown')}): {error}")
continue
valid_vectors.append(item)
return valid_vectors, errors
# Usage
validator = VectorValidator(expected_dimension=1536)
# Validate single vector
is_valid, error = validator.validate_vector(embedding)
if not is_valid:
print(f"Invalid vector: {error}")
# Validate batch
valid_vectors, errors = validator.validate_batch(vectors_to_upsert)
if errors:
for error in errors:
logger.error(error)
# Upsert only valid vectors
if valid_vectors:
index.upsert(vectors=valid_vectors)
Backup & Disaster Recovery
import json
import gzip
from datetime import datetime
from pathlib import Path
class VectorDBBackup:
"""Backup and restore vector database data."""
def __init__(self, index, backup_dir: str = "./backups"):
self.index = index
self.backup_dir = Path(backup_dir)
self.backup_dir.mkdir(exist_ok=True)
def backup_namespace(
self,
namespace: str = "documents",
compress: bool = True
) -> str:
"""Backup all vectors in a namespace."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"backup_{namespace}_{timestamp}.json"
if compress:
filename += ".gz"
filepath = self.backup_dir / filename
        # Fetch all vectors in batches. On serverless indexes the SDK can
        # enumerate IDs with index.list(); fetch() then returns full records.
        all_vectors = []
        batch_size = 100
        for id_batch in self.index.list(namespace=namespace, limit=batch_size):
            fetched = self.index.fetch(ids=id_batch, namespace=namespace)
            for vec_id, vec in fetched.vectors.items():
                all_vectors.append({
                    "id": vec_id,
                    "values": list(vec.values),
                    "metadata": vec.metadata or {}
                })
# Save to file
data = {
"namespace": namespace,
"timestamp": timestamp,
"vector_count": len(all_vectors),
"vectors": all_vectors
}
if compress:
with gzip.open(filepath, 'wt', encoding='utf-8') as f:
json.dump(data, f)
else:
with open(filepath, 'w') as f:
json.dump(data, f, indent=2)
logger.info(f"Backed up {len(all_vectors)} vectors to {filepath}")
return str(filepath)
def restore_from_backup(
self,
backup_file: str,
target_namespace: str = None
):
"""Restore vectors from backup file."""
filepath = Path(backup_file)
# Load backup
if filepath.suffix == '.gz':
with gzip.open(filepath, 'rt', encoding='utf-8') as f:
data = json.load(f)
else:
with open(filepath, 'r') as f:
data = json.load(f)
namespace = target_namespace or data['namespace']
vectors = data['vectors']
# Restore in batches
batch_size = 100
for i in range(0, len(vectors), batch_size):
batch = vectors[i:i + batch_size]
self.index.upsert(vectors=batch, namespace=namespace)
logger.info(f"Restored {len(batch)} vectors")
logger.info(f"Restored {len(vectors)} vectors to namespace '{namespace}'")
# Usage
backup_manager = VectorDBBackup(index)
# Backup
backup_file = backup_manager.backup_namespace("production")
# Restore
backup_manager.restore_from_backup(backup_file, target_namespace="production-restored")
Cost Optimization
Storage Optimization
# 1. Reduce metadata size
# Bad: Storing full content in metadata
bad_metadata = {
"title": "Long document title",
"full_content": "...<entire document>...", # Wastes space
"description": "...<long description>...",
"extra_field_1": "...",
"extra_field_2": "..."
}
# Good: Store only necessary metadata
good_metadata = {
"title": "Long document title",
"doc_id": "doc-123", # Reference to external store
"category": "education",
"created_at": "2024-01-15"
}
# 2. Use selective metadata indexing (pod-based indexes)
# Only index fields you'll filter on; serverless indexes all metadata.
pc.create_index(
    name="optimized-index",
    dimension=1536,
    metric="cosine",
    spec=PodSpec(
        environment="us-east-1-aws",
        pod_type="p1.x1",
        metadata_config={
            "indexed": ["category", "created_at"]  # filterable fields
            # title and description stay retrievable but are not indexed
        }
    )
)
# 3. Regular cleanup of unused vectors
def cleanup_old_vectors(days_old: int = 90):
    """Delete vectors older than the specified number of days."""
    from datetime import datetime, timedelta
    # Range operators need numeric values, so created_at must be stored
    # as a Unix timestamp. Delete-by-filter works on pod-based indexes;
    # on serverless, query for matching IDs and delete by ID instead.
    cutoff_ts = (datetime.now() - timedelta(days=days_old)).timestamp()
    index.delete(
        filter={"created_at": {"$lt": cutoff_ts}},
        namespace="documents"
    )
# 4. Compress dimensions for smaller models
# text-embedding-3-small: 1536 dimensions
# all-MiniLM-L6-v2: 384 dimensions (75% storage reduction)
# Trade-off: slightly lower accuracy for significant cost savings
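Relatedly, the text-embedding-3 models accept a dimensions parameter that truncates embeddings at generation time, trading a little accuracy for proportionally smaller storage; a sketch reusing the OpenAI client from earlier sections:
# Request shortened embeddings directly from the embedding API
response = client.embeddings.create(
    input="compact embedding example",
    model="text-embedding-3-small",
    dimensions=512  # down from the default 1536; the index dimension must match
)
print(len(response.data[0].embedding))  # 512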
Query Cost Optimization
# 1. Batch embedding generation instead of per-text calls
# Bad: one embedding API call per text
for text in texts:
    embedding = generate_embedding(text)  # N API calls
# Good: a single call embeds the whole batch
response = openai_client.embeddings.create(
    input=texts,  # list of strings, one API call
    model="text-embedding-3-small"
)
embeddings = [item.embedding for item in response.data]
# 2. Use appropriate top_k
# Larger top_k = more expensive
results = index.query(
vector=query_vector,
top_k=10, # Usually sufficient
# top_k=1000 # Much more expensive
)
# 3. Minimize data transfer
results = index.query(
vector=query_vector,
top_k=10,
include_values=False, # Save bandwidth
include_metadata=False # Save bandwidth if not needed
)
# 4. Use caching for repeated queries
from functools import lru_cache
@lru_cache(maxsize=1000)
def cached_search(query_text: str, top_k: int = 10):
"""Cache search results for identical queries."""
embedding = generate_embedding(query_text)
results = index.query(
vector=embedding,
top_k=top_k,
include_metadata=True
)
return results
# 5. Choose serverless vs pods appropriately
# Serverless: Low/variable traffic (pay per query)
# Pods: High consistent traffic (fixed cost)
def choose_deployment_type(queries_per_month: int) -> str:
"""Recommend deployment type based on usage."""
# Rough cost calculations (update with current pricing)
serverless_cost_per_query = 0.0001 # Example
pod_cost_per_month = 70 # p1.x1 pod
serverless_monthly_cost = queries_per_month * serverless_cost_per_query
if serverless_monthly_cost < pod_cost_per_month:
return "serverless"
else:
return "pods"
Cost Monitoring
from collections import defaultdict
from typing import Dict
class CostMonitor:
"""Monitor and estimate vector database costs."""
def __init__(self):
self.operations = defaultdict(int)
self.pricing = {
"serverless_write_units": 0.0000025, # per write unit
"serverless_read_units": 0.00000625, # per read unit
"serverless_storage_gb": 0.095, # per GB per month
"p1_x1_pod": 0.096, # per hour
"p2_x1_pod": 0.240, # per hour
}
def track_operation(self, operation_type: str, units: int = 1):
"""Track database operations."""
self.operations[operation_type] += units
def estimate_monthly_cost(
self,
deployment_type: str,
storage_gb: float = 0,
pod_type: str = None
) -> Dict:
"""Estimate monthly costs."""
costs = {}
if deployment_type == "serverless":
# Storage cost
storage_cost = storage_gb * self.pricing["serverless_storage_gb"]
# Operation costs
write_cost = (
self.operations["upsert"] *
self.pricing["serverless_write_units"]
)
read_cost = (
self.operations["query"] *
self.pricing["serverless_read_units"]
)
costs = {
"storage": storage_cost,
"writes": write_cost,
"reads": read_cost,
"total": storage_cost + write_cost + read_cost
}
elif deployment_type == "pods":
# Fixed pod cost
hours_per_month = 730
pod_cost = self.pricing.get(f"{pod_type}_pod", 0) * hours_per_month
costs = {
"pod": pod_cost,
"total": pod_cost
}
return costs
def get_cost_report(self) -> str:
"""Generate cost report."""
report = f"\n{'=' * 50}\n"
report += "VECTOR DATABASE COST REPORT\n"
report += f"{'=' * 50}\n\n"
report += "Operations Summary:\n"
for operation, count in self.operations.items():
report += f" {operation}: {count:,}\n"
report += f"\n{'=' * 50}\n"
return report
# Usage
cost_monitor = CostMonitor()
# Track operations
def monitored_upsert(vectors, **kwargs):
cost_monitor.track_operation("upsert", len(vectors))
return index.upsert(vectors=vectors, **kwargs)
def monitored_query(vector, **kwargs):
cost_monitor.track_operation("query", 1)
return index.query(vector=vector, **kwargs)
# Get cost estimate
monthly_cost = cost_monitor.estimate_monthly_cost(
deployment_type="serverless",
storage_gb=10.5
)
print(f"Estimated monthly cost: ${monthly_cost['total']:.2f}")
print(cost_monitor.get_cost_report())
Summary
This guide covers the core workflows of vector database management across Pinecone, Weaviate, and Chroma. Key takeaways:
- Choose the right database: Pinecone for production scale, Weaviate for knowledge graphs, Chroma for local development
- Optimize embeddings: Balance dimension size with accuracy and cost
- Use metadata filtering: Combine vector similarity with structured filtering for powerful search
- Implement hybrid search: Combine dense and sparse vectors for best results
- Scale efficiently: Use batching, caching, and appropriate index configurations
- Monitor and optimize costs: Track usage and choose the right deployment type
For more information:
- Pinecone Documentation: https://docs.pinecone.io
- Weaviate Documentation: https://weaviate.io/developers/weaviate
- Chroma Documentation: https://docs.trychroma.com