storage.py
- Pfad:
/var/www/scripts/pipeline/knowledge/storage.py - Namespace: pipeline
- Zeilen: 58 | Größe: 1,707 Bytes
- Geändert: 2025-12-25 15:50:37 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 100
- Dependencies: 100 (25%)
- LOC: 100 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
Keine Issues gefunden.
Dependencies 5
- use json
- use sys
- use db.db
- use models.KnowledgeLevel
- use models.KnowledgeType
Klassen 1
-
KnowledgeStorageclass Zeile 48
Funktionen 1
-
store_knowledge()Zeile 13
Code
"""Knowledge Storage für Wissensextraktion."""
import json
import sys
sys.path.insert(0, "/var/www/scripts/pipeline")
from db import db
from .models import KnowledgeLevel, KnowledgeType
def store_knowledge(
level: KnowledgeLevel,
source_id: int,
knowledge_type: KnowledgeType,
data: dict,
model_name: str = "unknown",
):
"""Speichere Wissen in der entsprechenden Tabelle."""
table_map = {
KnowledgeLevel.PAGE: "page_knowledge",
KnowledgeLevel.SECTION: "section_knowledge",
KnowledgeLevel.DOCUMENT: "document_knowledge",
}
id_field_map = {
KnowledgeLevel.PAGE: "page_id",
KnowledgeLevel.SECTION: "section_id",
KnowledgeLevel.DOCUMENT: "document_id",
}
table = table_map[level]
id_field = id_field_map[level]
try:
cursor = db.execute(
f"""INSERT INTO {table} ({id_field}, knowledge_type, data, model_used, created_at)
VALUES (%s, %s, %s, %s, NOW())""", # noqa: S608 - table name is controlled
(source_id, knowledge_type.value, json.dumps(data), model_name),
)
db.commit()
cursor.close()
except Exception as e:
db.log("ERROR", f"Fehler beim Speichern in {table}: {e}")
class KnowledgeStorage:
"""Wrapper für Knowledge Storage mit Modell-Kontext."""
def __init__(self, model_name: str):
"""Initialisiere mit Modellname."""
self.model_name = model_name
def store(self, level: KnowledgeLevel, source_id: int, knowledge_type: KnowledgeType, data: dict):
"""Speichere Wissen mit Modell-Kontext."""
store_knowledge(level, source_id, knowledge_type, data, self.model_name)