storage.py

Code Hygiene Score: 100

Keine Issues gefunden.

Dependencies 5

Klassen 1

Funktionen 1

Code

"""Knowledge Storage für Wissensextraktion."""

import json
import sys

sys.path.insert(0, "/var/www/scripts/pipeline")

from db import db

from .models import KnowledgeLevel, KnowledgeType


def store_knowledge(
    level: KnowledgeLevel,
    source_id: int,
    knowledge_type: KnowledgeType,
    data: dict,
    model_name: str = "unknown",
):
    """Speichere Wissen in der entsprechenden Tabelle."""
    table_map = {
        KnowledgeLevel.PAGE: "page_knowledge",
        KnowledgeLevel.SECTION: "section_knowledge",
        KnowledgeLevel.DOCUMENT: "document_knowledge",
    }

    id_field_map = {
        KnowledgeLevel.PAGE: "page_id",
        KnowledgeLevel.SECTION: "section_id",
        KnowledgeLevel.DOCUMENT: "document_id",
    }

    table = table_map[level]
    id_field = id_field_map[level]

    try:
        cursor = db.execute(
            f"""INSERT INTO {table} ({id_field}, knowledge_type, data, model_used, created_at)
                VALUES (%s, %s, %s, %s, NOW())""",  # noqa: S608 - table name is controlled
            (source_id, knowledge_type.value, json.dumps(data), model_name),
        )
        db.commit()
        cursor.close()
    except Exception as e:
        db.log("ERROR", f"Fehler beim Speichern in {table}: {e}")


class KnowledgeStorage:
    """Wrapper für Knowledge Storage mit Modell-Kontext."""

    def __init__(self, model_name: str):
        """Initialisiere mit Modellname."""
        self.model_name = model_name

    def store(self, level: KnowledgeLevel, source_id: int, knowledge_type: KnowledgeType, data: dict):
        """Speichere Wissen mit Modell-Kontext."""
        store_knowledge(level, source_id, knowledge_type, data, self.model_name)
← Übersicht Graph