MCP-DB Validators
SQL-Validierung nach dem Single Responsibility Principle.
QueryValidator
Validiert SELECT-Queries für das db_select Tool.
"""SRP: Separate Validierungslogik"""
from typing import Tuple
import re
from config import Config
class QueryValidator:
"""Validiert SQL Queries - SRP: Nur Validierung"""
@staticmethod
def validate_query(query: str, database: str, max_rows: int) -> Tuple[bool, str]:
"""
Validiert eine Query gegen alle Sicherheitsregeln.
Returns:
(is_valid, error_message)
"""
# Basis-Validierung
if not query or len(query) < 1:
return False, "Query must not be empty"
if len(query) > Config.MAX_QUERY_LENGTH:
return False, f"Query must be max {Config.MAX_QUERY_LENGTH} chars"
# Nur SELECT erlaubt
query_upper = query.strip().upper()
if not query_upper.startswith("SELECT"):
return False, "Only SELECT queries allowed"
# Dangerous Keyword Blocklist
for keyword in Config.BLOCKED_KEYWORDS:
pattern = r'\b' + re.escape(keyword) + r'\b'
if re.search(pattern, query_upper):
return False, f"Blocked keyword detected: {keyword}"
# Database Allowlist
if database not in Config.ALLOWED_DATABASES:
return False, f"Database '{database}' not allowed"
# Max Rows prüfen
if max_rows < 1 or max_rows > Config.MAX_ROWS:
return False, f"max_rows must be 1-{Config.MAX_ROWS}"
# Table Allowlist
from_tables = QueryValidator._extract_table_names(query_upper)
for table in from_tables:
if "INFORMATION_SCHEMA" in table:
continue
table_clean = table.split('.')[-1]
if table_clean not in [t.upper() for t in Config.ALLOWED_TABLES]:
return False, f"Table '{table}' not allowed"
return True, ""
Validierungsschritte (QueryValidator)
| # | Prüfung | Fehler bei |
|---|---|---|
| 1 | Query nicht leer | Leere Query |
| 2 | Query-Länge | > 2000 Zeichen |
| 3 | SELECT-Only | Nicht mit SELECT beginnend |
| 4 | Keyword-Blocklist | Blockiertes Keyword gefunden |
| 5 | Database-Allowlist | Datenbank nicht erlaubt |
| 6 | Max-Rows | < 1 oder > 100 |
| 7 | Table-Allowlist | Tabelle nicht erlaubt |
ExecuteValidator
Validiert DDL-Statements für das db_execute Tool.
"""Execute Validator - Validierung für DDL/DML Statements."""
import re
from config import Config
class ExecuteValidator:
"""Validiert DDL/DML Statements gegen Sicherheitsregeln."""
# Erlaubte Statement-Typen
ALLOWED_STATEMENTS: list[str] = ["ALTER", "CREATE", "DROP", "TRUNCATE", "SET"]
# Erlaubte SET-Variablen (Whitelist)
ALLOWED_SET_VARS: list[str] = ["FOREIGN_KEY_CHECKS"]
# Verbotene Patterns (kritisch!)
FORBIDDEN_PATTERNS: list[str] = [
r"\bDROP\s+DATABASE\b",
r"\bDROP\s+SCHEMA\b",
r"\bCREATE\s+DATABASE\b",
r"\bCREATE\s+SCHEMA\b",
r"\bGRANT\b",
r"\bREVOKE\b",
r"\bSHUTDOWN\b",
]
# Verbotene Datenbanken (System-DBs)
FORBIDDEN_DATABASES: list[str] = [
"information_schema",
"mysql",
"performance_schema",
"sys",
]
@classmethod
def validate_statement(cls, statement: str, database: str) -> tuple[bool, str]:
"""
Validiert ein DDL/DML Statement.
Returns:
Tuple (is_valid, error_message)
"""
if not statement or len(statement.strip()) < 5:
return False, "Statement must not be empty"
if len(statement) > Config.MAX_QUERY_LENGTH:
return False, f"Statement exceeds max length"
statement_upper = statement.strip().upper()
# Prüfe ob Statement mit erlaubtem Keyword beginnt
statement_type = cls._get_statement_type(statement_upper)
if statement_type is None:
return False, f"Statement type not allowed. Allowed: {', '.join(cls.ALLOWED_STATEMENTS)}"
# Bei SET: nur erlaubte Variablen
if statement_type == "SET":
allowed = any(var in statement_upper for var in cls.ALLOWED_SET_VARS)
if not allowed:
return False, f"SET variable not allowed. Allowed: {', '.join(cls.ALLOWED_SET_VARS)}"
# Prüfe verbotene Patterns
for pattern in cls.FORBIDDEN_PATTERNS:
if re.search(pattern, statement_upper):
return False, f"Forbidden pattern detected"
# Prüfe Datenbank
if database not in Config.ALLOWED_DATABASES:
return False, f"Database '{database}' not allowed"
# Prüfe ob Statement gegen System-DB zielt
for forbidden_db in cls.FORBIDDEN_DATABASES:
if re.search(rf"\b{forbidden_db}\b", statement_upper, re.IGNORECASE):
return False, f"Operations on '{forbidden_db}' are forbidden"
return True, ""
Validierungsschritte (ExecuteValidator)
| # | Prüfung | Fehler bei |
|---|---|---|
| 1 | Statement nicht leer | Leeres Statement |
| 2 | Statement-Länge | > 2000 Zeichen |
| 3 | Statement-Typ | Nicht ALTER/CREATE/DROP/TRUNCATE/SET |
| 4 | SET-Variablen | Nicht in ALLOWED_SET_VARS |
| 5 | Forbidden Patterns | DROP DATABASE, GRANT, etc. |
| 6 | Database-Allowlist | Datenbank nicht erlaubt |
| 7 | System-DB-Schutz | mysql, information_schema, etc. |
Erlaubte DDL-Statements
| Typ | Beispiel | Status |
|---|---|---|
| ALTER TABLE | ALTER TABLE tasks ADD COLUMN priority INT | Erlaubt |
| CREATE TABLE | CREATE TABLE temp_data (...) | Erlaubt |
| CREATE INDEX | CREATE INDEX idx_name ON table(col) | Erlaubt |
| DROP TABLE | DROP TABLE temp_data | Erlaubt |
| DROP INDEX | DROP INDEX idx_name ON table | Erlaubt |
| TRUNCATE | TRUNCATE TABLE temp_data | Erlaubt |
| SET | SET FOREIGN_KEY_CHECKS = 0 | Erlaubt |
Verbotene Patterns
| Pattern | Beschreibung |
|---|---|
| DROP DATABASE | Datenbank-Löschung verboten |
| DROP SCHEMA | Schema-Löschung verboten |
| CREATE DATABASE | Datenbank-Erstellung verboten |
| CREATE SCHEMA | Schema-Erstellung verboten |
| GRANT | Rechte-Vergabe verboten |
| REVOKE | Rechte-Entzug verboten |
| SHUTDOWN | Server-Shutdown verboten |
Keyword-Erkennung
Keywords werden mit Word Boundaries erkannt:
# Regex-Pattern für Keyword "DROP"
pattern = r'\bDROP\b'
# Blockiert:
"SELECT * FROM t; DROP TABLE t" # DROP erkannt
# Nicht blockiert (kein vollständiges Wort):
"SELECT dropdown FROM t" # DROP nicht erkannt
Testfälle
# QueryValidator - Muss DENIED returnen:
"SELECT * FROM mcp_log; DROP TABLE mcp_log" # DROP
"SELECT LOAD_FILE('/etc/passwd')" # LOAD_FILE
"SELECT * FROM users" # users nicht erlaubt
"SELECT SLEEP(10)" # SLEEP
# ExecuteValidator - Muss DENIED returnen:
"DROP DATABASE ki_dev" # Forbidden Pattern
"GRANT ALL ON ki_dev.* TO user" # GRANT verboten
"ALTER TABLE mysql.user ADD COLUMN x INT" # System-DB
# ExecuteValidator - Muss SUCCESS returnen:
"ALTER TABLE tasks ADD COLUMN priority INT" # Erlaubt
"CREATE INDEX idx_status ON tasks(status)" # Erlaubt
"SET FOREIGN_KEY_CHECKS = 0" # Erlaubte Variable
Verwandte Kapitel
- Konfiguration - Allowlists und Blocklists
- Sicherheit - Gesamtkonzept
- Tools - db_select und db_execute