backfill_knowledge_semantics.py

Code Hygiene Score: 86

Issues 4

Line  Type          Description
10    magic_number  Magic number found: 100
10    magic_number  Magic number found: 100
290   magic_number  Magic number found: 60
296   magic_number  Magic number found: 60

Dependencies 6

Functions 6

Code

#!/usr/bin/env python3
"""
Backfill Knowledge Semantics for existing entities.

Analyzes all entities that don't have knowledge_semantics yet.
Resume-capable: skips already analyzed entities.

Usage:
    python backfill_knowledge_semantics.py              # Process all pending
    python backfill_knowledge_semantics.py --limit 100  # Process max 100
    python backfill_knowledge_semantics.py --batch 25   # Batch size 25
    python backfill_knowledge_semantics.py --dry-run    # Just count, don't process
"""

import argparse
import json
import time

import ollama

from db import db
from json_utils import extract_json

# Pipeline ID of the scientific pipeline ("Wissenschaftliche Pipeline")
DEFAULT_PIPELINE_ID = 5


def get_pipeline_model(step_type: str, pipeline_id: int = DEFAULT_PIPELINE_ID) -> str:
    """Get model from pipeline_steps config - NO HARDCODED DEFAULTS."""
    cursor = db.execute(
        """SELECT config FROM pipeline_steps
           WHERE pipeline_id = %s AND step_type = %s AND enabled = 1
           LIMIT 1""",
        (pipeline_id, step_type),
    )
    row = cursor.fetchone()
    cursor.close()

    if row and row.get("config"):
        try:
            config = json.loads(row["config"])
            model = config.get("model")
            if model:
                return model
        except json.JSONDecodeError:
            pass

    raise ValueError(f"No model configured for step_type={step_type} in pipeline {pipeline_id}")


# Valid ENUM values for validation
VALID_SEMANTIC_ROLES = {"agent", "patient", "instrument", "location", "cause", "effect"}
VALID_FUNCTIONAL_CATEGORIES = {"method", "tool", "concept", "actor", "outcome", "process"}

PROMPT_TEMPLATE = """Analysiere die Bedeutung dieser Entität im Kontext.

Entität: {name}
Typ: {entity_type}
Kontext: {context}

Bestimme:
1. semantic_role: Welche Rolle spielt die Entität? agent, patient, instrument, location, cause, effect
2. properties: Welche Eigenschaften hat sie? (als JSON-Objekt)
3. functional_category: Welche Funktion? method, tool, concept, actor, outcome, process
4. context_meaning: Was bedeutet die Entität in diesem Kontext? (1 Satz)

Antworte NUR mit gültigem JSON:
{{
  "semantic_role": "agent|patient|instrument|location|cause|effect",
  "properties": {{"key": "value"}},
  "functional_category": "method|tool|concept|actor|outcome|process",
  "context_meaning": "Kurze Erklärung"
}}"""


def validate_and_fix(data: dict) -> dict:
    """Validate and fix ENUM values from LLM response."""
    # semantic_role - handle list/string
    role = data.get("semantic_role", "")
    if isinstance(role, list):
        role = role[0] if role else ""
    role = str(role).lower().strip()
    if role not in VALID_SEMANTIC_ROLES:
        if "agent" in role or "akteur" in role or "handelnde" in role:
            role = "agent"
        elif "patient" in role or "betroffene" in role:
            role = "patient"
        elif "instrument" in role or "werkzeug" in role or "mittel" in role:
            role = "instrument"
        elif "ort" in role or "location" in role or "ort" in role:
            role = "location"
        elif "ursache" in role or "cause" in role:
            role = "cause"
        elif "wirkung" in role or "effect" in role or "ergebnis" in role:
            role = "effect"
        else:
            role = "instrument"  # Default
    data["semantic_role"] = role

    # functional_category - handle list/string
    cat = data.get("functional_category", "")
    if isinstance(cat, list):
        cat = cat[0] if cat else ""
    cat = str(cat).lower().strip()
    if cat not in VALID_FUNCTIONAL_CATEGORIES:
        if "method" in cat or "methode" in cat or "verfahren" in cat:
            cat = "method"
        elif "tool" in cat or "werkzeug" in cat:
            cat = "tool"
        elif "concept" in cat or "konzept" in cat or "begriff" in cat:
            cat = "concept"
        elif "actor" in cat or "akteur" in cat:
            cat = "actor"
        elif "outcome" in cat or "ergebnis" in cat or "resultat" in cat:
            cat = "outcome"
        elif "process" in cat or "prozess" in cat or "ablauf" in cat:
            cat = "process"
        else:
            cat = "concept"  # Default
    data["functional_category"] = cat

    # properties - ensure it's a dict
    props = data.get("properties", {})
    if not isinstance(props, dict):
        props = {}
    data["properties"] = props

    # context_meaning - ensure it's a string
    meaning = data.get("context_meaning", "")
    if not isinstance(meaning, str):
        meaning = str(meaning) if meaning else ""
    # Truncate to 500 chars (497 + ellipsis)
    if len(meaning) > 500:
        meaning = meaning[:497] + "..."
    data["context_meaning"] = meaning

    return data


def get_pending_entities(limit: int = 0) -> list:
    """Get entities without knowledge semantics, with context from chunks."""
    sql = """
        SELECT e.id, e.name, e.type,
               GROUP_CONCAT(SUBSTRING(c.content, 1, 500) SEPARATOR ' ... ') as context
        FROM entities e
        LEFT JOIN chunk_entities ce ON e.id = ce.entity_id
        LEFT JOIN chunks c ON ce.chunk_id = c.id
        LEFT JOIN entity_semantics es ON e.id = es.entity_id
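        -- no matching semantics row = not yet analyzed (this makes the backfill resume-capable)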
        WHERE es.id IS NULL
        GROUP BY e.id, e.name, e.type
        ORDER BY e.id
    """
    if limit > 0:
        sql += f"LIMIT {int(limit)}"  # limit is an int from argparse; appended after ORDER BY

    cursor = db.execute(sql)
    entities = cursor.fetchall()
    cursor.close()
    return list(entities)


def analyze_entity(entity: dict, model: str) -> dict | None:
    """Analyze a single entity with Ollama."""
    try:
        context = entity.get("context") or ""
        if len(context) > 1500:
            context = context[:1500]

        prompt = PROMPT_TEMPLATE.format(
            name=entity.get("name", ""),
            entity_type=entity.get("type", "unknown"),
            context=context,
        )

        response = ollama.generate(
            model=model,
            prompt=prompt,
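            # num_predict caps the generated tokens; 300 is ample for the short JSON answer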
            options={"num_predict": 300},
        )

        response_text = response["response"].strip()

        # Robust JSON extraction
        data = extract_json(response_text)
        if data:
            data = validate_and_fix(data)
            data["model_used"] = model
            return data

    except Exception as e:
        db.log("WARNING", f"Backfill: Knowledge semantic analysis failed for entity {entity['id']}: {e}")

    return None


def store_semantics(entity_id: int, semantics: dict) -> bool:
    """Store knowledge semantics to database."""
    try:
        cursor = db.execute(
            """INSERT INTO entity_semantics
               (entity_id, semantic_role, properties, functional_category,
                definition, model_used)
               VALUES (%s, %s, %s, %s, %s, %s)
               ON DUPLICATE KEY UPDATE
               semantic_role = VALUES(semantic_role),
               properties = VALUES(properties),
               functional_category = VALUES(functional_category),
               definition = VALUES(definition),
               model_used = VALUES(model_used),
               updated_at = NOW()""",
            (
                entity_id,
                semantics.get("semantic_role"),
                json.dumps(semantics.get("properties", {})),
                semantics.get("functional_category"),
                semantics.get("context_meaning"),
                semantics.get("model_used"),
            ),
        )
        db.commit()
        cursor.close()
        return True
    except Exception as e:
        db.log("ERROR", f"Backfill: Failed to store knowledge semantics for entity {entity_id}: {e}")
        return False


def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(description="Backfill Knowledge Semantics")
    parser.add_argument("--limit", type=int, default=0, help="Max entities to process (0=all)")
    parser.add_argument("--batch", type=int, default=25, help="Batch size for progress output")
    parser.add_argument("--model", default=None, help="Override pipeline model (reads from pipeline_steps if not set)")
    parser.add_argument("--pipeline-id", type=int, default=DEFAULT_PIPELINE_ID, help="Pipeline ID to read config from")
    parser.add_argument("--dry-run", action="store_true", help="Just count, don't process")

    args = parser.parse_args()

    db.connect()

    try:
        # Get model from pipeline config if not overridden
        if args.model is None:
            args.model = get_pipeline_model("knowledge_semantic_analyze", args.pipeline_id)
            print(f"[Config] Model from pipeline {args.pipeline_id}: {args.model}")

        # Get pending entities
        entities = get_pending_entities(args.limit)
        total = len(entities)

        print("Knowledge Semantics Backfill")
        print("=" * 50)
        print(f"Pending entities: {total}")
        print(f"Model: {args.model} (from pipeline_steps)")
        print(f"Batch size: {args.batch}")

        if args.dry_run:
            print("\nDry run - no processing")
            return

        if total == 0:
            print("\nNo pending entities - all done!")
            return

        print("\nStarting analysis...")
        print("-" * 50)

        success = 0
        errors = 0
        start_time = time.time()

        for i, entity in enumerate(entities, 1):
            # Analyze
            semantics = analyze_entity(entity, args.model)

            if semantics:
                if store_semantics(entity["id"], semantics):
                    success += 1
                else:
                    errors += 1
            else:
                errors += 1

            # Progress output
            if i % args.batch == 0 or i == total:
                elapsed = time.time() - start_time
                rate = i / elapsed if elapsed > 0 else 0
                eta = (total - i) / rate if rate > 0 else 0
                print(
                    f"  [{i}/{total}] Success: {success}, Errors: {errors}, Rate: {rate:.1f}/s, ETA: {eta / 60:.1f}min"
                )

        # Final summary
        elapsed = time.time() - start_time
        print("-" * 50)
        print(f"Completed in {elapsed / 60:.1f} minutes")
        print(f"Success: {success}")
        print(f"Errors: {errors}")

        db.log("INFO", f"Backfill knowledge semantics: {success} success, {errors} errors")

    finally:
        db.disconnect()


if __name__ == "__main__":
    main()