task_repository.py
- Pfad: /var/www/mcp-servers/mcp-tasks/infrastructure/task_repository.py
- Namespace: -
- Zeilen: 339 | Größe: 13,447 Bytes
- Geändert: 2025-12-28 12:57:14 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 66
- Dependencies: 0 (25%)
- LOC: 53 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
Keine Issues gefunden.
Dependencies 15
- use uuid
- use datetime.datetime
- use typing.Any
- use typing.Dict
- use typing.List
- use typing.Optional
- use config.Config
- use domain.contracts.ExecutorType
- use domain.contracts.Task
- use domain.contracts.TaskAssignment
- use domain.contracts.TaskResult
- use domain.contracts.TaskStatus
- use domain.contracts.TaskType
- use shared.infrastructure.SimpleDbConnection
- use json
Klassen 1
- TaskRepository (class), Zeile 19
Code
"""Task Repository - Datenbankzugriff für Tasks."""
import json
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional

from config import Config
from domain.contracts import (
    ExecutorType,
    Task,
    TaskAssignment,
    TaskResult,
    TaskStatus,
    TaskType,
)
from shared.infrastructure import SimpleDbConnection


class TaskRepository:
    """Repository for CRUD operations on tasks, assignments and results.

    Every public method opens its own connection through
    ``SimpleDbConnection.get_connection(Config)`` and runs parameterized SQL
    against the ``tasks``, ``task_assignments`` and ``task_results`` tables.
    Rows are read by column name, so the cursor must yield dict-like rows.

    NOTE(review): no method commits explicitly; presumably the connection
    context manager (or autocommit) takes care of that -- confirm.
    """

    # Columns that update() may modify; every other key is silently dropped.
    _UPDATABLE_FIELDS = frozenset(
        {"title", "description", "type", "status", "due_date", "metadata"}
    )

    # ==================== TASK CRUD ====================

    def find_by_id(self, task_id: int) -> Optional[Task]:
        """Return the task with the given ID, or ``None`` if no row matches."""
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                cursor.execute("SELECT * FROM tasks WHERE id = %s", (task_id,))
                row = cursor.fetchone()
                return self._row_to_task(row) if row else None

    def find_all(
        self,
        status: Optional[str] = None,
        task_type: Optional[str] = None,
        search: Optional[str] = None,
        limit: int = 50,
        offset: int = 0,
    ) -> List[Task]:
        """Return tasks matching the optional filters, newest first.

        Args:
            status: Exact match on the ``status`` column; ignored when falsy.
            task_type: Exact match on the ``type`` column; ignored when falsy.
            search: Substring matched with ``LIKE`` against ``title`` and
                ``description``; ignored when falsy.
            limit: Maximum number of rows to return.
            offset: Number of rows to skip.

        Returns:
            The matching tasks ordered by ``created_at`` descending.
        """
        where, params = self._build_filters(status, task_type, search)
        sql = (
            "SELECT * FROM tasks WHERE 1=1"
            + where
            + " ORDER BY created_at DESC LIMIT %s OFFSET %s"
        )
        params.extend([limit, offset])
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                rows = cursor.fetchall()
                return [self._row_to_task(row) for row in rows]

    def count(
        self,
        status: Optional[str] = None,
        task_type: Optional[str] = None,
        search: Optional[str] = None,
    ) -> int:
        """Count tasks matching the same optional filters as find_all().

        Returns:
            The number of matching rows, or 0 if the query yields no row.
        """
        where, params = self._build_filters(status, task_type, search)
        sql = "SELECT COUNT(*) as cnt FROM tasks WHERE 1=1" + where
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                row = cursor.fetchone()
                return row["cnt"] if row else 0

    def create(self, task: Task) -> int:
        """Insert a new task and return its generated row ID.

        Side effect: assigns a fresh UUID and sets ``created_at`` and
        ``updated_at`` on the passed ``task`` object before inserting it.
        ``metadata`` is stored as JSON text, or NULL when it is empty.
        """
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                # One timestamp so created_at and updated_at are identical.
                now = datetime.now()
                task.uuid = str(uuid.uuid4())
                task.created_at = now
                task.updated_at = now
                sql = """
                    INSERT INTO tasks
                        (uuid, title, description, type, status, created_by, created_by_type,
                         parent_task_id, due_date, created_at, updated_at, metadata)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """
                cursor.execute(sql, (
                    task.uuid,
                    task.title,
                    task.description,
                    self._enum_value(task.type, TaskType),
                    self._enum_value(task.status, TaskStatus),
                    task.created_by,
                    task.created_by_type,
                    task.parent_task_id,
                    task.due_date,
                    task.created_at,
                    task.updated_at,
                    json.dumps(task.metadata) if task.metadata else None,
                ))
                return cursor.lastrowid

    def update(self, task_id: int, updates: Dict[str, Any]) -> bool:
        """Update whitelisted columns of a task.

        Only the keys in ``_UPDATABLE_FIELDS`` are applied. ``updated_at`` is
        always refreshed, and ``completed_at`` is set when the new status is
        ``"completed"``. Enum values for ``type``/``status`` and dict/list
        values for ``metadata`` are converted the same way create() does.

        Args:
            task_id: ID of the task to update.
            updates: Mapping of column name to new value.

        Returns:
            ``True`` if a row was affected; ``False`` if ``updates`` is empty,
            contains no allowed field, or no row was changed.
        """
        # Filter before connecting: nothing to do means no DB round trip.
        fields = self._filter_updates(updates) if updates else {}
        if not fields:
            return False

        now = datetime.now()
        fields["updated_at"] = now
        # Completing a task also records when it was completed.
        if fields.get("status") == "completed":
            fields["completed_at"] = now

        # Column names come only from the whitelist / fixed names above, so
        # interpolating them into the statement is safe; values stay bound.
        set_clause = ", ".join(f"{column} = %s" for column in fields)
        sql = f"UPDATE tasks SET {set_clause} WHERE id = %s"
        values = [*fields.values(), task_id]
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                cursor.execute(sql, values)
                return cursor.rowcount > 0

    def delete(self, task_id: int) -> bool:
        """Delete a task; return ``True`` if a row was removed."""
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                cursor.execute("DELETE FROM tasks WHERE id = %s", (task_id,))
                return cursor.rowcount > 0

    # ==================== ASSIGNMENTS ====================

    def create_assignment(self, assignment: TaskAssignment) -> int:
        """Insert a task assignment and return its generated row ID.

        ``assigned_at`` defaults to the current time when it is not set.
        """
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                sql = """
                    INSERT INTO task_assignments
                        (task_id, assignee, assignee_type, model_name, status, assigned_at, notes)
                    VALUES (%s, %s, %s, %s, %s, %s, %s)
                """
                cursor.execute(sql, (
                    assignment.task_id,
                    assignment.assignee,
                    self._enum_value(assignment.assignee_type, ExecutorType),
                    assignment.model_name,
                    assignment.status,
                    assignment.assigned_at or datetime.now(),
                    assignment.notes,
                ))
                return cursor.lastrowid

    def get_assignments(self, task_id: int) -> List[TaskAssignment]:
        """Return all assignments of a task, most recently assigned first."""
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    "SELECT * FROM task_assignments WHERE task_id = %s ORDER BY assigned_at DESC",
                    (task_id,),
                )
                rows = cursor.fetchall()
                return [self._row_to_assignment(row) for row in rows]

    # ==================== RESULTS ====================

    def create_result(self, result: TaskResult) -> int:
        """Insert a task result and return its generated row ID.

        ``created_at`` defaults to the current time when it is not set.
        """
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                sql = """
                    INSERT INTO task_results
                        (task_id, executor, executor_type, model_name, request, response,
                         status, error_message, tokens_input, tokens_output, cost_usd,
                         duration_ms, created_at)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """
                cursor.execute(sql, (
                    result.task_id,
                    result.executor,
                    self._enum_value(result.executor_type, ExecutorType),
                    result.model_name,
                    result.request,
                    result.response,
                    result.status,
                    result.error_message,
                    result.tokens_input,
                    result.tokens_output,
                    result.cost_usd,
                    result.duration_ms,
                    result.created_at or datetime.now(),
                ))
                return cursor.lastrowid

    def get_results(self, task_id: int) -> List[TaskResult]:
        """Return all results of a task, newest first."""
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    "SELECT * FROM task_results WHERE task_id = %s ORDER BY created_at DESC",
                    (task_id,),
                )
                rows = cursor.fetchall()
                return [self._row_to_result(row) for row in rows]

    # ==================== STATISTICS ====================

    def get_statistics(self) -> Dict[str, Any]:
        """Collect aggregate statistics over tasks and results.

        Returns:
            A dict with the keys ``by_status`` and ``by_type`` (value -> row
            count), ``tokens`` (summed ``total_input``, ``total_output`` and
            ``total_cost_usd``; 0 when the sums are NULL) and ``model_usage``
            (one ``{"model", "count", "tokens"}`` entry per non-NULL model
            name).
        """
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                stats = {}

                # Task counts per status
                cursor.execute("""
                    SELECT status, COUNT(*) as cnt
                    FROM tasks
                    GROUP BY status
                """)
                stats["by_status"] = {row["status"]: row["cnt"] for row in cursor.fetchall()}

                # Task counts per type
                cursor.execute("""
                    SELECT type, COUNT(*) as cnt
                    FROM tasks
                    GROUP BY type
                """)
                stats["by_type"] = {row["type"]: row["cnt"] for row in cursor.fetchall()}

                # Token and cost totals; SUM() is NULL on an empty table, hence `or 0`
                cursor.execute("""
                    SELECT
                        SUM(tokens_input) as total_input,
                        SUM(tokens_output) as total_output,
                        SUM(cost_usd) as total_cost
                    FROM task_results
                """)
                token_row = cursor.fetchone()
                stats["tokens"] = {
                    "total_input": token_row["total_input"] or 0,
                    "total_output": token_row["total_output"] or 0,
                    "total_cost_usd": float(token_row["total_cost"] or 0),
                }

                # Usage per model
                cursor.execute("""
                    SELECT model_name, COUNT(*) as cnt, SUM(tokens_input + tokens_output) as tokens
                    FROM task_results
                    WHERE model_name IS NOT NULL
                    GROUP BY model_name
                """)
                stats["model_usage"] = [
                    {"model": row["model_name"], "count": row["cnt"], "tokens": row["tokens"] or 0}
                    for row in cursor.fetchall()
                ]
                return stats

    # ==================== HELPER ====================

    @staticmethod
    def _enum_value(value: Any, enum_cls: type) -> Any:
        """Return ``value.value`` for members of ``enum_cls``, else ``value`` unchanged."""
        return value.value if isinstance(value, enum_cls) else value

    @staticmethod
    def _build_filters(
        status: Optional[str],
        task_type: Optional[str],
        search: Optional[str],
    ) -> tuple:
        """Build the shared WHERE fragment used by find_all() and count().

        Returns:
            A ``(sql_fragment, params)`` tuple. The fragment consists of
            `` AND ...`` conditions to append after ``WHERE 1=1`` (empty when
            no filter is set); ``params`` is a new list of the bound values.
        """
        clauses: List[str] = []
        params: List[Any] = []
        if status:
            clauses.append(" AND status = %s")
            params.append(status)
        if task_type:
            clauses.append(" AND type = %s")
            params.append(task_type)
        if search:
            clauses.append(" AND (title LIKE %s OR description LIKE %s)")
            pattern = f"%{search}%"
            params.extend([pattern, pattern])
        return "".join(clauses), params

    @classmethod
    def _filter_updates(cls, updates: Dict[str, Any]) -> Dict[str, Any]:
        """Keep only updatable columns and convert values to their DB form.

        Mirrors create(): enum members for ``type``/``status`` are replaced by
        their ``.value``; a dict/list ``metadata`` is stored as JSON text (NULL
        when empty). Values of any other kind are passed through unchanged.
        """
        fields: Dict[str, Any] = {}
        for column, value in updates.items():
            if column not in cls._UPDATABLE_FIELDS:
                continue
            if column == "type":
                value = cls._enum_value(value, TaskType)
            elif column == "status":
                value = cls._enum_value(value, TaskStatus)
            elif column == "metadata" and isinstance(value, (dict, list)):
                value = json.dumps(value) if value else None
            fields[column] = value
        return fields

    def _row_to_task(self, row: Dict) -> Task:
        """Convert a ``tasks`` row into a Task.

        Missing or empty ``type``/``status`` fall back to ``TaskType.AI_TASK``
        and ``TaskStatus.PENDING``; empty ``metadata`` becomes ``{}``.
        """
        return Task(
            id=row["id"],
            uuid=row["uuid"],
            title=row["title"],
            description=row.get("description"),
            type=TaskType(row["type"]) if row.get("type") else TaskType.AI_TASK,
            status=TaskStatus(row["status"]) if row.get("status") else TaskStatus.PENDING,
            created_by=row.get("created_by", ""),
            created_by_type=row.get("created_by_type", "ai"),
            parent_task_id=row.get("parent_task_id"),
            due_date=row.get("due_date"),
            created_at=row.get("created_at"),
            updated_at=row.get("updated_at"),
            completed_at=row.get("completed_at"),
            metadata=json.loads(row["metadata"]) if row.get("metadata") else {},
        )

    def _row_to_assignment(self, row: Dict) -> TaskAssignment:
        """Convert a ``task_assignments`` row into a TaskAssignment.

        A missing or empty ``assignee_type`` falls back to ``ExecutorType.HUMAN``.
        """
        return TaskAssignment(
            id=row["id"],
            task_id=row["task_id"],
            assignee=row["assignee"],
            assignee_type=(
                ExecutorType(row["assignee_type"])
                if row.get("assignee_type")
                else ExecutorType.HUMAN
            ),
            model_name=row.get("model_name"),
            status=row.get("status", "pending"),
            assigned_at=row.get("assigned_at"),
            notes=row.get("notes"),
        )

    def _row_to_result(self, row: Dict) -> TaskResult:
        """Convert a ``task_results`` row into a TaskResult.

        A missing or empty ``executor_type`` falls back to
        ``ExecutorType.OLLAMA``. ``cost_usd`` is converted to ``float``; a
        missing or NULL value becomes ``0.0``.
        """
        return TaskResult(
            id=row["id"],
            task_id=row["task_id"],
            executor=row["executor"],
            executor_type=(
                ExecutorType(row["executor_type"])
                if row.get("executor_type")
                else ExecutorType.OLLAMA
            ),
            model_name=row.get("model_name"),
            request=row.get("request"),
            response=row.get("response"),
            status=row.get("status", "success"),
            error_message=row.get("error_message"),
            tokens_input=row.get("tokens_input", 0),
            tokens_output=row.get("tokens_output", 0),
            # `or 0` guards against a NULL column, where float(None) would raise.
            cost_usd=float(row.get("cost_usd") or 0),
            duration_ms=row.get("duration_ms", 0),
            created_at=row.get("created_at"),
        )