Protokoll #28348

ID28348
Zeitstempel2025-12-28 13:14:44.883640
Clientroot
IP145.224.72.140
Modellclaude-sonnet-4-20250514
Statuscompleted
Tokens2,647 (Input: 737, Output: 1,910)
Dauer107 ms
Request-Zeit2025-12-28 13:14:44.883640
Response-Zeit2025-12-28 13:14:44.991105

Request

{
    "event": "PreToolUse",
    "tool_name": "Write",
    "tool_input": {
        "file_path": "\/var\/www\/mcp-servers\/mcp_db\/validators\/query_validator.py",
        "content": "\"\"\"Query Validator - SRP: Separate Validierungslogik\"\"\"\n\nimport re\nfrom typing import Tuple\n\nfrom config import Config\n\n\nclass QueryValidator:\n    \"\"\"Validiert SQL Queries - SRP: Nur Validierung\"\"\"\n\n    @staticmethod\n    def validate_query(query: str, database: str, max_rows: int) -> Tuple[bool, str]:\n        \"\"\"\n        Validiert eine Query gegen alle Sicherheitsregeln.\n\n        Returns:\n            (is_valid, error_message)\n        \"\"\"\n        # Basis-Validierung\n        if not query or len(query) < 1:\n            return False, \"Query must not be empty\"\n\n        if len(query) > Config.MAX_QUERY_LENGTH:\n            return False, f\"Query must be max {Config.MAX_QUERY_LENGTH} chars\"\n\n        # Nur SELECT erlaubt\n        query_upper = query.strip().upper()\n        if not query_upper.startswith(\"SELECT\"):\n            return False, \"Only SELECT queries allowed\"\n\n        # Dangerous Keyword Blocklist\n        for keyword in Config.BLOCKED_KEYWORDS:\n            # Pruefe Keyword mit Word Boundaries\n            pattern = r\"\\b\" + re.escape(keyword) + r\"\\b\"\n            if re.search(pattern, query_upper):\n                return False, f\"Blocked keyword detected: {keyword}\"\n\n        # Database Allowlist\n        if database not in Config.ALLOWED_DATABASES:\n            return (\n                False,\n                f\"Database '{database}' not allowed. Allowed: {', '.join(Config.ALLOWED_DATABASES)}\",\n            )\n\n        # Max Rows pruefen\n        if max_rows < 1 or max_rows > Config.MAX_ROWS:\n            return False, f\"max_rows must be 1-{Config.MAX_ROWS}\"\n\n        # Table Allowlist (information_schema Schutz)\n        from_tables = QueryValidator._extract_table_names(query_upper)\n        for table in from_tables:\n            # Erlaube information_schema.TABLES fuer Schema-Tool\n            if \"INFORMATION_SCHEMA\" in table:\n                continue\n            # Pruefe gegen Allowlist\n            table_clean = table.split(\".\")[-1]\n            if table_clean not in [t.upper() for t in Config.ALLOWED_TABLES]:\n                return (\n                    False,\n                    f\"Table '{table}' not allowed. Allowed: {', '.join(Config.ALLOWED_TABLES)}\",\n                )\n\n        return True, \"\"\n\n    @staticmethod\n    def _extract_table_names(query_upper: str) -> list:\n        \"\"\"Extrahiert Tabellennamen aus FROM und JOIN Clauses\"\"\"\n        tables = []\n\n        # FROM table, JOIN table\n        from_match = re.findall(r\"\\bFROM\\s+([a-zA-Z0-9_\\.]+)\", query_upper)\n        join_match = re.findall(r\"\\bJOIN\\s+([a-zA-Z0-9_\\.]+)\", query_upper)\n\n        tables.extend(from_match)\n        tables.extend(join_match)\n\n        return tables\n"
    }
}

Response

