| name | csv |
| description | Read, write, and transform CSV files for CRM data imports and exports. Use when processing CRM exports, preparing data for bulk import, converting between CSV formats, or handling tabular contact data. |

# CSV

Provides patterns for CSV handling in CRM data sync pipelines.

## Reading CSV Files
```python
import csv
from pathlib import Path
from typing import Dict, List, Optional


def read_csv(filepath: str, encoding: str = "utf-8") -> List[Dict]:
    """Read a CSV file as a list of dictionaries."""
    with open(filepath, newline="", encoding=encoding) as f:
        reader = csv.DictReader(f)
        return list(reader)


def read_csv_with_types(filepath: str, type_map: Dict[str, type]) -> List[Dict]:
    """
    Read a CSV file with per-column type conversion.

    Usage:
        data = read_csv_with_types("contacts.csv", {
            "age": int,
            "revenue": float,
            "active": lambda x: x.lower() == "true"
        })
    """
    rows = read_csv(filepath)
    for row in rows:
        for field, converter in type_map.items():
            if field in row and row[field]:
                try:
                    row[field] = converter(row[field])
                except (ValueError, TypeError):
                    pass  # leave the original string if conversion fails
    return rows


def detect_delimiter(filepath: str) -> str:
    """Auto-detect the CSV delimiter from a sample of the file."""
    with open(filepath, "r") as f:
        sample = f.read(4096)
    try:
        dialect = csv.Sniffer().sniff(sample, delimiters=",\t;|")
        return dialect.delimiter
    except csv.Error:
        return ","
```

## Writing CSV Files

```python
def write_csv(filepath: str, data: List[Dict], fieldnames: Optional[List[str]] = None):
    """Write a list of dictionaries to a CSV file."""
    if not data:
        return
    Path(filepath).parent.mkdir(parents=True, exist_ok=True)
    fieldnames = fieldnames or list(data[0].keys())
    with open(filepath, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)


def append_csv(filepath: str, data: List[Dict]):
    """Append rows to a CSV file, writing a header only if the file is new."""
    if not data:
        return
    file_exists = Path(filepath).exists()
    fieldnames = list(data[0].keys())
    with open(filepath, "a", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if not file_exists:
            writer.writeheader()
        writer.writerows(data)
```
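
A short usage sketch; the paths and rows are hypothetical:

```python
# Hypothetical rows for illustration
new_contacts = [
    {"email": "ada@example.com", "first_name": "Ada", "last_name": "Lovelace"},
    {"email": "alan@example.com", "first_name": "Alan", "last_name": "Turing"},
]

write_csv("exports/contacts.csv", new_contacts)  # creates exports/ if needed
append_csv("exports/contacts.csv", [
    {"email": "grace@example.com", "first_name": "Grace", "last_name": "Hopper"},
])
```

Note that `append_csv` takes its header from the appended rows, so their keys should line up with the existing file's columns.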

## Transforming CRM Data

```python
def rename_columns(data: List[Dict], mapping: Dict[str, str]) -> List[Dict]:
    """
    Rename columns for the target CRM format.

    Usage:
        mapping = {
            "Email": "email",
            "First Name": "first_name",
            "Last Name": "last_name"
        }
        transformed = rename_columns(contacts, mapping)
    """
    return [
        {mapping.get(k, k): v for k, v in row.items()}
        for row in data
    ]


def select_columns(data: List[Dict], columns: List[str]) -> List[Dict]:
    """Keep only the specified columns."""
    return [{col: row.get(col) for col in columns} for row in data]


def filter_rows(data: List[Dict], condition) -> List[Dict]:
    """Keep only rows for which the condition callable returns True."""
    return [row for row in data if condition(row)]


def map_values(data: List[Dict], column: str, mapping: Dict) -> List[Dict]:
    """
    Map values in a single column, leaving unmapped values unchanged.

    Usage:
        # Convert status codes
        mapping = {"A": "active", "I": "inactive", "P": "pending"}
        transformed = map_values(contacts, "status", mapping)
    """
    return [
        {**row, column: mapping.get(row.get(column), row.get(column))}
        for row in data
    ]
```
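
These helpers compose into a simple pipeline. A minimal sketch, assuming a hypothetical `crm_export.csv` with `Email`, `First Name`, and `Status` columns:

```python
# Hypothetical export with "Email", "First Name", and "Status" columns
contacts = read_csv("crm_export.csv")

normalized = rename_columns(contacts, {
    "Email": "email",
    "First Name": "first_name",
    "Status": "status",
})
normalized = map_values(normalized, "status", {"A": "active", "I": "inactive", "P": "pending"})
active = filter_rows(normalized, lambda row: row.get("status") == "active")
slim = select_columns(active, ["email", "first_name", "status"])
```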

## CRM Import/Export Formats

```python
def convert_to_hubspot_format(data: List[Dict], field_mapping: Dict[str, str]) -> List[Dict]:
    """
    Convert data to HubSpot import format.

    Usage:
        mapping = {
            "email": "Email",
            "first_name": "First Name",
            "last_name": "Last Name",
            "company": "Company Name"
        }
        hubspot_data = convert_to_hubspot_format(contacts, mapping)
    """
    return [
        {v: row.get(k, "") for k, v in field_mapping.items()}
        for row in data
    ]


def convert_to_salesforce_format(data: List[Dict], field_mapping: Dict[str, str]) -> List[Dict]:
    """
    Convert data to Salesforce import format.

    Usage:
        mapping = {
            "email": "Email",
            "first_name": "FirstName",
            "last_name": "LastName",
            "account": "Account.Name"
        }
        sf_data = convert_to_salesforce_format(contacts, mapping)
    """
    return [
        {v: row.get(k, "") for k, v in field_mapping.items()}
        for row in data
    ]
```
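
The converted rows can be written straight to an import file with `write_csv`. A sketch with assumed paths; check the exact header labels against your CRM's current import template:

```python
# Assumed source file and output path for illustration
contacts = read_csv("crm_export.csv")

hubspot_rows = convert_to_hubspot_format(contacts, {
    "email": "Email",
    "first_name": "First Name",
    "last_name": "Last Name",
})
write_csv("imports/hubspot_contacts.csv", hubspot_rows)
```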

## Validation and Cleaning

```python
import re


def validate_email_column(data: List[Dict], email_col: str = "email") -> tuple[List[Dict], List[Dict]]:
    """
    Split data into valid and invalid email records.

    Returns (valid_records, invalid_records).
    """
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    valid = []
    invalid = []
    for row in data:
        email = row.get(email_col, "")
        if re.match(pattern, str(email)):
            valid.append(row)
        else:
            invalid.append(row)
    return valid, invalid


def clean_data(data: List[Dict]) -> List[Dict]:
    """
    Apply common cleaning operations:

    - Strip whitespace from strings
    - Lowercase emails
    - Remove empty rows
    """
    cleaned = []
    for row in data:
        # Skip rows where every value is empty
        if not any(row.values()):
            continue
        cleaned_row = {}
        for key, value in row.items():
            if isinstance(value, str):
                value = value.strip()
                if key.lower() == "email":
                    value = value.lower()
            cleaned_row[key] = value
        cleaned.append(cleaned_row)
    return cleaned
```
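
A typical flow cleans first, then splits out rejects for manual review. A sketch assuming the export has an `email` column and hypothetical output paths:

```python
# Assumed input and output paths for illustration
contacts = clean_data(read_csv("crm_export.csv"))

valid, invalid = validate_email_column(contacts)
write_csv("imports/contacts_clean.csv", valid)
if invalid:
    write_csv("imports/contacts_rejected.csv", invalid)  # hold these back for review
```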

## Merging CSV Files

```python
def merge_csv_files(filepaths: List[str], output_path: str):
    """Merge multiple CSV files into one, keeping the union of all columns."""
    all_data = []
    all_fieldnames = set()
    for filepath in filepaths:
        data = read_csv(filepath)
        all_data.extend(data)
        if data:
            all_fieldnames.update(data[0].keys())
    fieldnames = sorted(all_fieldnames)
    write_csv(output_path, all_data, fieldnames)


def join_csv_files(file1: str, file2: str, on: str, output_path: str, how: str = "left"):
    """
    Join two CSV files on a common field.

    how: "left", "right", "inner", "outer"
    """
    data1 = read_csv(file1)
    data2 = read_csv(file2)
    lookup = {row[on]: row for row in data2}
    matched = set()
    result = []
    for row in data1:
        key = row.get(on)
        if key in lookup:
            matched.add(key)
            result.append({**row, **lookup[key]})
        elif how in ("left", "outer"):
            result.append(row)
    if how in ("right", "outer"):
        # Include rows from file2 that had no match in file1
        result.extend(row for key, row in lookup.items() if key not in matched)
    # Use the union of columns so rows with missing fields still write cleanly
    fieldnames = sorted({key for row in result for key in row})
    write_csv(output_path, result, fieldnames)
```
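
For example, enriching contacts with company details on a shared key; the file names and the `company_id` column are assumptions:

```python
# Assumed inputs: both files carry a "company_id" column
join_csv_files(
    "contacts.csv",
    "companies.csv",
    on="company_id",
    output_path="contacts_with_companies.csv",
    how="left",  # keep every contact, enrich where a company matches
)
```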

## Helper Script

Use `helper.py` for the `CsvProcessor` class, which provides comprehensive CSV handling.