# Data Engineering Fundamentals
Core data engineering concepts, patterns, and practices for building production data platforms.
## Quick Start
```python
# Production data pipeline pattern
from dataclasses import dataclass
from datetime import datetime
from typing import Generator
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class PipelineConfig:
    source: str
    destination: str
    batch_size: int = 10000
    retry_count: int = 3


class DataPipeline:
    """Production-ready data pipeline with error handling.

    Source- and destination-specific logic lives in the private helpers
    (_fetch_batch, _clean_record, _enrich_record, _write_batch), which a
    concrete pipeline is expected to implement.
    """

    def __init__(self, config: PipelineConfig):
        self.config = config
        self.metrics = {"extracted": 0, "transformed": 0, "loaded": 0, "errors": 0}

    def extract(self) -> Generator[list[dict], None, None]:
        """Extract data in batches from the source."""
        logger.info(f"Extracting from {self.config.source}")
        offset = 0
        while True:
            batch = self._fetch_batch(offset, self.config.batch_size)
            if not batch:
                break
            self.metrics["extracted"] += len(batch)
            yield batch
            offset += self.config.batch_size

    def transform(self, batch: list[dict]) -> list[dict]:
        """Apply transformations with validation."""
        transformed = []
        for record in batch:
            try:
                cleaned = self._clean_record(record)
                enriched = self._enrich_record(cleaned)
                transformed.append(enriched)
            except Exception as e:
                logger.warning(f"Transform error: {e}")
                self.metrics["errors"] += 1
        self.metrics["transformed"] += len(transformed)
        return transformed

    def load(self, batch: list[dict]) -> None:
        """Load to the destination with retry logic."""
        for attempt in range(self.config.retry_count):
            try:
                self._write_batch(batch)
                self.metrics["loaded"] += len(batch)
                return
            except Exception as e:
                if attempt == self.config.retry_count - 1:
                    raise
                logger.warning(f"Load attempt {attempt + 1} failed: {e}")

    def run(self) -> dict:
        """Execute the full ETL pipeline."""
        start_time = datetime.now()
        logger.info("Pipeline started")
        for batch in self.extract():
            transformed = self.transform(batch)
            if transformed:
                self.load(transformed)
        duration = (datetime.now() - start_time).total_seconds()
        self.metrics["duration_seconds"] = duration
        logger.info(f"Pipeline completed: {self.metrics}")
        return self.metrics
```
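The private hooks (`_fetch_batch`, `_clean_record`, `_enrich_record`, `_write_batch`) are left to the concrete pipeline. As a rough usage sketch, a toy in-memory subclass (names and data invented purely for illustration, reusing `DataPipeline` and `PipelineConfig` from above) shows how the pieces fit together:

```python
from datetime import datetime

# Toy subclass for illustration only; a real pipeline would implement these
# hooks against an actual source and destination.
class InMemoryPipeline(DataPipeline):
    def __init__(self, config: PipelineConfig, rows: list[dict]):
        super().__init__(config)
        self.rows = rows            # stand-in source table
        self.sink: list[dict] = []  # stand-in destination

    def _fetch_batch(self, offset: int, limit: int) -> list[dict]:
        return self.rows[offset:offset + limit]

    def _clean_record(self, record: dict) -> dict:
        return {k: v.strip() if isinstance(v, str) else v for k, v in record.items()}

    def _enrich_record(self, record: dict) -> dict:
        return {**record, "processed_at": datetime.now().isoformat()}

    def _write_batch(self, batch: list[dict]) -> None:
        self.sink.extend(batch)


config = PipelineConfig(source="memory", destination="memory", batch_size=2)
rows = [{"id": 1, "name": " alice "}, {"id": 2, "name": "bob"}, {"id": 3, "name": "carol"}]
metrics = InMemoryPipeline(config, rows).run()
print(metrics)  # extracted=3, transformed=3, loaded=3, errors=0, plus duration_seconds
```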
## Core Concepts

### 1. Data Architecture Patterns
```
Modern Data Architecture

Sources        Ingestion        Storage          Consumption
───────        ─────────        ───────          ───────────
┌────────┐     ┌───────────┐    ┌───────────┐    ┌──────────────┐
│ APIs   │────▶│ Airbyte   │───▶│ Raw Layer │───▶│ BI Tools /   │
│ DBs    │     │ Fivetran  │    │ (S3)      │    │ Dashboards   │
│ Files  │     │ Kafka     │    └─────┬─────┘    └──────────────┘
│ SaaS   │     └───────────┘          │
└────────┘                            ▼
                               ┌─────────────┐
                               │ Transform   │
                               │ (dbt/Spark) │
                               └──────┬──────┘
                                      │
                                      ▼
                               ┌─────────────┐    ┌────────────┐
                               │ Warehouse   │───▶│ ML/AI      │
                               │ (Snowflake) │    │ Pipelines  │
                               └─────────────┘    └────────────┘
```
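In an orchestrated deployment, the ingestion → transform → consumption flow above usually maps onto scheduler tasks. A minimal sketch with Airflow's TaskFlow API, assuming Airflow is the orchestrator; the DAG name and task bodies are placeholders:

```python
from datetime import datetime
from airflow.decorators import dag, task

@dag(schedule="@daily", start_date=datetime(2025, 1, 1), catchup=False, tags=["example"])
def raw_to_warehouse():
    @task
    def ingest_to_raw_layer():
        # e.g. trigger an Airbyte/Fivetran sync or copy files into the raw S3 layer
        ...

    @task
    def transform_in_warehouse():
        # e.g. run dbt models or a Spark job against the raw layer
        ...

    @task
    def refresh_consumers():
        # e.g. refresh BI extracts or kick off downstream ML pipelines
        ...

    ingest_to_raw_layer() >> transform_in_warehouse() >> refresh_consumers()

raw_to_warehouse()
```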
### 2. ETL vs ELT
```python
# ETL pattern (transform before load)
# Best for: sensitive data, complex transformations, limited storage
def etl_pipeline():
    raw_data = extract_from_source()
    cleaned_data = transform_and_clean(raw_data)  # transform first
    load_to_destination(cleaned_data)


# ELT pattern (load, then transform)
# Best for: cloud warehouses, large scale, exploratory analysis
def elt_pipeline():
    raw_data = extract_from_source()
    load_to_staging(raw_data)  # load raw first
    run_dbt_models()           # transform in the warehouse with SQL/dbt
```
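`run_dbt_models()` above is a placeholder. One common approach is simply shelling out to the dbt CLI from the pipeline, assuming a dbt project and profile are already configured (the selector here is illustrative):

```python
import subprocess

def run_dbt_models(select: str = "staging+") -> None:
    # Runs the selected dbt models against the warehouse; raises if dbt fails.
    subprocess.run(["dbt", "run", "--select", select], check=True)
```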
### 3. Data Quality Framework
```python
from dataclasses import dataclass
from typing import Callable

import pandas as pd


@dataclass
class DataQualityCheck:
    name: str
    check_fn: Callable[[pd.DataFrame], bool]
    severity: str  # "error" or "warning"


class DataQualityValidator:
    def __init__(self, checks: list[DataQualityCheck]):
        self.checks = checks
        self.results = []

    def validate(self, df: pd.DataFrame) -> bool:
        all_passed = True
        for check in self.checks:
            passed = check.check_fn(df)
            self.results.append({
                "check": check.name,
                "passed": passed,
                "severity": check.severity,
            })
            if not passed and check.severity == "error":
                all_passed = False
        return all_passed


# Define checks
checks = [
    DataQualityCheck(
        name="no_nulls_in_id",
        check_fn=lambda df: df["id"].notna().all(),
        severity="error",
    ),
    DataQualityCheck(
        name="positive_amounts",
        check_fn=lambda df: (df["amount"] > 0).all(),
        severity="error",
    ),
    DataQualityCheck(
        name="valid_dates",
        check_fn=lambda df: pd.to_datetime(df["date"], errors="coerce").notna().all(),
        severity="warning",
    ),
]

# Run the checks against the DataFrame being validated (`df` comes from the pipeline)
validator = DataQualityValidator(checks)
if not validator.validate(df):
    raise ValueError(f"Data quality checks failed: {validator.results}")
```
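A quick smoke test of the validator with a toy DataFrame, with values chosen so one error-level check and one warning-level check fail:

```python
bad_df = pd.DataFrame({
    "id": [1, 2, None],                                  # null id -> no_nulls_in_id fails (error)
    "amount": [10.0, 5.5, 3.2],                          # all positive -> passes
    "date": ["2025-01-01", "2025-01-02", "not-a-date"],  # coerces to NaT -> valid_dates fails (warning)
})

demo_validator = DataQualityValidator(checks)
print(demo_validator.validate(bad_df))  # False, because an error-severity check failed
print(demo_validator.results)           # per-check pass/fail with severity
```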
### 4. Idempotent Operations
```python
from datetime import date

import pandas as pd

# `db` is assumed to be a database connection/engine provided elsewhere.

def idempotent_load(df: pd.DataFrame, table: str, partition_date: date):
    """
    Idempotent load: can be re-run safely without creating duplicates.
    Uses the delete-then-insert pattern.
    """
    # Delete existing data for this partition
    db.execute(f"""
        DELETE FROM {table}
        WHERE partition_date = %(date)s
    """, {"date": partition_date})

    # Insert new data
    df["partition_date"] = partition_date
    df.to_sql(table, db, if_exists="append", index=False)


# Alternative: MERGE/UPSERT pattern
def upsert_records(df: pd.DataFrame, table: str, key_columns: list[str]):
    """Upsert: update existing rows, insert new ones."""
    temp_table = f"{table}_staging"
    df.to_sql(temp_table, db, if_exists="replace", index=False)

    key_match = " AND ".join(f"t.{k} = s.{k}" for k in key_columns)
    db.execute(f"""
        MERGE INTO {table} t
        USING {temp_table} s ON {key_match}
        WHEN MATCHED THEN UPDATE SET ...
        WHEN NOT MATCHED THEN INSERT ...
    """)
```
## Tools & Technologies
| Tool | Purpose | Version (2025) |
|------|---------|----------------|
| Python | Pipeline development | 3.12+ |
| SQL | Data transformation | - |
| Airflow | Orchestration | 2.8+ |
| dbt | SQL transformations | 1.7+ |
| Spark | Large-scale processing | 3.5+ |
| Airbyte | Data integration | 0.55+ |
| Great Expectations | Data quality | 0.18+ |
## Troubleshooting Guide
| Issue | Symptoms | Root Cause | Fix |
|-------|----------|------------|-----|
| Duplicate Data | Count mismatch | Non-idempotent load | Use MERGE, add dedup |
| Schema Drift | Pipeline failure | Source schema changed | Schema validation, alerts |
| Data Freshness | Stale data | Pipeline delays | Monitor SLAs, alerting |
| Memory Error | OOM in pipeline | Large batch size | Chunked processing (sketch below) |
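For the memory-error row, the usual fix is to stream the source in chunks rather than loading it all at once. A minimal pandas sketch (query, connection URL, and chunk size are illustrative):

```python
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql+psycopg2://user:password@host:5432/analytics")  # placeholder URL

def process_in_chunks(query: str, chunksize: int = 50_000) -> int:
    """Stream query results so only one chunk is held in memory at a time."""
    rows_processed = 0
    for chunk in pd.read_sql_query(query, engine, chunksize=chunksize):
        # transform and load each chunk here
        rows_processed += len(chunk)
    return rows_processed
```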
## Best Practices
```python
# ✅ DO: Make pipelines idempotent
delete_and_insert(partition_date)

# ✅ DO: Add observability
logger.info(f"Processed {count} records", extra={"metric": "records_processed"})

# ✅ DO: Validate data at boundaries
validate_schema(df, expected_schema)
validate_constraints(df)

# ✅ DO: Use incremental processing (see the sketch after this block)
#   e.g. WHERE updated_at > last_run_timestamp

# ❌ DON'T: Process all data every run
# ❌ DON'T: Skip data quality checks
# ❌ DON'T: Ignore schema changes
```
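A sketch of the incremental-processing pattern from the list above, pulling only rows changed since a stored watermark; the table, column, and connection details are assumptions:

```python
import pandas as pd
from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg2://user:password@host:5432/analytics")  # placeholder URL

def extract_incremental(table: str, last_run_timestamp: str) -> pd.DataFrame:
    """Pull only rows updated since the previous successful run instead of a full scan."""
    query = text(f"SELECT * FROM {table} WHERE updated_at > :since")
    return pd.read_sql_query(query, engine, params={"since": last_run_timestamp})
```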
## Resources
Skill Certification Checklist: