| name | authentication-setup |
| description | Implement JWT authentication with bcrypt password hashing, refresh tokens, account lockout, and password reset flow. Use when setting up authentication or login system. |
| allowed-tools | Read, Write, Edit, Bash, Glob |
You implement secure authentication for the QA Team Portal using JWT and bcrypt.
When to Use This Skill
- Setting up user authentication system
- Implementing JWT with refresh tokens
- Adding password hashing with bcrypt
- Creating password reset flow
- Implementing account lockout mechanism
- Setting up session management
Prerequisites
- FastAPI backend initialized
- User model exists in
backend/app/models/user.py - Database configured
Implementation Components
1. Password Hashing (bcrypt)
Location: backend/app/core/security.py
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify password against hash."""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Hash password using bcrypt."""
return pwd_context.hash(password)
def validate_password_strength(password: str) -> tuple[bool, str]:
"""
Validate password meets requirements:
- Minimum 12 characters
- At least 1 uppercase, 1 lowercase, 1 number, 1 special char
"""
if len(password) < 12:
return False, "Password must be at least 12 characters"
if not any(c.isupper() for c in password):
return False, "Password must contain at least one uppercase letter"
if not any(c.islower() for c in password):
return False, "Password must contain at least one lowercase letter"
if not any(c.isdigit() for c in password):
return False, "Password must contain at least one number"
if not any(c in "!@#$%^&*()_+-=[]{}|;:,.<>?" for c in password):
return False, "Password must contain at least one special character"
return True, "Password is strong"
2. JWT Token Generation
Location: backend/app/core/security.py
from datetime import datetime, timedelta
from typing import Any, Union
from jose import jwt
from app.core.config import settings
def create_access_token(
subject: Union[str, Any],
expires_delta: timedelta = None
) -> str:
"""Create JWT access token."""
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode = {"exp": expire, "sub": str(subject), "type": "access"}
encoded_jwt = jwt.encode(
to_encode,
settings.SECRET_KEY,
algorithm=settings.ALGORITHM
)
return encoded_jwt
def create_refresh_token(
subject: Union[str, Any],
expires_delta: timedelta = None
) -> str:
"""Create JWT refresh token."""
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
days=settings.REFRESH_TOKEN_EXPIRE_DAYS
)
to_encode = {"exp": expire, "sub": str(subject), "type": "refresh"}
encoded_jwt = jwt.encode(
to_encode,
settings.SECRET_KEY,
algorithm=settings.ALGORITHM
)
return encoded_jwt
def decode_token(token: str) -> dict:
"""Decode and validate JWT token."""
try:
payload = jwt.decode(
token,
settings.SECRET_KEY,
algorithms=[settings.ALGORITHM]
)
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(401, "Token has expired")
except jwt.JWTError:
raise HTTPException(401, "Could not validate credentials")
3. Authentication Dependencies
Location: backend/app/api/deps.py
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from sqlalchemy.orm import Session
from app.core.config import settings
from app.core.security import decode_token
from app.crud.user import user as user_crud
from app.db.session import get_db
from app.models.user import User
security = HTTPBearer()
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
) -> User:
"""Get current authenticated user from JWT token."""
token = credentials.credentials
try:
payload = decode_token(token)
user_id: str = payload.get("sub")
token_type: str = payload.get("type")
if user_id is None or token_type != "access":
raise HTTPException(401, "Invalid token")
except JWTError:
raise HTTPException(401, "Could not validate credentials")
user = await user_crud.get(db, id=user_id)
if user is None:
raise HTTPException(404, "User not found")
if user.status != "active":
raise HTTPException(403, "User account is inactive")
return user
async def get_current_active_admin(
current_user: User = Depends(get_current_user)
) -> User:
"""Verify current user is an admin."""
if current_user.role not in ["admin", "lead"]:
raise HTTPException(403, "Not enough permissions")
return current_user
4. Login Endpoint
Location: backend/app/api/v1/endpoints/auth.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from datetime import timedelta
from app.api.deps import get_db, get_current_user
from app.core.security import verify_password, create_access_token, create_refresh_token
from app.core.config import settings
from app.crud.user import user as user_crud
from app.schemas.auth import LoginRequest, TokenResponse
router = APIRouter()
@router.post("/login", response_model=TokenResponse)
async def login(
login_data: LoginRequest,
db: Session = Depends(get_db)
):
"""
Login with email and password, returns access and refresh tokens.
Account lockout after 5 failed attempts.
"""
# Get user by email
user = await user_crud.get_by_email(db, email=login_data.email)
if not user:
# Don't reveal if user exists or not
raise HTTPException(401, "Incorrect email or password")
# Check if account is locked
if user.failed_login_attempts >= 5:
if user.locked_until and user.locked_until > datetime.utcnow():
raise HTTPException(403, "Account locked. Try again later.")
else:
# Reset lockout if time expired
await user_crud.reset_failed_attempts(db, user_id=user.id)
# Verify password
if not verify_password(login_data.password, user.password_hash):
# Increment failed attempts
await user_crud.increment_failed_attempts(db, user_id=user.id)
raise HTTPException(401, "Incorrect email or password")
# Check if user is active
if user.status != "active":
raise HTTPException(403, "User account is inactive")
# Reset failed attempts on successful login
await user_crud.reset_failed_attempts(db, user_id=user.id)
# Update last login
await user_crud.update_last_login(db, user_id=user.id)
# Create tokens
access_token = create_access_token(subject=str(user.id))
refresh_token = create_refresh_token(subject=str(user.id))
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer",
"expires_in": settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60
}
5. Refresh Token Endpoint
@router.post("/refresh", response_model=TokenResponse)
async def refresh_token(
refresh_token: str,
db: Session = Depends(get_db)
):
"""Refresh access token using refresh token."""
try:
payload = decode_token(refresh_token)
user_id = payload.get("sub")
token_type = payload.get("type")
if token_type != "refresh":
raise HTTPException(401, "Invalid token type")
user = await user_crud.get(db, id=user_id)
if not user or user.status != "active":
raise HTTPException(401, "Invalid token")
# Create new access token
new_access_token = create_access_token(subject=str(user.id))
return {
"access_token": new_access_token,
"token_type": "bearer",
"expires_in": settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60
}
except JWTError:
raise HTTPException(401, "Invalid refresh token")
6. Password Reset Flow
import secrets
from datetime import datetime, timedelta
@router.post("/forgot-password")
async def forgot_password(
email: str,
db: Session = Depends(get_db)
):
"""Send password reset email."""
user = await user_crud.get_by_email(db, email=email)
# Don't reveal if user exists
if not user:
return {"message": "If the email exists, a reset link has been sent"}
# Generate reset token (random, not JWT)
reset_token = secrets.token_urlsafe(32)
expires = datetime.utcnow() + timedelta(minutes=15)
# Store token in database
await user_crud.set_reset_token(
db,
user_id=user.id,
token=reset_token,
expires=expires
)
# Send email (use email service)
# await send_password_reset_email(user.email, reset_token)
return {"message": "If the email exists, a reset link has been sent"}
@router.post("/reset-password")
async def reset_password(
token: str,
new_password: str,
db: Session = Depends(get_db)
):
"""Reset password using reset token."""
# Validate password strength
is_valid, message = validate_password_strength(new_password)
if not is_valid:
raise HTTPException(400, message)
# Find user by reset token
user = await user_crud.get_by_reset_token(db, token=token)
if not user or not user.reset_token_expires:
raise HTTPException(400, "Invalid or expired reset token")
# Check if token expired
if user.reset_token_expires < datetime.utcnow():
raise HTTPException(400, "Reset token has expired")
# Update password
password_hash = get_password_hash(new_password)
await user_crud.update_password(
db,
user_id=user.id,
password_hash=password_hash
)
# Clear reset token
await user_crud.clear_reset_token(db, user_id=user.id)
return {"message": "Password reset successful"}
7. User Model Updates
Location: backend/app/models/user.py
Add these fields to User model:
from sqlalchemy import Column, String, Integer, DateTime
class User(Base):
# ... existing fields ...
# Account lockout
failed_login_attempts = Column(Integer, default=0)
locked_until = Column(DateTime, nullable=True)
# Password reset
reset_token = Column(String(255), nullable=True)
reset_token_expires = Column(DateTime, nullable=True)
# Session tracking
last_login = Column(DateTime, nullable=True)
8. CRUD Operations
Location: backend/app/crud/user.py
Add these methods to UserCRUD:
async def increment_failed_attempts(self, db: Session, user_id: UUID):
"""Increment failed login attempts and lock if needed."""
user = await self.get(db, id=user_id)
user.failed_login_attempts += 1
if user.failed_login_attempts >= 5:
user.locked_until = datetime.utcnow() + timedelta(minutes=30)
db.commit()
return user
async def reset_failed_attempts(self, db: Session, user_id: UUID):
"""Reset failed login attempts."""
user = await self.get(db, id=user_id)
user.failed_login_attempts = 0
user.locked_until = None
db.commit()
return user
async def set_reset_token(
self,
db: Session,
user_id: UUID,
token: str,
expires: datetime
):
"""Set password reset token."""
user = await self.get(db, id=user_id)
user.reset_token = token
user.reset_token_expires = expires
db.commit()
return user
Configuration
Location: backend/app/core/config.py
class Settings(BaseSettings):
# JWT
SECRET_KEY: str # Generate with: openssl rand -hex 32
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 15
REFRESH_TOKEN_EXPIRE_DAYS: int = 7
Testing Authentication
# tests/integration/test_api_auth.py
import pytest
from fastapi.testclient import TestClient
def test_login_success(client, test_user):
response = client.post("/api/v1/auth/login", json={
"email": "admin@test.com",
"password": "testpass123"
})
assert response.status_code == 200
assert "access_token" in response.json()
assert "refresh_token" in response.json()
def test_login_invalid_password(client, test_user):
response = client.post("/api/v1/auth/login", json={
"email": "admin@test.com",
"password": "wrongpassword"
})
assert response.status_code == 401
def test_account_lockout(client, test_user):
# Try 5 times with wrong password
for i in range(5):
client.post("/api/v1/auth/login", json={
"email": "admin@test.com",
"password": "wrongpassword"
})
# 6th attempt should be locked
response = client.post("/api/v1/auth/login", json={
"email": "admin@test.com",
"password": "testpass123"
})
assert response.status_code == 403
assert "locked" in response.json()["detail"].lower()
Security Checklist
- ✅ Passwords hashed with bcrypt (cost factor 12)
- ✅ JWT with short expiry (15 minutes access, 7 days refresh)
- ✅ Password strength validation (12+ chars, complexity)
- ✅ Account lockout after 5 failed attempts (30 min)
- ✅ Password reset with secure random token (15 min expiry)
- ✅ Tokens validated on every request
- ✅ User status checked (active/inactive)
- ✅ HTTPOnly cookies for refresh tokens (frontend)
- ✅ No password exposure in logs or errors
- ✅ Rate limiting on auth endpoints (use /security command)
Frontend Integration
// frontend/src/services/authService.ts
export const login = async (email: string, password: string) => {
const response = await api.post('/auth/login', { email, password })
// Store tokens
localStorage.setItem('access_token', response.data.access_token)
// Refresh token in HttpOnly cookie (set by backend)
return response.data
}
export const refreshAccessToken = async () => {
const response = await api.post('/auth/refresh')
localStorage.setItem('access_token', response.data.access_token)
return response.data
}
// Add to axios interceptor
api.interceptors.response.use(
response => response,
async error => {
if (error.response?.status === 401) {
try {
await refreshAccessToken()
// Retry original request
return api(error.config)
} catch {
// Refresh failed, logout
logout()
}
}
return Promise.reject(error)
}
)
Report Format
After implementation, provide:
- ✅ JWT authentication implemented
- ✅ Password hashing with bcrypt
- ✅ Account lockout mechanism active
- ✅ Password reset flow complete
- ✅ Refresh token mechanism working
- ✅ Tests passing (X/Y)
- ⚠️ Security recommendations (if any)