db.py
- Pfad: /var/www/scripts/pipeline/db.py
- Namespace: pipeline
- Zeilen: 153 | Größe: 4,849 Bytes
- Geändert: 2025-12-31 03:01:09 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 90
- Dependencies: 60 (25%)
- LOC: 100 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
Keine Issues gefunden.
Dependencies: 15
- extends DatabaseCore
- trait DocumentsMixin
- trait QueueMixin
- trait LoggingMixin
- trait SemanticMixin
- trait PromptsMixin
- use datetime.datetime
- use mysql.connector
- use config.DB_CONFIG
- use db_core.DatabaseCore
- use db_documents.DocumentsMixin
- use db_logging.LoggingMixin
- use db_prompts.PromptsMixin
- use db_queue.QueueMixin
- use db_semantic.SemanticMixin
Klassen: 2
- Database (class), Zeile 29
- PipelineProgress (class), Zeile 51
Code
"""
Database operations for KI-System Pipeline
Composition layer that combines all database mixins into a single Database class.
Maintains full backward compatibility with existing imports.
Architecture:
db_core.py - Connection management (DatabaseCore)
db_documents.py - Document/Page/Chunk CRUD (DocumentsMixin)
db_queue.py - Pipeline queue operations (QueueMixin)
db_logging.py - Logging operations (LoggingMixin)
db_semantic.py - Semantic operations (SemanticMixin)
db_prompts.py - Prompt operations (PromptsMixin)
"""
from contextlib import closing
from datetime import datetime
from typing import Optional

import mysql.connector

from config import DB_CONFIG
from db_core import DatabaseCore
from db_documents import DocumentsMixin
from db_logging import LoggingMixin
from db_prompts import PromptsMixin
from db_queue import QueueMixin
from db_semantic import SemanticMixin
class Database(
    DatabaseCore,
    DocumentsMixin,
    QueueMixin,
    LoggingMixin,
    SemanticMixin,
    PromptsMixin,
):
    """MariaDB connection wrapper exposing every pipeline database operation.

    This class adds no behaviour of its own. Every method comes from the
    base classes, looked up left to right in the order listed (Python MRO):

    * DatabaseCore    -- connect, disconnect, execute, commit
    * DocumentsMixin  -- document/page/chunk CRUD
    * QueueMixin      -- pipeline queue operations
    * LoggingMixin    -- log, log_to_protokoll, log_provenance
    * SemanticMixin   -- entity types, stopwords, taxonomy
    * PromptsMixin    -- prompt retrieval

    The base-class order is significant: if two bases define the same
    name, the one listed first wins.
    """
class PipelineProgress:
    """Tracks and updates pipeline run progress in ki_content.pipeline_runs.

    Every update is written to the ``pipeline_runs`` row whose ``id`` equals
    ``run_id``. A falsy ``run_id`` (e.g. ``0`` or ``None``) turns all database
    access into a no-op, so the tracker can be used without a run row.

    Database errors are printed and otherwise ignored. Progress reporting is
    best-effort and must never abort the pipeline itself.
    """

    # Format used for the DATETIME columns written by this class.
    _TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self, run_id: int):
        self.run_id = run_id
        self.log_lines = []
        # Only the most recent lines are kept (and written to log_tail);
        # older lines are dropped once this limit is exceeded.
        self.max_log_lines = 200

    def _update(self, **kwargs):
        """Internal: write ``kwargs`` as column/value pairs to this run's row.

        ``last_update_at`` is always set to the current local time.

        Column names are interpolated into the SQL text, so they must only
        come from this class's own trusted callers. Values are always passed
        as bound parameters.
        """
        if not self.run_id:
            return
        kwargs["last_update_at"] = datetime.now().strftime(self._TIMESTAMP_FORMAT)
        sets = ", ".join(f"{column} = %s" for column in kwargs)
        values = [*kwargs.values(), self.run_id]
        try:
            # closing() guarantees cursor and connection are released even if
            # execute/commit raises (the original code leaked them then).
            with closing(mysql.connector.connect(**DB_CONFIG)) as conn:
                with closing(conn.cursor()) as cursor:
                    cursor.execute(f"UPDATE pipeline_runs SET {sets} WHERE id = %s", values)  # noqa: S608
                    conn.commit()
        except Exception as e:
            print(f"Progress update error: {e}")

    def update_step(self, step_name: str):
        """Update current pipeline step."""
        self._update(current_step=step_name)

    def update_document(self, filename: str):
        """Update current document being processed."""
        self._update(current_document=filename)

    def update_progress(
        self,
        processed: Optional[int] = None,
        total: Optional[int] = None,
        failed: Optional[int] = None,
        chunks: Optional[int] = None,
        embeddings: Optional[int] = None,
    ):
        """Update numeric progress fields.

        Only arguments that are not ``None`` are written; ``0`` is a valid
        value and is written. Nothing is sent if every argument is ``None``.
        """
        candidates = {
            "documents_processed": processed,
            "documents_total": total,
            "documents_failed": failed,
            "chunks_created": chunks,
            "embeddings_created": embeddings,
        }
        data = {column: value for column, value in candidates.items() if value is not None}
        if data:
            self._update(**data)

    def add_log(self, message: str):
        """Append a timestamped log line and write the retained tail to log_tail."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        self.log_lines.append(f"[{timestamp}] {message}")
        if len(self.log_lines) > self.max_log_lines:
            self.log_lines = self.log_lines[-self.max_log_lines :]
        self._update(log_tail="\n".join(self.log_lines))

    def complete(self, status: str = "completed", error: Optional[str] = None):
        """Mark the run as finished with ``status``.

        This also clears the current step and document, and stores ``error`` in
        ``error_log`` when it is non-empty.
        """
        data = {
            "status": status,
            "completed_at": datetime.now().strftime(self._TIMESTAMP_FORMAT),
            "current_step": None,
            "current_document": None,
        }
        if error:
            data["error_log"] = error
        self._update(**data)

    def is_cancelled(self) -> bool:
        """Return True only if this run's row has status ``"cancelled"``.

        Returns False when there is no run id or no matching row. It also
        returns False when the status cannot be read; the error is printed.
        """
        if not self.run_id:
            return False
        try:
            with closing(mysql.connector.connect(**DB_CONFIG)) as conn:
                with closing(conn.cursor(dictionary=True)) as cursor:
                    cursor.execute(
                        "SELECT status FROM pipeline_runs WHERE id = %s",
                        (self.run_id,),
                    )
                    row = cursor.fetchone()
        except Exception as e:
            print(f"Cancellation check error: {e}")
            return False
        # Explicit bool: the original returned None when no row matched.
        return row is not None and row["status"] == "cancelled"
# Global database instance for backward compatibility: a single shared
# Database object created when this module is first imported, presumably
# consumed elsewhere as `from db import db` (verify against callers).
# NOTE(review): whether Database() opens a connection at import time depends
# on DatabaseCore.__init__, which is not visible here — confirm.
db = Database()