authentication

@Lobbi-Docs/claude

Authentication and authorization including JWT, OAuth2, OIDC, sessions, RBAC, and security analysis. Activate for login, auth flows, security audits, threat modeling, access control, and identity management.

SKILL.md

name authentication
description Authentication and authorization including JWT, OAuth2, OIDC, sessions, RBAC, and security analysis. Activate for login, auth flows, security audits, threat modeling, access control, and identity management.
allowed-tools Bash, Read, Write, Edit, Glob, Grep, Task, WebFetch, WebSearch
dependencies extended-thinking, deep-analysis, complex-reasoning
triggers authentication, authorization, auth flow, security audit, threat model, JWT, OAuth, OIDC, session, RBAC, access control

Authentication Skill

Provides comprehensive authentication and authorization capabilities for the Golden Armada AI Agent Fleet Platform.

When to Use This Skill

Activate this skill when working with:

  • User authentication flows
  • JWT token management
  • OAuth2 integration
  • Session management
  • Role-based access control (RBAC)

JWT Authentication

Token Generation

```python
import os
from datetime import datetime, timedelta

from fastapi import HTTPException
from jose import jwt
from passlib.context import CryptContext

SECRET_KEY = os.environ["JWT_SECRET"]
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def create_access_token(user_id: str, roles: list[str]) -> str:
    expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    payload = {
        "sub": user_id,
        "roles": roles,
        "exp": expire,
        "iat": datetime.utcnow(),
        "type": "access",
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)


def create_refresh_token(user_id: str) -> str:
    expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
    payload = {
        "sub": user_id,
        "exp": expire,
        "type": "refresh",
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)


def verify_token(token: str) -> dict:
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload
    except jwt.ExpiredSignatureError:
        # Must come first: ExpiredSignatureError is a subclass of JWTError
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")
```
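
To make the signing step less opaque, the following standard-library sketch shows roughly what an HS256 token consists of: two base64url-encoded JSON segments and an HMAC-SHA256 signature over them. It is an illustration only, not a replacement for python-jose. The names `sign_hs256` and `verify_hs256` are hypothetical, and the sketch checks the signature and algorithm header only, not `exp` or any other claim.

```python
import base64
import hashlib
import hmac
import json


def b64url_encode(data: bytes) -> str:
    """Base64url without padding, the encoding JWT segments use."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    """Restore the stripped padding before decoding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _hmac_sha256(secret: str, signing_input: str) -> bytes:
    return hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256).digest()


def sign_hs256(payload: dict, secret: str) -> str:
    """Build header.payload.signature for the given claims (hypothetical helper)."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url_encode(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return f"{signing_input}.{b64url_encode(_hmac_sha256(secret, signing_input))}"


def verify_hs256(token: str, secret: str) -> dict:
    """Return the payload if the signature matches, else raise ValueError."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("Malformed token")
    header_b64, payload_b64, signature_b64 = parts

    # Pin the algorithm instead of trusting whatever the header claims.
    if json.loads(b64url_decode(header_b64)).get("alg") != "HS256":
        raise ValueError("Unexpected algorithm")

    expected = _hmac_sha256(secret, f"{header_b64}.{payload_b64}")
    # Constant-time comparison avoids leaking how many bytes matched.
    if not hmac.compare_digest(expected, b64url_decode(signature_b64)):
        raise ValueError("Invalid signature")
    return json.loads(b64url_decode(payload_b64))
```

A token signed with one secret round-trips through `verify_hs256` with the same secret and is rejected with any other, which is the property `jwt.decode` relies on before it looks at the claims.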

Password Hashing

```python
def hash_password(password: str) -> str:
    return pwd_context.hash(password)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)


# Usage
async def authenticate_user(email: str, password: str) -> User | None:
    user = await get_user_by_email(email)
    if not user:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user
```

FastAPI Auth Dependencies

```python
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")


async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    payload = verify_token(token)
    user = await get_user(payload["sub"])
    if not user:
        raise HTTPException(status_code=401, detail="User not found")
    return user


async def get_current_active_user(user: User = Depends(get_current_user)) -> User:
    if not user.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")
    return user


# Role-based access
def require_roles(*roles: str):
    async def role_checker(user: User = Depends(get_current_user)):
        if not any(role in user.roles for role in roles):
            raise HTTPException(status_code=403, detail="Insufficient permissions")
        return user
    return role_checker


# Usage
@app.get("/admin")
async def admin_route(user: User = Depends(require_roles("admin"))):
    return {"message": "Admin access granted"}
```

OAuth2 Integration

Google OAuth2

```python
import os

from authlib.integrations.starlette_client import OAuth
from fastapi import Request

oauth = OAuth()
oauth.register(
    name='google',
    client_id=os.environ['GOOGLE_CLIENT_ID'],
    client_secret=os.environ['GOOGLE_CLIENT_SECRET'],
    server_metadata_url='https://accounts.google.com/.well-known/openid-configuration',
    client_kwargs={'scope': 'openid email profile'}
)


@app.get('/auth/google')
async def google_login(request: Request):
    redirect_uri = request.url_for('google_callback')
    return await oauth.google.authorize_redirect(request, redirect_uri)


@app.get('/auth/google/callback')
async def google_callback(request: Request):
    token = await oauth.google.authorize_access_token(request)
    user_info = token.get('userinfo')

    # Find or create user
    user = await get_or_create_user(
        email=user_info['email'],
        name=user_info['name'],
        provider='google'
    )

    # Generate JWT
    access_token = create_access_token(user.id, user.roles)
    return {"access_token": access_token, "token_type": "bearer"}
```

GitHub OAuth2

```python
oauth.register(
    name='github',
    client_id=os.environ['GITHUB_CLIENT_ID'],
    client_secret=os.environ['GITHUB_CLIENT_SECRET'],
    authorize_url='https://github.com/login/oauth/authorize',
    access_token_url='https://github.com/login/oauth/access_token',
    api_base_url='https://api.github.com/',
    client_kwargs={'scope': 'user:email'}
)
```

Session Management

```python
import secrets
from datetime import datetime

from fastapi import Request, Response

# `redis` is assumed to be an already-configured async Redis client


async def create_session(user_id: str, response: Response) -> str:
    session_id = secrets.token_urlsafe(32)

    # Store in Redis
    await redis.hset(f"session:{session_id}", mapping={
        "user_id": user_id,
        "created_at": datetime.utcnow().isoformat()
    })
    await redis.expire(f"session:{session_id}", 86400)  # 24 hours

    # Set cookie
    response.set_cookie(
        key="session_id",
        value=session_id,
        httponly=True,
        secure=True,
        samesite="lax",
        max_age=86400
    )

    return session_id


async def get_session(request: Request) -> dict | None:
    session_id = request.cookies.get("session_id")
    if not session_id:
        return None

    session = await redis.hgetall(f"session:{session_id}")
    if not session:
        return None

    # Refresh TTL
    await redis.expire(f"session:{session_id}", 86400)
    return session


async def destroy_session(request: Request, response: Response):
    session_id = request.cookies.get("session_id")
    if session_id:
        await redis.delete(f"session:{session_id}")
    response.delete_cookie("session_id")
```

RBAC Implementation

```python
from enum import Enum
from typing import Set


class Permission(str, Enum):
    READ_AGENTS = "read:agents"
    WRITE_AGENTS = "write:agents"
    DELETE_AGENTS = "delete:agents"
    ADMIN = "admin"


ROLE_PERMISSIONS: dict[str, Set[Permission]] = {
    "viewer": {Permission.READ_AGENTS},
    "operator": {Permission.READ_AGENTS, Permission.WRITE_AGENTS},
    "admin": {
        Permission.READ_AGENTS,
        Permission.WRITE_AGENTS,
        Permission.DELETE_AGENTS,
        Permission.ADMIN,
    },
}


def has_permission(user_roles: list[str], required: Permission) -> bool:
    for role in user_roles:
        if role in ROLE_PERMISSIONS and required in ROLE_PERMISSIONS[role]:
            return True
    return False


def require_permission(permission: Permission):
    async def permission_checker(user: User = Depends(get_current_user)):
        if not has_permission(user.roles, permission):
            raise HTTPException(status_code=403, detail="Permission denied")
        return user
    return permission_checker


# Usage
@app.delete("/agents/{id}")
async def delete_agent(
    id: str,
    user: User = Depends(require_permission(Permission.DELETE_AGENTS))
):
    await agent_service.delete(id)
    return {"status": "deleted"}
```

Security Best Practices

  1. Use HTTPS always in production
  2. Hash passwords with bcrypt or argon2
  3. Short-lived access tokens (15-30 minutes)
  4. Refresh token rotation on each use
  5. HttpOnly, Secure cookies for tokens
  6. Rate limit authentication endpoints
  7. Log authentication events for auditing
  8. Implement account lockout after failed attempts
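
Item 4, refresh token rotation, is the least self-explanatory entry in the list, so here is a minimal sketch of one way it could work. It assumes opaque random refresh tokens and uses an in-memory dictionary as a stand-in for Redis or a database. `RefreshTokenStore` and its methods are hypothetical names, not part of any library used in this skill.

```python
import secrets


class RefreshTokenStore:
    """In-memory stand-in for a persistent store (assumption for illustration)."""

    def __init__(self) -> None:
        self._active: dict[str, str] = {}  # live token -> user_id
        self._used: dict[str, str] = {}    # rotated-out token -> user_id

    def issue(self, user_id: str) -> str:
        token = secrets.token_urlsafe(32)
        self._active[token] = user_id
        return token

    def rotate(self, token: str) -> str:
        """Exchange a refresh token for a new one; the old one stops working."""
        if token in self._used:
            # A rotated-out token was presented again: treat it as theft
            # and revoke every live token for that user.
            self.revoke_all(self._used[token])
            raise PermissionError("Refresh token reuse detected")

        user_id = self._active.pop(token, None)
        if user_id is None:
            raise PermissionError("Unknown refresh token")

        self._used[token] = user_id
        return self.issue(user_id)

    def revoke_all(self, user_id: str) -> None:
        self._active = {t: u for t, u in self._active.items() if u != user_id}
```

The design choice worth noting is the reuse check: once a rotated-out token reappears, the sketch cannot tell the legitimate client from the attacker, so it invalidates the whole token family and forces a fresh login.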

OAuth 2.0 & OIDC Best Practices

OAuth 2.0 Grant Types

Authorization Code Flow (Recommended for Web Apps)

from authlib.integrations.starlette_client import OAuth

oauth = OAuth()
oauth.register(
    name='custom_provider',
    client_id=os.environ['OAUTH_CLIENT_ID'],
    client_secret=os.environ['OAUTH_CLIENT_SECRET'],
    authorize_url='https://provider.com/oauth/authorize',
    authorize_params=None,
    access_token_url='https://provider.com/oauth/token',
    access_token_params=None,
    refresh_token_url=None,
    client_kwargs={'scope': 'openid profile email'}
)

@app.get('/auth/login')
async def login(request: Request):
    # Generate state for CSRF protection
    state = secrets.token_urlsafe(32)
    await redis.set(f"oauth_state:{state}", "1", ex=600)

    redirect_uri = request.url_for('auth_callback')
    return await oauth.custom_provider.authorize_redirect(
        request,
        redirect_uri,
        state=state
    )

@app.get('/auth/callback')
async def auth_callback(request: Request):
    # Verify state (CSRF protection)
    state = request.query_params.get('state')
    if not await redis.get(f"oauth_state:{state}"):
        raise HTTPException(status_code=400, detail="Invalid state")

    await redis.delete(f"oauth_state:{state}")

    # Exchange authorization code for tokens
    token = await oauth.custom_provider.authorize_access_token(request)
    user_info = token.get('userinfo')

    # Create or update user
    user = await upsert_user(user_info)

    # Issue application tokens
    access_token = create_access_token(user.id, user.roles)
    refresh_token = create_refresh_token(user.id)

    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer"
    }

PKCE (Proof Key for Code Exchange) for SPAs

// Frontend (JavaScript/TypeScript)
import { generateCodeVerifier, generateCodeChallenge } from 'oauth-pkce'

// Generate PKCE parameters
const codeVerifier = generateCodeVerifier()
const codeChallenge = await generateCodeChallenge(codeVerifier)

// Store verifier for later use
sessionStorage.setItem('code_verifier', codeVerifier)

// Redirect to authorization endpoint
const authUrl = new URL('https://provider.com/oauth/authorize')
authUrl.searchParams.set('client_id', CLIENT_ID)
authUrl.searchParams.set('redirect_uri', REDIRECT_URI)
authUrl.searchParams.set('response_type', 'code')
authUrl.searchParams.set('scope', 'openid profile email')
authUrl.searchParams.set('code_challenge', codeChallenge)
authUrl.searchParams.set('code_challenge_method', 'S256')
authUrl.searchParams.set('state', generateState())

window.location.href = authUrl.toString()

// In callback handler (runs on the redirect page, after the provider returns)
const code = new URLSearchParams(window.location.search).get('code')
// Named differently from the earlier codeVerifier so both snippets can share a scope
const storedVerifier = sessionStorage.getItem('code_verifier')

const tokenResponse = await fetch('https://provider.com/oauth/token', {
  method: 'POST',
  headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
  body: new URLSearchParams({
    grant_type: 'authorization_code',
    code: code,
    redirect_uri: REDIRECT_URI,
    client_id: CLIENT_ID,
    code_verifier: storedVerifier
  })
})
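
For a native client, a CLI, or a test harness, the same S256 derivation can be sketched in Python with only the standard library. Per RFC 7636, the challenge is the base64url-encoded SHA-256 digest of the ASCII verifier, with padding removed. The function names below are hypothetical.

```python
import base64
import hashlib
import secrets


def generate_code_verifier(n_bytes: int = 32) -> str:
    """Random verifier; 32 random bytes encode to 43 base64url characters."""
    raw = secrets.token_bytes(n_bytes)
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def code_challenge_s256(code_verifier: str) -> str:
    """code_challenge = BASE64URL(SHA256(ASCII(code_verifier))), unpadded."""
    digest = hashlib.sha256(code_verifier.encode("ascii")).digest()
    return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")


verifier = generate_code_verifier()
challenge = code_challenge_s256(verifier)
# Send `challenge` with code_challenge_method=S256 in the authorization request,
# keep `verifier` private, and send it as code_verifier in the token request.
```

The verifier never leaves the client until the token exchange, so an attacker who intercepts the authorization code alone cannot redeem it.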

Client Credentials Flow (Service-to-Service)

import os
from datetime import datetime, timedelta

import httpx

class ServiceAuthClient:
    def __init__(self):
        self.token = None
        self.expires_at = None

    async def get_token(self) -> str:
        # Return cached token if still valid
        if self.token and self.expires_at > datetime.utcnow():
            return self.token

        # Request new token
        async with httpx.AsyncClient() as client:
            response = await client.post(
                'https://provider.com/oauth/token',
                data={
                    'grant_type': 'client_credentials',
                    'client_id': os.environ['SERVICE_CLIENT_ID'],
                    'client_secret': os.environ['SERVICE_CLIENT_SECRET'],
                    'scope': 'api.read api.write'
                }
            )
            response.raise_for_status()
            data = response.json()

            self.token = data['access_token']
            self.expires_at = datetime.utcnow() + timedelta(seconds=data['expires_in'] - 60)

            return self.token

# Usage
auth_client = ServiceAuthClient()

async def call_protected_api():
    token = await auth_client.get_token()
    async with httpx.AsyncClient() as client:
        response = await client.get(
            'https://api.service.com/resource',
            headers={'Authorization': f'Bearer {token}'}
        )
        return response.json()

OpenID Connect (OIDC)

ID Token Validation

import os
from datetime import datetime

import httpx
from fastapi import HTTPException
from jose import jwt, jwk
from jose.utils import base64url_decode

class OIDCValidator:
    def __init__(self, issuer: str, client_id: str):
        self.issuer = issuer
        self.client_id = client_id
        self.jwks = None
        self.jwks_updated_at = None

    async def get_jwks(self) -> dict:
        # Refresh JWKS if stale (cache for 24 hours).
        # total_seconds() includes whole days; .seconds alone never exceeds 86399.
        if not self.jwks or (datetime.utcnow() - self.jwks_updated_at).total_seconds() > 86400:
            async with httpx.AsyncClient() as client:
                response = await client.get(f"{self.issuer}/.well-known/jwks.json")
                response.raise_for_status()
                self.jwks = response.json()
                self.jwks_updated_at = datetime.utcnow()

        return self.jwks

    async def validate_id_token(self, id_token: str) -> dict:
        # Decode header to get key ID
        header = jwt.get_unverified_header(id_token)
        kid = header['kid']

        # Get JWKS and find matching key
        jwks = await self.get_jwks()
        key = next((k for k in jwks['keys'] if k['kid'] == kid), None)

        if not key:
            raise ValueError("Public key not found in JWKS")

        # Convert JWK to PEM
        public_key = jwk.construct(key)

        # Validate and decode ID token
        try:
            claims = jwt.decode(
                id_token,
                public_key.to_pem().decode('utf-8'),
                algorithms=['RS256'],
                audience=self.client_id,
                issuer=self.issuer,
                options={
                    'verify_exp': True,
                    'verify_iat': True,
                    'verify_aud': True,
                    'verify_iss': True
                }
            )

            # Additional validations
            if claims.get('nonce'):
                # Verify nonce matches what was sent in auth request
                pass

            return claims

        except jwt.ExpiredSignatureError:
            raise HTTPException(status_code=401, detail="ID token expired")
        except jwt.JWTClaimsError as e:
            raise HTTPException(status_code=401, detail=f"Invalid ID token claims: {e}")
        except Exception as e:
            raise HTTPException(status_code=401, detail=f"ID token validation failed: {e}")

# Usage
validator = OIDCValidator(
    issuer="https://provider.com",
    client_id=os.environ['OIDC_CLIENT_ID']
)

@app.post("/auth/oidc/callback")
async def oidc_callback(id_token: str):
    claims = await validator.validate_id_token(id_token)

    # Extract user information
    user = await get_or_create_user(
        email=claims['email'],
        name=claims['name'],
        sub=claims['sub']
    )

    return {"user": user, "claims": claims}

JWT Security Best Practices

Secure Token Generation

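import os
import secrets
from datetime import datetime, timedelta

from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
from fastapi import HTTPException
from jose import jwt  # python-jose, as in the earlier sections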

# Use RS256 (asymmetric) instead of HS256 for public verification
PRIVATE_KEY_PATH = os.environ.get('JWT_PRIVATE_KEY_PATH')
PUBLIC_KEY_PATH = os.environ.get('JWT_PUBLIC_KEY_PATH')

def load_keys():
    with open(PRIVATE_KEY_PATH, 'rb') as f:
        private_key = serialization.load_pem_private_key(
            f.read(),
            password=None,
            backend=default_backend()
        )

    with open(PUBLIC_KEY_PATH, 'rb') as f:
        public_key = serialization.load_pem_public_key(
            f.read(),
            backend=default_backend()
        )

    return private_key, public_key

PRIVATE_KEY, PUBLIC_KEY = load_keys()

def create_secure_access_token(
    user_id: str,
    roles: list[str],
    tenant_id: str = None,
    custom_claims: dict = None
) -> str:
    now = datetime.utcnow()

    payload = {
        # Standard claims (RFC 7519)
        "iss": "https://api.yourdomain.com",  # Issuer
        "sub": user_id,                        # Subject (user ID)
        "aud": ["https://api.yourdomain.com"], # Audience
        "exp": now + timedelta(minutes=15),    # Expiration (short-lived)
        "nbf": now,                             # Not before
        "iat": now,                             # Issued at
        "jti": secrets.token_urlsafe(16),      # JWT ID (unique token identifier)

        # Custom claims
        "roles": roles,
        "type": "access",
    }

    # Add tenant context for multi-tenant applications
    if tenant_id:
        payload["tenant_id"] = tenant_id

    # Add any custom claims
    if custom_claims:
        payload.update(custom_claims)

    return jwt.encode(payload, PRIVATE_KEY, algorithm="RS256")

def verify_secure_token(token: str) -> dict:
    try:
        payload = jwt.decode(
            token,
            PUBLIC_KEY,
            algorithms=["RS256"],
            audience="https://api.yourdomain.com",  # python-jose expects a single string here
            issuer="https://api.yourdomain.com",
            options={
                'verify_exp': True,
                'verify_nbf': True,
                'verify_iat': True,
                'verify_aud': True,
                'verify_iss': True,
                'require_exp': True,
                'require_iat': True,
                'require_nbf': True
            }
        )

        # Validate token type
        if payload.get('type') != 'access':
            raise HTTPException(status_code=401, detail="Invalid token type")

        return payload

    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.JWTError as e:  # python-jose's base error; InvalidTokenError belongs to PyJWT
        raise HTTPException(status_code=401, detail=f"Invalid token: {e}")

Token Revocation with Blacklist

import time

class TokenBlacklist:
    """Redis-based token blacklist for revoked tokens"""

    def __init__(self, redis_client):
        self.redis = redis_client

    async def revoke_token(self, jti: str, exp: int):
        """Add token to blacklist until expiration"""
        # time.time() is a true Unix timestamp; a naive utcnow().timestamp()
        # is skewed by the server's local UTC offset.
        ttl = exp - int(time.time())
        if ttl > 0:
            await self.redis.set(f"blacklist:{jti}", "1", ex=ttl)

    async def is_revoked(self, jti: str) -> bool:
        """Check if token is blacklisted"""
        # EXISTS returns a count, so convert it to a bool
        return bool(await self.redis.exists(f"blacklist:{jti}"))

# Global blacklist instance
token_blacklist = TokenBlacklist(redis)

async def get_current_user_with_revocation_check(
    token: str = Depends(oauth2_scheme)
) -> User:
    payload = verify_secure_token(token)

    # Check if token has been revoked
    if await token_blacklist.is_revoked(payload['jti']):
        raise HTTPException(status_code=401, detail="Token has been revoked")

    user = await get_user(payload["sub"])
    if not user:
        raise HTTPException(status_code=401, detail="User not found")

    return user

@app.post("/auth/logout")
async def logout(token: str = Depends(oauth2_scheme)):
    payload = verify_secure_token(token)
    await token_blacklist.revoke_token(payload['jti'], payload['exp'])
    return {"message": "Logged out successfully"}

Security Analysis with Extended Thinking

When reviewing authentication flows, use extended thinking for comprehensive security analysis.

Authentication Flow Security Review Template

## Authentication Flow Security Review

**Flow**: [Login/OAuth/SSO/API Authentication]
**Date**: [YYYY-MM-DD]
**Reviewer**: [Name/Agent]

### Flow Diagram
[Document the authentication flow step-by-step]

### Security Analysis Checklist

#### Confidentiality
- [ ] Credentials transmitted over HTTPS only
- [ ] Passwords hashed with strong algorithm (bcrypt/argon2)
- [ ] Tokens encrypted in transit
- [ ] Sensitive data not logged
- [ ] Secrets stored securely (env vars, secrets manager)

#### Integrity
- [ ] CSRF protection implemented
- [ ] Request tampering prevented
- [ ] Token signature validation
- [ ] State parameter validated (OAuth)
- [ ] Nonce validated (OIDC)

#### Availability
- [ ] Rate limiting on auth endpoints
- [ ] Account lockout after failed attempts
- [ ] DDoS protection in place
- [ ] Graceful degradation strategy
- [ ] Session timeout configured

#### Authentication
- [ ] MFA available for sensitive accounts
- [ ] Password complexity requirements
- [ ] Credential stuffing protection
- [ ] Brute force mitigation
- [ ] Session fixation prevention

#### Authorization
- [ ] Principle of least privilege
- [ ] Role-based access control
- [ ] Permission checks on every request
- [ ] Token scope validation
- [ ] Tenant isolation (multi-tenant apps)

#### Session Management
- [ ] Secure session tokens
- [ ] HttpOnly, Secure, SameSite cookies
- [ ] Session timeout implemented
- [ ] Logout functionality
- [ ] Concurrent session handling

### Extended Thinking Analysis

Use Claude with extended thinking for deep security review:

```python
import anthropic

client = anthropic.Anthropic()

security_review_prompt = """
Perform a comprehensive security analysis of this authentication flow:

[Paste authentication code/flow description]

Analyze for:
1. OWASP Top 10 vulnerabilities
2. Authentication bypass possibilities
3. Token security weaknesses
4. Session management issues
5. Input validation gaps
6. Race conditions
7. Logic flaws

Provide specific findings with:
- Severity (Critical/High/Medium/Low)
- Location (file:line)
- Attack vector
- Remediation steps
"""

response = client.messages.create(
    model="claude-opus-4-5-20250514",  # confirm this ID against Anthropic's current model list
    max_tokens=32000,
    thinking={
        "type": "enabled",
        "budget_tokens": 20000  # High budget for security analysis
    },
    messages=[{
        "role": "user",
        "content": security_review_prompt
    }]
)

# Extract thinking and analysis
for block in response.content:
    if block.type == "thinking":
        print(f"Deep Analysis:\n{block.thinking}\n")
    elif block.type == "text":
        print(f"Findings:\n{block.text}")
```

Threat Modeling for Authentication

STRIDE Threat Model for Auth Systems

Adapted from [[deep-analysis]] skill threat modeling template:

## Authentication Threat Model

### System: [Auth System Name]
**Version**: 1.0
**Last Updated**: [Date]

### Trust Boundaries

┌─────────────────────────────────────────┐
│  External (Untrusted)                   │
│  [End Users]  [Credential Stuffers]     │
│  [MITM Attackers]                       │
└──────────────────┬──────────────────────┘
                   │ TLS/HTTPS
┌──────────────────┴──────────────────────┐
│  Public API (Semi-trusted)              │
│  [API Gateway]  [Auth Endpoints]        │
│  [OAuth Providers]  [OIDC IdP]          │
└──────────────────┬──────────────────────┘
                   │ Internal Auth
┌──────────────────┴──────────────────────┐
│  Application Layer (Trusted)            │
│  [Business Logic]  [User Management]    │
└──────────────────┬──────────────────────┘
                   │ DB Protocol
┌──────────────────┴──────────────────────┐
│  Data Layer (Highly Trusted)            │
│  [User DB]  [Session Store]  [Secrets]  │
└─────────────────────────────────────────┘


### STRIDE Analysis

#### Spoofing Identity
| Threat | Likelihood | Impact | Mitigation | Status |
|--------|------------|--------|------------|--------|
| Credential theft via phishing | High | Critical | MFA, user education | ✅ |
| Session hijacking | Medium | High | Secure cookies, HTTPS | ✅ |
| Token replay attacks | Medium | High | Short token lifetime, JTI tracking | ✅ |
| OAuth state manipulation | Low | Medium | Cryptographic state validation | ✅ |
| Impersonation via stolen refresh token | Medium | Critical | Refresh token rotation, device binding | ⚠️ |

#### Tampering with Data
| Threat | Likelihood | Impact | Mitigation | Status |
|--------|------------|--------|------------|--------|
| JWT payload manipulation | Low | Critical | Signature verification, RS256 | ✅ |
| Cookie tampering | Low | High | Signed cookies, HMAC validation | ✅ |
| OAuth callback manipulation | Medium | High | Redirect URI validation | ✅ |
| Password reset token tampering | Low | High | Cryptographic tokens, time limits | ✅ |

#### Repudiation
| Threat | Likelihood | Impact | Mitigation | Status |
|--------|------------|--------|------------|--------|
| Deny unauthorized access | Medium | Medium | Comprehensive audit logging | ✅ |
| Dispute authentication events | Low | Low | Immutable audit trail, timestamps | ✅ |

#### Information Disclosure
| Threat | Likelihood | Impact | Mitigation | Status |
|--------|------------|--------|------------|--------|
| Credentials in logs | Medium | Critical | Sanitize logs, secret detection | ✅ |
| User enumeration via login | High | Medium | Generic error messages | ⚠️ |
| Token leakage in URLs | Low | High | Tokens in headers only | ✅ |
| Timing attacks on password check | Medium | Medium | Constant-time comparison | ✅ |
| JWKS endpoint information leak | Low | Low | Rate limiting, monitoring | ✅ |

#### Denial of Service
| Threat | Likelihood | Impact | Mitigation | Status |
|--------|------------|--------|------------|--------|
| Brute force attacks | High | High | Rate limiting, CAPTCHA, account lockout | ✅ |
| Resource exhaustion (bcrypt) | Medium | Medium | Request throttling, async processing | ✅ |
| Session store exhaustion | Low | High | Session limits per user, TTL | ✅ |
| OAuth callback flooding | Medium | Medium | State validation, rate limiting | ⚠️ |

#### Elevation of Privilege
| Threat | Likelihood | Impact | Mitigation | Status |
|--------|------------|--------|------------|--------|
| Role manipulation in JWT | Low | Critical | Server-side role verification | ✅ |
| Privilege escalation via API | Medium | Critical | Permission checks on every request | ✅ |
| Admin impersonation | Low | Critical | Additional auth for admin actions | ⚠️ |
| OAuth scope escalation | Low | High | Strict scope validation | ✅ |

### Risk Matrix
| Threat ID | Threat | Likelihood | Impact | Risk Score | Priority |
|-----------|--------|------------|--------|------------|----------|
| T1 | Credential stuffing attack | High | Critical | 9 | P0 |
| T2 | Refresh token theft | Medium | Critical | 8 | P1 |
| T3 | User enumeration | High | Medium | 6 | P2 |
| T4 | OAuth callback flooding | Medium | Medium | 4 | P2 |
| T5 | Admin impersonation | Low | Critical | 7 | P1 |

### Attack Scenarios

#### Scenario 1: Credential Stuffing Attack
**Attacker Goal**: Gain unauthorized access using leaked credentials

**Attack Steps**:
1. Obtain credential database from breach
2. Automate login attempts across accounts
3. Bypass rate limiting with distributed IPs
4. Identify valid credentials
5. Access user accounts

**Defenses**:
- Rate limiting per IP and per account
- CAPTCHA after N failed attempts
- Anomaly detection (impossible travel, new device)
- Breach database monitoring (HaveIBeenPwned)
- Mandatory MFA for sensitive accounts

#### Scenario 2: Token Theft via XSS
**Attacker Goal**: Steal access token to impersonate user

**Attack Steps**:
1. Inject malicious script via vulnerable input
2. Script reads token from localStorage
3. Exfiltrate token to attacker server
4. Use token to access API as victim

**Defenses**:
- Store tokens in HttpOnly cookies (not accessible to JS)
- Content Security Policy (CSP)
- Input sanitization and validation
- Regular security audits
- Short token lifetimes

### Recommendations by Priority

#### P0 (Critical - Immediate Action)
1. Implement credential stuffing protection
2. Add device fingerprinting for anomaly detection
3. Enable MFA for all admin accounts

#### P1 (High - Within Sprint)
1. Implement refresh token rotation
2. Add additional auth step for admin impersonation
3. Strengthen OAuth callback validation

#### P2 (Medium - Next Quarter)
1. Improve user enumeration protection
2. Implement risk-based authentication
3. Add behavioral biometrics

Common Vulnerabilities & Mitigations

OWASP Top 10 for Authentication

A01: Broken Access Control

# VULNERABLE: Client-side role check only
@app.get("/admin/users")
async def get_users(user: User = Depends(get_current_user)):
    # No server-side permission check!
    return await db.users.find_all()

# SECURE: Server-side permission enforcement
@app.get("/admin/users")
async def get_users(user: User = Depends(require_permission(Permission.ADMIN))):
    # Permission verified on server
    return await db.users.find_all()

A02: Cryptographic Failures

# VULNERABLE: Weak hashing
hashed = hashlib.md5(password.encode()).hexdigest()

# SECURE: Strong adaptive hashing
from passlib.context import CryptContext
pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
    bcrypt__rounds=12  # Adjust based on security/performance needs
)
hashed = pwd_context.hash(password)

A03: Injection

# VULNERABLE: SQL injection in auth query
query = f"SELECT * FROM users WHERE email = '{email}' AND password = '{password}'"

# SECURE: Parameterized queries
query = "SELECT * FROM users WHERE email = $1"
user = await db.fetch_one(query, email)
if user and pwd_context.verify(password, user.hashed_password):
    return user
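
To see why the parameterized form is safe, the following self-contained sketch runs both query styles against an in-memory SQLite database with a classic injection payload. It uses `sqlite3` and its `?` placeholder purely for illustration (the snippet above uses the `$1` style). The table layout and function names are assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (email TEXT PRIMARY KEY, hashed_password TEXT)")
conn.execute(
    "INSERT INTO users VALUES (?, ?)",
    ("alice@example.com", "<bcrypt hash placeholder>"),
)

# Attacker-supplied "email" that closes the quote and adds an always-true clause.
payload = "nobody@example.com' OR '1'='1"


def find_user_unsafe(email: str) -> list[tuple]:
    # String interpolation: the payload becomes part of the SQL text itself.
    return conn.execute(f"SELECT email FROM users WHERE email = '{email}'").fetchall()


def find_user_safe(email: str) -> list[tuple]:
    # Bound parameter: the payload is compared as one literal string value.
    return conn.execute("SELECT email FROM users WHERE email = ?", (email,)).fetchall()


unsafe_rows = find_user_unsafe(payload)  # matches every row in the table
safe_rows = find_user_safe(payload)      # matches nothing
```

The password never appears in the query in either version of the safe pattern: the row is fetched by email, and the hash comparison happens in application code.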

A07: Identification and Authentication Failures

# VULNERABLE: Weak session management
session_id = hashlib.md5(str(time.time()).encode()).hexdigest()

# SECURE: Cryptographically secure session tokens
import secrets
session_id = secrets.token_urlsafe(32)

# VULNERABLE: No rate limiting
@app.post("/auth/login")
async def login(credentials: LoginRequest):
    return await authenticate(credentials)

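# SECURE: Rate limiting with slowapi
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
# Register the limiter and its 429 handler on the app
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)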

@app.post("/auth/login")
@limiter.limit("5/minute")  # 5 attempts per minute
async def login(request: Request, credentials: LoginRequest):
    return await authenticate(credentials)

Integration with Keycloak

For enterprise-grade auth, integrate with Keycloak (see [[keycloak]] skill):

from keycloak import KeycloakOpenID

# Configure Keycloak client
keycloak_openid = KeycloakOpenID(
    server_url="http://localhost:8080/",
    client_id="your-client",
    realm_name="your-realm",
    client_secret_key="your-secret"
)

# Get token
token = keycloak_openid.token(username, password)

# Validate token
token_info = keycloak_openid.introspect(token['access_token'])

# Get user info
user_info = keycloak_openid.userinfo(token['access_token'])

# Decode and verify token locally
KEYCLOAK_PUBLIC_KEY = "-----BEGIN PUBLIC KEY-----\n" + \
    keycloak_openid.public_key() + \
    "\n-----END PUBLIC KEY-----"

options = {"verify_signature": True, "verify_aud": True, "verify_exp": True}
token_info = keycloak_openid.decode_token(
    token['access_token'],
    key=KEYCLOAK_PUBLIC_KEY,
    options=options
)

Related Skills & Resources

Skills

  • [[keycloak]] - Enterprise identity and access management
  • [[deep-analysis]] - Security audit templates and threat modeling
  • [[extended-thinking]] - Enable deep reasoning for security analysis
  • [[complex-reasoning]] - Hypothesis-driven debugging for auth issues

Keycloak Agents

  • keycloak-realm-admin - Realm and client management
  • keycloak-security-auditor - Security review and compliance
  • keycloak-auth-flow-designer - Custom authentication flows
  • keycloak-identity-specialist - Federation and SSO setup

Troubleshooting

Common Issues

Token Verification Failures

# Debug JWT token
python -c "import jwt; print(jwt.decode('YOUR_TOKEN', options={'verify_signature': False}))"

# Verify token signature
openssl dgst -sha256 -verify public_key.pem -signature signature.bin token_payload.txt

# Check token expiration
date -d @$(python -c "import jwt; print(jwt.decode('YOUR_TOKEN', options={'verify_signature': False})['exp'])")

OAuth Flow Issues

# Enable debug logging (development only: the code and token response are secrets)
import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Log OAuth flow steps
logger.debug(f"Redirect URI: {redirect_uri}")
logger.debug(f"State: {state}")
logger.debug(f"Code: {code}")
logger.debug(f"Token response: {token_response}")

Session Issues

# Check Redis session data
redis-cli
> KEYS session:*
> HGETALL session:abc123
> TTL session:abc123

Last Updated: 2025-12-12
Version: 2.0.0
Enhanced with security analysis, threat modeling, and OIDC/OAuth 2.0 best practices.