Claude Code Plugins

Community-maintained marketplace

Feedback

Production ETL patterns including idempotency, checkpointing, error handling, and backfill strategies.

Install Skill

1. Download skill
2. Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3. Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify the skill by reading through its instructions before using it.

SKILL.md

name: etl-patterns
description: Production ETL patterns including idempotency, checkpointing, error handling, and backfill strategies.
allowed-tools: Read, Write, Edit, Grep, Glob, Bash

ETL-Patterns

Production-grade Extract-Transform-Load patterns for reliable data pipelines.

Idempotency Patterns

# Pattern 1: Delete-then-insert (simple, works for small datasets)
def load_daily_data(date: str, df: pd.DataFrame) -> None:
    """Replace the ``daily_metrics`` rows for ``date`` with the rows of ``df``.

    The delete and the append both run on the connection opened by a single
    ``engine.begin()`` block, so re-running the load for the same date does
    not accumulate duplicate rows.

    Args:
        date: Value matched against the ``date`` column when deleting.
        df: Rows appended to ``daily_metrics`` (the index is not written).
    """
    purge_stmt = text("DELETE FROM daily_metrics WHERE date = :date")
    bind_params = {"date": date}
    with engine.begin() as connection:
        # Clear the existing slice first, then append the fresh rows.
        connection.execute(purge_stmt, bind_params)
        df.to_sql('daily_metrics', connection, if_exists='append', index=False)

# Pattern 2: UPSERT (better for large datasets)
def upsert_records(df: pd.DataFrame) -> None:
    """Insert the rows of ``df`` into ``MyTable``, updating rows whose ``id`` exists.

    Rows are sent in chunks of 1000. On an ``id`` conflict, every column
    listed in ``update_cols`` is overwritten with the incoming value. This
    function only executes the statements on ``session``; it does not commit.

    Args:
        df: Rows to upsert; each row becomes one dict of column values.
    """
    rows = df.to_dict('records')
    for chunk in chunked(rows, 1000):
        base_insert = insert(MyTable).values(chunk)
        # ``excluded`` refers to the row that was proposed for insertion.
        overwrite = {
            column: base_insert.excluded[column] for column in update_cols
        }
        upsert_stmt = base_insert.on_conflict_do_update(
            index_elements=['id'],
            set_=overwrite,
        )
        session.execute(upsert_stmt)

# Pattern 3: Source hash for change detection
def extract_with_hash(df: pd.DataFrame) -> pd.DataFrame:
    """Add a ``_row_hash`` column computed from the change-tracked columns.

    The hash covers ``id``, ``name``, ``value`` and ``updated_at`` only. The
    DataFrame index is deliberately excluded (``index=False``): otherwise the
    same source row would hash differently whenever it lands at a different
    position in an extract, which defeats change detection.

    Args:
        df: Frame containing at least the four hashed columns.

    Returns:
        The same ``df`` object, modified in place with the new column.

    Raises:
        KeyError: If any of the hashed columns is missing from ``df``.
    """
    hash_cols = ['id', 'name', 'value', 'updated_at']
    # The result keeps df's index, so the assignment aligns row by row.
    df['_row_hash'] = pd.util.hash_pandas_object(df[hash_cols], index=False)
    return df

Checkpointing

import json
from pathlib import Path

class Checkpoint:
    def __init__(self, path: str):
        self.path = Path(path)
        self.state = self._load()

    def _load(self) -> dict:
        if self.path.exists():
            return json.loads(self.path.read_text())
        return {}

    def save(self) -> None:
        self.path.write_text(json.dumps(self.state, default=str))

    def get_last_processed(self, key: str) -> str | None:
        return self.state.get(key)

    def set_last_processed(self, key: str, value: str) -> None:
        self.state[key] = value
        self.save()

# Usage
checkpoint = Checkpoint('.etl_checkpoint.json')
# None on the very first run, when nothing has been recorded yet.
last_id = checkpoint.get_last_processed('users_sync')

for batch in fetch_users_since(last_id):
    if not batch:
        # Nothing to process and no last record to checkpoint;
        # batch[-1] would raise IndexError on an empty batch.
        continue
    process(batch)
    # Advance the marker only after process() returned without raising, so a
    # failed batch is fetched again on the next run.
    checkpoint.set_last_processed('users_sync', batch[-1]['id'])

