{
"event": "PreToolUse",
"tool_name": "Write",
"tool_input": {
"file_path": "\/var\/www\/mcp-servers\/mcp-docs\/tools\/docs_tools\/docs_reader.py",
"content": "\"\"\"Read operations for documentation management.\"\"\"\n\nimport json\nimport sys\nimport time\nfrom typing import Optional\n\nsys.path.insert(0, \"\/opt\/mcp-servers\/mcp-docs\")\nfrom domain.dokumentation import LogEntry\nfrom infrastructure.docs_repository import get_repository\nfrom infrastructure.protokoll_logger import get_logger\n\nfrom .constants import DEFAULT_LIMIT, MS_PER_SECOND\n\n\ndef register_reader_tools(mcp) -> None:\n \"\"\"\n Register read-only documentation tools with the MCP server.\n\n Args:\n mcp: The MCP server instance to register tools with\n \"\"\"\n\n @mcp.tool()\n def docs_list(\n status: Optional[str] = None,\n parent_id: Optional[int] = None,\n search: Optional[str] = None,\n compact: bool = True,\n limit: int = DEFAULT_LIMIT\n ) -> dict:\n \"\"\"\n List all documents from the database.\n\n Args:\n status: Filter by status (draft, published, archived)\n parent_id: Filter by parent ID\n search: Full-text search in title\/description\n compact: True = only id\/path\/title\/status (token-efficient)\n limit: Maximum number of results\n\n Returns:\n Dict containing list of documents with metadata\n \"\"\"\n start_time = time.time()\n logger = get_logger()\n repo = get_repository()\n\n try:\n docs = repo.find_all(\n status=status,\n parent_id=parent_id,\n search=search,\n limit=limit\n )\n total = repo.count(status=status, parent_id=parent_id, search=search)\n\n result = {\n \"success\": True,\n \"docs\": [d.to_dict_compact() if compact else d.to_dict() for d in docs],\n \"total\": total,\n \"limit\": limit,\n \"compact\": compact\n }\n\n logger.log(LogEntry(\n tool_name=\"docs_list\",\n request=json.dumps({\n \"status\": status,\n \"parent_id\": parent_id,\n \"search\": search,\n \"limit\": limit\n }),\n status=\"success\",\n duration_ms=int((time.time() - start_time) * MS_PER_SECOND)\n ))\n\n return result\n\n except Exception as e:\n logger.log(LogEntry(\n tool_name=\"docs_list\",\n request=json.dumps({\"status\": status, \"search\": search}),\n status=\"error\",\n error_message=str(e),\n duration_ms=int((time.time() - start_time) * MS_PER_SECOND)\n ))\n return {\"success\": False, \"error\": str(e)}\n\n @mcp.tool()\n def docs_get(\n id: Optional[int] = None,\n path: Optional[str] = None,\n include_children: bool = False,\n include_breadcrumb: bool = False\n ) -> dict:\n \"\"\"\n Retrieve a document by ID or path.\n\n Args:\n id: Document ID\n path: Document path (alternative to ID)\n include_children: Include child documents\n include_breadcrumb: Include breadcrumb path\n\n Returns:\n Dict containing document details\n \"\"\"\n start_time = time.time()\n logger = get_logger()\n repo = get_repository()\n\n try:\n if id is None and path is None:\n return {\n \"success\": False,\n \"error\": \"Either id or path must be specified\"\n }\n\n doc = None\n if id is not None:\n doc = repo.find_by_id(id)\n elif path is not None:\n doc = repo.find_by_path(path)\n\n if not doc:\n return {\"success\": False, \"error\": \"Document not found\"}\n\n result = {\n \"success\": True,\n \"doc\": doc.to_dict()\n }\n\n if include_children and doc.id:\n children = repo.find_children(doc.id)\n result[\"children\"] = [c.to_dict_compact() for c in children]\n\n if include_breadcrumb and doc.id:\n result[\"breadcrumb\"] = repo.get_breadcrumb(doc.id)\n\n logger.log(LogEntry(\n tool_name=\"docs_get\",\n request=json.dumps({\"id\": id, \"path\": path}),\n status=\"success\",\n duration_ms=int((time.time() - start_time) * MS_PER_SECOND)\n ))\n\n return result\n\n except Exception as e:\n logger.log(LogEntry(\n tool_name=\"docs_get\",\n request=json.dumps({\"id\": id, \"path\": path}),\n status=\"error\",\n error_message=str(e),\n duration_ms=int((time.time() - start_time) * MS_PER_SECOND)\n ))\n return {\"success\": False, \"error\": str(e)}\n\n @mcp.tool()\n def docs_search(query: str, limit: int = DEFAULT_LIMIT) -> dict:\n \"\"\"\n Full-text search in all documents.\n\n Args:\n query: Search term (required)\n limit: Maximum number of results\n\n Returns:\n Dict containing list of matching documents\n \"\"\"\n start_time = time.time()\n logger = get_logger()\n repo = get_repository()\n\n try:\n docs = repo.find_all(search=query, limit=limit)\n\n logger.log(LogEntry(\n tool_name=\"docs_search\",\n request=json.dumps({\"query\": query, \"limit\": limit}),\n status=\"success\",\n duration_ms=int((time.time() - start_time) * MS_PER_SECOND)\n ))\n\n return {\n \"success\": True,\n \"query\": query,\n \"results\": [d.to_dict_compact() for d in docs],\n \"count\": len(docs)\n }\n\n except Exception as e:\n logger.log(LogEntry(\n tool_name=\"docs_search\",\n request=json.dumps({\"query\": query}),\n status=\"error\",\n error_message=str(e),\n duration_ms=int((time.time() - start_time) * MS_PER_SECOND)\n ))\n return {\"success\": False, \"error\": str(e)}\n\n @mcp.tool()\n def docs_statistics() -> dict:\n \"\"\"\n Retrieve statistics about all documents.\n\n Returns:\n Dict containing statistics by status, depth, etc.\n \"\"\"\n start_time = time.time()\n logger = get_logger()\n repo = get_repository()\n\n try:\n stats = repo.get_statistics()\n\n logger.log(LogEntry(\n tool_name=\"docs_statistics\",\n request=\"{}\",\n status=\"success\",\n duration_ms=int((time.time() - start_time) * MS_PER_SECOND)\n ))\n\n return {\n \"success\": True,\n \"statistics\": stats\n }\n\n except Exception as e:\n logger.log(LogEntry(\n tool_name=\"docs_statistics\",\n request=\"{}\",\n status=\"error\",\n error_message=str(e),\n duration_ms=int((time.time() - start_time) * MS_PER_SECOND)\n ))\n return {\"success\": False, \"error\": str(e)}\n"
}
}
{
"tool_response": {
"type": "create",
"filePath": "\/var\/www\/mcp-servers\/mcp-docs\/tools\/docs_tools\/docs_reader.py",
"content": "\"\"\"Read operations for documentation management.\"\"\"\n\nimport json\nimport sys\nimport time\nfrom typing import Optional\n\nsys.path.insert(0, \"\/opt\/mcp-servers\/mcp-docs\")\nfrom domain.dokumentation import LogEntry\nfrom infrastructure.docs_repository import get_repository\nfrom infrastructure.protokoll_logger import get_logger\n\nfrom .constants import DEFAULT_LIMIT, MS_PER_SECOND\n\n\ndef register_reader_tools(mcp) -> None:\n \"\"\"\n Register read-only documentation tools with the MCP server.\n\n Args:\n mcp: The MCP server instance to register tools with\n \"\"\"\n\n @mcp.tool()\n def docs_list(\n status: Optional[str] = None,\n parent_id: Optional[int] = None,\n search: Optional[str] = None,\n compact: bool = True,\n limit: int = DEFAULT_LIMIT\n ) -> dict:\n \"\"\"\n List all documents from the database.\n\n Args:\n status: Filter by status (draft, published, archived)\n parent_id: Filter by parent ID\n search: Full-text search in title\/description\n compact: True = only id\/path\/title\/status (token-efficient)\n limit: Maximum number of results\n\n Returns:\n Dict containing list of documents with metadata\n \"\"\"\n start_time = time.time()\n logger = get_logger()\n repo = get_repository()\n\n try:\n docs = repo.find_all(\n status=status,\n parent_id=parent_id,\n search=search,\n limit=limit\n )\n total = repo.count(status=status, parent_id=parent_id, search=search)\n\n result = {\n \"success\": True,\n \"docs\": [d.to_dict_compact() if compact else d.to_dict() for d in docs],\n \"total\": total,\n \"limit\": limit,\n \"compact\": compact\n }\n\n logger.log(LogEntry(\n tool_name=\"docs_list\",\n request=json.dumps({\n \"status\": status,\n \"parent_id\": parent_id,\n \"search\": search,\n \"limit\": limit\n }),\n status=\"success\",\n duration_ms=int((time.time() - start_time) * MS_PER_SECOND)\n ))\n\n return result\n\n except Exception as e:\n logger.log(LogEntry(\n tool_name=\"docs_list\",\n request=json.dumps({\"status\": status, \"search\": search}),\n status=\"error\",\n error_message=str(e),\n duration_ms=int((time.time() - start_time) * MS_PER_SECOND)\n ))\n return {\"success\": False, \"error\": str(e)}\n\n @mcp.tool()\n def docs_get(\n id: Optional[int] = None,\n path: Optional[str] = None,\n include_children: bool = False,\n include_breadcrumb: bool = False\n ) -> dict:\n \"\"\"\n Retrieve a document by ID or path.\n\n Args:\n id: Document ID\n path: Document path (alternative to ID)\n include_children: Include child documents\n include_breadcrumb: Include breadcrumb path\n\n Returns:\n Dict containing document details\n \"\"\"\n start_time = time.time()\n logger = get_logger()\n repo = get_repository()\n\n try:\n if id is None and path is None:\n return {\n \"success\": False,\n \"error\": \"Either id or path must be specified\"\n }\n\n doc = None\n if id is not None:\n doc = repo.find_by_id(id)\n elif path is not None:\n doc = repo.find_by_path(path)\n\n if not doc:\n return {\"success\": False, \"error\": \"Document not found\"}\n\n result = {\n \"success\": True,\n \"doc\": doc.to_dict()\n }\n\n if include_children and doc.id:\n children = repo.find_children(doc.id)\n result[\"children\"] = [c.to_dict_compact() for c in children]\n\n if include_breadcrumb and doc.id:\n result[\"breadcrumb\"] = repo.get_breadcrumb(doc.id)\n\n logger.log(LogEntry(\n tool_name=\"docs_get\",\n request=json.dumps({\"id\": id, \"path\": path}),\n status=\"success\",\n duration_ms=int((time.time() - start_time) * MS_PER_SECOND)\n ))\n\n return result\n\n except Exception as e:\n logger.log(LogEntry(\n tool_name=\"docs_get\",\n request=json.dumps({\"id\": id, \"path\": path}),\n status=\"error\",\n error_message=str(e),\n duration_ms=int((time.time() - start_time) * MS_PER_SECOND)\n ))\n return {\"success\": False, \"error\": str(e)}\n\n @mcp.tool()\n def docs_search(query: str, limit: int = DEFAULT_LIMIT) -> dict:\n \"\"\"\n Full-text search in all documents.\n\n Args:\n query: Search term (required)\n limit: Maximum number of results\n\n Returns:\n Dict containing list of matching documents\n \"\"\"\n start_time = time.time()\n logger = get_logger()\n repo = get_repository()\n\n try:\n docs = repo.find_all(search=query, limit=limit)\n\n logger.log(LogEntry(\n tool_name=\"docs_search\",\n request=json.dumps({\"query\": query, \"limit\": limit}),\n status=\"success\",\n duration_ms=int((time.time() - start_time) * MS_PER_SECOND)\n ))\n\n return {\n \"success\": True,\n \"query\": query,\n \"results\": [d.to_dict_compact() for d in docs],\n \"count\": len(docs)\n }\n\n except Exception as e:\n logger.log(LogEntry(\n tool_name=\"docs_search\",\n request=json.dumps({\"query\": query}),\n status=\"error\",\n error_message=str(e),\n duration_ms=int((time.time() - start_time) * MS_PER_SECOND)\n ))\n return {\"success\": False, \"error\": str(e)}\n\n @mcp.tool()\n def docs_statistics() -> dict:\n \"\"\"\n Retrieve statistics about all documents.\n\n Returns:\n Dict containing statistics by status, depth, etc.\n \"\"\"\n start_time = time.time()\n logger = get_logger()\n repo = get_repository()\n\n try:\n stats = repo.get_statistics()\n\n logger.log(LogEntry(\n tool_name=\"docs_statistics\",\n request=\"{}\",\n status=\"success\",\n duration_ms=int((time.time() - start_time) * MS_PER_SECOND)\n ))\n\n return {\n \"success\": True,\n \"statistics\": stats\n }\n\n except Exception as e:\n logger.log(LogEntry(\n tool_name=\"docs_statistics\",\n request=\"{}\",\n status=\"error\",\n error_message=str(e),\n duration_ms=int((time.time() - start_time) * MS_PER_SECOND)\n ))\n return {\"success\": False, \"error\": str(e)}\n",
"structuredPatch": [],
"originalFile": null
}
}