Claude Code Plugins

Community-maintained marketplace

Feedback

Implement comprehensive audit logging for all admin actions, capturing user ID, action type, entity changes, IP address, and user agent. Use when tracking system activities or adding audit trails.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name audit-logging
description Implement comprehensive audit logging for all admin actions, capturing user ID, action type, entity changes, IP address, and user agent. Use when tracking system activities or adding audit trails.
allowed-tools Read, Write, Edit, Bash, Glob

You implement audit logging for all administrative actions in the QA Team Portal.

Requirements from PROJECT_PLAN.md

  • Log all create, update, delete operations
  • Capture user ID, timestamp, IP, user agent
  • Display audit trail in admin panel
  • Export logs to CSV
  • Retention policy: 1 year minimum

Implementation

1. Audit Log Model

Location: backend/app/models/audit_log.py

from sqlalchemy import Column, String, JSON, ForeignKey, Text
from sqlalchemy.dialects.postgresql import UUID
import uuid
from app.db.base_class import Base

class AuditLog(Base):
    __tablename__ = "audit_logs"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False)
    action = Column(String(50), nullable=False)  # create, update, delete, login
    entity_type = Column(String(50), nullable=False)  # team_member, tool, etc.
    entity_id = Column(UUID(as_uuid=True), nullable=True)
    old_values = Column(JSON, nullable=True)  # Before update/delete
    new_values = Column(JSON, nullable=True)  # After create/update
    ip_address = Column(String(45), nullable=True)  # IPv6 support
    user_agent = Column(Text, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)

2. Audit Service

Location: backend/app/services/audit_service.py

from fastapi import Request
from sqlalchemy.orm import Session
from app.models.audit_log import AuditLog
from app.models.user import User

class AuditService:
    """Service for logging admin activities."""

    @staticmethod
    async def log_action(
        db: Session,
        user: User,
        action: str,
        entity_type: str,
        entity_id: str = None,
        old_values: dict = None,
        new_values: dict = None,
        request: Request = None
    ):
        """
        Log an audit event.

        Args:
            db: Database session
            user: Current user
            action: Action type (create, update, delete, login)
            entity_type: Type of entity (team_member, tool, etc.)
            entity_id: ID of entity
            old_values: Values before change
            new_values: Values after change
            request: FastAPI request object
        """
        ip_address = None
        user_agent = None

        if request:
            # Get real IP (behind proxy)
            ip_address = request.headers.get(
                "X-Forwarded-For",
                request.client.host
            ).split(',')[0].strip()

            user_agent = request.headers.get("User-Agent")

        audit_log = AuditLog(
            user_id=user.id,
            action=action,
            entity_type=entity_type,
            entity_id=entity_id,
            old_values=old_values,
            new_values=new_values,
            ip_address=ip_address,
            user_agent=user_agent
        )

        db.add(audit_log)
        db.commit()

        return audit_log

3. Audit Middleware

Location: backend/app/middleware/audit_middleware.py

from starlette.middleware.base import BaseHTTPMiddleware
from fastapi import Request
from app.services.audit_service import AuditService

class AuditMiddleware(BaseHTTPMiddleware):
    """Middleware to automatically log admin actions."""

    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)

        # Only log successful admin operations
        if (
            response.status_code < 400 and
            request.url.path.startswith("/api/v1/admin/") and
            request.method in ["POST", "PUT", "PATCH", "DELETE"]
        ):
            # Audit logging handled in endpoints
            # This middleware can be used for additional logging
            pass

        return response

4. Usage in Endpoints

Location: backend/app/api/v1/endpoints/team_members.py

from app.services.audit_service import AuditService

@router.post("/admin/team-members")
async def create_team_member(
    data: TeamMemberCreate,
    request: Request,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_lead_or_admin)
):
    """Create team member with audit logging."""
    # Create team member
    team_member = await crud.team_member.create(db, obj_in=data)

    # Log action
    await AuditService.log_action(
        db=db,
        user=current_user,
        action="create",
        entity_type="team_member",
        entity_id=str(team_member.id),
        new_values=data.dict(),
        request=request
    )

    return team_member

@router.put("/admin/team-members/{id}")
async def update_team_member(
    id: UUID,
    data: TeamMemberUpdate,
    request: Request,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_lead_or_admin)
):
    """Update team member with audit logging."""
    # Get old values
    old_member = await crud.team_member.get(db, id=id)
    old_values = TeamMemberResponse.from_orm(old_member).dict()

    # Update team member
    updated_member = await crud.team_member.update(
        db,
        db_obj=old_member,
        obj_in=data
    )

    # Log action
    await AuditService.log_action(
        db=db,
        user=current_user,
        action="update",
        entity_type="team_member",
        entity_id=str(id),
        old_values=old_values,
        new_values=data.dict(exclude_unset=True),
        request=request
    )

    return updated_member

@router.delete("/admin/team-members/{id}")
async def delete_team_member(
    id: UUID,
    request: Request,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin)
):
    """Delete team member with audit logging."""
    # Get old values before deletion
    team_member = await crud.team_member.get(db, id=id)
    old_values = TeamMemberResponse.from_orm(team_member).dict()

    # Delete
    await crud.team_member.remove(db, id=id)

    # Log action
    await AuditService.log_action(
        db=db,
        user=current_user,
        action="delete",
        entity_type="team_member",
        entity_id=str(id),
        old_values=old_values,
        request=request
    )

    return {"message": "Deleted successfully"}