{
    "tool_response": {
        "type": "update",
        "filePath": "\/var\/www\/mcp-servers\/mcp_db\/validators\/query_validator.py",
        "content": "\"\"\"Query Validator - SRP: Separate Validierungslogik\"\"\"\n\nimport re\nfrom typing import Tuple\n\nfrom config import Config\n\n\nclass QueryValidator:\n    \"\"\"Validiert SQL Queries - SRP: Nur Validierung\"\"\"\n\n    @staticmethod\n    def validate_query(query: str, database: str, max_rows: int) -> Tuple[bool, str]:\n        \"\"\"\n        Validiert eine Query gegen alle Sicherheitsregeln.\n\n        Returns:\n            (is_valid, error_message)\n        \"\"\"\n        # Basis-Validierung\n        if not query or len(query) < 1:\n            return False, \"Query must not be empty\"\n\n        if len(query) > Config.MAX_QUERY_LENGTH:\n            return False, f\"Query must be max {Config.MAX_QUERY_LENGTH} chars\"\n\n        # Nur SELECT erlaubt\n        query_upper = query.strip().upper()\n        if not query_upper.startswith(\"SELECT\"):\n            return False, \"Only SELECT queries allowed\"\n\n        # Dangerous Keyword Blocklist\n        for keyword in Config.BLOCKED_KEYWORDS:\n            # Pruefe Keyword mit Word Boundaries\n            pattern = r\"\\b\" + re.escape(keyword) + r\"\\b\"\n            if re.search(pattern, query_upper):\n                return False, f\"Blocked keyword detected: {keyword}\"\n\n        # Database Allowlist\n        if database not in Config.ALLOWED_DATABASES:\n            return (\n                False,\n                f\"Database '{database}' not allowed. Allowed: {', '.join(Config.ALLOWED_DATABASES)}\",\n            )\n\n        # Max Rows pruefen\n        if max_rows < 1 or max_rows > Config.MAX_ROWS:\n            return False, f\"max_rows must be 1-{Config.MAX_ROWS}\"\n\n        # Table Allowlist (information_schema Schutz)\n        from_tables = QueryValidator._extract_table_names(query_upper)\n        for table in from_tables:\n            # Erlaube information_schema.TABLES fuer Schema-Tool\n            if \"INFORMATION_SCHEMA\" in table:\n                continue\n            # Pruefe gegen Allowlist\n            table_clean = table.split(\".\")[-1]\n            if table_clean not in [t.upper() for t in Config.ALLOWED_TABLES]:\n                return (\n                    False,\n                    f\"Table '{table}' not allowed. Allowed: {', '.join(Config.ALLOWED_TABLES)}\",\n                )\n\n        return True, \"\"\n\n    @staticmethod\n    def _extract_table_names(query_upper: str) -> list:\n        \"\"\"Extrahiert Tabellennamen aus FROM und JOIN Clauses\"\"\"\n        tables = []\n\n        # FROM table, JOIN table\n        from_match = re.findall(r\"\\bFROM\\s+([a-zA-Z0-9_\\.]+)\", query_upper)\n        join_match = re.findall(r\"\\bJOIN\\s+([a-zA-Z0-9_\\.]+)\", query_upper)\n\n        tables.extend(from_match)\n        tables.extend(join_match)\n\n        return tables\n",
        "structuredPatch": [
            {
                "oldStart": 1,
                "oldLines": 9,
                "newStart": 1,
                "newLines": 8,
                "lines": [
                    " \"\"\"Query Validator - SRP: Separate Validierungslogik\"\"\"",
                    "+",
                    " import re",
                    "-import sys",
                    " from typing import Tuple",
                    " ",
                    "-sys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp_db\")",
                    " from config import Config",
                    " ",
                    " "
                ]
            },
            {
                "oldStart": 32,
                "oldLines": 7,
                "newStart": 31,
                "newLines": 7,
                "lines": [
                    " ",
                    "         # Dangerous Keyword Blocklist",
                    "         for keyword in Config.BLOCKED_KEYWORDS:",
                    "-            # Prüfe Keyword mit Word Boundaries",
                    "+            # Pruefe Keyword mit Word Boundaries",
                    "             pattern = r\"\\b\" + re.escape(keyword) + r\"\\b\"",
                    "             if re.search(pattern, query_upper):",
                    "                 return False, f\"Blocked keyword detected: {keyword}\""
                ]
            },
            {
                "oldStart": 44,
                "oldLines": 17,
                "newStart": 43,
                "newLines": 17,
                "lines": [
                    "                 f\"Database '{database}' not allowed. Allowed: {', '.join(Config.ALLOWED_DATABASES)}\",",
                    "             )",
                    " ",
                    "-        # Max Rows prüfen",
                    "+        # Max Rows pruefen",
                    "         if max_rows < 1 or max_rows > Config.MAX_ROWS:",
                    "             return False, f\"max_rows must be 1-{Config.MAX_ROWS}\"",
                    " ",
                    "         # Table Allowlist (information_schema Schutz)",
                    "         from_tables = QueryValidator._extract_table_names(query_upper)",
                    "         for table in from_tables:",
                    "-            # Erlaube information_schema.TABLES für Schema-Tool",
                    "+            # Erlaube information_schema.TABLES fuer Schema-Tool",
                    "             if \"INFORMATION_SCHEMA\" in table:",
                    "                 continue",
                    "-            # Prüfe gegen Allowlist",
                    "+            # Pruefe gegen Allowlist",
                    "             table_clean = table.split(\".\")[-1]",
                    "             if table_clean not in [t.upper() for t in Config.ALLOWED_TABLES]:",
                    "                 return ("
                ]
            }
        ],
        "originalFile": "\"\"\"Query Validator - SRP: Separate Validierungslogik\"\"\"\nimport re\nimport sys\nfrom typing import Tuple\n\nsys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp_db\")\nfrom config import Config\n\n\nclass QueryValidator:\n    \"\"\"Validiert SQL Queries - SRP: Nur Validierung\"\"\"\n\n    @staticmethod\n    def validate_query(query: str, database: str, max_rows: int) -> Tuple[bool, str]:\n        \"\"\"\n        Validiert eine Query gegen alle Sicherheitsregeln.\n\n        Returns:\n            (is_valid, error_message)\n        \"\"\"\n        # Basis-Validierung\n        if not query or len(query) < 1:\n            return False, \"Query must not be empty\"\n\n        if len(query) > Config.MAX_QUERY_LENGTH:\n            return False, f\"Query must be max {Config.MAX_QUERY_LENGTH} chars\"\n\n        # Nur SELECT erlaubt\n        query_upper = query.strip().upper()\n        if not query_upper.startswith(\"SELECT\"):\n            return False, \"Only SELECT queries allowed\"\n\n        # Dangerous Keyword Blocklist\n        for keyword in Config.BLOCKED_KEYWORDS:\n            # Prüfe Keyword mit Word Boundaries\n            pattern = r\"\\b\" + re.escape(keyword) + r\"\\b\"\n            if re.search(pattern, query_upper):\n                return False, f\"Blocked keyword detected: {keyword}\"\n\n        # Database Allowlist\n        if database not in Config.ALLOWED_DATABASES:\n            return (\n                False,\n                f\"Database '{database}' not allowed. Allowed: {', '.join(Config.ALLOWED_DATABASES)}\",\n            )\n\n        # Max Rows prüfen\n        if max_rows < 1 or max_rows > Config.MAX_ROWS:\n            return False, f\"max_rows must be 1-{Config.MAX_ROWS}\"\n\n        # Table Allowlist (information_schema Schutz)\n        from_tables = QueryValidator._extract_table_names(query_upper)\n        for table in from_tables:\n            # Erlaube information_schema.TABLES für Schema-Tool\n            if \"INFORMATION_SCHEMA\" in table:\n                continue\n            # Prüfe gegen Allowlist\n            table_clean = table.split(\".\")[-1]\n            if table_clean not in [t.upper() for t in Config.ALLOWED_TABLES]:\n                return (\n                    False,\n                    f\"Table '{table}' not allowed. Allowed: {', '.join(Config.ALLOWED_TABLES)}\",\n                )\n\n        return True, \"\"\n\n    @staticmethod\n    def _extract_table_names(query_upper: str) -> list:\n        \"\"\"Extrahiert Tabellennamen aus FROM und JOIN Clauses\"\"\"\n        tables = []\n\n        # FROM table, JOIN table\n        from_match = re.findall(r\"\\bFROM\\s+([a-zA-Z0-9_\\.]+)\", query_upper)\n        join_match = re.findall(r\"\\bJOIN\\s+([a-zA-Z0-9_\\.]+)\", query_upper)\n\n        tables.extend(from_match)\n        tables.extend(join_match)\n\n        return tables\n"
    }
}
← Vorheriger Zur Liste Nächster →