Protokoll #15188

ID: 15188
Zeitstempel: 2025-12-25 15:38:34.465370
Client: root
IP: 145.224.72.140
Modell: claude-sonnet-4-20250514
Status: completed
Tokens: 4,829 (Input: 2,412, Output: 2,417)
Dauer: 432 ms
Request-Zeit: 2025-12-25 15:38:34.465370
Response-Zeit: 2025-12-25 15:38:34.897518

Request

{
    "event": "PreToolUse",
    "tool_name": "Write",
    "tool_input": {
        "file_path": "\/var\/www\/mcp-servers\/mcp-tasks\/tools\/task_tools\/workflow_tools.py",
        "content": "\"\"\"Task workflow tools - Status, Assign, Result.\"\"\"\nimport json\nimport time\n\nimport sys\nsys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp-tasks\")\n\nfrom config import Config\nfrom domain.contracts import TaskAssignment, TaskResult, TaskStatus, TaskType, ExecutorType\nfrom validators.workflow_validator import WorkflowValidator\nfrom .base import get_repo, get_task_logger, validate_status, validate_executor_type, log_tool_call\n\n\ndef register_workflow_tools(mcp):\n    \"\"\"Register task workflow tools.\"\"\"\n\n    repo = get_repo()\n    logger = get_task_logger()\n\n    @mcp.tool()\n    def tasks_status(id: int, status: str) -> dict:\n        \"\"\"\n        Ändert den Status eines Tasks.\n\n        Args:\n            id: Task-ID (required)\n            status: Neuer Status (pending, in_progress, completed, failed, cancelled)\n\n        Returns:\n            Aktualisierter Task oder Fehlermeldung\n        \"\"\"\n        start = time.time()\n        request_str = json.dumps({\"id\": id, \"status\": status})\n\n        try:\n            valid, error = validate_status(status)\n            if not valid:\n                log_tool_call(logger, \"tasks_status\", request_str, \"denied\", task_id=id, error_message=error)\n                return {\"success\": False, \"error\": error}\n\n            task = repo.find_by_id(id)\n            if not task:\n                log_tool_call(logger, \"tasks_status\", request_str, \"denied\", task_id=id, error_message=\"Task not found\")\n                return {\"success\": False, \"error\": f\"Task {id} not found\"}\n\n            old_status = task.status.value if isinstance(task.status, TaskStatus) else task.status\n\n            valid, error_msg = WorkflowValidator.validate_status_transition(old_status, status)\n            if not valid:\n                log_tool_call(logger, \"tasks_status\", request_str, \"denied\", task_id=id, error_message=error_msg)\n                return {\n                 
   \"success\": False,\n                    \"error\": error_msg,\n                    \"allowed_transitions\": WorkflowValidator.get_allowed_transitions(old_status),\n                }\n\n            warning_msg = None\n            if status == \"completed\":\n                task_type = task.type.value if isinstance(task.type, TaskType) else task.type\n                results = repo.get_results(id)\n                has_result = len(results) > 0\n\n                valid, error_msg = WorkflowValidator.validate_completion(task_type, has_result)\n                if not valid:\n                    log_tool_call(logger, \"tasks_status\", request_str, \"denied\", task_id=id, error_message=error_msg)\n                    return {\"success\": False, \"error\": error_msg}\n\n                is_code = WorkflowValidator.is_code_task(task.title, task.description)\n                _, quality_warning, _ = WorkflowValidator.validate_quality_gate(results, is_code)\n                if quality_warning:\n                    warning_msg = quality_warning\n\n            repo.update(id, {\"status\": status})\n            updated_task = repo.find_by_id(id)\n\n            duration = int((time.time() - start) * 1000)\n            log_tool_call(logger, \"tasks_status\", f\"{old_status} -> {status}\", \"success\", duration, task_id=id)\n\n            response = {\n                \"success\": True,\n                \"task\": updated_task.to_dict(),\n                \"message\": f\"Task #{id} status changed: {old_status} -> {status}\",\n            }\n            if warning_msg:\n                response[\"warning\"] = warning_msg\n            return response\n\n        except Exception as e:\n            duration = int((time.time() - start) * 1000)\n            log_tool_call(logger, \"tasks_status\", request_str, \"error\", duration, task_id=id, error_message=str(e))\n            return {\"success\": False, \"error\": str(e)}\n\n    @mcp.tool()\n    def tasks_assign(\n        id: int,\n     
   assignee: str,\n        assignee_type: str,\n        model_name: str | None = None,\n        notes: str | None = None,\n    ) -> dict:\n        \"\"\"\n        Weist einen Task einer Person oder KI zu.\n\n        Args:\n            id: Task-ID (required)\n            assignee: Name der Person\/KI (required)\n            assignee_type: Typ (human, ollama, claude, anthropic_api)\n            model_name: Modellname bei KI\n            notes: Anmerkungen\n\n        Returns:\n            Erstellte Zuweisung oder Fehlermeldung\n        \"\"\"\n        start = time.time()\n        request_str = json.dumps({\"id\": id, \"assignee\": assignee, \"assignee_type\": assignee_type})\n\n        try:\n            valid, error = validate_executor_type(assignee_type)\n            if not valid:\n                log_tool_call(logger, \"tasks_assign\", request_str, \"denied\", task_id=id, error_message=error)\n                return {\"success\": False, \"error\": error}\n\n            task = repo.find_by_id(id)\n            if not task:\n                log_tool_call(logger, \"tasks_assign\", request_str, \"denied\", task_id=id, error_message=\"Task not found\")\n                return {\"success\": False, \"error\": f\"Task {id} not found\"}\n\n            task_type = task.type.value if isinstance(task.type, TaskType) else task.type\n            valid, warning_msg = WorkflowValidator.validate_assignment(task_type, assignee_type, model_name)\n            if not valid:\n                log_tool_call(logger, \"tasks_assign\", request_str, \"denied\", task_id=id, error_message=warning_msg)\n                return {\"success\": False, \"error\": warning_msg}\n\n            assignment = TaskAssignment(\n                task_id=id,\n                assignee=assignee,\n                assignee_type=ExecutorType(assignee_type),\n                model_name=model_name,\n                notes=notes,\n            )\n            assignment_id = repo.create_assignment(assignment)\n            
assignment.id = assignment_id\n\n            duration = int((time.time() - start) * 1000)\n            log_tool_call(logger, \"tasks_assign\", request_str, \"success\", duration, task_id=id)\n\n            return {\n                \"success\": True,\n                \"assignment\": assignment.to_dict(),\n                \"message\": f\"Task #{id} assigned to {assignee}\",\n            }\n\n        except Exception as e:\n            duration = int((time.time() - start) * 1000)\n            log_tool_call(logger, \"tasks_assign\", request_str, \"error\", duration, task_id=id, error_message=str(e))\n            return {\"success\": False, \"error\": str(e)}\n\n    @mcp.tool()\n    def tasks_result(\n        id: int,\n        response: str,\n        executor: str,\n        executor_type: str,\n        model_name: str | None = None,\n        status: str = \"success\",\n        tokens_input: int = 0,\n        tokens_output: int = 0,\n        cost_usd: float = 0.0,\n        error_message: str | None = None,\n    ) -> dict:\n        \"\"\"\n        Speichert ein Ergebnis für einen Task.\n\n        Args:\n            id: Task-ID (required)\n            response: Antwort\/Ergebnis (required)\n            executor: Ausführende Instanz (required)\n            executor_type: Typ (human, ollama, claude, anthropic_api)\n            model_name: Verwendetes Modell\n            status: Ergebnis-Status (success, error, partial)\n            tokens_input: Eingabe-Tokens\n            tokens_output: Ausgabe-Tokens\n            cost_usd: Kosten in USD\n            error_message: Bei Fehlern\n\n        Returns:\n            Gespeichertes Ergebnis oder Fehlermeldung\n        \"\"\"\n        start = time.time()\n        request_str = json.dumps({\"id\": id, \"executor\": executor, \"status\": status})\n\n        try:\n            valid, error = validate_executor_type(executor_type)\n            if not valid:\n                log_tool_call(logger, \"tasks_result\", request_str, \"denied\", 
task_id=id, error_message=error)\n                return {\"success\": False, \"error\": error}\n\n            task = repo.find_by_id(id)\n            if not task:\n                log_tool_call(logger, \"tasks_result\", request_str, \"denied\", task_id=id, error_message=\"Task not found\")\n                return {\"success\": False, \"error\": f\"Task {id} not found\"}\n\n            duration_ms = int((time.time() - start) * 1000)\n\n            result = TaskResult(\n                task_id=id,\n                executor=executor,\n                executor_type=ExecutorType(executor_type),\n                model_name=model_name,\n                response=response,\n                status=status,\n                error_message=error_message,\n                tokens_input=tokens_input,\n                tokens_output=tokens_output,\n                cost_usd=cost_usd,\n                duration_ms=duration_ms,\n            )\n            result_id = repo.create_result(result)\n            result.id = result_id\n\n            log_tool_call(logger, \"tasks_result\", request_str, \"success\", duration_ms, task_id=id)\n\n            return {\"success\": True, \"result\": result.to_dict(), \"message\": f\"Result saved for Task #{id}\"}\n\n        except Exception as e:\n            duration = int((time.time() - start) * 1000)\n            log_tool_call(logger, \"tasks_result\", request_str, \"error\", duration, task_id=id, error_message=str(e))\n            return {\"success\": False, \"error\": str(e)}\n"
    }
}

