Claude Code Plugins

Community-maintained marketplace

Feedback

idempotency-handling

@aj-geddes/useful-ai-prompts
4
0

Implement idempotency keys and handling to ensure operations can be safely retried without duplicate effects. Use when building payment systems, APIs with retries, or distributed transactions.

Install Skill

1. Download skill
2. Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3. Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify the skill by reading through its instructions before using it.

SKILL.md

name: idempotency-handling
description: Implement idempotency keys and handling to ensure operations can be safely retried without duplicate effects. Use when building payment systems, APIs with retries, or distributed transactions.

Idempotency Handling

Overview

Implement idempotency to ensure operations produce the same result regardless of how many times they're executed.

When to Use

  • Payment processing
  • API endpoints with retries
  • Webhooks and callbacks
  • Message queue consumers
  • Distributed transactions
  • Bank transfers
  • Order creation
  • Email sending
  • Resource creation

Implementation Examples

1. Express Idempotency Middleware

import express from 'express';
import Redis from 'ioredis';
import crypto from 'crypto';

/**
 * Record stored per idempotency key, describing the state and outcome of the
 * request that was made with that key.
 */
interface IdempotentRequest {
  /** The idempotency key this record belongs to. */
  key: string;
  /** Lifecycle state of the request. */
  status: 'processing' | 'completed' | 'failed';
  /** Response payload; written on 'completed' records. */
  response?: any;
  /** Error message; written on 'failed' records. */
  error?: string;
  /** Epoch-millisecond timestamp taken from `Date.now()`. */
  createdAt: number;
  /** Epoch-millisecond timestamp taken from `Date.now()` when the request finished. */
  completedAt?: number;
}

/**
 * Redis-backed store for idempotency records.
 *
 * Each record lives under the Redis key `idempotency:<key>` as a JSON string
 * and expires `ttl` seconds after it was last written.
 */
class IdempotencyService {
  private redis: Redis;
  private ttl = 86400; // 24 hours

  /**
   * @param redisUrl Connection URL passed to the Redis client.
   */
  constructor(redisUrl: string) {
    this.redis = new Redis(redisUrl);
  }

  /**
   * Fetch the record stored for an idempotency key.
   *
   * @returns The parsed record, or `null` when nothing is stored for the key.
   */
  async getRequest(key: string): Promise<IdempotentRequest | null> {
    const data = await this.redis.get(`idempotency:${key}`);
    return data ? JSON.parse(data) : null;
  }

  /**
   * Unconditionally write (or overwrite) the record for a key and restart
   * its TTL.
   */
  async setRequest(
    key: string,
    request: IdempotentRequest
  ): Promise<void> {
    await this.redis.setex(
      `idempotency:${key}`,
      this.ttl,
      JSON.stringify(request)
    );
  }

  /**
   * Try to claim a key by writing a 'processing' record.
   *
   * @returns `true` when this call created the record, `false` when a record
   *   already existed for the key.
   */
  async startProcessing(key: string): Promise<boolean> {
    const request: IdempotentRequest = {
      key,
      status: 'processing',
      createdAt: Date.now()
    };

    // Use SET NX to ensure only one request processes
    const result = await this.redis.set(
      `idempotency:${key}`,
      JSON.stringify(request),
      'EX',
      this.ttl,
      'NX'
    );

    return result === 'OK';
  }

  /**
   * Mark the request as completed and store its response for replay.
   */
  async completeRequest(
    key: string,
    response: any
  ): Promise<void> {
    await this.recordOutcome(key, { status: 'completed', response });
  }

  /**
   * Mark the request as failed and store the error message.
   */
  async failRequest(
    key: string,
    error: string
  ): Promise<void> {
    await this.recordOutcome(key, { status: 'failed', error });
  }

