query_validator.py

Code Hygiene Score: 100

Keine Issues gefunden.

Dependencies 3

Klassen 1

Code

"""Query Validator - SRP: Separate Validierungslogik"""

import re
from typing import Tuple

from config import Config


class QueryValidator:
    """Validiert SQL Queries - SRP: Nur Validierung"""

    @staticmethod
    def validate_query(query: str, database: str, max_rows: int) -> Tuple[bool, str]:
        """
        Validiert eine Query gegen alle Sicherheitsregeln.

        Returns:
            (is_valid, error_message)
        """
        # Basis-Validierung
        if not query or len(query) < 1:
            return False, "Query must not be empty"

        if len(query) > Config.MAX_QUERY_LENGTH:
            return False, f"Query must be max {Config.MAX_QUERY_LENGTH} chars"

        # Nur SELECT erlaubt
        query_upper = query.strip().upper()
        if not query_upper.startswith("SELECT"):
            return False, "Only SELECT queries allowed"

        # Dangerous Keyword Blocklist
        for keyword in Config.BLOCKED_KEYWORDS:
            # Pruefe Keyword mit Word Boundaries
            pattern = r"\b" + re.escape(keyword) + r"\b"
            if re.search(pattern, query_upper):
                return False, f"Blocked keyword detected: {keyword}"

        # Database Allowlist
        if database not in Config.ALLOWED_DATABASES:
            return (
                False,
                f"Database '{database}' not allowed. Allowed: {', '.join(Config.ALLOWED_DATABASES)}",
            )

        # Max Rows pruefen
        if max_rows < 1 or max_rows > Config.MAX_ROWS:
            return False, f"max_rows must be 1-{Config.MAX_ROWS}"

        # Table Allowlist (information_schema Schutz)
        from_tables = QueryValidator._extract_table_names(query_upper)
        for table in from_tables:
            # Erlaube information_schema.TABLES fuer Schema-Tool
            if "INFORMATION_SCHEMA" in table:
                continue
            # Pruefe gegen Allowlist
            table_clean = table.split(".")[-1]
            if table_clean not in [t.upper() for t in Config.ALLOWED_TABLES]:
                return (
                    False,
                    f"Table '{table}' not allowed. Allowed: {', '.join(Config.ALLOWED_TABLES)}",
                )

        return True, ""

    @staticmethod
    def _extract_table_names(query_upper: str) -> list:
        """Extrahiert Tabellennamen aus FROM und JOIN Clauses"""
        tables = []

        # FROM table, JOIN table
        from_match = re.findall(r"\bFROM\s+([a-zA-Z0-9_\.]+)", query_upper)
        join_match = re.findall(r"\bJOIN\s+([a-zA-Z0-9_\.]+)", query_upper)

        tables.extend(from_match)
        tables.extend(join_match)

        return tables
← Übersicht Graph