delete_tool.py
- Pfad: /var/www/mcp-servers/mcp-db/tools/delete_tool.py
- Namespace: -
- Zeilen: 169 | Größe: 5,896 Bytes
- Geändert: 2025-12-28 13:23:06 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 87
- Dependencies: 50 (25%)
- LOC: 100 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 90 (10%)
Issues 1
| Zeile | Typ | Beschreibung |
|-------|-----|--------------|
| 43 | magic_number | Magic Number gefunden: 100 |
Dependencies 10
- use re
- use time
- use datetime.datetime
- use shared.constants.LOG_ENTRY_MAX_LENGTH
- use shared.constants.LOG_QUERY_MAX_LENGTH
- use shared.constants.MS_PER_SECOND
- use config.Config
- use infrastructure.db_connection.DatabaseConnection
- use shared.domain.LogEntry
- use shared.infrastructure.get_logger
Funktionen 2
- _validate_identifier() (Zeile 18)
- register_delete_tool() (Zeile 23)
Code
"""Delete Tool - Loescht Datensaetze."""
import re
import time
from datetime import datetime
from shared.constants import LOG_ENTRY_MAX_LENGTH, LOG_QUERY_MAX_LENGTH, MS_PER_SECOND
from config import Config
from infrastructure.db_connection import DatabaseConnection
from shared.domain import LogEntry
from shared.infrastructure import get_logger
# Safety default for LIMIT: used when the caller passes no limit or a limit below 1.
DEFAULT_DELETE_LIMIT = 100
def _validate_identifier(name: str) -> bool:
"""Validiert Tabellen-/Spaltennamen gegen SQL-Injection."""
return bool(re.match(r"^[a-zA-Z0-9_]+$", name))
def register_delete_tool(mcp) -> None:
    """Register the ``db_delete`` tool on the given MCP server object.

    Args:
        mcp: Object exposing a ``tool()`` decorator factory; the nested
            ``db_delete`` function is registered through it.
    """
    logger = get_logger("mcp-db", Config)

    def _log(
        request: str,
        status: str,
        duration_ms: int,
        error_message: str | None = None,
    ) -> None:
        """Write one log entry; a logging failure never affects the tool result."""
        fields = {
            "timestamp": datetime.now(),
            "client_name": "mcp-db",
            "tool_name": "db_delete",
            "request": request,
            "status": status,
            "duration_ms": duration_ms,
        }
        # Only pass error_message when there is one, so successful entries are
        # constructed exactly as before (without that keyword).
        if error_message is not None:
            fields["error_message"] = error_message
        try:
            logger.log(LogEntry(**fields))
        except Exception:
            # Deliberately best-effort: the delete outcome must be reported
            # to the caller even if logging is unavailable.
            pass

    @mcp.tool()
    def db_delete(
        table: str,
        where: dict,
        database: str = "ki_dev",
        limit: int | None = None,
        confirm_delete_all: bool = False,
    ) -> dict:
        """
        Delete records from a table.

        Args:
            table: Target table.
            where: Dict of column: value pairs (WHERE clause) - REQUIRED!
                Special: {"id": ">0"} or {"1": "1"} selects all rows
                (only together with confirm_delete_all=True).
            database: Target database (ki_dev or ki_content).
            limit: Maximum number of rows to delete (default: 100).
                Values below 1 fall back to the default. Ignored when all
                rows are deleted.
            confirm_delete_all: True = allow deleting all rows.

        Returns:
            Dict with status ("success", "denied" or "error") and, depending
            on the outcome, table, deleted_rows, execution_ms, limit_applied,
            delete_all or error.
        """
        start = time.time()

        # Validation: table name (it is interpolated into the SQL text).
        if not isinstance(table, str) or not _validate_identifier(table):
            return {
                "status": "denied",
                "error": "Invalid table name.",
            }

        # Validation: database must be on the allow-list.
        if database not in Config.ALLOWED_DATABASES:
            return {
                "status": "denied",
                "error": f"Database '{database}' not allowed.",
            }

        # A non-empty WHERE must be a dict; anything else would fail later
        # with an uncaught AttributeError/TypeError.
        if where and not isinstance(where, dict):
            return {
                "status": "denied",
                "error": "WHERE must be a dict of column: value pairs.",
            }

        # Special case: delete everything. Requires confirm_delete_all plus
        # one of the marker patterns {"1": "1"} or {<any column>: ">0"}.
        delete_all_mode = bool(
            where
            and confirm_delete_all
            and (
                where == {"1": "1"}
                or (len(where) == 1 and next(iter(where.values())) == ">0")
            )
        )

        # CRITICAL: WHERE is mandatory. delete_all_mode implies a non-empty
        # WHERE, so an empty one is always rejected.
        if not where:
            return {
                "status": "denied",
                "error": "WHERE clause is required. DELETE without WHERE is forbidden.",
            }

        if delete_all_mode:
            # No WHERE columns are used and no LIMIT is applied in this mode.
            effective_limit = None
        else:
            # Validation: column names are interpolated into the SQL text.
            for col in where:
                if not isinstance(col, str) or not _validate_identifier(col):
                    return {
                        "status": "denied",
                        "error": f"Invalid column name in where: {col}",
                    }

            # LIMIT is interpolated into the SQL text as well, so only plain
            # integers are accepted (bool is an int subclass and is rejected).
            if limit is None:
                effective_limit = DEFAULT_DELETE_LIMIT
            elif isinstance(limit, bool) or not isinstance(limit, int):
                return {
                    "status": "denied",
                    "error": "Invalid limit: must be an integer.",
                }
            else:
                effective_limit = limit if limit >= 1 else DEFAULT_DELETE_LIMIT

        try:
            with DatabaseConnection.get_connection(database) as conn:
                cursor = conn.cursor(buffered=True)
                try:
                    if delete_all_mode:
                        query = f"DELETE FROM `{table}`"
                        values = ()
                    else:
                        # Values are bound as parameters; only validated
                        # identifiers and the integer limit are interpolated.
                        where_clause = " AND ".join(f"`{col}` = %s" for col in where)
                        values = tuple(where.values())
                        query = (
                            f"DELETE FROM `{table}` WHERE {where_clause} "
                            f"LIMIT {effective_limit}"
                        )

                    cursor.execute(query, values)
                    deleted_rows = cursor.rowcount
                    conn.commit()
                finally:
                    # Release the cursor even when execute/commit raises.
                    cursor.close()

            duration = int((time.time() - start) * MS_PER_SECOND)

            log_msg = f"DELETE FROM {table}" + (
                " (ALL)" if delete_all_mode else f" LIMIT {effective_limit}"
            )
            _log(log_msg, "success", duration)

            result = {
                "status": "success",
                "table": table,
                "deleted_rows": deleted_rows,
                "execution_ms": duration,
            }
            if effective_limit is not None:
                result["limit_applied"] = effective_limit
            if delete_all_mode:
                result["delete_all"] = True
            return result

        except Exception as e:
            # Tool boundary: report every failure as a structured error result.
            duration = int((time.time() - start) * MS_PER_SECOND)
            _log(
                f"DELETE FROM {table}",
                "error",
                duration,
                error_message=str(e)[:LOG_ENTRY_MAX_LENGTH],
            )
            return {
                "status": "error",
                "error": str(e)[:LOG_QUERY_MAX_LENGTH],
                "execution_ms": duration,
            }