Claude Code Plugins

Community-maintained marketplace

Feedback

Expert in data pipelines, ETL processes, and data infrastructure

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name data-engineer
description Expert in data pipelines, ETL processes, and data infrastructure
version 1.0.0
tags data-engineering, etl, pipelines, databases, analytics

Data Engineer Skill

I help you build robust data pipelines, ETL processes, and data infrastructure.

What I Do

Data Pipelines:

  • Extract, Transform, Load (ETL) processes
  • Data ingestion from multiple sources
  • Batch and real-time processing
  • Data quality validation

Data Infrastructure:

  • Database schema design
  • Data warehousing
  • Caching strategies
  • Data replication

Analytics:

  • Data aggregation
  • Metrics calculation
  • Report generation
  • Data export

ETL Patterns

Pattern 1: Simple ETL Pipeline

Use case: Daily sync from external API to database

// lib/etl/daily-sync.ts

interface RawCustomer {
  id: string
  full_name: string
  email_address: string
  signup_date: string
}

interface Customer {
  id: string
  name: string
  email: string
  signupDate: Date
}

export async function syncCustomers() {
  console.log('Starting customer sync...')

  // EXTRACT: Fetch data from external API
  const response = await fetch('https://api.example.com/customers', {
    headers: {
      'Authorization': `Bearer ${process.env.API_KEY}`
    }
  })

  const rawCustomers: RawCustomer[] = await response.json()
  console.log(`Extracted ${rawCustomers.length} customers`)

  // TRANSFORM: Clean and normalize data
  const transformedCustomers: Customer[] = rawCustomers.map(raw => ({
    id: raw.id,
    name: raw.full_name.trim(),
    email: raw.email_address.toLowerCase(),
    signupDate: new Date(raw.signup_date)
  }))

  // LOAD: Insert into database
  let inserted = 0
  let updated = 0

  for (const customer of transformedCustomers) {
    const existing = await db.customers.findUnique({
      where: { id: customer.id }
    })

    if (existing) {
      await db.customers.update({
        where: { id: customer.id },
        data: customer
      })
      updated++
    } else {
      await db.customers.create({
        data: customer
      })
      inserted++
    }
  }

  console.log(`Sync complete: ${inserted} inserted, ${updated} updated`)

  return { inserted, updated, total: transformedCustomers.length }
}

Schedule with Vercel Cron:

// vercel.json
{
  "crons": [
    {
      "path": "/api/cron/sync-customers",
      "schedule": "0 2 * * *"
    }
  ]
}
// app/api/cron/sync-customers/route.ts
import { syncCustomers } from '@/lib/etl/daily-sync'

export async function GET(req: Request) {
  const authHeader = req.headers.get('authorization')
  if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) {
    return Response.json({ error: 'Unauthorized' }, { status: 401 })
  }

  try {
    const result = await syncCustomers()
    return Response.json(result)
  } catch (error) {
    console.error('Sync failed:', error)
    return Response.json({ error: 'Sync failed' }, { status: 500 })
  }
}

Pattern 2: Incremental ETL (Delta Sync)

Use case: Only process new/changed records

// lib/etl/incremental-sync.ts

export async function incrementalSync() {
  // Get last sync timestamp
  const lastSync = await db.syncLog.findFirst({
    where: { source: 'customers' },
    orderBy: { syncedAt: 'desc' }
  })

  const since = lastSync?.syncedAt || new Date('2020-01-01')

  // EXTRACT: Only fetch records modified since last sync
  const response = await fetch(
    `https://api.example.com/customers?modified_since=${since.toISOString()}`,
    {
      headers: { 'Authorization': `Bearer ${process.env.API_KEY}` }
    }
  )

  const newOrModified = await response.json()
  console.log(`Found ${newOrModified.length} new/modified records`)

  // TRANSFORM & LOAD
  for (const record of newOrModified) {
    await db.customers.upsert({
      where: { id: record.id },
      create: transformCustomer(record),
      update: transformCustomer(record)
    })
  }

  // Log sync
  await db.syncLog.create({
    data: {
      source: 'customers',
      recordsProcessed: newOrModified.length,
      syncedAt: new Date()
    }
  })

  return { processed: newOrModified.length }
}

Benefits:

  • Faster (only process changes)
  • Lower API costs
  • Reduced database load

Pattern 3: Real-Time Data Pipeline

Use case: Process events as they happen

// lib/pipelines/events-processor.ts

import { Kafka } from 'kafkajs'

const kafka = new Kafka({
  clientId: 'myapp',
  brokers: [process.env.KAFKA_BROKER!]
})

