{
"tool_response": {
"filePath": "\/var\/www\/mcp-servers\/mcp-tasks\/tools\/task_tools\/list_tools.py",
"oldString": "import sys\nsys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp-tasks\")",
"newString": "import sys\nsys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp-tasks\")\nsys.path.insert(0, \"\/var\/www\/mcp-servers\/shared\")\n\nfrom constants import MS_PER_SECOND",
"originalFile": "\"\"\"Task listing and statistics tools.\"\"\"\nimport json\nimport time\n\nimport sys\nsys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp-tasks\")\n\nfrom config import Config\nfrom .base import get_repo, get_task_logger, validate_status, validate_type, log_tool_call\n\n\ndef register_list_tools(mcp):\n \"\"\"Register task list and statistics tools.\"\"\"\n\n repo = get_repo()\n logger = get_task_logger()\n\n @mcp.tool()\n def tasks_list(\n status: str | None = None,\n type: str | None = None,\n search: str | None = None,\n limit: int = 10,\n offset: int = 0,\n compact: bool = True,\n ) -> dict:\n \"\"\"\n Listet Tasks mit optionalen Filtern auf.\n\n Args:\n status: Filter nach Status (pending, in_progress, completed, failed, cancelled)\n type: Filter nach Typ (human_task, ai_task, mixed)\n search: Volltextsuche in Titel\/Beschreibung\n limit: Maximale Anzahl Ergebnisse (1-100, default: 10)\n offset: Offset für Pagination\n compact: True = nur id\/title\/status\/type (Token-sparend), False = alle Felder\n\n Returns:\n Dict mit tasks, total, limit, offset\n \"\"\"\n start = time.time()\n request_str = json.dumps({\"status\": status, \"type\": type, \"search\": search, \"limit\": limit})\n\n try:\n limit = max(1, min(limit, Config.MAX_RESULTS))\n\n valid, error = validate_status(status)\n if not valid:\n log_tool_call(logger, \"tasks_list\", request_str, \"denied\", error_message=error)\n return {\"success\": False, \"error\": error}\n\n valid, error = validate_type(type)\n if not valid:\n log_tool_call(logger, \"tasks_list\", request_str, \"denied\", error_message=error)\n return {\"success\": False, \"error\": error}\n\n tasks = repo.find_all(status=status, task_type=type, search=search, limit=limit, offset=offset)\n total = repo.count(status=status, task_type=type, search=search)\n\n duration = int((time.time() - start) * 1000)\n log_tool_call(logger, \"tasks_list\", request_str, \"success\", duration)\n\n return {\n \"success\": True,\n \"tasks\": [t.to_dict_compact() if compact else t.to_dict() for t in tasks],\n \"total\": total,\n \"limit\": limit,\n \"offset\": offset,\n \"compact\": compact,\n }\n\n except Exception as e:\n duration = int((time.time() - start) * 1000)\n log_tool_call(logger, \"tasks_list\", request_str, \"error\", duration, error_message=str(e))\n return {\"success\": False, \"error\": str(e)}\n\n @mcp.tool()\n def tasks_statistics() -> dict:\n \"\"\"\n Holt Statistiken über alle Tasks.\n\n Returns:\n Statistiken: Tasks nach Status\/Typ, Token-Verbrauch, Modell-Nutzung\n \"\"\"\n start = time.time()\n\n try:\n stats = repo.get_statistics()\n\n duration = int((time.time() - start) * 1000)\n log_tool_call(logger, \"tasks_statistics\", \"\", \"success\", duration)\n\n return {\"success\": True, \"statistics\": stats}\n\n except Exception as e:\n duration = int((time.time() - start) * 1000)\n log_tool_call(logger, \"tasks_statistics\", \"\", \"error\", duration, error_message=str(e))\n return {\"success\": False, \"error\": str(e)}\n",
"structuredPatch": [
{
"oldStart": 4,
"oldLines": 7,
"newStart": 4,
"newLines": 10,
"lines": [
" ",
" import sys",
" sys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp-tasks\")",
"+sys.path.insert(0, \"\/var\/www\/mcp-servers\/shared\")",
" ",
"+from constants import MS_PER_SECOND",
"+",
" from config import Config",
" from .base import get_repo, get_task_logger, validate_status, validate_type, log_tool_call",
" "
]
}
],
"userModified": false,
"replaceAll": false
}
}