| name | error-handling |
| description | Handle errors and implement logging in data pipelines. Use when implementing retry logic, logging sync operations, tracking failed records, or building fault-tolerant data processing workflows. |
Error Handling
Provides patterns for error handling and logging in sync pipelines.
Structured Logging
import logging
import json
from datetime import datetime
class _JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        # Build the payload from the record and serialize it afterwards so
        # quotes, backslashes and newlines in the message are escaped
        # correctly (interpolating into a pre-serialized template is not).
        payload = {
            "timestamp": self.formatTime(record, self.datefmt),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload)


def setup_logger(name: str, log_file: str = None) -> logging.Logger:
    """Set up structured JSON logger.

    Args:
        name: Name passed to ``logging.getLogger``.
        log_file: Optional path; when given, records are also written to
            this file in addition to the console stream.

    Returns:
        The logger, set to INFO level, with a JSON console handler and
        (optionally) a JSON file handler attached.

    Calling this function repeatedly with the same arguments is safe:
    handlers that were already attached by a previous call are reused
    instead of being added again, so records are not emitted twice.
    """
    import os  # local import: only needed to normalize the log file path

    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    formatter = _JsonFormatter()

    def _is_json_handler(handler: logging.Handler) -> bool:
        return isinstance(handler.formatter, _JsonFormatter)

    # Console handler (FileHandler subclasses StreamHandler, so exclude it).
    has_console = any(
        _is_json_handler(h)
        and isinstance(h, logging.StreamHandler)
        and not isinstance(h, logging.FileHandler)
        for h in logger.handlers
    )
    if not has_console:
        console = logging.StreamHandler()
        console.setFormatter(formatter)
        logger.addHandler(console)

    # File handler
    if log_file:
        # FileHandler stores its target as an absolute path in baseFilename.
        target = os.path.abspath(log_file)
        has_file = any(
            isinstance(h, logging.FileHandler) and h.baseFilename == target
            for h in logger.handlers
        )
        if not has_file:
            file_handler = logging.FileHandler(log_file)
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)

    return logger
Retry Decorator
import time
from functools import wraps
from typing import Tuple, Type
def retry(
    max_attempts: int = 3,
    delay: float = 1.0,
    backoff: float = 2.0,
    exceptions: Tuple[Type[Exception], ...] = (Exception,)
):
    """
    Retry decorator with exponential backoff.

    Args:
        max_attempts: Total number of calls to make before giving up
            (must be at least 1).
        delay: Seconds to sleep after the first failed attempt.
        backoff: Factor the delay is multiplied by after each failure.
        exceptions: Exception types that trigger a retry; anything else
            propagates immediately.

    Raises:
        ValueError: If ``max_attempts`` is less than 1.

    The wrapped function re-raises the exception from the final attempt
    once all attempts are exhausted.

    Usage:
        @retry(max_attempts=3, delay=1.0, backoff=2.0)
        def api_call():
            response = requests.get(url)
            response.raise_for_status()
            return response.json()
    """
    # Without this guard a non-positive value would skip the loop entirely
    # and the wrapper would end up trying to raise ``None``.
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            current_delay = delay
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts:
                        # Bare raise keeps the original traceback intact.
                        raise
                    time.sleep(current_delay)
                    current_delay *= backoff
        return wrapper
    return decorator
Error Tracking
from typing import Dict, List, Any
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class SyncError:
    """A single failure recorded while syncing one record.

    Fields:
        record_id: Identifier of the record that failed.
        error_type: Category label for the failure.
        message: Human-readable description of what went wrong.
        timestamp: Set to ``datetime.now()`` at creation unless supplied.
        details: Extra key/value context; a fresh dict per instance.
    """

    record_id: str
    error_type: str
    message: str
    # default_factory gives every instance its own timestamp / dict rather
    # than sharing one value evaluated at class-definition time.
    timestamp: datetime = field(default_factory=datetime.now)
    details: Dict[str, Any] = field(default_factory=dict)
class ErrorTracker:
    """Accumulates SyncError entries over the course of a sync run."""

    def __init__(self):
        # Entries are kept in the order they were reported.
        self.errors: List[SyncError] = []

    def add_error(self, record_id: str, error_type: str, message: str, **details):
        """Record one failure; extra keyword arguments are stored as details."""
        entry = SyncError(
            record_id=record_id,
            error_type=error_type,
            message=message,
            details=details,
        )
        self.errors.append(entry)

    def get_summary(self) -> Dict[str, int]:
        """Return a mapping of error type to the number of recorded errors."""
        counts: Dict[str, int] = {}
        for entry in self.errors:
            kind = entry.error_type
            if kind in counts:
                counts[kind] += 1
            else:
                counts[kind] = 1
        return counts

    def has_errors(self) -> bool:
        """Return True when at least one error has been recorded."""
        return bool(self.errors)
Dead Letter Queue
from typing import List, Dict, Any
import json
from pathlib import Path
class DeadLetterQueue:
    """Store failed records for later processing.

    Records are held in memory in ``self.records`` and persisted as a JSON
    array at ``file_path`` when ``save()`` is called.
    """

    def __init__(self, file_path: str):
        """Create an empty queue backed by the JSON file at *file_path*."""
        self.file_path = Path(file_path)
        self.records: List[Dict[str, Any]] = []

    def add(self, record: Dict, error: str):
        """Add failed record to queue.

        Args:
            record: The record that could not be processed.
            error: Description of the failure.
        """
        # Imported here so the class works even where only this section's
        # own imports (typing, json, pathlib) are in scope.
        from datetime import datetime

        self.records.append({
            "record": record,
            "error": error,
            "timestamp": datetime.now().isoformat()
        })

    def save(self):
        """Save queue to file, creating missing parent directories."""
        self.file_path.parent.mkdir(parents=True, exist_ok=True)
        # Explicit encoding keeps the file readable across platforms.
        with open(self.file_path, "w", encoding="utf-8") as f:
            json.dump(self.records, f, indent=2)

    def load(self) -> List[Dict]:
        """Load queue from file.

        Returns:
            The loaded records; if the file does not exist the in-memory
            records are returned unchanged.
        """
        if self.file_path.exists():
            with open(self.file_path, encoding="utf-8") as f:
                self.records = json.load(f)
        return self.records
Context Manager for Operations
from contextlib import contextmanager
from typing import Optional
@contextmanager
def sync_operation(operation_name: str, logger: logging.Logger):
    """
    Wrap a sync step with start/finish logging and outcome bookkeeping.

    Yields a dict pre-filled with "operation" and "start_time"; the caller
    may add its own keys. On exit the dict gains "status" and "end_time"
    (plus "error" when the body raised), is logged, and any exception from
    the body is re-raised.

    Usage:
        with sync_operation("fetch_contacts", logger) as op:
            records = api.fetch_all("/contacts")
            op["records_fetched"] = len(records)
    """
    started_at = datetime.now()
    op_state = {"operation": operation_name, "start_time": started_at}
    logger.info(f"Starting {operation_name}")
    try:
        yield op_state
        # Keys are added in a fixed order because the dict is logged as-is.
        op_state.update(status="success", end_time=datetime.now())
        logger.info(f"Completed {operation_name}: {op_state}")
    except Exception as exc:
        op_state.update(status="error", error=str(exc), end_time=datetime.now())
        logger.error(f"Failed {operation_name}: {op_state}")
        raise
Helper Script
Use helper.py for the SyncPipeline class with built-in error tracking, logging, and retry logic.