| name | json |
| description | Parse, transform, and generate JSON data for CRM integrations. Use when processing CRM API responses, extracting contact data from JSON exports, transforming data between systems, or preparing JSON payloads for sync operations. |
JSON
Provides patterns for JSON handling in CRM data sync pipelines.
Reading and Parsing JSON
import json
from pathlib import Path
def load_json(filepath: str) -> dict:
    """Load and parse a JSON document from a file.

    The file is decoded as UTF-8 explicitly rather than with the platform's
    locale encoding, so non-ASCII contact data (names, cities, ...) is read
    the same way on every machine.

    Args:
        filepath: Path of the JSON file to read.

    Returns:
        The parsed document. This is normally a dict, but whatever top-level
        JSON value the file contains (for example a list) is returned as-is.

    Raises:
        OSError: If the file cannot be opened or read.
        UnicodeDecodeError: If the file is not valid UTF-8.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    return json.loads(Path(filepath).read_text(encoding="utf-8"))
def parse_json(json_string: str) -> dict:
    """Decode a JSON document held in a string.

    Args:
        json_string: The JSON text to decode.

    Returns:
        The decoded value (whatever top-level JSON value the text holds).

    Raises:
        json.JSONDecodeError: If the text is not valid JSON.
    """
    decoded = json.loads(json_string)
    return decoded
def load_json_lines(filepath: str) -> list[dict]:
    """Load a JSON Lines file (one JSON record per line).

    The file is decoded as UTF-8 explicitly so the result does not depend on
    the platform's locale encoding. Blank or whitespace-only lines are
    skipped.

    Args:
        filepath: Path of the JSON Lines file to read.

    Returns:
        The decoded records, in file order.

    Raises:
        OSError: If the file cannot be opened or read.
        UnicodeDecodeError: If the file is not valid UTF-8.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    with open(filepath, encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]
Writing JSON
def save_json(data: dict, filepath: str, indent: int = 2) -> None:
    """Serialize a dictionary to a JSON file.

    Missing parent directories are created first. Values that the json
    module cannot serialize natively (dates, Decimals, ...) are written as
    ``str(value)``. The file is written as UTF-8 explicitly so the result
    does not depend on the platform's locale encoding.

    Args:
        data: The dictionary to serialize.
        filepath: Destination path; an existing file is overwritten.
        indent: Indentation width passed to ``json.dumps``.

    Raises:
        OSError: If the directory or file cannot be created or written.
    """
    target = Path(filepath)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(
        json.dumps(data, indent=indent, default=str),
        encoding="utf-8",
    )
def to_json_string(data: dict, indent: int = 2) -> str:
    """Render a dictionary as formatted JSON text.

    Values the json module cannot serialize natively are rendered via
    ``str(value)``.

    Args:
        data: The dictionary to serialize.
        indent: Indentation width passed to ``json.dumps``.

    Returns:
        The JSON text.
    """
    rendered = json.dumps(data, default=str, indent=indent)
    return rendered
def save_json_lines(records: list[dict], filepath: str) -> None:
    """Save records as JSON Lines (one JSON record per line).

    Missing parent directories are created first. Values that the json
    module cannot serialize natively are written as ``str(value)``. The file
    is written as UTF-8 explicitly so the result does not depend on the
    platform's locale encoding.

    Args:
        records: The records to write, one per output line.
        filepath: Destination path; an existing file is overwritten.

    Raises:
        OSError: If the directory or file cannot be created or written.
    """
    Path(filepath).parent.mkdir(parents=True, exist_ok=True)
    with open(filepath, "w", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record, default=str) + "\n")
Extracting Nested Data
from typing import Any
def get_nested(data: dict, path: str, default: Any = None) -> Any:
    """
    Get value from nested dictionary using dot notation.

    Each dot-separated segment of ``path`` is looked up in turn: as a key
    when the current value is a dict, or as a zero-based index when the
    current value is a list and the segment is a decimal number. As soon as
    a segment cannot be resolved, ``default`` is returned. A value that is
    present but ``None`` is returned as ``None``, not replaced by
    ``default``.

    Usage:
        contact = {"properties": {"email": {"value": "user@example.com"}}}
        email = get_nested(contact, "properties.email.value")
    """
    current = data
    for segment in path.split("."):
        if isinstance(current, dict) and segment in current:
            current = current[segment]
            continue
        # isdecimal(), not isdigit(): isdigit() also accepts characters such
        # as superscript digits that int() rejects with ValueError.
        if isinstance(current, list) and segment.isdecimal():
            position = int(segment)
            if position < len(current):
                current = current[position]
                continue
        return default
    return current
def set_nested(data: dict, path: str, value: Any) -> dict:
    """Set value in nested dictionary using dot notation.

    Missing intermediate dictionaries are created on the way down. When the
    path passes through a list, the segment is used as a zero-based index
    into that list (the same convention get_nested uses for reading), so a
    path such as "emails.0.value" updates an existing list element.

    Args:
        data: The dictionary to modify in place.
        path: Dot-separated path of keys (and list indices) to the target.
        value: The value to store at the target.

    Returns:
        The same ``data`` object, to allow call chaining.

    Raises:
        TypeError: If the path passes through a value that is neither a
            mapping nor a list, or a list is addressed with a non-numeric
            segment.
        IndexError: If a list index in the path is out of range.
    """

    def _list_index(segment: str) -> int:
        # Convert a path segment to a list index, rejecting non-numeric text.
        if not segment.isdecimal():
            raise TypeError(
                f"cannot use {segment!r} as a list index in path {path!r}"
            )
        return int(segment)

    *parents, leaf = path.split(".")
    current = data
    for key in parents:
        if isinstance(current, list):
            current = current[_list_index(key)]
            continue
        if key not in current:
            current[key] = {}
        current = current[key]

    if isinstance(current, list):
        current[_list_index(leaf)] = value
    else:
        current[leaf] = value
    return data
