Claude Code Plugins

Community-maintained marketplace

Feedback
15
0

Handle errors and implement logging in data pipelines. Use when implementing retry logic, logging sync operations, tracking failed records, or building fault-tolerant data processing workflows.

Install Skill

1. Download skill
2. Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3. Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify the skill by reading through its instructions before using it.

SKILL.md

name: error-handling
description: Handle errors and implement logging in data pipelines. Use when implementing retry logic, logging sync operations, tracking failed records, or building fault-tolerant data processing workflows.

Error Handling

Provides patterns for error handling and logging in sync pipelines.

Structured Logging

import logging
import json
from datetime import datetime

class _JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        # Serialize the finished values rather than %-interpolating into a
        # JSON template: json.dumps escapes quotes, backslashes and newlines
        # in the message, so every emitted line stays parseable.
        payload = {
            "timestamp": self.formatTime(record, self.datefmt),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        if record.stack_info:
            payload["stack"] = self.formatStack(record.stack_info)
        return json.dumps(payload)


def setup_logger(name: str, log_file: str = None) -> logging.Logger:
    """Set up structured JSON logger.

    Safe to call more than once for the same ``name``: handlers that are
    already attached are not added again, so log lines are not duplicated.

    Args:
        name: Name passed to ``logging.getLogger``.
        log_file: Optional path; when given, records are also written there.

    Returns:
        The logger, set to INFO level, with a console handler and (if
        ``log_file`` was given) a file handler, both emitting JSON.
    """
    import os

    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)

    formatter = _JsonFormatter()

    # Console handler. The exact type check is deliberate: FileHandler is a
    # StreamHandler subclass and must not count as the console handler.
    if not any(type(h) is logging.StreamHandler for h in logger.handlers):
        console = logging.StreamHandler()
        console.setFormatter(formatter)
        logger.addHandler(console)

    # File handler
    if log_file:
        # FileHandler stores the absolute path in ``baseFilename``.
        target = os.path.abspath(os.fspath(log_file))
        already_attached = any(
            isinstance(h, logging.FileHandler) and h.baseFilename == target
            for h in logger.handlers
        )
        if not already_attached:
            file_handler = logging.FileHandler(log_file)
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)

    return logger

Retry Decorator

import time
from functools import wraps
from typing import Tuple, Type

def retry(
    max_attempts: int = 3,
    delay: float = 1.0,
    backoff: float = 2.0,
    exceptions: Tuple[Type[Exception], ...] = (Exception,)
):
    """
    Retry decorator with exponential backoff.

    Args:
        max_attempts: Total number of calls to make (must be >= 1).
        delay: Seconds to sleep after the first failed attempt.
        backoff: Factor the delay is multiplied by after each failed attempt.
        exceptions: Exception types that trigger a retry; anything else
            propagates immediately.

    Raises:
        ValueError: If ``max_attempts`` is less than 1.

    The wrapped function re-raises the exception from the final attempt
    when every attempt fails.

    Usage:
        @retry(max_attempts=3, delay=1.0, backoff=2.0)
        def api_call():
            response = requests.get(url)
            response.raise_for_status()
            return response.json()
    """
    # Without this check a non-positive value would skip the loop entirely
    # and the function would never be called at all.
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            current_delay = delay

            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts:
                        # Out of attempts: let the last failure propagate
                        # with its original traceback.
                        raise
                    time.sleep(current_delay)
                    current_delay *= backoff

        return wrapper
    return decorator

Error Tracking

from typing import Dict, List, Any
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class SyncError:
    """One failure recorded for a single record during a sync.

    Attributes:
        record_id: Identifier of the record that failed.
        error_type: Category label for the failure.
        message: Human-readable description of the failure.
        timestamp: Set from ``datetime.now()`` when not supplied.
        details: Extra key/value context; a new empty dict when not supplied.
    """

    record_id: str
    error_type: str
    message: str
    # Factories run per instance, so each error gets its own timestamp/dict.
    timestamp: datetime = field(default_factory=datetime.now)
    details: Dict[str, Any] = field(default_factory=dict)

class ErrorTracker:
    """Collects the errors raised while a sync runs and summarizes them."""

    def __init__(self):
        # Errors are kept in the order they were reported.
        self.errors: List[SyncError] = []

    def add_error(self, record_id: str, error_type: str, message: str, **details):
        """Record one failure; extra keyword arguments become its details."""
        entry = SyncError(
            record_id=record_id,
            error_type=error_type,
            message=message,
            details=details,
        )
        self.errors.append(entry)

    def get_summary(self) -> Dict[str, int]:
        """Return how many errors were recorded for each error type."""
        counts = {}
        for entry in self.errors:
            kind = entry.error_type
            if kind in counts:
                counts[kind] += 1
            else:
                counts[kind] = 1
        return counts

    def has_errors(self) -> bool:
        """Return True when at least one error has been recorded."""
        return bool(self.errors)

Dead Letter Queue

from typing import List, Dict, Any
import json
from pathlib import Path

class DeadLetterQueue:
    """Store failed records for later processing.

    Records are held in memory in ``self.records`` and persisted as a JSON
    array at ``file_path`` when ``save()`` is called.
    """

    def __init__(self, file_path: str):
        self.file_path = Path(file_path)
        self.records: List[Dict[str, Any]] = []

    def add(self, record: Dict, error: str):
        """Add failed record to queue (in memory only; call ``save()`` to persist)."""
        # Imported here so the class does not depend on a datetime import
        # elsewhere in the file.
        from datetime import datetime

        self.records.append({
            "record": record,
            "error": error,
            "timestamp": datetime.now().isoformat()
        })

    def save(self):
        """Save queue to file.

        Raises:
            TypeError: If a queued record is not JSON-serializable. The
                existing file, if any, is left untouched in that case.
        """
        # Serialize before opening the file: opening with "w" truncates, so
        # a serialization failure part-way through would otherwise wipe out
        # a previously saved queue.
        payload = json.dumps(self.records, indent=2)
        self.file_path.parent.mkdir(parents=True, exist_ok=True)
        self.file_path.write_text(payload, encoding="utf-8")

    def load(self) -> List[Dict]:
        """Load queue from file.

        Returns:
            The loaded records, or the current in-memory records when the
            file does not exist.
        """
        if self.file_path.exists():
            with open(self.file_path, encoding="utf-8") as f:
                self.records = json.load(f)
        return self.records

Context Manager for Operations

from contextlib import contextmanager
from typing import Optional

@contextmanager
def sync_operation(operation_name: str, logger: logging.Logger):
    """
    Wrap a sync step, logging its start and its outcome.

    Yields a dict seeded with "operation" and "start_time"; the caller may
    add its own keys. On exit "status" and "end_time" are filled in (plus
    "error" on failure) and the whole dict is logged. Exceptions raised
    inside the block are logged and then re-raised.

    Usage:
        with sync_operation("fetch_contacts", logger) as op:
            records = api.fetch_all("/contacts")
            op["records_fetched"] = len(records)
    """
    op_state = {"operation": operation_name, "start_time": datetime.now()}
    logger.info(f"Starting {operation_name}")

    try:
        yield op_state
        # Keyword order fixes key insertion order, which shows in the log line.
        op_state.update(status="success", end_time=datetime.now())
        logger.info(f"Completed {operation_name}: {op_state}")
    except Exception as exc:
        op_state.update(status="error", error=str(exc), end_time=datetime.now())
        logger.error(f"Failed {operation_name}: {op_state}")
        raise

Helper Script

Use helper.py for the SyncPipeline class with built-in error tracking, logging, and retry logic.