Claude Code Plugins

Community-maintained marketplace

Feedback

Complete fal.ai optimization system. PROACTIVELY activate for: (1) Queue vs run performance, (2) Parallel request batching, (3) Streaming for real-time UI, (4) WebSocket for interactive apps, (5) Model cost comparison, (6) Image size optimization, (7) Inference step tuning, (8) Webhook vs polling, (9) Result caching by seed, (10) Serverless scaling config. Provides: Parallel patterns, cost strategies, caching examples, monitoring setup. Ensures optimal performance and cost-effective usage.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md


name: fal-optimization description: Complete fal.ai optimization system. PROACTIVELY activate for: (1) Queue vs run performance, (2) Parallel request batching, (3) Streaming for real-time UI, (4) WebSocket for interactive apps, (5) Model cost comparison, (6) Image size optimization, (7) Inference step tuning, (8) Webhook vs polling, (9) Result caching by seed, (10) Serverless scaling config. Provides: Parallel patterns, cost strategies, caching examples, monitoring setup. Ensures optimal performance and cost-effective usage.

Quick Reference

Optimization Technique Impact
Parallel requests Promise.all() with batches 5-10x throughput
Avoid polling Use webhooks Lower API calls
Cache by seed Store prompt+seed results Avoid regeneration
Right-size images Use needed resolution Lower cost
Fewer steps Reduce inference steps Faster, cheaper
Model Tier Development Production
Image FLUX Schnell FLUX.2 Pro
Video Runway Turbo Kling 2.6 Pro
Serverless Config Cost-Optimized Latency-Optimized
min_concurrency 0 1+
keep_alive 120 600+
machine_type Smallest viable Higher tier

When to Use This Skill

Use for performance and cost optimization:

  • Reducing generation latency
  • Lowering API costs
  • Implementing parallel processing
  • Choosing between polling and webhooks
  • Configuring serverless scaling

Related skills:

  • For API patterns: see fal-api-reference
  • For model selection: see fal-model-guide
  • For serverless config: see fal-serverless-guide

fal.ai Performance and Cost Optimization

Strategies for optimizing performance, reducing costs, and scaling fal.ai integrations.

Performance Optimization

Client-Side Optimizations

1. Use Queue-Based Execution

Always prefer subscribe() over run() for generation tasks:

// Recommended: Queue-based with progress tracking
const result = await fal.subscribe("fal-ai/flux/dev", {
  input: { prompt: "test" },
  logs: true,
  onQueueUpdate: (update) => {
    // Show progress to users
    if (update.status === "IN_PROGRESS") {
      console.log("Generating...");
    }
  }
});

// Only use run() for fast endpoints (< 30s)
const quickResult = await fal.run("fal-ai/fast-sdxl", {
  input: { prompt: "quick test" }
});

2. Parallel Requests

Process multiple requests concurrently:

// JavaScript - Parallel execution
async function generateBatch(prompts: string[]) {
  const results = await Promise.all(
    prompts.map(prompt =>
      fal.subscribe("fal-ai/flux/dev", {
        input: { prompt }
      })
    )
  );
  return results;
}

// With rate limiting
async function generateBatchWithLimit(prompts: string[], limit = 5) {
  const results = [];
  for (let i = 0; i < prompts.length; i += limit) {
    const batch = prompts.slice(i, i + limit);
    const batchResults = await Promise.all(
      batch.map(prompt =>
        fal.subscribe("fal-ai/flux/dev", { input: { prompt } })
      )
    );
    results.push(...batchResults);
    // Small delay between batches
    if (i + limit < prompts.length) {
      await new Promise(r => setTimeout(r, 100));
    }
  }
  return results;
}
# Python - Async parallel
import asyncio
import fal_client

async def generate_batch(prompts: list[str]) -> list[dict]:
    tasks = [
        fal_client.run_async("fal-ai/flux/dev", arguments={"prompt": p})
        for p in prompts
    ]
    return await asyncio.gather(*tasks)

# With semaphore for rate limiting
async def generate_batch_limited(prompts: list[str], limit: int = 5):
    semaphore = asyncio.Semaphore(limit)

    async def generate_one(prompt: str):
        async with semaphore:
            return await fal_client.run_async(
                "fal-ai/flux/dev",
                arguments={"prompt": prompt}
            )

    return await asyncio.gather(*[generate_one(p) for p in prompts])

