delete_tool.py

Code Hygiene Score: 87

Issues 1

Zeile Typ Beschreibung
43 magic_number Magic Number gefunden: 100

Dependencies 10

Funktionen 2

Code

"""Delete Tool - Loescht Datensaetze."""

import re
import time
from datetime import datetime

from shared.constants import LOG_ENTRY_MAX_LENGTH, LOG_QUERY_MAX_LENGTH, MS_PER_SECOND

from config import Config
from infrastructure.db_connection import DatabaseConnection
from shared.domain import LogEntry
from shared.infrastructure import get_logger

# Safety default for the LIMIT clause: db_delete applies this row cap when the
# caller passes no limit or a limit below 1 (delete-all mode uses no LIMIT).
DEFAULT_DELETE_LIMIT = 100


def _validate_identifier(name: str) -> bool:
    """Validiert Tabellen-/Spaltennamen gegen SQL-Injection."""
    return bool(re.match(r"^[a-zA-Z0-9_]+$", name))


def register_delete_tool(mcp) -> None:
    """Register the ``db_delete`` tool on *mcp*.

    Args:
        mcp: Object whose ``tool()`` decorator factory is used to register
            the nested ``db_delete`` function.
    """
    logger = get_logger("mcp-db", Config)

    def _log_request(
        request: str,
        status: str,
        duration_ms: int,
        error_message: str | None = None,
    ) -> None:
        """Write one log entry for a db_delete call; never raises."""
        try:
            fields = {
                "timestamp": datetime.now(),
                "client_name": "mcp-db",
                "tool_name": "db_delete",
                "request": request,
                "status": status,
                "duration_ms": duration_ms,
            }
            # Pass error_message only for failures so that LogEntry's own
            # default applies to successful calls.
            if error_message is not None:
                fields["error_message"] = error_message
            logger.log(LogEntry(**fields))
        except Exception:
            # Deliberately best effort: a logging failure must not change the
            # result reported for the delete itself.
            pass

    # NOTE(review): presumably the framework surfaces the docstring below as
    # the tool description - confirm before changing its wording.
    @mcp.tool()
    def db_delete(
        table: str,
        where: dict,
        database: str = "ki_dev",
        limit: int | None = None,
        confirm_delete_all: bool = False,
    ) -> dict:
        """
        Delete rows from a table.

        Args:
            table: Target table.
            where: Dict of column: value pairs, combined with AND as equality
                   conditions in the WHERE clause - REQUIRED!
                   Special: {"1": "1"} or a single-entry dict whose value is ">0"
                   (e.g. {"id": ">0"}) selects all rows, but only together with
                   confirm_delete_all=True.
            database: Target database (ki_dev or ki_content).
            limit: Maximum number of rows to delete (default: 100). Values below 1
                   fall back to the default. Ignored when all rows are deleted.
            confirm_delete_all: True = allow deleting all rows.

        Returns:
            Dict with status ("success", "denied" or "error"). On success it also
            contains table, deleted_rows, execution_ms and, where applicable,
            limit_applied / delete_all; otherwise it contains error.
        """
        start = time.perf_counter()

        # Validation: table name (it is interpolated into the SQL text).
        if not isinstance(table, str) or not _validate_identifier(table):
            return {
                "status": "denied",
                "error": "Invalid table name.",
            }

        # Validation: database must be on the allow list.
        if database not in Config.ALLOWED_DATABASES:
            return {
                "status": "denied",
                "error": f"Database '{database}' not allowed.",
            }

        # CRITICAL: WHERE is mandatory. Delete-all mode also needs a non-empty
        # where (one of the special patterns), so this check covers both paths.
        if not where:
            return {
                "status": "denied",
                "error": "WHERE clause is required. DELETE without WHERE is forbidden.",
            }

        # Reject anything that is not a mapping before .values() is used on it.
        if not isinstance(where, dict):
            return {
                "status": "denied",
                "error": "WHERE clause must be a dict of column: value pairs.",
            }

        # Special case: delete everything, only with explicit confirmation and
        # one of the special patterns {"1": "1"} / {<any column>: ">0"}.
        delete_all_mode = bool(confirm_delete_all) and (
            where == {"1": "1"}
            or (len(where) == 1 and next(iter(where.values())) == ">0")
        )

        if delete_all_mode:
            # No column validation and no LIMIT: the where dict is not used in
            # the generated statement in this mode.
            effective_limit = None
        else:
            # Validation: column names (they are interpolated into the SQL text).
            for col in where:
                if not isinstance(col, str) or not _validate_identifier(col):
                    return {
                        "status": "denied",
                        "error": f"Invalid column name in where: {col}",
                    }

            # Default LIMIT as a safety net. The limit is interpolated into the
            # SQL text as well, so only real integers are accepted.
            if limit is None:
                effective_limit = DEFAULT_DELETE_LIMIT
            elif isinstance(limit, bool) or not isinstance(limit, int):
                return {
                    "status": "denied",
                    "error": "Invalid limit: must be an integer.",
                }
            else:
                effective_limit = limit if limit >= 1 else DEFAULT_DELETE_LIMIT

        # Build the statement; values are always passed as bound parameters.
        if delete_all_mode:
            query = f"DELETE FROM `{table}`"
            values = ()
        else:
            where_clause = " AND ".join(f"`{col}` = %s" for col in where)
            values = tuple(where.values())
            query = f"DELETE FROM `{table}` WHERE {where_clause} LIMIT {effective_limit}"

        try:
            with DatabaseConnection.get_connection(database) as conn:
                cursor = conn.cursor(buffered=True)
                try:
                    cursor.execute(query, values)
                    deleted_rows = cursor.rowcount
                    conn.commit()
                finally:
                    # Close the cursor even when execute/commit raised.
                    cursor.close()
        except Exception as e:
            duration = int((time.perf_counter() - start) * MS_PER_SECOND)
            _log_request(
                f"DELETE FROM {table}",
                "error",
                duration,
                str(e)[:LOG_ENTRY_MAX_LENGTH],
            )
            return {
                "status": "error",
                "error": str(e)[:LOG_QUERY_MAX_LENGTH],
                "execution_ms": duration,
            }

        # Success path: runs after the connection context has exited.
        duration = int((time.perf_counter() - start) * MS_PER_SECOND)
        log_msg = f"DELETE FROM {table}" + (" (ALL)" if delete_all_mode else f" LIMIT {effective_limit}")
        _log_request(log_msg, "success", duration)

        result = {
            "status": "success",
            "table": table,
            "deleted_rows": deleted_rows,
            "execution_ms": duration,
        }
        if effective_limit:
            result["limit_applied"] = effective_limit
        if delete_all_mode:
            result["delete_all"] = True
        return result
← Übersicht