const consumer = kafka.consumer({ groupId: 'analytics-group' })

export async function startEventProcessor() {
  await consumer.connect()
  await consumer.subscribe({ topic: 'user-events', fromBeginning: false })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const event = JSON.parse(message.value!.toString())

      // TRANSFORM: Enrich event data
      const enrichedEvent = {
        ...event,
        processedAt: new Date(),
        userId: event.user_id,
        eventType: event.type.toLowerCase()
      }

      // LOAD: Write to analytics database
      await analyticsDb.events.create({
        data: enrichedEvent
      })

      // Also update real-time metrics
      await updateRealtimeMetrics(enrichedEvent)
    }
  })
}

async function updateRealtimeMetrics(event: any) {
  if (event.eventType === 'purchase') {
    await redis.hincrby('metrics:today', 'purchases', 1)
    await redis.hincrbyfloat('metrics:today', 'revenue', event.amount)
  }
}

Data Transformation Patterns

Transformation 1: Data Cleaning

// lib/transformers/cleaners.ts

export function cleanEmail(email: string): string {
  return email.trim().toLowerCase()
}

export function cleanPhone(phone: string): string {
  // Remove all non-numeric characters
  return phone.replace(/\D/g, '')
}

export function cleanName(name: string): string {
  return name
    .trim()
    .replace(/\s+/g, ' ')  // Multiple spaces → single space
    .split(' ')
    .map(word => word.charAt(0).toUpperCase() + word.slice(1).toLowerCase())
    .join(' ')
}

export function parseDate(dateStr: string): Date | null {
  try {
    const date = new Date(dateStr)
    return isNaN(date.getTime()) ? null : date
  } catch {
    return null
  }
}

Transformation 2: Data Enrichment

// lib/transformers/enrichers.ts

export async function enrichCustomer(customer: RawCustomer) {
  // Add geolocation data
  const geo = await geocode(customer.address)

  // Add lifecycle stage
  const daysSinceSignup = differenceInDays(new Date(), customer.signupDate)
  const lifecycleStage =
    daysSinceSignup < 7 ? 'new' :
    daysSinceSignup < 30 ? 'active' :
    daysSinceSignup < 90 ? 'engaged' : 'dormant'

  // Add lifetime value
  const orders = await db.orders.findMany({
    where: { customerId: customer.id }
  })
  const lifetimeValue = orders.reduce((sum, order) => sum + order.total, 0)

  return {
    ...customer,
    latitude: geo.lat,
    longitude: geo.lng,
    lifecycleStage,
    lifetimeValue,
    totalOrders: orders.length
  }
}

Transformation 3: Data Aggregation

// lib/transformers/aggregators.ts

export async function aggregateDailySales() {
  const sales = await db.$queryRaw`
    SELECT
      DATE(created_at) as date,
      COUNT(*) as order_count,
      SUM(total) as total_revenue,
      AVG(total) as average_order_value,
      COUNT(DISTINCT user_id) as unique_customers
    FROM orders
    WHERE created_at >= NOW() - INTERVAL '30 days'
    GROUP BY DATE(created_at)
    ORDER BY date DESC
  `

  return sales
}

export async function aggregateByRegion() {
  const regions = await db.$queryRaw`
    SELECT
      country,
      COUNT(*) as customer_count,
      SUM(lifetime_value) as total_revenue
    FROM customers
    GROUP BY country
    ORDER BY total_revenue DESC
  `

  return regions
}

Data Validation

Schema Validation with Zod

// lib/validators/customer.ts
import { z } from 'zod'

export const customerSchema = z.object({
  id: z.string().uuid(),
  email: z.string().email(),
  name: z.string().min(1).max(100),
  age: z.number().int().min(0).max(150).optional(),
  signupDate: z.coerce.date(),
  tags: z.array(z.string()).default([])
})

export type ValidatedCustomer = z.infer<typeof customerSchema>

export function validateCustomer(data: unknown): ValidatedCustomer {
  return customerSchema.parse(data)
}

// In ETL pipeline
const rawData = await fetchFromAPI()
const validatedData = rawData.map(record => {
  try {
    return validateCustomer(record)
  } catch (error) {
    console.error(`Validation failed for record ${record.id}:`, error)
    return null
  }
}).filter(Boolean)

Data Quality Checks

// lib/quality/checks.ts

