{
"tool_response": {
"type": "update",
"filePath": "\/var\/www\/mcp-servers\/mcp-tasks\/tools\/task_tools.py",
"content": "\"\"\"\nTask Management Tools für MCP-Tasks Server.\n\nBACKWARD COMPATIBILITY WRAPPER - Logic moved to task_tools\/ package.\n\"\"\"\n\n# Re-export main registration function from package\nfrom task_tools import register_task_tools\n\n# Re-export individual registration functions for flexibility\nfrom task_tools import (\n register_list_tools,\n register_crud_tools,\n register_workflow_tools,\n register_execute_tools,\n)\n\n# Re-export utilities\nfrom task_tools import (\n get_repo,\n get_task_logger,\n validate_status,\n validate_type,\n validate_executor_type,\n log_tool_call,\n)\n\n__all__ = [\n \"register_task_tools\",\n \"register_list_tools\",\n \"register_crud_tools\",\n \"register_workflow_tools\",\n \"register_execute_tools\",\n \"get_repo\",\n \"get_task_logger\",\n \"validate_status\",\n \"validate_type\",\n \"validate_executor_type\",\n \"log_tool_call\",\n]\n",
"structuredPatch": [
{
"oldStart": 1,
"oldLines": 881,
"newStart": 1,
"newLines": 40,
"lines": [
"-\"\"\"Task Management Tools für MCP-Tasks Server\"\"\"",
"-import sys",
"-import time",
"-import json",
"-import requests",
"-from typing import Optional, List",
"-from datetime import datetime",
"+\"\"\"",
"+Task Management Tools für MCP-Tasks Server.",
" ",
"-sys.path.insert(0, \"\/opt\/mcp-servers\/mcp-tasks\")",
"+BACKWARD COMPATIBILITY WRAPPER - Logic moved to task_tools\/ package.",
"+\"\"\"",
" ",
"-from config import Config",
"-from domain.contracts import (",
"- Task, TaskAssignment, TaskResult, TaskStatus, TaskType, ExecutorType, LogEntry",
"+# Re-export main registration function from package",
"+from task_tools import register_task_tools",
"+",
"+# Re-export individual registration functions for flexibility",
"+from task_tools import (",
"+ register_list_tools,",
"+ register_crud_tools,",
"+ register_workflow_tools,",
"+ register_execute_tools,",
" )",
"-from infrastructure.task_repository import TaskRepository",
"-from infrastructure.protokoll_logger import get_logger",
"-from validators.workflow_validator import WorkflowValidator",
" ",
"+# Re-export utilities",
"+from task_tools import (",
"+ get_repo,",
"+ get_task_logger,",
"+ validate_status,",
"+ validate_type,",
"+ validate_executor_type,",
"+ log_tool_call,",
"+)",
" ",
"-def register_task_tools(mcp):",
"- \"\"\"Registriert alle Task-Management-Tools\"\"\"",
"-",
"- repo = TaskRepository()",
"- logger = get_logger()",
"-",
"- # ==================== tasks_list ====================",
"- @mcp.tool()",
"- def tasks_list(",
"- status: Optional[str] = None,",
"- type: Optional[str] = None,",
"- search: Optional[str] = None,",
"- limit: int = 10,",
"- offset: int = 0,",
"- compact: bool = True,",
"- ) -> dict:",
"- \"\"\"",
"- Listet Tasks mit optionalen Filtern auf.",
"-",
"- Args:",
"- status: Filter nach Status (pending, in_progress, completed, failed, cancelled)",
"- type: Filter nach Typ (human_task, ai_task, mixed)",
"- search: Volltextsuche in Titel\/Beschreibung",
"- limit: Maximale Anzahl Ergebnisse (1-100, default: 10)",
"- offset: Offset für Pagination",
"- compact: True = nur id\/title\/status\/type (Token-sparend), False = alle Felder",
"-",
"- Returns:",
"- Dict mit tasks, total, limit, offset",
"- \"\"\"",
"- start = time.time()",
"- request_str = json.dumps({\"status\": status, \"type\": type, \"search\": search, \"limit\": limit})",
"-",
"- try:",
"- # Validierung",
"- limit = max(1, min(limit, Config.MAX_RESULTS))",
"-",
"- if status and status not in Config.VALID_STATUSES:",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_list\",",
"- request=request_str,",
"- status=\"denied\",",
"- error_message=f\"Invalid status: {status}\",",
"- ))",
"- return {\"success\": False, \"error\": f\"Invalid status: {status}\"}",
"-",
"- if type and type not in Config.VALID_TYPES:",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_list\",",
"- request=request_str,",
"- status=\"denied\",",
"- error_message=f\"Invalid type: {type}\",",
"- ))",
"- return {\"success\": False, \"error\": f\"Invalid type: {type}\"}",
"-",
"- # Ausführung",
"- tasks = repo.find_all(status=status, task_type=type, search=search, limit=limit, offset=offset)",
"- total = repo.count(status=status, task_type=type, search=search)",
"-",
"- duration = int((time.time() - start) * 1000)",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_list\",",
"- request=request_str,",
"- status=\"success\",",
"- duration_ms=duration,",
"- ))",
"-",
"- return {",
"- \"success\": True,",
"- \"tasks\": [t.to_dict_compact() if compact else t.to_dict() for t in tasks],",
"- \"total\": total,",
"- \"limit\": limit,",
"- \"offset\": offset,",
"- \"compact\": compact,",
"- }",
"-",
"- except Exception as e:",
"- duration = int((time.time() - start) * 1000)",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_list\",",
"- request=request_str,",
"- status=\"error\",",
"- duration_ms=duration,",
"- error_message=str(e)[:200],",
"- ))",
"- return {\"success\": False, \"error\": str(e)}",
"-",
"-",
"- # ==================== tasks_get ====================",
"- @mcp.tool()",
"- def tasks_get(id: int) -> dict:",
"- \"\"\"",
"- Holt Details eines einzelnen Tasks inkl. Assignments und Results.",
"-",
"- Args:",
"- id: Task-ID",
"-",
"- Returns:",
"- Task mit allen Details oder Fehlermeldung",
"- \"\"\"",
"- start = time.time()",
"- request_str = json.dumps({\"id\": id})",
"-",
"- try:",
"- task = repo.find_by_id(id)",
"-",
"- if not task:",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_get\",",
"- request=request_str,",
"- status=\"denied\",",
"- task_id=id,",
"- error_message=\"Task not found\",",
"- ))",
"- return {\"success\": False, \"error\": f\"Task {id} not found\"}",
"-",
"- # Zusätzliche Daten laden",
"- assignments = repo.get_assignments(id)",
"- results = repo.get_results(id)",
"-",
"- duration = int((time.time() - start) * 1000)",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_get\",",
"- request=request_str,",
"- status=\"success\",",
"- duration_ms=duration,",
"- task_id=id,",
"- ))",
"-",
"- return {",
"- \"success\": True,",
"- \"task\": task.to_dict(),",
"- \"assignments\": [a.to_dict() for a in assignments],",
"- \"results\": [r.to_dict() for r in results],",
"- }",
"-",
"- except Exception as e:",
"- duration = int((time.time() - start) * 1000)",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_get\",",
"- request=request_str,",
"- status=\"error\",",
"- duration_ms=duration,",
"- task_id=id,",
"- error_message=str(e)[:200],",
"- ))",
"- return {\"success\": False, \"error\": str(e)}",
"-",
"-",
"- # ==================== tasks_create ====================",
"- @mcp.tool()",
"- def tasks_create(",
"- title: str,",
"- description: Optional[str] = None,",
"- type: str = \"ai_task\",",
"- parent_task_id: Optional[int] = None,",
"- due_date: Optional[str] = None,",
"- ) -> dict:",
"- \"\"\"",
"- Erstellt einen neuen Task.",
"-",
"- Args:",
"- title: Aufgabentitel (required)",
"- description: Ausführliche Beschreibung",
"- type: Task-Typ (human_task, ai_task, mixed)",
"- parent_task_id: ID des Parent-Tasks für Subtasks",
"- due_date: Fälligkeitsdatum (ISO 8601)",
"-",
"- Returns:",
"- Erstellter Task oder Fehlermeldung",
"- \"\"\"",
"- start = time.time()",
"- request_str = json.dumps({\"title\": title[:100], \"type\": type})",
"-",
"- try:",
"- # Validierung",
"- if not title or not title.strip():",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_create\",",
"- request=request_str,",
"- status=\"denied\",",
"- error_message=\"Title is required\",",
"- ))",
"- return {\"success\": False, \"error\": \"Title is required\"}",
"-",
"- if type not in Config.VALID_TYPES:",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_create\",",
"- request=request_str,",
"- status=\"denied\",",
"- error_message=f\"Invalid type: {type}\",",
"- ))",
"- return {\"success\": False, \"error\": f\"Invalid type: {type}\"}",
"-",
"- # Task erstellen",
"- task = Task(",
"- title=title.strip(),",
"- description=description,",
"- type=TaskType(type),",
"- status=TaskStatus.PENDING,",
"- created_by=\"mcp-tasks\",",
"- created_by_type=\"ai\",",
"- parent_task_id=parent_task_id,",
"- due_date=datetime.fromisoformat(due_date) if due_date else None,",
"- )",
"-",
"- task_id = repo.create(task)",
"- task.id = task_id",
"-",
"- duration = int((time.time() - start) * 1000)",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_create\",",
"- request=request_str,",
"- status=\"success\",",
"- duration_ms=duration,",
"- task_id=task_id,",
"- ))",
"-",
"- return {",
"- \"success\": True,",
"- \"task\": task.to_dict(),",
"- \"message\": f\"Task #{task_id} created\",",
"- }",
"-",
"- except Exception as e:",
"- duration = int((time.time() - start) * 1000)",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_create\",",
"- request=request_str,",
"- status=\"error\",",
"- duration_ms=duration,",
"- error_message=str(e)[:200],",
"- ))",
"- return {\"success\": False, \"error\": str(e)}",
"-",
"-",
"- # ==================== tasks_update ====================",
"- @mcp.tool()",
"- def tasks_update(",
"- id: int,",
"- title: Optional[str] = None,",
"- description: Optional[str] = None,",
"- type: Optional[str] = None,",
"- due_date: Optional[str] = None,",
"- ) -> dict:",
"- \"\"\"",
"- Aktualisiert Task-Felder.",
"-",
"- Args:",
"- id: Task-ID (required)",
"- title: Neuer Titel",
"- description: Neue Beschreibung",
"- type: Neuer Typ",
"- due_date: Neues Datum (ISO 8601)",
"-",
"- Returns:",
"- Aktualisierter Task oder Fehlermeldung",
"- \"\"\"",
"- start = time.time()",
"- request_str = json.dumps({\"id\": id})",
"-",
"- try:",
"- # Task existiert?",
"- task = repo.find_by_id(id)",
"- if not task:",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_update\",",
"- request=request_str,",
"- status=\"denied\",",
"- task_id=id,",
"- error_message=\"Task not found\",",
"- ))",
"- return {\"success\": False, \"error\": f\"Task {id} not found\"}",
"-",
"- # Updates sammeln",
"- updates = {}",
"- if title is not None:",
"- updates[\"title\"] = title.strip()",
"- if description is not None:",
"- updates[\"description\"] = description",
"- if type is not None:",
"- if type not in Config.VALID_TYPES:",
"- return {\"success\": False, \"error\": f\"Invalid type: {type}\"}",
"- updates[\"type\"] = type",
"- if due_date is not None:",
"- updates[\"due_date\"] = datetime.fromisoformat(due_date)",
"-",
"- if not updates:",
"- return {\"success\": False, \"error\": \"No fields to update\"}",
"-",
"- repo.update(id, updates)",
"-",
"- # Aktualisierten Task laden",
"- updated_task = repo.find_by_id(id)",
"-",
"- duration = int((time.time() - start) * 1000)",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_update\",",
"- request=request_str,",
"- status=\"success\",",
"- duration_ms=duration,",
"- task_id=id,",
"- ))",
"-",
"- return {",
"- \"success\": True,",
"- \"task\": updated_task.to_dict(),",
"- \"message\": f\"Task #{id} updated\",",
"- }",
"-",
"- except Exception as e:",
"- duration = int((time.time() - start) * 1000)",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_update\",",
"- request=request_str,",
"- status=\"error\",",
"- duration_ms=duration,",
"- task_id=id,",
"- error_message=str(e)[:200],",
"- ))",
"- return {\"success\": False, \"error\": str(e)}",
"-",
"-",
"- # ==================== tasks_status ====================",
"- @mcp.tool()",
"- def tasks_status(",
"- id: int,",
"- status: str,",
"- ) -> dict:",
"- \"\"\"",
"- Ändert den Status eines Tasks.",
"-",
"- Args:",
"- id: Task-ID (required)",
"- status: Neuer Status (pending, in_progress, completed, failed, cancelled)",
"-",
"- Returns:",
"- Aktualisierter Task oder Fehlermeldung",
"- \"\"\"",
"- start = time.time()",
"- request_str = json.dumps({\"id\": id, \"status\": status})",
"-",
"- try:",
"- # Validierung",
"- if status not in Config.VALID_STATUSES:",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_status\",",
"- request=request_str,",
"- status=\"denied\",",
"- task_id=id,",
"- error_message=f\"Invalid status: {status}\",",
"- ))",
"- return {\"success\": False, \"error\": f\"Invalid status: {status}\"}",
"-",
"- # Task existiert?",
"- task = repo.find_by_id(id)",
"- if not task:",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_status\",",
"- request=request_str,",
"- status=\"denied\",",
"- task_id=id,",
"- error_message=\"Task not found\",",
"- ))",
"- return {\"success\": False, \"error\": f\"Task {id} not found\"}",
"-",
"- old_status = task.status.value if isinstance(task.status, TaskStatus) else task.status",
"-",
"- # Workflow-Validierung: Status-Übergang prüfen",
"- valid, error_msg = WorkflowValidator.validate_status_transition(old_status, status)",
"- if not valid:",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_status\",",
"- request=request_str,",
"- status=\"denied\",",
"- task_id=id,",
"- error_message=error_msg,",
"- ))",
"- return {",
"- \"success\": False,",
"- \"error\": error_msg,",
"- \"allowed_transitions\": WorkflowValidator.get_allowed_transitions(old_status),",
"- }",
"-",
"- # Bei Completion: Prüfen ob Ergebnis vorhanden",
"- warning_msg = None",
"- if status == \"completed\":",
"- task_type = task.type.value if isinstance(task.type, TaskType) else task.type",
"- results = repo.get_results(id)",
"- has_result = len(results) > 0",
"-",
"- valid, error_msg = WorkflowValidator.validate_completion(task_type, has_result)",
"- if not valid:",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_status\",",
"- request=request_str,",
"- status=\"denied\",",
"- task_id=id,",
"- error_message=error_msg,",
"- ))",
"- return {\"success\": False, \"error\": error_msg}",
"-",
"- # Quality Gate prüfen (nur Warnung, kein Blocker)",
"- is_code = WorkflowValidator.is_code_task(task.title, task.description)",
"- _, quality_warning, _ = WorkflowValidator.validate_quality_gate(results, is_code)",
"- if quality_warning:",
"- warning_msg = quality_warning",
"-",
"- repo.update(id, {\"status\": status})",
"-",
"- updated_task = repo.find_by_id(id)",
"-",
"- duration = int((time.time() - start) * 1000)",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_status\",",
"- request=f\"{old_status} -> {status}\",",
"- status=\"success\",",
"- duration_ms=duration,",
"- task_id=id,",
"- ))",
"-",
"- response = {",
"- \"success\": True,",
"- \"task\": updated_task.to_dict(),",
"- \"message\": f\"Task #{id} status changed: {old_status} -> {status}\",",
"- }",
"- if warning_msg:",
"- response[\"warning\"] = warning_msg",
"- return response",
"-",
"- except Exception as e:",
"- duration = int((time.time() - start) * 1000)",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_status\",",
"- request=request_str,",
"- status=\"error\",",
"- duration_ms=duration,",
"- task_id=id,",
"- error_message=str(e)[:200],",
"- ))",
"- return {\"success\": False, \"error\": str(e)}",
"-",
"-",
"- # ==================== tasks_assign ====================",
"- @mcp.tool()",
"- def tasks_assign(",
"- id: int,",
"- assignee: str,",
"- assignee_type: str,",
"- model_name: Optional[str] = None,",
"- notes: Optional[str] = None,",
"- ) -> dict:",
"- \"\"\"",
"- Weist einen Task einer Person oder KI zu.",
"-",
"- Args:",
"- id: Task-ID (required)",
"- assignee: Name der Person\/KI (required)",
"- assignee_type: Typ (human, ollama, claude, anthropic_api)",
"- model_name: Modellname bei KI",
"- notes: Anmerkungen",
"-",
"- Returns:",
"- Erstellte Zuweisung oder Fehlermeldung",
"- \"\"\"",
"- start = time.time()",
"- request_str = json.dumps({\"id\": id, \"assignee\": assignee, \"assignee_type\": assignee_type})",
"-",
"- try:",
"- # Validierung",
"- if assignee_type not in Config.VALID_EXECUTOR_TYPES:",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_assign\",",
"- request=request_str,",
"- status=\"denied\",",
"- task_id=id,",
"- error_message=f\"Invalid assignee_type: {assignee_type}\",",
"- ))",
"- return {\"success\": False, \"error\": f\"Invalid assignee_type: {assignee_type}\"}",
"-",
"- # Task existiert?",
"- task = repo.find_by_id(id)",
"- if not task:",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_assign\",",
"- request=request_str,",
"- status=\"denied\",",
"- task_id=id,",
"- error_message=\"Task not found\",",
"- ))",
"- return {\"success\": False, \"error\": f\"Task {id} not found\"}",
"-",
"- # Workflow-Validierung: Zuweisung prüfen",
"- task_type = task.type.value if isinstance(task.type, TaskType) else task.type",
"- valid, warning_msg = WorkflowValidator.validate_assignment(",
"- task_type, assignee_type, model_name",
"- )",
"- if not valid:",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_assign\",",
"- request=request_str,",
"- status=\"denied\",",
"- task_id=id,",
"- error_message=warning_msg,",
"- ))",
"- return {\"success\": False, \"error\": warning_msg}",
"-",
"- # Assignment erstellen",
"- assignment = TaskAssignment(",
"- task_id=id,",
"- assignee=assignee,",
"- assignee_type=ExecutorType(assignee_type),",
"- model_name=model_name,",
"- notes=notes,",
"- )",
"- assignment_id = repo.create_assignment(assignment)",
"- assignment.id = assignment_id",
"-",
"- duration = int((time.time() - start) * 1000)",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_assign\",",
"- request=request_str,",
"- status=\"success\",",
"- duration_ms=duration,",
"- task_id=id,",
"- ))",
"-",
"- return {",
"- \"success\": True,",
"- \"assignment\": assignment.to_dict(),",
"- \"message\": f\"Task #{id} assigned to {assignee}\",",
"- }",
"-",
"- except Exception as e:",
"- duration = int((time.time() - start) * 1000)",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_assign\",",
"- request=request_str,",
"- status=\"error\",",
"- duration_ms=duration,",
"- task_id=id,",
"- error_message=str(e)[:200],",
"- ))",
"- return {\"success\": False, \"error\": str(e)}",
"-",
"-",
"- # ==================== tasks_result ====================",
"- @mcp.tool()",
"- def tasks_result(",
"- id: int,",
"- response: str,",
"- executor: str,",
"- executor_type: str,",
"- model_name: Optional[str] = None,",
"- status: str = \"success\",",
"- tokens_input: int = 0,",
"- tokens_output: int = 0,",
"- cost_usd: float = 0.0,",
"- error_message: Optional[str] = None,",
"- ) -> dict:",
"- \"\"\"",
"- Speichert ein Ergebnis für einen Task.",
"-",
"- Args:",
"- id: Task-ID (required)",
"- response: Antwort\/Ergebnis (required)",
"- executor: Ausführende Instanz (required)",
"- executor_type: Typ (human, ollama, claude, anthropic_api)",
"- model_name: Verwendetes Modell",
"- status: Ergebnis-Status (success, error, partial)",
"- tokens_input: Eingabe-Tokens",
"- tokens_output: Ausgabe-Tokens",
"- cost_usd: Kosten in USD",
"- error_message: Bei Fehlern",
"-",
"- Returns:",
"- Gespeichertes Ergebnis oder Fehlermeldung",
"- \"\"\"",
"- start = time.time()",
"- request_str = json.dumps({\"id\": id, \"executor\": executor, \"status\": status})",
"-",
"- try:",
"- # Validierung",
"- if executor_type not in Config.VALID_EXECUTOR_TYPES:",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_result\",",
"- request=request_str,",
"- status=\"denied\",",
"- task_id=id,",
"- error_message=f\"Invalid executor_type: {executor_type}\",",
"- ))",
"- return {\"success\": False, \"error\": f\"Invalid executor_type: {executor_type}\"}",
"-",
"- # Task existiert?",
"- task = repo.find_by_id(id)",
"- if not task:",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_result\",",
"- request=request_str,",
"- status=\"denied\",",
"- task_id=id,",
"- error_message=\"Task not found\",",
"- ))",
"- return {\"success\": False, \"error\": f\"Task {id} not found\"}",
"-",
"- # Result erstellen",
"- duration_ms = int((time.time() - start) * 1000)",
"-",
"- result = TaskResult(",
"- task_id=id,",
"- executor=executor,",
"- executor_type=ExecutorType(executor_type),",
"- model_name=model_name,",
"- response=response,",
"- status=status,",
"- error_message=error_message,",
"- tokens_input=tokens_input,",
"- tokens_output=tokens_output,",
"- cost_usd=cost_usd,",
"- duration_ms=duration_ms,",
"- )",
"- result_id = repo.create_result(result)",
"- result.id = result_id",
"-",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_result\",",
"- request=request_str,",
"- status=\"success\",",
"- duration_ms=duration_ms,",
"- task_id=id,",
"- ))",
"-",
"- return {",
"- \"success\": True,",
"- \"result\": result.to_dict(),",
"- \"message\": f\"Result saved for Task #{id}\",",
"- }",
"-",
"- except Exception as e:",
"- duration = int((time.time() - start) * 1000)",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_result\",",
"- request=request_str,",
"- status=\"error\",",
"- duration_ms=duration,",
"- task_id=id,",
"- error_message=str(e)[:200],",
"- ))",
"- return {\"success\": False, \"error\": str(e)}",
"-",
"-",
"- # ==================== tasks_execute ====================",
"- @mcp.tool()",
"- def tasks_execute(",
"- id: int,",
"- model: str = \"mistral\",",
"- auto_complete: bool = False,",
"- ) -> dict:",
"- \"\"\"",
"- Führt einen Task direkt mit lokalem Ollama aus.",
"-",
"- Args:",
"- id: Task-ID (required)",
"- model: Ollama-Modell (default: mistral)",
"- auto_complete: Task nach Erfolg abschließen",
"-",
"- Returns:",
"- Ausführungsergebnis mit Response, Tokens, Dauer",
"- \"\"\"",
"- start = time.time()",
"- request_str = json.dumps({\"id\": id, \"model\": model})",
"-",
"- try:",
"- # Task laden",
"- task = repo.find_by_id(id)",
"- if not task:",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_execute\",",
"- request=request_str,",
"- status=\"denied\",",
"- task_id=id,",
"- error_message=\"Task not found\",",
"- ))",
"- return {\"success\": False, \"error\": f\"Task {id} not found\"}",
"-",
"- # Prompt bauen",
"- prompt = f\"Task: {task.title}\\n\\n\"",
"- if task.description:",
"- prompt += f\"Description:\\n{task.description}\\n\\n\"",
"- prompt += \"Please complete this task and provide a detailed response.\"",
"-",
"- # Ollama aufrufen",
"- try:",
"- resp = requests.post(",
"- f\"{Config.OLLAMA_HOST}\/api\/generate\",",
"- json={",
"- \"model\": model,",
"- \"prompt\": prompt,",
"- \"stream\": False,",
"- },",
"- timeout=Config.OLLAMA_TIMEOUT,",
"- )",
"- resp.raise_for_status()",
"- ollama_result = resp.json()",
"- except Exception as e:",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_execute\",",
"- request=request_str,",
"- status=\"error\",",
"- task_id=id,",
"- error_message=f\"Ollama error: {e}\",",
"- ))",
"- return {\"success\": False, \"error\": f\"Ollama error: {e}\"}",
"-",
"- duration_ms = int((time.time() - start) * 1000)",
"-",
"- # Tokens schätzen",
"- tokens_input = ollama_result.get(\"prompt_eval_count\", len(prompt) \/\/ 4)",
"- tokens_output = ollama_result.get(\"eval_count\", len(ollama_result.get(\"response\", \"\")) \/\/ 4)",
"-",
"- # Result speichern",
"- result = TaskResult(",
"- task_id=id,",
"- executor=\"mcp-tasks\",",
"- executor_type=ExecutorType.OLLAMA,",
"- model_name=model,",
"- request=prompt[:2000],",
"- response=ollama_result.get(\"response\", \"\"),",
"- status=\"success\",",
"- tokens_input=tokens_input,",
"- tokens_output=tokens_output,",
"- duration_ms=duration_ms,",
"- )",
"- result_id = repo.create_result(result)",
"- result.id = result_id",
"-",
"- # Optional: Task abschließen",
"- if auto_complete:",
"- repo.update(id, {\"status\": \"completed\"})",
"-",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_execute\",",
"- request=f\"model={model}, tokens={tokens_input + tokens_output}\",",
"- status=\"success\",",
"- duration_ms=duration_ms,",
"- task_id=id,",
"- ))",
"-",
"- return {",
"- \"success\": True,",
"- \"result\": result.to_dict(),",
"- \"task_completed\": auto_complete,",
"- \"message\": f\"Task #{id} executed with {model}\",",
"- }",
"-",
"- except Exception as e:",
"- duration = int((time.time() - start) * 1000)",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_execute\",",
"- request=request_str,",
"- status=\"error\",",
"- duration_ms=duration,",
"- task_id=id,",
"- error_message=str(e)[:200],",
"- ))",
"- return {\"success\": False, \"error\": str(e)}",
"-",
"-",
"- # ==================== tasks_delete ====================",
"- @mcp.tool()",
"- def tasks_delete(id: int) -> dict:",
"- \"\"\"",
"- Löscht einen Task.",
"-",
"- Args:",
"- id: Task-ID (required)",
"-",
"- Returns:",
"- Bestätigung oder Fehlermeldung",
"- \"\"\"",
"- start = time.time()",
"- request_str = json.dumps({\"id\": id})",
"-",
"- try:",
"- # Task existiert?",
"- task = repo.find_by_id(id)",
"- if not task:",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_delete\",",
"- request=request_str,",
"- status=\"denied\",",
"- task_id=id,",
"- error_message=\"Task not found\",",
"- ))",
"- return {\"success\": False, \"error\": f\"Task {id} not found\"}",
"-",
"- deleted = repo.delete(id)",
"-",
"- duration = int((time.time() - start) * 1000)",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_delete\",",
"- request=request_str,",
"- status=\"success\" if deleted else \"error\",",
"- duration_ms=duration,",
"- task_id=id,",
"- ))",
"-",
"- return {",
"- \"success\": deleted,",
"- \"message\": f\"Task #{id} deleted\" if deleted else f\"Failed to delete Task #{id}\",",
"- }",
"-",
"- except Exception as e:",
"- duration = int((time.time() - start) * 1000)",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_delete\",",
"- request=request_str,",
"- status=\"error\",",
"- duration_ms=duration,",
"- task_id=id,",
"- error_message=str(e)[:200],",
"- ))",
"- return {\"success\": False, \"error\": str(e)}",
"-",
"-",
"- # ==================== tasks_statistics ====================",
"- @mcp.tool()",
"- def tasks_statistics() -> dict:",
"- \"\"\"",
"- Holt Statistiken über alle Tasks.",
"-",
"- Returns:",
"- Statistiken: Tasks nach Status\/Typ, Token-Verbrauch, Modell-Nutzung",
"- \"\"\"",
"- start = time.time()",
"-",
"- try:",
"- stats = repo.get_statistics()",
"-",
"- duration = int((time.time() - start) * 1000)",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_statistics\",",
"- request=\"\",",
"- status=\"success\",",
"- duration_ms=duration,",
"- ))",
"-",
"- return {",
"- \"success\": True,",
"- \"statistics\": stats,",
"- }",
"-",
"- except Exception as e:",
"- duration = int((time.time() - start) * 1000)",
"- logger.log(LogEntry(",
"- tool_name=\"tasks_statistics\",",
"- request=\"\",",
"- status=\"error\",",
"- duration_ms=duration,",
"- error_message=str(e)[:200],",
"- ))",
"- return {\"success\": False, \"error\": str(e)}",
"+__all__ = [",
"+ \"register_task_tools\",",
"+ \"register_list_tools\",",
"+ \"register_crud_tools\",",
"+ \"register_workflow_tools\",",
"+ \"register_execute_tools\",",
"+ \"get_repo\",",
"+ \"get_task_logger\",",
"+ \"validate_status\",",
"+ \"validate_type\",",
"+ \"validate_executor_type\",",
"+ \"log_tool_call\",",
"+]"
]
}
],
"originalFile": "\"\"\"Task Management Tools für MCP-Tasks Server\"\"\"\nimport sys\nimport time\nimport json\nimport requests\nfrom typing import Optional, List\nfrom datetime import datetime\n\nsys.path.insert(0, \"\/opt\/mcp-servers\/mcp-tasks\")\n\nfrom config import Config\nfrom domain.contracts import (\n Task, TaskAssignment, TaskResult, TaskStatus, TaskType, ExecutorType, LogEntry\n)\nfrom infrastructure.task_repository import TaskRepository\nfrom infrastructure.protokoll_logger import get_logger\nfrom validators.workflow_validator import WorkflowValidator\n\n\ndef register_task_tools(mcp):\n \"\"\"Registriert alle Task-Management-Tools\"\"\"\n\n repo = TaskRepository()\n logger = get_logger()\n\n # ==================== tasks_list ====================\n @mcp.tool()\n def tasks_list(\n status: Optional[str] = None,\n type: Optional[str] = None,\n search: Optional[str] = None,\n limit: int = 10,\n offset: int = 0,\n compact: bool = True,\n ) -> dict:\n \"\"\"\n Listet Tasks mit optionalen Filtern auf.\n\n Args:\n status: Filter nach Status (pending, in_progress, completed, failed, cancelled)\n type: Filter nach Typ (human_task, ai_task, mixed)\n search: Volltextsuche in Titel\/Beschreibung\n limit: Maximale Anzahl Ergebnisse (1-100, default: 10)\n offset: Offset für Pagination\n compact: True = nur id\/title\/status\/type (Token-sparend), False = alle Felder\n\n Returns:\n Dict mit tasks, total, limit, offset\n \"\"\"\n start = time.time()\n request_str = json.dumps({\"status\": status, \"type\": type, \"search\": search, \"limit\": limit})\n\n try:\n # Validierung\n limit = max(1, min(limit, Config.MAX_RESULTS))\n\n if status and status not in Config.VALID_STATUSES:\n logger.log(LogEntry(\n tool_name=\"tasks_list\",\n request=request_str,\n status=\"denied\",\n error_message=f\"Invalid status: {status}\",\n ))\n return {\"success\": False, \"error\": f\"Invalid status: {status}\"}\n\n if type and type not in Config.VALID_TYPES:\n logger.log(LogEntry(\n tool_name=\"tasks_list\",\n request=request_str,\n status=\"denied\",\n error_message=f\"Invalid type: {type}\",\n ))\n return {\"success\": False, \"error\": f\"Invalid type: {type}\"}\n\n # Ausführung\n tasks = repo.find_all(status=status, task_type=type, search=search, limit=limit, offset=offset)\n total = repo.count(status=status, task_type=type, search=search)\n\n duration = int((time.time() - start) * 1000)\n logger.log(LogEntry(\n tool_name=\"tasks_list\",\n request=request_str,\n status=\"success\",\n duration_ms=duration,\n ))\n\n return {\n \"success\": True,\n \"tasks\": [t.to_dict_compact() if compact else t.to_dict() for t in tasks],\n \"total\": total,\n \"limit\": limit,\n \"offset\": offset,\n \"compact\": compact,\n }\n\n except Exception as e:\n duration = int((time.time() - start) * 1000)\n logger.log(LogEntry(\n tool_name=\"tasks_list\",\n request=request_str,\n status=\"error\",\n duration_ms=duration,\n error_message=str(e)[:200],\n ))\n return {\"success\": False, \"error\": str(e)}\n\n\n # ==================== tasks_get ====================\n @mcp.tool()\n def tasks_get(id: int) -> dict:\n \"\"\"\n Holt Details eines einzelnen Tasks inkl. Assignments und Results.\n\n Args:\n id: Task-ID\n\n Returns:\n Task mit allen Details oder Fehlermeldung\n \"\"\"\n start = time.time()\n request_str = json.dumps({\"id\": id})\n\n try:\n task = repo.find_by_id(id)\n\n if not task:\n logger.log(LogEntry(\n tool_name=\"tasks_get\",\n request=request_str,\n status=\"denied\",\n task_id=id,\n error_message=\"Task not found\",\n ))\n return {\"success\": False, \"error\": f\"Task {id} not found\"}\n\n # Zusätzliche Daten laden\n assignments = repo.get_assignments(id)\n results = repo.get_results(id)\n\n duration = int((time.time() - start) * 1000)\n logger.log(LogEntry(\n tool_name=\"tasks_get\",\n request=request_str,\n status=\"success\",\n duration_ms=duration,\n task_id=id,\n ))\n\n return {\n \"success\": True,\n \"task\": task.to_dict(),\n \"assignments\": [a.to_dict() for a in assignments],\n \"results\": [r.to_dict() for r in results],\n }\n\n except Exception as e:\n duration = int((time.time() - start) * 1000)\n logger.log(LogEntry(\n tool_name=\"tasks_get\",\n request=request_str,\n status=\"error\",\n duration_ms=duration,\n task_id=id,\n error_message=str(e)[:200],\n ))\n return {\"success\": False, \"error\": str(e)}\n\n\n # ==================== tasks_create ====================\n @mcp.tool()\n def tasks_create(\n title: str,\n description: Optional[str] = None,\n type: str = \"ai_task\",\n parent_task_id: Optional[int] = None,\n due_date: Optional[str] = None,\n ) -> dict:\n \"\"\"\n Erstellt einen neuen Task.\n\n Args:\n title: Aufgabentitel (required)\n description: Ausführliche Beschreibung\n type: Task-Typ (human_task, ai_task, mixed)\n parent_task_id: ID des Parent-Tasks für Subtasks\n due_date: Fälligkeitsdatum (ISO 8601)\n\n Returns:\n Erstellter Task oder Fehlermeldung\n \"\"\"\n start = time.time()\n request_str = json.dumps({\"title\": title[:100], \"type\": type})\n\n try:\n # Validierung\n if not title or not title.strip():\n logger.log(LogEntry(\n tool_name=\"tasks_create\",\n request=request_str,\n status=\"denied\",\n error_message=\"Title is required\",\n ))\n return {\"success\": False, \"error\": \"Title is required\"}\n\n if type not in Config.VALID_TYPES:\n logger.log(LogEntry(\n tool_name=\"tasks_create\",\n request=request_str,\n status=\"denied\",\n error_message=f\"Invalid type: {type}\",\n ))\n return {\"success\": False, \"error\": f\"Invalid type: {type}\"}\n\n # Task erstellen\n task = Task(\n title=title.strip(),\n description=description,\n type=TaskType(type),\n status=TaskStatus.PENDING,\n created_by=\"mcp-tasks\",\n created_by_type=\"ai\",\n parent_task_id=parent_task_id,\n due_date=datetime.fromisoformat(due_date) if due_date else None,\n )\n\n task_id = repo.create(task)\n task.id = task_id\n\n duration = int((time.time() - start) * 1000)\n logger.log(LogEntry(\n tool_name=\"tasks_create\",\n request=request_str,\n status=\"success\",\n duration_ms=duration,\n task_id=task_id,\n ))\n\n return {\n \"success\": True,\n \"task\": task.to_dict(),\n \"message\": f\"Task #{task_id} created\",\n }\n\n except Exception as e:\n duration = int((time.time() - start) * 1000)\n logger.log(LogEntry(\n tool_name=\"tasks_create\",\n request=request_str,\n status=\"error\",\n duration_ms=duration,\n error_message=str(e)[:200],\n ))\n return {\"success\": False, \"error\": str(e)}\n\n\n # ==================== tasks_update ====================\n @mcp.tool()\n def tasks_update(\n id: int,\n title: Optional[str] = None,\n description: Optional[str] = None,\n type: Optional[str] = None,\n due_date: Optional[str] = None,\n ) -> dict:\n \"\"\"\n Aktualisiert Task-Felder.\n\n Args:\n id: Task-ID (required)\n title: Neuer Titel\n description: Neue Beschreibung\n type: Neuer Typ\n due_date: Neues Datum (ISO 8601)\n\n Returns:\n Aktualisierter Task oder Fehlermeldung\n \"\"\"\n start = time.time()\n request_str = json.dumps({\"id\": id})\n\n try:\n # Task existiert?\n task = repo.find_by_id(id)\n if not task:\n logger.log(LogEntry(\n tool_name=\"tasks_update\",\n request=request_str,\n status=\"denied\",\n task_id=id,\n error_message=\"Task not found\",\n ))\n return {\"success\": False, \"error\": f\"Task {id} not found\"}\n\n # Updates sammeln\n updates = {}\n if title is not None:\n updates[\"title\"] = title.strip()\n if description is not None:\n updates[\"description\"] = description\n if type is not None:\n if type not in Config.VALID_TYPES:\n return {\"success\": False, \"error\": f\"Invalid type: {type}\"}\n updates[\"type\"] = type\n ... [TRUNCATED-e90df983f56f0827]"
}
}