---
name: openai-agents-mcp-integration
description: Build AI agents with the OpenAI Agents SDK + Model Context Protocol (MCP) for tool orchestration. Supports multi-provider backends (OpenAI, Gemini, Groq, OpenRouter) via MCPServerStdio. Use this skill for conversational AI features that need external tool access over the MCP protocol.
---
# OpenAI Agents SDK + MCP Integration Skill
You are a specialist in building AI agents with the OpenAI Agents SDK and MCP tool orchestration.
Your job is to help users design and implement conversational AI agents that:
- Use OpenAI Agents SDK (v0.2.9+) for agent orchestration
- Connect to MCP servers via stdio transport for tool access
- Support multiple LLM providers (OpenAI, Gemini, Groq, OpenRouter)
- Integrate with web frameworks (FastAPI, Django, Flask)
- Handle streaming responses with Server-Sent Events (SSE)
- Persist conversation state in databases (PostgreSQL, SQLite)
This Skill acts as a stable, opinionated guide for:
- Clean separation between agent logic and MCP tools
- Multi-provider model factory patterns
- Database-backed conversation persistence
- Production-ready error handling and timeouts
## 1. When to Use This Skill
Use this Skill whenever the user mentions:
- "OpenAI Agents SDK with MCP"
- "conversational AI with external tools"
- "agent with MCP server"
- "multi-provider AI backend"
- "chat agent with database persistence"
Or asks to:
- Build a chatbot that calls external APIs/tools
- Create an agent that uses MCP protocol for tool access
- Implement conversation history with AI agents
- Support multiple LLM providers in one codebase
- Stream agent responses to frontend
If the user wants simple OpenAI API calls without agents or tools, this Skill is overkill.
## 2. Architecture Overview

### 2.1 High-Level Flow

```
User → Frontend → FastAPI Backend → Agent → MCP Server → Tools → Database/APIs
                        ↓                                  ↓
                 Conversation DB                      Tool Results
```
### 2.2 Component Responsibilities

**Frontend:**
- Sends user messages to the backend chat endpoint
- Receives streaming SSE responses
- Displays agent responses and tool results

**FastAPI Backend:**
- Handles the `/api/{user_id}/chat` endpoint
- Creates the Agent with a model from the factory
- Manages the MCP server connection lifecycle
- Persists conversations to the database
- Streams agent responses via SSE

**Agent (OpenAI Agents SDK):**
- Orchestrates conversation flow
- Decides when to call tools
- Generates natural language responses
- Handles multi-turn conversations

**MCP Server (Official MCP SDK):**
- Exposes tools via the MCP protocol
- Runs as a separate process (stdio transport)
- Handles tool execution (database, APIs)
- Returns results to the agent
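The patterns below assume a backend laid out roughly like this (a sketch assembled from the module paths used throughout this Skill, not a mandated structure):

```
backend/
├── agent_config/
│   ├── factory.py              # multi-provider model factory (3.1)
│   └── todo_agent.py           # Agent + MCPServerStdio wiring (3.2, 3.3)
├── mcp_server/
│   ├── __init__.py
│   ├── __main__.py             # `python -m mcp_server` entry point
│   └── tools.py                # MCP tool definitions (3.4)
├── routers/
│   └── chat.py                 # streaming SSE endpoint (3.6)
├── services/
│   ├── task_service.py
│   └── conversation_service.py # conversation persistence (3.5)
├── schemas/
│   └── chat.py                 # ChatRequest
├── models.py                   # Conversation, Message (3.5)
├── db.py                       # engine + get_session
└── .env
```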
## 3. Core Implementation Patterns

### 3.1 Multi-Provider Model Factory

**Pattern:** A centralized `create_model()` function abstracts over LLM providers.

**Why:**
- A single codebase supports multiple providers
- Easy provider switching via an environment variable
- Cost optimization (use free/cheap models for dev)
- Vendor independence
**Implementation:**

```python
# agent_config/factory.py
import os
from pathlib import Path

from dotenv import load_dotenv
from agents import OpenAIChatCompletionsModel
from openai import AsyncOpenAI

# Load the .env file sitting next to the backend package
env_path = Path(__file__).parent.parent / ".env"
if env_path.exists():
    load_dotenv(env_path, override=True)


def create_model(
    provider: str | None = None, model: str | None = None
) -> OpenAIChatCompletionsModel:
    """
    Create an LLM model instance based on environment configuration.

    Args:
        provider: Override the LLM_PROVIDER env var
            ("openai" | "gemini" | "groq" | "openrouter")
        model: Override the default model name

    Returns:
        OpenAIChatCompletionsModel configured for the selected provider

    Raises:
        ValueError: If the provider is unsupported or its API key is missing
    """
    # Normalize whether the provider came from the argument or the env var
    provider = (provider or os.getenv("LLM_PROVIDER", "openai")).lower()

    if provider == "openai":
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            raise ValueError("OPENAI_API_KEY required when LLM_PROVIDER=openai")
        client = AsyncOpenAI(api_key=api_key)
        model_name = model or os.getenv("OPENAI_DEFAULT_MODEL", "gpt-4o-mini")
        return OpenAIChatCompletionsModel(model=model_name, openai_client=client)

    elif provider == "gemini":
        api_key = os.getenv("GEMINI_API_KEY")
        if not api_key:
            raise ValueError("GEMINI_API_KEY required when LLM_PROVIDER=gemini")
        # Gemini via its OpenAI-compatible API
        client = AsyncOpenAI(
            api_key=api_key,
            base_url="https://generativelanguage.googleapis.com/v1beta/openai/",
        )
        model_name = model or os.getenv("GEMINI_DEFAULT_MODEL", "gemini-2.5-flash")
        return OpenAIChatCompletionsModel(model=model_name, openai_client=client)

    elif provider == "groq":
        api_key = os.getenv("GROQ_API_KEY")
        if not api_key:
            raise ValueError("GROQ_API_KEY required when LLM_PROVIDER=groq")
        client = AsyncOpenAI(
            api_key=api_key,
            base_url="https://api.groq.com/openai/v1",
        )
        model_name = model or os.getenv("GROQ_DEFAULT_MODEL", "llama-3.3-70b-versatile")
        return OpenAIChatCompletionsModel(model=model_name, openai_client=client)

    elif provider == "openrouter":
        api_key = os.getenv("OPENROUTER_API_KEY")
        if not api_key:
            raise ValueError("OPENROUTER_API_KEY required when LLM_PROVIDER=openrouter")
        client = AsyncOpenAI(
            api_key=api_key,
            base_url="https://openrouter.ai/api/v1",
        )
        model_name = model or os.getenv("OPENROUTER_DEFAULT_MODEL", "openai/gpt-oss-20b:free")
        return OpenAIChatCompletionsModel(model=model_name, openai_client=client)

    else:
        raise ValueError(
            f"Unsupported provider: {provider}. "
            f"Supported: openai, gemini, groq, openrouter"
        )
```
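Typical usage, given the environment variables below:

```python
from agent_config.factory import create_model

# Uses LLM_PROVIDER and the matching *_DEFAULT_MODEL from the environment
model = create_model()

# Explicit override, e.g. a free OpenRouter model for local development
dev_model = create_model(provider="openrouter", model="openai/gpt-oss-20b:free")
```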
**Environment Variables:**

```bash
# Provider selection
LLM_PROVIDER=openrouter   # "openai", "gemini", "groq", or "openrouter"

# OpenAI
OPENAI_API_KEY=sk-...
OPENAI_DEFAULT_MODEL=gpt-4o-mini

# Gemini
GEMINI_API_KEY=AIza...
GEMINI_DEFAULT_MODEL=gemini-2.5-flash

# Groq
GROQ_API_KEY=gsk_...
GROQ_DEFAULT_MODEL=llama-3.3-70b-versatile

# OpenRouter (free models available!)
OPENROUTER_API_KEY=sk-or-v1-...
OPENROUTER_DEFAULT_MODEL=openai/gpt-oss-20b:free
```
### 3.2 Agent with MCP Server Connection

**Pattern:** The agent connects to an MCP server through `MCPServerStdio` for tool access.

**Why:**
- Clean separation: agent logic vs. tool implementation
- The MCP server runs as a separate process (stdio transport)
- Tools are accessed via the standardized MCP protocol
- Easy to add/remove tools without changing agent code

**Critical Configuration:**

```python
# IMPORTANT: Set client_session_timeout_seconds for database operations.
# The default of 5 seconds is too short; database queries may time out.
# Increase it to 30 seconds or more for production workloads.
MCPServerStdio(
    name="task-management-server",
    params={...},
    client_session_timeout_seconds=30.0,  # MCP ClientSession timeout
)
```
**Implementation:**

```python
# agent_config/todo_agent.py
import os

from agents import Agent
from agents.mcp import MCPServerStdio
from agents.model_settings import ModelSettings

from agent_config.factory import create_model


class TodoAgent:
    """
    AI agent for conversational task management.

    Connects to an MCP server via stdio for tool access.
    Supports multiple LLM providers via the model factory.
    """

    def __init__(self, provider: str | None = None, model: str | None = None):
        """
        Initialize the agent with a model and an MCP server.

        Args:
            provider: LLM provider ("openai" | "gemini" | "groq" | "openrouter")
            model: Model name (overrides the env var default)
        """
        # Create the model from the factory
        self.model = create_model(provider=provider, model=model)

        # Create the MCP server connection via stdio.
        # CRITICAL: Set client_session_timeout_seconds for database operations
        # (default: 5 seconds; 30 seconds is safer for production).
        self.mcp_server = MCPServerStdio(
            name="task-management-server",
            params={
                "command": "python",
                "args": ["-m", "mcp_server"],  # Run as a module
                "env": os.environ.copy(),      # Pass the environment through
            },
            client_session_timeout_seconds=30.0,  # MCP ClientSession timeout
        )

        # Create the agent.
        # ModelSettings(parallel_tool_calls=False) prevents database lock issues.
        self.agent = Agent(
            name="TodoAgent",
            model=self.model,
            instructions=AGENT_INSTRUCTIONS,  # See section 3.3
            mcp_servers=[self.mcp_server],
            model_settings=ModelSettings(
                parallel_tool_calls=False,  # Prevent concurrent DB writes
            ),
        )

    def get_agent(self) -> Agent:
        """Get the configured agent instance."""
        return self.agent
```
**MCP Server Lifecycle:**

```python
from agents import Runner

# The MCP server must be managed with an async context manager
async with todo_agent.mcp_server:
    # Server is running; the agent can call tools.
    # Note: Runner.run_streamed() is not awaited; it returns a
    # RunResultStreaming whose events are consumed asynchronously.
    result = Runner.run_streamed(
        todo_agent.get_agent(),
        input="Add buy milk",
    )
    async for event in result.stream_events():
        ...  # process streaming events (see section 3.6)
# Server stopped automatically on exit
```
### 3.3 Agent Instructions

**Pattern:** Clear, behavioral instructions for the conversational AI.

**Why:**
- The agent understands the task domain and its capabilities
- Handles natural language variations
- Provides friendly, helpful responses
- Never exposes technical details to users

**Example Instructions:**

```python
AGENT_INSTRUCTIONS = """
You are a helpful task management assistant. Your role is to help users manage
their todo lists through natural conversation.

## Your Capabilities

You have access to these task management tools:
- add_task: Create new tasks with title, description, priority
- list_tasks: Show tasks (all, pending, or completed)
- complete_task: Mark a task as done
- delete_task: Remove a task permanently
- update_task: Modify task details
- set_priority: Update task priority (low, medium, high)

## Behavior Guidelines

1. **Task Creation**
   - When the user mentions adding/creating/remembering something, use add_task
   - Extract clear, actionable titles from messages
   - Confirm creation with a friendly message

2. **Task Listing**
   - Use the appropriate status filter (all, pending, completed)
   - Present tasks clearly with IDs for easy reference

3. **Conversational Style**
   - Be friendly, helpful, concise
   - Use natural language, not technical jargon
   - Acknowledge actions positively
   - NEVER expose internal IDs or technical details

## Response Pattern

✅ Good: "I've added 'Buy groceries' to your tasks!"
❌ Bad: "Task created with ID 42. Status: created."

✅ Good: "You have 3 pending tasks: Buy groceries, Call dentist, Pay bills"
❌ Bad: "Here's the JSON: [{...}]"
"""
```
### 3.4 MCP Server with Official MCP SDK

**Pattern:** The MCP server exposes tools using the Official MCP SDK's FastMCP API.

**Why:**
- Standard MCP protocol compliance
- Easy tool registration with decorators
- Type-safe tool definitions
- Automatic schema generation
**Implementation** (FastMCP registers each decorated function as a separate MCP tool and generates its schema from the type hints):

```python
# mcp_server/tools.py
from mcp.server.fastmcp import FastMCP

from services.task_service import TaskService
from db import get_session

# Create the MCP server
mcp = FastMCP("task-management-server")


@mcp.tool()
async def add_task(
    user_id: str,
    title: str,
    description: str | None = None,
    priority: str = "medium",
) -> str:
    """
    Create a new task for the user.

    Args:
        user_id: User's unique identifier
        title: Task title (required)
        description: Optional task description
        priority: Task priority (low, medium, high)

    Returns:
        Success message with task details
    """
    session = next(get_session())
    try:
        task = await TaskService.create_task(
            session=session,
            user_id=user_id,
            title=title,
            description=description,
            priority=priority,
        )
        return f"Task created: {task.title} (Priority: {task.priority})"
    finally:
        session.close()


@mcp.tool()
async def list_tasks(user_id: str, status: str = "all") -> str:
    """
    List the user's tasks filtered by status.

    Args:
        user_id: User's unique identifier
        status: Filter by status ("all", "pending", "completed")

    Returns:
        Formatted list of tasks
    """
    session = next(get_session())
    try:
        tasks = await TaskService.get_tasks(
            session=session,
            user_id=user_id,
            status=status,
        )
        if not tasks:
            return "No tasks found."
        task_list = "\n".join(
            f"{i + 1}. [{task.status}] {task.title} (Priority: {task.priority})"
            for i, task in enumerate(tasks)
        )
        return f"Your tasks:\n{task_list}"
    finally:
        session.close()


# Run the server over stdio (FastMCP's default transport)
def main():
    mcp.run(transport="stdio")


if __name__ == "__main__":
    main()
```
**Module Structure for the MCP Server:**

```python
# mcp_server/__init__.py
"""MCP server exposing task management tools."""
```

```python
# mcp_server/__main__.py
"""Entry point for the MCP server when run as a module (python -m mcp_server)."""
from mcp_server.tools import main

if __name__ == "__main__":
    main()  # mcp.run() manages its own event loop
```
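Because the server speaks plain stdio, you can exercise it outside the agent. For example, the MCP Inspector can launch and poke the module directly (assuming Node.js is available):

```bash
npx @modelcontextprotocol/inspector python -m mcp_server
```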
### 3.5 Database Persistence (Conversations)

**Pattern:** Store conversation history in the database for a stateless backend.

**Why:**
- Stateless backend (no in-memory state)
- Users can resume conversations
- Full conversation history is available
- Multi-device support
**Models:**

```python
# models.py
from datetime import datetime
from uuid import UUID, uuid4

from sqlmodel import SQLModel, Field, Relationship


class Conversation(SQLModel, table=True):
    """Conversation session between a user and the AI agent."""

    __tablename__ = "conversations"

    id: UUID = Field(default_factory=uuid4, primary_key=True)
    user_id: UUID = Field(foreign_key="users.id", index=True)
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)

    # Relationships
    messages: list["Message"] = Relationship(back_populates="conversation")
    user: "User" = Relationship(back_populates="conversations")


class Message(SQLModel, table=True):
    """Individual message in a conversation."""

    __tablename__ = "messages"

    id: UUID = Field(default_factory=uuid4, primary_key=True)
    conversation_id: UUID = Field(foreign_key="conversations.id", index=True)
    user_id: UUID = Field(foreign_key="users.id", index=True)
    role: str = Field(index=True)  # "user" | "assistant" | "system"
    content: str
    tool_calls: str | None = None  # JSON string of tool calls
    created_at: datetime = Field(default_factory=datetime.utcnow)

    # Relationships
    conversation: Conversation = Relationship(back_populates="messages")
    user: "User" = Relationship()
```
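Both the models above and the MCP tools rely on a `db` module that this Skill references but never shows. A minimal sketch, assuming a single engine with a `DATABASE_URL` override (the exact setup is yours to choose):

```python
# db.py -- minimal session helper assumed by the rest of this Skill
import os

from sqlmodel import SQLModel, Session, create_engine

DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./app.db")
engine = create_engine(DATABASE_URL)


def create_db_and_tables() -> None:
    """Create tables for all imported SQLModel models (call once at startup)."""
    SQLModel.metadata.create_all(engine)


def get_session():
    """Yield a session; used as a FastAPI dependency and via next(get_session())."""
    with Session(engine) as session:
        yield session
```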
**Service Layer:**

```python
# services/conversation_service.py
from uuid import UUID

from sqlmodel import Session, select

from models import Conversation, Message


class ConversationService:
    @staticmethod
    async def get_or_create_conversation(
        session: Session,
        user_id: UUID,
        conversation_id: UUID | None = None,
    ) -> Conversation:
        """Get an existing conversation or create a new one."""
        if conversation_id:
            stmt = select(Conversation).where(
                Conversation.id == conversation_id,
                Conversation.user_id == user_id,
            )
            conversation = session.exec(stmt).first()
            if conversation:
                return conversation

        # Create a new conversation
        conversation = Conversation(user_id=user_id)
        session.add(conversation)
        session.commit()
        session.refresh(conversation)
        return conversation

    @staticmethod
    async def add_message(
        session: Session,
        conversation_id: UUID,
        user_id: UUID,
        role: str,
        content: str,
        tool_calls: str | None = None,
    ) -> Message:
        """Add a message to a conversation."""
        message = Message(
            conversation_id=conversation_id,
            user_id=user_id,
            role=role,
            content=content,
            tool_calls=tool_calls,
        )
        session.add(message)
        session.commit()
        session.refresh(message)
        return message

    @staticmethod
    async def get_conversation_history(
        session: Session,
        conversation_id: UUID,
        user_id: UUID,
    ) -> list[dict]:
        """Get conversation messages formatted for the agent."""
        stmt = (
            select(Message)
            .where(
                Message.conversation_id == conversation_id,
                Message.user_id == user_id,
            )
            .order_by(Message.created_at)
        )
        messages = session.exec(stmt).all()
        return [{"role": msg.role, "content": msg.content} for msg in messages]
```
### 3.6 FastAPI Streaming Endpoint

**Pattern:** An SSE endpoint streams agent responses.

**Why:**
- Real-time streaming improves UX
- Works with a ChatKit frontend
- Server-Sent Events (SSE) is a standard protocol
- Handles long-running agent calls
**Implementation:**

```python
# routers/chat.py
from uuid import UUID

from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from sqlmodel import Session
from agents import Runner
from openai.types.responses import ResponseTextDeltaEvent

from db import get_session
from agent_config.todo_agent import TodoAgent
from services.conversation_service import ConversationService
from schemas.chat import ChatRequest

router = APIRouter()


@router.post("/{user_id}/chat")
async def chat(
    user_id: UUID,
    request: ChatRequest,
    session: Session = Depends(get_session),
):
    """
    Chat endpoint with a streaming SSE response.

    Args:
        user_id: User's unique identifier
        request: ChatRequest with conversation_id and message
        session: Database session

    Returns:
        StreamingResponse with SSE events
    """
    # Get or create the conversation
    conversation = await ConversationService.get_or_create_conversation(
        session=session,
        user_id=user_id,
        conversation_id=request.conversation_id,
    )

    # Save the user message
    await ConversationService.add_message(
        session=session,
        conversation_id=conversation.id,
        user_id=user_id,
        role="user",
        content=request.message,
    )

    # Get the conversation history
    history = await ConversationService.get_conversation_history(
        session=session,
        conversation_id=conversation.id,
        user_id=user_id,
    )

    # Create the agent (make user_id available to the tools, e.g. via the
    # instructions pattern from section 3.3)
    todo_agent = TodoAgent()
    agent = todo_agent.get_agent()

    # Stream the response
    async def event_generator():
        try:
            async with todo_agent.mcp_server:
                response_chunks: list[str] = []
                # run_streamed() returns a RunResultStreaming; consume its events
                result = Runner.run_streamed(agent, input=history)
                async for event in result.stream_events():
                    # Forward raw token deltas to the client
                    if event.type == "raw_response_event" and isinstance(
                        event.data, ResponseTextDeltaEvent
                    ):
                        response_chunks.append(event.data.delta)
                        yield f"data: {event.data.delta}\n\n"

                # Save the assistant response
                full_response = "".join(response_chunks)
                await ConversationService.add_message(
                    session=session,
                    conversation_id=conversation.id,
                    user_id=user_id,
                    role="assistant",
                    content=full_response,
                )
                yield "data: [DONE]\n\n"
        except Exception as e:
            yield f"data: Error: {str(e)}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )
```
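The endpoint imports `ChatRequest` from `schemas/chat.py`, which this Skill does not otherwise show. The fields follow directly from how the handler uses them (a sketch; extend with validation as needed):

```python
# schemas/chat.py
from uuid import UUID

from pydantic import BaseModel


class ChatRequest(BaseModel):
    message: str
    conversation_id: UUID | None = None  # omit to start a new conversation
```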
## 4. Common Patterns

### 4.1 Error Handling

```python
import logging

logger = logging.getLogger(__name__)

# Handle provider API failures gracefully
try:
    async with todo_agent.mcp_server:
        result = Runner.run_streamed(agent, input=messages)
        async for event in result.stream_events():
            ...  # forward events to the client
except Exception as e:
    # Log the error
    logger.error(f"Agent execution failed: {e}")
    # Return a user-friendly message
    return {"error": "AI service temporarily unavailable. Please try again."}
```
### 4.2 Timeout Configuration

```python
# CRITICAL: Increase the MCP timeout for database operations.
# The default of 5s is too short and may cause timeouts.
MCPServerStdio(
    name="server",
    params={...},
    client_session_timeout_seconds=30.0,  # Increase from the 5s default
)
```
### 4.3 Preventing Parallel Tool Calls

```python
# Prevent concurrent database writes (they cause locks)
Agent(
    name="MyAgent",
    model=model,
    instructions=instructions,
    mcp_servers=[mcp_server],
    model_settings=ModelSettings(
        parallel_tool_calls=False,  # Serialize tool calls
    ),
)
```
## 5. Testing

### 5.1 Unit Tests (Model Factory)

```python
# tests/test_factory.py
import pytest

from agent_config.factory import create_model


def test_create_model_openai(monkeypatch):
    monkeypatch.setenv("LLM_PROVIDER", "openai")
    monkeypatch.setenv("OPENAI_API_KEY", "sk-test")
    model = create_model()
    assert model is not None


def test_create_model_missing_key(monkeypatch):
    monkeypatch.setenv("LLM_PROVIDER", "openai")
    monkeypatch.delenv("OPENAI_API_KEY", raising=False)
    with pytest.raises(ValueError, match="OPENAI_API_KEY required"):
        create_model()
```
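The factory's final `else` branch deserves a test as well (same file):

```python
def test_create_model_unsupported_provider(monkeypatch):
    monkeypatch.setenv("LLM_PROVIDER", "mystery-llm")  # not in the supported list
    with pytest.raises(ValueError, match="Unsupported provider"):
        create_model()
```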
### 5.2 Integration Tests (MCP Tools)

```python
# tests/test_mcp_tools.py
import pytest

from mcp_server.tools import add_task


@pytest.mark.asyncio
async def test_add_task(test_session, test_user):
    result = await add_task(
        user_id=str(test_user.id),
        title="Test task",
        description="Test description",
        priority="high",
    )
    # FastMCP tools return plain strings (wrapped as TextContent on the wire)
    assert "Task created" in result
    assert "Test task" in result
```
## 6. Production Checklist
- Set appropriate MCP timeout (30s+)
- Disable parallel tool calls for database operations
- Add error handling for provider API failures
- Implement retry logic with exponential backoff (see the sketch in section 4.1)
- Add rate limiting to chat endpoints (see the sketch after this checklist)
- Monitor MCP server process health
- Log agent interactions for debugging
- Set up alerts for high error rates
- Use database connection pooling
- Configure CORS for production domains
- Validate JWT tokens on all endpoints
- Sanitize user inputs before tool execution
- Set up conversation cleanup (old conversations)
- Monitor database query performance
- Add caching for frequent queries
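For the rate-limiting item, one option (an assumption; any limiter works) is slowapi, which requires the route to accept a `Request` parameter:

```python
# main.py -- sketch of per-route rate limiting with slowapi
from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


@app.post("/api/{user_id}/chat")
@limiter.limit("10/minute")  # tune per deployment
async def chat(request: Request, user_id: str):
    ...
```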
## 7. References

- OpenAI Agents SDK: https://github.com/openai/openai-agents-python
- Official MCP SDK: https://github.com/modelcontextprotocol/python-sdk
- FastAPI SSE: https://fastapi.tiangolo.com/advanced/custom-response/#streamingresponse
- SQLModel: https://sqlmodel.tiangolo.com/
- Better Auth: https://better-auth.com/
---

- Last Updated: December 2024
- Skill Version: 1.0.0
- OpenAI Agents SDK: v0.2.9+
- Official MCP SDK: v1.0.0+