CRM Data Extraction
def extract_contacts_from_api_response(response: dict, contacts_path: str = "results") -> list[dict]:
    """
    Extract contact records from CRM API response.

    Looks up ``contacts_path`` (dot notation) in ``response``. An empty list
    is returned both when the path is missing and when the response carries
    an explicit null there, so callers can always iterate over the result.

    Usage:
        # HubSpot-style response
        contacts = extract_contacts_from_api_response(response, "results")
        # Salesforce-style response
        contacts = extract_contacts_from_api_response(response, "records")
    """
    contacts = get_nested(response, contacts_path)
    # get_nested yields None for a missing path (its default) and for a JSON
    # null; both mean "no contacts".
    return [] if contacts is None else contacts
def extract_field_values(record: dict, field_mapping: dict[str, str]) -> dict:
    """
    Build a new record by pulling values out of a CRM record under new names.

    ``field_mapping`` maps each output field name to the dot-notation path
    of its value inside ``record``. Paths that cannot be resolved produce
    ``None`` for that field.

    Usage:
        mapping = {
            "email": "properties.email",
            "first_name": "properties.firstname",
            "last_name": "properties.lastname"
        }
        clean_record = extract_field_values(hubspot_contact, mapping)
    """
    extracted = {}
    for target_name, source_path in field_mapping.items():
        extracted[target_name] = get_nested(record, source_path)
    return extracted
Flattening and Restructuring
def flatten_record(record: dict, separator: str = "_") -> dict:
    """
    Flatten nested record structure.

    Nested dict keys and list indices are joined with ``separator`` to form
    the flat key. Empty nested dicts and lists are kept as values under
    their own key rather than dropped, so every field of the input is still
    represented in the output. If two different paths produce the same flat
    key, the later one wins.

    Usage:
        nested = {"contact": {"name": "John", "address": {"city": "NYC"}}}
        flat = flatten_record(nested)
        # Result: {"contact_name": "John", "contact_address_city": "NYC"}
    """
    flat = {}

    def _walk(node, prefix, is_root):
        # Record a leaf, or descend into a container, accumulating into flat.
        is_dict = isinstance(node, dict)
        if not is_dict and not isinstance(node, list):
            flat[prefix] = node
            return
        if not node and not is_root:
            # An empty nested container has no children to emit; keep it as
            # a value so the field does not silently disappear.
            flat[prefix] = node
            return
        if is_dict:
            for key, child in node.items():
                child_key = f"{prefix}{separator}{key}" if prefix else key
                _walk(child, child_key, False)
        else:
            for position, child in enumerate(node):
                child_key = f"{prefix}{separator}{position}" if prefix else str(position)
                _walk(child, child_key, False)

    _walk(record, "", True)
    return flat
def normalize_crm_record(record: dict, schema: dict[str, str]) -> dict:
    """
    Rename the fields of a flat CRM record to a standard schema.

    ``schema`` maps each standard field name to the CRM-specific field name
    to read from ``record``. CRM fields absent from ``record`` become an
    empty string.

    Usage:
        schema = {
            "email": "email",
            "first_name": "firstname",
            "last_name": "lastname",
            "company": "company"
        }
        normalized = normalize_crm_record(hubspot_contact["properties"], schema)
    """
    normalized = {}
    for standard_name, crm_field in schema.items():
        normalized[standard_name] = record.get(crm_field, "")
    return normalized
Merging and Comparing
def merge_records(record1: dict, record2: dict, prefer: str = "first") -> dict:
    """
    Merge two records, choosing non-empty values.

    For every key found in either record, the value from the preferred
    record is used unless it is ``None`` or an empty string, in which case
    the other record's value is used. Keys appear in a stable order:
    ``record1``'s keys first, followed by keys only ``record2`` has.

    Args:
        record1: The first record.
        record2: The second record.
        prefer: ``"first"`` to prefer ``record1``'s values; any other value
            prefers ``record2``'s.

    Returns:
        A new dict; neither input is modified.

    Usage:
        merged = merge_records(salesforce_contact, hubspot_contact)
    """
    if prefer == "first":
        primary, fallback = record1, record2
    else:
        primary, fallback = record2, record1

    merged = {}
    # Iterate an ordered union of the keys (not a set) so the result's key
    # order is the same on every run.
    for key in {**record1, **record2}:
        preferred = primary.get(key)
        merged[key] = preferred if preferred not in (None, "") else fallback.get(key)
    return merged
def diff_records(old: dict, new: dict) -> dict:
    """
    Find differences between two records.

    Returns dict of changed fields with old and new values, in the form
    ``{field: {"old": old_value, "new": new_value}}``. A field missing from
    one side is reported with ``None`` for that side, so a missing field and
    a field explicitly set to ``None`` are treated as equal. Fields appear
    in a stable order: ``old``'s keys first, followed by keys only ``new``
    has.
    """
    changes = {}
    # Iterate an ordered union of the keys (not a set) so the result's key
    # order is the same on every run.
    for key in {**old, **new}:
        before = old.get(key)
        after = new.get(key)
        if before != after:
            changes[key] = {"old": before, "new": after}
    return changes
Batch Processing
def process_records(records: list[dict], transform_fn) -> list[dict]:
    """Run ``transform_fn`` on each record and collect the results in order."""
    return list(map(transform_fn, records))
def filter_records(records: list[dict], condition) -> list[dict]:
    """Keep only the records for which ``condition(record)`` is truthy."""
    kept = []
    for record in records:
        if condition(record):
            kept.append(record)
    return kept
Helper Script
See helper.py for the JsonProcessor class, which provides comprehensive CRM JSON handling.