Error Handling

from dataclasses import dataclass
from typing import Any

@dataclass
class FailedRecord:
    """One record that raised during transformation, kept for later inspection."""

    source_id: str  # the record's 'id' rendered as text, or 'unknown' if absent
    error: str  # str() of the exception that was raised
    raw_data: dict  # the record exactly as it was received
    timestamp: datetime  # when the failure was captured

class ETLProcessor:
    """Transform records one by one, collecting failures instead of aborting.

    ``transform`` is not defined here; it is expected to be supplied
    elsewhere (presumably by a subclass) and to take one record dict.
    """

    def __init__(self):
        self.failed_records: list[FailedRecord] = []

    def process_batch(self, records: list[dict]) -> list[dict]:
        """Transform every record, returning only the successful results.

        A record whose ``transform`` call raises is appended to
        ``self.failed_records`` and skipped; the rest of the batch continues.

        Args:
            records: Input records; each must support ``.get('id', ...)``.

        Returns:
            Transformed records, in input order, without the failed ones.
        """
        processed = []
        for record in records:
            try:
                processed.append(self.transform(record))
            except Exception as e:
                self.failed_records.append(FailedRecord(
                    # Coerce to str: ids are often ints, and mixing them with
                    # the 'unknown' fallback would give the failure frame a
                    # mixed-type column (and contradict the field annotation).
                    source_id=str(record.get('id', 'unknown')),
                    error=str(e),
                    raw_data=record,
                    timestamp=datetime.now()
                ))
        return processed

    def save_failures(self, path: str) -> None:
        """Write the collected failures to a timestamped parquet file under ``path``.

        Does nothing when no failures were recorded. The collected failures
        are not cleared afterwards.
        """
        if self.failed_records:
            df = pd.DataFrame([vars(r) for r in self.failed_records])
            df.to_parquet(f"{path}/failures_{datetime.now():%Y%m%d_%H%M%S}.parquet")

# Dead letter queue pattern
def process_with_dlq(records: list[dict], dlq_table: str) -> None:
    """Process each record, diverting failures to a dead letter queue.

    A record whose ``process`` call raises is handed to ``save_to_dlq``
    together with the error text; processing then moves on to the next one.

    Args:
        records: Records to process, one at a time.
        dlq_table: Destination passed through to ``save_to_dlq``.
    """
    for item in records:
        try:
            process(item)
        except Exception as exc:
            # Keep the failing record and its reason for later inspection.
            failure_reason = str(exc)
            save_to_dlq(dlq_table, item, failure_reason)

Chunked Processing

from typing import Iterator, TypeVar

T = TypeVar('T')

def chunked(iterable: Iterator[T], size: int) -> Iterator[list[T]]:
    """Yield successive lists of at most ``size`` items from ``iterable``.

    Any iterable works (lists as well as iterators). Every chunk has exactly
    ``size`` items except possibly the last; empty input yields nothing.

    Args:
        iterable: Source of items, consumed lazily.
        size: Maximum chunk length; must be at least 1.

    Raises:
        ValueError: If ``size`` is smaller than 1. Because this is a
            generator, the error surfaces when iteration starts.
    """
    if size < 1:
        # Without this guard a zero or negative size silently produced
        # one-item chunks, hiding a caller bug.
        raise ValueError(f"size must be at least 1, got {size}")
    batch: list[T] = []
    for item in iterable:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        # Trailing partial chunk.
        yield batch

# Memory-efficient file processing
def process_large_csv(path: str, chunk_size: int = 50_000) -> None:
    """Transform and load a CSV file chunk by chunk.

    The file is read ``chunk_size`` rows at a time; each piece is passed
    through ``transform`` and then ``load`` in append mode, and a progress
    line is printed per chunk.

    Args:
        path: Location of the CSV file.
        chunk_size: Number of rows read per chunk.
    """
    reader = pd.read_csv(path, chunksize=chunk_size)
    for chunk_no, frame in enumerate(reader):
        print(f"Processing chunk {chunk_no}: {len(frame)} rows")
        result = transform(frame)
        load(result, mode='append')
        # Drop both references and collect before the next chunk is read.
        del frame, result
        gc.collect()

Backfill Strategy

from datetime import date, timedelta
from concurrent.futures import ThreadPoolExecutor

