docs_repository.py
- Pfad: /var/www/mcp-servers/mcp-docs/infrastructure/docs_repository.py
- Namespace: -
- Zeilen: 416 | Größe: 15,769 Bytes
- Geändert: 2025-12-28 12:38:48 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 76
- Dependencies: 60 (25%)
- LOC: 28 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
Keine Issues gefunden.
Dependencies 9
- use datetime.datetime
- use typing.Any
- use typing.Dict
- use typing.List
- use typing.Optional
- use config.Config
- use domain.dokumentation.DocStatus
- use domain.dokumentation.Dokumentation
- use shared.infrastructure.SimpleDbConnection
Klassen 1
- DocsRepository class Zeile 11
Funktionen 1
- get_repository() Zeile 410
Code
"""Repository für Dokumentation CRUD-Operationen."""
from datetime import datetime
from typing import Any, Dict, List, Optional
from config import Config
from domain.dokumentation import DocStatus, Dokumentation
from shared.infrastructure import SimpleDbConnection
class DocsRepository:
    """Repository for the ki_system.dokumentation table.

    Each public method opens its own connection through
    ``SimpleDbConnection.get_connection(Config)`` and maps result rows to
    ``Dokumentation`` entities or plain dictionaries. Rows are read by column
    name, so the connection is expected to yield mapping-style rows.
    """

    def find_by_id(self, doc_id: int) -> Optional[Dokumentation]:
        """Return the document with the given ID, or None if there is none."""
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    "SELECT * FROM dokumentation WHERE id = %s", (doc_id,)
                )
                row = cursor.fetchone()
        return self._row_to_doc(row) if row else None

    def find_by_path(self, path: str) -> Optional[Dokumentation]:
        """Return the document stored under ``path``, or None if there is none."""
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    "SELECT * FROM dokumentation WHERE path = %s", (path,)
                )
                row = cursor.fetchone()
        return self._row_to_doc(row) if row else None

    def find_by_slug(
        self, slug: str, parent_id: Optional[int] = None
    ) -> Optional[Dokumentation]:
        """Return the document with the given slug below ``parent_id``.

        When ``parent_id`` is None only root documents (``parent_id IS NULL``)
        are considered. Returns None if no row matches.
        """
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                if parent_id is not None:
                    cursor.execute(
                        "SELECT * FROM dokumentation WHERE slug = %s AND parent_id = %s",
                        (slug, parent_id),
                    )
                else:
                    cursor.execute(
                        "SELECT * FROM dokumentation WHERE slug = %s AND parent_id IS NULL",
                        (slug,),
                    )
                row = cursor.fetchone()
        return self._row_to_doc(row) if row else None

    @staticmethod
    def _build_filters(
        status: Optional[str],
        parent_id: Optional[int],
        search: Optional[str],
    ) -> tuple:
        """Build the shared WHERE clause for ``find_all`` and ``count``.

        Returns a ``(where_clause, params)`` pair. ``where_clause`` is the
        AND-joined list of conditions, or ``"1=1"`` when no filter is active;
        ``params`` is a fresh list with the matching placeholder values.
        Empty ``status``/``search`` strings are treated as "no filter", while
        ``parent_id`` is only skipped when it is None.
        """
        conditions: List[str] = []
        params: List[Any] = []
        if status:
            conditions.append("status = %s")
            params.append(status)
        if parent_id is not None:
            conditions.append("parent_id = %s")
            params.append(parent_id)
        if search:
            conditions.append(
                "(title LIKE %s OR description LIKE %s OR content LIKE %s)"
            )
            search_term = f"%{search}%"
            params.extend([search_term, search_term, search_term])
        where_clause = " AND ".join(conditions) if conditions else "1=1"
        return where_clause, params

    def find_all(
        self,
        status: Optional[str] = None,
        parent_id: Optional[int] = None,
        search: Optional[str] = None,
        limit: int = 20,
        offset: int = 0,
    ) -> List[Dokumentation]:
        """Return documents matching the optional filters, one page at a time.

        Args:
            status: Only return documents with this status value.
            parent_id: Only return direct children of this document.
            search: Substring matched against title, description and content.
            limit: Page size; capped at ``Config.MAX_RESULTS``.
            offset: Number of matching rows to skip.

        Results are ordered by depth, sort_order and title.
        """
        where_clause, params = self._build_filters(status, parent_id, search)
        # Cap the page size and clamp negatives to zero so that a negative
        # value can neither slip under the MAX_RESULTS cap nor reach the SQL.
        limit = max(0, min(limit, Config.MAX_RESULTS))
        offset = max(0, offset)
        sql = f"""
            SELECT * FROM dokumentation
            WHERE {where_clause}
            ORDER BY depth, sort_order, title
            LIMIT %s OFFSET %s
        """
        params.extend([limit, offset])
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                rows = cursor.fetchall()
        return [self._row_to_doc(row) for row in rows]

    def find_children(self, parent_id: int) -> List[Dokumentation]:
        """Return the direct children of a document, ordered by sort_order, title."""
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    """SELECT * FROM dokumentation
                    WHERE parent_id = %s
                    ORDER BY sort_order, title""",
                    (parent_id,),
                )
                rows = cursor.fetchall()
        return [self._row_to_doc(row) for row in rows]

    def find_root_documents(self) -> List[Dokumentation]:
        """Return all documents without a parent, ordered by sort_order, title."""
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    """SELECT * FROM dokumentation
                    WHERE parent_id IS NULL
                    ORDER BY sort_order, title"""
                )
                rows = cursor.fetchall()
        return [self._row_to_doc(row) for row in rows]

    def get_hierarchy(self) -> List[Dict[str, Any]]:
        """Return the complete documentation tree as nested dictionaries.

        Each node carries the selected columns plus a ``children`` list. Only
        the root nodes are returned at the top level; a row whose parent_id
        points at a missing row is not attached anywhere.
        """
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    """SELECT id, parent_id, slug, path, title, status, sort_order, depth
                    FROM dokumentation
                    ORDER BY depth, sort_order, title"""
                )
                rows = cursor.fetchall()

        # Index every node first so children can be attached in one pass
        # while keeping the ordering delivered by the query.
        docs_by_id = {row["id"]: {**row, "children": []} for row in rows}
        roots = []
        for row in rows:
            doc = docs_by_id[row["id"]]
            if row["parent_id"] is None:
                roots.append(doc)
            elif row["parent_id"] in docs_by_id:
                docs_by_id[row["parent_id"]]["children"].append(doc)
        return roots

    def get_breadcrumb(self, doc_id: int) -> List[Dict[str, Any]]:
        """Return the breadcrumb trail for a document, root first.

        Each entry holds ``id``, ``path`` and ``title``. The walk follows
        parent_id upwards and stops at the root, at a missing row, or as soon
        as an ID repeats, so a cyclic parent chain cannot loop forever.
        """
        breadcrumb: List[Dict[str, Any]] = []
        visited = set()
        current_id: Optional[int] = doc_id
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                while current_id is not None and current_id not in visited:
                    visited.add(current_id)
                    cursor.execute(
                        "SELECT id, parent_id, path, title FROM dokumentation WHERE id = %s",
                        (current_id,),
                    )
                    row = cursor.fetchone()
                    if not row:
                        break
                    breadcrumb.append(
                        {
                            "id": row["id"],
                            "path": row["path"],
                            "title": row["title"],
                        }
                    )
                    current_id = row["parent_id"]
        # Entries were collected leaf-to-root; callers expect root-to-leaf.
        breadcrumb.reverse()
        return breadcrumb

    def get_siblings(self, doc_id: int) -> List[Dokumentation]:
        """Return the other documents sharing this document's parent.

        Returns an empty list when the document does not exist. For a root
        document the siblings are the other root documents.
        """
        doc = self.find_by_id(doc_id)
        if not doc:
            return []
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                if doc.parent_id is not None:
                    cursor.execute(
                        """SELECT * FROM dokumentation
                        WHERE parent_id = %s AND id != %s
                        ORDER BY sort_order, title""",
                        (doc.parent_id, doc_id),
                    )
                else:
                    cursor.execute(
                        """SELECT * FROM dokumentation
                        WHERE parent_id IS NULL AND id != %s
                        ORDER BY sort_order, title""",
                        (doc_id,),
                    )
                rows = cursor.fetchall()
        return [self._row_to_doc(row) for row in rows]

    def count(
        self,
        status: Optional[str] = None,
        parent_id: Optional[int] = None,
        search: Optional[str] = None,
    ) -> int:
        """Count documents matching the same optional filters as ``find_all``."""
        where_clause, params = self._build_filters(status, parent_id, search)
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    f"SELECT COUNT(*) as count FROM dokumentation WHERE {where_clause}",
                    params,
                )
                result = cursor.fetchone()
        return result["count"] if result else 0

    def create(self, doc: Dokumentation) -> int:
        """Insert a new document and return its ID.

        created_at and updated_at are both set to the current local time.
        Returns ``cursor.lastrowid``, or 0 when the driver reports none.
        """
        # Accept both the enum and an already-plain status value.
        status_value = (
            doc.status.value if isinstance(doc.status, DocStatus) else doc.status
        )
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                now = datetime.now()
                cursor.execute(
                    """INSERT INTO dokumentation
                    (parent_id, slug, path, title, description, content, status,
                    sort_order, depth, created_at, updated_at)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
                    (
                        doc.parent_id,
                        doc.slug,
                        doc.path,
                        doc.title,
                        doc.description,
                        doc.content,
                        status_value,
                        doc.sort_order,
                        doc.depth,
                        now,
                        now,
                    ),
                )
                return cursor.lastrowid or 0

    def update(self, doc_id: int, updates: Dict[str, Any]) -> bool:
        """Update selected fields of a document.

        Only whitelisted column names are written; all other keys are ignored.
        ``updated_at`` is always set to the current local time.

        Returns:
            True if a row was changed, False if nothing valid was passed or
            no row was affected.
        """
        if not updates:
            return False
        # The whitelist is what makes interpolating column names into the
        # SET clause safe; values always go through placeholders.
        allowed_fields = {
            "title",
            "description",
            "content",
            "status",
            "slug",
            "path",
            "sort_order",
            "parent_id",
            "depth",
        }
        filtered_updates = {k: v for k, v in updates.items() if k in allowed_fields}
        if not filtered_updates:
            return False
        # Mirror create(): store the plain value when an enum is passed.
        status = filtered_updates.get("status")
        if isinstance(status, DocStatus):
            filtered_updates["status"] = status.value
        # Always refresh the modification timestamp.
        filtered_updates["updated_at"] = datetime.now()
        set_clause = ", ".join(f"{k} = %s" for k in filtered_updates)
        values = list(filtered_updates.values())
        values.append(doc_id)
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    f"UPDATE dokumentation SET {set_clause} WHERE id = %s",
                    values,
                )
                return cursor.rowcount > 0

    def delete(self, doc_id: int) -> bool:
        """Delete a single document without cascading.

        Returns:
            True if a row was deleted, False otherwise.

        Raises:
            ValueError: If the document still has child documents.
        """
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                # Refuse to delete while children exist; they would be orphaned.
                cursor.execute(
                    "SELECT COUNT(*) as count FROM dokumentation WHERE parent_id = %s",
                    (doc_id,),
                )
                result = cursor.fetchone()
                if result and result["count"] > 0:
                    raise ValueError(
                        f"Dokument hat {result['count']} Kind-Dokumente. "
                        "Bitte zuerst diese löschen."
                    )
                cursor.execute("DELETE FROM dokumentation WHERE id = %s", (doc_id,))
                return cursor.rowcount > 0

    def move(
        self, doc_id: int, new_parent_id: Optional[int], new_sort_order: Optional[int] = None
    ) -> bool:
        """Move a document below a new parent (or to the root level).

        Recomputes the document's own depth and path from the new parent.

        Returns:
            False when the document or the requested parent does not exist,
            or when the target parent is the document itself or one of its
            descendants; otherwise the result of ``update``.
        """
        doc = self.find_by_id(doc_id)
        if not doc:
            return False
        updates: Dict[str, Any] = {"parent_id": new_parent_id}
        if new_parent_id is not None:
            if new_parent_id == doc_id:
                return False
            parent = self.find_by_id(new_parent_id)
            if not parent:
                # Never write a dangling parent_id with stale depth/path.
                return False
            # The new parent's ancestor chain must not contain the document,
            # otherwise the move would create a cycle in the tree.
            if any(
                crumb["id"] == doc_id
                for crumb in self.get_breadcrumb(new_parent_id)
            ):
                return False
            updates["depth"] = parent.depth + 1
            updates["path"] = f"{parent.path}/{doc.slug}"
        else:
            updates["depth"] = 0
            updates["path"] = f"/{doc.slug}"
        if new_sort_order is not None:
            updates["sort_order"] = new_sort_order
        # NOTE(review): descendants of the moved document keep their old
        # path/depth here - confirm whether callers re-sync them.
        return self.update(doc_id, updates)

    def get_statistics(self) -> Dict[str, Any]:
        """Return aggregate statistics over all documents.

        The result holds ``total``, ``by_status`` (draft/published/archived),
        ``root_documents`` and ``max_depth``; missing or NULL aggregates are
        reported as 0.
        """
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                # Totals and per-status counts in a single pass.
                cursor.execute(
                    """
                    SELECT
                    COUNT(*) as total,
                    SUM(CASE WHEN status = 'draft' THEN 1 ELSE 0 END) as draft,
                    SUM(CASE WHEN status = 'published' THEN 1 ELSE 0 END) as published,
                    SUM(CASE WHEN status = 'archived' THEN 1 ELSE 0 END) as archived,
                    MAX(depth) as max_depth
                    FROM dokumentation
                    """
                )
                stats = cursor.fetchone() or {}
                # Root documents are counted separately.
                cursor.execute(
                    "SELECT COUNT(*) as count FROM dokumentation WHERE parent_id IS NULL"
                )
                root_count = cursor.fetchone()
        # int() normalises aggregate results: SUM() is NULL on an empty table
        # and some drivers deliver it as Decimal rather than int.
        return {
            "total": int(stats.get("total") or 0),
            "by_status": {
                "draft": int(stats.get("draft") or 0),
                "published": int(stats.get("published") or 0),
                "archived": int(stats.get("archived") or 0),
            },
            "root_documents": root_count["count"] if root_count else 0,
            "max_depth": int(stats.get("max_depth") or 0),
        }

    def _row_to_doc(self, row: Dict[str, Any]) -> Dokumentation:
        """Convert a database row into a ``Dokumentation`` entity.

        A string status that is not a valid ``DocStatus`` falls back to
        ``DocStatus.DRAFT``. Defaults apply only to columns that are absent
        from the row, not to columns that are present with a NULL value.
        """
        status = row.get("status", "draft")
        if isinstance(status, str):
            try:
                status = DocStatus(status)
            except ValueError:
                status = DocStatus.DRAFT
        return Dokumentation(
            id=row.get("id"),
            parent_id=row.get("parent_id"),
            slug=row.get("slug", ""),
            path=row.get("path", ""),
            title=row.get("title", ""),
            description=row.get("description"),
            content=row.get("content", ""),
            status=status,
            sort_order=row.get("sort_order", 0),
            depth=row.get("depth", 0),
            created_at=row.get("created_at", datetime.now()),
            updated_at=row.get("updated_at", datetime.now()),
        )
# Module-level singleton, created lazily on first access.
_repository_instance: Optional[DocsRepository] = None


def get_repository() -> DocsRepository:
    """Return the shared DocsRepository, creating it on the first call."""
    global _repository_instance
    instance = _repository_instance
    if instance is None:
        instance = DocsRepository()
        _repository_instance = instance
    return instance