{
"event": "PreToolUse",
"tool_name": "Edit",
"tool_input": {
"file_path": "\/var\/www\/mcp-servers\/mcp_db\/tools\/execute_tool.py",
"old_string": "from constants import LOG_QUERY_MAX_LENGTH, MS_PER_SECOND",
"new_string": "from shared.constants import LOG_QUERY_MAX_LENGTH, MS_PER_SECOND"
}
}
{
"tool_response": {
"filePath": "\/var\/www\/mcp-servers\/mcp_db\/tools\/execute_tool.py",
"oldString": "from constants import LOG_QUERY_MAX_LENGTH, MS_PER_SECOND",
"newString": "from shared.constants import LOG_QUERY_MAX_LENGTH, MS_PER_SECOND",
"originalFile": "\"\"\"Execute Tool - Fuehrt DDL\/DML Statements aus.\"\"\"\n\nimport time\nfrom datetime import datetime\n\nfrom constants import LOG_QUERY_MAX_LENGTH, MS_PER_SECOND\n\nfrom domain.execute_contract import ExecuteRequest, ExecuteResponse, ExecuteStatus\nfrom infrastructure.db_connection import DatabaseConnection\nfrom shared.domain import LogEntry\nfrom shared.infrastructure import get_logger\nfrom config import Config\nfrom validators.execute_validator import ExecuteValidator\n\n\nclass ExecuteExecutor:\n \"\"\"Fuehrt validierte DDL\/DML Statements aus.\"\"\"\n\n @staticmethod\n def execute(request: ExecuteRequest) -> ExecuteResponse:\n \"\"\"Fuehrt ein validiertes Statement aus.\"\"\"\n start = time.time()\n\n try:\n with DatabaseConnection.get_connection(request.database) as conn:\n cursor = conn.cursor(buffered=True)\n cursor.execute(request.statement, request.params)\n affected_rows = cursor.rowcount\n conn.commit()\n cursor.close()\n\n duration = int((time.time() - start) * MS_PER_SECOND)\n\n return ExecuteResponse(\n status=ExecuteStatus.SUCCESS,\n affected_rows=affected_rows,\n execution_ms=duration,\n )\n\n except Exception as e:\n duration = int((time.time() - start) * MS_PER_SECOND)\n\n return ExecuteResponse(\n status=ExecuteStatus.ERROR,\n error=str(e)[:LOG_QUERY_MAX_LENGTH],\n execution_ms=duration,\n )\n\n\ndef register_execute_tool(mcp) -> None:\n \"\"\"Registriert db_execute Tool.\"\"\"\n logger = get_logger(\"mcp-db\", Config)\n\n @mcp.tool()\n def db_execute(\n statement: str,\n database: str = \"ki_dev\",\n params: list | None = None,\n ) -> dict:\n \"\"\"\n Fuehrt DDL-Statements aus (ALTER, CREATE, DROP, TRUNCATE).\n\n Args:\n statement: SQL DDL Statement\n database: Zieldatenbank (ki_dev oder ki_content)\n params: Optional - Parameter fuer Prepared Statements\n\n Returns:\n Dict mit status, affected_rows, error, execution_ms\n \"\"\"\n params_tuple = tuple(params) if params else None\n\n # 1. Validierung\n valid, error = ExecuteValidator.validate_statement(statement, database)\n\n if not valid:\n try:\n logger.log(\n LogEntry(\n timestamp=datetime.now(),\n client_name=\"mcp-db\",\n tool_name=\"db_execute\",\n request=f\"EXECUTE: {statement[:150]}\",\n status=\"denied\",\n duration_ms=0,\n error_message=error,\n )\n )\n except Exception:\n pass\n\n return ExecuteResponse(status=ExecuteStatus.DENIED, error=error).to_dict()\n\n # 2. Request erstellen\n request = ExecuteRequest(\n statement=statement,\n database=database,\n params=params_tuple,\n )\n\n # 3. Ausfuehrung\n response = ExecuteExecutor.execute(request)\n\n # 4. Logging\n try:\n logger.log(\n LogEntry(\n timestamp=datetime.now(),\n client_name=\"mcp-db\",\n tool_name=\"db_execute\",\n request=f\"EXECUTE: {statement[:150]}\",\n status=response.status.value,\n duration_ms=int(response.execution_ms),\n error_message=response.error,\n )\n )\n except Exception:\n pass\n\n return response.to_dict()\n",
"structuredPatch": [
{
"oldStart": 3,
"oldLines": 7,
"newStart": 3,
"newLines": 7,
"lines": [
" import time",
" from datetime import datetime",
" ",
"-from constants import LOG_QUERY_MAX_LENGTH, MS_PER_SECOND",
"+from shared.constants import LOG_QUERY_MAX_LENGTH, MS_PER_SECOND",
" ",
" from domain.execute_contract import ExecuteRequest, ExecuteResponse, ExecuteStatus",
" from infrastructure.db_connection import DatabaseConnection"
]
}
],
"userModified": false,
"replaceAll": false
}
}