3. Streaming for Real-Time Feedback

Use streaming for progressive output:

// Show incremental progress
const stream = await fal.stream("fal-ai/flux/dev", {
  input: { prompt: "A landscape" }
});

for await (const event of stream) {
  updateProgressUI(event);
}

const result = await stream.done();

4. WebSockets for Interactive Apps

For real-time applications with continuous input:

const connection = fal.realtime.connect("fal-ai/lcm-sd15-i2i", {
  connectionKey: `user-${userId}`,
  throttleInterval: 128,  // Debounce rapid inputs
  onResult: (result) => {
    displayImage(result.images[0].url);
  }
});

// Send updates as user types/draws
inputElement.addEventListener('input', (e) => {
  connection.send({
    prompt: e.target.value,
    image_url: currentImage
  });
});

Server-Side Optimizations (Serverless)

1. Efficient Model Loading

class OptimizedApp(fal.App):
    machine_type = "GPU-A100"
    requirements = ["torch", "transformers", "accelerate"]

    volumes = {
        "/data": fal.Volume("model-cache")
    }

    def setup(self):
        import torch
        from transformers import AutoModelForCausalLM

        # Use fp16 for faster inference and less memory
        self.model = AutoModelForCausalLM.from_pretrained(
            "model-name",
            torch_dtype=torch.float16,
            device_map="auto",
            cache_dir="/data/models"  # Persistent cache
        )

        # Enable optimizations
        if hasattr(self.model, 'enable_attention_slicing'):
            self.model.enable_attention_slicing()

2. Reduce Cold Starts

class WarmApp(fal.App):
    machine_type = "GPU-A100"
    keep_alive = 600      # 10 minutes warm
    min_concurrency = 1   # Always keep one ready

    # Use lightweight health check
    @fal.endpoint("/health")
    def health(self):
        return {"status": "ok"}

3. Memory Management

class MemoryEfficientApp(fal.App):
    def setup(self):
        import torch

        # Use mixed precision
        self.model = load_model(torch_dtype=torch.float16)

        # Enable memory-efficient attention (if using transformers)
        self.model.enable_xformers_memory_efficient_attention()

    def teardown(self):
        # Clean up GPU memory
        import torch
        if hasattr(self, 'model'):
            del self.model
        torch.cuda.empty_cache()

    @fal.endpoint("/generate")
    def generate(self, request):
        import torch

        with torch.inference_mode():  # Disable gradient tracking
            result = self.model(request.input)

        return result

Cost Optimization

1. Choose the Right Model

Need Cheaper Option Premium Option
Quick iteration FLUX Schnell ($) FLUX.1 Dev ($$)
Production FLUX.1 Dev ($$) FLUX.2 Pro ($$$)
Video preview Runway Turbo ($$) Kling Pro ($$$)
// Development: Use fast/cheap models
const preview = await fal.subscribe("fal-ai/flux/schnell", {
  input: { prompt: "test", num_inference_steps: 4 }
});

// Production: Use quality models
const final = await fal.subscribe("fal-ai/flux-2-pro", {
  input: { prompt: "test" }
});

2. Optimize Image Sizes

Generate at the size you need, not larger:

// Don't generate larger than needed
const result = await fal.subscribe("fal-ai/flux/dev", {
  input: {
    prompt: "test",
    // Use preset sizes
    image_size: "square_hd",  // 1024x1024

    // Or specific dimensions
    image_size: { width: 800, height: 600 }
  }
});

3. Reduce Inference Steps

Find the minimum steps for acceptable quality:

// Quick previews: fewer steps
const preview = await fal.subscribe("fal-ai/flux/dev", {
  input: {
    prompt: "test",
    num_inference_steps: 15  // Faster, slightly lower quality
  }
});

// Final render: more steps
const final = await fal.subscribe("fal-ai/flux/dev", {
  input: {
    prompt: "test",
    num_inference_steps: 28  // Default, high quality
  }
});

4. Use Webhooks for High Volume

Avoid polling overhead with webhooks:

// Instead of polling
const result = await fal.subscribe("fal-ai/flux/dev", {
  input: { prompt: "test" },
  pollInterval: 1000  // Polling = more API calls
});

