{
"event": "PreToolUse",
"tool_name": "Write",
"tool_input": {
"file_path": "\/var\/www\/mcp-servers\/mcp_db\/validators\/query_validator.py",
"content": "\"\"\"Query Validator - SRP: Separate Validierungslogik\"\"\"\n\nimport re\nfrom typing import Tuple\n\nfrom config import Config\n\n\nclass QueryValidator:\n \"\"\"Validiert SQL Queries - SRP: Nur Validierung\"\"\"\n\n @staticmethod\n def validate_query(query: str, database: str, max_rows: int) -> Tuple[bool, str]:\n \"\"\"\n Validiert eine Query gegen alle Sicherheitsregeln.\n\n Returns:\n (is_valid, error_message)\n \"\"\"\n # Basis-Validierung\n if not query or len(query) < 1:\n return False, \"Query must not be empty\"\n\n if len(query) > Config.MAX_QUERY_LENGTH:\n return False, f\"Query must be max {Config.MAX_QUERY_LENGTH} chars\"\n\n # Nur SELECT erlaubt\n query_upper = query.strip().upper()\n if not query_upper.startswith(\"SELECT\"):\n return False, \"Only SELECT queries allowed\"\n\n # Dangerous Keyword Blocklist\n for keyword in Config.BLOCKED_KEYWORDS:\n # Pruefe Keyword mit Word Boundaries\n pattern = r\"\\b\" + re.escape(keyword) + r\"\\b\"\n if re.search(pattern, query_upper):\n return False, f\"Blocked keyword detected: {keyword}\"\n\n # Database Allowlist\n if database not in Config.ALLOWED_DATABASES:\n return (\n False,\n f\"Database '{database}' not allowed. Allowed: {', '.join(Config.ALLOWED_DATABASES)}\",\n )\n\n # Max Rows pruefen\n if max_rows < 1 or max_rows > Config.MAX_ROWS:\n return False, f\"max_rows must be 1-{Config.MAX_ROWS}\"\n\n # Table Allowlist (information_schema Schutz)\n from_tables = QueryValidator._extract_table_names(query_upper)\n for table in from_tables:\n # Erlaube information_schema.TABLES fuer Schema-Tool\n if \"INFORMATION_SCHEMA\" in table:\n continue\n # Pruefe gegen Allowlist\n table_clean = table.split(\".\")[-1]\n if table_clean not in [t.upper() for t in Config.ALLOWED_TABLES]:\n return (\n False,\n f\"Table '{table}' not allowed. Allowed: {', '.join(Config.ALLOWED_TABLES)}\",\n )\n\n return True, \"\"\n\n @staticmethod\n def _extract_table_names(query_upper: str) -> list:\n \"\"\"Extrahiert Tabellennamen aus FROM und JOIN Clauses\"\"\"\n tables = []\n\n # FROM table, JOIN table\n from_match = re.findall(r\"\\bFROM\\s+([a-zA-Z0-9_\\.]+)\", query_upper)\n join_match = re.findall(r\"\\bJOIN\\s+([a-zA-Z0-9_\\.]+)\", query_upper)\n\n tables.extend(from_match)\n tables.extend(join_match)\n\n return tables\n"
}
}
{
"tool_response": {
"type": "update",
"filePath": "\/var\/www\/mcp-servers\/mcp_db\/validators\/query_validator.py",
"content": "\"\"\"Query Validator - SRP: Separate Validierungslogik\"\"\"\n\nimport re\nfrom typing import Tuple\n\nfrom config import Config\n\n\nclass QueryValidator:\n \"\"\"Validiert SQL Queries - SRP: Nur Validierung\"\"\"\n\n @staticmethod\n def validate_query(query: str, database: str, max_rows: int) -> Tuple[bool, str]:\n \"\"\"\n Validiert eine Query gegen alle Sicherheitsregeln.\n\n Returns:\n (is_valid, error_message)\n \"\"\"\n # Basis-Validierung\n if not query or len(query) < 1:\n return False, \"Query must not be empty\"\n\n if len(query) > Config.MAX_QUERY_LENGTH:\n return False, f\"Query must be max {Config.MAX_QUERY_LENGTH} chars\"\n\n # Nur SELECT erlaubt\n query_upper = query.strip().upper()\n if not query_upper.startswith(\"SELECT\"):\n return False, \"Only SELECT queries allowed\"\n\n # Dangerous Keyword Blocklist\n for keyword in Config.BLOCKED_KEYWORDS:\n # Pruefe Keyword mit Word Boundaries\n pattern = r\"\\b\" + re.escape(keyword) + r\"\\b\"\n if re.search(pattern, query_upper):\n return False, f\"Blocked keyword detected: {keyword}\"\n\n # Database Allowlist\n if database not in Config.ALLOWED_DATABASES:\n return (\n False,\n f\"Database '{database}' not allowed. Allowed: {', '.join(Config.ALLOWED_DATABASES)}\",\n )\n\n # Max Rows pruefen\n if max_rows < 1 or max_rows > Config.MAX_ROWS:\n return False, f\"max_rows must be 1-{Config.MAX_ROWS}\"\n\n # Table Allowlist (information_schema Schutz)\n from_tables = QueryValidator._extract_table_names(query_upper)\n for table in from_tables:\n # Erlaube information_schema.TABLES fuer Schema-Tool\n if \"INFORMATION_SCHEMA\" in table:\n continue\n # Pruefe gegen Allowlist\n table_clean = table.split(\".\")[-1]\n if table_clean not in [t.upper() for t in Config.ALLOWED_TABLES]:\n return (\n False,\n f\"Table '{table}' not allowed. Allowed: {', '.join(Config.ALLOWED_TABLES)}\",\n )\n\n return True, \"\"\n\n @staticmethod\n def _extract_table_names(query_upper: str) -> list:\n \"\"\"Extrahiert Tabellennamen aus FROM und JOIN Clauses\"\"\"\n tables = []\n\n # FROM table, JOIN table\n from_match = re.findall(r\"\\bFROM\\s+([a-zA-Z0-9_\\.]+)\", query_upper)\n join_match = re.findall(r\"\\bJOIN\\s+([a-zA-Z0-9_\\.]+)\", query_upper)\n\n tables.extend(from_match)\n tables.extend(join_match)\n\n return tables\n",
"structuredPatch": [
{
"oldStart": 1,
"oldLines": 9,
"newStart": 1,
"newLines": 8,
"lines": [
" \"\"\"Query Validator - SRP: Separate Validierungslogik\"\"\"",
"+",
" import re",
"-import sys",
" from typing import Tuple",
" ",
"-sys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp_db\")",
" from config import Config",
" ",
" "
]
},
{
"oldStart": 32,
"oldLines": 7,
"newStart": 31,
"newLines": 7,
"lines": [
" ",
" # Dangerous Keyword Blocklist",
" for keyword in Config.BLOCKED_KEYWORDS:",
"- # Prüfe Keyword mit Word Boundaries",
"+ # Pruefe Keyword mit Word Boundaries",
" pattern = r\"\\b\" + re.escape(keyword) + r\"\\b\"",
" if re.search(pattern, query_upper):",
" return False, f\"Blocked keyword detected: {keyword}\""
]
},
{
"oldStart": 44,
"oldLines": 17,
"newStart": 43,
"newLines": 17,
"lines": [
" f\"Database '{database}' not allowed. Allowed: {', '.join(Config.ALLOWED_DATABASES)}\",",
" )",
" ",
"- # Max Rows prüfen",
"+ # Max Rows pruefen",
" if max_rows < 1 or max_rows > Config.MAX_ROWS:",
" return False, f\"max_rows must be 1-{Config.MAX_ROWS}\"",
" ",
" # Table Allowlist (information_schema Schutz)",
" from_tables = QueryValidator._extract_table_names(query_upper)",
" for table in from_tables:",
"- # Erlaube information_schema.TABLES für Schema-Tool",
"+ # Erlaube information_schema.TABLES fuer Schema-Tool",
" if \"INFORMATION_SCHEMA\" in table:",
" continue",
"- # Prüfe gegen Allowlist",
"+ # Pruefe gegen Allowlist",
" table_clean = table.split(\".\")[-1]",
" if table_clean not in [t.upper() for t in Config.ALLOWED_TABLES]:",
" return ("
]
}
],
"originalFile": "\"\"\"Query Validator - SRP: Separate Validierungslogik\"\"\"\nimport re\nimport sys\nfrom typing import Tuple\n\nsys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp_db\")\nfrom config import Config\n\n\nclass QueryValidator:\n \"\"\"Validiert SQL Queries - SRP: Nur Validierung\"\"\"\n\n @staticmethod\n def validate_query(query: str, database: str, max_rows: int) -> Tuple[bool, str]:\n \"\"\"\n Validiert eine Query gegen alle Sicherheitsregeln.\n\n Returns:\n (is_valid, error_message)\n \"\"\"\n # Basis-Validierung\n if not query or len(query) < 1:\n return False, \"Query must not be empty\"\n\n if len(query) > Config.MAX_QUERY_LENGTH:\n return False, f\"Query must be max {Config.MAX_QUERY_LENGTH} chars\"\n\n # Nur SELECT erlaubt\n query_upper = query.strip().upper()\n if not query_upper.startswith(\"SELECT\"):\n return False, \"Only SELECT queries allowed\"\n\n # Dangerous Keyword Blocklist\n for keyword in Config.BLOCKED_KEYWORDS:\n # Prüfe Keyword mit Word Boundaries\n pattern = r\"\\b\" + re.escape(keyword) + r\"\\b\"\n if re.search(pattern, query_upper):\n return False, f\"Blocked keyword detected: {keyword}\"\n\n # Database Allowlist\n if database not in Config.ALLOWED_DATABASES:\n return (\n False,\n f\"Database '{database}' not allowed. Allowed: {', '.join(Config.ALLOWED_DATABASES)}\",\n )\n\n # Max Rows prüfen\n if max_rows < 1 or max_rows > Config.MAX_ROWS:\n return False, f\"max_rows must be 1-{Config.MAX_ROWS}\"\n\n # Table Allowlist (information_schema Schutz)\n from_tables = QueryValidator._extract_table_names(query_upper)\n for table in from_tables:\n # Erlaube information_schema.TABLES für Schema-Tool\n if \"INFORMATION_SCHEMA\" in table:\n continue\n # Prüfe gegen Allowlist\n table_clean = table.split(\".\")[-1]\n if table_clean not in [t.upper() for t in Config.ALLOWED_TABLES]:\n return (\n False,\n f\"Table '{table}' not allowed. Allowed: {', '.join(Config.ALLOWED_TABLES)}\",\n )\n\n return True, \"\"\n\n @staticmethod\n def _extract_table_names(query_upper: str) -> list:\n \"\"\"Extrahiert Tabellennamen aus FROM und JOIN Clauses\"\"\"\n tables = []\n\n # FROM table, JOIN table\n from_match = re.findall(r\"\\bFROM\\s+([a-zA-Z0-9_\\.]+)\", query_upper)\n join_match = re.findall(r\"\\bJOIN\\s+([a-zA-Z0-9_\\.]+)\", query_upper)\n\n tables.extend(from_match)\n tables.extend(join_match)\n\n return tables\n"
}
}