Protokoll #15183

ID15183
Zeitstempel2025-12-25 15:36:32.149900
Clientroot
IP145.224.72.140
Modellclaude-sonnet-4-20250514
Statuscompleted
Tokens2,711 (Input: 31, Output: 2,680)
Dauer311 ms
Request-Zeit2025-12-25 15:36:32.149900
Response-Zeit2025-12-25 15:36:32.461040

Request

{
    "event": "PreToolUse",
    "tool_name": "Read",
    "tool_input": {
        "file_path": "\/var\/www\/mcp-servers\/mcp-tasks\/tools\/task_tools.py"
    }
}

Response

{
    "tool_response": {
        "type": "text",
        "file": {
            "filePath": "\/var\/www\/mcp-servers\/mcp-tasks\/tools\/task_tools.py",
            "content": "\"\"\"Task Management Tools für MCP-Tasks Server\"\"\"\nimport sys\nimport time\nimport json\nimport requests\nfrom typing import Optional, List\nfrom datetime import datetime\n\nsys.path.insert(0, \"\/opt\/mcp-servers\/mcp-tasks\")\n\nfrom config import Config\nfrom domain.contracts import (\n    Task, TaskAssignment, TaskResult, TaskStatus, TaskType, ExecutorType, LogEntry\n)\nfrom infrastructure.task_repository import TaskRepository\nfrom infrastructure.protokoll_logger import get_logger\nfrom validators.workflow_validator import WorkflowValidator\n\n\ndef register_task_tools(mcp):\n    \"\"\"Registriert alle Task-Management-Tools\"\"\"\n\n    repo = TaskRepository()\n    logger = get_logger()\n\n    # ==================== tasks_list ====================\n    @mcp.tool()\n    def tasks_list(\n        status: Optional[str] = None,\n        type: Optional[str] = None,\n        search: Optional[str] = None,\n        limit: int = 10,\n        offset: int = 0,\n        compact: bool = True,\n    ) -> dict:\n        \"\"\"\n        Listet Tasks mit optionalen Filtern auf.\n\n        Args:\n            status: Filter nach Status (pending, in_progress, completed, failed, cancelled)\n            type: Filter nach Typ (human_task, ai_task, mixed)\n            search: Volltextsuche in Titel\/Beschreibung\n            limit: Maximale Anzahl Ergebnisse (1-100, default: 10)\n            offset: Offset für Pagination\n            compact: True = nur id\/title\/status\/type (Token-sparend), False = alle Felder\n\n        Returns:\n            Dict mit tasks, total, limit, offset\n        \"\"\"\n        start = time.time()\n        request_str = json.dumps({\"status\": status, \"type\": type, \"search\": search, \"limit\": limit})\n\n        try:\n            # Validierung\n            limit = max(1, min(limit, Config.MAX_RESULTS))\n\n            if status and status not in Config.VALID_STATUSES:\n                logger.log(LogEntry(\n                    tool_name=\"tasks_list\",\n                    request=request_str,\n                    status=\"denied\",\n                    error_message=f\"Invalid status: {status}\",\n                ))\n                return {\"success\": False, \"error\": f\"Invalid status: {status}\"}\n\n            if type and type not in Config.VALID_TYPES:\n                logger.log(LogEntry(\n                    tool_name=\"tasks_list\",\n                    request=request_str,\n                    status=\"denied\",\n                    error_message=f\"Invalid type: {type}\",\n                ))\n                return {\"success\": False, \"error\": f\"Invalid type: {type}\"}\n\n            # Ausführung\n            tasks = repo.find_all(status=status, task_type=type, search=search, limit=limit, offset=offset)\n            total = repo.count(status=status, task_type=type, search=search)\n\n            duration = int((time.time() - start) * 1000)\n            logger.log(LogEntry(\n                tool_name=\"tasks_list\",\n                request=request_str,\n                status=\"success\",\n                duration_ms=duration,\n            ))\n\n            return {\n                \"success\": True,\n                \"tasks\": [t.to_dict_compact() if compact else t.to_dict() for t in tasks],\n                \"total\": total,\n                \"limit\": limit,\n                \"offset\": offset,\n                \"compact\": compact,\n            }\n\n        except Exception as e:\n            duration = int((time.time() - start) * 1000)\n            logger.log(LogEntry(\n                tool_name=\"tasks_list\",\n                request=request_str,\n                status=\"error\",\n                duration_ms=duration,\n                error_message=str(e)[:200],\n            ))\n            return {\"success\": False, \"error\": str(e)}\n\n\n    # ==================== tasks_get ====================\n    @mcp.tool()\n    def tasks_get(id: int) -> dict:\n        \"\"\"\n        Holt Details eines einzelnen Tasks inkl. Assignments und Results.\n\n        Args:\n            id: Task-ID\n\n        Returns:\n            Task mit allen Details oder Fehlermeldung\n        \"\"\"\n        start = time.time()\n        request_str = json.dumps({\"id\": id})\n\n        try:\n            task = repo.find_by_id(id)\n\n            if not task:\n                logger.log(LogEntry(\n                    tool_name=\"tasks_get\",\n                    request=request_str,\n                    status=\"denied\",\n                    task_id=id,\n                    error_message=\"Task not found\",\n                ))\n                return {\"success\": False, \"error\": f\"Task {id} not found\"}\n\n            # Zusätzliche Daten laden\n            assignments = repo.get_assignments(id)\n            results = repo.get_results(id)\n\n            duration = int((time.time() - start) * 1000)\n            logger.log(LogEntry(\n                tool_name=\"tasks_get\",\n                request=request_str,\n                status=\"success\",\n                duration_ms=duration,\n                task_id=id,\n            ))\n\n            return {\n                \"success\": True,\n                \"task\": task.to_dict(),\n                \"assignments\": [a.to_dict() for a in assignments],\n                \"results\": [r.to_dict() for r in results],\n            }\n\n        except Exception as e:\n            duration = int((time.time() - start) * 1000)\n            logger.log(LogEntry(\n                tool_name=\"tasks_get\",\n                request=request_str,\n                status=\"error\",\n                duration_ms=duration,\n                task_id=id,\n                error_message=str(e)[:200],\n            ))\n            return {\"success\": False, \"error\": str(e)}\n\n\n    # ==================== tasks_create ====================\n    @mcp.tool()\n    def tasks_create(\n        title: str,\n        description: Optional[str] = None,\n        type: str = \"ai_task\",\n        parent_task_id: Optional[int] = None,\n        due_date: Optional[str] = None,\n    ) -> dict:\n        \"\"\"\n        Erstellt einen neuen Task.\n\n        Args:\n            title: Aufgabentitel (required)\n            description: Ausführliche Beschreibung\n            type: Task-Typ (human_task, ai_task, mixed)\n            parent_task_id: ID des Parent-Tasks für Subtasks\n            due_date: Fälligkeitsdatum (ISO 8601)\n\n        Returns:\n            Erstellter Task oder Fehlermeldung\n        \"\"\"\n        start = time.time()\n        request_str = json.dumps({\"title\": title[:100], \"type\": type})\n\n        try:\n            # Validierung\n            if not title or not title.strip():\n                logger.log(LogEntry(\n                    tool_name=\"tasks_create\",\n                    request=request_str,\n                    status=\"denied\",\n                    error_message=\"Title is required\",\n                ))\n                return {\"success\": False, \"error\": \"Title is required\"}\n\n            if type not in Config.VALID_TYPES:\n                logger.log(LogEntry(\n                    tool_name=\"tasks_create\",\n                    request=request_str,\n                    status=\"denied\",\n                    error_message=f\"Invalid type: {type}\",\n                ))\n                return {\"success\": False, \"error\": f\"Invalid type: {type}\"}\n\n            # Task erstellen\n            task = Task(\n                title=title.strip(),\n                description=description,\n                type=TaskType(type),\n                status=TaskStatus.PENDING,\n                created_by=\"mcp-tasks\",\n                created_by_type=\"ai\",\n                parent_task_id=parent_task_id,\n                due_date=datetime.fromisoformat(due_date) if due_date else None,\n            )\n\n            task_id = repo.create(task)\n            task.id = task_id\n\n            duration = int((time.time() - start) * 1000)\n            logger.log(LogEntry(\n                tool_name=\"tasks_create\",\n                request=request_str,\n                status=\"success\",\n                duration_ms=duration,\n                task_id=task_id,\n            ))\n\n            return {\n                \"success\": True,\n                \"task\": task.to_dict(),\n                \"message\": f\"Task #{task_id} created\",\n            }\n\n        except Exception as e:\n            duration = int((time.time() - start) * 1000)\n            logger.log(LogEntry(\n                tool_name=\"tasks_create\",\n                request=request_str,\n                status=\"error\",\n                duration_ms=duration,\n                error_message=str(e)[:200],\n            ))\n            return {\"success\": False, \"error\": str(e)}\n\n\n    # ==================== tasks_update ====================\n    @mcp.tool()\n    def tasks_update(\n        id: int,\n        title: Optional[str] = None,\n        description: Optional[str] = None,\n        type: Optional[str] = None,\n        due_date: Optional[str] = None,\n    ) -> dict:\n        \"\"\"\n        Aktualisiert Task-Felder.\n\n        Args:\n            id: Task-ID (required)\n            title: Neuer Titel\n            description: Neue Beschreibung\n            type: Neuer Typ\n            due_date: Neues Datum (ISO 8601)\n\n        Returns:\n            Aktualisierter Task oder Fehlermeldung\n        \"\"\"\n        start = time.time()\n        request_str = json.dumps({\"id\": id})\n\n        try:\n            # Task existiert?\n            task = repo.find_by_id(id)\n            if not task:\n                logger.log(LogEntry(\n                    tool_name=\"tasks_update\",\n                    request=request_str,\n                    status=\"denied\",\n                    task_id=id,\n                    error_message=\"Task not found\",\n                ))\n                return {\"success\": False, \"error\": f\"Task {id} not found\"}\n\n            # Updates sammeln\n            updates = {}\n            if title is not None:\n                updates[\"title\"] = title.strip()\n            if description is not None:\n                updates[\"description\"] = description\n            if type is not None:\n                if type not in Config.VALID_TYPES:\n                    return {\"success\": False, \"error\": f\"Invalid type: {type}\"}\n                updates[\"type\"] = type\n       ... [TRUNCATED-e90df983f56f0827]",
            "numLines": 882,
            "startLine": 1,
            "totalLines": 882
        }
    }
}
← Vorheriger Zur Liste Nächster →