Claude Code Plugins

Community-maintained marketplace

Feedback

Server-Sent Events (SSE) streaming for Claude API with support for text, tool use, and extended thinking. Activate for real-time responses, stream handling, and progressive output.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name streaming
description Server-Sent Events (SSE) streaming for Claude API with support for text, tool use, and extended thinking. Activate for real-time responses, stream handling, and progressive output.
allowed-tools Bash, Read, Write, Edit, Glob, Grep, Task
triggers streaming, stream, sse, server-sent events, real-time, progressive, delta, event
dependencies llm-integration
related-skills tool-use, extended-thinking

Streaming Skill

Implement real-time streaming responses from Claude API using Server-Sent Events (SSE).

When to Use This Skill

  • Real-time user interfaces
  • Long-running generations
  • Progressive output display
  • Tool use with streaming
  • Extended thinking visualization

SSE Event Flow

message_start
    → content_block_start
        → content_block_delta (repeated)
    → content_block_stop
    → (more blocks...)
→ message_delta
→ message_stop

Core Implementation

Basic Text Streaming (Python)

import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a short story."}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

Event-Based Streaming

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}]
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="")
            elif event.delta.type == "input_json_delta":
                # Tool input (accumulate, don't parse yet!)
                tool_input_buffer += event.delta.partial_json
        elif event.type == "content_block_stop":
            # Now safe to parse tool input
            if tool_input_buffer:
                tool_input = json.loads(tool_input_buffer)

TypeScript Streaming

import Anthropic from '@anthropic-ai/sdk';

const client = new Anthropic();

const stream = client.messages.stream({
    model: 'claude-sonnet-4-20250514',
    max_tokens: 1024,
    messages: [{ role: 'user', content: 'Write a story.' }]
});

for await (const event of stream) {
    if (event.type === 'content_block_delta' &&
        event.delta.type === 'text_delta') {
        process.stdout.write(event.delta.text);
    }
}

const finalMessage = await stream.finalMessage();

Event Types Reference

Event When Data
message_start Beginning Message metadata
content_block_start Block begins Block type, index
content_block_delta Content chunk Delta content
content_block_stop Block ends -
message_delta Message update Stop reason, usage
message_stop Complete -

Delta Types

Delta Type Content When
text_delta .text Text content
input_json_delta .partial_json Tool input
thinking_delta .thinking Extended thinking
signature_delta .signature Thinking signature

Tool Use Streaming

Critical Rule: Never Parse JSON Mid-Stream!

# WRONG - Will fail on partial JSON!
for event in stream:
    if event.delta.type == "input_json_delta":
        tool_input = json.loads(event.delta.partial_json)  # FAILS!

# CORRECT - Accumulate then parse
tool_json_buffer = ""
for event in stream:
    if event.delta.type == "input_json_delta":
        tool_json_buffer += event.delta.partial_json
    elif event.type == "content_block_stop":
        if tool_json_buffer:
            tool_input = json.loads(tool_json_buffer)  # Safe now!
            tool_json_buffer = ""

Complete Tool Streaming Pattern

def stream_with_tools(client, messages, tools):
    current_block = None
    tool_input_buffer = ""

    with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=4096,
        messages=messages,
        tools=tools
    ) as stream:
        for event in stream:
            if event.type == "content_block_start":
                current_block = event.content_block
                tool_input_buffer = ""

            elif event.type == "content_block_delta":
                if event.delta.type == "text_delta":
                    yield {"type": "text", "content": event.delta.text}
                elif event.delta.type == "input_json_delta":
                    tool_input_buffer += event.delta.partial_json

            elif event.type == "content_block_stop":
                if current_block.type == "tool_use":
                    yield {
                        "type": "tool_call",
                        "id": current_block.id,
                        "name": current_block.name,
                        "input": json.loads(tool_input_buffer)
                    }

Extended Thinking Streaming

thinking_content = ""
signature = ""

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=16000,
    thinking={"type": "enabled", "budget_tokens": 10000},
    messages=[{"role": "user", "content": "Solve this complex problem..."}]
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "thinking_delta":
                thinking_content += event.delta.thinking
                # Optionally display thinking in UI
            elif event.delta.type == "signature_delta":
                signature = event.delta.signature
            elif event.delta.type == "text_delta":
                print(event.delta.text, end="")

Error Handling

Retriable Errors

import time

RETRIABLE_ERRORS = [529, 429, 500, 502, 503]

def stream_with_retry(client, **kwargs):
    max_retries = 3
    base_delay = 1

    for attempt in range(max_retries):
        try:
            with client.messages.stream(**kwargs) as stream:
                for event in stream:
                    yield event
            return
        except anthropic.APIStatusError as e:
            if e.status_code in RETRIABLE_ERRORS and attempt < max_retries - 1:
                delay = base_delay * (2 ** attempt)
                time.sleep(delay)
            else:
                raise

Silent Overloaded Errors

# CRITICAL: Check for error events even at HTTP 200!
for event in stream:
    if event.type == "error":
        if event.error.type == "overloaded_error":
            # Retry with backoff
            pass

Connection Management

Keep-Alive Configuration

import httpx

# Proper timeout configuration
http_client = httpx.Client(
    timeout=httpx.Timeout(
        connect=10.0,    # Connection timeout
        read=120.0,      # Read timeout (long for streaming!)
        write=30.0,      # Write timeout
        pool=30.0        # Pool timeout
    )
)

client = anthropic.Anthropic(http_client=http_client)

Connection Pooling

http_client = httpx.Client(
    limits=httpx.Limits(
        max_keepalive_connections=20,
        max_connections=100,
        keepalive_expiry=30.0
    )
)

Best Practices

DO:

  • Set read timeout >= 60 seconds
  • Accumulate tool JSON, parse after block_stop
  • Handle error events even at HTTP 200
  • Use exponential backoff for retries

DON'T:

  • Parse partial JSON during streaming
  • Use short timeouts
  • Ignore overloaded_error events
  • Leave connections idle >5 minutes

UI Integration Pattern

async def stream_to_ui(websocket, prompt):
    """Stream Claude response to WebSocket client"""
    async with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=4096,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        async for text in stream.text_stream:
            await websocket.send_json({
                "type": "chunk",
                "content": text
            })

        await websocket.send_json({
            "type": "complete",
            "usage": stream.get_final_message().usage
        })

See Also

  • [[llm-integration]] - API basics
  • [[tool-use]] - Tool calling
  • [[extended-thinking]] - Deep reasoning