workflow_tools.py
- Pfad: /var/www/mcp-servers/mcp-tasks/tools/task_tools/workflow_tools.py
- Namespace: -
- Zeilen: 232 | Größe: 9,203 Bytes
- Geändert: 2025-12-28 12:21:49 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 73
- Dependencies: 0 (25%)
- LOC: 89 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
Issues 1
| Zeile | Typ | Beschreibung |
| --- | --- | --- |
| - | coupling | Klasse hat 16 Dependencies (max: 15) |
Dependencies 16
- use json
- use time
- use sys
- use constants.MS_PER_SECOND
- use config.Config
- use domain.contracts.TaskAssignment
- use domain.contracts.TaskResult
- use domain.contracts.TaskStatus
- use domain.contracts.TaskType
- use domain.contracts.ExecutorType
- use validators.workflow_validator.WorkflowValidator
- use base.get_repo
- use base.get_task_logger
- use base.validate_status
- use base.validate_executor_type
- use base.log_tool_call
Funktionen 1
- register_workflow_tools() (Zeile 17)
Code
"""Task workflow tools - Status, Assign, Result."""
import json
import time
import sys
sys.path.insert(0, "/var/www/mcp-servers/mcp_tasks")
sys.path.insert(0, "/var/www/mcp-servers/shared")
from constants import MS_PER_SECOND
from config import Config
from domain.contracts import TaskAssignment, TaskResult, TaskStatus, TaskType, ExecutorType
from validators.workflow_validator import WorkflowValidator
from .base import get_repo, get_task_logger, validate_status, validate_executor_type, log_tool_call
def register_workflow_tools(mcp):
    """Register the task workflow tools (status, assign, result) on ``mcp``.

    Three tools are registered through the ``mcp.tool()`` decorator:
    ``tasks_status``, ``tasks_assign`` and ``tasks_result``. They share one
    repository (``get_repo()``) and one logger (``get_task_logger()``) that
    are resolved once, when this function is called.

    Every tool returns a plain dict. ``"success"`` is always present; on
    failure ``"error"`` carries the message. Exceptions raised while a tool
    runs are logged with status ``"error"`` and converted into such a
    failure dict instead of propagating.

    NOTE(review): the ``mcp`` framework may expose the tool docstrings as
    tool descriptions to clients - confirm before rewording them further.

    Args:
        mcp: Object providing a ``tool()`` decorator factory.
    """
    repo = get_repo()
    logger = get_task_logger()

    def _elapsed_ms(started: float) -> int:
        """Return the whole milliseconds elapsed since ``started``.

        ``started`` must come from ``time.perf_counter()``. A monotonic clock
        is used so that wall-clock adjustments cannot produce negative or
        inflated durations.
        """
        return int((time.perf_counter() - started) * MS_PER_SECOND)

    def _plain(value, enum_cls):
        """Return ``value.value`` for ``enum_cls`` members, else ``value`` itself."""
        return value.value if isinstance(value, enum_cls) else value

    def _denied(tool_name, request_str, task_id, reason, response_error=None, **extra) -> dict:
        """Log a ``"denied"`` call and build the matching failure response.

        ``reason`` is written to the log. The response uses ``response_error``
        when given, otherwise ``reason``. ``extra`` entries are appended to
        the response after ``"error"``.
        """
        log_tool_call(logger, tool_name, request_str, "denied", task_id=task_id, error_message=reason)
        error_text = reason if response_error is None else response_error
        return {"success": False, "error": error_text, **extra}

    def _not_found(tool_name, request_str, task_id) -> dict:
        """Deny a call because no task with ``task_id`` exists."""
        return _denied(tool_name, request_str, task_id, "Task not found", f"Task {task_id} not found")

    def _failed(tool_name, request_str, task_id, started, exc) -> dict:
        """Log an unexpected exception with its duration and build the failure response."""
        duration = _elapsed_ms(started)
        log_tool_call(logger, tool_name, request_str, "error", duration, task_id=task_id, error_message=str(exc))
        return {"success": False, "error": str(exc)}

    # NOTE: the parameter name ``id`` shadows the builtin, but it is part of
    # the tools' public call signature and therefore must not be renamed.

    @mcp.tool()
    def tasks_status(id: int, status: str) -> dict:
        """
        Change the status of a task.

        Args:
            id: Task ID (required)
            status: New status (pending, in_progress, completed, failed, cancelled)

        Returns:
            Updated task or error message
        """
        start = time.perf_counter()
        request_str = json.dumps({"id": id, "status": status})

        try:
            valid, error = validate_status(status)
            if not valid:
                return _denied("tasks_status", request_str, id, error)

            task = repo.find_by_id(id)
            if not task:
                return _not_found("tasks_status", request_str, id)

            old_status = _plain(task.status, TaskStatus)
            valid, error_msg = WorkflowValidator.validate_status_transition(old_status, status)
            if not valid:
                # Tell the caller which transitions would have been accepted.
                return _denied(
                    "tasks_status",
                    request_str,
                    id,
                    error_msg,
                    allowed_transitions=WorkflowValidator.get_allowed_transitions(old_status),
                )

            warning_msg = None
            if status == "completed":
                # Completion is validated against the stored results; the
                # quality gate can only add a warning, it never blocks here.
                task_type = _plain(task.type, TaskType)
                results = repo.get_results(id)
                has_result = len(results) > 0
                valid, error_msg = WorkflowValidator.validate_completion(task_type, has_result)
                if not valid:
                    return _denied("tasks_status", request_str, id, error_msg)

                is_code = WorkflowValidator.is_code_task(task.title, task.description)
                _, quality_warning, _ = WorkflowValidator.validate_quality_gate(results, is_code)
                if quality_warning:
                    warning_msg = quality_warning

            repo.update(id, {"status": status})
            # Re-read so the response reflects what the repository now holds.
            updated_task = repo.find_by_id(id)

            duration = _elapsed_ms(start)
            log_tool_call(logger, "tasks_status", f"{old_status} -> {status}", "success", duration, task_id=id)

            response = {
                "success": True,
                "task": updated_task.to_dict(),
                "message": f"Task #{id} status changed: {old_status} -> {status}",
            }
            if warning_msg:
                response["warning"] = warning_msg
            return response
        except Exception as e:
            return _failed("tasks_status", request_str, id, start, e)

    @mcp.tool()
    def tasks_assign(
        id: int,
        assignee: str,
        assignee_type: str,
        model_name: str | None = None,
        notes: str | None = None,
    ) -> dict:
        """
        Assign a task to a person or an AI.

        Args:
            id: Task ID (required)
            assignee: Name of the person/AI (required)
            assignee_type: Type (human, ollama, claude, anthropic_api)
            model_name: Model name when assigning to an AI
            notes: Remarks

        Returns:
            Created assignment or error message
        """
        start = time.perf_counter()
        request_str = json.dumps({"id": id, "assignee": assignee, "assignee_type": assignee_type})

        try:
            valid, error = validate_executor_type(assignee_type)
            if not valid:
                return _denied("tasks_assign", request_str, id, error)

            task = repo.find_by_id(id)
            if not task:
                return _not_found("tasks_assign", request_str, id)

            task_type = _plain(task.type, TaskType)
            valid, warning_msg = WorkflowValidator.validate_assignment(task_type, assignee_type, model_name)
            if not valid:
                return _denied("tasks_assign", request_str, id, warning_msg)

            assignment = TaskAssignment(
                task_id=id,
                assignee=assignee,
                assignee_type=ExecutorType(assignee_type),
                model_name=model_name,
                notes=notes,
            )
            # The repository returns the new row's id; copy it onto the
            # object so that to_dict() includes it in the response.
            assignment.id = repo.create_assignment(assignment)

            duration = _elapsed_ms(start)
            log_tool_call(logger, "tasks_assign", request_str, "success", duration, task_id=id)

            return {
                "success": True,
                "assignment": assignment.to_dict(),
                "message": f"Task #{id} assigned to {assignee}",
            }
        except Exception as e:
            return _failed("tasks_assign", request_str, id, start, e)

    @mcp.tool()
    def tasks_result(
        id: int,
        response: str,
        executor: str,
        executor_type: str,
        model_name: str | None = None,
        status: str = "success",
        tokens_input: int = 0,
        tokens_output: int = 0,
        cost_usd: float = 0.0,
        error_message: str | None = None,
    ) -> dict:
        """
        Store a result for a task.

        Args:
            id: Task ID (required)
            response: Answer/result (required)
            executor: Executing instance (required)
            executor_type: Type (human, ollama, claude, anthropic_api)
            model_name: Model that was used
            status: Result status (success, error, partial)
            tokens_input: Input tokens
            tokens_output: Output tokens
            cost_usd: Cost in USD
            error_message: Error text, if any

        Returns:
            Stored result or error message
        """
        start = time.perf_counter()
        request_str = json.dumps({"id": id, "executor": executor, "status": status})

        try:
            valid, error = validate_executor_type(executor_type)
            if not valid:
                return _denied("tasks_result", request_str, id, error)

            task = repo.find_by_id(id)
            if not task:
                return _not_found("tasks_result", request_str, id)

            # NOTE: this duration is taken before the result is persisted, so
            # it covers only validation and the task lookup of this call. The
            # same value is stored on the result and written to the log.
            duration_ms = _elapsed_ms(start)

            result = TaskResult(
                task_id=id,
                executor=executor,
                executor_type=ExecutorType(executor_type),
                model_name=model_name,
                response=response,
                status=status,
                error_message=error_message,
                tokens_input=tokens_input,
                tokens_output=tokens_output,
                cost_usd=cost_usd,
                duration_ms=duration_ms,
            )
            result.id = repo.create_result(result)

            log_tool_call(logger, "tasks_result", request_str, "success", duration_ms, task_id=id)

            return {"success": True, "result": result.to_dict(), "message": f"Result saved for Task #{id}"}
        except Exception as e:
            return _failed("tasks_result", request_str, id, start, e)