{
"event": "PreToolUse",
"tool_name": "Read",
"tool_input": {
"file_path": "\/var\/www\/mcp-servers\/mcp_tasks\/infrastructure\/task_repository.py",
"limit": 50
}
}
{
"tool_response": {
"type": "text",
"file": {
"filePath": "\/var\/www\/mcp-servers\/mcp_tasks\/infrastructure\/task_repository.py",
"content": "\"\"\"Task Repository - Datenbankzugriff für Tasks\"\"\"\nimport sys\nimport uuid\nfrom datetime import datetime\nfrom typing import Optional, List, Dict, Any\n\nsys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp_tasks\")\nfrom domain.contracts import Task, TaskAssignment, TaskResult, TaskStatus, TaskType, ExecutorType\nfrom infrastructure.db_connection import DatabaseConnection\n\n\nclass TaskRepository:\n \"\"\"Repository für Task-CRUD-Operationen\"\"\"\n\n # ==================== TASK CRUD ====================\n\n def find_by_id(self, task_id: int) -> Optional[Task]:\n \"\"\"Findet Task nach ID\"\"\"\n with DatabaseConnection.get_connection() as conn:\n with conn.cursor() as cursor:\n cursor.execute(\"SELECT * FROM tasks WHERE id = %s\", (task_id,))\n row = cursor.fetchone()\n return self._row_to_task(row) if row else None\n\n def find_all(\n self,\n status: Optional[str] = None,\n task_type: Optional[str] = None,\n search: Optional[str] = None,\n limit: int = 50,\n offset: int = 0,\n ) -> List[Task]:\n \"\"\"Findet Tasks mit Filtern\"\"\"\n with DatabaseConnection.get_connection() as conn:\n with conn.cursor() as cursor:\n sql = \"SELECT * FROM tasks WHERE 1=1\"\n params = []\n\n if status:\n sql += \" AND status = %s\"\n params.append(status)\n\n if task_type:\n sql += \" AND type = %s\"\n params.append(task_type)\n\n if search:\n sql += \" AND (title LIKE %s OR description LIKE %s)\"\n params.extend([f\"%{search}%\", f\"%{search}%\"])\n",
"numLines": 50,
"startLine": 1,
"totalLines": 332
}
}
}