{
"tool_response": {
"filePath": "\/var\/www\/scripts\/pipeline\/backfill_text_semantics.py",
"oldString": "import argparse\nimport json\nimport time\n\nimport ollama\n\nfrom db import db",
"newString": "import argparse\nimport json\nimport time\n\nimport ollama\n\nfrom db import db\nfrom json_utils import extract_json",
"originalFile": "#!\/usr\/bin\/env python3\n\"\"\"\nBackfill Text Semantics for existing chunks.\n\nAnalyzes all chunks that don't have text_semantics yet.\nResume-capable: skips already analyzed chunks.\n\nUsage:\n python backfill_text_semantics.py # Process all pending\n python backfill_text_semantics.py --limit 100 # Process max 100\n python backfill_text_semantics.py --batch 50 # Batch size 50\n python backfill_text_semantics.py --dry-run # Just count, don't process\n\"\"\"\n\nimport argparse\nimport json\nimport time\n\nimport ollama\n\nfrom db import db\n\n# Pipeline-ID für Wissenschaftliche Pipeline\nDEFAULT_PIPELINE_ID = 5\n\n\ndef get_pipeline_model(step_type: str, pipeline_id: int = DEFAULT_PIPELINE_ID) -> str:\n \"\"\"Get model from pipeline_steps config - NO HARDCODED DEFAULTS.\"\"\"\n cursor = db.execute(\n \"\"\"SELECT config FROM pipeline_steps\n WHERE pipeline_id = %s AND step_type = %s AND enabled = 1\n LIMIT 1\"\"\",\n (pipeline_id, step_type),\n )\n row = cursor.fetchone()\n cursor.close()\n\n if row and row.get(\"config\"):\n try:\n config = json.loads(row[\"config\"])\n model = config.get(\"model\")\n if model:\n return model\n except json.JSONDecodeError:\n pass\n\n raise ValueError(f\"No model configured for step_type={step_type} in pipeline {pipeline_id}\")\n\n\n# Valid ENUM values for validation\nVALID_STATEMENT_FORMS = {\"assertion\", \"question\", \"command\", \"conditional\"}\nVALID_INTENTS = {\"explain\", \"argue\", \"define\", \"compare\", \"exemplify\", \"warn\", \"instruct\"}\nVALID_FRAMES = {\"theoretical\", \"practical\", \"historical\", \"methodological\", \"critical\"}\nVALID_DISCOURSE_ROLES = {\"thesis\", \"evidence\", \"example\", \"counter\", \"summary\", \"definition\"}\n\nPROMPT_TEMPLATE = \"\"\"Analysiere den folgenden Text semantisch.\n\nBestimme:\n1. statement_form: Ist es eine Aussage (assertion), Frage (question), Aufforderung (command) oder Bedingung (conditional)?\n2. intent: Was ist die Absicht? explain, argue, define, compare, exemplify, warn, instruct\n3. frame: Welcher Rahmen? theoretical, practical, historical, methodological, critical\n4. is_negated: Wird etwas verneint? true\/false\n5. discourse_role: Welche Rolle im Diskurs? thesis, evidence, example, counter, summary, definition\n\nAntworte NUR mit gültigem JSON:\n{{\n \"statement_form\": \"assertion|question|command|conditional\",\n \"intent\": \"explain|argue|define|compare|exemplify|warn|instruct\",\n \"frame\": \"theoretical|practical|historical|methodological|critical\",\n \"is_negated\": false,\n \"discourse_role\": \"thesis|evidence|example|counter|summary|definition\"\n}}\n\nText:\n{content}\"\"\"\n\n\ndef validate_and_fix(data: dict) -> dict:\n \"\"\"Validate and fix ENUM values from LLM response.\"\"\"\n # statement_form\n sf = data.get(\"statement_form\", \"\").lower().strip()\n if sf not in VALID_STATEMENT_FORMS:\n if \"frage\" in sf or \"question\" in sf or sf.endswith(\"?\"):\n sf = \"question\"\n elif \"befehl\" in sf or \"command\" in sf or \"aufford\" in sf:\n sf = \"command\"\n elif \"bedingun\" in sf or \"condition\" in sf or \"wenn\" in sf:\n sf = \"conditional\"\n else:\n sf = \"assertion\"\n data[\"statement_form\"] = sf\n\n # intent\n intent = data.get(\"intent\", \"\").lower().strip()\n if intent not in VALID_INTENTS:\n if \"erklär\" in intent or \"explain\" in intent:\n intent = \"explain\"\n elif \"argument\" in intent or \"argue\" in intent:\n intent = \"argue\"\n elif \"defini\" in intent or \"define\" in intent:\n intent = \"define\"\n elif \"vergleich\" in intent or \"compare\" in intent:\n intent = \"compare\"\n elif \"beispiel\" in intent or \"example\" in intent or \"exemplify\" in intent:\n intent = \"exemplify\"\n elif \"warn\" in intent:\n intent = \"warn\"\n elif \"instruc\" in intent or \"anleit\" in intent:\n intent = \"instruct\"\n else:\n intent = \"explain\"\n data[\"intent\"] = intent\n\n # frame\n frame = data.get(\"frame\", \"\").lower().strip()\n if frame not in VALID_FRAMES:\n if \"theor\" in frame:\n frame = \"theoretical\"\n elif \"prakt\" in frame or \"practic\" in frame:\n frame = \"practical\"\n elif \"histor\" in frame:\n frame = \"historical\"\n elif \"method\" in frame:\n frame = \"methodological\"\n elif \"krit\" in frame or \"critic\" in frame:\n frame = \"critical\"\n else:\n frame = \"theoretical\"\n data[\"frame\"] = frame\n\n # discourse_role\n role = data.get(\"discourse_role\", \"\").lower().strip()\n if role not in VALID_DISCOURSE_ROLES:\n if \"these\" in role or \"thesis\" in role:\n role = \"thesis\"\n elif \"evidence\" in role or \"beleg\" in role or \"beweis\" in role:\n role = \"evidence\"\n elif \"beispiel\" in role or \"example\" in role:\n role = \"example\"\n elif \"gegen\" in role or \"counter\" in role:\n role = \"counter\"\n elif \"zusammen\" in role or \"summary\" in role:\n role = \"summary\"\n elif \"definition\" in role:\n role = \"definition\"\n else:\n role = \"evidence\"\n data[\"discourse_role\"] = role\n\n # is_negated\n negated = data.get(\"is_negated\", False)\n if isinstance(negated, str):\n negated = negated.lower() in (\"true\", \"1\", \"yes\", \"ja\", \"wahr\")\n data[\"is_negated\"] = bool(negated)\n\n return data\n\n\ndef get_pending_chunks(limit: int = 0) -> list:\n \"\"\"Get chunks without text semantics.\"\"\"\n sql = \"\"\"\n SELECT c.id, c.content, c.document_id\n FROM chunks c\n LEFT JOIN chunk_text_semantics cts ON c.id = cts.chunk_id\n WHERE cts.id IS NULL\n ORDER BY c.id\n \"\"\"\n if limit > 0:\n sql += f\" LIMIT {limit}\"\n\n cursor = db.execute(sql)\n chunks = cursor.fetchall()\n cursor.close()\n return list(chunks)\n\n\ndef analyze_chunk(chunk: dict, model: str = \"mistral\") -> dict | None:\n \"\"\"Analyze a single chunk with Ollama.\"\"\"\n try:\n prompt = PROMPT_TEMPLATE.format(content=chunk[\"content\"][:2000])\n\n response = ollama.generate(\n model=model,\n prompt=prompt,\n options={\"num_predict\": 200},\n )\n\n response_text = response[\"response\"].strip()\n\n # Extract JSON\n start = response_text.find(\"{\")\n end = response_text.rfind(\"}\") + 1\n\n if start >= 0 and end > start:\n json_str = response_text[start:end]\n data = json.loads(json_str)\n data = validate_and_fix(data)\n data[\"model_used\"] = model\n return data\n\n except Exception as e:\n db.log(\"WARNING\", f\"Backfill: Text semantic analysis failed for chunk {chunk['id']}: {e}\")\n\n return None\n\n\ndef store_semantics(chunk_id: int, semantics: dict) -> bool:\n \"\"\"Store text semantics to database.\"\"\"\n try:\n cursor = db.execute(\n \"\"\"INSERT INTO chunk_text_semantics\n (chunk_id, statement_form, intent, frame, is_negated,\n discourse_role, model_used)\n VALUES (%s, %s, %s, %s, %s, %s, %s)\n ON DUPLICATE KEY UPDATE\n statement_form = VALUES(statement_form),\n intent = VALUES(intent),\n frame = VALUES(frame),\n is_negated = VALUES(is_negated),\n discourse_role = VALUES(discourse_role),\n model_used = VALUES(model_used),\n updated_at = NOW()\"\"\",\n (\n chunk_id,\n semantics.get(\"statement_form\"),\n semantics.get(\"intent\"),\n semantics.get(\"frame\"),\n semantics.get(\"is_negated\", False),\n semantics.get(\"discourse_role\"),\n semantics.get(\"model_used\"),\n ),\n )\n db.commit()\n cursor.close()\n return True\n except Exception as e:\n db.log(\"ERROR\", f\"Backfill: Failed to store text semantics for chunk {chunk_id}: {e}\")\n return False\n\n\ndef main():\n \"\"\"Main entry point.\"\"\"\n parser = argparse.ArgumentParser(description=\"Backfill Text Semantics\")\n parser.add_argument(\"--limit\", type=int, default=0, help=\"Max chunks to process (0=all)\")\n parser.add_argument(\"--batch\", type=int, default=50, help=\"Batch size for progress output\")\n parser.add_argument(\"--model\", default=None, help=\"Override pipeline model (reads from pipeline_steps if not set)\")\n parser.add_argument(\"--pipeline-id\", type=int, default=DEFAULT_PIPELINE_ID, help=\"Pipeline ID to read config from\")\n parser.add_argument(\"--dry-run\", action=\"store_true\", help=\"Just count, don't process\")\n\n args = parser.parse_args()\n\n db.connect()\n\n try:\n # Get model from pipeline config if not overridden\n if args.model is None:\n args.model = get_pipeline_model(\"text_semantic_analyze\", args.pipeline_id)\n print(f\"[Config] Model from pipeline {args.pipeline_id}: {args.model}\")\n\n # Get pending chunks\n chunks = get_pending_chunks(args.limit)\n total = len(chunks)\n\n print(\"Text Semantics Backfill\")\n print(\"=\" * 50)\n print(f\"Pending chunks: {total}\")\n print(f\"Model: {args.model} (from pipeline_steps)\")\n print(f\"Batch size: {args.batch}\")\n\n if args.dry_run:\n print(\"\\nDry run - no processing\")\n return\n\n if total == 0:\n print(\"\\nNo pending chunks - all done!\")\n return\n\n print(\"\\nStarting analysis...\")\n print(\"-\" * 50)\n\n success = 0\n errors = 0\n start_time = time.time()\n\n for i, chunk in enumerate(chunks, 1):\n # Analyze\n semantics = analyze_chunk(chunk, args.model)\n\n if semantics:\n if store_semantics(chunk[\"id\"], semantics):\n success += 1\n else:\n errors += 1\n else:\n errors += 1... [TRUNCATED-6ae978ffe6498baf]",
"structuredPatch": [
{
"oldStart": 19,
"oldLines": 6,
"newStart": 19,
"newLines": 7,
"lines": [
" import ollama",
" ",
" from db import db",
"+from json_utils import extract_json",
" ",
" # Pipeline-ID für Wissenschaftliche Pipeline",
" DEFAULT_PIPELINE_ID = 5"
]
}
],
"userModified": false,
"replaceAll": false
}
}