  /**
   * Write the final state of a request.
   *
   * The existing record is read first so that `createdAt` keeps the time the
   * request was claimed instead of being overwritten with the completion
   * time. If no record exists any more (e.g. it expired), the current time
   * is used for both timestamps.
   */
  private async recordOutcome(
    key: string,
    outcome: Pick<IdempotentRequest, 'status' | 'response' | 'error'>
  ): Promise<void> {
    const now = Date.now();
    const existing = await this.getRequest(key);

    await this.setRequest(key, {
      key,
      ...outcome,
      createdAt: existing?.createdAt ?? now,
      completedAt: now
    });
  }
}

/**
 * Build an Express middleware that enforces idempotency for mutating
 * requests (POST, PUT, PATCH, DELETE) using the `Idempotency-Key` header.
 *
 * Behaviour per request:
 *  - missing header                 -> 400
 *  - key currently being processed  -> 409
 *  - key already completed          -> 200 with the stored response body
 *  - key previously failed          -> 500 with the stored error message
 *  - otherwise the key is claimed and the request is passed downstream; the
 *    outcome is recorded once the response has been produced.
 *
 * @param idempotency Store used to read and write idempotency records.
 * @returns The Express request handler.
 */
function idempotencyMiddleware(idempotency: IdempotencyService) {
  return async (
    req: express.Request,
    res: express.Response,
    next: express.NextFunction
  ) => {
    // Only apply to POST, PUT, PATCH, DELETE
    if (!['POST', 'PUT', 'PATCH', 'DELETE'].includes(req.method)) {
      return next();
    }

    // A header can arrive as string | string[] | undefined; use the first
    // value instead of asserting it is a string.
    const headerValue = req.headers['idempotency-key'];
    const idempotencyKey = Array.isArray(headerValue)
      ? headerValue[0]
      : headerValue;

    if (!idempotencyKey) {
      return res.status(400).json({
        error: 'Idempotency-Key header required'
      });
    }

    try {
      // Check for existing request
      const existing = await idempotency.getRequest(idempotencyKey);

      if (existing) {
        if (existing.status === 'processing') {
          return res.status(409).json({
            error: 'Request already processing',
            message: 'Please wait and retry'
          });
        }

        if (existing.status === 'completed') {
          return res.status(200).json(existing.response);
        }

        if (existing.status === 'failed') {
          return res.status(500).json({
            error: 'Previous request failed',
            message: existing.error
          });
        }
      }

      // Start processing; the store claims the key atomically, so a
      // concurrent request that slipped past the check above loses here.
      const canProcess = await idempotency.startProcessing(idempotencyKey);

      if (!canProcess) {
        return res.status(409).json({
          error: 'Request already processing'
        });
      }
    } catch (err) {
      // Store failures must reach Express' error handling instead of
      // becoming an unhandled promise rejection.
      return next(err);
    }

    let outcomeRecorded = false;

    // Record the outcome exactly once. Error statuses are stored as failures
    // so that a later replay never reports an error body as a 200 success.
    const recordOutcome = (body: any) => {
      if (outcomeRecorded) {
        return;
      }
      outcomeRecorded = true;

      const saved =
        res.statusCode >= 400
          ? idempotency.failRequest(
              idempotencyKey,
              `Request failed with status ${res.statusCode}`
            )
          : idempotency.completeRequest(idempotencyKey, body);

      saved.catch(console.error);
    };

    // Capture JSON responses so they can be replayed for duplicate requests.
    const originalJson = res.json.bind(res);
    res.json = (body: any) => {
      recordOutcome(body);
      return originalJson(body);
    };

    // Responses that never go through res.json (an error handler calling
    // res.end, res.sendStatus, ...) would otherwise leave the key stuck in
    // 'processing' until its TTL expires. Settle those when the response
    // finishes; a successful non-JSON response is stored with a null body.
    res.once('finish', () => recordOutcome(null));

    next();
  };
}

// Usage
const app = express();
// The service opens its own Redis connection, so no separate client is
// created here.
const idempotency = new IdempotencyService('redis://localhost:6379');

app.use(express.json());
app.use(idempotencyMiddleware(idempotency));

