{
"event": "PreToolUse",
"tool_name": "Write",
"tool_input": {
"file_path": "\/var\/www\/mcp-servers\/mcp_docs\/tools\/docs_tools\/docs_reader.py",
"content": "\"\"\"Read operations for documentation management.\"\"\"\n\nimport json\nimport time\nfrom datetime import datetime\nfrom typing import Optional\n\nfrom config import Config\nfrom infrastructure.docs_repository import get_repository\nfrom shared.domain import LogEntry\nfrom shared.infrastructure import get_logger\n\nfrom .constants import DEFAULT_LIMIT, MS_PER_SECOND\n\n\ndef _log_operation(\n logger,\n tool_name: str,\n request_data: dict,\n start_time: float,\n status: str = \"success\",\n error: Optional[str] = None,\n) -> None:\n \"\"\"Log an operation with timing information.\"\"\"\n logger.log(\n LogEntry(\n timestamp=datetime.now(),\n client_name=\"mcp-docs\",\n tool_name=tool_name,\n request=json.dumps(request_data),\n status=status,\n error_message=error,\n duration_ms=int((time.time() - start_time) * MS_PER_SECOND),\n )\n )\n\n\ndef register_reader_tools(mcp) -> None:\n \"\"\"Register read-only documentation tools with the MCP server.\"\"\"\n\n @mcp.tool()\n def docs_list(\n status: Optional[str] = None,\n parent_id: Optional[int] = None,\n search: Optional[str] = None,\n compact: bool = True,\n limit: int = DEFAULT_LIMIT,\n ) -> dict:\n \"\"\"List all documents from the database.\"\"\"\n start_time = time.time()\n logger = get_logger(\"mcp-docs\", Config)\n repo = get_repository()\n\n try:\n docs = repo.find_all(\n status=status,\n parent_id=parent_id,\n search=search,\n limit=limit,\n )\n total = repo.count(status=status, parent_id=parent_id, search=search)\n\n result = {\n \"success\": True,\n \"docs\": [d.to_dict_compact() if compact else d.to_dict() for d in docs],\n \"total\": total,\n \"limit\": limit,\n \"compact\": compact,\n }\n\n _log_operation(\n logger,\n \"docs_list\",\n {\"status\": status, \"parent_id\": parent_id, \"search\": search, \"limit\": limit},\n start_time,\n )\n return result\n\n except Exception as e:\n _log_operation(\n logger,\n \"docs_list\",\n {\"status\": status, \"search\": search},\n start_time,\n \"error\",\n str(e),\n )\n return {\"success\": False, \"error\": str(e)}\n\n @mcp.tool()\n def docs_get(\n id: Optional[int] = None,\n path: Optional[str] = None,\n include_children: bool = False,\n include_breadcrumb: bool = False,\n ) -> dict:\n \"\"\"Retrieve a document by ID or path.\"\"\"\n start_time = time.time()\n logger = get_logger(\"mcp-docs\", Config)\n repo = get_repository()\n\n try:\n if id is None and path is None:\n return {\n \"success\": False,\n \"error\": \"Either id or path must be specified\",\n }\n\n doc = None\n if id is not None:\n doc = repo.find_by_id(id)\n elif path is not None:\n doc = repo.find_by_path(path)\n\n if not doc:\n return {\"success\": False, \"error\": \"Document not found\"}\n\n result = {\"success\": True, \"doc\": doc.to_dict()}\n\n if include_children and doc.id:\n children = repo.find_children(doc.id)\n result[\"children\"] = [c.to_dict_compact() for c in children]\n\n if include_breadcrumb and doc.id:\n result[\"breadcrumb\"] = repo.get_breadcrumb(doc.id)\n\n _log_operation(logger, \"docs_get\", {\"id\": id, \"path\": path}, start_time)\n return result\n\n except Exception as e:\n _log_operation(\n logger,\n \"docs_get\",\n {\"id\": id, \"path\": path},\n start_time,\n \"error\",\n str(e),\n )\n return {\"success\": False, \"error\": str(e)}\n\n @mcp.tool()\n def docs_search(query: str, limit: int = DEFAULT_LIMIT) -> dict:\n \"\"\"Full-text search in all documents.\"\"\"\n start_time = time.time()\n logger = get_logger(\"mcp-docs\", Config)\n repo = get_repository()\n\n try:\n docs = repo.find_all(search=query, limit=limit)\n\n _log_operation(\n logger, \"docs_search\", {\"query\": query, \"limit\": limit}, start_time\n )\n return {\n \"success\": True,\n \"query\": query,\n \"results\": [d.to_dict_compact() for d in docs],\n \"count\": len(docs),\n }\n\n except Exception as e:\n _log_operation(\n logger, \"docs_search\", {\"query\": query}, start_time, \"error\", str(e)\n )\n return {\"success\": False, \"error\": str(e)}\n\n @mcp.tool()\n def docs_statistics() -> dict:\n \"\"\"Retrieve statistics about all documents.\"\"\"\n start_time = time.time()\n logger = get_logger(\"mcp-docs\", Config)\n repo = get_repository()\n\n try:\n stats = repo.get_statistics()\n\n _log_operation(logger, \"docs_statistics\", {}, start_time)\n return {\"success\": True, \"statistics\": stats}\n\n except Exception as e:\n _log_operation(\n logger, \"docs_statistics\", {}, start_time, \"error\", str(e)\n )\n return {\"success\": False, \"error\": str(e)}\n"
}
}
{
"tool_response": {
"type": "update",
"filePath": "\/var\/www\/mcp-servers\/mcp_docs\/tools\/docs_tools\/docs_reader.py",
"content": "\"\"\"Read operations for documentation management.\"\"\"\n\nimport json\nimport time\nfrom datetime import datetime\nfrom typing import Optional\n\nfrom config import Config\nfrom infrastructure.docs_repository import get_repository\nfrom shared.domain import LogEntry\nfrom shared.infrastructure import get_logger\n\nfrom .constants import DEFAULT_LIMIT, MS_PER_SECOND\n\n\ndef _log_operation(\n logger,\n tool_name: str,\n request_data: dict,\n start_time: float,\n status: str = \"success\",\n error: Optional[str] = None,\n) -> None:\n \"\"\"Log an operation with timing information.\"\"\"\n logger.log(\n LogEntry(\n timestamp=datetime.now(),\n client_name=\"mcp-docs\",\n tool_name=tool_name,\n request=json.dumps(request_data),\n status=status,\n error_message=error,\n duration_ms=int((time.time() - start_time) * MS_PER_SECOND),\n )\n )\n\n\ndef register_reader_tools(mcp) -> None:\n \"\"\"Register read-only documentation tools with the MCP server.\"\"\"\n\n @mcp.tool()\n def docs_list(\n status: Optional[str] = None,\n parent_id: Optional[int] = None,\n search: Optional[str] = None,\n compact: bool = True,\n limit: int = DEFAULT_LIMIT,\n ) -> dict:\n \"\"\"List all documents from the database.\"\"\"\n start_time = time.time()\n logger = get_logger(\"mcp-docs\", Config)\n repo = get_repository()\n\n try:\n docs = repo.find_all(\n status=status,\n parent_id=parent_id,\n search=search,\n limit=limit,\n )\n total = repo.count(status=status, parent_id=parent_id, search=search)\n\n result = {\n \"success\": True,\n \"docs\": [d.to_dict_compact() if compact else d.to_dict() for d in docs],\n \"total\": total,\n \"limit\": limit,\n \"compact\": compact,\n }\n\n _log_operation(\n logger,\n \"docs_list\",\n {\"status\": status, \"parent_id\": parent_id, \"search\": search, \"limit\": limit},\n start_time,\n )\n return result\n\n except Exception as e:\n _log_operation(\n logger,\n \"docs_list\",\n {\"status\": status, \"search\": search},\n start_time,\n \"error\",\n str(e),\n )\n return {\"success\": False, \"error\": str(e)}\n\n @mcp.tool()\n def docs_get(\n id: Optional[int] = None,\n path: Optional[str] = None,\n include_children: bool = False,\n include_breadcrumb: bool = False,\n ) -> dict:\n \"\"\"Retrieve a document by ID or path.\"\"\"\n start_time = time.time()\n logger = get_logger(\"mcp-docs\", Config)\n repo = get_repository()\n\n try:\n if id is None and path is None:\n return {\n \"success\": False,\n \"error\": \"Either id or path must be specified\",\n }\n\n doc = None\n if id is not None:\n doc = repo.find_by_id(id)\n elif path is not None:\n doc = repo.find_by_path(path)\n\n if not doc:\n return {\"success\": False, \"error\": \"Document not found\"}\n\n result = {\"success\": True, \"doc\": doc.to_dict()}\n\n if include_children and doc.id:\n children = repo.find_children(doc.id)\n result[\"children\"] = [c.to_dict_compact() for c in children]\n\n if include_breadcrumb and doc.id:\n result[\"breadcrumb\"] = repo.get_breadcrumb(doc.id)\n\n _log_operation(logger, \"docs_get\", {\"id\": id, \"path\": path}, start_time)\n return result\n\n except Exception as e:\n _log_operation(\n logger,\n \"docs_get\",\n {\"id\": id, \"path\": path},\n start_time,\n \"error\",\n str(e),\n )\n return {\"success\": False, \"error\": str(e)}\n\n @mcp.tool()\n def docs_search(query: str, limit: int = DEFAULT_LIMIT) -> dict:\n \"\"\"Full-text search in all documents.\"\"\"\n start_time = time.time()\n logger = get_logger(\"mcp-docs\", Config)\n repo = get_repository()\n\n try:\n docs = repo.find_all(search=query, limit=limit)\n\n _log_operation(\n logger, \"docs_search\", {\"query\": query, \"limit\": limit}, start_time\n )\n return {\n \"success\": True,\n \"query\": query,\n \"results\": [d.to_dict_compact() for d in docs],\n \"count\": len(docs),\n }\n\n except Exception as e:\n _log_operation(\n logger, \"docs_search\", {\"query\": query}, start_time, \"error\", str(e)\n )\n return {\"success\": False, \"error\": str(e)}\n\n @mcp.tool()\n def docs_statistics() -> dict:\n \"\"\"Retrieve statistics about all documents.\"\"\"\n start_time = time.time()\n logger = get_logger(\"mcp-docs\", Config)\n repo = get_repository()\n\n try:\n stats = repo.get_statistics()\n\n _log_operation(logger, \"docs_statistics\", {}, start_time)\n return {\"success\": True, \"statistics\": stats}\n\n except Exception as e:\n _log_operation(\n logger, \"docs_statistics\", {}, start_time, \"error\", str(e)\n )\n return {\"success\": False, \"error\": str(e)}\n",
"structuredPatch": [
{
"oldStart": 1,
"oldLines": 14,
"newStart": 1,
"newLines": 14,
"lines": [
" \"\"\"Read operations for documentation management.\"\"\"",
" ",
" import json",
"-import sys",
" import time",
"+from datetime import datetime",
" from typing import Optional",
" ",
"-sys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp_docs\")",
"-from domain.dokumentation import LogEntry",
"+from config import Config",
" from infrastructure.docs_repository import get_repository",
"-from infrastructure.protokoll_logger import get_logger",
"+from shared.domain import LogEntry",
"+from shared.infrastructure import get_logger",
" ",
" from .constants import DEFAULT_LIMIT, MS_PER_SECOND",
" "
]
},
{
"oldStart": 19,
"oldLines": 16,
"newStart": 19,
"newLines": 20,
"lines": [
" request_data: dict,",
" start_time: float,",
" status: str = \"success\",",
"- error: Optional[str] = None",
"+ error: Optional[str] = None,",
" ) -> None:",
" \"\"\"Log an operation with timing information.\"\"\"",
"- logger.log(LogEntry(",
"- tool_name=tool_name,",
"- request=json.dumps(request_data),",
"- status=status,",
"- error_message=error,",
"- duration_ms=int((time.time() - start_time) * MS_PER_SECOND)",
"- ))",
"+ logger.log(",
"+ LogEntry(",
"+ timestamp=datetime.now(),",
"+ client_name=\"mcp-docs\",",
"+ tool_name=tool_name,",
"+ request=json.dumps(request_data),",
"+ status=status,",
"+ error_message=error,",
"+ duration_ms=int((time.time() - start_time) * MS_PER_SECOND),",
"+ )",
"+ )",
" ",
" ",
" def register_reader_tools(mcp) -> None:"
]
},
{
"oldStart": 40,
"oldLines": 11,
"newStart": 44,
"newLines": 11,
"lines": [
" parent_id: Optional[int] = None,",
" search: Optional[str] = None,",
" compact: bool = True,",
"- limit: int = DEFAULT_LIMIT",
"+ limit: int = DEFAULT_LIMIT,",
" ) -> dict:",
" \"\"\"List all documents from the database.\"\"\"",
" start_time = time.time()",
"- logger = get_logger()",
"+ logger = get_logger(\"mcp-docs\", Config)",
" repo = get_repository()",
" ",
" try:"
]
},
{
"oldStart": 52,
"oldLines": 7,
"newStart": 56,
"newLines": 7,
"lines": [
" status=status,",
" parent_id=parent_id,",
" search=search,",
"- limit=limit",
"+ limit=limit,",
" )",
" total = repo.count(status=status, parent_id=parent_id, search=search)",
" "
]
},
{
"oldStart": 61,
"oldLines": 21,
"newStart": 65,
"newLines": 25,
"lines": [
" \"docs\": [d.to_dict_compact() if compact else d.to_dict() for d in docs],",
" \"total\": total,",
" \"limit\": limit,",
"- \"compact\": compact",
"+ \"compact\": compact,",
" }",
" ",
" _log_operation(",
"- logger, \"docs_list\",",
"+ logger,",
"+ \"docs_list\",",
" {\"status\": status, \"parent_id\": parent_id, \"search\": search, \"limit\": limit},",
"- start_time",
"+ start_time,",
" )",
" return result",
" ",
" except Exception as e:",
" _log_operation(",
"- logger, \"docs_list\",",
"+ logger,",
"+ \"docs_list\",",
" {\"status\": status, \"search\": search},",
"- start_time, \"error\", str(e)",
"+ start_time,",
"+ \"error\",",
"+ str(e),",
" )",
" return {\"success\": False, \"error\": str(e)}",
" "
]
},
{
"oldStart": 84,
"oldLines": 18,
"newStart": 92,
"newLines": 18,
"lines": [
" id: Optional[int] = None,",
" path: Optional[str] = None,",
" include_children: bool = False,",
"- include_breadcrumb: bool = False",
"+ include_breadcrumb: bool = False,",
" ) -> dict:",
" \"\"\"Retrieve a document by ID or path.\"\"\"",
" start_time = time.time()",
"- logger = get_logger()",
"+ logger = get_logger(\"mcp-docs\", Config)",
" repo = get_repository()",
" ",
" try:",
" if id is None and path is None:",
" return {",
" \"success\": False,",
"- \"error\": \"Either id or path must be specified\"",
"+ \"error\": \"Either id or path must be specified\",",
" }",
" ",
" doc = None"
]
},
{
"oldStart": 107,
"oldLines": 10,
"newStart": 115,
"newLines": 7,
"lines": [
" if not doc:",
" return {\"success\": False, \"error\": \"Document not found\"}",
" ",
"- result = {",
"- \"success\": True,",
"- \"doc\": doc.to_dict()",
"- }",
"+ result = {\"success\": True, \"doc\": doc.to_dict()}",
" ",
" if include_children and doc.id:",
" children = repo.find_children(doc.id)"
]
},
{
"oldStart": 124,
"oldLines": 8,
"newStart": 129,
"newLines": 12,
"lines": [
" ",
" except Exception as e:",
" _log_operation(",
"- logger, \"docs_get\", {\"id\": id, \"path\": path},",
"- start_time, \"error\", str(e)",
"+ logger,",
"+ \"docs_get\",",
"+ {\"id\": id, \"path\": path},",
"+ start_time,",
"+ \"error\",",
"+ str(e),",
" )",
" return {\"success\": False, \"error\": str(e)}",
" "
]
},
{
"oldStart": 133,
"oldLines": 7,
"newStart": 142,
"newLines": 7,
"lines": [
" def docs_search(query: str, limit: int = DEFAULT_LIMIT) -> dict:",
" \"\"\"Full-text search in all documents.\"\"\"",
" start_time = time.time()",
"- logger = get_logger()",
"+ logger = get_logger(\"mcp-docs\", Config)",
" repo = get_repository()",
" ",
" try:"
]
},
{
"oldStart": 146,
"oldLines": 13,
"newStart": 155,
"newLines": 12,
"lines": [
" \"success\": True,",
" \"query\": query,",
" \"results\": [d.to_dict_compact() for d in docs],",
"- \"count\": len(docs)",
"+ \"count\": len(docs),",
" }",
" ",
" except Exception as e:",
" _log_operation(",
"- logger, \"docs_search\", {\"query\": query},",
"- start_time, \"error\", str(e)",
"+ logger, \"docs_search\", {\"query\": query}, start_time, \"error\", str(e)",
" )",
" return {\"success\": False, \"error\": str(e)}",
" "
]
},
{
"oldStart": 160,
"oldLines": 7,
"newStart": 168,
"newLines": 7,
"lines": [
" def docs_statistics() -> dict:",
" \"\"\"Retrieve statistics about all documents.\"\"\"",
" start_time = time.time()",
"- logger = get_logger()",
"+ logger = get_logger(\"mcp-docs\", Config)",
" repo = get_repository()",
" ",
" try:"
]
}
],
"originalFile": "\"\"\"Read operations for documentation management.\"\"\"\n\nimport json\nimport sys\nimport time\nfrom typing import Optional\n\nsys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp_docs\")\nfrom domain.dokumentation import LogEntry\nfrom infrastructure.docs_repository import get_repository\nfrom infrastructure.protokoll_logger import get_logger\n\nfrom .constants import DEFAULT_LIMIT, MS_PER_SECOND\n\n\ndef _log_operation(\n logger,\n tool_name: str,\n request_data: dict,\n start_time: float,\n status: str = \"success\",\n error: Optional[str] = None\n) -> None:\n \"\"\"Log an operation with timing information.\"\"\"\n logger.log(LogEntry(\n tool_name=tool_name,\n request=json.dumps(request_data),\n status=status,\n error_message=error,\n duration_ms=int((time.time() - start_time) * MS_PER_SECOND)\n ))\n\n\ndef register_reader_tools(mcp) -> None:\n \"\"\"Register read-only documentation tools with the MCP server.\"\"\"\n\n @mcp.tool()\n def docs_list(\n status: Optional[str] = None,\n parent_id: Optional[int] = None,\n search: Optional[str] = None,\n compact: bool = True,\n limit: int = DEFAULT_LIMIT\n ) -> dict:\n \"\"\"List all documents from the database.\"\"\"\n start_time = time.time()\n logger = get_logger()\n repo = get_repository()\n\n try:\n docs = repo.find_all(\n status=status,\n parent_id=parent_id,\n search=search,\n limit=limit\n )\n total = repo.count(status=status, parent_id=parent_id, search=search)\n\n result = {\n \"success\": True,\n \"docs\": [d.to_dict_compact() if compact else d.to_dict() for d in docs],\n \"total\": total,\n \"limit\": limit,\n \"compact\": compact\n }\n\n _log_operation(\n logger, \"docs_list\",\n {\"status\": status, \"parent_id\": parent_id, \"search\": search, \"limit\": limit},\n start_time\n )\n return result\n\n except Exception as e:\n _log_operation(\n logger, \"docs_list\",\n {\"status\": status, \"search\": search},\n start_time, \"error\", str(e)\n )\n return {\"success\": False, \"error\": str(e)}\n\n @mcp.tool()\n def docs_get(\n id: Optional[int] = None,\n path: Optional[str] = None,\n include_children: bool = False,\n include_breadcrumb: bool = False\n ) -> dict:\n \"\"\"Retrieve a document by ID or path.\"\"\"\n start_time = time.time()\n logger = get_logger()\n repo = get_repository()\n\n try:\n if id is None and path is None:\n return {\n \"success\": False,\n \"error\": \"Either id or path must be specified\"\n }\n\n doc = None\n if id is not None:\n doc = repo.find_by_id(id)\n elif path is not None:\n doc = repo.find_by_path(path)\n\n if not doc:\n return {\"success\": False, \"error\": \"Document not found\"}\n\n result = {\n \"success\": True,\n \"doc\": doc.to_dict()\n }\n\n if include_children and doc.id:\n children = repo.find_children(doc.id)\n result[\"children\"] = [c.to_dict_compact() for c in children]\n\n if include_breadcrumb and doc.id:\n result[\"breadcrumb\"] = repo.get_breadcrumb(doc.id)\n\n _log_operation(logger, \"docs_get\", {\"id\": id, \"path\": path}, start_time)\n return result\n\n except Exception as e:\n _log_operation(\n logger, \"docs_get\", {\"id\": id, \"path\": path},\n start_time, \"error\", str(e)\n )\n return {\"success\": False, \"error\": str(e)}\n\n @mcp.tool()\n def docs_search(query: str, limit: int = DEFAULT_LIMIT) -> dict:\n \"\"\"Full-text search in all documents.\"\"\"\n start_time = time.time()\n logger = get_logger()\n repo = get_repository()\n\n try:\n docs = repo.find_all(search=query, limit=limit)\n\n _log_operation(\n logger, \"docs_search\", {\"query\": query, \"limit\": limit}, start_time\n )\n return {\n \"success\": True,\n \"query\": query,\n \"results\": [d.to_dict_compact() for d in docs],\n \"count\": len(docs)\n }\n\n except Exception as e:\n _log_operation(\n logger, \"docs_search\", {\"query\": query},\n start_time, \"error\", str(e)\n )\n return {\"success\": False, \"error\": str(e)}\n\n @mcp.tool()\n def docs_statistics() -> dict:\n \"\"\"Retrieve statistics about all documents.\"\"\"\n start_time = time.time()\n logger = get_logger()\n repo = get_repository()\n\n try:\n stats = repo.get_statistics()\n\n _log_operation(logger, \"docs_statistics\", {}, start_time)\n return {\"success\": True, \"statistics\": stats}\n\n except Exception as e:\n _log_operation(\n logger, \"docs_statistics\", {}, start_time, \"error\", str(e)\n )\n return {\"success\": False, \"error\": str(e)}\n"
}
}