execute_tool.py
- Pfad:
/var/www/mcp-servers/mcp-db/tools/execute_tool.py - Namespace: -
- Zeilen: 121 | Größe: 3,776 Bytes
- Geändert: 2025-12-28 13:24:18 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 83
- Dependencies: 30 (25%)
- LOC: 100 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
Keine Issues gefunden.
Dependencies 12
- use time
- use datetime.datetime
- use shared.constants.LOG_QUERY_MAX_LENGTH
- use shared.constants.MS_PER_SECOND
- use domain.execute_contract.ExecuteRequest
- use domain.execute_contract.ExecuteResponse
- use domain.execute_contract.ExecuteStatus
- use infrastructure.db_connection.DatabaseConnection
- use shared.domain.LogEntry
- use shared.infrastructure.get_logger
- use config.Config
- use validators.execute_validator.ExecuteValidator
Klassen 1
-
ExecuteExecutorclass Zeile 16
Funktionen 1
-
register_execute_tool()Zeile 50
Code
"""Execute Tool - Fuehrt DDL/DML Statements aus."""
import time
from datetime import datetime
from shared.constants import LOG_QUERY_MAX_LENGTH, MS_PER_SECOND
from domain.execute_contract import ExecuteRequest, ExecuteResponse, ExecuteStatus
from infrastructure.db_connection import DatabaseConnection
from shared.domain import LogEntry
from shared.infrastructure import get_logger
from config import Config
from validators.execute_validator import ExecuteValidator
class ExecuteExecutor:
"""Fuehrt validierte DDL/DML Statements aus."""
@staticmethod
def execute(request: ExecuteRequest) -> ExecuteResponse:
"""Fuehrt ein validiertes Statement aus."""
start = time.time()
try:
with DatabaseConnection.get_connection(request.database) as conn:
cursor = conn.cursor(buffered=True)
cursor.execute(request.statement, request.params)
affected_rows = cursor.rowcount
conn.commit()
cursor.close()
duration = int((time.time() - start) * MS_PER_SECOND)
return ExecuteResponse(
status=ExecuteStatus.SUCCESS,
affected_rows=affected_rows,
execution_ms=duration,
)
except Exception as e:
duration = int((time.time() - start) * MS_PER_SECOND)
return ExecuteResponse(
status=ExecuteStatus.ERROR,
error=str(e)[:LOG_QUERY_MAX_LENGTH],
execution_ms=duration,
)
def register_execute_tool(mcp) -> None:
"""Registriert db_execute Tool."""
logger = get_logger("mcp-db", Config)
@mcp.tool()
def db_execute(
statement: str,
database: str = "ki_dev",
params: list | None = None,
) -> dict:
"""
Fuehrt DDL-Statements aus (ALTER, CREATE, DROP, TRUNCATE).
Args:
statement: SQL DDL Statement
database: Zieldatenbank (ki_dev oder ki_content)
params: Optional - Parameter fuer Prepared Statements
Returns:
Dict mit status, affected_rows, error, execution_ms
"""
params_tuple = tuple(params) if params else None
# 1. Validierung
valid, error = ExecuteValidator.validate_statement(statement, database)
if not valid:
try:
logger.log(
LogEntry(
timestamp=datetime.now(),
client_name="mcp-db",
tool_name="db_execute",
request=f"EXECUTE: {statement[:150]}",
status="denied",
duration_ms=0,
error_message=error,
)
)
except Exception:
pass
return ExecuteResponse(status=ExecuteStatus.DENIED, error=error).to_dict()
# 2. Request erstellen
request = ExecuteRequest(
statement=statement,
database=database,
params=params_tuple,
)
# 3. Ausfuehrung
response = ExecuteExecutor.execute(request)
# 4. Logging
try:
logger.log(
LogEntry(
timestamp=datetime.now(),
client_name="mcp-db",
tool_name="db_execute",
request=f"EXECUTE: {statement[:150]}",
status=response.status.value,
duration_ms=int(response.execution_ms),
error_message=response.error,
)
)
except Exception:
pass
return response.to_dict()