Claude Code Plugins

Community-maintained marketplace

Feedback

sql-injection-prevention

@dengineproblem/agents-monorepo
0
0

Эксперт по защите от SQL injection. Используй для parameterized queries, input validation и database security.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name sql-injection-prevention
description Эксперт по защите от SQL injection. Используй для parameterized queries, input validation и database security.

SQL Injection Prevention Expert

Эксперт по идентификации, предотвращению и митигации SQL injection уязвимостей во всех языках программирования и СУБД.

Core Principles

Primary Defense Mechanisms

defense_layers:
  - name: "Parameterized Queries"
    priority: 1
    description: "The gold standard for preventing SQL injection"

  - name: "Input Validation"
    priority: 2
    description: "Whitelist validation with strict data type enforcement"

  - name: "Stored Procedures"
    priority: 3
    description: "When implemented correctly with parameterized inputs"

  - name: "Escaping"
    priority: 4
    description: "Last resort, database-specific escaping functions"

  - name: "Least Privilege"
    priority: 5
    description: "Database users with minimal required permissions"

Defense in Depth Strategy

  • Никогда не полагайся на один метод защиты
  • Комбинируй несколько слоёв: input validation, parameterized queries, WAF, мониторинг
  • Внедряй и превентивные, и детектирующие контроли
  • Регулярное тестирование безопасности и code review

Parameterized Queries Implementation

Java (JDBC)

// VULNERABLE - String concatenation
String query = "SELECT * FROM users WHERE username='" + username + "' AND password='" + password + "'";
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(query);

// SECURE - Prepared statements
String query = "SELECT * FROM users WHERE username=? AND password=?";
PreparedStatement pstmt = connection.prepareStatement(query);
pstmt.setString(1, username);
pstmt.setString(2, password);
ResultSet rs = pstmt.executeQuery();

Python (SQLAlchemy)

# VULNERABLE - String formatting
query = f"SELECT * FROM users WHERE email = '{email}' AND status = '{status}'"
result = db.execute(query)

# SECURE - Parameterized query
from sqlalchemy import text
query = text("SELECT * FROM users WHERE email = :email AND status = :status")
result = db.execute(query, {"email": email, "status": status})

# SECURE - ORM approach (preferred)
result = db.session.query(User).filter(
    User.email == email,
    User.status == status
).all()

PHP (PDO)

// VULNERABLE - Direct concatenation
$query = "SELECT * FROM products WHERE category = '$category' AND price < $maxPrice";
$result = $pdo->query($query);

// SECURE - Prepared statements
$query = "SELECT * FROM products WHERE category = :category AND price < :maxPrice";
$stmt = $pdo->prepare($query);
$stmt->bindParam(':category', $category, PDO::PARAM_STR);
$stmt->bindParam(':maxPrice', $maxPrice, PDO::PARAM_INT);
$stmt->execute();

Node.js (MySQL2)

// VULNERABLE - Template literals
const query = `SELECT * FROM orders WHERE user_id = ${userId} AND status = '${status}'`;
connection.query(query, (error, results) => { /* ... */ });

// SECURE - Parameterized queries
const query = 'SELECT * FROM orders WHERE user_id = ? AND status = ?';
connection.execute(query, [userId, status], (error, results) => {
    // Handle results
});

Go (database/sql)

// VULNERABLE
query := fmt.Sprintf("SELECT * FROM users WHERE id = %s", userID)
rows, err := db.Query(query)

// SECURE - Parameterized query
query := "SELECT * FROM users WHERE id = $1"
rows, err := db.Query(query, userID)

Input Validation and Sanitization

Robust Input Validation

import re
from typing import Optional, List

def validate_user_input(user_id: str, email: str, role: str) -> dict:
    """Validate user input with strict rules"""
    errors: List[str] = []

    # Validate user ID (numeric only)
    if not user_id.isdigit() or int(user_id) <= 0:
        errors.append("Invalid user ID format")

    # Validate email format
    email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    if not re.match(email_pattern, email):
        errors.append("Invalid email format")

    # Validate role against whitelist
    allowed_roles = ['user', 'admin', 'moderator']
    if role not in allowed_roles:
        errors.append("Invalid role specified")

    return {'valid': len(errors) == 0, 'errors': errors}


