Claude Code Plugins

Community-maintained marketplace

Feedback

openai-agents-sdk-mcp-backend

@Okashanadeem/GIAIC-HACKATHON-II
0
0

>

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name openai-agents-sdk-mcp-backend
description Patterns for building AI-powered chatbot backends using OpenAI Agents SDK with MCP (Model Context Protocol) server integration in FastAPI applications. Supports both standalone MCP servers and function tools.
version 2.0.0

OpenAI Agents SDK + MCP Backend Skill

When to use this Skill

Use this Skill whenever you are:

  • Building an AI-powered chatbot backend using OpenAI Agents SDK.
  • Creating MCP servers that expose application functionality to AI agents.
  • Creating MCP tools using either standalone server or function tools approach.
  • Implementing a chat endpoint that processes natural language requests.
  • Integrating AI agents with existing CRUD operations via MCP.
  • Building stateless conversation systems with database-backed history.

This Skill works for any FastAPI application that needs AI agent capabilities with tool-calling functionality.

Core Goals

  • Build production-ready AI chatbot backends with proper error handling.
  • Create MCP servers following the official MCP protocol specification.
  • Create reusable MCP tools that wrap existing business logic.
  • Maintain stateless architecture where conversation state is stored in DB.
  • Follow consistent patterns for agent configuration and tool definition.
  • Enable natural language interfaces for application functionality.

Technology Stack

Component Technology Version
AI Framework OpenAI Agents SDK openai-agents>=0.6.0
MCP Server Official MCP SDK mcp[cli]>=1.2.0
Web Framework FastAPI >=0.115.0
Database ORM SQLModel >=0.0.22
Async HTTP httpx >=0.27.0

Installation

# Using pip
pip install openai-agents "mcp[cli]" fastapi sqlmodel httpx

# Using uv
uv add openai-agents "mcp[cli]" fastapi sqlmodel httpx

Required environment variables:

# For OpenAI (default)
OPENAI_API_KEY=sk-your-api-key-here

# For Gemini (alternative)
GEMINI_API_KEY=your-gemini-api-key-here

DATABASE_URL=postgresql://user:pass@host/db

Two Approaches: MCP Server vs Function Tools

The OpenAI Agents SDK supports two approaches for providing tools to agents:

Approach 1: Standalone MCP Server (RECOMMENDED for Hackathon)

Create a separate MCP server using the Official MCP Python SDK that exposes tools via the MCP protocol. The agent connects to this server.

Advantages:

  • Follows the official MCP protocol specification
  • Tools are reusable by any MCP-compatible client
  • Clean separation between server and client
  • Industry standard approach

Architecture:

┌─────────────────┐     ┌────────────────────┐     ┌─────────────────────┐
│  ChatKit UI     │────►│  FastAPI Backend   │     │  MCP Server         │
│                 │     │  POST /api/chat    │────►│  (Standalone)       │
│                 │     │       │            │     │  @mcp.tool()        │
│                 │◄────│       ▼            │◄────│  - add_task         │
│                 │     │  OpenAI Agent      │     │  - list_tasks       │
│                 │     │  (mcp_servers=[])  │     │  - complete_task    │
└─────────────────┘     └────────────────────┘     └─────────────────────┘
                                                           │
                                                           ▼
                                                   ┌─────────────────┐
                                                   │    Database     │
                                                   └─────────────────┘

Code Example:

# mcp_server.py - Standalone MCP Server
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("todo-mcp-server")

@mcp.tool()
async def add_task(user_id: str, title: str, description: str = "") -> dict:
    """Add a new task for the user.

    Args:
        user_id: The user's unique identifier.
        title: The title of the task.
        description: Optional description.
    """
    # Database operation
    return {"task_id": 1, "status": "created", "title": title}

if __name__ == "__main__":
    mcp.run(transport="streamable-http", port=8001)
