Protokoll #27634

ID27634
Zeitstempel2025-12-28 10:02:49.801532
Clientroot
IP145.224.72.140
Modellclaude-sonnet-4-20250514
Statuspending
Tokens2,677 (Input: 2,677, Output: 0)
Dauer-
Request-Zeit2025-12-28 10:02:49.801532
Response-Zeit-

Request

{
    "event": "PostToolUse",
    "tool_name": "Read",
    "tool_response": {
        "type": "text",
        "file": {
            "filePath": "\/var\/www\/mcp-servers\/mcp-tasks\/infrastructure\/task_repository.py",
            "content": "\"\"\"Task Repository - Datenbankzugriff für Tasks\"\"\"\nimport sys\nimport uuid\nfrom datetime import datetime\nfrom typing import Optional, List, Dict, Any\n\nsys.path.insert(0, \"\/opt\/mcp-servers\/mcp-tasks\")\nfrom domain.contracts import Task, TaskAssignment, TaskResult, TaskStatus, TaskType, ExecutorType\nfrom infrastructure.db_connection import DatabaseConnection\n\n\nclass TaskRepository:\n    \"\"\"Repository für Task-CRUD-Operationen\"\"\"\n\n    # ==================== TASK CRUD ====================\n\n    def find_by_id(self, task_id: int) -> Optional[Task]:\n        \"\"\"Findet Task nach ID\"\"\"\n        with DatabaseConnection.get_connection() as conn:\n            with conn.cursor() as cursor:\n                cursor.execute(\"SELECT * FROM tasks WHERE id = %s\", (task_id,))\n                row = cursor.fetchone()\n                return self._row_to_task(row) if row else None\n\n    def find_all(\n        self,\n        status: Optional[str] = None,\n        task_type: Optional[str] = None,\n        search: Optional[str] = None,\n        limit: int = 50,\n        offset: int = 0,\n    ) -> List[Task]:\n        \"\"\"Findet Tasks mit Filtern\"\"\"\n        with DatabaseConnection.get_connection() as conn:\n            with conn.cursor() as cursor:\n                sql = \"SELECT * FROM tasks WHERE 1=1\"\n                params = []\n\n                if status:\n                    sql += \" AND status = %s\"\n                    params.append(status)\n\n                if task_type:\n                    sql += \" AND type = %s\"\n                    params.append(task_type)\n\n                if search:\n                    sql += \" AND (title LIKE %s OR description LIKE %s)\"\n                    params.extend([f\"%{search}%\", f\"%{search}%\"])\n\n                sql += \" ORDER BY created_at DESC LIMIT %s OFFSET %s\"\n                params.extend([limit, offset])\n\n                cursor.execute(sql, params)\n                rows = cursor.fetchall()\n                return [self._row_to_task(row) for row in rows]\n\n    def count(\n        self,\n        status: Optional[str] = None,\n        task_type: Optional[str] = None,\n        search: Optional[str] = None,\n    ) -> int:\n        \"\"\"Zählt Tasks mit Filtern\"\"\"\n        with DatabaseConnection.get_connection() as conn:\n            with conn.cursor() as cursor:\n                sql = \"SELECT COUNT(*) as cnt FROM tasks WHERE 1=1\"\n                params = []\n\n                if status:\n                    sql += \" AND status = %s\"\n                    params.append(status)\n\n                if task_type:\n                    sql += \" AND type = %s\"\n                    params.append(task_type)\n\n                if search:\n                    sql += \" AND (title LIKE %s OR description LIKE %s)\"\n                    params.extend([f\"%{search}%\", f\"%{search}%\"])\n\n                cursor.execute(sql, params)\n                row = cursor.fetchone()\n                return row[\"cnt\"] if row else 0\n\n    def create(self, task: Task) -> int:\n        \"\"\"Erstellt neuen Task, gibt ID zurück\"\"\"\n        with DatabaseConnection.get_connection() as conn:\n            with conn.cursor() as cursor:\n                task.uuid = str(uuid.uuid4())\n                task.created_at = datetime.now()\n                task.updated_at = datetime.now()\n\n                sql = \"\"\"\n                    INSERT INTO tasks\n                    (uuid, title, description, type, status, created_by, created_by_type,\n                     parent_task_id, due_date, created_at, updated_at, metadata)\n                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)\n                \"\"\"\n\n                import json\n                cursor.execute(sql, (\n                    task.uuid,\n                    task.title,\n                    task.description,\n                    task.type.value if isinstance(task.type, TaskType) else task.type,\n                    task.status.value if isinstance(task.status, TaskStatus) else task.status,\n                    task.created_by,\n                    task.created_by_type,\n                    task.parent_task_id,\n                    task.due_date,\n                    task.created_at,\n                    task.updated_at,\n                    json.dumps(task.metadata) if task.metadata else None,\n                ))\n                return cursor.lastrowid\n\n    def update(self, task_id: int, updates: Dict[str, Any]) -> bool:\n        \"\"\"Aktualisiert Task-Felder\"\"\"\n        if not updates:\n            return False\n\n        with DatabaseConnection.get_connection() as conn:\n            with conn.cursor() as cursor:\n                # Erlaubte Felder\n                allowed = {\"title\", \"description\", \"type\", \"status\", \"due_date\", \"metadata\"}\n                filtered = {k: v for k, v in updates.items() if k in allowed}\n\n                if not filtered:\n                    return False\n\n                filtered[\"updated_at\"] = datetime.now()\n\n                # Wenn Status auf completed, setze completed_at\n                if filtered.get(\"status\") == \"completed\":\n                    filtered[\"completed_at\"] = datetime.now()\n\n                set_clause = \", \".join([f\"{k} = %s\" for k in filtered.keys()])\n                sql = f\"UPDATE tasks SET {set_clause} WHERE id = %s\"\n\n                values = list(filtered.values()) + [task_id]\n                cursor.execute(sql, values)\n                return cursor.rowcount > 0\n\n    def delete(self, task_id: int) -> bool:\n        \"\"\"Löscht Task\"\"\"\n        with DatabaseConnection.get_connection() as conn:\n            with conn.cursor() as cursor:\n                cursor.execute(\"DELETE FROM tasks WHERE id = %s\", (task_id,))\n                return cursor.rowcount > 0\n\n    # ==================== ASSIGNMENTS ====================\n\n    def create_assignment(self, assignment: TaskAssignment) -> int:\n        \"\"\"Erstellt Task-Zuweisung\"\"\"\n        with DatabaseConnection.get_connection() as conn:\n            with conn.cursor() as cursor:\n                sql = \"\"\"\n                    INSERT INTO task_assignments\n                    (task_id, assignee, assignee_type, model_name, status, assigned_at, notes)\n                    VALUES (%s, %s, %s, %s, %s, %s, %s)\n                \"\"\"\n                cursor.execute(sql, (\n                    assignment.task_id,\n                    assignment.assignee,\n                    assignment.assignee_type.value if isinstance(assignment.assignee_type, ExecutorType) else assignment.assignee_type,\n                    assignment.model_name,\n                    assignment.status,\n                    assignment.assigned_at or datetime.now(),\n                    assignment.notes,\n                ))\n                return cursor.lastrowid\n\n    def get_assignments(self, task_id: int) -> List[TaskAssignment]:\n        \"\"\"Holt Zuweisungen für Task\"\"\"\n        with DatabaseConnection.get_connection() as conn:\n            with conn.cursor() as cursor:\n                cursor.execute(\n                    \"SELECT * FROM task_assignments WHERE task_id = %s ORDER BY assigned_at DESC\",\n                    (task_id,)\n                )\n                rows = cursor.fetchall()\n                return [self._row_to_assignment(row) for row in rows]\n\n    # ==================== RESULTS ====================\n\n    def create_result(self, result: TaskResult) -> int:\n        \"\"\"Speichert Task-Ergebnis\"\"\"\n        with DatabaseConnection.get_connection() as conn:\n            with conn.cursor() as cursor:\n                sql = \"\"\"\n                    INSERT INTO task_results\n                    (task_id, executor, executor_type, model_name, request, response,\n                     status, error_message, tokens_input, tokens_output, cost_usd,\n                     duration_ms, created_at)\n                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)\n                \"\"\"\n                cursor.execute(sql, (\n                    result.task_id,\n                    result.executor,\n                    result.executor_type.value if isinstance(result.executor_type, ExecutorType) else result.executor_type,\n                    result.model_name,\n                    result.request,\n                    result.response,\n                    result.status,\n                    result.error_message,\n                    result.tokens_input,\n                    result.tokens_output,\n                    result.cost_usd,\n                    result.duration_ms,\n                    result.created_at or datetime.now(),\n                ))\n                return cursor.lastrowid\n\n    def get_results(self, task_id: int) -> List[TaskResult]:\n        \"\"\"Holt Ergebnisse für Task\"\"\"\n        with DatabaseConnection.get_connection() as conn:\n            with conn.cursor() as cursor:\n                cursor.execute(\n                    \"SELECT * FROM task_results WHERE task_id = %s ORDER BY created_at DESC\",\n                    (task_id,)\n                )\n                rows = cursor.fetchall()\n                return [self._row_to_result(row) for row in rows]\n\n    # ==================== STATISTICS ====================\n\n    def get_statistics(self) -> Dict[str, Any]:\n        \"\"\"Holt Task-Statistiken\"\"\"\n        with DatabaseConnection.get_connection() as conn:\n            with conn.cursor() as cursor:\n                stats = {}\n\n                # Task-Zähler nach Status\n                cursor.execute(\"\"\"\n                    SELECT status, COUNT(*) as cnt\n                    FROM tasks\n                    GROUP BY status\n                \"\"\")\n                stats[\"by_status\"] = {row[\"status\"]: row[\"cnt\"] for row in cursor.fetchall()}\n\n                # Task-Zähler nach Typ\n                cursor.execute(\"\"\"\n                    SELECT type, COUNT(*) as cnt\n                    FROM tasks\n                    GROUP BY type\n                \"\"\")\n                stats[\"by_type\"] = {row[\"type\"]: row[\"cnt\"] for row in cursor.fetchall()}\n\n                # Token-Statistiken\n                cursor.execute(\"\"\"\n                    SELECT\n                        SUM(tokens_i... [TRUNCATED-14c2fbf127be5c85]",
            "numLines": 332,
            "startLine": 1,
            "totalLines": 332
        }
    }
}

Response

-
← Vorheriger Zur Liste Nächster →