def sanitize_identifier(identifier: str) -> Optional[str]:
    """Sanitize SQL identifiers (table/column names)"""
    # Only allow alphanumeric and underscore
    if re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', identifier):
        return identifier
    return None

TypeScript Input Validation

interface ValidationResult {
  valid: boolean;
  errors: string[];
  sanitized?: Record<string, unknown>;
}

function validateSearchParams(params: {
  query?: string;
  limit?: number;
  sortBy?: string;
}): ValidationResult {
  const errors: string[] = [];
  const sanitized: Record<string, unknown> = {};

  // Validate query (alphanumeric, spaces, basic punctuation)
  if (params.query) {
    const cleanQuery = params.query.replace(/[^a-zA-Z0-9\s\-_.]/g, '');
    if (cleanQuery.length > 100) {
      errors.push('Query too long');
    } else {
      sanitized.query = cleanQuery;
    }
  }

  // Validate limit (positive integer, max 100)
  if (params.limit !== undefined) {
    const limit = Number(params.limit);
    if (!Number.isInteger(limit) || limit < 1 || limit > 100) {
      errors.push('Invalid limit');
    } else {
      sanitized.limit = limit;
    }
  }

  // Validate sortBy against whitelist
  const allowedSortFields = ['name', 'created_at', 'updated_at', 'price'];
  if (params.sortBy && !allowedSortFields.includes(params.sortBy)) {
    errors.push('Invalid sort field');
  } else if (params.sortBy) {
    sanitized.sortBy = params.sortBy;
  }

  return {
    valid: errors.length === 0,
    errors,
    sanitized: errors.length === 0 ? sanitized : undefined
  };
}

Advanced Prevention Techniques

Stored Procedures with Parameters

-- SQL Server stored procedure
CREATE PROCEDURE GetUserOrders
    @UserID INT,
    @Status NVARCHAR(20),
    @StartDate DATE
AS
BEGIN
    SET NOCOUNT ON;

    SELECT OrderID, OrderDate, TotalAmount
    FROM Orders
    WHERE UserID = @UserID
        AND Status = @Status
        AND OrderDate >= @StartDate
    ORDER BY OrderDate DESC;
END
GO

Dynamic Query Building (Secure Approach)

public class SecureQueryBuilder {
    private static final Set<String> ALLOWED_SORT_COLUMNS =
        Set.of("name", "email", "created_date", "status");

    private static final Set<String> ALLOWED_SORT_ORDERS =
        Set.of("ASC", "DESC");

    public PreparedStatement buildUserQuery(
            Connection conn,
            String sortColumn,
            String sortOrder,
            String statusFilter) throws SQLException {

        // Validate sort column against whitelist
        if (!ALLOWED_SORT_COLUMNS.contains(sortColumn)) {
            throw new IllegalArgumentException("Invalid sort column: " + sortColumn);
        }

        // Validate sort order
        String normalizedOrder = sortOrder.toUpperCase();
        if (!ALLOWED_SORT_ORDERS.contains(normalizedOrder)) {
            throw new IllegalArgumentException("Invalid sort order: " + sortOrder);
        }

        // Build query with validated column names and parameterized values
        String query = "SELECT user_id, name, email FROM users " +
                      "WHERE status = ? " +
                      "ORDER BY " + sortColumn + " " + normalizedOrder;

        PreparedStatement stmt = conn.prepareStatement(query);
        stmt.setString(1, statusFilter);
        return stmt;
    }
}

Query Builder Pattern (Node.js)

import { Pool } from 'pg';

interface QueryOptions {
  table: string;
  filters: Record<string, unknown>;
  sortBy?: string;
  sortOrder?: 'ASC' | 'DESC';
  limit?: number;
  offset?: number;
}

class SecureQueryBuilder {
  private static readonly ALLOWED_TABLES = new Set([
    'users', 'orders', 'products', 'categories'
  ]);

  private static readonly ALLOWED_COLUMNS: Record<string, Set<string>> = {
    users: new Set(['id', 'name', 'email', 'status', 'created_at']),
    orders: new Set(['id', 'user_id', 'total', 'status', 'created_at']),
    products: new Set(['id', 'name', 'price', 'category_id', 'created_at']),
    categories: new Set(['id', 'name', 'parent_id'])
  };

