Threat Mitigation Mapping
Connect threats to controls for effective security planning.
When to Use This Skill
- Prioritizing security investments
- Creating remediation roadmaps
- Validating control coverage
- Designing defense-in-depth
- Security architecture review
- Risk treatment planning
Core Concepts
1. Control Categories
Preventive ────► Stop attacks before they occur
│ (Firewall, Input validation)
│
Detective ─────► Identify attacks in progress
│ (IDS, Log monitoring)
│
Corrective ────► Respond and recover from attacks
(Incident response, Backup restore)
2. Control Layers
| Layer |
Examples |
| Network |
Firewall, WAF, DDoS protection |
| Application |
Input validation, authentication |
| Data |
Encryption, access controls |
| Endpoint |
EDR, patch management |
| Process |
Security training, incident response |
3. Defense in Depth
┌──────────────────────┐
│ Perimeter │ ← Firewall, WAF
│ ┌──────────────┐ │
│ │ Network │ │ ← Segmentation, IDS
│ │ ┌────────┐ │ │
│ │ │ Host │ │ │ ← EDR, Hardening
│ │ │ ┌────┐ │ │ │
│ │ │ │App │ │ │ │ ← Auth, Validation
│ │ │ │Data│ │ │ │ ← Encryption
│ │ │ └────┘ │ │ │
│ │ └────────┘ │ │
│ └──────────────┘ │
└──────────────────────┘
Templates
Template 1: Mitigation Model
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Dict, Optional, Set
from datetime import datetime
class ControlType(Enum):
PREVENTIVE = "preventive"
DETECTIVE = "detective"
CORRECTIVE = "corrective"
class ControlLayer(Enum):
NETWORK = "network"
APPLICATION = "application"
DATA = "data"
ENDPOINT = "endpoint"
PROCESS = "process"
PHYSICAL = "physical"
class ImplementationStatus(Enum):
NOT_IMPLEMENTED = "not_implemented"
PARTIAL = "partial"
IMPLEMENTED = "implemented"
VERIFIED = "verified"
class Effectiveness(Enum):
NONE = 0
LOW = 1
MEDIUM = 2
HIGH = 3
VERY_HIGH = 4
@dataclass
class SecurityControl:
id: str
name: str
description: str
control_type: ControlType
layer: ControlLayer
effectiveness: Effectiveness
implementation_cost: str # Low, Medium, High
maintenance_cost: str
status: ImplementationStatus = ImplementationStatus.NOT_IMPLEMENTED
mitigates_threats: List[str] = field(default_factory=list)
dependencies: List[str] = field(default_factory=list)
technologies: List[str] = field(default_factory=list)
compliance_refs: List[str] = field(default_factory=list)
def coverage_score(self) -> float:
"""Calculate coverage score based on status and effectiveness."""
status_multiplier = {
ImplementationStatus.NOT_IMPLEMENTED: 0.0,
ImplementationStatus.PARTIAL: 0.5,
ImplementationStatus.IMPLEMENTED: 0.8,
ImplementationStatus.VERIFIED: 1.0,
}
return self.effectiveness.value * status_multiplier[self.status]
@dataclass
class Threat:
id: str
name: str
category: str # STRIDE category
description: str
impact: str # Critical, High, Medium, Low
likelihood: str
risk_score: float
@dataclass
class MitigationMapping:
threat: Threat
controls: List[SecurityControl]
residual_risk: str = "Unknown"
notes: str = ""
def calculate_coverage(self) -> float:
"""Calculate how well controls cover the threat."""
if not self.controls:
return 0.0
total_score = sum(c.coverage_score() for c in self.controls)
max_possible = len(self.controls) * Effectiveness.VERY_HIGH.value
return (total_score / max_possible) * 100 if max_possible > 0 else 0
def has_defense_in_depth(self) -> bool:
"""Check if multiple layers are covered."""
layers = set(c.layer for c in self.controls if c.status != ImplementationStatus.NOT_IMPLEMENTED)
return len(layers) >= 2
def has_control_diversity(self) -> bool:
"""Check if multiple control types are present."""
types = set(c.control_type for c in self.controls if c.status != ImplementationStatus.NOT_IMPLEMENTED)
return len(types) >= 2
@dataclass
class MitigationPlan:
name: str
threats: List[Threat] = field(default_factory=list)
controls: List[SecurityControl] = field(default_factory=list)
mappings: List[MitigationMapping] = field(default_factory=list)
def get_unmapped_threats(self) -> List[Threat]:
"""Find threats without mitigations."""
mapped_ids = {m.threat.id for m in self.mappings}
return [t for t in self.threats if t.id not in mapped_ids]
def get_control_coverage(self) -> Dict[str, float]:
"""Get coverage percentage for each threat."""
return {
m.threat.id: m.calculate_coverage()
for m in self.mappings
}
def get_gaps(self) -> List[Dict]:
"""Identify mitigation gaps."""
gaps = []
for mapping in self.mappings:
coverage = mapping.calculate_coverage()
if coverage < 50:
gaps.append({
"threat": mapping.threat.id,
"threat_name": mapping.threat.name,
"coverage": coverage,
"issue": "Insufficient control coverage",
"recommendation": "Add more controls or improve existing ones"
})
if not mapping.has_defense_in_depth():
gaps.append({
"threat": mapping.threat.id,
"threat_name": mapping.threat.name,
"coverage": coverage,
"issue": "No defense in depth",
"recommendation": "Add controls at different layers"
})
if not mapping.has_control_diversity():
gaps.append({
"threat": mapping.threat.id,
"threat_name": mapping.threat.name,
"coverage": coverage,
"issue": "No control diversity",
"recommendation": "Add detective/corrective controls"
})
return gaps
Template 2: Control Library
class ControlLibrary:
"""Library of standard security controls."""
STANDARD_CONTROLS = {
# Authentication Controls
"AUTH-001": SecurityControl(
id="AUTH-001",
name="Multi-Factor Authentication",
description="Require MFA for all user authentication",
control_type=ControlType.PREVENTIVE,
layer=ControlLayer.APPLICATION,
effectiveness=Effectiveness.HIGH,
implementation_cost="Medium",
maintenance_cost="Low",
mitigates_threats=["SPOOFING"],
technologies=["TOTP", "WebAuthn", "SMS OTP"],
compliance_refs=["PCI-DSS 8.3", "NIST 800-63B"]
),
"AUTH-002": SecurityControl(
id="AUTH-002",
name="Account Lockout Policy",
description="Lock accounts after failed authentication attempts",
control_type=ControlType.PREVENTIVE,
layer=ControlLayer.APPLICATION,
effectiveness=Effectiveness.MEDIUM,
implementation_cost="Low",
maintenance_cost="Low",
mitigates_threats=["SPOOFING"],
technologies=["Custom implementation"],
compliance_refs=["PCI-DSS 8.1.6"]
),
# Input Validation Controls
"VAL-001": SecurityControl(
id="VAL-001",
name="Input Validation Framework",
description="Validate and sanitize all user input",
control_type=ControlType.PREVENTIVE,
layer=ControlLayer.APPLICATION,
effectiveness=Effectiveness.HIGH,
implementation_cost="Medium",
maintenance_cost="Medium",
mitigates_threats=["TAMPERING", "INJECTION"],
technologies=["Joi", "Yup", "Pydantic"],
compliance_refs=["OWASP ASVS V5"]
),
"VAL-002": SecurityControl(
id="VAL-002",
name="Web Application Firewall",
description="Deploy WAF to filter malicious requests",
control_type=ControlType.PREVENTIVE,
layer=ControlLayer.NETWORK,
effectiveness=Effectiveness.MEDIUM,
implementation_cost="Medium",
maintenance_cost="Medium",
mitigates_threats=["TAMPERING", "INJECTION", "DOS"],
technologies=["AWS WAF", "Cloudflare", "ModSecurity"],
compliance_refs=["PCI-DSS 6.6"]
),
# Encryption Controls
"ENC-001": SecurityControl(
id="ENC-001",
name="Data Encryption at Rest",
description="Encrypt sensitive data in storage",
control_type=ControlType.PREVENTIVE,
layer=ControlLayer.DATA,
effectiveness=Effectiveness.HIGH,
implementation_cost="Medium",
maintenance_cost="Low",
mitigates_threats=["INFORMATION_DISCLOSURE"],
technologies=["AES-256", "KMS", "HSM"],
compliance_refs=["PCI-DSS 3.4", "GDPR Art. 32"]
),
"ENC-002": SecurityControl(
id="ENC-002",
name="TLS Encryption",
description="Encrypt data in transit using TLS 1.3",
control_type=ControlType.PREVENTIVE,
layer=ControlLayer.NETWORK,
effectiveness=Effectiveness.HIGH,
implementation_cost="Low",
maintenance_cost="Low",
mitigates_threats=["INFORMATION_DISCLOSURE", "TAMPERING"],
technologies=["TLS 1.3", "Certificate management"],
compliance_refs=["PCI-DSS 4.1", "HIPAA"]
),
# Logging Controls
"LOG-001": SecurityControl(
id="LOG-001",
name="Security Event Logging",
description="Log all security-relevant events",
control_type=ControlType.DETECTIVE,
layer=ControlLayer.APPLICATION,
effectiveness=Effectiveness.MEDIUM,
implementation_cost="Low",
maintenance_cost="Medium",
mitigates_threats=["REPUDIATION"],
technologies=["ELK Stack", "Splunk", "CloudWatch"],
compliance_refs=["PCI-DSS 10.2", "SOC2"]
),
"LOG-002": SecurityControl(
id="LOG-002",
name="Log Integrity Protection",
description="Protect logs from tampering",
control_type=ControlType.PREVENTIVE,
layer=ControlLayer.DATA,
effectiveness=Effectiveness.MEDIUM,
implementation_cost="Medium",
maintenance_cost="Low",
mitigates_threats=["REPUDIATION", "TAMPERING"],
technologies=["Immutable storage", "Log signing"],
compliance_refs=["PCI-DSS 10.5"]
),
# Access Control
"ACC-001": SecurityControl(
id="ACC-001",
name="Role-Based Access Control",
description="Implement RBAC for authorization",
control_type=ControlType.PREVENTIVE,
layer=ControlLayer.APPLICATION,
effectiveness=Effectiveness.HIGH,
implementation_cost="Medium",
maintenance_cost="Medium",
mitigates_threats=["ELEVATION_OF_PRIVILEGE", "INFORMATION_DISCLOSURE"],
technologies=["RBAC", "ABAC", "Policy engines"],
compliance_refs=["PCI-DSS 7.1", "SOC2"]
),
# Availability Controls
"AVL-001": SecurityControl(
id="AVL-001",
name="Rate Limiting",
description="Limit request rates to prevent abuse",
control_type=ControlType.PREVENTIVE,
layer=ControlLayer.APPLICATION,
effectiveness=Effectiveness.MEDIUM,
implementation_cost="Low",
maintenance_cost="Low",
mitigates_threats=["DENIAL_OF_SERVICE"],
technologies=["API Gateway", "Redis", "Token bucket"],
compliance_refs=["OWASP API Security"]
),
"AVL-002": SecurityControl(
id="AVL-002",
name="DDoS Protection",
description="Deploy DDoS mitigation services",
control_type=ControlType.PREVENTIVE,
layer=ControlLayer.NETWORK,
effectiveness=Effectiveness.HIGH,
implementation_cost="High",
maintenance_cost="Medium",
mitigates_threats=["DENIAL_OF_SERVICE"],
technologies=["Cloudflare", "AWS Shield", "Akamai"],
compliance_refs=["NIST CSF"]
),
}
def get_controls_for_threat(self, threat_category: str) -> List[SecurityControl]:
"""Get all controls that mitigate a threat category."""
return [
c for c in self.STANDARD_CONTROLS.values()
if threat_category in c.mitigates_threats
]
def get_controls_by_layer(self, layer: ControlLayer) -> List[SecurityControl]:
"""Get controls for a specific layer."""
return [c for c in self.STANDARD_CONTROLS.values() if c.layer == layer]
def get_control(self, control_id: str) -> Optional[SecurityControl]:
"""Get a specific control by ID."""
return self.STANDARD_CONTROLS.get(control_id)
def recommend_controls(
self,
threat: Threat,
existing_controls: List[str]
) -> List[SecurityControl]:
"""Recommend additional controls for a threat."""
available = self.get_controls_for_threat(threat.category)
return [c for c in available if c.id not in existing_controls]
Template 3: Mitigation Analysis
class MitigationAnalyzer:
"""Analyze and optimize mitigation strategies."""
def __init__(self, plan: MitigationPlan, library: ControlLibrary):
self.plan = plan
self.library = library
def calculate_overall_risk_reduction(self) -> float:
"""Calculate overall risk reduction percentage."""
if not self.plan.mappings:
return 0.0
weighted_coverage = 0
total_weight = 0
for mapping in self.plan.mappings:
# Weight by threat risk score
weight = mapping.threat.risk_score
coverage = mapping.calculate_coverage()
weighted_coverage += weight * coverage
total_weight += weight
return weighted_coverage / total_weight if total_weight > 0 else 0
def get_critical_gaps(self) -> List[Dict]:
"""Find critical gaps that need immediate attention."""
gaps = self.plan.get_gaps()
critical_threats = {t.id for t in self.plan.threats if t.impact == "Critical"}
return [g for g in gaps if g["threat"] in critical_threats]
def optimize_budget(
self,
budget: float,
cost_map: Dict[str, float]
) -> List[SecurityControl]:
"""Select controls that maximize risk reduction within budget."""
# Simple greedy approach - can be replaced with optimization algorithm
recommended = []
remaining_budget = budget
unmapped = self.plan.get_unmapped_threats()
# Sort controls by effectiveness/cost ratio
all_controls = list(self.library.STANDARD_CONTROLS.values())
controls_with_value = []
for control in all_controls:
if control.status == ImplementationStatus.NOT_IMPLEMENTED:
cost = cost_map.get(control.id, float('inf'))
if cost <= remaining_budget:
# Calculate value as threats covered * effectiveness / cost
threats_covered = len([
t for t in unmapped
if t.category in control.mitigates_threats
])
if threats_covered > 0:
value = (threats_covered * control.effectiveness.value) / cost
controls_with_value.append((control, value, cost))
# Sort by value (higher is better)
controls_with_value.sort(key=lambda x: x[1], reverse=True)
for control, value, cost in controls_with_value:
if cost <= remaining_budget:
recommended.append(control)
remaining_budget -= cost
return recommended
def generate_roadmap(self) -> List[Dict]:
"""Generate implementation roadmap by priority."""
roadmap = []
gaps = self.plan.get_gaps()
# Phase 1: Critical threats with low coverage
phase1 = []
for gap in gaps:
mapping = next(
(m for m in self.plan.mappings if m.threat.id == gap["threat"]),
None
)
if mapping and mapping.threat.impact == "Critical":
controls = self.library.get_controls_for_threat(mapping.threat.category)
phase1.extend([
{
"threat": gap["threat"],
"control": c.id,
"control_name": c.name,
"phase": 1,
"priority": "Critical"
}
for c in controls
if c.status == ImplementationStatus.NOT_IMPLEMENTED
])
roadmap.extend(phase1[:5]) # Top 5 for phase 1
# Phase 2: High impact threats
phase2 = []
for gap in gaps:
mapping = next(
(m for m in self.plan.mappings if m.threat.id == gap["threat"]),
None
)
if mapping and mapping.threat.impact == "High":
controls = self.library.get_controls_for_threat(mapping.threat.category)
phase2.extend([
{
"threat": gap["threat"],
"control": c.id,
"control_name": c.name,
"phase": 2,
"priority": "High"
}
for c in controls
if c.status == ImplementationStatus.NOT_IMPLEMENTED
])
roadmap.extend(phase2[:5]) # Top 5 for phase 2
return roadmap
def defense_in_depth_analysis(self) -> Dict[str, List[str]]:
"""Analyze defense in depth coverage."""
layer_coverage = {layer.value: [] for layer in ControlLayer}
for mapping in self.plan.mappings:
for control in mapping.controls:
if control.status in [ImplementationStatus.IMPLEMENTED, ImplementationStatus.VERIFIED]:
layer_coverage[control.layer.value].append(control.id)
return layer_coverage
def generate_report(self) -> str:
"""Generate comprehensive mitigation report."""
risk_reduction = self.calculate_overall_risk_reduction()
gaps = self.plan.get_gaps()
critical_gaps = self.get_critical_gaps()
layer_coverage = self.defense_in_depth_analysis()
report = f"""
# Threat Mitigation Report
## Executive Summary
- **Overall Risk Reduction:** {risk_reduction:.1f}%
- **Total Threats:** {len(self.plan.threats)}
- **Total Controls:** {len(self.plan.controls)}
- **Identified Gaps:** {len(gaps)}
- **Critical Gaps:** {len(critical_gaps)}
## Defense in Depth Coverage
{self._format_layer_coverage(layer_coverage)}
## Critical Gaps Requiring Immediate Action
{self._format_gaps(critical_gaps)}
## Recommendations
{self._format_recommendations()}
## Implementation Roadmap
{self._format_roadmap()}
"""
return report
def _format_layer_coverage(self, coverage: Dict[str, List[str]]) -> str:
lines = []
for layer, controls in coverage.items():
status = "✓" if controls else "✗"
lines.append(f"- {layer}: {status} ({len(controls)} controls)")
return "\n".join(lines)
def _format_gaps(self, gaps: List[Dict]) -> str:
if not gaps:
return "No critical gaps identified."
lines = []
for gap in gaps:
lines.append(f"- **{gap['threat_name']}**: {gap['issue']}")
lines.append(f" - Coverage: {gap['coverage']:.1f}%")
lines.append(f" - Recommendation: {gap['recommendation']}")
return "\n".join(lines)
def _format_recommendations(self) -> str:
recommendations = []
layer_coverage = self.defense_in_depth_analysis()
for layer, controls in layer_coverage.items():
if not controls:
recommendations.append(f"- Add {layer} layer controls")
gaps = self.plan.get_gaps()
if any(g["issue"] == "No control diversity" for g in gaps):
recommendations.append("- Add more detective and corrective controls")
return "\n".join(recommendations) if recommendations else "Current coverage is adequate."
def _format_roadmap(self) -> str:
roadmap = self.generate_roadmap()
if not roadmap:
return "No additional controls recommended at this time."
lines = []
current_phase = 0
for item in roadmap:
if item["phase"] != current_phase:
current_phase = item["phase"]
lines.append(f"\n### Phase {current_phase}")
lines.append(f"- [{item['priority']}] {item['control_name']} (for {item['threat']})")
return "\n".join(lines)
Template 4: Control Effectiveness Testing
from dataclasses import dataclass
from typing import List, Callable, Any
import asyncio
@dataclass
class ControlTest:
control_id: str
test_name: str
test_function: Callable[[], bool]
expected_result: bool
description: str
class ControlTester:
"""Test control effectiveness."""
def __init__(self):
self.tests: List[ControlTest] = []
self.results: List[Dict] = []
def add_test(self, test: ControlTest) -> None:
self.tests.append(test)
async def run_tests(self) -> List[Dict]:
"""Run all control tests."""
self.results = []
for test in self.tests:
try:
result = test.test_function()
passed = result == test.expected_result
self.results.append({
"control_id": test.control_id,
"test_name": test.test_name,
"passed": passed,
"actual_result": result,
"expected_result": test.expected_result,
"description": test.description,
"error": None
})
except Exception as e:
self.results.append({
"control_id": test.control_id,
"test_name": test.test_name,
"passed": False,
"actual_result": None,
"expected_result": test.expected_result,
"description": test.description,
"error": str(e)
})
return self.results
def get_effectiveness_score(self, control_id: str) -> float:
"""Calculate effectiveness score for a control."""
control_results = [r for r in self.results if r["control_id"] == control_id]
if not control_results:
return 0.0
passed = sum(1 for r in control_results if r["passed"])
return (passed / len(control_results)) * 100
def generate_test_report(self) -> str:
"""Generate test results report."""
if not self.results:
return "No tests have been run."
total = len(self.results)
passed = sum(1 for r in self.results if r["passed"])
report = f"""
# Control Effectiveness Test Report
## Summary
- **Total Tests:** {total}
- **Passed:** {passed}
- **Failed:** {total - passed}
- **Pass Rate:** {(passed/total)*100:.1f}%
## Results by Control
"""
# Group by control
controls = {}
for result in self.results:
cid = result["control_id"]
if cid not in controls:
controls[cid] = []
controls[cid].append(result)
for control_id, results in controls.items():
score = self.get_effectiveness_score(control_id)
report += f"\n### {control_id} (Effectiveness: {score:.1f}%)\n"
for r in results:
status = "✓" if r["passed"] else "✗"
report += f"- {status} {r['test_name']}\n"
if r["error"]:
report += f" - Error: {r['error']}\n"
return report
Best Practices
Do's
- Map all threats - No threat should be unmapped
- Layer controls - Defense in depth is essential
- Mix control types - Preventive, detective, corrective
- Track effectiveness - Measure and improve
- Review regularly - Controls degrade over time
Don'ts
- Don't rely on single controls - Single points of failure
- Don't ignore cost - ROI matters
- Don't skip testing - Untested controls may fail
- Don't set and forget - Continuous improvement
- Don't ignore people/process - Technology alone isn't enough
Resources