| name | csv-processor |
| description | Parse, transform, and analyze CSV files with advanced data manipulation capabilities. |
CSV Processor Skill
Parse, transform, and analyze CSV files with advanced data manipulation capabilities.
Instructions
You are a CSV processing expert. When invoked:
Parse CSV Files:
- Auto-detect delimiters (comma, tab, semicolon, pipe)
- Handle different encodings (UTF-8, Latin-1, Windows-1252)
- Process quoted fields and escaped characters
- Handle multi-line fields correctly
- Detect and use header rows
Transform Data:
- Filter rows based on conditions
- Select specific columns
- Sort and group data
- Merge multiple CSV files
- Split large files into smaller chunks
- Pivot and unpivot data
Clean Data:
- Remove duplicates
- Handle missing values
- Trim whitespace
- Normalize data formats
- Fix encoding issues
- Validate data types
Analyze Data:
- Generate statistics (sum, average, min, max, count)
- Identify data quality issues
- Detect outliers
- Profile column data types
- Calculate distributions
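As a quick orientation before the detailed sections below, here is a minimal sketch of how these steps (parse, clean, analyze) can fit together with pandas; the file name and the email/country/age columns are illustrative placeholders, not a fixed schema:
import pandas as pd

# Parse: let pandas sniff the delimiter and treat common markers as missing
df = pd.read_csv('data.csv', sep=None, engine='python',
                 na_values=['NA', 'N/A', 'null', ''])

# Clean: trim header whitespace, drop duplicates, require an email
df.columns = df.columns.str.strip()
df = df.drop_duplicates().dropna(subset=['email'])

# Analyze: overall profile plus per-country age statistics
print(df.describe(include='all'))
print(df.groupby('country')['age'].agg(['count', 'mean', 'min', 'max']))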
Usage Examples
@csv-processor data.csv
@csv-processor --filter "age > 30"
@csv-processor --select "name,email,age"
@csv-processor --merge file1.csv file2.csv
@csv-processor --stats
@csv-processor --clean --remove-duplicates
Basic CSV Operations
Reading CSV Files
Python (pandas)
import pandas as pd
# Basic read
df = pd.read_csv('data.csv')
# Custom delimiter
df = pd.read_csv('data.tsv', delimiter='\t')
# Specify encoding
df = pd.read_csv('data.csv', encoding='latin-1')
# Skip rows
df = pd.read_csv('data.csv', skiprows=2)
# Select specific columns
df = pd.read_csv('data.csv', usecols=['name', 'email', 'age'])
# Parse dates
df = pd.read_csv('data.csv', parse_dates=['created_at', 'updated_at'])
# Handle missing values
df = pd.read_csv('data.csv', na_values=['NA', 'N/A', 'null', ''])
# Specify data types
df = pd.read_csv('data.csv', dtype={
    'user_id': int,
    'age': int,
    'score': float,
    'active': bool
})
JavaScript (csv-parser)
const fs = require('fs');
const csv = require('csv-parser');
// Basic parsing
const results = [];
fs.createReadStream('data.csv')
  .pipe(csv())
  .on('data', (row) => {
    results.push(row);
  })
  .on('end', () => {
    console.log(`Processed ${results.length} rows`);
  });
// With custom options (this example uses papaparse rather than csv-parser)
const Papa = require('papaparse');
Papa.parse(fs.createReadStream('data.csv'), {
  header: true,
  delimiter: ',',
  skipEmptyLines: true,
  transformHeader: (header) => header.trim().toLowerCase(),
  complete: (results) => {
    console.log('Parsed:', results.data);
  }
});
Python (csv module)
import csv
# Basic reading
with open('data.csv', 'r', encoding='utf-8') as file:
    reader = csv.DictReader(file)
    for row in reader:
        print(row['name'], row['age'])
# Custom delimiter
with open('data.csv', 'r') as file:
    reader = csv.reader(file, delimiter='\t')
    for row in reader:
        print(row)
# Handle different dialects
with open('data.csv', 'r') as file:
    dialect = csv.Sniffer().sniff(file.read(1024))
    file.seek(0)
    reader = csv.reader(file, dialect)
    for row in reader:
        print(row)
Writing CSV Files
Python (pandas)
# Basic write
df.to_csv('output.csv', index=False)
# Custom delimiter
df.to_csv('output.tsv', sep='\t', index=False)
# Specify encoding
df.to_csv('output.csv', encoding='utf-8-sig', index=False)
# Write only specific columns
df[['name', 'email']].to_csv('output.csv', index=False)
# Append to existing file
df.to_csv('output.csv', mode='a', header=False, index=False)
# Quote all fields (QUOTE_ALL comes from the standard-library csv module)
import csv
df.to_csv('output.csv', quoting=csv.QUOTE_ALL, index=False)
JavaScript (csv-writer)
const createCsvWriter = require('csv-writer').createObjectCsvWriter;
const csvWriter = createCsvWriter({
  path: 'output.csv',
  header: [
    {id: 'name', title: 'Name'},
    {id: 'email', title: 'Email'},
    {id: 'age', title: 'Age'}
  ]
});
const records = [
  {name: 'John Doe', email: 'john@example.com', age: 30},
  {name: 'Jane Smith', email: 'jane@example.com', age: 25}
];
csvWriter.writeRecords(records)
  .then(() => console.log('CSV file written successfully'));
Data Transformation Patterns
Filtering Rows
Python (pandas)
# Single condition
filtered = df[df['age'] > 30]
# Multiple conditions (AND)
filtered = df[(df['age'] > 30) & (df['country'] == 'USA')]
# Multiple conditions (OR)
filtered = df[(df['age'] < 18) | (df['age'] > 65)]
# String operations
filtered = df[df['email'].str.contains('@gmail.com')]
filtered = df[df['name'].str.startswith('John')]
# Is in list
filtered = df[df['country'].isin(['USA', 'Canada', 'Mexico'])]
# Not null values
filtered = df[df['email'].notna()]
# Complex conditions
filtered = df.query('age > 30 and country == "USA" and active == True')
JavaScript
// Filter with arrow function
const filtered = data.filter(row => row.age > 30);
// Multiple conditions
const filtered = data.filter(row =>
  row.age > 30 && row.country === 'USA'
);
// String operations
const filtered = data.filter(row =>
  row.email.includes('@gmail.com')
);
// Complex filtering
const filtered = data.filter(row => {
  const age = parseInt(row.age);
  return age >= 18 && age <= 65 && row.active === 'true';
});
Selecting Columns
Python (pandas)
# Select single column
names = df['name']
# Select multiple columns
subset = df[['name', 'email', 'age']]
# Select by column type
numeric_cols = df.select_dtypes(include=['int64', 'float64'])
string_cols = df.select_dtypes(include=['object'])
# Select columns matching pattern
email_cols = df.filter(regex='.*email.*')
# Drop columns
df_without = df.drop(['temporary', 'unused'], axis=1)
# Rename columns
df_renamed = df.rename(columns={
    'old_name': 'new_name',
    'email_address': 'email'
})
JavaScript
// Map to select columns
const subset = data.map(row => ({
  name: row.name,
  email: row.email,
  age: row.age
}));
// Destructuring
const subset = data.map(({name, email, age}) => ({name, email, age}));
// Dynamic column selection
const columns = ['name', 'email', 'age'];
const subset = data.map(row =>
  Object.fromEntries(
    columns.map(col => [col, row[col]])
  )
);
Sorting Data
Python (pandas)
# Sort by single column
sorted_df = df.sort_values('age')
# Sort descending
sorted_df = df.sort_values('age', ascending=False)
# Sort by multiple columns
sorted_df = df.sort_values(['country', 'age'], ascending=[True, False])
# Sort by index
sorted_df = df.sort_index()
JavaScript
// Sort by single field
const sorted = data.sort((a, b) => a.age - b.age);
// Sort descending
const sorted = data.sort((a, b) => b.age - a.age);
// Sort by string
const sorted = data.sort((a, b) => a.name.localeCompare(b.name));
// Sort by multiple fields
const sorted = data.sort((a, b) => {
  if (a.country !== b.country) {
    return a.country.localeCompare(b.country);
  }
  return b.age - a.age;
});
Grouping and Aggregation
Python (pandas)
# Group by single column
grouped = df.groupby('country')
# Count by group
counts = df.groupby('country').size()
# Multiple aggregations
stats = df.groupby('country').agg({
    'age': ['mean', 'min', 'max'],
    'salary': ['sum', 'mean'],
    'user_id': 'count'
})
# Group by multiple columns
grouped = df.groupby(['country', 'city']).agg({
    'revenue': 'sum',
    'user_id': 'count'
})
# Custom aggregation
df.groupby('country').apply(lambda x: x['salary'].max() - x['salary'].min())
# Pivot table
pivot = df.pivot_table(
    values='revenue',
    index='country',
    columns='year',
    aggfunc='sum',
    fill_value=0
)
JavaScript (lodash)
const _ = require('lodash');
// Group by field
const grouped = _.groupBy(data, 'country');
// Count by group
const counts = _.mapValues(
  _.groupBy(data, 'country'),
  group => group.length
);
// Sum by group
const sums = _.mapValues(
  _.groupBy(data, 'country'),
  group => _.sumBy(group, row => parseFloat(row.salary))
);
// Multiple aggregations
const stats = Object.entries(_.groupBy(data, 'country')).map(([country, rows]) => ({
  country,
  count: rows.length,
  avgAge: _.meanBy(rows, row => parseInt(row.age)),
  totalSalary: _.sumBy(rows, row => parseFloat(row.salary))
}));
Merging CSV Files
Python (pandas)
# Concatenate vertically (stack rows)
df1 = pd.read_csv('file1.csv')
df2 = pd.read_csv('file2.csv')
combined = pd.concat([df1, df2], ignore_index=True)
# Join (SQL-like merge)
users = pd.read_csv('users.csv')
orders = pd.read_csv('orders.csv')
# Inner join
merged = pd.merge(users, orders, on='user_id', how='inner')
# Left join
merged = pd.merge(users, orders, on='user_id', how='left')
# Join on multiple keys (the composite key must exist in both frames)
merged = pd.merge(
    users, orders,
    on=['user_id', 'region'],
    how='left'
)
# Merge with different column names on each side
merged = pd.merge(
    users, orders,
    left_on='id',
    right_on='customer_id',
    how='inner'
)
JavaScript
// Concatenate arrays
const file1 = parseCSV('file1.csv');
const file2 = parseCSV('file2.csv');
const combined = [...file1, ...file2];
// Join arrays (like SQL)
function leftJoin(left, right, leftKey, rightKey) {
  return left.map(leftRow => {
    const rightRow = right.find(r => r[rightKey] === leftRow[leftKey]);
    return {...leftRow, ...rightRow};
  });
}
const merged = leftJoin(users, orders, 'id', 'user_id');
Data Cleaning Operations
Remove Duplicates
Python (pandas)
# Remove duplicate rows
df_unique = df.drop_duplicates()
# Based on specific columns
df_unique = df.drop_duplicates(subset=['email'])
# Keep first or last occurrence
df_unique = df.drop_duplicates(subset=['email'], keep='first')
df_unique = df.drop_duplicates(subset=['email'], keep='last')
# Identify duplicates
duplicates = df[df.duplicated()]
duplicate_emails = df[df.duplicated(subset=['email'])]
Handle Missing Values
Python (pandas)
# Check for missing values
missing_count = df.isnull().sum()
missing_percent = (df.isnull().sum() / len(df)) * 100
# Drop rows with any missing values
df_clean = df.dropna()
# Drop rows where specific column is missing
df_clean = df.dropna(subset=['email'])
# Drop columns with too many missing values (keep columns at least 70% non-null)
df_clean = df.dropna(axis=1, thresh=int(len(df) * 0.7))
# Fill missing values
df_filled = df.fillna(0)
df_filled = df.fillna({'age': 0, 'country': 'Unknown'})
# Forward fill (fillna(method='ffill') is deprecated in newer pandas)
df_filled = df.ffill()
# Fill with mean/median (plain assignment avoids chained-inplace warnings)
df['age'] = df['age'].fillna(df['age'].mean())
df['age'] = df['age'].fillna(df['age'].median())
# Interpolate
df['value'] = df['value'].interpolate(method='linear')
JavaScript
// Filter out rows with missing values
const cleaned = data.filter(row =>
  row.email && row.name && row.age
);
// Fill missing values
const filled = data.map(row => ({
  ...row,
  age: row.age || 0,
  country: row.country || 'Unknown'
}));
Data Validation
Python (pandas)
# Validate email format
import re
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
df['email_valid'] = df['email'].str.match(email_pattern)
# Validate age range
df['age_valid'] = df['age'].between(0, 120)
# Validate required fields
df['valid'] = df[['name', 'email', 'age']].notna().all(axis=1)
# Check data types
def validate_types(df):
    errors = []
    # Check numeric columns
    for col in ['age', 'salary', 'score']:
        if col in df.columns:
            if not pd.api.types.is_numeric_dtype(df[col]):
                errors.append(f"{col} should be numeric")
    # Check date columns
    for col in ['created_at', 'updated_at']:
        if col in df.columns:
            try:
                pd.to_datetime(df[col])
            except (ValueError, TypeError):
                errors.append(f"{col} has invalid dates")
    return errors
# Remove invalid rows
df_valid = df[df['email_valid'] & df['age_valid']]
Data Normalization
Python (pandas)
# Trim whitespace
df['name'] = df['name'].str.strip()
df['email'] = df['email'].str.strip()
# Convert to lowercase
df['email'] = df['email'].str.lower()
# Standardize phone numbers
df['phone'] = df['phone'].str.replace(r'[^0-9]', '', regex=True)
# Standardize dates
df['created_at'] = pd.to_datetime(df['created_at'])
# Standardize country names
country_mapping = {
    'USA': 'United States',
    'US': 'United States',
    'United States of America': 'United States',
    'UK': 'United Kingdom'
}
# Convert data types
df['age'] = pd.to_numeric(df['age'], errors='coerce')
df['active'] = df['active'].astype(bool)
df['score'] = df['score'].astype(float)
Data Analysis Operations
Statistical Summary
Python (pandas)
# Basic statistics
print(df.describe())
# Statistics for all columns (including non-numeric)
print(df.describe(include='all'))
# Specific statistics
print(f"Mean age: {df['age'].mean()}")
print(f"Median age: {df['age'].median()}")
print(f"Std dev: {df['age'].std()}")
print(f"Min: {df['age'].min()}")
print(f"Max: {df['age'].max()}")
# Count values
print(df['country'].value_counts())
# Percentage distribution
print(df['country'].value_counts(normalize=True) * 100)
# Cross-tabulation
cross_tab = pd.crosstab(df['country'], df['active'])
# Correlation matrix
correlation = df[['age', 'salary', 'score']].corr()
Data Profiling
Python (pandas)
def profile_dataframe(df):
    """Generate comprehensive data profile"""
    profile = {
        'shape': df.shape,
        'columns': list(df.columns),
        'dtypes': df.dtypes.to_dict(),
        'memory_usage': df.memory_usage(deep=True).sum() / 1024**2,  # MB
        'missing_values': df.isnull().sum().to_dict(),
        'missing_percent': (df.isnull().sum() / len(df) * 100).to_dict(),
        'duplicates': df.duplicated().sum(),
        'numeric_summary': df.describe().to_dict(),
        'unique_counts': df.nunique().to_dict()
    }
    # Column-specific analysis
    for col in df.columns:
        profile[f'{col}_sample'] = df[col].head(5).tolist()
        if df[col].dtype == 'object':
            profile[f'{col}_top_values'] = df[col].value_counts().head(10).to_dict()
        if pd.api.types.is_numeric_dtype(df[col]):
            profile[f'{col}_outliers'] = detect_outliers(df[col])
    return profile

def detect_outliers(series):
    """Detect outliers using IQR method"""
    Q1 = series.quantile(0.25)
    Q3 = series.quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    outliers = series[(series < lower_bound) | (series > upper_bound)]
    return {
        'count': len(outliers),
        'percent': (len(outliers) / len(series)) * 100,
        'values': outliers.tolist()
    }
Generate Report
from datetime import datetime

def generate_csv_report(df, filename='report.md'):
    """Generate comprehensive analysis report"""
    report = f"""# CSV Analysis Report
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

## Dataset Overview
- **Rows**: {len(df):,}
- **Columns**: {len(df.columns)}
- **Memory Usage**: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB
- **Duplicates**: {df.duplicated().sum():,}

## Column Summary
| Column | Type | Non-Null | Unique | Missing % |
|--------|------|----------|--------|-----------|
"""
    for col in df.columns:
        dtype = str(df[col].dtype)
        non_null = df[col].count()
        unique = df[col].nunique()
        missing_pct = (df[col].isnull().sum() / len(df)) * 100
        report += f"| {col} | {dtype} | {non_null:,} | {unique:,} | {missing_pct:.1f}% |\n"
    report += "\n## Numeric Columns Statistics\n\n"
    report += df.describe().to_markdown()  # requires the tabulate package
    report += "\n\n## Data Quality Issues\n\n"
    # Missing values
    missing = df.isnull().sum()
    if missing.sum() > 0:
        report += "### Missing Values\n"
        for col, count in missing[missing > 0].items():
            pct = (count / len(df)) * 100
            report += f"- **{col}**: {count:,} ({pct:.1f}%)\n"
    # Duplicates
    if df.duplicated().sum() > 0:
        report += "\n### Duplicates\n"
        report += f"- Found {df.duplicated().sum():,} duplicate rows\n"
    # Write report
    with open(filename, 'w') as f:
        f.write(report)
    print(f"Report generated: {filename}")
Advanced Operations
Splitting Large CSV Files
def split_csv(input_file, rows_per_file=10000):
    """Split large CSV into smaller chunks"""
    chunk_num = 0
    for chunk in pd.read_csv(input_file, chunksize=rows_per_file):
        output_file = f"{input_file.rsplit('.', 1)[0]}_part{chunk_num}.csv"
        chunk.to_csv(output_file, index=False)
        print(f"Created {output_file} with {len(chunk)} rows")
        chunk_num += 1
Pivot and Unpivot
# Pivot (wide format)
pivot = df.pivot_table(
    values='revenue',
    index='product',
    columns='month',
    aggfunc='sum'
)
# Unpivot (long format)
melted = df.melt(
    id_vars=['product', 'category'],
    value_vars=['jan', 'feb', 'mar'],
    var_name='month',
    value_name='revenue'
)
Data Type Conversion
# Convert columns
df['age'] = pd.to_numeric(df['age'], errors='coerce')
df['created_at'] = pd.to_datetime(df['created_at'])
df['active'] = df['active'].astype(bool)
# Parse custom date formats
df['date'] = pd.to_datetime(df['date'], format='%d/%m/%Y')
# Handle mixed types
df['mixed'] = df['mixed'].astype(str)
Performance Optimization
Reading Large Files Efficiently
# Read in chunks
chunk_size = 10000
chunks = []
for chunk in pd.read_csv('large_file.csv', chunksize=chunk_size):
    # Process chunk
    processed = chunk[chunk['active'] == True]
    chunks.append(processed)
result = pd.concat(chunks, ignore_index=True)
# Read only needed columns
df = pd.read_csv('large_file.csv', usecols=['name', 'email', 'age'])
# Use appropriate dtypes
df = pd.read_csv('large_file.csv', dtype={
    'id': 'int32',          # instead of int64
    'age': 'int8',          # small integers
    'category': 'category'  # categorical data
})
Writing Large Files
# Write in chunks
chunk_size = 10000
for i in range(0, len(df), chunk_size):
    chunk = df.iloc[i:i+chunk_size]
    mode = 'w' if i == 0 else 'a'
    header = i == 0
    chunk.to_csv('output.csv', mode=mode, header=header, index=False)
Command Line Tools
Using csvkit
# View CSV structure
csvcut -n data.csv
# Filter columns
csvcut -c name,email,age data.csv > subset.csv
# Filter rows (this pattern matches two-digit ages 30 and above)
csvgrep -c age -r "^[3-9][0-9]$" data.csv > age_30plus.csv
# Convert to JSON
csvjson data.csv > data.json
# Statistics
csvstat data.csv
# SQL queries on CSV
csvsql --query "SELECT country, COUNT(*) FROM data GROUP BY country" data.csv
Using awk
# Print specific columns (plain comma splitting does not handle quoted fields)
awk -F',' '{print $1, $3}' data.csv
# Filter rows
awk -F',' '$3 > 30' data.csv
# Sum column
awk -F',' '{sum+=$3} END {print sum}' data.csv
Best Practices
- Always validate data before processing
- Use appropriate data types to save memory
- Handle encoding issues early in the process
- Profile data first to understand structure
- Use chunks for large files
- Back up original files before transformations
- Document transformations for reproducibility
- Validate output after processing
- Use version control for CSV processing scripts
- Test with sample data before processing full datasets
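As a sketch combining several of these practices (chunked reading, explicit dtypes, and validating output after processing); the paths and the id/email/category columns are illustrative placeholders:
import pandas as pd

INPUT, OUTPUT = 'large_input.csv', 'cleaned_output.csv'  # illustrative paths

chunks = []
for chunk in pd.read_csv(INPUT, chunksize=50_000,
                         dtype={'id': 'int32', 'category': 'category'}):
    # Normalize and deduplicate each chunk before accumulating it
    chunk['email'] = chunk['email'].str.strip().str.lower()
    chunks.append(chunk.drop_duplicates(subset=['id']))

result = pd.concat(chunks, ignore_index=True).drop_duplicates(subset=['id'])
result.to_csv(OUTPUT, index=False)

# Validate the output after processing: re-read and check basic invariants
check = pd.read_csv(OUTPUT)
assert len(check) == len(result), "row count changed on write"
assert check['id'].is_unique, "duplicate ids in output"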
Common Issues and Solutions
Issue: Encoding Errors
# Try different encodings
for encoding in ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1']:
    try:
        df = pd.read_csv('data.csv', encoding=encoding)
        print(f"Success with encoding: {encoding}")
        break
    except UnicodeDecodeError:
        continue
Issue: Delimiter Detection
# Auto-detect delimiter (uses the standard-library csv module)
import csv
with open('data.csv', 'r') as file:
    sample = file.read(1024)
    sniffer = csv.Sniffer()
    delimiter = sniffer.sniff(sample).delimiter
df = pd.read_csv('data.csv', delimiter=delimiter)
Issue: Memory Errors
# Use chunking
chunks = []
for chunk in pd.read_csv('large.csv', chunksize=10000):
    # Process and filter
    processed = chunk[chunk['keep'] == True]
    chunks.append(processed)
df = pd.concat(chunks, ignore_index=True)
Notes
- Always inspect CSV structure before processing
- Test transformations on a small sample first
- Consider using databases for very large datasets
- Document column meanings and data types
- Use consistent date and number formats
- Validate data quality regularly
- Keep processing scripts version controlled
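For the note above about using databases for very large datasets, a minimal sketch with pandas and the standard-library sqlite3 module might look like this (the database file, table name, and query columns are placeholders):
import sqlite3
import pandas as pd

con = sqlite3.connect('data.db')  # placeholder database file
# Load the CSV into SQLite in chunks so it never has to fit in memory at once
for chunk in pd.read_csv('very_large.csv', chunksize=100_000):
    chunk.to_sql('records', con, if_exists='append', index=False)

# Aggregate with SQL instead of loading the full dataset into pandas
top_countries = pd.read_sql_query(
    'SELECT country, COUNT(*) AS n FROM records GROUP BY country ORDER BY n DESC',
    con
)
con.close()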