streaming-architecture

@majiayu000/litellm-rs

LiteLLM-RS Streaming Architecture. Covers UnifiedSSEParser, SSETransformer trait, VecDeque buffering, provider-specific transformers, and real-time event handling.


Streaming Architecture Guide

Overview

LiteLLM-RS implements a unified streaming system: it consumes Server-Sent Events (SSE) from 66+ providers, applies provider-specific transformations, and emits a consistent OpenAI-compatible output format.

Streaming Flow

┌─────────────────────────────────────────────────────────────────┐
│                    Provider SSE Stream                          │
│  (OpenAI, Anthropic, Google, etc.)                             │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                    UnifiedSSEParser                             │
│  - Buffer management with VecDeque                              │
│  - Line-based SSE parsing                                       │
│  - Event type detection                                         │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                    SSETransformer                               │
│  - Provider-specific data parsing                               │
│  - Format normalization to ChatChunk                            │
│  - Error handling                                               │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                    OpenAI-Compatible Output                     │
│  ChatChunk (data: {...}\n\n or [DONE])                         │
└─────────────────────────────────────────────────────────────────┘

Core Components

UnifiedSSEParser

use std::collections::VecDeque;

/// Unified SSE parser that handles various provider formats
pub struct UnifiedSSEParser {
    /// Buffer for incomplete lines
    buffer: VecDeque<u8>,
    /// Current event type being parsed
    current_event_type: Option<String>,
    /// Maximum buffer size to prevent memory issues
    max_buffer_size: usize,
}

impl UnifiedSSEParser {
    pub fn new() -> Self {
        Self {
            buffer: VecDeque::with_capacity(8192),
            current_event_type: None,
            max_buffer_size: 1024 * 1024, // 1MB max buffer
        }
    }

    /// Feed bytes into the parser and extract complete events
    pub fn feed(&mut self, bytes: &[u8]) -> Vec<SSEEvent> {
        self.buffer.extend(bytes);
        let events = self.extract_events();

        // Whatever remains is a single unfinished event. If it alone exceeds
        // the limit, the stream is malformed: discard it rather than silently
        // dropping bytes from the middle of the input.
        if self.buffer.len() > self.max_buffer_size {
            self.reset();
        }

        events
    }

    /// Extract complete SSE events from the buffer
    fn extract_events(&mut self) -> Vec<SSEEvent> {
        let mut events = Vec::new();
        let mut current_data = String::new();
        let mut consumed = 0; // Offset just past the last complete event
        let mut line_start = 0;

        // Work on raw bytes and decode each line separately, so multi-byte
        // UTF-8 sequences are never misread one byte at a time
        let bytes: &[u8] = self.buffer.make_contiguous();

        // Only newline-terminated lines are parsed; a trailing partial line waits
        while let Some(pos) = bytes[line_start..].iter().position(|&b| b == b'\n') {
            let raw = &bytes[line_start..line_start + pos];
            line_start += pos + 1;

            let line = String::from_utf8_lossy(raw);
            let line = line.trim_end_matches('\r');

            if line.is_empty() {
                // Empty line marks end of event
                if !current_data.is_empty() {
                    events.push(SSEEvent {
                        event_type: self.current_event_type.take(),
                        data: std::mem::take(&mut current_data),
                    });
                }
                self.current_event_type = None;
                consumed = line_start;
                continue;
            }

            if line.starts_with(':') {
                // Comment line, ignore
                continue;
            }

            // "field: value"; the single space after the colon is optional
            let (field, value) = match line.split_once(':') {
                Some((f, v)) => (f, v.strip_prefix(' ').unwrap_or(v)),
                None => (line, ""),
            };

            match field {
                "event" => self.current_event_type = Some(value.to_string()),
                "data" => {
                    if !current_data.is_empty() {
                        current_data.push('\n');
                    }
                    current_data.push_str(value);
                }
                // "id" and "retry" can be stored here if needed
                _ => {}
            }
        }

        // Remove only complete events; an unfinished event stays buffered and
        // is re-parsed once the rest of it arrives
        self.buffer.drain(..consumed);

        events
    }

    /// Reset parser state
    pub fn reset(&mut self) {
        self.buffer.clear();
        self.current_event_type = None;
    }
}

#[derive(Debug, Clone)]
pub struct SSEEvent {
    pub event_type: Option<String>,
    pub data: String,
}
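
The reason for buffering shows up when a network chunk ends in the middle of a line. The following is a minimal, std-only sketch of that situation, not the LiteLLM-RS implementation: `MiniParser`, `Event`, and `demo` are hypothetical names, and the sketch re-parses an unfinished event on every call for simplicity.

```rust
/// Hypothetical stand-in for SSEEvent.
#[derive(Debug, PartialEq)]
pub struct Event {
    pub event_type: Option<String>,
    pub data: String,
}

/// Minimal incremental SSE parser: bytes stay buffered until a blank line
/// terminates the event they belong to.
#[derive(Default)]
pub struct MiniParser {
    buffer: Vec<u8>,
}

impl MiniParser {
    pub fn feed(&mut self, bytes: &[u8]) -> Vec<Event> {
        self.buffer.extend_from_slice(bytes);
        let mut events = Vec::new();
        let mut event_type = None;
        let mut data: Vec<String> = Vec::new();
        let mut consumed = 0; // offset just past the last complete event
        let mut start = 0;

        while let Some(pos) = self.buffer[start..].iter().position(|&b| b == b'\n') {
            let raw = &self.buffer[start..start + pos];
            start += pos + 1;
            let line = String::from_utf8_lossy(raw);
            let line = line.trim_end_matches('\r');

            if line.is_empty() {
                // Blank line: dispatch the event if it carried any data.
                if !data.is_empty() {
                    events.push(Event {
                        event_type: event_type.take(),
                        data: data.join("\n"),
                    });
                    data.clear();
                }
                event_type = None;
                consumed = start;
            } else if let Some(v) = line.strip_prefix("event:") {
                event_type = Some(v.trim_start().to_string());
            } else if let Some(v) = line.strip_prefix("data:") {
                data.push(v.strip_prefix(' ').unwrap_or(v).to_string());
            }
            // Comments (":...") and other fields are ignored in this sketch.
        }

        // Drop only fully dispatched events; partial ones wait for more bytes.
        self.buffer.drain(..consumed);
        events
    }
}

/// Feeds one event that arrives split across three network chunks.
pub fn demo() -> Vec<Event> {
    let mut parser = MiniParser::default();
    let mut out = Vec::new();
    for chunk in ["event: content_block_delta\nda", "ta: {\"text\":\"Hi\"}\n", "\n"] {
        out.extend(parser.feed(chunk.as_bytes()));
    }
    out
}
```

The first two calls to `feed` return nothing; the event is emitted only once the terminating blank line arrives in the third chunk.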

SSETransformer Trait

use crate::core::types::responses::ChatChunk;

/// Trait for transforming provider-specific SSE data to ChatChunk.
/// All methods are synchronous, so no #[async_trait] attribute is needed.
pub trait SSETransformer: Send + Sync {
    /// Transform raw SSE data to ChatChunk
    fn transform(&self, event: &SSEEvent) -> Result<Option<ChatChunk>, StreamError>;

    /// Check if the event indicates stream end
    fn is_done(&self, event: &SSEEvent) -> bool;

    /// Get provider name for error context
    fn provider_name(&self) -> &'static str;

    /// Handle provider-specific error events
    fn handle_error(&self, event: &SSEEvent) -> Option<StreamError>;
}

#[derive(Debug, thiserror::Error)]
pub enum StreamError {
    #[error("[{provider}] Parse error: {message}")]
    Parse {
        provider: &'static str,
        message: String,
    },

    #[error("[{provider}] Stream interrupted: {message}")]
    Interrupted {
        provider: &'static str,
        message: String,
    },

    #[error("[{provider}] Provider error: {message}")]
    ProviderError {
        provider: &'static str,
        message: String,
    },
}
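
To illustrate how a new provider might plug into a trait of this shape, here is a self-contained sketch. Everything in it is an assumption for illustration: `SseEvent`, `Chunk`, the reduced `StreamError`, and the `Transformer` trait are simplified stand-ins for the crate's types, and `PlainTextTransformer` is a hypothetical provider that streams plain-text deltas and ends with an `[END]` marker.

```rust
/// Simplified stand-ins for the crate's SSEEvent, ChatChunk and StreamError.
pub struct SseEvent {
    pub event_type: Option<String>,
    pub data: String,
}

#[derive(Debug, PartialEq)]
pub struct Chunk {
    pub content: String,
}

#[derive(Debug, PartialEq)]
pub enum StreamError {
    Provider { provider: &'static str, message: String },
}

/// Same shape as the trait above, minus the crate-specific types.
pub trait Transformer: Send + Sync {
    fn transform(&self, event: &SseEvent) -> Result<Option<Chunk>, StreamError>;
    fn is_done(&self, event: &SseEvent) -> bool;
    fn provider_name(&self) -> &'static str;
}

/// Hypothetical provider: plain-text deltas, terminated by `[END]`.
pub struct PlainTextTransformer;

impl Transformer for PlainTextTransformer {
    fn transform(&self, event: &SseEvent) -> Result<Option<Chunk>, StreamError> {
        match event.event_type.as_deref() {
            Some("error") => Err(StreamError::Provider {
                provider: self.provider_name(),
                message: event.data.clone(),
            }),
            // The terminal marker carries no content of its own.
            _ if self.is_done(event) => Ok(None),
            _ => Ok(Some(Chunk { content: event.data.clone() })),
        }
    }

    fn is_done(&self, event: &SseEvent) -> bool {
        event.data == "[END]"
    }

    fn provider_name(&self) -> &'static str {
        "plaintext"
    }
}

/// Drives a transformer over parsed events, stopping at the terminal marker.
pub fn collect_text(t: &dyn Transformer, events: &[SseEvent]) -> Result<String, StreamError> {
    let mut text = String::new();
    for event in events {
        if let Some(chunk) = t.transform(event)? {
            text.push_str(&chunk.content);
        }
        if t.is_done(event) {
            break;
        }
    }
    Ok(text)
}
```

Returning `Ok(None)` for events that produce no output keeps "nothing to emit" distinct from a failure, which is why the trait uses `Result<Option<_>, _>` rather than a bare `Option`.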

Provider-Specific Transformers

OpenAI Transformer

pub struct OpenAITransformer;

impl SSETransformer for OpenAITransformer {
    fn transform(&self, event: &SSEEvent) -> Result<Option<ChatChunk>, StreamError> {
        // Handle [DONE] marker
        if event.data == "[DONE]" {
            return Ok(None);
        }

        // Parse OpenAI chunk format
        let chunk: ChatChunk = serde_json::from_str(&event.data)
            .map_err(|e| StreamError::Parse {
                provider: self.provider_name(),
                message: e.to_string(),
            })?;

        Ok(Some(chunk))
    }

    fn is_done(&self, event: &SSEEvent) -> bool {
        event.data == "[DONE]"
    }

    fn provider_name(&self) -> &'static str {
        "openai"
    }

    fn handle_error(&self, event: &SSEEvent) -> Option<StreamError> {
        if event.data.contains("\"error\"") {
            if let Ok(error) = serde_json::from_str::<serde_json::Value>(&event.data) {
                if let Some(msg) = error.get("error").and_then(|e| e.get("message")).and_then(|m| m.as_str()) {
                    return Some(StreamError::ProviderError {
                        provider: self.provider_name(),
                        message: msg.to_string(),
                    });
                }
            }
        }
        None
    }
}

Anthropic Transformer

pub struct AnthropicTransformer {
    /// Accumulated content for multi-part responses.
    /// A Mutex rather than a RefCell, because SSETransformer requires Sync.
    accumulated_content: std::sync::Mutex<String>,
}

impl AnthropicTransformer {
    pub fn new() -> Self {
        Self {
            accumulated_content: std::sync::Mutex::new(String::new()),
        }
    }
}

impl SSETransformer for AnthropicTransformer {
    fn transform(&self, event: &SSEEvent) -> Result<Option<ChatChunk>, StreamError> {
        // Anthropic uses event types
        let event_type = event.event_type.as_deref().unwrap_or("");

        match event_type {
            "message_start" => {
                // Initialize message, extract ID
                let data: serde_json::Value = serde_json::from_str(&event.data)
                    .map_err(|e| StreamError::Parse {
                        provider: self.provider_name(),
                        message: e.to_string(),
                    })?;

                let id = data.get("message")
                    .and_then(|m| m.get("id"))
                    .and_then(|i| i.as_str())
                    .unwrap_or("msg_anthropic")
                    .to_string();

                Ok(Some(ChatChunk {
                    id,
                    object: "chat.completion.chunk".to_string(),
                    created: chrono::Utc::now().timestamp(),
                    model: data.get("message")
                        .and_then(|m| m.get("model"))
                        .and_then(|m| m.as_str())
                        .unwrap_or("claude")
                        .to_string(),
                    choices: vec![],
                    usage: None,
                    system_fingerprint: None,
                }))
            }

            "content_block_delta" => {
                let data: serde_json::Value = serde_json::from_str(&event.data)
                    .map_err(|e| StreamError::Parse {
                        provider: self.provider_name(),
                        message: e.to_string(),
                    })?;

                let delta_text = data.get("delta")
                    .and_then(|d| d.get("text"))
                    .and_then(|t| t.as_str())
                    .unwrap_or("");

                // Convert to OpenAI chunk format
                Ok(Some(ChatChunk {
                    id: "".to_string(),
                    object: "chat.completion.chunk".to_string(),
                    created: chrono::Utc::now().timestamp(),
                    model: "".to_string(),
                    choices: vec![crate::core::types::responses::ChunkChoice {
                        index: 0,
                        delta: crate::core::types::responses::ChunkDelta {
                            role: None,
                            content: Some(delta_text.to_string()),
                            tool_calls: None,
                            function_call: None,
                        },
                        finish_reason: None,
                        logprobs: None,
                    }],
                    usage: None,
                    system_fingerprint: None,
                }))
            }

            "message_stop" => {
                Ok(None)
            }

            "message_delta" => {
                // Final message with usage info
                let data: serde_json::Value = serde_json::from_str(&event.data)
                    .map_err(|e| StreamError::Parse {
                        provider: self.provider_name(),
                        message: e.to_string(),
                    })?;

                let finish_reason = data.get("delta")
                    .and_then(|d| d.get("stop_reason"))
                    .and_then(|r| r.as_str())
                    .map(|r| match r {
                        "end_turn" => crate::core::types::responses::FinishReason::Stop,
                        "max_tokens" => crate::core::types::responses::FinishReason::Length,
                        "tool_use" => crate::core::types::responses::FinishReason::ToolCalls,
                        _ => crate::core::types::responses::FinishReason::Stop,
                    });

                Ok(Some(ChatChunk {
                    id: "".to_string(),
                    object: "chat.completion.chunk".to_string(),
                    created: chrono::Utc::now().timestamp(),
                    model: "".to_string(),
                    choices: vec![crate::core::types::responses::ChunkChoice {
                        index: 0,
                        delta: crate::core::types::responses::ChunkDelta {
                            role: None,
                            content: None,
                            tool_calls: None,
                            function_call: None,
                        },
                        finish_reason,
                        logprobs: None,
                    }],
                    usage: data.get("usage").map(|u| {
                        // Fields missing from this event default to 0
                        let prompt_tokens = u.get("input_tokens").and_then(|t| t.as_u64()).unwrap_or(0) as u32;
                        let completion_tokens = u.get("output_tokens").and_then(|t| t.as_u64()).unwrap_or(0) as u32;
                        crate::core::types::responses::Usage {
                            prompt_tokens,
                            completion_tokens,
                            total_tokens: prompt_tokens + completion_tokens,
                            prompt_tokens_details: None,
                            completion_tokens_details: None,
                            thinking_usage: None,
                        }
                    }),
                    system_fingerprint: None,
                }))
            }

            "error" => {
                Err(StreamError::ProviderError {
                    provider: self.provider_name(),
                    message: event.data.clone(),
                })
            }

            _ => Ok(None),
        }
    }

    fn is_done(&self, event: &SSEEvent) -> bool {
        event.event_type.as_deref() == Some("message_stop")
    }

    fn provider_name(&self) -> &'static str {
        "anthropic"
    }

    fn handle_error(&self, event: &SSEEvent) -> Option<StreamError> {
        if event.event_type.as_deref() == Some("error") {
            return Some(StreamError::ProviderError {
                provider: self.provider_name(),
                message: event.data.clone(),
            });
        }
        None
    }
}

Google Gemini Transformer

pub struct GeminiTransformer;

impl SSETransformer for GeminiTransformer {
    fn transform(&self, event: &SSEEvent) -> Result<Option<ChatChunk>, StreamError> {
        // Gemini uses a different format
        let data: serde_json::Value = serde_json::from_str(&event.data)
            .map_err(|e| StreamError::Parse {
                provider: self.provider_name(),
                message: e.to_string(),
            })?;

        // Extract text from candidates[0].content.parts[0].text
        let text = data.get("candidates")
            .and_then(|c| c.as_array())
            .and_then(|c| c.first())
            .and_then(|c| c.get("content"))
            .and_then(|c| c.get("parts"))
            .and_then(|p| p.as_array())
            .and_then(|p| p.first())
            .and_then(|p| p.get("text"))
            .and_then(|t| t.as_str());

        let finish_reason = data.get("candidates")
            .and_then(|c| c.as_array())
            .and_then(|c| c.first())
            .and_then(|c| c.get("finishReason"))
            .and_then(|r| r.as_str())
            .map(|r| match r {
                "STOP" => crate::core::types::responses::FinishReason::Stop,
                "MAX_TOKENS" => crate::core::types::responses::FinishReason::Length,
                "SAFETY" => crate::core::types::responses::FinishReason::ContentFilter,
                _ => crate::core::types::responses::FinishReason::Stop,
            });

        Ok(Some(ChatChunk {
            id: "".to_string(),
            object: "chat.completion.chunk".to_string(),
            created: chrono::Utc::now().timestamp(),
            model: "gemini".to_string(),
            choices: vec![crate::core::types::responses::ChunkChoice {
                index: 0,
                delta: crate::core::types::responses::ChunkDelta {
                    role: None,
                    content: text.map(|t| t.to_string()),
                    tool_calls: None,
                    function_call: None,
                },
                finish_reason,
                logprobs: None,
            }],
            usage: None,
            system_fingerprint: None,
        }))
    }

    fn is_done(&self, event: &SSEEvent) -> bool {
        // Inspect the parsed Value inside the closure: a reference returned
        // from and_then would outlive the Value that owns it
        serde_json::from_str::<serde_json::Value>(&event.data)
            .ok()
            .map_or(false, |d| {
                d.pointer("/candidates/0/finishReason")
                    .and_then(|r| r.as_str())
                    .is_some()
            })
    }

    fn provider_name(&self) -> &'static str {
        "google"
    }

    fn handle_error(&self, event: &SSEEvent) -> Option<StreamError> {
        if let Ok(data) = serde_json::from_str::<serde_json::Value>(&event.data) {
            if let Some(error) = data.get("error") {
                return Some(StreamError::ProviderError {
                    provider: self.provider_name(),
                    message: error.to_string(),
                });
            }
        }
        None
    }
}

Stream Processing Pipeline

use futures::{Stream, StreamExt};
use std::pin::Pin;

pub struct StreamProcessor<T: SSETransformer> {
    parser: UnifiedSSEParser,
    transformer: T,
}

impl<T: SSETransformer + 'static> StreamProcessor<T> {
    pub fn new(transformer: T) -> Self {
        Self {
            parser: UnifiedSSEParser::new(),
            transformer,
        }
    }

    /// Process a byte stream and produce ChatChunks
    pub fn process_stream(
        mut self,
        input: impl Stream<Item = Result<bytes::Bytes, reqwest::Error>> + Send + 'static,
    ) -> Pin<Box<dyn Stream<Item = Result<ChatChunk, StreamError>> + Send>> {
        // Set once the provider's terminal event has been seen
        let mut done = false;

        let stream = input.flat_map(move |result| {
            let mut chunks: Vec<Result<ChatChunk, StreamError>> = Vec::new();

            match result {
                Ok(bytes) => {
                    for event in self.parser.feed(&bytes) {
                        if done {
                            break; // Ignore anything after the terminal event
                        }

                        // Check for errors first
                        if let Some(error) = self.transformer.handle_error(&event) {
                            chunks.push(Err(error));
                            continue;
                        }

                        // Transform before checking is_done: a terminal event can
                        // still carry content (Gemini's last chunk has both text
                        // and finishReason)
                        match self.transformer.transform(&event) {
                            Ok(Some(chunk)) => chunks.push(Ok(chunk)),
                            Ok(None) => {}
                            Err(e) => chunks.push(Err(e)),
                        }

                        done = self.transformer.is_done(&event);
                    }
                }
                Err(e) => chunks.push(Err(StreamError::Interrupted {
                    provider: self.transformer.provider_name(),
                    message: e.to_string(),
                })),
            }

            futures::stream::iter(chunks)
        });

        Box::pin(stream)
    }
}

HTTP Response Streaming

use actix_web::HttpResponse;
use futures::StreamExt;
use serde_json::json;
use std::sync::Arc;

pub async fn stream_chat_completion(
    request: ChatRequest,
    provider: Arc<dyn LLMProvider>,
    context: RequestContext, // Type name assumed; the call below needs a context value
) -> HttpResponse {
    // Get streaming response from provider
    let stream = match provider.chat_completion_stream(request, context).await {
        Ok(s) => s,
        Err(e) => {
            return HttpResponse::InternalServerError()
                .json(json!({"error": e.to_string()}));
        }
    };

    // Transform to SSE format
    let sse_stream = stream.map(|result| {
        match result {
            Ok(chunk) => {
                let json = serde_json::to_string(&chunk).unwrap_or_default();
                Ok::<_, std::io::Error>(bytes::Bytes::from(format!("data: {}\n\n", json)))
            }
            Err(e) => {
                let error_json = json!({"error": e.to_string()});
                Ok(bytes::Bytes::from(format!("data: {}\n\n", error_json)))
            }
        }
    });

    // Add [DONE] marker at the end
    let final_stream = sse_stream.chain(futures::stream::once(async {
        Ok::<_, std::io::Error>(bytes::Bytes::from("data: [DONE]\n\n"))
    }));

    HttpResponse::Ok()
        .content_type("text/event-stream")
        .insert_header(("Cache-Control", "no-cache"))
        .insert_header(("Connection", "keep-alive"))
        .streaming(final_stream)
}
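
The wire format produced by the handler can be sketched in isolation. This is a std-only illustration under stated assumptions: `sse_frame` and `sse_body` are hypothetical helpers, not part of LiteLLM-RS. The per-line handling matters only for payloads that contain newlines; compact JSON from `serde_json::to_string` is a single line.

```rust
/// Frames one payload as an SSE `data` event. A multi-line payload needs one
/// `data:` line per line, otherwise the embedded newline would end the field.
pub fn sse_frame(payload: &str) -> String {
    let mut frame = String::new();
    for line in payload.split('\n') {
        frame.push_str("data: ");
        frame.push_str(line);
        frame.push('\n');
    }
    frame.push('\n'); // blank line terminates the event
    frame
}

/// Frames a sequence of JSON chunks and appends the OpenAI-style terminator.
pub fn sse_body(chunks: &[&str]) -> String {
    let mut body: String = chunks.iter().map(|c| sse_frame(c)).collect();
    body.push_str(&sse_frame("[DONE]"));
    body
}
```

A client reassembles a multi-line payload by joining consecutive `data:` lines with `\n`, which mirrors what the parser earlier in this guide does.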

Buffer Management

VecDeque Advantages

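/// VecDeque is used for efficient buffer management:
/// - Amortized O(1) push at the back and O(1) pop at the front
/// - Ring-buffer storage: space freed at the front is reused, so steady-state
///   streaming does not reallocate
/// - Contents may wrap around the allocation; use make_contiguous() or
///   as_slices() when a single slice is needed

impl UnifiedSSEParser {
    /// Efficient buffer trimming
    fn trim_processed(&mut self, bytes_processed: usize) {
        // drain removes the whole prefix in one call; clamp to avoid a panic
        let n = bytes_processed.min(self.buffer.len());
        self.buffer.drain(..n);
    }

    /// Prevent buffer overflow
    fn check_buffer_limit(&mut self) {
        // Drop oldest data if buffer exceeds limit. This can cut an event in
        // half, so treat the stream as corrupted from this point on.
        let excess = self.buffer.len().saturating_sub(self.max_buffer_size);
        self.buffer.drain(..excess);
    }
}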
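
One caveat when reading from a `VecDeque`: it is a ring buffer, so after bytes have been consumed from the front, newly pushed bytes may wrap around the end of the allocation and the contents are no longer guaranteed to be one contiguous slice. The sketch below (with the hypothetical function name `ring_demo`) shows the standard-library calls that deal with this: `as_slices`, `make_contiguous`, and `drain`.

```rust
use std::collections::VecDeque;

/// Returns (bytes seen through as_slices, length of the second slice after
/// make_contiguous, bytes left after draining a two-byte prefix).
pub fn ring_demo() -> (Vec<u8>, usize, Vec<u8>) {
    let mut buf: VecDeque<u8> = VecDeque::with_capacity(8);
    buf.extend(b"abcdefg");
    buf.drain(..5); // consume "abcde"; the head now sits mid-allocation
    buf.extend(b"hijkl"); // may wrap around the end of the allocation

    // as_slices exposes the (possibly two) contiguous halves, in order.
    let (front, back) = buf.as_slices();
    let joined: Vec<u8> = [front, back].concat();

    // make_contiguous rearranges in place so the data forms a single slice.
    buf.make_contiguous();
    let back_len_after = buf.as_slices().1.len();

    // drain(..n) removes a processed prefix in one call.
    buf.drain(..2);
    (joined, back_len_after, buf.into_iter().collect())
}
```

Whether the wrap actually happens depends on the capacity the allocator hands back, which is why the sketch reads through `as_slices` instead of assuming either layout.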

Configuration

streaming:
  enabled: true

  buffer:
    initial_size: 8192      # Initial buffer size in bytes
    max_size: 1048576       # 1MB max buffer
    chunk_size: 4096        # Read chunk size

  timeouts:
    first_byte: 30000       # 30s timeout for first byte
    between_events: 60000   # 60s timeout between events
    total: 300000           # 5 minute total timeout

  retry:
    enabled: true
    max_attempts: 3
    backoff_ms: 1000
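
One way the timeout values might be consumed in code is sketched below. This is an assumption-laden illustration, not the crate's configuration type: `StreamingLimits` and `next_read_timeout` are hypothetical names, and the rule for combining the three timeouts (per-read timeout capped by the remaining total budget) is an interpretation of the YAML above.

```rust
use std::time::Duration;

/// Hypothetical typed view of the `timeouts` and `retry` sections above.
#[derive(Debug, Clone, PartialEq)]
pub struct StreamingLimits {
    pub first_byte: Duration,
    pub between_events: Duration,
    pub total: Duration,
    pub max_attempts: u32,
    pub backoff: Duration,
}

impl Default for StreamingLimits {
    /// Mirrors the values in the YAML example (milliseconds).
    fn default() -> Self {
        Self {
            first_byte: Duration::from_millis(30_000),
            between_events: Duration::from_millis(60_000),
            total: Duration::from_millis(300_000),
            max_attempts: 3,
            backoff: Duration::from_millis(1_000),
        }
    }
}

impl StreamingLimits {
    /// Picks the deadline for the next read: the first-byte timeout before any
    /// event has arrived, the between-events timeout afterwards, and never
    /// more than what is left of the total budget.
    pub fn next_read_timeout(&self, elapsed: Duration, seen_event: bool) -> Duration {
        let per_read = if seen_event { self.between_events } else { self.first_byte };
        per_read.min(self.total.saturating_sub(elapsed))
    }
}
```

A returned `Duration::ZERO` means the total budget is already spent and the stream should be closed rather than read again.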

Best Practices

1. Handle Incomplete Events

// Good - buffer incomplete data
pub fn feed(&mut self, bytes: &[u8]) -> Vec<SSEEvent> {
    self.buffer.extend(bytes);
    self.extract_events() // Only return complete events
}

// Bad - assume complete data
pub fn feed(&mut self, bytes: &[u8]) -> Vec<SSEEvent> {
    let text = String::from_utf8_lossy(bytes);
    self.parse_all(&text) // May split events incorrectly
}

2. Preserve Stream Order

// Good - maintain ordering
let stream = input.map(|chunk| {
    self.process_chunk(chunk) // Process in order
});

// Bad - parallel processing breaks order
let stream = input
    .map(|chunk| async move { self.process_chunk(chunk).await })
    .buffer_unordered(10); // Order not guaranteed!

3. Clean Resource Handling

// Good - cleanup on drop
impl<T: SSETransformer> Drop for StreamProcessor<T> {
    fn drop(&mut self) {
        self.parser.reset();
        // Signal stream end if needed
    }
}

4. Backpressure Handling

// Good - respect backpressure with bounded, order-preserving concurrency
let stream = input
    .map(|chunk| self.process(chunk)) // process returns a future
    .buffered(4); // At most 4 chunks in flight, output order preserved

// Bad - effectively unbounded buffering
let stream = input
    .map(|chunk| self.process(chunk))
    .buffer_unordered(1000); // May exhaust memory and reorders chunks