| name | data-cleaning |
| description | Clean, normalize, and validate scraped data for ETL pipelines. Use this skill when removing duplicates, handling missing values, normalizing text and dates, validating data formats, or preparing raw scraped data for storage or analysis. |
Data Cleaning
Clean and normalize scraped data for reliable ETL processing.
Common Cleaning Operations
Text Normalization
import re
import unicodedata
def clean_text(text: str) -> str:
"""Normalize and clean text content."""
if not text:
return ''
# Normalize unicode
text = unicodedata.normalize('NFKC', text)
# Remove extra whitespace
text = ' '.join(text.split())
# Strip leading/trailing whitespace
return text.strip()
def remove_html_tags(text: str) -> str:
"""Remove HTML tags from text."""
return re.sub(r'<[^>]+>', '', text)
def normalize_whitespace(text: str) -> str:
"""Replace multiple spaces/newlines with single space."""
return re.sub(r'\s+', ' ', text).strip()
Numeric Cleaning
import re
def clean_price(price_str: str) -> float:
"""Extract numeric price from string."""
if not price_str:
return None
# Remove currency symbols and commas
cleaned = re.sub(r'[^\d.]', '', price_str)
try:
return float(cleaned)
except ValueError:
return None
def clean_integer(value: str) -> int:
"""Extract integer from string."""
if not value:
return None
cleaned = re.sub(r'[^\d]', '', str(value))
return int(cleaned) if cleaned else None
Date Normalization
from datetime import datetime
from typing import Optional
DATE_FORMATS = [
'%Y-%m-%d',
'%d/%m/%Y',
'%m/%d/%Y',
'%B %d, %Y',
'%b %d, %Y',
'%Y-%m-%dT%H:%M:%S',
'%Y-%m-%dT%H:%M:%SZ',
]
def parse_date(date_str: str) -> Optional[datetime]:
"""Parse date string trying multiple formats."""
if not date_str:
return None
date_str = date_str.strip()
for fmt in DATE_FORMATS:
try:
return datetime.strptime(date_str, fmt)
except ValueError:
continue
return None
def normalize_date(date_str: str, output_format: str = '%Y-%m-%d') -> str:
"""Normalize date to standard format."""
parsed = parse_date(date_str)
return parsed.strftime(output_format) if parsed else None
Deduplication
def deduplicate(records: list[dict], key_fields: list[str]) -> list[dict]:
"""Remove duplicate records based on key fields."""
seen = set()
unique = []
for record in records:
key = tuple(record.get(f) for f in key_fields)
if key not in seen:
seen.add(key)
unique.append(record)
return unique
def deduplicate_by_hash(records: list[dict]) -> list[dict]:
"""Remove exact duplicate records."""
import json
seen = set()
unique = []
for record in records:
record_hash = hash(json.dumps(record, sort_keys=True, default=str))
if record_hash not in seen:
seen.add(record_hash)
unique.append(record)
return unique
Missing Value Handling
def fill_missing(records: list[dict], defaults: dict) -> list[dict]:
"""Fill missing values with defaults."""
return [
{**defaults, **{k: v for k, v in r.items() if v is not None}}
for r in records
]
def drop_incomplete(records: list[dict], required_fields: list[str]) -> list[dict]:
"""Drop records missing required fields."""
return [
r for r in records
if all(r.get(f) not in (None, '', []) for f in required_fields)
]
Helper Script
Use clean_data.py for command-line data cleaning:
python clean_data.py input.json --dedupe id --output cleaned.json
python clean_data.py input.json --required name,email --normalize-dates date_field