Claude Code Plugins

Community-maintained marketplace


Master Python fundamentals, OOP, data structures, and scripting for data engineering

Install Skill

1. Download the skill
2. Enable skills in Claude: open claude.ai/settings/capabilities and find the "Skills" section
3. Upload to Claude: click "Upload skill" and select the downloaded ZIP file

Note: Please verify the skill by reviewing its instructions before using it.

SKILL.md

name: python-programming
description: Master Python fundamentals, OOP, data structures, async programming, and production-grade scripting for data engineering
sasmp_version: 1.3.0
bonded_agent: 01-data-engineer
bond_type: PRIMARY_BOND
skill_version: 2.0.0
last_updated: 2025-01
complexity: foundational
estimated_mastery_hours: 120
prerequisites:
unlocks: sql-databases, etl-tools, big-data, machine-learning

Python Programming for Data Engineering

Production-grade Python development for building scalable data pipelines, ETL systems, and data-intensive applications.

Quick Start

# Modern Python 3.12+ data engineering setup
from dataclasses import dataclass
from collections.abc import Generator, Iterator
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

@dataclass
class DataRecord:
    """Type-safe data container with validation."""
    id: int
    value: float
    category: str

    def __post_init__(self):
        if self.value < 0:
            raise ValueError(f"Value must be non-negative, got {self.value}")

def process_records(records: Iterator[dict]) -> Generator[DataRecord, None, None]:
    """Memory-efficient generator for processing large datasets."""
    for idx, record in enumerate(records):
        try:
            yield DataRecord(
                id=record['id'],
                value=float(record['value']),
                category=record.get('category', 'unknown')
            )
        except (KeyError, ValueError) as e:
            logger.warning(f"Skipping invalid record {idx}: {e}")
            continue

# Usage
if __name__ == "__main__":
    sample_data = [{"id": 1, "value": "100.5", "category": "A"}]
    for record in process_records(iter(sample_data)):
        logger.info(f"Processed: {record}")

Core Concepts

1. Type-Safe Data Structures (2024-2025 Standard)

from typing import TypedDict, NotRequired, Literal
from dataclasses import dataclass, field
from datetime import datetime, UTC

# TypedDict for JSON-like structures
class PipelineConfig(TypedDict):
    source: str
    destination: str
    batch_size: int
    retry_count: NotRequired[int]
    mode: Literal["batch", "streaming"]

# Dataclass for domain objects
@dataclass(frozen=True, slots=True)
class ETLJob:
    """Immutable, memory-efficient job definition."""
    job_id: str
    created_at: datetime = field(default_factory=lambda: datetime.now(UTC))  # utcnow() is deprecated in 3.12
    config: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        return {"job_id": self.job_id, "created_at": self.created_at.isoformat()}

2. Generator Patterns for Large Data

from collections.abc import Callable, Generator, Iterable
import csv
from pathlib import Path

def read_csv_chunks(
    file_path: Path,
    chunk_size: int = 10000
) -> Generator[list[dict], None, None]:
    """
    Memory-efficient CSV reader using generators.
    Processes files of any size without loading into memory.
    """
    with open(file_path, 'r', newline='', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        chunk = []
        for row in reader:
            chunk.append(row)
            if len(chunk) >= chunk_size:
                yield chunk
                chunk = []
        if chunk:  # Don't forget the last chunk
            yield chunk

def transform_pipeline(
    records: Iterable[dict],
    transformers: list[Callable[[dict], dict | None]]
) -> Generator[dict, None, None]:
    """Composable transformation pipeline."""
    for record in records:
        result = record
        for transform in transformers:
            result = transform(result)
            if result is None:
                break
        if result is not None:
            yield result
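
Chained together, the two functions stream a CSV of any size end to end. A minimal sketch, assuming a hypothetical events.csv and two toy transformers:

def normalize_keys(record: dict) -> dict:
    """Toy transformer: lowercase all keys."""
    return {k.lower(): v for k, v in record.items()}

def drop_missing_id(record: dict) -> dict | None:
    """Toy transformer: returning None drops the record from the pipeline."""
    return record if record.get("id") else None

for chunk in read_csv_chunks(Path("events.csv")):
    for clean in transform_pipeline(chunk, [normalize_keys, drop_missing_id]):
        ...  # load the record into its destination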

3. Async Programming for I/O-Bound Tasks

import asyncio
import aiohttp
from collections.abc import AsyncGenerator
import logging

logger = logging.getLogger(__name__)

async def fetch_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    max_retries: int = 3,
    backoff_factor: float = 2.0
) -> dict | None:
    """
    Fetch URL with exponential backoff retry logic.
    Production pattern for API data ingestion.
    """
    for attempt in range(max_retries):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as resp:
                resp.raise_for_status()
                return await resp.json()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:  # timeouts are not ClientError
            wait_time = backoff_factor ** attempt
            logger.warning(f"Attempt {attempt+1}/{max_retries} failed for {url}: {e}")
            if attempt < max_retries - 1:  # Don't sleep after the final attempt
                await asyncio.sleep(wait_time)
    logger.error(f"All retries exhausted for {url}")
    return None

async def fetch_all_pages(
    base_url: str,
    page_count: int,
    concurrency_limit: int = 10
) -> AsyncGenerator[dict, None]:
    """Concurrent API fetching with rate limiting."""
    semaphore = asyncio.Semaphore(concurrency_limit)

    async def bounded_fetch(session: aiohttp.ClientSession, url: str):
        async with semaphore:
            return await fetch_with_retry(session, url)

    async with aiohttp.ClientSession() as session:
        tasks = [bounded_fetch(session, f"{base_url}?page={i}") for i in range(page_count)]
        for result in asyncio.as_completed(tasks):
            data = await result
            if data:
                yield data
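
A minimal driver for the generator above, assuming a hypothetical paginated endpoint:

async def main() -> None:
    async for page in fetch_all_pages("https://api.example.com/items", page_count=50):
        logger.info(f"Fetched page with {len(page)} top-level keys")

if __name__ == "__main__":
    asyncio.run(main())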

4. Error Handling & Observability

import functools
import time
import logging
from typing import TypeVar, Callable, ParamSpec

P = ParamSpec('P')
R = TypeVar('R')

def with_retry(
    max_attempts: int = 3,
    exceptions: tuple[type[Exception], ...] = (Exception,),
    backoff_factor: float = 2.0
) -> Callable[[Callable[P, R]], Callable[P, R]]:
    """
    Decorator for automatic retry with exponential backoff.
    Use for flaky operations (network, database connections).
    """
    def decorator(func: Callable[P, R]) -> Callable[P, R]:
        @functools.wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            last_exception = None
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    wait_time = backoff_factor ** attempt
                    logging.warning(
                        f"{func.__name__} attempt {attempt+1}/{max_attempts} failed: {e}"
                    )
                    if attempt < max_attempts - 1:  # Don't sleep after the final attempt
                        time.sleep(wait_time)
            raise last_exception
        return wrapper
    return decorator

def log_execution_time(func: Callable[P, R]) -> Callable[P, R]:
    """Decorator for performance monitoring."""
    @functools.wraps(func)
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
        start = time.perf_counter()
        try:
            result = func(*args, **kwargs)
            duration = time.perf_counter() - start
            logging.info(f"{func.__name__} completed in {duration:.3f}s")
            return result
        except Exception as e:
            duration = time.perf_counter() - start
            logging.error(f"{func.__name__} failed after {duration:.3f}s: {e}")
            raise
    return wrapper
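
Stacked, the two decorators give a flaky operation both retries and timing. A sketch with a hypothetical loader (note the order: log_execution_time wraps the retrying wrapper, so the logged duration includes all retry attempts):

@log_execution_time
@with_retry(max_attempts=3, exceptions=(ConnectionError, TimeoutError))
def load_to_warehouse(rows: list[dict]) -> int:
    """Hypothetical loader that may raise ConnectionError on transient failures."""
    ...  # write rows to the warehouse
    return len(rows)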

Tools & Technologies

| Tool | Purpose | Version (2025) |
|------|---------|----------------|
| Python | Core language | 3.12+ |
| uv | Package manager (replaces pip) | 0.4+ |
| Ruff | Linter + formatter (replaces Black, flake8) | 0.5+ |
| mypy | Static type checking | 1.11+ |
| pytest | Testing framework | 8.0+ |
| pydantic | Data validation | 2.5+ |
| polars | DataFrame operations (faster than pandas) | 0.20+ |
| httpx | Modern HTTP client | 0.27+ |

Learning Path

Phase 1: Foundations (Weeks 1-3)

Week 1: Core syntax, data types, control flow
Week 2: Functions, modules, file I/O
Week 3: OOP (classes, inheritance, composition)

Phase 2: Intermediate (Weeks 4-6)

Week 4: Generators, iterators, decorators
Week 5: Type hints, dataclasses, protocols
Week 6: Error handling, logging, testing basics

Phase 3: Advanced (Weeks 7-9)

Week 7: Async/await, concurrent programming
Week 8: Memory optimization, profiling
Week 9: Package structure, dependency management

Phase 4: Production Mastery (Weeks 10-12)

Week 10: CI/CD integration, linting, formatting
Week 11: Performance optimization patterns
Week 12: Production deployment patterns

Production Patterns

Configuration Management

from pydantic_settings import BaseSettings, SettingsConfigDict
from functools import lru_cache

class Settings(BaseSettings):
    """Type-safe configuration with environment variable support."""
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")

    database_url: str
    api_key: str
    batch_size: int = 1000
    debug: bool = False

@lru_cache
def get_settings() -> Settings:
    """Cached settings singleton."""
    return Settings()
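
Usage sketch, assuming a hypothetical .env file; pydantic-settings matches field names to environment variables case-insensitively:

# .env (hypothetical)
#   DATABASE_URL=postgresql://user:pass@localhost/analytics
#   API_KEY=secret

settings = get_settings()
print(settings.batch_size)  # 1000 unless BATCH_SIZE is set in the environment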

Connection Pooling

from collections.abc import Generator
from contextlib import contextmanager
from psycopg2 import pool

class DatabasePool:
    """Thread-safe connection pool for PostgreSQL."""

    def __init__(self, dsn: str, min_conn: int = 2, max_conn: int = 10):
        self._pool = pool.ThreadedConnectionPool(min_conn, max_conn, dsn)

    @contextmanager
    def get_connection(self) -> Generator:
        conn = self._pool.getconn()
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            self._pool.putconn(conn)

    def close(self):
        self._pool.closeall()
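
Usage sketch with a hypothetical DSN; commit, rollback, and returning the connection to the pool are all handled by the context manager:

db = DatabasePool("postgresql://user:pass@localhost/analytics")
with db.get_connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT count(*) FROM events")
        print(cur.fetchone())
db.close()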

Troubleshooting Guide

Common Failure Modes

| Issue | Symptoms | Root Cause | Fix |
|-------|----------|------------|-----|
| Memory Error | MemoryError, process killed | Loading full dataset into memory | Use generators, chunked processing |
| Import Error | ModuleNotFoundError | Virtual env not activated, missing dep | uv pip install, check sys.path |
| Type Error | TypeError: unhashable type | Using a mutable object as a dict key | Convert to tuple or use a frozen dataclass |
| Async Deadlock | Program hangs | Blocking call in async code | Use asyncio.to_thread() for blocking ops (see sketch below) |
| GIL Bottleneck | CPU-bound parallelism slow | Python GIL limits threads | Use multiprocessing or ProcessPoolExecutor |
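
For the async-deadlock row above, a minimal sketch of moving a blocking call off the event loop with asyncio.to_thread (slow_io is a stand-in for any blocking function):

import asyncio
import time

def slow_io() -> int:
    time.sleep(1)  # Stand-in for a blocking read or driver call
    return 42

async def handler() -> None:
    result = await asyncio.to_thread(slow_io)  # Runs in a worker thread; loop stays responsive
    print(result)

asyncio.run(handler())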

Debug Checklist

# 1. Check Python version
python --version  # Should be 3.12+

# 2. Verify virtual environment
which python  # Should point to venv

# 3. Check installed packages
uv pip list | grep <package>

# 4. Run with verbose logging
python -m mymodule -v 2>&1 | tee debug.log

# 5. Profile memory usage
python -m memory_profiler script.py

# 6. Profile CPU
python -m cProfile -s cumtime script.py

Log Interpretation

# Structured logging for easier debugging
import structlog

logger = structlog.get_logger()

def process_batch(batch_id: str, records: list):
    logger.info("batch_started", batch_id=batch_id, record_count=len(records))
    try:
        # processing...
        logger.info("batch_completed", batch_id=batch_id, success=True)
    except Exception as e:
        logger.error("batch_failed", batch_id=batch_id, error=str(e), exc_info=True)
        raise
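
One possible configuration for the logger above: a sketch that emits JSON lines with ISO timestamps (the processor chain is a per-project choice):

structlog.configure(
    processors=[
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ]
)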

Unit Test Template

import pytest
from unittest.mock import Mock, patch
from your_module import process_records, DataRecord, function_using_api

class TestProcessRecords:
    """Unit tests following AAA pattern (Arrange-Act-Assert)."""

    def test_valid_records_processed(self):
        # Arrange
        input_data = [{"id": 1, "value": "10.5", "category": "A"}]

        # Act
        result = list(process_records(iter(input_data)))

        # Assert
        assert len(result) == 1
        assert result[0].id == 1
        assert result[0].value == 10.5

    def test_invalid_records_skipped(self):
        # Arrange
        input_data = [{"id": 1}]  # Missing 'value'

        # Act
        result = list(process_records(iter(input_data)))

        # Assert
        assert len(result) == 0

    def test_negative_value_raises_error(self):
        # Arrange & Act & Assert
        with pytest.raises(ValueError, match="non-negative"):
            DataRecord(id=1, value=-5.0, category="A")

    @patch('your_module.external_api_call')
    def test_with_mocked_dependency(self, mock_api):
        # Arrange
        mock_api.return_value = {"status": "ok"}

        # Act
        result = function_using_api()

        # Assert
        mock_api.assert_called_once()
        assert result["status"] == "ok"

Best Practices

Code Style (2025 Standards)

# ✅ DO: Use type hints everywhere
def calculate_metrics(data: list[float]) -> dict[str, float]: ...

# ✅ DO: Prefer composition over inheritance
@dataclass
class Pipeline:
    reader: DataReader
    transformer: Transformer
    writer: DataWriter

# ✅ DO: Use context managers for resources
with open_connection() as conn:
    process(conn)

# ❌ DON'T: Use bare except
try: ...
except: pass  # Never do this

# ❌ DON'T: Mutate function arguments
def process(items: list) -> list:
    items.append("new")  # Avoid this: it mutates the caller's list
    return items

# ✅ DO: Build and return a new list instead
def process_safely(items: list) -> list:
    return [*items, "new"]

Performance Tips

# ✅ Use generators for large data
def process_large_file(path):
    with open(path) as f:
        for line in f:  # Memory efficient
            yield transform(line)

# ✅ Use set/dict for O(1) lookups
valid_ids = set(load_valid_ids())  # Not list
if item_id in valid_ids: ...

# ✅ Use local variables in hot loops
def hot_loop(items):
    local_func = expensive_lookup  # Cache reference
    for item in items:
        local_func(item)


Next Skills

After mastering Python programming:

  • sql-databases - Query and manage relational data
  • etl-tools - Build data pipelines with Airflow
  • big-data - Scale with Spark and distributed systems
  • machine-learning - Apply ML with scikit-learn

Skill Certification Checklist:

  • Can write type-safe Python with mypy validation
  • Can implement generators for large data processing
  • Can use async/await for concurrent I/O
  • Can write comprehensive unit tests with pytest
  • Can profile and optimize Python performance