docs_repository.py

Code Hygiene Score: 76

Keine Issues gefunden.

Dependencies 9

Klassen 1

Funktionen 1

Code

"""Repository für Dokumentation CRUD-Operationen."""

from datetime import datetime
from typing import Any, Dict, List, Optional, Sequence, Tuple

from config import Config
from domain.dokumentation import DocStatus, Dokumentation
from shared.infrastructure import SimpleDbConnection


class DocsRepository:
    """Repository for the ``ki_system.dokumentation`` table.

    Every public method obtains its own connection through
    ``SimpleDbConnection.get_connection(Config)``. Rows are accessed by
    column name (``row["id"]``, ``row.get(...)``), so the cursor is expected
    to yield mapping-style rows.

    NOTE(review): no explicit commit is issued anywhere in this class;
    presumably the connection context manager handles it - confirm.
    """

    # Columns that callers may change through update(); other keys are ignored.
    _UPDATABLE_FIELDS = frozenset(
        {
            "title",
            "description",
            "content",
            "status",
            "slug",
            "path",
            "sort_order",
            "parent_id",
            "depth",
        }
    )

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _execute(cursor: Any, sql: str, params: Optional[Sequence[Any]]) -> None:
        """Run ``sql`` on ``cursor``, passing ``params`` only when given."""
        if params is None:
            cursor.execute(sql)
        else:
            cursor.execute(sql, params)

    def _fetch_one(
        self, sql: str, params: Optional[Sequence[Any]] = None
    ) -> Optional[Dict[str, Any]]:
        """Execute ``sql`` on a fresh connection and return the first row."""
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                self._execute(cursor, sql, params)
                return cursor.fetchone()

    def _fetch_all(
        self, sql: str, params: Optional[Sequence[Any]] = None
    ) -> List[Dict[str, Any]]:
        """Execute ``sql`` on a fresh connection and return all rows."""
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                self._execute(cursor, sql, params)
                return cursor.fetchall()

    def _find_one_doc(
        self, sql: str, params: Optional[Sequence[Any]] = None
    ) -> Optional[Dokumentation]:
        """Return the first matching row as an entity, or ``None``."""
        row = self._fetch_one(sql, params)
        return self._row_to_doc(row) if row else None

    def _find_docs(
        self, sql: str, params: Optional[Sequence[Any]] = None
    ) -> List[Dokumentation]:
        """Return every matching row converted to an entity."""
        return [self._row_to_doc(row) for row in self._fetch_all(sql, params)]

    @staticmethod
    def _build_filters(
        status: Optional[str],
        parent_id: Optional[int],
        search: Optional[str],
    ) -> Tuple[str, List[Any]]:
        """Build the WHERE clause shared by ``find_all`` and ``count``.

        Args:
            status: Exact status to match; skipped when falsy.
            parent_id: Exact parent id to match; skipped when ``None``
                (so ``0`` is a valid filter value).
            search: Substring matched with ``LIKE`` against title,
                description and content; skipped when falsy.

        Returns:
            ``(where_clause, params)``. The clause is ``"1=1"`` when no
            filter is active so it can always follow ``WHERE``.
        """
        conditions: List[str] = []
        params: List[Any] = []

        if status:
            conditions.append("status = %s")
            params.append(status)

        if parent_id is not None:
            conditions.append("parent_id = %s")
            params.append(parent_id)

        if search:
            conditions.append(
                "(title LIKE %s OR description LIKE %s OR content LIKE %s)"
            )
            pattern = f"%{search}%"
            params.extend([pattern, pattern, pattern])

        where_clause = " AND ".join(conditions) if conditions else "1=1"
        return where_clause, params

    @staticmethod
    def _build_tree(rows: Sequence[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Nest flat rows into a tree using ``id`` / ``parent_id``.

        Each node is a copy of its row plus a ``"children"`` list; the input
        rows are not modified. Rows with ``parent_id`` ``None`` become roots.
        Rows whose ``parent_id`` is not among the given ids are left out of
        the result. Sibling order follows the order of ``rows``.
        """
        nodes = {row["id"]: {**row, "children": []} for row in rows}
        roots: List[Dict[str, Any]] = []

        for row in rows:
            node = nodes[row["id"]]
            parent_id = row["parent_id"]
            if parent_id is None:
                roots.append(node)
            elif parent_id in nodes:
                nodes[parent_id]["children"].append(node)

        return roots

    @staticmethod
    def _status_value(status: Any) -> Any:
        """Return the stored value for a status (unwraps ``DocStatus``)."""
        return status.value if isinstance(status, DocStatus) else status

    # ------------------------------------------------------------------
    # Single-document lookups
    # ------------------------------------------------------------------

    def find_by_id(self, doc_id: int) -> Optional[Dokumentation]:
        """Return the document with the given id, or ``None``."""
        return self._find_one_doc(
            "SELECT * FROM dokumentation WHERE id = %s", (doc_id,)
        )

    def find_by_path(self, path: str) -> Optional[Dokumentation]:
        """Return the document with the given path, or ``None``."""
        return self._find_one_doc(
            "SELECT * FROM dokumentation WHERE path = %s", (path,)
        )

    def find_by_slug(
        self, slug: str, parent_id: Optional[int] = None
    ) -> Optional[Dokumentation]:
        """Return the document with the given slug below ``parent_id``.

        When ``parent_id`` is ``None`` only documents without a parent
        (``parent_id IS NULL``) are considered.
        """
        if parent_id is not None:
            return self._find_one_doc(
                "SELECT * FROM dokumentation WHERE slug = %s AND parent_id = %s",
                (slug, parent_id),
            )
        return self._find_one_doc(
            "SELECT * FROM dokumentation WHERE slug = %s AND parent_id IS NULL",
            (slug,),
        )

    # ------------------------------------------------------------------
    # Listings
    # ------------------------------------------------------------------

    def find_all(
        self,
        status: Optional[str] = None,
        parent_id: Optional[int] = None,
        search: Optional[str] = None,
        limit: int = 20,
        offset: int = 0,
    ) -> List[Dokumentation]:
        """Return documents matching the optional filters, paginated.

        Args:
            status: Exact status filter (ignored when falsy).
            parent_id: Exact parent filter (ignored when ``None``).
            search: Substring searched in title, description and content.
            limit: Page size; capped at ``Config.MAX_RESULTS`` and never
                below zero.
            offset: Number of rows to skip; negative values are treated
                as zero.

        Returns:
            Documents ordered by depth, sort order and title.
        """
        where_clause, params = self._build_filters(status, parent_id, search)

        # Clamp paging values: a negative LIMIT/OFFSET is not valid SQL.
        limit = max(0, min(limit, Config.MAX_RESULTS))
        offset = max(0, offset)

        sql = f"""
                    SELECT * FROM dokumentation
                    WHERE {where_clause}
                    ORDER BY depth, sort_order, title
                    LIMIT %s OFFSET %s
                """
        params.extend([limit, offset])
        return self._find_docs(sql, params)

    def find_children(self, parent_id: int) -> List[Dokumentation]:
        """Return the direct children of a document, ordered for display."""
        return self._find_docs(
            """SELECT * FROM dokumentation
                    WHERE parent_id = %s
                    ORDER BY sort_order, title""",
            (parent_id,),
        )

    def find_root_documents(self) -> List[Dokumentation]:
        """Return all documents that have no parent."""
        return self._find_docs(
            """SELECT * FROM dokumentation
                    WHERE parent_id IS NULL
                    ORDER BY sort_order, title"""
        )

    def get_hierarchy(self) -> List[Dict[str, Any]]:
        """Return the whole documentation tree as nested dictionaries.

        Each node holds the selected columns (id, parent_id, slug, path,
        title, status, sort_order, depth) plus a ``"children"`` list.
        """
        rows = self._fetch_all(
            """SELECT id, parent_id, slug, path, title, status, sort_order, depth
                    FROM dokumentation
                    ORDER BY depth, sort_order, title"""
        )
        return self._build_tree(rows)

    def get_breadcrumb(self, doc_id: int) -> List[Dict[str, Any]]:
        """Return the ancestor chain of a document, root first.

        Each entry has the keys ``id``, ``path`` and ``title``; the last
        entry is the document itself. The walk stops at the first missing
        row, and also when an id repeats, so a cyclic ``parent_id`` chain
        cannot cause an endless loop.
        """
        trail: List[Dict[str, Any]] = []
        seen = set()
        current_id: Optional[int] = doc_id

        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                while current_id is not None and current_id not in seen:
                    seen.add(current_id)
                    cursor.execute(
                        "SELECT id, parent_id, path, title FROM dokumentation WHERE id = %s",
                        (current_id,),
                    )
                    row = cursor.fetchone()
                    if not row:
                        break
                    trail.append(
                        {
                            "id": row["id"],
                            "path": row["path"],
                            "title": row["title"],
                        }
                    )
                    current_id = row["parent_id"]

        # Collected leaf -> root; callers expect root -> leaf.
        trail.reverse()
        return trail

    def get_siblings(self, doc_id: int) -> List[Dokumentation]:
        """Return the other documents sharing this document's parent.

        Returns an empty list when the document does not exist.
        """
        doc = self.find_by_id(doc_id)
        if not doc:
            return []

        if doc.parent_id is not None:
            return self._find_docs(
                """SELECT * FROM dokumentation
                        WHERE parent_id = %s AND id != %s
                        ORDER BY sort_order, title""",
                (doc.parent_id, doc_id),
            )
        return self._find_docs(
            """SELECT * FROM dokumentation
                        WHERE parent_id IS NULL AND id != %s
                        ORDER BY sort_order, title""",
            (doc_id,),
        )

    def count(
        self,
        status: Optional[str] = None,
        parent_id: Optional[int] = None,
        search: Optional[str] = None,
    ) -> int:
        """Count documents using the same filters as ``find_all``."""
        where_clause, params = self._build_filters(status, parent_id, search)
        result = self._fetch_one(
            f"SELECT COUNT(*) as count FROM dokumentation WHERE {where_clause}",
            params,
        )
        return result["count"] if result else 0

    # ------------------------------------------------------------------
    # Mutations
    # ------------------------------------------------------------------

    def create(self, doc: Dokumentation) -> int:
        """Insert a new document and return its id.

        ``created_at`` and ``updated_at`` are both set to the current local
        time. Returns ``0`` when the cursor reports no ``lastrowid``.
        """
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                now = datetime.now()
                cursor.execute(
                    """INSERT INTO dokumentation
                    (parent_id, slug, path, title, description, content, status,
                     sort_order, depth, created_at, updated_at)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
                    (
                        doc.parent_id,
                        doc.slug,
                        doc.path,
                        doc.title,
                        doc.description,
                        doc.content,
                        self._status_value(doc.status),
                        doc.sort_order,
                        doc.depth,
                        now,
                        now,
                    ),
                )
                return cursor.lastrowid or 0

    def update(self, doc_id: int, updates: Dict[str, Any]) -> bool:
        """Update selected columns of a document.

        Args:
            doc_id: Id of the row to change.
            updates: Column -> new value. Keys outside the allowed column
                set are silently dropped. A ``DocStatus`` member given for
                ``status`` is stored by its value, as in ``create``.

        Returns:
            ``True`` when the statement affected a row; ``False`` when no
            allowed column was supplied or no row was affected.
        """
        if not updates:
            return False

        filtered_updates = {
            column: value
            for column, value in updates.items()
            if column in self._UPDATABLE_FIELDS
        }
        if not filtered_updates:
            return False

        if "status" in filtered_updates:
            filtered_updates["status"] = self._status_value(
                filtered_updates["status"]
            )

        # Touch updated_at automatically on every change.
        filtered_updates["updated_at"] = datetime.now()

        # Column names come from the whitelist above, so interpolating them
        # into the statement is safe; values are always bound as parameters.
        set_clause = ", ".join(f"{column} = %s" for column in filtered_updates)
        values = [*filtered_updates.values(), doc_id]

        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    f"UPDATE dokumentation SET {set_clause} WHERE id = %s",
                    values,
                )
                return cursor.rowcount > 0

    def delete(self, doc_id: int) -> bool:
        """Delete a single document (no cascade).

        Returns:
            ``True`` when a row was deleted.

        Raises:
            ValueError: If the document still has child documents.
        """
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                # Refuse to delete while children still reference this row.
                cursor.execute(
                    "SELECT COUNT(*) as count FROM dokumentation WHERE parent_id = %s",
                    (doc_id,),
                )
                result = cursor.fetchone()
                if result and result["count"] > 0:
                    raise ValueError(
                        f"Dokument hat {result['count']} Kind-Dokumente. "
                        "Bitte zuerst diese löschen."
                    )

                cursor.execute("DELETE FROM dokumentation WHERE id = %s", (doc_id,))
                return cursor.rowcount > 0

    def move(
        self, doc_id: int, new_parent_id: Optional[int], new_sort_order: Optional[int] = None
    ) -> bool:
        """Move a document below a new parent (or to the root level).

        ``depth`` and ``path`` of the moved row are recomputed from the new
        parent. Only the moved row is written; ``path`` / ``depth`` stored
        on its descendants are not rewritten by this method.

        Args:
            doc_id: Document to move.
            new_parent_id: Target parent, or ``None`` for the root level.
            new_sort_order: Optional new sort position.

        Returns:
            ``False`` when the document or the target parent does not
            exist, when the target is the document itself or one of its
            descendants, or when the update affected no row; otherwise
            ``True``.
        """
        doc = self.find_by_id(doc_id)
        if not doc:
            return False

        updates: Dict[str, Any] = {"parent_id": new_parent_id}

        if new_parent_id is None:
            updates["depth"] = 0
            updates["path"] = f"/{doc.slug}"
        else:
            parent = self.find_by_id(new_parent_id)
            if not parent:
                # Never write a parent_id that points at a missing row.
                return False

            # The breadcrumb of the target contains the target and all of
            # its ancestors; if the moved document is among them the move
            # would create a cycle.
            ancestry = self.get_breadcrumb(new_parent_id)
            if any(entry["id"] == doc_id for entry in ancestry):
                return False

            updates["depth"] = parent.depth + 1
            updates["path"] = f"{parent.path}/{doc.slug}"

        if new_sort_order is not None:
            updates["sort_order"] = new_sort_order

        return self.update(doc_id, updates)

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    def get_statistics(self) -> Dict[str, Any]:
        """Return aggregate counts over all documents.

        Returns:
            A dict with ``total``, ``by_status`` (``draft``, ``published``,
            ``archived``), ``root_documents`` and ``max_depth``. Missing or
            NULL aggregates (e.g. on an empty table) are reported as ``0``.
        """
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                # Totals and per-status counts in one pass.
                cursor.execute(
                    """
                    SELECT
                        COUNT(*) as total,
                        SUM(CASE WHEN status = 'draft' THEN 1 ELSE 0 END) as draft,
                        SUM(CASE WHEN status = 'published' THEN 1 ELSE 0 END) as published,
                        SUM(CASE WHEN status = 'archived' THEN 1 ELSE 0 END) as archived,
                        MAX(depth) as max_depth
                    FROM dokumentation
                """
                )
                stats = cursor.fetchone() or {}

                # Number of root documents.
                cursor.execute(
                    "SELECT COUNT(*) as count FROM dokumentation WHERE parent_id IS NULL"
                )
                root_count = cursor.fetchone()

        def as_int(value: Any) -> int:
            # Aggregates are NULL on an empty table, and SUM() may come back
            # as a non-int numeric type depending on the driver - normalise.
            return int(value or 0)

        return {
            "total": as_int(stats.get("total")),
            "by_status": {
                "draft": as_int(stats.get("draft")),
                "published": as_int(stats.get("published")),
                "archived": as_int(stats.get("archived")),
            },
            "root_documents": as_int(root_count["count"]) if root_count else 0,
            "max_depth": as_int(stats.get("max_depth")),
        }

    # ------------------------------------------------------------------
    # Mapping
    # ------------------------------------------------------------------

    def _row_to_doc(self, row: Dict[str, Any]) -> Dokumentation:
        """Convert a database row into a ``Dokumentation`` entity.

        A string status is converted to ``DocStatus``; values that are not
        valid members fall back to ``DocStatus.DRAFT``. Columns missing
        from the row receive neutral defaults (empty string, ``0``, current
        time for the timestamps).
        """
        status = row.get("status", "draft")
        if isinstance(status, str):
            try:
                status = DocStatus(status)
            except ValueError:
                status = DocStatus.DRAFT

        fallback_timestamp = datetime.now()
        return Dokumentation(
            id=row.get("id"),
            parent_id=row.get("parent_id"),
            slug=row.get("slug", ""),
            path=row.get("path", ""),
            title=row.get("title", ""),
            description=row.get("description"),
            content=row.get("content", ""),
            status=status,
            sort_order=row.get("sort_order", 0),
            depth=row.get("depth", 0),
            created_at=row.get("created_at", fallback_timestamp),
            updated_at=row.get("updated_at", fallback_timestamp),
        )


# Module-wide repository instance, created on first request.
_repository_instance: Optional[DocsRepository] = None


def get_repository() -> DocsRepository:
    """Return the shared ``DocsRepository``, creating it on first use."""
    global _repository_instance
    if _repository_instance is not None:
        return _repository_instance
    _repository_instance = DocsRepository()
    return _repository_instance
← Übersicht Graph