Claude Code Plugins

Community-maintained marketplace

Feedback

service-detection

@benchflow-ai/skillsbench
15
0

Identify and fingerprint network services beyond port numbers. Use this skill when determining exact software versions, identifying non-standard service configurations, or detecting services running on unusual ports.

Install Skill

1. Download skill
2. Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3. Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify the skill by reading through its instructions before using it.

SKILL.md

name: service-detection
description: Identify and fingerprint network services beyond port numbers. Use this skill when determining exact software versions, identifying non-standard service configurations, or detecting services running on unusual ports.

Service Detection

Identify and fingerprint network services.

Banner Grabbing

import socket

def grab_banner(host: str, port: int, timeout: float = 5.0) -> str:
    """Grab the service banner from a TCP port.

    Connects to ``host:port`` and waits up to two seconds for the service
    to announce itself. If nothing arrives in that window, a bare CRLF is
    sent as a probe and the reply to it is read instead.

    Args:
        host: Hostname or IPv4 address to connect to.
        port: TCP port number.
        timeout: Seconds to wait for the connection to be established.

    Returns:
        The first chunk received (at most 1024 bytes), decoded as UTF-8
        with undecodable bytes dropped and surrounding whitespace
        stripped. On any failure the string ``"Error: <reason>"`` is
        returned instead of raising.
    """
    try:
        # The context manager closes the socket on every path, including
        # a failed connect or a timed-out read.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            sock.connect((host, port))

            # Some services send their banner immediately.
            sock.settimeout(2.0)
            try:
                data = sock.recv(1024)
            except socket.timeout:
                # Others stay silent until the client speaks first.
                sock.sendall(b'\r\n')
                data = sock.recv(1024)

        return data.decode('utf-8', errors='ignore').strip()
    except Exception as e:
        return f"Error: {e}"

def grab_http_banner(host: str, port: int = 80, timeout: float = 5.0) -> dict:
    """Grab HTTP server information with a single HEAD request.

    Args:
        host: Hostname or IP address of the web server.
        port: TCP port to query. Ports 443 and 8443 are contacted over
            HTTPS; every other port is contacted over plain HTTP.
        timeout: Seconds to wait for the HTTP response.

    Returns:
        A dict with ``server``, ``powered_by``, ``content_type`` and
        ``all_headers`` on success, or ``{'error': <reason>}`` if the
        request fails.
    """
    import requests

    # 8443 is the conventional alternate HTTPS port; speaking plain HTTP
    # to it would fail against a TLS listener.
    scheme = 'https' if port in (443, 8443) else 'http'
    url = f"{scheme}://{host}:{port}"

    try:
        # verify=False: the goal is to read headers, not to trust the
        # peer, so self-signed or expired certificates must not abort the
        # probe. Redirects are not followed so the headers describe the
        # queried endpoint itself.
        response = requests.head(url, timeout=timeout, verify=False, allow_redirects=False)
    except Exception as e:
        return {'error': str(e)}

    headers = response.headers
    return {
        'server': headers.get('Server', 'Unknown'),
        'powered_by': headers.get('X-Powered-By', ''),
        'content_type': headers.get('Content-Type', ''),
        'all_headers': dict(headers)
    }

# Example: fetch the banner from port 22 of scanme.nmap.org and print it
banner = grab_banner(host='scanme.nmap.org', port=22)
print(f"SSH Banner: {banner}")

Protocol-Specific Detection

import ssl
import struct

def detect_ssh(host: str, port: int = 22) -> dict:
    """Detect SSH server details from its identification string.

    The banner is fetched with ``grab_banner`` and searched for a line of
    the form ``SSH-<protocol>-<software>``.

    Args:
        host: Hostname or IP address of the server.
        port: TCP port to query.

    Returns:
        A dict with ``protocol``, ``software`` and ``raw_banner`` when an
        identification line is found, otherwise
        ``{'error': 'Not SSH service'}``. Missing or empty fields are
        reported as ``'unknown'``.
    """
    banner = grab_banner(host, port)

    # A server may send other text lines before its identification
    # string, so look for the first line that actually starts with "SSH-".
    ident = next(
        (line.strip() for line in banner.splitlines() if line.startswith('SSH-')),
        None,
    )
    if ident is None:
        return {'error': 'Not SSH service'}

    # Split on the first two hyphens only: the software field itself may
    # contain hyphens (e.g. "OpenSSH_8.9p1 Ubuntu-3ubuntu0.1").
    _, _, remainder = ident.partition('-')
    protocol, _, software = remainder.partition('-')
    return {
        'protocol': protocol or 'unknown',
        'software': software or 'unknown',
        'raw_banner': banner
    }