# agent.py - Agent connects to MCP Server
from agents import Agent, Runner
from agents.mcp import MCPServerStreamableHttp

async def run_agent_with_mcp(user_message: str, user_id: str):
    async with MCPServerStreamableHttp(
        name="Todo MCP Server",
        params={"url": "http://localhost:8001/mcp"},
    ) as mcp_server:
        agent = Agent(
            name="Todo Assistant",
            instructions="Help users manage tasks.",
            mcp_servers=[mcp_server],
        )
        result = await Runner.run(
            agent,
            input=user_message,
            context={"user_id": user_id},
        )
        return result.final_output

Approach 2: Function Tools (Simpler but not MCP standard)

Define tools as Python functions decorated with @function_tool directly in your FastAPI application.

Advantages:

  • Simpler setup, no separate server
  • Functions can access request context directly
  • Good for quick prototypes

Disadvantages:

  • Not following official MCP protocol
  • Tools are not reusable outside this agent
  • Tighter coupling

Architecture:

┌─────────────────┐     ┌────────────────────────────────────────────┐
│  ChatKit UI     │────►│  FastAPI Backend                           │
│                 │     │  POST /api/chat                            │
│                 │     │       │                                    │
│                 │◄────│       ▼                                    │
│                 │     │  OpenAI Agent                              │
│                 │     │  (tools=[@function_tool])                  │
│                 │     │       │                                    │
│                 │     │       ▼                                    │
│                 │     │  Function Tools (same process)             │──► Database
│                 │     │  - add_task                                │
│                 │     │  - list_tasks                              │
└─────────────────┘     └────────────────────────────────────────────┘

Code Example:

# tools.py - Function Tools (not MCP standard)
from agents import function_tool

@function_tool
async def add_task(user_id: str, title: str, description: str = "") -> dict:
    """Add a new task for the user."""
    # Database operation
    return {"task_id": 1, "status": "created", "title": title}

# agent.py
from agents import Agent

agent = Agent(
    name="Todo Assistant",
    instructions="Help users manage tasks.",
    tools=[add_task, list_tasks, complete_task],  # Direct function references
)

LLM Provider Configuration

The OpenAI Agents SDK supports multiple LLM providers. You can use OpenAI, Gemini, or any OpenAI-compatible API.

Option 1: OpenAI (Default)

from agents import Agent, Runner
import os

# Uses OPENAI_API_KEY environment variable automatically
agent = Agent(
    name="Todo Assistant",
    instructions="Help users manage tasks.",
    mcp_servers=[mcp_server],  # or tools=[...]
)

result = await Runner.run(agent, "Show my tasks")

Option 2: Google Gemini

from openai import AsyncOpenAI
from agents import Agent, OpenAIChatCompletionsModel, Runner, set_tracing_disabled
import os

# Configure Gemini client
# Reference: https://ai.google.dev/gemini-api/docs/openai
gemini_client = AsyncOpenAI(
    api_key=os.environ["GEMINI_API_KEY"],
    base_url="https://generativelanguage.googleapis.com/v1beta/openai/",
)

# Disable tracing for non-OpenAI providers
set_tracing_disabled(disabled=True)

# Create agent with Gemini model
agent = Agent(
    name="Todo Assistant",
    instructions="Help users manage tasks.",
    model=OpenAIChatCompletionsModel(
        model="gemini-2.0-flash",  # or "gemini-1.5-pro", "gemini-1.5-flash"
        openai_client=gemini_client,
    ),
    mcp_servers=[mcp_server],
)

result = await Runner.run(agent, "Show my tasks")

Supported Gemini Models

Model Context Window Best For
gemini-2.0-flash 1M tokens Fast responses, general use
gemini-1.5-pro 2M tokens Complex tasks, long context
gemini-1.5-flash 1M tokens Balance of speed and quality

MCP Server Implementation (Official SDK)

FastMCP Server Pattern

