{
"event": "PreToolUse",
"tool_name": "Write",
"tool_input": {
"file_path": "\/opt\/scripts\/pipeline\/enrich.py",
"content": "#!\/usr\/bin\/env python3\n\"\"\"\nVision-Enrichment module for KI-System Pipeline.\nEnriches chunks with visual context from page-level vision analysis.\n\nUsage:\n python enrich.py <document_id> # Enrich all chunks for a document\n python enrich.py --all # Enrich all documents with vision data\n\"\"\"\n\nimport json\nimport re\nimport sys\n\nfrom db import db\n\n\ndef get_vision_context(document_id, page_number):\n \"\"\"\n Get vision analysis for a specific page.\n\n Returns dict with structured vision info or None.\n \"\"\"\n cursor = db.execute(\n \"\"\"SELECT vision_analysis\n FROM document_pages\n WHERE document_id = %s AND page_number = %s\"\"\",\n (document_id, page_number)\n )\n result = cursor.fetchone()\n cursor.close()\n\n if not result or not result.get(\"vision_analysis\"):\n return None\n\n try:\n vision_data = json.loads(result[\"vision_analysis\"])\n return vision_data\n except (json.JSONDecodeError, TypeError):\n return None\n\n\ndef extract_vision_summary(vision_data):\n \"\"\"\n Extract key information from vision analysis for chunk enrichment.\n\n Returns compact dict with:\n - detected_elements: list of visual elements found\n - page_title: extracted title if any\n - has_images: bool\n - has_charts: bool\n - has_tables: bool\n - layout_type: detected layout style\n - key_topics: extracted key topics\/concepts\n \"\"\"\n if not vision_data:\n return None\n\n analysis_text = vision_data.get(\"analysis\", \"\")\n if not analysis_text:\n return None\n\n summary = {\n \"detected_elements\": [],\n \"page_title\": None,\n \"has_images\": False,\n \"has_charts\": False,\n \"has_tables\": False,\n \"layout_type\": \"standard\",\n \"key_topics\": [],\n \"vision_tokens\": vision_data.get(\"tokens\", 0)\n }\n\n # Detect visual elements\n analysis_lower = analysis_text.lower()\n\n # Check for images\n if any(word in analysis_lower for word in [\"bild\", \"foto\", \"image\", \"abbildung\", \"grafik\"]):\n if \"keine\" not in analysis_lower.split(\"bild\")[0][-20:] if \"bild\" in analysis_lower else True:\n summary[\"has_images\"] = True\n summary[\"detected_elements\"].append(\"images\")\n\n # Check for charts\/diagrams\n if any(word in analysis_lower for word in [\"diagramm\", \"chart\", \"graph\", \"schaubild\"]):\n if \"keine\" not in analysis_lower.split(\"diagramm\")[0][-20:] if \"diagramm\" in analysis_lower else True:\n summary[\"has_charts\"] = True\n summary[\"detected_elements\"].append(\"charts\")\n\n # Check for tables\n if any(word in analysis_lower for word in [\"tabelle\", \"table\", \"übersicht\"]):\n if \"keine\" not in analysis_lower.split(\"tabelle\")[0][-20:] if \"tabelle\" in analysis_lower else True:\n summary[\"has_tables\"] = True\n summary[\"detected_elements\"].append(\"tables\")\n\n # Check for callouts\/highlights\n if any(word in analysis_lower for word in [\"callout\", \"hervorhebung\", \"box\", \"kasten\", \"zitat\"]):\n summary[\"detected_elements\"].append(\"callouts\")\n\n # Extract title (look for patterns like \"Titel: X\" or \"Überschrift: X\")\n title_patterns = [\n r'[\"\\']([^\"\\']{5,60})[\"\\']', # Quoted strings\n r'Titel[:\\s]+[\"\\']?([^\"\\'\\n]{5,60})',\n r'Überschrift[:\\s]+[\"\\']?([^\"\\'\\n]{5,60})',\n ]\n\n for pattern in title_patterns:\n match = re.search(pattern, analysis_text)\n if match:\n potential_title = match.group(1).strip()\n # Filter out common non-titles\n if not any(skip in potential_title.lower() for skip in\n [\"keine\", \"nicht\", \"gibt es\", \"vorhanden\", \"enthält\"]):\n summary[\"page_title\"] = potential_title[:100]\n break\n\n # Detect layout type\n if any(word in analysis_lower for word in [\"zwei spalten\", \"zweispaltig\", \"columns\"]):\n summary[\"layout_type\"] = \"two-column\"\n elif any(word in analysis_lower for word in [\"liste\", \"aufzählung\", \"bullet\"]):\n summary[\"layout_type\"] = \"list\"\n elif any(word in analysis_lower for word in [\"vollbild\", \"full page\", \"ganzseitig\"]):\n summary[\"layout_type\"] = \"full-page\"\n\n # Extract key topics (look for bold\/emphasized terms)\n bold_pattern = r'\\*\\*([^*]+)\\*\\*'\n bold_matches = re.findall(bold_pattern, analysis_text)\n if bold_matches:\n # Filter and dedupe\n topics = []\n seen = set()\n for match in bold_matches[:10]:\n clean = match.strip()\n if len(clean) > 2 and clean.lower() not in seen:\n seen.add(clean.lower())\n topics.append(clean)\n summary[\"key_topics\"] = topics[:5]\n\n return summary\n\n\ndef enrich_chunk(chunk_id, document_id, page_number):\n \"\"\"\n Enrich a single chunk with vision context.\n\n Updates the chunk's metadata with vision information.\n Returns True if enriched, False otherwise.\n \"\"\"\n # Get vision context for the page\n vision_data = get_vision_context(document_id, page_number)\n if not vision_data:\n return False\n\n # Extract summary\n vision_summary = extract_vision_summary(vision_data)\n if not vision_summary:\n return False\n\n # Get current chunk metadata\n cursor = db.execute(\n \"SELECT metadata FROM chunks WHERE id = %s\",\n (chunk_id,)\n )\n result = cursor.fetchone()\n cursor.close()\n\n if not result:\n return False\n\n # Parse existing metadata\n try:\n metadata = json.loads(result[\"metadata\"]) if result[\"metadata\"] else {}\n except (json.JSONDecodeError, TypeError):\n metadata = {}\n\n # Add vision context\n metadata[\"vision\"] = vision_summary\n\n # Update chunk\n db.execute(\n \"UPDATE chunks SET metadata = %s WHERE id = %s\",\n (json.dumps(metadata, ensure_ascii=False), chunk_id)\n )\n db.commit()\n\n return True\n\n\ndef enrich_document_chunks(document_id):\n \"\"\"\n Enrich all chunks for a document with vision context.\n\n Returns dict with statistics.\n \"\"\"\n db.log(\"INFO\", f\"Starting vision enrichment for document {document_id}\")\n\n # Get all chunks with page info\n cursor = db.execute(\n \"\"\"SELECT id, metadata FROM chunks WHERE document_id = %s\"\"\",\n (document_id,)\n )\n chunks = cursor.fetchall()\n cursor.close()\n\n stats = {\n \"total_chunks\": len(chunks),\n \"enriched\": 0,\n \"skipped\": 0,\n \"no_page\": 0\n }\n\n for chunk in chunks:\n chunk_id = chunk[\"id\"]\n\n # Extract page number from metadata\n try:\n metadata = json.loads(chunk[\"metadata\"]) if chunk[\"metadata\"] else {}\n page_number = metadata.get(\"page\")\n except (json.JSONDecodeError, TypeError):\n page_number = None\n\n if not page_number:\n stats[\"no_page\"] += 1\n continue\n\n # Check if already enriched\n if metadata.get(\"vision\"):\n stats[\"skipped\"] += 1\n continue\n\n # Enrich\n if enrich_chunk(chunk_id, document_id, page_number):\n stats[\"enriched\"] += 1\n else:\n stats[\"skipped\"] += 1\n\n db.log(\"INFO\", f\"Enrichment complete: {stats['enriched']}\/{stats['total_chunks']} chunks enriched\")\n return stats\n\n\ndef enrich_all_documents():\n \"\"\"\n Enrich chunks for all documents that have vision analysis.\n \"\"\"\n # Find documents with vision analysis\n cursor = db.execute(\n \"\"\"SELECT DISTINCT d.id, d.filename\n FROM documents d\n INNER JOIN document_pages dp ON d.id = dp.document_id\n WHERE dp.vision_analysis IS NOT NULL\"\"\"\n )\n documents = cursor.fetchall()\n cursor.close()\n\n total_stats = {\n \"documents\": len(documents),\n \"total_enriched\": 0,\n \"total_skipped\": 0\n }\n\n for doc in documents:\n print(f\"Enriching: {doc['filename']}\")\n stats = enrich_document_chunks(doc[\"id\"])\n total_stats[\"total_enriched\"] += stats[\"enriched\"]\n total_stats[\"total_skipped\"] += stats[\"skipped\"]\n\n return total_stats\n\n\ndef run_enrichment_step(document_id):\n \"\"\"\n Run enrichment as a pipeline step.\n\n Args:\n document_id: Document ID to enrich\n\n Returns:\n dict with success status and statistics\n \"\"\"\n try:\n stats = enrich_document_chunks(document_id)\n return {\n \"success\": True,\n **stats\n }\n except Exception as e:\n db.log(\"ERROR\", f\"Enrichment failed: {e}\")\n return {\n \"success\": False,\n \"error\": str(e)\n }\n\n\ndef main():\n \"\"\"CLI entry point.\"\"\"\n if len(sys.argv) < 2:\n print(__doc__)\n return\n\n db.connect()\n\n try:\n if sys.argv[1] == \"--all\":\n print(\"Enriching all documents with vision data...\")\n stats = enrich_all_documents()\n print(f\"\\nTotal: {stats['total_enriched']} chunks enriched across {stats['documents']} documents\")\n else:\n document_id = int(sys.argv[1])\n print(f\"Enriching document {document_id}...\")\n stats = enrich_document_chunks(document_id)\n print(f\"\\nResults:\")\n print(f\" Total chunks: {stats['total_chunks']}\")\n print(f\" Enriched: {stats['enriched']}\")\n print(f\" Skipped: {stats['skipped']}\")\n print(f\" No page info: {stats['no_page']}\")\n finally:\n db.disconnect()\n\n\nif __name__ == \"__main__\":\n main()\n"
}
}
{
"tool_response": {
"type": "create",
"filePath": "\/opt\/scripts\/pipeline\/enrich.py",
"content": "#!\/usr\/bin\/env python3\n\"\"\"\nVision-Enrichment module for KI-System Pipeline.\nEnriches chunks with visual context from page-level vision analysis.\n\nUsage:\n python enrich.py <document_id> # Enrich all chunks for a document\n python enrich.py --all # Enrich all documents with vision data\n\"\"\"\n\nimport json\nimport re\nimport sys\n\nfrom db import db\n\n\ndef get_vision_context(document_id, page_number):\n \"\"\"\n Get vision analysis for a specific page.\n\n Returns dict with structured vision info or None.\n \"\"\"\n cursor = db.execute(\n \"\"\"SELECT vision_analysis\n FROM document_pages\n WHERE document_id = %s AND page_number = %s\"\"\",\n (document_id, page_number)\n )\n result = cursor.fetchone()\n cursor.close()\n\n if not result or not result.get(\"vision_analysis\"):\n return None\n\n try:\n vision_data = json.loads(result[\"vision_analysis\"])\n return vision_data\n except (json.JSONDecodeError, TypeError):\n return None\n\n\ndef extract_vision_summary(vision_data):\n \"\"\"\n Extract key information from vision analysis for chunk enrichment.\n\n Returns compact dict with:\n - detected_elements: list of visual elements found\n - page_title: extracted title if any\n - has_images: bool\n - has_charts: bool\n - has_tables: bool\n - layout_type: detected layout style\n - key_topics: extracted key topics\/concepts\n \"\"\"\n if not vision_data:\n return None\n\n analysis_text = vision_data.get(\"analysis\", \"\")\n if not analysis_text:\n return None\n\n summary = {\n \"detected_elements\": [],\n \"page_title\": None,\n \"has_images\": False,\n \"has_charts\": False,\n \"has_tables\": False,\n \"layout_type\": \"standard\",\n \"key_topics\": [],\n \"vision_tokens\": vision_data.get(\"tokens\", 0)\n }\n\n # Detect visual elements\n analysis_lower = analysis_text.lower()\n\n # Check for images\n if any(word in analysis_lower for word in [\"bild\", \"foto\", \"image\", \"abbildung\", \"grafik\"]):\n if \"keine\" not in analysis_lower.split(\"bild\")[0][-20:] if \"bild\" in analysis_lower else True:\n summary[\"has_images\"] = True\n summary[\"detected_elements\"].append(\"images\")\n\n # Check for charts\/diagrams\n if any(word in analysis_lower for word in [\"diagramm\", \"chart\", \"graph\", \"schaubild\"]):\n if \"keine\" not in analysis_lower.split(\"diagramm\")[0][-20:] if \"diagramm\" in analysis_lower else True:\n summary[\"has_charts\"] = True\n summary[\"detected_elements\"].append(\"charts\")\n\n # Check for tables\n if any(word in analysis_lower for word in [\"tabelle\", \"table\", \"übersicht\"]):\n if \"keine\" not in analysis_lower.split(\"tabelle\")[0][-20:] if \"tabelle\" in analysis_lower else True:\n summary[\"has_tables\"] = True\n summary[\"detected_elements\"].append(\"tables\")\n\n # Check for callouts\/highlights\n if any(word in analysis_lower for word in [\"callout\", \"hervorhebung\", \"box\", \"kasten\", \"zitat\"]):\n summary[\"detected_elements\"].append(\"callouts\")\n\n # Extract title (look for patterns like \"Titel: X\" or \"Überschrift: X\")\n title_patterns = [\n r'[\"\\']([^\"\\']{5,60})[\"\\']', # Quoted strings\n r'Titel[:\\s]+[\"\\']?([^\"\\'\\n]{5,60})',\n r'Überschrift[:\\s]+[\"\\']?([^\"\\'\\n]{5,60})',\n ]\n\n for pattern in title_patterns:\n match = re.search(pattern, analysis_text)\n if match:\n potential_title = match.group(1).strip()\n # Filter out common non-titles\n if not any(skip in potential_title.lower() for skip in\n [\"keine\", \"nicht\", \"gibt es\", \"vorhanden\", \"enthält\"]):\n summary[\"page_title\"] = potential_title[:100]\n break\n\n # Detect layout type\n if any(word in analysis_lower for word in [\"zwei spalten\", \"zweispaltig\", \"columns\"]):\n summary[\"layout_type\"] = \"two-column\"\n elif any(word in analysis_lower for word in [\"liste\", \"aufzählung\", \"bullet\"]):\n summary[\"layout_type\"] = \"list\"\n elif any(word in analysis_lower for word in [\"vollbild\", \"full page\", \"ganzseitig\"]):\n summary[\"layout_type\"] = \"full-page\"\n\n # Extract key topics (look for bold\/emphasized terms)\n bold_pattern = r'\\*\\*([^*]+)\\*\\*'\n bold_matches = re.findall(bold_pattern, analysis_text)\n if bold_matches:\n # Filter and dedupe\n topics = []\n seen = set()\n for match in bold_matches[:10]:\n clean = match.strip()\n if len(clean) > 2 and clean.lower() not in seen:\n seen.add(clean.lower())\n topics.append(clean)\n summary[\"key_topics\"] = topics[:5]\n\n return summary\n\n\ndef enrich_chunk(chunk_id, document_id, page_number):\n \"\"\"\n Enrich a single chunk with vision context.\n\n Updates the chunk's metadata with vision information.\n Returns True if enriched, False otherwise.\n \"\"\"\n # Get vision context for the page\n vision_data = get_vision_context(document_id, page_number)\n if not vision_data:\n return False\n\n # Extract summary\n vision_summary = extract_vision_summary(vision_data)\n if not vision_summary:\n return False\n\n # Get current chunk metadata\n cursor = db.execute(\n \"SELECT metadata FROM chunks WHERE id = %s\",\n (chunk_id,)\n )\n result = cursor.fetchone()\n cursor.close()\n\n if not result:\n return False\n\n # Parse existing metadata\n try:\n metadata = json.loads(result[\"metadata\"]) if result[\"metadata\"] else {}\n except (json.JSONDecodeError, TypeError):\n metadata = {}\n\n # Add vision context\n metadata[\"vision\"] = vision_summary\n\n # Update chunk\n db.execute(\n \"UPDATE chunks SET metadata = %s WHERE id = %s\",\n (json.dumps(metadata, ensure_ascii=False), chunk_id)\n )\n db.commit()\n\n return True\n\n\ndef enrich_document_chunks(document_id):\n \"\"\"\n Enrich all chunks for a document with vision context.\n\n Returns dict with statistics.\n \"\"\"\n db.log(\"INFO\", f\"Starting vision enrichment for document {document_id}\")\n\n # Get all chunks with page info\n cursor = db.execute(\n \"\"\"SELECT id, metadata FROM chunks WHERE document_id = %s\"\"\",\n (document_id,)\n )\n chunks = cursor.fetchall()\n cursor.close()\n\n stats = {\n \"total_chunks\": len(chunks),\n \"enriched\": 0,\n \"skipped\": 0,\n \"no_page\": 0\n }\n\n for chunk in chunks:\n chunk_id = chunk[\"id\"]\n\n # Extract page number from metadata\n try:\n metadata = json.loads(chunk[\"metadata\"]) if chunk[\"metadata\"] else {}\n page_number = metadata.get(\"page\")\n except (json.JSONDecodeError, TypeError):\n page_number = None\n\n if not page_number:\n stats[\"no_page\"] += 1\n continue\n\n # Check if already enriched\n if metadata.get(\"vision\"):\n stats[\"skipped\"] += 1\n continue\n\n # Enrich\n if enrich_chunk(chunk_id, document_id, page_number):\n stats[\"enriched\"] += 1\n else:\n stats[\"skipped\"] += 1\n\n db.log(\"INFO\", f\"Enrichment complete: {stats['enriched']}\/{stats['total_chunks']} chunks enriched\")\n return stats\n\n\ndef enrich_all_documents():\n \"\"\"\n Enrich chunks for all documents that have vision analysis.\n \"\"\"\n # Find documents with vision analysis\n cursor = db.execute(\n \"\"\"SELECT DISTINCT d.id, d.filename\n FROM documents d\n INNER JOIN document_pages dp ON d.id = dp.document_id\n WHERE dp.vision_analysis IS NOT NULL\"\"\"\n )\n documents = cursor.fetchall()\n cursor.close()\n\n total_stats = {\n \"documents\": len(documents),\n \"total_enriched\": 0,\n \"total_skipped\": 0\n }\n\n for doc in documents:\n print(f\"Enriching: {doc['filename']}\")\n stats = enrich_document_chunks(doc[\"id\"])\n total_stats[\"total_enriched\"] += stats[\"enriched\"]\n total_stats[\"total_skipped\"] += stats[\"skipped\"]\n\n return total_stats\n\n\ndef run_enrichment_step(document_id):\n \"\"\"\n Run enrichment as a pipeline step.\n\n Args:\n document_id: Document ID to enrich\n\n Returns:\n dict with success status and statistics\n \"\"\"\n try:\n stats = enrich_document_chunks(document_id)\n return {\n \"success\": True,\n **stats\n }\n except Exception as e:\n db.log(\"ERROR\", f\"Enrichment failed: {e}\")\n return {\n \"success\": False,\n \"error\": str(e)\n }\n\n\ndef main():\n \"\"\"CLI entry point.\"\"\"\n if len(sys.argv) < 2:\n print(__doc__)\n return\n\n db.connect()\n\n try:\n if sys.argv[1] == \"--all\":\n print(\"Enriching all documents with vision data...\")\n stats = enrich_all_documents()\n print(f\"\\nTotal: {stats['total_enriched']} chunks enriched across {stats['documents']} documents\")\n else:\n document_id = int(sys.argv[1])\n print(f\"Enriching document {document_id}...\")\n stats = enrich_document_chunks(document_id)\n print(f\"\\nResults:\")\n print(f\" Total chunks: {stats['total_chunks']}\")\n print(f\" Enriched: {stats['enriched']}\")\n print(f\" Skipped: {stats['skipped']}\")\n print(f\" No page info: {stats['no_page']}\")\n finally:\n db.disconnect()\n\n\nif __name__ == \"__main__\":\n main()\n",
"structuredPatch": [],
"originalFile": null
}
}