{
"event": "PreToolUse",
"tool_name": "Write",
"tool_input": {
"file_path": "\/var\/www\/scripts\/pipeline\/backfill_text_semantics.py",
"content": "#!\/usr\/bin\/env python3\n\"\"\"\nBackfill Text Semantics for existing chunks.\n\nAnalyzes all chunks that don't have text_semantics yet.\nResume-capable: skips already analyzed chunks.\n\nUsage:\n python backfill_text_semantics.py # Process all pending\n python backfill_text_semantics.py --limit 100 # Process max 100\n python backfill_text_semantics.py --batch 50 # Batch size 50\n python backfill_text_semantics.py --dry-run # Just count, don't process\n\"\"\"\n\nimport argparse\nimport json\nimport sys\nimport time\n\nimport ollama\n\nfrom db import db\n\n# Valid ENUM values for validation\nVALID_STATEMENT_FORMS = {\"assertion\", \"question\", \"command\", \"conditional\"}\nVALID_INTENTS = {\"explain\", \"argue\", \"define\", \"compare\", \"exemplify\", \"warn\", \"instruct\"}\nVALID_FRAMES = {\"theoretical\", \"practical\", \"historical\", \"methodological\", \"critical\"}\nVALID_DISCOURSE_ROLES = {\"thesis\", \"evidence\", \"example\", \"counter\", \"summary\", \"definition\"}\n\nPROMPT_TEMPLATE = \"\"\"Analysiere den folgenden Text semantisch.\n\nBestimme:\n1. statement_form: Ist es eine Aussage (assertion), Frage (question), Aufforderung (command) oder Bedingung (conditional)?\n2. intent: Was ist die Absicht? explain, argue, define, compare, exemplify, warn, instruct\n3. frame: Welcher Rahmen? theoretical, practical, historical, methodological, critical\n4. is_negated: Wird etwas verneint? true\/false\n5. discourse_role: Welche Rolle im Diskurs? thesis, evidence, example, counter, summary, definition\n\nAntworte NUR mit gültigem JSON:\n{{\n \"statement_form\": \"assertion|question|command|conditional\",\n \"intent\": \"explain|argue|define|compare|exemplify|warn|instruct\",\n \"frame\": \"theoretical|practical|historical|methodological|critical\",\n \"is_negated\": false,\n \"discourse_role\": \"thesis|evidence|example|counter|summary|definition\"\n}}\n\nText:\n{content}\"\"\"\n\n\ndef validate_and_fix(data: dict) -> dict:\n \"\"\"Validate and fix ENUM values from LLM response.\"\"\"\n # statement_form\n sf = data.get(\"statement_form\", \"\").lower().strip()\n if sf not in VALID_STATEMENT_FORMS:\n if \"frage\" in sf or \"question\" in sf or sf.endswith(\"?\"):\n sf = \"question\"\n elif \"befehl\" in sf or \"command\" in sf or \"aufford\" in sf:\n sf = \"command\"\n elif \"bedingun\" in sf or \"condition\" in sf or \"wenn\" in sf:\n sf = \"conditional\"\n else:\n sf = \"assertion\"\n data[\"statement_form\"] = sf\n\n # intent\n intent = data.get(\"intent\", \"\").lower().strip()\n if intent not in VALID_INTENTS:\n if \"erklär\" in intent or \"explain\" in intent:\n intent = \"explain\"\n elif \"argument\" in intent or \"argue\" in intent:\n intent = \"argue\"\n elif \"defini\" in intent or \"define\" in intent:\n intent = \"define\"\n elif \"vergleich\" in intent or \"compare\" in intent:\n intent = \"compare\"\n elif \"beispiel\" in intent or \"example\" in intent or \"exemplify\" in intent:\n intent = \"exemplify\"\n elif \"warn\" in intent:\n intent = \"warn\"\n elif \"instruc\" in intent or \"anleit\" in intent:\n intent = \"instruct\"\n else:\n intent = \"explain\"\n data[\"intent\"] = intent\n\n # frame\n frame = data.get(\"frame\", \"\").lower().strip()\n if frame not in VALID_FRAMES:\n if \"theor\" in frame:\n frame = \"theoretical\"\n elif \"prakt\" in frame or \"practic\" in frame:\n frame = \"practical\"\n elif \"histor\" in frame:\n frame = \"historical\"\n elif \"method\" in frame:\n frame = \"methodological\"\n elif \"krit\" in frame or \"critic\" in frame:\n frame = \"critical\"\n else:\n frame = \"theoretical\"\n data[\"frame\"] = frame\n\n # discourse_role\n role = data.get(\"discourse_role\", \"\").lower().strip()\n if role not in VALID_DISCOURSE_ROLES:\n if \"these\" in role or \"thesis\" in role:\n role = \"thesis\"\n elif \"evidence\" in role or \"beleg\" in role or \"beweis\" in role:\n role = \"evidence\"\n elif \"beispiel\" in role or \"example\" in role:\n role = \"example\"\n elif \"gegen\" in role or \"counter\" in role:\n role = \"counter\"\n elif \"zusammen\" in role or \"summary\" in role:\n role = \"summary\"\n elif \"definition\" in role:\n role = \"definition\"\n else:\n role = \"evidence\"\n data[\"discourse_role\"] = role\n\n # is_negated\n negated = data.get(\"is_negated\", False)\n if isinstance(negated, str):\n negated = negated.lower() in (\"true\", \"1\", \"yes\", \"ja\", \"wahr\")\n data[\"is_negated\"] = bool(negated)\n\n return data\n\n\ndef get_pending_chunks(limit: int = 0) -> list:\n \"\"\"Get chunks without text semantics.\"\"\"\n sql = \"\"\"\n SELECT c.id, c.content, c.document_id\n FROM chunks c\n LEFT JOIN chunk_text_semantics cts ON c.id = cts.chunk_id\n WHERE cts.id IS NULL\n ORDER BY c.id\n \"\"\"\n if limit > 0:\n sql += f\" LIMIT {limit}\"\n\n cursor = db.execute(sql)\n chunks = cursor.fetchall()\n cursor.close()\n return list(chunks)\n\n\ndef analyze_chunk(chunk: dict, model: str = \"mistral\") -> dict | None:\n \"\"\"Analyze a single chunk with Ollama.\"\"\"\n try:\n prompt = PROMPT_TEMPLATE.format(content=chunk[\"content\"][:2000])\n\n response = ollama.generate(\n model=model,\n prompt=prompt,\n options={\"num_predict\": 200},\n )\n\n response_text = response[\"response\"].strip()\n\n # Extract JSON\n start = response_text.find(\"{\")\n end = response_text.rfind(\"}\") + 1\n\n if start >= 0 and end > start:\n json_str = response_text[start:end]\n data = json.loads(json_str)\n data = validate_and_fix(data)\n data[\"model_used\"] = model\n return data\n\n except Exception as e:\n db.log(\"WARNING\", f\"Backfill: Text semantic analysis failed for chunk {chunk['id']}: {e}\")\n\n return None\n\n\ndef store_semantics(chunk_id: int, semantics: dict) -> bool:\n \"\"\"Store text semantics to database.\"\"\"\n try:\n cursor = db.execute(\n \"\"\"INSERT INTO chunk_text_semantics\n (chunk_id, statement_form, intent, frame, is_negated,\n discourse_role, model_used)\n VALUES (%s, %s, %s, %s, %s, %s, %s)\n ON DUPLICATE KEY UPDATE\n statement_form = VALUES(statement_form),\n intent = VALUES(intent),\n frame = VALUES(frame),\n is_negated = VALUES(is_negated),\n discourse_role = VALUES(discourse_role),\n model_used = VALUES(model_used),\n updated_at = NOW()\"\"\",\n (\n chunk_id,\n semantics.get(\"statement_form\"),\n semantics.get(\"intent\"),\n semantics.get(\"frame\"),\n semantics.get(\"is_negated\", False),\n semantics.get(\"discourse_role\"),\n semantics.get(\"model_used\"),\n ),\n )\n db.commit()\n cursor.close()\n return True\n except Exception as e:\n db.log(\"ERROR\", f\"Backfill: Failed to store text semantics for chunk {chunk_id}: {e}\")\n return False\n\n\ndef main():\n \"\"\"Main entry point.\"\"\"\n parser = argparse.ArgumentParser(description=\"Backfill Text Semantics\")\n parser.add_argument(\"--limit\", type=int, default=0, help=\"Max chunks to process (0=all)\")\n parser.add_argument(\"--batch\", type=int, default=50, help=\"Batch size for progress output\")\n parser.add_argument(\"--model\", default=\"mistral\", help=\"Ollama model to use\")\n parser.add_argument(\"--dry-run\", action=\"store_true\", help=\"Just count, don't process\")\n\n args = parser.parse_args()\n\n db.connect()\n\n try:\n # Get pending chunks\n chunks = get_pending_chunks(args.limit)\n total = len(chunks)\n\n print(f\"Text Semantics Backfill\")\n print(f\"=\" * 50)\n print(f\"Pending chunks: {total}\")\n print(f\"Model: {args.model}\")\n print(f\"Batch size: {args.batch}\")\n\n if args.dry_run:\n print(\"\\nDry run - no processing\")\n return\n\n if total == 0:\n print(\"\\nNo pending chunks - all done!\")\n return\n\n print(f\"\\nStarting analysis...\")\n print(\"-\" * 50)\n\n success = 0\n errors = 0\n start_time = time.time()\n\n for i, chunk in enumerate(chunks, 1):\n # Analyze\n semantics = analyze_chunk(chunk, args.model)\n\n if semantics:\n if store_semantics(chunk[\"id\"], semantics):\n success += 1\n else:\n errors += 1\n else:\n errors += 1\n\n # Progress output\n if i % args.batch == 0 or i == total:\n elapsed = time.time() - start_time\n rate = i \/ elapsed if elapsed > 0 else 0\n eta = (total - i) \/ rate if rate > 0 else 0\n print(f\" [{i}\/{total}] Success: {success}, Errors: {errors}, \"\n f\"Rate: {rate:.1f}\/s, ETA: {eta\/60:.1f}min\")\n\n # Final summary\n elapsed = time.time() - start_time\n print(\"-\" * 50)\n print(f\"Completed in {elapsed\/60:.1f} minutes\")\n print(f\"Success: {success}\")\n print(f\"Errors: {errors}\")\n\n db.log(\"INFO\", f\"Backfill text semantics: {success} success, {errors} errors\")\n\n finally:\n db.disconnect()\n\n\nif __name__ == \"__main__\":\n main()\n"
}
}
{
"tool_response": {
"type": "create",
"filePath": "\/var\/www\/scripts\/pipeline\/backfill_text_semantics.py",
"content": "#!\/usr\/bin\/env python3\n\"\"\"\nBackfill Text Semantics for existing chunks.\n\nAnalyzes all chunks that don't have text_semantics yet.\nResume-capable: skips already analyzed chunks.\n\nUsage:\n python backfill_text_semantics.py # Process all pending\n python backfill_text_semantics.py --limit 100 # Process max 100\n python backfill_text_semantics.py --batch 50 # Batch size 50\n python backfill_text_semantics.py --dry-run # Just count, don't process\n\"\"\"\n\nimport argparse\nimport json\nimport sys\nimport time\n\nimport ollama\n\nfrom db import db\n\n# Valid ENUM values for validation\nVALID_STATEMENT_FORMS = {\"assertion\", \"question\", \"command\", \"conditional\"}\nVALID_INTENTS = {\"explain\", \"argue\", \"define\", \"compare\", \"exemplify\", \"warn\", \"instruct\"}\nVALID_FRAMES = {\"theoretical\", \"practical\", \"historical\", \"methodological\", \"critical\"}\nVALID_DISCOURSE_ROLES = {\"thesis\", \"evidence\", \"example\", \"counter\", \"summary\", \"definition\"}\n\nPROMPT_TEMPLATE = \"\"\"Analysiere den folgenden Text semantisch.\n\nBestimme:\n1. statement_form: Ist es eine Aussage (assertion), Frage (question), Aufforderung (command) oder Bedingung (conditional)?\n2. intent: Was ist die Absicht? explain, argue, define, compare, exemplify, warn, instruct\n3. frame: Welcher Rahmen? theoretical, practical, historical, methodological, critical\n4. is_negated: Wird etwas verneint? true\/false\n5. discourse_role: Welche Rolle im Diskurs? thesis, evidence, example, counter, summary, definition\n\nAntworte NUR mit gültigem JSON:\n{{\n \"statement_form\": \"assertion|question|command|conditional\",\n \"intent\": \"explain|argue|define|compare|exemplify|warn|instruct\",\n \"frame\": \"theoretical|practical|historical|methodological|critical\",\n \"is_negated\": false,\n \"discourse_role\": \"thesis|evidence|example|counter|summary|definition\"\n}}\n\nText:\n{content}\"\"\"\n\n\ndef validate_and_fix(data: dict) -> dict:\n \"\"\"Validate and fix ENUM values from LLM response.\"\"\"\n # statement_form\n sf = data.get(\"statement_form\", \"\").lower().strip()\n if sf not in VALID_STATEMENT_FORMS:\n if \"frage\" in sf or \"question\" in sf or sf.endswith(\"?\"):\n sf = \"question\"\n elif \"befehl\" in sf or \"command\" in sf or \"aufford\" in sf:\n sf = \"command\"\n elif \"bedingun\" in sf or \"condition\" in sf or \"wenn\" in sf:\n sf = \"conditional\"\n else:\n sf = \"assertion\"\n data[\"statement_form\"] = sf\n\n # intent\n intent = data.get(\"intent\", \"\").lower().strip()\n if intent not in VALID_INTENTS:\n if \"erklär\" in intent or \"explain\" in intent:\n intent = \"explain\"\n elif \"argument\" in intent or \"argue\" in intent:\n intent = \"argue\"\n elif \"defini\" in intent or \"define\" in intent:\n intent = \"define\"\n elif \"vergleich\" in intent or \"compare\" in intent:\n intent = \"compare\"\n elif \"beispiel\" in intent or \"example\" in intent or \"exemplify\" in intent:\n intent = \"exemplify\"\n elif \"warn\" in intent:\n intent = \"warn\"\n elif \"instruc\" in intent or \"anleit\" in intent:\n intent = \"instruct\"\n else:\n intent = \"explain\"\n data[\"intent\"] = intent\n\n # frame\n frame = data.get(\"frame\", \"\").lower().strip()\n if frame not in VALID_FRAMES:\n if \"theor\" in frame:\n frame = \"theoretical\"\n elif \"prakt\" in frame or \"practic\" in frame:\n frame = \"practical\"\n elif \"histor\" in frame:\n frame = \"historical\"\n elif \"method\" in frame:\n frame = \"methodological\"\n elif \"krit\" in frame or \"critic\" in frame:\n frame = \"critical\"\n else:\n frame = \"theoretical\"\n data[\"frame\"] = frame\n\n # discourse_role\n role = data.get(\"discourse_role\", \"\").lower().strip()\n if role not in VALID_DISCOURSE_ROLES:\n if \"these\" in role or \"thesis\" in role:\n role = \"thesis\"\n elif \"evidence\" in role or \"beleg\" in role or \"beweis\" in role:\n role = \"evidence\"\n elif \"beispiel\" in role or \"example\" in role:\n role = \"example\"\n elif \"gegen\" in role or \"counter\" in role:\n role = \"counter\"\n elif \"zusammen\" in role or \"summary\" in role:\n role = \"summary\"\n elif \"definition\" in role:\n role = \"definition\"\n else:\n role = \"evidence\"\n data[\"discourse_role\"] = role\n\n # is_negated\n negated = data.get(\"is_negated\", False)\n if isinstance(negated, str):\n negated = negated.lower() in (\"true\", \"1\", \"yes\", \"ja\", \"wahr\")\n data[\"is_negated\"] = bool(negated)\n\n return data\n\n\ndef get_pending_chunks(limit: int = 0) -> list:\n \"\"\"Get chunks without text semantics.\"\"\"\n sql = \"\"\"\n SELECT c.id, c.content, c.document_id\n FROM chunks c\n LEFT JOIN chunk_text_semantics cts ON c.id = cts.chunk_id\n WHERE cts.id IS NULL\n ORDER BY c.id\n \"\"\"\n if limit > 0:\n sql += f\" LIMIT {limit}\"\n\n cursor = db.execute(sql)\n chunks = cursor.fetchall()\n cursor.close()\n return list(chunks)\n\n\ndef analyze_chunk(chunk: dict, model: str = \"mistral\") -> dict | None:\n \"\"\"Analyze a single chunk with Ollama.\"\"\"\n try:\n prompt = PROMPT_TEMPLATE.format(content=chunk[\"content\"][:2000])\n\n response = ollama.generate(\n model=model,\n prompt=prompt,\n options={\"num_predict\": 200},\n )\n\n response_text = response[\"response\"].strip()\n\n # Extract JSON\n start = response_text.find(\"{\")\n end = response_text.rfind(\"}\") + 1\n\n if start >= 0 and end > start:\n json_str = response_text[start:end]\n data = json.loads(json_str)\n data = validate_and_fix(data)\n data[\"model_used\"] = model\n return data\n\n except Exception as e:\n db.log(\"WARNING\", f\"Backfill: Text semantic analysis failed for chunk {chunk['id']}: {e}\")\n\n return None\n\n\ndef store_semantics(chunk_id: int, semantics: dict) -> bool:\n \"\"\"Store text semantics to database.\"\"\"\n try:\n cursor = db.execute(\n \"\"\"INSERT INTO chunk_text_semantics\n (chunk_id, statement_form, intent, frame, is_negated,\n discourse_role, model_used)\n VALUES (%s, %s, %s, %s, %s, %s, %s)\n ON DUPLICATE KEY UPDATE\n statement_form = VALUES(statement_form),\n intent = VALUES(intent),\n frame = VALUES(frame),\n is_negated = VALUES(is_negated),\n discourse_role = VALUES(discourse_role),\n model_used = VALUES(model_used),\n updated_at = NOW()\"\"\",\n (\n chunk_id,\n semantics.get(\"statement_form\"),\n semantics.get(\"intent\"),\n semantics.get(\"frame\"),\n semantics.get(\"is_negated\", False),\n semantics.get(\"discourse_role\"),\n semantics.get(\"model_used\"),\n ),\n )\n db.commit()\n cursor.close()\n return True\n except Exception as e:\n db.log(\"ERROR\", f\"Backfill: Failed to store text semantics for chunk {chunk_id}: {e}\")\n return False\n\n\ndef main():\n \"\"\"Main entry point.\"\"\"\n parser = argparse.ArgumentParser(description=\"Backfill Text Semantics\")\n parser.add_argument(\"--limit\", type=int, default=0, help=\"Max chunks to process (0=all)\")\n parser.add_argument(\"--batch\", type=int, default=50, help=\"Batch size for progress output\")\n parser.add_argument(\"--model\", default=\"mistral\", help=\"Ollama model to use\")\n parser.add_argument(\"--dry-run\", action=\"store_true\", help=\"Just count, don't process\")\n\n args = parser.parse_args()\n\n db.connect()\n\n try:\n # Get pending chunks\n chunks = get_pending_chunks(args.limit)\n total = len(chunks)\n\n print(f\"Text Semantics Backfill\")\n print(f\"=\" * 50)\n print(f\"Pending chunks: {total}\")\n print(f\"Model: {args.model}\")\n print(f\"Batch size: {args.batch}\")\n\n if args.dry_run:\n print(\"\\nDry run - no processing\")\n return\n\n if total == 0:\n print(\"\\nNo pending chunks - all done!\")\n return\n\n print(f\"\\nStarting analysis...\")\n print(\"-\" * 50)\n\n success = 0\n errors = 0\n start_time = time.time()\n\n for i, chunk in enumerate(chunks, 1):\n # Analyze\n semantics = analyze_chunk(chunk, args.model)\n\n if semantics:\n if store_semantics(chunk[\"id\"], semantics):\n success += 1\n else:\n errors += 1\n else:\n errors += 1\n\n # Progress output\n if i % args.batch == 0 or i == total:\n elapsed = time.time() - start_time\n rate = i \/ elapsed if elapsed > 0 else 0\n eta = (total - i) \/ rate if rate > 0 else 0\n print(f\" [{i}\/{total}] Success: {success}, Errors: {errors}, \"\n f\"Rate: {rate:.1f}\/s, ETA: {eta\/60:.1f}min\")\n\n # Final summary\n elapsed = time.time() - start_time\n print(\"-\" * 50)\n print(f\"Completed in {elapsed\/60:.1f} minutes\")\n print(f\"Success: {success}\")\n print(f\"Errors: {errors}\")\n\n db.log(\"INFO\", f\"Backfill text semantics: {success} success, {errors} errors\")\n\n finally:\n db.disconnect()\n\n\nif __name__ == \"__main__\":\n main()\n",
"structuredPatch": [],
"originalFile": null
}
}