{
"event": "PreToolUse",
"tool_name": "Write",
"tool_input": {
"file_path": "\/var\/www\/mcp-servers\/mcp_db\/tools\/select_tool.py",
"content": "\"\"\"SELECT Tool - Fuehrt SELECT-Abfragen aus\"\"\"\n\nimport time\nfrom datetime import datetime\n\nfrom constants import LOG_ENTRY_MAX_LENGTH, LOG_QUERY_MAX_LENGTH, MAX_ROWS, MS_PER_SECOND\n\nfrom config import Config\nfrom domain.query_contract import QueryRequest, QueryResponse, QueryStatus\nfrom infrastructure.db_connection import DatabaseConnection\nfrom shared.domain import LogEntry\nfrom shared.infrastructure import get_logger\nfrom validators.query_validator import QueryValidator\n\n\nclass SelectExecutor:\n \"\"\"SRP: Nur Query-Ausfuehrung\"\"\"\n\n @staticmethod\n def execute(request: QueryRequest) -> QueryResponse:\n \"\"\"Fuehrt validierte Query aus\"\"\"\n start = time.time()\n\n try:\n with DatabaseConnection.get_connection(request.database) as conn:\n # buffered=True verhindert \"Unread result found\" Fehler\n cursor = conn.cursor(dictionary=True, buffered=True)\n\n # Prepared Statement mit Parametern\n cursor.execute(request.query, request.params)\n\n rows = cursor.fetchmany(request.max_rows)\n cursor.close()\n\n duration = int((time.time() - start) * MS_PER_SECOND)\n\n return QueryResponse(\n status=QueryStatus.SUCCESS,\n data=rows,\n row_count=len(rows),\n execution_ms=duration,\n )\n\n except Exception as e:\n duration = int((time.time() - start) * MS_PER_SECOND)\n\n return QueryResponse(\n status=QueryStatus.ERROR,\n error=str(e)[:LOG_ENTRY_MAX_LENGTH],\n execution_ms=duration,\n )\n\n\ndef register_select_tool(mcp):\n \"\"\"Registriert db_select Tool\"\"\"\n\n logger = get_logger(\"mcp-db\", Config)\n\n @mcp.tool()\n def db_select(\n query: str,\n database: str = \"ki_protokoll\",\n max_rows: int = MAX_ROWS,\n params: list | None = None,\n ) -> dict:\n \"\"\"\n Fuehrt eine SELECT-Abfrage aus.\n\n Args:\n query: SQL SELECT Statement\n database: Zieldatenbank (ki_protokoll oder ki_system)\n max_rows: Maximale Anzahl Ergebniszeilen (1-100)\n params: Optional - Parameter fuer Prepared Statements\n\n Returns:\n Dict mit status, data, row_count, error, execution_ms\n \"\"\"\n # Convert list to tuple for params\n params_tuple = tuple(params) if params else None\n\n # 1. Validierung (SRP)\n valid, error = QueryValidator.validate_query(query, database, max_rows)\n\n if not valid:\n # Log denied query\n try:\n logger.log(\n LogEntry(\n timestamp=datetime.now(),\n client_name=\"mcp-db\",\n tool_name=\"db_select\",\n request=query[:LOG_QUERY_MAX_LENGTH],\n status=\"denied\",\n duration_ms=0,\n error_message=error,\n )\n )\n except Exception:\n pass # Logging-Fehler nicht an Client weitergeben\n\n return QueryResponse(status=QueryStatus.DENIED, error=error).to_dict()\n\n # 2. Request erstellen\n request = QueryRequest(\n query=query,\n params=params_tuple,\n database=database,\n max_rows=min(max_rows, Config.MAX_ROWS),\n )\n\n # 3. Ausfuehrung (SRP)\n response = SelectExecutor.execute(request)\n\n # 4. Logging (SRP)\n try:\n logger.log(\n LogEntry(\n timestamp=datetime.now(),\n client_name=\"mcp-db\",\n tool_name=\"db_select\",\n request=query[:LOG_QUERY_MAX_LENGTH],\n status=response.status.value,\n duration_ms=int(response.execution_ms),\n error_message=response.error,\n )\n )\n except Exception:\n pass # Logging-Fehler nicht an Client weitergeben\n\n return response.to_dict()\n"
}
}
{
"tool_response": {
"type": "update",
"filePath": "\/var\/www\/mcp-servers\/mcp_db\/tools\/select_tool.py",
"content": "\"\"\"SELECT Tool - Fuehrt SELECT-Abfragen aus\"\"\"\n\nimport time\nfrom datetime import datetime\n\nfrom constants import LOG_ENTRY_MAX_LENGTH, LOG_QUERY_MAX_LENGTH, MAX_ROWS, MS_PER_SECOND\n\nfrom config import Config\nfrom domain.query_contract import QueryRequest, QueryResponse, QueryStatus\nfrom infrastructure.db_connection import DatabaseConnection\nfrom shared.domain import LogEntry\nfrom shared.infrastructure import get_logger\nfrom validators.query_validator import QueryValidator\n\n\nclass SelectExecutor:\n \"\"\"SRP: Nur Query-Ausfuehrung\"\"\"\n\n @staticmethod\n def execute(request: QueryRequest) -> QueryResponse:\n \"\"\"Fuehrt validierte Query aus\"\"\"\n start = time.time()\n\n try:\n with DatabaseConnection.get_connection(request.database) as conn:\n # buffered=True verhindert \"Unread result found\" Fehler\n cursor = conn.cursor(dictionary=True, buffered=True)\n\n # Prepared Statement mit Parametern\n cursor.execute(request.query, request.params)\n\n rows = cursor.fetchmany(request.max_rows)\n cursor.close()\n\n duration = int((time.time() - start) * MS_PER_SECOND)\n\n return QueryResponse(\n status=QueryStatus.SUCCESS,\n data=rows,\n row_count=len(rows),\n execution_ms=duration,\n )\n\n except Exception as e:\n duration = int((time.time() - start) * MS_PER_SECOND)\n\n return QueryResponse(\n status=QueryStatus.ERROR,\n error=str(e)[:LOG_ENTRY_MAX_LENGTH],\n execution_ms=duration,\n )\n\n\ndef register_select_tool(mcp):\n \"\"\"Registriert db_select Tool\"\"\"\n\n logger = get_logger(\"mcp-db\", Config)\n\n @mcp.tool()\n def db_select(\n query: str,\n database: str = \"ki_protokoll\",\n max_rows: int = MAX_ROWS,\n params: list | None = None,\n ) -> dict:\n \"\"\"\n Fuehrt eine SELECT-Abfrage aus.\n\n Args:\n query: SQL SELECT Statement\n database: Zieldatenbank (ki_protokoll oder ki_system)\n max_rows: Maximale Anzahl Ergebniszeilen (1-100)\n params: Optional - Parameter fuer Prepared Statements\n\n Returns:\n Dict mit status, data, row_count, error, execution_ms\n \"\"\"\n # Convert list to tuple for params\n params_tuple = tuple(params) if params else None\n\n # 1. Validierung (SRP)\n valid, error = QueryValidator.validate_query(query, database, max_rows)\n\n if not valid:\n # Log denied query\n try:\n logger.log(\n LogEntry(\n timestamp=datetime.now(),\n client_name=\"mcp-db\",\n tool_name=\"db_select\",\n request=query[:LOG_QUERY_MAX_LENGTH],\n status=\"denied\",\n duration_ms=0,\n error_message=error,\n )\n )\n except Exception:\n pass # Logging-Fehler nicht an Client weitergeben\n\n return QueryResponse(status=QueryStatus.DENIED, error=error).to_dict()\n\n # 2. Request erstellen\n request = QueryRequest(\n query=query,\n params=params_tuple,\n database=database,\n max_rows=min(max_rows, Config.MAX_ROWS),\n )\n\n # 3. Ausfuehrung (SRP)\n response = SelectExecutor.execute(request)\n\n # 4. Logging (SRP)\n try:\n logger.log(\n LogEntry(\n timestamp=datetime.now(),\n client_name=\"mcp-db\",\n tool_name=\"db_select\",\n request=query[:LOG_QUERY_MAX_LENGTH],\n status=response.status.value,\n duration_ms=int(response.execution_ms),\n error_message=response.error,\n )\n )\n except Exception:\n pass # Logging-Fehler nicht an Client weitergeben\n\n return response.to_dict()\n",
"structuredPatch": [
{
"oldStart": 1,
"oldLines": 26,
"newStart": 1,
"newLines": 24,
"lines": [
"-\"\"\"SELECT Tool - Führt SELECT-Abfragen aus\"\"\"",
"-import sys",
"+\"\"\"SELECT Tool - Fuehrt SELECT-Abfragen aus\"\"\"",
"+",
" import time",
"+from datetime import datetime",
" ",
"-sys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp_db\")",
"-sys.path.insert(0, \"\/var\/www\/mcp-servers\/shared\")",
"-",
" from constants import LOG_ENTRY_MAX_LENGTH, LOG_QUERY_MAX_LENGTH, MAX_ROWS, MS_PER_SECOND",
" ",
" from config import Config",
"-from domain.log_contract import LogEntry",
" from domain.query_contract import QueryRequest, QueryResponse, QueryStatus",
" from infrastructure.db_connection import DatabaseConnection",
"-from infrastructure.protokoll_logger import ProtokollLogger",
"+from shared.domain import LogEntry",
"+from shared.infrastructure import get_logger",
" from validators.query_validator import QueryValidator",
" ",
" ",
" class SelectExecutor:",
"- \"\"\"SRP: Nur Query-Ausführung\"\"\"",
"+ \"\"\"SRP: Nur Query-Ausfuehrung\"\"\"",
" ",
" @staticmethod",
" def execute(request: QueryRequest) -> QueryResponse:",
"- \"\"\"Führt validierte Query aus\"\"\"",
"+ \"\"\"Fuehrt validierte Query aus\"\"\"",
" start = time.time()",
" ",
" try:"
]
},
{
"oldStart": 56,
"oldLines": 7,
"newStart": 54,
"newLines": 7,
"lines": [
" def register_select_tool(mcp):",
" \"\"\"Registriert db_select Tool\"\"\"",
" ",
"- logger = ProtokollLogger()",
"+ logger = get_logger(\"mcp-db\", Config)",
" ",
" @mcp.tool()",
" def db_select("
]
},
{
"oldStart": 66,
"oldLines": 13,
"newStart": 64,
"newLines": 13,
"lines": [
" params: list | None = None,",
" ) -> dict:",
" \"\"\"",
"- Führt eine SELECT-Abfrage aus.",
"+ Fuehrt eine SELECT-Abfrage aus.",
" ",
" Args:",
" query: SQL SELECT Statement",
" database: Zieldatenbank (ki_protokoll oder ki_system)",
" max_rows: Maximale Anzahl Ergebniszeilen (1-100)",
"- params: Optional - Parameter für Prepared Statements",
"+ params: Optional - Parameter fuer Prepared Statements",
" ",
" Returns:",
" Dict mit status, data, row_count, error, execution_ms"
]
},
{
"oldStart": 88,
"oldLines": 6,
"newStart": 86,
"newLines": 9,
"lines": [
" try:",
" logger.log(",
" LogEntry(",
"+ timestamp=datetime.now(),",
"+ client_name=\"mcp-db\",",
"+ tool_name=\"db_select\",",
" request=query[:LOG_QUERY_MAX_LENGTH],",
" status=\"denied\",",
" duration_ms=0,"
]
},
{
"oldStart": 107,
"oldLines": 13,
"newStart": 108,
"newLines": 16,
"lines": [
" max_rows=min(max_rows, Config.MAX_ROWS),",
" )",
" ",
"- # 3. Ausführung (SRP)",
"+ # 3. Ausfuehrung (SRP)",
" response = SelectExecutor.execute(request)",
" ",
" # 4. Logging (SRP)",
" try:",
" logger.log(",
" LogEntry(",
"+ timestamp=datetime.now(),",
"+ client_name=\"mcp-db\",",
"+ tool_name=\"db_select\",",
" request=query[:LOG_QUERY_MAX_LENGTH],",
" status=response.status.value,",
" duration_ms=int(response.execution_ms),"
]
}
],
"originalFile": "\"\"\"SELECT Tool - Führt SELECT-Abfragen aus\"\"\"\nimport sys\nimport time\n\nsys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp_db\")\nsys.path.insert(0, \"\/var\/www\/mcp-servers\/shared\")\n\nfrom constants import LOG_ENTRY_MAX_LENGTH, LOG_QUERY_MAX_LENGTH, MAX_ROWS, MS_PER_SECOND\n\nfrom config import Config\nfrom domain.log_contract import LogEntry\nfrom domain.query_contract import QueryRequest, QueryResponse, QueryStatus\nfrom infrastructure.db_connection import DatabaseConnection\nfrom infrastructure.protokoll_logger import ProtokollLogger\nfrom validators.query_validator import QueryValidator\n\n\nclass SelectExecutor:\n \"\"\"SRP: Nur Query-Ausführung\"\"\"\n\n @staticmethod\n def execute(request: QueryRequest) -> QueryResponse:\n \"\"\"Führt validierte Query aus\"\"\"\n start = time.time()\n\n try:\n with DatabaseConnection.get_connection(request.database) as conn:\n # buffered=True verhindert \"Unread result found\" Fehler\n cursor = conn.cursor(dictionary=True, buffered=True)\n\n # Prepared Statement mit Parametern\n cursor.execute(request.query, request.params)\n\n rows = cursor.fetchmany(request.max_rows)\n cursor.close()\n\n duration = int((time.time() - start) * MS_PER_SECOND)\n\n return QueryResponse(\n status=QueryStatus.SUCCESS,\n data=rows,\n row_count=len(rows),\n execution_ms=duration,\n )\n\n except Exception as e:\n duration = int((time.time() - start) * MS_PER_SECOND)\n\n return QueryResponse(\n status=QueryStatus.ERROR,\n error=str(e)[:LOG_ENTRY_MAX_LENGTH],\n execution_ms=duration,\n )\n\n\ndef register_select_tool(mcp):\n \"\"\"Registriert db_select Tool\"\"\"\n\n logger = ProtokollLogger()\n\n @mcp.tool()\n def db_select(\n query: str,\n database: str = \"ki_protokoll\",\n max_rows: int = MAX_ROWS,\n params: list | None = None,\n ) -> dict:\n \"\"\"\n Führt eine SELECT-Abfrage aus.\n\n Args:\n query: SQL SELECT Statement\n database: Zieldatenbank (ki_protokoll oder ki_system)\n max_rows: Maximale Anzahl Ergebniszeilen (1-100)\n params: Optional - Parameter für Prepared Statements\n\n Returns:\n Dict mit status, data, row_count, error, execution_ms\n \"\"\"\n # Convert list to tuple for params\n params_tuple = tuple(params) if params else None\n\n # 1. Validierung (SRP)\n valid, error = QueryValidator.validate_query(query, database, max_rows)\n\n if not valid:\n # Log denied query\n try:\n logger.log(\n LogEntry(\n request=query[:LOG_QUERY_MAX_LENGTH],\n status=\"denied\",\n duration_ms=0,\n error_message=error,\n )\n )\n except Exception:\n pass # Logging-Fehler nicht an Client weitergeben\n\n return QueryResponse(status=QueryStatus.DENIED, error=error).to_dict()\n\n # 2. Request erstellen\n request = QueryRequest(\n query=query,\n params=params_tuple,\n database=database,\n max_rows=min(max_rows, Config.MAX_ROWS),\n )\n\n # 3. Ausführung (SRP)\n response = SelectExecutor.execute(request)\n\n # 4. Logging (SRP)\n try:\n logger.log(\n LogEntry(\n request=query[:LOG_QUERY_MAX_LENGTH],\n status=response.status.value,\n duration_ms=int(response.execution_ms),\n error_message=response.error,\n )\n )\n except Exception:\n pass # Logging-Fehler nicht an Client weitergeben\n\n return response.to_dict()\n"
}
}