backfill_text_semantics.py

Code Hygiene Score: 86

Issues 4

Zeile Typ Beschreibung
10 magic_number Magic Number gefunden: 100
10 magic_number Magic Number gefunden: 100
300 magic_number Magic Number gefunden: 60
306 magic_number Magic Number gefunden: 60

Dependencies 6

Funktionen 6

Code

#!/usr/bin/env python3
"""
Backfill Text Semantics for existing chunks.

Analyzes all chunks that don't have text_semantics yet.
Resume-capable: skips already analyzed chunks.

Usage:
    python backfill_text_semantics.py              # Process all pending
    python backfill_text_semantics.py --limit 100  # Process max 100
    python backfill_text_semantics.py --batch 50   # Batch size 50
    python backfill_text_semantics.py --dry-run    # Just count, don't process
"""

import argparse
import json
import time

import ollama

from db import db
from json_utils import extract_json

# Pipeline-ID für Wissenschaftliche Pipeline
DEFAULT_PIPELINE_ID = 5


def get_pipeline_model(step_type: str, pipeline_id: int = DEFAULT_PIPELINE_ID) -> str:
    """Get model from pipeline_steps config - NO HARDCODED DEFAULTS."""
    cursor = db.execute(
        """SELECT config FROM pipeline_steps
           WHERE pipeline_id = %s AND step_type = %s AND enabled = 1
           LIMIT 1""",
        (pipeline_id, step_type),
    )
    row = cursor.fetchone()
    cursor.close()

    if row and row.get("config"):
        try:
            config = json.loads(row["config"])
            model = config.get("model")
            if model:
                return model
        except json.JSONDecodeError:
            pass

    raise ValueError(f"No model configured for step_type={step_type} in pipeline {pipeline_id}")


# Valid ENUM values for validation
VALID_STATEMENT_FORMS = {"assertion", "question", "command", "conditional"}
VALID_INTENTS = {"explain", "argue", "define", "compare", "exemplify", "warn", "instruct"}
VALID_FRAMES = {"theoretical", "practical", "historical", "methodological", "critical"}
VALID_DISCOURSE_ROLES = {"thesis", "evidence", "example", "counter", "summary", "definition"}

PROMPT_TEMPLATE = """Analysiere den folgenden Text semantisch.

Bestimme:
1. statement_form: Ist es eine Aussage (assertion), Frage (question), Aufforderung (command) oder Bedingung (conditional)?
2. intent: Was ist die Absicht? explain, argue, define, compare, exemplify, warn, instruct
3. frame: Welcher Rahmen? theoretical, practical, historical, methodological, critical
4. is_negated: Wird etwas verneint? true/false
5. discourse_role: Welche Rolle im Diskurs? thesis, evidence, example, counter, summary, definition

Antworte NUR mit gültigem JSON:
{{
  "statement_form": "assertion|question|command|conditional",
  "intent": "explain|argue|define|compare|exemplify|warn|instruct",
  "frame": "theoretical|practical|historical|methodological|critical",
  "is_negated": false,
  "discourse_role": "thesis|evidence|example|counter|summary|definition"
}}

Text:
{content}"""


def validate_and_fix(data: dict) -> dict:
    """Validate and fix ENUM values from LLM response."""
    # statement_form
    sf = data.get("statement_form", "").lower().strip()
    if sf not in VALID_STATEMENT_FORMS:
        if "frage" in sf or "question" in sf or sf.endswith("?"):
            sf = "question"
        elif "befehl" in sf or "command" in sf or "aufford" in sf:
            sf = "command"
        elif "bedingun" in sf or "condition" in sf or "wenn" in sf:
            sf = "conditional"
        else:
            sf = "assertion"
    data["statement_form"] = sf

    # intent
    intent = data.get("intent", "").lower().strip()
    if intent not in VALID_INTENTS:
        if "erklär" in intent or "explain" in intent:
            intent = "explain"
        elif "argument" in intent or "argue" in intent:
            intent = "argue"
        elif "defini" in intent or "define" in intent:
            intent = "define"
        elif "vergleich" in intent or "compare" in intent:
            intent = "compare"
        elif "beispiel" in intent or "example" in intent or "exemplify" in intent:
            intent = "exemplify"
        elif "warn" in intent:
            intent = "warn"
        elif "instruc" in intent or "anleit" in intent:
            intent = "instruct"
        else:
            intent = "explain"
    data["intent"] = intent

    # frame
    frame = data.get("frame", "").lower().strip()
    if frame not in VALID_FRAMES:
        if "theor" in frame:
            frame = "theoretical"
        elif "prakt" in frame or "practic" in frame:
            frame = "practical"
        elif "histor" in frame:
            frame = "historical"
        elif "method" in frame:
            frame = "methodological"
        elif "krit" in frame or "critic" in frame:
            frame = "critical"
        else:
            frame = "theoretical"
    data["frame"] = frame

    # discourse_role
    role = data.get("discourse_role", "").lower().strip()
    if role not in VALID_DISCOURSE_ROLES:
        if "these" in role or "thesis" in role:
            role = "thesis"
        elif "evidence" in role or "beleg" in role or "beweis" in role:
            role = "evidence"
        elif "beispiel" in role or "example" in role:
            role = "example"
        elif "gegen" in role or "counter" in role:
            role = "counter"
        elif "zusammen" in role or "summary" in role:
            role = "summary"
        elif "definition" in role:
            role = "definition"
        else:
            role = "evidence"
    data["discourse_role"] = role

    # is_negated
    negated = data.get("is_negated", False)
    if isinstance(negated, str):
        negated = negated.lower() in ("true", "1", "yes", "ja", "wahr")
    data["is_negated"] = bool(negated)

    return data


