Claude Code Plugins

Community-maintained marketplace

Feedback

Generic AIOps (AI for IT Operations) patterns and best practices for 2025. Provides comprehensive implementation strategies for intelligent monitoring, automation, incident response, and observability across any infrastructure. Framework-agnostic approach supporting multiple monitoring platforms, cloud providers, and automation tools.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name aiops
description Generic AIOps (AI for IT Operations) patterns and best practices for 2025. Provides comprehensive implementation strategies for intelligent monitoring, automation, incident response, and observability across any infrastructure. Framework-agnostic approach supporting multiple monitoring platforms, cloud providers, and automation tools.
license MIT

AIOps - AI for IT Operations

This skill provides comprehensive patterns for implementing AIOps strategies in 2025, including intelligent monitoring, automated incident response, predictive analytics, and observability best practices. The patterns are designed to be framework-agnostic and applicable across different infrastructure platforms.

When to Use This Skill

Use this skill when you need to:

  • Implement AIOps strategies for modern infrastructure
  • Build intelligent monitoring and alerting systems
  • Create automated incident response workflows
  • Deploy predictive maintenance solutions
  • Implement self-healing capabilities
  • Build observability platforms with AI/ML
  • Optimize multi-cloud operations
  • Create chaos engineering practices
  • Implement generative AI for operations
  • Build digital twins for infrastructure

1. AIOps Architecture Patterns

Core AIOps Platform Architecture

# aiops/core/architecture.py
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Any, Union
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import asyncio
import json
import logging

logger = logging.getLogger(__name__)

class AlertSeverity(str, Enum):
    """Alert severity levels"""
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"

class IncidentStatus(str, Enum):
    """Incident status"""
    OPEN = "open"
    INVESTIGATING = "investigating"
    IDENTIFIED = "identified"
    MONITORING = "monitoring"
    RESOLVED = "resolved"
    CLOSED = "closed"

@dataclass
class Metric:
    """Metric data point"""
    name: str
    value: float
    timestamp: datetime
    labels: Dict[str, str] = field(default_factory=dict)
    source: str = ""
    unit: str = ""

@dataclass
class LogEntry:
    """Structured log entry"""
    timestamp: datetime
    level: str
    message: str
    service: str
    trace_id: Optional[str] = None
    span_id: Optional[str] = None
    labels: Dict[str, str] = field(default_factory=dict)
    stack_trace: Optional[str] = None

@dataclass
class Trace:
    """Distributed trace data"""
    trace_id: str
    spans: List[Dict[str, Any]]
    duration: timedelta
    service_map: Dict[str, List[str]]
    error_count: int = 0

@dataclass
class Alert:
    """Alert representation"""
    id: str
    name: str
    severity: AlertSeverity
    status: IncidentStatus
    description: str
    source: str
    timestamp: datetime
    labels: Dict[str, str] = field(default_factory=dict)
    annotations: Dict[str, str] = field(default_factory=dict)
    fingerprint: str = ""
    related_entities: List[str] = field(default_factory=list)

class DataSource(ABC):
    """Abstract data source interface"""

    @abstractmethod
    async def collect_metrics(
        self,
        query: str,
        start_time: datetime,
        end_time: datetime
    ) -> List[Metric]:
        """Collect metrics from data source"""
        pass

    @abstractmethod
    async def query_logs(
        self,
        query: str,
        start_time: datetime,
        end_time: datetime,
        limit: int = 100
    ) -> List[LogEntry]:
        """Query logs from data source"""
        pass

    @abstractmethod
    async def get_trace(self, trace_id: str) -> Optional[Trace]:
        """Get trace data"""
        pass