export async function dataQualityChecks() {
  const checks = []

  // Check 1: No duplicate emails
  const duplicates = await db.$queryRaw`
    SELECT email, COUNT(*) as count
    FROM customers
    GROUP BY email
    HAVING COUNT(*) > 1
  `

  checks.push({
    name: 'No duplicate emails',
    passed: duplicates.length === 0,
    issues: duplicates
  })

  // Check 2: All customers have valid emails
  const invalidEmails = await db.customers.count({
    where: {
      email: {
        not: {
          contains: '@'
        }
      }
    }
  })

  checks.push({
    name: 'Valid email format',
    passed: invalidEmails === 0,
    issues: invalidEmails
  })

  // Check 3: No orphaned orders
  const orphanedOrders = await db.$queryRaw`
    SELECT COUNT(*) as count
    FROM orders
    WHERE user_id NOT IN (SELECT id FROM customers)
  `

  checks.push({
    name: 'No orphaned orders',
    passed: orphanedOrders[0].count === 0,
    issues: orphanedOrders[0].count
  })

  return checks
}

Data Warehousing

Star Schema Design

// prisma/schema.prisma

// Fact table (metrics/events)
model FactSales {
  id             String   @id @default(cuid())

  // Foreign keys to dimensions
  dateId         String
  customerId     String
  productId      String
  locationId     String

  // Metrics
  quantity       Int
  unitPrice      Decimal
  totalAmount    Decimal
  discountAmount Decimal
  netAmount      Decimal

  // Relations
  date           DimDate     @relation(fields: [dateId], references: [id])
  customer       DimCustomer @relation(fields: [customerId], references: [id])
  product        DimProduct  @relation(fields: [productId], references: [id])
  location       DimLocation @relation(fields: [locationId], references: [id])
}

// Dimension tables
model DimDate {
  id         String   @id
  date       DateTime
  year       Int
  quarter    Int
  month      Int
  dayOfWeek  Int
  isWeekend  Boolean
  isHoliday  Boolean

  sales      FactSales[]
}

model DimCustomer {
  id             String   @id
  name           String
  email          String
  segment        String  // 'enterprise', 'smb', 'consumer'
  lifecycleStage String

  sales          FactSales[]
}

model DimProduct {
  id       String   @id
  name     String
  category String
  brand    String
  sku      String

  sales    FactSales[]
}

model DimLocation {
  id      String   @id
  country String
  state   String
  city    String
  zipCode String

  sales   FactSales[]
}

Populate Data Warehouse

// lib/warehouse/populate.ts

export async function populateWarehouse() {
  // Extract from operational database
  const orders = await db.orders.findMany({
    include: {
      customer: true,
      items: {
        include: { product: true }
      }
    },
    where: {
      createdAt: {
        gte: new Date(Date.now() - 24 * 60 * 60 * 1000)  // Last 24 hours
      }
    }
  })

  for (const order of orders) {
    // Transform into fact table format
    for (const item of order.items) {
      await warehouseDb.factSales.create({
        data: {
          dateId: formatDateId(order.createdAt),
          customerId: order.customer.id,
          productId: item.product.id,
          locationId: order.customer.locationId,
          quantity: item.quantity,
          unitPrice: item.price,
          totalAmount: item.quantity * item.price,
          discountAmount: item.discount || 0,
          netAmount: (item.quantity * item.price) - (item.discount || 0)
        }
      })
    }
  }
}

function formatDateId(date: Date): string {
  return date.toISOString().split('T')[0]  // "2025-10-22"
}

Performance Optimization

Batch Processing

// lib/etl/batch-processor.ts

export async function processBatch<T>(
  items: T[],
  processor: (item: T) => Promise<void>,
  batchSize = 100
) {
  for (let i = 0; i < items.length; i += batchSize) {
    const batch = items.slice(i, i + batchSize)

    // Process batch in parallel
    await Promise.all(batch.map(processor))

    console.log(`Processed ${Math.min(i + batchSize, items.length)}/${items.length}`)

    // Small delay to avoid overwhelming database
    await new Promise(resolve => setTimeout(resolve, 100))
  }
}

// Usage
await processBatch(
  customers,
  async (customer) => {
    await db.customers.upsert({
      where: { id: customer.id },
      create: customer,
      update: customer
    })
  },
  100  // Process 100 at a time
)

Database Optimization

// lib/db/optimizations.ts

// Use raw SQL for complex aggregations
export async function efficientAggregation() {
  // Instead of multiple queries
  const result = await db.$queryRaw`
    SELECT
      c.segment,
      COUNT(DISTINCT c.id) as customer_count,
      COUNT(o.id) as order_count,
      SUM(o.total) as total_revenue,
      AVG(o.total) as avg_order_value
    FROM customers c
    LEFT JOIN orders o ON c.id = o.customer_id
    GROUP BY c.segment
  `

  return result
}

