#!/usr/bin/env python3
"""
Backfill Text Semantics for existing chunks.
Analyzes all chunks that don't have text_semantics yet.
Resume-capable: skips already analyzed chunks.
Usage:
python backfill_text_semantics.py # Process all pending
python backfill_text_semantics.py --limit 100 # Process max 100
python backfill_text_semantics.py --batch 50 # Batch size 50
python backfill_text_semantics.py --dry-run # Just count, don't process
"""
import argparse
import json
import time
import ollama
from db import db
from json_utils import extract_json
# Default pipeline ID for the scientific pipeline ("Wissenschaftliche Pipeline")
DEFAULT_PIPELINE_ID = 5
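# pipeline_steps.config is read as JSON by get_pipeline_model() below; judging from
# that code, a minimal config would look like {"model": "<ollama-model-name>"}
# (placeholder value; only the "model" key is consumed here).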
def get_pipeline_model(step_type: str, pipeline_id: int = DEFAULT_PIPELINE_ID) -> str:
"""Get model from pipeline_steps config - NO HARDCODED DEFAULTS."""
cursor = db.execute(
"""SELECT config FROM pipeline_steps
WHERE pipeline_id = %s AND step_type = %s AND enabled = 1
LIMIT 1""",
(pipeline_id, step_type),
)
row = cursor.fetchone()
cursor.close()
if row and row.get("config"):
try:
config = json.loads(row["config"])
model = config.get("model")
if model:
return model
except json.JSONDecodeError:
pass
raise ValueError(f"No model configured for step_type={step_type} in pipeline {pipeline_id}")
# Valid ENUM values for validation
VALID_STATEMENT_FORMS = {"assertion", "question", "command", "conditional"}
VALID_INTENTS = {"explain", "argue", "define", "compare", "exemplify", "warn", "instruct"}
VALID_FRAMES = {"theoretical", "practical", "historical", "methodological", "critical"}
VALID_DISCOURSE_ROLES = {"thesis", "evidence", "example", "counter", "summary", "definition"}
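# These sets should mirror the ENUM definitions on the chunk_text_semantics columns;
# values that do not validate are coerced to a fallback in validate_and_fix() below.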
PROMPT_TEMPLATE = """Analysiere den folgenden Text semantisch.
Bestimme:
1. statement_form: Ist es eine Aussage (assertion), Frage (question), Aufforderung (command) oder Bedingung (conditional)?
2. intent: Was ist die Absicht? explain, argue, define, compare, exemplify, warn, instruct
3. frame: Welcher Rahmen? theoretical, practical, historical, methodological, critical
4. is_negated: Wird etwas verneint? true/false
5. discourse_role: Welche Rolle im Diskurs? thesis, evidence, example, counter, summary, definition
Antworte NUR mit gültigem JSON:
{{
"statement_form": "assertion|question|command|conditional",
"intent": "explain|argue|define|compare|exemplify|warn|instruct",
"frame": "theoretical|practical|historical|methodological|critical",
"is_negated": false,
"discourse_role": "thesis|evidence|example|counter|summary|definition"
}}
Text:
{content}"""
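# Note: the doubled braces {{ }} in the template are literal braces for str.format();
# {content} is the only placeholder and is filled in analyze_chunk().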
def validate_and_fix(data: dict) -> dict:
"""Validate and fix ENUM values from LLM response."""
# statement_form
    # Coerce to string so None or non-string values from the LLM don't raise
    sf = str(data.get("statement_form") or "").lower().strip()
if sf not in VALID_STATEMENT_FORMS:
if "frage" in sf or "question" in sf or sf.endswith("?"):
sf = "question"
elif "befehl" in sf or "command" in sf or "aufford" in sf:
sf = "command"
elif "bedingun" in sf or "condition" in sf or "wenn" in sf:
sf = "conditional"
else:
sf = "assertion"
data["statement_form"] = sf
# intent
    intent = str(data.get("intent") or "").lower().strip()
if intent not in VALID_INTENTS:
if "erklär" in intent or "explain" in intent:
intent = "explain"
elif "argument" in intent or "argue" in intent:
intent = "argue"
elif "defini" in intent or "define" in intent:
intent = "define"
elif "vergleich" in intent or "compare" in intent:
intent = "compare"
elif "beispiel" in intent or "example" in intent or "exemplify" in intent:
intent = "exemplify"
elif "warn" in intent:
intent = "warn"
elif "instruc" in intent or "anleit" in intent:
intent = "instruct"
else:
intent = "explain"
data["intent"] = intent
# frame
    frame = str(data.get("frame") or "").lower().strip()
if frame not in VALID_FRAMES:
if "theor" in frame:
frame = "theoretical"
elif "prakt" in frame or "practic" in frame:
frame = "practical"
elif "histor" in frame:
frame = "historical"
elif "method" in frame:
frame = "methodological"
elif "krit" in frame or "critic" in frame:
frame = "critical"
else:
frame = "theoretical"
data["frame"] = frame
# discourse_role
    role = str(data.get("discourse_role") or "").lower().strip()
if role not in VALID_DISCOURSE_ROLES:
if "these" in role or "thesis" in role:
role = "thesis"
elif "evidence" in role or "beleg" in role or "beweis" in role:
role = "evidence"
elif "beispiel" in role or "example" in role:
role = "example"
elif "gegen" in role or "counter" in role:
role = "counter"
elif "zusammen" in role or "summary" in role:
role = "summary"
elif "definition" in role:
role = "definition"
else:
role = "evidence"
data["discourse_role"] = role
# is_negated
negated = data.get("is_negated", False)
if isinstance(negated, str):
negated = negated.lower() in ("true", "1", "yes", "ja", "wahr")
data["is_negated"] = bool(negated)
return data
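# Illustrative example (made-up values): a raw answer such as
#   {"statement_form": "Frage", "intent": "erklären", "frame": "theoretisch",
#    "is_negated": "ja", "discourse_role": "Beispiel"}
# would be normalized by validate_and_fix() to
#   {"statement_form": "question", "intent": "explain", "frame": "theoretical",
#    "is_negated": True, "discourse_role": "example"}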
def get_pending_chunks(limit: int = 0) -> list:
"""Get chunks without text semantics."""
sql = """
SELECT c.id, c.content, c.document_id
FROM chunks c
LEFT JOIN chunk_text_semantics cts ON c.id = cts.chunk_id
WHERE cts.id IS NULL
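        -- resume capability: keep only chunks that have no chunk_text_semantics row yet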
ORDER BY c.id
"""
if limit > 0:
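        # `limit` comes from argparse as an int, so the f-string below cannot
        # inject arbitrary SQL; a bind parameter would work just as well.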
sql += f" LIMIT {limit}"
cursor = db.execute(sql)
chunks = cursor.fetchall()
cursor.close()
return list(chunks)
def analyze_chunk(chunk: dict, model: str) -> dict | None:
"""Analyze a single chunk with Ollama."""
try:
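        # Only the first 2000 characters of the chunk are sent to the model;
        # num_predict caps the reply, since only a small JSON object is expected back.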
prompt = PROMPT_TEMPLATE.format(content=chunk["content"][:2000])
response = ollama.generate(
model=model,
prompt=prompt,
options={"num_predict": 200},
)
response_text = response["response"].strip()
        # Robust JSON extraction from the raw model output
data = extract_json(response_text)
if data:
data = validate_and_fix(data)
data["model_used"] = model
return data
except Exception as e:
db.log("WARNING", f"Backfill: Text semantic analysis failed for chunk {chunk['id']}: {e}")
return None
def store_semantics(chunk_id: int, semantics: dict) -> bool:
"""Store text semantics to database."""
try:
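        # Upsert: assumes chunk_id has a UNIQUE (or PRIMARY) key on
        # chunk_text_semantics, so re-running the backfill updates rows in place.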
cursor = db.execute(
"""INSERT INTO chunk_text_semantics
(chunk_id, statement_form, intent, frame, is_negated,
discourse_role, model_used)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
statement_form = VALUES(statement_form),
intent = VALUES(intent),
frame = VALUES(frame),
is_negated = VALUES(is_negated),
discourse_role = VALUES(discourse_role),
model_used = VALUES(model_used),
updated_at = NOW()""",
(
chunk_id,
semantics.get("statement_form"),
semantics.get("intent"),
semantics.get("frame"),
semantics.get("is_negated", False),
semantics.get("discourse_role"),
semantics.get("model_used"),
),
)
db.commit()
cursor.close()
return True
except Exception as e:
db.log("ERROR", f"Backfill: Failed to store text semantics for chunk {chunk_id}: {e}")
return False
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(description="Backfill Text Semantics")
parser.add_argument("--limit", type=int, default=0, help="Max chunks to process (0=all)")
parser.add_argument("--batch", type=int, default=50, help="Batch size for progress output")
parser.add_argument("--model", default=None, help="Override pipeline model (reads from pipeline_steps if not set)")
parser.add_argument("--pipeline-id", type=int, default=DEFAULT_PIPELINE_ID, help="Pipeline ID to read config from")
parser.add_argument("--dry-run", action="store_true", help="Just count, don't process")
args = parser.parse_args()
db.connect()
try:
# Get model from pipeline config if not overridden
if args.model is None:
args.model = get_pipeline_model("text_semantic_analyze", args.pipeline_id)
print(f"[Config] Model from pipeline {args.pipeline_id}: {args.model}")
# Get pending chunks
chunks = get_pending_chunks(args.limit)
total = len(chunks)
print("Text Semantics Backfill")
print("=" * 50)
print(f"Pending chunks: {total}")
        print(f"Model: {args.model}")
print(f"Batch size: {args.batch}")
if args.dry_run:
print("\nDry run - no processing")
return
if total == 0:
print("\nNo pending chunks - all done!")
return
print("\nStarting analysis...")
print("-" * 50)
success = 0
errors = 0
start_time = time.time()
for i, chunk in enumerate(chunks, 1):
# Analyze
semantics = analyze_chunk(chunk, args.model)
if semantics:
if store_semantics(chunk["id"], semantics):
success += 1
else:
errors += 1
else:
errors += 1
# Progress output
            if (args.batch > 0 and i % args.batch == 0) or i == total:
elapsed = time.time() - start_time
rate = i / elapsed if elapsed > 0 else 0
eta = (total - i) / rate if rate > 0 else 0
print(
f" [{i}/{total}] Success: {success}, Errors: {errors}, Rate: {rate:.1f}/s, ETA: {eta / 60:.1f}min"
)
# Final summary
elapsed = time.time() - start_time
print("-" * 50)
print(f"Completed in {elapsed / 60:.1f} minutes")
print(f"Success: {success}")
print(f"Errors: {errors}")
db.log("INFO", f"Backfill text semantics: {success} success, {errors} errors")
finally:
db.disconnect()
if __name__ == "__main__":
main()