  buildSelectQuery(options: QueryOptions): { text: string; values: unknown[] } {
    // Validate table
    if (!SecureQueryBuilder.ALLOWED_TABLES.has(options.table)) {
      throw new Error(`Invalid table: ${options.table}`);
    }

    const allowedColumns = SecureQueryBuilder.ALLOWED_COLUMNS[options.table];
    const values: unknown[] = [];
    const conditions: string[] = [];
    let paramIndex = 1;

    // Build WHERE clause with parameterized values
    for (const [column, value] of Object.entries(options.filters)) {
      if (!allowedColumns.has(column)) {
        throw new Error(`Invalid column: ${column}`);
      }
      conditions.push(`${column} = $${paramIndex}`);
      values.push(value);
      paramIndex++;
    }

    let query = `SELECT * FROM ${options.table}`;

    if (conditions.length > 0) {
      query += ` WHERE ${conditions.join(' AND ')}`;
    }

    // Validate and add ORDER BY
    if (options.sortBy) {
      if (!allowedColumns.has(options.sortBy)) {
        throw new Error(`Invalid sort column: ${options.sortBy}`);
      }
      const order = options.sortOrder === 'DESC' ? 'DESC' : 'ASC';
      query += ` ORDER BY ${options.sortBy} ${order}`;
    }

    // Add LIMIT and OFFSET with parameterized values
    if (options.limit !== undefined) {
      query += ` LIMIT $${paramIndex}`;
      values.push(Math.min(Math.max(1, options.limit), 100));
      paramIndex++;
    }

    if (options.offset !== undefined) {
      query += ` OFFSET $${paramIndex}`;
      values.push(Math.max(0, options.offset));
    }

    return { text: query, values };
  }
}

Database Security Configuration

MySQL Security Settings

-- Create limited privilege user for web application
CREATE USER 'webapp'@'localhost' IDENTIFIED BY 'strong_random_password_here';

-- Grant minimal required permissions
GRANT SELECT, INSERT, UPDATE ON myapp.users TO 'webapp'@'localhost';
GRANT SELECT, INSERT, UPDATE, DELETE ON myapp.orders TO 'webapp'@'localhost';
GRANT SELECT ON myapp.products TO 'webapp'@'localhost';

-- Explicitly deny dangerous permissions
-- (These are denied by default, but good to be explicit)
REVOKE FILE, PROCESS, SUPER ON *.* FROM 'webapp'@'localhost';

-- Disable dangerous global settings
SET GLOBAL log_bin_trust_function_creators = 0;
SET GLOBAL local_infile = 0;

-- Flush privileges
FLUSH PRIVILEGES;

PostgreSQL Row Level Security

-- Enable RLS on sensitive table
ALTER TABLE user_data ENABLE ROW LEVEL SECURITY;

-- Create policy to restrict access by current user
CREATE POLICY user_data_isolation_policy ON user_data
    FOR ALL
    TO webapp_user
    USING (user_id = current_setting('app.current_user_id')::int);

-- Create policy for admin access
CREATE POLICY user_data_admin_policy ON user_data
    FOR ALL
    TO admin_user
    USING (true);

-- Force RLS even for table owners
ALTER TABLE user_data FORCE ROW LEVEL SECURITY;

Detection and Monitoring

SQL Injection Detection Patterns

import re
import logging
from typing import List, Tuple
from datetime import datetime

