enrich.py

Code Hygiene Score: 89

Issues 4

Zeile Typ Beschreibung
101 magic_number Magic Number gefunden: 60
102 magic_number Magic Number gefunden: 60
103 magic_number Magic Number gefunden: 60
114 magic_number Magic Number gefunden: 100

Dependencies 4

Funktionen 7

Code

#!/usr/bin/env python3
"""
Vision-Enrichment module for KI-System Pipeline.
Enriches chunks with visual context from page-level vision analysis.

Usage:
    python enrich.py <document_id>     # Enrich all chunks for a document
    python enrich.py --all             # Enrich all documents with vision data
"""

import json
import re
import sys

from db import db


def get_vision_context(document_id, page_number):
    """
    Get vision analysis for a specific page.

    Returns dict with structured vision info or None.
    """
    cursor = db.execute(
        """SELECT vision_analysis
           FROM document_pages
           WHERE document_id = %s AND page_number = %s""",
        (document_id, page_number),
    )
    result = cursor.fetchone()
    cursor.close()

    if not result or not result.get("vision_analysis"):
        return None

    try:
        vision_data = json.loads(result["vision_analysis"])
        return vision_data
    except (json.JSONDecodeError, TypeError):
        return None


def extract_vision_summary(vision_data):
    """
    Extract key information from vision analysis for chunk enrichment.

    Returns compact dict with:
    - detected_elements: list of visual elements found
    - page_title: extracted title if any
    - has_images: bool
    - has_charts: bool
    - has_tables: bool
    - layout_type: detected layout style
    - key_topics: extracted key topics/concepts
    """
    if not vision_data:
        return None

    analysis_text = vision_data.get("analysis", "")
    if not analysis_text:
        return None

    summary = {
        "detected_elements": [],
        "page_title": None,
        "has_images": False,
        "has_charts": False,
        "has_tables": False,
        "layout_type": "standard",
        "key_topics": [],
        "vision_tokens": vision_data.get("tokens", 0),
    }

    # Detect visual elements
    analysis_lower = analysis_text.lower()

    # Check for images
    if any(word in analysis_lower for word in ["bild", "foto", "image", "abbildung", "grafik"]):  # noqa: SIM102
        if "keine" not in analysis_lower.split("bild")[0][-20:] if "bild" in analysis_lower else True:
            summary["has_images"] = True
            summary["detected_elements"].append("images")

    # Check for charts/diagrams
    if any(word in analysis_lower for word in ["diagramm", "chart", "graph", "schaubild"]):  # noqa: SIM102
        if "keine" not in analysis_lower.split("diagramm")[0][-20:] if "diagramm" in analysis_lower else True:
            summary["has_charts"] = True
            summary["detected_elements"].append("charts")

    # Check for tables
    if any(word in analysis_lower for word in ["tabelle", "table", "übersicht"]):  # noqa: SIM102
        if "keine" not in analysis_lower.split("tabelle")[0][-20:] if "tabelle" in analysis_lower else True:
            summary["has_tables"] = True
            summary["detected_elements"].append("tables")

    # Check for callouts/highlights
    if any(word in analysis_lower for word in ["callout", "hervorhebung", "box", "kasten", "zitat"]):
        summary["detected_elements"].append("callouts")

    # Extract title (look for patterns like "Titel: X" or "Überschrift: X")
    title_patterns = [
        r'["\']([^"\']{5,60})["\']',  # Quoted strings
        r'Titel[:\s]+["\']?([^"\'\n]{5,60})',
        r'Überschrift[:\s]+["\']?([^"\'\n]{5,60})',
    ]

    for pattern in title_patterns:
        match = re.search(pattern, analysis_text)
        if match:
            potential_title = match.group(1).strip()
            # Filter out common non-titles
            if not any(
                skip in potential_title.lower() for skip in ["keine", "nicht", "gibt es", "vorhanden", "enthält"]
            ):
                summary["page_title"] = potential_title[:100]
                break

    # Detect layout type
    if any(word in analysis_lower for word in ["zwei spalten", "zweispaltig", "columns"]):
        summary["layout_type"] = "two-column"
    elif any(word in analysis_lower for word in ["liste", "aufzählung", "bullet"]):
        summary["layout_type"] = "list"
    elif any(word in analysis_lower for word in ["vollbild", "full page", "ganzseitig"]):
        summary["layout_type"] = "full-page"

    # Extract key topics (look for bold/emphasized terms)
    bold_pattern = r"\*\*([^*]+)\*\*"
    bold_matches = re.findall(bold_pattern, analysis_text)
    if bold_matches:
        # Filter and dedupe
        topics = []
        seen = set()
        for match in bold_matches[:10]:
            clean = match.strip()
            if len(clean) > 2 and clean.lower() not in seen:
                seen.add(clean.lower())
                topics.append(clean)
        summary["key_topics"] = topics[:5]

    return summary