5. Audit Log API

Location: backend/app/api/v1/endpoints/audit_logs.py

@router.get("/admin/audit-logs")
async def get_audit_logs(
    skip: int = 0,
    limit: int = 100,
    action: str = None,
    entity_type: str = None,
    user_id: UUID = None,
    start_date: datetime = None,
    end_date: datetime = None,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_lead_or_admin)
):
    """Get audit logs with filters."""
    query = db.query(AuditLog)

    if action:
        query = query.filter(AuditLog.action == action)
    if entity_type:
        query = query.filter(AuditLog.entity_type == entity_type)
    if user_id:
        query = query.filter(AuditLog.user_id == user_id)
    if start_date:
        query = query.filter(AuditLog.created_at >= start_date)
    if end_date:
        query = query.filter(AuditLog.created_at <= end_date)

    logs = query.order_by(AuditLog.created_at.desc()).offset(skip).limit(limit).all()

    return logs

@router.get("/admin/audit-logs/export")
async def export_audit_logs_csv(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin)
):
    """Export audit logs to CSV."""
    import csv
    from io import StringIO

    logs = db.query(AuditLog).order_by(AuditLog.created_at.desc()).all()

    output = StringIO()
    writer = csv.writer(output)

    # Header
    writer.writerow([
        'Timestamp', 'User', 'Action', 'Entity Type',
        'Entity ID', 'IP Address', 'User Agent'
    ])

    # Data
    for log in logs:
        writer.writerow([
            log.created_at.isoformat(),
            log.user_id,
            log.action,
            log.entity_type,
            log.entity_id or '',
            log.ip_address or '',
            log.user_agent or ''
        ])

    return Response(
        content=output.getvalue(),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=audit_logs.csv"}
    )

6. Frontend Audit Log Viewer

Location: frontend/src/components/admin/audit/AuditLogs.tsx

export const AuditLogs = () => {
  const [logs, setLogs] = useState([])
  const [filters, setFilters] = useState({
    action: '',
    entity_type: '',
    start_date: '',
    end_date: ''
  })

  useEffect(() => {
    fetchLogs()
  }, [filters])

  const fetchLogs = async () => {
    const response = await api.get('/admin/audit-logs', { params: filters })
    setLogs(response.data)
  }

  const exportCSV = () => {
    window.open('/api/v1/admin/audit-logs/export', '_blank')
  }

  return (
    <div>
      <div className="flex justify-between mb-4">
        <h1>Audit Logs</h1>
        <Button onClick={exportCSV}>Export CSV</Button>
      </div>

      {/* Filters */}
      <div className="grid grid-cols-4 gap-4 mb-4">
        <Select value={filters.action} onValueChange={...}>
          <option value="">All Actions</option>
          <option value="create">Create</option>
          <option value="update">Update</option>
          <option value="delete">Delete</option>
        </Select>

        {/* More filters */}
      </div>

      {/* Table */}
      <Table>
        <TableHeader>
          <TableRow>
            <TableHead>Timestamp</TableHead>
            <TableHead>User</TableHead>
            <TableHead>Action</TableHead>
            <TableHead>Entity</TableHead>
            <TableHead>IP Address</TableHead>
          </TableRow>
        </TableHeader>
        <TableBody>
          {logs.map(log => (
            <TableRow key={log.id}>
              <TableCell>{formatDate(log.created_at)}</TableCell>
              <TableCell>{log.user_email}</TableCell>
              <TableCell>
                <Badge variant={getActionVariant(log.action)}>
                  {log.action}
                </Badge>
              </TableCell>
              <TableCell>{log.entity_type}</TableCell>
              <TableCell>{log.ip_address}</TableCell>
            </TableRow>
          ))}
        </TableBody>
      </Table>
    </div>
  )
}

Cleanup Policy

Location: backend/scripts/cleanup_audit_logs.py

# Run this as a cron job to enforce retention policy
from datetime import datetime, timedelta
from app.db.session import SessionLocal
from app.models.audit_log import AuditLog

def cleanup_old_logs(days: int = 365):
    """Delete audit logs older than specified days."""
    db = SessionLocal()
    cutoff_date = datetime.utcnow() - timedelta(days=days)

    deleted = db.query(AuditLog).filter(
        AuditLog.created_at < cutoff_date
    ).delete()

    db.commit()
    print(f"Deleted {deleted} old audit logs")

if __name__ == "__main__":
    cleanup_old_logs(365)  # 1 year retention

Testing

def test_audit_log_created_on_create(client, admin_token, db):
    response = client.post(
        "/api/v1/admin/team-members",
        json={"name": "Test", "role": "QA Engineer"},
        headers={"Authorization": f"Bearer {admin_token}"}
    )

    assert response.status_code == 201

    # Check audit log created
    audit_log = db.query(AuditLog).filter(
        AuditLog.action == "create",
        AuditLog.entity_type == "team_member"
    ).first()

    assert audit_log is not None
    assert audit_log.new_values["name"] == "Test"

Report

✅ Audit logging implemented ✅ All CRUD operations logged ✅ IP address and user agent captured ✅ Audit log viewer created ✅ CSV export functional ✅ Retention policy defined (1 year) ✅ Tests passing