MCP-DB Validators

SQL-Validierung nach dem Single Responsibility Principle.

QueryValidator

Validiert SELECT-Queries für das db_select Tool.

"""SRP: Separate Validierungslogik"""
from typing import Tuple
import re
from config import Config

class QueryValidator:
    """Validiert SQL Queries - SRP: Nur Validierung"""

    @staticmethod
    def validate_query(query: str, database: str, max_rows: int) -> Tuple[bool, str]:
        """
        Validiert eine Query gegen alle Sicherheitsregeln.

        Returns:
            (is_valid, error_message)
        """
        # Basis-Validierung
        if not query or len(query) < 1:
            return False, "Query must not be empty"

        if len(query) > Config.MAX_QUERY_LENGTH:
            return False, f"Query must be max {Config.MAX_QUERY_LENGTH} chars"

        # Nur SELECT erlaubt
        query_upper = query.strip().upper()
        if not query_upper.startswith("SELECT"):
            return False, "Only SELECT queries allowed"

        # Dangerous Keyword Blocklist
        for keyword in Config.BLOCKED_KEYWORDS:
            pattern = r'\b' + re.escape(keyword) + r'\b'
            if re.search(pattern, query_upper):
                return False, f"Blocked keyword detected: {keyword}"

        # Database Allowlist
        if database not in Config.ALLOWED_DATABASES:
            return False, f"Database '{database}' not allowed"

        # Max Rows prüfen
        if max_rows < 1 or max_rows > Config.MAX_ROWS:
            return False, f"max_rows must be 1-{Config.MAX_ROWS}"

        # Table Allowlist
        from_tables = QueryValidator._extract_table_names(query_upper)
        for table in from_tables:
            if "INFORMATION_SCHEMA" in table:
                continue
            table_clean = table.split('.')[-1]
            if table_clean not in [t.upper() for t in Config.ALLOWED_TABLES]:
                return False, f"Table '{table}' not allowed"

        return True, ""

Validierungsschritte (QueryValidator)

#PrüfungFehler bei
1Query nicht leerLeere Query
2Query-Länge> 2000 Zeichen
3SELECT-OnlyNicht mit SELECT beginnend
4Keyword-BlocklistBlockiertes Keyword gefunden
5Database-AllowlistDatenbank nicht erlaubt
6Max-Rows< 1 oder > 100
7Table-AllowlistTabelle nicht erlaubt

ExecuteValidator

Validiert DDL-Statements für das db_execute Tool.

"""Execute Validator - Validierung für DDL/DML Statements."""
import re
from config import Config

class ExecuteValidator:
    """Validiert DDL/DML Statements gegen Sicherheitsregeln."""

    # Erlaubte Statement-Typen
    ALLOWED_STATEMENTS: list[str] = ["ALTER", "CREATE", "DROP", "TRUNCATE", "SET"]

    # Erlaubte SET-Variablen (Whitelist)
    ALLOWED_SET_VARS: list[str] = ["FOREIGN_KEY_CHECKS"]

    # Verbotene Patterns (kritisch!)
    FORBIDDEN_PATTERNS: list[str] = [
        r"\bDROP\s+DATABASE\b",
        r"\bDROP\s+SCHEMA\b",
        r"\bCREATE\s+DATABASE\b",
        r"\bCREATE\s+SCHEMA\b",
        r"\bGRANT\b",
        r"\bREVOKE\b",
        r"\bSHUTDOWN\b",
    ]

    # Verbotene Datenbanken (System-DBs)
    FORBIDDEN_DATABASES: list[str] = [
        "information_schema",
        "mysql",
        "performance_schema",
        "sys",
    ]

    @classmethod
    def validate_statement(cls, statement: str, database: str) -> tuple[bool, str]:
        """
        Validiert ein DDL/DML Statement.

        Returns:
            Tuple (is_valid, error_message)
        """
        if not statement or len(statement.strip()) < 5:
            return False, "Statement must not be empty"

        if len(statement) > Config.MAX_QUERY_LENGTH:
            return False, f"Statement exceeds max length"

        statement_upper = statement.strip().upper()

        # Prüfe ob Statement mit erlaubtem Keyword beginnt
        statement_type = cls._get_statement_type(statement_upper)
        if statement_type is None:
            return False, f"Statement type not allowed. Allowed: {', '.join(cls.ALLOWED_STATEMENTS)}"

        # Bei SET: nur erlaubte Variablen
        if statement_type == "SET":
            allowed = any(var in statement_upper for var in cls.ALLOWED_SET_VARS)
            if not allowed:
                return False, f"SET variable not allowed. Allowed: {', '.join(cls.ALLOWED_SET_VARS)}"

        # Prüfe verbotene Patterns
        for pattern in cls.FORBIDDEN_PATTERNS:
            if re.search(pattern, statement_upper):
                return False, f"Forbidden pattern detected"

        # Prüfe Datenbank
        if database not in Config.ALLOWED_DATABASES:
            return False, f"Database '{database}' not allowed"

        # Prüfe ob Statement gegen System-DB zielt
        for forbidden_db in cls.FORBIDDEN_DATABASES:
            if re.search(rf"\b{forbidden_db}\b", statement_upper, re.IGNORECASE):
                return False, f"Operations on '{forbidden_db}' are forbidden"

        return True, ""

