SKILL.md

---
name: database-design-decisions
description: This skill should be used when making critical database architecture decisions - choosing between SQL and NoSQL, designing schemas, optimizing indexes, implementing caching strategies, and ensuring scalability.
---

Database Design Decisions

Overview

Make informed database architecture decisions that balance performance, scalability, consistency, and maintainability. Every database choice has long-term implications - choose wisely based on your specific access patterns and requirements.

Core principle: Database design is about understanding your data access patterns first, then choosing the right tool and schema to optimize for those patterns.

When to Use

Use when:

  • Starting a new project and choosing a database
  • Designing database schemas
  • Optimizing query performance
  • Planning for scale (horizontal or vertical)
  • Implementing caching strategies
  • Migrating between databases
  • Designing multi-region architectures

Don't use when:

  • Making trivial CRUD changes
  • Writing simple queries
  • Basic ORM operations

Decision Frameworks

SQL vs NoSQL Decision Tree

START: What are your primary requirements?

├─ Need ACID transactions + complex joins?
│  └─ **SQL** (PostgreSQL, MySQL)
│
├─ Need flexible schema + horizontal scaling?
│  └─ **NoSQL Document** (MongoDB, DynamoDB)
│
├─ Need time-series data + aggregations?
│  └─ **Time-Series** (TimescaleDB, InfluxDB)
│
├─ Need key-value store + ultra-fast reads?
│  └─ **Cache/KV Store** (Redis, Memcached)
│
├─ Need graph relationships?
│  └─ **Graph DB** (Neo4j, DGraph)
│
└─ Need full-text search?
   └─ **Search Engine** (Elasticsearch, Algolia)

Database Selection Matrix

| Requirement | PostgreSQL | MySQL | MongoDB | DynamoDB | Redis |
|---|---|---|---|---|---|
| ACID transactions | Excellent | Excellent | Limited | Limited | No |
| Complex queries | Excellent | Good | Fair | Poor | Poor |
| Horizontal scaling | Fair | Fair | Excellent | Excellent | Excellent |
| Schema flexibility | Good | Fair | Excellent | Excellent | Excellent |
| Read performance | Excellent | Excellent | Excellent | Excellent | Outstanding |
| Write performance | Good | Excellent | Excellent | Excellent | Outstanding |
| Learning curve | Medium | Low | Low | Medium | Low |
| Operational cost | Medium | Low | Medium | Pay-per-use | Low |
| Best for | Complex apps | Web apps | Flexible data | AWS apps | Caching |

CAP Theorem Trade-offs

CAP Theorem: You can only have 2 of 3:

  • Consistency: All nodes see the same data
  • Availability: System always responds
  • Partition tolerance: System works despite network failures

In practice (partition tolerance is mandatory):

| Database | Choice | Trade-off |
|---|---|---|
| PostgreSQL | CP | Sacrifices availability during network splits |
| Cassandra | AP | Eventual consistency, always available |
| MongoDB | CP (default) | Can configure for AP with read concerns |
| DynamoDB | AP (default) | Offers strong consistency option |
| Redis | AP | Eventual consistency with replication |
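
DynamoDB, for example, defaults to eventually consistent reads but accepts a per-request strong-consistency flag. A minimal sketch with the AWS SDK v3 (table and key names are illustrative):

import { DynamoDBClient } from '@aws-sdk/client-dynamodb'
import { DynamoDBDocumentClient, GetCommand } from '@aws-sdk/lib-dynamodb'

const docClient = DynamoDBDocumentClient.from(new DynamoDBClient({}))

// Default: eventually consistent read (cheaper, may be slightly stale)
const eventual = await docClient.send(new GetCommand({
  TableName: 'AppTable', // illustrative
  Key: { PK: 'USER#123', SK: 'PROFILE' }
}))

// Strongly consistent read: reflects all acknowledged writes, but
// consumes double the read capacity and isn't available on GSIs
const strong = await docClient.send(new GetCommand({
  TableName: 'AppTable',
  Key: { PK: 'USER#123', SK: 'PROFILE' },
  ConsistentRead: true
}))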

Normalization vs Denormalization

Normalize when:

  • Data integrity is critical
  • Storage cost matters
  • Write patterns are complex
  • Need to avoid update anomalies

Denormalize when:

  • Read performance is critical
  • Querying across tables is expensive
  • Using NoSQL databases
  • Access patterns are known and stable

Example decision:

// Normalized (SQL) - Better for writes
users: {
  id, name, email
}
posts: {
  id, user_id, title, content
}

// Query requires JOIN
SELECT users.name, posts.title
FROM posts
JOIN users ON posts.user_id = users.id

// Denormalized (NoSQL) - Better for reads
posts: {
  id, title, content,
  user: { id, name, email } // Embedded
}

// No JOIN needed - single query
db.posts.find({})

Common Patterns

RDBMS Patterns (PostgreSQL/MySQL)

1. Schema Design

Good schema design:

-- Users table
CREATE TABLE users (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  email VARCHAR(255) UNIQUE NOT NULL,
  username VARCHAR(50) UNIQUE NOT NULL,
  password_hash VARCHAR(255) NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Posts table with foreign key
CREATE TABLE posts (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
  title VARCHAR(255) NOT NULL,
  content TEXT NOT NULL,
  status VARCHAR(20) NOT NULL DEFAULT 'draft',
  published_at TIMESTAMPTZ,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

  -- Constraints
  CONSTRAINT valid_status CHECK (status IN ('draft', 'published', 'archived'))
);

-- Junction table for many-to-many
CREATE TABLE post_tags (
  post_id UUID NOT NULL REFERENCES posts(id) ON DELETE CASCADE,
  tag_id UUID NOT NULL REFERENCES tags(id) ON DELETE CASCADE,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

  PRIMARY KEY (post_id, tag_id)
);

-- Indexes
CREATE INDEX idx_posts_user_id ON posts(user_id);
CREATE INDEX idx_posts_status ON posts(status);
CREATE INDEX idx_posts_published_at ON posts(published_at) WHERE status = 'published';

TypeScript with Prisma:

// schema.prisma
model User {
  id        String   @id @default(uuid())
  email     String   @unique
  username  String   @unique
  password  String
  posts     Post[]
  createdAt DateTime @default(now())
  updatedAt DateTime @updatedAt
}

model Post {
  id          String    @id @default(uuid())
  userId      String
  user        User      @relation(fields: [userId], references: [id], onDelete: Cascade)
  title       String
  content     String    @db.Text
  status      Status    @default(DRAFT)
  publishedAt DateTime?
  tags        PostTag[]
  createdAt   DateTime  @default(now())
  updatedAt   DateTime  @updatedAt

  @@index([userId])
  @@index([status])
  @@index([publishedAt])
}

enum Status {
  DRAFT
  PUBLISHED
  ARCHIVED
}

2. Query Optimization Patterns

// Bad: N+1 query problem
async function getPosts() {
  const posts = await db.posts.findMany()

  // This runs a query for EACH post!
  for (const post of posts) {
    post.user = await db.users.findUnique({
      where: { id: post.userId }
    })
  }

  return posts
}

// Good: Use joins/includes
async function getPosts() {
  return db.posts.findMany({
    include: {
      user: {
        select: { id: true, name: true, email: true }
      },
      tags: true
    }
  })
}

// Good: Cursor-based pagination for large datasets
async function getPosts(cursor?: string, limit = 20) {
  return db.posts.findMany({
    take: limit,
    skip: cursor ? 1 : 0, // skip the cursor row itself
    cursor: cursor ? { id: cursor } : undefined,
    orderBy: { createdAt: 'desc' },
    include: { user: true }
  })
}

// Good: Batch operations
async function createPosts(postsData: PostInput[]) {
  return db.posts.createMany({
    data: postsData,
    skipDuplicates: true
  })
}

3. Transaction Patterns

// PostgreSQL transaction with Prisma
async function transferPoints(fromUserId: string, toUserId: string, points: number) {
  return db.$transaction(async (tx) => {
    // Deduct from sender
    const sender = await tx.user.update({
      where: { id: fromUserId },
      data: { points: { decrement: points } }
    })

    // Check if sender has enough points; throwing here rolls back
    // the whole transaction, undoing the decrement above
    if (sender.points < 0) {
      throw new Error('Insufficient points')
    }

    // Add to receiver
    await tx.user.update({
      where: { id: toUserId },
      data: { points: { increment: points } }
    })

    // Log transaction
    await tx.transaction.create({
      data: {
        fromUserId,
        toUserId,
        amount: points,
        type: 'TRANSFER'
      }
    })
  })
}

// Optimistic locking
async function updatePost(postId: string, version: number, data: any) {
  const post = await db.post.updateMany({
    where: {
      id: postId,
      version: version // Only update if version matches
    },
    data: {
      ...data,
      version: { increment: 1 }
    }
  })

  if (post.count === 0) {
    throw new Error('Post was modified by another user')
  }
}
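
Optimistic locking assumes the model carries an integer version column, e.g. in schema.prisma:

model Post {
  // ...existing fields...
  version Int @default(0)
}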

NoSQL Document Patterns (MongoDB)

1. Document Schema Design

// Embedded documents (denormalized) - for 1-to-few
interface UserDocument {
  _id: ObjectId
  email: string
  username: string
  profile: {
    bio: string
    avatar: string
    location: string
  }
  // Embed recent activity (limited growth)
  recentPosts: Array<{
    id: ObjectId
    title: string
    publishedAt: Date
  }>
  createdAt: Date
  updatedAt: Date
}

// Referenced documents - for 1-to-many or many-to-many
interface PostDocument {
  _id: ObjectId
  userId: ObjectId  // Reference to User
  title: string
  content: string
  status: 'draft' | 'published' | 'archived'
  tags: ObjectId[]  // References to Tag documents
  publishedAt?: Date
  createdAt: Date
  updatedAt: Date
}

// MongoDB schema with Mongoose
import mongoose from 'mongoose'

const UserSchema = new mongoose.Schema({
  email: { type: String, required: true, unique: true, index: true },
  username: { type: String, required: true, unique: true },
  profile: {
    bio: String,
    avatar: String,
    location: String
  },
  recentPosts: [{
    id: mongoose.Schema.Types.ObjectId,
    title: String,
    publishedAt: Date
  }]
}, {
  timestamps: true,
  // Optimize for read-heavy workloads
  collection: 'users',
  // (indexes are defined below via UserSchema.index)
})

// Compound index for complex queries
UserSchema.index({ username: 1, createdAt: -1 })

const PostSchema = new mongoose.Schema({
  userId: {
    type: mongoose.Schema.Types.ObjectId,
    ref: 'User',
    required: true,
    index: true
  },
  title: { type: String, required: true },
  content: { type: String, required: true },
  status: {
    type: String,
    enum: ['draft', 'published', 'archived'],
    default: 'draft',
    index: true
  },
  tags: [{ type: mongoose.Schema.Types.ObjectId, ref: 'Tag' }],
  publishedAt: Date
}, { timestamps: true })

// Compound index for common query patterns
PostSchema.index({ userId: 1, status: 1, publishedAt: -1 })

2. Query Patterns

// Efficient queries with projections
async function getPosts() {
  return Post.find({ status: 'published' })
    .select('title content publishedAt') // Only fetch needed fields
    .populate('userId', 'username avatar') // Join with user
    .sort({ publishedAt: -1 })
    .limit(20)
    .lean() // Return plain objects (faster)
}

// Aggregation pipeline for complex queries
// (Types.ObjectId comes from mongoose; the bare ObjectId in the
//  interfaces above is type-only)
import { Types } from 'mongoose'

async function getPostStats(userId: string) {
  return Post.aggregate([
    { $match: { userId: new Types.ObjectId(userId) } },
    { $group: {
      _id: '$status',
      count: { $sum: 1 },
      avgLength: { $avg: { $strLenCP: '$content' } }
    }},
    { $sort: { count: -1 } }
  ])
}

// Text search (requires text index)
PostSchema.index({ title: 'text', content: 'text' })

async function searchPosts(query: string) {
  return Post.find(
    { $text: { $search: query } },
    { score: { $meta: 'textScore' } }
  ).sort({ score: { $meta: 'textScore' } })
}

DynamoDB Patterns

1. Single Table Design

// Single table pattern - all entities in one table
interface DynamoDBItem {
  PK: string        // Partition Key
  SK: string        // Sort Key
  GSI1PK?: string   // Global Secondary Index PK
  GSI1SK?: string   // Global Secondary Index SK
  Type: string
  // Entity-specific fields
  [key: string]: any
}

// User item
{
  PK: "USER#123",
  SK: "PROFILE",
  Type: "User",
  email: "user@example.com",
  username: "johndoe",
  createdAt: "2025-01-01T00:00:00Z"
}

// Post item
{
  PK: "USER#123",
  SK: "POST#456",
  Type: "Post",
  title: "My Post",
  content: "...",
  GSI1PK: "POST#456",     // For direct post access
  GSI1SK: "2025-01-01",   // For time-based queries
  createdAt: "2025-01-01T00:00:00Z"
}

// TypeScript implementation
import { DynamoDBClient } from '@aws-sdk/client-dynamodb'
import { DynamoDBDocumentClient, GetCommand, QueryCommand, PutCommand } from '@aws-sdk/lib-dynamodb'

const client = new DynamoDBClient({})
const docClient = DynamoDBDocumentClient.from(client)

async function getUser(userId: string) {
  const result = await docClient.send(new GetCommand({
    TableName: 'AppTable',
    Key: {
      PK: `USER#${userId}`,
      SK: 'PROFILE'
    }
  }))

  return result.Item
}

async function getUserPosts(userId: string) {
  const result = await docClient.send(new QueryCommand({
    TableName: 'AppTable',
    KeyConditionExpression: 'PK = :pk AND begins_with(SK, :sk)',
    ExpressionAttributeValues: {
      ':pk': `USER#${userId}`,
      ':sk': 'POST#'
    }
  }))

  return result.Items
}

async function createPost(userId: string, post: any) {
  const postId = crypto.randomUUID()

  await docClient.send(new PutCommand({
    TableName: 'AppTable',
    Item: {
      PK: `USER#${userId}`,
      SK: `POST#${postId}`,
      GSI1PK: `POST#${postId}`,
      GSI1SK: new Date().toISOString(),
      Type: 'Post',
      ...post
    }
  }))

  return postId
}
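
The GSI1PK/GSI1SK attributes written above support the "direct post access" pattern noted in the item layout. A sketch, assuming the table has a global secondary index named GSI1 keyed on those attributes:

// Fetch a post by ID without knowing the owning user
async function getPostById(postId: string) {
  const result = await docClient.send(new QueryCommand({
    TableName: 'AppTable',
    IndexName: 'GSI1', // assumption: GSI on GSI1PK/GSI1SK named GSI1
    KeyConditionExpression: 'GSI1PK = :pk',
    ExpressionAttributeValues: {
      ':pk': `POST#${postId}`
    }
  }))

  return result.Items?.[0]
}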

Redis Caching Patterns

1. Cache Strategies

import Redis from 'ioredis'

const redis = new Redis(process.env.REDIS_URL!)

// Cache-aside pattern (lazy loading)
async function getUser(userId: string) {
  const cacheKey = `user:${userId}`

  // Try cache first
  const cached = await redis.get(cacheKey)
  if (cached) {
    return JSON.parse(cached)
  }

  // Fetch from database
  const user = await db.users.findUnique({ where: { id: userId } })
  if (!user) return null // don't cache misses here (or cache a short-lived sentinel)

  // Store in cache (expires in 1 hour)
  await redis.setex(cacheKey, 3600, JSON.stringify(user))

  return user
}

// Write-through pattern (update cache on write)
async function updateUser(userId: string, data: any) {
  // Update database
  const user = await db.users.update({
    where: { id: userId },
    data
  })

  // Update cache immediately
  const cacheKey = `user:${userId}`
  await redis.setex(cacheKey, 3600, JSON.stringify(user))

  return user
}

// Cache invalidation
async function deleteUser(userId: string) {
  // Delete from database
  await db.users.delete({ where: { id: userId } })

  // Invalidate cache
  await redis.del(`user:${userId}`)
}

// List caching with Redis sets
async function getUserFollowers(userId: string): Promise<string[]> {
  const cacheKey = `user:${userId}:followers`

  // Check cache
  const cached = await redis.smembers(cacheKey)
  if (cached.length > 0) {
    return cached
  }

  // Fetch from database
  const followers = await db.follows.findMany({
    where: { followingId: userId },
    select: { followerId: true }
  })

  const followerIds = followers.map(f => f.followerId)

  // Cache as a set
  if (followerIds.length > 0) {
    await redis.sadd(cacheKey, ...followerIds)
    await redis.expire(cacheKey, 3600)
  }

  return followerIds
}

// Rate limiting with Redis
async function checkRateLimit(userId: string, limit = 100, windowSeconds = 60) {
  const key = `rate:${userId}`
  const current = await redis.incr(key)

  if (current === 1) {
    await redis.expire(key, windowSeconds)
  }

  return {
    allowed: current <= limit,
    remaining: Math.max(0, limit - current)
  }
}

// Distributed locks
async function acquireLock(resource: string, ttl = 10000) {
  const lockKey = `lock:${resource}`
  const token = crypto.randomUUID()

  // Try to acquire lock
  const acquired = await redis.set(
    lockKey,
    token,
    'PX',
    ttl,
    'NX'
  )

  if (!acquired) {
    throw new Error('Could not acquire lock')
  }

  return {
    token,
    release: async () => {
      // Only release if we still own the lock
      await redis.eval(
        `if redis.call("get", KEYS[1]) == ARGV[1] then
           return redis.call("del", KEYS[1])
         else
           return 0
         end`,
        1,
        lockKey,
        token
      )
    }
  }
}
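
Typical usage wraps the critical section in try/finally so the lock is released even on errors. A minimal sketch, where settleOrder is a hypothetical stand-in for whatever work must not run concurrently:

async function processOrder(orderId: string) {
  const lock = await acquireLock(`order:${orderId}`)

  try {
    await settleOrder(orderId) // hypothetical critical-section work
  } finally {
    await lock.release() // releases only if we still hold it (see eval above)
  }
}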

Schema Design Best Practices

Data Type Selection

PostgreSQL:

-- Use appropriate types for efficiency
id UUID PRIMARY KEY,                          -- Better than VARCHAR for IDs
email VARCHAR(255),                           -- Limited size for emails
age INTEGER CHECK (age >= 0 AND age <= 120), -- Constrained integers
price DECIMAL(10, 2),                         -- Precise money (not FLOAT)
bio TEXT,                                     -- Unlimited text
is_active BOOLEAN DEFAULT true,               -- Not TINYINT
created_at TIMESTAMPTZ,                       -- With timezone
tags TEXT[],                                  -- Native array type
metadata JSONB,                               -- Binary JSON (indexed)

MongoDB:

// Use MongoDB types efficiently
interface UserDocument {
  _id: ObjectId              // MongoDB's native ID type
  email: string
  age: number
  price: Decimal128          // Precise decimals
  bio: string
  isActive: boolean
  createdAt: Date            // Native Date type
  tags: string[]             // Array
  metadata: Record<string, any>  // Flexible object
}

Constraints and Relationships

-- Primary keys
id UUID PRIMARY KEY DEFAULT gen_random_uuid()

-- Foreign keys with referential actions
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE

-- Unique constraints
CONSTRAINT unique_username UNIQUE (username)
CONSTRAINT unique_email_per_org UNIQUE (organization_id, email)

-- Check constraints
CONSTRAINT valid_status CHECK (status IN ('active', 'inactive', 'suspended'))
CONSTRAINT positive_price CHECK (price >= 0)
CONSTRAINT valid_date_range CHECK (end_date > start_date)

-- Not null constraints
email VARCHAR(255) NOT NULL
username VARCHAR(50) NOT NULL

-- Default values
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
status VARCHAR(20) DEFAULT 'active'

Migration Strategies

Safe migrations:

// 1. Add new column (nullable first)
await db.$executeRaw`
  ALTER TABLE users
  ADD COLUMN new_field VARCHAR(255)
`

// 2. Backfill data
await db.$executeRaw`
  UPDATE users
  SET new_field = old_field
  WHERE new_field IS NULL
`

// 3. Make it NOT NULL
await db.$executeRaw`
  ALTER TABLE users
  ALTER COLUMN new_field SET NOT NULL
`

// Zero-downtime column rename
// Step 1: Add new column
// Step 2: Dual-write to both columns
// Step 3: Backfill old data
// Step 4: Switch reads to new column
// Step 5: Drop old column
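
A sketch of the dual-write phase (step 2), assuming the rename is full_name -> display_name; both column names are illustrative:

// Step 2: every write updates both the old and the new column
async function updateName(userId: string, name: string) {
  await db.$executeRaw`
    UPDATE users
    SET full_name = ${name},
        display_name = ${name}
    WHERE id = ${userId}::uuid
  `
  // ::uuid cast because raw parameters bind as text
}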

// Prisma migration example
import { PrismaClient } from '@prisma/client'

const prisma = new PrismaClient()

async function migrate() {
  console.log('Starting migration...')

  // Add new column
  await prisma.$executeRaw`
    ALTER TABLE posts
    ADD COLUMN slug VARCHAR(255)
  `

  // Generate slugs from titles
  // (a real backfill should batch large tables and de-duplicate slugs
  //  before the unique constraint below is added)
  const posts = await prisma.post.findMany()

  for (const post of posts) {
    const slug = post.title
      .toLowerCase()
      .replace(/[^a-z0-9]+/g, '-')
      .replace(/^-|-$/g, '')

    await prisma.post.update({
      where: { id: post.id },
      data: { slug }
    })
  }

  // Add unique constraint
  await prisma.$executeRaw`
    ALTER TABLE posts
    ADD CONSTRAINT unique_slug UNIQUE (slug)
  `

  console.log('Migration complete!')
}

Performance Considerations

Query Optimization

Use EXPLAIN to understand queries:

EXPLAIN ANALYZE
SELECT p.*, u.username
FROM posts p
JOIN users u ON p.user_id = u.id
WHERE p.status = 'published'
ORDER BY p.created_at DESC
LIMIT 20;

-- Look for:
-- - Seq Scan (bad for large tables) -> add index
-- - Index Scan (good)
-- - High execution time -> optimize query

Indexing strategy:

-- Single column index
CREATE INDEX idx_users_email ON users(email);

-- Composite index (column order matters: the leading column must be
-- constrained for the index to be used efficiently)
CREATE INDEX idx_posts_user_status ON posts(user_id, status);
-- serves: WHERE user_id = ?  and  WHERE user_id = ? AND status = ?
-- does not serve: WHERE status = ? alone

-- Partial index (smaller, faster)
CREATE INDEX idx_published_posts
ON posts(created_at)
WHERE status = 'published';

-- Covering index (includes all queried columns)
CREATE INDEX idx_posts_cover
ON posts(user_id, status)
INCLUDE (title, created_at);

-- Index monitoring
SELECT
  schemaname,
  tablename,
  indexname,
  idx_scan,  -- Number of index scans
  idx_tup_read,
  idx_tup_fetch
FROM pg_stat_user_indexes
WHERE idx_scan = 0  -- Unused indexes
ORDER BY idx_tup_read DESC;

Connection Pooling

// PostgreSQL with connection pooling
import { Pool } from 'pg'

const pool = new Pool({
  host: process.env.DB_HOST,
  port: 5432,
  database: process.env.DB_NAME,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  max: 20,                    // Maximum pool size
  min: 5,                     // Minimum idle connections
  idleTimeoutMillis: 30000,   // Close idle connections after 30s
  connectionTimeoutMillis: 2000,
})

async function query(text: string, params?: any[]) {
  const start = Date.now()
  const res = await pool.query(text, params)
  const duration = Date.now() - start

  console.log('Executed query', { text, duration, rows: res.rowCount })
  return res
}

// Prisma connection pooling
// In schema.prisma
datasource db {
  provider = "postgresql"
  url      = env("DATABASE_URL")
  // Connection pooling configuration
  // Format: postgresql://user:pass@host:5432/db?connection_limit=10
}

// Use PgBouncer for better connection pooling
// DATABASE_URL="postgresql://user:pass@pgbouncer:6432/db?pgbouncer=true"

Read Replicas and Sharding

// Read replica setup
import { PrismaClient } from '@prisma/client'

const primaryDb = new PrismaClient({
  datasources: {
    db: { url: process.env.PRIMARY_DB_URL }
  }
})

const replicaDb = new PrismaClient({
  datasources: {
    db: { url: process.env.REPLICA_DB_URL }
  }
})

// Write to primary
async function createUser(data: any) {
  return primaryDb.user.create({ data })
}

// Read from replica
async function getUsers() {
  return replicaDb.user.findMany()
}

// Sharding by user ID
// (shard count and clients are assumed to be configured elsewhere)
const NUMBER_OF_SHARDS = 4
const shardClients: PrismaClient[] = [] // one PrismaClient per shard DSN

function getShardForUser(userId: string): PrismaClient {
  const hash = parseInt(userId.substring(0, 8), 16)
  const shardId = hash % NUMBER_OF_SHARDS

  return shardClients[shardId]
}

async function getUserFromShard(userId: string) {
  const db = getShardForUser(userId)
  return db.user.findUnique({ where: { id: userId } })
}

Anti-Patterns to Avoid

1. Not Using Indexes (or Using Too Many)

-- ❌ No index on frequently queried column
SELECT * FROM posts WHERE user_id = '123'  -- Seq scan on large table

-- ✅ Add index
CREATE INDEX idx_posts_user_id ON posts(user_id);

-- ❌ Too many indexes slow down writes
CREATE INDEX idx1 ON users(email);
CREATE INDEX idx2 ON users(username);
CREATE INDEX idx3 ON users(created_at);
CREATE INDEX idx4 ON users(updated_at);
CREATE INDEX idx5 ON users(status);  -- Do you really need all these?

-- ✅ Only index columns used in WHERE, JOIN, ORDER BY

2. N+1 Query Problem

// ❌ N+1 queries
const posts = await db.posts.findMany()
for (const post of posts) {
  post.author = await db.users.findUnique({
    where: { id: post.userId }
  })
}

// ✅ Single query with join
const posts = await db.posts.findMany({
  include: { author: true }
})

3. Not Using Transactions

// ❌ No transaction - can leave data inconsistent
await db.accounts.update({
  where: { id: fromAccount },
  data: { balance: { decrement: amount } }
})
// If this fails, money disappears!
await db.accounts.update({
  where: { id: toAccount },
  data: { balance: { increment: amount } }
})

// ✅ Use transaction
await db.$transaction([
  db.accounts.update({
    where: { id: fromAccount },
    data: { balance: { decrement: amount } }
  }),
  db.accounts.update({
    where: { id: toAccount },
    data: { balance: { increment: amount } }
  })
])

4. Storing Arrays as Strings

// ❌ Storing JSON as string
tags: '["javascript", "typescript", "react"]'

// ✅ Use proper array/JSON types
// PostgreSQL
tags TEXT[]
metadata JSONB

// MongoDB - native arrays
tags: ['javascript', 'typescript', 'react']

5. Not Handling Connection Leaks

// ❌ Not closing connections
const client = await pool.connect()
await client.query('SELECT * FROM users')
// Connection never released!

// ✅ Always release
const client = await pool.connect()
try {
  await client.query('SELECT * FROM users')
} finally {
  client.release()
}

// ✅ Or use query directly
await pool.query('SELECT * FROM users')

6. Ignoring Database Constraints

// ❌ Enforcing uniqueness in application only
const existing = await db.user.findUnique({ where: { email } })
if (existing) throw new Error('Email exists')
await db.user.create({ data: { email } })
// Race condition! Two requests can both pass the check

// ✅ Use database unique constraint
// In schema:
email VARCHAR(255) UNIQUE NOT NULL
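
With the constraint in place, the application can still surface a friendly error by catching the database's unique violation. A sketch with Prisma, whose known-request error code for unique-constraint violations is P2002:

import { Prisma } from '@prisma/client'

async function registerUser(email: string) {
  try {
    return await db.user.create({ data: { email } })
  } catch (e) {
    // P2002 = unique constraint violation; safe under concurrent requests
    if (e instanceof Prisma.PrismaClientKnownRequestError && e.code === 'P2002') {
      throw new Error('Email exists')
    }
    throw e
  }
}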

Examples

Example 1: E-commerce Product Catalog (PostgreSQL)

-- Schema design for e-commerce
CREATE TABLE products (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  sku VARCHAR(50) UNIQUE NOT NULL,
  name VARCHAR(255) NOT NULL,
  description TEXT,
  price DECIMAL(10, 2) NOT NULL CHECK (price >= 0),
  stock_quantity INTEGER NOT NULL DEFAULT 0 CHECK (stock_quantity >= 0),
  category_id UUID REFERENCES categories(id),
  is_active BOOLEAN DEFAULT true,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Optimized indexes
CREATE INDEX idx_products_category ON products(category_id) WHERE is_active = true;
CREATE INDEX idx_products_sku ON products(sku);
-- coalesce keeps rows with NULL descriptions searchable (name || ' ' || NULL would be NULL)
CREATE INDEX idx_products_search ON products USING GIN (to_tsvector('english', name || ' ' || coalesce(description, '')));

-- TypeScript implementation
interface Product {
  id: string
  sku: string
  name: string
  description: string
  price: number
  stockQuantity: number
  categoryId: string
  isActive: boolean
}

// Full-text search; the tsvector expression must match the index expression
// above (including the coalesce), and Prisma.sql/Prisma.empty come from
// import { Prisma } from '@prisma/client'
async function searchProducts(query: string, categoryId?: string) {
  return db.$queryRaw<Product[]>`
    SELECT * FROM products
    WHERE is_active = true
    AND to_tsvector('english', name || ' ' || coalesce(description, '')) @@ plainto_tsquery('english', ${query})
    ${categoryId ? Prisma.sql`AND category_id = ${categoryId}::uuid` : Prisma.empty}
    ORDER BY ts_rank(to_tsvector('english', name || ' ' || coalesce(description, '')), plainto_tsquery('english', ${query})) DESC
    LIMIT 20
  `
}

async function reserveStock(productId: string, quantity: number) {
  // Conditional update: the stock check and the decrement happen in one
  // statement, so concurrent reservations cannot oversell (a read-then-
  // update version would race; the CHECK constraint is only a backstop)
  const result = await db.product.updateMany({
    where: {
      id: productId,
      stockQuantity: { gte: quantity }
    },
    data: { stockQuantity: { decrement: quantity } }
  })

  if (result.count === 0) {
    throw new Error('Insufficient stock')
  }
}

Example 2: Social Media Feed (MongoDB + Redis)

// MongoDB schema for social media posts
interface PostDocument {
  _id: ObjectId
  userId: ObjectId
  content: string
  media: Array<{
    type: 'image' | 'video'
    url: string
  }>
  likes: number
  comments: number
  createdAt: Date
  updatedAt: Date
}

const PostSchema = new mongoose.Schema({
  userId: { type: mongoose.Schema.Types.ObjectId, ref: 'User', index: true },
  content: { type: String, required: true },
  media: [{
    type: { type: String, enum: ['image', 'video'] },
    url: String
  }],
  likes: { type: Number, default: 0 },
  comments: { type: Number, default: 0 },
}, { timestamps: true })

// Compound index for feed queries
PostSchema.index({ userId: 1, createdAt: -1 })

// Feed generation with caching
async function getUserFeed(userId: string, page = 0, limit = 20) {
  const cacheKey = `feed:${userId}:${page}`

  // Try cache first
  const cached = await redis.get(cacheKey)
  if (cached) {
    return JSON.parse(cached)
  }

  // Get user's following list
  const following = await redis.smembers(`user:${userId}:following`)

  if (following.length === 0) {
    return []
  }

  // Fetch posts from followed users
  const feed = await Post.find({
    userId: { $in: following }
  })
    .sort({ createdAt: -1 })
    .skip(page * limit)
    .limit(limit)
    .populate('userId', 'username avatar')
    .lean()

  // Cache for 5 minutes
  await redis.setex(cacheKey, 300, JSON.stringify(feed))

  return feed
}

// Optimized like operation
async function likePost(postId: string, userId: string) {
  const likeKey = `post:${postId}:likes`

  // Use Redis set for quick lookup
  const alreadyLiked = await redis.sismember(likeKey, userId)
  if (alreadyLiked) {
    return { liked: true }
  }

  // Add to Redis
  await redis.sadd(likeKey, userId)

  // Increment counter in MongoDB (async)
  Post.updateOne(
    { _id: postId },
    { $inc: { likes: 1 } }
  ).exec() // Fire and forget

  // Invalidate feed cache (invalidateFeedCache is a helper assumed to be defined elsewhere)
  await invalidateFeedCache(userId)

  return { liked: true }
}

Example 3: Multi-Tenant SaaS (PostgreSQL with Row-Level Security)

-- Multi-tenant schema with RLS
CREATE TABLE organizations (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  name VARCHAR(255) NOT NULL,
  plan VARCHAR(50) NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE users (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  organization_id UUID NOT NULL REFERENCES organizations(id) ON DELETE CASCADE,
  email VARCHAR(255) NOT NULL,
  role VARCHAR(50) NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

  UNIQUE(organization_id, email)
);

CREATE TABLE documents (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  organization_id UUID NOT NULL REFERENCES organizations(id) ON DELETE CASCADE,
  user_id UUID NOT NULL REFERENCES users(id),
  title VARCHAR(255) NOT NULL,
  content TEXT,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Enable Row-Level Security
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;

-- Policy: users can only see their organization's documents
-- (the second argument `true` makes current_setting return NULL instead of
-- erroring when the setting is unset, so the policy fails closed)
CREATE POLICY org_isolation ON documents
  FOR ALL
  USING (organization_id = current_setting('app.current_org_id', true)::UUID);

-- TypeScript implementation
// SET LOCAL only lasts for the current transaction, and Prisma runs each
// raw statement in its own implicit transaction, so the context setting and
// the query must share one $transaction; set_config() is the parameter-safe form
async function getDocuments(orgId: string, userId: string) {
  return db.$transaction(async (tx) => {
    // Set organization context for this transaction only
    await tx.$executeRaw`
      SELECT set_config('app.current_org_id', ${orgId}, true)
    `

    // RLS automatically filters by organization
    return tx.documents.findMany({
      where: { userId }
    })
  })
}

// Alternative: Query-based tenant isolation (simpler)
async function getDocumentsSimple(orgId: string, userId: string) {
  return db.documents.findMany({
    where: {
      organizationId: orgId,
      userId
    }
  })
}

Example 4: Time-Series Analytics (TimescaleDB)

-- TimescaleDB for metrics storage
CREATE TABLE metrics (
  time TIMESTAMPTZ NOT NULL,
  user_id UUID NOT NULL,
  metric_name VARCHAR(50) NOT NULL,
  value DOUBLE PRECISION NOT NULL,
  tags JSONB
);

-- Convert to hypertable (TimescaleDB)
SELECT create_hypertable('metrics', 'time');

-- Create indexes
CREATE INDEX idx_metrics_user_time ON metrics (user_id, time DESC);
CREATE INDEX idx_metrics_name_time ON metrics (metric_name, time DESC);

-- Continuous aggregate for hourly rollups
CREATE MATERIALIZED VIEW metrics_hourly
WITH (timescaledb.continuous) AS
SELECT
  time_bucket('1 hour', time) AS bucket,
  user_id,
  metric_name,
  AVG(value) as avg_value,
  MAX(value) as max_value,
  MIN(value) as min_value,
  COUNT(*) as count
FROM metrics
GROUP BY bucket, user_id, metric_name;
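
Continuous aggregates refresh on a schedule rather than on every insert, so pair the view with a refresh policy (TimescaleDB 2.x; the offsets below are illustrative):

-- Refresh the trailing window every hour
SELECT add_continuous_aggregate_policy('metrics_hourly',
  start_offset => INTERVAL '3 hours',
  end_offset   => INTERVAL '1 hour',
  schedule_interval => INTERVAL '1 hour');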

-- TypeScript implementation
async function recordMetric(
  userId: string,
  metricName: string,
  value: number,
  tags?: Record<string, any>
) {
  await db.$executeRaw`
    INSERT INTO metrics (time, user_id, metric_name, value, tags)
    VALUES (NOW(), ${userId}::uuid, ${metricName}, ${value}, ${JSON.stringify(tags ?? {})}::jsonb)
  `
}

async function getMetrics(
  userId: string,
  metricName: string,
  startTime: Date,
  endTime: Date
) {
  return db.$queryRaw`
    SELECT
      time_bucket('5 minutes', time) AS bucket,
      AVG(value) as value
    FROM metrics
    WHERE user_id = ${userId}::uuid
      AND metric_name = ${metricName}
      AND time >= ${startTime}
      AND time <= ${endTime}
    GROUP BY bucket
    ORDER BY bucket
  `
}

Resources

  • PostgreSQL documentation: postgresql.org/docs
  • MongoDB schema design: mongodb.com/docs/manual/data-modeling
  • DynamoDB best practices: docs.aws.amazon.com/dynamodb
  • Redis patterns: redis.io/topics/introduction
  • Database normalization: wikipedia.org/wiki/Database_normalization
  • CAP theorem: wikipedia.org/wiki/CAP_theorem

Core takeaway: Database design decisions have long-term consequences. Understand your data access patterns, choose the right database for your use case, design efficient schemas, and always plan for scale from day one.