Data Pipeline Processor
Version: 1.1.0
Category: Development
Last Updated: 2026-01-02
Process data files through transformation pipelines with validation, encoding detection, and multi-format export capabilities.
Quick Start
import pandas as pd
from pathlib import Path
# Simple pipeline: Load -> Transform -> Export
df = pd.read_csv("data/raw/source.csv")
# Transform
df = df[df['value'] > 0] # Filter
df['date'] = pd.to_datetime(df['date']) # Convert types
df = df.sort_values('date') # Sort
# Export
Path("data/processed").mkdir(parents=True, exist_ok=True)
df.to_csv("data/processed/cleaned.csv", index=False)
print(f"Processed {len(df)} rows")
When to Use
- Processing CSV/Excel/JSON files with validation
- Data cleaning and transformation workflows
- Batch file processing with aggregation
- Handling encoding issues (UTF-8, Latin-1 fallback)
- ETL (Extract, Transform, Load) operations
- Data quality checks and reporting
Core Pattern
Input (CSV/Excel/JSON/Parquet) -> Validate -> Transform -> Export
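A minimal end-to-end sketch of this pattern using the classes defined under Implementation below (paths, rules, and column names are illustrative):
reader = DataReader()
df = reader.read("data/raw/source.csv")
validator = DataValidator()
validator.add_rule(required_columns_rule(["id", "value"]))
validation = validator.validate(df)
if validation.is_valid:
    df = (DataTransformer(df)
          .convert_types({"value": "numeric"})
          .filter_rows("value > 0")
          .get_result())
    DataExporter.to_csv(df, "data/processed/cleaned.csv")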
Implementation
Data Reader with Encoding Detection
import pandas as pd
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
import logging
import chardet
logger = logging.getLogger(__name__)
class DataReader:
"""Read data files with automatic encoding detection."""
SUPPORTED_FORMATS = ['csv', 'xlsx', 'xls', 'json', 'parquet']
    def __init__(self, encoding_fallback: Optional[List[str]] = None):
"""
Initialize data reader.
Args:
encoding_fallback: List of encodings to try in order
"""
self.encoding_fallback = encoding_fallback or ['utf-8', 'latin-1', 'cp1252']
def read(self, file_path: str, **kwargs) -> pd.DataFrame:
"""
Read data file with automatic format and encoding detection.
Args:
file_path: Path to data file
**kwargs: Additional arguments for pandas readers
Returns:
DataFrame with loaded data
"""
path = Path(file_path)
suffix = path.suffix.lower().lstrip('.')
if suffix == 'csv':
return self._read_csv(path, **kwargs)
elif suffix in ['xlsx', 'xls']:
return self._read_excel(path, **kwargs)
elif suffix == 'json':
return pd.read_json(path, **kwargs)
elif suffix == 'parquet':
return pd.read_parquet(path, **kwargs)
else:
raise ValueError(f"Unsupported format: {suffix}")
def _read_csv(self, path: Path, **kwargs) -> pd.DataFrame:
"""Read CSV with encoding fallback."""
        # Detect a likely encoding from a sample of the raw bytes
        with open(path, 'rb') as f:
            raw = f.read(10000)
        detected = chardet.detect(raw)
        detected_encoding = detected.get('encoding') or 'utf-8'  # chardet may return None
        # Try the detected encoding first, then the configured fallbacks (deduplicated)
        encodings_to_try = list(dict.fromkeys([detected_encoding] + self.encoding_fallback))
for encoding in encodings_to_try:
try:
df = pd.read_csv(path, encoding=encoding, **kwargs)
logger.info(f"Successfully read {path} with encoding: {encoding}")
return df
except UnicodeDecodeError:
continue
raise ValueError(f"Could not decode {path} with any encoding")
def _read_excel(self, path: Path, **kwargs) -> pd.DataFrame:
"""Read Excel file."""
return pd.read_excel(path, **kwargs)
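Usage sketch (the file name, delimiter, and fallback list are illustrative):
reader = DataReader(encoding_fallback=["utf-8", "latin-1", "cp1252"])
df = reader.read("data/raw/legacy_export.csv", delimiter=";")
print(df.shape)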
Data Validator
from dataclasses import dataclass, field
from typing import Callable, List, Dict, Any, Optional
@dataclass
class ValidationResult:
"""Result of data validation."""
is_valid: bool
errors: List[str] = field(default_factory=list)
warnings: List[str] = field(default_factory=list)
stats: Dict[str, Any] = field(default_factory=dict)
class DataValidator:
"""Validate data against configurable rules."""
def __init__(self):
self.rules: List[Callable] = []
def add_rule(self, rule: Callable[[pd.DataFrame], ValidationResult]):
"""Add a validation rule."""
self.rules.append(rule)
def validate(self, df: pd.DataFrame) -> ValidationResult:
"""Run all validation rules."""
all_errors = []
all_warnings = []
all_stats = {}
for rule in self.rules:
result = rule(df)
all_errors.extend(result.errors)
all_warnings.extend(result.warnings)
all_stats.update(result.stats)
return ValidationResult(
is_valid=len(all_errors) == 0,
errors=all_errors,
warnings=all_warnings,
stats=all_stats
)
# Common validation rules
def required_columns_rule(required: List[str]) -> Callable:
"""Validate required columns exist."""
def rule(df: pd.DataFrame) -> ValidationResult:
missing = [col for col in required if col not in df.columns]
return ValidationResult(
is_valid=len(missing) == 0,
errors=[f"Missing required column: {col}" for col in missing],
stats={'columns_found': len(df.columns)}
)
return rule
def no_duplicates_rule(subset: Optional[List[str]] = None) -> Callable:
"""Validate no duplicate rows."""
def rule(df: pd.DataFrame) -> ValidationResult:
duplicates = df.duplicated(subset=subset).sum()
return ValidationResult(
is_valid=duplicates == 0,
warnings=[f"Found {duplicates} duplicate rows"] if duplicates > 0 else [],
stats={'duplicate_count': duplicates}
)
return rule
def non_null_rule(columns: List[str]) -> Callable:
"""Validate specified columns have no null values."""
def rule(df: pd.DataFrame) -> ValidationResult:
errors = []
stats = {}
for col in columns:
if col in df.columns:
null_count = df[col].isnull().sum()
stats[f'{col}_nulls'] = null_count
if null_count > 0:
errors.append(f"Column '{col}' has {null_count} null values")
return ValidationResult(
is_valid=len(errors) == 0,
errors=errors,
stats=stats
)
return rule
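Wiring the rules together (column names are illustrative):
validator = DataValidator()
validator.add_rule(required_columns_rule(["id", "timestamp", "value"]))
validator.add_rule(non_null_rule(["id", "value"]))
validator.add_rule(no_duplicates_rule(subset=["id"]))
result = validator.validate(df)
print(result.is_valid, result.stats)
for error in result.errors:
    print(f"ERROR: {error}")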
Data Transformer
class DataTransformer:
"""Apply transformations to data."""
def __init__(self, df: pd.DataFrame):
self.df = df.copy()
def rename_columns(self, mapping: Dict[str, str]) -> 'DataTransformer':
"""Rename columns."""
self.df = self.df.rename(columns=mapping)
return self
def filter_rows(self, expression: str) -> 'DataTransformer':
"""Filter rows using query expression."""
self.df = self.df.query(expression)
return self
def select_columns(self, columns: List[str]) -> 'DataTransformer':
"""Select specific columns."""
self.df = self.df[columns]
return self
def drop_columns(self, columns: List[str]) -> 'DataTransformer':
"""Drop specified columns."""
self.df = self.df.drop(columns=columns, errors='ignore')
return self
    def fill_nulls(self, value: Any = None, method: Optional[str] = None) -> 'DataTransformer':
        """Fill null values with a constant, or forward/backward fill."""
        # fillna(method=...) is deprecated in recent pandas; use ffill/bfill instead
        if method in ('ffill', 'pad'):
            self.df = self.df.ffill()
        elif method in ('bfill', 'backfill'):
            self.df = self.df.bfill()
        else:
            self.df = self.df.fillna(value)
        return self
def convert_types(self, type_mapping: Dict[str, str]) -> 'DataTransformer':
"""Convert column types."""
for col, dtype in type_mapping.items():
if col in self.df.columns:
if dtype == 'datetime':
self.df[col] = pd.to_datetime(self.df[col])
elif dtype == 'numeric':
self.df[col] = pd.to_numeric(self.df[col], errors='coerce')
else:
self.df[col] = self.df[col].astype(dtype)
return self
def add_column(self, name: str, expression: Callable) -> 'DataTransformer':
"""Add computed column."""
self.df[name] = expression(self.df)
return self
    def aggregate(self, group_by: List[str], agg_spec: Dict[str, Any]) -> 'DataTransformer':
        """Aggregate by groups; multi-level result columns are flattened (e.g. amount_sum)."""
        grouped = self.df.groupby(group_by).agg(agg_spec)
        if isinstance(grouped.columns, pd.MultiIndex):
            grouped.columns = ['_'.join(str(level) for level in col if level) for col in grouped.columns]
        self.df = grouped.reset_index()
        return self
def sort(self, by: List[str], ascending: bool = True) -> 'DataTransformer':
"""Sort data."""
self.df = self.df.sort_values(by=by, ascending=ascending)
return self
def get_result(self) -> pd.DataFrame:
"""Get transformed DataFrame."""
return self.df
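The transformer is meant for method chaining; a sketch with illustrative column names:
clean = (DataTransformer(df)
         .rename_columns({"Date": "date", "Amount": "amount"})
         .convert_types({"date": "datetime", "amount": "numeric"})
         .filter_rows("amount > 0")
         .aggregate(group_by=["date"], agg_spec={"amount": "sum"})
         .sort(["date"])
         .get_result())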
Data Exporter
class DataExporter:
"""Export data to various formats."""
    @staticmethod
    def to_csv(df: pd.DataFrame, path: str, **kwargs) -> str:
        """Export to CSV (row index omitted unless overridden via kwargs)."""
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        kwargs.setdefault('index', False)
        df.to_csv(path, **kwargs)
        return path
    @staticmethod
    def to_excel(df: pd.DataFrame, path: str, sheet_name: str = 'Sheet1', **kwargs) -> str:
        """Export to Excel (row index omitted unless overridden via kwargs)."""
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        kwargs.setdefault('index', False)
        df.to_excel(path, sheet_name=sheet_name, **kwargs)
        return path
@staticmethod
def to_json(df: pd.DataFrame, path: str, orient: str = 'records', **kwargs) -> str:
"""Export to JSON."""
Path(path).parent.mkdir(parents=True, exist_ok=True)
df.to_json(path, orient=orient, **kwargs)
return path
@staticmethod
def to_parquet(df: pd.DataFrame, path: str, **kwargs) -> str:
"""Export to Parquet."""
Path(path).parent.mkdir(parents=True, exist_ok=True)
df.to_parquet(path, **kwargs)
return path
Pipeline Orchestrator
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
@dataclass
class PipelineConfig:
"""Configuration for data pipeline."""
input_path: str
output_path: str
input_options: Dict[str, Any] = field(default_factory=dict)
validation: Dict[str, Any] = field(default_factory=dict)
transformations: List[Dict[str, Any]] = field(default_factory=list)
output_format: str = 'csv'
output_options: Dict[str, Any] = field(default_factory=dict)
class DataPipeline:
"""Orchestrate data processing pipeline."""
def __init__(self, config: PipelineConfig):
self.config = config
self.reader = DataReader()
self.validator = DataValidator()
self.exporter = DataExporter()
def _setup_validation(self):
"""Configure validation rules from config."""
validation = self.config.validation
if 'required_columns' in validation:
self.validator.add_rule(
required_columns_rule(validation['required_columns'])
)
if 'no_duplicates' in validation:
self.validator.add_rule(
no_duplicates_rule(validation.get('no_duplicates_subset'))
)
if 'non_null_columns' in validation:
self.validator.add_rule(
non_null_rule(validation['non_null_columns'])
)
def _apply_transformations(self, df: pd.DataFrame) -> pd.DataFrame:
"""Apply configured transformations."""
transformer = DataTransformer(df)
for transform in self.config.transformations:
op = transform['operation']
if op == 'rename':
transformer.rename_columns(transform['mapping'])
elif op == 'filter':
transformer.filter_rows(transform['expression'])
elif op == 'select':
transformer.select_columns(transform['columns'])
elif op == 'drop':
transformer.drop_columns(transform['columns'])
elif op == 'fill_nulls':
transformer.fill_nulls(
value=transform.get('value'),
method=transform.get('method')
)
elif op == 'convert_types':
transformer.convert_types(transform['types'])
elif op == 'aggregate':
transformer.aggregate(
group_by=transform['group_by'],
agg_spec=transform['aggregations']
)
elif op == 'sort':
transformer.sort(
by=transform['by'],
ascending=transform.get('ascending', True)
)
return transformer.get_result()
def run(self) -> Dict[str, Any]:
"""Execute the pipeline."""
logger.info(f"Starting pipeline: {self.config.input_path}")
# Read input
        df = self.reader.read(self.config.input_path, **self.config.input_options)
        input_rows = len(df)
        logger.info(f"Loaded {input_rows} rows")
# Validate
self._setup_validation()
validation_result = self.validator.validate(df)
if not validation_result.is_valid:
logger.error(f"Validation failed: {validation_result.errors}")
return {
'status': 'failed',
'stage': 'validation',
'errors': validation_result.errors,
'warnings': validation_result.warnings
}
if validation_result.warnings:
logger.warning(f"Validation warnings: {validation_result.warnings}")
# Transform
df = self._apply_transformations(df)
logger.info(f"Transformed to {len(df)} rows")
# Export
export_method = getattr(self.exporter, f'to_{self.config.output_format}')
output_path = export_method(df, self.config.output_path, **self.config.output_options)
logger.info(f"Exported to {output_path}")
return {
'status': 'success',
            'input_rows': input_rows,
'output_rows': len(df),
'output_path': output_path,
'validation_stats': validation_result.stats,
'warnings': validation_result.warnings
}
YAML Configuration Format
Basic Pipeline Config
# config/pipelines/data_clean.yaml
input:
path: data/raw/source.csv
options:
delimiter: ","
skiprows: 1
validation:
required_columns:
- id
- timestamp
- value
non_null_columns:
- id
- value
no_duplicates: true
no_duplicates_subset:
- id
transformations:
- operation: rename
mapping:
old_name: new_name
date_col: timestamp
- operation: filter
expression: "value > 0 and status != 'invalid'"
- operation: convert_types
types:
timestamp: datetime
value: numeric
- operation: fill_nulls
value: 0
- operation: sort
by: [timestamp]
ascending: true
output:
path: data/processed/cleaned.csv
format: csv
options:
index: false
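The nested input/output layout above has to be mapped onto the flat PipelineConfig fields. The pipeline's actual loader is not shown here, but a minimal sketch with PyYAML could look like this (load_pipeline_config is illustrative):
import yaml
def load_pipeline_config(path: str) -> PipelineConfig:
    """Map the nested YAML layout onto the flat PipelineConfig dataclass."""
    with open(path) as f:
        raw = yaml.safe_load(f)
    return PipelineConfig(
        input_path=raw["input"]["path"],
        output_path=raw["output"]["path"],
        input_options=raw["input"].get("options", {}),
        validation=raw.get("validation", {}),
        transformations=raw.get("transformations", []),
        output_format=raw["output"].get("format", "csv"),
        output_options=raw["output"].get("options", {}),
    )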
Aggregation Pipeline
# config/pipelines/monthly_summary.yaml
input:
path: data/processed/daily_data.csv
validation:
required_columns:
- date
- category
- amount
transformations:
- operation: convert_types
types:
date: datetime
- operation: aggregate
group_by: [category]
aggregations:
amount:
- sum
- mean
- count
- operation: rename
mapping:
amount_sum: total_amount
amount_mean: average_amount
amount_count: transaction_count
- operation: sort
by: [total_amount]
ascending: false
output:
path: data/results/monthly_summary.csv
format: csv
Usage Examples
Example 1: Simple CSV Processing
# Process CSV with config
python -m data_pipeline config/pipelines/clean_data.yaml
# Override input/output
python -m data_pipeline config/pipelines/clean_data.yaml \
--input data/custom_input.csv \
--output data/custom_output.csv
# Dry run (validate only)
python -m data_pipeline config/pipelines/clean_data.yaml --dry-run
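The module entry point behind these commands is not shown above; a minimal sketch with argparse, reusing the illustrative load_pipeline_config helper from the YAML section, might look like:
# data_pipeline/__main__.py (illustrative sketch)
import argparse
def main():
    parser = argparse.ArgumentParser(description="Run a data pipeline from a YAML config")
    parser.add_argument("config", help="Path to pipeline YAML file")
    parser.add_argument("--input", help="Override input path")
    parser.add_argument("--output", help="Override output path")
    parser.add_argument("--dry-run", action="store_true", help="Validate only; skip transform and export")
    args = parser.parse_args()
    config = load_pipeline_config(args.config)
    if args.input:
        config.input_path = args.input
    if args.output:
        config.output_path = args.output
    pipeline = DataPipeline(config)
    if args.dry_run:
        # Dry run: read and validate only, reusing the configured rules
        df = DataReader().read(config.input_path, **config.input_options)
        pipeline._setup_validation()
        result = pipeline.validator.validate(df)
        print("valid" if result.is_valid else result.errors)
    else:
        print(pipeline.run())
if __name__ == "__main__":
    main()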
Example 2: Programmatic Usage
from data_pipeline import DataPipeline, PipelineConfig
config = PipelineConfig(
input_path='data/raw/sales.csv',
output_path='data/processed/sales_clean.csv',
validation={
'required_columns': ['date', 'product', 'amount'],
'non_null_columns': ['amount']
},
transformations=[
{'operation': 'filter', 'expression': 'amount > 0'},
{'operation': 'sort', 'by': ['date']}
]
)
pipeline = DataPipeline(config)
result = pipeline.run()
print(f"Processed {result['output_rows']} rows")
Example 3: Batch Processing
from pathlib import Path
from data_pipeline import DataReader, DataTransformer, DataExporter
reader = DataReader()
exporter = DataExporter()
# Process all CSV files in directory
input_dir = Path('data/raw/')
output_dir = Path('data/processed/')
for csv_file in input_dir.glob('*.csv'):
df = reader.read(str(csv_file))
# Apply transformations
df_clean = (DataTransformer(df)
.fill_nulls(value=0)
.filter_rows('value > 0')
.sort(['timestamp'])
.get_result())
# Export
output_path = output_dir / csv_file.name
exporter.to_csv(df_clean, str(output_path))
print(f"Processed: {csv_file.name}")
Example 4: Multi-Format Export
def export_all_formats(df: pd.DataFrame, base_path: str):
"""Export data to multiple formats."""
exporter = DataExporter()
outputs = {
'csv': exporter.to_csv(df, f"{base_path}.csv"),
'json': exporter.to_json(df, f"{base_path}.json"),
'parquet': exporter.to_parquet(df, f"{base_path}.parquet"),
'excel': exporter.to_excel(df, f"{base_path}.xlsx")
}
return outputs
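Usage (the base path is illustrative). Note that Parquet and Excel export rely on optional pandas dependencies such as pyarrow and openpyxl:
outputs = export_all_formats(df, "data/results/sales_summary")
print(outputs["parquet"])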
Best Practices
Don't
- Assume encoding is always UTF-8
- Load entire large files into memory
- Skip validation steps
- Ignore encoding errors
- Mix transformation and validation
Data Reading
- Always detect encoding before reading CSV
- Use chunked reading for large files (>100MB); a sketch follows this list
- Specify dtypes to reduce memory usage
- Handle missing values explicitly
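A sketch of the chunked-reading and dtype points (file name, dtypes, and chunk size are illustrative):
chunks = pd.read_csv(
    "data/raw/large_file.csv",
    dtype={"id": "int64", "category": "category", "value": "float64"},
    chunksize=100_000,
)
filtered = [chunk[chunk["value"] > 0] for chunk in chunks]
df = pd.concat(filtered, ignore_index=True)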
Validation
- Validate early in the pipeline
- Fail fast on critical errors
- Log warnings for non-critical issues
- Track validation statistics
Transformation
- Use method chaining for readability
- Apply filters before expensive operations
- Convert types early to catch errors
- Document transformation logic
Export
- Create output directories automatically
- Use appropriate formats (Parquet for large data)
- Include metadata in output
- Verify output integrity
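For the last point, a quick integrity check after export might look like this (path is illustrative):
output_path = DataExporter.to_parquet(df, "data/processed/cleaned.parquet")
check = pd.read_parquet(output_path)
assert len(check) == len(df), "row count mismatch after export"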
File Organization
project/
config/
pipelines/ # Pipeline configs
clean_data.yaml
aggregate.yaml
data/
raw/ # Raw input data
processed/ # Cleaned data
results/ # Analysis results
src/
data_pipeline/ # Pipeline code
scripts/
run_pipeline.sh # CLI wrapper
Error Handling
Common Errors
| Error | Cause | Solution |
| --- | --- | --- |
| UnicodeDecodeError | Wrong encoding | Use DataReader with encoding fallback |
| KeyError | Missing column | Check column names in config |
| ValueError | Type conversion failed | Use errors='coerce' or validate first |
| MemoryError | File too large | Use chunked reading |
| FileNotFoundError | Input file missing | Verify file path |
Error Template
def safe_pipeline_run(config: PipelineConfig) -> dict:
"""Run pipeline with comprehensive error handling."""
try:
# Validate input exists
if not Path(config.input_path).exists():
return {'status': 'error', 'stage': 'input', 'message': 'File not found'}
pipeline = DataPipeline(config)
return pipeline.run()
except UnicodeDecodeError as e:
return {'status': 'error', 'stage': 'read', 'message': f'Encoding error: {e}'}
except KeyError as e:
return {'status': 'error', 'stage': 'transform', 'message': f'Missing column: {e}'}
except Exception as e:
return {'status': 'error', 'stage': 'unknown', 'message': str(e)}
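Usage, reusing the PipelineConfig from Example 2:
result = safe_pipeline_run(config)
if result["status"] != "success":
    logger.error(f"Pipeline failed at {result.get('stage')}: {result.get('message') or result.get('errors')}")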
Execution Checklist
- Confirm the input file exists and uses a supported format (csv, xlsx, xls, json, parquet)
- Run the pipeline with --dry-run first to check validation rules
- Review validation warnings and stats in the result before trusting the output
- Verify the reported output_path and row counts after the run
Metrics
| Metric | Target | Description |
| --- | --- | --- |
| Read Time | <1s per 100MB | Data loading speed |
| Validation Time | <500ms | Rule checking duration |
| Transform Time | Varies | Depends on operations |
| Export Time | <1s per 100MB | File writing speed |
| Memory Usage | <2x file size | Peak memory consumption |
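A rough way to spot-check read time and in-memory size against these targets (note that DataFrame size is not the same as peak process memory):
import time
start = time.perf_counter()
df = DataReader().read("data/raw/source.csv")
read_seconds = time.perf_counter() - start
dataframe_mb = df.memory_usage(deep=True).sum() / 1e6
print(f"read: {read_seconds:.2f}s, in-memory size: {dataframe_mb:.1f} MB")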
Related Skills
Version History
- 1.1.0 (2026-01-02): Upgraded to SKILL_TEMPLATE_v2 format with Quick Start, Error Handling, Metrics, Execution Checklist, additional examples
- 1.0.0 (2024-10-15): Initial release with DataReader, DataValidator, DataTransformer, pipeline orchestration