Response

{
    "tool_response": {
        "type": "create",
        "filePath": "\/var\/www\/mcp-servers\/mcp-tasks\/tools\/task_tools\/workflow_tools.py",
        "content": "\"\"\"Task workflow tools - Status, Assign, Result.\"\"\"\nimport json\nimport time\n\nimport sys\nsys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp-tasks\")\n\nfrom config import Config\nfrom domain.contracts import TaskAssignment, TaskResult, TaskStatus, TaskType, ExecutorType\nfrom validators.workflow_validator import WorkflowValidator\nfrom .base import get_repo, get_task_logger, validate_status, validate_executor_type, log_tool_call\n\n\ndef register_workflow_tools(mcp):\n    \"\"\"Register task workflow tools.\"\"\"\n\n    repo = get_repo()\n    logger = get_task_logger()\n\n    @mcp.tool()\n    def tasks_status(id: int, status: str) -> dict:\n        \"\"\"\n        Ändert den Status eines Tasks.\n\n        Args:\n            id: Task-ID (required)\n            status: Neuer Status (pending, in_progress, completed, failed, cancelled)\n\n        Returns:\n            Aktualisierter Task oder Fehlermeldung\n        \"\"\"\n        start = time.time()\n        request_str = json.dumps({\"id\": id, \"status\": status})\n\n        try:\n            valid, error = validate_status(status)\n            if not valid:\n                log_tool_call(logger, \"tasks_status\", request_str, \"denied\", task_id=id, error_message=error)\n                return {\"success\": False, \"error\": error}\n\n            task = repo.find_by_id(id)\n            if not task:\n                log_tool_call(logger, \"tasks_status\", request_str, \"denied\", task_id=id, error_message=\"Task not found\")\n                return {\"success\": False, \"error\": f\"Task {id} not found\"}\n\n            old_status = task.status.value if isinstance(task.status, TaskStatus) else task.status\n\n            valid, error_msg = WorkflowValidator.validate_status_transition(old_status, status)\n            if not valid:\n                log_tool_call(logger, \"tasks_status\", request_str, \"denied\", task_id=id, error_message=error_msg)\n                return {\n                 
   \"success\": False,\n                    \"error\": error_msg,\n                    \"allowed_transitions\": WorkflowValidator.get_allowed_transitions(old_status),\n                }\n\n            warning_msg = None\n            if status == \"completed\":\n                task_type = task.type.value if isinstance(task.type, TaskType) else task.type\n                results = repo.get_results(id)\n                has_result = len(results) > 0\n\n                valid, error_msg = WorkflowValidator.validate_completion(task_type, has_result)\n                if not valid:\n                    log_tool_call(logger, \"tasks_status\", request_str, \"denied\", task_id=id, error_message=error_msg)\n                    return {\"success\": False, \"error\": error_msg}\n\n                is_code = WorkflowValidator.is_code_task(task.title, task.description)\n                _, quality_warning, _ = WorkflowValidator.validate_quality_gate(results, is_code)\n                if quality_warning:\n                    warning_msg = quality_warning\n\n            repo.update(id, {\"status\": status})\n            updated_task = repo.find_by_id(id)\n\n            duration = int((time.time() - start) * 1000)\n            log_tool_call(logger, \"tasks_status\", f\"{old_status} -> {status}\", \"success\", duration, task_id=id)\n\n            response = {\n                \"success\": True,\n                \"task\": updated_task.to_dict(),\n                \"message\": f\"Task #{id} status changed: {old_status} -> {status}\",\n            }\n            if warning_msg:\n                response[\"warning\"] = warning_msg\n            return response\n\n        except Exception as e:\n            duration = int((time.time() - start) * 1000)\n            log_tool_call(logger, \"tasks_status\", request_str, \"error\", duration, task_id=id, error_message=str(e))\n            return {\"success\": False, \"error\": str(e)}\n\n    @mcp.tool()\n    def tasks_assign(\n        id: int,\n     
   assignee: str,\n        assignee_type: str,\n        model_name: str | None = None,\n        notes: str | None = None,\n    ) -> dict:\n        \"\"\"\n        Weist einen Task einer Person oder KI zu.\n\n        Args:\n            id: Task-ID (required)\n            assignee: Name der Person\/KI (required)\n            assignee_type: Typ (human, ollama, claude, anthropic_api)\n            model_name: Modellname bei KI\n            notes: Anmerkungen\n\n        Returns:\n            Erstellte Zuweisung oder Fehlermeldung\n        \"\"\"\n        start = time.time()\n        request_str = json.dumps({\"id\": id, \"assignee\": assignee, \"assignee_type\": assignee_type})\n\n        try:\n            valid, error = validate_executor_type(assignee_type)\n            if not valid:\n                log_tool_call(logger, \"tasks_assign\", request_str, \"denied\", task_id=id, error_message=error)\n                return {\"success\": False, \"error\": error}\n\n            task = repo.find_by_id(id)\n            if not task:\n                log_tool_call(logger, \"tasks_assign\", request_str, \"denied\", task_id=id, error_message=\"Task not found\")\n                return {\"success\": False, \"error\": f\"Task {id} not found\"}\n\n            task_type = task.type.value if isinstance(task.type, TaskType) else task.type\n            valid, warning_msg = WorkflowValidator.validate_assignment(task_type, assignee_type, model_name)\n            if not valid:\n                log_tool_call(logger, \"tasks_assign\", request_str, \"denied\", task_id=id, error_message=warning_msg)\n                return {\"success\": False, \"error\": warning_msg}\n\n            assignment = TaskAssignment(\n                task_id=id,\n                assignee=assignee,\n                assignee_type=ExecutorType(assignee_type),\n                model_name=model_name,\n                notes=notes,\n            )\n            assignment_id = repo.create_assignment(assignment)\n            
assignment.id = assignment_id\n\n            duration = int((time.time() - start) * 1000)\n            log_tool_call(logger, \"tasks_assign\", request_str, \"success\", duration, task_id=id)\n\n            return {\n                \"success\": True,\n                \"assignment\": assignment.to_dict(),\n                \"message\": f\"Task #{id} assigned to {assignee}\",\n            }\n\n        except Exception as e:\n            duration = int((time.time() - start) * 1000)\n            log_tool_call(logger, \"tasks_assign\", request_str, \"error\", duration, task_id=id, error_message=str(e))\n            return {\"success\": False, \"error\": str(e)}\n\n    @mcp.tool()\n    def tasks_result(\n        id: int,\n        response: str,\n        executor: str,\n        executor_type: str,\n        model_name: str | None = None,\n        status: str = \"success\",\n        tokens_input: int = 0,\n        tokens_output: int = 0,\n        cost_usd: float = 0.0,\n        error_message: str | None = None,\n    ) -> dict:\n        \"\"\"\n        Speichert ein Ergebnis für einen Task.\n\n        Args:\n            id: Task-ID (required)\n            response: Antwort\/Ergebnis (required)\n            executor: Ausführende Instanz (required)\n            executor_type: Typ (human, ollama, claude, anthropic_api)\n            model_name: Verwendetes Modell\n            status: Ergebnis-Status (success, error, partial)\n            tokens_input: Eingabe-Tokens\n            tokens_output: Ausgabe-Tokens\n            cost_usd: Kosten in USD\n            error_message: Bei Fehlern\n\n        Returns:\n            Gespeichertes Ergebnis oder Fehlermeldung\n        \"\"\"\n        start = time.time()\n        request_str = json.dumps({\"id\": id, \"executor\": executor, \"status\": status})\n\n        try:\n            valid, error = validate_executor_type(executor_type)\n            if not valid:\n                log_tool_call(logger, \"tasks_result\", request_str, \"denied\", 
task_id=id, error_message=error)\n                return {\"success\": False, \"error\": error}\n\n            task = repo.find_by_id(id)\n            if not task:\n                log_tool_call(logger, \"tasks_result\", request_str, \"denied\", task_id=id, error_message=\"Task not found\")\n                return {\"success\": False, \"error\": f\"Task {id} not found\"}\n\n            duration_ms = int((time.time() - start) * 1000)\n\n            result = TaskResult(\n                task_id=id,\n                executor=executor,\n                executor_type=ExecutorType(executor_type),\n                model_name=model_name,\n                response=response,\n                status=status,\n                error_message=error_message,\n                tokens_input=tokens_input,\n                tokens_output=tokens_output,\n                cost_usd=cost_usd,\n                duration_ms=duration_ms,\n            )\n            result_id = repo.create_result(result)\n            result.id = result_id\n\n            log_tool_call(logger, \"tasks_result\", request_str, \"success\", duration_ms, task_id=id)\n\n            return {\"success\": True, \"result\": result.to_dict(), \"message\": f\"Result saved for Task #{id}\"}\n\n        except Exception as e:\n            duration = int((time.time() - start) * 1000)\n            log_tool_call(logger, \"tasks_result\", request_str, \"error\", duration, task_id=id, error_message=str(e))\n            return {\"success\": False, \"error\": str(e)}\n",
        "structuredPatch": [],
        "originalFile": null
    }
}
← Vorheriger | Zur Liste | Nächster →