repositories.py

Code Hygiene Score: 98

Keine Issues gefunden.

Dependencies 6

Klassen 4

Code

"""
Repository classes for semantic chunk data persistence.
"""

import json
import sys

sys.path.insert(0, "/var/www/scripts/pipeline")

from db import db

from .models import ChunkSemantics, Relation
from .ollama_service import ANALYSIS_MODEL

BATCH_SIZE = 10


class ChunkRepository:
    """Database access for chunks."""

    def get_pending_chunks(self, limit: int = BATCH_SIZE) -> list[dict]:
        """Fetch chunks that have no semantic analysis yet.

        Args:
            limit: Maximum number of rows to return (defaults to BATCH_SIZE).

        Returns:
            Rows with keys ``id``, ``content`` and ``document_id``,
            ordered by chunk id.
        """
        cursor = db.execute(
            """
            SELECT c.id, c.content, c.document_id
            FROM chunks c
            LEFT JOIN chunk_semantics cs ON c.id = cs.chunk_id
            WHERE cs.id IS NULL
            ORDER BY c.id
            LIMIT %s
        """,
            (limit,),
        )
        try:
            return cursor.fetchall()
        finally:
            # Release the cursor even if fetchall() raises.
            cursor.close()

    def _count(self, query: str, column: str) -> int:
        """Run a single COUNT(*) query and return the aliased column's value."""
        cursor = db.execute(query)
        try:
            return cursor.fetchone()[column]
        finally:
            cursor.close()

    def get_stats(self) -> dict:
        """Return counts of total chunks, analyzed chunks and known entities."""
        return {
            "total": self._count("SELECT COUNT(*) as total FROM chunks", "total"),
            "analyzed": self._count("SELECT COUNT(*) as analyzed FROM chunk_semantics", "analyzed"),
            "entities": self._count("SELECT COUNT(*) as entities FROM entities", "entities"),
        }


class SemanticsRepository:
    """Database access for semantic analysis results."""

    def save_semantics(self, sem: ChunkSemantics) -> int:
        """Insert or update the semantic analysis row for a chunk.

        Keywords and topics are stored as JSON (non-ASCII preserved).

        Args:
            sem: The analysis result to persist; ``sem.chunk_id`` identifies
                the target chunk.

        Returns:
            The id of the inserted row.
            NOTE(review): on the duplicate-key UPDATE path, MySQL's
            LAST_INSERT_ID (and thus ``lastrowid``) may be 0 — confirm
            callers do not rely on it after an update.
        """
        cursor = db.execute(
            """
            INSERT INTO chunk_semantics
                (chunk_id, summary, keywords, sentiment, topics, language, analyzed_at, analysis_model)
            VALUES (%s, %s, %s, %s, %s, %s, NOW(), %s)
            ON DUPLICATE KEY UPDATE
                summary = VALUES(summary),
                keywords = VALUES(keywords),
                sentiment = VALUES(sentiment),
                topics = VALUES(topics),
                language = VALUES(language),
                analyzed_at = NOW(),
                analysis_model = VALUES(analysis_model)
        """,
            (
                sem.chunk_id,
                sem.summary,
                json.dumps(sem.keywords, ensure_ascii=False),
                sem.sentiment,
                json.dumps(sem.topics, ensure_ascii=False),
                sem.language,
                ANALYSIS_MODEL,
            ),
        )
        try:
            db.commit()
            return cursor.lastrowid
        finally:
            # Close the cursor even if commit() raises.
            cursor.close()


class EntityRepository:
    """Database access for entities and their relations."""

    def find_or_create(self, entity) -> int:
        """Return the id of the entity with this name/type, creating it if absent.

        NOTE(review): this is a check-then-insert; two concurrent workers can
        both miss the SELECT and insert duplicates unless the table has a
        UNIQUE(name, type) constraint — confirm against the schema.
        """
        cursor = db.execute("SELECT id FROM entities WHERE name = %s AND type = %s", (entity.name, entity.entity_type))
        try:
            existing = cursor.fetchone()
        finally:
            cursor.close()

        if existing:
            return existing["id"]

        cursor = db.execute(
            """
            INSERT INTO entities (name, type, description, created_at)
            VALUES (%s, %s, %s, NOW())
        """,
            (entity.name, entity.entity_type, entity.description),
        )
        try:
            db.commit()
            return cursor.lastrowid
        finally:
            cursor.close()

    def link_to_chunk(self, chunk_id: int, entity_id: int, relevance: float = 1.0):
        """Link an entity to a chunk; repeated links increment mention_count.

        The relevance_score of the first mention is kept on duplicates.
        """
        cursor = db.execute(
            """
            INSERT INTO chunk_entities (chunk_id, entity_id, relevance_score, mention_count)
            VALUES (%s, %s, %s, 1)
            ON DUPLICATE KEY UPDATE
                mention_count = mention_count + 1
        """,
            (chunk_id, entity_id, relevance),
        )
        try:
            db.commit()
        finally:
            cursor.close()

    def save_relation(self, source_id: int, target_id: int, rel: Relation, chunk_id: int):
        """Persist a relation between two entities.

        On duplicates the stronger of the stored and incoming strength wins.
        """
        cursor = db.execute(
            """
            INSERT INTO entity_relations
                (source_entity_id, target_entity_id, relation_type, strength, chunk_id)
            VALUES (%s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                strength = GREATEST(strength, VALUES(strength))
        """,
            (source_id, target_id, rel.relation_type, rel.strength, chunk_id),
        )
        try:
            db.commit()
        finally:
            cursor.close()


class TaxonomyRepository:
    """Database access for the taxonomy."""

    def ensure_terms_exist(self, terms: list[str]):
        """Create any taxonomy terms that do not exist yet (at depth 0).

        NOTE(review): check-then-insert per term; concurrent callers could
        insert duplicates unless `name` is UNIQUE — confirm the schema.
        """
        for term in terms:
            cursor = db.execute("SELECT id FROM taxonomy_terms WHERE name = %s", (term,))
            try:
                exists = cursor.fetchone() is not None
            finally:
                cursor.close()
            if exists:
                continue

            # Naive slug: lowercase with spaces dashed. Umlauts and other
            # punctuation pass through unchanged — acceptable per current use.
            slug = term.lower().replace(" ", "-")
            cursor = db.execute(
                """
                INSERT INTO taxonomy_terms (name, slug, depth)
                VALUES (%s, %s, 0)
            """,
                (term, slug),
            )
            try:
                db.commit()
            finally:
                cursor.close()

    def link_to_chunk(self, chunk_id: int, category: str, confidence: float):
        """Attach a chunk to the taxonomy term named `category`.

        Silently does nothing when the term is unknown (best effort by
        design — callers pass model-generated category names).
        """
        cursor = db.execute("SELECT id FROM taxonomy_terms WHERE name = %s", (category,))
        try:
            term = cursor.fetchone()
        finally:
            cursor.close()

        if not term:
            return

        cursor = db.execute(
            """
            INSERT INTO chunk_taxonomy (chunk_id, taxonomy_term_id, confidence)
            VALUES (%s, %s, %s)
            ON DUPLICATE KEY UPDATE confidence = VALUES(confidence)
        """,
            (chunk_id, term["id"], confidence),
        )
        try:
            db.commit()
        finally:
            cursor.close()
← Übersicht Graph