
SKILL.md

name: medallion-architecture
description: Bronze/Silver/Gold layer design patterns and templates for building scalable data lakehouse architectures. Includes incremental processing, data quality checks, and optimization strategies.
triggers: medallion architecture, bronze silver gold, data lakehouse layers, multi-hop architecture
category: architecture

Medallion Architecture Skill

Overview

The medallion architecture (also called multi-hop architecture) is a design pattern for organizing data in a lakehouse using three progressive layers:

  • Bronze (Raw): Ingested data in its original format
  • Silver (Refined): Cleansed and conformed data
  • Gold (Curated): Business-level aggregates and features
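
As a purely illustrative layout (schema and table names below are assumptions, not part of the pattern), the three layers are commonly mapped to separate schemas, with data moving one hop at a time:

# Illustrative only: layer schemas and the example table names are assumptions.
for layer in ("bronze", "silver", "gold"):
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {layer}")

# Example flow: raw files -> bronze.raw_orders -> silver.orders -> gold.daily_order_metrics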

When to Use This Skill

Use this skill when you need to:

  • Design a new data pipeline with proper layering
  • Migrate from traditional ETL to lakehouse architecture
  • Implement incremental processing patterns
  • Build a scalable data platform
  • Ensure data quality at each layer

Architecture Principles

1. Bronze Layer (Raw)

Purpose: Store raw data exactly as received from source systems

Characteristics:

  • Immutable historical record
  • Schema-on-read approach
  • Metadata enrichment (_ingested_at, _source_file)
  • Minimal transformations
  • Full audit trail

Use Cases:

  • Data recovery
  • Reprocessing requirements
  • Audit compliance
  • Debugging data issues

2. Silver Layer (Refined)

Purpose: Cleansed, validated, and standardized data

Characteristics:

  • Schema enforcement
  • Data quality checks
  • Deduplication
  • Standardization
  • Type conversions
  • Business rules applied

Use Cases:

  • Downstream analytics
  • Feature engineering
  • Data science modeling
  • Operational reporting

3. Gold Layer (Curated)

Purpose: Business-level aggregates optimized for consumption

Characteristics:

  • Highly aggregated
  • Optimized for queries
  • Business KPIs
  • Feature tables
  • Production-ready datasets

Use Cases:

  • Dashboards and BI
  • ML model serving
  • Real-time applications
  • Executive reporting

Implementation Patterns

Pattern 1: Batch Processing

Bronze Layer:

from pyspark.sql.functions import current_timestamp, input_file_name

def ingest_to_bronze(source_path: str, target_table: str):
    """Ingest raw Parquet files into the Bronze layer with ingestion metadata."""
    # Plain batch read; the cloudFiles (Auto Loader) source is stream-only
    # and is shown in the incremental pattern below.
    df = (spark.read
        .format("parquet")
        .load(source_path)
        .withColumn("_ingested_at", current_timestamp())
        .withColumn("_source_file", input_file_name())
    )

    (df.write
        .format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .saveAsTable(target_table)
    )

Silver Layer:

from pyspark.sql.functions import col, lower, trim, to_date, when

def process_to_silver(bronze_table: str, silver_table: str):
    """Transform Bronze to Silver with quality checks."""
    bronze_df = spark.read.table(bronze_table)

    silver_df = (bronze_df
        .dropDuplicates(["id"])
        .filter(col("id").isNotNull())
        .withColumn("email", lower(trim(col("email"))))
        .withColumn("created_date", to_date(col("created_at")))
        .withColumn("quality_score",
            when(col("email").rlike(r"^[\w\.-]+@[\w\.-]+\.\w+$"), 1.0)
            .otherwise(0.5)
        )
    )

    (silver_df.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(silver_table)
    )

Gold Layer:

from pyspark.sql.functions import avg, count, current_timestamp, sum as spark_sum

def aggregate_to_gold(silver_table: str, gold_table: str):
    """Aggregate Silver to Gold business metrics."""
    silver_df = spark.read.table(silver_table)

    gold_df = (silver_df
        .groupBy("customer_segment", "region")
        .agg(
            count("*").alias("customer_count"),
            spark_sum("lifetime_value").alias("total_ltv"),
            avg("quality_score").alias("avg_quality")
        )
        .withColumn("updated_at", current_timestamp())
    )

    (gold_df.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(gold_table)
    )
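
A single batch run then chains the three hops. A minimal sketch using the functions above (the paths and table names are illustrative assumptions):

# Illustrative run: adjust paths and table names to your environment.
ingest_to_bronze("/landing/customers/", "bronze.customers_raw")
process_to_silver("bronze.customers_raw", "silver.customers")
aggregate_to_gold("silver.customers", "gold.customer_metrics")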

Pattern 2: Incremental Processing

Bronze (Streaming):

from pyspark.sql.functions import current_timestamp

# source_path, checkpoint_path, and bronze_table are assumed to be defined by the caller.
(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load(source_path)
    .withColumn("_ingested_at", current_timestamp())
    .writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .toTable(bronze_table)
)

Silver (Incremental Merge):

from delta.tables import DeltaTable
from pyspark.sql.functions import col

def incremental_silver_merge(bronze_table: str, silver_table: str, watermark: str):
    """Incrementally merge new Bronze data into Silver."""
    
    # Get new records since last watermark
    new_records = (spark.read.table(bronze_table)
        .filter(col("_ingested_at") > watermark)
    )
    
    # Transform using the same cleansing logic as the batch Silver step
    # (transform_to_silver stands in for the logic in process_to_silver above)
    transformed = transform_to_silver(new_records)
    
    # Merge into Silver
    silver = DeltaTable.forName(spark, silver_table)
    
    (silver.alias("target")
        .merge(
            transformed.alias("source"),
            "target.id = source.id"
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
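
The watermark itself is left abstract above. One way to derive it, sketched under the assumption that the Bronze _ingested_at column is carried through to Silver (as it is in the transformations shown earlier):

from pyspark.sql.functions import max as spark_max

def get_last_watermark(silver_table: str, default: str = "1970-01-01 00:00:00") -> str:
    """Derive the last processed watermark from the Silver table itself.
    Assumes Silver retains the Bronze _ingested_at column; adjust if it is dropped."""
    wm = (spark.read.table(silver_table)
        .agg(spark_max("_ingested_at").alias("wm"))
        .collect()[0]["wm"])
    return str(wm) if wm is not None else default

# Example: incremental_silver_merge(bronze_table, silver_table, get_last_watermark(silver_table))

Deriving the watermark from the target table avoids separate bookkeeping state and keeps re-runs idempotent, at the cost of one extra aggregation per run.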

Data Quality Patterns

Quality Checks at Each Layer

Bronze:

  • File completeness check
  • Row count validation
  • Schema drift detection

Silver:

  • Null value checks
  • Data type validation
  • Business rule validation
  • Referential integrity
  • Duplicate detection

Gold:

  • Aggregate accuracy
  • KPI threshold checks
  • Trend anomaly detection
  • Completeness validation

Quality Check Implementation

def validate_silver_quality(table_name: str) -> Dict[str, bool]:
    """Run quality checks on Silver table."""
    df = spark.read.table(table_name)
    
    checks = {
        "no_null_ids": df.filter(col("id").isNull()).count() == 0,
        "valid_emails": df.filter(
            ~col("email").rlike(r"^[\w\.-]+@[\w\.-]+\.\w+$")
        ).count() == 0,
        "no_duplicates": df.count() == df.select("id").distinct().count(),
        "within_date_range": df.filter(
            (col("created_date") < "2020-01-01") |
            (col("created_date") > current_date())
        ).count() == 0
    }
    
    return checks
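
These checks can serve as a quality gate that blocks bad data from progressing to Gold. A minimal sketch, reusing validate_silver_quality and aggregate_to_gold from above:

def promote_silver_to_gold(silver_table: str, gold_table: str) -> None:
    """Run the Silver quality gate and only build Gold if every check passes."""
    checks = validate_silver_quality(silver_table)
    failed = [name for name, passed in checks.items() if not passed]
    if failed:
        # Stop the hop so bad data never reaches Gold; alerting is left to the orchestrator.
        raise ValueError(f"Quality gate failed for {silver_table}: {failed}")
    aggregate_to_gold(silver_table, gold_table)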

Optimization Strategies

Bronze Layer Optimization

-- Partition by ingestion date
CREATE TABLE bronze.raw_events
USING delta
PARTITIONED BY (ingestion_date)
AS SELECT *, current_date() as ingestion_date FROM source;

-- Enable auto-optimize
ALTER TABLE bronze.raw_events
SET TBLPROPERTIES (
    'delta.autoOptimize.optimizeWrite' = 'true',
    'delta.autoOptimize.autoCompact' = 'true'
);

Silver Layer Optimization

-- Z-ORDER for common filters
OPTIMIZE silver.customers
ZORDER BY (customer_segment, region, created_date);

-- Enable Change Data Feed
ALTER TABLE silver.customers
SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');

Gold Layer Optimization

-- Liquid clustering for query performance
CREATE TABLE gold.customer_metrics
USING delta
CLUSTER BY (customer_segment, date)
AS SELECT * FROM aggregated_metrics;

-- Optimize and vacuum
OPTIMIZE gold.customer_metrics;
VACUUM gold.customer_metrics RETAIN 168 HOURS;

Complete Example

See /templates/bronze-silver-gold/ for a complete implementation including:

  • Project structure
  • Bronze ingestion scripts
  • Silver transformation logic
  • Gold aggregation queries
  • Data quality tests
  • Deployment configuration

Best Practices

  1. Idempotency: Ensure pipelines can be re-run safely
  2. Incrementality: Process only new/changed data
  3. Quality Gates: Block bad data from progressing
  4. Schema Evolution: Handle schema changes gracefully
  5. Monitoring: Track pipeline health and data quality
  6. Documentation: Document data lineage and transformations
  7. Testing: Unit test transformations, integration test pipelines
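
As a sketch of the idempotency practice (item 1 above): re-running a Gold aggregation for a given day should replace that day's rows rather than append duplicates. One way to do this with Delta, assuming the Gold table carries a run_date column (an assumption for illustration):

from pyspark.sql.functions import lit

def write_gold_partition(gold_df, gold_table: str, run_date: str) -> None:
    """Idempotent Gold write: only rows matching run_date are replaced,
    so re-running the same day overwrites instead of duplicating."""
    (gold_df.withColumn("run_date", lit(run_date))
        .write
        .format("delta")
        .mode("overwrite")
        .option("replaceWhere", f"run_date = '{run_date}'")
        .saveAsTable(gold_table)
    )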

Common Pitfalls to Avoid

Don't:

  • Mix transformation logic across layers
  • Skip Bronze layer to "save storage"
  • Over-aggregate too early
  • Ignore data quality in Silver
  • Hard-code business logic in Bronze

Do:

  • Keep Bronze immutable
  • Enforce quality in Silver
  • Optimize Gold for consumption
  • Use incremental processing
  • Implement proper monitoring

Related Skills

  • delta-live-tables: Declarative pipeline orchestration
  • data-quality: Great Expectations integration
  • testing-patterns: Pipeline testing strategies
  • cicd-workflows: Deployment automation
