| name | Vulnerability Detection |
| description | Systematic approach to identifying security vulnerabilities in code, dependencies, and infrastructure |
Vulnerability Detection
You are a security expert specializing in vulnerability detection and assessment. You help identify, analyze, and remediate security vulnerabilities across applications, dependencies, and infrastructure.
Systematic Security Assessment
Security Audit Methodology
1. Reconnaissance Phase:
# Map the application surface
find . -type f \( -name "*.js" -o -name "*.py" -o -name "*.java" -o -name "*.php" \)
# Identify dependencies
cat package.json requirements.txt pom.xml composer.json
# Check for sensitive files
find . -name ".env" -o -name "*.key" -o -name "*.pem" -o -name "credentials*"
# Review configuration
find . -name "*.config.js" -o -name "*.yml" -o -name "*.ini"
2. Automated Scanning:
# Dependency vulnerability scanning
npm audit # Node.js
pip-audit # Python
snyk test # Multi-language
safety check # Python
# Static analysis
semgrep --config=auto . # Multi-language SAST
bandit -r . # Python security linter
eslint . # JavaScript (requires eslint-plugin-security enabled in the ESLint config)
brakeman # Ruby on Rails
# Secret detection
gitleaks detect # Git secrets
trufflehog git file://. # Deep secret scanning
3. Manual Code Review: Focus on high-risk areas:
- Authentication and authorization
- Input validation and sanitization
- Database queries and ORM usage
- File operations and uploads
- Cryptographic implementations
- Session management
- API endpoints and data exposure
OWASP Top 10 Vulnerabilities
1. Broken Access Control
Detection Patterns:
# VULNERABLE: Missing authorization check
@app.route('/api/user/<user_id>')
def get_user(user_id):
    user = User.query.get(user_id)
    return jsonify(user.to_dict())
# SECURE: Proper authorization
@app.route('/api/user/<int:user_id>')
@login_required
def get_user(user_id):
    # Only the account owner or an admin may read this record
    if current_user.id != user_id and not current_user.is_admin:
        abort(403)
    # Return 404 instead of crashing when the user does not exist
    user = User.query.get_or_404(user_id)
    return jsonify(user.to_dict())
Common Issues:
- Insecure Direct Object References (IDOR)
- Missing function-level access control
- Privilege escalation
- CORS misconfigurations
Detection Strategy:
def check_authorization_issues(code):
    """Check for missing authorization"""
    vulnerabilities = []
    # Check route handlers
    routes = find_route_handlers(code)
    for route in routes:
        # Flag routes without auth decorators
        if not has_auth_decorator(route):
            vulnerabilities.append({
                'type': 'Missing Authorization',
                'severity': 'HIGH',
                'location': route.location,
                'description': 'Route lacks authentication/authorization check'
            })
        # Check for IDOR patterns
        if uses_user_input_in_query(route):
            if not validates_ownership(route):
                vulnerabilities.append({
                    'type': 'Potential IDOR',
                    'severity': 'HIGH',
                    'location': route.location,
                    'description': 'User input used in query without ownership check'
                })
    return vulnerabilities
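The detection strategy above leaves `find_route_handlers` and `has_auth_decorator` undefined. One possible way to approximate them for Flask-style code is to walk the syntax tree with the standard `ast` module. This is a minimal sketch, not a complete checker: the decorator names in `AUTH_DECORATORS` and the helper `find_unprotected_routes` are assumptions made for illustration, and authorization enforced inside the function body or in middleware is not detected.

```python
import ast

# Assumed names of decorators that enforce authentication; adjust per project.
AUTH_DECORATORS = {"login_required", "admin_required"}


def _decorator_name(node):
    """Return the trailing name of a decorator, e.g. 'route' for @app.route(...)."""
    if isinstance(node, ast.Call):
        node = node.func
    if isinstance(node, ast.Attribute):
        return node.attr
    if isinstance(node, ast.Name):
        return node.id
    return None


def find_unprotected_routes(source):
    """Return (function name, line number) for route handlers lacking an auth decorator."""
    findings = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        names = {_decorator_name(d) for d in node.decorator_list}
        if "route" in names and not (names & AUTH_DECORATORS):
            findings.append((node.name, node.lineno))
    return findings


SAMPLE = '''
@app.route('/api/user/<user_id>')
def get_user(user_id):
    return "..."

@app.route('/api/profile')
@login_required
def profile():
    return "..."
'''

for name, lineno in find_unprotected_routes(SAMPLE):
    print(f"{name} (line {lineno}): route has no auth decorator")
```

Parsing the code rather than matching text avoids false hits in comments and strings, at the cost of only supporting Python sources.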
2. Cryptographic Failures
Vulnerable Patterns:
# VULNERABLE: Weak hashing
import hashlib
password_hash = hashlib.md5(password.encode()).hexdigest()
# VULNERABLE: Hardcoded secrets
API_KEY = "sk_live_1234567890abcdef"
# VULNERABLE: Weak encryption
from Crypto.Cipher import DES
cipher = DES.new(key, DES.MODE_ECB)
# SECURE: Strong password hashing
import bcrypt
password_hash = bcrypt.hashpw(password.encode(), bcrypt.gensalt())
# SECURE: Environment variables
import os
API_KEY = os.getenv('API_KEY')
# SECURE: Strong encryption
from cryptography.fernet import Fernet
cipher = Fernet(key)
Detection Rules:
# Semgrep rules for weak crypto and hardcoded secrets
rules:
  - id: weak-crypto-md5
    languages: [python]
    pattern: hashlib.md5($X)
    message: "MD5 is cryptographically broken"
    severity: ERROR
  - id: hardcoded-secret
    languages: [python]
    pattern-regex: sk_live_[0-9A-Za-z]+
    message: "Hardcoded API key detected"
    severity: ERROR
  - id: weak-encryption
    languages: [python]
    pattern: from Crypto.Cipher import DES
    message: "DES encryption is deprecated"
    severity: ERROR
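Where Semgrep is not available, the hardcoded-secret rule can be approximated with a line-by-line regex scan. The sketch below is only illustrative: the pattern set, the labels, and the function name `find_hardcoded_secrets` are assumptions, and a dedicated scanner such as gitleaks will cover far more key formats.

```python
import re

# Illustrative patterns only; real secret scanners ship much larger rule sets.
SECRET_PATTERNS = {
    "live API key literal": re.compile(r"""["']sk_live_[0-9a-zA-Z]{10,}["']"""),
    "secret-like assignment": re.compile(
        r"""(?i)\b(api_key|secret|password|token)\b\s*=\s*["'][^"']{8,}["']"""
    ),
}


def find_hardcoded_secrets(source):
    """Return (line number, label) pairs for lines that look like hardcoded secrets."""
    findings = []
    for lineno, line in enumerate(source.splitlines(), start=1):
        for label, pattern in SECRET_PATTERNS.items():
            if pattern.search(line):
                findings.append((lineno, label))
    return findings


code = (
    'API_KEY = "sk_live_1234567890abcdef"\n'
    'API_KEY = os.getenv("API_KEY")\n'
)
for lineno, label in find_hardcoded_secrets(code):
    print(f"line {lineno}: {label}")
```

Only the first line of the sample is reported; reading the key from the environment does not match either pattern.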
3. SQL Injection
Vulnerable Code:
# VULNERABLE: String concatenation
def get_user(username):
    query = f"SELECT * FROM users WHERE username = '{username}'"
    cursor.execute(query)
# VULNERABLE: Unsafe ORM usage
User.objects.raw(f"SELECT * FROM users WHERE id = {user_id}")
# SECURE: Parameterized queries
def get_user(username):
    query = "SELECT * FROM users WHERE username = %s"
    cursor.execute(query, (username,))
# SECURE: ORM with proper escaping
User.objects.filter(username=username)
Detection Script:
import re
def detect_sql_injection(code):
    """Detect potential SQL injection vulnerabilities"""
    vulnerabilities = []
    # Pattern 1: String formatting in SQL
    pattern1 = r'(execute|raw)\s*\(\s*[f"\'`].*\{.*\}.*["\']'
    # Pattern 2: Concatenation in SQL
    pattern2 = r'(execute|raw)\s*\(\s*.*\+.*\)'
    # Pattern 3: f-strings in SQL
    pattern3 = r'f["\']SELECT.*FROM.*WHERE'
    patterns = [
        (pattern1, 'String formatting in SQL query'),
        (pattern2, 'String concatenation in SQL query'),
        (pattern3, 'f-string used in SQL query'),
    ]
    for pattern, description in patterns:
        matches = re.finditer(pattern, code, re.IGNORECASE)
        for match in matches:
            vulnerabilities.append({
                'type': 'SQL Injection',
                'severity': 'CRITICAL',
                'pattern': description,
                'location': match.span(),
            })
    return vulnerabilities
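Regex matching like the script above tends to produce false positives on comments and log strings. A complementary approach, sketched here under the assumption that the target is Python source, is to inspect the syntax tree and flag calls to `execute`, `executemany`, or `raw` whose first argument is built dynamically. The names `SQL_SINKS` and `find_dynamic_sql` are hypothetical. The check does no data-flow tracking, so a query assembled into a variable on an earlier line is missed.

```python
import ast

# Method names treated as SQL sinks (an assumption; extend for other drivers).
SQL_SINKS = {"execute", "executemany", "raw"}


def _is_dynamic_string(node):
    """True if the expression builds a string from runtime values."""
    if isinstance(node, ast.JoinedStr):
        # An f-string is only risky when it interpolates something.
        return any(isinstance(v, ast.FormattedValue) for v in node.values)
    if isinstance(node, ast.BinOp) and isinstance(node.op, (ast.Add, ast.Mod)):
        # "..." + value  or  "... %s" % value
        return True
    if (isinstance(node, ast.Call)
            and isinstance(node.func, ast.Attribute)
            and node.func.attr == "format"):
        # "...{}".format(value)
        return True
    return False


def find_dynamic_sql(source):
    """Return sorted (line number, sink name) pairs for dynamically built queries."""
    findings = []
    for node in ast.walk(ast.parse(source)):
        if (isinstance(node, ast.Call)
                and isinstance(node.func, ast.Attribute)
                and node.func.attr in SQL_SINKS
                and node.args
                and _is_dynamic_string(node.args[0])):
            findings.append((node.lineno, node.func.attr))
    return sorted(findings)


SAMPLE = (
    "cursor.execute(f\"SELECT * FROM users WHERE name = '{name}'\")\n"
    "cursor.execute(\"SELECT * FROM users WHERE name = %s\", (name,))\n"
    "User.objects.raw(\"SELECT * FROM users WHERE id = \" + user_id)\n"
)
print(find_dynamic_sql(SAMPLE))
```

The parameterized call on the second line is not flagged because its first argument is a plain string literal.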
4. Cross-Site Scripting (XSS)
Vulnerable Patterns:
// VULNERABLE: Direct innerHTML assignment
element.innerHTML = userInput;
// VULNERABLE: Unescaped template rendering
const html = `<div>${userData.name}</div>`;
// VULNERABLE: eval() with user input
eval(userProvidedCode);
// SECURE: Use textContent
element.textContent = userInput;
// SECURE: Use DOMPurify
import DOMPurify from 'dompurify';
element.innerHTML = DOMPurify.sanitize(userInput);
// SECURE: Template with escaping
const html = `<div>${escapeHtml(userData.name)}</div>`;
Server-Side Detection:
# VULNERABLE: User input interpolated into HTML without escaping
@app.route('/hello')
def hello():
    name = request.args.get('name')
    return f'<h1>Hello {name}</h1>'  # XSS!
# SECURE: Proper templating (Jinja2 autoescapes {{ name }})
from flask import render_template_string
@app.route('/hello')
def hello():
    name = request.args.get('name')
    return render_template_string('<h1>Hello {{ name }}</h1>', name=name)
Detection Rules:
import re
def detect_xss_vulnerabilities(code, language):
    """Detect XSS vulnerabilities"""
    vulnerabilities = []
    if language == 'javascript':
        dangerous_patterns = [
            r'\.innerHTML\s*=\s*(?!DOMPurify)',
            r'eval\s*\(',
            r'document\.write\s*\(',
            r'dangerouslySetInnerHTML',
        ]
    elif language == 'python':
        dangerous_patterns = [
            r'return\s+f["\']<.*\{.*\}',
            r'\|\s*safe',  # Jinja2 safe filter
            r'mark_safe\s*\(',
        ]
    else:
        # Unknown language: nothing to match against
        dangerous_patterns = []
    for pattern in dangerous_patterns:
        matches = re.finditer(pattern, code)
        for match in matches:
            vulnerabilities.append({
                'type': 'XSS',
                'severity': 'HIGH',
                'location': match.span(),
            })
    return vulnerabilities
5. Cross-Site Request Forgery (CSRF)
Vulnerable Code:
# VULNERABLE: Missing CSRF protection
@app.route('/transfer', methods=['POST'])
@login_required
def transfer_money():
    amount = request.form.get('amount')
    to_account = request.form.get('to_account')
    process_transfer(current_user, to_account, amount)
    return 'Transfer complete'
# SECURE: CSRF token validation
from flask_wtf.csrf import CSRFProtect
csrf = CSRFProtect(app)
@app.route('/transfer', methods=['POST'])
@login_required
def transfer_money():
    # CSRF token automatically validated
    amount = request.form.get('amount')
    to_account = request.form.get('to_account')
    process_transfer(current_user, to_account, amount)
    return 'Transfer complete'
Detection:
def check_csrf_protection(framework_code):
    """Check for CSRF protection in state-changing endpoints"""
    vulnerabilities = []
    # Find POST/PUT/DELETE routes
    routes = find_routes_by_method(framework_code, ['POST', 'PUT', 'DELETE'])
    for route in routes:
        # Check for CSRF middleware/decorator
        if not has_csrf_protection(route):
            vulnerabilities.append({
                'type': 'Missing CSRF Protection',
                'severity': 'HIGH',
                'route': route.path,
                'method': route.method,
            })
    return vulnerabilities
6. Security Misconfiguration
Common Issues:
# VULNERABLE: Debug mode in production
DEBUG = True
ALLOWED_HOSTS = ['*']
# VULNERABLE: Weak CORS policy
CORS_ALLOW_ALL_ORIGINS = True
# VULNERABLE: Insecure cookies
SESSION_COOKIE_SECURE = False
SESSION_COOKIE_HTTPONLY = False
# SECURE: Production settings
DEBUG = False
ALLOWED_HOSTS = ['example.com', 'www.example.com']
CORS_ALLOWED_ORIGINS = ['https://example.com']
SESSION_COOKIE_SECURE = True
SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = 'Strict'
Security Headers Check:
import requests
def check_security_headers(url):
    """Check for security headers"""
    # Always set a timeout so a slow host cannot hang the scan
    response = requests.get(url, timeout=10)
    headers = response.headers
    # X-XSS-Protection is deprecated in modern browsers, so its absence is not flagged
    required_headers = {
        'Strict-Transport-Security': 'HSTS not set',
        'X-Frame-Options': 'Clickjacking protection missing',
        'X-Content-Type-Options': 'MIME sniffing protection missing',
        'Content-Security-Policy': 'CSP not configured',
        'Referrer-Policy': 'Referrer policy not set',
    }
    missing = []
    for header, message in required_headers.items():
        if header not in headers:
            missing.append({
                'header': header,
                'severity': 'MEDIUM',
                'description': message,
            })
    return missing
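A header that is present can still be misconfigured, which a presence check like the one above does not catch. The following sketch inspects a few header values offline, taking a plain mapping of header names to values. The function name `weak_header_values` and the 180-day HSTS threshold are assumptions chosen for illustration, not values mandated by any standard.

```python
import re

# Assumed minimum HSTS lifetime: 180 days, expressed in seconds.
MIN_HSTS_MAX_AGE = 15552000


def weak_header_values(headers):
    """Return descriptions of security headers that are set but weakly configured."""
    normalized = {name.lower(): value for name, value in headers.items()}
    issues = []

    hsts = normalized.get("strict-transport-security")
    if hsts is not None:
        match = re.search(r'max-age\s*=\s*"?(\d+)', hsts, re.IGNORECASE)
        if not match or int(match.group(1)) < MIN_HSTS_MAX_AGE:
            issues.append("HSTS max-age missing or too short")

    nosniff = normalized.get("x-content-type-options")
    if nosniff is not None and nosniff.strip().lower() != "nosniff":
        issues.append("X-Content-Type-Options should be 'nosniff'")

    csp = normalized.get("content-security-policy")
    if csp is not None and "'unsafe-inline'" in csp:
        issues.append("CSP allows 'unsafe-inline'")

    return issues


sample_headers = {
    "Strict-Transport-Security": "max-age=300",
    "X-Content-Type-Options": "nosniff",
    "Content-Security-Policy": "default-src 'self'; script-src 'unsafe-inline'",
}
print(weak_header_values(sample_headers))
```

Because the function accepts any mapping, it can be fed `response.headers` from `requests` or a dictionary captured in a test fixture.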
Dependency Vulnerability Scanning
Automated Scanning
import subprocess
import json
class DependencyScanner:
    @staticmethod
    def scan_npm_dependencies():
        """Scan Node.js dependencies"""
        # npm audit exits non-zero when it finds issues, so check=True is not used
        result = subprocess.run(
            ['npm', 'audit', '--json'],
            capture_output=True,
            text=True
        )
        audit_data = json.loads(result.stdout)
        vulnerabilities = []
        for vuln_id, vuln_data in audit_data.get('vulnerabilities', {}).items():
            # fixAvailable is either a boolean or an object describing the fix
            fix = vuln_data.get('fixAvailable')
            vulnerabilities.append({
                'package': vuln_data['name'],
                'severity': vuln_data['severity'],
                'vulnerable_versions': vuln_data['range'],
                'fixed_version': fix.get('version') if isinstance(fix, dict) else None,
                'cve': vuln_data.get('cves', []),
            })
        return vulnerabilities
    @staticmethod
    def scan_python_dependencies():
        """Scan Python dependencies"""
        result = subprocess.run(
            ['safety', 'check', '--json'],
            capture_output=True,
            text=True
        )
        safety_data = json.loads(result.stdout)
        vulnerabilities = []
        # Assumes the legacy Safety 1.x output: a list of
        # [package, affected_spec, installed_version, advisory, vulnerability_id].
        # Newer Safety releases emit a different JSON schema.
        for vuln in safety_data:
            vulnerabilities.append({
                'package': vuln[0],
                'affected_versions': vuln[1],
                'installed_version': vuln[2],
                'description': vuln[3],
                'vulnerability_id': vuln[4],
            })
        return vulnerabilities
Software Composition Analysis (SCA)
def analyze_dependencies(project_path):
    """Comprehensive dependency analysis"""
    analysis = {
        'total_dependencies': 0,
        'vulnerable_packages': [],
        'outdated_packages': [],
        'license_issues': [],
    }
    # Check for known vulnerabilities
    vulnerabilities = scan_dependencies(project_path)
    analysis['vulnerable_packages'] = vulnerabilities
    # Check for outdated packages
    outdated = check_outdated_packages(project_path)
    analysis['outdated_packages'] = outdated
    # License compliance check
    licenses = check_licenses(project_path)
    analysis['license_issues'] = find_problematic_licenses(licenses)
    # Calculate risk score
    analysis['risk_score'] = calculate_risk_score(analysis)
    return analysis
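The analysis above calls `calculate_risk_score` without defining it. One simple way to fill that gap is a capped weighted sum over the findings, sketched below. The weights, the cap of 100, and the treatment of outdated packages and license issues are all assumptions made for illustration; a real scoring model should be calibrated to the organization's risk appetite.

```python
# Assumed weights per severity label; npm reports "moderate" where others say "medium".
SEVERITY_WEIGHTS = {"critical": 10, "high": 7, "moderate": 4, "medium": 4, "low": 1}


def calculate_risk_score(analysis, cap=100):
    """Combine dependency findings into a single score between 0 and cap."""
    score = 0.0
    for vuln in analysis.get("vulnerable_packages", []):
        severity = str(vuln.get("severity", "")).lower()
        score += SEVERITY_WEIGHTS.get(severity, 0)
    # Outdated packages and license problems add smaller fixed penalties.
    score += 0.5 * len(analysis.get("outdated_packages", []))
    score += 2 * len(analysis.get("license_issues", []))
    return min(cap, score)


example = {
    "vulnerable_packages": [
        {"package": "lodash", "severity": "critical"},
        {"package": "minimist", "severity": "low"},
    ],
    "outdated_packages": ["express", "request"],
    "license_issues": ["GPL-3.0 in proprietary build"],
}
print(calculate_risk_score(example))  # 10 + 1 + 0.5 * 2 + 2 * 1 = 14.0
```

The package names in `example` are placeholders, not findings from a real scan.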
Advanced Vulnerability Detection
1. Server-Side Request Forgery (SSRF)
# VULNERABLE: Unvalidated URL fetch
@app.route('/fetch')
def fetch_url():
    url = request.args.get('url')
    response = requests.get(url)
    return response.content
# SECURE: Allow only known domains
from urllib.parse import urlparse
ALLOWED_DOMAINS = ['api.example.com', 'cdn.example.com']
@app.route('/fetch')
def fetch_url():
    url = request.args.get('url', '')
    parsed = urlparse(url)
    # Only allow HTTP(S) URLs
    if parsed.scheme not in ('http', 'https'):
        abort(400, 'Invalid scheme')
    # Validate the hostname; netloc would also include any port or credentials
    if parsed.hostname not in ALLOWED_DOMAINS:
        abort(400, 'Invalid domain')
    # Prevent internal network access (defense in depth; this list is not exhaustive)
    if parsed.hostname in ('localhost', '127.0.0.1') or \
            parsed.hostname.startswith(('10.', '192.168.', '169.254.')):
        abort(400, 'Internal URLs not allowed')
    # Do not follow redirects, which could point at internal addresses
    response = requests.get(url, timeout=5, allow_redirects=False)
    return response.content
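String prefix checks on hostnames miss many internal ranges and can be bypassed when an allowed name resolves to a private address. A sturdier approach, sketched here with the standard `ipaddress` and `socket` modules, is to resolve the host and classify every returned address. The function names `is_internal_address` and `resolves_to_internal` are hypothetical, and the sketch does not address DNS rebinding between the check and the actual request.

```python
import ipaddress
import socket


def is_internal_address(ip_text):
    """True if the IP literal is loopback, private, link-local, or otherwise non-public."""
    # Drop any IPv6 zone identifier such as '%eth0' before parsing.
    addr = ipaddress.ip_address(ip_text.split("%", 1)[0])
    # Unwrap IPv4-mapped IPv6 addresses such as ::ffff:127.0.0.1.
    if addr.version == 6 and addr.ipv4_mapped is not None:
        addr = addr.ipv4_mapped
    return (
        addr.is_private
        or addr.is_loopback
        or addr.is_link_local
        or addr.is_multicast
        or addr.is_reserved
        or addr.is_unspecified
    )


def resolves_to_internal(hostname):
    """True if any address the hostname resolves to is internal, or if it cannot be resolved."""
    try:
        infos = socket.getaddrinfo(hostname, None)
    except socket.gaierror:
        return True  # fail closed: treat unresolvable hosts as unsafe
    return any(is_internal_address(info[4][0]) for info in infos)


for candidate in ["127.0.0.1", "10.0.0.5", "169.254.169.254", "8.8.8.8"]:
    print(candidate, is_internal_address(candidate))
```

In a request handler, `resolves_to_internal(parsed.hostname)` would be called after the allowlist check and before the outbound request is made.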
2. XML External Entity (XXE)
# VULNERABLE: Unsafe XML parsing
import xml.etree.ElementTree as ET
tree = ET.parse(user_uploaded_file)
# SECURE: Disable external entities
import defusedxml.ElementTree as ET
tree = ET.parse(user_uploaded_file)
3. Insecure Deserialization
# VULNERABLE: pickle deserialization
import pickle
data = pickle.loads(user_data) # RCE risk!
# SECURE: Use JSON for data exchange
import json
data = json.loads(user_data)
# If pickle needed, sign and verify
import hmac
import pickle
def secure_pickle_loads(signed_data, secret):
    signature, pickled = signed_data.split(b':', 1)
    expected = hmac.new(secret, pickled, 'sha256').hexdigest().encode()
    if not hmac.compare_digest(signature, expected):
        raise ValueError('Invalid signature')
    return pickle.loads(pickled)
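The verification function above expects data in the form `signature:pickled`, but the producing side is not shown. A matching signer could look like the sketch below, where `secure_pickle_dumps` is a hypothetical name and the secret is a placeholder. Signing only proves the payload came from a holder of the secret; it does not make unpickling data from untrusted producers safe.

```python
import hashlib
import hmac
import pickle


def secure_pickle_dumps(obj, secret):
    """Pickle obj and prefix it with a hex HMAC-SHA256 signature."""
    pickled = pickle.dumps(obj)
    signature = hmac.new(secret, pickled, hashlib.sha256).hexdigest().encode()
    # The hex signature never contains ':', so splitting on the first ':' is unambiguous.
    return signature + b":" + pickled


def secure_pickle_loads(signed_data, secret):
    """Verify the signature before unpickling; raise ValueError on mismatch."""
    signature, pickled = signed_data.split(b":", 1)
    expected = hmac.new(secret, pickled, hashlib.sha256).hexdigest().encode()
    if not hmac.compare_digest(signature, expected):
        raise ValueError("Invalid signature")
    return pickle.loads(pickled)


secret = b"replace-with-a-random-secret"  # placeholder; load from secure config in practice
token = secure_pickle_dumps({"user_id": 42}, secret)
print(secure_pickle_loads(token, secret))

# Flipping a single bit in the payload invalidates the signature.
tampered = token[:-1] + bytes([token[-1] ^ 1])
try:
    secure_pickle_loads(tampered, secret)
except ValueError as exc:
    print("rejected:", exc)
```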
Security Testing Automation
Automated Security Test Suite
import html
import unittest
class SecurityTestSuite(unittest.TestCase):
    # Assumes setUp() assigns self.client, e.g. the framework's test client
    def test_sql_injection(self):
        """Test SQL injection protection"""
        malicious_inputs = [
            "' OR '1'='1",
            "1; DROP TABLE users--",
            "' UNION SELECT * FROM passwords--",
        ]
        for payload in malicious_inputs:
            response = self.client.get(f'/user?id={payload}')
            self.assertNotIn('error', response.text.lower())
            self.assertEqual(response.status_code, 200)
    def test_xss_protection(self):
        """Test XSS protection"""
        xss_payloads = [
            '<script>alert(1)</script>',
            '<img src=x onerror=alert(1)>',
            'javascript:alert(1)',
        ]
        for payload in xss_payloads:
            response = self.client.post('/comment', data={'text': payload})
            if '<' in payload:
                # Markup must come back HTML-escaped, never raw
                self.assertNotIn(payload, response.text)
                self.assertIn(html.escape(payload), response.text)
    def test_authentication_bypass(self):
        """Test authentication bypass attempts"""
        response = self.client.get('/admin/dashboard')
        self.assertEqual(response.status_code, 401)
    def test_csrf_protection(self):
        """Test CSRF protection"""
        # POST without CSRF token should fail
        response = self.client.post('/transfer', data={
            'amount': 1000,
            'to_account': '12345'
        })
        # Flask-WTF rejects a missing token with 400; some frameworks use 403
        self.assertIn(response.status_code, (400, 403))
Vulnerability Reporting
Structured Vulnerability Report
import json
class VulnerabilityReport:
    def __init__(self):
        self.vulnerabilities = []
    def add_vulnerability(self, vuln):
        """Add vulnerability with CVSS scoring"""
        self.vulnerabilities.append({
            'id': len(self.vulnerabilities) + 1,
            'title': vuln.get('title'),
            'severity': vuln.get('severity'),  # CRITICAL, HIGH, MEDIUM, LOW
            'cvss_score': vuln.get('cvss_score'),
            'cwe': vuln.get('cwe'),  # CWE identifier
            'description': vuln.get('description'),
            'affected_component': vuln.get('component'),
            'proof_of_concept': vuln.get('poc'),
            'remediation': vuln.get('remediation'),
            'references': vuln.get('references', []),
        })
    def generate_report(self, format='json'):
        """Generate vulnerability report"""
        summary = {
            'total_vulnerabilities': len(self.vulnerabilities),
            'critical': sum(1 for v in self.vulnerabilities if v['severity'] == 'CRITICAL'),
            'high': sum(1 for v in self.vulnerabilities if v['severity'] == 'HIGH'),
            'medium': sum(1 for v in self.vulnerabilities if v['severity'] == 'MEDIUM'),
            'low': sum(1 for v in self.vulnerabilities if v['severity'] == 'LOW'),
        }
        report = {
            'summary': summary,
            'vulnerabilities': sorted(
                self.vulnerabilities,
                key=lambda x: {'CRITICAL': 4, 'HIGH': 3, 'MEDIUM': 2, 'LOW': 1}[x['severity']],
                reverse=True
            )
        }
        if format == 'json':
            return json.dumps(report, indent=2)
        elif format == 'markdown':
            return self.to_markdown(report)
        # Fail loudly instead of silently returning None
        raise ValueError(f'Unsupported report format: {format}')
    def to_markdown(self, report):
        """Generate Markdown report"""
        md = "# Security Vulnerability Report\n\n"
        md += "## Summary\n\n"
        md += f"- Total Vulnerabilities: {report['summary']['total_vulnerabilities']}\n"
        md += f"- Critical: {report['summary']['critical']}\n"
        md += f"- High: {report['summary']['high']}\n"
        md += f"- Medium: {report['summary']['medium']}\n"
        md += f"- Low: {report['summary']['low']}\n\n"
        md += "## Vulnerabilities\n\n"
        for vuln in report['vulnerabilities']:
            md += f"### [{vuln['severity']}] {vuln['title']}\n\n"
            md += f"**Description:** {vuln['description']}\n\n"
            md += f"**Remediation:** {vuln['remediation']}\n\n"
            md += "---\n\n"
        return md
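The report stores both a `severity` label and a `cvss_score`, and the two can drift apart when entered by hand. A small helper can derive the label from the score using the CVSS v3.x qualitative rating scale. The function name `cvss_to_severity` is an assumption for this sketch. Note that a score of 0.0 maps to NONE, which the report's sort key above does not handle, so such entries would need filtering first.

```python
def cvss_to_severity(score):
    """Map a CVSS v3.x base score to its qualitative severity rating."""
    if not 0.0 <= score <= 10.0:
        raise ValueError(f"CVSS score out of range: {score}")
    if score == 0.0:
        return "NONE"
    if score < 4.0:
        return "LOW"       # 0.1 to 3.9
    if score < 7.0:
        return "MEDIUM"    # 4.0 to 6.9
    if score < 9.0:
        return "HIGH"      # 7.0 to 8.9
    return "CRITICAL"      # 9.0 to 10.0


for score in (0.0, 3.9, 5.3, 7.5, 9.8):
    print(score, cvss_to_severity(score))
```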
Related Skills
- Secure Coding Practices: Writing secure code from the start
- Penetration Testing: Active exploitation of vulnerabilities
- Code Review: Manual security code review techniques
- Compliance: OWASP, PCI-DSS, GDPR requirements
- Incident Response: Handling security breaches
- Threat Modeling: Identifying potential attack vectors
- Cryptography: Proper implementation of crypto primitives