step_entity_enrich.py
- Pfad: /var/www/scripts/pipeline/step_entity_enrich.py
- Namespace: pipeline
- Zeilen: 250 | Größe: 7,959 Bytes
- Geändert: 2025-12-31 03:01:09 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 94
- Dependencies: 90 (25%)
- LOC: 83 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
Keine Issues gefunden.
Dependencies 6
- use time
- use requests
- use constants.DEFAULT_LIMIT
- use constants.OLLAMA_TIMEOUT
- use db.db
- use pipeline_config.get_step_model
Klassen 1
- EntityEnrichStep (class), Zeile 24
Code
"""
Entity Enrichment Step Module
Generates detailed descriptions for entities via Ollama.
This step runs AFTER semantic analysis and enriches entities
with comprehensive descriptions (3-5 sentences).
Part of Progressive Pipeline Architecture.
"""
import time
import requests
from constants import DEFAULT_LIMIT, OLLAMA_TIMEOUT
from db import db
from pipeline_config import get_step_model
# Configuration
# HTTP endpoint that EntityEnrichStep._call_ollama() POSTs generation requests to.
OLLAMA_URL = "http://localhost:11434/api/generate"
# Character threshold used in the entity queries: a description that is NULL or
# shorter than this counts as "needs enrichment".
MIN_DESCRIPTION_LENGTH = 50
class EntityEnrichStep:
    """Pipeline step that enriches entity descriptions via Ollama.

    Entities whose description is NULL or shorter than
    ``MIN_DESCRIPTION_LENGTH`` characters get a generated description written
    back to the ``entities`` table.
    """

    def __init__(self, progress=None):
        """Initialize the entity enrichment step.

        Args:
            progress: Optional PipelineProgress instance. When given,
                ``execute`` calls its ``update_step``, ``add_log`` and
                ``is_cancelled`` methods.
        """
        self.progress = progress
        # Filled lazily by _load_prompt() and reused on later calls.
        self.prompt_template = None

    def _load_prompt(self):
        """Return the prompt template, loading it on first use.

        The newest active ``entity_description`` row of the ``prompts`` table
        wins; when there is none, a built-in German fallback prompt is used.
        The result is cached on the instance.

        Returns:
            str: Template that ``execute`` formats with ``entity_name``,
            ``entity_type`` and ``context``.
        """
        if self.prompt_template:
            return self.prompt_template

        cursor = db.execute("""
SELECT content FROM prompts
WHERE use_case = 'entity_description' AND is_active = 1
ORDER BY id DESC LIMIT 1
""")
        row = cursor.fetchone()
        cursor.close()

        if row:
            self.prompt_template = row["content"]
        else:
            # Fallback prompt
            self.prompt_template = """Du bist ein Experte für systemisches Coaching und Organisationsentwicklung.
Aufgabe: Erstelle eine ausführliche Beschreibung für die folgende Entität.
Entität: {entity_name}
Typ: {entity_type}
Aktueller Kontext aus dem Dokument:
{context}
Anforderungen an die Beschreibung:
1. Erster Satz: Grundsätzliche Definition des Begriffs
2. Weitere 2-4 Sätze: Erläuterung der Bedeutung im Kontext von systemischem Coaching, Teamarbeit oder Organisationsentwicklung
3. Falls relevant: Praktische Anwendung oder Beispiele
Schreibe NUR die Beschreibung (3-5 Sätze), keine Überschriften oder Formatierung.
Sprache: Deutsch"""
        return self.prompt_template

    def _get_entities_to_enrich(self, limit=DEFAULT_LIMIT):
        """Fetch entities whose description is missing or too short.

        Args:
            limit: Maximum number of rows to return.

        Returns:
            Rows with ``id``, ``name``, ``type`` and ``description``, ordered
            by ``id``.
        """
        cursor = db.execute(
            """
SELECT id, name, type, description
FROM entities
WHERE description IS NULL
OR CHAR_LENGTH(description) < %s
ORDER BY id
LIMIT %s
""",
            (MIN_DESCRIPTION_LENGTH, limit),
        )
        entities = cursor.fetchall()
        cursor.close()
        return entities

    def _get_entity_context(self, entity_id, max_chunks=3):
        """Build a context string from chunks that mention the entity.

        Args:
            entity_id: ID of the entity to look up in ``chunk_entities``.
            max_chunks: Maximum number of chunks to include.

        Returns:
            str: Up to 500 characters of each chunk, joined by ``---``
            separators, or a placeholder text when no chunk is linked.
        """
        cursor = db.execute(
            """
SELECT c.content
FROM chunk_entities ce
JOIN chunks c ON ce.chunk_id = c.id
WHERE ce.entity_id = %s
LIMIT %s
""",
            (entity_id, max_chunks),
        )
        chunks = cursor.fetchall()
        cursor.close()

        if not chunks:
            return "(Kein Kontext verfügbar)"
        # A NULL content column must not break slicing; treat it as empty text.
        return "\n\n---\n\n".join((chunk["content"] or "")[:500] for chunk in chunks)

    def _call_ollama(self, prompt, model):
        """Call the Ollama generate API and return the generated text.

        Args:
            prompt: Fully formatted prompt text.
            model: Name of the Ollama model to use.

        Returns:
            str | None: The stripped ``response`` field (possibly empty), or
            ``None`` when the request failed or the body was not valid JSON.
        """
        try:
            response = requests.post(
                OLLAMA_URL,
                json={
                    "model": model,
                    "prompt": prompt,
                    "stream": False,
                    "options": {
                        "temperature": 0.7,
                        "num_predict": 300,
                    },
                },
                timeout=OLLAMA_TIMEOUT,
            )
            response.raise_for_status()
            result = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers invalid-JSON bodies on requests versions whose
            # JSON decode error is not a RequestException subclass.
            db.log("WARNING", f"Ollama error: {e}")
            return None
        # "response" may be absent or null; both yield an empty string.
        return (result.get("response") or "").strip()

    def _update_description(self, entity_id, description):
        """Store a new description for one entity and commit.

        Args:
            entity_id: ID of the entity row to update.
            description: New description text.

        Returns:
            bool: ``True`` on success, ``False`` if the update or commit
            raised (the error is logged).
        """
        try:
            cursor = db.execute(
                """
UPDATE entities SET description = %s WHERE id = %s
""",
                (description, entity_id),
            )
            # Close the cursor like every other query in this class does.
            if cursor is not None:
                cursor.close()
            db.commit()
            return True
        except Exception as e:
            db.log("ERROR", f"Failed to update entity {entity_id}: {e}")
            return False

    def execute(self, limit=DEFAULT_LIMIT, model=None):
        """Enrich entity descriptions.

        Args:
            limit: Maximum entities to process.
            model: Ollama model (if None, read from DB config).

        Returns:
            dict: ``processed`` (entities actually attempted, which is lower
            than the number found when the run is cancelled), ``success`` and
            ``errors`` counts.
        """
        # Load the model from the DB config when none was passed in.
        if model is None:
            model = get_step_model("enrich")

        if self.progress:
            self.progress.update_step("entity_enrich")
            self.progress.add_log("Starte Entity-Beschreibungs-Enrichment...")
        db.log("INFO", f"Entity enrichment starting (limit={limit}, model={model})")

        prompt_template = self._load_prompt()
        entities = self._get_entities_to_enrich(limit)
        total = len(entities)

        if total == 0:
            db.log("INFO", "No entities need enrichment")
            if self.progress:
                self.progress.add_log("Keine Entitäten benötigen Enrichment")
            return {"processed": 0, "success": 0, "errors": 0}

        db.log("INFO", f"Found {total} entities to enrich")
        if self.progress:
            self.progress.add_log(f"Enriche {total} Entitäten...")

        processed_count = 0
        success_count = 0
        error_count = 0

        for i, entity in enumerate(entities, 1):
            # Stop before touching the next entity once cancellation is requested.
            if self.progress and self.progress.is_cancelled():
                db.log("INFO", "Entity enrichment cancelled")
                break
            processed_count = i

            context = self._get_entity_context(entity["id"])
            prompt = prompt_template.format(
                entity_name=entity["name"], entity_type=entity["type"], context=context[:1500]
            )

            # An empty or failed generation and a failed DB write both count
            # as errors; no `continue`, so the progress log and the delay
            # below also run for failed entities.
            description = self._call_ollama(prompt, model)
            if description and self._update_description(entity["id"], description):
                success_count += 1
            else:
                error_count += 1

            # Progress update every 10 entities
            if i % 10 == 0 and self.progress:
                self.progress.add_log(f"Enrichment: {i}/{total} ({success_count} OK)")

            # Small delay between requests
            time.sleep(0.3)

        db.log("INFO", f"Entity enrichment complete: {success_count} success, {error_count} errors")
        if self.progress:
            self.progress.add_log(f"Enrichment fertig: {success_count}/{total} erfolgreich")

        return {
            "processed": processed_count,
            "success": success_count,
            "errors": error_count,
        }

    def get_stats(self):
        """Return current entity description statistics.

        Returns:
            A single row with ``total``, ``needs_enrichment`` (NULL or shorter
            than ``MIN_DESCRIPTION_LENGTH``), ``enriched`` (at least that
            long) and ``avg_length`` (rounded average length of the enriched
            descriptions).
        """
        cursor = db.execute(
            """
SELECT
COUNT(*) as total,
SUM(CASE WHEN description IS NULL OR CHAR_LENGTH(description) < %s THEN 1 ELSE 0 END) as needs_enrichment,
SUM(CASE WHEN CHAR_LENGTH(description) >= %s THEN 1 ELSE 0 END) as enriched,
ROUND(AVG(CASE WHEN CHAR_LENGTH(description) >= %s THEN CHAR_LENGTH(description) END)) as avg_length
FROM entities
""",
            (MIN_DESCRIPTION_LENGTH, MIN_DESCRIPTION_LENGTH, MIN_DESCRIPTION_LENGTH),
        )
        stats = cursor.fetchone()
        cursor.close()
        return stats