def detect_ssl_tls(host: str, port: int = 443) -> dict:
    """Report the TLS parameters negotiated with a server.

    Performs a handshake with certificate and hostname verification
    switched off, then reads back what was negotiated.

    Args:
        host: Hostname or IP address; also sent as the SNI server name.
        port: TCP port to connect to.

    Returns:
        A dict with ``version``, ``cipher``, ``cipher_bits`` and
        ``has_cert``, or ``{'error': <reason>}`` if anything fails.
    """
    try:
        tls_context = ssl.create_default_context()
        # check_hostname must be cleared before verify_mode can be set to
        # CERT_NONE.
        tls_context.check_hostname = False
        tls_context.verify_mode = ssl.CERT_NONE

        with socket.create_connection((host, port), timeout=5) as raw_conn, \
                tls_context.wrap_socket(raw_conn, server_hostname=host) as tls_conn:
            peer_cert = tls_conn.getpeercert(binary_form=True)
            protocol = tls_conn.version()
            negotiated = tls_conn.cipher()

            # cipher() yields (name, protocol, secret_bits) or None.
            if negotiated:
                cipher_name, secret_bits = negotiated[0], negotiated[2]
            else:
                cipher_name, secret_bits = 'unknown', 0

            return {
                'version': protocol,
                'cipher': cipher_name,
                'cipher_bits': secret_bits,
                'has_cert': peer_cert is not None
            }
    except Exception as err:
        return {'error': str(err)}

def detect_smtp(host: str, port: int = 25) -> dict:
    """Detect SMTP server details.

    Reads the greeting, sends ``EHLO test`` and records the reply lines.

    Args:
        host: Hostname or IPv4 address of the mail server.
        port: TCP port to connect to.

    Returns:
        A dict with ``banner`` (the stripped greeting) and
        ``capabilities`` (the EHLO reply, one entry per line, without
        line terminators), or ``{'error': <reason>}`` on failure. Only
        one read of up to 2048 bytes is made for the EHLO reply.
    """
    try:
        # The context manager closes the socket even when connect/recv
        # raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(5)
            sock.connect((host, port))

            banner = sock.recv(1024).decode('utf-8', errors='ignore')

            sock.sendall(b'EHLO test\r\n')
            ehlo_response = sock.recv(2048).decode('utf-8', errors='ignore')

        # SMTP lines end in CRLF; splitting on '\n' alone would leave a
        # trailing '\r' on every capability.
        capabilities = [
            line.rstrip('\r') for line in ehlo_response.strip().split('\n')
        ]
        return {
            'banner': banner.strip(),
            'capabilities': capabilities
        }
    except Exception as e:
        return {'error': str(e)}

def detect_mysql(host: str, port: int = 3306) -> dict:
    """Detect MySQL server details from its initial packet.

    Args:
        host: Hostname or IPv4 address of the database server.
        port: TCP port to connect to.

    Returns:
        ``{'version': <server version string>}`` when a greeting packet
        is parsed, otherwise ``{'error': <reason>}``. If the server
        answers with an error packet, its error code and message are
        included in the reason.
    """
    try:
        # The context manager closes the socket even when connect/recv
        # raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(5)
            sock.connect((host, port))
            packet = sock.recv(1024)

        # Packet layout: 3-byte payload length, 1-byte sequence id, then
        # the payload, whose first byte (offset 4) identifies the packet.
        if len(packet) > 5:
            if packet[4] == 0xFF and len(packet) >= 7:
                # Error packet: 2-byte little-endian code, then message.
                code = int.from_bytes(packet[5:7], 'little')
                message = packet[7:].decode('utf-8', errors='ignore').strip()
                return {'error': f'MySQL error {code}: {message}'}

            # Greeting: NUL-terminated version string starts at offset 5.
            version_end = packet.find(b'\x00', 5)
            if version_end > 5:
                version = packet[5:version_end].decode('utf-8', errors='ignore')
                return {'version': version}

        return {'error': 'Could not parse MySQL response'}
    except Exception as e:
        return {'error': str(e)}