def enrich_chunk(chunk_id, document_id, page_number):
    """
    Enrich a single chunk with vision context.

    Updates the chunk's metadata with vision information.
    Returns True if enriched, False otherwise.
    """
    # Get vision context for the page
    vision_data = get_vision_context(document_id, page_number)
    if not vision_data:
        return False

    # Extract summary
    vision_summary = extract_vision_summary(vision_data)
    if not vision_summary:
        return False

    # Get current chunk metadata
    cursor = db.execute("SELECT metadata FROM chunks WHERE id = %s", (chunk_id,))
    result = cursor.fetchone()
    cursor.close()

    if not result:
        return False

    # Parse existing metadata
    try:
        metadata = json.loads(result["metadata"]) if result["metadata"] else {}
    except (json.JSONDecodeError, TypeError):
        metadata = {}

    # Add vision context
    metadata["vision"] = vision_summary

    # Update chunk
    db.execute("UPDATE chunks SET metadata = %s WHERE id = %s", (json.dumps(metadata, ensure_ascii=False), chunk_id))
    db.commit()

    return True


def enrich_document_chunks(document_id):
    """
    Enrich all chunks for a document with vision context.

    Returns dict with statistics.
    """
    db.log("INFO", f"Starting vision enrichment for document {document_id}")

    # Get all chunks with page info
    cursor = db.execute("""SELECT id, metadata FROM chunks WHERE document_id = %s""", (document_id,))
    chunks = cursor.fetchall()
    cursor.close()

    stats = {"total_chunks": len(chunks), "enriched": 0, "skipped": 0, "no_page": 0}

    for chunk in chunks:
        chunk_id = chunk["id"]

        # Extract page number from metadata
        try:
            metadata = json.loads(chunk["metadata"]) if chunk["metadata"] else {}
            page_number = metadata.get("page")
        except (json.JSONDecodeError, TypeError):
            page_number = None

        if not page_number:
            stats["no_page"] += 1
            continue

        # Check if already enriched
        if metadata.get("vision"):
            stats["skipped"] += 1
            continue

        # Enrich
        if enrich_chunk(chunk_id, document_id, page_number):
            stats["enriched"] += 1
        else:
            stats["skipped"] += 1

    db.log("INFO", f"Enrichment complete: {stats['enriched']}/{stats['total_chunks']} chunks enriched")
    return stats


def enrich_all_documents():
    """
    Enrich chunks for all documents that have vision analysis.
    """
    # Find documents with vision analysis
    cursor = db.execute(
        """SELECT DISTINCT d.id, d.filename
           FROM documents d
           INNER JOIN document_pages dp ON d.id = dp.document_id
           WHERE dp.vision_analysis IS NOT NULL"""
    )
    documents = cursor.fetchall()
    cursor.close()

    total_stats = {"documents": len(documents), "total_enriched": 0, "total_skipped": 0}

    for doc in documents:
        print(f"Enriching: {doc['filename']}")
        stats = enrich_document_chunks(doc["id"])
        total_stats["total_enriched"] += stats["enriched"]
        total_stats["total_skipped"] += stats["skipped"]

    return total_stats


def run_enrichment_step(document_id):
    """
    Run enrichment as a pipeline step.

    Args:
        document_id: Document ID to enrich

    Returns:
        dict with success status and statistics
    """
    try:
        stats = enrich_document_chunks(document_id)
        return {"success": True, **stats}
    except Exception as e:
        db.log("ERROR", f"Enrichment failed: {e}")
        return {"success": False, "error": str(e)}


def main():
    """CLI entry point."""
    if len(sys.argv) < 2:
        print(__doc__)
        return

    db.connect()

    try:
        if sys.argv[1] == "--all":
            print("Enriching all documents with vision data...")
            stats = enrich_all_documents()
            print(f"\nTotal: {stats['total_enriched']} chunks enriched across {stats['documents']} documents")
        else:
            document_id = int(sys.argv[1])
            print(f"Enriching document {document_id}...")
            stats = enrich_document_chunks(document_id)
            print("\nResults:")
            print(f"  Total chunks: {stats['total_chunks']}")
            print(f"  Enriched: {stats['enriched']}")
            print(f"  Skipped: {stats['skipped']}")
            print(f"  No page info: {stats['no_page']}")
    finally:
        db.disconnect()


if __name__ == "__main__":
    main()
← Übersicht