workflow_tools.py

Code Hygiene Score: 73

Issues 1

Zeile Typ Beschreibung
- coupling Klasse hat 16 Dependencies (max: 15)

Dependencies 16

Funktionen 1

Code

"""Task workflow tools - Status, Assign, Result."""
import json
import time

import sys
sys.path.insert(0, "/var/www/mcp-servers/mcp_tasks")
sys.path.insert(0, "/var/www/mcp-servers/shared")

from constants import MS_PER_SECOND

from config import Config
from domain.contracts import TaskAssignment, TaskResult, TaskStatus, TaskType, ExecutorType
from validators.workflow_validator import WorkflowValidator
from .base import get_repo, get_task_logger, validate_status, validate_executor_type, log_tool_call


def register_workflow_tools(mcp):
    """Register the task workflow tools on an MCP server.

    Defines three tools through ``mcp.tool()``: ``tasks_status``,
    ``tasks_assign`` and ``tasks_result``. They share one repository and one
    logger, both obtained once when this function runs. Every outcome is
    reported through ``log_tool_call``; exceptions raised while a call is being
    processed are caught and returned as ``{"success": False, "error": ...}``.

    Args:
        mcp: Server object exposing a ``tool()`` decorator factory.
    """
    # NOTE(review): the tool docstrings below are presumably surfaced to
    # clients as tool descriptions by mcp.tool() - confirm before rewording.

    repo = get_repo()
    logger = get_task_logger()

    def _elapsed_ms(start: float) -> int:
        """Return the whole milliseconds elapsed since ``start`` (a ``time.perf_counter()`` reading)."""
        return int((time.perf_counter() - start) * MS_PER_SECOND)

    def _denied(tool: str, request_str: str, task_id: int, error, log_message=None) -> dict:
        """Log a "denied" outcome and build the matching failure response.

        ``log_message`` overrides the text written to the log when it should
        differ from the ``error`` returned to the caller.
        """
        logged = error if log_message is None else log_message
        log_tool_call(logger, tool, request_str, "denied", task_id=task_id, error_message=logged)
        return {"success": False, "error": error}

    def _failed(tool: str, request_str: str, task_id: int, start: float, exc: Exception) -> dict:
        """Log an "error" outcome with the elapsed time and build the failure response."""
        log_tool_call(
            logger, tool, request_str, "error", _elapsed_ms(start), task_id=task_id, error_message=str(exc)
        )
        return {"success": False, "error": str(exc)}

    @mcp.tool()
    def tasks_status(id: int, status: str) -> dict:
        """
        Change the status of a task.

        Args:
            id: Task ID (required)
            status: New status (pending, in_progress, completed, failed, cancelled)

        Returns:
            Updated task, or an error message
        """
        tool = "tasks_status"
        # perf_counter is monotonic, so the measured duration cannot be skewed
        # by wall-clock adjustments.
        start = time.perf_counter()
        request_str = json.dumps({"id": id, "status": status})

        try:
            valid, error = validate_status(status)
            if not valid:
                return _denied(tool, request_str, id, error)

            task = repo.find_by_id(id)
            if not task:
                return _denied(tool, request_str, id, f"Task {id} not found", log_message="Task not found")

            # The stored status may be an enum member or already a plain value.
            old_status = task.status.value if isinstance(task.status, TaskStatus) else task.status

            valid, error_msg = WorkflowValidator.validate_status_transition(old_status, status)
            if not valid:
                denial = _denied(tool, request_str, id, error_msg)
                # Tell the caller which transitions would have been accepted.
                denial["allowed_transitions"] = WorkflowValidator.get_allowed_transitions(old_status)
                return denial

            warning_msg = None
            if status == "completed":
                task_type = task.type.value if isinstance(task.type, TaskType) else task.type
                results = repo.get_results(id)

                valid, error_msg = WorkflowValidator.validate_completion(task_type, bool(results))
                if not valid:
                    return _denied(tool, request_str, id, error_msg)

                # The quality gate never blocks here; only its warning (the
                # second element of the returned triple) is forwarded.
                is_code = WorkflowValidator.is_code_task(task.title, task.description)
                _, quality_warning, _ = WorkflowValidator.validate_quality_gate(results, is_code)
                if quality_warning:
                    warning_msg = quality_warning

            repo.update(id, {"status": status})
            updated_task = repo.find_by_id(id)
            if not updated_task:
                # Fail before the success log is written, with a readable
                # message instead of an AttributeError on ``to_dict``.
                raise LookupError(f"Task {id} not found after update")

            log_tool_call(logger, tool, f"{old_status} -> {status}", "success", _elapsed_ms(start), task_id=id)

            response = {
                "success": True,
                "task": updated_task.to_dict(),
                "message": f"Task #{id} status changed: {old_status} -> {status}",
            }
            if warning_msg:
                response["warning"] = warning_msg
            return response

        except Exception as e:
            return _failed(tool, request_str, id, start, e)

    @mcp.tool()
    def tasks_assign(
        id: int,
        assignee: str,
        assignee_type: str,
        model_name: str | None = None,
        notes: str | None = None,
    ) -> dict:
        """
        Assign a task to a person or an AI.

        Args:
            id: Task ID (required)
            assignee: Name of the person/AI (required)
            assignee_type: Type (human, ollama, claude, anthropic_api)
            model_name: Model name when assigning to an AI
            notes: Notes

        Returns:
            Created assignment, or an error message
        """
        tool = "tasks_assign"
        start = time.perf_counter()
        request_str = json.dumps({"id": id, "assignee": assignee, "assignee_type": assignee_type})

        try:
            valid, error = validate_executor_type(assignee_type)
            if not valid:
                return _denied(tool, request_str, id, error)

            task = repo.find_by_id(id)
            if not task:
                return _denied(tool, request_str, id, f"Task {id} not found", log_message="Task not found")

            task_type = task.type.value if isinstance(task.type, TaskType) else task.type
            valid, warning_msg = WorkflowValidator.validate_assignment(task_type, assignee_type, model_name)
            if not valid:
                return _denied(tool, request_str, id, warning_msg)

            assignment = TaskAssignment(
                task_id=id,
                assignee=assignee,
                assignee_type=ExecutorType(assignee_type),
                model_name=model_name,
                notes=notes,
            )
            assignment.id = repo.create_assignment(assignment)

            log_tool_call(logger, tool, request_str, "success", _elapsed_ms(start), task_id=id)

            response = {
                "success": True,
                "assignment": assignment.to_dict(),
                "message": f"Task #{id} assigned to {assignee}",
            }
            # An accepted assignment may still carry a validator message;
            # surface it the same way tasks_status surfaces its warning.
            if warning_msg:
                response["warning"] = warning_msg
            return response

        except Exception as e:
            return _failed(tool, request_str, id, start, e)

    @mcp.tool()
    def tasks_result(
        id: int,
        response: str,
        executor: str,
        executor_type: str,
        model_name: str | None = None,
        status: str = "success",
        tokens_input: int = 0,
        tokens_output: int = 0,
        cost_usd: float = 0.0,
        error_message: str | None = None,
    ) -> dict:
        """
        Store a result for a task.

        Args:
            id: Task ID (required)
            response: Answer/result (required)
            executor: Executing instance (required)
            executor_type: Type (human, ollama, claude, anthropic_api)
            model_name: Model used
            status: Result status (success, error, partial)
            tokens_input: Input tokens
            tokens_output: Output tokens
            cost_usd: Cost in USD
            error_message: Error details, if any

        Returns:
            Stored result, or an error message
        """
        tool = "tasks_result"
        start = time.perf_counter()
        request_str = json.dumps({"id": id, "executor": executor, "status": status})

        try:
            valid, error = validate_executor_type(executor_type)
            if not valid:
                return _denied(tool, request_str, id, error)

            task = repo.find_by_id(id)
            if not task:
                return _denied(tool, request_str, id, f"Task {id} not found", log_message="Task not found")

            # Taken before the insert: the stored and logged duration covers
            # only the validation and lookup above, not the write itself.
            duration_ms = _elapsed_ms(start)

            result = TaskResult(
                task_id=id,
                executor=executor,
                executor_type=ExecutorType(executor_type),
                model_name=model_name,
                response=response,
                status=status,
                error_message=error_message,
                tokens_input=tokens_input,
                tokens_output=tokens_output,
                cost_usd=cost_usd,
                duration_ms=duration_ms,
            )
            result.id = repo.create_result(result)

            log_tool_call(logger, tool, request_str, "success", duration_ms, task_id=id)

            return {"success": True, "result": result.to_dict(), "message": f"Result saved for Task #{id}"}

        except Exception as e:
            return _failed(tool, request_str, id, start, e)
← Übersicht