{
"event": "PreToolUse",
"tool_name": "Write",
"tool_input": {
"file_path": "\/var\/www\/mcp-servers\/mcp_db\/tools\/update_tool.py",
"content": "\"\"\"Update Tool - Aktualisiert Datensaetze.\"\"\"\n\nimport re\nimport time\nfrom datetime import datetime\n\nfrom constants import LOG_ENTRY_MAX_LENGTH, LOG_QUERY_MAX_LENGTH, MS_PER_SECOND\n\nfrom config import Config\nfrom infrastructure.db_connection import DatabaseConnection\nfrom shared.domain import LogEntry\nfrom shared.infrastructure import get_logger\n\n\ndef _validate_identifier(name: str) -> bool:\n \"\"\"Validiert Tabellen-\/Spaltennamen gegen SQL-Injection.\"\"\"\n return bool(re.match(r\"^[a-zA-Z0-9_]+$\", name))\n\n\ndef register_update_tool(mcp) -> None:\n \"\"\"Registriert db_update Tool.\"\"\"\n logger = get_logger(\"mcp-db\", Config)\n\n @mcp.tool()\n def db_update(\n table: str,\n data: dict,\n where: dict,\n database: str = \"ki_dev\",\n ) -> dict:\n \"\"\"\n Aktualisiert Datensaetze.\n\n Args:\n table: Zieltabelle\n data: Dict mit Spalte:Wert Paaren (SET-Klausel)\n where: Dict mit Spalte:Wert Paaren (WHERE-Klausel) - PFLICHT!\n database: Zieldatenbank (ki_dev oder ki_content)\n\n Returns:\n Dict mit status, affected_rows, error\n \"\"\"\n start = time.time()\n\n # Validierung: Tabellenname\n if not _validate_identifier(table):\n return {\n \"status\": \"denied\",\n \"error\": \"Invalid table name.\",\n }\n\n # Validierung: Datenbank\n if database not in Config.ALLOWED_DATABASES:\n return {\n \"status\": \"denied\",\n \"error\": f\"Database '{database}' not allowed.\",\n }\n\n # Validierung: Data nicht leer\n if not data:\n return {\n \"status\": \"denied\",\n \"error\": \"Data dict must not be empty.\",\n }\n\n # KRITISCH: WHERE ist Pflicht!\n if not where:\n return {\n \"status\": \"denied\",\n \"error\": \"WHERE clause is required. UPDATE without WHERE is forbidden.\",\n }\n\n # Validierung: Spaltennamen in data\n for col in data:\n if not _validate_identifier(col):\n return {\n \"status\": \"denied\",\n \"error\": f\"Invalid column name in data: {col}\",\n }\n\n # Validierung: Spaltennamen in where\n for col in where:\n if not _validate_identifier(col):\n return {\n \"status\": \"denied\",\n \"error\": f\"Invalid column name in where: {col}\",\n }\n\n try:\n with DatabaseConnection.get_connection(database) as conn:\n cursor = conn.cursor(buffered=True)\n\n # Build SET clause\n set_parts = [f\"`{col}` = %s\" for col in data]\n set_clause = \", \".join(set_parts)\n\n # Build WHERE clause\n where_parts = [f\"`{col}` = %s\" for col in where]\n where_clause = \" AND \".join(where_parts)\n\n # Combine values: SET values first, then WHERE values\n values = tuple(data.values()) + tuple(where.values())\n\n query = f\"UPDATE `{table}` SET {set_clause} WHERE {where_clause}\"\n cursor.execute(query, values)\n affected_rows = cursor.rowcount\n conn.commit()\n cursor.close()\n\n duration = int((time.time() - start) * MS_PER_SECOND)\n\n # Log\n try:\n logger.log(\n LogEntry(\n timestamp=datetime.now(),\n client_name=\"mcp-db\",\n tool_name=\"db_update\",\n request=f\"UPDATE {table} SET ... WHERE ...\",\n status=\"success\",\n duration_ms=duration,\n )\n )\n except Exception:\n pass\n\n return {\n \"status\": \"success\",\n \"table\": table,\n \"affected_rows\": affected_rows,\n \"execution_ms\": duration,\n }\n\n except Exception as e:\n duration = int((time.time() - start) * MS_PER_SECOND)\n\n try:\n logger.log(\n LogEntry(\n timestamp=datetime.now(),\n client_name=\"mcp-db\",\n tool_name=\"db_update\",\n request=f\"UPDATE {table}\",\n status=\"error\",\n duration_ms=duration,\n error_message=str(e)[:LOG_ENTRY_MAX_LENGTH],\n )\n )\n except Exception:\n pass\n\n return {\n \"status\": \"error\",\n \"error\": str(e)[:LOG_QUERY_MAX_LENGTH],\n \"execution_ms\": duration,\n }\n"
}
}
{
"tool_response": {
"type": "update",
"filePath": "\/var\/www\/mcp-servers\/mcp_db\/tools\/update_tool.py",
"content": "\"\"\"Update Tool - Aktualisiert Datensaetze.\"\"\"\n\nimport re\nimport time\nfrom datetime import datetime\n\nfrom constants import LOG_ENTRY_MAX_LENGTH, LOG_QUERY_MAX_LENGTH, MS_PER_SECOND\n\nfrom config import Config\nfrom infrastructure.db_connection import DatabaseConnection\nfrom shared.domain import LogEntry\nfrom shared.infrastructure import get_logger\n\n\ndef _validate_identifier(name: str) -> bool:\n \"\"\"Validiert Tabellen-\/Spaltennamen gegen SQL-Injection.\"\"\"\n return bool(re.match(r\"^[a-zA-Z0-9_]+$\", name))\n\n\ndef register_update_tool(mcp) -> None:\n \"\"\"Registriert db_update Tool.\"\"\"\n logger = get_logger(\"mcp-db\", Config)\n\n @mcp.tool()\n def db_update(\n table: str,\n data: dict,\n where: dict,\n database: str = \"ki_dev\",\n ) -> dict:\n \"\"\"\n Aktualisiert Datensaetze.\n\n Args:\n table: Zieltabelle\n data: Dict mit Spalte:Wert Paaren (SET-Klausel)\n where: Dict mit Spalte:Wert Paaren (WHERE-Klausel) - PFLICHT!\n database: Zieldatenbank (ki_dev oder ki_content)\n\n Returns:\n Dict mit status, affected_rows, error\n \"\"\"\n start = time.time()\n\n # Validierung: Tabellenname\n if not _validate_identifier(table):\n return {\n \"status\": \"denied\",\n \"error\": \"Invalid table name.\",\n }\n\n # Validierung: Datenbank\n if database not in Config.ALLOWED_DATABASES:\n return {\n \"status\": \"denied\",\n \"error\": f\"Database '{database}' not allowed.\",\n }\n\n # Validierung: Data nicht leer\n if not data:\n return {\n \"status\": \"denied\",\n \"error\": \"Data dict must not be empty.\",\n }\n\n # KRITISCH: WHERE ist Pflicht!\n if not where:\n return {\n \"status\": \"denied\",\n \"error\": \"WHERE clause is required. UPDATE without WHERE is forbidden.\",\n }\n\n # Validierung: Spaltennamen in data\n for col in data:\n if not _validate_identifier(col):\n return {\n \"status\": \"denied\",\n \"error\": f\"Invalid column name in data: {col}\",\n }\n\n # Validierung: Spaltennamen in where\n for col in where:\n if not _validate_identifier(col):\n return {\n \"status\": \"denied\",\n \"error\": f\"Invalid column name in where: {col}\",\n }\n\n try:\n with DatabaseConnection.get_connection(database) as conn:\n cursor = conn.cursor(buffered=True)\n\n # Build SET clause\n set_parts = [f\"`{col}` = %s\" for col in data]\n set_clause = \", \".join(set_parts)\n\n # Build WHERE clause\n where_parts = [f\"`{col}` = %s\" for col in where]\n where_clause = \" AND \".join(where_parts)\n\n # Combine values: SET values first, then WHERE values\n values = tuple(data.values()) + tuple(where.values())\n\n query = f\"UPDATE `{table}` SET {set_clause} WHERE {where_clause}\"\n cursor.execute(query, values)\n affected_rows = cursor.rowcount\n conn.commit()\n cursor.close()\n\n duration = int((time.time() - start) * MS_PER_SECOND)\n\n # Log\n try:\n logger.log(\n LogEntry(\n timestamp=datetime.now(),\n client_name=\"mcp-db\",\n tool_name=\"db_update\",\n request=f\"UPDATE {table} SET ... WHERE ...\",\n status=\"success\",\n duration_ms=duration,\n )\n )\n except Exception:\n pass\n\n return {\n \"status\": \"success\",\n \"table\": table,\n \"affected_rows\": affected_rows,\n \"execution_ms\": duration,\n }\n\n except Exception as e:\n duration = int((time.time() - start) * MS_PER_SECOND)\n\n try:\n logger.log(\n LogEntry(\n timestamp=datetime.now(),\n client_name=\"mcp-db\",\n tool_name=\"db_update\",\n request=f\"UPDATE {table}\",\n status=\"error\",\n duration_ms=duration,\n error_message=str(e)[:LOG_ENTRY_MAX_LENGTH],\n )\n )\n except Exception:\n pass\n\n return {\n \"status\": \"error\",\n \"error\": str(e)[:LOG_QUERY_MAX_LENGTH],\n \"execution_ms\": duration,\n }\n",
"structuredPatch": [
{
"oldStart": 1,
"oldLines": 17,
"newStart": 1,
"newLines": 15,
"lines": [
"-\"\"\"Update Tool - Aktualisiert Datensätze.\"\"\"",
"+\"\"\"Update Tool - Aktualisiert Datensaetze.\"\"\"",
"+",
" import re",
"-import sys",
" import time",
"+from datetime import datetime",
" ",
"-sys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp_db\")",
"-sys.path.insert(0, \"\/var\/www\/mcp-servers\/shared\")",
"-",
" from constants import LOG_ENTRY_MAX_LENGTH, LOG_QUERY_MAX_LENGTH, MS_PER_SECOND",
" ",
" from config import Config",
"-from domain.log_contract import LogEntry",
" from infrastructure.db_connection import DatabaseConnection",
"-from infrastructure.protokoll_logger import ProtokollLogger",
"+from shared.domain import LogEntry",
"+from shared.infrastructure import get_logger",
" ",
" ",
" def _validate_identifier(name: str) -> bool:"
]
},
{
"oldStart": 21,
"oldLines": 7,
"newStart": 19,
"newLines": 7,
"lines": [
" ",
" def register_update_tool(mcp) -> None:",
" \"\"\"Registriert db_update Tool.\"\"\"",
"- logger = ProtokollLogger()",
"+ logger = get_logger(\"mcp-db\", Config)",
" ",
" @mcp.tool()",
" def db_update("
]
},
{
"oldStart": 31,
"oldLines": 7,
"newStart": 29,
"newLines": 7,
"lines": [
" database: str = \"ki_dev\",",
" ) -> dict:",
" \"\"\"",
"- Aktualisiert Datensätze.",
"+ Aktualisiert Datensaetze.",
" ",
" Args:",
" table: Zieltabelle"
]
},
{
"oldStart": 115,
"oldLines": 6,
"newStart": 113,
"newLines": 9,
"lines": [
" try:",
" logger.log(",
" LogEntry(",
"+ timestamp=datetime.now(),",
"+ client_name=\"mcp-db\",",
"+ tool_name=\"db_update\",",
" request=f\"UPDATE {table} SET ... WHERE ...\",",
" status=\"success\",",
" duration_ms=duration,"
]
},
{
"oldStart": 136,
"oldLines": 6,
"newStart": 137,
"newLines": 9,
"lines": [
" try:",
" logger.log(",
" LogEntry(",
"+ timestamp=datetime.now(),",
"+ client_name=\"mcp-db\",",
"+ tool_name=\"db_update\",",
" request=f\"UPDATE {table}\",",
" status=\"error\",",
" duration_ms=duration,"
]
}
],
"originalFile": "\"\"\"Update Tool - Aktualisiert Datensätze.\"\"\"\nimport re\nimport sys\nimport time\n\nsys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp_db\")\nsys.path.insert(0, \"\/var\/www\/mcp-servers\/shared\")\n\nfrom constants import LOG_ENTRY_MAX_LENGTH, LOG_QUERY_MAX_LENGTH, MS_PER_SECOND\n\nfrom config import Config\nfrom domain.log_contract import LogEntry\nfrom infrastructure.db_connection import DatabaseConnection\nfrom infrastructure.protokoll_logger import ProtokollLogger\n\n\ndef _validate_identifier(name: str) -> bool:\n \"\"\"Validiert Tabellen-\/Spaltennamen gegen SQL-Injection.\"\"\"\n return bool(re.match(r\"^[a-zA-Z0-9_]+$\", name))\n\n\ndef register_update_tool(mcp) -> None:\n \"\"\"Registriert db_update Tool.\"\"\"\n logger = ProtokollLogger()\n\n @mcp.tool()\n def db_update(\n table: str,\n data: dict,\n where: dict,\n database: str = \"ki_dev\",\n ) -> dict:\n \"\"\"\n Aktualisiert Datensätze.\n\n Args:\n table: Zieltabelle\n data: Dict mit Spalte:Wert Paaren (SET-Klausel)\n where: Dict mit Spalte:Wert Paaren (WHERE-Klausel) - PFLICHT!\n database: Zieldatenbank (ki_dev oder ki_content)\n\n Returns:\n Dict mit status, affected_rows, error\n \"\"\"\n start = time.time()\n\n # Validierung: Tabellenname\n if not _validate_identifier(table):\n return {\n \"status\": \"denied\",\n \"error\": \"Invalid table name.\",\n }\n\n # Validierung: Datenbank\n if database not in Config.ALLOWED_DATABASES:\n return {\n \"status\": \"denied\",\n \"error\": f\"Database '{database}' not allowed.\",\n }\n\n # Validierung: Data nicht leer\n if not data:\n return {\n \"status\": \"denied\",\n \"error\": \"Data dict must not be empty.\",\n }\n\n # KRITISCH: WHERE ist Pflicht!\n if not where:\n return {\n \"status\": \"denied\",\n \"error\": \"WHERE clause is required. UPDATE without WHERE is forbidden.\",\n }\n\n # Validierung: Spaltennamen in data\n for col in data:\n if not _validate_identifier(col):\n return {\n \"status\": \"denied\",\n \"error\": f\"Invalid column name in data: {col}\",\n }\n\n # Validierung: Spaltennamen in where\n for col in where:\n if not _validate_identifier(col):\n return {\n \"status\": \"denied\",\n \"error\": f\"Invalid column name in where: {col}\",\n }\n\n try:\n with DatabaseConnection.get_connection(database) as conn:\n cursor = conn.cursor(buffered=True)\n\n # Build SET clause\n set_parts = [f\"`{col}` = %s\" for col in data]\n set_clause = \", \".join(set_parts)\n\n # Build WHERE clause\n where_parts = [f\"`{col}` = %s\" for col in where]\n where_clause = \" AND \".join(where_parts)\n\n # Combine values: SET values first, then WHERE values\n values = tuple(data.values()) + tuple(where.values())\n\n query = f\"UPDATE `{table}` SET {set_clause} WHERE {where_clause}\"\n cursor.execute(query, values)\n affected_rows = cursor.rowcount\n conn.commit()\n cursor.close()\n\n duration = int((time.time() - start) * MS_PER_SECOND)\n\n # Log\n try:\n logger.log(\n LogEntry(\n request=f\"UPDATE {table} SET ... WHERE ...\",\n status=\"success\",\n duration_ms=duration,\n )\n )\n except Exception:\n pass\n\n return {\n \"status\": \"success\",\n \"table\": table,\n \"affected_rows\": affected_rows,\n \"execution_ms\": duration,\n }\n\n except Exception as e:\n duration = int((time.time() - start) * MS_PER_SECOND)\n\n try:\n logger.log(\n LogEntry(\n request=f\"UPDATE {table}\",\n status=\"error\",\n duration_ms=duration,\n error_message=str(e)[:LOG_ENTRY_MAX_LENGTH],\n )\n )\n except Exception:\n pass\n\n return {\n \"status\": \"error\",\n \"error\": str(e)[:LOG_QUERY_MAX_LENGTH],\n \"execution_ms\": duration,\n }\n"
}
}