Vector Index Tuning
Guide to optimizing vector indexes for production performance.
When to Use This Skill
- Tuning HNSW parameters
- Implementing quantization
- Optimizing memory usage
- Reducing search latency
- Balancing recall vs speed
- Scaling to billions of vectors
Core Concepts
1. Index Type Selection
Data Size Recommended Index
────────────────────────────────────────
< 10K vectors → Flat (exact search)
10K - 1M → HNSW
1M - 100M → HNSW + Quantization
> 100M → IVF + PQ or DiskANN
2. HNSW Parameters
| Parameter |
Default |
Effect |
| M |
16 |
Connections per node, ↑ = better recall, more memory |
| efConstruction |
100 |
Build quality, ↑ = better index, slower build |
| efSearch |
50 |
Search quality, ↑ = better recall, slower search |
3. Quantization Types
Full Precision (FP32): 4 bytes × dimensions
Half Precision (FP16): 2 bytes × dimensions
INT8 Scalar: 1 byte × dimensions
Product Quantization: ~32-64 bytes total
Binary: dimensions/8 bytes
Templates
Template 1: HNSW Parameter Tuning
import numpy as np
from typing import List, Tuple
import time
def benchmark_hnsw_parameters(
vectors: np.ndarray,
queries: np.ndarray,
ground_truth: np.ndarray,
m_values: List[int] = [8, 16, 32, 64],
ef_construction_values: List[int] = [64, 128, 256],
ef_search_values: List[int] = [32, 64, 128, 256]
) -> List[dict]:
"""Benchmark different HNSW configurations."""
import hnswlib
results = []
dim = vectors.shape[1]
n = vectors.shape[0]
for m in m_values:
for ef_construction in ef_construction_values:
# Build index
index = hnswlib.Index(space='cosine', dim=dim)
index.init_index(max_elements=n, M=m, ef_construction=ef_construction)
build_start = time.time()
index.add_items(vectors)
build_time = time.time() - build_start
# Get memory usage
memory_bytes = index.element_count * (
dim * 4 + # Vector storage
m * 2 * 4 # Graph edges (approximate)
)
for ef_search in ef_search_values:
index.set_ef(ef_search)
# Measure search
search_start = time.time()
labels, distances = index.knn_query(queries, k=10)
search_time = time.time() - search_start
# Calculate recall
recall = calculate_recall(labels, ground_truth, k=10)
results.append({
"M": m,
"ef_construction": ef_construction,
"ef_search": ef_search,
"build_time_s": build_time,
"search_time_ms": search_time * 1000 / len(queries),
"recall@10": recall,
"memory_mb": memory_bytes / 1024 / 1024
})
return results
def calculate_recall(predictions: np.ndarray, ground_truth: np.ndarray, k: int) -> float:
"""Calculate recall@k."""
correct = 0
for pred, truth in zip(predictions, ground_truth):
correct += len(set(pred[:k]) & set(truth[:k]))
return correct / (len(predictions) * k)
def recommend_hnsw_params(
num_vectors: int,
target_recall: float = 0.95,
max_latency_ms: float = 10,
available_memory_gb: float = 8
) -> dict:
"""Recommend HNSW parameters based on requirements."""
# Base recommendations
if num_vectors < 100_000:
m = 16
ef_construction = 100
elif num_vectors < 1_000_000:
m = 32
ef_construction = 200
else:
m = 48
ef_construction = 256
# Adjust ef_search based on recall target
if target_recall >= 0.99:
ef_search = 256
elif target_recall >= 0.95:
ef_search = 128
else:
ef_search = 64
return {
"M": m,
"ef_construction": ef_construction,
"ef_search": ef_search,
"notes": f"Estimated for {num_vectors:,} vectors, {target_recall:.0%} recall"
}
Template 2: Quantization Strategies
import numpy as np
from typing import Optional
class VectorQuantizer:
"""Quantization strategies for vector compression."""
@staticmethod
def scalar_quantize_int8(
vectors: np.ndarray,
min_val: Optional[float] = None,
max_val: Optional[float] = None
) -> Tuple[np.ndarray, dict]:
"""Scalar quantization to INT8."""
if min_val is None:
min_val = vectors.min()
if max_val is None:
max_val = vectors.max()
# Scale to 0-255 range
scale = 255.0 / (max_val - min_val)
quantized = np.clip(
np.round((vectors - min_val) * scale),
0, 255
).astype(np.uint8)
params = {"min_val": min_val, "max_val": max_val, "scale": scale}
return quantized, params
@staticmethod
def dequantize_int8(
quantized: np.ndarray,
params: dict
) -> np.ndarray:
"""Dequantize INT8 vectors."""
return quantized.astype(np.float32) / params["scale"] + params["min_val"]
@staticmethod
def product_quantize(
vectors: np.ndarray,
n_subvectors: int = 8,
n_centroids: int = 256
) -> Tuple[np.ndarray, dict]:
"""Product quantization for aggressive compression."""
from sklearn.cluster import KMeans
n, dim = vectors.shape
assert dim % n_subvectors == 0
subvector_dim = dim // n_subvectors
codebooks = []
codes = np.zeros((n, n_subvectors), dtype=np.uint8)
for i in range(n_subvectors):
start = i * subvector_dim
end = (i + 1) * subvector_dim
subvectors = vectors[:, start:end]
kmeans = KMeans(n_clusters=n_centroids, random_state=42)
codes[:, i] = kmeans.fit_predict(subvectors)
codebooks.append(kmeans.cluster_centers_)
params = {
"codebooks": codebooks,
"n_subvectors": n_subvectors,
"subvector_dim": subvector_dim
}
return codes, params
@staticmethod
def binary_quantize(vectors: np.ndarray) -> np.ndarray:
"""Binary quantization (sign of each dimension)."""
# Convert to binary: positive = 1, negative = 0
binary = (vectors > 0).astype(np.uint8)
# Pack bits into bytes
n, dim = vectors.shape
packed_dim = (dim + 7) // 8
packed = np.zeros((n, packed_dim), dtype=np.uint8)
for i in range(dim):
byte_idx = i // 8
bit_idx = i % 8
packed[:, byte_idx] |= (binary[:, i] << bit_idx)
return packed
def estimate_memory_usage(
num_vectors: int,
dimensions: int,
quantization: str = "fp32",
index_type: str = "hnsw",
hnsw_m: int = 16
) -> dict:
"""Estimate memory usage for different configurations."""
# Vector storage
bytes_per_dimension = {
"fp32": 4,
"fp16": 2,
"int8": 1,
"pq": 0.05, # Approximate
"binary": 0.125
}
vector_bytes = num_vectors * dimensions * bytes_per_dimension[quantization]
# Index overhead
if index_type == "hnsw":
# Each node has ~M*2 edges, each edge is 4 bytes (int32)
index_bytes = num_vectors * hnsw_m * 2 * 4
elif index_type == "ivf":
# Inverted lists + centroids
index_bytes = num_vectors * 8 + 65536 * dimensions * 4
else:
index_bytes = 0
total_bytes = vector_bytes + index_bytes
return {
"vector_storage_mb": vector_bytes / 1024 / 1024,
"index_overhead_mb": index_bytes / 1024 / 1024,
"total_mb": total_bytes / 1024 / 1024,
"total_gb": total_bytes / 1024 / 1024 / 1024
}
Template 3: Qdrant Index Configuration
from qdrant_client import QdrantClient
from qdrant_client.http import models
def create_optimized_collection(
client: QdrantClient,
collection_name: str,
vector_size: int,
num_vectors: int,
optimize_for: str = "balanced" # "recall", "speed", "memory"
) -> None:
"""Create collection with optimized settings."""
# HNSW configuration based on optimization target
hnsw_configs = {
"recall": models.HnswConfigDiff(m=32, ef_construct=256),
"speed": models.HnswConfigDiff(m=16, ef_construct=64),
"balanced": models.HnswConfigDiff(m=16, ef_construct=128),
"memory": models.HnswConfigDiff(m=8, ef_construct=64)
}
# Quantization configuration
quantization_configs = {
"recall": None, # No quantization for max recall
"speed": models.ScalarQuantization(
scalar=models.ScalarQuantizationConfig(
type=models.ScalarType.INT8,
quantile=0.99,
always_ram=True
)
),
"balanced": models.ScalarQuantization(
scalar=models.ScalarQuantizationConfig(
type=models.ScalarType.INT8,
quantile=0.99,
always_ram=False
)
),
"memory": models.ProductQuantization(
product=models.ProductQuantizationConfig(
compression=models.CompressionRatio.X16,
always_ram=False
)
)
}
# Optimizer configuration
optimizer_configs = {
"recall": models.OptimizersConfigDiff(
indexing_threshold=10000,
memmap_threshold=50000
),
"speed": models.OptimizersConfigDiff(
indexing_threshold=5000,
memmap_threshold=20000
),
"balanced": models.OptimizersConfigDiff(
indexing_threshold=20000,
memmap_threshold=50000
),
"memory": models.OptimizersConfigDiff(
indexing_threshold=50000,
memmap_threshold=10000 # Use disk sooner
)
}
client.create_collection(
collection_name=collection_name,
vectors_config=models.VectorParams(
size=vector_size,
distance=models.Distance.COSINE
),
hnsw_config=hnsw_configs[optimize_for],
quantization_config=quantization_configs[optimize_for],
optimizers_config=optimizer_configs[optimize_for]
)
def tune_search_parameters(
client: QdrantClient,
collection_name: str,
target_recall: float = 0.95
) -> dict:
"""Tune search parameters for target recall."""
# Search parameter recommendations
if target_recall >= 0.99:
search_params = models.SearchParams(
hnsw_ef=256,
exact=False,
quantization=models.QuantizationSearchParams(
ignore=True, # Don't use quantization for search
rescore=True
)
)
elif target_recall >= 0.95:
search_params = models.SearchParams(
hnsw_ef=128,
exact=False,
quantization=models.QuantizationSearchParams(
ignore=False,
rescore=True,
oversampling=2.0
)
)
else:
search_params = models.SearchParams(
hnsw_ef=64,
exact=False,
quantization=models.QuantizationSearchParams(
ignore=False,
rescore=False
)
)
return search_params
Template 4: Performance Monitoring
import time
from dataclasses import dataclass
from typing import List
import numpy as np
@dataclass
class SearchMetrics:
latency_p50_ms: float
latency_p95_ms: float
latency_p99_ms: float
recall: float
qps: float
class VectorSearchMonitor:
"""Monitor vector search performance."""
def __init__(self, ground_truth_fn=None):
self.latencies = []
self.recalls = []
self.ground_truth_fn = ground_truth_fn
def measure_search(
self,
search_fn,
query_vectors: np.ndarray,
k: int = 10,
num_iterations: int = 100
) -> SearchMetrics:
"""Benchmark search performance."""
latencies = []
for _ in range(num_iterations):
for query in query_vectors:
start = time.perf_counter()
results = search_fn(query, k=k)
latency = (time.perf_counter() - start) * 1000
latencies.append(latency)
latencies = np.array(latencies)
total_queries = num_iterations * len(query_vectors)
total_time = sum(latencies) / 1000 # seconds
return SearchMetrics(
latency_p50_ms=np.percentile(latencies, 50),
latency_p95_ms=np.percentile(latencies, 95),
latency_p99_ms=np.percentile(latencies, 99),
recall=self._calculate_recall(search_fn, query_vectors, k) if self.ground_truth_fn else 0,
qps=total_queries / total_time
)
def _calculate_recall(self, search_fn, queries: np.ndarray, k: int) -> float:
"""Calculate recall against ground truth."""
if not self.ground_truth_fn:
return 0
correct = 0
total = 0
for query in queries:
predicted = set(search_fn(query, k=k))
actual = set(self.ground_truth_fn(query, k=k))
correct += len(predicted & actual)
total += k
return correct / total
def profile_index_build(
build_fn,
vectors: np.ndarray,
batch_sizes: List[int] = [1000, 10000, 50000]
) -> dict:
"""Profile index build performance."""
results = {}
for batch_size in batch_sizes:
times = []
for i in range(0, len(vectors), batch_size):
batch = vectors[i:i + batch_size]
start = time.perf_counter()
build_fn(batch)
times.append(time.perf_counter() - start)
results[batch_size] = {
"avg_batch_time_s": np.mean(times),
"vectors_per_second": batch_size / np.mean(times)
}
return results
Best Practices
Do's
- Benchmark with real queries - Synthetic may not represent production
- Monitor recall continuously - Can degrade with data drift
- Start with defaults - Tune only when needed
- Use quantization - Significant memory savings
- Consider tiered storage - Hot/cold data separation
Don'ts
- Don't over-optimize early - Profile first
- Don't ignore build time - Index updates have cost
- Don't forget reindexing - Plan for maintenance
- Don't skip warming - Cold indexes are slow
Resources