app.post('/api/payments', async (req, res, next) => {
  try {
    const { amount, userId } = req.body;

    // Process payment
    const payment = await processPayment(amount, userId);

    res.json(payment);
  } catch (err) {
    // Forward failures to Express' error handling rather than leaving the
    // rejected promise unhandled.
    next(err);
  }
});

/**
 * Placeholder payment processor.
 *
 * @param amount Amount to charge; echoed back in the result.
 * @param userId Identifier of the paying user; echoed back in the result.
 * @returns A payment object with a freshly generated UUID as `id` and the
 *   status 'completed'.
 */
async function processPayment(amount: number, userId: string) {
  // Payment processing logic
  const paymentId = crypto.randomUUID();
  const payment = { id: paymentId, amount, userId, status: 'completed' };

  return payment;
}

// Start accepting HTTP connections.
const port = 3000;
app.listen(port);

2. Database-Based Idempotency

import { Pool } from 'pg';

/**
 * Shape of a row of the `idempotency_keys` table, as returned by
 * `DatabaseIdempotency.checkIdempotency`.
 */
interface IdempotencyRecord {
  /** Idempotency key (the table's primary key). */
  key: string;
  /** Request payload stored when processing started; compared against retries. */
  request_body: any;
  /** Response payload written by `completeRequest`. */
  response_body?: any;
  /** Written as 'processing', 'completed' or 'failed' by the queries of `DatabaseIdempotency`. */
  status: string;
  /** Error text written by `failRequest`. */
  error_message?: string;
  /** Row creation time (the column defaults to NOW()). */
  created_at: Date;
  /** Set to NOW() when the request is marked completed or failed. */
  completed_at?: Date;
}

/**
 * PostgreSQL-backed idempotency store.
 *
 * Records live in the `idempotency_keys` table, which is created on
 * construction if it does not exist. A key is claimed atomically by
 * `startProcessing` and settled by `completeRequest` or `failRequest`.
 * Records are valid for 24 hours; an expired record is ignored by
 * `checkIdempotency`, replaced by `startProcessing` and removed by
 * `cleanup`.
 */
class DatabaseIdempotency {
  /** Settles once the table and index have been created. */
  private readonly ready: Promise<void>;

  /**
   * @param db Connection pool used for every query.
   */
  constructor(private db: Pool) {
    this.ready = this.createTable();
    // Mark the rejection as handled so a failed setup does not surface as an
    // unhandled rejection; every public method awaits `ready` and therefore
    // still reports the setup error to its caller.
    this.ready.catch(() => undefined);
  }

  /** Create the table and its expiry index if they do not exist yet. */
  private async createTable(): Promise<void> {
    await this.db.query(`
      CREATE TABLE IF NOT EXISTS idempotency_keys (
        key VARCHAR(255) PRIMARY KEY,
        request_body JSONB NOT NULL,
        response_body JSONB,
        status VARCHAR(50) NOT NULL,
        error_message TEXT,
        created_at TIMESTAMP DEFAULT NOW(),
        completed_at TIMESTAMP,
        expires_at TIMESTAMP NOT NULL
      );

      CREATE INDEX IF NOT EXISTS idx_idempotency_expires
      ON idempotency_keys (expires_at);
    `);
  }

  /**
   * Serialize a value to JSON with object keys sorted at every level.
   *
   * JSONB does not preserve the key order of the original document, so a
   * plain JSON.stringify comparison can report a mismatch for two bodies
   * that are semantically identical.
   */
  private static canonicalJson(value: unknown): string {
    const sortKeys = (_key: string, val: unknown): unknown => {
      if (val === null || typeof val !== 'object' || Array.isArray(val)) {
        return val;
      }
      const source = val as Record<string, unknown>;
      const sorted: Record<string, unknown> = {};
      for (const name of Object.keys(source).sort()) {
        sorted[name] = source[name];
      }
      return sorted;
    };

    return JSON.stringify(value ?? null, sortKeys);
  }