class SQLInjectionDetector:
    """Detect potential SQL injection patterns in user input"""

    SUSPICIOUS_PATTERNS: List[Tuple[str, str]] = [
        (r"('|(\-\-)|(;)|(\||\|)|(\*|\*))", "SQL metacharacters"),
        (r"((union\s*(all\s*)?select))", "UNION attack"),
        (r"((select\s+.*\s+from)|(insert\s+into)|(update\s+.*\s+set)|(delete\s+from))", "SQL keywords"),
        (r"(exec(ute)?\s*(sp_|xp_))", "Stored procedure execution"),
        (r"(waitfor\s+delay|benchmark\s*\(|sleep\s*\()", "Time-based attack"),
        (r"(0x[0-9a-fA-F]+)", "Hex encoding"),
        (r"(char\s*\(|concat\s*\(|substr\s*\()", "String functions"),
        (r"(information_schema|sys\.)", "Schema enumeration"),
        (r"(or\s+1\s*=\s*1|and\s+1\s*=\s*1|or\s+'[^']*'\s*=\s*'[^']*')", "Boolean injection"),
    ]

    def __init__(self, logger: logging.Logger = None):
        self.logger = logger or logging.getLogger(__name__)
        self.compiled_patterns = [
            (re.compile(pattern, re.IGNORECASE), name)
            for pattern, name in self.SUSPICIOUS_PATTERNS
        ]

    def detect(self, user_input: str, context: dict = None) -> dict:
        """Detect potential SQL injection in user input"""
        detections = []

        for pattern, attack_type in self.compiled_patterns:
            match = pattern.search(user_input)
            if match:
                detections.append({
                    'type': attack_type,
                    'matched': match.group()[:50],  # Truncate for logging
                    'position': match.start()
                })

        if detections:
            self.logger.warning(
                f"Potential SQL injection detected: {detections}",
                extra={
                    'input_preview': user_input[:100],
                    'context': context,
                    'timestamp': datetime.utcnow().isoformat()
                }
            )

        return {
            'is_suspicious': len(detections) > 0,
            'detections': detections,
            'risk_score': min(len(detections) * 25, 100)
        }

Monitoring Query Patterns

-- PostgreSQL: Enable query logging for analysis
ALTER SYSTEM SET log_statement = 'all';
ALTER SYSTEM SET log_min_duration_statement = 0;

-- Create table for storing suspicious queries
CREATE TABLE security_audit_log (
    id SERIAL PRIMARY KEY,
    timestamp TIMESTAMPTZ DEFAULT NOW(),
    query_text TEXT,
    user_name VARCHAR(100),
    client_ip INET,
    risk_indicators JSONB,
    blocked BOOLEAN DEFAULT FALSE
);

-- Function to log suspicious queries
CREATE OR REPLACE FUNCTION log_suspicious_query(
    p_query TEXT,
    p_user VARCHAR,
    p_ip INET,
    p_indicators JSONB
) RETURNS VOID AS $$
BEGIN
    INSERT INTO security_audit_log (query_text, user_name, client_ip, risk_indicators)
    VALUES (p_query, p_user, p_ip, p_indicators);
END;
$$ LANGUAGE plpgsql;

Web Application Firewall Rules

ModSecurity Rules for SQL Injection

# ModSecurity Core Rule Set - SQL Injection Detection

# Detect SQL injection in request parameters
SecRule ARGS "@detectSQLi" \
    "id:942100,\
    phase:2,\
    block,\
    msg:'SQL Injection Attack Detected',\
    logdata:'Matched Data: %{MATCHED_VAR} found within %{MATCHED_VAR_NAME}',\
    t:none,t:urlDecodeUni,t:htmlEntityDecode,\
    ctl:auditLogParts=+E,\
    ver:'OWASP_CRS/3.3.0',\
    severity:'CRITICAL',\
    setvar:'tx.sql_injection_score=+%{tx.critical_anomaly_score}',\
    setvar:'tx.anomaly_score_pl1=+%{tx.critical_anomaly_score}'"

# Block common SQL injection patterns
SecRule ARGS|ARGS_NAMES|REQUEST_COOKIES|REQUEST_COOKIES_NAMES \
    "(?i:(\b(select|union|insert|update|delete|drop|alter|create|truncate)\b))" \
    "id:942110,\
    phase:2,\
    block,\
    msg:'SQL Keyword Detected in User Input',\
    severity:'WARNING'"

# Block SQL comment sequences
SecRule ARGS "--" \
    "id:942120,\
    phase:2,\
    block,\
    msg:'SQL Comment Sequence Detected',\
    severity:'WARNING'"

Testing and Validation

Automated Security Testing Script

#!/bin/bash
# SQLMap testing script for your own applications (authorized testing only)

TARGET_URL="http://localhost:8080"
COOKIE="JSESSIONID=your_session_cookie"
OUTPUT_DIR="./sqlmap_results"

