Protokoll #28325

ID28325
Zeitstempel2025-12-28 13:12:32.086823
Clientroot
IP145.224.72.140
Modellclaude-sonnet-4-20250514
Statuscompleted
Tokens4,568 (Input: 1,349, Output: 3,219)
Dauer97 ms
Request-Zeit2025-12-28 13:12:32.086823
Response-Zeit2025-12-28 13:12:32.183449

Request

{
    "event": "PreToolUse",
    "tool_name": "Write",
    "tool_input": {
        "file_path": "\/var\/www\/mcp-servers\/mcp_db\/tools\/update_tool.py",
        "content": "\"\"\"Update Tool - Aktualisiert Datensaetze.\"\"\"\n\nimport re\nimport time\nfrom datetime import datetime\n\nfrom constants import LOG_ENTRY_MAX_LENGTH, LOG_QUERY_MAX_LENGTH, MS_PER_SECOND\n\nfrom config import Config\nfrom infrastructure.db_connection import DatabaseConnection\nfrom shared.domain import LogEntry\nfrom shared.infrastructure import get_logger\n\n\ndef _validate_identifier(name: str) -> bool:\n    \"\"\"Validiert Tabellen-\/Spaltennamen gegen SQL-Injection.\"\"\"\n    return bool(re.match(r\"^[a-zA-Z0-9_]+$\", name))\n\n\ndef register_update_tool(mcp) -> None:\n    \"\"\"Registriert db_update Tool.\"\"\"\n    logger = get_logger(\"mcp-db\", Config)\n\n    @mcp.tool()\n    def db_update(\n        table: str,\n        data: dict,\n        where: dict,\n        database: str = \"ki_dev\",\n    ) -> dict:\n        \"\"\"\n        Aktualisiert Datensaetze.\n\n        Args:\n            table: Zieltabelle\n            data: Dict mit Spalte:Wert Paaren (SET-Klausel)\n            where: Dict mit Spalte:Wert Paaren (WHERE-Klausel) - PFLICHT!\n            database: Zieldatenbank (ki_dev oder ki_content)\n\n        Returns:\n            Dict mit status, affected_rows, error\n        \"\"\"\n        start = time.time()\n\n        # Validierung: Tabellenname\n        if not _validate_identifier(table):\n            return {\n                \"status\": \"denied\",\n                \"error\": \"Invalid table name.\",\n            }\n\n        # Validierung: Datenbank\n        if database not in Config.ALLOWED_DATABASES:\n            return {\n                \"status\": \"denied\",\n                \"error\": f\"Database '{database}' not allowed.\",\n            }\n\n        # Validierung: Data nicht leer\n        if not data:\n            return {\n                \"status\": \"denied\",\n                \"error\": \"Data dict must not be empty.\",\n            }\n\n        # KRITISCH: WHERE ist Pflicht!\n        if not where:\n            return {\n                \"status\": \"denied\",\n                \"error\": \"WHERE clause is required. UPDATE without WHERE is forbidden.\",\n            }\n\n        # Validierung: Spaltennamen in data\n        for col in data:\n            if not _validate_identifier(col):\n                return {\n                    \"status\": \"denied\",\n                    \"error\": f\"Invalid column name in data: {col}\",\n                }\n\n        # Validierung: Spaltennamen in where\n        for col in where:\n            if not _validate_identifier(col):\n                return {\n                    \"status\": \"denied\",\n                    \"error\": f\"Invalid column name in where: {col}\",\n                }\n\n        try:\n            with DatabaseConnection.get_connection(database) as conn:\n                cursor = conn.cursor(buffered=True)\n\n                # Build SET clause\n                set_parts = [f\"`{col}` = %s\" for col in data]\n                set_clause = \", \".join(set_parts)\n\n                # Build WHERE clause\n                where_parts = [f\"`{col}` = %s\" for col in where]\n                where_clause = \" AND \".join(where_parts)\n\n                # Combine values: SET values first, then WHERE values\n                values = tuple(data.values()) + tuple(where.values())\n\n                query = f\"UPDATE `{table}` SET {set_clause} WHERE {where_clause}\"\n                cursor.execute(query, values)\n                affected_rows = cursor.rowcount\n                conn.commit()\n                cursor.close()\n\n                duration = int((time.time() - start) * MS_PER_SECOND)\n\n                # Log\n                try:\n                    logger.log(\n                        LogEntry(\n                            timestamp=datetime.now(),\n                            client_name=\"mcp-db\",\n                            tool_name=\"db_update\",\n                            request=f\"UPDATE {table} SET ... WHERE ...\",\n                            status=\"success\",\n                            duration_ms=duration,\n                        )\n                    )\n                except Exception:\n                    pass\n\n                return {\n                    \"status\": \"success\",\n                    \"table\": table,\n                    \"affected_rows\": affected_rows,\n                    \"execution_ms\": duration,\n                }\n\n        except Exception as e:\n            duration = int((time.time() - start) * MS_PER_SECOND)\n\n            try:\n                logger.log(\n                    LogEntry(\n                        timestamp=datetime.now(),\n                        client_name=\"mcp-db\",\n                        tool_name=\"db_update\",\n                        request=f\"UPDATE {table}\",\n                        status=\"error\",\n                        duration_ms=duration,\n                        error_message=str(e)[:LOG_ENTRY_MAX_LENGTH],\n                    )\n                )\n            except Exception:\n                pass\n\n            return {\n                \"status\": \"error\",\n                \"error\": str(e)[:LOG_QUERY_MAX_LENGTH],\n                \"execution_ms\": duration,\n            }\n"
    }
}

