| name | multi-system-sso-authentication |
| description | Implement enterprise Single Sign-On (SSO) authentication supporting multiple identity providers with JWT RS256 tokens, backwards verification, session management, and cross-system permission mapping. Use this skill when building authentication systems that integrate with multiple enterprise SSO providers or when implementing secure token validation with session verification. |
Multi-System SSO Authentication Skill
Overview
This skill provides comprehensive patterns for implementing enterprise SSO authentication that supports multiple identity providers. It covers JWT RS256 token validation, backwards verification with authoritative systems, Laravel session decryption, permission mapping, and Redis session management.
When to Use This Skill
- Integrating with multiple enterprise SSO systems
- Implementing secure JWT token validation with backwards verification
- Supporting legacy session-based authentication alongside JWT
- Building unified authentication adapters for microservices
- Mapping permissions across different systems
- Implementing token introspection and revocation
- Handling OAuth2 flows with multiple providers
Core Concepts
Authentication Architecture
┌─────────────────────────────────────────────────────────┐
│ Your Application │
│ ┌────────────────────────────────────────────────────┐ │
│ │ UnifiedAuthAdapter (Router) │ │
│ │ ┌──────────────────────────────────────────────┐ │ │
│ │ │ Check token issuer (iss claim) │ │ │
│ │ │ Route to appropriate adapter │ │ │
│ │ └──────────────────────────────────────────────┘ │ │
│ │ ▼ ▼ ▼ ▼ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ CORP │ │ SGF │ │ GED │ │ CARRINHO│ │ │
│ │ │ Adapter │ │ Adapter │ │ Adapter │ │ Adapter │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ └────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐
│ Corporativo│ │ SGF │ │ GED │ │ Carrinho │
│ SSO │ │ API │ │ API │ │ API │
└───────────┘ └───────────┘ └───────────┘ └───────────┘
Token Flow
- User authenticates with external SSO system
- SSO system issues JWT with issuer (iss) and audience (aud) claims
- Your app receives token from request headers
- UnifiedAuthAdapter routes to appropriate adapter based on issuer
- Adapter validates JWT signature with public key
- Backwards verification checks token validity with issuing system
- Permissions mapped from SSO format to your app's format
- User session created in Redis for future requests
Project Structure
src/
├── api/
│ ├── middlewares/
│ │ └── auth.py # AuthMiddleware
│ └── path/
│ └── auth.py # Authentication endpoints
├── domain/
│ └── modules/
│ └── auth/
│ ├── entity.py # User entity
│ ├── session.py # Session management
│ └── permissions.py # Permission definitions
└── infra/
├── adapters/
│ └── auth/
│ ├── unified_adapter.py # Router for all adapters
│ ├── corporativo_adapter.py # Corporativo SSO
│ ├── sgf_adapter.py # SGF integration
│ ├── ged_adapter.py # GED integration
│ └── carrinho_adapter.py # Carrinho integration
├── cache/
│ └── redis_session.py # Redis session storage
└── services/
└── permission_mapper.py # Permission mapping
Implementation Patterns
1. Unified Authentication Adapter (Router)
# src/infra/adapters/auth/unified_adapter.py
from typing import Dict, Any
from jose import jwt, JWTError
from src.infra.adapters.auth.corporativo_adapter import CorporativoAuthAdapter
from src.infra.adapters.auth.sgf_adapter import SGFAuthAdapter
from src.infra.adapters.auth.ged_adapter import GEDAuthAdapter
from src.infra.adapters.auth.carrinho_adapter import CarrinhoAuthAdapter
from src.config.settings import app_settings
class UnifiedAuthAdapter:
"""Unified authentication adapter that routes tokens to appropriate SSO adapter.
Routes based on JWT issuer claim (iss).
"""
def __init__(
self,
corporativo_adapter: CorporativoAuthAdapter,
sgf_adapter: SGFAuthAdapter,
ged_adapter: GEDAuthAdapter,
carrinho_adapter: CarrinhoAuthAdapter,
):
self.adapters = {
"corporativo": corporativo_adapter,
"sgf": sgf_adapter,
"ged": ged_adapter,
"carrinho": carrinho_adapter,
}
# Map issuer URLs to adapter names
self.issuer_map = {
app_settings.CORPORATIVO_API_URL: "corporativo",
app_settings.SGF_API_URL: "sgf",
app_settings.GED_API_URL: "ged",
app_settings.CARRINHO_API_URL: "carrinho",
"gefin-backend": "corporativo", # Self-issued tokens
}
async def validate_token(self, token: str) -> Dict[str, Any]:
"""Validate token and route to appropriate adapter.
Args:
token: JWT token string
Returns:
User data dictionary with permissions
Raises:
JWTError: If token is invalid or from unknown issuer
"""
# Decode without verification to check issuer
try:
unverified = jwt.get_unverified_claims(token)
issuer = unverified.get("iss")
except JWTError as e:
raise JWTError(f"Invalid JWT format: {e}")
# Map issuer to adapter
adapter_name = self.issuer_map.get(issuer)
if not adapter_name:
raise JWTError(f"Unknown token issuer: {issuer}")
# Check if adapter is enabled
enabled_systems = app_settings.ENABLED_AUTH_SYSTEMS
if adapter_name not in enabled_systems:
raise JWTError(f"Authentication system '{adapter_name}' is disabled")
# Route to appropriate adapter
adapter = self.adapters[adapter_name]
return await adapter.validate_token(token)
async def validate_session(self, session_id: str) -> Dict[str, Any]:
"""Validate session cookie (for legacy systems).
Routes to Corporativo adapter (primary session provider).
"""
return await self.adapters["corporativo"].validate_session(session_id)
2. Base Auth Adapter Pattern
# src/infra/adapters/auth/base_adapter.py
from abc import ABC, abstractmethod
from typing import Dict, Any
class IAuthAdapter(ABC):
"""Abstract base class for authentication adapters.
All SSO adapters must implement this interface.
"""
@abstractmethod
async def validate_token(self, token: str) -> Dict[str, Any]:
"""Validate JWT token and return user data.
Args:
token: JWT token string
Returns:
User data with permissions
Raises:
JWTError: If token is invalid
"""
pass
@abstractmethod
async def validate_session(self, session_id: str) -> Dict[str, Any]:
"""Validate session ID and return user data.
Args:
session_id: Session identifier
Returns:
User data with permissions
Raises:
SessionError: If session is invalid
"""
pass
@abstractmethod
def get_permissions(self, user_data: Dict[str, Any]) -> list[str]:
"""Extract and map permissions from user data.
Args:
user_data: User data from SSO system
Returns:
List of permission strings in app format
"""
pass
3. JWT RS256 Token Validation with Backwards Verification
# src/infra/adapters/auth/corporativo_adapter.py
import httpx
from datetime import datetime, timedelta
from jose import jwt, JWTError
from src.infra.adapters.auth.base_adapter import IAuthAdapter
from src.infra.cache.redis_session import RedisSessionManager
class CorporativoAuthAdapter(IAuthAdapter):
"""Corporativo SSO authentication adapter.
Implements JWT RS256 validation with backwards verification.
"""
def __init__(
self,
public_key: str,
private_key: str,
api_url: str,
session_manager: RedisSessionManager,
):
self.public_key = public_key
self.private_key = private_key
self.api_url = api_url
self.session_manager = session_manager
self._validation_cache: Dict[str, tuple[Dict, datetime]] = {}
self._cache_ttl = 30 # 30 seconds
async def validate_token(self, token: str) -> Dict[str, Any]:
"""Validate JWT token with backwards verification.
Steps:
1. Verify JWT signature with RSA public key
2. Check issuer and audience claims
3. Perform backwards verification with SSO system
4. Map permissions to app format
"""
try:
# Verify signature and decode token
payload = jwt.decode(
token,
self.public_key,
algorithms=["RS256"],
options={"verify_iss": False, "verify_aud": False}, # Manual validation
)
# Manual issuer validation
accepted_issuers = ["gefin-backend", self.api_url]
if payload.get("iss") not in accepted_issuers:
raise JWTError(f"Invalid issuer: {payload.get('iss')}")
# Manual audience validation
accepted_audiences = ["gefin-api", "gefin"]
aud = payload.get("aud")
if isinstance(aud, list):
if not any(a in accepted_audiences for a in aud):
raise JWTError(f"Invalid audience: {aud}")
elif aud not in accepted_audiences:
raise JWTError(f"Invalid audience: {aud}")
# Check expiration
exp = payload.get("exp")
if exp and datetime.fromtimestamp(exp) < datetime.now():
raise JWTError("Token has expired")
# Backwards verification (if not self-issued)
if payload.get("iss") != "gefin-backend":
await self._verify_with_corporativo(token, payload)
return payload
except JWTError as e:
raise JWTError(f"Token validation failed: {e}")
async def _verify_with_corporativo(
self,
token: str,
payload: Dict[str, Any]
) -> None:
"""Verify token validity with Corporativo SSO system.
Implements backwards verification with caching.
"""
# Check cache first
cache_key = payload.get("sub")
if cache_key in self._validation_cache:
cached_data, cached_at = self._validation_cache[cache_key]
if datetime.now() - cached_at < timedelta(seconds=self._cache_ttl):
return # Valid in cache
# Call Corporativo /api/me endpoint
headers = {"Authorization": f"Bearer {token}"}
try:
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.get(
f"{self.api_url}/api/me",
headers=headers,
)
response.raise_for_status()
# Cache validation result
self._validation_cache[cache_key] = (payload, datetime.now())
except httpx.HTTPStatusError as e:
if e.response.status_code == 401:
raise JWTError("Token is not valid in Corporativo system")
# Network error - extend cache if exists
if cache_key in self._validation_cache:
cached_data, cached_at = self._validation_cache[cache_key]
# Extend cache to 5 minutes on network failure
if datetime.now() - cached_at < timedelta(minutes=5):
return
raise JWTError("Unable to verify token with Corporativo")
except httpx.RequestError:
# Network error - graceful degradation
if cache_key in self._validation_cache:
return
raise JWTError("Network error verifying token")
def get_permissions(self, user_data: Dict[str, Any]) -> list[str]:
"""Map Corporativo permissions to app format.
Example mapping:
"Ver anuidade" -> "gefin.boleto.read"
"Editar anuidade" -> "gefin.boleto.write"
"""
corporativo_permissions = user_data.get("permissions", [])
permission_map = {
"Ver anuidade": "gefin.boleto.read",
"Editar anuidade": "gefin.boleto.write",
"Ver parcelamento": "gefin.parcela.read",
"Editar parcelamento": "gefin.parcela.write",
"Ver publicações": "gefin.publicacao.read",
"Editar publicações": "gefin.publicacao.write",
# ... more mappings
}
mapped_permissions = []
for corp_perm in corporativo_permissions:
if corp_perm == "*": # Admin wildcard
return ["*"]
app_perm = permission_map.get(corp_perm)
if app_perm:
mapped_permissions.append(app_perm)
# Ensure at least read permission
if not any(p.endswith(".read") for p in mapped_permissions):
mapped_permissions.append("gefin.user.read")
return mapped_permissions
async def validate_session(self, session_id: str) -> Dict[str, Any]:
"""Validate session from Redis.
Falls back to Laravel session decryption if Redis unavailable.
"""
# Try Redis first
session_data = await self.session_manager.get_session(session_id)
if session_data:
return session_data
# Fall back to Laravel session decryption
return await self._decrypt_laravel_session(session_id)
async def _decrypt_laravel_session(self, session_cookie: str) -> Dict[str, Any]:
"""Decrypt Laravel AES-256-CBC session cookie.
Laravel session format:
- base64(iv:encrypted_payload:mac)
- Encrypted with APP_KEY from .env
"""
# Implementation omitted for brevity
# See Laravel session decryption pattern below
pass
4. Laravel Session Decryption
# src/infra/adapters/auth/laravel_session.py
import base64
import json
import hashlib
import hmac
from Cryptodome.Cipher import AES
from Cryptodome.Util.Padding import unpad
import phpserialize
class LaravelSessionDecryptor:
"""Decrypt Laravel AES-256-CBC encrypted sessions.
Handles Laravel's session encryption format.
"""
def __init__(self, app_key: str):
"""Initialize with Laravel APP_KEY.
Args:
app_key: Laravel APP_KEY from .env (base64: prefix)
"""
# Remove 'base64:' prefix if present
if app_key.startswith("base64:"):
app_key = app_key[7:]
self.key = base64.b64decode(app_key)
def decrypt(self, encrypted_value: str) -> str:
"""Decrypt Laravel encrypted value.
Format: base64(json({"iv": "...", "value": "...", "mac": "..."}))
"""
# Decode base64
decoded = base64.b64decode(encrypted_value)
payload = json.loads(decoded)
# Verify MAC signature
if not self._valid_mac(payload):
raise ValueError("Invalid MAC signature")
# Decrypt
iv = base64.b64decode(payload["iv"])
encrypted = base64.b64decode(payload["value"])
cipher = AES.new(self.key, AES.MODE_CBC, iv)
decrypted = unpad(cipher.decrypt(encrypted), AES.block_size)
return decrypted.decode("utf-8")
def _valid_mac(self, payload: dict) -> bool:
"""Verify MAC signature."""
mac = payload.get("mac")
if not mac:
return False
# Calculate expected MAC
message = base64.b64encode(
json.dumps({"iv": payload["iv"], "value": payload["value"]}).encode()
)
expected_mac = hmac.new(
self.key,
message,
hashlib.sha256,
).hexdigest()
return hmac.compare_digest(mac, expected_mac)
def decrypt_session(self, session_cookie: str) -> dict:
"""Decrypt Laravel session cookie and extract user data.
Args:
session_cookie: Laravel session cookie value
Returns:
Dictionary with user_id and other session data
"""
# Decrypt session
decrypted = self.decrypt(session_cookie)
# Unserialize PHP session data
session_data = phpserialize.loads(decrypted.encode())
# Extract user ID from various Laravel guard patterns
user_id = None
# Pattern 1: login_web_{guard}_*
for key in session_data:
if isinstance(key, bytes):
key_str = key.decode()
if key_str.startswith("login_web_"):
user_id = session_data[key]
break
# Pattern 2: Direct user_id key
if not user_id and b"user_id" in session_data:
user_id = session_data[b"user_id"]
if not user_id:
raise ValueError("No user_id found in session")
return {
"user_id": user_id.decode() if isinstance(user_id, bytes) else user_id,
"session_data": session_data,
}
5. Redis Session Management
# src/infra/cache/redis_session.py
import json
from datetime import timedelta
from redis.asyncio import Redis
class RedisSessionManager:
"""Manage user sessions in Redis.
Stores session data with TTL for automatic expiration.
"""
def __init__(self, redis_client: Redis, ttl_seconds: int = 28800):
"""Initialize session manager.
Args:
redis_client: Async Redis client
ttl_seconds: Session TTL (default 8 hours)
"""
self.redis = redis_client
self.ttl = ttl_seconds
async def create_session(self, user_data: dict) -> str:
"""Create new session and return session ID.
Args:
user_data: User data to store
Returns:
Session ID (UUID)
"""
import uuid
session_id = str(uuid.uuid4())
# Store in Redis
session_key = f"session:{session_id}"
await self.redis.setex(
session_key,
self.ttl,
json.dumps(user_data),
)
return session_id
async def get_session(self, session_id: str) -> dict | None:
"""Retrieve session data.
Args:
session_id: Session identifier
Returns:
User data dictionary or None if not found
"""
session_key = f"session:{session_id}"
data = await self.redis.get(session_key)
if not data:
return None
# Refresh TTL on access
await self.redis.expire(session_key, self.ttl)
return json.loads(data)
async def delete_session(self, session_id: str) -> bool:
"""Delete session.
Args:
session_id: Session identifier
Returns:
True if deleted, False if not found
"""
session_key = f"session:{session_id}"
result = await self.redis.delete(session_key)
return result > 0
async def update_session(self, session_id: str, user_data: dict) -> bool:
"""Update existing session data.
Args:
session_id: Session identifier
user_data: Updated user data
Returns:
True if updated, False if session not found
"""
session_key = f"session:{session_id}"
exists = await self.redis.exists(session_key)
if not exists:
return False
await self.redis.setex(
session_key,
self.ttl,
json.dumps(user_data),
)
return True
6. Permission Checking Middleware
# src/api/middlewares/auth.py
from fastapi import Request, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from src.infra.adapters.auth.unified_adapter import UnifiedAuthAdapter
security = HTTPBearer()
class ProtectedResource:
"""FastAPI dependency for protected endpoints.
Usage:
@app.get("/protected", dependencies=[Depends(ProtectedResource.check)])
"""
def __init__(self, unified_adapter: UnifiedAuthAdapter):
self.unified_adapter = unified_adapter
async def check(
self,
credentials: HTTPAuthorizationCredentials = Depends(security),
) -> dict:
"""Validate token and return user data.
Raises:
HTTPException: 401 if token invalid, 403 if insufficient permissions
"""
token = credentials.credentials
try:
user_data = await self.unified_adapter.validate_token(token)
return user_data
except JWTError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=str(e),
)
async def check_permissions(
self,
credentials: HTTPAuthorizationCredentials,
required_permissions: list[str],
) -> dict:
"""Validate token and check permissions.
Args:
credentials: Bearer token
required_permissions: List of required permissions
Returns:
User data if authorized
Raises:
HTTPException: 401 unauthorized, 403 forbidden
"""
user_data = await self.check(credentials)
user_permissions = user_data.get("permissions", [])
# Check for admin wildcard
if "*" in user_permissions:
return user_data
# Check required permissions
has_permission = any(
perm in user_permissions for perm in required_permissions
)
if not has_permission:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Missing required permissions: {required_permissions}",
)
return user_data
7. Multi-System Authentication Endpoints
# src/api/path/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from src.infra.adapters.auth.unified_adapter import UnifiedAuthAdapter
from src.infra.cache.redis_session import RedisSessionManager
router = APIRouter(prefix="/v1/auth", tags=["auth"])
class LoginRequest(BaseModel):
username: str
password: str
class SSOLoginRequest(BaseModel):
corporativo_session: str # Cookie from Corporativo
class TokenResponse(BaseModel):
access_token: str
token_type: str = "bearer"
expires_in: int
@router.post("/login", response_model=TokenResponse)
async def login(
request: LoginRequest,
adapter: UnifiedAuthAdapter = Depends(),
):
"""Login with username/password (Corporativo).
Returns JWT access token.
"""
# Delegate to Corporativo adapter
result = await adapter.adapters["corporativo"].authenticate_credentials(
username=request.username,
password=request.password,
)
return TokenResponse(
access_token=result["access_token"],
expires_in=3600,
)
@router.post("/sso-login", response_model=TokenResponse)
async def sso_login(
request: SSOLoginRequest,
adapter: UnifiedAuthAdapter = Depends(),
session_manager: RedisSessionManager = Depends(),
):
"""SSO login using Corporativo session cookie.
Validates session, creates local session, returns JWT.
"""
# Validate Corporativo session
user_data = await adapter.validate_session(request.corporativo_session)
# Create local session
session_id = await session_manager.create_session(user_data)
# Generate JWT
token = adapter.adapters["corporativo"].generate_token(user_data)
return TokenResponse(
access_token=token,
expires_in=3600,
)
@router.get("/me")
async def get_current_user(
user_data: dict = Depends(ProtectedResource.check),
):
"""Get current authenticated user info."""
return {
"cpf": user_data.get("sub"),
"name": user_data.get("name"),
"email": user_data.get("email"),
"permissions": user_data.get("permissions"),
"systems": user_data.get("systems", []),
}
@router.post("/logout")
async def logout(
session_id: str,
session_manager: RedisSessionManager = Depends(),
):
"""Logout and invalidate session."""
await session_manager.delete_session(session_id)
return {"message": "Logged out successfully"}
Configuration
Environment Variables
# src/config/settings.py
from pydantic_settings import BaseSettings
class AppSettings(BaseSettings):
"""Multi-system authentication settings."""
# Feature flags
ENABLE_MULTI_SYSTEM_AUTH: bool = True
ENABLED_AUTH_SYSTEMS: list[str] = ["corporativo", "sgf", "ged", "carrinho"]
# JWT configuration
JWT_ALGORITHM: str = "RS256"
JWT_PUBLIC_KEY_PATH: str = "./keys/jwt_public.pem"
JWT_PRIVATE_KEY_PATH: str = "./keys/jwt_private.pem"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60
REFRESH_TOKEN_EXPIRE_HOURS: int = 8
# SSO systems
CORPORATIVO_API_URL: str
CORPORATIVO_APP_KEY: str # Laravel APP_KEY for session decryption
SGF_API_URL: str
SGF_API_KEY: str
GED_API_URL: str
GED_API_KEY: str
CARRINHO_API_URL: str
CARRINHO_API_KEY: str
# Redis
REDIS_URL: str = "redis://localhost:6379/0"
SESSION_TTL_SECONDS: int = 28800 # 8 hours
# Backwards verification
ENABLE_BACKWARDS_VERIFICATION: bool = True
VERIFICATION_CACHE_TTL: int = 30 # seconds
VERIFICATION_TIMEOUT: int = 5 # seconds
app_settings = AppSettings()
RSA Key Pair Management
# Generate RSA key pair for JWT signing
openssl genrsa -out keys/jwt_private.pem 4096
openssl rsa -in keys/jwt_private.pem -pubout -out keys/jwt_public.pem
# Set proper permissions
chmod 600 keys/jwt_private.pem
chmod 644 keys/jwt_public.pem
# Add to .gitignore
echo "keys/jwt_private.pem" >> .gitignore
Testing Strategy
Unit Tests (Token Validation)
# tests/infra/adapters/auth/test_corporativo_adapter.py
import pytest
from jose import jwt
from datetime import datetime, timedelta
@pytest.fixture
def valid_token(private_key):
"""Generate valid JWT token."""
payload = {
"sub": "12345678901",
"name": "Test User",
"email": "test@example.com",
"permissions": ["gefin.boleto.read"],
"iss": "gefin-backend",
"aud": "gefin-api",
"exp": datetime.utcnow() + timedelta(hours=1),
}
return jwt.encode(payload, private_key, algorithm="RS256")
@pytest.mark.asyncio
async def test_validate_token_success(corporativo_adapter, valid_token):
"""Test successful token validation."""
user_data = await corporativo_adapter.validate_token(valid_token)
assert user_data["sub"] == "12345678901"
assert "gefin.boleto.read" in user_data["permissions"]
@pytest.mark.asyncio
async def test_validate_token_invalid_signature(corporativo_adapter):
"""Test token with invalid signature."""
invalid_token = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.invalid.signature"
with pytest.raises(JWTError):
await corporativo_adapter.validate_token(invalid_token)
@pytest.mark.asyncio
async def test_validate_token_expired(corporativo_adapter, private_key):
"""Test expired token."""
payload = {
"sub": "12345678901",
"exp": datetime.utcnow() - timedelta(hours=1), # Expired
"iss": "gefin-backend",
"aud": "gefin-api",
}
expired_token = jwt.encode(payload, private_key, algorithm="RS256")
with pytest.raises(JWTError, match="expired"):
await corporativo_adapter.validate_token(expired_token)
Integration Tests (Backwards Verification)
# tests/integration/test_backwards_verification.py
@pytest.mark.asyncio
async def test_backwards_verification_valid_token(
corporativo_adapter,
mock_corporativo_api,
):
"""Test backwards verification with valid token."""
# Mock Corporativo /api/me endpoint
mock_corporativo_api.get("/api/me").returns(
status=200,
json={"cpf": "12345678901", "name": "Test User"},
)
token = generate_corporativo_token()
user_data = await corporativo_adapter.validate_token(token)
assert user_data["sub"] == "12345678901"
@pytest.mark.asyncio
async def test_backwards_verification_invalid_token(
corporativo_adapter,
mock_corporativo_api,
):
"""Test backwards verification with invalid token."""
mock_corporativo_api.get("/api/me").returns(status=401)
token = generate_corporativo_token()
with pytest.raises(JWTError, match="not valid in Corporativo"):
await corporativo_adapter.validate_token(token)
Best Practices
Security
- ✅ Always verify JWT signatures before trusting payload
- ✅ Implement backwards verification for external tokens
- ✅ Use RS256 (asymmetric) instead of HS256 for multi-service environments
- ✅ Rotate keys periodically
- ✅ Cache validation results with short TTL (30s)
- ✅ Implement graceful degradation on network failures
- ✅ Never log tokens or secrets
Performance
- ✅ Cache token validation results
- ✅ Use Redis for session storage
- ✅ Set reasonable timeouts for backwards verification
- ✅ Skip backwards verification for self-issued tokens
- ✅ Use connection pooling for HTTP clients
- ✅ Implement circuit breakers for external APIs
Permission Mapping
- ✅ Define clear permission mapping tables
- ✅ Support wildcard permissions for admins
- ✅ Provide default read permissions for authenticated users
- ✅ Map Portuguese permissions to English format
- ✅ Log permission mapping failures
Session Management
- ✅ Use UUIDs for session IDs
- ✅ Set appropriate TTLs (8 hours default)
- ✅ Refresh TTL on session access
- ✅ Implement session cleanup on logout
- ✅ Support both token and session authentication
Common Pitfalls
Not Verifying Issuer/Audience
- ❌ Accepting any JWT without checking claims
- ✅ Manually verify iss and aud claims
Using HS256 in Multi-Service Environments
- ❌ Symmetric keys shared across services
- ✅ Use RS256 with public/private key pairs
No Backwards Verification
- ❌ Trusting JWT without checking with issuer
- ✅ Implement backwards verification for security
Hardcoded Permission Mappings
- ❌ Magic strings in code
- ✅ Use configuration/database for mappings
Not Handling Network Failures
- ❌ Failing all requests when SSO is down
- ✅ Implement graceful degradation with cache
Token Leakage in Logs
- ❌ Logging full tokens in error messages
- ✅ Log only token metadata (sub, iss)
Architecture Decisions
Why Multi-Adapter Pattern?
- Separation of Concerns: Each SSO system has its own adapter
- Extensibility: Easy to add new SSO providers
- Testability: Mock individual adapters independently
- Maintainability: Changes to one SSO don't affect others
Why Backwards Verification?
- Security: Prevent token replay attacks
- Session Validation: Check if user is still active
- Revocation Support: Detect revoked tokens
- Trust Verification: Confirm token with authoritative system
Why RS256 Over HS256?
- Key Distribution: Public key can be shared safely
- Trust Boundary: Services verify without shared secret
- Rotation: Easier key rotation strategy
- Industry Standard: OAuth2/OIDC best practice
Production Deployment
Key Management
# Production key generation
openssl genrsa -out jwt_private.pem 4096
openssl rsa -in jwt_private.pem -pubout -out jwt_public.pem
# Secure storage (AWS Secrets Manager, HashiCorp Vault, etc.)
aws secretsmanager create-secret \
--name gefin/jwt-private-key \
--secret-string file://jwt_private.pem
Monitoring
# Log authentication events
import structlog
logger = structlog.get_logger()
async def validate_token(self, token: str):
logger.info(
"token_validation_started",
issuer=self._get_issuer(token),
)
try:
user_data = await self._validate(token)
logger.info(
"token_validation_success",
user_id=user_data["sub"],
issuer=user_data["iss"],
)
return user_data
except JWTError as e:
logger.warning(
"token_validation_failed",
error=str(e),
)
raise
References
- JWT Best Practices (RFC 8725)
- OAuth 2.0 Token Introspection
- FastAPI Security
- Python JOSE JWT
- Laravel Encryption
Production Examples
Based on patterns from:
- GEFIN Backend: Multi-system SSO with Corporativo, SGF, GED, CARRINHO
- Enterprise SSO: JWT RS256 with backwards verification
- Laravel Integration: Session decryption for legacy systems