| name | apache-spark-data-processing |
| description | Complete guide for Apache Spark data processing including RDDs, DataFrames, Spark SQL, streaming, MLlib, and production deployment |
| tags | spark, big-data, distributed-computing, dataframes, streaming, machine-learning |
| tier | tier-1 |
Apache Spark Data Processing
A comprehensive skill for mastering Apache Spark data processing, from basic RDD operations to advanced streaming, SQL, and machine learning workflows. Learn to build scalable, distributed data pipelines and analytics systems.
When to Use This Skill
Use Apache Spark when you need to:
- Process Large-Scale Data: Handle datasets too large for single-machine processing (TB to PB scale)
- Perform Distributed Computing: Execute parallel computations across cluster nodes
- Real-Time Stream Processing: Process continuous data streams with low latency
- Complex Data Analytics: Run sophisticated analytics, aggregations, and transformations
- Machine Learning at Scale: Train ML models on massive datasets
- ETL/ELT Pipelines: Build robust data transformation and loading workflows
- Interactive Data Analysis: Perform exploratory analysis on large datasets
- Unified Data Processing: Combine batch and streaming workloads in one framework
Not Ideal For:
- Small datasets (<100 GB) that fit in memory on a single machine
- Simple CRUD operations (use traditional databases)
- Ultra-low latency requirements (<10ms) where specialized stream processors excel
- Workflows requiring strong ACID transactions across distributed data
Core Concepts
Resilient Distributed Datasets (RDDs)
RDDs are Spark's fundamental data abstraction - immutable, distributed collections of objects that can be processed in parallel.
Key Characteristics:
- Resilient: Fault-tolerant through lineage tracking
- Distributed: Partitioned across cluster nodes
- Immutable: Transformations create new RDDs, not modify existing ones
- Lazy Evaluation: Transformations build computation graph; actions trigger execution
- In-Memory Computing: Cache intermediate results for iterative algorithms
RDD Operations:
- Transformations: Lazy operations that return new RDDs (map, filter, flatMap, reduceByKey)
- Actions: Operations that trigger computation and return values (collect, count, reduce, saveAsTextFile)
When to Use RDDs:
- Low-level control over data distribution and partitioning
- Custom partitioning schemes required
- Working with unstructured data (text files, binary data)
- Migrating legacy code from early Spark versions
Prefer DataFrames/Datasets when possible - they provide automatic optimization via Catalyst optimizer.
DataFrames and Datasets
DataFrames are distributed collections of data organized into named columns - similar to a database table or pandas DataFrame, but with powerful optimizations.
DataFrames:
- Structured data with schema
- Automatic query optimization (Catalyst)
- Cross-language support (Python, Scala, Java, R)
- Rich API for SQL-like operations
Datasets (Scala/Java only):
- Typed DataFrames with compile-time type safety
- Best performance in Scala due to JVM optimization
- Combine RDD type safety with DataFrame optimizations
Key Advantages Over RDDs:
- Query Optimization: Catalyst optimizer rewrites queries for efficiency
- Tungsten Execution: Optimized CPU and memory usage
- Columnar Storage: Efficient data representation
- Code Generation: Runtime bytecode generation (whole-stage codegen) for faster execution
Lazy Evaluation
Spark uses lazy evaluation to optimize execution:
- Transformations build a Directed Acyclic Graph (DAG) of operations
- Actions trigger execution of the DAG
- Spark's optimizer analyzes the entire DAG and creates an optimized execution plan
- Work is distributed across cluster nodes
Benefits:
- Minimize data movement across network
- Combine multiple operations into single stage
- Eliminate unnecessary computations
- Optimize memory usage
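As a rough analogy in plain Python (this is not Spark API code), generators behave in a similar way: chaining them only describes a pipeline, and nothing executes until a consumer asks for results. The names `traced_double` and `calls` are illustrative only.

```python
# Plain-Python analogy for lazy evaluation; this is not Spark API code.
calls = []  # records every element the "map" step actually touches

def traced_double(x):
    calls.append(x)
    return x * 2

data = range(1, 1_000_001)

# "Transformations": building the pipeline does no work yet.
doubled = (traced_double(x) for x in data)
large = (x for x in doubled if x > 6)

assert calls == []  # nothing has executed so far

# "Action": pulling three results triggers only the work that is needed.
first_three = [next(large) for _ in range(3)]
print(first_three)  # [8, 10, 12]
print(len(calls))   # 6 elements were processed, not one million
```

Spark goes further than this analogy, because it can also reorder and fuse the recorded steps before running them.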
Partitioning
Data is divided into partitions for parallel processing:
- Default Partitioning: Typically based on HDFS block size or input source
- Hash Partitioning: Distribute data by key hash (used by groupByKey, reduceByKey)
- Range Partitioning: Distribute data by key ranges (useful for sorted data)
- Custom Partitioning: Define your own partitioning logic
Partition Count Considerations:
- Too few partitions: Underutilized cluster, large task execution time
- Too many partitions: Scheduling overhead, small task execution time
- General rule: 2-4 partitions per CPU core in cluster
- Use repartition() or coalesce() to adjust partition count
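The two ideas above, hash partitioning and the partitions-per-core rule, can be sketched in plain Python. This is an illustration only: `zlib.crc32` stands in for Spark's own hash function, and `suggested_partitions` with its default of 3 per core is a hypothetical helper, not a Spark setting.

```python
import zlib

def hash_partition(key: str, num_partitions: int) -> int:
    """Assign a key to a partition by hashing (crc32 is a stand-in hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def suggested_partitions(total_cores: int, per_core: int = 3) -> int:
    """Apply the 2-4 partitions-per-core rule of thumb (3 is an assumed midpoint)."""
    if not 2 <= per_core <= 4:
        raise ValueError("rule of thumb covers 2-4 partitions per core")
    return total_cores * per_core

keys = ["user1", "user2", "user3", "user1"]
assignments = [hash_partition(k, 4) for k in keys]

# The same key always lands in the same partition, which is what lets
# key-based operations such as reduceByKey gather matching records together.
assert assignments[0] == assignments[3]
print(suggested_partitions(total_cores=40))  # 120
```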
Caching and Persistence
Cache frequently accessed data in memory for performance:
from pyspark import StorageLevel
# Cache DataFrame in memory
df.cache() # Shorthand for persist() with the default level (memory first, spilling to disk)
# Different storage levels
df.persist(StorageLevel.MEMORY_ONLY) # Fast; partitions that do not fit are recomputed when needed
df.persist(StorageLevel.MEMORY_AND_DISK) # Spill to disk if memory full
df.persist(StorageLevel.DISK_ONLY) # Store only on disk
# MEMORY_ONLY_SER exists only in the Scala/Java API; PySpark always stores data in serialized form
# Unpersist when done
df.unpersist()
When to Cache:
- Data used multiple times in workflow
- Iterative algorithms (ML training)
- Interactive analysis sessions
- Expensive transformations reused downstream
When Not to Cache:
- Data used only once
- Very large datasets that exceed cluster memory
- Streaming applications with continuous new data
Spark SQL
Spark SQL allows you to query structured data using SQL or DataFrame API:
- Execute SQL queries on DataFrames and tables
- Register DataFrames as temporary views
- Join structured and semi-structured data
- Connect to Hive metastore for table metadata
- Support for various data sources (Parquet, ORC, JSON, CSV, JDBC)
Performance Features:
- Catalyst Optimizer: Rule-based and cost-based query optimization
- Tungsten Execution Engine: Whole-stage code generation, vectorized processing
- Adaptive Query Execution (AQE): Runtime optimization based on statistics
- Dynamic Partition Pruning: Skip irrelevant partitions during execution
Broadcast Variables and Accumulators
Shared variables for efficient distributed computing:
Broadcast Variables:
- Read-only variables cached on each node
- Efficient for sharing large read-only data (lookup tables, ML models)
- Avoid sending large data with every task
# Broadcast a lookup table
lookup_table = {"key1": "value1", "key2": "value2"}
broadcast_lookup = sc.broadcast(lookup_table)
# Use in transformations
rdd.map(lambda x: broadcast_lookup.value.get(x, "default"))
Accumulators:
- Write-only variables for aggregating values across tasks
- Used for counters and sums in distributed operations
- Only driver can read final accumulated value
# Create accumulator
error_count = sc.accumulator(0)
# Increment in tasks
rdd.foreach(lambda x: error_count.add(1) if is_error(x) else None)
# Read final value in driver
print(f"Total errors: {error_count.value}")
Spark SQL Deep Dive
DataFrame Creation
Create DataFrames from various sources:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SparkSQLExample").getOrCreate()
# From structured data
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3)]
columns = ["name", "id"]
df = spark.createDataFrame(data, columns)
# From files
df_json = spark.read.json("path/to/file.json")
df_parquet = spark.read.parquet("path/to/file.parquet")
df_csv = spark.read.option("header", "true").csv("path/to/file.csv")
# From JDBC sources
df_jdbc = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://host:port/database") \
.option("dbtable", "table_name") \
.option("user", "username") \
.option("password", "password") \
.load()
DataFrame Operations
Common DataFrame transformations:
# Select columns
df.select("name", "age").show()
# Filter rows
df.filter(df.age > 21).show()
df.where(df["age"] > 21).show() # Alternative syntax
# Add/modify columns
from pyspark.sql.functions import col, lit
df.withColumn("age_plus_10", col("age") + 10).show()
df.withColumn("country", lit("USA")).show()
# Aggregations
df.groupBy("department").count().show()
df.groupBy("department").agg({"salary": "avg", "age": "max"}).show()
# Sorting
df.orderBy("age").show()
df.orderBy(col("age").desc()).show()
# Joins
df1.join(df2, df1.id == df2.user_id, "inner").show()
df1.join(df2, "id", "left_outer").show()
# Unions
df1.union(df2).show()
SQL Queries
Execute SQL on DataFrames:
# Register DataFrame as temporary view
df.createOrReplaceTempView("people")
# Run SQL queries
sql_result = spark.sql("SELECT name FROM people WHERE age > 21")
sql_result.show()
# Complex queries
result = spark.sql("""
SELECT
department,
COUNT(*) as employee_count,
AVG(salary) as avg_salary,
MAX(age) as max_age
FROM people
WHERE age > 25
GROUP BY department
HAVING COUNT(*) > 5
ORDER BY avg_salary DESC
""")
result.show()
Data Sources
Spark SQL supports multiple data formats:
Parquet (Recommended for Analytics):
- Columnar storage format
- Excellent compression and query performance
- Schema embedded in file
- Supports predicate pushdown and column pruning
# Write
df.write.parquet("output/path", mode="overwrite", compression="snappy")
# Read with partition pruning
df = spark.read.parquet("output/path").filter(col("date") == "2025-01-01")
ORC (Optimized Row Columnar):
- Similar to Parquet with slightly better compression
- Preferred for Hive integration
- Built-in indexes for faster queries
df.write.orc("output/path", mode="overwrite")
df = spark.read.orc("output/path")
JSON (Semi-Structured Data):
- Human-readable but less efficient
- Schema inference on read
- Good for nested/complex data
# Read with explicit schema
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)
])
df = spark.read.schema(schema).json("data.json")
CSV (Legacy/Simple Data):
- Widely compatible but slow
- Requires header inference or explicit schema
- Minimal compression benefits
df.write.csv("output.csv", header=True, mode="overwrite")
df = spark.read.option("header", "true").option("inferSchema", "true").csv("data.csv")
Window Functions
Advanced analytics with window functions:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, rank, dense_rank, lag, lead, sum, avg
# Define window specification
window_spec = Window.partitionBy("department").orderBy(col("salary").desc())
# Ranking functions
df.withColumn("rank", rank().over(window_spec)).show()
df.withColumn("row_num", row_number().over(window_spec)).show()
df.withColumn("dense_rank", dense_rank().over(window_spec)).show()
# Aggregate functions over window
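# Use an unordered window here: with orderBy, the default frame would yield a running average instead
df.withColumn("dept_avg_salary", avg("salary").over(Window.partitionBy("department"))).show()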
df.withColumn("running_total", sum("salary").over(window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow))).show()
# Offset functions
df.withColumn("prev_salary", lag("salary", 1).over(window_spec)).show()
df.withColumn("next_salary", lead("salary", 1).over(window_spec)).show()
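The three ranking functions differ only in how they treat ties. A minimal plain-Python sketch of that difference, using made-up salary values and a hypothetical helper `rankings`:

```python
def rankings(sorted_values):
    """Return (value, row_number, rank, dense_rank) for values already sorted."""
    result = []
    rank = dense = 0
    previous = object()  # sentinel that equals nothing
    for row_number, value in enumerate(sorted_values, start=1):
        if value != previous:
            rank = row_number  # rank jumps to the current position after a tie
            dense += 1         # dense_rank increases by exactly one
            previous = value
        result.append((value, row_number, rank, dense))
    return result

salaries = sorted([5000, 7000, 7000, 6000], reverse=True)
for row in rankings(salaries):
    print(row)
# (7000, 1, 1, 1)
# (7000, 2, 1, 1)
# (6000, 3, 3, 2)
# (5000, 4, 4, 3)
```

In Spark the order among tied rows is not guaranteed for row_number, so add a tie-breaking column to the window ordering when the result must be deterministic.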
User-Defined Functions (UDFs)
Create custom transformations:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType
# Python UDF (slower due to serialization overhead)
def categorize_age(age):
    if age < 18:
        return "Minor"
    elif age < 65:
        return "Adult"
    else:
        return "Senior"
categorize_udf = udf(categorize_age, StringType())
df.withColumn("age_category", categorize_udf(col("age"))).show()
# Pandas UDF (vectorized, faster for large datasets)
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf(IntegerType())
def square(series: pd.Series) -> pd.Series:
    return series ** 2
df.withColumn("age_squared", square(col("age"))).show()
UDF Performance Tips:
- Use built-in Spark functions when possible (they run inside the JVM and remain visible to the Catalyst optimizer, so they are typically faster)
- Prefer Pandas UDFs over row-at-a-time Python UDFs for better performance
- Use Scala UDFs to avoid the Python serialization overhead entirely
- Cache DataFrames before applying UDFs if used multiple times
Transformations and Actions
Common Transformations
map: Apply function to each element
# RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
doubled = rdd.map(lambda x: x * 2) # [2, 4, 6, 8, 10]
# DataFrame (use select with functions)
from pyspark.sql.functions import col
df.select(col("value") * 2).show()
filter: Select elements matching predicate
# RDD
rdd.filter(lambda x: x > 2).collect() # [3, 4, 5]
# DataFrame
df.filter(col("age") > 25).show()
flatMap: Map and flatten results
# RDD - Split text into words
lines = sc.parallelize(["hello world", "apache spark"])
words = lines.flatMap(lambda line: line.split(" ")) # ["hello", "world", "apache", "spark"]
reduceByKey: Aggregate values by key
# Word count example
words = sc.parallelize(["apple", "banana", "apple", "cherry", "banana", "apple"])
word_pairs = words.map(lambda word: (word, 1))
word_counts = word_pairs.reduceByKey(lambda a, b: a + b)
# Result: [("apple", 3), ("banana", 2), ("cherry", 1)]
groupByKey: Group values by key (avoid when possible - use reduceByKey instead)
# Less efficient than reduceByKey
word_pairs.groupByKey().mapValues(list).collect()
# Result: [("apple", [1, 1, 1]), ("banana", [1, 1]), ("cherry", [1])]
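Why reduceByKey is usually cheaper can be seen by counting the records that would cross the network. The following plain-Python sketch simulates two hypothetical partitions; it is not Spark code and ignores serialization details.

```python
from collections import Counter

# Two hypothetical partitions of (word, 1) pairs.
partitions = [
    [("apple", 1), ("banana", 1), ("apple", 1)],
    [("cherry", 1), ("banana", 1), ("apple", 1)],
]

# groupByKey-style: every record is shuffled as-is.
shuffled_group = [pair for part in partitions for pair in part]

# reduceByKey-style: combine within each partition first, then shuffle.
shuffled_reduce = []
for part in partitions:
    local = Counter()
    for word, n in part:
        local[word] += n
    shuffled_reduce.extend(local.items())

# The final merge on the receiving side gives the same totals either way.
totals = Counter()
for word, n in shuffled_reduce:
    totals[word] += n

print(len(shuffled_group), len(shuffled_reduce))  # 6 5
print(dict(totals))  # {'apple': 3, 'banana': 2, 'cherry': 1}
```

The saving grows with the number of repeated keys per partition; with mostly unique keys the two approaches shuffle about the same amount.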
join: Combine datasets by key
# RDD join
users = sc.parallelize([("user1", "Alice"), ("user2", "Bob")])
orders = sc.parallelize([("user1", 100), ("user2", 200), ("user1", 150)])
users.join(orders).collect()
# Result: [("user1", ("Alice", 100)), ("user1", ("Alice", 150)), ("user2", ("Bob", 200))]
# DataFrame join (more efficient)
df_users.join(df_orders, "user_id", "inner").show()
distinct: Remove duplicates
# RDD
rdd.distinct().collect()
# DataFrame
df.distinct().show()
df.dropDuplicates(["user_id"]).show() # Drop based on specific columns
coalesce/repartition: Change partition count
# Reduce partitions (no shuffle, more efficient)
df.coalesce(1).write.parquet("output")
# Increase/decrease partitions (involves shuffle)
df.repartition(10).write.parquet("output")
df.repartition(10, "user_id").write.parquet("output") # Partition by column
Common Actions
collect: Retrieve all data to driver
results = rdd.collect() # Returns list
# WARNING: Only use on small datasets that fit in driver memory
count: Count elements
total = df.count() # Number of rows
first/take: Get first N elements
first_elem = rdd.first()
first_five = rdd.take(5)
reduce: Aggregate all elements
total_sum = rdd.reduce(lambda a, b: a + b)
foreach: Execute function on each element
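# Side effects only (no return value); the function runs on the executors,
# so on a cluster the printed output appears in executor logs, not on the driver
rdd.foreach(lambda x: print(x))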
saveAsTextFile: Write to file system
rdd.saveAsTextFile("hdfs://path/to/output")
show: Display DataFrame rows (action)
df.show(20, truncate=False) # Show 20 rows, don't truncate columns
Structured Streaming
Process continuous data streams using DataFrame API.
Core Concepts
Streaming DataFrame:
- Unbounded table that grows continuously
- Same operations as batch DataFrames
- Micro-batch processing (default) or continuous processing
Input Sources:
- File sources (JSON, Parquet, CSV, ORC, text)
- Kafka
- Socket (for testing)
- Rate source (for testing)
- Custom sources
Output Modes:
- Append: Only new rows added to result table
- Complete: Entire result table written every trigger
- Update: Only updated rows written
Output Sinks:
- File sinks (Parquet, ORC, JSON, CSV, text)
- Kafka
- Console (for debugging)
- Memory (for testing)
- Foreach/ForeachBatch (custom logic)
Basic Streaming Example
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("StreamingExample").getOrCreate()
# Read stream from JSON files
input_stream = spark.readStream \
.format("json") \
.schema(schema) \
.option("maxFilesPerTrigger", 1) \
.load("input/directory")
# Transform streaming data
processed = input_stream \
.filter(col("value") > 10) \
.select("id", "value", "timestamp")
# Write stream to Parquet
query = processed.writeStream \
.format("parquet") \
.option("path", "output/directory") \
.option("checkpointLocation", "checkpoint/directory") \
.outputMode("append") \
.start()
# Wait for termination
query.awaitTermination()
Stream-Static Joins
Join streaming data with static reference data:
# Static DataFrame (loaded once)
static_df = spark.read.parquet("reference/data")
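# Streaming DataFrame (host:port and topic are placeholders; the Kafka source requires both options)
streaming_df = spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", "host:port") \
.option("subscribe", "topic") \
.load()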
# Inner join (supported)
joined = streaming_df.join(static_df, "type")
# Left outer join (supported)
joined = streaming_df.join(static_df, "type", "left_outer")
# Write result
joined.writeStream \
.format("parquet") \
.option("path", "output") \
.option("checkpointLocation", "checkpoint") \
.start()
Windowed Aggregations
Aggregate data over time windows:
from pyspark.sql.functions import window, col, count
# 10-minute tumbling window
windowed_counts = streaming_df \
.groupBy(
window(col("timestamp"), "10 minutes"),
col("word")
) \
.count()
# 10-minute sliding window with 5-minute slide
windowed_counts = streaming_df \
.groupBy(
window(col("timestamp"), "10 minutes", "5 minutes"),
col("word")
) \
.count()
# Write to console for debugging
query = windowed_counts.writeStream \
.outputMode("complete") \
.format("console") \
.option("truncate", "false") \
.start()
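To see which windows a given event falls into, the assignment rule can be sketched in plain Python. The sketch assumes windows are half-open intervals [start, end) aligned to the Unix epoch, which matches Spark's default when no start offset is given; `windows_for` is a hypothetical helper, not a Spark function.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def windows_for(ts, size, slide=None):
    """Return every [start, end) window containing ts, aligned to the epoch."""
    slide = slide or size  # a tumbling window is a sliding window with slide == size
    # Start of the latest window that begins at or before ts.
    start = ts - (ts - EPOCH) % slide
    result = []
    while start + size > ts:
        result.append((start, start + size))
        start -= slide
    return sorted(result)

ts = datetime(2025, 1, 1, 12, 7, tzinfo=timezone.utc)
tumbling = windows_for(ts, timedelta(minutes=10))
sliding = windows_for(ts, timedelta(minutes=10), timedelta(minutes=5))

fmt = lambda w: f"{w[0]:%H:%M}-{w[1]:%H:%M}"
print([fmt(w) for w in tumbling])  # ['12:00-12:10']
print([fmt(w) for w in sliding])   # ['12:00-12:10', '12:05-12:15']
```

With a sliding window each event is counted in size / slide windows, so state and output volume grow accordingly.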
Watermarking for Late Data
Handle late-arriving data with watermarks:
from pyspark.sql.functions import window
# Define watermark (10 minutes tolerance for late data)
windowed_counts = streaming_df \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
window(col("timestamp"), "10 minutes"),
col("word")
) \
.count()
# Data up to 10 minutes late is guaranteed to be processed; data arriving later than that may be dropped
Watermark Benefits:
- Limit state size by dropping old aggregation state
- Handle late data within tolerance window
- Improve performance by not maintaining infinite state
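A simplified model of the mechanism, in plain Python: the watermark trails the largest event time seen so far by the configured delay, and it advances between micro-batches. This sketch filters individual events by time, whereas Spark applies the watermark to window state, so treat it as an approximation; `run_batches` is a hypothetical helper.

```python
def run_batches(batches, delay):
    """Process batches of event times; return (accepted, dropped)."""
    watermark = float("-inf")
    max_seen = float("-inf")
    accepted, dropped = [], []
    for batch in batches:
        for event_time in batch:
            # Events older than the current watermark are considered too late.
            (dropped if event_time < watermark else accepted).append(event_time)
            max_seen = max(max_seen, event_time)
        # The watermark advances only between batches and never moves backwards.
        watermark = max(watermark, max_seen - delay)
    return accepted, dropped

# Event times in minutes, with a 10-minute delay threshold.
batches = [[0, 5, 30], [25, 15, 40], [29, 31]]
accepted, dropped = run_batches(batches, delay=10)
print(accepted)  # [0, 5, 30, 25, 40, 31]
print(dropped)   # [15, 29]
```

Note that event 25 is kept even though it arrives after event 30, because it is still within the 10-minute tolerance.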
Session Windows
Group events into sessions based on inactivity gaps:
from pyspark.sql.functions import col, session_window, when
# Dynamic session window based on user
session_window_spec = session_window(
col("timestamp"),
when(col("userId") == "user1", "5 seconds")
.when(col("userId") == "user2", "20 seconds")
.otherwise("5 minutes")
)
sessionized_counts = streaming_df \
.withWatermark("timestamp", "10 minutes") \
.groupBy(session_window_spec, col("userId")) \
.count()
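The grouping rule behind session windows can be sketched in plain Python for a single user and a fixed gap. An event joins the current session if it arrives before the session's last event plus the gap; otherwise it opens a new session. `sessionize` is a hypothetical helper, and the timestamps are arbitrary numbers.

```python
def sessionize(timestamps, gap):
    """Group timestamps into sessions separated by at least `gap` of inactivity."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] < gap:
            sessions[-1].append(ts)   # still within the inactivity gap
        else:
            sessions.append([ts])     # gap exceeded: start a new session
    return sessions

print(sessionize([0, 3, 4, 20, 22, 60], gap=5))
# [[0, 3, 4], [20, 22], [60]]
```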
Stateful Stream Processing
Maintain state across micro-batches:
from pyspark.sql.functions import expr
# Deduplication using state (include the event-time column so the watermark can purge old state)
deduplicated = streaming_df \
.withWatermark("timestamp", "1 hour") \
.dropDuplicates(["user_id", "event_id", "timestamp"])
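# Stream-stream joins (stateful)
# host:port is a placeholder; both streams are assumed to expose user_id and timestamp
# columns (for Kafka, user_id would first be parsed out of the message value)
stream1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host:port").option("subscribe", "topic1").load()
stream2 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host:port").option("subscribe", "topic2").load()
# Alias both sides so the join expression can refer to stream1.* and stream2.*
joined = stream1 \
.withWatermark("timestamp", "10 minutes") \
.alias("stream1") \
.join(
stream2.withWatermark("timestamp", "20 minutes").alias("stream2"),
expr("stream1.user_id = stream2.user_id AND stream1.timestamp >= stream2.timestamp AND stream1.timestamp <= stream2.timestamp + interval 15 minutes"),
"inner"
)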
Checkpointing
Ensure fault tolerance with checkpoints:
# Checkpoint location stores:
# - Stream metadata (offsets, configuration)
# - State information (for stateful operations)
# - Write-ahead logs
# checkpointLocation is REQUIRED for production
query = streaming_df.writeStream \
.format("parquet") \
.option("path", "output") \
.option("checkpointLocation", "checkpoint/dir") \
.start()
# Recovery: Restart query with same checkpoint location
# Spark will resume from last committed offset
Checkpoint Best Practices:
- Always set checkpointLocation for production streams
- Use reliable distributed storage (HDFS, S3) for checkpoints
- Don't delete checkpoint directory while stream is running
- Back up checkpoints for disaster recovery
Machine Learning with MLlib
Spark's scalable machine learning library.
Core Components
MLlib Features:
- ML Pipelines: Chain transformations and models
- Featurization: Vector assemblers, scalers, encoders
- Classification & Regression: Linear models, tree-based models, neural networks
- Clustering: K-means, Gaussian Mixture, LDA
- Collaborative Filtering: ALS (Alternating Least Squares)
- Dimensionality Reduction: PCA, SVD
- Model Selection: Cross-validation, train-test split, parameter tuning
ML Pipelines
Chain transformations and estimators:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
# Load data (placeholder path; assumes numeric columns feature1, feature2, feature3 and a label column)
df = spark.read.parquet("path/to/training_data.parquet")
# Define pipeline stages
assembler = VectorAssembler(
inputCols=["feature1", "feature2", "feature3"],
outputCol="features"
)
scaler = StandardScaler(
inputCol="features",
outputCol="scaled_features",
withStd=True,
withMean=True
)
lr = LogisticRegression(
featuresCol="scaled_features",
labelCol="label",
maxIter=10,
regParam=0.01
)
# Create pipeline
pipeline = Pipeline(stages=[assembler, scaler, lr])
# Split data
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
# Train model
model = pipeline.fit(train_df)
# Make predictions
predictions = model.transform(test_df)
predictions.select("label", "prediction", "probability").show()
Feature Engineering
Transform raw data into features:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler
# Categorical encoding
indexer = StringIndexer(inputCol="category", outputCol="category_index")
encoder = OneHotEncoder(inputCol="category_index", outputCol="category_vec")
# Numerical scaling
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
# Assemble features
assembler = VectorAssembler(
inputCols=["category_vec", "numeric_feature1", "numeric_feature2"],
outputCol="features"
)
# Text processing
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashing_tf = HashingTF(inputCol="words", outputCol="raw_features", numFeatures=10000)
idf = IDF(inputCol="raw_features", outputCol="features")
Streaming Linear Regression
Train models on streaming data:
from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD
from pyspark.streaming import StreamingContext
# Create StreamingContext
ssc = StreamingContext(sc, batchDuration=1)
# Define data streams
training_stream = ssc.textFileStream("training/data/path")
testing_stream = ssc.textFileStream("testing/data/path")
# Parse streams into LabeledPoint objects
def parse_point(line):
    values = [float(x) for x in line.strip().split(',')]
    return LabeledPoint(values[0], values[1:])
parsed_training = training_stream.map(parse_point)
parsed_testing = testing_stream.map(parse_point)
# Initialize model (initial weights are set with setInitialWeights, not in the constructor)
num_features = 3
model = StreamingLinearRegressionWithSGD()
model.setInitialWeights([0.0] * num_features)
# Train and predict
model.trainOn(parsed_training)
predictions = model.predictOnValues(parsed_testing.map(lambda lp: (lp.label, lp.features)))
# Print predictions
predictions.pprint()
# Start streaming
ssc.start()
ssc.awaitTermination()
Model Evaluation
Evaluate model performance:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator, RegressionEvaluator
# Binary classification
binary_evaluator = BinaryClassificationEvaluator(
labelCol="label",
rawPredictionCol="rawPrediction",
metricName="areaUnderROC"
)
auc = binary_evaluator.evaluate(predictions)
print(f"AUC: {auc}")
# Multiclass classification
multi_evaluator = MulticlassClassificationEvaluator(
labelCol="label",
predictionCol="prediction",
metricName="accuracy"
)
accuracy = multi_evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")
# Regression
regression_evaluator = RegressionEvaluator(
labelCol="label",
predictionCol="prediction",
metricName="rmse"
)
rmse = regression_evaluator.evaluate(predictions)
print(f"RMSE: {rmse}")
Hyperparameter Tuning
Optimize model parameters with cross-validation:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Define model
rf = RandomForestClassifier(labelCol="label", featuresCol="features")
# Build parameter grid
param_grid = ParamGridBuilder() \
.addGrid(rf.numTrees, [10, 20, 50]) \
.addGrid(rf.maxDepth, [5, 10, 15]) \
.addGrid(rf.minInstancesPerNode, [1, 5, 10]) \
.build()
# Define evaluator
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
# Cross-validation
cv = CrossValidator(
estimator=rf,
estimatorParamMaps=param_grid,
evaluator=evaluator,
numFolds=5,
parallelism=4
)
# Train
cv_model = cv.fit(train_df)
# Best model
best_model = cv_model.bestModel
print(f"Best numTrees: {best_model.getNumTrees}")
print(f"Best maxDepth: {best_model.getMaxDepth()}")
# Evaluate on test set
predictions = cv_model.transform(test_df)
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy}")
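Grid search cost grows multiplicatively, so it helps to count the model fits before launching the job. A small plain-Python sketch for the grid above; it assumes the usual CrossValidator behaviour of one fit per parameter combination per fold, plus a final refit of the best combination on the full training set.

```python
from itertools import product

grid = {
    "numTrees": [10, 20, 50],
    "maxDepth": [5, 10, 15],
    "minInstancesPerNode": [1, 5, 10],
}
num_folds = 5

# Every combination of the three parameter lists.
combinations = [dict(zip(grid, values)) for values in product(*grid.values())]
fits_during_search = len(combinations) * num_folds
total_fits = fits_during_search + 1  # final refit of the best combination

print(len(combinations))  # 27
print(total_fits)         # 136
```

The parallelism setting controls how many of these fits run concurrently; it does not reduce their number.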
Distributed Matrix Operations
MLlib provides distributed matrix representations:
from pyspark.mllib.linalg.distributed import RowMatrix, IndexedRowMatrix, CoordinateMatrix
from pyspark.mllib.linalg import Vectors
# RowMatrix: Distributed matrix without row indices
rows = sc.parallelize([
Vectors.dense([1.0, 2.0, 3.0]),
Vectors.dense([4.0, 5.0, 6.0]),
Vectors.dense([7.0, 8.0, 9.0])
])
row_matrix = RowMatrix(rows)
# Compute statistics
print(f"Rows: {row_matrix.numRows()}")
print(f"Cols: {row_matrix.numCols()}")
print(f"Column means: {row_matrix.computeColumnSummaryStatistics().mean()}")
# IndexedRowMatrix: Matrix with row indices
from pyspark.mllib.linalg.distributed import IndexedRow
indexed_rows = sc.parallelize([
IndexedRow(0, Vectors.dense([1.0, 2.0, 3.0])),
IndexedRow(1, Vectors.dense([4.0, 5.0, 6.0]))
])
indexed_matrix = IndexedRowMatrix(indexed_rows)
# CoordinateMatrix: Sparse matrix using (row, col, value) entries
from pyspark.mllib.linalg.distributed import MatrixEntry
entries = sc.parallelize([
MatrixEntry(0, 0, 1.0),
MatrixEntry(0, 2, 3.0),
MatrixEntry(1, 1, 5.0)
])
coord_matrix = CoordinateMatrix(entries)
Stratified Sampling
Sample data while preserving class distribution:
# RDD API (PySpark)
data = [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5), ("c", 6)]
rdd = sc.parallelize(data)
# Define sampling fractions per key
fractions = {"a": 0.5, "b": 0.5, "c": 0.5}
# Approximate sample (faster, one pass)
sampled_rdd = rdd.sampleByKey(withReplacement=False, fractions=fractions)
# Exact sample (slower, guaranteed exact counts): sampleByKeyExact is available
# in the Scala/Java API only and is not supported in PySpark
print(sampled_rdd.collect())
Performance Tuning
Memory Management
Memory Breakdown:
- Execution Memory: Used for shuffles, joins, sorts, aggregations
- Storage Memory: Used for caching and broadcast variables
- User Memory: Used for user data structures and UDFs
- Reserved Memory: Reserved for Spark internal operations
Configuration:
# spark.memory.fraction: fraction of (heap - reserved) shared by execution and storage
# spark.memory.storageFraction: fraction of that region protected from eviction for storage
spark = SparkSession.builder \
.appName("MemoryTuning") \
.config("spark.executor.memory", "4g") \
.config("spark.driver.memory", "2g") \
.config("spark.memory.fraction", "0.6") \
.config("spark.memory.storageFraction", "0.5") \
.getOrCreate()
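To make these settings concrete, here is a rough arithmetic sketch of how a 4 GB executor heap divides under them. It assumes the 300 MB reserved region of Spark's unified memory manager and ignores the fact that the JVM reports slightly less usable heap than configured; `memory_regions` is a hypothetical helper.

```python
RESERVED_MB = 300  # fixed reservation in the unified memory manager

def memory_regions(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Approximate size in MB of each memory region for one executor."""
    usable = heap_mb - RESERVED_MB
    unified = usable * memory_fraction      # execution + storage
    storage = unified * storage_fraction    # portion protected for caching
    return {
        "reserved": RESERVED_MB,
        "user": usable - unified,
        "storage": storage,
        "execution": unified - storage,
    }

regions = memory_regions(4 * 1024)
print({name: round(mb, 1) for name, mb in regions.items()})
# {'reserved': 300, 'user': 1518.4, 'storage': 1138.8, 'execution': 1138.8}
```

The boundary between storage and execution is soft: either side can borrow unused space from the other, so these figures are starting points rather than hard limits.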
Memory Best Practices:
- Monitor memory usage via Spark UI
- Use appropriate storage levels for caching
- Avoid collecting large datasets to driver
- Increase executor memory for memory-intensive operations
- Use kryo serialization for better memory efficiency
Shuffle Optimization
Shuffles are expensive operations - minimize them:
Causes of Shuffles:
- groupByKey, reduceByKey, aggregateByKey
- join, cogroup
- repartition, coalesce (only when called with shuffle=True on an RDD)
- distinct, intersection
- sortByKey
Optimization Strategies:
# 1. Use reduceByKey instead of groupByKey
# Bad: groupByKey shuffles all data
word_pairs.groupByKey().mapValues(sum)
# Good: reduceByKey combines locally before shuffle
word_pairs.reduceByKey(lambda a, b: a + b)
# 2. Broadcast small tables in joins
from pyspark.sql.functions import broadcast
large_df.join(broadcast(small_df), "key")
# 3. Partition data appropriately
df.repartition(200, "user_id") # Partition by key for subsequent aggregations
# 4. Coalesce instead of repartition when reducing partitions
df.coalesce(10) # No shuffle, just merge partitions
# 5. Tune shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", 200) # Default is 200
Shuffle Configuration:
# spark.sql.adaptive.enabled turns on AQE
spark = SparkSession.builder \
.config("spark.sql.shuffle.partitions", 200) \
.config("spark.default.parallelism", 200) \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.getOrCreate()
Partitioning Strategies
Partition Count Guidelines:
- Too few: Underutilized cluster, OOM errors
- Too many: Task scheduling overhead
- Sweet spot: 2-4x number of CPU cores
- For large shuffles: 100-200+ partitions
Partition by Column:
# Partition writes by date for easy filtering
df.write.partitionBy("date", "country").parquet("output")
# Read with partition pruning (only reads relevant partitions)
spark.read.parquet("output").filter(col("date") == "2025-01-15").show()
Custom Partitioning:
from pyspark.rdd import portable_hash
# Custom partitioner for RDD
def custom_partitioner(key):
    return portable_hash(key) % 100
rdd.partitionBy(100, custom_partitioner)
Caching Strategies
When to Cache:
# Iterative algorithms (ML)
training_data.cache()
for i in range(num_iterations):
    model = train_model(training_data)
# Multiple aggregations on same data
base_df.cache()
result1 = base_df.groupBy("country").count()
result2 = base_df.groupBy("city").avg("sales")
# Interactive analysis
df.cache()
df.filter(condition1).show()
df.filter(condition2).show()
df.groupBy("category").count().show()
Storage Levels:
from pyspark import StorageLevel
# Memory only (fastest; partitions that do not fit are recomputed on demand)
df.persist(StorageLevel.MEMORY_ONLY)
# Memory and disk (spill to disk if needed)
df.persist(StorageLevel.MEMORY_AND_DISK)
# Serialized in memory (MEMORY_ONLY_SER) is a Scala/Java-only level;
# PySpark always stores data in serialized form, so MEMORY_ONLY already covers it
# Disk only (slowest, but always available)
df.persist(StorageLevel.DISK_ONLY)
# Replicated (fault tolerance)
df.persist(StorageLevel.MEMORY_AND_DISK_2) # 2 replicas
Broadcast Joins
Optimize joins with small tables:
from pyspark.sql.functions import broadcast
# Automatic broadcast (tables < spark.sql.autoBroadcastJoinThreshold)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024) # 10 MB
# Explicit broadcast hint
large_df.join(broadcast(small_df), "key")
# Benefits:
# - No shuffle of large table
# - Small table sent to all executors once
# - Much faster for small dimension tables
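Conceptually, a broadcast join turns the small table into an in-memory lookup that every task probes locally. A plain-Python sketch with made-up data (not Spark code; `join_partition` is a hypothetical helper):

```python
# Small dimension table: copied in full to every task ("broadcast").
countries = {"US": "United States", "DE": "Germany"}

# Large fact table, split into hypothetical partitions that never move.
order_partitions = [
    [(1, "US", 100), (2, "DE", 250)],
    [(3, "US", 75), (4, "FR", 40)],
]

def join_partition(rows, lookup):
    """Inner-join one partition against the broadcast lookup table."""
    return [
        (order_id, lookup[code], amount)
        for order_id, code, amount in rows
        if code in lookup  # unmatched rows are dropped, as in an inner join
    ]

joined = [row for part in order_partitions for row in join_partition(part, countries)]
print(joined)
# [(1, 'United States', 100), (2, 'Germany', 250), (3, 'United States', 75)]
```

Each partition is joined where it already lives, which is why the large side needs no shuffle. The trade-off is that the small table must fit in memory on every executor.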
Adaptive Query Execution (AQE)
Enable runtime query optimization:
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# AQE Benefits:
# - Dynamically coalesce partitions after shuffle
# - Handle skewed joins by splitting large partitions
# - Optimize join strategy at runtime
Data Format Selection
Performance Comparison:
- Parquet (Best for analytics): Columnar, compressed, fast queries
- ORC (Best for Hive): Similar to Parquet, slightly better compression
- Avro (Best for row-oriented): Good for write-heavy workloads
- JSON (Slowest): Human-readable but inefficient
- CSV (Legacy): Compatible but slow and no schema
Recommendation:
- Use Parquet for most analytics workloads
- Enable compression (snappy, gzip, lzo)
- Partition by commonly filtered columns
- Use columnar formats for read-heavy workloads
Catalyst Optimizer
Understand query optimization:
# View the logical plans and the physical plan
df.explain(mode="extended")
# Optimizations include:
# - Predicate pushdown: Push filters to data source
# - Column pruning: Read only required columns
# - Constant folding: Evaluate constants at compile time
# - Join reordering: Optimize join order
# - Partition pruning: Skip irrelevant partitions
Production Deployment
Cluster Managers
Standalone:
- Simple, built-in cluster manager
- Easy setup for development and small clusters
- No resource sharing with other frameworks
# Start master
$SPARK_HOME/sbin/start-master.sh
# Start workers
$SPARK_HOME/sbin/start-worker.sh spark://master:7077
# Submit application
spark-submit --master spark://master:7077 app.py
YARN:
- Hadoop's resource manager
- Share cluster resources with MapReduce, Hive, etc.
- Two modes: cluster (driver on YARN) and client (driver on local machine)
# Cluster mode (driver runs on YARN)
spark-submit --master yarn --deploy-mode cluster app.py
# Client mode (driver runs locally)
spark-submit --master yarn --deploy-mode client app.py
Kubernetes:
- Modern container orchestration
- Dynamic resource allocation
- Cloud-native deployments
spark-submit \
--master k8s://https://k8s-master:443 \
--deploy-mode cluster \
--name spark-app \
--conf spark.executor.instances=5 \
--conf spark.kubernetes.container.image=spark:latest \
app.py
Mesos:
- General-purpose cluster manager
- Fine-grained or coarse-grained resource sharing
- Deprecated since Spark 3.2; prefer YARN or Kubernetes for new deployments
Application Submission
Basic spark-submit:
spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 8g \
--executor-cores 4 \
--num-executors 10 \
--conf spark.sql.shuffle.partitions=200 \
--py-files dependencies.zip \
--files config.json \
application.py
Configuration Options:
- --master: Cluster manager URL
- --deploy-mode: Where to run driver (client or cluster)
- --driver-memory: Memory for driver process
- --executor-memory: Memory per executor
- --executor-cores: Cores per executor
- --num-executors: Number of executors
- --conf: Spark configuration properties
- --py-files: Python dependencies
- --files: Additional files to distribute
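When submissions are scripted, it can help to assemble the argument list programmatically instead of concatenating strings. The following is a small sketch, assuming a hypothetical helper named `build_spark_submit`; the flags it emits are the ones listed above.

```python
import shlex

def build_spark_submit(app, master, deploy_mode="client", conf=None, **options):
    """Assemble a spark-submit argument list; keyword names map to --kebab-case flags."""
    args = ["spark-submit", "--master", master, "--deploy-mode", deploy_mode]
    for name, value in options.items():
        args += ["--" + name.replace("_", "-"), str(value)]
    for key, value in (conf or {}).items():
        args += ["--conf", f"{key}={value}"]
    args.append(app)  # the application file goes last
    return args

cmd = build_spark_submit(
    "application.py",
    master="yarn",
    deploy_mode="cluster",
    executor_memory="8g",
    executor_cores=4,
    num_executors=10,
    conf={"spark.sql.shuffle.partitions": 200},
)
# Quote each argument so the line can be pasted into a shell
print(" ".join(shlex.quote(arg) for arg in cmd))
```

Keeping the command as a list also makes it safe to pass directly to `subprocess.run` without shell quoting issues.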
Resource Allocation
General Guidelines:
- Driver Memory: 1-4 GB (unless collecting large results)
- Executor Memory: 4-16 GB per executor
- Executor Cores: 4-5 cores per executor (diminishing returns beyond 5)
- Number of Executors: Fill cluster capacity, leave resources for OS/other services
- Parallelism: 2-4x total cores
Example Calculations:
Cluster: 10 nodes, 32 cores each, 128 GB RAM each
Option 1: Fewer large executors
- 30 executors (3 per node)
- 10 cores per executor
- 40 GB memory per executor
- Total: 300 cores
Option 2: More, smaller executors (RECOMMENDED)
- 50 executors (5 per node)
- 5 cores per executor
- 24 GB per executor container (about 21 GB heap once the default 10% memory overhead is included)
- Total: 250 cores
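The arithmetic behind such a layout can be written down as a short calculator. This is a sketch under stated assumptions: `size_executors` is a hypothetical helper, the reserved cores and RAM are illustrative choices, and the 10% overhead reflects the default executor memory overhead factor.

```python
import math

def size_executors(nodes, cores_per_node, ram_gb_per_node,
                   cores_per_executor=5, reserved_cores=1, reserved_ram_gb=8,
                   overhead_fraction=0.10):
    """Derive an executor layout from node size, leaving headroom for the OS and daemons."""
    executors_per_node = (cores_per_node - reserved_cores) // cores_per_executor
    if executors_per_node < 1:
        raise ValueError("not enough cores per node for even one executor")
    # Each executor's container must hold the heap plus off-heap overhead.
    container_gb = (ram_gb_per_node - reserved_ram_gb) / executors_per_node
    heap_gb = math.floor(container_gb / (1 + overhead_fraction))
    return {
        "executors_per_node": executors_per_node,
        "total_executors": executors_per_node * nodes,
        "total_cores": executors_per_node * nodes * cores_per_executor,
        "executor_memory_gb": heap_gb,
    }

# 10 nodes, 32 cores and 128 GB each; holding back 7 cores per node gives 5 executors per node
plan = size_executors(nodes=10, cores_per_node=32, ram_gb_per_node=128, reserved_cores=7)
```

With these inputs each executor gets a 24 GB container, which leaves roughly 21 GB for `--executor-memory` after overhead.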
Dynamic Allocation
Automatically scale executors based on workload:
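from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("DynamicAllocation") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", 2) \
    .config("spark.dynamicAllocation.maxExecutors", 100) \
    .config("spark.dynamicAllocation.initialExecutors", 10) \
    .config("spark.dynamicAllocation.executorIdleTimeout", "60s") \
    .getOrCreate()
# Dynamic allocation needs either shuffle tracking (Spark 3.0+, shown above)
# or the external shuffle service (spark.shuffle.service.enabled=true)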
Benefits:
- Better resource utilization
- Automatic scaling for varying workloads
- Reduced costs in cloud environments
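As a rough mental model, the number of executors requested follows the task backlog, clamped to the configured minimum and maximum. The sketch below is a simplification with a hypothetical function name; the real scheduler ramps up in rounds and releases executors only after they have been idle for `executorIdleTimeout`.

```python
import math

def target_executors(pending_tasks, running_tasks, executor_cores, task_cpus=1,
                     min_executors=2, max_executors=100):
    """Estimate how many executors are needed to run all outstanding tasks at once."""
    tasks_per_executor = executor_cores // task_cpus
    needed = math.ceil((pending_tasks + running_tasks) / tasks_per_executor)
    # Never go below the configured floor or above the configured ceiling.
    return max(min_executors, min(max_executors, needed))

idle = target_executors(0, 0, executor_cores=4)        # falls back to minExecutors
busy = target_executors(30, 10, executor_cores=4)      # 40 tasks / 4 slots each
burst = target_executors(1000, 40, executor_cores=4)   # capped at maxExecutors
```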
Monitoring and Logging
Spark UI:
- Web UI at http://driver:4040
- Stages, tasks, storage, environment, executors
- SQL query plans and execution details
- Identify bottlenecks and performance issues
History Server:
# Start history server
$SPARK_HOME/sbin/start-history-server.sh
# Configure event logging (must be set before the application starts)
spark = SparkSession.builder \
    .config("spark.eventLog.enabled", "true") \
    .config("spark.eventLog.dir", "hdfs://namenode/spark-logs") \
    .getOrCreate()
Metrics:
# Enable metrics collection (also a launch-time setting)
spark = SparkSession.builder \
    .config("spark.metrics.conf.*.sink.console.class", "org.apache.spark.metrics.sink.ConsoleSink") \
    .config("spark.metrics.conf.*.sink.console.period", "10") \
    .getOrCreate()
Logging:
# Configure log level
spark.sparkContext.setLogLevel("WARN") # ERROR, WARN, INFO, DEBUG
# Custom logging
import logging
logger = logging.getLogger(__name__)
logger.info("Custom log message")
Fault Tolerance
Automatic Recovery:
- Task failures: Automatically retry failed tasks
- Executor failures: Reschedule tasks on other executors
- Driver failures: Restore from checkpoint (streaming)
- Node failures: Recompute lost partitions from lineage
Checkpointing:
# Set checkpoint directory
spark.sparkContext.setCheckpointDir("hdfs://namenode/checkpoints")
# Checkpoint RDD (breaks lineage for very long chains)
rdd.checkpoint()
# Streaming checkpoint (required for production)
query = streaming_df.writeStream \
.option("checkpointLocation", "hdfs://namenode/streaming-checkpoint") \
.start()
Speculative Execution:
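# Enable speculative execution for slow tasks
# (scheduler settings: set them at launch, not with spark.conf.set on a running session)
spark = SparkSession.builder \
    .config("spark.speculation", "true") \
    .config("spark.speculation.multiplier", "1.5") \
    .config("spark.speculation.quantile", "0.75") \
    .getOrCreate()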
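How the multiplier and quantile interact can be sketched in plain Python: once the quantile fraction of a stage's tasks has finished, any running task that has taken longer than the multiplier times the median finished duration becomes a candidate for a speculative copy. The function name and the durations below are hypothetical, and the model leaves out details such as the minimum runtime before speculation.

```python
import statistics

def speculation_candidates(finished_durations, running_durations, total_tasks,
                           multiplier=1.5, quantile=0.75):
    """Return indices of running tasks slow enough to get a speculative copy."""
    # Speculation is only considered once enough of the stage has completed.
    if len(finished_durations) < quantile * total_tasks:
        return []
    threshold = multiplier * statistics.median(finished_durations)
    return [i for i, elapsed in enumerate(running_durations) if elapsed > threshold]

finished = [10, 12, 11, 9, 10, 13, 11, 12]   # seconds, 8 of 10 tasks done
running = [14, 40]                           # elapsed seconds of the 2 remaining tasks
slow = speculation_candidates(finished, running, total_tasks=10)
too_early = speculation_candidates(finished[:5], running, total_tasks=10)
```

The median finished duration is 11 seconds, so the threshold is 16.5 seconds and only the task at 40 seconds qualifies.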
Data Locality
Optimize data placement for performance:
Locality Levels:
- PROCESS_LOCAL: Data in same JVM as task (fastest)
- NODE_LOCAL: Data on same node, different process
- RACK_LOCAL: Data on same rack
- ANY: Data elsewhere on the network, such as a different rack (slowest)
Improve Locality:
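# Increase locality wait time (scheduler settings: set them at launch)
spark = SparkSession.builder \
    .config("spark.locality.wait", "10s") \
    .config("spark.locality.wait.node", "5s") \
    .config("spark.locality.wait.rack", "3s") \
    .getOrCreate()
# Partition data to match cluster topology
# (num_nodes and cores_per_node describe your cluster; repartition returns a new DataFrame)
df = df.repartition(num_nodes * cores_per_node)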
Best Practices
Code Organization
- Modular Design: Separate data loading, transformation, and output logic
- Configuration Management: Externalize configuration (use config files)
- Error Handling: Implement robust error handling and logging
- Testing: Unit test transformations, integration test pipelines
- Documentation: Document complex transformations and business logic
Performance
- Minimize Shuffle Data: Use reduceByKey instead of groupByKey so values are combined on each partition before the shuffle
- Cache Wisely: Only cache data reused multiple times
- Broadcast Small Tables: Use broadcast joins for small reference data
- Partition Appropriately: 2-4x CPU cores, partition by frequently filtered columns
- Use Parquet: Columnar format for analytical workloads
- Enable AQE: Leverage adaptive query execution for runtime optimization
- Tune Memory: Balance executor memory and cores
- Monitor: Use Spark UI to identify bottlenecks
Development Workflow
- Start Small: Develop with sample data locally
- Profile Early: Monitor performance from the start
- Iterate: Optimize incrementally based on metrics
- Test at Scale: Validate with production-sized data before deployment
- Version Control: Track code, configurations, and schemas
Data Quality
- Schema Validation: Enforce schemas on read/write
- Null Handling: Explicitly handle null values
- Data Validation: Check for expected ranges, formats, constraints
- Deduplication: Remove duplicates based on business logic
- Audit Logging: Track data lineage and transformations
Security
- Authentication: Enable Kerberos for YARN/HDFS
- Authorization: Use ACLs for data access control
- Encryption: Encrypt data at rest and in transit
- Secrets Management: Use secure credential providers
- Audit Trails: Log data access and modifications
Cost Optimization
- Right-Size Resources: Don't over-provision executors
- Dynamic Allocation: Scale executors based on workload
- Spot Instances: Use spot/preemptible instances in cloud
- Data Compression: Use efficient formats (Parquet, ORC)
- Partitioning: Prune unnecessary data reads
- Auto-Shutdown: Terminate idle clusters
Common Patterns
ETL Pipeline Pattern
from pyspark.sql import functions as F

def etl_pipeline(spark, input_path, output_path, reference_df):
    # Extract
    raw_df = spark.read.parquet(input_path)
    # Transform
    cleaned_df = raw_df \
        .dropDuplicates(["id"]) \
        .filter(F.col("value").isNotNull()) \
        .withColumn("processed_date", F.current_date())
    # Enrich (reference_df is a small lookup table passed in by the caller)
    enriched_df = cleaned_df.join(F.broadcast(reference_df), "key")
    # Aggregate
    aggregated_df = enriched_df \
        .groupBy("category", "date") \
        .agg(
            F.count("*").alias("count"),
            F.sum("amount").alias("total_amount"),
            F.avg("value").alias("avg_value")
        )
    # Load
    aggregated_df.write \
        .partitionBy("date") \
        .mode("overwrite") \
        .parquet(output_path)
Incremental Processing Pattern
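from pyspark.sql import functions as F

def incremental_process(spark, input_path, output_path, checkpoint_path):
    # read_checkpoint, write_checkpoint, and transform are user-supplied helpers
    # Read last processed timestamp
    last_timestamp = read_checkpoint(checkpoint_path)
    # Read new data (cached because it is scanned for both the write and the max)
    new_data = spark.read.parquet(input_path) \
        .filter(F.col("timestamp") > last_timestamp) \
        .cache()
    # Process
    processed = transform(new_data)
    # Write
    processed.write.mode("append").parquet(output_path)
    # Update checkpoint only when new rows arrived (max is None on empty input)
    max_timestamp = new_data.agg(F.max("timestamp")).collect()[0][0]
    if max_timestamp is not None:
        write_checkpoint(checkpoint_path, max_timestamp)
    new_data.unpersist()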
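The pattern above calls `read_checkpoint` and `write_checkpoint` without defining them. One possible shape for these helpers is sketched below, assuming the checkpoint lives as a small JSON file on a local filesystem and timestamps are stored as ISO-8601 strings; both choices are assumptions, and a checkpoint on HDFS or object storage would need the matching client instead.

```python
import json
import os
import tempfile

def read_checkpoint(checkpoint_path, default="1970-01-01T00:00:00"):
    """Return the last processed timestamp, or `default` on the first run."""
    try:
        with open(checkpoint_path) as f:
            return json.load(f)["last_timestamp"]
    except FileNotFoundError:
        return default

def write_checkpoint(checkpoint_path, timestamp):
    """Persist the timestamp atomically so a crash never leaves a half-written file."""
    directory = os.path.dirname(os.path.abspath(checkpoint_path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as f:
        json.dump({"last_timestamp": str(timestamp)}, f)
    os.replace(tmp_path, checkpoint_path)  # atomic rename within the same directory

# Usage with a throwaway directory
checkpoint_file = os.path.join(tempfile.mkdtemp(), "checkpoint.json")
first_run = read_checkpoint(checkpoint_file)
write_checkpoint(checkpoint_file, "2024-05-01T12:00:00")
second_run = read_checkpoint(checkpoint_file)
```

Writing to a temporary file and renaming it keeps the previous checkpoint intact if the job dies mid-write.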
Slowly Changing Dimension (SCD) Pattern
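def scd_type2_upsert(spark, dimension_df, updates_df):
    from pyspark.sql.functions import col, current_date, lit
    # Assumes dimension_df has the columns of updates_df plus is_active, start_date, end_date
    update_keys = updates_df.select("business_key").distinct()
    # Mark currently active records as inactive if updated
    inactive_records = dimension_df \
        .filter(col("is_active")) \
        .join(update_keys, "business_key", "left_semi") \
        .withColumn("is_active", lit(False)) \
        .withColumn("end_date", current_date())
    # Add new records
    new_records = updates_df \
        .withColumn("is_active", lit(True)) \
        .withColumn("start_date", current_date()) \
        .withColumn("end_date", lit(None).cast("date"))
    # Keep historical rows and active rows that have no update
    history = dimension_df.filter(~col("is_active"))
    untouched = dimension_df \
        .filter(col("is_active")) \
        .join(update_keys, "business_key", "left_anti")
    # Union by column name so differing column order cannot misalign rows
    result = history \
        .unionByName(untouched) \
        .unionByName(inactive_records) \
        .unionByName(new_records)
    return result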
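The Type 2 bookkeeping is easier to follow on a handful of rows. The plain-Python walk-through below uses hypothetical data and a hypothetical helper `scd2_apply`; it mirrors the intent of the pattern (close the active version, append the new one, leave everything else alone) rather than its Spark execution.

```python
from datetime import date

def scd2_apply(dimension, updates, today):
    """Close the active version of each updated key and append the new version."""
    updated_keys = {u["business_key"] for u in updates}
    result = []
    for row in dimension:
        if row["is_active"] and row["business_key"] in updated_keys:
            # The old version is kept for history but marked as ended today.
            result.append({**row, "is_active": False, "end_date": today})
        else:
            result.append(row)
    for u in updates:
        result.append({**u, "is_active": True, "start_date": today, "end_date": None})
    return result

dimension = [
    {"business_key": "c1", "city": "Oslo", "is_active": True,
     "start_date": date(2023, 1, 1), "end_date": None},
    {"business_key": "c2", "city": "Rome", "is_active": True,
     "start_date": date(2023, 1, 1), "end_date": None},
]
updates = [{"business_key": "c1", "city": "Bergen"}]
rows = scd2_apply(dimension, updates, today=date(2024, 6, 1))
```

After the update, customer `c1` has two rows (Oslo closed, Bergen active) and `c2` is unchanged.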
Window Analytics Pattern
def calculate_running_metrics(df):
    from pyspark.sql.window import Window
    from pyspark.sql.functions import row_number, lag, sum, avg
    # Define window
    window_spec = Window.partitionBy("user_id").orderBy("timestamp")
    # Calculate metrics
    result = df \
        .withColumn("row_num", row_number().over(window_spec)) \
        .withColumn("prev_value", lag("value", 1).over(window_spec)) \
        .withColumn("running_total", sum("value").over(window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow))) \
        .withColumn("moving_avg", avg("value").over(window_spec.rowsBetween(-2, 0)))
    return result
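To see what the window frames compute, here is the same set of metrics for a single user's ordered values in plain Python. The helper `running_metrics` and the input values are hypothetical; the frame for the moving average corresponds to `rowsBetween(-2, 0)`, meaning the current row and the two before it.

```python
def running_metrics(values, window=3):
    """Compute row number, previous value, running total, and moving average per row."""
    rows, total = [], 0
    for i, value in enumerate(values):
        total += value
        frame = values[max(0, i - window + 1): i + 1]  # current row plus up to 2 preceding
        rows.append({
            "row_num": i + 1,
            "prev_value": values[i - 1] if i > 0 else None,  # lag(value, 1)
            "running_total": total,
            "moving_avg": sum(frame) / len(frame),
        })
    return rows

rows = running_metrics([10, 20, 30, 40])
```

The first rows average over fewer than three values because the frame is truncated at the start of the partition, which is also how Spark evaluates it.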
Troubleshooting
Out of Memory Errors
Symptoms:
- java.lang.OutOfMemoryError
- Executor failures
- Slow garbage collection
Solutions:
# Increase executor memory (launch-time setting; it cannot be changed on a running session)
# spark-submit --executor-memory 8g ...
# Increase driver memory (if collecting data)
# spark-submit --driver-memory 4g ...
# Reduce memory pressure
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK) # Spill to disk
df = df.repartition(400) # More, smaller partitions mean less data per task
spark.conf.set("spark.sql.shuffle.partitions", 400) # Increase shuffle partitions
# Avoid collect() on large datasets
# Use take() or limit() instead
df.take(100)
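A quick way to pick a shuffle partition count is to divide the shuffle volume by a target partition size. The sketch below assumes a target of 128 MB per partition, which is a common rule of thumb rather than a Spark default, and the helper name is hypothetical.

```python
import math

def suggest_shuffle_partitions(shuffle_bytes, target_partition_bytes=128 * 1024**2):
    """Suggest a partition count so each shuffle partition is about the target size."""
    return max(1, math.ceil(shuffle_bytes / target_partition_bytes))

# A 50 GiB shuffle split into roughly 128 MiB partitions
partitions = suggest_shuffle_partitions(50 * 1024**3)
```

The shuffle volume for an existing job can be read from the stage details in the Spark UI.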
Shuffle Performance Issues
Symptoms:
- Long shuffle read/write times
- Skewed partition sizes
- Task stragglers
Solutions:
# Increase shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", 400)
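# Handle skew with salting (aggregate per key and salt, then combine per key)
from pyspark.sql.functions import rand
df_salted = df.withColumn("salt", (rand() * 10).cast("int"))
partial = df_salted.groupBy("key", "salt").agg(...)
result = partial.groupBy("key").agg(...) # second stage merges the per-salt partial results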
# Use broadcast for small tables
large_df.join(broadcast(small_df), "key")
# Enable AQE for automatic optimization
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
Streaming Job Failures
Symptoms:
- Streaming query stopped
- Checkpoint corruption
- Processing lag increasing
Solutions:
# Increase executor memory for stateful operations (launch-time setting)
# spark-submit --executor-memory 8g ...
# Tune watermark for late data
streaming_df = streaming_df.withWatermark("timestamp", "15 minutes")
# Increase trigger interval to reduce micro-batch overhead
writer = streaming_df.writeStream.trigger(processingTime="30 seconds")
# Monitor lag and adjust parallelism
spark.conf.set("spark.sql.shuffle.partitions", 200)
# Recover from checkpoint corruption
# Delete checkpoint directory and restart (data loss possible)
# Or implement custom state recovery logic
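When tuning the watermark, it helps to see which late events a given delay would discard. The sketch below is a simplified plain-Python model with a hypothetical function name: it advances the watermark after every event, whereas Structured Streaming updates it between micro-batches and applies it to stateful operations.

```python
from datetime import datetime, timedelta

def apply_watermark(event_times, delay):
    """Split events (in arrival order) into accepted and dropped-as-too-late."""
    max_event_time = None
    accepted, dropped = [], []
    for event_time in event_times:
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
        # The watermark trails the latest event time seen so far by `delay`.
        watermark = max_event_time - delay
        (accepted if event_time >= watermark else dropped).append(event_time)
    return accepted, dropped

noon = datetime(2024, 1, 1, 12, 0)
arrivals = [noon, noon + timedelta(minutes=20), noon + timedelta(minutes=10),
            noon + timedelta(minutes=2)]
accepted, dropped = apply_watermark(arrivals, delay=timedelta(minutes=15))
```

After the 12:20 event arrives, the watermark sits at 12:05, so the 12:10 event is still accepted while the 12:02 event is too late.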
Data Skew
Symptoms:
- Few tasks take much longer than others
- Unbalanced partition sizes
- Executor OOM errors
Solutions:
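# 1. Salting technique (add random suffix to keys, then aggregate again on the original key)
from pyspark.sql.functions import broadcast, col, concat, lit, rand
df_salted = df.withColumn("salted_key", concat(col("key"), lit("_"), (rand() * 10).cast("int").cast("string")))
result = df_salted.groupBy("salted_key").agg(...)
# 2. Repartition on an evenly distributed column
# (hash-partitioning on the skewed key itself keeps each hot key in one partition)
df = df.repartition(200, "evenly_distributed_column") # hypothetical column name
# 3. Isolate skewed keys
threshold = 1_000_000 # hypothetical row-count cutoff; tune for your data
skewed_keys = df.groupBy("key").count().filter(col("count") > threshold).select("key")
skewed_df = df.join(broadcast(skewed_keys), "key")
normal_df = df.join(broadcast(skewed_keys), "key", "left_anti")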
# Process separately
skewed_result = process_with_salting(skewed_df)
normal_result = process_normally(normal_df)
final = skewed_result.union(normal_result)
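Salting only yields correct results if the per-salt partial aggregates are merged again on the original key. The plain-Python sketch below shows that two-stage shape for a sum; the function name, the seed, and the sample rows are hypothetical.

```python
import random
from collections import Counter

def salted_sum(rows, num_salts=10, seed=0):
    """Sum values per key in two stages: per (key, salt), then per key."""
    rng = random.Random(seed)
    # Stage 1: a hot key is spread over up to num_salts smaller groups.
    partial = Counter()
    for key, value in rows:
        partial[(key, rng.randrange(num_salts))] += value
    # Stage 2: drop the salt and merge the partial sums per original key.
    final = Counter()
    for (key, _salt), subtotal in partial.items():
        final[key] += subtotal
    return dict(final), len(partial)

rows = [("hot", 1)] * 1000 + [("cold", 1)] * 10
totals, stage_one_groups = salted_sum(rows)
```

The totals match an unsalted aggregation, while the work for the hot key in the first stage is split across several groups. This works for aggregates that can be merged, such as sums and counts; an average needs the sum and the count carried through both stages.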
Context7 Code Integration
This skill integrates real-world code examples from Apache Spark's official repository. All code snippets in the EXAMPLES.md file are sourced from Context7's Apache Spark library documentation, ensuring production-ready patterns and best practices.
Version and Compatibility
- Apache Spark Version: 3.x (most examples also run on 2.4+; Adaptive Query Execution requires 3.0+)
- Python: 3.7+
- Scala: 2.12+
- Java: 8+
- R: 3.5+
References
- Official Documentation: https://spark.apache.org/docs/latest/
- API Reference: https://spark.apache.org/docs/latest/api.html
- GitHub Repository: https://github.com/apache/spark
- Databricks Blog: https://databricks.com/blog
- Context7 Library: /apache/spark
Skill Version: 1.0.0
Last Updated: October 2025
Skill Category: Big Data, Distributed Computing, Data Engineering, Machine Learning
Context7 Integration: /apache/spark with 8000 tokens of documentation