Claude Code Plugins

Community-maintained marketplace


python-data-engineering

@CROW-B3/.claude

Comprehensive Python data engineering patterns for AWS Data Lake, including PySpark, Pandas, Apache Airflow, AWS Glue, ETL pipelines, data quality, schema management, performance optimization, FastAPI services, streaming with Kafka/Kinesis, data validation with Great Expectations, testing strategies, error handling, logging, and production deployment on AWS EMR and Glue.

Install Skill

1. Download the skill
2. Enable skills in Claude: open claude.ai/settings/capabilities and find the "Skills" section
3. Upload to Claude: click "Upload skill" and select the downloaded ZIP file

Note: Please verify the skill by reading through its instructions before using it.

SKILL.md

name: python-data-engineering
description: Comprehensive Python data engineering patterns for AWS Data Lake, including PySpark, Pandas, Apache Airflow, AWS Glue, ETL pipelines, data quality, schema management, performance optimization, FastAPI services, streaming with Kafka/Kinesis, data validation with Great Expectations, testing strategies, error handling, logging, and production deployment on AWS EMR and Glue.

Python Data Engineering Skill

Purpose

Production-ready Python data engineering patterns for building scalable, reliable data pipelines on AWS Data Lake infrastructure. Covers PySpark, Pandas, Airflow, Glue, FastAPI, and modern data engineering best practices.

When to Use This Skill

Auto-activates when working with:

  • Python data pipeline development
  • PySpark applications
  • Apache Airflow DAGs
  • AWS Glue jobs
  • Pandas data transformations
  • ETL/ELT processes
  • Data quality validation
  • FastAPI data services
  • Streaming data processing

Core Principles

1. Layered Architecture

API/Service Layer (FastAPI) → Orchestration (Airflow) → Processing (PySpark/Pandas) → Storage (S3)

2. Data Lake Zones (Medallion)

  • Bronze: Raw, immutable data from sources
  • Silver: Cleaned, validated, deduplicated
  • Gold: Business-ready, aggregated, optimized
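
A minimal sketch of how these zones might map to S3 prefixes (bucket names and layout are illustrative, not prescriptive):

from datetime import date

# Illustrative zone layout; adapt bucket names and partitioning to your lake
ZONE_PATHS = {
    "bronze": "s3://data-lake-bronze/{source}/{ds}/",
    "silver": "s3://data-lake-silver/{source}/partition_date={ds}/",
    "gold": "s3://data-lake-gold/{mart}/partition_date={ds}/",
}

def zone_path(zone: str, ds: date, **names) -> str:
    """Build a zone path, e.g. zone_path("bronze", date(2025, 1, 1), source="events")."""
    return ZONE_PATHS[zone].format(ds=ds.isoformat(), **names)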

3. Schema Management

  • Use Pydantic for data validation (sketch below)
  • Maintain schema registry (AWS Glue Data Catalog)
  • Version schemas with backward compatibility
  • Validate early, fail fast
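
For example, a small sketch of Pydantic-based validation with an explicit schema version (the model and its fields are illustrative, not part of any real registry):

from datetime import datetime
from typing import Optional
from pydantic import BaseModel, Field, ValidationError

class EventV1(BaseModel):
    """v1 record contract; adding only optional fields keeps later versions backward compatible."""
    schema_version: int = 1
    event_id: str = Field(..., min_length=1)
    event_ts: datetime
    value: float = Field(..., gt=0)

def validate_record(raw: dict) -> Optional[EventV1]:
    """Validate early, fail fast; return None for records headed to a dead letter queue."""
    try:
        return EventV1(**raw)
    except ValidationError:
        return None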

4. Error Handling & Resilience

  • Idempotent operations
  • Retry logic with exponential backoff (sketch below)
  • Dead letter queues for failed records
  • Comprehensive logging and monitoring
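
A minimal retry-with-exponential-backoff sketch (in production a library such as tenacity gives you the same behavior declaratively):

import logging
import random
import time

logger = logging.getLogger(__name__)

def retry_with_backoff(fn, max_attempts: int = 5, base_delay: float = 1.0):
    """Call fn(), retrying on failure with exponential backoff plus jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            if attempt == max_attempts:
                raise
            delay = base_delay * (2 ** (attempt - 1)) + random.uniform(0, 1)
            logger.warning("Attempt %d failed (%s); retrying in %.1fs", attempt, exc, delay)
            time.sleep(delay)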

Quick Start Examples

FastAPI Data Service

from fastapi import FastAPI, HTTPException, Depends, Path
from pydantic import BaseModel, Field, field_validator
from typing import Optional
import boto3
import logging
from datetime import datetime

app = FastAPI(title="Data Lake API")

class DataRequest(BaseModel):
    partition_date: datetime
    limit: Optional[int] = Field(1000, gt=0, le=10000)

    @field_validator('partition_date')
    @classmethod
    def validate_date(cls, v):
        if v > datetime.now():
            raise ValueError('partition_date cannot be in the future')
        return v

@app.get("/datasets/{dataset_name}/data")
async def get_dataset_data(dataset_name: str, request: DataRequest = Depends()):
    """Query data from S3 Data Lake with Athena."""
    try:
        athena = boto3.client('athena')

        query = f"""
        SELECT * FROM {dataset_name}
        WHERE partition_date = '{request.partition_date.strftime('%Y-%m-%d')}'
        LIMIT {request.limit}
        """

        response = athena.start_query_execution(
            QueryString=query,
            QueryExecutionContext={'Database': 'data_lake'},
            ResultConfiguration={'OutputLocation': 's3://query-results/'}
        )

        return {"query_execution_id": response['QueryExecutionId']}
    except Exception as e:
        logging.error(f"Query failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))

PySpark ETL Job Structure

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from typing import List
import logging

class DataLakeETL:
    def __init__(self, app_name: str):
        self.spark = (SparkSession.builder
            .appName(app_name)
            .config("spark.sql.adaptive.enabled", "true")
            .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
            .getOrCreate())

        self.logger = logging.getLogger(__name__)

    def read_bronze(self, path: str, schema: StructType):
        """Read from the raw/bronze zone with schema enforcement.

        Assumes bronze data lands as JSON: PERMISSIVE mode and the
        _corrupt_record column apply to the JSON/CSV readers (Parquet
        ignores them), and the supplied schema must include a
        _corrupt_record string field.
        """
        return (self.spark.read
            .schema(schema)
            .option("mode", "PERMISSIVE")
            .option("columnNameOfCorruptRecord", "_corrupt_record")
            .json(path))

    def transform_to_silver(self, df):
        """Clean and validate data for silver zone."""
        return (df
            .filter(col("_corrupt_record").isNull())  # Remove corrupt records
            .dropDuplicates(["id"])  # Deduplicate
            .withColumn("processed_at", current_timestamp())
            .withColumn("is_valid",
                when(col("value").isNotNull() & (col("value") > 0), True)
                .otherwise(False))
            .filter(col("is_valid")))  # Only valid records

    def write_silver(self, df, path: str, partition_cols: List[str]):
        """Write to processed/silver zone with partitioning."""
        (df.write
            .mode("overwrite")
            .partitionBy(*partition_cols)
            .parquet(path))

        self.logger.info(f"Written {df.count()} records to {path}")

Airflow DAG Pattern

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'start_date': datetime(2025, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
}

with DAG(
    'data_lake_bronze_to_silver',
    default_args=default_args,
    description='Process raw data to silver zone',
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    catchup=False,
    tags=['data-lake', 'etl', 'silver'],
) as dag:

    # Wait for source data
    wait_for_data = S3KeySensor(
        task_id='wait_for_bronze_data',
        bucket_name='data-lake-bronze',
        bucket_key='events/{{ ds }}/*.parquet',
        wildcard_match=True,  # bucket_key contains a wildcard
        timeout=3600,
        poke_interval=300,
    )

    # Run Glue job
    process_data = GlueJobOperator(
        task_id='bronze_to_silver',
        job_name='bronze-to-silver-etl',
        script_args={
            '--input_path': 's3://data-lake-bronze/events/{{ ds }}/',
            '--output_path': 's3://data-lake-silver/events/',
            '--partition_date': '{{ ds }}',
        },
    )

    # Data quality checks
    def validate_silver_data(**context):
        """Validate silver zone data quality."""
        import boto3
        athena = boto3.client('athena')

        # Row count check
        query = f"""
        SELECT COUNT(*) as row_count
        FROM silver.events
        WHERE partition_date = '{context['ds']}'
        """
        # Execute and validate (simplified)
        # Add full implementation with quality checks

    quality_check = PythonOperator(
        task_id='data_quality_check',
        python_callable=validate_silver_data,
    )

    wait_for_data >> process_data >> quality_check

Directory Structure

services/python/
├── data-ingestion/
│   ├── app/
│   │   ├── api/         # FastAPI routes
│   │   ├── services/    # Business logic
│   │   └── models/      # Pydantic models
│   ├── tests/
│   ├── Dockerfile
│   └── requirements.txt
├── etl-pipeline/
│   ├── jobs/            # PySpark jobs
│   ├── glue/            # Glue-specific code
│   ├── tests/
│   └── requirements.txt
└── airflow/
    ├── dags/            # Airflow DAGs
    ├── plugins/         # Custom operators
    └── requirements.txt

Resource Files

For detailed information on specific topics:

resources/pyspark-patterns.md

  • PySpark job structure and optimization
  • Window functions and aggregations (dedup sketch below)
  • Join strategies and optimization
  • Partition management
  • Performance tuning
  • Catalyst optimizer usage
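
As a taste of the window-function material, a common keep-latest-record-per-key dedup (assuming df is a DataFrame with id and event_ts columns):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Keep only the most recent record per id, ranked by event timestamp
w = Window.partitionBy("id").orderBy(F.col("event_ts").desc())
latest = (df
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") == 1)
    .drop("rn"))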

resources/airflow-best-practices.md

  • DAG design patterns
  • Task dependencies and sensors
  • Dynamic DAG generation
  • XComs and task communication
  • Backfilling strategies
  • Error handling and retries

resources/aws-glue-integration.md

  • Glue job development
  • Glue Data Catalog usage
  • DynamicFrames vs DataFrames
  • Bookmarks for incremental processing (skeleton below)
  • Cost optimization
  • Security and IAM
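
A minimal Glue job skeleton showing the pieces bookmarks depend on (job arguments and paths are illustrative):

import sys
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME", "input_path", "output_path"])

sc = SparkContext()
glue_context = GlueContext(sc)
spark = glue_context.spark_session
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

# Bookmarks track sources read as DynamicFrames with a transformation_ctx
dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [args["input_path"]]},
    format="parquet",
    transformation_ctx="bronze_source",
)

df = dyf.toDF()
# ... transformations ...
df.write.mode("append").parquet(args["output_path"])

job.commit()  # advances the bookmark only after a successful run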

resources/data-quality-validation.md

  • Great Expectations integration (sketch below)
  • Schema validation
  • Data profiling
  • Anomaly detection
  • SLA monitoring
  • Quality metrics
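
For instance, a small check using the classic (pre-1.0) Great Expectations dataset API; the newer Fluent API differs, and the path and columns here are placeholders:

import great_expectations as ge
import pandas as pd

# Requires s3fs for reading directly from S3
df = pd.read_parquet("s3://data-lake-silver/events/partition_date=2025-01-01/")
ge_df = ge.from_pandas(df)

results = [
    ge_df.expect_column_values_to_not_be_null("id"),
    ge_df.expect_column_values_to_be_between("value", min_value=0, max_value=None),
]
if not all(r.success for r in results):
    raise ValueError("Silver data quality checks failed")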

resources/fastapi-data-services.md

  • API design for data services
  • Authentication and authorization
  • Query optimization
  • Pagination and filtering
  • Async processing
  • Caching strategies

resources/streaming-processing.md

  • Kafka consumer patterns
  • Kinesis integration
  • Structured Streaming with PySpark (sketch below)
  • Exactly-once semantics
  • Windowing and watermarks
  • State management
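
A compact Structured Streaming sketch reading from Kafka with a watermark and a checkpointed sink (brokers, topic, and paths are placeholders):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("events-stream").getOrCreate()

raw = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "latest")
    .load())

# Count events per 5-minute window, tolerating 10 minutes of late data
windowed = (raw
    .withWatermark("timestamp", "10 minutes")
    .groupBy(F.window("timestamp", "5 minutes"))
    .count())

query = (windowed.writeStream
    .outputMode("append")
    .format("parquet")
    .option("path", "s3://data-lake-silver/events_agg/")
    .option("checkpointLocation", "s3://data-lake-silver/_checkpoints/events_agg/")
    .start())

query.awaitTermination()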

resources/testing-strategies.md

  • Unit testing PySpark (example below)
  • Integration testing pipelines
  • Data quality tests
  • Mocking AWS services
  • Test data generation
  • CI/CD for data pipelines
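
For example, a minimal pytest setup for unit-testing a transformation against a local SparkSession (here transform_to_silver is assumed to be factored out as an importable function):

import pytest
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

@pytest.fixture(scope="session")
def spark():
    return (SparkSession.builder
        .master("local[2]")
        .appName("unit-tests")
        .getOrCreate())

def test_transform_to_silver_dedups_and_drops_invalid(spark):
    schema = StructType([
        StructField("id", StringType(), True),
        StructField("value", IntegerType(), True),
        StructField("_corrupt_record", StringType(), True),
    ])
    df = spark.createDataFrame(
        [("a", 10, None), ("a", 10, None), ("b", -1, None)],
        schema,
    )
    result = transform_to_silver(df)  # function under test, assumed importable
    assert result.count() == 1
    assert result.first()["id"] == "a"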

resources/error-handling-logging.md

  • Structured logging
  • Error handling patterns
  • Retry logic
  • Dead letter queues
  • Monitoring with CloudWatch
  • Alerting strategies

resources/performance-optimization.md

  • Spark tuning parameters
  • Partition optimization
  • Broadcast joins (sketch below)
  • Caching strategies
  • S3 read/write optimization
  • Cost optimization
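
The broadcast-join item, for example, boils down to a single hint (fact_df and dim_df are placeholder DataFrames):

from pyspark.sql.functions import broadcast

# Ship the small dimension table to every executor instead of shuffling the large fact table
enriched = fact_df.join(broadcast(dim_df), on="customer_id", how="left")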

resources/schema-management.md

  • Schema evolution strategies (snippet below)
  • Avro, Parquet, ORC formats
  • Glue Data Catalog management
  • Backward/forward compatibility
  • Schema registry patterns
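
One concrete backward-compatibility tactic: add only nullable columns and let Parquet schema merging reconcile old and new files (spark is an active SparkSession; the path is a placeholder):

# Reads files written before and after a column was added; the new column
# simply comes back null for older files, so existing readers keep working
df = (spark.read
    .option("mergeSchema", "true")
    .parquet("s3://data-lake-silver/events/"))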

Common Patterns

1. Idempotent ETL Processing

def process_partition(partition_date: str, force_reprocess: bool = False):
    """Process data idempotently."""
    output_path = f"s3://silver/{partition_date}/"

    # Check if already processed
    if not force_reprocess and path_exists(output_path):
        logger.info(f"Partition {partition_date} already processed, skipping")
        return

    # Process data
    df = read_bronze(partition_date)
    df_clean = transform_to_silver(df)

    # Write to a temporary prefix first, then promote
    temp_path = f"{output_path}_temp/"
    df_clean.write.parquet(temp_path)

    # Promote to the final location; S3 has no atomic rename, so use
    # copy-then-delete (or a committer/manifest) and keep the promotion idempotent
    move(temp_path, output_path)

2. Error Handling with DLQ

def process_record_safe(record):
    """Process with error handling and DLQ."""
    try:
        # Validation
        validated = validate_schema(record)

        # Processing
        result = transform(validated)

        return ("success", result)
    except ValidationError as e:
        logger.warning(f"Validation failed: {e}")
        return ("dlq", {"record": record, "error": str(e), "type": "validation"})
    except Exception as e:
        logger.error(f"Processing failed: {e}")
        return ("dlq", {"record": record, "error": str(e), "type": "processing"})

# Process batch (collect() brings results to the driver, which is fine for
# modest batches; keep results distributed as a DataFrame for large data)
results = df.rdd.map(process_record_safe).collect()
success_records = [r for status, r in results if status == "success"]
dlq_records = [r for status, r in results if status == "dlq"]

# Write DLQ records for investigation
if dlq_records:
    write_to_dlq(dlq_records)

Best Practices Checklist

  • Use Pydantic for data validation at API layer
  • Implement schema enforcement in PySpark reads
  • Partition data by date for optimal query performance
  • Use broadcast joins for small dimension tables
  • Cache intermediate DataFrames when reused
  • Write idempotent processing logic
  • Implement dead letter queues for failed records
  • Use structured logging with correlation IDs
  • Monitor data quality metrics continuously
  • Optimize Spark configurations for workload
  • Use S3 partitioning aligned with query patterns
  • Implement incremental processing with bookmarks
  • Test with realistic data volumes
  • Use type hints throughout Python code
  • Follow PEP 8 style guidelines

Tools & Libraries

  • PySpark: 3.5+
  • Pandas: 2.0+ (for small-medium datasets)
  • FastAPI: 0.100+ (async support)
  • Pydantic: 2.0+ (validation)
  • Apache Airflow: 2.7+
  • Great Expectations: Data quality
  • boto3: AWS SDK for Python
  • pytest: Testing framework
  • black: Code formatting
  • mypy: Static type checking

Monitoring & Observability

import logging
from pythonjsonlogger import jsonlogger

# Structured logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter(
    '%(asctime)s %(levelname)s %(name)s %(message)s %(correlation_id)s'
)
logHandler.setFormatter(formatter)
logger.addHandler(logHandler)

# Usage with context (log_context is a custom helper that stamps a
# correlation_id onto every record emitted inside the block; sketch below)
def process_with_context(data_id: str):
    with log_context(correlation_id=data_id):
        logger.info("Processing started", extra={"dataset_id": data_id})
        # Process data
        logger.info("Processing completed")

Status: Production-Ready
Last Updated: 2025-11-04
Follows: Anthropic 500-line rule, progressive disclosure pattern