{
"event": "PreToolUse",
"tool_name": "Write",
"tool_input": {
"file_path": "\/var\/www\/mcp-servers\/mcp_db\/validators\/execute_validator.py",
"content": "\"\"\"Execute Validator - Validierung fuer DDL\/DML Statements.\"\"\"\n\nimport re\n\nfrom config import Config\n\n\nclass ExecuteValidator:\n \"\"\"Validiert DDL\/DML Statements gegen Sicherheitsregeln.\"\"\"\n\n # Erlaubte Statement-Typen\n ALLOWED_STATEMENTS: list[str] = [\"ALTER\", \"CREATE\", \"DROP\", \"TRUNCATE\", \"SET\"]\n\n # Erlaubte SET-Variablen (Whitelist)\n ALLOWED_SET_VARS: list[str] = [\"FOREIGN_KEY_CHECKS\"]\n\n # Verbotene Patterns (kritisch!)\n FORBIDDEN_PATTERNS: list[str] = [\n r\"\\bDROP\\s+DATABASE\\b\",\n r\"\\bDROP\\s+SCHEMA\\b\",\n r\"\\bCREATE\\s+DATABASE\\b\",\n r\"\\bCREATE\\s+SCHEMA\\b\",\n r\"\\bGRANT\\b\",\n r\"\\bREVOKE\\b\",\n r\"\\bSHUTDOWN\\b\",\n ]\n\n # Verbotene Datenbanken (System-DBs)\n FORBIDDEN_DATABASES: list[str] = [\n \"information_schema\",\n \"mysql\",\n \"performance_schema\",\n \"sys\",\n ]\n\n @classmethod\n def validate_statement(\n cls, statement: str, database: str\n ) -> tuple[bool, str]:\n \"\"\"\n Validiert ein DDL\/DML Statement.\n\n Args:\n statement: SQL Statement\n database: Zieldatenbank\n\n Returns:\n Tuple (is_valid, error_message)\n \"\"\"\n if not statement or len(statement.strip()) < 5:\n return False, \"Statement must not be empty\"\n\n if len(statement) > Config.MAX_QUERY_LENGTH:\n return False, f\"Statement exceeds max length ({Config.MAX_QUERY_LENGTH})\"\n\n statement_upper = statement.strip().upper()\n\n # Pruefe ob Statement mit erlaubtem Keyword beginnt\n statement_type = cls._get_statement_type(statement_upper)\n if statement_type is None:\n return (\n False,\n f\"Statement type not allowed. Allowed: {', '.join(cls.ALLOWED_STATEMENTS)}\",\n )\n\n # Bei SET: nur erlaubte Variablen\n if statement_type == \"SET\":\n allowed = False\n for var in cls.ALLOWED_SET_VARS:\n if var in statement_upper:\n allowed = True\n break\n if not allowed:\n return (\n False,\n f\"SET variable not allowed. Allowed: {', '.join(cls.ALLOWED_SET_VARS)}\",\n )\n\n # Pruefe verbotene Patterns\n for pattern in cls.FORBIDDEN_PATTERNS:\n if re.search(pattern, statement_upper):\n return False, f\"Forbidden pattern detected: {pattern}\"\n\n # Pruefe Datenbank\n if database not in Config.ALLOWED_DATABASES:\n return (\n False,\n f\"Database '{database}' not allowed. Allowed: {', '.join(Config.ALLOWED_DATABASES)}\",\n )\n\n # Pruefe ob Statement gegen System-DB zielt (als Wort, nicht Substring)\n for forbidden_db in cls.FORBIDDEN_DATABASES:\n pattern = rf\"\\b{forbidden_db}\\b\"\n if re.search(pattern, statement_upper, re.IGNORECASE):\n return False, f\"Operations on '{forbidden_db}' are forbidden\"\n\n return True, \"\"\n\n @classmethod\n def _get_statement_type(cls, statement_upper: str) -> str | None:\n \"\"\"Extrahiert den Statement-Typ (ALTER, CREATE, etc.).\"\"\"\n for stmt_type in cls.ALLOWED_STATEMENTS:\n if statement_upper.startswith(stmt_type):\n return stmt_type\n return None\n"
}
}
{
"tool_response": {
"type": "update",
"filePath": "\/var\/www\/mcp-servers\/mcp_db\/validators\/execute_validator.py",
"content": "\"\"\"Execute Validator - Validierung fuer DDL\/DML Statements.\"\"\"\n\nimport re\n\nfrom config import Config\n\n\nclass ExecuteValidator:\n \"\"\"Validiert DDL\/DML Statements gegen Sicherheitsregeln.\"\"\"\n\n # Erlaubte Statement-Typen\n ALLOWED_STATEMENTS: list[str] = [\"ALTER\", \"CREATE\", \"DROP\", \"TRUNCATE\", \"SET\"]\n\n # Erlaubte SET-Variablen (Whitelist)\n ALLOWED_SET_VARS: list[str] = [\"FOREIGN_KEY_CHECKS\"]\n\n # Verbotene Patterns (kritisch!)\n FORBIDDEN_PATTERNS: list[str] = [\n r\"\\bDROP\\s+DATABASE\\b\",\n r\"\\bDROP\\s+SCHEMA\\b\",\n r\"\\bCREATE\\s+DATABASE\\b\",\n r\"\\bCREATE\\s+SCHEMA\\b\",\n r\"\\bGRANT\\b\",\n r\"\\bREVOKE\\b\",\n r\"\\bSHUTDOWN\\b\",\n ]\n\n # Verbotene Datenbanken (System-DBs)\n FORBIDDEN_DATABASES: list[str] = [\n \"information_schema\",\n \"mysql\",\n \"performance_schema\",\n \"sys\",\n ]\n\n @classmethod\n def validate_statement(\n cls, statement: str, database: str\n ) -> tuple[bool, str]:\n \"\"\"\n Validiert ein DDL\/DML Statement.\n\n Args:\n statement: SQL Statement\n database: Zieldatenbank\n\n Returns:\n Tuple (is_valid, error_message)\n \"\"\"\n if not statement or len(statement.strip()) < 5:\n return False, \"Statement must not be empty\"\n\n if len(statement) > Config.MAX_QUERY_LENGTH:\n return False, f\"Statement exceeds max length ({Config.MAX_QUERY_LENGTH})\"\n\n statement_upper = statement.strip().upper()\n\n # Pruefe ob Statement mit erlaubtem Keyword beginnt\n statement_type = cls._get_statement_type(statement_upper)\n if statement_type is None:\n return (\n False,\n f\"Statement type not allowed. Allowed: {', '.join(cls.ALLOWED_STATEMENTS)}\",\n )\n\n # Bei SET: nur erlaubte Variablen\n if statement_type == \"SET\":\n allowed = False\n for var in cls.ALLOWED_SET_VARS:\n if var in statement_upper:\n allowed = True\n break\n if not allowed:\n return (\n False,\n f\"SET variable not allowed. Allowed: {', '.join(cls.ALLOWED_SET_VARS)}\",\n )\n\n # Pruefe verbotene Patterns\n for pattern in cls.FORBIDDEN_PATTERNS:\n if re.search(pattern, statement_upper):\n return False, f\"Forbidden pattern detected: {pattern}\"\n\n # Pruefe Datenbank\n if database not in Config.ALLOWED_DATABASES:\n return (\n False,\n f\"Database '{database}' not allowed. Allowed: {', '.join(Config.ALLOWED_DATABASES)}\",\n )\n\n # Pruefe ob Statement gegen System-DB zielt (als Wort, nicht Substring)\n for forbidden_db in cls.FORBIDDEN_DATABASES:\n pattern = rf\"\\b{forbidden_db}\\b\"\n if re.search(pattern, statement_upper, re.IGNORECASE):\n return False, f\"Operations on '{forbidden_db}' are forbidden\"\n\n return True, \"\"\n\n @classmethod\n def _get_statement_type(cls, statement_upper: str) -> str | None:\n \"\"\"Extrahiert den Statement-Typ (ALTER, CREATE, etc.).\"\"\"\n for stmt_type in cls.ALLOWED_STATEMENTS:\n if statement_upper.startswith(stmt_type):\n return stmt_type\n return None\n",
"structuredPatch": [
{
"oldStart": 1,
"oldLines": 8,
"newStart": 1,
"newLines": 7,
"lines": [
"-\"\"\"Execute Validator - Validierung für DDL\/DML Statements.\"\"\"",
"+\"\"\"Execute Validator - Validierung fuer DDL\/DML Statements.\"\"\"",
"+",
" import re",
"-import sys",
" ",
"-sys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp_db\")",
" from config import Config",
" ",
" "
]
},
{
"oldStart": 56,
"oldLines": 7,
"newStart": 55,
"newLines": 7,
"lines": [
" ",
" statement_upper = statement.strip().upper()",
" ",
"- # Prüfe ob Statement mit erlaubtem Keyword beginnt",
"+ # Pruefe ob Statement mit erlaubtem Keyword beginnt",
" statement_type = cls._get_statement_type(statement_upper)",
" if statement_type is None:",
" return ("
]
},
{
"oldStart": 77,
"oldLines": 19,
"newStart": 76,
"newLines": 19,
"lines": [
" f\"SET variable not allowed. Allowed: {', '.join(cls.ALLOWED_SET_VARS)}\",",
" )",
" ",
"- # Prüfe verbotene Patterns",
"+ # Pruefe verbotene Patterns",
" for pattern in cls.FORBIDDEN_PATTERNS:",
" if re.search(pattern, statement_upper):",
" return False, f\"Forbidden pattern detected: {pattern}\"",
" ",
"- # Prüfe Datenbank",
"+ # Pruefe Datenbank",
" if database not in Config.ALLOWED_DATABASES:",
" return (",
" False,",
" f\"Database '{database}' not allowed. Allowed: {', '.join(Config.ALLOWED_DATABASES)}\",",
" )",
" ",
"- # Prüfe ob Statement gegen System-DB zielt (als Wort, nicht Substring)",
"+ # Pruefe ob Statement gegen System-DB zielt (als Wort, nicht Substring)",
" for forbidden_db in cls.FORBIDDEN_DATABASES:",
" pattern = rf\"\\b{forbidden_db}\\b\"",
" if re.search(pattern, statement_upper, re.IGNORECASE):"
]
}
],
"originalFile": "\"\"\"Execute Validator - Validierung für DDL\/DML Statements.\"\"\"\nimport re\nimport sys\n\nsys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp_db\")\nfrom config import Config\n\n\nclass ExecuteValidator:\n \"\"\"Validiert DDL\/DML Statements gegen Sicherheitsregeln.\"\"\"\n\n # Erlaubte Statement-Typen\n ALLOWED_STATEMENTS: list[str] = [\"ALTER\", \"CREATE\", \"DROP\", \"TRUNCATE\", \"SET\"]\n\n # Erlaubte SET-Variablen (Whitelist)\n ALLOWED_SET_VARS: list[str] = [\"FOREIGN_KEY_CHECKS\"]\n\n # Verbotene Patterns (kritisch!)\n FORBIDDEN_PATTERNS: list[str] = [\n r\"\\bDROP\\s+DATABASE\\b\",\n r\"\\bDROP\\s+SCHEMA\\b\",\n r\"\\bCREATE\\s+DATABASE\\b\",\n r\"\\bCREATE\\s+SCHEMA\\b\",\n r\"\\bGRANT\\b\",\n r\"\\bREVOKE\\b\",\n r\"\\bSHUTDOWN\\b\",\n ]\n\n # Verbotene Datenbanken (System-DBs)\n FORBIDDEN_DATABASES: list[str] = [\n \"information_schema\",\n \"mysql\",\n \"performance_schema\",\n \"sys\",\n ]\n\n @classmethod\n def validate_statement(\n cls, statement: str, database: str\n ) -> tuple[bool, str]:\n \"\"\"\n Validiert ein DDL\/DML Statement.\n\n Args:\n statement: SQL Statement\n database: Zieldatenbank\n\n Returns:\n Tuple (is_valid, error_message)\n \"\"\"\n if not statement or len(statement.strip()) < 5:\n return False, \"Statement must not be empty\"\n\n if len(statement) > Config.MAX_QUERY_LENGTH:\n return False, f\"Statement exceeds max length ({Config.MAX_QUERY_LENGTH})\"\n\n statement_upper = statement.strip().upper()\n\n # Prüfe ob Statement mit erlaubtem Keyword beginnt\n statement_type = cls._get_statement_type(statement_upper)\n if statement_type is None:\n return (\n False,\n f\"Statement type not allowed. Allowed: {', '.join(cls.ALLOWED_STATEMENTS)}\",\n )\n\n # Bei SET: nur erlaubte Variablen\n if statement_type == \"SET\":\n allowed = False\n for var in cls.ALLOWED_SET_VARS:\n if var in statement_upper:\n allowed = True\n break\n if not allowed:\n return (\n False,\n f\"SET variable not allowed. Allowed: {', '.join(cls.ALLOWED_SET_VARS)}\",\n )\n\n # Prüfe verbotene Patterns\n for pattern in cls.FORBIDDEN_PATTERNS:\n if re.search(pattern, statement_upper):\n return False, f\"Forbidden pattern detected: {pattern}\"\n\n # Prüfe Datenbank\n if database not in Config.ALLOWED_DATABASES:\n return (\n False,\n f\"Database '{database}' not allowed. Allowed: {', '.join(Config.ALLOWED_DATABASES)}\",\n )\n\n # Prüfe ob Statement gegen System-DB zielt (als Wort, nicht Substring)\n for forbidden_db in cls.FORBIDDEN_DATABASES:\n pattern = rf\"\\b{forbidden_db}\\b\"\n if re.search(pattern, statement_upper, re.IGNORECASE):\n return False, f\"Operations on '{forbidden_db}' are forbidden\"\n\n return True, \"\"\n\n @classmethod\n def _get_statement_type(cls, statement_upper: str) -> str | None:\n \"\"\"Extrahiert den Statement-Typ (ALTER, CREATE, etc.).\"\"\"\n for stmt_type in cls.ALLOWED_STATEMENTS:\n if statement_upper.startswith(stmt_type):\n return stmt_type\n return None\n"
}
}