Claude Code Plugins

Community-maintained marketplace

Feedback
0
0

High-performance PostgreSQL patterns. Use when optimizing queries, designing for scale, or debugging performance issues.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name postgres-performance
description High-performance PostgreSQL patterns. Use when optimizing queries, designing for scale, or debugging performance issues.

PostgreSQL Performance Engineering

Problem Statement

Performance problems compound. A query that takes 50ms at 1K rows takes 5s at 100K rows. This skill covers patterns for building performant database interactions from the start and fixing performance issues.


Pattern: Query Optimization Workflow

Step 1: Identify Slow Queries

-- Enable pg_stat_statements (if not already)
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;

-- Find slowest queries
SELECT 
    query,
    calls,
    round(mean_exec_time::numeric, 2) as avg_ms,
    round(total_exec_time::numeric, 2) as total_ms,
    rows
FROM pg_stat_statements
WHERE calls > 10
ORDER BY mean_exec_time DESC
LIMIT 20;

Step 2: Analyze Query Plan

EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT * FROM assessments 
WHERE user_id = 'abc-123' 
ORDER BY created_at DESC 
LIMIT 10;

What to look for:

Warning Sign Problem Solution
Seq Scan on large table Missing index Add index
High loops count N+1 in join Rewrite query, add index
Sort with high cost No index for ORDER BY Covering index
Hash/Merge Join with high rows Large intermediate result Filter earlier, better indexes
Buffers: shared read high Data not cached More RAM, or query less data

Step 3: Fix and Verify

-- Add index
CREATE INDEX CONCURRENTLY ix_assessments_user_created 
ON assessments (user_id, created_at DESC);

-- Verify improvement
EXPLAIN (ANALYZE, BUFFERS)
SELECT * FROM assessments 
WHERE user_id = 'abc-123' 
ORDER BY created_at DESC 
LIMIT 10;
-- Should now show "Index Scan" instead of "Seq Scan"

Pattern: Covering Indexes (Index-Only Scans)

Problem: Query reads index, then fetches rows from table (heap fetch).

-- Query
SELECT id, title, status FROM assessments WHERE user_id = ?;

-- Regular index: requires heap fetch
CREATE INDEX ix_assessments_user ON assessments (user_id);
-- Plan: Index Scan + Heap Fetches

-- ✅ Covering index: all columns in index
CREATE INDEX ix_assessments_user_covering 
ON assessments (user_id) 
INCLUDE (id, title, status);
-- Plan: Index Only Scan (no heap fetch, much faster)

When to use:

  • Frequently run queries
  • Queries selecting few columns
  • Tables with many columns (heap fetch is expensive)

Pattern: Pagination at Scale

-- ❌ SLOW: OFFSET-based pagination
SELECT * FROM events ORDER BY created_at DESC LIMIT 20 OFFSET 10000;
-- Must scan and discard 10,000 rows!

-- ✅ FAST: Cursor-based (keyset) pagination
SELECT * FROM events 
WHERE created_at < '2024-01-15T10:30:00Z'  -- Last seen timestamp
ORDER BY created_at DESC 
LIMIT 20;
-- Jumps directly to the right place via index

-- For compound cursor (when duplicates possible):
SELECT * FROM events 
WHERE (created_at, id) < ('2024-01-15T10:30:00Z', 'last-id')
ORDER BY created_at DESC, id DESC 
LIMIT 20;

In SQLAlchemy:

# Cursor-based pagination
async def get_events_page(
    session: AsyncSession,
    cursor_time: datetime | None,
    cursor_id: UUID | None,
    limit: int = 20,
) -> list[Event]:
    query = select(Event).order_by(Event.created_at.desc(), Event.id.desc())
    
    if cursor_time and cursor_id:
        query = query.where(
            tuple_(Event.created_at, Event.id) < (cursor_time, cursor_id)
        )
    
    result = await session.execute(query.limit(limit))
    return result.scalars().all()

Pattern: Batch Processing

-- ❌ SLOW: One huge query/update
UPDATE events SET processed = true WHERE processed = false;
-- Locks millions of rows, times out

-- ✅ FAST: Batch processing
DO $$
DECLARE
    batch_size INT := 10000;
    rows_affected INT;
BEGIN
    LOOP
        UPDATE events 
        SET processed = true 
        WHERE id IN (
            SELECT id FROM events 
            WHERE processed = false 
            LIMIT batch_size
            FOR UPDATE SKIP LOCKED
        );
        
        GET DIAGNOSTICS rows_affected = ROW_COUNT;
        
        IF rows_affected = 0 THEN
            EXIT;
        END IF;
        
        COMMIT;
        PERFORM pg_sleep(0.1);  -- Brief pause to let other queries through
    END LOOP;
END $$;

In Python:

async def process_in_batches(session: AsyncSession, batch_size: int = 10000):
    while True:
        result = await session.execute(
            text("""
                UPDATE events SET processed = true
                WHERE id IN (
                    SELECT id FROM events 
                    WHERE processed = false 
                    LIMIT :batch_size
                    FOR UPDATE SKIP LOCKED
                )
                RETURNING id
            """),
            {"batch_size": batch_size}
        )
        
        updated = result.fetchall()
        await session.commit()
        
        if len(updated) == 0:
            break
        
        await asyncio.sleep(0.1)

Pattern: Efficient Aggregations

-- ❌ SLOW: Count with complex WHERE
SELECT COUNT(*) FROM events WHERE user_id = ? AND status = 'active';
-- Scans all matching rows

-- ✅ FAST: Approximate count (for large tables)
SELECT reltuples::bigint AS estimate
FROM pg_class
WHERE relname = 'events';

-- ✅ FAST: Maintain counter cache
-- Add column: assessments.answer_count
-- Update on INSERT/DELETE to answers

