Claude Code Plugins

Community-maintained marketplace

Feedback

Complete Python Cloudflare deployment system. PROACTIVELY activate for: (1) Python Workers with Pyodide, (2) FastAPI on Workers, (3) Cloudflare Containers for heavy compute, (4) Service bindings (RPC), (5) Environment variables in Workers, (6) Cold start optimization, (7) Workflows for durable execution, (8) GPU containers for AI. Provides: Worker code patterns, wrangler config, Dockerfile templates, FastAPI integration. Ensures edge deployment with proper architecture choices.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md


name: python-cloudflare description: Complete Python Cloudflare deployment system. PROACTIVELY activate for: (1) Python Workers with Pyodide, (2) FastAPI on Workers, (3) Cloudflare Containers for heavy compute, (4) Service bindings (RPC), (5) Environment variables in Workers, (6) Cold start optimization, (7) Workflows for durable execution, (8) GPU containers for AI. Provides: Worker code patterns, wrangler config, Dockerfile templates, FastAPI integration. Ensures edge deployment with proper architecture choices.

Quick Reference

Platform Cold Start Packages Best For
Workers (Pyodide) ~50ms Limited API endpoints
Containers ~10s Any Heavy compute, AI
Worker Pattern Code
Basic handler class Default(WorkerEntrypoint):
FastAPI await asgi.fetch(app, request, self.env)
Env vars self.env.API_KEY
Service binding await self.env.WORKER_B.method()
Command Purpose
pywrangler init Create Python Worker
pywrangler dev Local development
pywrangler deploy Deploy to Cloudflare
Container vs Worker Recommendation
Simple API Worker
pandas/numpy Container
GPU/AI Container
<50ms latency Worker

When to Use This Skill

Use for Cloudflare edge deployment:

  • Deploying Python APIs to Cloudflare Workers
  • Running FastAPI on Cloudflare edge
  • Using Containers for heavy compute
  • Setting up service-to-service RPC
  • Optimizing cold starts

Related skills:

  • For FastAPI: see python-fastapi
  • For async patterns: see python-asyncio
  • For Docker: see python-github-actions

Python on Cloudflare (Workers & Containers)

Overview

Cloudflare provides two ways to run Python:

  1. Python Workers - Serverless functions using Pyodide (WebAssembly)
  2. Cloudflare Containers - Full Docker containers (beta, June 2025)

Python Workers

How It Works

  • Python runs via Pyodide (CPython compiled to WebAssembly)
  • Executes inside V8 isolates on Cloudflare's edge network
  • Memory snapshots enable fast cold starts
  • Supports many pure Python packages

Quick Start

# Install pywrangler (Python Workers CLI)
pip install pywrangler

# Create new Python Worker
pywrangler init my-worker
cd my-worker

# Project structure
my-worker/
├── src/
│   └── entry.py
├── pyproject.toml
└── wrangler.toml

Basic Worker

# src/entry.py
from workers import Response, WorkerEntrypoint

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        return Response("Hello from Python Worker!")

Configuration

# wrangler.toml
name = "my-python-worker"
main = "src/entry.py"
compatibility_date = "2024-12-01"

[build]
command = ""
# pyproject.toml
[project]
name = "my-python-worker"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
    "fastapi",
    "pydantic",
]

[dependency-groups]
dev = ["workers-py"]

FastAPI on Workers

# src/entry.py
from workers import WorkerEntrypoint
from fastapi import FastAPI, Request
from pydantic import BaseModel

app = FastAPI()

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        import asgi
        return await asgi.fetch(app, request, self.env)

@app.get("/")
async def root():
    return {"message": "Hello from FastAPI on Cloudflare!"}

@app.get("/env")
async def get_env(req: Request):
    env = req.scope.get("env")
    if env and hasattr(env, "API_KEY"):
        return {"has_api_key": True}
    return {"has_api_key": False}

class Item(BaseModel):
    name: str
    price: float
    description: str | None = None

@app.post("/items/")
async def create_item(item: Item):
    return item

@app.get("/items/{item_id}")
async def read_item(item_id: int):
    return {"item_id": item_id}

Using Environment Variables

from workers import WorkerEntrypoint, Response

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        # Access environment variables via self.env
        api_key = self.env.API_KEY
        database_url = self.env.DATABASE_URL

        return Response(f"API Key exists: {bool(api_key)}")
# wrangler.toml
[vars]
API_KEY = "your-api-key"
DATABASE_URL = "your-database-url"

Service Bindings (RPC)

# Worker A - Caller
from workers import WorkerEntrypoint, Response

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        # Call Worker B's add method
        result = await self.env.WORKER_B.add(1, 2)
        return Response(f"Result: {result}")
# Worker B - Callee
from workers import WorkerEntrypoint, Response

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        return Response("Hello from Worker B")

    def add(self, a: int, b: int) -> int:
        return a + b
# Worker A wrangler.toml
[[services]]
binding = "WORKER_B"
service = "worker-b"

Caching

from workers import WorkerEntrypoint
from pyodide.ffi import to_js as _to_js
from js import Response, URL, Object, fetch