def get_pending_chunks(limit: int = 0) -> list:
    """Get chunks without text semantics."""
    sql = """
        SELECT c.id, c.content, c.document_id
        FROM chunks c
        LEFT JOIN chunk_text_semantics cts ON c.id = cts.chunk_id
        WHERE cts.id IS NULL
        ORDER BY c.id
    """
    if limit > 0:
        sql += f" LIMIT {limit}"

    cursor = db.execute(sql)
    chunks = cursor.fetchall()
    cursor.close()
    return list(chunks)


def analyze_chunk(chunk: dict, model: str) -> dict | None:
    """Analyze a single chunk with Ollama."""
    try:
        prompt = PROMPT_TEMPLATE.format(content=chunk["content"][:2000])

        response = ollama.generate(
            model=model,
            prompt=prompt,
            options={"num_predict": 200},
        )

        response_text = response["response"].strip()

        # Robuste JSON-Extraktion
        data = extract_json(response_text)
        if data:
            data = validate_and_fix(data)
            data["model_used"] = model
            return data

    except Exception as e:
        db.log("WARNING", f"Backfill: Text semantic analysis failed for chunk {chunk['id']}: {e}")

    return None


def store_semantics(chunk_id: int, semantics: dict) -> bool:
    """Store text semantics to database."""
    try:
        cursor = db.execute(
            """INSERT INTO chunk_text_semantics
               (chunk_id, statement_form, intent, frame, is_negated,
                discourse_role, model_used)
               VALUES (%s, %s, %s, %s, %s, %s, %s)
               ON DUPLICATE KEY UPDATE
               statement_form = VALUES(statement_form),
               intent = VALUES(intent),
               frame = VALUES(frame),
               is_negated = VALUES(is_negated),
               discourse_role = VALUES(discourse_role),
               model_used = VALUES(model_used),
               updated_at = NOW()""",
            (
                chunk_id,
                semantics.get("statement_form"),
                semantics.get("intent"),
                semantics.get("frame"),
                semantics.get("is_negated", False),
                semantics.get("discourse_role"),
                semantics.get("model_used"),
            ),
        )
        db.commit()
        cursor.close()
        return True
    except Exception as e:
        db.log("ERROR", f"Backfill: Failed to store text semantics for chunk {chunk_id}: {e}")
        return False


def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(description="Backfill Text Semantics")
    parser.add_argument("--limit", type=int, default=0, help="Max chunks to process (0=all)")
    parser.add_argument("--batch", type=int, default=50, help="Batch size for progress output")
    parser.add_argument("--model", default=None, help="Override pipeline model (reads from pipeline_steps if not set)")
    parser.add_argument("--pipeline-id", type=int, default=DEFAULT_PIPELINE_ID, help="Pipeline ID to read config from")
    parser.add_argument("--dry-run", action="store_true", help="Just count, don't process")

    args = parser.parse_args()

    db.connect()

    try:
        # Get model from pipeline config if not overridden
        if args.model is None:
            args.model = get_pipeline_model("text_semantic_analyze", args.pipeline_id)
            print(f"[Config] Model from pipeline {args.pipeline_id}: {args.model}")

        # Get pending chunks
        chunks = get_pending_chunks(args.limit)
        total = len(chunks)

        print("Text Semantics Backfill")
        print("=" * 50)
        print(f"Pending chunks: {total}")
        print(f"Model: {args.model} (from pipeline_steps)")
        print(f"Batch size: {args.batch}")

        if args.dry_run:
            print("\nDry run - no processing")
            return

        if total == 0:
            print("\nNo pending chunks - all done!")
            return

        print("\nStarting analysis...")
        print("-" * 50)

        success = 0
        errors = 0
        start_time = time.time()

        for i, chunk in enumerate(chunks, 1):
            # Analyze
            semantics = analyze_chunk(chunk, args.model)

            if semantics:
                if store_semantics(chunk["id"], semantics):
                    success += 1
                else:
                    errors += 1
            else:
                errors += 1

            # Progress output
            if i % args.batch == 0 or i == total:
                elapsed = time.time() - start_time
                rate = i / elapsed if elapsed > 0 else 0
                eta = (total - i) / rate if rate > 0 else 0
                print(
                    f"  [{i}/{total}] Success: {success}, Errors: {errors}, Rate: {rate:.1f}/s, ETA: {eta / 60:.1f}min"
                )

        # Final summary
        elapsed = time.time() - start_time
        print("-" * 50)
        print(f"Completed in {elapsed / 60:.1f} minutes")
        print(f"Success: {success}")
        print(f"Errors: {errors}")

        db.log("INFO", f"Backfill text semantics: {success} success, {errors} errors")

    finally:
        db.disconnect()


if __name__ == "__main__":
    main()
← Übersicht