#!/usr/bin/env python3
"""
Backfill Knowledge Semantics for existing entities.
Analyzes all entities that don't have knowledge_semantics yet.
Resume-capable: skips already analyzed entities.
Usage:
python backfill_knowledge_semantics.py # Process all pending
python backfill_knowledge_semantics.py --limit 100 # Process max 100
python backfill_knowledge_semantics.py --batch 25 # Batch size 25
python backfill_knowledge_semantics.py --dry-run # Just count, don't process
"""
import argparse
import json
import time
import ollama
from db import db
from json_utils import extract_json
# Pipeline ID of the "Wissenschaftliche Pipeline" (the scientific pipeline)
DEFAULT_PIPELINE_ID = 5
def get_pipeline_model(step_type: str, pipeline_id: int = DEFAULT_PIPELINE_ID) -> str:
"""Get model from pipeline_steps config - NO HARDCODED DEFAULTS."""
cursor = db.execute(
"""SELECT config FROM pipeline_steps
WHERE pipeline_id = %s AND step_type = %s AND enabled = 1
LIMIT 1""",
(pipeline_id, step_type),
)
row = cursor.fetchone()
cursor.close()
if row and row.get("config"):
try:
config = json.loads(row["config"])
model = config.get("model")
if model:
return model
except json.JSONDecodeError:
pass
raise ValueError(f"No model configured for step_type={step_type} in pipeline {pipeline_id}")
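# Illustrative call (the returned model name depends entirely on what is stored
# in pipeline_steps; nothing is hardcoded here):
#   model = get_pipeline_model("knowledge_semantic_analyze", pipeline_id=5)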
# Valid ENUM values for validation
VALID_SEMANTIC_ROLES = {"agent", "patient", "instrument", "location", "cause", "effect"}
VALID_FUNCTIONAL_CATEGORIES = {"method", "tool", "concept", "actor", "outcome", "process"}
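# These sets are assumed to mirror the ENUM columns on entity_semantics; keep
# them in sync if the ENUM definitions change.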
PROMPT_TEMPLATE = """Analysiere die Bedeutung dieser Entität im Kontext.
Entität: {name}
Typ: {entity_type}
Kontext: {context}
Bestimme:
1. semantic_role: Welche Rolle spielt die Entität? agent, patient, instrument, location, cause, effect
2. properties: Welche Eigenschaften hat sie? (als JSON-Objekt)
3. functional_category: Welche Funktion? method, tool, concept, actor, outcome, process
4. context_meaning: Was bedeutet die Entität in diesem Kontext? (1 Satz)
Antworte NUR mit gültigem JSON:
{{
"semantic_role": "agent|patient|instrument|location|cause|effect",
"properties": {{"key": "value"}},
"functional_category": "method|tool|concept|actor|outcome|process",
"context_meaning": "Kurze Erklärung"
}}"""
def validate_and_fix(data: dict) -> dict:
"""Validate and fix ENUM values from LLM response."""
# semantic_role - handle list/string
role = data.get("semantic_role", "")
if isinstance(role, list):
role = role[0] if role else ""
role = str(role).lower().strip()
if role not in VALID_SEMANTIC_ROLES:
if "agent" in role or "akteur" in role or "handelnde" in role:
role = "agent"
elif "patient" in role or "betroffene" in role:
role = "patient"
elif "instrument" in role or "werkzeug" in role or "mittel" in role:
role = "instrument"
elif "ort" in role or "location" in role or "ort" in role:
role = "location"
elif "ursache" in role or "cause" in role:
role = "cause"
elif "wirkung" in role or "effect" in role or "ergebnis" in role:
role = "effect"
else:
role = "instrument" # Default
data["semantic_role"] = role
# functional_category - handle list/string
cat = data.get("functional_category", "")
if isinstance(cat, list):
cat = cat[0] if cat else ""
cat = str(cat).lower().strip()
if cat not in VALID_FUNCTIONAL_CATEGORIES:
if "method" in cat or "methode" in cat or "verfahren" in cat:
cat = "method"
elif "tool" in cat or "werkzeug" in cat:
cat = "tool"
elif "concept" in cat or "konzept" in cat or "begriff" in cat:
cat = "concept"
elif "actor" in cat or "akteur" in cat:
cat = "actor"
elif "outcome" in cat or "ergebnis" in cat or "resultat" in cat:
cat = "outcome"
elif "process" in cat or "prozess" in cat or "ablauf" in cat:
cat = "process"
else:
cat = "concept" # Default
data["functional_category"] = cat
# properties - ensure it's a dict
props = data.get("properties", {})
if not isinstance(props, dict):
props = {}
data["properties"] = props
# context_meaning - ensure it's a string
meaning = data.get("context_meaning", "")
if not isinstance(meaning, str):
meaning = str(meaning) if meaning else ""
# Truncate if too long
if len(meaning) > 500:
meaning = meaning[:497] + "..."
data["context_meaning"] = meaning
return data
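# Illustrative normalization example (hypothetical input): a list-valued role
# and a German category label are folded back onto the ENUM values, and missing
# fields are defaulted:
#   validate_and_fix({"semantic_role": ["Akteur"], "functional_category": "Methode"})
#   -> {"semantic_role": "agent", "functional_category": "method",
#       "properties": {}, "context_meaning": ""}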
def get_pending_entities(limit: int = 0) -> list:
"""Get entities without knowledge semantics, with context from chunks."""
sql = """
SELECT e.id, e.name, e.type,
GROUP_CONCAT(SUBSTRING(c.content, 1, 500) SEPARATOR ' ... ') as context
FROM entities e
LEFT JOIN chunk_entities ce ON e.id = ce.entity_id
LEFT JOIN chunks c ON ce.chunk_id = c.id
LEFT JOIN entity_semantics es ON e.id = es.entity_id
WHERE es.id IS NULL
GROUP BY e.id, e.name, e.type
ORDER BY e.id
"""
    if limit > 0:
        sql += f" LIMIT {int(limit)}"
cursor = db.execute(sql)
entities = cursor.fetchall()
cursor.close()
return list(entities)
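# Note: the LEFT JOINs keep entities that have no chunk links (their context is
# NULL; analyze_entity() falls back to an empty string). GROUP_CONCAT output is
# capped by MySQL's group_concat_max_len (1024 bytes by default), so heavily
# linked entities may get a truncated context; analyze_entity() trims it to
# 1500 characters anyway.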
def analyze_entity(entity: dict, model: str) -> dict | None:
"""Analyze a single entity with Ollama."""
try:
context = entity.get("context") or ""
if len(context) > 1500:
context = context[:1500]
prompt = PROMPT_TEMPLATE.format(
name=entity.get("name", ""),
entity_type=entity.get("type", "unknown"),
context=context,
)
response = ollama.generate(
model=model,
prompt=prompt,
options={"num_predict": 300},
)
response_text = response["response"].strip()
        # Robust JSON extraction
data = extract_json(response_text)
if data:
data = validate_and_fix(data)
data["model_used"] = model
return data
except Exception as e:
db.log("WARNING", f"Backfill: Knowledge semantic analysis failed for entity {entity['id']}: {e}")
return None
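# extract_json (from json_utils) is assumed to pull the first JSON object out of
# free-form model output, e.g. when the model wraps it in prose or code fences.
# If nothing parseable comes back, analyze_entity() returns None and the caller
# counts the entity as an error.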
def store_semantics(entity_id: int, semantics: dict) -> bool:
"""Store knowledge semantics to database."""
try:
cursor = db.execute(
"""INSERT INTO entity_semantics
(entity_id, semantic_role, properties, functional_category,
definition, model_used)
VALUES (%s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
semantic_role = VALUES(semantic_role),
properties = VALUES(properties),
functional_category = VALUES(functional_category),
definition = VALUES(definition),
model_used = VALUES(model_used),
updated_at = NOW()""",
(
entity_id,
semantics.get("semantic_role"),
json.dumps(semantics.get("properties", {})),
semantics.get("functional_category"),
semantics.get("context_meaning"),
semantics.get("model_used"),
),
)
db.commit()
cursor.close()
return True
except Exception as e:
db.log("ERROR", f"Backfill: Failed to store knowledge semantics for entity {entity_id}: {e}")
return False
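# Note: the ON DUPLICATE KEY upsert assumes a UNIQUE key on
# entity_semantics.entity_id; the model's "context_meaning" is persisted in the
# `definition` column.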
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(description="Backfill Knowledge Semantics")
parser.add_argument("--limit", type=int, default=0, help="Max entities to process (0=all)")
parser.add_argument("--batch", type=int, default=25, help="Batch size for progress output")
parser.add_argument("--model", default=None, help="Override pipeline model (reads from pipeline_steps if not set)")
parser.add_argument("--pipeline-id", type=int, default=DEFAULT_PIPELINE_ID, help="Pipeline ID to read config from")
parser.add_argument("--dry-run", action="store_true", help="Just count, don't process")
args = parser.parse_args()
db.connect()
try:
# Get model from pipeline config if not overridden
if args.model is None:
args.model = get_pipeline_model("knowledge_semantic_analyze", args.pipeline_id)
print(f"[Config] Model from pipeline {args.pipeline_id}: {args.model}")
# Get pending entities
entities = get_pending_entities(args.limit)
total = len(entities)
print("Knowledge Semantics Backfill")
print("=" * 50)
print(f"Pending entities: {total}")
print(f"Model: {args.model} (from pipeline_steps)")
print(f"Batch size: {args.batch}")
if args.dry_run:
print("\nDry run - no processing")
return
if total == 0:
print("\nNo pending entities - all done!")
return
print("\nStarting analysis...")
print("-" * 50)
success = 0
errors = 0
start_time = time.time()
for i, entity in enumerate(entities, 1):
# Analyze
semantics = analyze_entity(entity, args.model)
if semantics:
if store_semantics(entity["id"], semantics):
success += 1
else:
errors += 1
else:
errors += 1
# Progress output
if i % args.batch == 0 or i == total:
elapsed = time.time() - start_time
rate = i / elapsed if elapsed > 0 else 0
eta = (total - i) / rate if rate > 0 else 0
print(
f" [{i}/{total}] Success: {success}, Errors: {errors}, Rate: {rate:.1f}/s, ETA: {eta / 60:.1f}min"
)
# Final summary
elapsed = time.time() - start_time
print("-" * 50)
print(f"Completed in {elapsed / 60:.1f} minutes")
print(f"Success: {success}")
print(f"Errors: {errors}")
db.log("INFO", f"Backfill knowledge semantics: {success} success, {errors} errors")
finally:
db.disconnect()
if __name__ == "__main__":
main()