  /**
   * Look up the unexpired record for a key.
   *
   * @returns The stored record, or `null` when the key is unknown or expired.
   * @throws Error when a record exists but was created with a different
   *   request body.
   */
  async checkIdempotency(
    key: string,
    requestBody: any
  ): Promise<IdempotencyRecord | null> {
    await this.ready;

    const result = await this.db.query(
      'SELECT * FROM idempotency_keys WHERE key = $1 AND expires_at > NOW()',
      [key]
    );

    if (result.rows.length === 0) {
      return null;
    }

    const record = result.rows[0];

    // Check if request body matches (independent of key order)
    if (
      DatabaseIdempotency.canonicalJson(record.request_body) !==
      DatabaseIdempotency.canonicalJson(requestBody)
    ) {
      throw new Error('Request body mismatch for idempotency key');
    }

    return record;
  }

  /**
   * Atomically claim a key for processing.
   *
   * @returns `true` when this call claimed the key (new key, or the previous
   *   record had expired); `false` when an unexpired record already exists.
   */
  async startProcessing(
    key: string,
    requestBody: any
  ): Promise<boolean> {
    await this.ready;

    const expiresAt = new Date(Date.now() + 86400 * 1000); // 24 hours

    // A single statement either inserts the row or takes over an expired
    // one; an unexpired row is left untouched and yields rowCount 0.
    // The body is serialized explicitly because the driver would otherwise
    // turn a JavaScript array into a PostgreSQL array literal, not JSON.
    const result = await this.db.query(`
      INSERT INTO idempotency_keys (key, request_body, status, expires_at)
      VALUES ($1, $2, 'processing', $3)
      ON CONFLICT (key) DO UPDATE
      SET
        request_body = EXCLUDED.request_body,
        response_body = NULL,
        status = 'processing',
        error_message = NULL,
        created_at = NOW(),
        completed_at = NULL,
        expires_at = EXCLUDED.expires_at
      WHERE idempotency_keys.expires_at <= NOW()
    `, [key, JSON.stringify(requestBody ?? null), expiresAt]);

    return (result.rowCount ?? 0) > 0;
  }

  /** Store the response for a key and mark it completed. */
  async completeRequest(
    key: string,
    responseBody: any
  ): Promise<void> {
    await this.ready;

    // Serialized explicitly so arrays and bare strings are stored as JSON.
    await this.db.query(`
      UPDATE idempotency_keys
      SET
        response_body = $1,
        status = 'completed',
        completed_at = NOW()
      WHERE key = $2
    `, [JSON.stringify(responseBody ?? null), key]);
  }

  /** Store the error message for a key and mark it failed. */
  async failRequest(
    key: string,
    errorMessage: string
  ): Promise<void> {
    await this.ready;

    await this.db.query(`
      UPDATE idempotency_keys
      SET
        error_message = $1,
        status = 'failed',
        completed_at = NOW()
      WHERE key = $2
    `, [errorMessage, key]);
  }

  /**
   * Delete expired records.
   *
   * @returns The number of rows removed.
   */
  async cleanup(): Promise<number> {
    await this.ready;

    const result = await this.db.query(`
      DELETE FROM idempotency_keys
      WHERE expires_at < NOW()
    `);

    return result.rowCount ?? 0;
  }
}

3. Stripe-Style Idempotency

import hashlib
import json
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
import psycopg2