"""MCP Server using Official MCP Python SDK."""
from typing import Any
from mcp.server.fastmcp import FastMCP
from sqlmodel import Session, select
from app.db import engine
from app.models.task import Task

# Initialize FastMCP server
mcp = FastMCP("todo-mcp-server")


@mcp.tool()
async def add_task(
    user_id: str,
    title: str,
    description: str = "",
) -> dict[str, Any]:
    """Add a new task for the user.

    Creates a new task with the given title and optional description.
    The task is associated with the specified user.

    Args:
        user_id: The unique identifier of the user.
        title: The title of the task (required, 1-200 characters).
        description: Optional detailed description of the task.

    Returns:
        Dictionary containing task_id, status, and title.
    """
    with Session(engine) as session:
        task = Task(
            user_id=user_id,
            title=title,
            description=description,
            completed=False,
        )
        session.add(task)
        session.commit()
        session.refresh(task)
        return {
            "task_id": str(task.id),
            "status": "created",
            "title": task.title,
        }


@mcp.tool()
async def list_tasks(
    user_id: str,
    status: str = "all",
) -> dict[str, Any]:
    """List tasks for the user.

    Retrieves tasks belonging to the specified user, optionally
    filtered by completion status.

    Args:
        user_id: The unique identifier of the user.
        status: Filter by status - "all", "pending", or "completed".
                Defaults to "all".

    Returns:
        Dictionary containing tasks list, count, and filter applied.
    """
    with Session(engine) as session:
        query = select(Task).where(Task.user_id == user_id)

        if status == "pending":
            query = query.where(Task.completed == False)
        elif status == "completed":
            query = query.where(Task.completed == True)

        tasks = session.exec(query).all()
        task_list = [
            {
                "id": str(task.id),
                "title": task.title,
                "description": task.description or "",
                "completed": task.completed,
            }
            for task in tasks
        ]
        return {
            "tasks": task_list,
            "count": len(task_list),
            "filter": status,
        }


@mcp.tool()
async def complete_task(user_id: str, task_id: str) -> dict[str, Any]:
    """Mark a task as complete.

    Args:
        user_id: The unique identifier of the user.
        task_id: The ID of the task to mark as complete.

    Returns:
        Dictionary with task_id, status, and title.
    """
    with Session(engine) as session:
        task = session.get(Task, task_id)

        if not task or task.user_id != user_id:
            return {"error": "Task not found", "task_id": task_id}

        task.completed = True
        session.add(task)
        session.commit()
        return {
            "task_id": str(task.id),
            "status": "completed",
            "title": task.title,
        }


@mcp.tool()
async def delete_task(user_id: str, task_id: str) -> dict[str, Any]:
    """Delete a task from the list.

    Args:
        user_id: The unique identifier of the user.
        task_id: The ID of the task to delete.

    Returns:
        Dictionary with task_id, status, and title.
    """
    with Session(engine) as session:
        task = session.get(Task, task_id)

        if not task or task.user_id != user_id:
            return {"error": "Task not found", "task_id": task_id}

        title = task.title
        session.delete(task)
        session.commit()
        return {
            "task_id": task_id,
            "status": "deleted",
            "title": title,
        }


@mcp.tool()
async def update_task(
    user_id: str,
    task_id: str,
    title: str | None = None,
    description: str | None = None,
) -> dict[str, Any]:
    """Update a task's title or description.

    Args:
        user_id: The unique identifier of the user.
        task_id: The ID of the task to update.
        title: New title for the task (optional).
        description: New description for the task (optional).

    Returns:
        Dictionary with task_id, status, and title.
    """
    with Session(engine) as session:
        task = session.get(Task, task_id)

        if not task or task.user_id != user_id:
            return {"error": "Task not found", "task_id": task_id}

        if title is not None:
            task.title = title
        if description is not None:
            task.description = description

        session.add(task)
        session.commit()
        return {
            "task_id": str(task.id),
            "status": "updated",
            "title": task.title,
        }


