tables_tool.py

Code Hygiene Score: 90

Keine Issues gefunden.

Dependencies 9

Funktionen 1

Code

"""Tables Tool - Listet Tabellen einer Datenbank auf."""

import time
from datetime import datetime

from shared.constants import LOG_ENTRY_MAX_LENGTH, LOG_QUERY_MAX_LENGTH, MS_PER_SECOND

from config import Config
from infrastructure.db_connection import DatabaseConnection
from shared.domain import LogEntry
from shared.infrastructure import get_logger


def register_tables_tool(mcp) -> None:
    """Register the ``db_tables`` tool on the given MCP server object.

    Args:
        mcp: Object exposing a ``tool()`` decorator factory; the nested
            ``db_tables`` function is registered through it.
    """
    logger = get_logger("mcp-db", Config)

    def _elapsed_ms(start: float) -> int:
        """Return the whole milliseconds elapsed since ``start``."""
        return int((time.perf_counter() - start) * MS_PER_SECOND)

    def _log_call(database: str, status: str, duration_ms: int, error_message=None) -> None:
        """Write one best-effort log entry for a ``db_tables`` call.

        ``error_message`` is only passed on to ``LogEntry`` when it is given,
        so the success entry is built with the same fields as before.
        """
        try:
            entry_fields = {
                "timestamp": datetime.now(),
                "client_name": "mcp-db",
                "tool_name": "db_tables",
                "request": f"SHOW TABLES FROM {database}",
                "status": status,
                "duration_ms": duration_ms,
            }
            if error_message is not None:
                entry_fields["error_message"] = error_message
            logger.log(LogEntry(**entry_fields))
        except Exception:
            # Deliberately swallowed: a failing logger must never turn a
            # finished tool call into an error for the caller.
            pass

    # NOTE(review): the docstring below is presumably surfaced to MCP clients
    # as the tool description - confirm before rewording it further.
    @mcp.tool()
    def db_tables(
        database: str = "ki_dev",
        include_row_count: bool = False,
    ) -> dict:
        """
        List all tables of a database.

        Args:
            database: Target database (ki_dev or ki_content)
            include_row_count: Include the row count per table

        Returns:
            Dict with a "status" key:
            - "success": also "database", "tables" (list of dicts with
              "name" and, if requested, "row_count"), "table_count" and
              "execution_ms".
            - "denied": also "error"; the database is not in
              Config.ALLOWED_DATABASES.
            - "error": also "error" (truncated exception text) and
              "execution_ms".
        """
        # Monotonic clock: the measured duration cannot be distorted by
        # wall-clock adjustments while the query is running.
        start = time.perf_counter()

        # Validation: only allow-listed databases may be inspected.
        if database not in Config.ALLOWED_DATABASES:
            return {
                "status": "denied",
                "error": f"Database '{database}' not allowed.",
            }

        try:
            with DatabaseConnection.get_connection(database) as conn:
                cursor = conn.cursor(dictionary=True, buffered=True)
                try:
                    if include_row_count:
                        # Explicit aliases pin the lettercase of the result
                        # keys; MySQL 8.0 may otherwise report
                        # information_schema columns in uppercase, which
                        # would break the dictionary lookups below.
                        cursor.execute(
                            """SELECT table_name AS table_name,
                                      table_rows AS table_rows
                               FROM information_schema.tables
                               WHERE table_schema = %s
                               ORDER BY table_name""",
                            (database,),
                        )
                        # table_rows can be NULL; report that as 0.
                        tables = [
                            {"name": row["table_name"], "row_count": row["table_rows"] or 0}
                            for row in cursor.fetchall()
                        ]
                    else:
                        # An identifier cannot be bound as a query parameter;
                        # interpolation is acceptable here only because
                        # `database` passed the allow-list check above.
                        cursor.execute(f"SHOW TABLES FROM `{database}`")
                        # Each row is a one-entry dict whose key looks like
                        # 'Tables_in_ki_dev'; take its single value.
                        tables = [
                            {"name": next(iter(row.values()))}
                            for row in cursor.fetchall()
                        ]
                finally:
                    # Release the cursor even when execute/fetchall raises.
                    cursor.close()

                duration = _elapsed_ms(start)

        except Exception as e:
            duration = _elapsed_ms(start)
            _log_call(database, "error", duration, str(e)[:LOG_ENTRY_MAX_LENGTH])
            return {
                "status": "error",
                "error": str(e)[:LOG_QUERY_MAX_LENGTH],
                "execution_ms": duration,
            }

        _log_call(database, "success", duration)
        return {
            "status": "success",
            "database": database,
            "tables": tables,
            "table_count": len(tables),
            "execution_ms": duration,
        }
← Übersicht