docs_writer.py
- Pfad: /var/www/mcp-servers/mcp-docs/tools/docs_tools/docs_writer.py
- Namespace: -
- Zeilen: 229 | Größe: 6,945 Bytes
- Geändert: 2025-12-28 12:40:22 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 73
- Dependencies: 0 (25%)
- LOC: 90 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
Issues 1
| Zeile | Typ | Beschreibung |
| --- | --- | --- |
| - | coupling | Klasse hat 16 Dependencies (max: 15) |
Dependencies 16
- use json
- use time
- use datetime.datetime
- use typing.Optional
- use config.Config
- use domain.dokumentation.DocStatus
- use domain.dokumentation.Dokumentation
- use infrastructure.docs_repository.get_repository
- use shared.domain.LogEntry
- use shared.infrastructure.get_logger
- use constants.DEFAULT_SORT_ORDER
- use constants.MS_PER_SECOND
- use constants.ROOT_DEPTH
- use constants.ROOT_PATH_PREFIX
- use constants.STATUS_DRAFT
- use constants.VALID_STATUSES
Funktionen 2
- _log_operation() (Zeile 24)
- register_writer_tools() (Zeile 46)
Code
"""Write operations for documentation management."""
import json
import time
from datetime import datetime
from typing import Optional
from config import Config
from domain.dokumentation import DocStatus, Dokumentation
from infrastructure.docs_repository import get_repository
from shared.domain import LogEntry
from shared.infrastructure import get_logger
from .constants import (
DEFAULT_SORT_ORDER,
MS_PER_SECOND,
ROOT_DEPTH,
ROOT_PATH_PREFIX,
STATUS_DRAFT,
VALID_STATUSES,
)
def _log_operation(
    logger,
    tool_name: str,
    request_data: dict,
    start_time: float,
    status: str = "success",
    error: Optional[str] = None,
) -> None:
    """Record a single tool invocation together with its elapsed time.

    Args:
        logger: Object whose ``log`` method receives the built ``LogEntry``.
        tool_name: Name of the tool the entry is recorded for.
        request_data: Request details; serialized with ``json.dumps``.
        start_time: Start timestamp on the ``time.time()`` clock.
        status: Outcome label stored on the entry.
        error: Optional error text stored as ``error_message``.
    """
    recorded_at = datetime.now()
    serialized_request = json.dumps(request_data)
    # Elapsed time since start_time, scaled by MS_PER_SECOND and truncated
    # to a whole number for the duration_ms field.
    elapsed_ms = int((time.time() - start_time) * MS_PER_SECOND)

    entry = LogEntry(
        timestamp=recorded_at,
        client_name="mcp-docs",
        tool_name=tool_name,
        request=serialized_request,
        status=status,
        error_message=error,
        duration_ms=elapsed_ms,
    )
    logger.log(entry)