# Run the MCP server
def main():
    """Run the MCP server with streamable HTTP transport."""
    mcp.run(transport="streamable-http", port=8001)


if __name__ == "__main__":
    main()

Running MCP Server

# Option 1: Direct run
python mcp_server.py

# Option 2: Using uvicorn (if using HTTP transport)
uvicorn mcp_server:mcp --port 8001

# Option 3: Using uv
uv run mcp_server.py

Agent Connection to MCP Server

Using MCPServerStreamableHttp

from agents import Agent, Runner
from agents.mcp import MCPServerStreamableHttp

AGENT_INSTRUCTIONS = """You are a helpful assistant that helps users manage their tasks.

## CAPABILITIES
- Add new tasks with titles and optional descriptions
- List all tasks, or filter by status (pending/completed)
- Mark tasks as complete
- Delete tasks they no longer need
- Update task titles or descriptions

## BEHAVIOR GUIDELINES
1. Always confirm actions with a friendly, concise message
2. When listing tasks, format them clearly with task IDs
3. If a task is not found, explain politely
4. Ask for clarification if the request is ambiguous
"""


async def create_agent_with_mcp_server():
    """Create agent connected to MCP server."""
    async with MCPServerStreamableHttp(
        name="Todo MCP Server",
        params={
            "url": "http://localhost:8001/mcp",
            "timeout": 30,
        },
        cache_tools_list=True,
    ) as mcp_server:
        agent = Agent(
            name="Todo Assistant",
            instructions=AGENT_INSTRUCTIONS,
            mcp_servers=[mcp_server],
        )
        yield agent


async def process_chat(user_message: str, user_id: str) -> str:
    """Process a chat message through the agent."""
    async with MCPServerStreamableHttp(
        name="Todo MCP Server",
        params={"url": "http://localhost:8001/mcp"},
    ) as mcp_server:
        agent = Agent(
            name="Todo Assistant",
            instructions=AGENT_INSTRUCTIONS,
            mcp_servers=[mcp_server],
        )
        result = await Runner.run(
            agent,
            input=user_message,
            context={"user_id": user_id},
        )
        return result.final_output

MCP Server Transport Options

Transport Use Case Connection Method
streamable-http HTTP-based server MCPServerStreamableHttp
sse Server-Sent Events MCPServerSse
stdio Local subprocess MCPServerStdio

Database Models for Chat

Conversation Model

from sqlmodel import SQLModel, Field
from datetime import datetime
from typing import Optional

class Conversation(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    user_id: str = Field(index=True)
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)

Message Model