// Use webhooks
const { request_id } = await fal.queue.submit("fal-ai/flux/dev", {
  input: { prompt: "test" },
  webhookUrl: "https://your-server.com/webhook"
});
// No polling needed - result delivered to webhook

5. Cache Results

Use seeds for reproducible outputs:

// Cache key based on prompt + seed
const cacheKey = `${prompt}-${seed}`;
const cached = await cache.get(cacheKey);

if (cached) {
  return cached;
}

const result = await fal.subscribe("fal-ai/flux/dev", {
  input: { prompt, seed }
});

await cache.set(cacheKey, result);
return result;

6. Serverless Cost Optimization

class CostOptimizedApp(fal.App):
    machine_type = "GPU-A10G"   # Cheaper than A100 if sufficient
    min_concurrency = 0         # Scale to zero when not used
    keep_alive = 120            # Shorter keep-alive

    # Use appropriate GPU for model size
    # T4: < 16GB VRAM models
    # A10G: 16-24GB VRAM models
    # A100: 24-80GB VRAM models

Scaling Strategies

1. Horizontal Scaling

class ScalableApp(fal.App):
    machine_type = "GPU-A100"
    min_concurrency = 2    # Always have 2 instances
    max_concurrency = 20   # Scale up to 20

    # fal handles auto-scaling based on queue depth

2. Request Batching

class BatchApp(fal.App):
    @fal.endpoint("/batch")
    def batch_generate(self, prompts: list[str]) -> list[dict]:
        # Process multiple prompts in one request
        results = []
        for prompt in prompts:
            result = self.model(prompt)
            results.append(result)
        return results

3. Priority Queues

Use different endpoints for different priorities:

class PriorityApp(fal.App):
    machine_type = "GPU-A100"

    @fal.endpoint("/high-priority")
    def high_priority(self, request):
        # Separate endpoint for important requests
        return self.process(request)

    @fal.endpoint("/standard")
    def standard(self, request):
        # Standard processing
        return self.process(request)

Monitoring and Debugging

1. Add Logging

import logging

class MonitoredApp(fal.App):
    def setup(self):
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
        self.logger.info("App starting up")
        # Load model
        self.logger.info("Model loaded successfully")

    @fal.endpoint("/generate")
    def generate(self, request):
        import time
        start = time.time()

        result = self.process(request)

        elapsed = time.time() - start
        self.logger.info(f"Request processed in {elapsed:.2f}s")

        return result

2. Track Metrics

// Client-side timing
const start = Date.now();

const result = await fal.subscribe("fal-ai/flux/dev", {
  input: { prompt: "test" },
  onQueueUpdate: (update) => {
    if (update.status === "IN_QUEUE") {
      console.log(`Queue position: ${update.queue_position}`);
    }
  }
});

const elapsed = Date.now() - start;
console.log(`Total time: ${elapsed}ms`);

// Track in your analytics
analytics.track("fal_generation", {
  model: "flux/dev",
  elapsed_ms: elapsed,
  queue_time_ms: result.timings?.queue,
  inference_time_ms: result.timings?.inference
});

3. Error Monitoring

try {
  const result = await fal.subscribe("fal-ai/flux/dev", {
    input: { prompt: "test" }
  });
} catch (error) {
  // Log to error tracking service
  errorTracker.captureException(error, {
    tags: {
      model: "flux/dev",
      type: error.constructor.name
    },
    extra: {
      status: error.status,
      body: error.body
    }
  });

  // Handle gracefully
  return fallbackResult();
}

Checklist

Before Production

  • Using queue-based execution (subscribe)
  • Appropriate model selected for use case
  • Image sizes optimized
  • Error handling implemented
  • Rate limiting in place
  • Caching strategy defined

Serverless Deployment

  • Correct machine type for model size
  • Models loaded in setup(), not per-request
  • Persistent volumes for large models
  • Secrets properly configured
  • Health check endpoint
  • Logging enabled

Cost Management

  • Scale-to-zero enabled (min_concurrency = 0)
  • Appropriate keep_alive setting
  • Using cheaper models for development
  • Batch processing where possible
  • Webhook callbacks instead of polling

Monitoring

  • Latency tracking
  • Error rate monitoring
  • Cost tracking
  • Queue depth alerts