{
"tool_response": {
"type": "text",
"file": {
"filePath": "\/opt\/mcp-servers\/mcp-db\/tools\/select_tool.py",
"content": "\"\"\"SELECT Tool - Führt SELECT-Abfragen aus\"\"\"\nimport sys\nimport time\nfrom typing import List, Optional\n\nsys.path.insert(0, \"\/opt\/mcp-servers\/mcp-db\")\n\nfrom config import Config\nfrom domain.log_contract import LogEntry\nfrom domain.query_contract import QueryRequest, QueryResponse, QueryStatus\nfrom infrastructure.db_connection import DatabaseConnection\nfrom infrastructure.protokoll_logger import ProtokollLogger\nfrom validators.query_validator import QueryValidator\n\n\nclass SelectExecutor:\n \"\"\"SRP: Nur Query-Ausführung\"\"\"\n\n @staticmethod\n def execute(request: QueryRequest) -> QueryResponse:\n \"\"\"Führt validierte Query aus\"\"\"\n start = time.time()\n\n try:\n with DatabaseConnection.get_connection(request.database) as conn:\n # buffered=True verhindert \"Unread result found\" Fehler\n cursor = conn.cursor(dictionary=True, buffered=True)\n\n # Prepared Statement mit Parametern\n cursor.execute(request.query, request.params)\n\n rows = cursor.fetchmany(request.max_rows)\n cursor.close()\n\n duration = int((time.time() - start) * 1000)\n\n return QueryResponse(\n status=QueryStatus.SUCCESS,\n data=rows,\n row_count=len(rows),\n execution_ms=duration,\n )\n\n except Exception as e:\n duration = int((time.time() - start) * 1000)\n\n return QueryResponse(\n status=QueryStatus.ERROR,\n error=str(e)[:100],\n execution_ms=duration,\n )\n\n\ndef register_select_tool(mcp):\n \"\"\"Registriert db_select Tool\"\"\"\n\n logger = ProtokollLogger()\n\n @mcp.tool()\n def db_select(\n query: str,\n database: str = \"ki_protokoll\",\n max_rows: int = 100,\n params: Optional[List] = None,\n ) -> dict:\n \"\"\"\n Führt eine SELECT-Abfrage aus.\n\n Args:\n query: SQL SELECT Statement\n database: Zieldatenbank (ki_protokoll oder ki_system)\n max_rows: Maximale Anzahl Ergebniszeilen (1-100)\n params: Optional - Parameter für Prepared Statements\n\n Returns:\n Dict mit status, data, row_count, error, execution_ms\n \"\"\"\n # Convert list to tuple for params\n params_tuple = tuple(params) if params else None\n\n # 1. Validierung (SRP)\n valid, error = QueryValidator.validate_query(query, database, max_rows)\n\n if not valid:\n # Log denied query\n try:\n logger.log(\n LogEntry(\n request=query[:200],\n status=\"denied\",\n duration_ms=0,\n error_message=error,\n )\n )\n except Exception:\n pass # Logging-Fehler nicht an Client weitergeben\n\n return QueryResponse(status=QueryStatus.DENIED, error=error).to_dict()\n\n # 2. Request erstellen\n request = QueryRequest(\n query=query,\n params=params_tuple,\n database=database,\n max_rows=min(max_rows, Config.MAX_ROWS),\n )\n\n # 3. Ausführung (SRP)\n response = SelectExecutor.execute(request)\n\n # 4. Logging (SRP)\n try:\n logger.log(\n LogEntry(\n request=query[:200],\n status=response.status.value,\n duration_ms=int(response.execution_ms),\n error_message=response.error,\n )\n )\n except Exception:\n pass # Logging-Fehler nicht an Client weitergeben\n\n return response.to_dict()\n",
"numLines": 125,
"startLine": 1,
"totalLines": 125
}
}
}