| name | FastAPI Customer Support Tech Enablement |
| version | 1.0.0 |
| description | Comprehensive FastAPI skill for building modern Python web APIs with focus on customer support systems, ticket management, real-time chat, and backend operations |
| tags | fastapi, python, web-framework, async, rest-api, customer-support, backend, postgresql, sqlalchemy, pydantic, websocket, authentication, api-documentation |
| capabilities | async_api_development, dependency_injection, database_integration, request_validation, authentication_authorization, websocket_real_time, background_tasks, api_documentation, testing, error_handling |
| supported_databases | PostgreSQL, SQLite, MySQL, MongoDB |
| context | customer_support_tech_enablement |
| difficulty | intermediate_to_advanced |
FastAPI Customer Support Tech Enablement Skill
Overview
This skill provides comprehensive guidance for building production-ready customer support APIs using FastAPI, the modern, fast (high-performance) web framework for building APIs with Python 3.8+ based on standard Python type hints.
FastAPI is ideal for customer support systems due to its:
- Async capabilities for handling concurrent requests (multiple support agents, real-time updates)
- Automatic data validation with Pydantic (ensuring data integrity for tickets, users, responses)
- Built-in API documentation (OpenAPI/Swagger for support team training)
- WebSocket support for real-time chat and notifications
- Easy database integration with SQLAlchemy for PostgreSQL operations
- Type safety reducing bugs in critical support workflows
Core Competencies
1. Async API Development
FastAPI is built on top of Starlette for web routing and Pydantic for data validation, providing excellent async support for I/O-bound operations common in customer support systems.
Key Concepts:
- Use
async deffor path operations when making database queries, external API calls, or file operations - Use regular
deffor CPU-bound operations or when using synchronous libraries - FastAPI automatically handles the async/await pattern under the hood
- Background tasks for non-blocking operations (email notifications, log processing)
Best Practices for Support APIs:
# Async for database operations (most support APIs)
@app.get("/tickets/{ticket_id}")
async def get_ticket(ticket_id: int, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Ticket).where(Ticket.id == ticket_id))
ticket = result.scalar_one_or_none()
if not ticket:
raise HTTPException(status_code=404, detail="Ticket not found")
return ticket
# Sync for simple operations without I/O
@app.get("/health")
def health_check():
return {"status": "healthy"}
2. Dependency Injection System
FastAPI's dependency injection is powerful for managing shared resources like database sessions, authentication, and configuration.
Database Session Management:
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/support_db"
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
# Use in endpoints
@app.post("/tickets/")
async def create_ticket(
ticket: TicketCreate,
db: AsyncSession = Depends(get_db)
):
db_ticket = Ticket(**ticket.dict())
db.add(db_ticket)
await db.commit()
await db.refresh(db_ticket)
return db_ticket
Authentication Dependencies:
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db)
) -> User:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
result = await db.execute(select(User).where(User.username == username))
user = result.scalar_one_or_none()
if user is None:
raise credentials_exception
return user
async def get_current_active_agent(
current_user: User = Depends(get_current_user)
) -> User:
if not current_user.is_active or current_user.role != "agent":
raise HTTPException(status_code=403, detail="Not authorized as support agent")
return current_user
3. Request Validation with Pydantic
Pydantic models ensure data integrity throughout your support system.
Base Models for Customer Support:
from pydantic import BaseModel, EmailStr, Field, validator
from datetime import datetime
from typing import Optional, List
from enum import Enum
class TicketPriority(str, Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
URGENT = "urgent"
class TicketStatus(str, Enum):
OPEN = "open"
IN_PROGRESS = "in_progress"
WAITING_CUSTOMER = "waiting_customer"
RESOLVED = "resolved"
CLOSED = "closed"
class TicketBase(BaseModel):
title: str = Field(..., min_length=3, max_length=200)
description: str = Field(..., min_length=10)
priority: TicketPriority = TicketPriority.MEDIUM
category: str = Field(..., max_length=50)
@validator('title')
def title_must_not_be_empty(cls, v):
if not v.strip():
raise ValueError('Title cannot be empty or whitespace')
return v.strip()
class TicketCreate(TicketBase):
customer_email: EmailStr
attachments: Optional[List[str]] = []
class TicketUpdate(BaseModel):
title: Optional[str] = Field(None, min_length=3, max_length=200)
description: Optional[str] = None
status: Optional[TicketStatus] = None
priority: Optional[TicketPriority] = None
assigned_to: Optional[int] = None
class TicketResponse(TicketBase):
id: int
status: TicketStatus
customer_email: str
assigned_to: Optional[int]
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True # For SQLAlchemy models
4. Database Integration with SQLAlchemy
Modern async SQLAlchemy integration for PostgreSQL operations.
Model Definitions:
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Enum, Text
from sqlalchemy.orm import declarative_base, relationship
from sqlalchemy.sql import func
import enum
Base = declarative_base()
class TicketStatusEnum(enum.Enum):
OPEN = "open"
IN_PROGRESS = "in_progress"
WAITING_CUSTOMER = "waiting_customer"
RESOLVED = "resolved"
CLOSED = "closed"
class TicketPriorityEnum(enum.Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
URGENT = "urgent"
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String(50), unique=True, index=True, nullable=False)
email = Column(String(100), unique=True, index=True, nullable=False)
hashed_password = Column(String(255), nullable=False)
full_name = Column(String(100))
role = Column(String(20), default="customer") # customer, agent, admin
is_active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
assigned_tickets = relationship("Ticket", back_populates="assigned_agent")
comments = relationship("Comment", back_populates="author")
class Ticket(Base):
__tablename__ = "tickets"
id = Column(Integer, primary_key=True, index=True)
title = Column(String(200), nullable=False)
description = Column(Text, nullable=False)
status = Column(Enum(TicketStatusEnum), default=TicketStatusEnum.OPEN, index=True)
priority = Column(Enum(TicketPriorityEnum), default=TicketPriorityEnum.MEDIUM, index=True)
category = Column(String(50), index=True)
customer_email = Column(String(100), index=True, nullable=False)
assigned_to = Column(Integer, ForeignKey("users.id"), nullable=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
resolved_at = Column(DateTime(timezone=True), nullable=True)
assigned_agent = relationship("User", back_populates="assigned_tickets")
comments = relationship("Comment", back_populates="ticket", cascade="all, delete-orphan")
class Comment(Base):
__tablename__ = "comments"
id = Column(Integer, primary_key=True, index=True)
ticket_id = Column(Integer, ForeignKey("tickets.id"), nullable=False)
author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
content = Column(Text, nullable=False)
is_internal = Column(Boolean, default=False) # Internal agent notes
created_at = Column(DateTime(timezone=True), server_default=func.now())
ticket = relationship("Ticket", back_populates="comments")
author = relationship("User", back_populates="comments")
Database Initialization:
async def init_db():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
@app.on_event("startup")
async def startup():
await init_db()
5. Authentication & Authorization
JWT-based authentication for support portal access.
Password Hashing:
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
Token Generation:
from datetime import datetime, timedelta
from jose import jwt
SECRET_KEY = "your-secret-key-here" # Use environment variable in production
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
Login Endpoint:
from fastapi.security import OAuth2PasswordRequestForm
@app.post("/token")
async def login(
form_data: OAuth2PasswordRequestForm = Depends(),
db: AsyncSession = Depends(get_db)
):
result = await db.execute(select(User).where(User.username == form_data.username))
user = result.scalar_one_or_none()
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username, "role": user.role},
expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
6. WebSocket for Real-Time Support
Real-time chat between customers and support agents.
WebSocket Manager:
from fastapi import WebSocket, WebSocketDisconnect
from typing import Dict, List
class ConnectionManager:
def __init__(self):
self.active_connections: Dict[int, List[WebSocket]] = {}
async def connect(self, websocket: WebSocket, ticket_id: int):
await websocket.accept()
if ticket_id not in self.active_connections:
self.active_connections[ticket_id] = []
self.active_connections[ticket_id].append(websocket)
def disconnect(self, websocket: WebSocket, ticket_id: int):
if ticket_id in self.active_connections:
self.active_connections[ticket_id].remove(websocket)
if not self.active_connections[ticket_id]:
del self.active_connections[ticket_id]
async def send_message(self, message: str, ticket_id: int):
if ticket_id in self.active_connections:
for connection in self.active_connections[ticket_id]:
await connection.send_text(message)
async def broadcast(self, message: str, ticket_id: int, exclude: WebSocket = None):
if ticket_id in self.active_connections:
for connection in self.active_connections[ticket_id]:
if connection != exclude:
await connection.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws/ticket/{ticket_id}")
async def websocket_endpoint(
websocket: WebSocket,
ticket_id: int,
token: str,
db: AsyncSession = Depends(get_db)
):
# Verify token
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
except JWTError:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return
await manager.connect(websocket, ticket_id)
try:
while True:
data = await websocket.receive_text()
message = {
"ticket_id": ticket_id,
"username": username,
"message": data,
"timestamp": datetime.utcnow().isoformat()
}
await manager.broadcast(json.dumps(message), ticket_id)
except WebSocketDisconnect:
manager.disconnect(websocket, ticket_id)
7. Background Tasks
Handle email notifications and long-running operations without blocking responses.
Email Notification Task:
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
def send_email_notification(to_email: str, subject: str, body: str):
"""Background task to send email"""
try:
msg = MIMEMultipart()
msg['From'] = "support@company.com"
msg['To'] = to_email
msg['Subject'] = subject
msg.attach(MIMEText(body, 'html'))
# Configure SMTP
with smtplib.SMTP('smtp.gmail.com', 587) as server:
server.starttls()
server.login("support@company.com", "password")
server.send_message(msg)
except Exception as e:
print(f"Failed to send email: {e}")
@app.post("/tickets/")
async def create_ticket(
ticket: TicketCreate,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db)
):
db_ticket = Ticket(**ticket.dict())
db.add(db_ticket)
await db.commit()
await db.refresh(db_ticket)
# Send confirmation email in background
background_tasks.add_task(
send_email_notification,
ticket.customer_email,
f"Ticket #{db_ticket.id} Created",
f"Your support ticket has been created. We'll respond within 24 hours."
)
return db_ticket
8. Pagination & Filtering
Essential for support ticket lists with many records.
Pagination Dependencies:
from typing import Optional
class PaginationParams(BaseModel):
skip: int = Field(0, ge=0, description="Number of records to skip")
limit: int = Field(10, ge=1, le=100, description="Maximum records to return")
class TicketFilters(BaseModel):
status: Optional[TicketStatus] = None
priority: Optional[TicketPriority] = None
category: Optional[str] = None
assigned_to: Optional[int] = None
search: Optional[str] = Field(None, description="Search in title/description")
@app.get("/tickets/", response_model=List[TicketResponse])
async def list_tickets(
filters: TicketFilters = Depends(),
pagination: PaginationParams = Depends(),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
query = select(Ticket)
# Apply filters
if filters.status:
query = query.where(Ticket.status == filters.status)
if filters.priority:
query = query.where(Ticket.priority == filters.priority)
if filters.category:
query = query.where(Ticket.category == filters.category)
if filters.assigned_to:
query = query.where(Ticket.assigned_to == filters.assigned_to)
if filters.search:
search_term = f"%{filters.search}%"
query = query.where(
(Ticket.title.ilike(search_term)) |
(Ticket.description.ilike(search_term))
)
# Apply pagination
query = query.offset(pagination.skip).limit(pagination.limit)
query = query.order_by(Ticket.created_at.desc())
result = await db.execute(query)
tickets = result.scalars().all()
return tickets
9. Error Handling & Middleware
Consistent error responses and logging.
Custom Exception Handlers:
from fastapi.responses import JSONResponse
class TicketNotFoundError(Exception):
def __init__(self, ticket_id: int):
self.ticket_id = ticket_id
@app.exception_handler(TicketNotFoundError)
async def ticket_not_found_handler(request: Request, exc: TicketNotFoundError):
return JSONResponse(
status_code=404,
content={
"error": "ticket_not_found",
"message": f"Ticket with ID {exc.ticket_id} not found",
"ticket_id": exc.ticket_id
}
)
@app.exception_handler(ValidationError)
async def validation_exception_handler(request: Request, exc: ValidationError):
return JSONResponse(
status_code=422,
content={
"error": "validation_error",
"message": "Invalid request data",
"details": exc.errors()
}
)
Logging Middleware:
import time
import logging
logger = logging.getLogger(__name__)
@app.middleware("http")
async def log_requests(request: Request, call_next):
start_time = time.time()
# Log request
logger.info(f"Request: {request.method} {request.url}")
response = await call_next(request)
# Log response
process_time = time.time() - start_time
logger.info(
f"Response: {response.status_code} "
f"(took {process_time:.2f}s)"
)
response.headers["X-Process-Time"] = str(process_time)
return response
10. CORS Configuration
Enable web clients to access the API.
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=[
"http://localhost:3000", # React dev server
"https://support.company.com" # Production domain
],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
11. API Documentation
FastAPI automatically generates interactive documentation.
Customizing Documentation:
from fastapi import FastAPI
app = FastAPI(
title="Customer Support API",
description="API for managing customer support tickets, agents, and real-time chat",
version="1.0.0",
contact={
"name": "Support Team",
"email": "tech@company.com"
},
license_info={
"name": "MIT"
}
)
# Add tags for organization
tags_metadata = [
{
"name": "tickets",
"description": "Operations with support tickets",
},
{
"name": "users",
"description": "User and agent management",
},
{
"name": "chat",
"description": "Real-time chat via WebSocket",
},
]
app = FastAPI(openapi_tags=tags_metadata)
12. Testing
Comprehensive testing with pytest and httpx.
Test Setup:
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from main import app, get_db
TEST_DATABASE_URL = "postgresql+asyncpg://user:password@localhost/test_support_db"
@pytest.fixture
async def test_db():
engine = create_async_engine(TEST_DATABASE_URL)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
TestSessionLocal = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async with TestSessionLocal() as session:
yield session
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
@pytest.fixture
async def client(test_db):
async def override_get_db():
yield test_db
app.dependency_overrides[get_db] = override_get_db
async with AsyncClient(app=app, base_url="http://test") as ac:
yield ac
app.dependency_overrides.clear()
@pytest.mark.anyio
async def test_create_ticket(client: AsyncClient):
response = await client.post(
"/tickets/",
json={
"title": "Test Ticket",
"description": "This is a test ticket",
"priority": "high",
"category": "technical",
"customer_email": "customer@example.com"
}
)
assert response.status_code == 200
data = response.json()
assert data["title"] == "Test Ticket"
assert data["status"] == "open"
Performance Optimization
Database Query Optimization
Use Eager Loading:
from sqlalchemy.orm import selectinload
@app.get("/tickets/{ticket_id}/full")
async def get_ticket_with_comments(
ticket_id: int,
db: AsyncSession = Depends(get_db)
):
query = select(Ticket).options(
selectinload(Ticket.comments),
selectinload(Ticket.assigned_agent)
).where(Ticket.id == ticket_id)
result = await db.execute(query)
ticket = result.scalar_one_or_none()
if not ticket:
raise HTTPException(status_code=404, detail="Ticket not found")
return ticket
Database Connection Pooling:
engine = create_async_engine(
DATABASE_URL,
pool_size=10,
max_overflow=20,
pool_pre_ping=True,
pool_recycle=3600
)
Caching with Redis
import redis.asyncio as redis
from fastapi import Depends
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
async def get_cached_ticket(ticket_id: int) -> Optional[dict]:
cached = await redis_client.get(f"ticket:{ticket_id}")
if cached:
return json.loads(cached)
return None
async def cache_ticket(ticket_id: int, data: dict, expire: int = 300):
await redis_client.setex(
f"ticket:{ticket_id}",
expire,
json.dumps(data)
)
@app.get("/tickets/{ticket_id}")
async def get_ticket(
ticket_id: int,
db: AsyncSession = Depends(get_db)
):
# Try cache first
cached = await get_cached_ticket(ticket_id)
if cached:
return cached
# Query database
result = await db.execute(select(Ticket).where(Ticket.id == ticket_id))
ticket = result.scalar_one_or_none()
if not ticket:
raise HTTPException(status_code=404, detail="Ticket not found")
# Cache result
ticket_dict = {
"id": ticket.id,
"title": ticket.title,
"status": ticket.status.value,
# ... other fields
}
await cache_ticket(ticket_id, ticket_dict)
return ticket_dict
Security Best Practices
- Environment Variables: Never hardcode secrets
- Input Validation: Use Pydantic models for all inputs
- Rate Limiting: Implement with slowapi or custom middleware
- SQL Injection Prevention: Use SQLAlchemy ORM (never raw SQL)
- HTTPS Only: Configure SSL/TLS in production
- CORS: Restrict to known origins
- Authentication: Use JWT with short expiration times
- Password Hashing: Always use bcrypt or similar
Deployment Considerations
Docker Configuration
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
Health Checks
@app.get("/health")
async def health_check(db: AsyncSession = Depends(get_db)):
try:
# Check database connection
await db.execute(text("SELECT 1"))
return {
"status": "healthy",
"database": "connected",
"timestamp": datetime.utcnow().isoformat()
}
except Exception as e:
raise HTTPException(
status_code=503,
detail=f"Service unhealthy: {str(e)}"
)
Monitoring & Observability
Prometheus Metrics
from prometheus_client import Counter, Histogram, generate_latest
from fastapi.responses import Response
REQUEST_COUNT = Counter('http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status'])
REQUEST_DURATION = Histogram('http_request_duration_seconds', 'HTTP request duration')
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
duration = time.time() - start_time
REQUEST_COUNT.labels(
method=request.method,
endpoint=request.url.path,
status=response.status_code
).inc()
REQUEST_DURATION.observe(duration)
return response
@app.get("/metrics")
async def metrics():
return Response(generate_latest(), media_type="text/plain")
Common Patterns for Customer Support
1. Ticket Assignment Logic
@app.post("/tickets/{ticket_id}/assign")
async def assign_ticket(
ticket_id: int,
agent_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_active_agent)
):
# Get ticket
result = await db.execute(select(Ticket).where(Ticket.id == ticket_id))
ticket = result.scalar_one_or_none()
if not ticket:
raise HTTPException(status_code=404, detail="Ticket not found")
# Verify agent exists and is active
agent_result = await db.execute(select(User).where(User.id == agent_id))
agent = agent_result.scalar_one_or_none()
if not agent or agent.role != "agent" or not agent.is_active:
raise HTTPException(status_code=400, detail="Invalid agent")
# Update ticket
ticket.assigned_to = agent_id
ticket.status = TicketStatusEnum.IN_PROGRESS
await db.commit()
return {"message": f"Ticket assigned to {agent.full_name}"}
2. SLA Tracking
from datetime import timedelta
def calculate_sla_breach(ticket: Ticket) -> dict:
sla_hours = {
TicketPriorityEnum.URGENT: 4,
TicketPriorityEnum.HIGH: 8,
TicketPriorityEnum.MEDIUM: 24,
TicketPriorityEnum.LOW: 48
}
sla_deadline = ticket.created_at + timedelta(hours=sla_hours[ticket.priority])
now = datetime.utcnow()
if ticket.status in [TicketStatusEnum.RESOLVED, TicketStatusEnum.CLOSED]:
resolution_time = ticket.resolved_at or ticket.updated_at
breached = resolution_time > sla_deadline
time_to_resolution = (resolution_time - ticket.created_at).total_seconds() / 3600
else:
breached = now > sla_deadline
time_to_resolution = None
return {
"sla_deadline": sla_deadline,
"breached": breached,
"time_to_resolution_hours": time_to_resolution
}
3. Bulk Operations
@app.post("/tickets/bulk-update")
async def bulk_update_tickets(
ticket_ids: List[int],
update_data: TicketUpdate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_active_agent)
):
result = await db.execute(
select(Ticket).where(Ticket.id.in_(ticket_ids))
)
tickets = result.scalars().all()
if not tickets:
raise HTTPException(status_code=404, detail="No tickets found")
for ticket in tickets:
if update_data.status:
ticket.status = update_data.status
if update_data.priority:
ticket.priority = update_data.priority
if update_data.assigned_to:
ticket.assigned_to = update_data.assigned_to
await db.commit()
return {"updated_count": len(tickets), "ticket_ids": ticket_ids}
Troubleshooting Guide
Common Issues
1. Database Connection Errors
- Check connection string format:
postgresql+asyncpg://user:pass@host:port/db - Ensure asyncpg is installed:
pip install asyncpg - Verify PostgreSQL is running and accepting connections
2. Async/Await Errors
- Always use
async deffor async database operations - Use
awaitwhen calling async functions - Don't mix sync and async SQLAlchemy sessions
3. Pydantic Validation Errors
- Check field types match the schema
- Use Optional[] for nullable fields
- Add validators for custom business logic
4. JWT Token Issues
- Verify SECRET_KEY is consistent
- Check token expiration time
- Ensure ALGORITHM matches between encoding and decoding
5. WebSocket Connection Drops
- Implement reconnection logic on client side
- Add heartbeat/ping messages
- Check for network timeouts
Summary
This FastAPI skill provides everything needed to build production-ready customer support APIs with:
- High-performance async operations
- Type-safe data validation
- Secure authentication
- Real-time communication
- Comprehensive testing
- Monitoring and observability
Key advantages for customer support systems:
- Handle thousands of concurrent requests
- Real-time updates for agents and customers
- Automatic API documentation for onboarding
- Type safety reduces bugs in critical workflows
- Easy integration with PostgreSQL and other databases