SKILL.md

name: big-data
description: Apache Spark, Hadoop, distributed computing, and large-scale data processing for petabyte-scale workloads
sasmp_version: 1.3.0
bonded_agent: 01-data-engineer
bond_type: PRIMARY_BOND
skill_version: 2.0.0
last_updated: 2025-01
complexity: advanced
estimated_mastery_hours: 160
prerequisites: python-programming, sql-databases
unlocks: data-warehousing, mlops, machine-learning

Big Data & Distributed Computing

Production-grade big data processing with Apache Spark, distributed systems patterns, and petabyte-scale data engineering.

Quick Start

# PySpark 3.5+ modern DataFrame API
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Initialize Spark with adaptive query execution (AQE) and Kryo serialization
spark = (SparkSession.builder
    .appName("ProductionETL")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate())

# Efficient data loading with schema enforcement
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType

schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("user_id", LongType(), False),
    StructField("event_type", StringType(), False),
    StructField("timestamp", TimestampType(), False),
    StructField("properties", StringType(), True)
])

df = (spark.read
    .schema(schema)
    .parquet("s3://bucket/events/")
    .filter(F.col("timestamp") >= F.current_date() - 30))

# Complex aggregation with window functions
window_spec = Window.partitionBy("user_id").orderBy("timestamp")

result = (df
    .withColumn("event_rank", F.row_number().over(window_spec))
    .withColumn("session_id", F.sum(
        F.when(
            F.col("timestamp") - F.lag("timestamp").over(window_spec) > F.expr("INTERVAL 30 MINUTES"),
            1
        ).otherwise(0)
    ).over(window_spec))
    .groupBy("user_id", "session_id")
    .agg(
        F.count("*").alias("event_count"),
        F.min("timestamp").alias("session_start"),
        F.max("timestamp").alias("session_end")
    ))

result.write.mode("overwrite").parquet("s3://bucket/sessions/")

Core Concepts

1. Spark Architecture Deep Dive

┌─────────────────────────────────────────────────────────┐
│                    Driver Program                        │
│  ┌─────────────────────────────────────────────────┐   │
│  │              SparkContext/SparkSession           │   │
│  │  - Creates execution plan (DAG)                  │   │
│  │  - Coordinates with Cluster Manager              │   │
│  │  - Schedules tasks                               │   │
│  └─────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────┐
│              Cluster Manager (YARN/K8s/Standalone)      │
└─────────────────────────────────────────────────────────┘
                           │
          ┌────────────────┼────────────────┐
          ▼                ▼                ▼
    ┌──────────┐    ┌──────────┐    ┌──────────┐
    │ Executor │    │ Executor │    │ Executor │
    │ ┌──────┐ │    │ ┌──────┐ │    │ ┌──────┐ │
    │ │Task 1│ │    │ │Task 2│ │    │ │Task 3│ │
    │ │Task 4│ │    │ │Task 5│ │    │ │Task 6│ │
    │ └──────┘ │    │ └──────┘ │    │ └──────┘ │
    │  Cache   │    │  Cache   │    │  Cache   │
    └──────────┘    └──────────┘    └──────────┘
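
The diagram maps directly onto a handful of resource settings. A minimal sketch (executor counts and sizes here are illustrative, and spark.executor.instances only takes effect on a cluster manager such as YARN or Kubernetes):

from pyspark.sql import SparkSession

# Three executors with two cores each, mirroring the diagram above
spark = (SparkSession.builder
    .appName("ArchitectureDemo")
    .config("spark.executor.instances", "3")   # executors requested from the cluster manager
    .config("spark.executor.cores", "2")       # concurrent tasks per executor
    .config("spark.executor.memory", "8g")     # heap shared by tasks and the block cache
    .getOrCreate())

print(spark.sparkContext.defaultParallelism)   # total task slots the scheduler sees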

2. Partition Optimization

from pyspark.sql import functions as F

# Check current partitioning
print(f"Partitions: {df.rdd.getNumPartitions()}")

# Rule of thumb: ~128MB per partition, 2-4 partitions per CPU core
# For 100GB of data on 10 executors with 4 cores each:
#   size-based: 100GB / 128MB ≈ 800 partitions; core-based: 40 cores * 4 = 160 partitions
# Pick a value between the two estimates (e.g. 200-400) and let AQE coalesce small partitions

# Repartition by key (for joins)
df_repartitioned = df.repartition(200, "user_id")

# Coalesce (reduce partitions without shuffle)
df_coalesced = df.coalesce(100)

# Partition-aligned write (partitionBy needs real columns, so create them first)
(df
  .withColumn("year", F.year("date"))
  .withColumn("month", F.month("date"))
  .repartition("year", "month")
  .write
  .partitionBy("year", "month")
  .mode("overwrite")
  .parquet("s3://bucket/output/"))

# Bucketing for repeated joins
df.write \
  .bucketBy(256, "user_id") \
  .sortBy("user_id") \
  .saveAsTable("bucketed_events")
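
One way to see the benefit of bucketing is to check the physical plan of a query on the bucketed table; a quick sketch:

# Read the bucketed table back and confirm the shuffle disappears from the plan
bucketed = spark.table("bucketed_events")
bucketed.groupBy("user_id").count().explain()   # no Exchange step before the aggregate
# Joining two tables bucketed the same way (same column, same bucket count) is likewise shuffle-free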

3. Join Optimization Strategies

from pyspark.sql import functions as F

# Broadcast join (auto-broadcast threshold defaults to 10MB; can be raised, e.g. to 100MB)
small_df = spark.read.parquet("s3://bucket/dim_product/")  # 5MB
large_df = spark.read.parquet("s3://bucket/fact_sales/")   # 500GB

# Explicit broadcast hint
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "product_id")

# Increase broadcast threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)  # 100MB

# Sort-Merge Join (Spark's default for joining two large tables)
# Pre-partitioning both sides by the join key with the same partition count lets the shuffle be reused
users = spark.read.parquet("users/").repartition(200, "user_id").sortWithinPartitions("user_id")
orders = spark.read.parquet("orders/").repartition(200, "user_id").sortWithinPartitions("user_id")
result = users.join(orders, "user_id")

# Skewed join handling (salting technique)
# If user_id has skew (some users have millions of rows)
salt_range = 10
salted_users = (users
    .withColumn("salt", F.explode(F.array([F.lit(i) for i in range(salt_range)])))
    .withColumn("salted_key", F.concat("user_id", F.lit("_"), "salt")))

salted_orders = (orders
    .withColumn("salt", (F.rand() * salt_range).cast("int"))
    .withColumn("salted_key", F.concat("user_id", F.lit("_"), "salt")))

result = salted_users.join(salted_orders, "salted_key").drop("salt", "salted_key")
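
Manual salting works, but on Spark 3.x it is often simpler to let Adaptive Query Execution split skewed shuffle partitions at runtime. A sketch of the relevant settings (threshold values shown are illustrative):

# AQE-based skew handling: oversized shuffle partitions are split automatically at runtime
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# A partition counts as skewed when it is several times the median size and above a byte threshold
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")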

4. Caching & Persistence

from pyspark import StorageLevel

# Caching strategies
df.cache()  # MEMORY_AND_DISK by default for DataFrames
df.persist(StorageLevel.MEMORY_ONLY)  # Fastest access; evicted partitions are recomputed
df.persist(StorageLevel.MEMORY_AND_DISK)  # Evicted partitions spill to disk instead of recomputing
df.persist(StorageLevel.DISK_ONLY)  # For very large intermediate datasets
# Note: the *_SER levels are Scala/Java-only; PySpark data is stored serialized anyway

# When to cache:
# - Reused DataFrames (used in multiple actions)
# - After expensive transformations (joins, aggregations)
# - Before iterative algorithms

# Cache usage pattern
expensive_df = (spark.read.parquet("s3://bucket/large/")
    .filter(F.col("status") == "active")
    .join(broadcast(dim_df), "dim_key")
    .groupBy("category")
    .agg(F.sum("amount").alias("total")))

expensive_df.cache()
expensive_df.count()  # Materialize cache

# Use cached DataFrame multiple times
top_categories = expensive_df.orderBy(F.desc("total")).limit(10)
summary = expensive_df.agg(F.avg("total"), F.max("total"))

# Release cache when done
expensive_df.unpersist()
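
A quick way to confirm what is (or is no longer) cached:

# A storage level with all flags False means "not cached"
print(expensive_df.storageLevel)
# Drop every cached table/DataFrame in the session (blunt, but handy in notebooks)
spark.catalog.clearCache()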

5. Structured Streaming

from pyspark.sql import functions as F
from pyspark.sql.types import *

# Read from Kafka
kafka_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "latest")
    .option("maxOffsetsPerTrigger", 100000)
    .load())

# Parse JSON payload
event_schema = StructType([
    StructField("event_id", StringType()),
    StructField("user_id", LongType()),
    StructField("event_type", StringType()),
    StructField("timestamp", TimestampType())
])

parsed_df = (kafka_df
    .select(F.from_json(F.col("value").cast("string"), event_schema).alias("data"))
    .select("data.*")
    .withWatermark("timestamp", "10 minutes"))

# Windowed aggregation
windowed_counts = (parsed_df
    .groupBy(
        F.window("timestamp", "5 minutes", "1 minute"),
        "event_type"
    )
    .count())

# Write to Delta Lake with checkpointing
query = (windowed_counts.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "s3://bucket/checkpoints/events/")
    .trigger(processingTime="1 minute")
    .start("s3://bucket/streaming_output/"))

# Block until the stream terminates (stop it with query.stop() or by killing the job)
query.awaitTermination()
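
awaitTermination() only blocks the driver; to actually observe a running stream, poll the StreamingQuery handle (for example from another thread or a separate notebook cell). A minimal sketch:

# Programmatic stream monitoring
print(query.status)            # current state, e.g. whether a batch is being processed
print(query.lastProgress)      # dict with batchId, numInputRows, processing rates, event-time watermark
for progress in query.recentProgress:
    print(progress["batchId"], progress["numInputRows"])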

Tools & Technologies

| Tool           | Purpose                | Version (2025) |
|----------------|------------------------|----------------|
| Apache Spark   | Distributed processing | 3.5+           |
| Delta Lake     | ACID transactions      | 3.0+           |
| Apache Iceberg | Table format           | 1.4+           |
| Apache Flink   | Stream processing      | 1.18+          |
| Databricks     | Managed Spark platform | Latest         |
| AWS EMR        | Managed Hadoop/Spark   | 7.0+           |
| Trino          | Interactive queries    | 400+           |
| dbt            | Transform layer        | 1.7+           |

Learning Path

Phase 1: Foundations (Weeks 1-3)

Week 1: Distributed computing concepts, MapReduce
Week 2: Spark architecture, RDDs, DataFrames
Week 3: Spark SQL, basic transformations

Phase 2: Intermediate (Weeks 4-6)

Week 4: Joins, aggregations, window functions
Week 5: Partitioning, bucketing, caching
Week 6: Performance tuning, Spark UI analysis

Phase 3: Advanced (Weeks 7-10)

Week 7: Structured Streaming
Week 8: Delta Lake / Iceberg table formats
Week 9: Cluster sizing, cost optimization
Week 10: Advanced optimizations (AQE, skew handling)

Phase 4: Production (Weeks 11-14)

Week 11: Deployment on EMR/Databricks
Week 12: Monitoring, alerting, debugging
Week 13: CI/CD for Spark jobs
Week 14: Multi-cluster architectures

Production Patterns

Delta Lake UPSERT (Merge)

from delta.tables import DeltaTable

# Incremental UPSERT pattern
delta_table = DeltaTable.forPath(spark, "s3://bucket/users/")
updates_df = spark.read.parquet("s3://bucket/updates/")

delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.user_id = source.user_id"
).whenMatchedUpdate(set={
    "email": "source.email",
    "updated_at": "source.updated_at"
}).whenNotMatchedInsertAll().execute()

# Optimize after merge
delta_table.optimize().executeCompaction()
delta_table.vacuum(retentionHours=168)  # 7 days
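
Compaction handles small files; data skipping usually improves further when frequently filtered columns are Z-ordered, and Delta's versioning lets you audit the pre-merge state. A short sketch (the column name and version number are illustrative):

# Z-order by a commonly filtered column to improve data skipping (Delta 2.0+)
delta_table.optimize().executeZOrderBy("user_id")

# Time travel: read the table as it was before the merge
previous = spark.read.format("delta").option("versionAsOf", 0).load("s3://bucket/users/")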

Cost-Effective Cluster Configuration

# spark-submit configuration for 1TB processing job
"""
spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --num-executors 50 \
    --executor-cores 4 \
    --executor-memory 16g \
    --driver-memory 8g \
    --conf spark.sql.adaptive.enabled=true \
    --conf spark.sql.adaptive.coalescePartitions.enabled=true \
    --conf spark.sql.shuffle.partitions=400 \
    --conf spark.dynamicAllocation.enabled=true \
    --conf spark.dynamicAllocation.minExecutors=10 \
    --conf spark.dynamicAllocation.maxExecutors=100 \
    --conf spark.speculation=true \
    job.py
"""

# Sizing guidelines:
# - Executor memory: 16-32GB (larger heaps drive up GC overhead)
# - Executor cores: 4-5 (parallel tasks per executor)
# - Total cores: roughly 1 core per 2-5GB of input (here 50 x 4 = 200 cores for ~1TB)
# - Shuffle partitions: 2-4x total cores (here 400), or rely on AQE coalescing
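
A quick back-of-envelope check that the spark-submit example above matches these guidelines (plain arithmetic, not a Spark API):

data_gb = 1024                    # ~1TB of input
total_cores = 50 * 4              # num-executors x executor-cores = 200
print(data_gb / total_cores)      # ~5GB of input per core
print(2 * total_cores)            # 400 shuffle partitions = 2x total cores, matching the config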

Troubleshooting Guide

Common Failure Modes

| Issue         | Symptoms                     | Root Cause                  | Fix                                   |
|---------------|------------------------------|-----------------------------|---------------------------------------|
| OOM Error     | "Container killed by YARN"   | Too much data per partition | Increase partitions, reduce broadcast |
| Shuffle Spill | Slow stage, disk I/O         | Insufficient memory         | Increase spark.memory.fraction        |
| Skewed Tasks  | One task much slower         | Data skew on key            | Use salting, AQE skew handling        |
| GC Overhead   | "GC overhead limit exceeded" | Too many small objects      | Use Kryo serialization, reduce UDFs   |
| Driver OOM    | Driver crash                 | collect(), large broadcast  | Avoid collect, stream results         |
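
Several of these fixes are plain configuration changes. A sketch of the ones adjustable at runtime (executor memory, spark.memory.fraction, and the serializer must instead be set at submit/startup time):

# Runtime-adjustable mitigations
spark.conf.set("spark.sql.shuffle.partitions", "800")          # more, smaller partitions per task
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")  # AQE skew handling
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")   # disable broadcast when the "small" side isn't small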

Debug Checklist

# 1. Check Spark UI (port 4040/18080)
# - Stages: Look for skewed tasks (max >> median)
# - Storage: Check cached data size
# - Environment: Verify configuration

# 2. Analyze execution plan
df.explain(mode="extended")

# 3. Check partition distribution
df.groupBy(F.spark_partition_id()).count().show()

# 4. Profile data skew
df.groupBy("key_column").count().orderBy(F.desc("count")).show(20)

# 5. Reduce log noise so warnings and errors stand out
spark.sparkContext.setLogLevel("WARN")

# 6. Enable Arrow for faster Spark <-> pandas interchange
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
# Event logging for the History Server must be enabled at session startup,
# e.g. .config("spark.eventLog.enabled", "true") on the SparkSession builder

Reading Spark UI

Stage Analysis:
├── Duration: Total time for stage
├── Tasks: Number of parallel tasks
│   ├── Median: Typical task duration
│   ├── Max: Slowest task (check for skew)
│   └── Failed: Retry count
├── Input: Data read
├── Shuffle Read: Data from other stages
├── Shuffle Write: Data for downstream stages
└── Spill: Disk spill (indicates memory pressure)

Key Metrics:
├── GC Time > 10%: Memory issue
├── Shuffle Write > Input: Exploding join
├── Max/Median > 2x: Data skew
└── Spill > 0: Increase partitions or memory

Unit Test Template

import pytest
from pyspark.sql import SparkSession
from chispa.dataframe_comparer import assert_df_equality
import pyspark.sql.functions as F

@pytest.fixture(scope="session")
def spark():
    """Session-scoped Spark for tests."""
    return (SparkSession.builder
        .master("local[2]")
        .appName("UnitTests")
        .config("spark.sql.shuffle.partitions", 2)
        .getOrCreate())

@pytest.fixture
def sample_data(spark):
    return spark.createDataFrame([
        (1, "user1", 100.0),
        (2, "user2", 200.0),
        (3, "user1", 150.0),
    ], ["id", "user_id", "amount"])

class TestAggregations:

    def test_user_totals(self, spark, sample_data):
        # Arrange
        expected = spark.createDataFrame([
            ("user1", 250.0),
            ("user2", 200.0),
        ], ["user_id", "total"])

        # Act
        result = sample_data.groupBy("user_id").agg(
            F.sum("amount").alias("total")
        )

        # Assert
        assert_df_equality(result, expected, ignore_row_order=True)

    def test_handles_empty_dataframe(self, spark):
        # Arrange
        empty_df = spark.createDataFrame([], "id INT, amount DOUBLE")

        # Act
        result = empty_df.agg(F.sum("amount").alias("total")).collect()

        # Assert
        assert result[0]["total"] is None

    def test_window_functions(self, spark, sample_data):
        # Arrange
        from pyspark.sql.window import Window
        window = Window.partitionBy("user_id").orderBy("id")

        # Act
        result = sample_data.withColumn(
            "running_total",
            F.sum("amount").over(window)
        ).filter(F.col("user_id") == "user1").orderBy("id")  # deterministic row order for the assertion

        # Assert
        totals = [row["running_total"] for row in result.collect()]
        assert totals == [100.0, 250.0]

Best Practices

Performance

# ✅ DO: Use DataFrame API over RDD
df.filter(F.col("status") == "active")  # Catalyst optimized

# ❌ DON'T: Use RDD transformations
rdd.filter(lambda x: x["status"] == "active")  # No optimization

# ✅ DO: Use built-in functions
df.withColumn("upper_name", F.upper("name"))

# ❌ DON'T: Use Python UDFs (slow serialization)
@udf
def upper_name(name):
    return name.upper()

# ✅ DO: Broadcast small lookups
df.join(broadcast(small_df), "key")

# ✅ DO: Persist wisely
intermediate.cache()
intermediate.count()  # Force materialization
# ... use intermediate multiple times ...
intermediate.unpersist()
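
When a transformation genuinely cannot be expressed with built-in functions, a vectorized pandas UDF avoids the per-row serialization cost of plain Python UDFs. A minimal sketch (requires pyarrow; the column name mirrors the example above):

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

@pandas_udf(StringType())
def upper_name_vec(names: pd.Series) -> pd.Series:
    # Processes whole Arrow batches instead of one Python object per row
    return names.str.upper()

df.withColumn("upper_name", upper_name_vec("name"))

For this particular case F.upper remains the right answer; the pandas UDF pattern matters when no built-in exists.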

Code Organization

# ✅ DO: Chain transformations fluently
result = (df
    .filter(condition)
    .withColumn("new_col", F.expr("..."))
    .groupBy("key")
    .agg(F.sum("value")))

# ✅ DO: Use descriptive column aliases
.agg(
    F.count("*").alias("event_count"),
    F.avg("amount").alias("avg_amount")
)

# ✅ DO: Parameterize for reusability
def add_date_features(df, date_col):
    return (df
        .withColumn("year", F.year(date_col))
        .withColumn("month", F.month(date_col))
        .withColumn("day_of_week", F.dayofweek(date_col)))

Resources

Official Documentation

  • Apache Spark: https://spark.apache.org/docs/latest/
  • Structured Streaming guide: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
  • Delta Lake: https://docs.delta.io/

Performance Tuning

  • Spark tuning guide: https://spark.apache.org/docs/latest/tuning.html
  • Spark SQL performance tuning: https://spark.apache.org/docs/latest/sql-performance-tuning.html

Books

  • "Learning Spark 2nd Edition" by Damji et al.
  • "Spark: The Definitive Guide" by Chambers & Zaharia
  • "High Performance Spark" by Karau & Warren

Next Skills

After mastering Big Data:

  • data-warehousing - Design dimensional models
  • mlops - Deploy ML at scale
  • streaming - Real-time with Flink/Kafka
  • cloud-platforms - AWS EMR, Databricks

Skill Certification Checklist:

  • Can optimize Spark jobs using EXPLAIN and Spark UI
  • Can implement efficient joins with broadcast and bucketing
  • Can handle data skew with salting techniques
  • Can build streaming pipelines with Structured Streaming
  • Can use Delta Lake for ACID operations