def backfill_date_range(
    start: date,
    end: date,
    process_fn: callable,
    parallel: int = 4
) -> None:
    """Run ``process_fn`` once for every day from ``start`` to ``end`` inclusive.

    Days are processed concurrently on a thread pool. A day whose call raises
    is reported on stdout and does not stop the others; nothing is re-raised.

    Args:
        start: First day to process.
        end: Last day to process (inclusive). If earlier than ``start``,
            nothing is processed.
        process_fn: Callable invoked with a single day.
        parallel: Maximum number of worker threads.
    """
    # Imported here because the surrounding imports only bring in
    # ThreadPoolExecutor; without it this function fails with NameError.
    from concurrent.futures import as_completed

    day_count = (end - start).days + 1  # <= 0 when end precedes start
    dates = [start + timedelta(days=offset) for offset in range(day_count)]

    # Process in parallel with controlled concurrency
    with ThreadPoolExecutor(max_workers=parallel) as executor:
        futures = {executor.submit(process_fn, d): d for d in dates}
        for future in as_completed(futures):
            d = futures[future]
            try:
                future.result()
                print(f"Completed: {d}")
            except Exception as e:
                print(f"Failed: {d} - {e}")

# Usage
# Backfill 1 January through 31 March 2024 (both ends included), calling
# process_daily_data once per day on up to four worker threads.
backfill_date_range(
    start=date(2024, 1, 1),
    end=date(2024, 3, 31),
    process_fn=process_daily_data,
    parallel=4
)

Incremental Load Patterns

# Pattern 1: Timestamp-based incremental
def incremental_by_timestamp(table: str, timestamp_col: str) -> pd.DataFrame:
    """Fetch the rows of ``table`` changed since the last recorded run.

    Selects rows whose ``timestamp_col`` is strictly greater than the stored
    watermark, ordered by that column. When rows are returned, the watermark
    is advanced to the largest timestamp seen.

    Args:
        table: Table name, interpolated into the SQL text. Identifiers cannot
            be bound as parameters, so this must come from trusted
            configuration, never from user input.
        timestamp_col: Column compared against the watermark; same trust
            requirement as ``table``.

    Returns:
        The new or changed rows (possibly empty).
    """
    last_run = get_last_run_timestamp(table)
    # NOTE(review): if no watermark exists yet and the helper returns None,
    # the comparison is against NULL and matches nothing - confirm how the
    # first run is seeded.
    # Wrapped in text() so the ``:last_run`` bind parameter is handled by
    # SQLAlchemy for any driver; a plain string is passed to the driver
    # as-is and only works when the driver itself accepts ``:name`` style.
    query = text(f"""
        SELECT * FROM {table}
        WHERE {timestamp_col} > :last_run
        ORDER BY {timestamp_col}
    """)
    df = pd.read_sql(query, engine, params={'last_run': last_run})
    if not df.empty:
        # NOTE(review): the watermark moves before the caller has loaded
        # these rows; a downstream failure would skip them on the next run.
        set_last_run_timestamp(table, df[timestamp_col].max())
    return df

# Pattern 2: Change Data Capture (CDC) style
def process_cdc_events(events: list[dict]) -> None:
    """Apply a sequence of change events in order.

    Each event must carry an ``'operation'`` and a ``'data'`` key. A
    ``'DELETE'`` soft-deletes the row identified by ``data['id']``; every
    other operation is applied as an upsert of ``data``.

    Args:
        events: Change events, applied in the order given.
    """
    for change in events:
        operation, payload = change['operation'], change['data']
        if operation != 'DELETE':
            # INSERT and UPDATE are handled the same way.
            upsert(payload)
            continue
        soft_delete(payload['id'])

# Pattern 3: Full refresh with swap
def full_refresh_with_swap(df: pd.DataFrame, table: str) -> None:
    """Rebuild ``table`` from ``df`` by loading a staging table and renaming it.

    ``df`` is first written to ``<table>_temp`` (replacing any earlier one).
    The current table is then renamed to ``<table>_old``, the staging table
    takes its name, and the old copy is dropped.

    Args:
        df: Complete new contents of the table (the index is not written).
        table: Name of the table to replace; interpolated into the DDL.
    """
    staging = f"{table}_temp"
    df.to_sql(staging, engine, if_exists='replace', index=False)

    # NOTE(review): the first ALTER assumes ``table`` already exists -
    # confirm how an initial load is expected to behave.
    swap_statements = (
        f"DROP TABLE IF EXISTS {table}_old",
        f"ALTER TABLE {table} RENAME TO {table}_old",
        f"ALTER TABLE {staging} RENAME TO {table}",
        f"DROP TABLE {table}_old",
    )
    with engine.begin() as conn:
        for ddl in swap_statements:
            conn.execute(text(ddl))

