Claude Code Plugins

Community-maintained marketplace

Feedback

Read, write, and transform CSV files for CRM data imports and exports. Use when processing CRM exports, preparing data for bulk import, converting between CSV formats, or handling tabular contact data.

Install Skill

1. Download skill
2. Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3. Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify the skill by reading through its instructions before using it.

SKILL.md

name: csv
description: Read, write, and transform CSV files for CRM data imports and exports. Use when processing CRM exports, preparing data for bulk import, converting between CSV formats, or handling tabular contact data.

CSV

Provides patterns for CSV handling in CRM data sync pipelines.

Reading CSV Files

import csv
from pathlib import Path
from typing import List, Dict

def read_csv(filepath: str, encoding: str = "utf-8") -> List[Dict]:
    """
    Read a CSV file as a list of dictionaries.

    The first row is used as the header; every following row becomes a
    dict keyed by those header names (csv.DictReader semantics).

    Args:
        filepath: Path of the CSV file to read.
        encoding: Text encoding of the file. Plain UTF-8 is read with the
            BOM-tolerant "utf-8-sig" codec so that a leading byte order
            mark does not end up glued to the first column name
            (e.g. "\\ufeffEmail" instead of "Email").

    Returns:
        All data rows in file order. A file with only a header row, or
        with no content at all, yields an empty list.
    """
    # "utf-8-sig" decodes files both with and without a BOM, so swapping it
    # in for plain UTF-8 never changes the result for BOM-less files.
    if encoding.lower().replace("_", "-") in ("utf-8", "utf8"):
        encoding = "utf-8-sig"

    with open(filepath, newline="", encoding=encoding) as f:
        return list(csv.DictReader(f))

def read_csv_with_types(
    filepath: str,
    type_map: Dict[str, type],
    encoding: str = "utf-8",
) -> List[Dict]:
    """
    Read CSV with type conversion.

    Args:
        filepath: Path of the CSV file to read.
        type_map: Maps a column name to a callable that converts the raw
            string value of that column.
        encoding: Text encoding of the file, forwarded to read_csv.

    Returns:
        The rows from read_csv with the mapped columns converted in place.
        Empty or missing values are left untouched, and a value whose
        converter raises ValueError or TypeError keeps its original string.

    Usage:
        data = read_csv_with_types("contacts.csv", {
            "age": int,
            "revenue": float,
            "active": lambda x: x.lower() == "true"
        })
    """
    rows = read_csv(filepath, encoding=encoding)
    for row in rows:
        for field, converter in type_map.items():
            # Skip absent columns and empty cells: there is nothing to convert.
            if not row.get(field):
                continue
            try:
                row[field] = converter(row[field])
            except (ValueError, TypeError):
                # Deliberate best effort: an unparseable cell keeps its raw
                # string rather than aborting the whole import.
                pass
    return rows

def detect_delimiter(filepath: str, encoding: str = "utf-8") -> str:
    """
    Auto-detect the delimiter of a CSV file.

    Reads up to the first 4096 characters and lets csv.Sniffer choose
    between comma, tab, semicolon and pipe.

    Args:
        filepath: Path of the CSV file to inspect.
        encoding: Text encoding used to read the sample. Defaults to UTF-8
            to match the other readers and writers in this file instead of
            depending on the platform's default encoding.

    Returns:
        The detected delimiter, or "," when the sniffer cannot decide
        (for example on an empty file).
    """
    # Only the delimiter is of interest here, so undecodable bytes are
    # replaced rather than allowed to abort detection.
    with open(filepath, "r", encoding=encoding, errors="replace") as f:
        sample = f.read(4096)
    try:
        return csv.Sniffer().sniff(sample, delimiters=",\t;|").delimiter
    except csv.Error:
        return ","

Writing CSV Files