def to_js(x):
    return _to_js(x, dict_converter=Object.fromEntries)

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        request_url = URL.new(request.url)
        params = request_url.searchParams
        tags = params["tags"].split(",") if "tags" in params else []
        url = params["uri"] or None

        if url is None:
            return Response.json(to_js({"error": "URL required"}), status=400)

        # Fetch with cache tags
        options = {"cf": {"cacheTags": tags}}
        result = await fetch(url, to_js(options))

        cache_status = result.headers["cf-cache-status"]
        return Response.json(to_js({
            "cache": cache_status,
            "status": result.status
        }))

Supported Packages

# HTTP clients (async only)
import aiohttp
import httpx

# Data processing
import numpy  # Limited support
import pandas  # Limited support

# Standard library works well
import json
import re
import urllib
import base64
import hashlib

# Check Pyodide package list for full support
# https://pyodide.org/en/stable/usage/packages-in-pyodide.html

Cold Start Optimization

# Imports at module level are cached in memory snapshot
import json
import hashlib
from pydantic import BaseModel

# This code runs during snapshot creation
CACHED_CONFIG = load_config()

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        # Fast access to pre-loaded modules and data
        return Response(json.dumps(CACHED_CONFIG))

Cloudflare Containers (Beta)

Overview (June 2025)

  • Full Docker container support
  • Run any language/runtime (Python, Go, Java, etc.)
  • FFmpeg, Pandas, AI toolchains supported
  • Pay-per-use pricing
  • Global edge deployment

When to Use Containers vs Workers

Feature Workers Containers
Cold start ~50ms ~10s (with prewarming)
Package support Pyodide-compatible Any
Memory Limited Configurable
File system No Yes
Native binaries No Yes
Best for API endpoints Batch jobs, AI, heavy compute

Container Setup

# Dockerfile
FROM python:3.12-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Run application
CMD ["python", "main.py"]
# main.py
import pandas as pd
import numpy as np
from fastapi import FastAPI
import uvicorn

app = FastAPI()

@app.get("/process")
async def process_data():
    # Heavy computation that wouldn't work in Workers
    df = pd.DataFrame(np.random.randn(10000, 4))
    result = df.describe().to_dict()
    return result

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8080)

Wrangler Commands

# Build container
wrangler containers build . --tag my-app:latest

# Push to registry
wrangler containers push my-app:latest

# Deploy
wrangler containers deploy my-app

# List containers
wrangler containers list

# Delete
wrangler containers delete my-app

Container Optimization

# Multi-stage build for smaller images
FROM python:3.12-slim as builder

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt --target=/app/deps

FROM python:3.12-slim

WORKDIR /app
COPY --from=builder /app/deps /app/deps
COPY . .

ENV PYTHONPATH=/app/deps
CMD ["python", "main.py"]
# Optimize for container cold starts
import asyncio

# Lazy imports for faster startup
def get_pandas():
    import pandas as pd
    return pd

async def process_large_data():
    pd = get_pandas()  # Import only when needed
    # Process data...

GPU Support (Preview)

# Container with GPU support
import torch

def run_inference(data):
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = load_model().to(device)

    with torch.no_grad():
        result = model(data.to(device))

    return result.cpu().numpy()

Python Workflows (Durable Execution)

Overview

Cloudflare Workflows now supports Python for multi-step, long-running applications with automatic retries and state persistence.

from cloudflare.workflows import Workflow, step

class DataPipeline(Workflow):
    @step
    async def fetch_data(self, url: str) -> dict:
        """Fetch data from external API."""
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.json()

    @step
    async def process_data(self, data: dict) -> dict:
        """Process the fetched data."""
        # Heavy processing - will retry if fails
        processed = transform(data)
        return processed

    @step
    async def save_results(self, data: dict) -> str:
        """Save to database."""
        result_id = await save_to_db(data)
        return result_id

    async def run(self, input_url: str) -> str:
        data = await self.fetch_data(input_url)
        processed = await self.process_data(data)
        result_id = await self.save_results(processed)
        return f"Saved as {result_id}"

Best Practices

1. Minimize Cold Starts

# Do expensive imports at module level
import json
import hashlib
from pydantic import BaseModel

# Pre-compute static data
STATIC_CONFIG = {"version": "1.0", "features": ["a", "b"]}

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        # Fast path - no initialization needed
        return Response(json.dumps(STATIC_CONFIG))

2. Use Async HTTP Clients

# Workers only support async HTTP
import httpx

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        async with httpx.AsyncClient() as client:
            response = await client.get("https://api.example.com/data")
            return Response(response.text)

3. Handle Errors Gracefully

from workers import WorkerEntrypoint, Response

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        try:
            result = await self.process_request(request)
            return Response(json.dumps(result), headers={
                "Content-Type": "application/json"
            })
        except ValueError as e:
            return Response(json.dumps({"error": str(e)}), status=400)
        except Exception as e:
            # Log error (sent to Workers Logs)
            print(f"Error: {e}")
            return Response(json.dumps({"error": "Internal error"}), status=500)

4. Structure for Testability

# src/handlers.py - Pure Python logic
async def handle_create_item(data: dict) -> dict:
    # Testable without Worker runtime
    validated = validate_item(data)
    return {"id": generate_id(), **validated}

# src/entry.py - Worker entrypoint
from workers import WorkerEntrypoint, Response
from handlers import handle_create_item

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        if request.method == "POST":
            data = await request.json()
            result = await handle_create_item(data)
            return Response(json.dumps(result))

5. Local Development

# Start local development server
pywrangler dev

# Test locally
curl http://localhost:8787/

# Deploy to Cloudflare
pywrangler deploy