insert_tool.py
- Pfad: /var/www/mcp-servers/mcp-db/tools/insert_tool.py
- Namespace: -
- Zeilen: 135 | Größe: 4,195 Bytes
- Geändert: 2025-12-28 13:24:43 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 88
- Dependencies: 50 (25%)
- LOC: 100 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
Keine Issues gefunden.
Dependencies 10
- use re
- use time
- use datetime.datetime
- use shared.constants.LOG_ENTRY_MAX_LENGTH
- use shared.constants.LOG_QUERY_MAX_LENGTH
- use shared.constants.MS_PER_SECOND
- use config.Config
- use infrastructure.db_connection.DatabaseConnection
- use shared.domain.LogEntry
- use shared.infrastructure.get_logger
Funktionen 2
- _validate_identifier() Zeile 15
- register_insert_tool() Zeile 20
Code
"""Insert Tool - Fuegt Datensaetze ein."""
import re
import time
from datetime import datetime
from shared.constants import LOG_ENTRY_MAX_LENGTH, LOG_QUERY_MAX_LENGTH, MS_PER_SECOND
from config import Config
from infrastructure.db_connection import DatabaseConnection
from shared.domain import LogEntry
from shared.infrastructure import get_logger
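def _validate_identifier(name: str) -> bool:
    """Check that a table/column name is safe to splice into SQL text.

    Identifiers cannot be passed as bound parameters, so callers interpolate
    them into the statement. Only names made up entirely of ASCII letters,
    digits and underscores are accepted.

    Args:
        name: Candidate table or column name.

    Returns:
        True if the whole string consists of ``[a-zA-Z0-9_]`` characters,
        False otherwise (including empty strings and non-string values).
    """
    # A non-string would make the regex call raise TypeError; treat it as invalid.
    if not isinstance(name, str):
        return False
    # fullmatch instead of match with "^...$": "$" also matches just before a
    # trailing newline, so "users\n" would otherwise pass validation.
    return re.fullmatch(r"[a-zA-Z0-9_]+", name) is not None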
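def register_insert_tool(mcp) -> None:
    """Register the ``db_insert`` tool on the given MCP server object.

    Args:
        mcp: Server object exposing a ``tool()`` decorator factory.
    """
    logger = get_logger("mcp-db", Config)

    def _log_call(request: str, status: str, duration: int, error_message=None) -> None:
        """Write one log entry for a db_insert call; never raises."""
        fields = {
            "timestamp": datetime.now(),
            "client_name": "mcp-db",
            "tool_name": "db_insert",
            "request": request,
            "status": status,
            "duration_ms": duration,
        }
        if error_message is not None:
            fields["error_message"] = error_message
        try:
            logger.log(LogEntry(**fields))
        except Exception:
            # Deliberate best-effort: a logging failure must not change the
            # result reported to the caller.
            pass

    @mcp.tool()
    def db_insert(
        table: str,
        data: dict,
        database: str = "ki_dev",
    ) -> dict:
        """
        Insert a single record.

        Args:
            table: Target table
            data: Dict of column:value pairs
            database: Target database (ki_dev or ki_content)

        Returns:
            Dict with status, inserted_id, error
        """
        start = time.time()

        # NOTE(review): the "denied" returns below happen before any log entry
        # is written, so rejected table/column/database names leave no trace in
        # this tool's log. Confirm whether rejections should be recorded.

        # Validation: table name
        if not _validate_identifier(table):
            return {
                "status": "denied",
                "error": "Invalid table name.",
            }

        # Validation: database must be on the configured allowlist.
        # NOTE(review): only the database is allowlisted here; any
        # syntactically valid table name inside it is accepted. Confirm
        # that this is intended.
        if database not in Config.ALLOWED_DATABASES:
            return {
                "status": "denied",
                "error": f"Database '{database}' not allowed.",
            }

        # Validation: data must not be empty
        if not data:
            return {
                "status": "denied",
                "error": "Data dict must not be empty.",
            }

        # Validation: column names
        for col in data:
            if not _validate_identifier(col):
                return {
                    "status": "denied",
                    "error": f"Invalid column name: {col}",
                }

        try:
            # Build the parameterized INSERT. Identifiers cannot be bound as
            # parameters, so they are validated above and backtick-quoted here;
            # the values are always passed separately to the driver.
            columns = list(data.keys())
            placeholders = ", ".join(["%s"] * len(columns))
            columns_str = ", ".join(f"`{c}`" for c in columns)
            values = tuple(data.values())
            query = f"INSERT INTO `{table}` ({columns_str}) VALUES ({placeholders})"

            with DatabaseConnection.get_connection(database) as conn:
                cursor = conn.cursor(buffered=True)
                try:
                    cursor.execute(query, values)
                    inserted_id = cursor.lastrowid
                    conn.commit()
                finally:
                    # Release the cursor even when execute() or commit() raises.
                    cursor.close()

            duration = int((time.time() - start) * MS_PER_SECOND)

            # Only the table and column names are logged, never the values.
            _log_call(f"INSERT INTO {table} ({columns_str})", "success", duration)

            return {
                "status": "success",
                "table": table,
                "inserted_id": inserted_id,
                "execution_ms": duration,
            }
        except Exception as e:
            duration = int((time.time() - start) * MS_PER_SECOND)
            _log_call(
                f"INSERT INTO {table}",
                "error",
                duration,
                error_message=str(e)[:LOG_ENTRY_MAX_LENGTH],
            )
            # NOTE(review): the (truncated) exception text goes back to the tool
            # caller; driver messages can include schema details. Confirm that
            # this is acceptable for the clients of this tool.
            return {
                "status": "error",
                "error": str(e)[:LOG_QUERY_MAX_LENGTH],
                "execution_ms": duration,
            }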