def write_csv(filepath: str, data: List[Dict], fieldnames: List[str] = None):
    """
    Write a list of dictionaries to a CSV file.

    Args:
        filepath: Destination path. Missing parent directories are created.
        data: Rows to write. When empty, nothing is written and no file is
            created.
        fieldnames: Column order for the header. When omitted (or empty),
            the header is the union of the keys of all rows in first-seen
            order, so rows that carry extra keys no longer abort the write.

    Raises:
        ValueError: If ``fieldnames`` is given explicitly and a row contains
            a key that is not listed in it (csv.DictWriter default).
    """
    if not data:
        return

    Path(filepath).parent.mkdir(parents=True, exist_ok=True)
    if not fieldnames:
        # dict.fromkeys de-duplicates while preserving first-seen order.
        fieldnames = list(dict.fromkeys(key for row in data for key in row))

    with open(filepath, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        # Keys missing from a row are written as empty cells (restval="").
        writer.writerows(data)

def append_csv(filepath: str, data: List[Dict]):
    """
    Append rows to a CSV file, creating it (with a header) if needed.

    When the file already has a header row, appended rows are written in
    that header's column order regardless of the key order of the dicts, so
    values cannot land in the wrong column. When the file is missing or
    empty, the header is the union of the keys of ``data`` in first-seen
    order and missing parent directories are created.

    Args:
        filepath: CSV file to append to.
        data: Rows to append. When empty, nothing happens.

    Raises:
        ValueError: If a row contains a key that is not a column of the
            existing header (csv.DictWriter default), instead of silently
            writing a misaligned row.
    """
    if not data:
        return

    target = Path(filepath)
    existing_header = None
    if target.exists() and target.stat().st_size > 0:
        # utf-8-sig tolerates an optional BOM in front of the first column.
        with open(target, newline="", encoding="utf-8-sig") as f:
            existing_header = next(csv.reader(f), None)

    if existing_header:
        fieldnames = existing_header
    else:
        fieldnames = list(dict.fromkeys(key for row in data for key in row))
        target.parent.mkdir(parents=True, exist_ok=True)

    with open(target, "a", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if not existing_header:
            writer.writeheader()
        writer.writerows(data)

Transforming CRM Data

def rename_columns(data: List[Dict], mapping: Dict[str, str]) -> List[Dict]:
    """
    Return copies of the rows with their keys renamed.

    Keys found in ``mapping`` are replaced by the mapped name; all other
    keys are kept unchanged. The input rows are not modified.

    Usage:
        mapping = {
            "Email": "email",
            "First Name": "first_name",
            "Last Name": "last_name"
        }
        transformed = rename_columns(contacts, mapping)
    """
    renamed = []
    for record in data:
        converted = {}
        for old_name, value in record.items():
            # Unmapped columns fall back to their current name.
            converted[mapping.get(old_name, old_name)] = value
        renamed.append(converted)
    return renamed

def select_columns(data: List[Dict], columns: List[str]) -> List[Dict]:
    """
    Project every row onto the given columns, in the given order.

    A column that a row does not contain is included with the value None.
    """
    projected = []
    for record in data:
        projected.append({name: record.get(name) for name in columns})
    return projected

def filter_rows(data: List[Dict], condition) -> List[Dict]:
    """
    Keep the rows for which ``condition(row)`` is truthy.

    The returned list holds the original row objects, in their original
    order.
    """
    kept = []
    for record in data:
        if condition(record):
            kept.append(record)
    return kept

def map_values(data: List[Dict], column: str, mapping: Dict) -> List[Dict]:
    """
    Translate the values of one column through a lookup table.

    Returns copies of the rows in which ``column`` holds the mapped value;
    values without an entry in ``mapping`` are kept as they are. A row that
    lacks the column gets it added with the value None (or whatever
    ``mapping`` holds for None).

    Usage:
        # Convert status codes
        mapping = {"A": "active", "I": "inactive", "P": "pending"}
        transformed = map_values(contacts, "status", mapping)
    """
    translated = []
    for record in data:
        updated = dict(record)
        current = record.get(column)
        updated[column] = mapping.get(current, current)
        translated.append(updated)
    return translated

CRM Import/Export Formats

def convert_to_hubspot_format(data: List[Dict], field_mapping: Dict[str, str]) -> List[Dict]:
    """
    Reshape rows into the column names of a HubSpot import.

    Each output row contains exactly the target columns named by the values
    of ``field_mapping``, filled from the source column named by the
    matching key. Source columns missing from a row become "", and source
    columns not listed in the mapping are dropped.

    Usage:
        mapping = {
            "email": "Email",
            "first_name": "First Name",
            "last_name": "Last Name",
            "company": "Company Name"
        }
        hubspot_data = convert_to_hubspot_format(contacts, mapping)
    """
    converted = []
    for record in data:
        target_row = {}
        for source_name, target_name in field_mapping.items():
            target_row[target_name] = record.get(source_name, "")
        converted.append(target_row)
    return converted

def convert_to_salesforce_format(data: List[Dict], field_mapping: Dict[str, str]) -> List[Dict]:
    """
    Reshape rows into the column names of a Salesforce import.

    Each output row contains exactly the target columns named by the values
    of ``field_mapping``, filled from the source column named by the
    matching key. Source columns missing from a row become "", and source
    columns not listed in the mapping are dropped.

    Usage:
        mapping = {
            "email": "Email",
            "first_name": "FirstName",
            "last_name": "LastName",
            "account": "Account.Name"
        }
        sf_data = convert_to_salesforce_format(contacts, mapping)
    """
    converted = []
    for record in data:
        target_row = {}
        for source_name, target_name in field_mapping.items():
            target_row[target_name] = record.get(source_name, "")
        converted.append(target_row)
    return converted

Validation and Cleaning

import re

def validate_email_column(data: List[Dict], email_col: str = "email") -> tuple[List[Dict], List[Dict]]:
    """
    Split data into valid and invalid email records.

    A record is valid when the whole value of ``email_col`` looks like
    ``local@domain.tld``. Missing values, None, and values with surrounding
    whitespace or a trailing newline are classified as invalid.

    Args:
        data: Rows to check.
        email_col: Name of the column holding the email address.

    Returns:
        (valid_records, invalid_records), each in input order and holding
        the original row objects.
    """
    # fullmatch instead of match + "$": "$" also matches just before a
    # trailing newline, which would let "a@b.com\n" through as valid.
    pattern = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')
    valid = []
    invalid = []

    for row in data:
        email = row.get(email_col, "")
        if pattern.fullmatch(str(email)):
            valid.append(row)
        else:
            invalid.append(row)

    return valid, invalid

def clean_data(data: List[Dict]) -> List[Dict]:
    """
    Apply common cleaning operations.

    - Strip whitespace from strings
    - Lowercase emails (columns whose name is "email", case-insensitively)
    - Remove empty rows

    A row counts as empty when, after stripping, every value is None or "".
    Whitespace-only rows are therefore removed, while rows holding falsy but
    meaningful values such as 0 or False are kept.

    Returns:
        New row dicts; the input rows are not modified.
    """
    cleaned = []

    for row in data:
        cleaned_row = {}
        for key, value in row.items():
            if isinstance(value, str):
                value = value.strip()
                # Guard against non-string keys, which have no .lower().
                if isinstance(key, str) and key.lower() == "email":
                    value = value.lower()
            cleaned_row[key] = value

        # Judge emptiness on the stripped values so "   " counts as blank.
        if all(value is None or value == "" for value in cleaned_row.values()):
            continue

        cleaned.append(cleaned_row)

    return cleaned

Merging CSV Files

def merge_csv_files(filepaths: List[str], output_path: str):
    """
    Concatenate several CSV files into a single output file.

    Rows are written in the order the files are listed. The output header
    is the alphabetically sorted union of the column names found in the
    first row of each non-empty file.
    """
    combined_rows = []
    columns = set()

    for path in filepaths:
        rows = read_csv(path)
        if rows:
            # Iterating a row dict yields its keys, i.e. the column names.
            columns.update(rows[0])
        combined_rows.extend(rows)

    write_csv(output_path, combined_rows, sorted(columns))

def join_csv_files(file1: str, file2: str, on: str, output_path: str, how: str = "left"):
    """
    Join two CSV files on a common field.

    how: "left", "right", "inner", "outer"

    - "inner": only rows of file1 whose key also occurs in file2.
    - "left":  all rows of file1; matched rows are extended with file2's
               columns.
    - "right": matched rows plus the rows of file2 whose key does not occur
               in file1.
    - "outer": the union of "left" and "right".

    Matched rows come first, in file1 order (interleaved with unmatched
    file1 rows for "left"/"outer"); unmatched file2 rows follow in file2
    order. On a column-name clash, file2's value wins. If a key occurs
    several times in file2, its last row is the one merged into file1 rows.
    Cells for columns a row does not have are written empty. When the join
    produces no rows, write_csv writes nothing.

    Raises:
        ValueError: If ``how`` is not one of the four supported modes.
        KeyError: If a row of file2 has no ``on`` column.
    """
    if how not in ("left", "right", "inner", "outer"):
        raise ValueError(
            f"how must be 'left', 'right', 'inner' or 'outer', got {how!r}"
        )

    data1 = read_csv(file1)
    data2 = read_csv(file2)

    lookup = {row[on]: row for row in data2}

    result = []
    matched_keys = set()
    for row in data1:
        key = row.get(on)
        if key in lookup:
            matched_keys.add(key)
            result.append({**row, **lookup[key]})
        elif how in ("left", "outer"):
            result.append(row)

    if how in ("right", "outer"):
        # Rows of file2 that no file1 row joined with.
        result.extend(row for row in data2 if row[on] not in matched_keys)

    # Merged and unmatched rows carry different key sets, so pass the union
    # explicitly rather than letting the header come from the first row only.
    fieldnames = list(dict.fromkeys(key for row in result for key in row))
    write_csv(output_path, result, fieldnames)

Helper Script

Use helper.py for the CsvProcessor class with comprehensive CSV handling.