{
"event": "PreToolUse",
"tool_name": "Edit",
"tool_input": {
"file_path": "\/var\/www\/mcp-servers\/mcp-db\/tools\/insert_tool.py",
"old_string": "error_message=str(e)[:100],",
"new_string": "error_message=str(e)[:LOG_ENTRY_MAX_LENGTH],"
}
}
{
"tool_response": {
"filePath": "\/var\/www\/mcp-servers\/mcp-db\/tools\/insert_tool.py",
"oldString": "error_message=str(e)[:100],",
"newString": "error_message=str(e)[:LOG_ENTRY_MAX_LENGTH],",
"originalFile": "\"\"\"Insert Tool - Fügt Datensätze ein.\"\"\"\nimport re\nimport sys\nimport time\n\nsys.path.insert(0, \"\/opt\/mcp-servers\/mcp-db\")\nsys.path.insert(0, \"\/var\/www\/mcp-servers\/shared\")\n\nfrom constants import LOG_ENTRY_MAX_LENGTH, LOG_QUERY_MAX_LENGTH, MS_PER_SECOND\n\nfrom config import Config\nfrom domain.log_contract import LogEntry\nfrom infrastructure.db_connection import DatabaseConnection\nfrom infrastructure.protokoll_logger import ProtokollLogger\n\n\ndef _validate_identifier(name: str) -> bool:\n \"\"\"Validiert Tabellen-\/Spaltennamen gegen SQL-Injection.\"\"\"\n return bool(re.match(r\"^[a-zA-Z0-9_]+$\", name))\n\n\ndef register_insert_tool(mcp) -> None:\n \"\"\"Registriert db_insert Tool.\"\"\"\n logger = ProtokollLogger()\n\n @mcp.tool()\n def db_insert(\n table: str,\n data: dict,\n database: str = \"ki_dev\",\n ) -> dict:\n \"\"\"\n Fügt einen Datensatz ein.\n\n Args:\n table: Zieltabelle\n data: Dict mit Spalte:Wert Paaren\n database: Zieldatenbank (ki_dev oder ki_content)\n\n Returns:\n Dict mit status, inserted_id, error\n \"\"\"\n start = time.time()\n\n # Validierung: Tabellenname\n if not _validate_identifier(table):\n return {\n \"status\": \"denied\",\n \"error\": \"Invalid table name.\",\n }\n\n # Validierung: Datenbank\n if database not in Config.ALLOWED_DATABASES:\n return {\n \"status\": \"denied\",\n \"error\": f\"Database '{database}' not allowed.\",\n }\n\n # Validierung: Data nicht leer\n if not data:\n return {\n \"status\": \"denied\",\n \"error\": \"Data dict must not be empty.\",\n }\n\n # Validierung: Spaltennamen\n for col in data:\n if not _validate_identifier(col):\n return {\n \"status\": \"denied\",\n \"error\": f\"Invalid column name: {col}\",\n }\n\n try:\n with DatabaseConnection.get_connection(database) as conn:\n cursor = conn.cursor(buffered=True)\n\n # Build parameterized INSERT\n columns = list(data.keys())\n placeholders = \", \".join([\"%s\"] * len(columns))\n columns_str = \", \".join([f\"`{c}`\" for c in columns])\n values = tuple(data.values())\n\n query = f\"INSERT INTO `{table}` ({columns_str}) VALUES ({placeholders})\"\n cursor.execute(query, values)\n inserted_id = cursor.lastrowid\n conn.commit()\n cursor.close()\n\n duration = int((time.time() - start) * MS_PER_SECOND)\n\n # Log\n try:\n logger.log(\n LogEntry(\n request=f\"INSERT INTO {table} ({columns_str})\",\n status=\"success\",\n duration_ms=duration,\n )\n )\n except Exception:\n pass\n\n return {\n \"status\": \"success\",\n \"table\": table,\n \"inserted_id\": inserted_id,\n \"execution_ms\": duration,\n }\n\n except Exception as e:\n duration = int((time.time() - start) * MS_PER_SECOND)\n\n try:\n logger.log(\n LogEntry(\n request=f\"INSERT INTO {table}\",\n status=\"error\",\n duration_ms=duration,\n error_message=str(e)[:100],\n )\n )\n except Exception:\n pass\n\n return {\n \"status\": \"error\",\n \"error\": str(e)[:200],\n \"execution_ms\": duration,\n }\n",
"structuredPatch": [
{
"oldStart": 117,
"oldLines": 7,
"newStart": 117,
"newLines": 7,
"lines": [
" request=f\"INSERT INTO {table}\",",
" status=\"error\",",
" duration_ms=duration,",
"- error_message=str(e)[:100],",
"+ error_message=str(e)[:LOG_ENTRY_MAX_LENGTH],",
" )",
" )",
" except Exception:"
]
}
],
"userModified": false,
"replaceAll": false
}
}