{
"event": "PreToolUse",
"tool_name": "Write",
"tool_input": {
"file_path": "\/var\/www\/mcp-servers\/mcp_db\/tools\/execute_tool.py",
"content": "\"\"\"Execute Tool - Fuehrt DDL\/DML Statements aus.\"\"\"\n\nimport time\nfrom datetime import datetime\n\nfrom constants import LOG_QUERY_MAX_LENGTH, MS_PER_SECOND\n\nfrom domain.execute_contract import ExecuteRequest, ExecuteResponse, ExecuteStatus\nfrom infrastructure.db_connection import DatabaseConnection\nfrom shared.domain import LogEntry\nfrom shared.infrastructure import get_logger\nfrom config import Config\nfrom validators.execute_validator import ExecuteValidator\n\n\nclass ExecuteExecutor:\n \"\"\"Fuehrt validierte DDL\/DML Statements aus.\"\"\"\n\n @staticmethod\n def execute(request: ExecuteRequest) -> ExecuteResponse:\n \"\"\"Fuehrt ein validiertes Statement aus.\"\"\"\n start = time.time()\n\n try:\n with DatabaseConnection.get_connection(request.database) as conn:\n cursor = conn.cursor(buffered=True)\n cursor.execute(request.statement, request.params)\n affected_rows = cursor.rowcount\n conn.commit()\n cursor.close()\n\n duration = int((time.time() - start) * MS_PER_SECOND)\n\n return ExecuteResponse(\n status=ExecuteStatus.SUCCESS,\n affected_rows=affected_rows,\n execution_ms=duration,\n )\n\n except Exception as e:\n duration = int((time.time() - start) * MS_PER_SECOND)\n\n return ExecuteResponse(\n status=ExecuteStatus.ERROR,\n error=str(e)[:LOG_QUERY_MAX_LENGTH],\n execution_ms=duration,\n )\n\n\ndef register_execute_tool(mcp) -> None:\n \"\"\"Registriert db_execute Tool.\"\"\"\n logger = get_logger(\"mcp-db\", Config)\n\n @mcp.tool()\n def db_execute(\n statement: str,\n database: str = \"ki_dev\",\n params: list | None = None,\n ) -> dict:\n \"\"\"\n Fuehrt DDL-Statements aus (ALTER, CREATE, DROP, TRUNCATE).\n\n Args:\n statement: SQL DDL Statement\n database: Zieldatenbank (ki_dev oder ki_content)\n params: Optional - Parameter fuer Prepared Statements\n\n Returns:\n Dict mit status, affected_rows, error, execution_ms\n \"\"\"\n params_tuple = tuple(params) if params else None\n\n # 1. Validierung\n valid, error = ExecuteValidator.validate_statement(statement, database)\n\n if not valid:\n try:\n logger.log(\n LogEntry(\n timestamp=datetime.now(),\n client_name=\"mcp-db\",\n tool_name=\"db_execute\",\n request=f\"EXECUTE: {statement[:150]}\",\n status=\"denied\",\n duration_ms=0,\n error_message=error,\n )\n )\n except Exception:\n pass\n\n return ExecuteResponse(status=ExecuteStatus.DENIED, error=error).to_dict()\n\n # 2. Request erstellen\n request = ExecuteRequest(\n statement=statement,\n database=database,\n params=params_tuple,\n )\n\n # 3. Ausfuehrung\n response = ExecuteExecutor.execute(request)\n\n # 4. Logging\n try:\n logger.log(\n LogEntry(\n timestamp=datetime.now(),\n client_name=\"mcp-db\",\n tool_name=\"db_execute\",\n request=f\"EXECUTE: {statement[:150]}\",\n status=response.status.value,\n duration_ms=int(response.execution_ms),\n error_message=response.error,\n )\n )\n except Exception:\n pass\n\n return response.to_dict()\n"
}
}
{
"tool_response": {
"type": "update",
"filePath": "\/var\/www\/mcp-servers\/mcp_db\/tools\/execute_tool.py",
"content": "\"\"\"Execute Tool - Fuehrt DDL\/DML Statements aus.\"\"\"\n\nimport time\nfrom datetime import datetime\n\nfrom constants import LOG_QUERY_MAX_LENGTH, MS_PER_SECOND\n\nfrom domain.execute_contract import ExecuteRequest, ExecuteResponse, ExecuteStatus\nfrom infrastructure.db_connection import DatabaseConnection\nfrom shared.domain import LogEntry\nfrom shared.infrastructure import get_logger\nfrom config import Config\nfrom validators.execute_validator import ExecuteValidator\n\n\nclass ExecuteExecutor:\n \"\"\"Fuehrt validierte DDL\/DML Statements aus.\"\"\"\n\n @staticmethod\n def execute(request: ExecuteRequest) -> ExecuteResponse:\n \"\"\"Fuehrt ein validiertes Statement aus.\"\"\"\n start = time.time()\n\n try:\n with DatabaseConnection.get_connection(request.database) as conn:\n cursor = conn.cursor(buffered=True)\n cursor.execute(request.statement, request.params)\n affected_rows = cursor.rowcount\n conn.commit()\n cursor.close()\n\n duration = int((time.time() - start) * MS_PER_SECOND)\n\n return ExecuteResponse(\n status=ExecuteStatus.SUCCESS,\n affected_rows=affected_rows,\n execution_ms=duration,\n )\n\n except Exception as e:\n duration = int((time.time() - start) * MS_PER_SECOND)\n\n return ExecuteResponse(\n status=ExecuteStatus.ERROR,\n error=str(e)[:LOG_QUERY_MAX_LENGTH],\n execution_ms=duration,\n )\n\n\ndef register_execute_tool(mcp) -> None:\n \"\"\"Registriert db_execute Tool.\"\"\"\n logger = get_logger(\"mcp-db\", Config)\n\n @mcp.tool()\n def db_execute(\n statement: str,\n database: str = \"ki_dev\",\n params: list | None = None,\n ) -> dict:\n \"\"\"\n Fuehrt DDL-Statements aus (ALTER, CREATE, DROP, TRUNCATE).\n\n Args:\n statement: SQL DDL Statement\n database: Zieldatenbank (ki_dev oder ki_content)\n params: Optional - Parameter fuer Prepared Statements\n\n Returns:\n Dict mit status, affected_rows, error, execution_ms\n \"\"\"\n params_tuple = tuple(params) if params else None\n\n # 1. Validierung\n valid, error = ExecuteValidator.validate_statement(statement, database)\n\n if not valid:\n try:\n logger.log(\n LogEntry(\n timestamp=datetime.now(),\n client_name=\"mcp-db\",\n tool_name=\"db_execute\",\n request=f\"EXECUTE: {statement[:150]}\",\n status=\"denied\",\n duration_ms=0,\n error_message=error,\n )\n )\n except Exception:\n pass\n\n return ExecuteResponse(status=ExecuteStatus.DENIED, error=error).to_dict()\n\n # 2. Request erstellen\n request = ExecuteRequest(\n statement=statement,\n database=database,\n params=params_tuple,\n )\n\n # 3. Ausfuehrung\n response = ExecuteExecutor.execute(request)\n\n # 4. Logging\n try:\n logger.log(\n LogEntry(\n timestamp=datetime.now(),\n client_name=\"mcp-db\",\n tool_name=\"db_execute\",\n request=f\"EXECUTE: {statement[:150]}\",\n status=response.status.value,\n duration_ms=int(response.execution_ms),\n error_message=response.error,\n )\n )\n except Exception:\n pass\n\n return response.to_dict()\n",
"structuredPatch": [
{
"oldStart": 1,
"oldLines": 25,
"newStart": 1,
"newLines": 24,
"lines": [
"-\"\"\"Execute Tool - Führt DDL\/DML Statements aus.\"\"\"",
"-import sys",
"+\"\"\"Execute Tool - Fuehrt DDL\/DML Statements aus.\"\"\"",
"+",
" import time",
"+from datetime import datetime",
" ",
"-sys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp_db\")",
"-sys.path.insert(0, \"\/var\/www\/mcp-servers\/shared\")",
"-",
" from constants import LOG_QUERY_MAX_LENGTH, MS_PER_SECOND",
" ",
" from domain.execute_contract import ExecuteRequest, ExecuteResponse, ExecuteStatus",
"-from domain.log_contract import LogEntry",
" from infrastructure.db_connection import DatabaseConnection",
"-from infrastructure.protokoll_logger import ProtokollLogger",
"+from shared.domain import LogEntry",
"+from shared.infrastructure import get_logger",
"+from config import Config",
" from validators.execute_validator import ExecuteValidator",
" ",
" ",
" class ExecuteExecutor:",
"- \"\"\"Führt validierte DDL\/DML Statements aus.\"\"\"",
"+ \"\"\"Fuehrt validierte DDL\/DML Statements aus.\"\"\"",
" ",
" @staticmethod",
" def execute(request: ExecuteRequest) -> ExecuteResponse:",
"- \"\"\"Führt ein validiertes Statement aus.\"\"\"",
"+ \"\"\"Fuehrt ein validiertes Statement aus.\"\"\"",
" start = time.time()",
" ",
" try:"
]
},
{
"oldStart": 50,
"oldLines": 7,
"newStart": 49,
"newLines": 7,
"lines": [
" ",
" def register_execute_tool(mcp) -> None:",
" \"\"\"Registriert db_execute Tool.\"\"\"",
"- logger = ProtokollLogger()",
"+ logger = get_logger(\"mcp-db\", Config)",
" ",
" @mcp.tool()",
" def db_execute("
]
},
{
"oldStart": 59,
"oldLines": 12,
"newStart": 58,
"newLines": 12,
"lines": [
" params: list | None = None,",
" ) -> dict:",
" \"\"\"",
"- Führt DDL-Statements aus (ALTER, CREATE, DROP, TRUNCATE).",
"+ Fuehrt DDL-Statements aus (ALTER, CREATE, DROP, TRUNCATE).",
" ",
" Args:",
" statement: SQL DDL Statement",
" database: Zieldatenbank (ki_dev oder ki_content)",
"- params: Optional - Parameter für Prepared Statements",
"+ params: Optional - Parameter fuer Prepared Statements",
" ",
" Returns:",
" Dict mit status, affected_rows, error, execution_ms"
]
},
{
"oldStart": 78,
"oldLines": 6,
"newStart": 77,
"newLines": 9,
"lines": [
" try:",
" logger.log(",
" LogEntry(",
"+ timestamp=datetime.now(),",
"+ client_name=\"mcp-db\",",
"+ tool_name=\"db_execute\",",
" request=f\"EXECUTE: {statement[:150]}\",",
" status=\"denied\",",
" duration_ms=0,"
]
},
{
"oldStart": 96,
"oldLines": 13,
"newStart": 98,
"newLines": 16,
"lines": [
" params=params_tuple,",
" )",
" ",
"- # 3. Ausführung",
"+ # 3. Ausfuehrung",
" response = ExecuteExecutor.execute(request)",
" ",
" # 4. Logging",
" try:",
" logger.log(",
" LogEntry(",
"+ timestamp=datetime.now(),",
"+ client_name=\"mcp-db\",",
"+ tool_name=\"db_execute\",",
" request=f\"EXECUTE: {statement[:150]}\",",
" status=response.status.value,",
" duration_ms=int(response.execution_ms),"
]
}
],
"originalFile": "\"\"\"Execute Tool - Führt DDL\/DML Statements aus.\"\"\"\nimport sys\nimport time\n\nsys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp_db\")\nsys.path.insert(0, \"\/var\/www\/mcp-servers\/shared\")\n\nfrom constants import LOG_QUERY_MAX_LENGTH, MS_PER_SECOND\n\nfrom domain.execute_contract import ExecuteRequest, ExecuteResponse, ExecuteStatus\nfrom domain.log_contract import LogEntry\nfrom infrastructure.db_connection import DatabaseConnection\nfrom infrastructure.protokoll_logger import ProtokollLogger\nfrom validators.execute_validator import ExecuteValidator\n\n\nclass ExecuteExecutor:\n \"\"\"Führt validierte DDL\/DML Statements aus.\"\"\"\n\n @staticmethod\n def execute(request: ExecuteRequest) -> ExecuteResponse:\n \"\"\"Führt ein validiertes Statement aus.\"\"\"\n start = time.time()\n\n try:\n with DatabaseConnection.get_connection(request.database) as conn:\n cursor = conn.cursor(buffered=True)\n cursor.execute(request.statement, request.params)\n affected_rows = cursor.rowcount\n conn.commit()\n cursor.close()\n\n duration = int((time.time() - start) * MS_PER_SECOND)\n\n return ExecuteResponse(\n status=ExecuteStatus.SUCCESS,\n affected_rows=affected_rows,\n execution_ms=duration,\n )\n\n except Exception as e:\n duration = int((time.time() - start) * MS_PER_SECOND)\n\n return ExecuteResponse(\n status=ExecuteStatus.ERROR,\n error=str(e)[:LOG_QUERY_MAX_LENGTH],\n execution_ms=duration,\n )\n\n\ndef register_execute_tool(mcp) -> None:\n \"\"\"Registriert db_execute Tool.\"\"\"\n logger = ProtokollLogger()\n\n @mcp.tool()\n def db_execute(\n statement: str,\n database: str = \"ki_dev\",\n params: list | None = None,\n ) -> dict:\n \"\"\"\n Führt DDL-Statements aus (ALTER, CREATE, DROP, TRUNCATE).\n\n Args:\n statement: SQL DDL Statement\n database: Zieldatenbank (ki_dev oder ki_content)\n params: Optional - Parameter für Prepared Statements\n\n Returns:\n Dict mit status, affected_rows, error, execution_ms\n \"\"\"\n params_tuple = tuple(params) if params else None\n\n # 1. Validierung\n valid, error = ExecuteValidator.validate_statement(statement, database)\n\n if not valid:\n try:\n logger.log(\n LogEntry(\n request=f\"EXECUTE: {statement[:150]}\",\n status=\"denied\",\n duration_ms=0,\n error_message=error,\n )\n )\n except Exception:\n pass\n\n return ExecuteResponse(status=ExecuteStatus.DENIED, error=error).to_dict()\n\n # 2. Request erstellen\n request = ExecuteRequest(\n statement=statement,\n database=database,\n params=params_tuple,\n )\n\n # 3. Ausführung\n response = ExecuteExecutor.execute(request)\n\n # 4. Logging\n try:\n logger.log(\n LogEntry(\n request=f\"EXECUTE: {statement[:150]}\",\n status=response.status.value,\n duration_ms=int(response.execution_ms),\n error_message=response.error,\n )\n )\n except Exception:\n pass\n\n return response.to_dict()\n"
}
}