#!/usr/bin/env python3
"""
Vision enrichment module for the KI-System pipeline.
Enriches chunks with visual context from page-level vision analysis.
Usage:
    python enrich.py <document_id>    # Enrich all chunks for a document
    python enrich.py --all            # Enrich all documents with vision data
"""
import json
import re
import sys
from db import db
def get_vision_context(document_id, page_number):
"""
Get vision analysis for a specific page.
Returns dict with structured vision info or None.
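    Example of the stored payload (illustrative; exact keys depend on the
    upstream vision step, but "analysis" and "tokens" are the ones read here):
        {"analysis": "Die Seite zeigt ...", "tokens": 512}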
"""
cursor = db.execute(
"""SELECT vision_analysis
FROM document_pages
WHERE document_id = %s AND page_number = %s""",
(document_id, page_number),
)
result = cursor.fetchone()
cursor.close()
if not result or not result.get("vision_analysis"):
return None
try:
vision_data = json.loads(result["vision_analysis"])
return vision_data
except (json.JSONDecodeError, TypeError):
return None
def extract_vision_summary(vision_data):
"""
Extract key information from vision analysis for chunk enrichment.
Returns compact dict with:
- detected_elements: list of visual elements found
- page_title: extracted title if any
- has_images: bool
- has_charts: bool
- has_tables: bool
- layout_type: detected layout style
- key_topics: extracted key topics/concepts
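    Example (illustrative, not a strict doctest):
        extract_vision_summary({"analysis": "Seite mit **Umsatz**-Tabelle", "tokens": 300})
        -> {"has_tables": True, "detected_elements": ["tables"],
            "key_topics": ["Umsatz"], "vision_tokens": 300, ...}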
"""
if not vision_data:
return None
analysis_text = vision_data.get("analysis", "")
if not analysis_text:
return None
summary = {
"detected_elements": [],
"page_title": None,
"has_images": False,
"has_charts": False,
"has_tables": False,
"layout_type": "standard",
"key_topics": [],
"vision_tokens": vision_data.get("tokens", 0),
}
# Detect visual elements
analysis_lower = analysis_text.lower()
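    # Heuristic used below: a keyword only counts if "keine" does not appear in
    # the ~20 characters before its first occurrence (e.g. "keine Bilder").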
# Check for images
    if any(word in analysis_lower for word in ["bild", "foto", "image", "abbildung", "grafik"]):
        preceding = analysis_lower.split("bild")[0][-20:] if "bild" in analysis_lower else ""
        if "keine" not in preceding:
            summary["has_images"] = True
            summary["detected_elements"].append("images")
# Check for charts/diagrams
    if any(word in analysis_lower for word in ["diagramm", "chart", "graph", "schaubild"]):
        preceding = analysis_lower.split("diagramm")[0][-20:] if "diagramm" in analysis_lower else ""
        if "keine" not in preceding:
            summary["has_charts"] = True
            summary["detected_elements"].append("charts")
# Check for tables
    if any(word in analysis_lower for word in ["tabelle", "table", "übersicht"]):
        preceding = analysis_lower.split("tabelle")[0][-20:] if "tabelle" in analysis_lower else ""
        if "keine" not in preceding:
            summary["has_tables"] = True
            summary["detected_elements"].append("tables")
# Check for callouts/highlights
if any(word in analysis_lower for word in ["callout", "hervorhebung", "box", "kasten", "zitat"]):
summary["detected_elements"].append("callouts")
# Extract title (look for patterns like "Titel: X" or "Überschrift: X")
title_patterns = [
r'["\']([^"\']{5,60})["\']', # Quoted strings
r'Titel[:\s]+["\']?([^"\'\n]{5,60})',
r'Überschrift[:\s]+["\']?([^"\'\n]{5,60})',
]
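    # Illustrative match: 'Titel: "Quartalsbericht 2024"' -> "Quartalsbericht 2024"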
for pattern in title_patterns:
match = re.search(pattern, analysis_text)
if match:
potential_title = match.group(1).strip()
# Filter out common non-titles
if not any(
skip in potential_title.lower() for skip in ["keine", "nicht", "gibt es", "vorhanden", "enthält"]
):
summary["page_title"] = potential_title[:100]
break
# Detect layout type
if any(word in analysis_lower for word in ["zwei spalten", "zweispaltig", "columns"]):
summary["layout_type"] = "two-column"
elif any(word in analysis_lower for word in ["liste", "aufzählung", "bullet"]):
summary["layout_type"] = "list"
elif any(word in analysis_lower for word in ["vollbild", "full page", "ganzseitig"]):
summary["layout_type"] = "full-page"
# Extract key topics (look for bold/emphasized terms)
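    # e.g. "zeigt **Umsatzwachstum** und **Marktanteile**" -> ["Umsatzwachstum", "Marktanteile"]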
bold_pattern = r"\*\*([^*]+)\*\*"
bold_matches = re.findall(bold_pattern, analysis_text)
if bold_matches:
# Filter and dedupe
topics = []
seen = set()
for match in bold_matches[:10]:
clean = match.strip()
if len(clean) > 2 and clean.lower() not in seen:
seen.add(clean.lower())
topics.append(clean)
summary["key_topics"] = topics[:5]
return summary
def enrich_chunk(chunk_id, document_id, page_number):
"""
Enrich a single chunk with vision context.
Updates the chunk's metadata with vision information.
Returns True if enriched, False otherwise.
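    After enrichment the chunk metadata gains a "vision" key, e.g.
    (illustrative): {"page": 3, "vision": {"has_charts": True, ...}}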
"""
# Get vision context for the page
vision_data = get_vision_context(document_id, page_number)
if not vision_data:
return False
# Extract summary
vision_summary = extract_vision_summary(vision_data)
if not vision_summary:
return False
# Get current chunk metadata
cursor = db.execute("SELECT metadata FROM chunks WHERE id = %s", (chunk_id,))
result = cursor.fetchone()
cursor.close()
if not result:
return False
# Parse existing metadata
try:
metadata = json.loads(result["metadata"]) if result["metadata"] else {}
except (json.JSONDecodeError, TypeError):
metadata = {}
# Add vision context
metadata["vision"] = vision_summary
# Update chunk
db.execute("UPDATE chunks SET metadata = %s WHERE id = %s", (json.dumps(metadata, ensure_ascii=False), chunk_id))
db.commit()
return True
def enrich_document_chunks(document_id):
"""
Enrich all chunks for a document with vision context.
Returns dict with statistics.
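    Example return value (counts are illustrative):
        {"total_chunks": 42, "enriched": 30, "skipped": 10, "no_page": 2}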
"""
db.log("INFO", f"Starting vision enrichment for document {document_id}")
# Get all chunks with page info
cursor = db.execute("""SELECT id, metadata FROM chunks WHERE document_id = %s""", (document_id,))
chunks = cursor.fetchall()
cursor.close()
stats = {"total_chunks": len(chunks), "enriched": 0, "skipped": 0, "no_page": 0}
for chunk in chunks:
chunk_id = chunk["id"]
# Extract page number from metadata
try:
metadata = json.loads(chunk["metadata"]) if chunk["metadata"] else {}
page_number = metadata.get("page")
        except (json.JSONDecodeError, TypeError):
            metadata = {}
            page_number = None
if not page_number:
stats["no_page"] += 1
continue
# Check if already enriched
if metadata.get("vision"):
stats["skipped"] += 1
continue
# Enrich
if enrich_chunk(chunk_id, document_id, page_number):
stats["enriched"] += 1
else:
stats["skipped"] += 1
db.log("INFO", f"Enrichment complete: {stats['enriched']}/{stats['total_chunks']} chunks enriched")
return stats
def enrich_all_documents():
"""
Enrich chunks for all documents that have vision analysis.
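    Returns aggregate statistics, e.g. (illustrative counts):
        {"documents": 5, "total_enriched": 120, "total_skipped": 14}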
"""
# Find documents with vision analysis
cursor = db.execute(
"""SELECT DISTINCT d.id, d.filename
FROM documents d
INNER JOIN document_pages dp ON d.id = dp.document_id
WHERE dp.vision_analysis IS NOT NULL"""
)
documents = cursor.fetchall()
cursor.close()
total_stats = {"documents": len(documents), "total_enriched": 0, "total_skipped": 0}
for doc in documents:
print(f"Enriching: {doc['filename']}")
stats = enrich_document_chunks(doc["id"])
total_stats["total_enriched"] += stats["enriched"]
total_stats["total_skipped"] += stats["skipped"]
return total_stats
def run_enrichment_step(document_id):
"""
Run enrichment as a pipeline step.
Args:
document_id: Document ID to enrich
Returns:
dict with success status and statistics
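        e.g. {"success": True, "total_chunks": 42, "enriched": 30, "skipped": 10, "no_page": 2}
        or {"success": False, "error": "..."} on failure (counts illustrative)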
"""
try:
stats = enrich_document_chunks(document_id)
return {"success": True, **stats}
except Exception as e:
db.log("ERROR", f"Enrichment failed: {e}")
return {"success": False, "error": str(e)}
def main():
"""CLI entry point."""
if len(sys.argv) < 2:
print(__doc__)
return
db.connect()
try:
if sys.argv[1] == "--all":
print("Enriching all documents with vision data...")
stats = enrich_all_documents()
print(f"\nTotal: {stats['total_enriched']} chunks enriched across {stats['documents']} documents")
        else:
            try:
                document_id = int(sys.argv[1])
            except ValueError:
                print(f"Invalid document id: {sys.argv[1]!r}")
                return
            print(f"Enriching document {document_id}...")
stats = enrich_document_chunks(document_id)
print("\nResults:")
print(f" Total chunks: {stats['total_chunks']}")
print(f" Enriched: {stats['enriched']}")
print(f" Skipped: {stats['skipped']}")
print(f" No page info: {stats['no_page']}")
finally:
db.disconnect()
if __name__ == "__main__":
main()