select_tool.py

Code Hygiene Score: 77

Issues 1

Zeile Typ Beschreibung
72 magic_number Magic Number gefunden: 100

Dependencies 14

Klassen 1

Funktionen 1

Code

"""SELECT Tool - Fuehrt SELECT-Abfragen aus"""

import time
from datetime import datetime

from shared.constants import LOG_ENTRY_MAX_LENGTH, LOG_QUERY_MAX_LENGTH, MAX_ROWS, MS_PER_SECOND

from config import Config
from domain.query_contract import QueryRequest, QueryResponse, QueryStatus
from infrastructure.db_connection import DatabaseConnection
from shared.domain import LogEntry
from shared.infrastructure import get_logger
from validators.query_validator import QueryValidator


class SelectExecutor:
    """SRP: Nur Query-Ausfuehrung"""

    @staticmethod
    def execute(request: QueryRequest) -> QueryResponse:
        """Fuehrt validierte Query aus"""
        start = time.time()

        try:
            with DatabaseConnection.get_connection(request.database) as conn:
                # buffered=True verhindert "Unread result found" Fehler
                cursor = conn.cursor(dictionary=True, buffered=True)

                # Prepared Statement mit Parametern
                cursor.execute(request.query, request.params)

                rows = cursor.fetchmany(request.max_rows)
                cursor.close()

                duration = int((time.time() - start) * MS_PER_SECOND)

                return QueryResponse(
                    status=QueryStatus.SUCCESS,
                    data=rows,
                    row_count=len(rows),
                    execution_ms=duration,
                )

        except Exception as e:
            duration = int((time.time() - start) * MS_PER_SECOND)

            return QueryResponse(
                status=QueryStatus.ERROR,
                error=str(e)[:LOG_ENTRY_MAX_LENGTH],
                execution_ms=duration,
            )


def register_select_tool(mcp):
    """Registriert db_select Tool"""

    logger = get_logger("mcp-db", Config)

    @mcp.tool()
    def db_select(
        query: str,
        database: str = "ki_protokoll",
        max_rows: int = MAX_ROWS,
        params: list | None = None,
    ) -> dict:
        """
        Fuehrt eine SELECT-Abfrage aus.

        Args:
            query: SQL SELECT Statement
            database: Zieldatenbank (ki_protokoll oder ki_system)
            max_rows: Maximale Anzahl Ergebniszeilen (1-100)
            params: Optional - Parameter fuer Prepared Statements

        Returns:
            Dict mit status, data, row_count, error, execution_ms
        """
        # Convert list to tuple for params
        params_tuple = tuple(params) if params else None

        # 1. Validierung (SRP)
        valid, error = QueryValidator.validate_query(query, database, max_rows)

        if not valid:
            # Log denied query
            try:
                logger.log(
                    LogEntry(
                        timestamp=datetime.now(),
                        client_name="mcp-db",
                        tool_name="db_select",
                        request=query[:LOG_QUERY_MAX_LENGTH],
                        status="denied",
                        duration_ms=0,
                        error_message=error,
                    )
                )
            except Exception:
                pass  # Logging-Fehler nicht an Client weitergeben

            return QueryResponse(status=QueryStatus.DENIED, error=error).to_dict()

        # 2. Request erstellen
        request = QueryRequest(
            query=query,
            params=params_tuple,
            database=database,
            max_rows=min(max_rows, Config.MAX_ROWS),
        )

        # 3. Ausfuehrung (SRP)
        response = SelectExecutor.execute(request)

        # 4. Logging (SRP)
        try:
            logger.log(
                LogEntry(
                    timestamp=datetime.now(),
                    client_name="mcp-db",
                    tool_name="db_select",
                    request=query[:LOG_QUERY_MAX_LENGTH],
                    status=response.status.value,
                    duration_ms=int(response.execution_ms),
                    error_message=response.error,
                )
            )
        except Exception:
            pass  # Logging-Fehler nicht an Client weitergeben

        return response.to_dict()
← Übersicht Graph