| name | postgres-performance |
| description | High-performance PostgreSQL patterns. Use when optimizing queries, designing for scale, or debugging performance issues. |
PostgreSQL Performance Engineering
Problem Statement
Performance problems compound: a query that takes 50 ms at 1K rows can take 5 s at 100K rows. This skill covers patterns for building performant database interactions from the start and for diagnosing and fixing problems once they appear.
Pattern: Query Optimization Workflow
Step 1: Identify Slow Queries
-- Enable pg_stat_statements (the module must also be in shared_preload_libraries, which requires a restart)
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
-- Find slowest queries
SELECT
query,
calls,
round(mean_exec_time::numeric, 2) as avg_ms,
round(total_exec_time::numeric, 2) as total_ms,
rows
FROM pg_stat_statements
WHERE calls > 10
ORDER BY mean_exec_time DESC
LIMIT 20;
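Once a fix ships, the old numbers will mask the new behavior; the extension also provides a reset function (requires appropriate privileges):
SELECT pg_stat_statements_reset();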
Step 2: Analyze Query Plan
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT * FROM assessments
WHERE user_id = 'abc-123'
ORDER BY created_at DESC
LIMIT 10;
What to look for:
| Warning Sign | Problem | Solution |
|---|---|---|
| Seq Scan on large table | Missing index | Add index |
| High loops count | N+1 in join | Rewrite query, add index |
| Sort with high cost | No index for ORDER BY | Covering index |
| Hash/Merge Join with high rows | Large intermediate result | Filter earlier, better indexes |
| Buffers: shared read high | Data not cached | More RAM, or query less data |
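Before adding an index, check what already exists on the table; the pg_indexes view shows each index's definition:
SELECT indexname, indexdef
FROM pg_indexes
WHERE tablename = 'assessments';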
Step 3: Fix and Verify
-- Add index (CONCURRENTLY avoids blocking writes, but cannot run inside a transaction block)
CREATE INDEX CONCURRENTLY ix_assessments_user_created
ON assessments (user_id, created_at DESC);
-- Verify improvement
EXPLAIN (ANALYZE, BUFFERS)
SELECT * FROM assessments
WHERE user_id = 'abc-123'
ORDER BY created_at DESC
LIMIT 10;
-- Should now show "Index Scan" instead of "Seq Scan"
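A one-off EXPLAIN confirms the plan; to confirm the index keeps getting used in production, watch its scan count in pg_stat_user_indexes:
SELECT idx_scan
FROM pg_stat_user_indexes
WHERE indexrelname = 'ix_assessments_user_created';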
Pattern: Covering Indexes (Index-Only Scans)
Problem: Query reads index, then fetches rows from table (heap fetch).
-- Query
SELECT id, title, status FROM assessments WHERE user_id = ?;
-- ❌ Regular index: requires heap fetch
CREATE INDEX ix_assessments_user ON assessments (user_id);
-- Plan: Index Scan + Heap Fetches
-- ✅ Covering index: all columns in index
CREATE INDEX ix_assessments_user_covering
ON assessments (user_id)
INCLUDE (id, title, status);
-- Plan: Index Only Scan (no heap fetch, much faster)
When to use:
- Frequently run queries
- Queries selecting few columns
- Tables with many columns (heap fetch is expensive)
Note: index-only scans also depend on an up-to-date visibility map, so make sure autovacuum keeps up; otherwise the plan will still show heap fetches.
Pattern: Pagination at Scale
-- ❌ SLOW: OFFSET-based pagination
SELECT * FROM events ORDER BY created_at DESC LIMIT 20 OFFSET 10000;
-- Must scan and discard 10,000 rows!
-- ✅ FAST: Cursor-based (keyset) pagination
SELECT * FROM events
WHERE created_at < '2024-01-15T10:30:00Z' -- Last seen timestamp
ORDER BY created_at DESC
LIMIT 20;
-- Jumps directly to the right place via index
-- For compound cursor (when duplicates possible):
SELECT * FROM events
WHERE (created_at, id) < ('2024-01-15T10:30:00Z', 'last-id')
ORDER BY created_at DESC, id DESC
LIMIT 20;
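Keyset pagination is only fast when an index matches the ORDER BY; a sketch for the events query above (index name illustrative):
CREATE INDEX CONCURRENTLY ix_events_created_id
ON events (created_at DESC, id DESC);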
In SQLAlchemy:
# Cursor-based pagination
from datetime import datetime
from uuid import UUID

from sqlalchemy import select, tuple_
from sqlalchemy.ext.asyncio import AsyncSession

async def get_events_page(
    session: AsyncSession,
    cursor_time: datetime | None,
    cursor_id: UUID | None,
    limit: int = 20,
) -> list[Event]:
    query = select(Event).order_by(Event.created_at.desc(), Event.id.desc())
    if cursor_time and cursor_id:
        # Row-wise comparison matches the (created_at DESC, id DESC) ordering
        query = query.where(
            tuple_(Event.created_at, Event.id) < (cursor_time, cursor_id)
        )
    result = await session.execute(query.limit(limit))
    # The last row's created_at/id become the cursor for the next page
    return list(result.scalars().all())
Pattern: Batch Processing
-- ❌ SLOW: One huge query/update
UPDATE events SET processed = true WHERE processed = false;
-- Locks millions of rows, times out
-- ✅ FAST: Batch processing (COMMIT inside DO requires PostgreSQL 11+ and no surrounding transaction)
DO $$
DECLARE
batch_size INT := 10000;
rows_affected INT;
BEGIN
LOOP
UPDATE events
SET processed = true
WHERE id IN (
SELECT id FROM events
WHERE processed = false
LIMIT batch_size
FOR UPDATE SKIP LOCKED
);
GET DIAGNOSTICS rows_affected = ROW_COUNT;
IF rows_affected = 0 THEN
EXIT;
END IF;
COMMIT;
PERFORM pg_sleep(0.1); -- Brief pause to let other queries through
END LOOP;
END $$;
In Python:
import asyncio

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

async def process_in_batches(session: AsyncSession, batch_size: int = 10000):
    while True:
        result = await session.execute(
            text("""
                UPDATE events SET processed = true
                WHERE id IN (
                    SELECT id FROM events
                    WHERE processed = false
                    LIMIT :batch_size
                    FOR UPDATE SKIP LOCKED
                )
                RETURNING id
            """),
            {"batch_size": batch_size},
        )
        updated = result.fetchall()
        await session.commit()  # Release row locks between batches
        if len(updated) == 0:
            break
        await asyncio.sleep(0.1)  # Let other transactions through
Pattern: Efficient Aggregations
-- ❌ SLOW: Count with complex WHERE
SELECT COUNT(*) FROM events WHERE user_id = ? AND status = 'active';
-- Scans all matching rows
-- ✅ FAST: Approximate count (whole-table only; kept current by VACUUM/ANALYZE)
SELECT reltuples::bigint AS estimate
FROM pg_class
WHERE relname = 'events';
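reltuples cannot apply the WHERE filter. For a filtered estimate, a common trick is to read the planner's row estimate from EXPLAIN (accuracy depends on how current the table statistics are):
EXPLAIN (FORMAT JSON)
SELECT 1 FROM events WHERE user_id = 'abc-123' AND status = 'active';
-- Read "Plan Rows" from the JSON output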
-- ✅ FAST: Maintain counter cache
-- Add column: assessments.answer_count
-- Update on INSERT/DELETE to answers
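A counter cache is typically maintained with a trigger; a minimal sketch, assuming answers carries an assessment_id column (the real schema may link through questions):
CREATE OR REPLACE FUNCTION bump_answer_count() RETURNS trigger AS $$
BEGIN
    IF TG_OP = 'INSERT' THEN
        UPDATE assessments SET answer_count = answer_count + 1 WHERE id = NEW.assessment_id;
    ELSIF TG_OP = 'DELETE' THEN
        UPDATE assessments SET answer_count = answer_count - 1 WHERE id = OLD.assessment_id;
    END IF;
    RETURN NULL; -- AFTER trigger: return value is ignored
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_answer_count
AFTER INSERT OR DELETE ON answers
FOR EACH ROW EXECUTE FUNCTION bump_answer_count();  -- EXECUTE FUNCTION is PostgreSQL 11+ syntax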
-- ✅ FAST: Materialized view for complex aggregations
CREATE MATERIALIZED VIEW user_stats AS
SELECT
user_id,
COUNT(*) as total_assessments,
AVG(rating) as avg_rating
FROM assessments
GROUP BY user_id;
-- Refresh periodically (CONCURRENTLY requires a unique index on the view)
CREATE UNIQUE INDEX IF NOT EXISTS ix_user_stats_user ON user_stats (user_id);
REFRESH MATERIALIZED VIEW CONCURRENTLY user_stats;
Pattern: Connection Pool Tuning
# Async SQLAlchemy with proper pool settings
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool, AsyncAdaptedQueuePool
# For serverless/Lambda (no persistent connections)
engine = create_async_engine(
DATABASE_URL,
poolclass=NullPool, # New connection per request
)
# For long-running servers (AsyncAdaptedQueuePool is already the default for async engines)
engine = create_async_engine(
    DATABASE_URL,
    poolclass=AsyncAdaptedQueuePool,
    pool_size=10,        # Base connections per process
    max_overflow=20,     # Extra connections under load
    pool_timeout=30,     # Seconds to wait for a free connection
    pool_recycle=1800,   # Recycle connections every 30 min
    pool_pre_ping=True,  # Test connection liveness before use
)
# Keep (pool_size + max_overflow) x process count below max_connections
PostgreSQL side:
-- Check max connections
SHOW max_connections; -- Default 100
-- See current connections
SELECT count(*) FROM pg_stat_activity;
-- Connections per application
SELECT application_name, count(*)
FROM pg_stat_activity
GROUP BY application_name;
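It also helps to see how many connections are actively working versus sitting idle:
SELECT state, count(*)
FROM pg_stat_activity
GROUP BY state;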
Pattern: Read Replicas
# Route reads to the replica; writes (and anything mid-flush) to the primary
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import Delete, Insert, Update

primary_engine = create_async_engine(PRIMARY_URL)
replica_engine = create_async_engine(REPLICA_URL)

class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None, **kw):
        if self._flushing or isinstance(clause, (Insert, Update, Delete)):
            return primary_engine.sync_engine
        return replica_engine.sync_engine

# From async code, wrap it: AsyncSession(sync_session_class=RoutingSession)
Pattern: Denormalization for Read Performance
-- ❌ SLOW: Joining 4 tables for common query
SELECT
a.id, a.title, u.name as user_name,
COUNT(q.id) as question_count,
AVG(ans.value) as avg_score
FROM assessments a
JOIN users u ON a.user_id = u.id
JOIN questions q ON q.assessment_id = a.id
LEFT JOIN answers ans ON ans.question_id = q.id
GROUP BY a.id, a.title, u.name;
-- ✅ FAST: Denormalized columns
ALTER TABLE assessments ADD COLUMN user_name VARCHAR(100);
ALTER TABLE assessments ADD COLUMN question_count INT DEFAULT 0;
ALTER TABLE assessments ADD COLUMN avg_score NUMERIC(3,2);
-- Update via triggers or application code (see the trigger sketch below)
-- Query becomes simple:
SELECT id, title, user_name, question_count, avg_score FROM assessments;
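One way to keep a denormalized copy from going stale is a trigger on the source table; a sketch for user_name, assuming the users/assessments schema above:
CREATE OR REPLACE FUNCTION sync_user_name() RETURNS trigger AS $$
BEGIN
    UPDATE assessments SET user_name = NEW.name WHERE user_id = NEW.id;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_sync_user_name
AFTER UPDATE OF name ON users
FOR EACH ROW EXECUTE FUNCTION sync_user_name();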
Tradeoffs:
- ✅ Much faster reads
- ❌ More complex writes (must update denormalized data)
- ❌ Potential for stale data
Pattern: Partitioning Large Tables
-- Partition events by month
CREATE TABLE events (
    id UUID NOT NULL,
    user_id UUID NOT NULL,
    event_type VARCHAR(50),
    created_at TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (id, created_at)  -- PK on a partitioned table must include the partition key
) PARTITION BY RANGE (created_at);
-- Create partitions
CREATE TABLE events_2024_01 PARTITION OF events
FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE events_2024_02 PARTITION OF events
FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
-- Filtering on created_at prunes to the matching partition (fast)
SELECT * FROM events WHERE created_at >= '2024-01-15' AND created_at < '2024-02-01';
-- Drop old data instantly
DROP TABLE events_2023_01; -- Much faster than DELETE
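Partitions must exist before rows arrive, so create them ahead of time (typically from a scheduled job); a default partition can act as a catch-all:
CREATE TABLE events_2024_03 PARTITION OF events
FOR VALUES FROM ('2024-03-01') TO ('2024-04-01');
-- Catch-all for rows outside any defined range (PostgreSQL 11+)
CREATE TABLE events_default PARTITION OF events DEFAULT;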
Pattern: Caching Strategy
# Cache frequently-read, rarely-changed data
from uuid import UUID

import redis.asyncio as redis

cache = redis.from_url(REDIS_URL)
async def get_user_stats(user_id: UUID) -> UserStats:
cache_key = f"user_stats:{user_id}"
# Try cache first
cached = await cache.get(cache_key)
if cached:
return UserStats.model_validate_json(cached)
# Query database
async with get_session() as session:
stats = await calculate_user_stats(session, user_id)
# Cache for 5 minutes
await cache.setex(cache_key, 300, stats.model_dump_json())
return stats
# Invalidate on write
async def update_user_assessment(user_id: UUID, ...):
# ... update database ...
await cache.delete(f"user_stats:{user_id}")
Performance Monitoring Queries
-- Table bloat (needs VACUUM)
SELECT
schemaname, relname,
n_dead_tup as dead_tuples,
n_live_tup as live_tuples,
round(n_dead_tup::numeric / NULLIF(n_live_tup, 0) * 100, 2) as dead_pct
FROM pg_stat_user_tables
WHERE n_dead_tup > 10000
ORDER BY n_dead_tup DESC;
-- Unused indexes (never scanned since stats were reset; candidates to drop)
SELECT
    indexrelname as index_name,
    pg_size_pretty(pg_relation_size(indexrelid)) as size,
    idx_scan as scans
FROM pg_stat_user_indexes
WHERE idx_scan = 0
ORDER BY pg_relation_size(indexrelid) DESC;
-- Cache hit ratio (should be > 99%)
SELECT
sum(blks_hit) * 100.0 / NULLIF(sum(blks_hit + blks_read), 0) as cache_hit_ratio
FROM pg_stat_database;
-- Long-running queries
SELECT
pid,
now() - query_start as duration,
query
FROM pg_stat_activity
WHERE state = 'active'
AND query NOT LIKE '%pg_stat%'
AND now() - query_start > interval '30 seconds';
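If a runaway query is blocking others, cancel it by pid (or, as a last resort, terminate the backend):
SELECT pg_cancel_backend(12345);     -- pid from the query above (12345 is illustrative)
SELECT pg_terminate_backend(12345);  -- Kills the whole connection; last resort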
Performance Checklist
Before deploying:
- Slow queries identified and optimized
- Indexes match query patterns
- Covering indexes for frequent queries
- Pagination is cursor-based (not OFFSET)
- Large tables partitioned if > 10M rows
- Connection pool sized appropriately
- Cache layer for hot data
- Monitoring in place for slow queries
- VACUUM and ANALYZE scheduled