Service Fingerprinting

# Signature table: service name -> byte patterns looked for in captured
# responses, plus the ports the service conventionally listens on.
# Entry order matters to consumers that iterate the table.
SERVICE_SIGNATURES = {
    name: {'patterns': patterns, 'default_ports': ports}
    for name, patterns, ports in (
        ('ssh', [b'SSH-'], [22]),
        ('http', [b'HTTP/', b'<!DOCTYPE', b'<html'], [80, 8080, 8000]),
        # No byte patterns for https: it is identified through SSL detection.
        ('https', [], [443, 8443]),
        ('ftp', [b'220', b'FTP'], [21]),
        ('smtp', [b'220', b'SMTP', b'ESMTP'], [25, 587, 465]),
        # MySQL protocol
        ('mysql', [b'\x00\x00\x00\x0a'], [3306]),
        ('redis', [b'-ERR', b'+PONG'], [6379]),
        ('mongodb', [b'ismaster'], [27017]),
    )
}

def fingerprint_service(host: str, port: int) -> str:
    """Attempt to fingerprint the service listening on a TCP port.

    Collects whatever the service sends unprompted, then sends a short
    series of generic probes and collects each reply. The combined bytes
    are matched against ``SERVICE_SIGNATURES``.

    When several services match, the one with the most matching patterns
    wins; a remaining tie goes to a service that lists ``port`` among its
    default ports, and after that to the earliest table entry. This keeps
    a ``220 ... ESMTP`` greeting from being labelled ``ftp`` merely
    because both tables contain ``220``.

    Args:
        host: Hostname or IPv4 address to connect to.
        port: TCP port number.

    Returns:
        The matching service name, ``'unknown'`` if nothing matches, or
        ``'error: <reason>'`` if the connection could not be used.
    """
    try:
        # The context manager closes the socket on every exit path.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(3)
            sock.connect((host, port))

            # Receive initial data
            sock.settimeout(2)
            try:
                initial = sock.recv(1024)
            except socket.timeout:
                initial = b''

            probes = (
                b'\r\n',
                b'GET / HTTP/1.0\r\n\r\n',
                b'PING\r\n',
                b'\x00',
            )

            responses = [initial]
            for probe in probes:
                try:
                    sock.sendall(probe)
                    reply = sock.recv(1024)
                except OSError:
                    # Timeout or reset: stop probing, keep what we have.
                    break
                if not reply:
                    # Peer closed the connection; nothing more will come.
                    break
                responses.append(reply)

        # Match against signatures
        all_data = b''.join(responses)
        candidates = []
        for service, sig in SERVICE_SIGNATURES.items():
            hits = sum(1 for pattern in sig['patterns'] if pattern in all_data)
            if hits:
                candidates.append((hits, port in sig['default_ports'], service))

        if not candidates:
            return 'unknown'

        # max() keeps the first of equal candidates, i.e. table order.
        return max(candidates, key=lambda entry: entry[:2])[2]

    except Exception as e:
        return f'error: {e}'

