Claude Code Plugins

Community-maintained marketplace

data-lake-management

@CROW-B3/.claude

Data Lake architecture and management including medallion architecture (bronze/silver/gold zones), data catalog with AWS Glue, partitioning strategies, schema evolution, data quality, governance, cost optimization, S3 lifecycle policies, data retention, compliance, query optimization with Athena, data formats (Parquet, ORC, Avro), incremental processing, CDC patterns, and production best practices for scalable data lakes.

Install Skill

1. Download skill
2. Enable skills in Claude: open claude.ai/settings/capabilities and find the "Skills" section
3. Upload to Claude: click "Upload skill" and select the downloaded ZIP file

Note: Please review the skill's instructions before using it.

SKILL.md

name: data-lake-management
description: Data Lake architecture and management including medallion architecture (bronze/silver/gold zones), data catalog with AWS Glue, partitioning strategies, schema evolution, data quality, governance, cost optimization, S3 lifecycle policies, data retention, compliance, query optimization with Athena, data formats (Parquet, ORC, Avro), incremental processing, CDC patterns, and production best practices for scalable data lakes.

Data Lake Management Skill

Purpose

Design, build, and maintain production-grade data lakes on AWS using medallion architecture, ensuring data quality, governance, and cost optimization.

When to Use This Skill

Auto-activates when working with:

  • Data lake architecture design
  • Medallion architecture (Bronze/Silver/Gold)
  • AWS S3 data organization
  • AWS Glue Data Catalog
  • Data partitioning strategies
  • Schema evolution
  • Data quality and governance
  • Cost optimization
  • Query performance tuning

Core Concepts

Medallion Architecture

Bronze Zone (Raw)
├── Immutable source data
├── Original format
├── Minimal validation
└── Append-only

Silver Zone (Cleaned)
├── Validated and cleaned
├── Deduplicated
├── Standardized schema
└── Business rules applied

Gold Zone (Curated)
├── Aggregated
├── Business-ready
├── Optimized for queries
└── Subject-oriented

S3 Data Organization

s3://data-lake-{env}/
├── bronze/                      # Raw zone
│   ├── source_system_1/
│   │   ├── table_name/
│   │   │   ├── year=2025/
│   │   │   │   ├── month=11/
│   │   │   │   │   ├── day=04/
│   │   │   │   │   │   └── data.parquet
│   ├── _metadata/               # Metadata and manifests
│   └── _dlq/                    # Dead letter queue
├── silver/                      # Processed zone
│   ├── domain_1/
│   │   ├── entity_name/
│   │   │   ├── date=2025-11-04/
│   │   │   │   └── part-*.parquet
├── gold/                        # Curated zone
│   ├── analytics/
│   │   ├── daily_aggregates/
│   │   ├── monthly_reports/
└── temp/                        # Temporary processing

Quick Start Examples

1. Bronze Zone Ingestion

import boto3
import pyarrow as pa
import pyarrow.parquet as pq
from datetime import datetime

class BronzeIngestion:
    """Ingest raw data to bronze zone."""

    def __init__(self, bucket: str, source_system: str):
        self.s3 = boto3.client('s3')
        self.bucket = bucket
        self.source_system = source_system

    def ingest(self, data: list, table_name: str):
        """Ingest data with partitioning by ingestion date."""
        now = datetime.now()
        partition_path = f"year={now.year}/month={now.month:02d}/day={now.day:02d}"

        # Convert to Arrow Table
        table = pa.Table.from_pylist(data)

        # Add metadata
        metadata = {
            b'ingestion_timestamp': now.isoformat().encode(),
            b'source_system': self.source_system.encode(),
            b'record_count': str(len(data)).encode()
        }
        table = table.replace_schema_metadata(metadata)

        # Write a single Parquet file to S3 (pyarrow resolves the s3:// filesystem from the URI)
        s3_path = f"s3://{self.bucket}/bronze/{self.source_system}/{table_name}/{partition_path}/data.parquet"

        pq.write_table(
            table,
            s3_path,
            compression='snappy',
            use_dictionary=True,
            write_statistics=True
        )

        # Update Glue Data Catalog
        self.update_catalog(table_name, partition_path)

    def update_catalog(self, table_name: str, partition: str):
        """Update Glue Data Catalog with new partition."""
        glue = boto3.client('glue')

        try:
            glue.create_partition(
                DatabaseName='bronze',
                TableName=f"{self.source_system}_{table_name}",
                PartitionInput={
                    # Glue expects bare partition values (e.g. ['2025', '11', '04']), not 'key=value' strings
                    'Values': [kv.split('=')[1] for kv in partition.split('/')],
                    'StorageDescriptor': {
                        'Location': f"s3://{self.bucket}/bronze/{self.source_system}/{table_name}/{partition}/",
                        'InputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat',
                        'OutputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat',
                        'SerdeInfo': {
                            'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
                        }
                    }
                }
            )
        except glue.exceptions.AlreadyExistsException:
            pass  # Partition already exists
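
A usage sketch; the bucket, source system, table name, and record fields are illustrative:

ingestor = BronzeIngestion(bucket="data-lake-dev", source_system="crm")
ingestor.ingest(
    data=[{"id": "42", "value": 3.14, "updated_at": "2025-11-04T10:00:00Z"}],
    table_name="customers"
)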

2. Silver Zone Processing (PySpark)

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from delta.tables import DeltaTable

class SilverZoneProcessor:
    """Process bronze to silver with Delta Lake."""

    def __init__(self):
        self.spark = (SparkSession.builder
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            .getOrCreate())

    def process_incremental(self, bronze_table: str, silver_path: str, primary_key: str):
        """Incremental processing with deduplication and SCD Type 2."""

        # Read new (unprocessed) records; assumes the bronze table is stored as
        # a Delta table that carries a 'processed' flag column
        df_bronze = (self.spark.read
            .format("delta")
            .load(bronze_table)
            .filter(col("processed") == False))

        # Clean and transform
        df_cleaned = (df_bronze
            # Remove corrupt records
            .filter(col("_corrupt_record").isNull())

            # Standardize schema
            .select(
                col("id").cast("string").alias("id"),
                col("timestamp").cast("timestamp"),
                col("value").cast("double"),
                to_json(col("metadata")).alias("metadata_json")
            )

            # Add processing metadata
            .withColumn("processed_at", current_timestamp())
            .withColumn("is_active", lit(True))
            .withColumn("valid_from", col("timestamp"))
            .withColumn("valid_to", lit(None).cast("timestamp"))

            # Deduplicate (keep latest per key)
            .withColumn("row_num",
                row_number().over(
                    Window.partitionBy(primary_key)
                    .orderBy(col("timestamp").desc())
                )
            )
            .filter(col("row_num") == 1)
            .drop("row_num")
        )

        # Merge into silver (SCD Type 2)
        if DeltaTable.isDeltaTable(self.spark, silver_path):
            delta_table = DeltaTable.forPath(self.spark, silver_path)

            # Close out old records
            delta_table.alias("target").merge(
                df_cleaned.alias("source"),
                f"target.{primary_key} = source.{primary_key} AND target.is_active = true"
            ).whenMatchedUpdate(
                condition="source.timestamp > target.valid_from",
                set={
                    "is_active": "false",
                    "valid_to": "source.timestamp"
                }
            ).execute()

            # Insert new records
            delta_table.alias("target").merge(
                df_cleaned.alias("source"),
                f"target.{primary_key} = source.{primary_key} AND target.is_active = true"
            ).whenNotMatchedInsert(
                values={
                    col: f"source.{col}" for col in df_cleaned.columns
                }
            ).execute()
        else:
            # Initial load
            df_cleaned.write.format("delta").save(silver_path)

        # Mark the ingested bronze records as processed with an in-place Delta
        # update, so previously processed bronze records are preserved
        (DeltaTable.forPath(self.spark, bronze_table)
            .update(
                condition=col("processed") == False,
                set={"processed": lit(True)}
            ))

        # Compact small files in the silver table
        DeltaTable.forPath(self.spark, silver_path).optimize().executeCompaction()
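
A usage sketch; the paths are illustrative, and bronze_table must point at a Delta table that carries the 'processed' flag:

processor = SilverZoneProcessor()
processor.process_incremental(
    bronze_table="s3://data-lake-dev/bronze/crm/customers/",
    silver_path="s3://data-lake-dev/silver/crm/customers/",
    primary_key="id"
)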

3. Gold Zone Aggregation

class GoldZoneAggregator:
    """Create business-ready aggregated tables."""

    def __init__(self, spark: SparkSession):
        # Reuses the SparkSession and pyspark.sql.functions imports from the silver example
        self.spark = spark

    def create_daily_summary(self, silver_path: str, gold_path: str, date: str):
        """Daily aggregation for analytics."""

        df_silver = self.spark.read.format("delta").load(silver_path)

        df_gold = (df_silver
            .filter(col("date") == date)
            .filter(col("is_active") == True)

            # Business aggregations
            .groupBy("date", "category", "region")
            .agg(
                count("*").alias("total_count"),
                sum("value").alias("total_value"),
                avg("value").alias("avg_value"),
                min("value").alias("min_value"),
                max("value").alias("max_value"),
                stddev("value").alias("stddev_value"),
                approx_count_distinct("user_id").alias("unique_users")
            )

            # Add derived metrics
            .withColumn("value_per_user", col("total_value") / col("unique_users"))

            # Add metadata
            .withColumn("created_at", current_timestamp())
            .withColumn("data_version", lit("1.0"))
        )

        # Write with overwrite for specific partition
        (df_gold.write
            .format("delta")
            .mode("overwrite")
            .option("replaceWhere", f"date = '{date}'")
            .partitionBy("date")
            .save(gold_path))

        # Create/update Athena table
        self.create_athena_table(gold_path, "daily_summary")

Data Quality Framework

from great_expectations.dataset import SparkDFDataset  # legacy Great Expectations dataset API


class DataQualityError(Exception):
    """Raised when data quality checks fail."""


class DataQualityValidator:
    """Data quality validation with Great Expectations."""

    def validate_silver_data(self, df):
        """Validate silver zone data quality."""

        ge_df = SparkDFDataset(df)

        # Schema validation
        ge_df.expect_column_to_exist("id")
        ge_df.expect_column_to_exist("timestamp")
        ge_df.expect_column_to_exist("value")

        # Data validation
        ge_df.expect_column_values_to_not_be_null("id")
        ge_df.expect_column_values_to_be_unique("id")
        ge_df.expect_column_values_to_be_between("value", min_value=0, max_value=1000000)

        # Business rules
        ge_df.expect_column_values_to_match_regex("email", r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$")

        # Get validation results
        results = ge_df.validate()

        if not results.success:
            failed_expectations = [exp for exp in results.results if not exp.success]
            raise DataQualityError(f"Validation failed: {failed_expectations}")

        return results
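
A usage sketch, typically called from the silver pipeline before the merge step (df_cleaned is the DataFrame produced there):

validator = DataQualityValidator()
validator.validate_silver_data(df_cleaned)  # raises DataQualityError if any expectation fails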

Resource Files

resources/partitioning-strategies.md

  • Partition key selection
  • Multi-level partitioning
  • Partition pruning optimization
  • Partition size management
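
A minimal PySpark sketch of multi-level date partitioning and a partition-pruned read; the paths and column names are illustrative:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, dayofmonth

spark = SparkSession.builder.getOrCreate()

# Derive partition columns from an event timestamp, then write partitioned Parquet
df = spark.read.format("delta").load("s3://data-lake-dev/silver/events/")
(df
    .withColumn("year", year(col("event_time")))
    .withColumn("month", month(col("event_time")))
    .withColumn("day", dayofmonth(col("event_time")))
    .write
    .partitionBy("year", "month", "day")
    .mode("overwrite")
    .parquet("s3://data-lake-dev/gold/events_by_day/"))

# Filtering on partition columns lets the engine prune entire directories
df_day = (spark.read.parquet("s3://data-lake-dev/gold/events_by_day/")
    .filter((col("year") == 2025) & (col("month") == 11) & (col("day") == 4)))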

resources/schema-evolution.md

  • Schema versioning
  • Backward compatibility
  • Column addition/removal
  • Data type changes
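
A minimal sketch of additive schema evolution, assuming Delta Lake as in the examples above; df_new_batch stands in for a batch whose schema has gained a new nullable column:

# mergeSchema evolves the Delta table schema on append instead of failing;
# rows written before the new column existed read back as null
(df_new_batch.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .save("s3://data-lake-dev/silver/crm/customers/"))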

resources/cost-optimization.md

  • S3 storage classes
  • Lifecycle policies
  • Compression formats
  • Query optimization
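
A sketch of an S3 lifecycle configuration applied with boto3; the bucket name, prefixes, and day thresholds are illustrative and should match your retention requirements:

import boto3

s3 = boto3.client('s3')
s3.put_bucket_lifecycle_configuration(
    Bucket='data-lake-dev',
    LifecycleConfiguration={
        'Rules': [
            {
                'ID': 'bronze-tiering',
                'Filter': {'Prefix': 'bronze/'},
                'Status': 'Enabled',
                # Move aging raw data to cheaper storage classes
                'Transitions': [
                    {'Days': 90, 'StorageClass': 'STANDARD_IA'},
                    {'Days': 365, 'StorageClass': 'GLACIER'}
                ]
            },
            {
                'ID': 'temp-cleanup',
                'Filter': {'Prefix': 'temp/'},
                'Status': 'Enabled',
                # Expire temporary processing artifacts quickly
                'Expiration': {'Days': 7}
            }
        ]
    }
)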

resources/data-governance.md

  • Access control (Lake Formation)
  • Data lineage tracking
  • Compliance (GDPR, HIPAA)
  • Data retention policies
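
A sketch of a table-level grant through Lake Formation with boto3; the role ARN, database, and table names are placeholders:

import boto3

lakeformation = boto3.client('lakeformation')

# Grant read-only access on a silver table to an analyst role
lakeformation.grant_permissions(
    Principal={'DataLakePrincipalIdentifier': 'arn:aws:iam::123456789012:role/analyst-role'},
    Resource={
        'Table': {
            'DatabaseName': 'silver',
            'Name': 'events'
        }
    },
    Permissions=['SELECT', 'DESCRIBE']
)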

resources/query-optimization.md

  • Athena query tuning
  • Partition pruning
  • Columnar format benefits
  • Caching strategies
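
A sketch of a partition-pruned Athena query submitted with boto3; the database, table, and output location are illustrative:

import boto3

athena = boto3.client('athena')

# Filtering on the partition column lets Athena skip whole partitions,
# which reduces the data scanned and therefore the query cost
# ("date" is quoted because it is a reserved word in Athena)
response = athena.start_query_execution(
    QueryString="""
        SELECT category, SUM(total_value) AS total_value
        FROM daily_summary
        WHERE "date" = '2025-11-04'
        GROUP BY category
    """,
    QueryExecutionContext={'Database': 'gold'},
    ResultConfiguration={'OutputLocation': 's3://data-lake-dev/athena-results/'}
)
query_execution_id = response['QueryExecutionId']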

Best Practices

  • Partition by date for time-series data
  • Use Parquet with Snappy compression
  • Implement schema evolution carefully
  • Monitor data quality continuously
  • Use Delta Lake for ACID transactions (see the maintenance sketch after this list)
  • Implement data lineage tracking
  • Set up lifecycle policies for cost control
  • Use Glue Data Catalog for discoverability
  • Implement incremental processing
  • Monitor query costs and optimize
  • Use proper access controls
  • Document data schemas
  • Implement data retention policies
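
As a small illustration of the Delta Lake and cost-control points above (the path and retention window are illustrative):

from delta.tables import DeltaTable

silver_table = DeltaTable.forPath(spark, "s3://data-lake-dev/silver/crm/customers/")

# Compact the small files produced by incremental writes
silver_table.optimize().executeCompaction()

# Remove data files no longer referenced by the table; 168 hours (7 days)
# is the default retention and protects concurrent readers and time travel
silver_table.vacuum(168)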

Common Patterns

Incremental Processing with Bookmarks

# Read with bookmark
bookmark = get_bookmark("my_job")
df_new = spark.read.parquet(source).filter(col("timestamp") > bookmark)

# Process
df_processed = transform(df_new)

# Write and update bookmark
df_processed.write.parquet(target)
update_bookmark("my_job", df_new.agg(max("timestamp")).collect()[0][0])
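
get_bookmark and update_bookmark are left to the reader; one possible sketch backs them with a DynamoDB table (the etl_bookmarks table name and key schema are assumptions):

import boto3

dynamodb = boto3.resource('dynamodb')
bookmark_table = dynamodb.Table('etl_bookmarks')  # assumed table keyed on 'job_name'

def get_bookmark(job_name: str, default: str = "1970-01-01T00:00:00") -> str:
    """Return the last processed watermark for a job, or a default for the first run."""
    item = bookmark_table.get_item(Key={'job_name': job_name}).get('Item')
    return item['watermark'] if item else default

def update_bookmark(job_name: str, watermark) -> None:
    """Persist the new watermark after the target write succeeds."""
    bookmark_table.put_item(Item={'job_name': job_name, 'watermark': str(watermark)})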

Data Catalog Registration

import boto3

glue = boto3.client('glue')

glue.create_table(
    DatabaseName='silver',
    TableInput={
        'Name': 'events',
        'TableType': 'EXTERNAL_TABLE',  # lets Athena treat it as an external table
        'StorageDescriptor': {
            'Columns': schema_columns,
            'Location': 's3://data-lake/silver/events/',
            'InputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat',
            'OutputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat',
            'SerdeInfo': {
                'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
            }
        },
        'PartitionKeys': [
            {'Name': 'date', 'Type': 'string'}
        ]
    }
)

Status: Production-Ready | Last Updated: 2025-11-04 | Architecture: Medallion (Bronze → Silver → Gold)