pipeline.py

Code Hygiene Score: 85

No issues found.

Dependencies: 11

Classes: 1

Code

"""
Main pipeline orchestration for semantic chunk analysis.
"""

import sys

# Deployment path hack: make the shared "db" module importable at runtime.
sys.path.insert(0, "/var/www/scripts/pipeline")

from db import db

from .analyzers import EntityExtractor, RelationExtractor, SemanticsAnalyzer, TaxonomyClassifier
from .ollama_service import OllamaService
from .repositories import ChunkRepository, EntityRepository, SemanticsRepository, TaxonomyRepository

BATCH_SIZE = 10  # default number of pending chunks processed per run


class SemanticChunkPipeline:
    """Orchestriert die semantische Chunk-Analyse."""

    def __init__(self):
        self.llm = OllamaService()
        self.semantics_analyzer = SemanticsAnalyzer(self.llm)
        self.entity_extractor = EntityExtractor(self.llm)
        self.relation_extractor = RelationExtractor(self.llm)
        self.taxonomy_classifier = TaxonomyClassifier(self.llm)

        self.chunk_repo = ChunkRepository()
        self.semantics_repo = SemanticsRepository()
        self.entity_repo = EntityRepository()
        self.taxonomy_repo = TaxonomyRepository()

        # Seed the taxonomy terms up front so classification links resolve.
        self.taxonomy_repo.ensure_terms_exist(TaxonomyClassifier.CATEGORIES)

    def analyze_chunk(self, chunk: dict) -> dict:
        """Analysiere einen einzelnen Chunk."""
        chunk_id = chunk["id"]
        text = chunk["content"]

        results = {"chunk_id": chunk_id, "success": False}

        # 1. Semantics analysis
        semantics = self.semantics_analyzer.analyze(chunk_id, text)
        if semantics:
            self.semantics_repo.save_semantics(semantics)
            results["semantics"] = True

        # 2. Entity extraction
        entities = self.entity_extractor.extract(text)
        entity_ids = {}
        for entity in entities:
            entity_id = self.entity_repo.find_or_create(entity)
            self.entity_repo.link_to_chunk(chunk_id, entity_id)
            entity_ids[entity.name] = entity_id
        results["entities"] = len(entities)

        # 3. Relation extraction
        # Relations require at least two entities to connect.
        if len(entities) >= 2:
            relations = self.relation_extractor.extract(text, entities)
            for rel in relations:
                source_id = entity_ids.get(rel.source)
                target_id = entity_ids.get(rel.target)
                if source_id and target_id:
                    self.entity_repo.save_relation(source_id, target_id, rel, chunk_id)
            results["relations"] = len(relations)

        # 4. Taxonomy classification
        categories, confidence = self.taxonomy_classifier.classify(text)
        for cat in categories:
            self.taxonomy_repo.link_to_chunk(chunk_id, cat, confidence)
        results["categories"] = categories

        results["success"] = True
        return results

    def run(self, limit: int = BATCH_SIZE) -> dict:
        """Führe Pipeline aus."""
        db.log("INFO", f"Starting semantic chunk analysis (limit={limit})")

        chunks = self.chunk_repo.get_pending_chunks(limit)
        if not chunks:
            print("Keine Chunks zur Analyse gefunden.")
            return {"processed": 0}

        print(f"Analysiere {len(chunks)} Chunks...")

        success = 0
        failed = 0

        for i, chunk in enumerate(chunks, 1):
            print(f"[{i}/{len(chunks)}] Chunk #{chunk['id']}...", end=" ")

            try:
                result = self.analyze_chunk(chunk)
                if result["success"]:
                    print(f"✓ {result.get('entities', 0)} Entitäten, {result.get('categories', [])}")
                    success += 1
                else:
                    print("✗ Fehler")
                    failed += 1
            except Exception as e:
                print(f"✗ {e}")
                db.log("ERROR", f"Chunk {chunk['id']} failed: {e}")
                failed += 1

        db.log("INFO", f"Analysis complete: {success} success, {failed} failed")
        return {"processed": len(chunks), "success": success, "failed": failed}

    def status(self) -> dict:
        """Zeige Status."""
        return self.chunk_repo.get_stats()
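

A minimal usage sketch (assumed, not part of the module; the __main__ guard and the example output are illustrative):

if __name__ == "__main__":
    # Hypothetical driver: process one batch, then report repository stats.
    pipeline = SemanticChunkPipeline()
    summary = pipeline.run(limit=BATCH_SIZE)
    print(summary)            # e.g. {"processed": 10, "success": 9, "failed": 1}
    print(pipeline.status())  # whatever ChunkRepository.get_stats() returns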