Claude Code Plugins

Community-maintained marketplace

LLM streaming response patterns. Use when implementing real-time token streaming, Server-Sent Events for AI responses, or streaming with tool calls.

Install Skill

1. Download skill
2. Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3. Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify the skill by reading through its instructions before using it.

SKILL.md

name: llm-streaming
description: LLM streaming response patterns. Use when implementing real-time token streaming, Server-Sent Events for AI responses, or streaming with tool calls.

LLM Streaming

Deliver LLM responses in real-time for better UX.

When to Use

  • Chat interfaces (show tokens as generated)
  • Long responses (don't wait for completion)
  • Progress indication for slow operations
  • Streaming with function calls

Basic Streaming (OpenAI)

from openai import OpenAI

client = OpenAI()

def stream_response(prompt: str):
    """Stream tokens as they're generated (synchronous client, blocking iteration)."""
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )

    for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content
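
A minimal way to consume this generator, e.g. from a script (the prompt text is just an illustration):

full_text = ""
for token in stream_response("Explain SSE in one sentence"):
    print(token, end="", flush=True)  # Render tokens as they arrive
    full_text += token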

Streaming with Async

from openai import AsyncOpenAI

client = AsyncOpenAI()

async def async_stream(prompt: str):
    """Async streaming for better concurrency."""
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )

    async for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content

FastAPI SSE Endpoint

from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse

app = FastAPI()

@app.get("/chat/stream")
async def stream_chat(prompt: str):
    """Server-Sent Events endpoint for streaming."""
    async def generate():
        async for token in async_stream(prompt):
            yield {
                "event": "token",
                "data": token
            }
        yield {"event": "done", "data": ""}

    return EventSourceResponse(generate())

Frontend SSE Consumer

async function streamChat(prompt: string, onToken: (t: string) => void) {
  const response = await fetch(`/chat/stream?prompt=${encodeURIComponent(prompt)}`);
  const reader = response.body?.getReader();
  const decoder = new TextDecoder();
  let currentEvent = '';

  while (reader) {
    const { done, value } = await reader.read();
    if (done) break;

    // stream: true keeps multi-byte characters intact across chunk boundaries
    const text = decoder.decode(value, { stream: true });
    const lines = text.split('\n');

    for (const line of lines) {
      if (line.startsWith('event: ')) {
        currentEvent = line.slice(7);
      } else if (line.startsWith('data: ')) {
        // The backend emits "token" and "done" events; ignore the final "done"
        if (currentEvent !== 'done') {
          onToken(line.slice(6));
        }
      }
    }
  }
}

// Usage
let fullResponse = '';
await streamChat('Hello', (token) => {
  fullResponse += token;
  setDisplayText(fullResponse);  // Update UI incrementally
});

Streaming with Tool Calls

async def stream_with_tools(messages: list, tools: list):
    """Handle streaming responses that include tool calls."""
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        tools=tools,
        stream=True
    )

    collected_content = ""
    collected_tool_calls = []

    async for chunk in stream:
        delta = chunk.choices[0].delta

        # Collect content tokens
        if delta.content:
            collected_content += delta.content
            yield {"type": "content", "data": delta.content}

        # Collect tool call chunks
        if delta.tool_calls:
            for tc in delta.tool_calls:
                # Tool calls come in chunks, accumulate them
                if tc.index >= len(collected_tool_calls):
                    collected_tool_calls.append({
                        "id": tc.id,
                        "type": "function",
                        "function": {"name": "", "arguments": ""}
                    })

                if tc.function.name:
                    collected_tool_calls[tc.index]["function"]["name"] += tc.function.name
                if tc.function.arguments:
                    collected_tool_calls[tc.index]["function"]["arguments"] += tc.function.arguments

    # If any tool calls were collected, hand them to the caller to execute
    if collected_tool_calls:
        yield {"type": "tool_calls", "data": collected_tool_calls}

Backpressure Handling

import asyncio

async def stream_with_backpressure(prompt: str, max_buffer: int = 100):
    """Handle slow consumers with backpressure."""
    buffer = asyncio.Queue(maxsize=max_buffer)

    async def producer():
        try:
            async for token in async_stream(prompt):
                await buffer.put(token)  # Blocks if buffer full
        finally:
            await buffer.put(None)  # Signal completion, even on error

    async def consumer():
        while True:
            token = await buffer.get()
            if token is None:
                break
            yield token
            await asyncio.sleep(0)  # Yield control

    # Start producer in background (keep a reference so it isn't garbage collected)
    producer_task = asyncio.create_task(producer())

    # Return consumer generator
    async for token in consumer():
        yield token

    # Surface any exception the producer raised
    await producer_task
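
Wired into the SSE endpoint above as a drop-in replacement for async_stream (the route path is illustrative):

@app.get("/chat/stream-buffered")
async def stream_chat_buffered(prompt: str):
    async def generate():
        async for token in stream_with_backpressure(prompt, max_buffer=100):
            yield {"event": "token", "data": token}
        yield {"event": "done", "data": ""}

    return EventSourceResponse(generate())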

Key Decisions

Decision      Recommendation
Protocol      SSE for web, WebSocket for bidirectional
Buffer size   50-200 tokens
Timeout       30-60s for long responses
Retry         Reconnect on disconnect
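
Neither generator above bounds how long it waits for the next chunk, so a stalled connection stalls the stream. One way to enforce the timeout recommendation is a per-token idle timeout around the async generator; a sketch using asyncio.wait_for (the 30-second value is just an example within the range above):

import asyncio

async def stream_with_timeout(prompt: str, idle_timeout: float = 30.0):
    """Abort if no new token arrives within idle_timeout seconds."""
    agen = async_stream(prompt).__aiter__()
    while True:
        try:
            token = await asyncio.wait_for(agen.__anext__(), idle_timeout)
        except StopAsyncIteration:
            break  # Stream finished normally
        except asyncio.TimeoutError:
            yield "\n[stream timed out]"
            break
        yield token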

Common Mistakes

  • No timeout (hangs on network issues)
  • Missing error handling in stream
  • Not closing connections properly
  • Buffering entire response (defeats purpose)
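
For the error-handling and connection points in particular, one pattern is to turn failures into an explicit SSE event rather than silently dropping the connection. A sketch (the /chat/stream-safe path and the "error" event name are assumptions; the frontend above would need a matching handler):

@app.get("/chat/stream-safe")
async def stream_chat_safe(prompt: str):
    async def generate():
        try:
            async for token in async_stream(prompt):
                yield {"event": "token", "data": token}
            yield {"event": "done", "data": ""}
        except Exception as exc:
            # Surface the failure to the client instead of dropping the stream
            yield {"event": "error", "data": str(exc)}

    return EventSourceResponse(generate())

Resource cleanup belongs in a finally block that does not yield, since the generator can be closed mid-stream when the client disconnects.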

Related Skills

  • streaming-api-patterns - SSE/WebSocket deep dive
  • function-calling - Tool calls in streams
  • react-streaming-ui - React streaming components