task_repository.py

Code Hygiene Score: 66

Keine Issues gefunden.

Dependencies 15

Klassen 1

Code

"""Task Repository - Datenbankzugriff für Tasks."""

import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional

from config import Config
from domain.contracts import (
    ExecutorType,
    Task,
    TaskAssignment,
    TaskResult,
    TaskStatus,
    TaskType,
)
from shared.infrastructure import SimpleDbConnection


class TaskRepository:
    """Repository für Task-CRUD-Operationen"""

    # ==================== TASK CRUD ====================

    def find_by_id(self, task_id: int) -> Optional[Task]:
        """Fetch a single task by its primary key.

        Args:
            task_id: Value matched against ``tasks.id``.

        Returns:
            The row converted via ``_row_to_task``, or ``None`` when the
            query yields no row.
        """
        query = "SELECT * FROM tasks WHERE id = %s"
        with SimpleDbConnection.get_connection(Config) as conn, conn.cursor() as cur:
            cur.execute(query, (task_id,))
            record = cur.fetchone()
            if not record:
                return None
            return self._row_to_task(record)

    def find_all(
        self,
        status: Optional[str] = None,
        task_type: Optional[str] = None,
        search: Optional[str] = None,
        limit: int = 50,
        offset: int = 0,
    ) -> List[Task]:
        """List tasks, newest first, with optional filters and paging.

        Args:
            status: When truthy, keep only rows whose ``status`` equals it.
            task_type: When truthy, keep only rows whose ``type`` equals it.
            search: When truthy, substring-match (``LIKE %search%``) against
                ``title`` or ``description``.
            limit: Value bound to the ``LIMIT`` placeholder.
            offset: Value bound to the ``OFFSET`` placeholder.

        Returns:
            Matching rows converted via ``_row_to_task``, ordered by
            ``created_at`` descending.
        """
        # Every SQL fragment is appended together with the values it binds,
        # so placeholder order and argument order stay in lockstep.
        fragments = ["SELECT * FROM tasks WHERE 1=1"]
        args = []

        if status:
            fragments.append(" AND status = %s")
            args.append(status)

        if task_type:
            fragments.append(" AND type = %s")
            args.append(task_type)

        if search:
            pattern = f"%{search}%"
            fragments.append(" AND (title LIKE %s OR description LIKE %s)")
            args += [pattern, pattern]

        fragments.append(" ORDER BY created_at DESC LIMIT %s OFFSET %s")
        args += [limit, offset]
        query = "".join(fragments)

        with SimpleDbConnection.get_connection(Config) as conn, conn.cursor() as cur:
            cur.execute(query, args)
            return [self._row_to_task(record) for record in cur.fetchall()]

    def count(
        self,
        status: Optional[str] = None,
        task_type: Optional[str] = None,
        search: Optional[str] = None,
    ) -> int:
        """Count tasks matching the optional filters.

        Args:
            status: When truthy, count only rows whose ``status`` equals it.
            task_type: When truthy, count only rows whose ``type`` equals it.
            search: When truthy, substring-match (``LIKE %search%``) against
                ``title`` or ``description``.

        Returns:
            The ``cnt`` column of the result row, or ``0`` if no row came back.
        """
        # (bound value, SQL fragment, number of placeholders in the fragment)
        filters = (
            (status, " AND status = %s", 1),
            (task_type, " AND type = %s", 1),
            (search and f"%{search}%", " AND (title LIKE %s OR description LIKE %s)", 2),
        )

        query = "SELECT COUNT(*) as cnt FROM tasks WHERE 1=1"
        args = []
        for value, fragment, placeholders in filters:
            if value:
                query += fragment
                args.extend([value] * placeholders)

        with SimpleDbConnection.get_connection(Config) as conn, conn.cursor() as cur:
            cur.execute(query, args)
            record = cur.fetchone()
            if not record:
                return 0
            return record["cnt"]

    def create(self, task: Task) -> int:
        """Insert a new task row and return its generated id.

        The passed ``task`` is mutated: ``uuid`` is set to a fresh UUID4
        string, and ``created_at`` / ``updated_at`` are both set to the same
        timestamp.

        Args:
            task: Task to persist. ``type`` and ``status`` may be enum members
                (their ``.value`` is stored) or plain values. Truthy
                ``metadata`` is stored as JSON text, falsy metadata as NULL.

        Returns:
            ``cursor.lastrowid`` after the INSERT.
        """
        import json

        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                # A single clock reading keeps created_at == updated_at for a
                # brand-new row; two separate now() calls differ by microseconds.
                now = datetime.now()
                task.uuid = str(uuid.uuid4())
                task.created_at = now
                task.updated_at = now

                sql = """
                    INSERT INTO tasks
                    (uuid, title, description, type, status, created_by, created_by_type,
                     parent_task_id, due_date, created_at, updated_at, metadata)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """

                cursor.execute(sql, (
                    task.uuid,
                    task.title,
                    task.description,
                    task.type.value if isinstance(task.type, TaskType) else task.type,
                    task.status.value if isinstance(task.status, TaskStatus) else task.status,
                    task.created_by,
                    task.created_by_type,
                    task.parent_task_id,
                    task.due_date,
                    task.created_at,
                    task.updated_at,
                    json.dumps(task.metadata) if task.metadata else None,
                ))
                return cursor.lastrowid

    def update(self, task_id: int, updates: Dict[str, Any]) -> bool:
        """Aktualisiert Task-Felder"""
        if not updates:
            return False

        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                # Erlaubte Felder
                allowed = {"title", "description", "type", "status", "due_date", "metadata"}
                filtered = {k: v for k, v in updates.items() if k in allowed}

                if not filtered:
                    return False

                filtered["updated_at"] = datetime.now()

                # Wenn Status auf completed, setze completed_at
                if filtered.get("status") == "completed":
                    filtered["completed_at"] = datetime.now()

                set_clause = ", ".join([f"{k} = %s" for k in filtered.keys()])
                sql = f"UPDATE tasks SET {set_clause} WHERE id = %s"

                values = list(filtered.values()) + [task_id]
                cursor.execute(sql, values)
                return cursor.rowcount > 0

    def delete(self, task_id: int) -> bool:
        """Remove the task row with the given id.

        Args:
            task_id: Value matched against ``tasks.id``.

        Returns:
            ``True`` when the DELETE reported at least one affected row,
            otherwise ``False``.
        """
        with SimpleDbConnection.get_connection(Config) as conn, conn.cursor() as cur:
            cur.execute("DELETE FROM tasks WHERE id = %s", (task_id,))
            affected = cur.rowcount
            return affected > 0

    # ==================== ASSIGNMENTS ====================

    def create_assignment(self, assignment: TaskAssignment) -> int:
        """Insert a task assignment row and return its generated id.

        Args:
            assignment: Assignment to persist. ``assignee_type`` may be an
                ``ExecutorType`` member (its ``.value`` is stored) or a plain
                value; a falsy ``assigned_at`` is replaced by the current time.

        Returns:
            ``cursor.lastrowid`` after the INSERT.
        """
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cur:
                sql = """
                    INSERT INTO task_assignments
                    (task_id, assignee, assignee_type, model_name, status, assigned_at, notes)
                    VALUES (%s, %s, %s, %s, %s, %s, %s)
                """
                kind = assignment.assignee_type
                if isinstance(kind, ExecutorType):
                    kind = kind.value

                values = (
                    assignment.task_id,
                    assignment.assignee,
                    kind,
                    assignment.model_name,
                    assignment.status,
                    assignment.assigned_at or datetime.now(),
                    assignment.notes,
                )
                cur.execute(sql, values)
                return cur.lastrowid

    def get_assignments(self, task_id: int) -> List[TaskAssignment]:
        """Return all assignments of a task, most recently assigned first.

        Args:
            task_id: Value matched against ``task_assignments.task_id``.

        Returns:
            Rows converted via ``_row_to_assignment``; empty list if none.
        """
        query = "SELECT * FROM task_assignments WHERE task_id = %s ORDER BY assigned_at DESC"
        with SimpleDbConnection.get_connection(Config) as conn, conn.cursor() as cur:
            cur.execute(query, (task_id,))
            return list(map(self._row_to_assignment, cur.fetchall()))

    # ==================== RESULTS ====================

    def create_result(self, result: TaskResult) -> int:
        """Insert a task result row and return its generated id.

        Args:
            result: Result to persist. ``executor_type`` may be an
                ``ExecutorType`` member (its ``.value`` is stored) or a plain
                value; a falsy ``created_at`` is replaced by the current time.

        Returns:
            ``cursor.lastrowid`` after the INSERT.
        """
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cur:
                sql = """
                    INSERT INTO task_results
                    (task_id, executor, executor_type, model_name, request, response,
                     status, error_message, tokens_input, tokens_output, cost_usd,
                     duration_ms, created_at)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """
                kind = result.executor_type
                if isinstance(kind, ExecutorType):
                    kind = kind.value

                values = (
                    result.task_id,
                    result.executor,
                    kind,
                    result.model_name,
                    result.request,
                    result.response,
                    result.status,
                    result.error_message,
                    result.tokens_input,
                    result.tokens_output,
                    result.cost_usd,
                    result.duration_ms,
                    result.created_at or datetime.now(),
                )
                cur.execute(sql, values)
                return cur.lastrowid

    def get_results(self, task_id: int) -> List[TaskResult]:
        """Return all stored results of a task, newest first.

        Args:
            task_id: Value matched against ``task_results.task_id``.

        Returns:
            Rows converted via ``_row_to_result``; empty list if none.
        """
        query = "SELECT * FROM task_results WHERE task_id = %s ORDER BY created_at DESC"
        with SimpleDbConnection.get_connection(Config) as conn, conn.cursor() as cur:
            cur.execute(query, (task_id,))
            records = cur.fetchall()
            return [self._row_to_result(record) for record in records]

    # ==================== STATISTICS ====================

    def get_statistics(self) -> Dict[str, Any]:
        """Collect aggregate task and usage statistics.

        Runs four aggregate queries on a single cursor.

        Returns:
            A dict with these keys:

            * ``"by_status"``: ``{status: row count}`` from ``tasks``.
            * ``"by_type"``: ``{type: row count}`` from ``tasks``.
            * ``"tokens"``: ``total_input`` / ``total_output`` as ``int`` and
              ``total_cost_usd`` as ``float``, summed over ``task_results``;
              a NULL sum (no rows) becomes ``0``.
            * ``"model_usage"``: list of ``{"model", "count", "tokens"}``
              dicts, one per non-NULL ``model_name`` in ``task_results``.
        """
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                # Task counts per status.
                cursor.execute("""
                    SELECT status, COUNT(*) as cnt
                    FROM tasks
                    GROUP BY status
                """)
                by_status = {row["status"]: row["cnt"] for row in cursor.fetchall()}

                # Task counts per type.
                cursor.execute("""
                    SELECT type, COUNT(*) as cnt
                    FROM tasks
                    GROUP BY type
                """)
                by_type = {row["type"]: row["cnt"] for row in cursor.fetchall()}

                # Overall token and cost totals.
                cursor.execute("""
                    SELECT
                        SUM(tokens_input) as total_input,
                        SUM(tokens_output) as total_output,
                        SUM(cost_usd) as total_cost
                    FROM task_results
                """)
                totals = cursor.fetchone() or {}
                # Some drivers hand SUM() back as Decimal; cast the counters to
                # int (as the cost is cast to float) so the result holds only
                # plain numbers and serializes cleanly.
                tokens = {
                    "total_input": int(totals.get("total_input") or 0),
                    "total_output": int(totals.get("total_output") or 0),
                    "total_cost_usd": float(totals.get("total_cost") or 0),
                }

                # Usage per model.
                cursor.execute("""
                    SELECT model_name, COUNT(*) as cnt, SUM(tokens_input + tokens_output) as tokens
                    FROM task_results
                    WHERE model_name IS NOT NULL
                    GROUP BY model_name
                """)
                model_usage = [
                    {
                        "model": row["model_name"],
                        "count": row["cnt"],
                        "tokens": int(row["tokens"] or 0),
                    }
                    for row in cursor.fetchall()
                ]

                return {
                    "by_status": by_status,
                    "by_type": by_type,
                    "tokens": tokens,
                    "model_usage": model_usage,
                }

    # ==================== HELPER ====================

    def _row_to_task(self, row: Dict) -> Task:
        """Build a ``Task`` from a dict-like ``tasks`` row.

        ``id``, ``uuid`` and ``title`` must be present. For the other
        columns a missing key and a NULL value are treated alike:

        * ``type`` / ``status`` fall back to ``TaskType.AI_TASK`` /
          ``TaskStatus.PENDING``.
        * ``created_by`` / ``created_by_type`` fall back to ``""`` / ``"ai"``.
        * ``metadata`` is parsed from JSON text, or ``{}`` when empty.

        Args:
            row: Mapping of column name to value.

        Returns:
            The populated ``Task``.

        Raises:
            KeyError: If ``id``, ``uuid`` or ``title`` is missing.
        """
        import json

        raw_type = row.get("type")
        raw_status = row.get("status")
        raw_metadata = row.get("metadata")

        return Task(
            id=row["id"],
            uuid=row["uuid"],
            title=row["title"],
            description=row.get("description"),
            type=TaskType(raw_type) if raw_type else TaskType.AI_TASK,
            status=TaskStatus(raw_status) if raw_status else TaskStatus.PENDING,
            # ``.get(key, default)`` only covers a missing key; a NULL column
            # arrives as None, so the fallback is applied with ``or``.
            created_by=row.get("created_by") or "",
            created_by_type=row.get("created_by_type") or "ai",
            parent_task_id=row.get("parent_task_id"),
            due_date=row.get("due_date"),
            created_at=row.get("created_at"),
            updated_at=row.get("updated_at"),
            completed_at=row.get("completed_at"),
            metadata=json.loads(raw_metadata) if raw_metadata else {},
        )

    def _row_to_assignment(self, row: Dict) -> TaskAssignment:
        """Build a ``TaskAssignment`` from a dict-like ``task_assignments`` row.

        ``id``, ``task_id`` and ``assignee`` must be present. A missing or
        NULL ``assignee_type`` falls back to ``ExecutorType.HUMAN``, and a
        missing or NULL ``status`` to ``"pending"``.

        Args:
            row: Mapping of column name to value.

        Returns:
            The populated ``TaskAssignment``.

        Raises:
            KeyError: If ``id``, ``task_id`` or ``assignee`` is missing.
        """
        raw_kind = row.get("assignee_type")
        return TaskAssignment(
            id=row["id"],
            task_id=row["task_id"],
            assignee=row["assignee"],
            assignee_type=ExecutorType(raw_kind) if raw_kind else ExecutorType.HUMAN,
            model_name=row.get("model_name"),
            # ``.get(key, default)`` would pass a NULL column through as None.
            status=row.get("status") or "pending",
            assigned_at=row.get("assigned_at"),
            notes=row.get("notes"),
        )

    def _row_to_result(self, row: Dict) -> TaskResult:
        """Build a ``TaskResult`` from a dict-like ``task_results`` row.

        ``id``, ``task_id`` and ``executor`` must be present. For the other
        columns a missing key and a NULL value are treated alike:

        * ``executor_type`` falls back to ``ExecutorType.OLLAMA``.
        * ``status`` falls back to ``"success"``.
        * ``tokens_input``, ``tokens_output``, ``duration_ms`` fall back to 0.
        * ``cost_usd`` falls back to ``0.0`` and is always a ``float``.

        Args:
            row: Mapping of column name to value.

        Returns:
            The populated ``TaskResult``.

        Raises:
            KeyError: If ``id``, ``task_id`` or ``executor`` is missing.
        """
        raw_kind = row.get("executor_type")
        return TaskResult(
            id=row["id"],
            task_id=row["task_id"],
            executor=row["executor"],
            executor_type=ExecutorType(raw_kind) if raw_kind else ExecutorType.OLLAMA,
            model_name=row.get("model_name"),
            request=row.get("request"),
            response=row.get("response"),
            status=row.get("status") or "success",
            error_message=row.get("error_message"),
            # ``or`` instead of a ``.get`` default: a NULL column is present
            # in the row with value None, and float(None) raises TypeError.
            tokens_input=row.get("tokens_input") or 0,
            tokens_output=row.get("tokens_output") or 0,
            cost_usd=float(row.get("cost_usd") or 0),
            duration_ms=row.get("duration_ms") or 0,
            created_at=row.get("created_at"),
        )
← Übersicht Graph