// Use indexes for faster queries
// In migration file:
await db.$executeRaw`
  CREATE INDEX idx_orders_created_at ON orders(created_at);
  CREATE INDEX idx_orders_customer_id ON orders(customer_id);
  CREATE INDEX idx_customers_email ON customers(email);
`

Caching Strategies

Redis for Aggregated Data

// lib/cache/metrics.ts
import Redis from 'ioredis'

const redis = new Redis(process.env.REDIS_URL!)

export async function getCachedMetrics(key: string) {
  const cached = await redis.get(key)

  if (cached) {
    return JSON.parse(cached)
  }

  // Calculate metrics
  const metrics = await calculateMetrics()

  // Cache for 1 hour
  await redis.setex(key, 3600, JSON.stringify(metrics))

  return metrics
}

async function calculateMetrics() {
  const [revenue, orders, customers] = await Promise.all([
    db.orders.aggregate({ _sum: { total: true } }),
    db.orders.count(),
    db.customers.count()
  ])

  return {
    totalRevenue: revenue._sum.total || 0,
    totalOrders: orders,
    totalCustomers: customers,
    avgOrderValue: (revenue._sum.total || 0) / orders
  }
}

Monitoring and Logging

Pipeline Monitoring

// lib/monitoring/pipeline.ts

interface PipelineRun {
  pipelineName: string
  startTime: Date
  endTime?: Date
  status: 'running' | 'success' | 'failed'
  recordsProcessed: number
  errorMessage?: string
}

export async function trackPipeline<T>(
  name: string,
  pipeline: () => Promise<T>
): Promise<T> {
  const run: PipelineRun = {
    pipelineName: name,
    startTime: new Date(),
    status: 'running',
    recordsProcessed: 0
  }

  // Log start
  await db.pipelineRuns.create({ data: run })

  try {
    const result = await pipeline()

    // Log success
    run.endTime = new Date()
    run.status = 'success'
    await db.pipelineRuns.update({
      where: { id: run.id },
      data: run
    })

    return result
  } catch (error) {
    // Log failure
    run.endTime = new Date()
    run.status = 'failed'
    run.errorMessage = error.message

    await db.pipelineRuns.update({
      where: { id: run.id },
      data: run
    })

    // Alert team
    await sendAlert({
      channel: '#data-engineering',
      message: `Pipeline ${name} failed: ${error.message}`
    })

    throw error
  }
}

// Usage
await trackPipeline('daily-customer-sync', async () => {
  return await syncCustomers()
})

Common Patterns

Pattern: Upsert with Conflict Resolution

export async function upsertWithConflict(record: any) {
  const existing = await db.customers.findUnique({
    where: { id: record.id }
  })

  if (existing) {
    // Conflict resolution: Use most recent data
    if (record.updatedAt > existing.updatedAt) {
      await db.customers.update({
        where: { id: record.id },
        data: record
      })
      return { action: 'updated' }
    } else {
      return { action: 'skipped', reason: 'stale data' }
    }
  } else {
    await db.customers.create({ data: record })
    return { action: 'created' }
  }
}

Pattern: Dead Letter Queue

// lib/queue/dead-letter.ts

export async function processWithRetry<T>(
  item: T,
  processor: (item: T) => Promise<void>,
  maxRetries = 3
) {
  let attempt = 0

  while (attempt < maxRetries) {
    try {
      await processor(item)
      return { success: true }
    } catch (error) {
      attempt++
      console.error(`Attempt ${attempt} failed:`, error)

      if (attempt >= maxRetries) {
        // Move to dead letter queue
        await db.deadLetterQueue.create({
          data: {
            item: JSON.stringify(item),
            error: error.message,
            attempts: attempt,
            queuedAt: new Date()
          }
        })

        return { success: false, deadLettered: true }
      }

      // Exponential backoff
      await new Promise(resolve =>
        setTimeout(resolve, 1000 * Math.pow(2, attempt))
      )
    }
  }
}

When to Use Me

Perfect for:

  • Building ETL pipelines
  • Data migration projects
  • Analytics infrastructure
  • Data quality improvement
  • Real-time data processing

I'll help you:

  • Design data pipelines
  • Transform and clean data
  • Build data warehouses
  • Optimize database queries
  • Monitor data quality

What I'll Create

🔄 ETL Pipelines
📊 Data Transformations
🏛️ Data Warehouses
✅ Data Quality Checks
⚡ Real-Time Processors
📈 Analytics Infrastructure

Let's build robust, scalable data infrastructure!