crud_tools.py
- Pfad: /var/www/mcp-servers/mcp-tasks/tools/task_tools/crud_tools.py
- Namespace: -
- Zeilen: 213 | Größe: 7,510 Bytes
- Geändert: 2025-12-28 12:21:49 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 77
- Dependencies: 10 (25%)
- LOC: 95 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
Keine Issues gefunden.
Dependencies 14
- use json
- use time
- use datetime.datetime
- use sys
- use constants.LOG_ENTRY_MAX_LENGTH
- use constants.MS_PER_SECOND
- use config.Config
- use domain.contracts.Task
- use domain.contracts.TaskStatus
- use domain.contracts.TaskType
- use base.get_repo
- use base.get_task_logger
- use base.validate_type
- use base.log_tool_call
Funktionen 1
- register_crud_tools() – Zeile 17
Code
"""Task CRUD tools - Get, Create, Update, Delete."""
import json
import time
from datetime import datetime
import sys
sys.path.insert(0, "/var/www/mcp-servers/mcp_tasks")
sys.path.insert(0, "/var/www/mcp-servers/shared")
from constants import LOG_ENTRY_MAX_LENGTH, MS_PER_SECOND
from config import Config
from domain.contracts import Task, TaskStatus, TaskType
from .base import get_repo, get_task_logger, validate_type, log_tool_call
def _elapsed_ms(start: float) -> int:
    """Return the whole milliseconds elapsed since ``start`` (a ``time.time()`` value)."""
    return int((time.time() - start) * MS_PER_SECOND)


def _parse_due_date(due_date: str) -> datetime:
    """Parse an ISO 8601 string; raise ``ValueError`` with a caller-facing message if invalid."""
    try:
        return datetime.fromisoformat(due_date)
    except (TypeError, ValueError) as exc:
        raise ValueError(f"Invalid due_date {due_date!r}: expected ISO 8601 format") from exc


