{
"event": "PreToolUse",
"tool_name": "Write",
"tool_input": {
"file_path": "\/var\/www\/mcp-servers\/mcp-tasks\/tools\/task_tools\/workflow_tools.py",
"content": "\"\"\"Task workflow tools - Status, Assign, Result.\"\"\"\nimport json\nimport time\n\nimport sys\nsys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp-tasks\")\n\nfrom config import Config\nfrom domain.contracts import TaskAssignment, TaskResult, TaskStatus, TaskType, ExecutorType\nfrom validators.workflow_validator import WorkflowValidator\nfrom .base import get_repo, get_task_logger, validate_status, validate_executor_type, log_tool_call\n\n\ndef register_workflow_tools(mcp):\n \"\"\"Register task workflow tools.\"\"\"\n\n repo = get_repo()\n logger = get_task_logger()\n\n @mcp.tool()\n def tasks_status(id: int, status: str) -> dict:\n \"\"\"\n Ändert den Status eines Tasks.\n\n Args:\n id: Task-ID (required)\n status: Neuer Status (pending, in_progress, completed, failed, cancelled)\n\n Returns:\n Aktualisierter Task oder Fehlermeldung\n \"\"\"\n start = time.time()\n request_str = json.dumps({\"id\": id, \"status\": status})\n\n try:\n valid, error = validate_status(status)\n if not valid:\n log_tool_call(logger, \"tasks_status\", request_str, \"denied\", task_id=id, error_message=error)\n return {\"success\": False, \"error\": error}\n\n task = repo.find_by_id(id)\n if not task:\n log_tool_call(logger, \"tasks_status\", request_str, \"denied\", task_id=id, error_message=\"Task not found\")\n return {\"success\": False, \"error\": f\"Task {id} not found\"}\n\n old_status = task.status.value if isinstance(task.status, TaskStatus) else task.status\n\n valid, error_msg = WorkflowValidator.validate_status_transition(old_status, status)\n if not valid:\n log_tool_call(logger, \"tasks_status\", request_str, \"denied\", task_id=id, error_message=error_msg)\n return {\n \"success\": False,\n \"error\": error_msg,\n \"allowed_transitions\": WorkflowValidator.get_allowed_transitions(old_status),\n }\n\n warning_msg = None\n if status == \"completed\":\n task_type = task.type.value if isinstance(task.type, TaskType) else task.type\n results = repo.get_results(id)\n has_result = len(results) > 0\n\n valid, error_msg = WorkflowValidator.validate_completion(task_type, has_result)\n if not valid:\n log_tool_call(logger, \"tasks_status\", request_str, \"denied\", task_id=id, error_message=error_msg)\n return {\"success\": False, \"error\": error_msg}\n\n is_code = WorkflowValidator.is_code_task(task.title, task.description)\n _, quality_warning, _ = WorkflowValidator.validate_quality_gate(results, is_code)\n if quality_warning:\n warning_msg = quality_warning\n\n repo.update(id, {\"status\": status})\n updated_task = repo.find_by_id(id)\n\n duration = int((time.time() - start) * 1000)\n log_tool_call(logger, \"tasks_status\", f\"{old_status} -> {status}\", \"success\", duration, task_id=id)\n\n response = {\n \"success\": True,\n \"task\": updated_task.to_dict(),\n \"message\": f\"Task #{id} status changed: {old_status} -> {status}\",\n }\n if warning_msg:\n response[\"warning\"] = warning_msg\n return response\n\n except Exception as e:\n duration = int((time.time() - start) * 1000)\n log_tool_call(logger, \"tasks_status\", request_str, \"error\", duration, task_id=id, error_message=str(e))\n return {\"success\": False, \"error\": str(e)}\n\n @mcp.tool()\n def tasks_assign(\n id: int,\n assignee: str,\n assignee_type: str,\n model_name: str | None = None,\n notes: str | None = None,\n ) -> dict:\n \"\"\"\n Weist einen Task einer Person oder KI zu.\n\n Args:\n id: Task-ID (required)\n assignee: Name der Person\/KI (required)\n assignee_type: Typ (human, ollama, claude, anthropic_api)\n model_name: Modellname bei KI\n notes: Anmerkungen\n\n Returns:\n Erstellte Zuweisung oder Fehlermeldung\n \"\"\"\n start = time.time()\n request_str = json.dumps({\"id\": id, \"assignee\": assignee, \"assignee_type\": assignee_type})\n\n try:\n valid, error = validate_executor_type(assignee_type)\n if not valid:\n log_tool_call(logger, \"tasks_assign\", request_str, \"denied\", task_id=id, error_message=error)\n return {\"success\": False, \"error\": error}\n\n task = repo.find_by_id(id)\n if not task:\n log_tool_call(logger, \"tasks_assign\", request_str, \"denied\", task_id=id, error_message=\"Task not found\")\n return {\"success\": False, \"error\": f\"Task {id} not found\"}\n\n task_type = task.type.value if isinstance(task.type, TaskType) else task.type\n valid, warning_msg = WorkflowValidator.validate_assignment(task_type, assignee_type, model_name)\n if not valid:\n log_tool_call(logger, \"tasks_assign\", request_str, \"denied\", task_id=id, error_message=warning_msg)\n return {\"success\": False, \"error\": warning_msg}\n\n assignment = TaskAssignment(\n task_id=id,\n assignee=assignee,\n assignee_type=ExecutorType(assignee_type),\n model_name=model_name,\n notes=notes,\n )\n assignment_id = repo.create_assignment(assignment)\n assignment.id = assignment_id\n\n duration = int((time.time() - start) * 1000)\n log_tool_call(logger, \"tasks_assign\", request_str, \"success\", duration, task_id=id)\n\n return {\n \"success\": True,\n \"assignment\": assignment.to_dict(),\n \"message\": f\"Task #{id} assigned to {assignee}\",\n }\n\n except Exception as e:\n duration = int((time.time() - start) * 1000)\n log_tool_call(logger, \"tasks_assign\", request_str, \"error\", duration, task_id=id, error_message=str(e))\n return {\"success\": False, \"error\": str(e)}\n\n @mcp.tool()\n def tasks_result(\n id: int,\n response: str,\n executor: str,\n executor_type: str,\n model_name: str | None = None,\n status: str = \"success\",\n tokens_input: int = 0,\n tokens_output: int = 0,\n cost_usd: float = 0.0,\n error_message: str | None = None,\n ) -> dict:\n \"\"\"\n Speichert ein Ergebnis für einen Task.\n\n Args:\n id: Task-ID (required)\n response: Antwort\/Ergebnis (required)\n executor: Ausführende Instanz (required)\n executor_type: Typ (human, ollama, claude, anthropic_api)\n model_name: Verwendetes Modell\n status: Ergebnis-Status (success, error, partial)\n tokens_input: Eingabe-Tokens\n tokens_output: Ausgabe-Tokens\n cost_usd: Kosten in USD\n error_message: Bei Fehlern\n\n Returns:\n Gespeichertes Ergebnis oder Fehlermeldung\n \"\"\"\n start = time.time()\n request_str = json.dumps({\"id\": id, \"executor\": executor, \"status\": status})\n\n try:\n valid, error = validate_executor_type(executor_type)\n if not valid:\n log_tool_call(logger, \"tasks_result\", request_str, \"denied\", task_id=id, error_message=error)\n return {\"success\": False, \"error\": error}\n\n task = repo.find_by_id(id)\n if not task:\n log_tool_call(logger, \"tasks_result\", request_str, \"denied\", task_id=id, error_message=\"Task not found\")\n return {\"success\": False, \"error\": f\"Task {id} not found\"}\n\n duration_ms = int((time.time() - start) * 1000)\n\n result = TaskResult(\n task_id=id,\n executor=executor,\n executor_type=ExecutorType(executor_type),\n model_name=model_name,\n response=response,\n status=status,\n error_message=error_message,\n tokens_input=tokens_input,\n tokens_output=tokens_output,\n cost_usd=cost_usd,\n duration_ms=duration_ms,\n )\n result_id = repo.create_result(result)\n result.id = result_id\n\n log_tool_call(logger, \"tasks_result\", request_str, \"success\", duration_ms, task_id=id)\n\n return {\"success\": True, \"result\": result.to_dict(), \"message\": f\"Result saved for Task #{id}\"}\n\n except Exception as e:\n duration = int((time.time() - start) * 1000)\n log_tool_call(logger, \"tasks_result\", request_str, \"error\", duration, task_id=id, error_message=str(e))\n return {\"success\": False, \"error\": str(e)}\n"
}
}
{
"tool_response": {
"type": "create",
"filePath": "\/var\/www\/mcp-servers\/mcp-tasks\/tools\/task_tools\/workflow_tools.py",
"content": "\"\"\"Task workflow tools - Status, Assign, Result.\"\"\"\nimport json\nimport time\n\nimport sys\nsys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp-tasks\")\n\nfrom config import Config\nfrom domain.contracts import TaskAssignment, TaskResult, TaskStatus, TaskType, ExecutorType\nfrom validators.workflow_validator import WorkflowValidator\nfrom .base import get_repo, get_task_logger, validate_status, validate_executor_type, log_tool_call\n\n\ndef register_workflow_tools(mcp):\n \"\"\"Register task workflow tools.\"\"\"\n\n repo = get_repo()\n logger = get_task_logger()\n\n @mcp.tool()\n def tasks_status(id: int, status: str) -> dict:\n \"\"\"\n Ändert den Status eines Tasks.\n\n Args:\n id: Task-ID (required)\n status: Neuer Status (pending, in_progress, completed, failed, cancelled)\n\n Returns:\n Aktualisierter Task oder Fehlermeldung\n \"\"\"\n start = time.time()\n request_str = json.dumps({\"id\": id, \"status\": status})\n\n try:\n valid, error = validate_status(status)\n if not valid:\n log_tool_call(logger, \"tasks_status\", request_str, \"denied\", task_id=id, error_message=error)\n return {\"success\": False, \"error\": error}\n\n task = repo.find_by_id(id)\n if not task:\n log_tool_call(logger, \"tasks_status\", request_str, \"denied\", task_id=id, error_message=\"Task not found\")\n return {\"success\": False, \"error\": f\"Task {id} not found\"}\n\n old_status = task.status.value if isinstance(task.status, TaskStatus) else task.status\n\n valid, error_msg = WorkflowValidator.validate_status_transition(old_status, status)\n if not valid:\n log_tool_call(logger, \"tasks_status\", request_str, \"denied\", task_id=id, error_message=error_msg)\n return {\n \"success\": False,\n \"error\": error_msg,\n \"allowed_transitions\": WorkflowValidator.get_allowed_transitions(old_status),\n }\n\n warning_msg = None\n if status == \"completed\":\n task_type = task.type.value if isinstance(task.type, TaskType) else task.type\n results = repo.get_results(id)\n has_result = len(results) > 0\n\n valid, error_msg = WorkflowValidator.validate_completion(task_type, has_result)\n if not valid:\n log_tool_call(logger, \"tasks_status\", request_str, \"denied\", task_id=id, error_message=error_msg)\n return {\"success\": False, \"error\": error_msg}\n\n is_code = WorkflowValidator.is_code_task(task.title, task.description)\n _, quality_warning, _ = WorkflowValidator.validate_quality_gate(results, is_code)\n if quality_warning:\n warning_msg = quality_warning\n\n repo.update(id, {\"status\": status})\n updated_task = repo.find_by_id(id)\n\n duration = int((time.time() - start) * 1000)\n log_tool_call(logger, \"tasks_status\", f\"{old_status} -> {status}\", \"success\", duration, task_id=id)\n\n response = {\n \"success\": True,\n \"task\": updated_task.to_dict(),\n \"message\": f\"Task #{id} status changed: {old_status} -> {status}\",\n }\n if warning_msg:\n response[\"warning\"] = warning_msg\n return response\n\n except Exception as e:\n duration = int((time.time() - start) * 1000)\n log_tool_call(logger, \"tasks_status\", request_str, \"error\", duration, task_id=id, error_message=str(e))\n return {\"success\": False, \"error\": str(e)}\n\n @mcp.tool()\n def tasks_assign(\n id: int,\n assignee: str,\n assignee_type: str,\n model_name: str | None = None,\n notes: str | None = None,\n ) -> dict:\n \"\"\"\n Weist einen Task einer Person oder KI zu.\n\n Args:\n id: Task-ID (required)\n assignee: Name der Person\/KI (required)\n assignee_type: Typ (human, ollama, claude, anthropic_api)\n model_name: Modellname bei KI\n notes: Anmerkungen\n\n Returns:\n Erstellte Zuweisung oder Fehlermeldung\n \"\"\"\n start = time.time()\n request_str = json.dumps({\"id\": id, \"assignee\": assignee, \"assignee_type\": assignee_type})\n\n try:\n valid, error = validate_executor_type(assignee_type)\n if not valid:\n log_tool_call(logger, \"tasks_assign\", request_str, \"denied\", task_id=id, error_message=error)\n return {\"success\": False, \"error\": error}\n\n task = repo.find_by_id(id)\n if not task:\n log_tool_call(logger, \"tasks_assign\", request_str, \"denied\", task_id=id, error_message=\"Task not found\")\n return {\"success\": False, \"error\": f\"Task {id} not found\"}\n\n task_type = task.type.value if isinstance(task.type, TaskType) else task.type\n valid, warning_msg = WorkflowValidator.validate_assignment(task_type, assignee_type, model_name)\n if not valid:\n log_tool_call(logger, \"tasks_assign\", request_str, \"denied\", task_id=id, error_message=warning_msg)\n return {\"success\": False, \"error\": warning_msg}\n\n assignment = TaskAssignment(\n task_id=id,\n assignee=assignee,\n assignee_type=ExecutorType(assignee_type),\n model_name=model_name,\n notes=notes,\n )\n assignment_id = repo.create_assignment(assignment)\n assignment.id = assignment_id\n\n duration = int((time.time() - start) * 1000)\n log_tool_call(logger, \"tasks_assign\", request_str, \"success\", duration, task_id=id)\n\n return {\n \"success\": True,\n \"assignment\": assignment.to_dict(),\n \"message\": f\"Task #{id} assigned to {assignee}\",\n }\n\n except Exception as e:\n duration = int((time.time() - start) * 1000)\n log_tool_call(logger, \"tasks_assign\", request_str, \"error\", duration, task_id=id, error_message=str(e))\n return {\"success\": False, \"error\": str(e)}\n\n @mcp.tool()\n def tasks_result(\n id: int,\n response: str,\n executor: str,\n executor_type: str,\n model_name: str | None = None,\n status: str = \"success\",\n tokens_input: int = 0,\n tokens_output: int = 0,\n cost_usd: float = 0.0,\n error_message: str | None = None,\n ) -> dict:\n \"\"\"\n Speichert ein Ergebnis für einen Task.\n\n Args:\n id: Task-ID (required)\n response: Antwort\/Ergebnis (required)\n executor: Ausführende Instanz (required)\n executor_type: Typ (human, ollama, claude, anthropic_api)\n model_name: Verwendetes Modell\n status: Ergebnis-Status (success, error, partial)\n tokens_input: Eingabe-Tokens\n tokens_output: Ausgabe-Tokens\n cost_usd: Kosten in USD\n error_message: Bei Fehlern\n\n Returns:\n Gespeichertes Ergebnis oder Fehlermeldung\n \"\"\"\n start = time.time()\n request_str = json.dumps({\"id\": id, \"executor\": executor, \"status\": status})\n\n try:\n valid, error = validate_executor_type(executor_type)\n if not valid:\n log_tool_call(logger, \"tasks_result\", request_str, \"denied\", task_id=id, error_message=error)\n return {\"success\": False, \"error\": error}\n\n task = repo.find_by_id(id)\n if not task:\n log_tool_call(logger, \"tasks_result\", request_str, \"denied\", task_id=id, error_message=\"Task not found\")\n return {\"success\": False, \"error\": f\"Task {id} not found\"}\n\n duration_ms = int((time.time() - start) * 1000)\n\n result = TaskResult(\n task_id=id,\n executor=executor,\n executor_type=ExecutorType(executor_type),\n model_name=model_name,\n response=response,\n status=status,\n error_message=error_message,\n tokens_input=tokens_input,\n tokens_output=tokens_output,\n cost_usd=cost_usd,\n duration_ms=duration_ms,\n )\n result_id = repo.create_result(result)\n result.id = result_id\n\n log_tool_call(logger, \"tasks_result\", request_str, \"success\", duration_ms, task_id=id)\n\n return {\"success\": True, \"result\": result.to_dict(), \"message\": f\"Result saved for Task #{id}\"}\n\n except Exception as e:\n duration = int((time.time() - start) * 1000)\n log_tool_call(logger, \"tasks_result\", request_str, \"error\", duration, task_id=id, error_message=str(e))\n return {\"success\": False, \"error\": str(e)}\n",
"structuredPatch": [],
"originalFile": null
}
}