Response

{
    "tool_response": {
        "type": "update",
        "filePath": "\/var\/www\/mcp-servers\/mcp_db\/tools\/update_tool.py",
        "content": "\"\"\"Update Tool - Aktualisiert Datensaetze.\"\"\"\n\nimport re\nimport time\nfrom datetime import datetime\n\nfrom constants import LOG_ENTRY_MAX_LENGTH, LOG_QUERY_MAX_LENGTH, MS_PER_SECOND\n\nfrom config import Config\nfrom infrastructure.db_connection import DatabaseConnection\nfrom shared.domain import LogEntry\nfrom shared.infrastructure import get_logger\n\n\ndef _validate_identifier(name: str) -> bool:\n    \"\"\"Validiert Tabellen-\/Spaltennamen gegen SQL-Injection.\"\"\"\n    return bool(re.match(r\"^[a-zA-Z0-9_]+$\", name))\n\n\ndef register_update_tool(mcp) -> None:\n    \"\"\"Registriert db_update Tool.\"\"\"\n    logger = get_logger(\"mcp-db\", Config)\n\n    @mcp.tool()\n    def db_update(\n        table: str,\n        data: dict,\n        where: dict,\n        database: str = \"ki_dev\",\n    ) -> dict:\n        \"\"\"\n        Aktualisiert Datensaetze.\n\n        Args:\n            table: Zieltabelle\n            data: Dict mit Spalte:Wert Paaren (SET-Klausel)\n            where: Dict mit Spalte:Wert Paaren (WHERE-Klausel) - PFLICHT!\n            database: Zieldatenbank (ki_dev oder ki_content)\n\n        Returns:\n            Dict mit status, affected_rows, error\n        \"\"\"\n        start = time.time()\n\n        # Validierung: Tabellenname\n        if not _validate_identifier(table):\n            return {\n                \"status\": \"denied\",\n                \"error\": \"Invalid table name.\",\n            }\n\n        # Validierung: Datenbank\n        if database not in Config.ALLOWED_DATABASES:\n            return {\n                \"status\": \"denied\",\n                \"error\": f\"Database '{database}' not allowed.\",\n            }\n\n        # Validierung: Data nicht leer\n        if not data:\n            return {\n                \"status\": \"denied\",\n                \"error\": \"Data dict must not be empty.\",\n            }\n\n        # KRITISCH: WHERE ist Pflicht!\n        if not where:\n            return {\n                \"status\": \"denied\",\n                \"error\": \"WHERE clause is required. UPDATE without WHERE is forbidden.\",\n            }\n\n        # Validierung: Spaltennamen in data\n        for col in data:\n            if not _validate_identifier(col):\n                return {\n                    \"status\": \"denied\",\n                    \"error\": f\"Invalid column name in data: {col}\",\n                }\n\n        # Validierung: Spaltennamen in where\n        for col in where:\n            if not _validate_identifier(col):\n                return {\n                    \"status\": \"denied\",\n                    \"error\": f\"Invalid column name in where: {col}\",\n                }\n\n        try:\n            with DatabaseConnection.get_connection(database) as conn:\n                cursor = conn.cursor(buffered=True)\n\n                # Build SET clause\n                set_parts = [f\"`{col}` = %s\" for col in data]\n                set_clause = \", \".join(set_parts)\n\n                # Build WHERE clause\n                where_parts = [f\"`{col}` = %s\" for col in where]\n                where_clause = \" AND \".join(where_parts)\n\n                # Combine values: SET values first, then WHERE values\n                values = tuple(data.values()) + tuple(where.values())\n\n                query = f\"UPDATE `{table}` SET {set_clause} WHERE {where_clause}\"\n                cursor.execute(query, values)\n                affected_rows = cursor.rowcount\n                conn.commit()\n                cursor.close()\n\n                duration = int((time.time() - start) * MS_PER_SECOND)\n\n                # Log\n                try:\n                    logger.log(\n                        LogEntry(\n                            timestamp=datetime.now(),\n                            client_name=\"mcp-db\",\n                            tool_name=\"db_update\",\n                            request=f\"UPDATE {table} SET ... WHERE ...\",\n                            status=\"success\",\n                            duration_ms=duration,\n                        )\n                    )\n                except Exception:\n                    pass\n\n                return {\n                    \"status\": \"success\",\n                    \"table\": table,\n                    \"affected_rows\": affected_rows,\n                    \"execution_ms\": duration,\n                }\n\n        except Exception as e:\n            duration = int((time.time() - start) * MS_PER_SECOND)\n\n            try:\n                logger.log(\n                    LogEntry(\n                        timestamp=datetime.now(),\n                        client_name=\"mcp-db\",\n                        tool_name=\"db_update\",\n                        request=f\"UPDATE {table}\",\n                        status=\"error\",\n                        duration_ms=duration,\n                        error_message=str(e)[:LOG_ENTRY_MAX_LENGTH],\n                    )\n                )\n            except Exception:\n                pass\n\n            return {\n                \"status\": \"error\",\n                \"error\": str(e)[:LOG_QUERY_MAX_LENGTH],\n                \"execution_ms\": duration,\n            }\n",
        "structuredPatch": [
            {
                "oldStart": 1,
                "oldLines": 17,
                "newStart": 1,
                "newLines": 15,
                "lines": [
                    "-\"\"\"Update Tool - Aktualisiert Datensätze.\"\"\"",
                    "+\"\"\"Update Tool - Aktualisiert Datensaetze.\"\"\"",
                    "+",
                    " import re",
                    "-import sys",
                    " import time",
                    "+from datetime import datetime",
                    " ",
                    "-sys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp_db\")",
                    "-sys.path.insert(0, \"\/var\/www\/mcp-servers\/shared\")",
                    "-",
                    " from constants import LOG_ENTRY_MAX_LENGTH, LOG_QUERY_MAX_LENGTH, MS_PER_SECOND",
                    " ",
                    " from config import Config",
                    "-from domain.log_contract import LogEntry",
                    " from infrastructure.db_connection import DatabaseConnection",
                    "-from infrastructure.protokoll_logger import ProtokollLogger",
                    "+from shared.domain import LogEntry",
                    "+from shared.infrastructure import get_logger",
                    " ",
                    " ",
                    " def _validate_identifier(name: str) -> bool:"
                ]
            },
            {
                "oldStart": 21,
                "oldLines": 7,
                "newStart": 19,
                "newLines": 7,
                "lines": [
                    " ",
                    " def register_update_tool(mcp) -> None:",
                    "     \"\"\"Registriert db_update Tool.\"\"\"",
                    "-    logger = ProtokollLogger()",
                    "+    logger = get_logger(\"mcp-db\", Config)",
                    " ",
                    "     @mcp.tool()",
                    "     def db_update("
                ]
            },
            {
                "oldStart": 31,
                "oldLines": 7,
                "newStart": 29,
                "newLines": 7,
                "lines": [
                    "         database: str = \"ki_dev\",",
                    "     ) -> dict:",
                    "         \"\"\"",
                    "-        Aktualisiert Datensätze.",
                    "+        Aktualisiert Datensaetze.",
                    " ",
                    "         Args:",
                    "             table: Zieltabelle"
                ]
            },
            {
                "oldStart": 115,
                "oldLines": 6,
                "newStart": 113,
                "newLines": 9,
                "lines": [
                    "                 try:",
                    "                     logger.log(",
                    "                         LogEntry(",
                    "+                            timestamp=datetime.now(),",
                    "+                            client_name=\"mcp-db\",",
                    "+                            tool_name=\"db_update\",",
                    "                             request=f\"UPDATE {table} SET ... WHERE ...\",",
                    "                             status=\"success\",",
                    "                             duration_ms=duration,"
                ]
            },
            {
                "oldStart": 136,
                "oldLines": 6,
                "newStart": 137,
                "newLines": 9,
                "lines": [
                    "             try:",
                    "                 logger.log(",
                    "                     LogEntry(",
                    "+                        timestamp=datetime.now(),",
                    "+                        client_name=\"mcp-db\",",
                    "+                        tool_name=\"db_update\",",
                    "                         request=f\"UPDATE {table}\",",
                    "                         status=\"error\",",
                    "                         duration_ms=duration,"
                ]
            }
        ],
        "originalFile": "\"\"\"Update Tool - Aktualisiert Datensätze.\"\"\"\nimport re\nimport sys\nimport time\n\nsys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp_db\")\nsys.path.insert(0, \"\/var\/www\/mcp-servers\/shared\")\n\nfrom constants import LOG_ENTRY_MAX_LENGTH, LOG_QUERY_MAX_LENGTH, MS_PER_SECOND\n\nfrom config import Config\nfrom domain.log_contract import LogEntry\nfrom infrastructure.db_connection import DatabaseConnection\nfrom infrastructure.protokoll_logger import ProtokollLogger\n\n\ndef _validate_identifier(name: str) -> bool:\n    \"\"\"Validiert Tabellen-\/Spaltennamen gegen SQL-Injection.\"\"\"\n    return bool(re.match(r\"^[a-zA-Z0-9_]+$\", name))\n\n\ndef register_update_tool(mcp) -> None:\n    \"\"\"Registriert db_update Tool.\"\"\"\n    logger = ProtokollLogger()\n\n    @mcp.tool()\n    def db_update(\n        table: str,\n        data: dict,\n        where: dict,\n        database: str = \"ki_dev\",\n    ) -> dict:\n        \"\"\"\n        Aktualisiert Datensätze.\n\n        Args:\n            table: Zieltabelle\n            data: Dict mit Spalte:Wert Paaren (SET-Klausel)\n            where: Dict mit Spalte:Wert Paaren (WHERE-Klausel) - PFLICHT!\n            database: Zieldatenbank (ki_dev oder ki_content)\n\n        Returns:\n            Dict mit status, affected_rows, error\n        \"\"\"\n        start = time.time()\n\n        # Validierung: Tabellenname\n        if not _validate_identifier(table):\n            return {\n                \"status\": \"denied\",\n                \"error\": \"Invalid table name.\",\n            }\n\n        # Validierung: Datenbank\n        if database not in Config.ALLOWED_DATABASES:\n            return {\n                \"status\": \"denied\",\n                \"error\": f\"Database '{database}' not allowed.\",\n            }\n\n        # Validierung: Data nicht leer\n        if not data:\n            return {\n                \"status\": \"denied\",\n                \"error\": \"Data dict must not be empty.\",\n            }\n\n        # KRITISCH: WHERE ist Pflicht!\n        if not where:\n            return {\n                \"status\": \"denied\",\n                \"error\": \"WHERE clause is required. UPDATE without WHERE is forbidden.\",\n            }\n\n        # Validierung: Spaltennamen in data\n        for col in data:\n            if not _validate_identifier(col):\n                return {\n                    \"status\": \"denied\",\n                    \"error\": f\"Invalid column name in data: {col}\",\n                }\n\n        # Validierung: Spaltennamen in where\n        for col in where:\n            if not _validate_identifier(col):\n                return {\n                    \"status\": \"denied\",\n                    \"error\": f\"Invalid column name in where: {col}\",\n                }\n\n        try:\n            with DatabaseConnection.get_connection(database) as conn:\n                cursor = conn.cursor(buffered=True)\n\n                # Build SET clause\n                set_parts = [f\"`{col}` = %s\" for col in data]\n                set_clause = \", \".join(set_parts)\n\n                # Build WHERE clause\n                where_parts = [f\"`{col}` = %s\" for col in where]\n                where_clause = \" AND \".join(where_parts)\n\n                # Combine values: SET values first, then WHERE values\n                values = tuple(data.values()) + tuple(where.values())\n\n                query = f\"UPDATE `{table}` SET {set_clause} WHERE {where_clause}\"\n                cursor.execute(query, values)\n                affected_rows = cursor.rowcount\n                conn.commit()\n                cursor.close()\n\n                duration = int((time.time() - start) * MS_PER_SECOND)\n\n                # Log\n                try:\n                    logger.log(\n                        LogEntry(\n                            request=f\"UPDATE {table} SET ... WHERE ...\",\n                            status=\"success\",\n                            duration_ms=duration,\n                        )\n                    )\n                except Exception:\n                    pass\n\n                return {\n                    \"status\": \"success\",\n                    \"table\": table,\n                    \"affected_rows\": affected_rows,\n                    \"execution_ms\": duration,\n                }\n\n        except Exception as e:\n            duration = int((time.time() - start) * MS_PER_SECOND)\n\n            try:\n                logger.log(\n                    LogEntry(\n                        request=f\"UPDATE {table}\",\n                        status=\"error\",\n                        duration_ms=duration,\n                        error_message=str(e)[:LOG_ENTRY_MAX_LENGTH],\n                    )\n                )\n            except Exception:\n                pass\n\n            return {\n                \"status\": \"error\",\n                \"error\": str(e)[:LOG_QUERY_MAX_LENGTH],\n                \"execution_ms\": duration,\n            }\n"
    }
}
← Vorheriger Zur Liste Nächster →