{
"event": "PreToolUse",
"tool_name": "Write",
"tool_input": {
"file_path": "\/var\/www\/mcp-servers\/mcp_db\/tools\/databases_tool.py",
"content": "\"\"\"Databases Tool - Listet verfuegbare Datenbanken auf.\"\"\"\n\nimport time\nfrom datetime import datetime\n\nfrom constants import LOG_ENTRY_MAX_LENGTH, LOG_QUERY_MAX_LENGTH, MS_PER_SECOND\n\nfrom config import Config\nfrom infrastructure.db_connection import DatabaseConnection\nfrom shared.domain import LogEntry\nfrom shared.infrastructure import get_logger\n\n\ndef register_databases_tool(mcp) -> None:\n \"\"\"Registriert db_databases Tool.\"\"\"\n logger = get_logger(\"mcp-db\", Config)\n\n @mcp.tool()\n def db_databases() -> dict:\n \"\"\"\n Listet alle verfuegbaren Datenbanken auf.\n\n Returns:\n Dict mit databases (Liste der erlaubten DBs mit Tabellen-Count)\n \"\"\"\n start = time.time()\n\n try:\n # Verbinde zu einer erlaubten DB um Queries auszufuehren\n with DatabaseConnection.get_connection(\"ki_dev\") as conn:\n cursor = conn.cursor(dictionary=True, buffered=True)\n\n databases = []\n for db_name in Config.ALLOWED_DATABASES:\n cursor.execute(\n \"\"\"SELECT COUNT(*) as table_count\n FROM information_schema.tables\n WHERE table_schema = %s\"\"\",\n (db_name,),\n )\n row = cursor.fetchone()\n table_count = row[\"table_count\"] if row else 0\n\n databases.append({\n \"name\": db_name,\n \"tables_count\": table_count,\n })\n\n cursor.close()\n\n duration = int((time.time() - start) * MS_PER_SECOND)\n\n # Log\n try:\n logger.log(\n LogEntry(\n timestamp=datetime.now(),\n client_name=\"mcp-db\",\n tool_name=\"db_databases\",\n request=\"SHOW DATABASES (filtered)\",\n status=\"success\",\n duration_ms=duration,\n )\n )\n except Exception:\n pass\n\n return {\n \"status\": \"success\",\n \"databases\": databases,\n \"execution_ms\": duration,\n }\n\n except Exception as e:\n duration = int((time.time() - start) * MS_PER_SECOND)\n\n try:\n logger.log(\n LogEntry(\n timestamp=datetime.now(),\n client_name=\"mcp-db\",\n tool_name=\"db_databases\",\n request=\"SHOW DATABASES\",\n status=\"error\",\n duration_ms=duration,\n error_message=str(e)[:LOG_ENTRY_MAX_LENGTH],\n )\n )\n except Exception:\n pass\n\n return {\n \"status\": \"error\",\n \"error\": str(e)[:LOG_QUERY_MAX_LENGTH],\n \"execution_ms\": duration,\n }\n"
}
}
{
"tool_response": {
"type": "update",
"filePath": "\/var\/www\/mcp-servers\/mcp_db\/tools\/databases_tool.py",
"content": "\"\"\"Databases Tool - Listet verfuegbare Datenbanken auf.\"\"\"\n\nimport time\nfrom datetime import datetime\n\nfrom constants import LOG_ENTRY_MAX_LENGTH, LOG_QUERY_MAX_LENGTH, MS_PER_SECOND\n\nfrom config import Config\nfrom infrastructure.db_connection import DatabaseConnection\nfrom shared.domain import LogEntry\nfrom shared.infrastructure import get_logger\n\n\ndef register_databases_tool(mcp) -> None:\n \"\"\"Registriert db_databases Tool.\"\"\"\n logger = get_logger(\"mcp-db\", Config)\n\n @mcp.tool()\n def db_databases() -> dict:\n \"\"\"\n Listet alle verfuegbaren Datenbanken auf.\n\n Returns:\n Dict mit databases (Liste der erlaubten DBs mit Tabellen-Count)\n \"\"\"\n start = time.time()\n\n try:\n # Verbinde zu einer erlaubten DB um Queries auszufuehren\n with DatabaseConnection.get_connection(\"ki_dev\") as conn:\n cursor = conn.cursor(dictionary=True, buffered=True)\n\n databases = []\n for db_name in Config.ALLOWED_DATABASES:\n cursor.execute(\n \"\"\"SELECT COUNT(*) as table_count\n FROM information_schema.tables\n WHERE table_schema = %s\"\"\",\n (db_name,),\n )\n row = cursor.fetchone()\n table_count = row[\"table_count\"] if row else 0\n\n databases.append({\n \"name\": db_name,\n \"tables_count\": table_count,\n })\n\n cursor.close()\n\n duration = int((time.time() - start) * MS_PER_SECOND)\n\n # Log\n try:\n logger.log(\n LogEntry(\n timestamp=datetime.now(),\n client_name=\"mcp-db\",\n tool_name=\"db_databases\",\n request=\"SHOW DATABASES (filtered)\",\n status=\"success\",\n duration_ms=duration,\n )\n )\n except Exception:\n pass\n\n return {\n \"status\": \"success\",\n \"databases\": databases,\n \"execution_ms\": duration,\n }\n\n except Exception as e:\n duration = int((time.time() - start) * MS_PER_SECOND)\n\n try:\n logger.log(\n LogEntry(\n timestamp=datetime.now(),\n client_name=\"mcp-db\",\n tool_name=\"db_databases\",\n request=\"SHOW DATABASES\",\n status=\"error\",\n duration_ms=duration,\n error_message=str(e)[:LOG_ENTRY_MAX_LENGTH],\n )\n )\n except Exception:\n pass\n\n return {\n \"status\": \"error\",\n \"error\": str(e)[:LOG_QUERY_MAX_LENGTH],\n \"execution_ms\": duration,\n }\n",
"structuredPatch": [
{
"oldStart": 1,
"oldLines": 26,
"newStart": 1,
"newLines": 24,
"lines": [
"-\"\"\"Databases Tool - Listet verfügbare Datenbanken auf.\"\"\"",
"-import sys",
"+\"\"\"Databases Tool - Listet verfuegbare Datenbanken auf.\"\"\"",
"+",
" import time",
"+from datetime import datetime",
" ",
"-sys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp_db\")",
"-sys.path.insert(0, \"\/var\/www\/mcp-servers\/shared\")",
"-",
" from constants import LOG_ENTRY_MAX_LENGTH, LOG_QUERY_MAX_LENGTH, MS_PER_SECOND",
" ",
" from config import Config",
"-from domain.log_contract import LogEntry",
" from infrastructure.db_connection import DatabaseConnection",
"-from infrastructure.protokoll_logger import ProtokollLogger",
"+from shared.domain import LogEntry",
"+from shared.infrastructure import get_logger",
" ",
" ",
" def register_databases_tool(mcp) -> None:",
" \"\"\"Registriert db_databases Tool.\"\"\"",
"- logger = ProtokollLogger()",
"+ logger = get_logger(\"mcp-db\", Config)",
" ",
" @mcp.tool()",
" def db_databases() -> dict:",
" \"\"\"",
"- Listet alle verfügbaren Datenbanken auf.",
"+ Listet alle verfuegbaren Datenbanken auf.",
" ",
" Returns:",
" Dict mit databases (Liste der erlaubten DBs mit Tabellen-Count)"
]
},
{
"oldStart": 28,
"oldLines": 7,
"newStart": 26,
"newLines": 7,
"lines": [
" start = time.time()",
" ",
" try:",
"- # Verbinde zu einer erlaubten DB um Queries auszuführen",
"+ # Verbinde zu einer erlaubten DB um Queries auszufuehren",
" with DatabaseConnection.get_connection(\"ki_dev\") as conn:",
" cursor = conn.cursor(dictionary=True, buffered=True)",
" "
]
},
{
"oldStart": 56,
"oldLines": 6,
"newStart": 54,
"newLines": 9,
"lines": [
" try:",
" logger.log(",
" LogEntry(",
"+ timestamp=datetime.now(),",
"+ client_name=\"mcp-db\",",
"+ tool_name=\"db_databases\",",
" request=\"SHOW DATABASES (filtered)\",",
" status=\"success\",",
" duration_ms=duration,"
]
},
{
"oldStart": 76,
"oldLines": 6,
"newStart": 77,
"newLines": 9,
"lines": [
" try:",
" logger.log(",
" LogEntry(",
"+ timestamp=datetime.now(),",
"+ client_name=\"mcp-db\",",
"+ tool_name=\"db_databases\",",
" request=\"SHOW DATABASES\",",
" status=\"error\",",
" duration_ms=duration,"
]
}
],
"originalFile": "\"\"\"Databases Tool - Listet verfügbare Datenbanken auf.\"\"\"\nimport sys\nimport time\n\nsys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp_db\")\nsys.path.insert(0, \"\/var\/www\/mcp-servers\/shared\")\n\nfrom constants import LOG_ENTRY_MAX_LENGTH, LOG_QUERY_MAX_LENGTH, MS_PER_SECOND\n\nfrom config import Config\nfrom domain.log_contract import LogEntry\nfrom infrastructure.db_connection import DatabaseConnection\nfrom infrastructure.protokoll_logger import ProtokollLogger\n\n\ndef register_databases_tool(mcp) -> None:\n \"\"\"Registriert db_databases Tool.\"\"\"\n logger = ProtokollLogger()\n\n @mcp.tool()\n def db_databases() -> dict:\n \"\"\"\n Listet alle verfügbaren Datenbanken auf.\n\n Returns:\n Dict mit databases (Liste der erlaubten DBs mit Tabellen-Count)\n \"\"\"\n start = time.time()\n\n try:\n # Verbinde zu einer erlaubten DB um Queries auszuführen\n with DatabaseConnection.get_connection(\"ki_dev\") as conn:\n cursor = conn.cursor(dictionary=True, buffered=True)\n\n databases = []\n for db_name in Config.ALLOWED_DATABASES:\n cursor.execute(\n \"\"\"SELECT COUNT(*) as table_count\n FROM information_schema.tables\n WHERE table_schema = %s\"\"\",\n (db_name,),\n )\n row = cursor.fetchone()\n table_count = row[\"table_count\"] if row else 0\n\n databases.append({\n \"name\": db_name,\n \"tables_count\": table_count,\n })\n\n cursor.close()\n\n duration = int((time.time() - start) * MS_PER_SECOND)\n\n # Log\n try:\n logger.log(\n LogEntry(\n request=\"SHOW DATABASES (filtered)\",\n status=\"success\",\n duration_ms=duration,\n )\n )\n except Exception:\n pass\n\n return {\n \"status\": \"success\",\n \"databases\": databases,\n \"execution_ms\": duration,\n }\n\n except Exception as e:\n duration = int((time.time() - start) * MS_PER_SECOND)\n\n try:\n logger.log(\n LogEntry(\n request=\"SHOW DATABASES\",\n status=\"error\",\n duration_ms=duration,\n error_message=str(e)[:LOG_ENTRY_MAX_LENGTH],\n )\n )\n except Exception:\n pass\n\n return {\n \"status\": \"error\",\n \"error\": str(e)[:LOG_QUERY_MAX_LENGTH],\n \"execution_ms\": duration,\n }\n"
}
}