data-deduplication

@benchflow-ai/skillsbench

Identify and merge duplicate records in CRM datasets using fuzzy matching and exact matching. Use when cleaning contact lists, merging records from multiple sources, finding duplicate customers, or consolidating CRM data before sync.

Install Skill

1. Download skill
2. Enable skills in Claude: open claude.ai/settings/capabilities and find the "Skills" section
3. Upload to Claude: click "Upload skill" and select the downloaded ZIP file

Note: Please verify the skill by reading through its instructions before using it.

SKILL.md

name: data-deduplication
description: Identify and merge duplicate records in CRM datasets using fuzzy matching and exact matching. Use when cleaning contact lists, merging records from multiple sources, finding duplicate customers, or consolidating CRM data before sync.

Data Deduplication

Provides patterns for identifying and handling duplicate records in CRM data.

Exact Match Deduplication

from typing import Any, Callable, Dict, List, Optional

def deduplicate_exact(records: List[Dict], key_field: str) -> List[Dict]:
    """
    Remove exact duplicates based on key field.

    Usage:
        unique = deduplicate_exact(contacts, "email")
    """
    seen = set()
    result = []

    for record in records:
        key = record.get(key_field)
        if key and key not in seen:
            seen.add(key)
            result.append(record)

    return result

def deduplicate_by_fields(records: List[Dict], fields: List[str]) -> List[Dict]:
    """
    Remove duplicates based on combination of fields.

    Usage:
        unique = deduplicate_by_fields(contacts, ["first_name", "last_name", "company"])
    """
    seen = set()
    result = []

    for record in records:
        key = tuple(record.get(f, "") for f in fields)
        if key not in seen:
            seen.add(key)
            result.append(record)

    return result
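
A quick sanity check of both helpers on made-up contact records (the sample data is illustrative only, not part of the skill):

contacts = [
    {"first_name": "Ada", "last_name": "Lovelace", "company": "Analytical", "email": "ada@example.com"},
    {"first_name": "Ada", "last_name": "Lovelace", "company": "Analytical", "email": "ada@example.com"},
    {"first_name": "Alan", "last_name": "Turing", "company": "Bletchley", "email": "alan@example.com"},
]

# Dedupe on a single key field, or on a combination of fields
assert len(deduplicate_exact(contacts, "email")) == 2
assert len(deduplicate_by_fields(contacts, ["first_name", "last_name", "company"])) == 2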

Finding Duplicates

from collections import defaultdict

def find_duplicates(records: List[Dict], key_field: str) -> Dict[Any, List[Dict]]:
    """
    Find all duplicate groups.

    Usage:
        dups = find_duplicates(contacts, "email")
        for email, group in dups.items():
            print(f"{email}: {len(group)} duplicates")
    """
    groups = defaultdict(list)

    for record in records:
        key = record.get(key_field)
        if key:
            groups[key].append(record)

    return {k: v for k, v in groups.items() if len(v) > 1}

def count_duplicates(records: List[Dict], key_field: str) -> int:
    """Count total duplicate records."""
    seen = set()
    duplicates = 0

    for record in records:
        key = record.get(key_field)
        if key:
            if key in seen:
                duplicates += 1
            seen.add(key)

    return duplicates
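
Note that count_duplicates counts the extra copies beyond the first occurrence of each key, so it is consistent with the group view returned by find_duplicates. A minimal check, reusing the sample contacts from the exact-match example above:

dups = find_duplicates(contacts, "email")
extra_copies = sum(len(group) - 1 for group in dups.values())
assert count_duplicates(contacts, "email") == extra_copies  # both equal 1 here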

Fuzzy Matching

from difflib import SequenceMatcher

def normalize_string(s: str) -> str:
    """Normalize a string for comparison: lowercase and strip whitespace."""
    if not s:
        return ""
    return s.lower().strip()

def similarity_ratio(s1: str, s2: str) -> float:
    """
    Calculate the similarity ratio between two strings.

    Returns a value between 0 (no match) and 1 (exact match).
    """
    return SequenceMatcher(None, normalize_string(s1), normalize_string(s2)).ratio()

def find_fuzzy_duplicates(
    records: List[Dict],
    field: str,
    threshold: float = 0.9
) -> List[List[Dict]]:
    """
    Find potential duplicates using fuzzy matching.

    Usage:
        similar_groups = find_fuzzy_duplicates(contacts, "company", threshold=0.85)
    """
    groups = []
    used = set()

    for i, record1 in enumerate(records):
        if i in used:
            continue

        value1 = record1.get(field, "")
        if not value1:
            continue

        group = [record1]
        used.add(i)

        for j, record2 in enumerate(records[i+1:], i+1):
            if j in used:
                continue

            value2 = record2.get(field, "")
            if value2 and similarity_ratio(value1, value2) >= threshold:
                group.append(record2)
                used.add(j)

        if len(group) > 1:
            groups.append(group)

    return groups
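
find_fuzzy_duplicates compares every pair of records, so it scales as O(n^2): fine for a few thousand contacts, slow well beyond that. A common mitigation is blocking, i.e., only comparing records whose normalized field values share a cheap key such as the first character. The sketch below is one such variant (not part of the skill itself); note that it will miss duplicates whose values differ in their first character.

def find_fuzzy_duplicates_blocked(
    records: List[Dict],
    field: str,
    threshold: float = 0.9
) -> List[List[Dict]]:
    """Fuzzy matching restricted to blocks that share a first character."""
    # Reuses defaultdict, normalize_string, and find_fuzzy_duplicates from above
    blocks = defaultdict(list)
    for record in records:
        value = normalize_string(record.get(field, ""))
        if value:
            blocks[value[0]].append(record)

    groups = []
    for block in blocks.values():
        groups.extend(find_fuzzy_duplicates(block, field, threshold))
    return groups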

Email Normalization

def normalize_email(email: str) -> str:
    """
    Normalize email for duplicate detection.

    - Lowercase
    - Remove dots from Gmail addresses
    - Strip plus addressing (for any domain)
    """
    if not email:
        return ""

    email = email.lower().strip()
    local, domain = email.rsplit("@", 1) if "@" in email else (email, "")

    # Handle Gmail dots
    if domain in ("gmail.com", "googlemail.com"):
        local = local.replace(".", "")

    # Remove plus addressing
    if "+" in local:
        local = local.split("+")[0]

    return f"{local}@{domain}" if domain else local
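
For example (assuming standard Gmail semantics, where dots in the local part are insignificant):

assert normalize_email("John.Doe+newsletter@Gmail.com") == "johndoe@gmail.com"
assert normalize_email("jane+crm@example.com") == "jane@example.com"

One caveat: the plus suffix is stripped for every domain, which can over-merge addresses on providers that treat "+" as a literal character.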

def deduplicate_emails(records: List[Dict], email_field: str = "email") -> List[Dict]:
    """Deduplicate using normalized email addresses."""
    seen = set()
    result = []

    for record in records:
        email = normalize_email(record.get(email_field, ""))
        if email and email not in seen:
            seen.add(email)
            result.append(record)

    return result

Merging Duplicates

def merge_duplicate_group(
    duplicates: List[Dict],
    prefer_fields: Optional[Dict[str, str]] = None
) -> Dict:
    """
    Merge a group of duplicate records into one.

    prefer_fields: Dict mapping field -> source preference
                   e.g., {"email": "salesforce", "phone": "hubspot"}

    Usage:
        merged = merge_duplicate_group(duplicate_contacts)
    """
    if not duplicates:
        return {}

    prefer_fields = prefer_fields or {}
    merged = {}
    all_keys = set()

    for record in duplicates:
        all_keys.update(record.keys())

    for key in all_keys:
        # Get all non-empty values
        values = [r.get(key) for r in duplicates if r.get(key)]

        if not values:
            merged[key] = None
        elif key in prefer_fields:
            # Use preferred source
            source = prefer_fields[key]
            for record in duplicates:
                if record.get("source") == source and record.get(key):
                    merged[key] = record[key]
                    break
            else:
                merged[key] = values[0]
        else:
            # Take first non-empty value
            merged[key] = values[0]

    return merged
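
A small illustration of the source-preference logic (records invented for the example):

dupes = [
    {"source": "hubspot", "email": "ada@x.com", "phone": "555-0100"},
    {"source": "salesforce", "email": "ada@x.co", "phone": None},
]
merged = merge_duplicate_group(dupes, prefer_fields={"email": "salesforce"})
assert merged["email"] == "ada@x.co"    # preferred source wins
assert merged["phone"] == "555-0100"    # first non-empty value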

def deduplicate_and_merge(
    records: List[Dict],
    key_field: str,
    merge_fn: Optional[Callable[[List[Dict]], Dict]] = None
) -> List[Dict]:
    """
    Find duplicates, merge them, and return deduplicated list.

    Usage:
        unique = deduplicate_and_merge(contacts, "email")
    """
    groups = defaultdict(list)

    for record in records:
        key = record.get(key_field)
        if key:
            groups[key].append(record)
        else:
            groups[id(record)].append(record)

    merge_fn = merge_fn or merge_duplicate_group
    return [merge_fn(group) for group in groups.values()]

Reporting

def generate_duplicate_report(records: List[Dict], key_field: str) -> Dict:
    """
    Generate a duplicate analysis report.

    Returns:
        {
            "total_records": int,
            "unique_records": int,
            "duplicate_records": int,
            "duplicate_groups": int,
            "duplication_rate": float
        }
    """
    duplicates = find_duplicates(records, key_field)

    total = len(records)
    dup_count = sum(len(group) - 1 for group in duplicates.values())
    unique = total - dup_count

    return {
        "total_records": total,
        "unique_records": unique,
        "duplicate_records": dup_count,
        "duplicate_groups": len(duplicates),
        "duplication_rate": dup_count / total if total > 0 else 0
    }
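
Putting it together, a typical cleanup pass reports first and then merges; a minimal sketch using the functions above on the sample contacts:

report = generate_duplicate_report(contacts, "email")
print(f"{report['duplicate_records']} duplicates in {report['duplicate_groups']} groups "
      f"({report['duplication_rate']:.1%} of {report['total_records']} records)")

clean = deduplicate_and_merge(contacts, "email")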

Helper Script

See helper.py for the Deduplicator class, which provides comprehensive duplicate handling.