class AIOpsEngine:
    """Core AIOps processing engine"""

    def __init__(self):
        self.data_sources: List[DataSource] = []
        self.alert_processors: List[AlertProcessor] = []
        self.ml_models: Dict[str, Any] = {}
        self.knowledge_base: KnowledgeBase = KnowledgeBase()
        self.automation_engine = AutomationEngine()

    def register_data_source(self, data_source: DataSource) -> None:
        """Register a data source"""
        self.data_sources.append(data_source)
        logger.info(f"Registered data source: {type(data_source).__name__}")

    def register_alert_processor(self, processor: 'AlertProcessor') -> None:
        """Register an alert processor"""
        self.alert_processors.append(processor)
        logger.info(f"Registered alert processor: {type(processor).__name__}")

    async def process_alert(self, alert: Alert) -> Dict[str, Any]:
        """Process incoming alert through AIOps pipeline"""
        logger.info(f"Processing alert: {alert.id} - {alert.name}")

        # 1. Enrich alert with context
        enriched_alert = await self._enrich_alert(alert)

        # 2. Run through ML models for classification and prediction
        analysis = await self._analyze_alert(enriched_alert)

        # 3. Determine appropriate response
        response_plan = await self._generate_response_plan(enriched_alert, analysis)

        # 4. Execute automated actions if applicable
        if response_plan.get("auto_execute", False):
            await self._execute_response(enriched_alert, response_plan)

        # 5. Update knowledge base
        await self.knowledge_base.update_from_alert(enriched_alert, analysis, response_plan)

        return {
            "alert_id": alert.id,
            "status": "processed",
            "analysis": analysis,
            "response_plan": response_plan
        }

    async def _enrich_alert(self, alert: Alert) -> Alert:
        """Enrich alert with additional context"""
        # Get related metrics
        for source in self.data_sources:
            try:
                metrics = await source.collect_metrics(
                    query=f"{alert.name}[5m]",
                    start_time=alert.timestamp - timedelta(minutes=5),
                    end_time=alert.timestamp
                )

                # Add metrics to alert annotations
                alert.annotations[f"metrics_{type(source).__name__}"] = json.dumps([
                    {"name": m.name, "value": m.value} for m in metrics
                ])
            except Exception as e:
                logger.error(f"Failed to enrich alert with metrics: {e}")

        # Get related logs
        for source in self.data_sources:
            try:
                logs = await source.query_logs(
                    query=f"service:{alert.source} level:error",
                    start_time=alert.timestamp - timedelta(minutes=5),
                    end_time=alert.timestamp,
                    limit=10
                )

                # Add logs to alert context
                alert.annotations["recent_errors"] = json.dumps([
                    {"timestamp": l.timestamp.isoformat(), "message": l.message}
                    for l in logs
                ])
            except Exception as e:
                logger.error(f"Failed to enrich alert with logs: {e}")

        return alert

    async def _analyze_alert(self, alert: Alert) -> Dict[str, Any]:
        """Analyze alert using ML models"""
        analysis = {
            "classification": None,
            "severity_score": 0.0,
            "predicted_impact": "low",
            "recommendations": [],
            "related_incidents": []
        }

        # Classification model
        if "classification" in self.ml_models:
            features = self._extract_features(alert)
            analysis["classification"] = self.ml_models["classification"].predict([features])[0]

        # Severity prediction
        if "severity" in self.ml_models:
            features = self._extract_features(alert)
            analysis["severity_score"] = self.ml_models["severity"].predict_proba([features])[0][1]

        # Impact prediction
        if "impact" in self.ml_models:
            features = self._extract_features(alert)
            analysis["predicted_impact"] = self.ml_models["impact"].predict([features])[0]

        # Find related incidents from knowledge base
        analysis["related_incidents"] = await self.knowledge_base.find_similar_incidents(alert)

        return analysis

    async def _generate_response_plan(
        self,
        alert: Alert,
        analysis: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Generate automated response plan"""
        plan = {
            "priority": alert.severity,
            "actions": [],
            "escalation_rules": [],
            "auto_execute": False,
            "estimated_resolution_time": None
        }

        # Determine if auto-execution is safe
        if (analysis["classification"] in ["known_issue", "false_positive"] and
            alert.severity in [AlertSeverity.LOW, AlertSeverity.MEDIUM]):
            plan["auto_execute"] = True

        # Generate actions based on classification
        classification = analysis.get("classification")

        if classification == "resource_exhaustion":
            plan["actions"].append({
                "type": "scale",
                "target": alert.source,
                "params": {"replicas": "+20%"}
            })
            plan["actions"].append({
                "type": "notify",
                "channel": "devops",
                "message": f"Resource exhaustion detected in {alert.source}"
            })

        elif classification == "known_issue":
            # Apply known fix
            known_fix = await self.knowledge_base.get_known_fix(alert.name)
            if known_fix:
                plan["actions"].append(known_fix)

        elif classification == "false_positive":
            plan["actions"].append({
                "type": "suppress",
                "duration": "1h",
                "reason": "False positive detected"
            })

        # Add escalation rules
        if alert.severity == AlertSeverity.CRITICAL:
            plan["escalation_rules"].append({
                "condition": "no_response_5m",
                "action": "escalate_to_oncall",
                "timeout": 300
            })

        return plan

    def _extract_features(self, alert: Alert) -> List[float]:
        """Extract features for ML models"""
        # Simple feature extraction - in production, use more sophisticated methods
        features = [
            float(alert.severity == AlertSeverity.CRITICAL),
            float(alert.severity == AlertSeverity.HIGH),
            float(alert.severity == AlertSeverity.MEDIUM),
            len(alert.description),
            len(alert.labels),
            alert.timestamp.hour / 24.0,
            alert.timestamp.weekday() / 7.0
        ]
        return features

    async def _execute_response(
        self,
        alert: Alert,
        response_plan: Dict[str, Any]
    ) -> None:
        """Execute automated response actions"""
        for action in response_plan.get("actions", []):
            try:
                await self.automation_engine.execute(action)
                logger.info(f"Executed action for alert {alert.id}: {action['type']}")
            except Exception as e:
                logger.error(f"Failed to execute action: {e}")

Monitoring Platform Integrations

# aiops/integrations/monitoring.py
import aiohttp
import asyncio
from typing import List, Dict, Any, Optional
from datetime import datetime
from ..core.architecture import DataSource, Metric, LogEntry, Trace, Alert

class PrometheusDataSource(DataSource):
    """Prometheus metrics integration"""

    def __init__(self, url: str):
        self.url = url.rstrip("/")
        self.session = None

    async def _get_session(self) -> aiohttp.ClientSession:
        if not self.session:
            self.session = aiohttp.ClientSession()
        return self.session

    async def collect_metrics(
        self,
        query: str,
        start_time: datetime,
        end_time: datetime
    ) -> List[Metric]:
        """Query Prometheus for metrics"""
        session = await self._get_session()

        params = {
            "query": query,
            "start": start_time.timestamp(),
            "end": end_time.timestamp(),
            "step": "15s"
        }

        url = f"{self.url}/api/v1/query_range"

        async with session.get(url, params=params) as response:
            data = await response.json()
            metrics = []

            if data["status"] == "success":
                result = data["data"]
                if result["resultType"] == "matrix":
                    for series in result["result"]:
                        labels = series["metric"]
                        values = series["values"]

                        for timestamp, value in values:
                            metrics.append(Metric(
                                name=query,
                                value=float(value),
                                timestamp=datetime.fromtimestamp(float(timestamp)),
                                labels=labels,
                                source="prometheus"
                            ))

            return metrics

    async def query_logs(
        self,
        query: str,
        start_time: datetime,
        end_time: datetime,
        limit: int = 100
    ) -> List[LogEntry]:
        """Prometheus doesn't store logs - use Loki integration"""
        return []

    async def get_trace(self, trace_id: str) -> Optional[Trace]:
        """Prometheus doesn't store traces - use Jaeger/Tempo integration"""
        return None

class LokiDataSource(DataSource):
    """Loki log aggregation integration"""

    def __init__(self, url: str):
        self.url = url.rstrip("/")
        self.session = None

    async def _get_session(self) -> aiohttp.ClientSession:
        if not self.session:
            self.session = aiohttp.ClientSession()
        return self.session

    async def collect_metrics(
        self,
        query: str,
        start_time: datetime,
        end_time: datetime
    ) -> List[Metric]:
        """Loki doesn't store metrics"""
        return []

    async def query_logs(
        self,
        query: str,
        start_time: datetime,
        end_time: datetime,
        limit: int = 100
    ) -> List[LogEntry]:
        """Query Loki for logs"""
        session = await self._get_session()

        params = {
            "query": query,
            "start": start_time.isoformat(),
            "end": end_time.isoformat(),
            "limit": str(limit)
        }

        url = f"{self.url}/loki/api/v1/query_range"

        async with session.get(url, params=params) as response:
            data = await response.json()
            logs = []

            if data["status"] == "success":
                result = data["data"]
                for series in result["result"]:
                    stream = series["stream"]
                    for timestamp, line in series["values"]:
                        logs.append(LogEntry(
                            timestamp=datetime.fromtimestamp(float(timestamp) / 1e9),
                            level=stream.get("level", "info"),
                            message=line,
                            service=stream.get("service", "unknown"),
                            labels=stream
                        ))

            return logs

    async def get_trace(self, trace_id: str) -> Optional[Trace]:
        """Loki doesn't store traces"""
        return None

class JaegerDataSource(DataSource):
    """Jaeger distributed tracing integration"""

    def __init__(self, url: str):
        self.url = url.rstrip("/")
        self.session = None

    async def _get_session(self) -> aiohttp.ClientSession:
        if not self.session:
            self.session = aiohttp.ClientSession()
        return self.session

    async def collect_metrics(
        self,
        query: str,
        start_time: datetime,
        end_time: datetime
    ) -> List[Metric]:
        """Jaeger doesn't store metrics"""
        return []

    async def query_logs(
        self,
        query: str,
        start_time: datetime,
        end_time: datetime,
        limit: int = 100
    ) -> List[LogEntry]:
        """Jaeger doesn't store logs"""
        return []

    async def get_trace(self, trace_id: str) -> Optional[Trace]:
        """Get trace from Jaeger"""
        session = await self._get_session()

        url = f"{self.url}/api/traces/{trace_id}"

        async with session.get(url) as response:
            data = await response.json()

            if data["data"] and data["data"][0]["spans"]:
                spans = data["data"][0]["spans"]
                service_map = {}
                error_count = 0

                for span in spans:
                    service = span["process"]["serviceName"]
                    if service not in service_map:
                        service_map[service] = []
                    service_map[service].append(span["spanID"])

                    if span.get("tags"):
                        for tag in span["tags"]:
                            if tag.get("key") == "error" and tag.get("value"):
                                error_count += 1

                return Trace(
                    trace_id=trace_id,
                    spans=spans,
                    duration=timedelta(
                        microseconds=data["data"][0]["duration"]
                    ),
                    service_map=service_map,
                    error_count=error_count
                )

            return None

class DatadogDataSource(DataSource):
    """Datadog APM integration"""

    def __init__(self, api_key: str, app_key: str, site: str = "datadoghq.com"):
        self.api_key = api_key
        self.app_key = app_key
        self.site = site
        self.base_url = f"https://api.{site}"
        self.session = None

    async def _get_session(self) -> aiohttp.ClientSession:
        if not self.session:
            headers = {
                "DD-API-KEY": self.api_key,
                "DD-APPLICATION-KEY": self.app_key
            }
            self.session = aiohttp.ClientSession(headers=headers)
        return self.session

    async def collect_metrics(
        self,
        query: str,
        start_time: datetime,
        end_time: datetime
    ) -> List[Metric]:
        """Query Datadog metrics"""
        session = await self._get_session()

        params = {
            "query": query,
            "from": int(start_time.timestamp()),
            "to": int(end_time.timestamp())
        }

        url = f"{self.base_url}/api/v1/query"

        async with session.get(url, params=params) as response:
            data = await response.json()
            metrics = []

            if "series" in data:
                for series in data["series"]:
                    for point in series["pointlist"]:
                        metrics.append(Metric(
                            name=series["metric"],
                            value=point[1],
                            timestamp=datetime.fromtimestamp(point[0]),
                            labels={k: str(v) for k, v in series["tag_set"].items()},
                            source="datadog",
                            unit=series.get("unit", "")
                        ))

            return metrics

    async def query_logs(
        self,
        query: str,
        start_time: datetime,
        end_time: datetime,
        limit: int = 100
    ) -> List[LogEntry]:
        """Query Datadog logs"""
        session = await self._get_session()

        payload = {
            "query": query,
            "time": {
                "from": start_time.isoformat(),
                "to": end_time.isoformat()
            },
            "limit": limit
        }

        url = f"{self.base_url}/api/v2/logs/events/search"

        async with session.post(url, json=payload) as response:
            data = await response.json()
            logs = []

            for log in data.get("data", []):
                attributes = log.get("attributes", {})

                logs.append(LogEntry(
                    timestamp=datetime.fromisoformat(attributes["timestamp"]),
                    level=attributes.get("status", "info"),
                    message=attributes.get("message", ""),
                    service=attributes.get("service", "unknown"),
                    trace_id=attributes.get("trace_id"),
                    labels=attributes.get("tags", [])
                ))

            return logs

    async def get_trace(self, trace_id: str) -> Optional[Trace]:
        """Get trace from Datadog"""
        session = await self._get_session()

        url = f"{self.base_url}/api/v1/trace/{trace_id}"

        async with session.get(url) as response:
            data = await response.json()

            # Parse Datadog trace format
            # Implementation depends on Datadog API response structure
            return None

2. Machine Learning for Operations

Anomaly Detection Models

# aiops/ml/anomaly_detection.py
import numpy as np
import pandas as pd
from typing import List, Dict, Tuple, Optional
from datetime import datetime, timedelta
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import joblib
import asyncio
from collections import deque

class AnomalyDetector:
    """Generic anomaly detection for metrics and logs"""

    def __init__(self, model_type: str = "isolation_forest"):
        self.model_type = model_type
        self.models: Dict[str, Any] = {}
        self.scalers: Dict[str, StandardScaler] = {}
        self.is_trained: Dict[str, bool] = {}
        self.feature_history: Dict[str, deque] = {}

    async def detect_metric_anomaly(
        self,
        metric_name: str,
        current_value: float,
        historical_values: List[float],
        window_size: int = 100
    ) -> Tuple[bool, float]:
        """Detect anomalies in time series metrics"""
        if metric_name not in self.models:
            await self._train_metric_model(metric_name, historical_values)

        # Prepare features
        features = self._extract_time_series_features(
            current_value,
            historical_values,
            window_size
        )

        # Scale features
        if metric_name in self.scalers:
            features = self.scalers[metric_name].transform([features])

        # Predict anomaly
        model = self.models[metric_name]
        anomaly_score = model.decision_function([features])[0]
        is_anomaly = model.predict([features])[0] == -1

        return is_anomaly, float(anomaly_score)

    async def detect_log_anomaly(
        self,
        log_entries: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Detect anomalies in log patterns"""
        # Extract log features
        features = []
        for entry in log_entries:
            features.append([
                len(entry.get("message", "")),
                entry.get("level_numeric", 0),
                entry.get("hour_of_day", 0),
                entry.get("day_of_week", 0)
            ])

        features = np.array(features)

        # Train or use existing model
        model_name = "log_patterns"
        if model_name not in self.models:
            self.models[model_name] = IsolationForest(
                contamination=0.1,
                random_state=42
            )
            self.models[model_name].fit(features)

        # Predict anomalies
        predictions = self.models[model_name].predict(features)
        scores = self.models[model_name].decision_function(features)

        # Return anomalous logs
        anomalies = []
        for i, (pred, score) in enumerate(zip(predictions, scores)):
            if pred == -1:  # Anomaly
                anomalies.append({
                    "log_index": i,
                    "log_entry": log_entries[i],
                    "anomaly_score": float(score),
                    "reason": "Unusual log pattern detected"
                })

        return anomalies

    async def detect_performance_anomaly(
        self,
        metrics: Dict[str, List[float]],
        threshold: float = 0.1
    ) -> Dict[str, Dict[str, Any]]:
        """Detect performance anomalies across multiple metrics"""
        results = {}

        for metric_name, values in metrics.items():
            if len(values) < 10:
                continue

            # Calculate statistical anomalies
            mean_val = np.mean(values)
            std_val = np.std(values)
            recent_values = values[-5:]

            for i, value in enumerate(recent_values):
                z_score = abs(value - mean_val) / (std_val + 1e-8)

                if z_score > 3:  # Statistical outlier
                    if metric_name not in results:
                        results[metric_name] = {
                            "anomalies": [],
                            "severity": "medium"
                        }

                    results[metric_name]["anomalies"].append({
                        "value": value,
                        "z_score": z_score,
                        "index": i,
                        "type": "statistical"
                    })

        return results

    async def _train_metric_model(
        self,
        metric_name: str,
        values: List[float]
    ) -> None:
        """Train anomaly detection model for a metric"""
        if len(values) < 30:
            # Not enough data for training
            return

        # Prepare training data
        training_features = []
        for i in range(10, len(values)):
            # Create window of previous values
            window = values[i-10:i]
            features = [
                np.mean(window),
                np.std(window),
                values[i],  # Current value
                (values[i] - np.mean(window)) / (np.std(window) + 1e-8),  # Z-score
                max(window) - min(window),  # Range
            ]
            training_features.append(features)

        training_features = np.array(training_features)

        # Scale features
        scaler = StandardScaler()
        scaled_features = scaler.fit_transform(training_features)
        self.scalers[metric_name] = scaler

        # Train model
        if self.model_type == "isolation_forest":
            model = IsolationForest(
                contamination=0.05,
                random_state=42
            )
            model.fit(scaled_features)
        else:
            # Add other model types as needed
            model = IsolationForest(contamination=0.05, random_state=42)
            model.fit(scaled_features)

        self.models[metric_name] = model
        self.is_trained[metric_name] = True

    def _extract_time_series_features(
        self,
        current_value: float,
        historical_values: List[float],
        window_size: int
    ) -> List[float]:
        """Extract features from time series data"""
        if len(historical_values) < window_size:
            window = historical_values
        else:
            window = historical_values[-window_size:]

        features = [
            np.mean(window),
            np.std(window),
            min(window),
            max(window),
            current_value,
            (current_value - np.mean(window)) / (np.std(window) + 1e-8),
            len([v for v in window if v > np.mean(window)]) / len(window),  # Percentage above mean
        ]

        return features

class PredictiveAnalyzer:
    """Predictive analytics for capacity planning and incident prediction"""

    def __init__(self):
        self.models: Dict[str, Any] = {}

    async def predict_capacity_needs(
        self,
        metric_history: Dict[str, List[float]],
        forecast_horizon: int = 7
    ) -> Dict[str, Dict[str, Any]]:
        """Predict future resource needs"""
        predictions = {}

        for metric_name, values in metric_history.items():
            if len(values) < 30:
                continue

            # Simple linear trend prediction (use more sophisticated models in production)
            x = np.arange(len(values))
            coeffs = np.polyfit(x, values, 1)

            # Predict future values
            future_x = np.arange(len(values), len(values) + forecast_horizon * 24)
            future_values = np.polyval(coeffs, future_x)

            # Calculate growth rate
            growth_rate = (future_values[-1] - values[-1]) / values[-1] * 100

            predictions[metric_name] = {
                "current": values[-1],
                "predicted_peak": float(max(future_values)),
                "predicted_average": float(np.mean(future_values)),
                "growth_rate": growth_rate,
                "recommendation": self._get_capacity_recommendation(growth_rate),
                "confidence": 0.75  # Should be calculated based on model accuracy
            }

        return predictions

    async def predict_incidents(
        self,
        alert_history: List[Dict[str, Any]],
        system_metrics: Dict[str, List[float]]
    ) -> Dict[str, Any]:
        """Predict likelihood of future incidents"""
        # Feature engineering
        features = self._extract_incident_features(alert_history, system_metrics)

        # Simplified risk score calculation
        risk_factors = {
            "error_rate_trend": self._calculate_trend(
                system_metrics.get("error_rate", [])
            ),
            "cpu_utilization_avg": np.mean(
                system_metrics.get("cpu_utilization", [0])
            ),
            "memory_utilization_avg": np.mean(
                system_metrics.get("memory_utilization", [0])
            ),
            "recent_incident_count": len([
                a for a in alert_history[-24:]
                if a.get("severity") == "critical"
            ]),
        }

        # Calculate risk score (0-100)
        risk_score = min(100, max(0, (
            risk_factors["error_rate_trend"] * 30 +
            risk_factors["cpu_utilization_avg"] * 25 +
            risk_factors["memory_utilization_avg"] * 25 +
            risk_factors["recent_incident_count"] * 20
        )))

        return {
            "risk_score": risk_score,
            "risk_level": self._get_risk_level(risk_score),
            "contributing_factors": risk_factors,
            "recommendations": self._get_mitigation_recommendations(risk_factors)
        }

    def _extract_incident_features(
        self,
        alert_history: List[Dict[str, Any]],
        system_metrics: Dict[str, List[float]]
    ) -> List[float]:
        """Extract features for incident prediction"""
        features = []

        # Alert patterns
        recent_alerts = alert_history[-24:] if len(alert_history) >= 24 else alert_history
        features.append(len(recent_alerts))
        features.append(len([a for a in recent_alerts if a.get("severity") == "critical"]))

        # System metrics
        for metric_name in ["cpu", "memory", "disk", "network"]:
            values = system_metrics.get(f"{metric_name}_utilization", [0])
            features.append(np.mean(values[-12:]) if values else 0)
            features.append(np.std(values[-12:]) if values else 0)

        return features

    def _calculate_trend(self, values: List[float]) -> float:
        """Calculate trend coefficient (positive = increasing)"""
        if len(values) < 2:
            return 0.0

        x = np.arange(len(values))
        coeffs = np.polyfit(x, values, 1)
        return coeffs[0]

    def _get_capacity_recommendation(self, growth_rate: float) -> str:
        """Get capacity recommendation based on growth rate"""
        if growth_rate > 50:
            return "Immediate scaling required - consider 2x current capacity"
        elif growth_rate > 20:
            return "Plan for scaling within next week - recommend 1.5x capacity"
        elif growth_rate > 0:
            return "Monitor growth - consider 25% buffer"
        else:
            return "Capacity adequate - no immediate action needed"

    def _get_risk_level(self, risk_score: float) -> str:
        """Get risk level from risk score"""
        if risk_score >= 80:
            return "critical"
        elif risk_score >= 60:
            return "high"
        elif risk_score >= 40:
            return "medium"
        else:
            return "low"

    def _get_mitigation_recommendations(
        self,
        risk_factors: Dict[str, float]
    ) -> List[str]:
        """Get mitigation recommendations based on risk factors"""
        recommendations = []

        if risk_factors["cpu_utilization_avg"] > 80:
            recommendations.append("Scale CPU resources or optimize CPU usage")

        if risk_factors["memory_utilization_avg"] > 80:
            recommendations.append("Increase memory allocation or check for memory leaks")

        if risk_factors["error_rate_trend"] > 0:
            recommendations.append("Investigate and fix increasing error rate")

        if risk_factors["recent_incident_count"] > 3:
            recommendations.append("Review recent incidents for root cause analysis")

        return recommendations

class AILogAnalyzer:
    """AI-powered log analysis and pattern detection"""

    def __init__(self):
        self.patterns: Dict[str, List[str]] = {}
        self.embeddings: Dict[str, np.ndarray] = {}

    async def analyze_logs(
        self,
        logs: List[LogEntry]
    ) -> Dict[str, Any]:
        """Analyze logs for patterns and anomalies"""
        analysis = {
            "total_logs": len(logs),
            "error_rate": 0.0,
            "patterns": [],
            "anomalies": [],
            "trending_topics": [],
            "recommendations": []
        }

        # Calculate error rate
        error_count = len([l for l in logs if l.level.lower() in ["error", "critical"]])
        analysis["error_rate"] = error_count / len(logs) * 100

        # Extract patterns using regex and ML
        analysis["patterns"] = await self._extract_patterns(logs)

        # Find anomalies
        analysis["anomalies"] = await self._detect_log_anomalies(logs)

        # Identify trending topics/errors
        analysis["trending_topics"] = await self._identify_trending_topics(logs)

        # Generate recommendations
        analysis["recommendations"] = self._generate_log_recommendations(analysis)

        return analysis

    async def _extract_patterns(self, logs: List[LogEntry]) -> List[Dict[str, Any]]:
        """Extract common patterns from logs"""
        # Simplified pattern extraction
        # In production, use more sophisticated NLP techniques
        patterns = []

        # Group logs by service
        service_logs = {}
        for log in logs:
            if log.service not in service_logs:
                service_logs[log.service] = []
            service_logs[log.service].append(log)

        # Find error patterns
        for service, service_log_list in service_logs.items():
            error_logs = [l for l in service_log_list if l.level.lower() == "error"]
            if len(error_logs) > 0:
                # Find common substrings in error messages
                common_errors = {}
                for log in error_logs:
                    # Simplified - use actual pattern matching in production
                    for word in log.message.split():
                        if len(word) > 5:  # Only consider longer words
                            common_errors[word] = common_errors.get(word, 0) + 1

                if common_errors:
                    most_common = max(common_errors, key=common_errors.get)
                    patterns.append({
                        "service": service,
                        "type": "error_pattern",
                        "pattern": most_common,
                        "count": common_errors[most_common],
                        "percentage": common_errors[most_common] / len(error_logs) * 100
                    })

        return patterns

    async def _detect_log_anomalies(self, logs: List[LogEntry]) -> List[Dict[str, Any]]:
        """Detect anomalies in logs"""
        anomalies = []

        # Check for unusual error spikes
        error_by_minute = {}
        for log in logs:
            if log.level.lower() in ["error", "critical"]:
                minute = log.timestamp.replace(second=0, microsecond=0)
                error_by_minute[minute] = error_by_minute.get(minute, 0) + 1

        if error_by_minute:
            avg_errors = np.mean(list(error_by_minute.values()))
            for minute, count in error_by_minute.items():
                if count > avg_errors * 3:  # 3x spike
                    anomalies.append({
                        "type": "error_spike",
                        "timestamp": minute.isoformat(),
                        "value": count,
                        "baseline": avg_errors,
                        "severity": "high"
                    })

        return anomalies

    async def _identify_trending_topics(self, logs: List[LogEntry]) -> List[Dict[str, Any]]:
        """Identify trending topics in logs"""
        # Simplified trending detection
        word_count = {}
        for log in logs[-100:]:  # Last 100 logs
            for word in log.message.lower().split():
                if len(word) > 5 and word.isalpha():
                    word_count[word] = word_count.get(word, 0) + 1

        # Get top trending words
        trending = sorted(word_count.items(), key=lambda x: x[1], reverse=True)[:10]

        return [
            {"topic": word, "count": count}
            for word, count in trending
        ]

    def _generate_log_recommendations(self, analysis: Dict[str, Any]) -> List[str]:
        """Generate recommendations based on log analysis"""
        recommendations = []

        if analysis["error_rate"] > 5:
            recommendations.append(
                f"High error rate ({analysis['error_rate']:.1f}%) detected - investigate immediately"
            )

        if len(analysis["anomalies"]) > 0:
            recommendations.append(
                f"Found {len(analysis['anomalies'])} anomalies - review and address"
            )

        for pattern in analysis["patterns"][:3]:
            if pattern["percentage"] > 30:
                recommendations.append(
                    f"Common error pattern in {pattern['service']}: {pattern['pattern']}"
                )

        return recommendations

3. Automation and Self-Healing

Automation Engine

# aiops/automation/engine.py
import asyncio
import subprocess
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass
from enum import Enum
import logging
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)

class ActionType(str, Enum):
    """Types of automation actions"""
    SCALE = "scale"
    RESTART = "restart"
    PATCH = "patch"
    NOTIFY = "notify"
    SUPPRESS = "suppress"
    ISOLATE = "isolate"
    FAILBACK = "failback"

@dataclass
class Action:
    """Automation action definition"""
    type: ActionType
    target: str
    params: Dict[str, Any]
    conditions: List[str] = None
    timeout: int = 300
    retries: int = 3

class AutomationEngine:
    """Automation engine for executing remediation actions"""

    def __init__(self):
        self.actions: Dict[str, Action] = {}
        self.execution_history: List[Dict[str, Any]] = []
        self.action_handlers: Dict[ActionType, Callable] = {
            ActionType.SCALE: self._handle_scale,
            ActionType.RESTART: self._handle_restart,
            ActionType.PATCH: self._handle_patch,
            ActionType.NOTIFY: self._handle_notify,
            ActionType.SUPPRESS: self._handle_suppress,
            ActionType.ISOLATE: self._handle_isolate,
            ActionType.FAILBACK: self._handle_failback
        }

    def register_action(self, action: Action) -> None:
        """Register an automation action"""
        self.actions[action.target] = action
        logger.info(f"Registered action: {action.type} for {action.target}")

    async def execute(self, action: Action) -> Dict[str, Any]:
        """Execute an automation action"""
        execution_id = f"{action.type}_{action.target}_{int(datetime.utcnow().timestamp())}"

        logger.info(f"Executing action {execution_id}: {action.type} on {action.target}")

        execution = {
            "id": execution_id,
            "action": action,
            "status": "running",
            "started_at": datetime.utcnow(),
            "result": None,
            "error": None
        }

        try:
            # Check conditions
            if action.conditions and not await self._check_conditions(action.conditions):
                execution["status"] = "skipped"
                execution["error"] = "Conditions not met"
                return execution

            # Execute action with retries
            result = await self._execute_with_retries(action)

            execution["status"] = "success"
            execution["result"] = result
            logger.info(f"Action {execution_id} completed successfully")

        except Exception as e:
            execution["status"] = "failed"
            execution["error"] = str(e)
            logger.error(f"Action {execution_id} failed: {e}")

        finally:
            execution["completed_at"] = datetime.utcnow()
            execution["duration"] = (execution["completed_at"] - execution["started_at"]).total_seconds()
            self.execution_history.append(execution)

        return execution

    async def _execute_with_retries(self, action: Action) -> Any:
        """Execute action with retry logic"""
        last_exception = None

        for attempt in range(action.retries + 1):
            try:
                handler = self.action_handlers.get(action.type)
                if not handler:
                    raise ValueError(f"No handler for action type: {action.type}")

                # Execute with timeout
                return await asyncio.wait_for(
                    handler(action),
                    timeout=action.timeout
                )

            except asyncio.TimeoutError:
                last_exception = TimeoutError(f"Action timed out after {action.timeout}s")
                if attempt < action.retries:
                    await asyncio.sleep(2 ** attempt)  # Exponential backoff
                    continue
                raise last_exception

            except Exception as e:
                last_exception = e
                if attempt < action.retries:
                    await asyncio.sleep(2 ** attempt)  # Exponential backoff
                    continue
                raise last_exception

    async def _check_conditions(self, conditions: List[str]) -> bool:
        """Check if action conditions are met"""
        # Implement condition checking logic
        # For example, check cluster health, time windows, etc.
        return True

    async def _handle_scale(self, action: Action) -> Dict[str, Any]:
        """Handle scale action"""
        params = action.params
        command = [
            "kubectl", "scale",
            f"--replicas={params.get('replicas', '+1')}",
            f"deployment/{action.target}"
        ]

        if "namespace" in params:
            command.insert(1, f"-n={params['namespace']}")

        result = await self._run_command(command)
        return {"scaled": True, "result": result}

    async def _handle_restart(self, action: Action) -> Dict[str, Any]:
        """Handle restart action"""
        params = action.params
        command = ["kubectl", "rollout", "restart", f"deployment/{action.target}"]

        if "namespace" in params:
            command.insert(2, f"-n={params['namespace']}")

        result = await self._run_command(command)
        return {"restarted": True, "result": result}

    async def _handle_patch(self, action: Action) -> Dict[str, Any]:
        """Handle patch action"""
        params = action.params

        # Build patch command
        command = ["kubectl", "patch"]

        if "namespace" in params:
            command.extend(["-n", params["namespace"]])

        resource_type = params.get("type", "deployment")
        command.extend([f"{resource_type}/{action.target}"])

        patch_type = params.get("patch_type", "strategic")
        command.extend(["--type", patch_type])

        patch_data = params.get("patch", {})
        command.extend(["-p", json.dumps(patch_data)])

        result = await self._run_command(command)
        return {"patched": True, "result": result}

    async def _handle_notify(self, action: Action) -> Dict[str, Any]:
        """Handle notification action"""
        params = action.params
        channel = params.get("channel", "default")
        message = params.get("message", f"AAction executed: {action.type} on {action.target}")

        # Implement notification logic (Slack, Teams, Email, etc.)
        # This is a placeholder for actual notification implementation
        notification = {
            "channel": channel,
            "message": message,
            "timestamp": datetime.utcnow().isoformat(),
            "sent": True
        }

        # Example: Send to Slack
        if channel == "slack":
            webhook_url = params.get("webhook_url")
            if webhook_url:
                slack_payload = {
                    "text": message,
                    "attachments": [{
                        "color": "warning",
                        "fields": [{
                            "title": "Action",
                            "value": f"{action.type} on {action.target}",
                            "short": True
                        }, {
                            "title": "Timestamp",
                            "value": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
                            "short": True
                        }]
                    }]
                }

                async with aiohttp.ClientSession() as session:
                    async with session.post(webhook_url, json=slack_payload) as response:
                        if response.status != 200:
                            raise Exception(f"Slack notification failed: {response.status}")

        return notification

    async def _handle_suppress(self, action: Action) -> Dict[str, Any]:
        """Handle alert suppression"""
        params = action.params
        duration = params.get("duration", "1h")
        reason = params.get("reason", "Automated suppression")

        # Implement alert suppression logic
        # This depends on your alerting system (Prometheus, AlertManager, etc.)
        suppression = {
            "target": action.target,
            "duration": duration,
            "reason": reason,
            "timestamp": datetime.utcnow().isoformat()
        }

        return {"suppressed": True, "suppression": suppression}

    async def _handle_isolate(self, action: Action) -> Dict[str, Any]:
        """Handle service isolation"""
        params = action.params
        namespace = params.get("namespace", "default")

        # Implement isolation logic
        # For example, update service to point to maintenance page
        command = [
            "kubectl", "patch", "service", action.target,
            "-n", namespace,
            "-p", '{"spec": {"selector": {"app": "maintenance"}}}'
        ]

        result = await self._run_command(command)
        return {"isolated": True, "result": result}

    async def _handle_failback(self, action: Action) -> Dict[str, Any]:
        """Handle failback to primary"""
        params = action.params
        namespace = params.get("namespace", "default")

        # Implement failback logic
        # Restore traffic to primary service
        command = [
            "kubectl", "patch", "service", action.target,
            "-n", namespace,
            "-p", '{"spec": {"selector": {"app": "' + action.target + '"}}}'
        ]

        result = await self._run_command(command)
        return {"failed_back": True, "result": result}

    async def _run_command(self, command: List[str]) -> str:
        """Run shell command"""
        process = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        stdout, stderr = await process.communicate()

        if process.returncode != 0:
            raise Exception(f"Command failed: {stderr.decode()}")

        return stdout.decode()

class SelfHealingManager:
    """Manages self-healing policies and execution"""

    def __init__(self, automation_engine: AutomationEngine):
        self.automation_engine = automation_engine
        self.healing_policies: Dict[str, Dict[str, Any]] = {}
        self.active_healings: Dict[str, Dict[str, Any]] = {}

    def register_healing_policy(
        self,
        service_name: str,
        policy: Dict[str, Any]
    ) -> None:
        """Register a self-healing policy for a service"""
        self.healing_policies[service_name] = policy
        logger.info(f"Registered healing policy for {service_name}")

    async def trigger_healing(
        self,
        service_name: str,
        incident: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Trigger self-healing for a service"""
        if service_name not in self.healing_policies:
            return {"error": "No healing policy registered"}

        policy = self.healing_policies[service_name]
        healing_id = f"healing_{service_name}_{int(datetime.utcnow().timestamp())}"

        logger.info(f"Triggering self-healing {healing_id} for {service_name}")

        healing_record = {
            "id": healing_id,
            "service": service_name,
            "incident": incident,
            "policy": policy,
            "status": "running",
            "actions": [],
            "started_at": datetime.utcnow()
        }

        self.active_healings[healing_id] = healing_record

        try:
            # Execute healing actions based on policy
            for action_def in policy.get("actions", []):
                action = Action(
                    type=ActionType(action_def["type"]),
                    target=action_def.get("target", service_name),
                    params=action_def.get("params", {}),
                    timeout=action_def.get("timeout", 300)
                )

                result = await self.automation_engine.execute(action)
                healing_record["actions"].append({
                    "action": action_def,
                    "result": result
                })

                # Check if healing was successful
                if result["status"] != "success":
                    healing_record["status"] = "failed"
                    break
            else:
                healing_record["status"] = "success"

        except Exception as e:
            healing_record["status"] = "error"
            healing_record["error"] = str(e)
            logger.error(f"Self-healing failed: {e}")

        finally:
            healing_record["completed_at"] = datetime.utcnow()
            healing_record["duration"] = (
                healing_record["completed_at"] - healing_record["started_at"]
            ).total_seconds()

        return healing_record

    async def check_healing_status(self, healing_id: str) -> Optional[Dict[str, Any]]:
        """Get status of an active healing process"""
        return self.active_healings.get(healing_id)

4. Observability and Monitoring Best Practices

Unified Observability Platform

# aiops/observability/platform.py
import asyncio
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass, field
import json
import logging

logger = logging.getLogger(__name__)

@dataclass
class ServiceHealth:
    """Service health status"""
    service_name: str
    status: str  # healthy, degraded, unhealthy
    latency_p99: float
    error_rate: float
    throughput: float
    last_check: datetime
    dependencies: List[str] = field(default_factory=list)
    sla_status: str = "within_sla"

class ObservabilityPlatform:
    """Unified observability platform for AIOps"""

    def __init__(self, data_sources: List[DataSource]):
        self.data_sources = data_sources
        self.service_health: Dict[str, ServiceHealth] = {}
        self.slo_compliance: Dict[str, Dict[str, Any]] = {}
        self.dashboard_configs: Dict[str, Dict[str, Any]] = {}

    async def get_service_overview(self) -> Dict[str, Any]:
        """Get comprehensive service overview"""
        overview = {
            "total_services": len(self.service_health),
            "healthy_services": 0,
            "degraded_services": 0,
            "unhealthy_services": 0,
            "services": {}
        }

        for service_name, health in self.service_health.items():
            overview["services"][service_name] = {
                "status": health.status,
                "latency_p99": health.latency_p99,
                "error_rate": health.error_rate,
                "throughput": health.throughput,
                "sla_status": health.sla_status
            }

            if health.status == "healthy":
                overview["healthy_services"] += 1
            elif health.status == "degraded":
                overview["degraded_services"] += 1
            else:
                overview["unhealthy_services"] += 1

        return overview

    async def check_slo_compliance(
        self,
        service_name: str,
        slo_config: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Check SLO compliance for a service"""
        now = datetime.utcnow()
        window = timedelta(days=30)  # 30-day rolling window

        # Collect metrics for SLO calculation
        metrics = []
        for source in self.data_sources:
            try:
                source_metrics = await source.collect_metrics(
                    query=slo_config["metric_query"],
                    start_time=now - window,
                    end_time=now
                )
                metrics.extend(source_metrics)
            except Exception as e:
                logger.error(f"Failed to collect metrics from {type(source).__name__}: {e}")

        if not metrics:
            return {"error": "No metrics available"}

        # Calculate SLO compliance
        total_requests = sum(m.value for m in metrics if m.name.endswith("_total"))
        good_requests = sum(m.value for m in metrics if m.name.endswith("_success"))

        if total_requests == 0:
            return {"error": "No requests in time window"}

        actual_availability = (good_requests / total_requests) * 100
        target_availability = slo_config["target"]

        compliance = {
            "service": service_name,
            "slo_name": slo_config["name"],
            "target_availability": target_availability,
            "actual_availability": actual_availability,
            "compliance": actual_availability >= target_availability,
            "error_budget": target_availability - actual_availability,
            "time_window": window.days,
            "total_requests": total_requests,
            "good_requests": good_requests,
            "calculated_at": now.isoformat()
        }

        self.slo_compliance[f"{service_name}_{slo_config['name']}"] = compliance
        return compliance

    async def generate_incident_report(
        self,
        incident_id: str,
        start_time: datetime,
        end_time: Optional[datetime] = None
    ) -> Dict[str, Any]:
        """Generate comprehensive incident report"""
        if not end_time:
            end_time = datetime.utcnow()

        report = {
            "incident_id": incident_id,
            "start_time": start_time.isoformat(),
            "end_time": end_time.isoformat(),
            "duration_hours": (end_time - start_time).total_seconds() / 3600,
            "affected_services": [],
            "metrics_impact": {},
            "error_rate_impact": {},
            "user_impact": {},
            "timeline": [],
            "root_cause_analysis": {},
            "action_items": []
        }

        # Collect data for affected services
        for service_name, health in self.service_health.items():
            if health.last_check >= start_time and health.last_check <= end_time:
                report["affected_services"].append(service_name)

                # Collect metrics impact
                for source in self.data_sources:
                    try:
                        metrics = await source.collect_metrics(
                            query=f"rate(http_requests_total[5m])",
                            start_time=start_time,
                            end_time=end_time
                        )
                        if metrics:
                            report["metrics_impact"][service_name] = [
                                {"timestamp": m.timestamp.isoformat(), "value": m.value}
                                for m in metrics
                            ]
                    except Exception as e:
                        logger.error(f"Failed to collect metrics for {service_name}: {e}")

        return report

    async def create_dashboard(
        self,
        dashboard_name: str,
        config: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Create a new observability dashboard"""
        dashboard = {
            "name": dashboard_name,
            "description": config.get("description", ""),
            "panels": config.get("panels", []),
            "time_range": config.get("time_range", "1h"),
            "refresh_interval": config.get("refresh_interval", "5m"),
            "created_at": datetime.utcnow().isoformat()
        }

        self.dashboard_configs[dashboard_name] = dashboard
        return dashboard

    async def get_dashboard_data(
        self,
        dashboard_name: str,
        time_range: Optional[str] = None
    ) -> Dict[str, Any]:
        """Get data for a specific dashboard"""
        if dashboard_name not in self.dashboard_configs:
            return {"error": "Dashboard not found"}

        dashboard = self.dashboard_configs[dashboard_name]
        now = datetime.utcnow()

        # Parse time range
        if time_range:
            end_time = now
            if time_range.endswith("h"):
                start_time = now - timedelta(hours=int(time_range[:-1]))
            elif time_range.endswith("d"):
                start_time = now - timedelta(days=int(time_range[:-1]))
            else:
                start_time = now - timedelta(hours=1)
        else:
            start_time = now - timedelta(hours=1)
            end_time = now

        dashboard_data = {
            "name": dashboard_name,
            "time_range": {"start": start_time.isoformat(), "end": end_time.isoformat()},
            "panels": []
        }

        # Collect data for each panel
        for panel in dashboard["panels"]:
            panel_data = {
                "title": panel["title"],
                "type": panel["type"],
                "data": []
            }

            try:
                # Collect metrics based on panel query
                for source in self.data_sources:
                    metrics = await source.collect_metrics(
                        query=panel["query"],
                        start_time=start_time,
                        end_time=end_time
                    )
                    panel_data["data"].extend([
                        {"timestamp": m.timestamp.isoformat(), "value": m.value}
                        for m in metrics
                    ])
            except Exception as e:
                panel_data["error"] = str(e)

            dashboard_data["panels"].append(panel_data)

        return dashboard_data

# SLO and SLA Management
class SLOManager:
    """Manages Service Level Objectives and Service Level Agreements"""

    def __init__(self):
        self.slos: Dict[str, Dict[str, Any]] = {}
        self.slas: Dict[str, Dict[str, Any]] = {}

    def define_slo(
        self,
        service_name: str,
        slo_name: str,
        target: float,
        time_window: str,
        alerting_burn_rate: float = 2.0
    ) -> Dict[str, Any]:
        """Define a Service Level Objective"""
        slo = {
            "service_name": service_name,
            "slo_name": slo_name,
            "target": target,
            "time_window": time_window,
            "alerting_burn_rate": alerting_burn_rate,
            "created_at": datetime.utcnow().isoformat(),
            "status": "active"
        }

        self.slos[f"{service_name}_{slo_name}"] = slo
        return slo

    def define_sla(
        self,
        service_name: str,
        sla_name: str,
        availability_target: float,
        performance_targets: Dict[str, float],
        penalties: Dict[str, Any] = None
    ) -> Dict[str, Any]:
        """Define a Service Level Agreement"""
        sla = {
            "service_name": service_name,
            "sla_name": sla_name,
            "availability_target": availability_target,
            "performance_targets": performance_targets,
            "penalties": penalties or {},
            "created_at": datetime.utcnow().isoformat(),
            "status": "active"
        }

        self.slas[f"{service_name}_{sla_name}"] = sla
        return sla

    def calculate_error_budget(
        self,
        service_name: str,
        slo_name: str,
        period_days: int = 30
    ) -> Dict[str, Any]:
        """Calculate error budget for an SLO"""
        slo_key = f"{service_name}_{slo_name}"
        if slo_key not in self.slos:
            return {"error": "SLO not found"}

        slo = self.slos[slo_key]
        target_availability = slo["target"]

        # Calculate error budget
        period_hours = period_days * 24
        error_budget = (100 - target_availability) / 100 * period_hours

        return {
            "service": service_name,
            "slo": slo_name,
            "period_days": period_days,
            "target_availability": target_availability,
            "error_budget_hours": error_budget,
            "error_budget_percentage": 100 - target_availability,
            "remaining_error_budget": error_budget,  # Would be calculated from actual metrics
            "burn_rate": 0.0  # Would be calculated from actual metrics
        }

This comprehensive AIOps skill provides production-ready patterns for implementing AI-powered operations in 2025, including intelligent monitoring, anomaly detection, predictive analytics, automation, and self-healing capabilities.

★ Insight ───────────────────────────────────── The AIOps patterns shown here emphasize several key 2025 best practices:

  1. Data Source Abstraction: Generic interfaces allow integration with any monitoring platform (Prometheus, Datadog, New Relic, etc.)
  2. ML-Driven Operations: Anomaly detection, predictive analytics, and pattern recognition using proven ML models
  3. Automation with Safety: Retry logic, timeout handling, and condition checking ensure safe automated remediation
  4. Observability First: Unified platform approach to metrics, logs, and traces for comprehensive visibility
  5. SLO-Driven: Focus on business outcomes through Service Level Objectives and error budget management

These patterns ensure your AIOps implementation is intelligent, automated, and aligned with business objectives. ─────────────────────────────────────────────────