class Message(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    conversation_id: int = Field(foreign_key="conversation.id", index=True)
    user_id: str = Field(index=True)
    role: str  # "user" or "assistant"
    content: str
    created_at: datetime = Field(default_factory=datetime.utcnow)

Chat Endpoint Pattern

Request/Response Schemas

from pydantic import BaseModel
from typing import Optional, List

class ChatRequest(BaseModel):
    message: str
    conversation_id: Optional[int] = None

class ToolCall(BaseModel):
    name: str
    arguments: dict
    result: dict

class ChatResponse(BaseModel):
    conversation_id: int
    response: str
    tool_calls: List[ToolCall] = []

Chat Router Implementation

from fastapi import APIRouter, Depends, HTTPException
from agents import Agent, Runner
from agents.mcp import MCPServerStreamableHttp

router = APIRouter(prefix="/api/{user_id}", tags=["chat"])

MCP_SERVER_URL = "http://localhost:8001/mcp"

@router.post("/chat", response_model=ChatResponse)
async def chat(
    user_id: str,
    request: ChatRequest,
    session: Session = Depends(get_session),
    current_user: User = Depends(get_current_user),
):
    # 1. Verify user authorization
    if current_user.id != user_id:
        raise HTTPException(status_code=403, detail="Forbidden")

    # 2. Get or create conversation
    conversation = await get_or_create_conversation(
        session, user_id, request.conversation_id
    )

    # 3. Fetch conversation history
    history = await get_conversation_history(session, conversation.id)

    # 4. Store user message
    await store_message(session, conversation.id, user_id, "user", request.message)

    # 5. Run agent with MCP server
    async with MCPServerStreamableHttp(
        name="Todo MCP Server",
        params={"url": MCP_SERVER_URL},
    ) as mcp_server:
        agent = Agent(
            name="Todo Assistant",
            instructions=AGENT_INSTRUCTIONS,
            mcp_servers=[mcp_server],
        )
        result = await Runner.run(
            agent,
            input=request.message,
            context={"user_id": user_id, "history": history},
        )

    # 6. Store assistant response
    await store_message(
        session, conversation.id, user_id, "assistant", result.final_output
    )

    # 7. Return response
    return ChatResponse(
        conversation_id=conversation.id,
        response=result.final_output,
        tool_calls=extract_tool_calls(result),
    )

Agent Instructions Best Practices

Good Instructions

instructions = """You are a helpful todo assistant for managing tasks.

CAPABILITIES:
- Add new tasks with titles and descriptions
- List all tasks, pending tasks, or completed tasks
- Mark tasks as complete
- Delete tasks
- Update task details

BEHAVIOR:
- Always confirm actions with a friendly message
- When listing tasks, format them clearly with IDs
- If a task is not found, explain politely
- Ask for clarification if the request is ambiguous

EXAMPLES:
- "Add a task" -> Ask for the task title
- "Show my tasks" -> Use list_tasks with status="all"
- "Mark task 3 done" -> Use complete_task with task_id=3
"""

Error Handling

Tool-Level Errors

@mcp.tool()
async def complete_task(user_id: str, task_id: str) -> dict:
    """Mark a task as complete."""
    task = get_task(user_id, task_id)
    if not task:
        return {"error": "Task not found", "task_id": task_id}

    task.completed = True
    save_task(task)
    return {"task_id": task_id, "status": "completed", "title": task.title}

Endpoint-Level Errors

@router.post("/chat")
async def chat(user_id: str, request: ChatRequest):
    try:
        async with MCPServerStreamableHttp(...) as mcp_server:
            agent = Agent(name="Assistant", mcp_servers=[mcp_server])
            result = await Runner.run(agent, input=request.message)
        return ChatResponse(response=result.final_output)
    except Exception as e:
        logger.error(f"Agent error: {e}")
        raise HTTPException(
            status_code=500,
            detail="Failed to process your request. Please try again."
        )

Testing Patterns

Unit Testing MCP Tools

import pytest
from mcp_server import add_task, list_tasks

@pytest.mark.asyncio
async def test_add_task():
    result = await add_task(user_id="test-user", title="Test Task")
    assert result["status"] == "created"
    assert result["title"] == "Test Task"

@pytest.mark.asyncio
async def test_list_tasks():
    result = await list_tasks(user_id="test-user", status="all")
    assert "tasks" in result
    assert isinstance(result["tasks"], list)

Integration Testing Chat

from fastapi.testclient import TestClient

def test_chat_endpoint(client: TestClient, auth_headers: dict):
    response = client.post(
        "/api/test-user/chat",
        json={"message": "Add a task to buy groceries"},
        headers=auth_headers,
    )
    assert response.status_code == 200
    data = response.json()
    assert "conversation_id" in data
    assert "response" in data

Things to Avoid

  • Using only @function_tool for hackathon - Use proper MCP server instead.
  • Hardcoding API keys - Always use environment variables.
  • Ignoring rate limits - Implement retry logic with backoff.
  • Unbounded history - Limit conversation history length.
  • Missing error handling - Always handle tool and agent failures.
  • Blocking operations - Use async patterns for database and API calls.
  • Vague agent instructions - Be specific about capabilities and behavior.

References