class IdempotencyManager:
    """Run operations at most once per idempotency key.

    State is kept in the ``idempotency_requests`` table through a DB-API
    connection that uses ``%s`` placeholders. Records older than
    ``ttl_days`` are treated as expired: they are ignored on lookup and
    replaced when the key is claimed again.
    """

    def __init__(self, db_connection):
        """Store the database connection; records expire after one day."""
        self.db = db_connection
        self.ttl_days = 1

    def _expiry_cutoff(self) -> datetime:
        """Return the creation time before which a record counts as expired."""
        # NOTE(review): uses the application's local clock while the rows are
        # stamped with the database's NOW() - confirm both agree on time zone.
        return datetime.now() - timedelta(days=self.ttl_days)

    def process_request(
        self,
        idempotency_key: str,
        request_data: Dict[str, Any],
        process_fn: callable
    ) -> Dict[str, Any]:
        """
        Process request with idempotency guarantee.

        Args:
            idempotency_key: Unique key for this request
            request_data: Request payload
            process_fn: Function to process the request

        Returns:
            Response data (the stored response when the key was already
            completed)

        Raises:
            ConflictError: The key is currently being processed.
            ProcessingError: A previous attempt with this key failed.
            ValueError: The key was first used with different request data.
        """
        # Check for existing request
        existing = self.get_existing_request(
            idempotency_key,
            request_data
        )

        if existing:
            if existing['status'] == 'processing':
                raise ConflictError('Request already processing')

            if existing['status'] == 'completed':
                return existing['response']

            if existing['status'] == 'failed':
                raise ProcessingError(existing['error'])

        # Start processing
        if not self.start_processing(idempotency_key, request_data):
            raise ConflictError('Request already processing')

        try:
            # Process request
            result = process_fn(request_data)

            # Store result
            self.complete_request(idempotency_key, result)

            return result

        except Exception as e:
            # Store error
            self.fail_request(idempotency_key, str(e))
            raise

    def get_existing_request(
        self,
        key: str,
        request_data: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """Get existing idempotent request.

        Returns:
            A dict with ``status``, ``response`` and ``error``, or ``None``
            when the key is unknown or its record has expired.

        Raises:
            ValueError: The stored request hash differs from the hash of
                ``request_data``.
        """
        cursor = self.db.cursor()
        try:
            cursor.execute("""
                SELECT status, response, error, request_hash
                FROM idempotency_requests
                WHERE idempotency_key = %s
                AND created_at > %s
            """, (key, self._expiry_cutoff()))

            row = cursor.fetchone()
        finally:
            # Close the cursor even when the query raises.
            cursor.close()

        if not row:
            return None

        # Verify request data matches
        request_hash = self.hash_request(request_data)
        if row[3] != request_hash:
            raise ValueError(
                'Request data does not match idempotency key'
            )

        # The response is written with json.dumps(); decode it again when the
        # driver hands it back as text so a replay returns the same structure
        # as the first call rather than a JSON string.
        response = row[1]
        if isinstance(response, str):
            try:
                response = json.loads(response)
            except ValueError:
                pass  # not JSON text; return the stored value unchanged

        return {
            'status': row[0],
            'response': response,
            'error': row[2]
        }

    def start_processing(
        self,
        key: str,
        request_data: Dict[str, Any]
    ) -> bool:
        """Mark request as processing.

        Returns:
            True when this call claimed the key, False when another
            unexpired record already holds it.
        """
        request_hash = self.hash_request(request_data)
        cursor = self.db.cursor()

        try:
            # Drop an expired record first. Lookups ignore expired rows, so
            # without this the INSERT below would collide with the stale row
            # and the key would report "already processing" forever.
            cursor.execute("""
                DELETE FROM idempotency_requests
                WHERE idempotency_key = %s
                AND created_at <= %s
            """, (key, self._expiry_cutoff()))

            cursor.execute("""
                INSERT INTO idempotency_requests
                (idempotency_key, request_hash, status, created_at)
                VALUES (%s, %s, 'processing', NOW())
            """, (key, request_hash))

            self.db.commit()
            return True

        except psycopg2.IntegrityError:
            # Another request holds the key; this also undoes the DELETE.
            self.db.rollback()
            return False

        finally:
            cursor.close()

    def complete_request(
        self,
        key: str,
        response: Dict[str, Any]
    ):
        """Mark request as completed and store its JSON-encoded response."""
        cursor = self.db.cursor()

        try:
            cursor.execute("""
                UPDATE idempotency_requests
                SET
                    status = 'completed',
                    response = %s,
                    completed_at = NOW()
                WHERE idempotency_key = %s
            """, (json.dumps(response), key))

            self.db.commit()
        finally:
            cursor.close()

    def fail_request(self, key: str, error: str):
        """Mark request as failed and store the error message."""
        cursor = self.db.cursor()

        try:
            cursor.execute("""
                UPDATE idempotency_requests
                SET
                    status = 'failed',
                    error = %s,
                    completed_at = NOW()
                WHERE idempotency_key = %s
            """, (error, key))

            self.db.commit()
        finally:
            cursor.close()

    def hash_request(self, data: Dict[str, Any]) -> str:
        """Create hash of request data.

        Keys are sorted before hashing, so the SHA-256 hex digest does not
        depend on dictionary ordering.
        """
        json_str = json.dumps(data, sort_keys=True)
        return hashlib.sha256(json_str.encode()).hexdigest()


class ConflictError(Exception):
    """Raised when a request with the same idempotency key is already processing."""


class ProcessingError(Exception):
    """Raised when an earlier attempt with the same idempotency key failed."""


# Usage
def process_payment(data):
    """Placeholder payment handler.

    Returns a payment dict with a fixed id, the ``amount`` taken from
    ``data`` and the status ``'completed'``.
    """
    # Process payment logic
    payment = dict(payment_id='pay_123', amount=data['amount'])
    payment['status'] = 'completed'
    return payment

# In your API handler: run the payment through the manager so that a retry
# with the same key does not execute it twice.
idempotency = IdempotencyManager(db_connection)

try:
    result = idempotency.process_request(
        'key_abc123',
        {'amount': 100, 'currency': 'USD'},
        process_payment,
    )
except ConflictError as e:
    # The same key is currently being processed.
    print(f"Conflict: {e}")
except ProcessingError as e:
    # An earlier attempt with this key failed.
    print(f"Processing error: {e}")
else:
    print(result)

4. Message Queue Idempotency

/** A queue message handled by `IdempotentMessageProcessor`. */
interface Message {
  /** Identifier used to detect messages that were already handled. */
  id: string;
  /** Message payload. */
  data: any;
  /** Message timestamp. NOTE(review): units are not established by this code — presumably epoch milliseconds; confirm with the producer. */
  timestamp: number;
}

/**
 * Processes queue messages at most once per message ID.
 *
 * The `processed_messages` table is the source of truth: a message is
 * claimed by inserting its ID, which fails with a unique violation when the
 * ID is already present. An in-memory set of completed IDs acts as a fast
 * path in front of the database. A message whose previous attempt failed can
 * be claimed again, so a redelivery is able to retry it.
 */
class IdempotentMessageProcessor {
  private processedMessages = new Set<string>();
  private db: Pool;

  /**
   * @param db Connection pool used for every query.
   */
  constructor(db: Pool) {
    this.db = db;
    // Warming the cache is best effort: the database claim still prevents
    // duplicates when it fails, so log instead of leaving the rejection
    // unhandled.
    this.loadProcessedMessages().catch((error) => {
      console.error('Failed to load processed message IDs:', error);
    });
  }

  /** Fill the in-memory cache with IDs completed in the last 24 hours. */
  private async loadProcessedMessages(): Promise<void> {
    // Only completed messages may be skipped; rows that are still
    // 'processing' or 'failed' must go through the database claim.
    const result = await this.db.query(`
      SELECT message_id
      FROM processed_messages
      WHERE processed_at > NOW() - INTERVAL '24 hours'
        AND status = 'completed'
    `);

    for (const row of result.rows) {
      this.processedMessages.add(row.message_id);
    }
  }

  /**
   * Handle a message unless it was already handled or is being handled.
   *
   * @throws Re-throws the error raised while handling the message, after the
   *   failure has been recorded.
   */
  async processMessage(message: Message): Promise<void> {
    // Check if already processed
    if (this.processedMessages.has(message.id)) {
      console.log(`Message ${message.id} already processed, skipping`);
      return;
    }

    // Mark as processing (atomic operation)
    const wasInserted = await this.markAsProcessing(message.id);

    if (!wasInserted) {
      console.log(`Message ${message.id} already being processed`);
      return;
    }

    try {
      // Process message
      await this.handleMessage(message);

      // Mark as completed
      await this.markAsCompleted(message.id);

      this.processedMessages.add(message.id);
    } catch (error) {
      console.error(`Failed to process message ${message.id}:`, error);

      // Thrown values are not guaranteed to be Error instances.
      const reason = error instanceof Error ? error.message : String(error);

      try {
        await this.markAsFailed(message.id, reason);
      } catch (markError) {
        // Keep the original error as the one the caller sees.
        console.error(
          `Failed to record failure for message ${message.id}:`,
          markError
        );
      }

      throw error;
    }
  }

  /**
   * Claim a message ID.
   *
   * @returns `true` when this call claimed the ID (new ID, or a previous
   *   attempt had failed); `false` when the ID is being processed or has
   *   completed.
   */
  private async markAsProcessing(messageId: string): Promise<boolean> {
    try {
      await this.db.query(`
        INSERT INTO processed_messages (message_id, status, processed_at)
        VALUES ($1, 'processing', NOW())
      `, [messageId]);

      return true;
    } catch (error: any) {
      if (error.code !== '23505') {
        throw error;
      }
    }

    // The ID already exists. Take it over only if the earlier attempt
    // failed; the conditional UPDATE lets exactly one consumer win.
    const retry = await this.db.query(`
      UPDATE processed_messages
      SET status = 'processing', error = NULL, processed_at = NOW(), completed_at = NULL
      WHERE message_id = $1 AND status = 'failed'
    `, [messageId]);

    return (retry.rowCount ?? 0) > 0;
  }

  /** Record that the message was handled successfully. */
  private async markAsCompleted(messageId: string): Promise<void> {
    await this.db.query(`
      UPDATE processed_messages
      SET status = 'completed', completed_at = NOW()
      WHERE message_id = $1
    `, [messageId]);
  }

  /** Record that handling the message failed, together with the reason. */
  private async markAsFailed(
    messageId: string,
    error: string
  ): Promise<void> {
    await this.db.query(`
      UPDATE processed_messages
      SET status = 'failed', error = $2, completed_at = NOW()
      WHERE message_id = $1
    `, [messageId, error]);
  }

  /** Message handler; this placeholder only logs the message. */
  private async handleMessage(message: Message): Promise<void> {
    // Actual message processing logic
    console.log('Processing message:', message);
  }
}

Best Practices

✅ DO

  • Require idempotency keys for mutations
  • Store request and response together
  • Set appropriate TTL for idempotency records
  • Validate request body matches stored request
  • Handle concurrent requests gracefully
  • Return same response for duplicate requests
  • Clean up old idempotency records
  • Use database constraints for atomicity

❌ DON'T

  • Apply idempotency to GET requests
  • Store idempotency data forever
  • Skip validation of request body
  • Use non-unique idempotency keys
  • Process same request concurrently
  • Change response for duplicate requests

Schema Design

-- One row per idempotency key, holding the request and its outcome.
CREATE TABLE idempotency_keys (
  -- The primary key makes a second insert of the same key fail.
  key VARCHAR(255) PRIMARY KEY,
  -- 64 characters: the length of a hex-encoded SHA-256 digest.
  request_hash VARCHAR(64) NOT NULL,
  request_body JSONB NOT NULL,
  -- NULL until an outcome is stored.
  response_body JSONB,
  -- Restricted to the three lifecycle states.
  status VARCHAR(20) NOT NULL CHECK (status IN ('processing', 'completed', 'failed')),
  error_message TEXT,
  created_at TIMESTAMP DEFAULT NOW(),
  completed_at TIMESTAMP,
  -- Point in time after which the row may be deleted.
  expires_at TIMESTAMP NOT NULL
);

-- Index on the expiry column, e.g. for deleting rows by expires_at.
CREATE INDEX idx_idempotency_expires ON idempotency_keys (expires_at);
-- Index on the status column, for queries that filter by state.
CREATE INDEX idx_idempotency_status ON idempotency_keys (status);

Resources