| name | chatkit-backend |
| description | Build FastAPI backend for OpenAI ChatKit with SSE streaming, conversation persistence, and AI agent integration. Handles /chatkit endpoint, ChatKit-compatible SSE format, conversation models, and message storage. Use when implementing chat backend, SSE streaming endpoint, or connecting AI agent to ChatKit frontend. |
| allowed-tools | Bash, Write, Read, Edit, Glob, Grep |
ChatKit Backend Skill
Production-ready skill for implementing FastAPI backend that powers OpenAI ChatKit frontend.
Reference Repositories:
Overview
ChatKit backend provides:
- SSE Streaming Endpoint - Real-time response streaming in ChatKit format
- Conversation Persistence - Store conversations and messages in database
- AI Agent Integration - Connect OpenAI Agents SDK to ChatKit
- Tool Execution - Execute tools and stream results to frontend
Architecture
┌─────────────────────────────────────────────────────────────────────────┐
│ FastAPI Backend │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ POST /chatkit │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ 1. Validate request (auth, user_id) │ │
│ │ 2. Get/create conversation │ │
│ │ 3. Load conversation history │ │
│ │ 4. Run AI agent with message │ │
│ │ 5. Stream response in ChatKit SSE format │ │
│ │ 6. Store message and response │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ SSE Response Format: │ │
│ │ data: {"type": "text", "content": "Hello"}\n\n │ │
│ │ data: {"type": "tool_call", "name": "add_task", "args": {...}}\n\n│ │
│ │ data: {"type": "tool_result", "result": {...}}\n\n │ │
│ │ data: [DONE]\n\n │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
SSE Event Format
ChatKit expects specific SSE event types:
Text Event (Streaming Content)
yield f"data: {json.dumps({'type': 'text', 'content': 'Hello'})}\n\n"
Tool Call Event
yield f"data: {json.dumps({'type': 'tool_call', 'name': 'add_task', 'args': {'title': 'Buy groceries'}})}\n\n"
Tool Result Event
yield f"data: {json.dumps({'type': 'tool_result', 'name': 'add_task', 'result': {'success': True, 'task_id': 123}})}\n\n"
Done Event
yield "data: [DONE]\n\n"
Or:
yield f"data: {json.dumps({'type': 'done'})}\n\n"
Project Structure
backend/src/
├── routers/
│ ├── chatkit.py # ChatKit SSE endpoint (NEW!)
│ └── conversations.py # Conversation CRUD endpoints
│
├── models/
│ ├── conversation.py # Conversation model
│ └── message.py # Message model
│
├── schemas/
│ └── chatkit.py # ChatKit request/response schemas
│
├── services/
│ └── chatkit_service.py # ChatKit business logic
│
├── agents/ # OpenAI Agents (from openai-agents-setup)
│ ├── config.py # Gemini/LiteLLM config
│ ├── todo_agent.py # Agent definition
│ └── runner.py # Agent execution
│
└── main.py # Register chatkit router
Quick Start
Step 1: Create ChatKit Router
# backend/src/routers/chatkit.py
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import StreamingResponse
from sqlmodel import Session, select
from src.database import get_session
from src.middleware.auth import verify_jwt
from src.models.conversation import Conversation
from src.models.message import Message
from src.agents import run_todo_agent_streaming
from datetime import datetime
import json
import logging
router = APIRouter(tags=["chatkit"])
logger = logging.getLogger(__name__)
@router.post("/chatkit")
async def chatkit_endpoint(
request: Request,
session: Session = Depends(get_session),
current_user: dict = Depends(verify_jwt),
):
"""
ChatKit SSE streaming endpoint.
Receives messages from ChatKit frontend and streams responses
in ChatKit-compatible SSE format.
"""
user_id = current_user["id"]
# Parse request body
body = await request.json()
message = body.get("message", "")
thread_id = body.get("thread_id") # Optional conversation ID
if not message:
raise HTTPException(status_code=400, detail="Message is required")
# Get or create conversation
conversation = await get_or_create_conversation(
session, user_id, thread_id, message
)
# Load conversation history
history = await load_conversation_history(session, conversation.id)
# Store user message
user_msg = Message(
conversation_id=conversation.id,
role="user",
content=message,
)
session.add(user_msg)
session.commit()
async def generate():
response_content = ""
try:
# Stream agent response
async for event in run_todo_agent_streaming(
user_message=message,
user_id=user_id,
conversation_history=history,
):
event_type = event.get("type")
if event_type == "text":
content = event.get("content", "")
response_content += content
yield f"data: {json.dumps(event)}\n\n"
elif event_type == "tool_call":
yield f"data: {json.dumps(event)}\n\n"
elif event_type == "tool_result":
yield f"data: {json.dumps(event)}\n\n"
elif event_type == "thinking":
# Optional: send thinking events
yield f"data: {json.dumps(event)}\n\n"
# Store assistant response
if response_content:
assistant_msg = Message(
conversation_id=conversation.id,
role="assistant",
content=response_content,
)
session.add(assistant_msg)
# Update conversation timestamp
conversation.updated_at = datetime.utcnow()
session.add(conversation)
session.commit()
# Signal completion
yield "data: [DONE]\n\n"
except Exception as e:
logger.error(f"ChatKit streaming error: {e}")
yield f"data: {json.dumps({'type': 'error', 'message': 'An error occurred'})}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
async def get_or_create_conversation(
session: Session,
user_id: str,
thread_id: int | None,
message: str,
) -> Conversation:
"""Get existing conversation or create new one."""
if thread_id:
conversation = session.exec(
select(Conversation).where(
Conversation.id == thread_id,
Conversation.user_id == user_id,
)
).first()
if conversation:
return conversation
# Create new conversation
title = message[:50] + "..." if len(message) > 50 else message
conversation = Conversation(user_id=user_id, title=title)
session.add(conversation)
session.commit()
session.refresh(conversation)
return conversation
async def load_conversation_history(
session: Session,
conversation_id: int,
) -> list[dict]:
"""Load conversation history for context."""
messages = session.exec(
select(Message)
.where(Message.conversation_id == conversation_id)
.order_by(Message.created_at)
).all()
return [
{"role": msg.role, "content": msg.content}
for msg in messages
]
Step 2: Register Router
# backend/src/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from src.routers import tasks, chatkit, conversations
app = FastAPI(title="Todo API")
# CORS for ChatKit
app.add_middleware(
CORSMiddleware,
allow_origins=[
"http://localhost:3000",
"https://your-app.vercel.app",
],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Register routers
app.include_router(tasks.router)
app.include_router(chatkit.router)
app.include_router(conversations.router)
Streaming Agent Runner
Integrate with OpenAI Agents SDK:
# backend/src/agents/runner.py
from typing import AsyncGenerator
from agents import Runner
from .todo_agent import todo_agent
import asyncio
import logging
logger = logging.getLogger(__name__)
async def run_todo_agent_streaming(
user_message: str,
user_id: str,
conversation_history: list[dict] | None = None,
) -> AsyncGenerator[dict, None]:
"""
Execute agent and yield events for ChatKit streaming.
Event Types:
- {"type": "thinking", "content": "Analyzing request..."}
- {"type": "text", "content": "Hello"}
- {"type": "tool_call", "name": "add_task", "args": {...}}
- {"type": "tool_result", "name": "add_task", "result": {...}}
"""
# Enhance message with user context
enhanced_message = f"[User ID: {user_id}]\n{user_message}"
# Build input with history
input_messages = []
if conversation_history:
input_messages.extend(conversation_history)
input_messages.append({"role": "user", "content": enhanced_message})
try:
# Signal thinking
yield {"type": "thinking", "content": "Processing your request..."}
# Run agent
result = await Runner.run(
todo_agent,
input=input_messages if conversation_history else enhanced_message,
max_turns=10,
)
response_text = result.final_output
# Stream text in chunks for natural feel
chunk_size = 20
for i in range(0, len(response_text), chunk_size):
chunk = response_text[i:i + chunk_size]
yield {"type": "text", "content": chunk}
await asyncio.sleep(0.02) # Natural streaming pace
except Exception as e:
logger.error(f"Agent streaming error: {e}")
yield {"type": "error", "message": str(e)}
Streaming with Tool Events
Full streaming with tool call/result events:
# backend/src/agents/runner.py
async def run_todo_agent_streaming_with_tools(
user_message: str,
user_id: str,
conversation_history: list[dict] | None = None,
) -> AsyncGenerator[dict, None]:
"""
Stream agent response with tool execution events.
"""
from agents import Runner
enhanced_message = f"[User ID: {user_id}]\n{user_message}"
input_messages = []
if conversation_history:
input_messages.extend(conversation_history)
input_messages.append({"role": "user", "content": enhanced_message})
try:
yield {"type": "thinking", "content": "Analyzing your request..."}
# Use streaming run for real-time events
async with Runner.run_streamed(
todo_agent,
input=input_messages,
) as stream:
async for event in stream:
if event.type == "raw_model_stream_event":
# Text delta from model
if hasattr(event.data, "delta") and event.data.delta:
yield {"type": "text", "content": event.data.delta}
elif event.type == "tool_call_start":
yield {
"type": "tool_call",
"name": event.tool.name,
"args": event.arguments,
}
elif event.type == "tool_call_end":
yield {
"type": "tool_result",
"name": event.tool.name,
"result": event.result if hasattr(event, 'result') else {},
}
except Exception as e:
logger.error(f"Streaming error: {e}")
yield {"type": "error", "message": str(e)}
Database Models
Conversation Model
# backend/src/models/conversation.py
from sqlmodel import SQLModel, Field, Relationship
from datetime import datetime
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .message import Message
class Conversation(SQLModel, table=True):
__tablename__ = "conversations"
id: int | None = Field(default=None, primary_key=True)
user_id: str = Field(index=True)
title: str | None = Field(default=None, max_length=200)
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
messages: list["Message"] = Relationship(back_populates="conversation")
Message Model
# backend/src/models/message.py
from sqlmodel import SQLModel, Field, Relationship
from datetime import datetime
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .conversation import Conversation
class Message(SQLModel, table=True):
__tablename__ = "messages"
id: int | None = Field(default=None, primary_key=True)
conversation_id: int = Field(foreign_key="conversations.id", index=True)
role: str = Field(max_length=20) # "user", "assistant", "system"
content: str = Field()
tool_calls: str | None = Field(default=None) # JSON string
created_at: datetime = Field(default_factory=datetime.utcnow)
conversation: "Conversation" = Relationship(back_populates="messages")
Conversation CRUD Endpoints
# backend/src/routers/conversations.py
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlmodel import Session, select, desc
from src.database import get_session
from src.middleware.auth import verify_jwt
from src.models.conversation import Conversation
from src.models.message import Message
from pydantic import BaseModel
from datetime import datetime
router = APIRouter(prefix="/api/conversations", tags=["conversations"])
class ConversationResponse(BaseModel):
id: int
title: str | None
created_at: datetime
updated_at: datetime
@router.get("/", response_model=list[ConversationResponse])
async def list_conversations(
session: Session = Depends(get_session),
current_user: dict = Depends(verify_jwt),
):
"""List user's conversations."""
conversations = session.exec(
select(Conversation)
.where(Conversation.user_id == current_user["id"])
.order_by(desc(Conversation.updated_at))
).all()
return conversations
@router.delete("/{conversation_id}")
async def delete_conversation(
conversation_id: int,
session: Session = Depends(get_session),
current_user: dict = Depends(verify_jwt),
):
"""Delete conversation and messages."""
conversation = session.exec(
select(Conversation).where(
Conversation.id == conversation_id,
Conversation.user_id == current_user["id"],
)
).first()
if not conversation:
raise HTTPException(status_code=404, detail="Not found")
# Delete messages
for msg in conversation.messages:
session.delete(msg)
# Delete conversation
session.delete(conversation)
session.commit()
return {"status": "deleted"}
Database Migration
# Create migration
cd backend
uv run alembic revision --autogenerate -m "Add conversations and messages"
uv run alembic upgrade head
Environment Variables
# Backend (.env)
DATABASE_URL=postgresql://user:pass@host/db
BETTER_AUTH_SECRET=your_auth_secret
GEMINI_API_KEY=your_gemini_api_key
GEMINI_MODEL=gemini-2.5-flash
Testing
Test SSE Endpoint
# Test with curl
curl -X POST http://localhost:8000/chatkit \
-H "Content-Type: application/json" \
-H "Authorization: Bearer YOUR_TOKEN" \
-d '{"message": "Show my tasks"}'
Python Test
import httpx
import asyncio
async def test_chatkit():
async with httpx.AsyncClient() as client:
async with client.stream(
"POST",
"http://localhost:8000/chatkit",
json={"message": "Show my tasks"},
headers={"Authorization": "Bearer TOKEN"},
) as response:
async for line in response.aiter_lines():
if line.startswith("data: "):
print(line[6:])
asyncio.run(test_chatkit())
Verification Checklist
-
/chatkitendpoint created - SSE format matches ChatKit expectations
- Conversation model with user_id
- Message model with role and content
- Messages stored after streaming
- Conversation updated_at updated
- CORS configured for frontend
- Agent integration working
- Tool events streaming correctly
- Error handling with SSE error events
See Also
- REFERENCE.md - SSE format reference
- examples.md - Full code examples
- chatkit-frontend skill - Frontend integration
- openai-agents-setup skill - Agent configuration