execute_validator.py

Code Hygiene Score: 100

Keine Issues gefunden.

Dependencies 2

Klassen 1

Code

"""Execute Validator - Validierung fuer DDL/DML Statements."""

import re

from config import Config


class ExecuteValidator:
    """Validiert DDL/DML Statements gegen Sicherheitsregeln."""

    # Erlaubte Statement-Typen
    ALLOWED_STATEMENTS: list[str] = ["ALTER", "CREATE", "DROP", "TRUNCATE", "SET"]

    # Erlaubte SET-Variablen (Whitelist)
    ALLOWED_SET_VARS: list[str] = ["FOREIGN_KEY_CHECKS"]

    # Verbotene Patterns (kritisch!)
    FORBIDDEN_PATTERNS: list[str] = [
        r"\bDROP\s+DATABASE\b",
        r"\bDROP\s+SCHEMA\b",
        r"\bCREATE\s+DATABASE\b",
        r"\bCREATE\s+SCHEMA\b",
        r"\bGRANT\b",
        r"\bREVOKE\b",
        r"\bSHUTDOWN\b",
    ]

    # Verbotene Datenbanken (System-DBs)
    FORBIDDEN_DATABASES: list[str] = [
        "information_schema",
        "mysql",
        "performance_schema",
        "sys",
    ]

    @classmethod
    def validate_statement(
        cls, statement: str, database: str
    ) -> tuple[bool, str]:
        """
        Validiert ein DDL/DML Statement.

        Args:
            statement: SQL Statement
            database: Zieldatenbank

        Returns:
            Tuple (is_valid, error_message)
        """
        if not statement or len(statement.strip()) < 5:
            return False, "Statement must not be empty"

        if len(statement) > Config.MAX_QUERY_LENGTH:
            return False, f"Statement exceeds max length ({Config.MAX_QUERY_LENGTH})"

        statement_upper = statement.strip().upper()

        # Pruefe ob Statement mit erlaubtem Keyword beginnt
        statement_type = cls._get_statement_type(statement_upper)
        if statement_type is None:
            return (
                False,
                f"Statement type not allowed. Allowed: {', '.join(cls.ALLOWED_STATEMENTS)}",
            )

        # Bei SET: nur erlaubte Variablen
        if statement_type == "SET":
            allowed = False
            for var in cls.ALLOWED_SET_VARS:
                if var in statement_upper:
                    allowed = True
                    break
            if not allowed:
                return (
                    False,
                    f"SET variable not allowed. Allowed: {', '.join(cls.ALLOWED_SET_VARS)}",
                )

        # Pruefe verbotene Patterns
        for pattern in cls.FORBIDDEN_PATTERNS:
            if re.search(pattern, statement_upper):
                return False, f"Forbidden pattern detected: {pattern}"

        # Pruefe Datenbank
        if database not in Config.ALLOWED_DATABASES:
            return (
                False,
                f"Database '{database}' not allowed. Allowed: {', '.join(Config.ALLOWED_DATABASES)}",
            )

        # Pruefe ob Statement gegen System-DB zielt (als Wort, nicht Substring)
        for forbidden_db in cls.FORBIDDEN_DATABASES:
            pattern = rf"\b{forbidden_db}\b"
            if re.search(pattern, statement_upper, re.IGNORECASE):
                return False, f"Operations on '{forbidden_db}' are forbidden"

        return True, ""

    @classmethod
    def _get_statement_type(cls, statement_upper: str) -> str | None:
        """Extrahiert den Statement-Typ (ALTER, CREATE, etc.)."""
        for stmt_type in cls.ALLOWED_STATEMENTS:
            if statement_upper.startswith(stmt_type):
                return stmt_type
        return None
← Übersicht Graph