def register_crud_tools(mcp):
    """Register the task CRUD tools (get, create, update, delete) on ``mcp``.

    Every tool closes over one shared repository and logger. Each call is
    reported through ``log_tool_call`` with status ``success``, ``denied``
    (rejected input or unknown task) or ``error`` (unexpected failure).
    Exceptions raised inside a tool's ``try`` block are converted into a
    ``{"success": False, "error": ...}`` response instead of propagating.

    Args:
        mcp: Server object whose ``tool()`` method returns the decorator used
            to register each tool function.
    """
    repo = get_repo()
    logger = get_task_logger()

    @mcp.tool()
    def tasks_get(id: int) -> dict:
        """
        Fetch a single task including its assignments and results.

        Args:
            id: Task ID

        Returns:
            Task with all details, or an error message
        """
        start = time.time()
        request_str = json.dumps({"id": id})
        try:
            task = repo.find_by_id(id)
            if not task:
                log_tool_call(logger, "tasks_get", request_str, "denied", task_id=id, error_message="Task not found")
                return {"success": False, "error": f"Task {id} not found"}

            assignments = repo.get_assignments(id)
            results = repo.get_results(id)

            log_tool_call(logger, "tasks_get", request_str, "success", _elapsed_ms(start), task_id=id)
            return {
                "success": True,
                "task": task.to_dict(),
                "assignments": [a.to_dict() for a in assignments],
                "results": [r.to_dict() for r in results],
            }
        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            log_tool_call(
                logger, "tasks_get", request_str, "error", _elapsed_ms(start), task_id=id, error_message=str(e)
            )
            return {"success": False, "error": str(e)}

    @mcp.tool()
    def tasks_create(
        title: str,
        description: str | None = None,
        type: str = "ai_task",
        parent_task_id: int | None = None,
        due_date: str | None = None,
    ) -> dict:
        """
        Create a new task.

        Args:
            title: Task title (required)
            description: Detailed description
            type: Task type (human_task, ai_task, mixed)
            parent_task_id: ID of the parent task, for subtasks
            due_date: Due date (ISO 8601)

        Returns:
            The created task, or an error message
        """
        start = time.time()
        # Slice only real strings: this runs before the try block, so a
        # non-string title must not be allowed to raise here.
        logged_title = title[:LOG_ENTRY_MAX_LENGTH] if isinstance(title, str) else None
        request_str = json.dumps({"title": logged_title, "type": type})
        try:
            if not isinstance(title, str) or not title.strip():
                log_tool_call(logger, "tasks_create", request_str, "denied", error_message="Title is required")
                return {"success": False, "error": "Title is required"}

            valid, error = validate_type(type)
            if not valid:
                log_tool_call(logger, "tasks_create", request_str, "denied", error_message=error)
                return {"success": False, "error": error}

            # A malformed date is bad caller input, so it is denied rather
            # than reported as an internal error.
            parsed_due_date = None
            if due_date:
                try:
                    parsed_due_date = _parse_due_date(due_date)
                except ValueError as exc:
                    log_tool_call(logger, "tasks_create", request_str, "denied", error_message=str(exc))
                    return {"success": False, "error": str(exc)}

            task = Task(
                title=title.strip(),
                description=description,
                type=TaskType(type),
                status=TaskStatus.PENDING,
                created_by="mcp-tasks",
                created_by_type="ai",
                parent_task_id=parent_task_id,
                due_date=parsed_due_date,
            )
            task_id = repo.create(task)
            task.id = task_id

            log_tool_call(logger, "tasks_create", request_str, "success", _elapsed_ms(start), task_id=task_id)
            return {"success": True, "task": task.to_dict(), "message": f"Task #{task_id} created"}
        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            log_tool_call(logger, "tasks_create", request_str, "error", _elapsed_ms(start), error_message=str(e))
            return {"success": False, "error": str(e)}

    @mcp.tool()
    def tasks_update(
        id: int,
        title: str | None = None,
        description: str | None = None,
        type: str | None = None,
        due_date: str | None = None,
    ) -> dict:
        """
        Update fields of an existing task.

        Only arguments that are not ``None`` are written.

        Args:
            id: Task ID (required)
            title: New title (must not be blank)
            description: New description
            type: New type
            due_date: New due date (ISO 8601)

        Returns:
            The updated task, or an error message
        """
        start = time.time()
        request_str = json.dumps({"id": id})
        try:
            task = repo.find_by_id(id)
            if not task:
                log_tool_call(
                    logger, "tasks_update", request_str, "denied", task_id=id, error_message="Task not found"
                )
                return {"success": False, "error": f"Task {id} not found"}

            # Collect the first validation problem; all rejections share one
            # logging/return path below so none of them goes unlogged.
            updates = {}
            rejection = None

            if title is not None:
                stripped_title = title.strip()
                if stripped_title:
                    updates["title"] = stripped_title
                else:
                    # Same rule as tasks_create: a task never has a blank title.
                    rejection = "Title must not be empty"

            if description is not None:
                updates["description"] = description

            if rejection is None and type is not None:
                valid, error = validate_type(type)
                if valid:
                    updates["type"] = type
                else:
                    rejection = error

            if rejection is None and due_date is not None:
                try:
                    updates["due_date"] = _parse_due_date(due_date)
                except ValueError as exc:
                    rejection = str(exc)

            if rejection is None and not updates:
                rejection = "No fields to update"

            if rejection is not None:
                log_tool_call(logger, "tasks_update", request_str, "denied", task_id=id, error_message=rejection)
                return {"success": False, "error": rejection}

            repo.update(id, updates)
            updated_task = repo.find_by_id(id)
            duration = _elapsed_ms(start)

            if not updated_task:
                # The re-read can come back empty (e.g. the task was removed
                # in between); report that instead of failing on .to_dict().
                missing = f"Task {id} not found after update"
                log_tool_call(logger, "tasks_update", request_str, "error", duration, task_id=id, error_message=missing)
                return {"success": False, "error": missing}

            log_tool_call(logger, "tasks_update", request_str, "success", duration, task_id=id)
            return {"success": True, "task": updated_task.to_dict(), "message": f"Task #{id} updated"}
        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            log_tool_call(
                logger, "tasks_update", request_str, "error", _elapsed_ms(start), task_id=id, error_message=str(e)
            )
            return {"success": False, "error": str(e)}

    @mcp.tool()
    def tasks_delete(id: int) -> dict:
        """
        Delete a task.

        Args:
            id: Task ID (required)

        Returns:
            Confirmation, or an error message
        """
        start = time.time()
        request_str = json.dumps({"id": id})
        try:
            task = repo.find_by_id(id)
            if not task:
                log_tool_call(
                    logger, "tasks_delete", request_str, "denied", task_id=id, error_message="Task not found"
                )
                return {"success": False, "error": f"Task {id} not found"}

            deleted = repo.delete(id)
            duration = _elapsed_ms(start)

            if not deleted:
                failure = f"Failed to delete Task #{id}"
                log_tool_call(logger, "tasks_delete", request_str, "error", duration, task_id=id, error_message=failure)
                # "message" is kept for existing callers; "error" matches the
                # shape of every other failure response in this module.
                return {"success": False, "error": failure, "message": failure}

            log_tool_call(logger, "tasks_delete", request_str, "success", duration, task_id=id)
            return {"success": True, "message": f"Task #{id} deleted"}
        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            log_tool_call(
                logger, "tasks_delete", request_str, "error", _elapsed_ms(start), task_id=id, error_message=str(e)
            )
            return {"success": False, "error": str(e)}