pipeline.py
- Path: /var/www/scripts/pipeline/semantic_chunk/pipeline.py | Namespace: pipeline
- Lines: 112 | Size: 3,971 bytes
- Modified: 2025-12-25 14:05:00 | Scanned: 2025-12-31 10:22:15
Code Hygiene Score: 85 (weighted average of the per-metric scores below; see the sketch after this list)
- Dependencies: 40 (weight 25%)
- LOC: 100 (weight 20%)
- Methods: 100 (weight 20%)
- Secrets: 100 (weight 15%)
- Classes: 100 (weight 10%)
- Magic Numbers: 100 (weight 10%)
No issues found.
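The overall score is consistent with a weighted sum of the per-metric scores. A minimal sketch of that arithmetic, with scores and weights read off the report above (the scanner's actual formula is an assumption):

# Hypothetical reconstruction of the hygiene score.
metrics = {
    "Dependencies":  (40, 0.25),
    "LOC":           (100, 0.20),
    "Methods":       (100, 0.20),
    "Secrets":       (100, 0.15),
    "Classes":       (100, 0.10),
    "Magic Numbers": (100, 0.10),
}
score = sum(value * weight for value, weight in metrics.values())
print(score)  # 85.0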
Dependencies: 11
- use sys
- use db.db
- use analyzers.EntityExtractor
- use analyzers.RelationExtractor
- use analyzers.SemanticsAnalyzer
- use analyzers.TaxonomyClassifier
- use ollama_service.OllamaService
- use repositories.ChunkRepository
- use repositories.EntityRepository
- use repositories.SemanticsRepository
- use repositories.TaxonomyRepository
Classes: 1
- SemanticChunkPipeline (class, line 18)
Code
"""
Main pipeline orchestration for semantic chunk analysis.
"""
import sys
sys.path.insert(0, "/var/www/scripts/pipeline")
from db import db
from .analyzers import EntityExtractor, RelationExtractor, SemanticsAnalyzer, TaxonomyClassifier
from .ollama_service import OllamaService
from .repositories import ChunkRepository, EntityRepository, SemanticsRepository, TaxonomyRepository
BATCH_SIZE = 10
class SemanticChunkPipeline:
    """Orchestrates the semantic chunk analysis."""

    def __init__(self):
        self.llm = OllamaService()
        self.semantics_analyzer = SemanticsAnalyzer(self.llm)
        self.entity_extractor = EntityExtractor(self.llm)
        self.relation_extractor = RelationExtractor(self.llm)
        self.taxonomy_classifier = TaxonomyClassifier(self.llm)
        self.chunk_repo = ChunkRepository()
        self.semantics_repo = SemanticsRepository()
        self.entity_repo = EntityRepository()
        self.taxonomy_repo = TaxonomyRepository()
        # Make sure all taxonomy categories exist before classification runs.
        self.taxonomy_repo.ensure_terms_exist(TaxonomyClassifier.CATEGORIES)
    def analyze_chunk(self, chunk: dict) -> dict:
        """Analyze a single chunk."""
        chunk_id = chunk["id"]
        text = chunk["content"]
        results = {"chunk_id": chunk_id, "success": False}

        # 1. Semantics analysis
        semantics = self.semantics_analyzer.analyze(chunk_id, text)
        if semantics:
            self.semantics_repo.save_semantics(semantics)
            results["semantics"] = True

        # 2. Entity extraction
        entities = self.entity_extractor.extract(text)
        entity_ids = {}
        for entity in entities:
            entity_id = self.entity_repo.find_or_create(entity)
            self.entity_repo.link_to_chunk(chunk_id, entity_id)
            entity_ids[entity.name] = entity_id
        results["entities"] = len(entities)

        # 3. Relation extraction (only meaningful with at least two entities)
        if len(entities) >= 2:
            relations = self.relation_extractor.extract(text, entities)
            for rel in relations:
                source_id = entity_ids.get(rel.source)
                target_id = entity_ids.get(rel.target)
                if source_id and target_id:
                    self.entity_repo.save_relation(source_id, target_id, rel, chunk_id)
            results["relations"] = len(relations)

        # 4. Taxonomy classification
        categories, confidence = self.taxonomy_classifier.classify(text)
        for cat in categories:
            self.taxonomy_repo.link_to_chunk(chunk_id, cat, confidence)
        results["categories"] = categories

        results["success"] = True
        return results
    def run(self, limit: int = BATCH_SIZE) -> dict:
        """Run the pipeline."""
        db.log("INFO", f"Starting semantic chunk analysis (limit={limit})")
        chunks = self.chunk_repo.get_pending_chunks(limit)
        if not chunks:
            print("No chunks found to analyze.")
            return {"processed": 0}
        print(f"Analyzing {len(chunks)} chunks...")
        success = 0
        failed = 0
        for i, chunk in enumerate(chunks, 1):
            print(f"[{i}/{len(chunks)}] Chunk #{chunk['id']}...", end=" ")
            try:
                result = self.analyze_chunk(chunk)
                if result["success"]:
                    print(f"✓ {result.get('entities', 0)} entities, {result.get('categories', [])}")
                    success += 1
                else:
                    print("✗ Error")
                    failed += 1
            except Exception as e:
                # A failing chunk is logged and counted but does not abort the batch.
                print(f"✗ {e}")
                db.log("ERROR", f"Chunk {chunk['id']} failed: {e}")
                failed += 1
        db.log("INFO", f"Analysis complete: {success} success, {failed} failed")
        return {"processed": len(chunks), "success": success, "failed": failed}

    def status(self) -> dict:
        """Show status."""
        return self.chunk_repo.get_stats()
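
For reference, a minimal sketch of how the class above might be driven; the `__main__` guard is an assumption and not part of the scanned file:

# Hypothetical entry point, assuming the imports above resolve.
if __name__ == "__main__":
    pipeline = SemanticChunkPipeline()
    summary = pipeline.run(limit=BATCH_SIZE)
    print(summary)            # e.g. {"processed": 10, "success": 9, "failed": 1}
    print(pipeline.status())  # whatever ChunkRepository.get_stats() reports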