insert_tool.py

Code Hygiene Score: 88

Keine Issues gefunden.

Dependencies 10

Funktionen 2

Code

"""Insert Tool - Fuegt Datensaetze ein."""

import re
import time
from datetime import datetime

from shared.constants import LOG_ENTRY_MAX_LENGTH, LOG_QUERY_MAX_LENGTH, MS_PER_SECOND

from config import Config
from infrastructure.db_connection import DatabaseConnection
from shared.domain import LogEntry
from shared.infrastructure import get_logger


def _validate_identifier(name: str) -> bool:
    """Validiert Tabellen-/Spaltennamen gegen SQL-Injection."""
    return bool(re.match(r"^[a-zA-Z0-9_]+$", name))


def register_insert_tool(mcp) -> None:
    """Registriert db_insert Tool."""
    logger = get_logger("mcp-db", Config)

    @mcp.tool()
    def db_insert(
        table: str,
        data: dict,
        database: str = "ki_dev",
    ) -> dict:
        """
        Fuegt einen Datensatz ein.

        Args:
            table: Zieltabelle
            data: Dict mit Spalte:Wert Paaren
            database: Zieldatenbank (ki_dev oder ki_content)

        Returns:
            Dict mit status, inserted_id, error
        """
        start = time.time()

        # Validierung: Tabellenname
        if not _validate_identifier(table):
            return {
                "status": "denied",
                "error": "Invalid table name.",
            }

        # Validierung: Datenbank
        if database not in Config.ALLOWED_DATABASES:
            return {
                "status": "denied",
                "error": f"Database '{database}' not allowed.",
            }

        # Validierung: Data nicht leer
        if not data:
            return {
                "status": "denied",
                "error": "Data dict must not be empty.",
            }

        # Validierung: Spaltennamen
        for col in data:
            if not _validate_identifier(col):
                return {
                    "status": "denied",
                    "error": f"Invalid column name: {col}",
                }

        try:
            with DatabaseConnection.get_connection(database) as conn:
                cursor = conn.cursor(buffered=True)

                # Build parameterized INSERT
                columns = list(data.keys())
                placeholders = ", ".join(["%s"] * len(columns))
                columns_str = ", ".join([f"`{c}`" for c in columns])
                values = tuple(data.values())

                query = f"INSERT INTO `{table}` ({columns_str}) VALUES ({placeholders})"
                cursor.execute(query, values)
                inserted_id = cursor.lastrowid
                conn.commit()
                cursor.close()

                duration = int((time.time() - start) * MS_PER_SECOND)

                # Log
                try:
                    logger.log(
                        LogEntry(
                            timestamp=datetime.now(),
                            client_name="mcp-db",
                            tool_name="db_insert",
                            request=f"INSERT INTO {table} ({columns_str})",
                            status="success",
                            duration_ms=duration,
                        )
                    )
                except Exception:
                    pass

                return {
                    "status": "success",
                    "table": table,
                    "inserted_id": inserted_id,
                    "execution_ms": duration,
                }

        except Exception as e:
            duration = int((time.time() - start) * MS_PER_SECOND)

            try:
                logger.log(
                    LogEntry(
                        timestamp=datetime.now(),
                        client_name="mcp-db",
                        tool_name="db_insert",
                        request=f"INSERT INTO {table}",
                        status="error",
                        duration_ms=duration,
                        error_message=str(e)[:LOG_ENTRY_MAX_LENGTH],
                    )
                )
            except Exception:
                pass

            return {
                "status": "error",
                "error": str(e)[:LOG_QUERY_MAX_LENGTH],
                "execution_ms": duration,
            }
← Übersicht