# LLM Streaming

Deliver LLM responses in real time for a better user experience.
## When to Use

- Chat interfaces (show tokens as they are generated)
- Long responses (no need to wait for completion)
- Progress indication for slow operations
- Streaming combined with function calls
## Basic Streaming (OpenAI)

The synchronous `OpenAI` client returns a plain iterator, so a regular generator is the right fit (the async variant follows in the next section):

```python
from openai import OpenAI

client = OpenAI()

def stream_response(prompt: str):
    """Stream tokens as they're generated (sync client, sync generator)."""
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    for chunk in stream:
        # The final chunk carries no content delta, so guard before yielding
        if chunk.choices and chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content
```
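Consuming it is an ordinary loop; printing with `flush=True` makes tokens appear as they arrive (the prompt is just an example):

```python
for token in stream_response("Tell me a joke"):
    print(token, end="", flush=True)
```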
## Streaming with Async

```python
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def async_stream(prompt: str):
    """Async streaming for better concurrency."""
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content
```
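An async generator can't be handed to `asyncio.run` directly, so wrap the consumption in a coroutine:

```python
import asyncio

async def main():
    async for token in async_stream("Explain SSE in one sentence"):
        print(token, end="", flush=True)

asyncio.run(main())
```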
## FastAPI SSE Endpoint

```python
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse

app = FastAPI()

@app.get("/chat/stream")
async def stream_chat(prompt: str):
    """Server-Sent Events endpoint for streaming."""
    async def generate():
        async for token in async_stream(prompt):
            yield {"event": "token", "data": token}
        yield {"event": "done", "data": ""}
    return EventSourceResponse(generate())
```
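To sanity-check the endpoint outside a browser, here is a minimal Python consumer sketch using `httpx`, assuming the app above is served at `http://localhost:8000` (host and port are assumptions):

```python
import asyncio
import httpx

async def consume_sse(prompt: str):
    """Read the raw SSE wire format line by line."""
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "GET", "http://localhost:8000/chat/stream", params={"prompt": prompt}
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    print(line[6:], end="", flush=True)

asyncio.run(consume_sse("Hello"))
```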
## Frontend SSE Consumer

The parser below matches the backend's `token` and `done` events, decodes chunks in streaming mode, and buffers partial lines that get split across network reads:

```typescript
async function streamChat(prompt: string, onToken: (t: string) => void) {
  const response = await fetch(`/chat/stream?prompt=${encodeURIComponent(prompt)}`);
  const reader = response.body?.getReader();
  if (!reader) return;
  const decoder = new TextDecoder();
  let buffer = '';
  let currentEvent = '';
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop() ?? ''; // Keep a partial line for the next chunk
    for (const line of lines) {
      if (line.startsWith('event: ')) {
        currentEvent = line.slice(7);
      } else if (line.startsWith('data: ')) {
        if (currentEvent === 'done') return; // Matches the backend's done event
        if (currentEvent === 'token') onToken(line.slice(6));
      }
    }
  }
}

// Usage
let fullResponse = '';
await streamChat('Hello', (token) => {
  fullResponse += token;
  setDisplayText(fullResponse); // Update UI incrementally
});
```
## Streaming with Tool Calls

```python
async def stream_with_tools(messages: list, tools: list):
    """Handle streaming responses that include tool calls."""
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        tools=tools,
        stream=True,
    )
    collected_content = ""
    collected_tool_calls = []
    async for chunk in stream:
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta
        # Collect content tokens
        if delta.content:
            collected_content += delta.content
            yield {"type": "content", "data": delta.content}
        # Tool calls arrive as fragments; accumulate them by index
        if delta.tool_calls:
            for tc in delta.tool_calls:
                while tc.index >= len(collected_tool_calls):
                    collected_tool_calls.append(
                        {"id": "", "function": {"name": "", "arguments": ""}}
                    )
                entry = collected_tool_calls[tc.index]
                if tc.id:
                    entry["id"] = tc.id
                if tc.function and tc.function.name:
                    entry["function"]["name"] += tc.function.name
                if tc.function and tc.function.arguments:
                    entry["function"]["arguments"] += tc.function.arguments
    # If the model requested tools, hand the completed calls to the caller
    if collected_tool_calls:
        yield {"type": "tool_calls", "data": collected_tool_calls}
```
## Backpressure Handling

```python
import asyncio

async def stream_with_backpressure(prompt: str, max_buffer: int = 100):
    """Handle slow consumers with backpressure."""
    buffer: asyncio.Queue = asyncio.Queue(maxsize=max_buffer)

    async def producer():
        async for token in async_stream(prompt):
            await buffer.put(token)  # Blocks while the buffer is full
        await buffer.put(None)  # Signal completion

    # Keep a reference so the task isn't garbage-collected mid-stream
    task = asyncio.create_task(producer())
    try:
        while True:
            token = await buffer.get()
            if token is None:
                break
            yield token
    finally:
        task.cancel()  # Stop the producer if the consumer exits early
```
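Usage with a deliberately slow consumer; the producer pauses whenever the queue fills (the 0.1 s delay is only for illustration):

```python
async def slow_reader():
    async for token in stream_with_backpressure("Summarize a long document"):
        print(token, end="", flush=True)
        await asyncio.sleep(0.1)  # Simulate a slow client

asyncio.run(slow_reader())
```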
## Key Decisions

| Decision | Recommendation |
|----------|----------------|
| Protocol | SSE for web, WebSocket for bidirectional |
| Buffer size | 50-200 tokens |
| Timeout | 30-60s for long responses |
| Retry | Reconnect on disconnect |
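Reconnect-on-disconnect can be a bounded retry loop around the consumer; this sketch reuses the hypothetical `consume_sse` helper from above, and the backoff values are arbitrary:

```python
import asyncio
import httpx

async def consume_with_retry(prompt: str, max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            await consume_sse(prompt)
            return
        except httpx.TransportError:
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
    raise RuntimeError("stream failed after retries")
```

Note that a naive retry replays the whole response; resuming mid-stream requires server support (e.g. SSE `Last-Event-ID`).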
## Common Mistakes

- No timeout (hangs on network issues; see the sketch below)
- Missing error handling in the stream
- Not closing connections properly
- Buffering the entire response before sending (defeats the purpose)
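A per-token timeout guards against stalled connections. A minimal sketch using `asyncio.wait_for` around the built-in `anext` (Python 3.10+); a timed-out generator should be discarded rather than resumed:

```python
import asyncio

async def stream_with_timeout(prompt: str, per_token_timeout: float = 30.0):
    """Raise a timeout error if no token arrives within the window."""
    stream = async_stream(prompt)
    while True:
        try:
            token = await asyncio.wait_for(anext(stream), timeout=per_token_timeout)
        except StopAsyncIteration:
            break  # Stream finished normally
        yield token
```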
## Related Skills

- `streaming-api-patterns` - SSE/WebSocket deep dive
- `function-calling` - Tool calls in streams
- `react-streaming-ui` - React streaming components