---
name: data-deduplication
description: Identify and merge duplicate records in CRM datasets using fuzzy matching and exact matching. Use when cleaning contact lists, merging records from multiple sources, finding duplicate customers, or consolidating CRM data before sync.
---
# Data Deduplication
Provides patterns for identifying and handling duplicate records in CRM data.
## Exact Match Deduplication

```python
from typing import List, Dict, Any


def deduplicate_exact(records: List[Dict], key_field: str) -> List[Dict]:
    """
    Remove exact duplicates based on key field.

    Usage:
        unique = deduplicate_exact(contacts, "email")
    """
    seen = set()
    result = []
    for record in records:
        key = record.get(key_field)
        if key and key not in seen:
            seen.add(key)
            result.append(record)
    return result


def deduplicate_by_fields(records: List[Dict], fields: List[str]) -> List[Dict]:
    """
    Remove duplicates based on combination of fields.

    Usage:
        unique = deduplicate_by_fields(contacts, ["first_name", "last_name", "company"])
    """
    seen = set()
    result = []
    for record in records:
        key = tuple(record.get(f, "") for f in fields)
        if key not in seen:
            seen.add(key)
            result.append(record)
    return result
```
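A quick sanity check with made-up contact records (the sample data here is purely illustrative):

```python
contacts = [
    {"first_name": "Ada", "last_name": "Lovelace", "company": "Analytical", "email": "ada@example.com"},
    {"first_name": "Ada", "last_name": "Lovelace", "company": "Analytical", "email": "ada@example.com"},
    {"first_name": "Alan", "last_name": "Turing", "company": "Bletchley", "email": "alan@example.com"},
]

print(len(deduplicate_exact(contacts, "email")))                          # 2
print(len(deduplicate_by_fields(contacts, ["first_name", "last_name"])))  # 2
```

Both functions keep the first occurrence of each key and match values exactly, so "Ada" and "ada" would count as different records; use the fuzzy-matching helpers below when casing or formatting varies.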
## Finding Duplicates

```python
from collections import defaultdict


def find_duplicates(records: List[Dict], key_field: str) -> Dict[Any, List[Dict]]:
    """
    Find all duplicate groups.

    Usage:
        dups = find_duplicates(contacts, "email")
        for email, group in dups.items():
            print(f"{email}: {len(group)} duplicates")
    """
    groups = defaultdict(list)
    for record in records:
        key = record.get(key_field)
        if key:
            groups[key].append(record)
    return {k: v for k, v in groups.items() if len(v) > 1}


def count_duplicates(records: List[Dict], key_field: str) -> int:
    """Count total duplicate records."""
    seen = set()
    duplicates = 0
    for record in records:
        key = record.get(key_field)
        if key:
            if key in seen:
                duplicates += 1
            seen.add(key)
    return duplicates
```
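Continuing with the illustrative `contacts` list above, the two helpers answer different questions: `find_duplicates` tells you *which* records collide, while `count_duplicates` tells you *how many* are redundant.

```python
dups = find_duplicates(contacts, "email")
# {"ada@example.com": [<record>, <record>]} -- groups with a single member are dropped

print(count_duplicates(contacts, "email"))  # 1 (the second Ada record)
```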
## Fuzzy Matching

```python
from difflib import SequenceMatcher


def normalize_string(s: str) -> str:
    """Normalize string for comparison."""
    if not s:
        return ""
    return s.lower().strip()


def similarity_ratio(s1: str, s2: str) -> float:
    """
    Calculate similarity ratio between two strings.

    Returns value between 0 (no match) and 1 (exact match).
    """
    return SequenceMatcher(None, normalize_string(s1), normalize_string(s2)).ratio()


def find_fuzzy_duplicates(
    records: List[Dict],
    field: str,
    threshold: float = 0.9,
) -> List[List[Dict]]:
    """
    Find potential duplicates using fuzzy matching.

    Usage:
        similar_groups = find_fuzzy_duplicates(contacts, "company", threshold=0.85)
    """
    groups = []
    used = set()
    for i, record1 in enumerate(records):
        if i in used:
            continue
        value1 = record1.get(field, "")
        if not value1:
            continue
        group = [record1]
        used.add(i)
        for j, record2 in enumerate(records[i + 1:], i + 1):
            if j in used:
                continue
            value2 = record2.get(field, "")
            if value2 and similarity_ratio(value1, value2) >= threshold:
                group.append(record2)
                used.add(j)
        if len(group) > 1:
            groups.append(group)
    return groups
```
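`SequenceMatcher` ratios are sensitive to length differences, so the threshold matters. A hypothetical example with company-name variants (the ratios below are approximate):

```python
companies = [
    {"company": "Acme Corp"},
    {"company": "Acme Corp."},
    {"company": "Acme Corporation"},
]

groups = find_fuzzy_duplicates(companies, "company", threshold=0.9)
# "Acme Corp" vs "Acme Corp."       -> ratio ~0.95, grouped
# "Acme Corp" vs "Acme Corporation" -> ratio ~0.72, not grouped at this threshold
print(len(groups), len(groups[0]))  # 1 2
```

Two caveats worth knowing: the pairwise scan is O(n²), so it will be slow on large lists, and each group is anchored to its first member, so matching is not transitive.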
## Email Normalization

```python
def normalize_email(email: str) -> str:
    """
    Normalize email for duplicate detection.

    - Lowercase
    - Remove dots from Gmail addresses
    - Remove plus addressing
    """
    if not email:
        return ""
    email = email.lower().strip()
    local, domain = email.rsplit("@", 1) if "@" in email else (email, "")
    # Handle Gmail dots
    if domain in ("gmail.com", "googlemail.com"):
        local = local.replace(".", "")
    # Remove plus addressing
    if "+" in local:
        local = local.split("+")[0]
    return f"{local}@{domain}" if domain else local


def deduplicate_emails(records: List[Dict], email_field: str = "email") -> List[Dict]:
    """Deduplicate using normalized email addresses."""
    seen = set()
    result = []
    for record in records:
        email = normalize_email(record.get(email_field, ""))
        if email and email not in seen:
            seen.add(email)
            result.append(record)
    return result
```
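A few hypothetical inputs showing the normalization rules (the Gmail dot rule only applies to Gmail domains, which is why the code special-cases them):

```python
print(normalize_email("John.Doe+promo@Gmail.com"))  # johndoe@gmail.com
print(normalize_email("John.Doe@Example.com"))      # john.doe@example.com (dots kept)

records = [
    {"email": "john.doe@gmail.com"},
    {"email": "johndoe+newsletter@gmail.com"},
]
print(len(deduplicate_emails(records)))  # 1 -- both normalize to johndoe@gmail.com
```

Note that stripping plus addressing on every domain is a heuristic: it is safe for Gmail-style providers but could, in principle, merge two genuinely distinct addresses elsewhere.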
## Merging Duplicates

```python
from typing import Callable, Optional


def merge_duplicate_group(
    duplicates: List[Dict],
    prefer_fields: Optional[Dict[str, str]] = None,
) -> Dict:
    """
    Merge a group of duplicate records into one.

    prefer_fields: Dict mapping field -> source preference,
        e.g., {"email": "salesforce", "phone": "hubspot"}

    Usage:
        merged = merge_duplicate_group(duplicate_contacts)
    """
    if not duplicates:
        return {}
    prefer_fields = prefer_fields or {}
    merged = {}
    all_keys = set()
    for record in duplicates:
        all_keys.update(record.keys())
    for key in all_keys:
        # Get all non-empty values
        values = [r.get(key) for r in duplicates if r.get(key)]
        if not values:
            merged[key] = None
        elif key in prefer_fields:
            # Use the preferred source if it has a value
            source = prefer_fields[key]
            for record in duplicates:
                if record.get("source") == source and record.get(key):
                    merged[key] = record[key]
                    break
            else:
                # Preferred source had no value; fall back to first non-empty value
                merged[key] = values[0]
        else:
            # Take first non-empty value
            merged[key] = values[0]
    return merged


def deduplicate_and_merge(
    records: List[Dict],
    key_field: str,
    merge_fn: Optional[Callable[[List[Dict]], Dict]] = None,
) -> List[Dict]:
    """
    Find duplicates, merge them, and return a deduplicated list.

    Usage:
        unique = deduplicate_and_merge(contacts, "email")
    """
    groups = defaultdict(list)
    for record in records:
        key = record.get(key_field)
        if key:
            groups[key].append(record)
        else:
            # Records without a key get their own group (no merging)
            groups[id(record)].append(record)
    merge_fn = merge_fn or merge_duplicate_group
    return [merge_fn(group) for group in groups.values()]
```
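A sketch of source-preference merging with two hypothetical records (the `source` field values are illustrative):

```python
dupes = [
    {"source": "salesforce", "email": "ada@example.com", "phone": ""},
    {"source": "hubspot", "email": "ada@sample.org", "phone": "555-0100"},
]

merged = merge_duplicate_group(
    dupes, prefer_fields={"email": "salesforce", "phone": "hubspot"}
)
# {"source": "salesforce", "email": "ada@example.com", "phone": "555-0100"}
```

Without `prefer_fields`, the merge simply keeps the first non-empty value per field, so record order determines which value wins.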
## Reporting

```python
def generate_duplicate_report(records: List[Dict], key_field: str) -> Dict:
    """
    Generate a duplicate analysis report.

    Returns:
        {
            "total_records": int,
            "unique_records": int,
            "duplicate_records": int,
            "duplicate_groups": int,
            "duplication_rate": float
        }
    """
    duplicates = find_duplicates(records, key_field)
    total = len(records)
    # Each group of n duplicates contributes n - 1 redundant records
    dup_count = sum(len(group) - 1 for group in duplicates.values())
    unique = total - dup_count
    return {
        "total_records": total,
        "unique_records": unique,
        "duplicate_records": dup_count,
        "duplicate_groups": len(duplicates),
        "duplication_rate": dup_count / total if total > 0 else 0,
    }
```
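Running the report on the illustrative `contacts` list from earlier (one duplicated email out of three records):

```python
report = generate_duplicate_report(contacts, "email")
# {"total_records": 3, "unique_records": 2, "duplicate_records": 1,
#  "duplicate_groups": 1, "duplication_rate": 0.333...}
```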
## Helper Script

Use `helper.py` for the `Deduplicator` class with comprehensive duplicate handling.