mkdir -p "$OUTPUT_DIR"

# Test login form
echo "Testing login form..."
sqlmap -u "$TARGET_URL/login" \
    --data="username=admin&password=pass" \
    --cookie="$COOKIE" \
    --level=3 \
    --risk=2 \
    --batch \
    --output-dir="$OUTPUT_DIR/login" \
    --forms

# Test search endpoint
echo "Testing search endpoint..."
sqlmap -u "$TARGET_URL/api/search?q=test&category=1" \
    --cookie="$COOKIE" \
    --level=3 \
    --risk=2 \
    --batch \
    --output-dir="$OUTPUT_DIR/search"

# Test with different techniques
echo "Testing with all techniques..."
sqlmap -u "$TARGET_URL/api/users?id=1" \
    --cookie="$COOKIE" \
    --technique=BEUSTQ \
    --level=5 \
    --risk=3 \
    --batch \
    --output-dir="$OUTPUT_DIR/users"

echo "Testing complete. Results in $OUTPUT_DIR"

Unit Tests for Input Validation

import pytest
from your_app.security import validate_user_input, SQLInjectionDetector

class TestSQLInjectionPrevention:

    @pytest.fixture
    def detector(self):
        return SQLInjectionDetector()

    def test_clean_input_passes(self, detector):
        result = detector.detect("John Doe")
        assert not result['is_suspicious']

    def test_union_attack_detected(self, detector):
        result = detector.detect("' UNION SELECT * FROM users--")
        assert result['is_suspicious']
        assert any(d['type'] == 'UNION attack' for d in result['detections'])

    def test_comment_attack_detected(self, detector):
        result = detector.detect("admin'--")
        assert result['is_suspicious']

    def test_boolean_injection_detected(self, detector):
        result = detector.detect("' OR '1'='1")
        assert result['is_suspicious']

    def test_valid_email_passes_validation(self):
        result = validate_user_input("123", "user@example.com", "user")
        assert result['valid']

    def test_sql_in_email_fails_validation(self):
        result = validate_user_input("123", "'; DROP TABLE users;--", "user")
        assert not result['valid']

Emergency Response Procedures

Incident Response Checklist

sql_injection_incident_response:
  immediate_actions:
    - action: "Block malicious IP addresses"
      command: "iptables -A INPUT -s <attacker_ip> -j DROP"
      priority: 1

    - action: "Disable affected endpoints"
      description: "Temporarily disable vulnerable API endpoints"
      priority: 2

    - action: "Enable enhanced logging"
      description: "Capture all queries for forensic analysis"
      priority: 3

  short_term:
    - action: "Patch vulnerable code"
      description: "Replace vulnerable queries with parameterized versions"

    - action: "Deploy fixes to production"
      description: "Emergency release with security patches"

    - action: "Reset compromised credentials"
      description: "Rotate database passwords, API keys"

  investigation:
    - action: "Analyze access logs"
      description: "Identify attack timeline and scope"

    - action: "Check for data exfiltration"
      description: "Review what data was accessed/modified"

    - action: "Assess lateral movement"
      description: "Check if attacker accessed other systems"

  long_term:
    - action: "Implement WAF rules"
      description: "Deploy ModSecurity or cloud WAF"

    - action: "Security code review"
      description: "Review all database interactions"

    - action: "Penetration testing"
      description: "Hire external security firm"

    - action: "Developer training"
      description: "Secure coding practices workshop"

  compliance:
    - action: "Data breach notification"
      condition: "If PII was exposed"
      deadline: "72 hours (GDPR)"

    - action: "Regulatory reporting"
      condition: "If required by industry regulations"

Лучшие практики

  1. Всегда используй parameterized queries — это единственный надёжный способ
  2. Валидируй input на стороне сервера — client-side validation недостаточно
  3. Применяй принцип минимальных привилегий для database users
  4. Используй ORM где возможно — они автоматически параметризуют запросы
  5. Регулярно тестируй с SQLMap и другими инструментами (только в dev/staging!)
  6. Мониторь логи на подозрительные паттерны
  7. Внедри WAF как дополнительный слой защиты
  8. Обучай разработчиков secure coding practices