---
name: osint-techniques
description: Apply Open Source Intelligence gathering techniques for reconnaissance. Use this skill when collecting publicly available information about targets, searching public databases, or aggregating intelligence from multiple open sources.
---
# OSINT Techniques

Open Source Intelligence gathering methods and tools for collecting and correlating publicly available information.
## Search Engine Dorking

Search engines index far more than site navigation exposes; dork operators such as `site:`, `filetype:`, and `inurl:` surface those indexed files and pages directly. The helper below uses the `googlesearch-python` package and sleeps between requests because Google aggressively rate-limits automated queries.
```python
from googlesearch import search  # pip install googlesearch-python
import time

def google_dork(query: str, num_results: int = 10) -> list:
    """Perform a Google search using dork operators."""
    results = []
    try:
        for url in search(query, num_results=num_results, sleep_interval=2):
            results.append(url)
            time.sleep(1)  # Rate limiting between result fetches
    except Exception as e:
        print(f"Search error: {e}")
    return results

# Common Google dorks
DORKS = {
    # File discovery
    'documents': 'site:{domain} filetype:pdf OR filetype:doc OR filetype:xlsx',
    'configs': 'site:{domain} filetype:xml OR filetype:json OR filetype:yml',
    'backups': 'site:{domain} filetype:bak OR filetype:backup OR filetype:old',
    # Sensitive data
    'passwords': 'site:{domain} "password" OR "passwd" OR "credentials"',
    'emails': 'site:{domain} "@{domain}" filetype:txt OR filetype:csv',
    'keys': 'site:{domain} "api_key" OR "apikey" OR "secret_key"',
    # Infrastructure
    'admin_panels': 'site:{domain} inurl:admin OR inurl:login OR inurl:dashboard',
    'subdomains': 'site:*.{domain} -www',
    'error_pages': 'site:{domain} "error" OR "exception" OR "traceback"',
}

def run_dorks(domain: str) -> dict:
    """Run the common dorks against a domain."""
    results = {}
    for name, dork_template in DORKS.items():
        dork = dork_template.format(domain=domain)
        print(f"Running: {name}")
        results[name] = google_dork(dork, num_results=5)
        time.sleep(5)  # Rate limiting between dorks
    return results

# Example
# results = run_dorks('example.com')
```
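Dork results are most useful when persisted and deduplicated across runs, since re-dorking the same domain repeatedly invites blocking. A minimal sketch (the output filename is an arbitrary choice):

```python
import json

def save_dork_results(results: dict, path: str = 'dork_results.json') -> None:
    """Persist dork results, deduplicating URLs within each category."""
    deduped = {name: sorted(set(urls)) for name, urls in results.items()}
    with open(path, 'w') as f:
        json.dump(deduped, f, indent=2)

# save_dork_results(run_dorks('example.com'))
```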
## WHOIS Lookup

WHOIS records expose a domain's registrar, registration dates, name servers, and sometimes contact details; RDAP provides the equivalent for IP address allocations.
```python
import whois  # pip install python-whois

def whois_lookup(domain: str) -> dict:
    """Perform a WHOIS lookup."""
    try:
        w = whois.whois(domain)
        return {
            'domain_name': w.domain_name,
            'registrar': w.registrar,
            'creation_date': str(w.creation_date),
            'expiration_date': str(w.expiration_date),
            'name_servers': w.name_servers,
            'emails': w.emails,
            'org': w.org,
            'address': w.address,
            'city': w.city,
            'country': w.country
        }
    except Exception as e:
        return {'error': str(e)}
```
```python
def reverse_whois(email: str) -> list:
    """Find domains registered with an email address (conceptual).

    Reverse WHOIS generally requires a commercial API (e.g. WhoisXML
    API, DomainTools); there is no reliable free public source.
    """
    raise NotImplementedError('Reverse WHOIS requires a paid API')
```
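For reference, a hedged sketch against WhoisXML API's reverse-WHOIS v2 endpoint; the endpoint URL, request payload shape, and `domainsList` response field are assumptions to verify against the current documentation before use:

```python
import requests

def reverse_whois_whoisxml(email: str, api_key: str) -> list:
    """Reverse WHOIS via WhoisXML API (paid; payload shape may change)."""
    payload = {
        'apiKey': api_key,                         # assumption: key in body
        'searchType': 'current',
        'mode': 'purchase',
        'basicSearchTerms': {'include': [email]},  # assumed field names
    }
    try:
        response = requests.post(
            'https://reverse-whois.whoisxmlapi.com/api/v2',  # assumed endpoint
            json=payload,
            timeout=30
        )
        response.raise_for_status()
        return response.json().get('domainsList', [])  # assumed response field
    except Exception as e:
        return [f'error: {e}']
```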
```python
from ipwhois import IPWhois  # pip install ipwhois

def ip_whois(ip: str) -> dict:
    """RDAP lookup for an IP address."""
    try:
        obj = IPWhois(ip)
        result = obj.lookup_rdap()
        return {
            'asn': result.get('asn'),
            'asn_description': result.get('asn_description'),
            'network_name': result.get('network', {}).get('name'),
            'network_cidr': result.get('network', {}).get('cidr'),
            'country': result.get('network', {}).get('country')
        }
    except Exception as e:
        return {'error': str(e)}
```
## Shodan Integration

Shodan indexes banners from internet-facing services, so a single API lookup can enumerate a host's open ports, products, and hostnames without touching the target directly. Both helpers require an API key; the `vulns` field is typically only populated on paid plans.
```python
import shodan  # pip install shodan

def shodan_host_lookup(api_key: str, ip: str) -> dict:
    """Look up host information on Shodan."""
    api = shodan.Shodan(api_key)
    try:
        host = api.host(ip)
        return {
            'ip': host['ip_str'],
            'organization': host.get('org', ''),
            'os': host.get('os'),
            'ports': host.get('ports', []),
            'vulns': host.get('vulns', []),
            'hostnames': host.get('hostnames', []),
            'services': [{
                'port': s['port'],
                'product': s.get('product', ''),
                'version': s.get('version', '')
            } for s in host.get('data', [])]
        }
    except shodan.APIError as e:
        return {'error': str(e)}

def shodan_search(api_key: str, query: str, limit: int = 10) -> list:
    """Search Shodan for hosts."""
    api = shodan.Shodan(api_key)
    try:
        results = api.search(query)
        return [{
            'ip': r['ip_str'],
            'port': r['port'],
            'org': r.get('org', ''),
            'product': r.get('product', '')
        } for r in results['matches'][:limit]]
    except shodan.APIError as e:
        return [{'error': str(e)}]

# Common Shodan queries
SHODAN_QUERIES = {
    'webcams': 'webcam has_screenshot:true',
    'databases': 'product:mongodb OR product:mysql OR product:postgresql',
    'ics': 'port:502 OR port:102 OR port:47808',  # Industrial control (Modbus, S7, BACnet)
    'default_creds': '"default password" OR "admin:admin"',
}
```
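A usage sketch combining the saved queries with `shodan_search`; the API key and CIDR below are placeholders, and Shodan's documented `net:` filter scopes results to a range you are authorized to assess:

```python
# Example: run a saved query scoped to a specific network range
API_KEY = 'YOUR_SHODAN_API_KEY'  # placeholder
scoped_query = SHODAN_QUERIES['databases'] + ' net:203.0.113.0/24'  # placeholder range
for match in shodan_search(API_KEY, scoped_query, limit=5):
    print(match)
```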
## Archive Research

The Wayback Machine's CDX API lists every capture of a URL, which helps recover deleted pages, old site structures, and files that were once exposed.
```python
import requests

def wayback_lookup(url: str, year: int = None) -> list:
    """Search the Wayback Machine CDX API for archived pages."""
    base_url = "https://web.archive.org/cdx/search/cdx"
    params = {
        'url': url,
        'output': 'json',
        'fl': 'timestamp,original,statuscode',
        'collapse': 'timestamp:8'  # At most one capture per day
    }
    if year:
        params['from'] = str(year)
        params['to'] = str(year)
    try:
        response = requests.get(base_url, params=params, timeout=30)
        results = response.json()
        # First row is the header row
        if len(results) > 1:
            return [{
                'timestamp': r[0],
                'url': r[1],
                'status': r[2],
                'archive_url': f"https://web.archive.org/web/{r[0]}/{r[1]}"
            } for r in results[1:]]
        return []
    except Exception as e:
        return [{'error': str(e)}]

def get_archived_page(url: str, timestamp: str) -> str:
    """Retrieve archived page content."""
    archive_url = f"https://web.archive.org/web/{timestamp}/{url}"
    try:
        response = requests.get(archive_url, timeout=30)
        return response.text
    except Exception as e:
        return f"Error: {e}"
```
## Breach Data Research

Have I Been Pwned (HIBP) exposes two relevant APIs: the breached-account endpoint, which requires a paid API key, and the free Pwned Passwords range endpoint, which uses k-anonymity so the full password hash never leaves the client.
```python
import hashlib
import requests
from urllib.parse import quote

def check_haveibeenpwned(email: str, api_key: str) -> dict:
    """Check if an email appears in known breaches."""
    headers = {
        'hibp-api-key': api_key,
        'user-agent': 'OSINT-Tool'
    }
    try:
        # Check breaches (email is URL-encoded to be safe)
        response = requests.get(
            f'https://haveibeenpwned.com/api/v3/breachedaccount/{quote(email)}',
            headers=headers,
            timeout=10
        )
        if response.status_code == 200:
            breaches = response.json()
            return {
                'found': True,
                'breach_count': len(breaches),
                'breaches': [b['Name'] for b in breaches]
            }
        elif response.status_code == 404:
            return {'found': False}
        else:
            return {'error': f'API error: {response.status_code}'}
    except Exception as e:
        return {'error': str(e)}

def check_password_breach(password: str) -> dict:
    """Check if a password appears in known breaches (k-anonymity).

    Only the first five hex characters of the SHA-1 hash are sent;
    matching happens locally against the returned suffixes.
    """
    sha1 = hashlib.sha1(password.encode()).hexdigest().upper()
    prefix, suffix = sha1[:5], sha1[5:]
    try:
        response = requests.get(
            f'https://api.pwnedpasswords.com/range/{prefix}',
            timeout=10
        )
        for line in response.text.splitlines():
            hash_suffix, count = line.split(':')
            if hash_suffix == suffix:
                return {'found': True, 'count': int(count)}
        return {'found': False}
    except Exception as e:
        return {'error': str(e)}
```
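The range endpoint also honors a documented `Add-Padding: true` header, which pads each response with fake zero-count entries so the response size reveals nothing about the queried prefix; a sketch of the padded fetch:

```python
import requests

def fetch_padded_range(prefix: str) -> list:
    """Fetch a hash range with padding; strips the fake 0-count entries."""
    response = requests.get(
        f'https://api.pwnedpasswords.com/range/{prefix}',
        headers={'Add-Padding': 'true'},
        timeout=10
    )
    return [line for line in response.text.splitlines()
            if not line.rstrip().endswith(':0')]
```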
## Social Media OSINT

Username enumeration checks whether a handle exists across platforms. Treat the results as leads rather than proof: some sites return 200 for missing profiles or block HEAD requests outright.
```python
import requests

def search_username(username: str) -> dict:
    """Check a username across platforms (results need manual verification)."""
    platforms = {
        'twitter': f'https://twitter.com/{username}',
        'github': f'https://github.com/{username}',
        'instagram': f'https://instagram.com/{username}',
        'linkedin': f'https://linkedin.com/in/{username}',
        'reddit': f'https://reddit.com/user/{username}',
        'facebook': f'https://facebook.com/{username}',
    }
    results = {}
    for platform, url in platforms.items():
        try:
            response = requests.head(url, timeout=5, allow_redirects=True)
            results[platform] = {
                'url': url,
                'exists': response.status_code == 200  # Heuristic only
            }
        except Exception as e:
            results[platform] = {'url': url, 'error': str(e)}
    return results
```
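Since a 200 status is a weak existence signal, a more robust check (the approach tools like Sherlock take) fetches the page body and looks for a platform-specific "not found" marker. The marker strings below are illustrative assumptions and will drift as sites change their copy:

```python
import requests

# Illustrative per-platform "missing profile" markers; these strings
# are examples only and must be kept up to date per site.
NOT_FOUND_MARKERS = {
    'github': 'Not Found',
    'reddit': 'Sorry, nobody on Reddit goes by that name',
}

def verify_profile(platform: str, url: str) -> bool:
    """GET the profile page and check for a known 'not found' marker."""
    response = requests.get(url, timeout=5,
                            headers={'User-Agent': 'Mozilla/5.0'})
    if response.status_code != 200:
        return False
    marker = NOT_FOUND_MARKERS.get(platform)
    return marker not in response.text if marker else True
```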
```python
def github_user_recon(username: str) -> dict:
    """Gather information from a GitHub profile via the public API."""
    try:
        response = requests.get(
            f'https://api.github.com/users/{username}',
            timeout=10
        )
        if response.status_code == 200:
            data = response.json()
            return {
                'name': data.get('name'),
                'email': data.get('email'),
                'company': data.get('company'),
                'location': data.get('location'),
                'bio': data.get('bio'),
                'public_repos': data.get('public_repos'),
                'followers': data.get('followers'),
                'created_at': data.get('created_at')
            }
        return {'error': f'User not found: {response.status_code}'}
    except Exception as e:
        return {'error': str(e)}
```
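The profile `email` field is often null, but commit author emails regularly surface in a user's public push events. A sketch against GitHub's documented `/users/{username}/events/public` endpoint; the payload nesting (`payload.commits[].author.email`) is worth confirming against current API docs, and unauthenticated requests are heavily rate-limited:

```python
import requests

def github_commit_emails(username: str) -> set:
    """Collect commit author emails from a user's public push events."""
    emails = set()
    response = requests.get(
        f'https://api.github.com/users/{username}/events/public',
        timeout=10
    )
    if response.status_code == 200:
        for event in response.json():
            if event.get('type') == 'PushEvent':
                for commit in event.get('payload', {}).get('commits', []):
                    email = commit.get('author', {}).get('email')
                    # Skip GitHub's privacy-preserving noreply addresses
                    if email and not email.endswith('users.noreply.github.com'):
                        emails.add(email)
    return emails
```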
## Metadata Extraction

Published documents and images often retain embedded metadata (author names, software versions, timestamps, GPS coordinates) that was never meant to be public.
```python
from PIL import Image
from PIL.ExifTags import TAGS
import PyPDF2  # note: the pypdf package is the maintained successor

def extract_image_metadata(image_path: str) -> dict:
    """Extract EXIF metadata from an image."""
    try:
        image = Image.open(image_path)
        exif_data = {}
        exif = image._getexif() if hasattr(image, '_getexif') else None
        if exif:
            for tag_id, value in exif.items():
                tag = TAGS.get(tag_id, tag_id)
                exif_data[tag] = str(value)
        return exif_data
    except Exception as e:
        return {'error': str(e)}

def extract_pdf_metadata(pdf_path: str) -> dict:
    """Extract metadata from a PDF."""
    try:
        with open(pdf_path, 'rb') as f:
            reader = PyPDF2.PdfReader(f)
            info = reader.metadata or {}  # Guard: metadata can be None
            return {
                'title': info.get('/Title', ''),
                'author': info.get('/Author', ''),
                'creator': info.get('/Creator', ''),
                'producer': info.get('/Producer', ''),
                'creation_date': info.get('/CreationDate', ''),
                'mod_date': info.get('/ModDate', ''),
                'pages': len(reader.pages)
            }
    except Exception as e:
        return {'error': str(e)}
```
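GPS coordinates are typically the highest-value EXIF fields; they sit in the `GPSInfo` IFD as degree/minute/second rationals keyed by numeric tags. A sketch converting them to decimal degrees (assumes standard EXIF GPS tags; returns an empty dict when absent):

```python
from PIL import Image
from PIL.ExifTags import GPSTAGS

def extract_gps_coordinates(image_path: str) -> dict:
    """Convert EXIF GPSInfo to decimal latitude/longitude, if present."""
    image = Image.open(image_path)
    exif = image._getexif() if hasattr(image, '_getexif') else None
    gps_raw = (exif or {}).get(34853)  # 34853 is the EXIF GPSInfo tag ID
    if not gps_raw:
        return {}
    gps = {GPSTAGS.get(k, k): v for k, v in gps_raw.items()}

    def to_decimal(dms, ref):
        # dms is (degrees, minutes, seconds) as EXIF rationals
        decimal = float(dms[0]) + float(dms[1]) / 60 + float(dms[2]) / 3600
        return -decimal if ref in ('S', 'W') else decimal

    try:
        return {
            'latitude': to_decimal(gps['GPSLatitude'], gps['GPSLatitudeRef']),
            'longitude': to_decimal(gps['GPSLongitude'], gps['GPSLongitudeRef']),
        }
    except KeyError:
        return {}
```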