stats_tool.py
- Pfad: /var/www/mcp-servers/mcp-db/tools/stats_tool.py
- Namespace: -
- Zeilen: 49 | Größe: 1,554 Bytes
- Geändert: 2025-12-28 13:24:09 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 99
- Dependencies: 100 (25%)
- LOC: 100 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 90 (10%)
Issues 1
| Zeile | Typ | Beschreibung |
|-------|-----|--------------|
| 18 | magic_number | Magic Number gefunden: 100 |
Dependencies 4
- use shared.constants.MAX_ROWS
- use shared.constants.PERCENT_HALF
- use config.Config
- use infrastructure.db_connection.DatabaseConnection
Funktionen 1
- register_stats_tool() (Zeile 9)
Code
"""Stats Tool - Logging-Statistiken"""
from shared.constants import MAX_ROWS, PERCENT_HALF
from config import Config
from infrastructure.db_connection import DatabaseConnection
def register_stats_tool(mcp):
    """Register the ``db_stats`` tool on the given MCP server object.

    Args:
        mcp: Object exposing a ``tool()`` decorator factory; the nested
            ``db_stats`` function is registered through ``@mcp.tool()``.
    """

    @mcp.tool()
    def db_stats(limit: int = PERCENT_HALF) -> dict:
        """Show the most recent queries recorded in ``mcp_log``.

        Args:
            limit: Number of entries to return. Values outside the range
                1..MAX_ROWS are clamped into it.

        Returns:
            On success, a dict with ``logs`` (list of row dicts, newest
            first, with ``timestamp`` rendered as an ISO-8601 string) and
            ``count`` (number of rows). On failure, a dict with ``error``
            (the exception message), an empty ``logs`` list and ``count`` 0,
            so both outcomes expose the same keys.
        """
        limit = min(max(limit, 1), MAX_ROWS)
        try:
            with DatabaseConnection.get_connection(Config.LOG_DB_NAME) as conn:
                # buffered=True prevents "Unread result found" errors.
                cursor = conn.cursor(dictionary=True, buffered=True)
                try:
                    cursor.execute(
                        """SELECT id, timestamp, client_name, request, status,
                        duration_ms, error_message
                        FROM mcp_log
                        ORDER BY timestamp DESC
                        LIMIT %s""",
                        (limit,),
                    )
                    logs = cursor.fetchall()
                finally:
                    # Release the cursor even when execute/fetchall raises.
                    cursor.close()

            # Convert datetime objects to strings for JSON serialization.
            for log in logs:
                if log.get("timestamp"):
                    log["timestamp"] = log["timestamp"].isoformat()

            return {"logs": logs, "count": len(logs)}
        except Exception as e:
            # Tool boundary: report the failure to the caller as data
            # instead of letting the exception propagate.
            return {"error": str(e), "logs": [], "count": 0}