MCP-DB Validators
SQL-Validierung nach dem Single Responsibility Principle.
QueryValidator
"""SRP: Separate Validierungslogik"""
from typing import Tuple
import re
from config import Config
class QueryValidator:
"""Validiert SQL Queries - SRP: Nur Validierung"""
@staticmethod
def validate_query(query: str, database: str, max_rows: int) -> Tuple[bool, str]:
"""
Validiert eine Query gegen alle Sicherheitsregeln.
Returns:
(is_valid, error_message)
"""
# Basis-Validierung
if not query or len(query) < 1:
return False, "Query must not be empty"
if len(query) > Config.MAX_QUERY_LENGTH:
return False, f"Query must be max {Config.MAX_QUERY_LENGTH} chars"
# Nur SELECT erlaubt
query_upper = query.strip().upper()
if not query_upper.startswith("SELECT"):
return False, "Only SELECT queries allowed"
# Dangerous Keyword Blocklist
for keyword in Config.BLOCKED_KEYWORDS:
pattern = r'\b' + re.escape(keyword) + r'\b'
if re.search(pattern, query_upper):
return False, f"Blocked keyword detected: {keyword}"
# Database Allowlist
if database not in Config.ALLOWED_DATABASES:
return False, f"Database '{database}' not allowed"
# Max Rows prüfen
if max_rows < 1 or max_rows > Config.MAX_ROWS:
return False, f"max_rows must be 1-{Config.MAX_ROWS}"
# Table Allowlist (mit information_schema Ausnahme)
from_tables = QueryValidator._extract_table_names(query_upper)
for table in from_tables:
# Erlaube information_schema.TABLES für Schema-Tool
if "INFORMATION_SCHEMA" in table:
continue
# Prüfe gegen Allowlist
table_clean = table.split('.')[-1]
if table_clean not in [t.upper() for t in Config.ALLOWED_TABLES]:
return False, f"Table '{table}' not allowed"
return True, ""
@staticmethod
def _extract_table_names(query_upper: str) -> list:
"""Extrahiert Tabellennamen aus FROM und JOIN Clauses"""
tables = []
from_match = re.findall(r'\bFROM\s+([a-zA-Z0-9_\.]+)', query_upper)
join_match = re.findall(r'\bJOIN\s+([a-zA-Z0-9_\.]+)', query_upper)
tables.extend(from_match)
tables.extend(join_match)
return tables
Validierungsschritte
| # | Prüfung | Fehler bei |
|---|---|---|
| 1 | Query nicht leer | Leere Query |
| 2 | Query-Länge | > 2000 Zeichen |
| 3 | SELECT-Only | Nicht mit SELECT beginnend |
| 4 | Keyword-Blocklist | Blockiertes Keyword gefunden |
| 5 | Database-Allowlist | Datenbank nicht erlaubt |
| 6 | Max-Rows | < 1 oder > 100 |
| 7 | Table-Allowlist | Tabelle nicht erlaubt |
Keyword-Erkennung
Keywords werden mit Word Boundaries erkannt:
# Regex-Pattern für Keyword "DROP"
pattern = r'\bDROP\b'
# Blockiert:
"SELECT * FROM t; DROP TABLE t" # DROP erkannt
# Nicht blockiert (kein vollständiges Wort):
"SELECT dropdown FROM t" # DROP nicht erkannt
Tabellen-Extraktion
Tabellennamen werden aus FROM und JOIN Clauses extrahiert:
# Query
"SELECT * FROM mcp_log JOIN ki_tags ON ..."
# Extrahierte Tabellen
["MCP_LOG", "KI_TAGS"]
# Prüfung gegen Allowlist
mcp_log ✓ (in ALLOWED_TABLES)
ki_tags ✓ (in ALLOWED_TABLES)
Bekannte Limits
| Limit | Beschreibung | Mitigation |
|---|---|---|
| Regex-basiert | Kein vollständiger SQL-Parser | Konservative Extraktion, nur FROM/JOIN |
| Subqueries | Tabellen in Subqueries werden erkannt | Regex erfasst alle FROM/JOIN |
| Aliase | Aliase werden nicht aufgelöst | Tabellenname vor Alias wird erkannt |
Testfälle
# Muss DENIED returnen:
"SELECT * FROM mcp_log; DROP TABLE mcp_log" # DROP
"SELECT LOAD_FILE('/etc/passwd')" # LOAD_FILE
"SELECT * FROM users" # users nicht erlaubt
"SELECT SLEEP(10)" # SLEEP
"INSERT INTO mcp_log VALUES (...)" # Kein SELECT
# Muss SUCCESS returnen:
"SELECT * FROM mcp_log" # Erlaubt
"SELECT * FROM mcp_log WHERE status = %s" # Mit Params
"SELECT dropdown FROM ki_tags" # dropdown != DROP
Verwandte Kapitel
- Konfiguration - Allowlists und Blocklists
- Sicherheit - Gesamtkonzept