Validierungsschritte (ExecuteValidator)

#PrüfungFehler bei
1Statement nicht leerLeeres Statement
2Statement-Länge> 2000 Zeichen
3Statement-TypNicht ALTER/CREATE/DROP/TRUNCATE/SET
4SET-VariablenNicht in ALLOWED_SET_VARS
5Forbidden PatternsDROP DATABASE, GRANT, etc.
6Database-AllowlistDatenbank nicht erlaubt
7System-DB-Schutzmysql, information_schema, etc.

Erlaubte DDL-Statements

TypBeispielStatus
ALTER TABLEALTER TABLE tasks ADD COLUMN priority INTErlaubt
CREATE TABLECREATE TABLE temp_data (...)Erlaubt
CREATE INDEXCREATE INDEX idx_name ON table(col)Erlaubt
DROP TABLEDROP TABLE temp_dataErlaubt
DROP INDEXDROP INDEX idx_name ON tableErlaubt
TRUNCATETRUNCATE TABLE temp_dataErlaubt
SETSET FOREIGN_KEY_CHECKS = 0Erlaubt

Verbotene Patterns

PatternBeschreibung
DROP DATABASEDatenbank-Löschung verboten
DROP SCHEMASchema-Löschung verboten
CREATE DATABASEDatenbank-Erstellung verboten
CREATE SCHEMASchema-Erstellung verboten
GRANTRechte-Vergabe verboten
REVOKERechte-Entzug verboten
SHUTDOWNServer-Shutdown verboten

Keyword-Erkennung

Keywords werden mit Word Boundaries erkannt:

# Regex-Pattern für Keyword "DROP"
pattern = r'\bDROP\b'

# Blockiert:
"SELECT * FROM t; DROP TABLE t"  # DROP erkannt

# Nicht blockiert (kein vollständiges Wort):
"SELECT dropdown FROM t"         # DROP nicht erkannt

Testfälle

# QueryValidator - Muss DENIED returnen:
"SELECT * FROM mcp_log; DROP TABLE mcp_log"  # DROP
"SELECT LOAD_FILE('/etc/passwd')"            # LOAD_FILE
"SELECT * FROM users"                        # users nicht erlaubt
"SELECT SLEEP(10)"                           # SLEEP

# ExecuteValidator - Muss DENIED returnen:
"DROP DATABASE ki_dev"                       # Forbidden Pattern
"GRANT ALL ON ki_dev.* TO user"              # GRANT verboten
"ALTER TABLE mysql.user ADD COLUMN x INT"   # System-DB

# ExecuteValidator - Muss SUCCESS returnen:
"ALTER TABLE tasks ADD COLUMN priority INT"  # Erlaubt
"CREATE INDEX idx_status ON tasks(status)"   # Erlaubt
"SET FOREIGN_KEY_CHECKS = 0"                 # Erlaubte Variable

Verwandte Kapitel