{
"tool_response": {
"filePath": "\/var\/www\/mcp-servers\/mcp_tasks\/infrastructure\/task_repository.py",
"oldString": "DatabaseConnection.get_connection()",
"newString": "SimpleDbConnection.get_connection(Config)",
"originalFile": "\"\"\"Task Repository - Datenbankzugriff für Tasks.\"\"\"\n\nimport uuid\nfrom datetime import datetime\nfrom typing import Any, Dict, List, Optional\n\nfrom config import Config\nfrom domain.contracts import (\n ExecutorType,\n Task,\n TaskAssignment,\n TaskResult,\n TaskStatus,\n TaskType,\n)\nfrom shared.infrastructure import SimpleDbConnection\n\n\nclass TaskRepository:\n \"\"\"Repository für Task-CRUD-Operationen\"\"\"\n\n # ==================== TASK CRUD ====================\n\n def find_by_id(self, task_id: int) -> Optional[Task]:\n \"\"\"Findet Task nach ID\"\"\"\n with DatabaseConnection.get_connection() as conn:\n with conn.cursor() as cursor:\n cursor.execute(\"SELECT * FROM tasks WHERE id = %s\", (task_id,))\n row = cursor.fetchone()\n return self._row_to_task(row) if row else None\n\n def find_all(\n self,\n status: Optional[str] = None,\n task_type: Optional[str] = None,\n search: Optional[str] = None,\n limit: int = 50,\n offset: int = 0,\n ) -> List[Task]:\n \"\"\"Findet Tasks mit Filtern\"\"\"\n with DatabaseConnection.get_connection() as conn:\n with conn.cursor() as cursor:\n sql = \"SELECT * FROM tasks WHERE 1=1\"\n params = []\n\n if status:\n sql += \" AND status = %s\"\n params.append(status)\n\n if task_type:\n sql += \" AND type = %s\"\n params.append(task_type)\n\n if search:\n sql += \" AND (title LIKE %s OR description LIKE %s)\"\n params.extend([f\"%{search}%\", f\"%{search}%\"])\n\n sql += \" ORDER BY created_at DESC LIMIT %s OFFSET %s\"\n params.extend([limit, offset])\n\n cursor.execute(sql, params)\n rows = cursor.fetchall()\n return [self._row_to_task(row) for row in rows]\n\n def count(\n self,\n status: Optional[str] = None,\n task_type: Optional[str] = None,\n search: Optional[str] = None,\n ) -> int:\n \"\"\"Zählt Tasks mit Filtern\"\"\"\n with DatabaseConnection.get_connection() as conn:\n with conn.cursor() as cursor:\n sql = \"SELECT COUNT(*) as cnt FROM tasks WHERE 1=1\"\n params = []\n\n if status:\n sql += \" AND status = %s\"\n params.append(status)\n\n if task_type:\n sql += \" AND type = %s\"\n params.append(task_type)\n\n if search:\n sql += \" AND (title LIKE %s OR description LIKE %s)\"\n params.extend([f\"%{search}%\", f\"%{search}%\"])\n\n cursor.execute(sql, params)\n row = cursor.fetchone()\n return row[\"cnt\"] if row else 0\n\n def create(self, task: Task) -> int:\n \"\"\"Erstellt neuen Task, gibt ID zurück\"\"\"\n with DatabaseConnection.get_connection() as conn:\n with conn.cursor() as cursor:\n task.uuid = str(uuid.uuid4())\n task.created_at = datetime.now()\n task.updated_at = datetime.now()\n\n sql = \"\"\"\n INSERT INTO tasks\n (uuid, title, description, type, status, created_by, created_by_type,\n parent_task_id, due_date, created_at, updated_at, metadata)\n VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)\n \"\"\"\n\n import json\n cursor.execute(sql, (\n task.uuid,\n task.title,\n task.description,\n task.type.value if isinstance(task.type, TaskType) else task.type,\n task.status.value if isinstance(task.status, TaskStatus) else task.status,\n task.created_by,\n task.created_by_type,\n task.parent_task_id,\n task.due_date,\n task.created_at,\n task.updated_at,\n json.dumps(task.metadata) if task.metadata else None,\n ))\n return cursor.lastrowid\n\n def update(self, task_id: int, updates: Dict[str, Any]) -> bool:\n \"\"\"Aktualisiert Task-Felder\"\"\"\n if not updates:\n return False\n\n with DatabaseConnection.get_connection() as conn:\n with conn.cursor() as cursor:\n # Erlaubte Felder\n allowed = {\"title\", \"description\", \"type\", \"status\", \"due_date\", \"metadata\"}\n filtered = {k: v for k, v in updates.items() if k in allowed}\n\n if not filtered:\n return False\n\n filtered[\"updated_at\"] = datetime.now()\n\n # Wenn Status auf completed, setze completed_at\n if filtered.get(\"status\") == \"completed\":\n filtered[\"completed_at\"] = datetime.now()\n\n set_clause = \", \".join([f\"{k} = %s\" for k in filtered.keys()])\n sql = f\"UPDATE tasks SET {set_clause} WHERE id = %s\"\n\n values = list(filtered.values()) + [task_id]\n cursor.execute(sql, values)\n return cursor.rowcount > 0\n\n def delete(self, task_id: int) -> bool:\n \"\"\"Löscht Task\"\"\"\n with DatabaseConnection.get_connection() as conn:\n with conn.cursor() as cursor:\n cursor.execute(\"DELETE FROM tasks WHERE id = %s\", (task_id,))\n return cursor.rowcount > 0\n\n # ==================== ASSIGNMENTS ====================\n\n def create_assignment(self, assignment: TaskAssignment) -> int:\n \"\"\"Erstellt Task-Zuweisung\"\"\"\n with DatabaseConnection.get_connection() as conn:\n with conn.cursor() as cursor:\n sql = \"\"\"\n INSERT INTO task_assignments\n (task_id, assignee, assignee_type, model_name, status, assigned_at, notes)\n VALUES (%s, %s, %s, %s, %s, %s, %s)\n \"\"\"\n cursor.execute(sql, (\n assignment.task_id,\n assignment.assignee,\n assignment.assignee_type.value if isinstance(assignment.assignee_type, ExecutorType) else assignment.assignee_type,\n assignment.model_name,\n assignment.status,\n assignment.assigned_at or datetime.now(),\n assignment.notes,\n ))\n return cursor.lastrowid\n\n def get_assignments(self, task_id: int) -> List[TaskAssignment]:\n \"\"\"Holt Zuweisungen für Task\"\"\"\n with DatabaseConnection.get_connection() as conn:\n with conn.cursor() as cursor:\n cursor.execute(\n \"SELECT * FROM task_assignments WHERE task_id = %s ORDER BY assigned_at DESC\",\n (task_id,)\n )\n rows = cursor.fetchall()\n return [self._row_to_assignment(row) for row in rows]\n\n # ==================== RESULTS ====================\n\n def create_result(self, result: TaskResult) -> int:\n \"\"\"Speichert Task-Ergebnis\"\"\"\n with DatabaseConnection.get_connection() as conn:\n with conn.cursor() as cursor:\n sql = \"\"\"\n INSERT INTO task_results\n (task_id, executor, executor_type, model_name, request, response,\n status, error_message, tokens_input, tokens_output, cost_usd,\n duration_ms, created_at)\n VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)\n \"\"\"\n cursor.execute(sql, (\n result.task_id,\n result.executor,\n result.executor_type.value if isinstance(result.executor_type, ExecutorType) else result.executor_type,\n result.model_name,\n result.request,\n result.response,\n result.status,\n result.error_message,\n result.tokens_input,\n result.tokens_output,\n result.cost_usd,\n result.duration_ms,\n result.created_at or datetime.now(),\n ))\n return cursor.lastrowid\n\n def get_results(self, task_id: int) -> List[TaskResult]:\n \"\"\"Holt Ergebnisse für Task\"\"\"\n with DatabaseConnection.get_connection() as conn:\n with conn.cursor() as cursor:\n cursor.execute(\n \"SELECT * FROM task_results WHERE task_id = %s ORDER BY created_at DESC\",\n (task_id,)\n )\n rows = cursor.fetchall()\n return [self._row_to_result(row) for row in rows]\n\n # ==================== STATISTICS ====================\n\n def get_statistics(self) -> Dict[str, Any]:\n \"\"\"Holt Task-Statistiken\"\"\"\n with DatabaseConnection.get_connection() as conn:\n with conn.cursor() as cursor:\n stats = {}\n\n # Task-Zähler nach Status\n cursor.execute(\"\"\"\n SELECT status, COUNT(*) as cnt\n FROM tasks\n GROUP BY status\n \"\"\")\n stats[\"by_status\"] = {row[\"status\"]: row[\"cnt\"] for row in cursor.fetchall()}\n\n # Task-Zähler nach Typ\n cursor.execute(\"\"\"\n SELECT type, COUNT(*) as cnt\n FROM tasks\n GROUP BY type\n \"\"\")\n stats[\"by_type\"] = {row[\"type\"]: row[\"cnt\"] for row in cursor.fetchall()}\n\n # Token-Statistiken\n cursor.execute(\"\"\"\n SELECT\n SUM(tokens_input) as t... [TRUNCATED-1cc185a2d658281e]",
"structuredPatch": [
{
"oldStart": 23,
"oldLines": 7,
"newStart": 23,
"newLines": 7,
"lines": [
" ",
" def find_by_id(self, task_id: int) -> Optional[Task]:",
" \"\"\"Findet Task nach ID\"\"\"",
"- with DatabaseConnection.get_connection() as conn:",
"+ with SimpleDbConnection.get_connection(Config) as conn:",
" with conn.cursor() as cursor:",
" cursor.execute(\"SELECT * FROM tasks WHERE id = %s\", (task_id,))",
" row = cursor.fetchone()"
]
},
{
"oldStart": 38,
"oldLines": 7,
"newStart": 38,
"newLines": 7,
"lines": [
" offset: int = 0,",
" ) -> List[Task]:",
" \"\"\"Findet Tasks mit Filtern\"\"\"",
"- with DatabaseConnection.get_connection() as conn:",
"+ with SimpleDbConnection.get_connection(Config) as conn:",
" with conn.cursor() as cursor:",
" sql = \"SELECT * FROM tasks WHERE 1=1\"",
" params = []"
]
},
{
"oldStart": 69,
"oldLines": 7,
"newStart": 69,
"newLines": 7,
"lines": [
" search: Optional[str] = None,",
" ) -> int:",
" \"\"\"Zählt Tasks mit Filtern\"\"\"",
"- with DatabaseConnection.get_connection() as conn:",
"+ with SimpleDbConnection.get_connection(Config) as conn:",
" with conn.cursor() as cursor:",
" sql = \"SELECT COUNT(*) as cnt FROM tasks WHERE 1=1\"",
" params = []"
]
},
{
"oldStart": 92,
"oldLines": 7,
"newStart": 92,
"newLines": 7,
"lines": [
" ",
" def create(self, task: Task) -> int:",
" \"\"\"Erstellt neuen Task, gibt ID zurück\"\"\"",
"- with DatabaseConnection.get_connection() as conn:",
"+ with SimpleDbConnection.get_connection(Config) as conn:",
" with conn.cursor() as cursor:",
" task.uuid = str(uuid.uuid4())",
" task.created_at = datetime.now()"
]
},
{
"oldStart": 127,
"oldLines": 7,
"newStart": 127,
"newLines": 7,
"lines": [
" if not updates:",
" return False",
" ",
"- with DatabaseConnection.get_connection() as conn:",
"+ with SimpleDbConnection.get_connection(Config) as conn:",
" with conn.cursor() as cursor:",
" # Erlaubte Felder",
" allowed = {\"title\", \"description\", \"type\", \"status\", \"due_date\", \"metadata\"}"
]
},
{
"oldStart": 151,
"oldLines": 7,
"newStart": 151,
"newLines": 7,
"lines": [
" ",
" def delete(self, task_id: int) -> bool:",
" \"\"\"Löscht Task\"\"\"",
"- with DatabaseConnection.get_connection() as conn:",
"+ with SimpleDbConnection.get_connection(Config) as conn:",
" with conn.cursor() as cursor:",
" cursor.execute(\"DELETE FROM tasks WHERE id = %s\", (task_id,))",
" return cursor.rowcount > 0"
]
},
{
"oldStart": 160,
"oldLines": 7,
"newStart": 160,
"newLines": 7,
"lines": [
" ",
" def create_assignment(self, assignment: TaskAssignment) -> int:",
" \"\"\"Erstellt Task-Zuweisung\"\"\"",
"- with DatabaseConnection.get_connection() as conn:",
"+ with SimpleDbConnection.get_connection(Config) as conn:",
" with conn.cursor() as cursor:",
" sql = \"\"\"",
" INSERT INTO task_assignments"
]
},
{
"oldStart": 180,
"oldLines": 7,
"newStart": 180,
"newLines": 7,
"lines": [
" ",
" def get_assignments(self, task_id: int) -> List[TaskAssignment]:",
" \"\"\"Holt Zuweisungen für Task\"\"\"",
"- with DatabaseConnection.get_connection() as conn:",
"+ with SimpleDbConnection.get_connection(Config) as conn:",
" with conn.cursor() as cursor:",
" cursor.execute(",
" \"SELECT * FROM task_assignments WHERE task_id = %s ORDER BY assigned_at DESC\","
]
},
{
"oldStart": 193,
"oldLines": 7,
"newStart": 193,
"newLines": 7,
"lines": [
" ",
" def create_result(self, result: TaskResult) -> int:",
" \"\"\"Speichert Task-Ergebnis\"\"\"",
"- with DatabaseConnection.get_connection() as conn:",
"+ with SimpleDbConnection.get_connection(Config) as conn:",
" with conn.cursor() as cursor:",
" sql = \"\"\"",
" INSERT INTO task_results"
]
},
{
"oldStart": 221,
"oldLines": 7,
"newStart": 221,
"newLines": 7,
"lines": [
" ",
" def get_results(self, task_id: int) -> List[TaskResult]:",
" \"\"\"Holt Ergebnisse für Task\"\"\"",
"- with DatabaseConnection.get_connection() as conn:",
"+ with SimpleDbConnection.get_connection(Config) as conn:",
" with conn.cursor() as cursor:",
" cursor.execute(",
" \"SELECT * FROM task_results WHERE task_id = %s ORDER BY created_at DESC\","
]
},
{
"oldStart": 234,
"oldLines": 7,
"newStart": 234,
"newLines": 7,
"lines": [
" ",
" def get_statistics(self) -> Dict[str, Any]:",
" \"\"\"Holt Task-Statistiken\"\"\"",
"- with DatabaseConnection.get_connection() as conn:",
"+ with SimpleDbConnection.get_connection(Config) as conn:",
" with conn.cursor() as cursor:",
" stats = {}",
" "
]
}
],
"userModified": false,
"replaceAll": true
}
}