def scan_service(host: str, port: int) -> dict:
    """Run fingerprinting plus service-specific version detection.

    Args:
        host: Hostname or IP address to scan.
        port: TCP port to scan.

    Returns:
        A dict with ``port``, ``state``, ``service``, ``version`` and
        ``details``. ``state`` is ``'open'`` unless fingerprinting
        reported an error, in which case it is ``'unknown'`` and
        ``details`` carries the error text. ``version`` stays empty for
        services that have no version extractor.
    """
    result = {
        'port': port,
        'state': 'open',
        'service': 'unknown',
        'version': '',
        'details': {}
    }

    # Try fingerprinting
    service = fingerprint_service(host, port)
    result['service'] = service

    if service.startswith('error:'):
        # fingerprint_service signals failure as "error: <reason>". The
        # probe never obtained a usable connection, so the port must not
        # be reported as open.
        result['state'] = 'unknown'
        result['details'] = {'error': service[len('error:'):].strip()}
        return result

    # Get version details based on service
    if service == 'ssh':
        result['details'] = detect_ssh(host, port)
        result['version'] = result['details'].get('software', '')
    elif service in ('http', 'https'):
        result['details'] = grab_http_banner(host, port)
        result['version'] = result['details'].get('server', '')
    elif service == 'smtp':
        result['details'] = detect_smtp(host, port)
    elif service == 'mysql':
        result['details'] = detect_mysql(host, port)
        result['version'] = result['details'].get('version', '')

    return result

Web Technology Detection

import requests
import re

def detect_web_technologies(url: str) -> dict:
    """Guess the technologies behind a web page.

    Fetches ``url`` and inspects the response headers and the lower-cased
    body for known markers.

    Args:
        url: Full URL of the page to fetch.

    Returns:
        A dict with the list-valued keys ``server``, ``frameworks``,
        ``languages``, ``cms`` and ``javascript``, or
        ``{'error': <reason>}`` if the request fails. ``frameworks`` is
        always returned empty.
    """
    # A CMS is reported when any one of its markers occurs in the body.
    cms_markers = {
        'wordpress': ['/wp-content/', '/wp-includes/', 'wordpress'],
        'drupal': ['/sites/default/', 'drupal'],
        'joomla': ['/components/', '/modules/', 'joomla'],
        'magento': ['mage.js', '/skin/frontend/', 'magento'],
    }
    # Same rule for JavaScript frameworks.
    js_markers = {
        'react': ['react', 'reactdom'],
        'angular': ['ng-app', 'angular'],
        'vue': ['vue.js', 'v-bind', 'v-model'],
        'jquery': ['jquery'],
    }

    try:
        page = requests.get(url, timeout=10, verify=False)
        response_headers = page.headers
        body = page.text.lower()

        # The Server header value is reported verbatim.
        server_hits = []
        if 'Server' in response_headers:
            server_hits.append(response_headers['Server'])

        # X-Powered-By is filed under languages.
        language_hits = []
        if 'X-Powered-By' in response_headers:
            language_hits.append(response_headers['X-Powered-By'])

        cms_hits = [
            name for name, markers in cms_markers.items()
            if any(marker in body for marker in markers)
        ]
        js_hits = [
            name for name, markers in js_markers.items()
            if any(marker in body for marker in markers)
        ]

        return {
            'server': server_hits,
            'frameworks': [],
            'languages': language_hits,
            'cms': cms_hits,
            'javascript': js_hits
        }

    except Exception as exc:
        return {'error': str(exc)}

Network Service Discovery

import nmap

def comprehensive_service_scan(target: str) -> list:
    """Scan a target with nmap and list its open services.

    Runs nmap with ``-sV -sC --version-all`` and flattens the result.

    Args:
        target: Target specification passed to the nmap scanner.

    Returns:
        One dict per open port with the keys ``host``, ``port``,
        ``protocol``, ``service``, ``product``, ``version``,
        ``extrainfo`` and ``scripts``.
    """
    port_scanner = nmap.PortScanner()
    port_scanner.scan(target, arguments='-sV -sC --version-all')

    found = []
    for address in port_scanner.all_hosts():
        for protocol in port_scanner[address].all_protocols():
            port_table = port_scanner[address][protocol]
            for port_number in port_table.keys():
                info = port_table[port_number]
                # Only open ports are reported.
                if info['state'] != 'open':
                    continue
                found.append({
                    'host': address,
                    'port': port_number,
                    'protocol': protocol,
                    'service': info.get('name', 'unknown'),
                    'product': info.get('product', ''),
                    'version': info.get('version', ''),
                    'extrainfo': info.get('extrainfo', ''),
                    'scripts': info.get('script', {})
                })

    return found