-- ✅ FAST: Materialized view for complex aggregations
CREATE MATERIALIZED VIEW user_stats AS
SELECT 
    user_id,
    COUNT(*) as total_assessments,
    AVG(rating) as avg_rating
FROM assessments
GROUP BY user_id;

-- Refresh periodically
REFRESH MATERIALIZED VIEW CONCURRENTLY user_stats;

Pattern: Connection Pool Tuning

# Async SQLAlchemy with proper pool settings
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool, AsyncAdaptedQueuePool

# For serverless/Lambda (no persistent connections)
engine = create_async_engine(
    DATABASE_URL,
    poolclass=NullPool,  # New connection per request
)

# For long-running servers
engine = create_async_engine(
    DATABASE_URL,
    poolclass=AsyncAdaptedQueuePool,
    pool_size=10,           # Base connections
    max_overflow=20,        # Extra connections under load
    pool_timeout=30,        # Wait for connection
    pool_recycle=1800,      # Recycle connections every 30 min
    pool_pre_ping=True,     # Test connection before use
)

PostgreSQL side:

-- Check max connections
SHOW max_connections;  -- Default 100

-- See current connections
SELECT count(*) FROM pg_stat_activity;

-- Connection per application
SELECT application_name, count(*) 
FROM pg_stat_activity 
GROUP BY application_name;

Pattern: Read Replicas

# Route reads to replica, writes to primary
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

primary_engine = create_async_engine(PRIMARY_URL)
replica_engine = create_async_engine(REPLICA_URL)

class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None):
        if self._flushing or self.is_modified():
            return primary_engine.sync_engine
        return replica_engine.sync_engine

Pattern: Denormalization for Read Performance

-- ❌ SLOW: Joining 4 tables for common query
SELECT 
    a.id, a.title, u.name as user_name, 
    COUNT(q.id) as question_count,
    AVG(ans.value) as avg_score
FROM assessments a
JOIN users u ON a.user_id = u.id
JOIN questions q ON q.assessment_id = a.id
LEFT JOIN answers ans ON ans.question_id = q.id
GROUP BY a.id, a.title, u.name;

-- ✅ FAST: Denormalized columns
ALTER TABLE assessments ADD COLUMN user_name VARCHAR(100);
ALTER TABLE assessments ADD COLUMN question_count INT DEFAULT 0;
ALTER TABLE assessments ADD COLUMN avg_score NUMERIC(3,2);

-- Update via triggers or application code
-- Query becomes simple:
SELECT id, title, user_name, question_count, avg_score FROM assessments;

Tradeoffs:

  • ✅ Much faster reads
  • ❌ More complex writes (must update denormalized data)
  • ❌ Potential for stale data

Pattern: Partitioning Large Tables

-- Partition events by month
CREATE TABLE events (
    id UUID PRIMARY KEY,
    user_id UUID NOT NULL,
    event_type VARCHAR(50),
    created_at TIMESTAMPTZ NOT NULL
) PARTITION BY RANGE (created_at);

-- Create partitions
CREATE TABLE events_2024_01 PARTITION OF events
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE events_2024_02 PARTITION OF events
    FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');

-- Query specific partition (fast)
SELECT * FROM events WHERE created_at >= '2024-01-15' AND created_at < '2024-02-01';

-- Drop old data instantly
DROP TABLE events_2023_01;  -- Much faster than DELETE

Pattern: Caching Strategy

# Cache frequently-read, rarely-changed data
import redis.asyncio as redis
import json

cache = redis.from_url(REDIS_URL)

async def get_user_stats(user_id: UUID) -> UserStats:
    cache_key = f"user_stats:{user_id}"
    
    # Try cache first
    cached = await cache.get(cache_key)
    if cached:
        return UserStats.model_validate_json(cached)
    
    # Query database
    async with get_session() as session:
        stats = await calculate_user_stats(session, user_id)
    
    # Cache for 5 minutes
    await cache.setex(cache_key, 300, stats.model_dump_json())
    
    return stats

# Invalidate on write
async def update_user_assessment(user_id: UUID, ...):
    # ... update database ...
    await cache.delete(f"user_stats:{user_id}")

Performance Monitoring Queries

-- Table bloat (needs VACUUM)
SELECT 
    schemaname, relname,
    n_dead_tup as dead_tuples,
    n_live_tup as live_tuples,
    round(n_dead_tup::numeric / NULLIF(n_live_tup, 0) * 100, 2) as dead_pct
FROM pg_stat_user_tables
WHERE n_dead_tup > 10000
ORDER BY n_dead_tup DESC;

-- Index bloat
SELECT
    indexrelname as index,
    pg_size_pretty(pg_relation_size(indexrelid)) as size,
    idx_scan as scans
FROM pg_stat_user_indexes
WHERE idx_scan = 0  -- Unused indexes
ORDER BY pg_relation_size(indexrelid) DESC;

-- Cache hit ratio (should be > 99%)
SELECT 
    sum(blks_hit) * 100.0 / sum(blks_hit + blks_read) as cache_hit_ratio
FROM pg_stat_database;

-- Long-running queries
SELECT 
    pid,
    now() - query_start as duration,
    query
FROM pg_stat_activity
WHERE state = 'active' 
    AND query NOT LIKE '%pg_stat%'
    AND now() - query_start > interval '30 seconds';

Performance Checklist

Before deploying:

  • Slow queries identified and optimized
  • Indexes match query patterns
  • Covering indexes for frequent queries
  • Pagination uses cursor-based (not OFFSET)
  • Large tables partitioned if > 10M rows
  • Connection pool sized appropriately
  • Cache layer for hot data
  • Monitoring in place for slow queries
  • VACUUM and ANALYZE scheduled