execute_validator.py
- Pfad:
/var/www/mcp-servers/mcp-db/validators/execute_validator.py - Namespace: -
- Zeilen: 106 | Größe: 3,367 Bytes
- Geändert: 2025-12-28 13:14:51 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 100
- Dependencies: 100 (25%)
- LOC: 100 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
Keine Issues gefunden.
Dependencies 2
- use re
- use config.Config
Klassen 1
-
ExecuteValidatorclass Zeile 8
Code
"""Execute Validator - Validierung fuer DDL/DML Statements."""
import re
from config import Config
class ExecuteValidator:
"""Validiert DDL/DML Statements gegen Sicherheitsregeln."""
# Erlaubte Statement-Typen
ALLOWED_STATEMENTS: list[str] = ["ALTER", "CREATE", "DROP", "TRUNCATE", "SET"]
# Erlaubte SET-Variablen (Whitelist)
ALLOWED_SET_VARS: list[str] = ["FOREIGN_KEY_CHECKS"]
# Verbotene Patterns (kritisch!)
FORBIDDEN_PATTERNS: list[str] = [
r"\bDROP\s+DATABASE\b",
r"\bDROP\s+SCHEMA\b",
r"\bCREATE\s+DATABASE\b",
r"\bCREATE\s+SCHEMA\b",
r"\bGRANT\b",
r"\bREVOKE\b",
r"\bSHUTDOWN\b",
]
# Verbotene Datenbanken (System-DBs)
FORBIDDEN_DATABASES: list[str] = [
"information_schema",
"mysql",
"performance_schema",
"sys",
]
@classmethod
def validate_statement(
cls, statement: str, database: str
) -> tuple[bool, str]:
"""
Validiert ein DDL/DML Statement.
Args:
statement: SQL Statement
database: Zieldatenbank
Returns:
Tuple (is_valid, error_message)
"""
if not statement or len(statement.strip()) < 5:
return False, "Statement must not be empty"
if len(statement) > Config.MAX_QUERY_LENGTH:
return False, f"Statement exceeds max length ({Config.MAX_QUERY_LENGTH})"
statement_upper = statement.strip().upper()
# Pruefe ob Statement mit erlaubtem Keyword beginnt
statement_type = cls._get_statement_type(statement_upper)
if statement_type is None:
return (
False,
f"Statement type not allowed. Allowed: {', '.join(cls.ALLOWED_STATEMENTS)}",
)
# Bei SET: nur erlaubte Variablen
if statement_type == "SET":
allowed = False
for var in cls.ALLOWED_SET_VARS:
if var in statement_upper:
allowed = True
break
if not allowed:
return (
False,
f"SET variable not allowed. Allowed: {', '.join(cls.ALLOWED_SET_VARS)}",
)
# Pruefe verbotene Patterns
for pattern in cls.FORBIDDEN_PATTERNS:
if re.search(pattern, statement_upper):
return False, f"Forbidden pattern detected: {pattern}"
# Pruefe Datenbank
if database not in Config.ALLOWED_DATABASES:
return (
False,
f"Database '{database}' not allowed. Allowed: {', '.join(Config.ALLOWED_DATABASES)}",
)
# Pruefe ob Statement gegen System-DB zielt (als Wort, nicht Substring)
for forbidden_db in cls.FORBIDDEN_DATABASES:
pattern = rf"\b{forbidden_db}\b"
if re.search(pattern, statement_upper, re.IGNORECASE):
return False, f"Operations on '{forbidden_db}' are forbidden"
return True, ""
@classmethod
def _get_statement_type(cls, statement_upper: str) -> str | None:
"""Extrahiert den Statement-Typ (ALTER, CREATE, etc.)."""
for stmt_type in cls.ALLOWED_STATEMENTS:
if statement_upper.startswith(stmt_type):
return stmt_type
return None