---
name: data-correlation
description: Correlate and analyze data from multiple OSINT sources. Use this skill when combining information from different sources, identifying relationships between entities, or building comprehensive intelligence profiles.
---
# Data Correlation

Correlate and analyze intelligence from multiple OSINT sources.
## Installation

```bash
pip install pandas networkx matplotlib fuzzywuzzy python-Levenshtein
```

Note: `fuzzywuzzy` is now maintained under the name `thefuzz` (same API). If you install `thefuzz` instead, change the import below to `from thefuzz import fuzz`.
## Data Normalization

```python
import ipaddress
import re
from typing import Dict, List, Optional

from fuzzywuzzy import fuzz


def normalize_email(email: str) -> str:
    """Normalize an email address."""
    return email.lower().strip()


def normalize_name(name: str) -> str:
    """Normalize a person's name."""
    # Drop honorifics/suffixes and abbreviated parts (e.g. middle initials)
    # so that variants of the same name compare more closely
    titles = {'mr', 'mrs', 'ms', 'dr', 'prof', 'jr', 'sr', 'ii', 'iii'}
    parts = name.lower().strip().split()
    parts = [p for p in parts if p not in titles and not p.endswith('.')]
    return ' '.join(parts)


def normalize_domain(domain: str) -> str:
    """Normalize a domain name: strip scheme, path, and leading 'www.'."""
    domain = domain.lower().strip()
    domain = domain.replace('https://', '').replace('http://', '')
    domain = domain.split('/')[0]
    if domain.startswith('www.'):
        domain = domain[4:]
    return domain


def normalize_phone(phone: str) -> str:
    """Normalize a phone number to digits and an optional leading '+'."""
    return re.sub(r'[^\d+]', '', phone)


def normalize_ip(ip: str) -> str:
    """Normalize an IP address to canonical form; return the input if invalid."""
    try:
        return str(ipaddress.ip_address(ip.strip()))
    except ValueError:
        return ip.strip()
```
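A quick sanity check of the normalizers, with illustrative values (expected output in the comments):

```python
print(normalize_email('  JSmith@Example.COM '))             # jsmith@example.com
print(normalize_name('Dr. John D. Smith Jr.'))              # john smith
print(normalize_domain('https://www.Example.com/about'))    # example.com
print(normalize_phone('(555) 123-4567'))                    # 5551234567
print(normalize_ip('2001:0db8:0000:0000:0000:0000:0000:0001'))  # 2001:db8::1
```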
## Entity Matching

```python
class EntityMatcher:
    """Match and correlate entities across data sources."""

    def __init__(self, threshold: int = 80):
        self.threshold = threshold  # minimum fuzzy-match score (0-100)
        self.entities: Dict[str, List[Dict]] = {}

    def add_entity(self, entity_type: str, data: Dict, source: str):
        """Add an entity record from a data source."""
        if entity_type not in self.entities:
            self.entities[entity_type] = []
        data['_source'] = source
        self.entities[entity_type].append(data)

    def match_entities(self, entity_type: str, key_field: str) -> List[List[Dict]]:
        """Group entities whose key fields are fuzzy-similar."""
        if entity_type not in self.entities:
            return []
        entities = self.entities[entity_type]
        matched_groups = []
        matched_indices = set()
        for i, entity1 in enumerate(entities):
            if i in matched_indices:
                continue
            group = [entity1]
            matched_indices.add(i)
            key1 = entity1.get(key_field, '')
            if not key1:
                continue
            for j, entity2 in enumerate(entities[i + 1:], i + 1):
                if j in matched_indices:
                    continue
                key2 = entity2.get(key_field, '')
                if not key2:
                    continue
                similarity = fuzz.ratio(str(key1).lower(), str(key2).lower())
                if similarity >= self.threshold:
                    group.append(entity2)
                    matched_indices.add(j)
            if len(group) > 1:
                matched_groups.append(group)
        return matched_groups

    def merge_entities(self, entities: List[Dict]) -> Dict:
        """Merge multiple entity records into one, keeping conflicting values."""
        merged = {'_sources': []}
        for entity in entities:
            merged['_sources'].append(entity.get('_source', 'unknown'))
            for key, value in entity.items():
                if key.startswith('_'):
                    continue
                if key not in merged or not merged[key]:
                    merged[key] = value
                elif isinstance(value, list):
                    if isinstance(merged[key], list):
                        merged[key].extend(value)
                    else:
                        merged[key] = [merged[key]] + value
                elif value != merged[key]:
                    # Conflicting scalar values: keep both in a list
                    if not isinstance(merged[key], list):
                        merged[key] = [merged[key]]
                    merged[key].append(value)
        # Deduplicate lists (preserves order; assumes hashable values)
        for key, value in merged.items():
            if isinstance(value, list):
                merged[key] = list(dict.fromkeys(value))
        return merged


# Example usage
matcher = EntityMatcher(threshold=85)

# Add records from different sources
matcher.add_entity('person', {'name': 'John Smith', 'email': 'jsmith@example.com'}, 'linkedin')
matcher.add_entity('person', {'name': 'John D. Smith', 'email': 'john.smith@example.com'}, 'github')
matcher.add_entity('person', {'name': 'J. Smith', 'phone': '555-1234'}, 'website')

matches = matcher.match_entities('person', 'name')
for group in matches:
    merged = matcher.merge_entities(group)
    print(f"Merged entity: {merged}")
```
## Relationship Graph

```python
import networkx as nx
import matplotlib.pyplot as plt


class IntelligenceGraph:
    """Build a relationship graph from intelligence data."""

    def __init__(self):
        self.graph = nx.Graph()

    def add_entity(self, entity_id: str, entity_type: str, attributes: Dict):
        """Add an entity node."""
        self.graph.add_node(entity_id, type=entity_type, **attributes)

    def add_relationship(self, entity1: str, entity2: str, relationship_type: str,
                         weight: float = 1.0):
        """Add a relationship edge."""
        self.graph.add_edge(entity1, entity2, type=relationship_type, weight=weight)

    def find_connections(self, entity_id: str, depth: int = 2) -> nx.Graph:
        """Return the subgraph of all connections within `depth` hops."""
        nodes = {entity_id}
        current_layer = {entity_id}
        for _ in range(depth):
            next_layer = set()
            for node in current_layer:
                next_layer.update(self.graph.neighbors(node))
            nodes.update(next_layer)
            current_layer = next_layer
        return self.graph.subgraph(nodes)

    def find_shortest_path(self, source: str, target: str) -> list:
        """Find the shortest path between two entities ([] if none exists)."""
        try:
            return nx.shortest_path(self.graph, source, target)
        except nx.NetworkXNoPath:
            return []

    def identify_key_entities(self, top_n: int = 10) -> List[tuple]:
        """Identify the most connected entities by degree centrality."""
        centrality = nx.degree_centrality(self.graph)
        return sorted(centrality.items(), key=lambda x: x[1], reverse=True)[:top_n]

    def find_communities(self) -> List[set]:
        """Identify communities/clusters via greedy modularity maximization."""
        return list(nx.community.greedy_modularity_communities(self.graph))

    def visualize(self, output_file: Optional[str] = None):
        """Visualize the intelligence graph, with nodes colored by entity type."""
        plt.figure(figsize=(16, 12))
        colors = {
            'person': '#2196F3',
            'organization': '#4CAF50',
            'domain': '#FF9800',
            'ip': '#9C27B0',
            'email': '#F44336',
        }
        node_colors = [colors.get(self.graph.nodes[n].get('type', ''), '#9E9E9E')
                       for n in self.graph.nodes()]
        pos = nx.spring_layout(self.graph, k=2, iterations=50)
        nx.draw_networkx_nodes(self.graph, pos, node_color=node_colors,
                               node_size=500, alpha=0.8)
        nx.draw_networkx_edges(self.graph, pos, alpha=0.5)
        nx.draw_networkx_labels(self.graph, pos, font_size=8)
        plt.title('Intelligence Relationship Graph')
        plt.axis('off')
        if output_file:
            plt.savefig(output_file, dpi=300, bbox_inches='tight')
        else:
            plt.show()
        plt.close()


# Example
graph = IntelligenceGraph()
graph.add_entity('john_smith', 'person', {'name': 'John Smith'})
graph.add_entity('acme_corp', 'organization', {'name': 'ACME Corp'})
graph.add_entity('jsmith@acme.com', 'email', {})
graph.add_relationship('john_smith', 'acme_corp', 'works_at')
graph.add_relationship('john_smith', 'jsmith@acme.com', 'uses_email')
```
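Continuing the example, the query helpers can be exercised directly (the output file name here is arbitrary):

```python
print(graph.find_shortest_path('acme_corp', 'jsmith@acme.com'))
# ['acme_corp', 'john_smith', 'jsmith@acme.com']

print(graph.identify_key_entities(top_n=3))
# john_smith ranks first: it is connected to both other nodes

graph.visualize('intel_graph.png')
```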
## Data Fusion

```python
import pandas as pd
from datetime import datetime


class DataFusion:
    """Fuse data from multiple sources with per-source confidence weights."""

    def __init__(self):
        self.sources = {}
        self.confidence_weights = {}

    def add_source(self, name: str, data: pd.DataFrame, confidence: float = 1.0):
        """Register a data source with a confidence weight."""
        self.sources[name] = data
        self.confidence_weights[name] = confidence

    def fuse_on_key(self, key_column: str) -> pd.DataFrame:
        """Outer-join all sources on a common key; columns become `<field>_<source>`."""
        if not self.sources:
            return pd.DataFrame()
        source_names = list(self.sources.keys())
        # Start with the first source
        result = self.sources[source_names[0]].copy()
        result.columns = [f"{c}_{source_names[0]}" if c != key_column else c
                          for c in result.columns]
        # Merge in the remaining sources
        for name in source_names[1:]:
            df = self.sources[name].copy()
            df.columns = [f"{c}_{name}" if c != key_column else c
                          for c in df.columns]
            result = result.merge(df, on=key_column, how='outer')
        return result

    def resolve_conflicts(self, df: pd.DataFrame, field: str) -> pd.DataFrame:
        """Resolve conflicting values for a field using confidence weights."""
        # Find the per-source columns for this field
        field_cols = [c for c in df.columns if c.startswith(f"{field}_")]
        if not field_cols:
            return df

        def select_best(row):
            # Sum confidence weights per candidate value; the highest total wins
            values = {}
            for col in field_cols:
                if pd.notna(row[col]):
                    source = col.replace(f"{field}_", "")
                    weight = self.confidence_weights.get(source, 1.0)
                    values[row[col]] = values.get(row[col], 0) + weight
            if values:
                return max(values.items(), key=lambda x: x[1])[0]
            return None

        df[field] = df.apply(select_best, axis=1)
        return df


# Example
fusion = DataFusion()

# Add sources with confidence weights
df1 = pd.DataFrame({'email': ['a@x.com', 'b@x.com'], 'name': ['Alice', 'Bob']})
df2 = pd.DataFrame({'email': ['a@x.com', 'c@x.com'], 'name': ['Alice A.', 'Charlie']})
fusion.add_source('linkedin', df1, confidence=0.9)
fusion.add_source('website', df2, confidence=0.7)

fused = fusion.fuse_on_key('email')
resolved = fusion.resolve_conflicts(fused, 'name')
```
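Inspecting the result shows the weighting in action:

```python
print(resolved[['email', 'name']])
# a@x.com -> 'Alice'   (linkedin's 0.9 outweighs website's 0.7 on the conflict)
# b@x.com -> 'Bob'     (only linkedin has this record)
# c@x.com -> 'Charlie' (only website has this record)
```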
## Timeline Analysis

```python
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
from datetime import datetime, timedelta


class TimelineAnalyzer:
    """Analyze temporal patterns in intelligence data."""

    def __init__(self):
        self.events = []

    def add_event(self, timestamp: datetime, event_type: str,
                  entity: str, details: Optional[Dict] = None):
        """Add an event to the timeline."""
        self.events.append({
            'timestamp': timestamp,
            'type': event_type,
            'entity': entity,
            'details': details or {}
        })

    def get_timeline(self, entity: Optional[str] = None, start: Optional[datetime] = None,
                     end: Optional[datetime] = None) -> List[Dict]:
        """Return a chronologically sorted, optionally filtered timeline."""
        events = self.events
        if entity:
            events = [e for e in events if e['entity'] == entity]
        if start:
            events = [e for e in events if e['timestamp'] >= start]
        if end:
            events = [e for e in events if e['timestamp'] <= end]
        return sorted(events, key=lambda x: x['timestamp'])

    def find_patterns(self, window: timedelta = timedelta(hours=1)) -> List[Dict]:
        """Find temporal patterns: clusters of events occurring within one window."""
        patterns = []
        events = sorted(self.events, key=lambda x: x['timestamp'])
        for i, event in enumerate(events):
            window_end = event['timestamp'] + window
            related = [e for e in events[i + 1:]
                       if e['timestamp'] <= window_end]
            if related:
                patterns.append({
                    'anchor_event': event,
                    'related_events': related,
                    'window': window
                })
        return patterns

    def visualize_timeline(self, output_file: Optional[str] = None):
        """Plot events on a per-entity timeline."""
        fig, ax = plt.subplots(figsize=(14, 6))
        events = sorted(self.events, key=lambda x: x['timestamp'])
        entities = list(set(e['entity'] for e in events))
        entity_y = {e: i for i, e in enumerate(entities)}
        for event in events:
            y = entity_y[event['entity']]
            ax.scatter(event['timestamp'], y, s=100, zorder=5)
            ax.annotate(event['type'], (event['timestamp'], y),
                        xytext=(5, 5), textcoords='offset points', fontsize=8)
        ax.set_yticks(range(len(entities)))
        ax.set_yticklabels(entities)
        ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
        plt.xticks(rotation=45)
        plt.title('Event Timeline')
        plt.tight_layout()
        if output_file:
            plt.savefig(output_file, dpi=300)
        else:
            plt.show()
        plt.close()
```
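A minimal run-through with made-up events:

```python
timeline = TimelineAnalyzer()
timeline.add_event(datetime(2024, 1, 5, 9, 0), 'domain_registered', 'example.com')
timeline.add_event(datetime(2024, 1, 5, 9, 30), 'account_created', 'john_smith')
timeline.add_event(datetime(2024, 2, 1, 14, 0), 'post_published', 'john_smith')

for event in timeline.get_timeline(entity='john_smith'):
    print(event['timestamp'], event['type'])

patterns = timeline.find_patterns(window=timedelta(hours=1))
print(len(patterns))  # 1: the two January events fall within one window
```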
## Correlation Report

```python
def generate_correlation_report(graph: IntelligenceGraph,
                                matcher: EntityMatcher,
                                timeline: Optional[TimelineAnalyzer] = None) -> str:
    """Generate a comprehensive correlation report."""
    report = []
    report.append("=" * 60)
    report.append("INTELLIGENCE CORRELATION REPORT")
    report.append("=" * 60)
    report.append(f"Generated: {datetime.now().isoformat()}")

    # Entity summary
    report.append("\n" + "-" * 60)
    report.append("ENTITY SUMMARY")
    report.append("-" * 60)
    entity_types = {}
    for node in graph.graph.nodes():
        etype = graph.graph.nodes[node].get('type', 'unknown')
        entity_types[etype] = entity_types.get(etype, 0) + 1
    for etype, count in entity_types.items():
        report.append(f"  {etype}: {count}")

    # Key entities
    report.append("\n" + "-" * 60)
    report.append("KEY ENTITIES (by connections)")
    report.append("-" * 60)
    for entity, centrality in graph.identify_key_entities(5):
        attrs = graph.graph.nodes[entity]
        report.append(f"  {entity}")
        report.append(f"    Type: {attrs.get('type', 'unknown')}")
        report.append(f"    Centrality: {centrality:.3f}")

    # Communities
    report.append("\n" + "-" * 60)
    report.append("IDENTIFIED CLUSTERS")
    report.append("-" * 60)
    communities = graph.find_communities()
    for i, community in enumerate(communities[:5]):
        members = list(community)
        suffix = '...' if len(members) > 5 else ''
        report.append(f"  Cluster {i + 1}: {len(members)} entities")
        report.append(f"    Members: {', '.join(members[:5])}{suffix}")

    # Matched entities (matching is keyed on the 'name' field)
    report.append("\n" + "-" * 60)
    report.append("CORRELATED ENTITIES")
    report.append("-" * 60)
    for entity_type in matcher.entities.keys():
        matches = matcher.match_entities(entity_type, 'name')
        if matches:
            report.append(f"\n  {entity_type}:")
            for group in matches[:3]:
                sources = [e.get('_source', '?') for e in group]
                report.append(f"    - Matched across: {', '.join(sources)}")

    # Timeline summary (if a timeline was provided)
    if timeline and timeline.events:
        report.append("\n" + "-" * 60)
        report.append("TIMELINE SUMMARY")
        report.append("-" * 60)
        events = timeline.get_timeline()
        report.append(f"  Events: {len(events)}")
        report.append(f"  Range: {events[0]['timestamp']} to {events[-1]['timestamp']}")

    return '\n'.join(report)
```
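Putting the pieces together with the `graph`, `matcher`, and `timeline` objects built in the sections above (the output file name is arbitrary):

```python
report = generate_correlation_report(graph, matcher, timeline)
print(report)

with open('correlation_report.txt', 'w') as f:
    f.write(report)
```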