| name | python-programming |
| description | Master Python fundamentals, OOP, data structures, async programming, and production-grade scripting for data engineering |
| sasmp_version | 1.3.0 |
| bonded_agent | 01-data-engineer |
| bond_type | PRIMARY_BOND |
| skill_version | 2.0.0 |
| last_updated | 2025-01 |
| complexity | foundational |
| estimated_mastery_hours | 120 |
| prerequisites | |
| unlocks | sql-databases, etl-tools, big-data, machine-learning |
Python Programming for Data Engineering
Production-grade Python development for building scalable data pipelines, ETL systems, and data-intensive applications.
Quick Start
# Modern Python 3.12+ data engineering setup
from dataclasses import dataclass
from typing import Generator
from collections.abc import Iterator
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
@dataclass
class DataRecord:
"""Type-safe data container with validation."""
id: int
value: float
category: str
def __post_init__(self):
if self.value < 0:
raise ValueError(f"Value must be non-negative, got {self.value}")
def process_records(records: Iterator[dict]) -> Generator[DataRecord, None, None]:
"""Memory-efficient generator for processing large datasets."""
for idx, record in enumerate(records):
try:
yield DataRecord(
id=record['id'],
value=float(record['value']),
category=record.get('category', 'unknown')
)
except (KeyError, ValueError) as e:
logger.warning(f"Skipping invalid record {idx}: {e}")
continue
# Usage
if __name__ == "__main__":
sample_data = [{"id": 1, "value": "100.5", "category": "A"}]
for record in process_records(iter(sample_data)):
logger.info(f"Processed: {record}")
Core Concepts
1. Type-Safe Data Structures (2024-2025 Standard)
from typing import TypedDict, NotRequired, Literal
from dataclasses import dataclass, field
from datetime import datetime, timezone
# TypedDict for JSON-like structures
class PipelineConfig(TypedDict):
source: str
destination: str
batch_size: int
retry_count: NotRequired[int]
mode: Literal["batch", "streaming"]
# Dataclass for domain objects
@dataclass(frozen=True, slots=True)
class ETLJob:
"""Immutable, memory-efficient job definition."""
job_id: str
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
config: dict = field(default_factory=dict)
def to_dict(self) -> dict:
return {"job_id": self.job_id, "created_at": self.created_at.isoformat()}
2. Generator Patterns for Large Data
from typing import Callable, Generator, Iterable
import csv
from pathlib import Path
def read_csv_chunks(
file_path: Path,
chunk_size: int = 10000
) -> Generator[list[dict], None, None]:
"""
Memory-efficient CSV reader using generators.
Processes files of any size without loading into memory.
"""
with open(file_path, 'r', newline='', encoding='utf-8') as f:
reader = csv.DictReader(f)
chunk = []
for row in reader:
chunk.append(row)
if len(chunk) >= chunk_size:
yield chunk
chunk = []
if chunk: # Don't forget the last chunk
yield chunk
def transform_pipeline(
records: Iterable[dict],
    transformers: list[Callable[[dict], dict | None]]
) -> Generator[dict, None, None]:
"""Composable transformation pipeline."""
for record in records:
result = record
for transform in transformers:
result = transform(result)
if result is None:
break
if result is not None:
yield result
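The two generators compose directly. A short sketch, assuming a hypothetical events.csv with an "amount" column; parse_amount is an illustrative transformer, not part of the pipeline above:

from pathlib import Path

def parse_amount(record: dict) -> dict | None:
    """Hypothetical transformer: coerce amount to float, drop the record if it cannot be parsed."""
    try:
        record["amount"] = float(record["amount"])
        return record
    except (KeyError, ValueError):
        return None  # returning None drops the record in transform_pipeline

for chunk in read_csv_chunks(Path("events.csv"), chunk_size=5000):
    for cleaned in transform_pipeline(chunk, [parse_amount]):
        ...  # load into the target system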
3. Async Programming for I/O-Bound Tasks
import asyncio
import aiohttp
from typing import AsyncGenerator
import logging
logger = logging.getLogger(__name__)
async def fetch_with_retry(
session: aiohttp.ClientSession,
url: str,
max_retries: int = 3,
backoff_factor: float = 2.0
) -> dict | None:
"""
Fetch URL with exponential backoff retry logic.
Production pattern for API data ingestion.
"""
for attempt in range(max_retries):
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as resp:
resp.raise_for_status()
return await resp.json()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
wait_time = backoff_factor ** attempt
logger.warning(f"Attempt {attempt+1} failed for {url}: {e}. Retrying in {wait_time}s")
await asyncio.sleep(wait_time)
logger.error(f"All retries exhausted for {url}")
return None
async def fetch_all_pages(
base_url: str,
page_count: int,
concurrency_limit: int = 10
) -> AsyncGenerator[dict, None]:
"""Concurrent API fetching with rate limiting."""
semaphore = asyncio.Semaphore(concurrency_limit)
async def bounded_fetch(session: aiohttp.ClientSession, url: str):
async with semaphore:
return await fetch_with_retry(session, url)
async with aiohttp.ClientSession() as session:
tasks = [bounded_fetch(session, f"{base_url}?page={i}") for i in range(page_count)]
for result in asyncio.as_completed(tasks):
data = await result
if data:
yield data
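Consuming the async generator requires an event loop. A minimal driver, with a placeholder endpoint URL:

async def main() -> None:
    # https://api.example.com/items is a placeholder, not a real API
    async for page in fetch_all_pages("https://api.example.com/items", page_count=50):
        logger.info("Fetched page with %d items", len(page.get("results", [])))

if __name__ == "__main__":
    asyncio.run(main())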
4. Error Handling & Observability
import functools
import time
import logging
from typing import TypeVar, Callable, ParamSpec
P = ParamSpec('P')
R = TypeVar('R')
def with_retry(
max_attempts: int = 3,
    exceptions: tuple[type[Exception], ...] = (Exception,),
backoff_factor: float = 2.0
) -> Callable[[Callable[P, R]], Callable[P, R]]:
"""
Decorator for automatic retry with exponential backoff.
Use for flaky operations (network, database connections).
"""
def decorator(func: Callable[P, R]) -> Callable[P, R]:
@functools.wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
last_exception = None
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except exceptions as e:
last_exception = e
wait_time = backoff_factor ** attempt
logging.warning(
f"{func.__name__} attempt {attempt+1} failed: {e}. "
f"Retrying in {wait_time}s"
)
time.sleep(wait_time)
raise last_exception
return wrapper
return decorator
def log_execution_time(func: Callable[P, R]) -> Callable[P, R]:
"""Decorator for performance monitoring."""
@functools.wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
start = time.perf_counter()
try:
result = func(*args, **kwargs)
duration = time.perf_counter() - start
logging.info(f"{func.__name__} completed in {duration:.3f}s")
return result
except Exception as e:
duration = time.perf_counter() - start
logging.error(f"{func.__name__} failed after {duration:.3f}s: {e}")
raise
return wrapper
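Stacked, the two decorators give a flaky operation both retries and timing. A sketch with a hypothetical loader that fails intermittently; load_to_warehouse and its simulated ConnectionError are stand-ins for your own code:

import random

@log_execution_time  # outermost, so the logged duration includes all retries
@with_retry(max_attempts=3, exceptions=(ConnectionError,), backoff_factor=1.5)
def load_to_warehouse(rows: list[dict]) -> int:
    """Pretend loader that fails ~30% of the time."""
    if random.random() < 0.3:
        raise ConnectionError("transient network failure")
    return len(rows)

loaded = load_to_warehouse([{"id": 1}, {"id": 2}])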
Tools & Technologies
| Tool | Purpose | Version (2025) |
|---|---|---|
| Python | Core language | 3.12+ |
| uv | Package manager (replaces pip) | 0.4+ |
| Ruff | Linter + formatter (replaces Black, flake8) | 0.5+ |
| mypy | Static type checking | 1.11+ |
| pytest | Testing framework | 8.0+ |
| pydantic | Data validation | 2.5+ |
| polars | DataFrame operations (faster than pandas) | 1.0+ |
| httpx | Modern HTTP client | 0.27+ |
Learning Path
Phase 1: Foundations (Weeks 1-3)
Week 1: Core syntax, data types, control flow
Week 2: Functions, modules, file I/O
Week 3: OOP (classes, inheritance, composition)
Phase 2: Intermediate (Weeks 4-6)
Week 4: Generators, iterators, decorators
Week 5: Type hints, dataclasses, protocols
Week 6: Error handling, logging, testing basics
Phase 3: Advanced (Weeks 7-9)
Week 7: Async/await, concurrent programming
Week 8: Memory optimization, profiling
Week 9: Package structure, dependency management
Phase 4: Production Mastery (Weeks 10-12)
Week 10: CI/CD integration, linting, formatting
Week 11: Performance optimization patterns
Week 12: Production deployment patterns
Production Patterns
Configuration Management
from pydantic_settings import BaseSettings, SettingsConfigDict
from functools import lru_cache
class Settings(BaseSettings):
"""Type-safe configuration with environment variable support."""
database_url: str
api_key: str
batch_size: int = 1000
debug: bool = False
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")
@lru_cache
def get_settings() -> Settings:
"""Cached settings singleton."""
return Settings()
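Call sites just ask for the cached object; the environment variable names follow pydantic-settings' default field-name mapping (database_url, api_key, and so on):

settings = get_settings()           # reads DATABASE_URL, API_KEY, etc. from the environment or .env
print(settings.batch_size)          # 1000 unless BATCH_SIZE overrides the default
assert get_settings() is settings   # lru_cache returns the same cached instance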
Connection Pooling
from contextlib import contextmanager
from typing import Generator
import psycopg2
from psycopg2 import pool
class DatabasePool:
"""Thread-safe connection pool for PostgreSQL."""
def __init__(self, dsn: str, min_conn: int = 2, max_conn: int = 10):
self._pool = pool.ThreadedConnectionPool(min_conn, max_conn, dsn)
@contextmanager
def get_connection(self) -> Generator:
conn = self._pool.getconn()
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self._pool.putconn(conn)
def close(self):
self._pool.closeall()
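Usage mirrors any context-managed resource; the DSN and table name here are placeholders:

db = DatabasePool("postgresql://user:pass@localhost:5432/analytics")  # placeholder DSN
with db.get_connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT count(*) FROM events")
        print(cur.fetchone())
db.close()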
Troubleshooting Guide
Common Failure Modes
| Issue | Symptoms | Root Cause | Fix |
|---|---|---|---|
| Memory Error | MemoryError, process killed | Loading full dataset into memory | Use generators, chunked processing |
| Import Error | ModuleNotFoundError | Virtual env not activated, missing dep | uv pip install, check sys.path |
| Type Error | TypeError: unhashable type | Using mutable as dict key | Convert to tuple or use dataclass |
| Async Deadlock | Program hangs | Blocking call in async code | Use asyncio.to_thread() for blocking ops |
| GIL Bottleneck | CPU-bound parallelism slow | Python GIL limits threads | Use multiprocessing or ProcessPoolExecutor |
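For the last two rows, a minimal sketch of both escape hatches; cpu_heavy, blocking_io, and "data.txt" are hypothetical stand-ins for your own functions and files:

import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_heavy(n: int) -> int:
    """Stand-in for CPU-bound work that threads cannot parallelize under the GIL."""
    return sum(i * i for i in range(n))

def blocking_io(path: str) -> str:
    """Stand-in for a blocking call that would stall the event loop."""
    with open(path) as f:
        return f.read()

async def handle_request(path: str) -> int:
    # Blocking call inside async code: push it to a worker thread
    text = await asyncio.to_thread(blocking_io, path)
    return len(text)

if __name__ == "__main__":
    # CPU-bound parallelism: separate processes side-step the GIL
    with ProcessPoolExecutor() as pool:
        print(list(pool.map(cpu_heavy, [1_000_000] * 4)))
    print(asyncio.run(handle_request("data.txt")))  # placeholder path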
Debug Checklist
# 1. Check Python version
python --version # Should be 3.12+
# 2. Verify virtual environment
which python # Should point to venv
# 3. Check installed packages
uv pip list | grep <package>
# 4. Run with verbose logging
python -m mymodule -v 2>&1 | tee debug.log
# 5. Profile memory usage
python -m memory_profiler script.py
# 6. Profile CPU
python -m cProfile -s cumtime script.py
Log Interpretation
# Structured logging for easier debugging
import structlog
logger = structlog.get_logger()
def process_batch(batch_id: str, records: list):
logger.info("batch_started", batch_id=batch_id, record_count=len(records))
try:
# processing...
logger.info("batch_completed", batch_id=batch_id, success=True)
except Exception as e:
logger.error("batch_failed", batch_id=batch_id, error=str(e), exc_info=True)
raise
Unit Test Template
import pytest
from unittest.mock import Mock, patch
from your_module import process_records, DataRecord, function_using_api
class TestProcessRecords:
"""Unit tests following AAA pattern (Arrange-Act-Assert)."""
def test_valid_records_processed(self):
# Arrange
input_data = [{"id": 1, "value": "10.5", "category": "A"}]
# Act
result = list(process_records(iter(input_data)))
# Assert
assert len(result) == 1
assert result[0].id == 1
assert result[0].value == 10.5
def test_invalid_records_skipped(self):
# Arrange
input_data = [{"id": 1}] # Missing 'value'
# Act
result = list(process_records(iter(input_data)))
# Assert
assert len(result) == 0
def test_negative_value_raises_error(self):
# Arrange & Act & Assert
with pytest.raises(ValueError, match="non-negative"):
DataRecord(id=1, value=-5.0, category="A")
@patch('your_module.external_api_call')
def test_with_mocked_dependency(self, mock_api):
# Arrange
mock_api.return_value = {"status": "ok"}
# Act
result = function_using_api()
# Assert
mock_api.assert_called_once()
assert result["status"] == "ok"
Best Practices
Code Style (2025 Standards)
# ✅ DO: Use type hints everywhere
def calculate_metrics(data: list[float]) -> dict[str, float]: ...
# ✅ DO: Prefer composition over inheritance
@dataclass
class Pipeline:
reader: DataReader
transformer: Transformer
writer: DataWriter
# ✅ DO: Use context managers for resources
with open_connection() as conn:
process(conn)
# ❌ DON'T: Use bare except
try: ...
except: pass # Never do this
# ❌ DON'T: Mutate function arguments
def process(items: list) -> list:
    items.append("new")  # Mutates the caller's list in place
    return items

# ✅ DO: Build and return a new list instead
def process(items: list) -> list:
    return [*items, "new"]
Performance Tips
# ✅ Use generators for large data
def process_large_file(path):
with open(path) as f:
for line in f: # Memory efficient
yield transform(line)
# ✅ Use set/dict for O(1) lookups
valid_ids = set(load_valid_ids()) # Not list
if item_id in valid_ids: ...
# ✅ Use local variables in hot loops
def hot_loop(items):
local_func = expensive_lookup # Cache reference
for item in items:
local_func(item)
Next Skills
After mastering Python programming:
- → sql-databases: Query and manage relational data
- → etl-tools: Build data pipelines with Airflow
- → big-data: Scale with Spark and distributed systems
- → machine-learning: Apply ML with scikit-learn
Skill Certification Checklist:
- Can write type-safe Python with mypy validation
- Can implement generators for large data processing
- Can use async/await for concurrent I/O
- Can write comprehensive unit tests with pytest
- Can profile and optimize Python performance