{
"event": "PostToolUse",
"tool_name": "Read",
"tool_response": {
"type": "text",
"file": {
"filePath": "\/var\/www\/mcp-servers\/mcp_docs\/tools\/docs_tools\/docs_writer.py",
"content": "\"\"\"Write operations for documentation management.\"\"\"\n\nimport json\nimport sys\nimport time\nfrom typing import Optional\n\nsys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp_docs\")\nfrom domain.dokumentation import Dokumentation, DocStatus, LogEntry\nfrom infrastructure.docs_repository import get_repository\nfrom infrastructure.protokoll_logger import get_logger\n\nfrom .constants import (\n DEFAULT_SORT_ORDER,\n MS_PER_SECOND,\n ROOT_DEPTH,\n ROOT_PATH_PREFIX,\n STATUS_DRAFT,\n VALID_STATUSES,\n)\n\n\ndef _log_operation(\n logger,\n tool_name: str,\n request_data: dict,\n start_time: float,\n status: str = \"success\",\n error: Optional[str] = None\n) -> None:\n \"\"\"Log an operation with timing information.\"\"\"\n logger.log(LogEntry(\n tool_name=tool_name,\n request=json.dumps(request_data),\n status=status,\n error_message=error,\n duration_ms=int((time.time() - start_time) * MS_PER_SECOND)\n ))\n\n\ndef register_writer_tools(mcp) -> None:\n \"\"\"Register write documentation tools with the MCP server.\"\"\"\n\n @mcp.tool()\n def docs_create(\n title: str,\n slug: str,\n content: str = \"\",\n description: Optional[str] = None,\n parent_id: Optional[int] = None,\n status: str = STATUS_DRAFT,\n sort_order: int = DEFAULT_SORT_ORDER\n ) -> dict:\n \"\"\"Create a new document.\"\"\"\n start_time = time.time()\n logger = get_logger()\n repo = get_repository()\n\n try:\n # Calculate path\n if parent_id is not None:\n parent = repo.find_by_id(parent_id)\n if not parent:\n return {\n \"success\": False,\n \"error\": f\"Parent with ID {parent_id} not found\"\n }\n path = f\"{parent.path}\/{slug}\"\n depth = parent.depth + 1\n else:\n path = f\"{ROOT_PATH_PREFIX}{slug}\"\n depth = ROOT_DEPTH\n\n # Check if path already exists\n existing = repo.find_by_path(path)\n if existing:\n return {\n \"success\": False,\n \"error\": f\"Path '{path}' already exists\"\n }\n\n doc = Dokumentation(\n parent_id=parent_id,\n slug=slug,\n path=path,\n title=title,\n description=description,\n content=content,\n status=DocStatus(status) if status in VALID_STATUSES else DocStatus.DRAFT,\n sort_order=sort_order,\n depth=depth\n )\n\n new_id = repo.create(doc)\n created_doc = repo.find_by_id(new_id)\n\n _log_operation(\n logger, \"docs_create\",\n {\"title\": title, \"slug\": slug, \"parent_id\": parent_id},\n start_time\n )\n return {\n \"success\": True,\n \"doc\": created_doc.to_dict() if created_doc else None,\n \"message\": f\"Document '{title}' created with ID {new_id}\"\n }\n\n except Exception as e:\n _log_operation(\n logger, \"docs_create\", {\"title\": title, \"slug\": slug},\n start_time, \"error\", str(e)\n )\n return {\"success\": False, \"error\": str(e)}\n\n @mcp.tool()\n def docs_update(\n id: int,\n title: Optional[str] = None,\n content: Optional[str] = None,\n description: Optional[str] = None,\n status: Optional[str] = None\n ) -> dict:\n \"\"\"Update a document.\"\"\"\n start_time = time.time()\n logger = get_logger()\n repo = get_repository()\n\n try:\n doc = repo.find_by_id(id)\n if not doc:\n return {\n \"success\": False,\n \"error\": f\"Document with ID {id} not found\"\n }\n\n updates = {}\n if title is not None:\n updates[\"title\"] = title\n if content is not None:\n updates[\"content\"] = content\n if description is not None:\n updates[\"description\"] = description\n if status is not None and status in VALID_STATUSES:\n updates[\"status\"] = status\n\n if not updates:\n return {\n \"success\": False,\n \"error\": \"No changes specified\"\n }\n\n success = repo.update(id, updates)\n if success:\n updated_doc = repo.find_by_id(id)\n _log_operation(\n logger, \"docs_update\",\n {\"id\": id, \"updates\": list(updates.keys())},\n start_time\n )\n return {\n \"success\": True,\n \"doc\": updated_doc.to_dict() if updated_doc else None,\n \"message\": f\"Document #{id} updated\"\n }\n else:\n return {\"success\": False, \"error\": \"Update failed\"}\n\n except Exception as e:\n _log_operation(\n logger, \"docs_update\", {\"id\": id}, start_time, \"error\", str(e)\n )\n return {\"success\": False, \"error\": str(e)}\n\n @mcp.tool()\n def docs_delete(id: int) -> dict:\n \"\"\"Delete a document.\"\"\"\n start_time = time.time()\n logger = get_logger()\n repo = get_repository()\n\n try:\n doc = repo.find_by_id(id)\n if not doc:\n return {\n \"success\": False,\n \"error\": f\"Document with ID {id} not found\"\n }\n\n success = repo.delete(id)\n\n _log_operation(\n logger, \"docs_delete\", {\"id\": id, \"title\": doc.title},\n start_time, \"success\" if success else \"error\"\n )\n return {\n \"success\": success,\n \"message\": (\n f\"Document '{doc.title}' (ID {id}) deleted\"\n if success else \"Deletion failed\"\n )\n }\n\n except ValueError as e:\n _log_operation(\n logger, \"docs_delete\", {\"id\": id}, start_time, \"denied\", str(e)\n )\n return {\"success\": False, \"error\": str(e)}\n\n except Exception as e:\n _log_operation(\n logger, \"docs_delete\", {\"id\": id}, start_time, \"error\", str(e)\n )\n return {\"success\": False, \"error\": str(e)}\n",
"numLines": 214,
"startLine": 1,
"totalLines": 214
}
}
}