Retry Logic

import time
from functools import wraps

def retry(max_attempts: int = 3, delay: float = 1.0, backoff: float = 2.0):
    """Decorator factory that retries a function when it raises.

    The wrapped function is called up to ``max_attempts`` times. After each
    failed attempt except the last, a message is printed and the wrapper
    sleeps; the sleep starts at ``delay`` seconds and is multiplied by
    ``backoff`` after every retry. The exception from the final attempt
    propagates to the caller with its original traceback.

    Args:
        max_attempts: Total number of calls allowed; must be at least 1.
        delay: Seconds to sleep before the first retry.
        backoff: Multiplier applied to the sleep after each retry.

    Raises:
        ValueError: If ``max_attempts`` is smaller than 1 (previously this
            surfaced later as a confusing ``TypeError`` from ``raise None``).
    """
    if max_attempts < 1:
        raise ValueError(f"max_attempts must be at least 1, got {max_attempts}")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            current_delay = delay

            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts:
                        # Out of attempts: let the last error propagate as-is.
                        raise
                    print(f"Attempt {attempt} failed: {e}. Retrying in {current_delay}s")
                    time.sleep(current_delay)
                    current_delay *= backoff

        return wrapper
    return decorator

@retry(max_attempts=3, delay=1.0, backoff=2.0)
def fetch_from_api(url: str) -> dict:
    """GET ``url`` and return the decoded JSON body.

    The request uses a 30-second timeout, and an error HTTP status is turned
    into an exception via ``raise_for_status``. Any exception triggers the
    retry decorator, which allows three attempts in total.

    Args:
        url: Address to fetch.

    Returns:
        The parsed JSON payload.
    """
    resp = requests.get(url, timeout=30)
    resp.raise_for_status()
    payload = resp.json()
    return payload

Pipeline Orchestration

from enum import Enum
from dataclasses import dataclass, field

class StepStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"

@dataclass
class PipelineStep:
    name: str
    func: callable
    dependencies: list[str] = field(default_factory=list)
    status: StepStatus = StepStatus.PENDING
    error: str | None = None

class Pipeline:
    def __init__(self, name: str):
        self.name = name
        self.steps: dict[str, PipelineStep] = {}

    def add_step(self, name: str, func: callable, depends_on: list[str] = None):
        self.steps[name] = PipelineStep(name, func, depends_on or [])

    def run(self) -> bool:
        for step in self._topological_sort():
            # Skip if dependencies failed
            if any(self.steps[d].status == StepStatus.FAILED for d in step.dependencies):
                step.status = StepStatus.SKIPPED
                continue

            step.status = StepStatus.RUNNING
            try:
                step.func()
                step.status = StepStatus.SUCCESS
            except Exception as e:
                step.status = StepStatus.FAILED
                step.error = str(e)

        return all(s.status == StepStatus.SUCCESS for s in self.steps.values())

Logging Best Practices

import structlog

logger = structlog.get_logger()

def process_with_logging(batch_id: str, records: list[dict]) -> None:
    """Process a batch while emitting structured start/finish/failure events.

    Every event carries ``batch_id`` and the number of records. On success
    the processed and failed counts from the result are logged; on any
    exception a ``batch_failed`` event is logged and the exception is
    re-raised.

    Args:
        batch_id: Identifier attached to every log event for this batch.
        records: Records handed to ``process`` as one batch.
    """
    batch_log = logger.bind(batch_id=batch_id, record_count=len(records))
    batch_log.info("batch_started")

    try:
        outcome = process(records)
        batch_log.info(
            "batch_completed",
            processed=outcome.processed_count,
            failed=outcome.failed_count,
        )
    except Exception as exc:
        batch_log.error("batch_failed", error=str(exc))
        raise