db.py

Code Hygiene Score: 90

Keine Issues gefunden.

Dependencies 15

Klassen 2

Code

"""
Database operations for KI-System Pipeline

Composition layer that combines all database mixins into a single Database class.
Maintains full backward compatibility with existing imports.

Architecture:
    db_core.py      - Connection management (DatabaseCore)
    db_documents.py - Document/Page/Chunk CRUD (DocumentsMixin)
    db_queue.py     - Pipeline queue operations (QueueMixin)
    db_logging.py   - Logging operations (LoggingMixin)
    db_semantic.py  - Semantic operations (SemanticMixin)
    db_prompts.py   - Prompt operations (PromptsMixin)
"""

from contextlib import closing
from datetime import datetime

import mysql.connector

from config import DB_CONFIG
from db_core import DatabaseCore
from db_documents import DocumentsMixin
from db_logging import LoggingMixin
from db_prompts import PromptsMixin
from db_queue import QueueMixin
from db_semantic import SemanticMixin


class Database(
    DatabaseCore,
    DocumentsMixin,
    QueueMixin,
    LoggingMixin,
    SemanticMixin,
    PromptsMixin,
):
    """Single MariaDB access object assembled from the database mixins.

    The class adds no behavior of its own; every operation comes from a base
    class, resolved left to right through the MRO in the order listed:

    * ``DatabaseCore``   -- connect, disconnect, execute, commit
    * ``DocumentsMixin`` -- document / page / chunk CRUD
    * ``QueueMixin``     -- pipeline queue operations
    * ``LoggingMixin``   -- log, log_to_protokoll, log_provenance
    * ``SemanticMixin``  -- entity types, stopwords, taxonomy
    * ``PromptsMixin``   -- prompt retrieval

    Keep the base-class order stable: it decides which implementation wins
    if two mixins ever define the same method name.
    """


class PipelineProgress:
    """Tracks and updates pipeline run progress in ki_content.pipeline_runs.

    Every update is written to the ``pipeline_runs`` row whose ``id`` equals
    ``run_id``, using a short-lived connection built from ``DB_CONFIG``.
    Writes are best-effort: database errors are printed and swallowed so that
    progress reporting never aborts the pipeline itself. A falsy ``run_id``
    (e.g. ``None`` or ``0``) turns all database access into a no-op, while
    log lines are still buffered in memory.
    """

    # strftime format used for the DATETIME values this class writes.
    _SQL_DATETIME_FMT = "%Y-%m-%d %H:%M:%S"

    def __init__(self, run_id: int, max_log_lines: int = 200):
        """Create a progress tracker for one pipeline run.

        Args:
            run_id: Primary key of the ``pipeline_runs`` row. A falsy value
                disables all database reads and writes.
            max_log_lines: Number of most-recent log lines kept in memory and
                mirrored into ``log_tail``. Older lines are dropped.
        """
        self.run_id = run_id
        self.log_lines = []
        # Bounded history: only the newest ``max_log_lines`` entries are kept.
        self.max_log_lines = max_log_lines

    def _update(self, **kwargs):
        """Internal: write the given column values to this run's row.

        ``last_update_at`` is always set to the current local time. Column
        names are interpolated into the SQL text, so they must come from
        trusted code (every caller in this class passes fixed literals).
        Values are bound as query parameters. Errors are printed, not raised.
        """
        if not self.run_id:
            return

        kwargs["last_update_at"] = datetime.now().strftime(self._SQL_DATETIME_FMT)

        sets = ", ".join(f"{k} = %s" for k in kwargs)
        values = [*kwargs.values(), self.run_id]

        try:
            # closing() releases cursor and connection even when execute or
            # commit raises. Errors raised by close() are caught below as well.
            with closing(mysql.connector.connect(**DB_CONFIG)) as conn:
                with closing(conn.cursor()) as cursor:
                    cursor.execute(f"UPDATE pipeline_runs SET {sets} WHERE id = %s", values)  # noqa: S608
                    conn.commit()
        except Exception as e:
            print(f"Progress update error: {e}")

    def update_step(self, step_name: str):
        """Update current pipeline step (``current_step`` column)."""
        self._update(current_step=step_name)

    def update_document(self, filename: str):
        """Update current document being processed (``current_document``)."""
        self._update(current_document=filename)

    def update_progress(
        self,
        processed: int = None,
        total: int = None,
        failed: int = None,
        chunks: int = None,
        embeddings: int = None,
    ):
        """Update numeric progress fields.

        Only arguments that are not ``None`` are written. If every argument is
        ``None``, no database call is made.
        """
        candidates = {
            "documents_processed": processed,
            "documents_total": total,
            "documents_failed": failed,
            "chunks_created": chunks,
            "embeddings_created": embeddings,
        }
        data = {column: value for column, value in candidates.items() if value is not None}
        if data:
            self._update(**data)

    def add_log(self, message: str):
        """Append a timestamped log line and rewrite ``log_tail``.

        The in-memory buffer is trimmed to the newest ``max_log_lines``
        entries before the joined text is written.
        """
        timestamp = datetime.now().strftime("%H:%M:%S")
        self.log_lines.append(f"[{timestamp}] {message}")
        excess = len(self.log_lines) - self.max_log_lines
        if excess > 0:
            # Slice by the excess count instead of ``[-max:]``. The latter
            # would keep *everything* when max_log_lines == 0.
            self.log_lines = self.log_lines[excess:]
        self._update(log_tail="\n".join(self.log_lines))

    def complete(self, status: str = "completed", error: str = None):
        """Mark the run as finished with ``status``.

        Sets ``completed_at`` to the current time and clears ``current_step``
        and ``current_document``. ``error_log`` is written only when ``error``
        is truthy.
        """
        data = {
            "status": status,
            "completed_at": datetime.now().strftime(self._SQL_DATETIME_FMT),
            "current_step": None,
            "current_document": None,
        }
        if error:
            data["error_log"] = error
        self._update(**data)

    def is_cancelled(self) -> bool:
        """Return True if the run's ``status`` column is ``"cancelled"``.

        Returns False when ``run_id`` is falsy, when the row does not exist,
        or when the database cannot be queried (best-effort).
        """
        if not self.run_id:
            return False

        try:
            with closing(mysql.connector.connect(**DB_CONFIG)) as conn:
                with closing(conn.cursor(dictionary=True)) as cursor:
                    cursor.execute(
                        "SELECT status FROM pipeline_runs WHERE id = %s",
                        (self.run_id,),
                    )
                    row = cursor.fetchone()
        except Exception:
            # Best-effort: a DB failure is treated as "not cancelled".
            return False
        # A missing row yields None from fetchone(). Always return a real bool.
        return row is not None and row["status"] == "cancelled"


# Global database instance for backward compatibility: modules that import a
# ready-made shared object from this module keep working (presumably via
# `from db import db` — verify against callers).
# NOTE(review): this runs at import time. Whether Database() opens a connection
# here depends on DatabaseCore.__init__, which is not visible in this file —
# confirm before relying on import being side-effect free.
db: Database = Database()
← Übersicht Graph