def register_writer_tools(mcp) -> None:
    """Register write documentation tools with the MCP server.

    Defines ``docs_create``, ``docs_update`` and ``docs_delete`` and registers
    each one through ``mcp.tool()``. Every tool returns a dict with a boolean
    ``"success"`` key; failures carry an ``"error"`` message (``docs_delete``
    reports a falsy repository result through ``"message"`` instead).

    Args:
        mcp: Server object exposing a ``tool()`` decorator factory.
    """
    # NOTE(review): the tool docstrings below are intentionally left unchanged;
    # the MCP framework may surface them to clients as tool descriptions.
    # Additional documentation therefore lives in comments.

    @mcp.tool()
    def docs_create(
        title: str,
        slug: str,
        content: str = "",
        description: Optional[str] = None,
        parent_id: Optional[int] = None,
        status: str = STATUS_DRAFT,
        sort_order: int = DEFAULT_SORT_ORDER,
    ) -> dict:
        """Create a new document."""
        start_time = time.time()
        logger = get_logger("mcp-docs", Config)
        repo = get_repository()

        try:
            # Derive path and depth from the parent, or from the root prefix
            # when no parent is given.
            if parent_id is not None:
                parent = repo.find_by_id(parent_id)
                if not parent:
                    return {
                        "success": False,
                        "error": f"Parent with ID {parent_id} not found",
                    }
                path = f"{parent.path}/{slug}"
                depth = parent.depth + 1
            else:
                path = f"{ROOT_PATH_PREFIX}{slug}"
                depth = ROOT_DEPTH

            # Refuse to create a second document at the same path.
            if repo.find_by_path(path):
                return {
                    "success": False,
                    "error": f"Path '{path}' already exists",
                }

            # An unrecognized status falls back to DRAFT rather than failing.
            doc_status = (
                DocStatus(status) if status in VALID_STATUSES else DocStatus.DRAFT
            )
            doc = Dokumentation(
                parent_id=parent_id,
                slug=slug,
                path=path,
                title=title,
                description=description,
                content=content,
                status=doc_status,
                sort_order=sort_order,
                depth=depth,
            )
            new_id = repo.create(doc)
            # Re-read the stored document so the response reflects what the
            # repository returns for the new ID.
            created_doc = repo.find_by_id(new_id)

            _log_operation(
                logger,
                "docs_create",
                {"title": title, "slug": slug, "parent_id": parent_id},
                start_time,
            )
            return {
                "success": True,
                "doc": created_doc.to_dict() if created_doc else None,
                "message": f"Document '{title}' created with ID {new_id}",
            }
        except Exception as e:
            # Tool boundary: report the failure to the caller instead of
            # letting the exception propagate.
            _log_operation(
                logger,
                "docs_create",
                {"title": title, "slug": slug},
                start_time,
                "error",
                str(e),
            )
            return {"success": False, "error": str(e)}

    @mcp.tool()
    def docs_update(
        id: int,
        title: Optional[str] = None,
        content: Optional[str] = None,
        description: Optional[str] = None,
        status: Optional[str] = None,
    ) -> dict:
        """Update a document."""
        # `id` shadows the builtin but is part of the tool's public signature,
        # so it is kept as-is.
        start_time = time.time()
        logger = get_logger("mcp-docs", Config)
        repo = get_repository()

        try:
            if not repo.find_by_id(id):
                return {
                    "success": False,
                    "error": f"Document with ID {id} not found",
                }

            # Reject an unknown status explicitly. Silently dropping it would
            # either report success with the status unchanged or produce a
            # misleading "No changes specified" error.
            if status is not None and status not in VALID_STATUSES:
                return {
                    "success": False,
                    "error": f"Invalid status '{status}'",
                }

            # Only fields that were actually supplied are sent to the
            # repository; None means "leave unchanged".
            candidates = {
                "title": title,
                "content": content,
                "description": description,
                "status": status,
            }
            updates = {
                field: value
                for field, value in candidates.items()
                if value is not None
            }
            if not updates:
                return {
                    "success": False,
                    "error": "No changes specified",
                }

            log_request = {"id": id, "updates": list(updates)}
            if not repo.update(id, updates):
                # Log the failed write as well, so both outcomes of the
                # repository call are recorded (as docs_delete does).
                _log_operation(
                    logger,
                    "docs_update",
                    log_request,
                    start_time,
                    "error",
                    "Update failed",
                )
                return {"success": False, "error": "Update failed"}

            updated_doc = repo.find_by_id(id)
            _log_operation(logger, "docs_update", log_request, start_time)
            return {
                "success": True,
                "doc": updated_doc.to_dict() if updated_doc else None,
                "message": f"Document #{id} updated",
            }
        except Exception as e:
            _log_operation(
                logger, "docs_update", {"id": id}, start_time, "error", str(e)
            )
            return {"success": False, "error": str(e)}

    @mcp.tool()
    def docs_delete(id: int) -> dict:
        """Delete a document."""
        start_time = time.time()
        logger = get_logger("mcp-docs", Config)
        repo = get_repository()

        try:
            doc = repo.find_by_id(id)
            if not doc:
                return {
                    "success": False,
                    "error": f"Document with ID {id} not found",
                }

            success = repo.delete(id)
            _log_operation(
                logger,
                "docs_delete",
                {"id": id, "title": doc.title},
                start_time,
                "success" if success else "error",
            )
            return {
                "success": success,
                "message": (
                    f"Document '{doc.title}' (ID {id}) deleted"
                    if success
                    else "Deletion failed"
                ),
            }
        except ValueError as e:
            # A ValueError is logged as "denied" rather than "error".
            # NOTE(review): presumably raised by the repository when deletion
            # is refused; confirm against the repository implementation.
            _log_operation(
                logger, "docs_delete", {"id": id}, start_time, "denied", str(e)
            )
            return {"success": False, "error": str(e)}
        except Exception as e:
            _log_operation(
                logger, "docs_delete", {"id": id}, start_time, "error", str(e)
            )
            return {"success": False, "error": str(e)}