{
"event": "PostToolUse",
"tool_name": "Read",
"tool_response": {
"type": "text",
"file": {
"filePath": "\/var\/www\/mcp-servers\/mcp_db\/validators\/execute_validator.py",
"content": "\"\"\"Execute Validator - Validierung fuer DDL\/DML Statements.\"\"\"\n\nimport re\n\nfrom config import Config\n\n\nclass ExecuteValidator:\n \"\"\"Validiert DDL\/DML Statements gegen Sicherheitsregeln.\"\"\"\n\n # Erlaubte Statement-Typen\n ALLOWED_STATEMENTS: list[str] = [\"ALTER\", \"CREATE\", \"DROP\", \"TRUNCATE\", \"SET\"]\n\n # Erlaubte SET-Variablen (Whitelist)\n ALLOWED_SET_VARS: list[str] = [\"FOREIGN_KEY_CHECKS\"]\n\n # Verbotene Patterns (kritisch!)\n FORBIDDEN_PATTERNS: list[str] = [\n r\"\\bDROP\\s+DATABASE\\b\",\n r\"\\bDROP\\s+SCHEMA\\b\",\n r\"\\bCREATE\\s+DATABASE\\b\",\n r\"\\bCREATE\\s+SCHEMA\\b\",\n r\"\\bGRANT\\b\",\n r\"\\bREVOKE\\b\",\n r\"\\bSHUTDOWN\\b\",\n ]\n\n # Verbotene Datenbanken (System-DBs)\n FORBIDDEN_DATABASES: list[str] = [\n \"information_schema\",\n \"mysql\",\n \"performance_schema\",\n \"sys\",\n ]\n\n @classmethod\n def validate_statement(\n cls, statement: str, database: str\n ) -> tuple[bool, str]:\n \"\"\"\n Validiert ein DDL\/DML Statement.\n\n Args:\n statement: SQL Statement\n database: Zieldatenbank\n\n Returns:\n Tuple (is_valid, error_message)\n \"\"\"\n if not statement or len(statement.strip()) < 5:\n return False, \"Statement must not be empty\"\n\n if len(statement) > Config.MAX_QUERY_LENGTH:\n return False, f\"Statement exceeds max length ({Config.MAX_QUERY_LENGTH})\"\n\n statement_upper = statement.strip().upper()\n\n # Pruefe ob Statement mit erlaubtem Keyword beginnt\n statement_type = cls._get_statement_type(statement_upper)\n if statement_type is None:\n return (\n False,\n f\"Statement type not allowed. Allowed: {', '.join(cls.ALLOWED_STATEMENTS)}\",\n )\n\n # Bei SET: nur erlaubte Variablen\n if statement_type == \"SET\":\n allowed = False\n for var in cls.ALLOWED_SET_VARS:\n if var in statement_upper:\n allowed = True\n break\n if not allowed:\n return (\n False,\n f\"SET variable not allowed. Allowed: {', '.join(cls.ALLOWED_SET_VARS)}\",\n )\n\n # Pruefe verbotene Patterns\n for pattern in cls.FORBIDDEN_PATTERNS:\n if re.search(pattern, statement_upper):\n return False, f\"Forbidden pattern detected: {pattern}\"\n\n # Pruefe Datenbank\n if database not in Config.ALLOWED_DATABASES:\n return (\n False,\n f\"Database '{database}' not allowed. Allowed: {', '.join(Config.ALLOWED_DATABASES)}\",\n )\n\n # Pruefe ob Statement gegen System-DB zielt (als Wort, nicht Substring)\n for forbidden_db in cls.FORBIDDEN_DATABASES:\n pattern = rf\"\\b{forbidden_db}\\b\"\n if re.search(pattern, statement_upper, re.IGNORECASE):\n return False, f\"Operations on '{forbidden_db}' are forbidden\"\n\n return True, \"\"\n\n @classmethod\n def _get_statement_type(cls, statement_upper: str) -> str | None:\n \"\"\"Extrahiert den Statement-Typ (ALTER, CREATE, etc.).\"\"\"\n for stmt_type in cls.ALLOWED_STATEMENTS:\n if statement_upper.startswith(stmt_type):\n return stmt_type\n return None\n",
"numLines": 106,
"startLine": 1,
"totalLines": 106
}
}
}