Claude Code Plugins

Community-maintained marketplace

Correlate and analyze data from multiple OSINT sources. Use this skill when combining information from different sources, identifying relationships between entities, or building comprehensive intelligence profiles.

Install Skill

1. Download skill
2. Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3. Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Review the skill's instructions and verify its behavior before using it.

SKILL.md

name: data-correlation
description: Correlate and analyze data from multiple OSINT sources. Use this skill when combining information from different sources, identifying relationships between entities, or building comprehensive intelligence profiles.

Data Correlation

Correlate and analyze intelligence from multiple sources.

Installation

pip install pandas networkx matplotlib fuzzywuzzy python-Levenshtein

Data Normalization

import re
from typing import Dict, List, Any
from fuzzywuzzy import fuzz

def normalize_email(email: str) -> str:
    """Normalize email address."""
    return email.lower().strip()

def normalize_name(name: str) -> str:
    """Normalize person name."""
    # Drop honorifics/suffixes from the list below, plus any dotted abbreviation (initials like 'd.')
    titles = ['mr', 'mrs', 'ms', 'dr', 'prof', 'jr', 'sr', 'ii', 'iii']
    name = name.lower().strip()

    parts = name.split()
    parts = [p for p in parts if p not in titles and not p.endswith('.')]

    return ' '.join(parts)

def normalize_domain(domain: str) -> str:
    """Normalize domain name."""
    domain = domain.lower().strip()
    domain = domain.replace('http://', '').replace('https://', '')
    domain = domain.split('/')[0]
    if domain.startswith('www.'):
        domain = domain[4:]
    return domain

def normalize_phone(phone: str) -> str:
    """Normalize phone number."""
    return re.sub(r'[^\d+]', '', phone)

def normalize_ip(ip: str) -> str:
    """Normalize IP address to its canonical form."""
    import ipaddress
    try:
        return str(ipaddress.ip_address(ip.strip()))
    except ValueError:
        # Not a valid IP address; return the trimmed input unchanged
        return ip.strip()
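
A quick sanity check of the normalizers; the inputs below are made-up values chosen to exercise each rule:

# Illustrative inputs (not from any real source)
print(normalize_email('  JSmith@Example.COM '))           # jsmith@example.com
print(normalize_name('Dr. John D. Smith Jr.'))            # john smith
print(normalize_domain('https://www.Example.com/about'))  # example.com
print(normalize_phone('+1 (555) 123-4567'))               # +15551234567
print(normalize_ip(' 192.168.1.1 '))                      # 192.168.1.1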

Entity Matching

class EntityMatcher:
    """Match and correlate entities across data sources."""

    def __init__(self, threshold: int = 80):
        self.threshold = threshold
        self.entities = {}

    def add_entity(self, entity_type: str, data: Dict, source: str):
        """Add entity from a data source."""
        if entity_type not in self.entities:
            self.entities[entity_type] = []

        data['_source'] = source
        self.entities[entity_type].append(data)

    def match_entities(self, entity_type: str, key_field: str) -> List[List[Dict]]:
        """Match entities based on key field similarity."""
        if entity_type not in self.entities:
            return []

        entities = self.entities[entity_type]
        matched_groups = []
        matched_indices = set()

        for i, entity1 in enumerate(entities):
            if i in matched_indices:
                continue

            group = [entity1]
            matched_indices.add(i)

            key1 = entity1.get(key_field, '')
            if not key1:
                continue

            for j, entity2 in enumerate(entities[i+1:], i+1):
                if j in matched_indices:
                    continue

                key2 = entity2.get(key_field, '')
                if not key2:
                    continue

                similarity = fuzz.ratio(str(key1).lower(), str(key2).lower())
                if similarity >= self.threshold:
                    group.append(entity2)
                    matched_indices.add(j)

            if len(group) > 1:
                matched_groups.append(group)

        return matched_groups

    def merge_entities(self, entities: List[Dict]) -> Dict:
        """Merge multiple entity records into one."""
        merged = {'_sources': []}

        for entity in entities:
            merged['_sources'].append(entity.get('_source', 'unknown'))
            for key, value in entity.items():
                if key.startswith('_'):
                    continue
                if key not in merged or not merged[key]:
                    # Copy lists so later merges don't mutate the source record
                    merged[key] = list(value) if isinstance(value, list) else value
                elif isinstance(value, list):
                    if isinstance(merged[key], list):
                        merged[key].extend(value)
                    else:
                        merged[key] = [merged[key]] + value
                elif value != merged[key]:
                    # Keep both values
                    if not isinstance(merged[key], list):
                        merged[key] = [merged[key]]
                    merged[key].append(value)

        # Deduplicate lists, preserving first-seen order
        for key, value in merged.items():
            if isinstance(value, list):
                merged[key] = list(dict.fromkeys(value))

        return merged

# Example usage
matcher = EntityMatcher(threshold=85)

# Add from different sources
matcher.add_entity('person', {'name': 'John Smith', 'email': 'jsmith@example.com'}, 'linkedin')
matcher.add_entity('person', {'name': 'John D. Smith', 'email': 'john.smith@example.com'}, 'github')
matcher.add_entity('person', {'name': 'J. Smith', 'phone': '555-1234'}, 'website')

matches = matcher.match_entities('person', 'name')
for group in matches:
    merged = matcher.merge_entities(group)
    print(f"Merged entity: {merged}")

Relationship Graph

import networkx as nx
import matplotlib.pyplot as plt

class IntelligenceGraph:
    """Build relationship graph from intelligence data."""

    def __init__(self):
        self.graph = nx.Graph()

    def add_entity(self, entity_id: str, entity_type: str, attributes: Dict):
        """Add entity node."""
        self.graph.add_node(entity_id, type=entity_type, **attributes)

    def add_relationship(self, entity1: str, entity2: str, relationship_type: str,
                        weight: float = 1.0):
        """Add relationship edge."""
        self.graph.add_edge(entity1, entity2, type=relationship_type, weight=weight)

    def find_connections(self, entity_id: str, depth: int = 2) -> nx.Graph:
        """Find all connections within depth."""
        nodes = {entity_id}
        current_layer = {entity_id}

        for _ in range(depth):
            next_layer = set()
            for node in current_layer:
                next_layer.update(self.graph.neighbors(node))
            nodes.update(next_layer)
            current_layer = next_layer

        return self.graph.subgraph(nodes)

    def find_shortest_path(self, source: str, target: str) -> list:
        """Find shortest path between entities."""
        try:
            return nx.shortest_path(self.graph, source, target)
        except (nx.NetworkXNoPath, nx.NodeNotFound):
            return []

    def identify_key_entities(self, top_n: int = 10) -> List[tuple]:
        """Identify most connected entities."""
        centrality = nx.degree_centrality(self.graph)
        return sorted(centrality.items(), key=lambda x: x[1], reverse=True)[:top_n]

    def find_communities(self) -> List[set]:
        """Identify communities/clusters."""
        return list(nx.community.greedy_modularity_communities(self.graph))

    def visualize(self, output_file: str = None):
        """Visualize the intelligence graph."""
        plt.figure(figsize=(16, 12))

        # Color nodes by type
        colors = {
            'person': '#2196F3',
            'organization': '#4CAF50',
            'domain': '#FF9800',
            'ip': '#9C27B0',
            'email': '#F44336'
        }

        node_colors = [colors.get(self.graph.nodes[n].get('type', ''), '#9E9E9E')
                      for n in self.graph.nodes()]

        pos = nx.spring_layout(self.graph, k=2, iterations=50)

        nx.draw_networkx_nodes(self.graph, pos, node_color=node_colors,
                              node_size=500, alpha=0.8)
        nx.draw_networkx_edges(self.graph, pos, alpha=0.5)
        nx.draw_networkx_labels(self.graph, pos, font_size=8)

        plt.title('Intelligence Relationship Graph')
        plt.axis('off')

        if output_file:
            plt.savefig(output_file, dpi=300, bbox_inches='tight')
        else:
            plt.show()

        plt.close()

# Example
graph = IntelligenceGraph()
graph.add_entity('john_smith', 'person', {'name': 'John Smith'})
graph.add_entity('acme_corp', 'organization', {'name': 'ACME Corp'})
graph.add_entity('jsmith@acme.com', 'email', {})
graph.add_relationship('john_smith', 'acme_corp', 'works_at')
graph.add_relationship('john_smith', 'jsmith@acme.com', 'uses_email')
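
Continuing this example, the analysis helpers can be queried directly; with this three-node graph the results are deterministic:

# Shortest path routes through the shared person node
print(graph.find_shortest_path('jsmith@acme.com', 'acme_corp'))
# ['jsmith@acme.com', 'john_smith', 'acme_corp']

# john_smith has the highest degree centrality (two connections)
print(graph.identify_key_entities(top_n=3))

# One-hop neighborhood of the organization
subgraph = graph.find_connections('acme_corp', depth=1)

# Render to a file (filename here is just an example)
graph.visualize('intel_graph.png')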

Data Fusion

import pandas as pd
from datetime import datetime

class DataFusion:
    """Fuse data from multiple sources."""

    def __init__(self):
        self.sources = {}
        self.confidence_weights = {}

    def add_source(self, name: str, data: pd.DataFrame, confidence: float = 1.0):
        """Add data source with confidence weight."""
        self.sources[name] = data
        self.confidence_weights[name] = confidence

    def fuse_on_key(self, key_column: str) -> pd.DataFrame:
        """Fuse all sources on common key."""
        if not self.sources:
            return pd.DataFrame()

        # Start with first source
        source_names = list(self.sources.keys())
        result = self.sources[source_names[0]].copy()
        result.columns = [f"{c}_{source_names[0]}" if c != key_column else c
                         for c in result.columns]

        # Merge other sources
        for name in source_names[1:]:
            df = self.sources[name].copy()
            df.columns = [f"{c}_{name}" if c != key_column else c
                         for c in df.columns]
            result = result.merge(df, on=key_column, how='outer')

        return result

    def resolve_conflicts(self, df: pd.DataFrame, field: str) -> pd.DataFrame:
        """Resolve conflicting values using confidence weights."""
        # Find columns for the field
        field_cols = [c for c in df.columns if c.startswith(f"{field}_")]

        if not field_cols:
            return df

        def select_best(row):
            values = {}
            for col in field_cols:
                if pd.notna(row[col]):
                    source = col.replace(f"{field}_", "")
                    weight = self.confidence_weights.get(source, 1.0)
                    values[row[col]] = values.get(row[col], 0) + weight

            if values:
                return max(values.items(), key=lambda x: x[1])[0]
            return None

        df[field] = df.apply(select_best, axis=1)
        return df

# Example
fusion = DataFusion()

# Add sources with confidence weights
df1 = pd.DataFrame({'email': ['a@x.com', 'b@x.com'], 'name': ['Alice', 'Bob']})
df2 = pd.DataFrame({'email': ['a@x.com', 'c@x.com'], 'name': ['Alice A.', 'Charlie']})

fusion.add_source('linkedin', df1, confidence=0.9)
fusion.add_source('website', df2, confidence=0.7)

fused = fusion.fuse_on_key('email')
resolved = fusion.resolve_conflicts(fused, 'name')
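
Inspecting the result: with the suffixing scheme above, the merged frame carries name_linkedin and name_website columns, and resolve_conflicts adds a consolidated name column:

print(resolved[['email', 'name']])
# For a@x.com, 'Alice' wins: linkedin's weight (0.9) beats website's (0.7)
# b@x.com and c@x.com each have only a single candidate value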

Timeline Analysis

from datetime import datetime, timedelta
import matplotlib.pyplot as plt

class TimelineAnalyzer:
    """Analyze temporal patterns in intelligence data."""

    def __init__(self):
        self.events = []

    def add_event(self, timestamp: datetime, event_type: str,
                  entity: str, details: Dict = None):
        """Add event to timeline."""
        self.events.append({
            'timestamp': timestamp,
            'type': event_type,
            'entity': entity,
            'details': details or {}
        })

    def get_timeline(self, entity: str = None, start: datetime = None,
                    end: datetime = None) -> List[Dict]:
        """Get filtered timeline."""
        events = self.events

        if entity:
            events = [e for e in events if e['entity'] == entity]

        if start:
            events = [e for e in events if e['timestamp'] >= start]

        if end:
            events = [e for e in events if e['timestamp'] <= end]

        return sorted(events, key=lambda x: x['timestamp'])

    def find_patterns(self, window: timedelta = timedelta(hours=1)) -> List[Dict]:
        """Find temporal patterns (events occurring together)."""
        patterns = []
        events = sorted(self.events, key=lambda x: x['timestamp'])

        for i, event in enumerate(events):
            window_end = event['timestamp'] + window
            related = [e for e in events[i+1:]
                      if e['timestamp'] <= window_end]

            if related:
                patterns.append({
                    'anchor_event': event,
                    'related_events': related,
                    'window': window
                })

        return patterns

    def visualize_timeline(self, output_file: str = None):
        """Visualize timeline."""
        import matplotlib.dates as mdates

        fig, ax = plt.subplots(figsize=(14, 6))

        events = sorted(self.events, key=lambda x: x['timestamp'])
        entities = list(set(e['entity'] for e in events))
        entity_y = {e: i for i, e in enumerate(entities)}

        for event in events:
            y = entity_y[event['entity']]
            ax.scatter(event['timestamp'], y, s=100, zorder=5)
            ax.annotate(event['type'], (event['timestamp'], y),
                       xytext=(5, 5), textcoords='offset points', fontsize=8)

        ax.set_yticks(range(len(entities)))
        ax.set_yticklabels(entities)
        ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
        plt.xticks(rotation=45)

        plt.title('Event Timeline')
        plt.tight_layout()

        if output_file:
            plt.savefig(output_file, dpi=300)
        else:
            plt.show()

        plt.close()
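
A minimal usage sketch; the events and timestamps below are fabricated for illustration:

timeline = TimelineAnalyzer()
timeline.add_event(datetime(2024, 1, 5, 9, 0), 'domain_registered', 'example.com')
timeline.add_event(datetime(2024, 1, 5, 9, 30), 'account_created', 'john_smith')
timeline.add_event(datetime(2024, 3, 1, 14, 0), 'post_published', 'john_smith')

# Filter to a single entity, sorted chronologically
for event in timeline.get_timeline(entity='john_smith'):
    print(event['timestamp'], event['type'])

# The two January events fall within one hour, so they form a pattern
patterns = timeline.find_patterns(window=timedelta(hours=1))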

Correlation Report

def generate_correlation_report(graph: IntelligenceGraph,
                                matcher: EntityMatcher,
                                timeline: TimelineAnalyzer = None) -> str:
    """Generate comprehensive correlation report."""
    report = []

    report.append("=" * 60)
    report.append("INTELLIGENCE CORRELATION REPORT")
    report.append("=" * 60)
    report.append(f"Generated: {datetime.now().isoformat()}")

    # Entity Summary
    report.append("\n" + "-" * 60)
    report.append("ENTITY SUMMARY")
    report.append("-" * 60)

    entity_types = {}
    for node in graph.graph.nodes():
        etype = graph.graph.nodes[node].get('type', 'unknown')
        entity_types[etype] = entity_types.get(etype, 0) + 1

    for etype, count in entity_types.items():
        report.append(f"  {etype}: {count}")

    # Key Entities
    report.append("\n" + "-" * 60)
    report.append("KEY ENTITIES (by connections)")
    report.append("-" * 60)

    for entity, centrality in graph.identify_key_entities(5):
        attrs = graph.graph.nodes[entity]
        report.append(f"  {entity}")
        report.append(f"    Type: {attrs.get('type', 'unknown')}")
        report.append(f"    Centrality: {centrality:.3f}")

    # Communities
    report.append("\n" + "-" * 60)
    report.append("IDENTIFIED CLUSTERS")
    report.append("-" * 60)

    communities = graph.find_communities()
    for i, community in enumerate(communities[:5]):
        members = ', '.join(list(community)[:5])
        suffix = ', ...' if len(community) > 5 else ''
        report.append(f"  Cluster {i+1}: {len(community)} entities")
        report.append(f"    Members: {members}{suffix}")

    # Matched Entities
    report.append("\n" + "-" * 60)
    report.append("CORRELATED ENTITIES")
    report.append("-" * 60)

    for entity_type in matcher.entities.keys():
        matches = matcher.match_entities(entity_type, 'name')
        if matches:
            report.append(f"\n  {entity_type}:")
            for group in matches[:3]:
                sources = [e.get('_source', '?') for e in group]
                report.append(f"    - Matched across: {', '.join(sources)}")

    # Timeline summary (uses the optional timeline parameter)
    if timeline and timeline.events:
        report.append("\n" + "-" * 60)
        report.append("EVENT TIMELINE (first 10 events)")
        report.append("-" * 60)
        for event in timeline.get_timeline()[:10]:
            report.append(f"  {event['timestamp'].isoformat()}  "
                          f"{event['type']} ({event['entity']})")

    return '\n'.join(report)
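
Tying it together with the matcher, graph, and timeline objects built in the earlier examples:

report = generate_correlation_report(graph, matcher, timeline)
print(report)

# Persist alongside other case artifacts (path is just an example)
with open('correlation_report.txt', 'w') as f:
    f.write(report)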