Claude Code Plugins

Community-maintained marketplace

Generic Cloud-Native Deployment and Infrastructure as Code patterns for 2025. Provides comprehensive implementation strategies for multi-cloud deployments, GitOps workflows, progressive delivery, and platform engineering. Framework-agnostic approach supporting any cloud provider, deployment tool, and orchestration platform.

Install Skill

1. Download skill
2. Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3. Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify the skill by reviewing its instructions before using it.

SKILL.md

name cloud-native
description Generic Cloud-Native Deployment and Infrastructure as Code patterns for 2025. Provides comprehensive implementation strategies for multi-cloud deployments, GitOps workflows, progressive delivery, and platform engineering. Framework-agnostic approach supporting any cloud provider, deployment tool, and orchestration platform.
license MIT

Cloud-Native Deployment & Infrastructure Patterns

This skill provides comprehensive patterns for implementing cloud-native deployments using modern Infrastructure as Code, GitOps workflows, and progressive delivery strategies. The patterns are designed to be framework-agnostic and applicable across any cloud provider or orchestration platform.

When to Use This Skill

Use this skill when you need to:

  • Implement Infrastructure as Code with any provider (Terraform, Pulumi, CDK)
  • Build GitOps workflows with Argo CD, Flux, or similar tools
  • Create multi-cloud or hybrid deployment strategies
  • Implement progressive delivery (canary, blue-green)
  • Build internal developer platforms
  • Set up policy as code and compliance automation
  • Implement zero-trust security architectures
  • Create disaster recovery and failover strategies
  • Optimize costs across multiple providers
  • Build observability into deployments

1. Infrastructure as Code Patterns

Generic IaC Abstraction Layer

# iac/core/abstraction.py
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional, Union
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
import json
import yaml
import asyncio
from datetime import datetime

class IaCProvider(str, Enum):
    """Supported IaC providers"""
    TERRAFORM = "terraform"
    PULUMI = "pulumi"
    CDK = "cdk"
    CROSSPLANE = "crossplane"

class CloudProvider(str, Enum):
    """Cloud providers"""
    AWS = "aws"
    AZURE = "azure"
    GCP = "gcp"
    DIGITALOCEAN = "digitalocean"
    ON_PREM = "on-prem"

@dataclass
class Resource:
    """Generic resource definition"""
    type: str
    name: str
    properties: Dict[str, Any] = field(default_factory=dict)
    dependencies: List[str] = field(default_factory=list)
    provider: Optional[CloudProvider] = None
    tags: Dict[str, str] = field(default_factory=dict)

@dataclass
class InfrastructureConfig:
    """Infrastructure configuration"""
    name: str
    environment: str
    provider: CloudProvider
    region: str
    resources: List[Resource] = field(default_factory=list)
    variables: Dict[str, Any] = field(default_factory=dict)
    outputs: Dict[str, str] = field(default_factory=dict)

class IaCBackend(ABC):
    """Abstract IaC backend interface"""

    @abstractmethod
    async def initialize(self, config: InfrastructureConfig) -> None:
        """Initialize IaC backend"""
        pass

    @abstractmethod
    async def plan(self, config: InfrastructureConfig) -> Dict[str, Any]:
        """Generate execution plan"""
        pass

    @abstractmethod
    async def apply(self, config: InfrastructureConfig) -> Dict[str, Any]:
        """Apply infrastructure changes"""
        pass

    @abstractmethod
    async def destroy(self, config: InfrastructureConfig) -> Dict[str, Any]:
        """Destroy infrastructure"""
        pass

    @abstractmethod
    async def output(self, config: InfrastructureConfig, key: str) -> Any:
        """Get output value"""
        pass

    @abstractmethod
    async def validate(self, config: InfrastructureConfig) -> List[str]:
        """Validate configuration"""
        pass

class TerraformBackend(IaCBackend):
    """Terraform backend implementation"""

    def __init__(self, working_dir: Path):
        self.working_dir = working_dir
        # Remote state settings, e.g. {"bucket": ..., "key": ..., "region": ...}
        self.state_backend: Optional[Dict[str, Any]] = None

    async def initialize(self, config: InfrastructureConfig) -> None:
        """Initialize Terraform"""
        # Generate Terraform files
        await self._generate_terraform_files(config)

        # Run terraform init
        cmd = ["terraform", "init"]
        result = await self._run_command(cmd, cwd=self.working_dir)

        if result["returncode"] != 0:
            raise RuntimeError(f"Terraform init failed: {result['stderr']}")

    async def plan(self, config: InfrastructureConfig) -> Dict[str, Any]:
        """Generate Terraform plan"""
        cmd = ["terraform", "plan", "-out=tfplan", "-json"]
        result = await self._run_command(cmd, cwd=self.working_dir)

        # -json emits newline-delimited JSON events, one object per line,
        # so the stream must be parsed line by line rather than as one document
        changes = [
            json.loads(line)
            for line in result["stdout"].splitlines()
            if line.strip()
        ]

        return {
            "success": result["returncode"] == 0,
            "plan_file": self.working_dir / "tfplan",
            "output": result["stdout"],
            "changes": changes
        }

    async def apply(self, config: InfrastructureConfig) -> Dict[str, Any]:
        """Apply Terraform changes from the saved plan"""
        # Note: -auto-approve is rejected when applying a saved plan file;
        # applying a plan file already implies approval
        cmd = ["terraform", "apply", "-json", "tfplan"]
        result = await self._run_command(cmd, cwd=self.working_dir)

        return {
            "success": result["returncode"] == 0,
            "output": result["stdout"],
            "applied_at": datetime.utcnow().isoformat()
        }

    async def destroy(self, config: InfrastructureConfig) -> Dict[str, Any]:
        """Destroy Terraform resources"""
        cmd = ["terraform", "destroy", "-auto-approve", "-json"]
        result = await self._run_command(cmd, cwd=self.working_dir)

        return {
            "success": result["returncode"] == 0,
            "output": result["stdout"],
            "destroyed_at": datetime.utcnow().isoformat()
        }

    async def output(self, config: InfrastructureConfig, key: str) -> Any:
        """Get Terraform output"""
        cmd = ["terraform", "output", "-json", key]
        result = await self._run_command(cmd, cwd=self.working_dir)

        if result["returncode"] == 0:
            return json.loads(result["stdout"])
        return None

    async def validate(self, config: InfrastructureConfig) -> List[str]:
        """Validate Terraform configuration"""
        errors = []

        # Run terraform validate
        cmd = ["terraform", "validate"]
        result = await self._run_command(cmd, cwd=self.working_dir)

        if result["returncode"] != 0:
            errors.append(f"Validation failed: {result['stderr']}")

        # Run terraform fmt check
        cmd = ["terraform", "fmt", "-check", "-diff"]
        result = await self._run_command(cmd, cwd=self.working_dir)

        if result["returncode"] != 0:
            errors.append("Formatting check failed - run terraform fmt")

        return errors

    async def _generate_terraform_files(self, config: InfrastructureConfig) -> None:
        """Generate Terraform configuration files (JSON syntax, *.tf.json)"""
        # Terraform does not read YAML; emit its JSON configuration syntax
        # into *.tf.json files instead
        main_tf = {
            "terraform": {
                "required_providers": self._get_required_providers(config.provider),
                "backend": self._get_backend_config()
            },
            "provider": {config.provider.value: self._get_provider_config(config)},
            "resource": self._convert_resources(config.resources)
        }

        with open(self.working_dir / "main.tf.json", "w") as f:
            json.dump(main_tf, f, indent=2)

        # Generate variables.tf.json
        if config.variables:
            variables_tf = {
                "variable": {
                    k: {"type": self._infer_type(v), "default": v}
                    for k, v in config.variables.items()
                }
            }

            with open(self.working_dir / "variables.tf.json", "w") as f:
                json.dump(variables_tf, f, indent=2)

        # Generate outputs.tf.json
        if config.outputs:
            outputs_tf = {
                "output": {
                    k: {"value": v}
                    for k, v in config.outputs.items()
                }
            }

            with open(self.working_dir / "outputs.tf.json", "w") as f:
                json.dump(outputs_tf, f, indent=2)

    async def _run_command(self, cmd: List[str], cwd: Path) -> Dict[str, Any]:
        """Run command and return result"""
        process = await asyncio.create_subprocess_exec(
            *cmd,
            cwd=cwd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        stdout, stderr = await process.communicate()

        return {
            "returncode": process.returncode,
            "stdout": stdout.decode(),
            "stderr": stderr.decode()
        }

    def _get_required_providers(self, provider: CloudProvider) -> Dict[str, Any]:
        """Get required providers configuration"""
        providers = {}

        if provider == CloudProvider.AWS:
            providers["aws"] = {"source": "hashicorp/aws", "version": "~> 5.0"}
        elif provider == CloudProvider.AZURE:
            providers["azurerm"] = {"source": "hashicorp/azurerm", "version": "~> 3.0"}
        elif provider == CloudProvider.GCP:
            providers["google"] = {"source": "hashicorp/google", "version": "~> 5.0"}

        return providers

    def _get_backend_config(self) -> Dict[str, Any]:
        """Get Terraform backend configuration"""
        if self.state_backend:
            return {"s3": self.state_backend}
        return {}

    def _get_provider_config(self, config: InfrastructureConfig) -> Dict[str, Any]:
        """Get provider-specific configuration"""
        base_config = {"region": config.region}

        # Add provider-specific settings
        if config.provider == CloudProvider.AWS:
            base_config["default_tags"] = {
                "tags": {
                    "Environment": config.environment,
                    "ManagedBy": "terraform",
                    "Project": config.name
                }
            }

        return base_config

    def _convert_resources(self, resources: List[Resource]) -> Dict[str, Any]:
        """Convert generic resources to Terraform format: {type: {name: config}}"""
        terraform_resources: Dict[str, Any] = {}

        for resource in resources:
            resource_config = {**resource.properties}

            # Only emit a provider alias when one is set; a null provider
            # key is invalid in Terraform JSON syntax
            if resource.provider:
                resource_config["provider"] = resource.provider.value

            if resource.tags:
                resource_config["tags"] = resource.tags

            # Terraform groups resources by type, then by name
            terraform_resources.setdefault(resource.type, {})[resource.name] = resource_config

        return terraform_resources

    def _infer_type(self, value: Any) -> str:
        """Infer Terraform variable type from value"""
        if isinstance(value, bool):
            return "bool"
        elif isinstance(value, int):
            return "number"
        elif isinstance(value, list):
            return "list(string)"
        elif isinstance(value, dict):
            return "map(string)"
        else:
            return "string"

class PulumiBackend(IaCBackend):
    """Pulumi backend implementation"""

    def __init__(self, working_dir: Path, language: str = "python"):
        self.working_dir = working_dir
        self.language = language

    async def initialize(self, config: InfrastructureConfig) -> None:
        """Initialize Pulumi"""
        # Generate Pulumi program
        await self._generate_pulumi_program(config)

        # Run pulumi stack init
        cmd = ["pulumi", "stack", "init", f"{config.name}-{config.environment}"]
        result = await self._run_command(cmd, cwd=self.working_dir)

        if result["returncode"] != 0:
            raise RuntimeError(f"Pulumi stack init failed: {result['stderr']}")

    async def plan(self, config: InfrastructureConfig) -> Dict[str, Any]:
        """Preview Pulumi changes"""
        cmd = ["pulumi", "preview", "--json"]
        result = await self._run_command(cmd, cwd=self.working_dir)

        return {
            "success": result["returncode"] == 0,
            "output": result["stdout"],
            "changes": json.loads(result["stdout"]) if result["stdout"] else {}
        }

    async def apply(self, config: InfrastructureConfig) -> Dict[str, Any]:
        """Apply Pulumi changes"""
        cmd = ["pulumi", "up", "--yes", "--json"]
        result = await self._run_command(cmd, cwd=self.working_dir)

        return {
            "success": result["returncode"] == 0,
            "output": result["stdout"],
            "applied_at": datetime.utcnow().isoformat()
        }

    async def destroy(self, config: InfrastructureConfig) -> Dict[str, Any]:
        """Destroy Pulumi resources"""
        cmd = ["pulumi", "destroy", "--yes", "--json"]
        result = await self._run_command(cmd, cwd=self.working_dir)

        return {
            "success": result["returncode"] == 0,
            "output": result["stdout"],
            "destroyed_at": datetime.utcnow().isoformat()
        }

    async def output(self, config: InfrastructureConfig, key: str) -> Any:
        """Get Pulumi stack output"""
        cmd = ["pulumi", "stack", "output", "--json"]
        result = await self._run_command(cmd, cwd=self.working_dir)

        if result["returncode"] == 0:
            outputs = json.loads(result["stdout"])
            return outputs.get(key)
        return None

    async def validate(self, config: InfrastructureConfig) -> List[str]:
        """Validate Pulumi configuration"""
        errors = []

        # Pulumi has no dedicated validate command; a preview serves as a dry run
        cmd = ["pulumi", "preview"]
        result = await self._run_command(cmd, cwd=self.working_dir)

        if result["returncode"] != 0:
            errors.append(f"Validation failed: {result['stderr']}")

        return errors

    async def _generate_pulumi_program(self, config: InfrastructureConfig) -> None:
        """Generate Pulumi program based on language"""
        if self.language == "python":
            await self._generate_python_program(config)
        elif self.language == "typescript":
            await self._generate_typescript_program(config)

    async def _generate_python_program(self, config: InfrastructureConfig) -> None:
        """Generate Python Pulumi program"""
        program = f'''"""
Pulumi program for {config.name} - {config.environment}
"""

import pulumi
import pulumi_{config.provider.value} as provider

# Configuration
config = pulumi.Config()

# Resources
'''

        # Generate resources
        for resource in config.resources:
            # Map a type like "ec2_instance" to provider.ec2.Instance
            parts = resource.type.split("_")
            provider_module = "provider." + ".".join(parts[:-1]) if len(parts) > 1 else "provider"
            class_name = parts[-1].title()
            # Resource names may contain characters invalid in identifiers
            safe_name = resource.name.replace("-", "_")
            program += f'''
{safe_name}_resource = {provider_module}.{class_name}(
    "{resource.name}",
'''

            # Add properties
            for key, value in resource.properties.items():
                if isinstance(value, str):
                    program += f'    {key}="{value}",\n'
                else:
                    program += f'    {key}={value},\n'

            # Add tags if present
            if resource.tags:
                program += f'    tags={resource.tags},\n'

            program += ')\n\n'

        # Generate outputs
        for key, value in config.outputs.items():
            program += f'pulumi.export("{key}", {value})\n'

        # Write program file
        with open(self.working_dir / "__main__.py", "w") as f:
            f.write(program)

        # Generate requirements.txt with the core SDK and the provider package
        with open(self.working_dir / "requirements.txt", "w") as f:
            f.write("pulumi>=3.0.0\n")
            f.write(f"pulumi-{config.provider.value}\n")

    async def _run_command(self, cmd: List[str], cwd: Path) -> Dict[str, Any]:
        """Run command and return result"""
        process = await asyncio.create_subprocess_exec(
            *cmd,
            cwd=cwd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        stdout, stderr = await process.communicate()

        return {
            "returncode": process.returncode,
            "stdout": stdout.decode(),
            "stderr": stderr.decode()
        }

# Factory for creating IaC backends
class IaCFactory:
    """Factory for creating IaC backends"""

    @staticmethod
    def create(
        provider: IaCProvider,
        working_dir: Path,
        **kwargs
    ) -> IaCBackend:
        """Create IaC backend instance"""

        working_dir.mkdir(parents=True, exist_ok=True)

        if provider == IaCProvider.TERRAFORM:
            return TerraformBackend(working_dir)
        elif provider == IaCProvider.PULUMI:
            return PulumiBackend(
                working_dir,
                language=kwargs.get("language", "python")
            )
        else:
            raise ValueError(f"Unsupported IaC provider: {provider}")
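
To make the file-generation step concrete, here is a standalone sketch (it does not import the classes above) of the Terraform JSON syntax that a backend like `TerraformBackend` would emit for one resource. The AMI, region, and tag values are placeholders, not part of the skill:

```python
import json

# Minimal resource definition mirroring the Resource dataclass fields
resource = {
    "type": "aws_instance",
    "name": "web",
    "properties": {"ami": "ami-12345678", "instance_type": "t3.micro"},
    "tags": {"Environment": "staging"},
}

# Terraform JSON syntax nests resources as {type: {name: config}}
main_tf = {
    "terraform": {
        "required_providers": {
            "aws": {"source": "hashicorp/aws", "version": "~> 5.0"}
        }
    },
    "provider": {"aws": {"region": "us-east-1"}},
    "resource": {
        resource["type"]: {
            resource["name"]: {**resource["properties"], "tags": resource["tags"]}
        }
    },
}

# This string is what would be written to main.tf.json
rendered = json.dumps(main_tf, indent=2)
print(rendered)
```

Saved as `main.tf.json`, this is directly consumable by `terraform init` and `terraform plan` with no HCL involved.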

Multi-Cloud Resource Manager

# iac/multicloud/manager.py
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from pathlib import Path
import asyncio
from ..core.abstraction import (
    InfrastructureConfig, Resource, CloudProvider, IaCBackend
)

@dataclass
class CloudDeployment:
    """Cloud deployment configuration"""
    provider: CloudProvider
    region: str
    environment: str
    resources: List[Resource]
    priority: int = 0  # Higher priority = primary deployment
    enabled: bool = True

class MultiCloudManager:
    """Manages deployments across multiple cloud providers"""

    def __init__(self):
        self.deployments: Dict[str, CloudDeployment] = {}
        self.backends: Dict[str, IaCBackend] = {}
        self.global_resources: List[Resource] = []
        self.shared_state: Dict[str, Any] = {}

    def add_deployment(self, name: str, deployment: CloudDeployment) -> None:
        """Add a cloud deployment"""
        self.deployments[name] = deployment

    def add_global_resource(self, resource: Resource) -> None:
        """Add a global resource (DNS, CDN, etc.)"""
        self.global_resources.append(resource)

    async def deploy_all(self) -> Dict[str, Any]:
        """Deploy to all configured clouds"""
        results = {}

        # Sort by priority (primary deployments first)
        sorted_deployments = sorted(
            self.deployments.items(),
            key=lambda x: x[1].priority,
            reverse=True
        )

        for name, deployment in sorted_deployments:
            if not deployment.enabled:
                continue

            print(f"Deploying to {name} ({deployment.provider.value})...")

            try:
                result = await self._deploy_single(name, deployment)
                results[name] = result

                # Store shared outputs
                if result["success"]:
                    self.shared_state[name] = result.get("outputs", {})

            except Exception as e:
                results[name] = {
                    "success": False,
                    "error": str(e)
                }
                print(f"Failed to deploy to {name}: {e}")

        return results

    async def _deploy_single(
        self,
        name: str,
        deployment: CloudDeployment
    ) -> Dict[str, Any]:
        """Deploy to a single cloud provider"""

        # Create IaC config
        config = InfrastructureConfig(
            name=name,
            environment=deployment.environment,
            provider=deployment.provider,
            region=deployment.region,
            resources=self._merge_resources(deployment),
            variables=self._get_variables(deployment)
        )

        # Get or create backend
        if name not in self.backends:
            from ..core.abstraction import IaCFactory, IaCProvider
            backend = IaCFactory.create(
                IaCProvider.TERRAFORM,
                Path(f"iac/{name}")
            )
            self.backends[name] = backend

        backend = self.backends[name]

        # Initialize and deploy
        await backend.initialize(config)

        # Plan
        plan = await backend.plan(config)
        if not plan["success"]:
            return {"success": False, "error": "Plan failed"}

        # Apply
        apply_result = await backend.apply(config)

        # Get outputs for each deployed resource
        outputs = {}
        for resource in deployment.resources:
            output_value = await backend.output(config, resource.name)
            if output_value:
                outputs[resource.name] = output_value

        return {
            "success": apply_result["success"],
            "outputs": outputs,
            "deployment_info": {
                "provider": deployment.provider.value,
                "region": deployment.region,
                "environment": deployment.environment
            }
        }

    def _merge_resources(self, deployment: CloudDeployment) -> List[Resource]:
        """Merge global and deployment-specific resources"""
        resources = []

        # Add global resources first
        resources.extend(self.global_resources)

        # Add deployment-specific resources
        resources.extend(deployment.resources)

        return resources

    def _get_variables(self, deployment: CloudDeployment) -> Dict[str, Any]:
        """Get variables for deployment"""
        variables = {
            "environment": deployment.environment,
            "region": deployment.region
        }

        # Add shared state from other deployments
        for name, state in self.shared_state.items():
            if name != deployment.provider.value:
                variables.update({
                    f"{name}_endpoint": state.get("endpoint"),
                    f"{name}_credentials": state.get("credentials")
                })

        return variables

    async def failover(self, primary: str, backup: str) -> Dict[str, Any]:
        """Perform failover from primary to backup deployment"""

        print(f"Initiating failover from {primary} to {backup}")

        # Get backup deployment
        backup_deployment = self.deployments.get(backup)
        if not backup_deployment:
            return {"success": False, "error": f"Backup deployment {backup} not found"}

        # Update DNS to point to backup (requires a recorded endpoint)
        backup_state = self.shared_state.get(backup, {})
        if "endpoint" not in backup_state:
            return {"success": False, "error": f"No endpoint recorded for {backup}"}

        dns_resource = Resource(
            type="aws_route53_record",
            name="failover",
            properties={
                "name": "api.example.com",
                "type": "A",
                "ttl": 60,
                "records": [backup_state["endpoint"]]
            }
        )

        # Add DNS update to backup deployment
        backup_deployment.resources.append(dns_resource)

        # Deploy backup with updated DNS
        result = await self._deploy_single(backup, backup_deployment)

        if result["success"]:
            # Disable primary
            if primary in self.deployments:
                self.deployments[primary].enabled = False

            print(f"Failover to {backup} completed successfully")

        return result

    async def sync_state(self) -> Dict[str, Any]:
        """Synchronize state across deployments"""

        sync_results = {}

        for name, deployment in self.deployments.items():
            if not deployment.enabled:
                continue

            try:
                backend = self.backends.get(name)
                if not backend:
                    continue

                # Refresh state
                config = InfrastructureConfig(
                    name=name,
                    environment=deployment.environment,
                    provider=deployment.provider,
                    region=deployment.region,
                    resources=[]
                )

                # Get current state
                outputs = {}
                for resource in deployment.resources:
                    if hasattr(resource, 'name'):
                        output = await backend.output(config, resource.name)
                        if output:
                            outputs[resource.name] = output

                sync_results[name] = {
                    "success": True,
                    "outputs": outputs
                }

            except Exception as e:
                sync_results[name] = {
                    "success": False,
                    "error": str(e)
                }

        return sync_results
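
The deploy ordering in `deploy_all` hinges on the `priority` and `enabled` fields. A minimal standalone sketch (the dataclass here mirrors, but does not import, `CloudDeployment` above; the deployment names are illustrative):

```python
from dataclasses import dataclass

@dataclass
class Deployment:
    provider: str
    priority: int = 0  # higher priority = primary deployment
    enabled: bool = True

deployments = {
    "primary-aws": Deployment("aws", priority=10),
    "backup-gcp": Deployment("gcp", priority=5),
    "standby-azure": Deployment("azure", priority=7, enabled=False),
}

# Mirror deploy_all: highest priority first, disabled deployments skipped
order = [
    name
    for name, d in sorted(
        deployments.items(), key=lambda x: x[1].priority, reverse=True
    )
    if d.enabled
]
print(order)  # ['primary-aws', 'backup-gcp']
```

The primary lands first so its outputs are available in `shared_state` when lower-priority deployments reference them as variables.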

2. GitOps Patterns

Generic GitOps Engine

# gitops/engine.py
import asyncio
import aiohttp
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
import yaml
import json
from datetime import datetime
import logging

logger = logging.getLogger(__name__)

class GitOpsTool(str, Enum):
    """Supported GitOps tools"""
    ARGOCD = "argocd"
    FLUX = "flux"
    JENKINS_X = "jenkins-x"

class DeploymentStrategy(str, Enum):
    """Deployment strategies"""
    ROLLING = "rolling"
    BLUE_GREEN = "blue-green"
    CANARY = "canary"
    A_B = "a-b"

@dataclass
class Application:
    """Application definition for GitOps"""
    name: str
    repo_url: str
    path: str
    target_revision: str = "HEAD"
    namespace: str = "default"
    destination_server: str = "https://kubernetes.default.svc"
    sync_policy: Dict[str, Any] = field(default_factory=dict)
    health_checks: List[Dict[str, Any]] = field(default_factory=list)
    hooks: Dict[str, List[str]] = field(default_factory=dict)

@dataclass
class GitOpsConfig:
    """GitOps configuration"""
    tool: GitOpsTool
    namespace: str = "gitops-system"
    applications: List[Application] = field(default_factory=list)
    repositories: Dict[str, str] = field(default_factory=dict)
    policies: List[Dict[str, Any]] = field(default_factory=list)
    notifications: Dict[str, Any] = field(default_factory=dict)

class GitOpsEngine:
    """Generic GitOps engine"""

    def __init__(self, config: GitOpsConfig):
        self.config = config
        self.kubeconfig_path: Optional[Path] = None
        self.tool_client = None

    async def initialize(self) -> None:
        """Initialize GitOps tool"""

        if self.config.tool == GitOpsTool.ARGOCD:
            await self._init_argocd()
        elif self.config.tool == GitOpsTool.FLUX:
            await self._init_flux()
        else:
            raise ValueError(f"Unsupported GitOps tool: {self.config.tool}")

    async def deploy_application(
        self,
        app: Application,
        strategy: DeploymentStrategy = DeploymentStrategy.ROLLING
    ) -> Dict[str, Any]:
        """Deploy application using GitOps"""

        logger.info(f"Deploying application {app.name} with {strategy.value} strategy")

        # Pre-deployment hooks
        await self._execute_hooks(app, "pre-deploy")

        # Deploy based on strategy
        if strategy == DeploymentStrategy.CANARY:
            result = await self._deploy_canary(app)
        elif strategy == DeploymentStrategy.BLUE_GREEN:
            result = await self._deploy_blue_green(app)
        else:
            result = await self._deploy_rolling(app)

        # Post-deployment hooks
        if result["success"]:
            await self._execute_hooks(app, "post-deploy")

            # Health checks
            health_status = await self._check_health(app)
            result["health"] = health_status

        return result

    async def _init_argocd(self) -> None:
        """Initialize ArgoCD"""

        # Ensure the target namespace exists first; kubectl apply does not
        # create it (ignore failure if it already exists)
        ns_cmd = ["kubectl", "create", "namespace", self.config.namespace]
        await self._run_kubectl_command(ns_cmd)

        # Install ArgoCD if not present
        install_cmd = [
            "kubectl", "apply", "-n", self.config.namespace,
            "-f", "https://raw.githubusercontent.com/argoproj/argo-cd/stable/manifests/install.yaml"
        ]

        result = await self._run_kubectl_command(install_cmd)

        if result["returncode"] != 0 and "AlreadyExists" not in result["stderr"]:
            raise RuntimeError(f"Failed to install ArgoCD: {result['stderr']}")

        # Wait for ArgoCD to be ready
        await self._wait_for_deployment("argocd-server", self.config.namespace)

    async def _init_flux(self) -> None:
        """Initialize Flux"""

        # Install Flux CLI (the piped install script requires a shell,
        # so pass the command as a single string)
        install_cmd = "curl -s https://fluxcd.io/install.sh | bash"

        result = await self._run_command(install_cmd, shell=True)

        if result["returncode"] != 0:
            raise RuntimeError(f"Failed to install Flux: {result['stderr']}")

        # Bootstrap Flux
        bootstrap_cmd = [
            "flux", "bootstrap", "git",
            "--url", self.config.repositories.get("main", ""),
            "--branch", "main",
            "--path", "clusters/my-cluster"
        ]

        result = await self._run_command(bootstrap_cmd)

        if result["returncode"] != 0:
            raise RuntimeError(f"Failed to bootstrap Flux: {result['stderr']}")

    async def _deploy_rolling(self, app: Application) -> Dict[str, Any]:
        """Deploy using rolling update strategy"""

        # Create ArgoCD application manifest
        app_manifest = self._generate_argocd_app(app)

        # Apply manifest
        result = await self._apply_kubernetes_yaml(app_manifest)

        # Wait for sync
        if result["success"]:
            sync_result = await self._wait_for_sync(app.name)
            result.update(sync_result)

        return result

    async def _deploy_canary(self, app: Application) -> Dict[str, Any]:
        """Deploy using canary strategy"""

        # Create canary application
        canary_app = Application(
            name=f"{app.name}-canary",
            repo_url=app.repo_url,
            path=app.path,
            target_revision=app.target_revision,
            namespace=app.namespace,
            sync_policy={
                "automated": {
                    "prune": False,
                    "self_heal": False
                }
            }
        )

        # Create stable application
        stable_app = Application(
            name=f"{app.name}-stable",
            repo_url=app.repo_url,
            path=app.path,
            target_revision=app.target_revision,
            namespace=app.namespace,
            sync_policy=app.sync_policy
        )

        results = {}

        # Deploy stable first
        results["stable"] = await self._deploy_rolling(stable_app)

        # Deploy canary with small traffic split
        if results["stable"]["success"]:
            results["canary"] = await self._deploy_rolling(canary_app)

            # Set up traffic splitting
            if results["canary"]["success"]:
                traffic_result = await self._setup_traffic_split(app, canary_weight=10)
                results["traffic"] = traffic_result

        return {
            "success": all(r.get("success", False) for r in results.values()),
            "phases": results
        }

    async def _deploy_blue_green(self, app: Application) -> Dict[str, Any]:
        """Deploy using blue-green strategy"""

        # Determine active color
        active_color = await self._get_active_color(app.name)
        new_color = "green" if active_color == "blue" else "blue"

        # Create new deployment
        new_app = Application(
            name=f"{app.name}-{new_color}",
            repo_url=app.repo_url,
            path=app.path,
            target_revision=app.target_revision,
            namespace=app.namespace,
            sync_policy=app.sync_policy
        )

        # Deploy new color
        deploy_result = await self._deploy_rolling(new_app)

        if deploy_result["success"]:
            # Health check new deployment
            health_result = await self._check_deployment_health(
                f"{app.name}-{new_color}",
                app.namespace
            )

            if health_result["healthy"]:
                # Switch traffic
                switch_result = await self._switch_traffic(app.name, new_color)
                deploy_result["traffic_switch"] = switch_result

                if switch_result["success"]:
                    # Clean up old deployment
                    old_color = active_color
                    cleanup_result = await self._cleanup_deployment(
                        f"{app.name}-{old_color}",
                        app.namespace
                    )
                    deploy_result["cleanup"] = cleanup_result
            else:
                deploy_result["health_check"] = health_result
                deploy_result["success"] = False

        return deploy_result

    def _generate_argocd_app(self, app: Application) -> str:
        """Generate ArgoCD application manifest"""

        manifest = {
            "apiVersion": "argoproj.io/v1alpha1",
            "kind": "Application",
            "metadata": {
                "name": app.name,
                "namespace": self.config.namespace
            },
            "spec": {
                "project": "default",
                "source": {
                    "repoURL": app.repo_url,
                    "targetRevision": app.target_revision,
                    "path": app.path
                },
                "destination": {
                    "server": app.destination_server,
                    "namespace": app.namespace
                },
                "syncPolicy": {
                    "automated": {
                        "prune": app.sync_policy.get("prune", True),
                        "selfHeal": app.sync_policy.get("self_heal", True)
                    },
                    "syncOptions": [
                        "CreateNamespace=true"
                    ]
                }
            }
        }

        return yaml.dump(manifest, sort_keys=False)

    async def _apply_kubernetes_yaml(self, yaml_content: str) -> Dict[str, Any]:
        """Apply Kubernetes YAML manifest"""

        # Merge with the parent environment so PATH is preserved when a
        # custom kubeconfig is supplied (passing a bare dict would replace
        # the entire environment and kubectl might not be found)
        import os
        env = dict(os.environ)
        if self.kubeconfig_path:
            env["KUBECONFIG"] = str(self.kubeconfig_path)

        process = await asyncio.create_subprocess_exec(
            "kubectl", "apply", "-f", "-",
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=env
        )

        stdout, stderr = await process.communicate(yaml_content.encode())

        return {
            "success": process.returncode == 0,
            "stdout": stdout.decode(),
            "stderr": stderr.decode()
        }

    async def _wait_for_sync(self, app_name: str, timeout: int = 300) -> Dict[str, Any]:
        """Wait for ArgoCD application to sync"""

        start_time = datetime.utcnow()

        while (datetime.utcnow() - start_time).total_seconds() < timeout:
            cmd = ["argocd", "app", "get", app_name, "-o", "json"]
            result = await self._run_command(cmd)

            if result["returncode"] == 0:
                app_data = json.loads(result["stdout"])
                status = app_data.get("status", {})

                sync_status = status.get("sync", {}).get("status", "")
                health_status = status.get("health", {}).get("status", "")

                if sync_status == "Synced" and health_status == "Healthy":
                    return {
                        "success": True,
                        "status": "Synced and Healthy",
                        "duration": (datetime.utcnow() - start_time).total_seconds()
                    }

            await asyncio.sleep(5)

        return {
            "success": False,
            "error": f"Sync timeout after {timeout} seconds"
        }

    async def _execute_hooks(self, app: Application, hook_type: str) -> None:
        """Execute deployment hooks"""

        hooks = app.hooks.get(hook_type, [])

        for hook in hooks:
            logger.info(f"Executing {hook_type} hook: {hook}")

            try:
                result = await self._run_command(hook, shell=True)

                if result["returncode"] != 0:
                    logger.error(f"Hook failed: {result['stderr']}")

            except Exception as e:
                logger.error(f"Error executing hook: {e}")

    async def _check_health(self, app: Application) -> Dict[str, Any]:
        """Check application health"""

        health_results = {}

        for check in app.health_checks:
            if check["type"] == "http":
                result = await self._check_http_health(check)
            elif check["type"] == "kubernetes":
                result = await self._check_kubernetes_health(check)
            else:
                result = {"healthy": False, "error": f"Unknown health check type: {check['type']}"}

            health_results[check["name"]] = result

        return health_results

    async def _run_kubectl_command(self, cmd: List[str]) -> Dict[str, Any]:
        """Run a kubectl command against the configured kubeconfig"""
        # Merge with the parent environment so PATH and HOME are preserved
        import os
        env = dict(os.environ)
        if self.kubeconfig_path:
            env["KUBECONFIG"] = str(self.kubeconfig_path)
        return await self._run_command(["kubectl"] + cmd, env=env)

    async def _run_command(
        self,
        cmd: List[str],  # a shell command string is also accepted when shell=True
        shell: bool = False,
        env: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """Run a command, optionally through a shell (used for hook strings)"""

        if shell:
            # Hooks are passed as shell strings; wrap them for bash -c
            cmd = ["bash", "-c", cmd if isinstance(cmd, str) else " ".join(cmd)]

        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=env
        )

        stdout, stderr = await process.communicate()

        return {
            "returncode": process.returncode,
            "stdout": stdout.decode(),
            "stderr": stderr.decode()
        }

    async def _wait_for_deployment(
        self,
        deployment_name: str,
        namespace: str,
        timeout: int = 300
    ) -> None:
        """Wait for deployment to be ready"""

        # Note: _run_kubectl_command prepends "kubectl", so omit it here
        cmd = [
            "wait",
            f"deployment/{deployment_name}",
            "--for", "condition=available",
            f"--namespace={namespace}",
            f"--timeout={timeout}s"
        ]

        result = await self._run_kubectl_command(cmd)

        if result["returncode"] != 0:
            raise RuntimeError(f"Deployment {deployment_name} not ready: {result['stderr']}")

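For reference, the `_generate_argocd_app` helper above produces an Argo CD `Application` manifest shaped like the one below. This standalone sketch mirrors that structure so the output can be inspected without the full deployer class; the repository values are hypothetical, and `json.dumps` stands in for `yaml.dump` so the snippet needs only the standard library (kubectl and Argo CD accept JSON manifests as well):

```python
import json

def argocd_application(name: str, repo_url: str, path: str,
                       target_revision: str, namespace: str,
                       destination_server: str = "https://kubernetes.default.svc",
                       argocd_namespace: str = "argocd") -> dict:
    """Build an Argo CD Application manifest as a plain dict."""
    return {
        "apiVersion": "argoproj.io/v1alpha1",
        "kind": "Application",
        "metadata": {"name": name, "namespace": argocd_namespace},
        "spec": {
            "project": "default",
            "source": {
                "repoURL": repo_url,
                "targetRevision": target_revision,
                "path": path,
            },
            "destination": {
                "server": destination_server,
                "namespace": namespace,
            },
            "syncPolicy": {
                "automated": {"prune": True, "selfHeal": True},
                "syncOptions": ["CreateNamespace=true"],
            },
        },
    }

manifest = argocd_application(
    name="payments",                                     # hypothetical app
    repo_url="https://github.com/example/payments.git",  # hypothetical repo
    path="deploy/overlays/prod",
    target_revision="main",
    namespace="payments",
)
print(json.dumps(manifest, indent=2))
```

Generating the manifest as a plain dict keeps it easy to unit-test before handing it to `yaml.dump` or `kubectl apply`.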
3. Progressive Delivery Patterns

Canary Analysis Engine

# delivery/canary.py
import asyncio
import aiohttp
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import numpy as np
import logging

logger = logging.getLogger(__name__)

@dataclass
class CanaryMetric:
    """Canary analysis metric"""
    name: str
    query: str
    weight: float = 1.0
    threshold: Optional[float] = None
    comparison: str = "less_than"  # less_than, greater_than, equals

@dataclass
class CanaryConfig:
    """Canary deployment configuration"""
    application: str
    namespace: str
    baseline_label: str = "version=baseline"
    canary_label: str = "version=canary"
    traffic_step: int = 10
    max_traffic: int = 50
    analysis_interval: int = 60  # seconds
    metrics: List[CanaryMetric] = field(default_factory=list)
    success_criteria: Dict[str, float] = field(default_factory=dict)
    rollback_on_failure: bool = True

class CanaryAnalyzer:
    """Analyzes canary deployment metrics"""

    def __init__(self, prometheus_url: str):
        self.prometheus_url = prometheus_url
        self.session: Optional[aiohttp.ClientSession] = None

    async def close(self) -> None:
        """Close the lazily created HTTP session"""
        if self.session:
            await self.session.close()
            self.session = None

    async def analyze(
        self,
        config: CanaryConfig,
        current_traffic: int
    ) -> Dict[str, Any]:
        """Analyze canary vs baseline metrics"""

        analysis = {
            "timestamp": datetime.utcnow().isoformat(),
            "traffic_percentage": current_traffic,
            "metrics": {},
            "overall_score": 0.0,
            "recommendation": "proceed"
        }

        # Query metrics for both baseline and canary
        baseline_metrics = await self._query_metrics(config, "baseline")
        canary_metrics = await self._query_metrics(config, "canary")

        # Analyze each metric
        weighted_scores = []
        total_weight = 0.0

        for metric in config.metrics:
            baseline_value = baseline_metrics.get(metric.name, 0)
            canary_value = canary_metrics.get(metric.name, 0)

            # Calculate percentage difference
            if baseline_value != 0:
                diff_percentage = ((canary_value - baseline_value) / baseline_value) * 100
            else:
                diff_percentage = 0

            # Determine if metric passes threshold
            passes = self._evaluate_metric(metric, canary_value, baseline_value)

            # Calculate score
            score = self._calculate_score(metric, diff_percentage, passes)
            weighted_scores.append(score * metric.weight)
            total_weight += metric.weight

            analysis["metrics"][metric.name] = {
                "baseline": baseline_value,
                "canary": canary_value,
                "difference_percentage": diff_percentage,
                "passes": passes,
                "score": score
            }

        # Calculate overall score as a weighted average, normalized by total
        # weight so heavily weighted metrics cannot push the score above 1.0
        if weighted_scores and total_weight > 0:
            analysis["overall_score"] = sum(weighted_scores) / total_weight

        # Determine recommendation
        analysis["recommendation"] = self._get_recommendation(
            analysis["overall_score"],
            config,
            current_traffic
        )

        return analysis

    async def _query_metrics(
        self,
        config: CanaryConfig,
        variant: str
    ) -> Dict[str, float]:
        """Query metrics from Prometheus"""

        label = config.canary_label if variant == "canary" else config.baseline_label
        metrics = {}

        for metric in config.metrics:
            query = metric.query.format(label=label)

            try:
                value = await self._query_prometheus(query)
                metrics[metric.name] = value
            except Exception as e:
                logger.error(f"Failed to query metric {metric.name}: {e}")
                metrics[metric.name] = 0

        return metrics

    async def _query_prometheus(self, query: str) -> float:
        """Query single metric from Prometheus"""

        if not self.session:
            self.session = aiohttp.ClientSession()

        url = f"{self.prometheus_url}/api/v1/query"
        params = {"query": query}

        async with self.session.get(url, params=params) as response:
            data = await response.json()

            if data["status"] == "success" and data["data"]["result"]:
                return float(data["data"]["result"][0]["value"][1])

            return 0.0

    def _evaluate_metric(
        self,
        metric: CanaryMetric,
        canary_value: float,
        baseline_value: float
    ) -> bool:
        """Evaluate if metric passes threshold"""

        if metric.threshold is None:
            return True

        if metric.comparison == "less_than":
            return canary_value <= metric.threshold
        elif metric.comparison == "greater_than":
            return canary_value >= metric.threshold
        elif metric.comparison == "equals":
            return abs(canary_value - metric.threshold) < 0.01

        return True

    def _calculate_score(
        self,
        metric: CanaryMetric,
        diff_percentage: float,
        passes: bool
    ) -> float:
        """Calculate metric score"""

        if not passes:
            return 0.0

        # Score based on how close to baseline
        if abs(diff_percentage) < 5:
            return 1.0
        elif abs(diff_percentage) < 10:
            return 0.8
        elif abs(diff_percentage) < 20:
            return 0.6
        else:
            return 0.4

    def _get_recommendation(
        self,
        score: float,
        config: CanaryConfig,
        current_traffic: int
    ) -> str:
        """Get deployment recommendation"""

        if score < 0.5:
            return "rollback"
        if score < 0.7:
            # Marginal score: hold at the current split, but give up once
            # the maximum traffic step has been reached without improvement
            return "rollback" if current_traffic >= config.max_traffic else "hold"
        # Healthy score: keep promoting (the controller decides whether this
        # means a traffic increase or a full promotion)
        return "promote"

class CanaryController:
    """Controls canary deployment lifecycle"""

    def __init__(self, analyzer: CanaryAnalyzer, k8s_client):
        self.analyzer = analyzer
        self.k8s_client = k8s_client
        self.active_canaries: Dict[str, Dict[str, Any]] = {}

    async def start_canary(self, config: CanaryConfig) -> str:
        """Start canary deployment"""

        canary_id = f"{config.application}-{int(datetime.utcnow().timestamp())}"

        # Create initial canary deployment
        await self._create_canary_deployment(config)

        # Set up initial traffic split
        await self._set_traffic_split(config, config.traffic_step)

        # Start analysis loop, keeping a task reference so the task is not
        # garbage-collected mid-flight
        task = asyncio.create_task(self._analysis_loop(canary_id, config))

        self.active_canaries[canary_id] = {
            "config": config,
            "current_traffic": config.traffic_step,
            "started_at": datetime.utcnow(),
            "status": "running",
            "task": task
        }

        logger.info(f"Started canary deployment {canary_id}")
        return canary_id

    async def _analysis_loop(self, canary_id: str, config: CanaryConfig) -> None:
        """Run continuous analysis"""

        while canary_id in self.active_canaries:
            canary_info = self.active_canaries[canary_id]

            if canary_info["status"] != "running":
                break

            # Run analysis
            analysis = await self.analyzer.analyze(
                config,
                canary_info["current_traffic"]
            )

            # Act on recommendation
            await self._handle_recommendation(canary_id, analysis)

            # Wait for next analysis
            await asyncio.sleep(config.analysis_interval)

    async def _handle_recommendation(
        self,
        canary_id: str,
        analysis: Dict[str, Any]
    ) -> None:
        """Handle analysis recommendation"""

        canary_info = self.active_canaries.get(canary_id)
        if not canary_info:
            return

        recommendation = analysis["recommendation"]
        config = canary_info["config"]

        if recommendation == "rollback":
            logger.warning(f"Rolling back canary {canary_id}")
            await self._rollback_canary(canary_id)

        elif recommendation == "hold":
            logger.info(f"Holding canary {canary_id} at {canary_info['current_traffic']}% traffic")

        elif recommendation == "promote":
            if canary_info["current_traffic"] >= config.max_traffic:
                # Full promotion
                logger.info(f"Promoting canary {canary_id} to production")
                await self._promote_canary(canary_id)
            else:
                # Increase traffic
                new_traffic = min(
                    canary_info["current_traffic"] + config.traffic_step,
                    config.max_traffic
                )

                logger.info(f"Increasing canary {canary_id} traffic to {new_traffic}%")
                await self._set_traffic_split(config, new_traffic)
                canary_info["current_traffic"] = new_traffic

    async def _set_traffic_split(
        self,
        config: CanaryConfig,
        canary_percentage: int
    ) -> None:
        """Set traffic split between baseline and canary"""

        # Implementation depends on the traffic-management solution: an Istio
        # VirtualService, AWS ALB weighted target groups, an Nginx Ingress
        # canary annotation, etc. The namespace-based dispatch below is
        # illustrative only; a real controller would key off an explicit
        # provider field in the config.

        if config.namespace == "istio-system":
            await self._set_istio_traffic_split(config, canary_percentage)
        elif config.namespace == "aws":
            await self._set_alb_traffic_split(config, canary_percentage)
        else:
            await self._set_service_mesh_traffic_split(config, canary_percentage)

    async def _set_istio_traffic_split(
        self,
        config: CanaryConfig,
        canary_percentage: int
    ) -> None:
        """Set traffic split using Istio"""

        # Update Istio VirtualService
        virtual_service = {
            "apiVersion": "networking.istio.io/v1beta1",
            "kind": "VirtualService",
            "metadata": {
                "name": config.application,
                "namespace": config.namespace
            },
            "spec": {
                "http": [
                    {
                        "name": "primary",
                        "route": [
                            {
                                "destination": {
                                    "host": f"{config.application}",
                                    "subset": "canary"
                                },
                                "weight": canary_percentage
                            },
                            {
                                "destination": {
                                    "host": f"{config.application}",
                                    "subset": "baseline"
                                },
                                "weight": 100 - canary_percentage
                            }
                        ]
                    }
                ]
            }
        }

        # Serialize and apply the VirtualService with kubectl (assumes
        # kubectl is on PATH and pointed at the right cluster)
        import yaml  # PyYAML; imported locally to keep the snippet self-contained

        process = await asyncio.create_subprocess_exec(
            "kubectl", "apply", "-f", "-", "-n", config.namespace,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await process.communicate(
            yaml.dump(virtual_service, sort_keys=False).encode()
        )

        if process.returncode != 0:
            raise RuntimeError(f"Failed to apply VirtualService: {stderr.decode()}")
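To make the analysis math above concrete, here is a standalone sketch of the scoring pipeline: per-metric scores are weighted, averaged (normalized by total weight), and mapped onto a rollback/hold/promote decision. The metric names and readings are illustrative, not from a real run:

```python
def metric_score(diff_percentage: float, passes: bool) -> float:
    """Score a metric by how far the canary drifts from baseline."""
    if not passes:
        return 0.0
    d = abs(diff_percentage)
    if d < 5:
        return 1.0
    if d < 10:
        return 0.8
    if d < 20:
        return 0.6
    return 0.4

def recommend(score: float, current_traffic: int, max_traffic: int) -> str:
    """Map an overall score onto a deployment recommendation."""
    if score < 0.5:
        return "rollback"
    if score < 0.7:
        return "rollback" if current_traffic >= max_traffic else "hold"
    return "promote"

# (name, baseline, canary, weight) — hypothetical Prometheus readings
readings = [
    ("error_rate", 0.010, 0.012, 2.0),      # +20% relative error rate
    ("latency_p95_ms", 200.0, 206.0, 1.0),  # +3% p95 latency
]

weighted, total_weight = 0.0, 0.0
for name, baseline, canary, weight in readings:
    diff = (canary - baseline) / baseline * 100
    s = metric_score(diff, passes=True)
    weighted += s * weight
    total_weight += weight
    print(f"{name}: diff={diff:+.1f}% score={s}")

overall = weighted / total_weight
print(f"overall={overall:.2f} -> {recommend(overall, current_traffic=10, max_traffic=50)}")
```

Here the doubled error rate weight drags a mediocre 0.4 score down harder than the healthy latency score can compensate, so the controller holds the current traffic split rather than promoting.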

This comprehensive cloud-native deployment skill provides production-ready patterns for implementing modern infrastructure as code, GitOps workflows, and progressive delivery strategies across any cloud provider or platform.

★ Insight ─────────────────────────────────────
The cloud-native patterns shown here emphasize several key 2025 best practices:

1. Provider Abstraction: Generic interfaces allow switching between Terraform, Pulumi, and other IaC tools
2. Multi-Cloud Strategy: Built-in support for managing deployments across multiple providers with failover capabilities
3. GitOps-First: Declarative deployment through Git with support for ArgoCD, Flux, and custom tools
4. Progressive Delivery: Safe deployments with canary analysis, blue-green strategies, and automated rollback
5. Policy as Code: Integration with Open Policy Agent for automated compliance and security

These patterns ensure your cloud deployments are automated, secure, and resilient across any infrastructure.
─────────────────────────────────────────────────