
data-pipeline-processor

@vamseeachanta/workspace-hub

Process data files through transformation pipelines with validation, cleaning, and export. Use for CSV/Excel/JSON data processing, encoding handling, batch operations, and data transformation workflows.

Install Skill

1. Download the skill.
2. Enable skills in Claude: open claude.ai/settings/capabilities and find the "Skills" section.
3. Upload to Claude: click "Upload skill" and select the downloaded ZIP file.

Note: Please verify the skill by reviewing its instructions before using it.

SKILL.md

name: data-pipeline-processor
description: Process data files through transformation pipelines with validation, cleaning, and export. Use for CSV/Excel/JSON data processing, encoding handling, batch operations, and data transformation workflows.
version: 1.1.0
category: development
related_skills: yaml-workflow-executor, engineering-report-generator, parallel-file-processor

Data Pipeline Processor

Version: 1.1.0 · Category: Development · Last Updated: 2026-01-02

Process data files through transformation pipelines with validation, encoding detection, and multi-format export capabilities.

Quick Start

import pandas as pd
from pathlib import Path

# Simple pipeline: Load -> Transform -> Export
df = pd.read_csv("data/raw/source.csv")

# Transform
df = df[df['value'] > 0]  # Filter
df['date'] = pd.to_datetime(df['date'])  # Convert types
df = df.sort_values('date')  # Sort

# Export
Path("data/processed").mkdir(parents=True, exist_ok=True)
df.to_csv("data/processed/cleaned.csv", index=False)

print(f"Processed {len(df)} rows")

When to Use

  • Processing CSV/Excel/JSON files with validation
  • Data cleaning and transformation workflows
  • Batch file processing with aggregation
  • Handling encoding issues (UTF-8, Latin-1 fallback)
  • ETL (Extract, Transform, Load) operations
  • Data quality checks and reporting

Core Pattern

Input (CSV/Excel/JSON) -> Validate -> Transform -> Analyze -> Export
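
Concretely, the stages map onto the classes defined under Implementation below. A sketch of the five stages (file paths and column names here are placeholders):

reader = DataReader()
df = reader.read("data/raw/source.csv")                         # Input

validator = DataValidator()
validator.add_rule(required_columns_rule(["id", "value"]))
check = validator.validate(df)                                  # Validate

df = DataTransformer(df).filter_rows("value > 0").get_result()  # Transform
print(df.describe())                                            # Analyze
DataExporter.to_csv(df, "data/processed/cleaned.csv")           # Export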

Implementation

Data Reader with Encoding Detection

import pandas as pd
from pathlib import Path
from typing import List, Optional
import logging
import chardet

logger = logging.getLogger(__name__)


class DataReader:
    """Read data files with automatic encoding detection."""

    SUPPORTED_FORMATS = ['csv', 'xlsx', 'xls', 'json', 'parquet']

    def __init__(self, encoding_fallback: Optional[List[str]] = None):
        """
        Initialize data reader.

        Args:
            encoding_fallback: List of encodings to try in order
        """
        self.encoding_fallback = encoding_fallback or ['utf-8', 'latin-1', 'cp1252']

    def read(self, file_path: str, **kwargs) -> pd.DataFrame:
        """
        Read data file with automatic format and encoding detection.

        Args:
            file_path: Path to data file
            **kwargs: Additional arguments for pandas readers

        Returns:
            DataFrame with loaded data
        """
        path = Path(file_path)
        suffix = path.suffix.lower().lstrip('.')

        if suffix == 'csv':
            return self._read_csv(path, **kwargs)
        elif suffix in ['xlsx', 'xls']:
            return self._read_excel(path, **kwargs)
        elif suffix == 'json':
            return pd.read_json(path, **kwargs)
        elif suffix == 'parquet':
            return pd.read_parquet(path, **kwargs)
        else:
            raise ValueError(f"Unsupported format: {suffix} (supported: {self.SUPPORTED_FORMATS})")

    def _read_csv(self, path: Path, **kwargs) -> pd.DataFrame:
        """Read CSV with encoding fallback."""
        # Detect encoding from a sample of the file (chardet may report None)
        with open(path, 'rb') as f:
            raw = f.read(10000)
            detected = chardet.detect(raw)
            detected_encoding = detected.get('encoding') or 'utf-8'

        # Try the detected encoding first, then the fallbacks (deduplicated, order kept)
        encodings_to_try = list(dict.fromkeys([detected_encoding] + self.encoding_fallback))

        for encoding in encodings_to_try:
            try:
                df = pd.read_csv(path, encoding=encoding, **kwargs)
                logger.info(f"Successfully read {path} with encoding: {encoding}")
                return df
            except UnicodeDecodeError:
                continue

        raise ValueError(f"Could not decode {path} with any encoding")

    def _read_excel(self, path: Path, **kwargs) -> pd.DataFrame:
        """Read Excel file."""
        return pd.read_excel(path, **kwargs)
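
A minimal usage sketch (requires the chardet package to be installed; file names are placeholders):

reader = DataReader()
df = reader.read("data/raw/source.csv")                   # format inferred from extension
df2 = reader.read("data/raw/report.xlsx", sheet_name=0)   # extra kwargs pass through to pandas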

Data Validator

import pandas as pd
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class ValidationResult:
    """Result of data validation."""
    is_valid: bool
    errors: List[str] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)
    stats: Dict[str, Any] = field(default_factory=dict)


class DataValidator:
    """Validate data against configurable rules."""

    def __init__(self):
        self.rules: List[Callable] = []

    def add_rule(self, rule: Callable[[pd.DataFrame], ValidationResult]):
        """Add a validation rule."""
        self.rules.append(rule)

    def validate(self, df: pd.DataFrame) -> ValidationResult:
        """Run all validation rules."""
        all_errors = []
        all_warnings = []
        all_stats = {}

        for rule in self.rules:
            result = rule(df)
            all_errors.extend(result.errors)
            all_warnings.extend(result.warnings)
            all_stats.update(result.stats)

        return ValidationResult(
            is_valid=len(all_errors) == 0,
            errors=all_errors,
            warnings=all_warnings,
            stats=all_stats
        )


# Common validation rules
def required_columns_rule(required: List[str]) -> Callable:
    """Validate required columns exist."""
    def rule(df: pd.DataFrame) -> ValidationResult:
        missing = [col for col in required if col not in df.columns]
        return ValidationResult(
            is_valid=len(missing) == 0,
            errors=[f"Missing required column: {col}" for col in missing],
            stats={'columns_found': len(df.columns)}
        )
    return rule


def no_duplicates_rule(subset: Optional[List[str]] = None) -> Callable:
    """Check for duplicate rows (reported as warnings, not errors)."""
    def rule(df: pd.DataFrame) -> ValidationResult:
        duplicates = int(df.duplicated(subset=subset).sum())
        return ValidationResult(
            is_valid=True,  # duplicates only warn; is_valid stays consistent with errors
            warnings=[f"Found {duplicates} duplicate rows"] if duplicates > 0 else [],
            stats={'duplicate_count': duplicates}
        )
    return rule


def non_null_rule(columns: List[str]) -> Callable:
    """Validate specified columns have no null values."""
    def rule(df: pd.DataFrame) -> ValidationResult:
        errors = []
        stats = {}
        for col in columns:
            if col in df.columns:
                null_count = df[col].isnull().sum()
                stats[f'{col}_nulls'] = null_count
                if null_count > 0:
                    errors.append(f"Column '{col}' has {null_count} null values")
        return ValidationResult(
            is_valid=len(errors) == 0,
            errors=errors,
            stats=stats
        )
    return rule
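
Wiring the rules together, as a sketch (column names are placeholders):

validator = DataValidator()
validator.add_rule(required_columns_rule(["id", "timestamp", "value"]))
validator.add_rule(no_duplicates_rule(subset=["id"]))
validator.add_rule(non_null_rule(["id", "value"]))

result = validator.validate(df)
if not result.is_valid:
    for error in result.errors:
        print(f"ERROR: {error}")
print(result.stats)  # e.g. {'columns_found': 5, 'duplicate_count': 0, 'id_nulls': 0, ...}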

Data Transformer

class DataTransformer:
    """Apply transformations to data."""

    def __init__(self, df: pd.DataFrame):
        self.df = df.copy()

    def rename_columns(self, mapping: Dict[str, str]) -> 'DataTransformer':
        """Rename columns."""
        self.df = self.df.rename(columns=mapping)
        return self

    def filter_rows(self, expression: str) -> 'DataTransformer':
        """Filter rows using query expression."""
        self.df = self.df.query(expression)
        return self

    def select_columns(self, columns: List[str]) -> 'DataTransformer':
        """Select specific columns."""
        self.df = self.df[columns]
        return self

    def drop_columns(self, columns: List[str]) -> 'DataTransformer':
        """Drop specified columns."""
        self.df = self.df.drop(columns=columns, errors='ignore')
        return self

    def fill_nulls(self, value: Any = None, method: Optional[str] = None) -> 'DataTransformer':
        """Fill nulls with a constant, or propagate with 'ffill'/'bfill'."""
        # fillna(method=...) is deprecated in pandas 2.x; use ffill()/bfill() directly
        if method == 'ffill':
            self.df = self.df.ffill()
        elif method == 'bfill':
            self.df = self.df.bfill()
        else:
            self.df = self.df.fillna(value)
        return self

    def convert_types(self, type_mapping: Dict[str, str]) -> 'DataTransformer':
        """Convert column types."""
        for col, dtype in type_mapping.items():
            if col in self.df.columns:
                if dtype == 'datetime':
                    self.df[col] = pd.to_datetime(self.df[col])
                elif dtype == 'numeric':
                    self.df[col] = pd.to_numeric(self.df[col], errors='coerce')
                else:
                    self.df[col] = self.df[col].astype(dtype)
        return self

    def add_column(self, name: str, expression: Callable) -> 'DataTransformer':
        """Add computed column."""
        self.df[name] = expression(self.df)
        return self

    def aggregate(self, group_by: List[str], agg_spec: Dict[str, Any]) -> 'DataTransformer':
        """Aggregate by groups, flattening MultiIndex columns to name_stat."""
        self.df = self.df.groupby(group_by).agg(agg_spec)
        if isinstance(self.df.columns, pd.MultiIndex):
            # e.g. ('amount', 'sum') -> 'amount_sum', so a later rename can target it
            self.df.columns = ['_'.join(c for c in col if c) for col in self.df.columns]
        self.df = self.df.reset_index()
        return self

    def sort(self, by: List[str], ascending: bool = True) -> 'DataTransformer':
        """Sort data."""
        self.df = self.df.sort_values(by=by, ascending=ascending)
        return self

    def get_result(self) -> pd.DataFrame:
        """Get transformed DataFrame."""
        return self.df
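
The chainable API also covers derived columns and aggregation, which the later examples do not show. A sketch assuming date and value columns exist:

summary = (
    DataTransformer(df)
    .convert_types({"date": "datetime"})
    .add_column("month", lambda d: d["date"].dt.to_period("M").astype(str))
    .aggregate(group_by=["month"], agg_spec={"value": "sum"})
    .sort(["month"])
    .get_result()
)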

Data Exporter

class DataExporter:
    """Export data to various formats."""

    @staticmethod
    def to_csv(df: pd.DataFrame, path: str, **kwargs) -> str:
        """Export to CSV."""
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        df.to_csv(path, index=False, **kwargs)
        return path

    @staticmethod
    def to_excel(df: pd.DataFrame, path: str, sheet_name: str = 'Sheet1', **kwargs) -> str:
        """Export to Excel."""
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        df.to_excel(path, sheet_name=sheet_name, index=False, **kwargs)
        return path

    @staticmethod
    def to_json(df: pd.DataFrame, path: str, orient: str = 'records', **kwargs) -> str:
        """Export to JSON."""
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        df.to_json(path, orient=orient, **kwargs)
        return path

    @staticmethod
    def to_parquet(df: pd.DataFrame, path: str, **kwargs) -> str:
        """Export to Parquet."""
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        df.to_parquet(path, **kwargs)
        return path

Pipeline Orchestrator

from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class PipelineConfig:
    """Configuration for data pipeline."""
    input_path: str
    output_path: str
    input_options: Dict[str, Any] = field(default_factory=dict)
    validation: Dict[str, Any] = field(default_factory=dict)
    transformations: List[Dict[str, Any]] = field(default_factory=list)
    output_format: str = 'csv'
    output_options: Dict[str, Any] = field(default_factory=dict)


class DataPipeline:
    """Orchestrate data processing pipeline."""

    def __init__(self, config: PipelineConfig):
        self.config = config
        self.reader = DataReader()
        self.validator = DataValidator()
        self.exporter = DataExporter()

    def _setup_validation(self):
        """Configure validation rules from config."""
        validation = self.config.validation

        if 'required_columns' in validation:
            self.validator.add_rule(
                required_columns_rule(validation['required_columns'])
            )

        if 'no_duplicates' in validation:
            self.validator.add_rule(
                no_duplicates_rule(validation.get('no_duplicates_subset'))
            )

        if 'non_null_columns' in validation:
            self.validator.add_rule(
                non_null_rule(validation['non_null_columns'])
            )

    def _apply_transformations(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply configured transformations."""
        transformer = DataTransformer(df)

        for transform in self.config.transformations:
            op = transform['operation']

            if op == 'rename':
                transformer.rename_columns(transform['mapping'])
            elif op == 'filter':
                transformer.filter_rows(transform['expression'])
            elif op == 'select':
                transformer.select_columns(transform['columns'])
            elif op == 'drop':
                transformer.drop_columns(transform['columns'])
            elif op == 'fill_nulls':
                transformer.fill_nulls(
                    value=transform.get('value'),
                    method=transform.get('method')
                )
            elif op == 'convert_types':
                transformer.convert_types(transform['types'])
            elif op == 'aggregate':
                transformer.aggregate(
                    group_by=transform['group_by'],
                    agg_spec=transform['aggregations']
                )
            elif op == 'sort':
                transformer.sort(
                    by=transform['by'],
                    ascending=transform.get('ascending', True)
                )

        return transformer.get_result()

    def run(self) -> Dict[str, Any]:
        """Execute the pipeline."""
        logger.info(f"Starting pipeline: {self.config.input_path}")

        # Read input
        df = self.reader.read(self.config.input_path, **self.config.input_options)
        input_rows = len(df)
        logger.info(f"Loaded {input_rows} rows")

        # Validate
        self._setup_validation()
        validation_result = self.validator.validate(df)

        if not validation_result.is_valid:
            logger.error(f"Validation failed: {validation_result.errors}")
            return {
                'status': 'failed',
                'stage': 'validation',
                'errors': validation_result.errors,
                'warnings': validation_result.warnings
            }

        if validation_result.warnings:
            logger.warning(f"Validation warnings: {validation_result.warnings}")

        # Transform
        df = self._apply_transformations(df)
        logger.info(f"Transformed to {len(df)} rows")

        # Export
        export_method = getattr(self.exporter, f'to_{self.config.output_format}')
        output_path = export_method(df, self.config.output_path, **self.config.output_options)
        logger.info(f"Exported to {output_path}")

        return {
            'status': 'success',
            'input_rows': input_rows,
            'output_rows': len(df),
            'output_path': output_path,
            'validation_stats': validation_result.stats,
            'warnings': validation_result.warnings
        }

YAML Configuration Format

Basic Pipeline Config

# config/pipelines/data_clean.yaml

input:
  path: data/raw/source.csv
  options:
    delimiter: ","
    skiprows: 1

validation:
  required_columns:
    - id
    - timestamp
    - value
  non_null_columns:
    - id
    - value
  no_duplicates: true
  no_duplicates_subset:
    - id

transformations:
  - operation: rename
    mapping:
      old_name: new_name
      date_col: timestamp

  - operation: filter
    expression: "value > 0 and status != 'invalid'"

  - operation: convert_types
    types:
      timestamp: datetime
      value: numeric

  - operation: fill_nulls
    value: 0

  - operation: sort
    by: [timestamp]
    ascending: true

output:
  path: data/processed/cleaned.csv
  format: csv
  options:
    index: false
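
The skill does not show the loader that turns this YAML into a PipelineConfig. A minimal sketch, assuming PyYAML and the key layout above:

import yaml

def load_pipeline(config_path: str) -> DataPipeline:
    """Build a DataPipeline from a YAML config file (sketch)."""
    with open(config_path) as f:
        raw = yaml.safe_load(f)
    config = PipelineConfig(
        input_path=raw["input"]["path"],
        output_path=raw["output"]["path"],
        input_options=raw["input"].get("options", {}),
        validation=raw.get("validation", {}),
        transformations=raw.get("transformations", []),
        output_format=raw["output"].get("format", "csv"),
        output_options=raw["output"].get("options", {}),
    )
    return DataPipeline(config)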

Aggregation Pipeline

# config/pipelines/monthly_summary.yaml

input:
  path: data/processed/daily_data.csv

validation:
  required_columns:
    - date
    - category
    - amount

transformations:
  - operation: convert_types
    types:
      date: datetime

  - operation: aggregate
    group_by: [category]
    aggregations:
      amount:
        - sum
        - mean
        - count

  - operation: rename
    mapping:
      amount_sum: total_amount
      amount_mean: average_amount
      amount_count: transaction_count

  - operation: sort
    by: [total_amount]
    ascending: false

output:
  path: data/results/monthly_summary.csv
  format: csv

Usage Examples

Example 1: Simple CSV Processing

# Process CSV with config
python -m data_pipeline config/pipelines/clean_data.yaml

# Override input/output
python -m data_pipeline config/pipelines/clean_data.yaml \
    --input data/custom_input.csv \
    --output data/custom_output.csv

# Dry run (validate only)
python -m data_pipeline config/pipelines/clean_data.yaml --dry-run
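
These commands assume a python -m data_pipeline entry point that the skill itself does not ship. A hypothetical argparse sketch, reusing the load_pipeline sketch above:

# data_pipeline/__main__.py -- hypothetical entry point; flag names match the commands above
import argparse

parser = argparse.ArgumentParser(description="Run a data pipeline from a YAML config")
parser.add_argument("config", help="Path to pipeline YAML")
parser.add_argument("--input", help="Override the configured input path")
parser.add_argument("--output", help="Override the configured output path")
parser.add_argument("--dry-run", action="store_true", help="Validate only; skip transform/export")
args = parser.parse_args()

pipeline = load_pipeline(args.config)
if args.input:
    pipeline.config.input_path = args.input
if args.output:
    pipeline.config.output_path = args.output

if args.dry_run:
    # validation-only mode is left as an exercise; run() always executes all stages
    print("dry-run not implemented in this sketch")
else:
    print(pipeline.run()["status"])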

Example 2: Programmatic Usage

from data_pipeline import DataPipeline, PipelineConfig

config = PipelineConfig(
    input_path='data/raw/sales.csv',
    output_path='data/processed/sales_clean.csv',
    validation={
        'required_columns': ['date', 'product', 'amount'],
        'non_null_columns': ['amount']
    },
    transformations=[
        {'operation': 'filter', 'expression': 'amount > 0'},
        {'operation': 'sort', 'by': ['date']}
    ]
)

pipeline = DataPipeline(config)
result = pipeline.run()
print(f"Processed {result['output_rows']} rows")

Example 3: Batch Processing

from pathlib import Path
from data_pipeline import DataReader, DataTransformer, DataExporter

reader = DataReader()
exporter = DataExporter()

# Process all CSV files in directory
input_dir = Path('data/raw/')
output_dir = Path('data/processed/')

for csv_file in input_dir.glob('*.csv'):
    df = reader.read(str(csv_file))

    # Apply transformations
    df_clean = (DataTransformer(df)
        .fill_nulls(value=0)
        .filter_rows('value > 0')
        .sort(['timestamp'])
        .get_result())

    # Export
    output_path = output_dir / csv_file.name
    exporter.to_csv(df_clean, str(output_path))
    print(f"Processed: {csv_file.name}")

Example 4: Multi-Format Export

def export_all_formats(df: pd.DataFrame, base_path: str):
    """Export data to multiple formats."""
    exporter = DataExporter()

    outputs = {
        'csv': exporter.to_csv(df, f"{base_path}.csv"),
        'json': exporter.to_json(df, f"{base_path}.json"),
        'parquet': exporter.to_parquet(df, f"{base_path}.parquet"),
        'excel': exporter.to_excel(df, f"{base_path}.xlsx")
    }

    return outputs
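
Usage sketch (the base path is a placeholder; note that to_parquet requires a Parquet engine such as pyarrow):

outputs = export_all_formats(df, "data/results/report")
# -> {'csv': 'data/results/report.csv', 'json': 'data/results/report.json', ...}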

Best Practices

Don't

  1. Assume encoding is always UTF-8
  2. Load entire large files into memory
  3. Skip validation steps
  4. Ignore encoding errors
  5. Mix transformation and validation

Data Reading

  • Always detect encoding before reading CSV
  • Use chunked reading for large files (>100MB); see the sketch below
  • Specify dtypes to reduce memory usage
  • Handle missing values explicitly
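
A chunked-reading sketch for the large-file case (the column name and chunk size are placeholders):

import pandas as pd

chunks = []
for chunk in pd.read_csv("data/raw/large.csv", chunksize=100_000):
    chunks.append(chunk[chunk["value"] > 0])  # filter each chunk before concatenating
df = pd.concat(chunks, ignore_index=True)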

Validation

  • Validate early in the pipeline
  • Fail fast on critical errors
  • Log warnings for non-critical issues
  • Track validation statistics

Transformation

  • Use method chaining for readability
  • Apply filters before expensive operations
  • Convert types early to catch errors
  • Document transformation logic

Export

  • Create output directories automatically
  • Use appropriate formats (Parquet for large data)
  • Include metadata in output
  • Verify output integrity; see the sketch below
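
One way to cover the last two bullets, as a sketch (the sidecar format here is an assumption, not part of the skill):

import json
import pandas as pd
from pathlib import Path

out_path = DataExporter.to_csv(df, "data/processed/cleaned.csv")

# Write a metadata sidecar next to the output
meta = {"rows": len(df), "columns": list(df.columns)}
Path(out_path).with_suffix(".meta.json").write_text(json.dumps(meta, indent=2))

# Verify the output round-trips with the same row count
check = pd.read_csv(out_path)
assert len(check) == len(df), "row count changed during export"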

File Organization

project/
    config/
        pipelines/           # Pipeline configs
            clean_data.yaml
            aggregate.yaml
    data/
        raw/                 # Raw input data
        processed/           # Cleaned data
        results/             # Analysis results
    src/
        data_pipeline/       # Pipeline code
    scripts/
        run_pipeline.sh      # CLI wrapper

Error Handling

Common Errors

| Error | Cause | Solution |
|-------|-------|----------|
| UnicodeDecodeError | Wrong encoding | Use DataReader with encoding fallback |
| KeyError | Missing column | Check column names in config |
| ValueError | Type conversion failed | Use errors='coerce' or validate first |
| MemoryError | File too large | Use chunked reading |
| FileNotFoundError | Input file missing | Verify file path |

Error Template

def safe_pipeline_run(config: PipelineConfig) -> dict:
    """Run pipeline with comprehensive error handling."""
    try:
        # Validate input exists
        if not Path(config.input_path).exists():
            return {'status': 'error', 'stage': 'input', 'message': 'File not found'}

        pipeline = DataPipeline(config)
        return pipeline.run()

    except UnicodeDecodeError as e:
        return {'status': 'error', 'stage': 'read', 'message': f'Encoding error: {e}'}
    except KeyError as e:
        return {'status': 'error', 'stage': 'transform', 'message': f'Missing column: {e}'}
    except Exception as e:
        return {'status': 'error', 'stage': 'unknown', 'message': str(e)}

Execution Checklist

  • Input file exists and is readable
  • Encoding detected or specified
  • Required columns present
  • Validation rules configured
  • Transformations in correct order
  • Output directory exists or is created
  • Export format appropriate for data size
  • Error handling covers all failure modes
  • Logging configured for debugging

Metrics

| Metric | Target | Description |
|--------|--------|-------------|
| Read Time | <1s per 100MB | Data loading speed |
| Validation Time | <500ms | Rule checking duration |
| Transform Time | Varies | Depends on operations |
| Export Time | <1s per 100MB | File writing speed |
| Memory Usage | <2x file size | Peak memory consumption |
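
A quick way to spot-check the read-time and memory targets (the file path is a placeholder):

import time
import pandas as pd

start = time.perf_counter()
df = pd.read_csv("data/raw/source.csv")
read_seconds = time.perf_counter() - start

size_mb = df.memory_usage(deep=True).sum() / 1e6
print(f"read: {read_seconds:.2f}s, in-memory size: {size_mb:.1f} MB")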

Related Skills

  • yaml-workflow-executor
  • engineering-report-generator
  • parallel-file-processor

Version History

  • 1.1.0 (2026-01-02): Upgraded to SKILL_TEMPLATE_v2 format with Quick Start, Error Handling, Metrics, Execution Checklist, additional examples
  • 1.0.0 (2024-10-15): Initial release with DataReader, DataValidator, DataTransformer, pipeline orchestration