vision.py

Code Hygiene Score: 59

Issues 4

Line | Type | Description
382 magic_number Magic Number gefunden: 60
384 magic_number Magic Number gefunden: 60
394 magic_number Magic Number gefunden: 60
396 magic_number Magic Number gefunden: 60

Dependencies 14

Funktionen 6

Code

#!/usr/bin/env python3
"""
Vision analysis module for KI-System Pipeline.
Extracts PDF pages as images and analyzes them with vision models.

Usage:
    python vision.py <pdf_path>          # Analyze all pages
    python vision.py <pdf_path> --page 1 # Analyze specific page
"""

import base64
import os
import sys
from pathlib import Path

import requests

from config import OLLAMA_HOST
from constants import LLM_TIMEOUT
from db import db

# Default vision model; pipeline config may override this per step.
DEFAULT_VISION_MODEL = "llama3.2-vision:11b"

# Image extraction settings
IMAGE_DPI = 150  # render resolution — balance between quality and payload size
IMAGE_FORMAT = "png"  # format passed to PyMuPDF's Pixmap.tobytes()
MAX_IMAGE_SIZE_MB = 10  # NOTE(review): not referenced anywhere in this file — confirm it is enforced elsewhere


def pdf_to_images(file_path, dpi=IMAGE_DPI):
    """
    Convert PDF pages to images with automatic rotation correction.

    Args:
        file_path: Path to the PDF file.
        dpi: Resolution for image extraction (default: IMAGE_DPI).

    Returns:
        List of dicts with keys: page_number (1-based), image_bytes,
        width, height, size_kb, rotation (degrees corrected).
    """
    # Lazy imports keep module load cheap; hoisted out of the per-page
    # loop (the original re-ran the import statements on every rotated page).
    import io

    import fitz  # PyMuPDF

    from orientation import get_page_rotation, rotate_image

    doc = fitz.open(file_path)
    pages = []
    try:
        # 72 DPI is the PDF user-space default; scale up to the target DPI.
        # Loop-invariant, so build the matrix once.
        zoom = fitz.Matrix(dpi / 72, dpi / 72)

        for page_num in range(len(doc)):
            page = doc[page_num]

            # Detect page content rotation (degrees).
            rotation = get_page_rotation(page)

            pix = page.get_pixmap(matrix=zoom)
            img_bytes = pix.tobytes(IMAGE_FORMAT)

            if rotation != 0:
                # Correct the raster so downstream vision models see
                # upright content.
                img_bytes = rotate_image(img_bytes, rotation)
                db.log("INFO", f"Page {page_num + 1}: Rotated image by {rotation}°")

                # Width/height may have swapped after rotation; re-read them.
                from PIL import Image

                rotated_img = Image.open(io.BytesIO(img_bytes))
                width, height = rotated_img.size
            else:
                width, height = pix.width, pix.height

            pages.append(
                {
                    "page_number": page_num + 1,
                    "image_bytes": img_bytes,
                    "width": width,
                    "height": height,
                    "size_kb": len(img_bytes) / 1024,
                    "rotation": rotation,
                }
            )
    finally:
        # Release the document handle even if rendering raises
        # (the original leaked it on exceptions).
        doc.close()

    return pages


def analyze_image_ollama(image_bytes, model=DEFAULT_VISION_MODEL, prompt=None):
    """
    Send a single image to an Ollama vision model and collect its analysis.

    Args:
        image_bytes: Image data (PNG/JPEG) as raw bytes.
        model: Vision model name (e.g., minicpm-v:latest).
        prompt: Custom prompt; when None, the standard German
            document-analysis prompt is used.

    Returns:
        Dict with "success" plus either analysis/model/eval_count/
        eval_duration_ms (success) or "error" (failure).
    """
    if prompt is None:
        prompt = """Analysiere diese Seite aus einem Schulungsdokument.

Beschreibe strukturiert:
1. **Überschriften/Titel**: Welche Überschriften gibt es?
2. **Hauptinhalt**: Worum geht es auf dieser Seite?
3. **Visuelle Elemente**:
   - Gibt es Bilder/Fotos? Was zeigen sie?
   - Gibt es Diagramme/Charts? Was stellen sie dar?
   - Gibt es Tabellen? Was enthalten sie?
4. **Layout**: Wie ist die Seite aufgebaut (Spalten, Boxen, etc.)?
5. **Besonderheiten**: Gibt es Hervorhebungen, Zitate oder Callouts?

Antworte auf Deutsch und sei präzise."""

    # The Ollama API expects images as base64 strings.
    payload = {
        "model": model,
        "prompt": prompt,
        "images": [base64.b64encode(image_bytes).decode("utf-8")],
        "stream": False,
        "options": {"temperature": 0.3, "num_predict": 2048, "num_ctx": 4096},
    }

    try:
        resp = requests.post(
            f"{OLLAMA_HOST}/api/generate",
            json=payload,
            timeout=LLM_TIMEOUT,
        )
        resp.raise_for_status()
        data = resp.json()
        return {
            "success": True,
            "analysis": data.get("response", ""),
            "model": model,
            # eval_duration is reported in nanoseconds; convert to ms.
            "eval_count": data.get("eval_count", 0),
            "eval_duration_ms": data.get("eval_duration", 0) / 1_000_000,
        }
    except requests.exceptions.Timeout:
        return {"success": False, "error": "Vision model timeout"}
    except requests.exceptions.RequestException as e:
        return {"success": False, "error": str(e)}
    except Exception as e:
        # Defensive catch-all, e.g. malformed JSON in the response body.
        return {"success": False, "error": str(e)}


def analyze_document(file_path, model=DEFAULT_VISION_MODEL, store_images=False, image_dir=None, progress=None):
    """
    Run the vision model over every page of a PDF document.

    Args:
        file_path: Path to the PDF file.
        model: Vision model to use.
        store_images: If True, persist the extracted page images to disk.
        image_dir: Directory for saved images (default: /tmp/pipeline_images).
        progress: Optional PipelineProgress instance for live updates.

    Returns:
        List of per-page result dicts (analysis text, error, token stats).
    """
    db.log("INFO", f"Vision analysis starting: {file_path}", f"model={model}")

    pages = pdf_to_images(file_path)
    total = len(pages)
    db.log("INFO", f"Extracted {total} pages from PDF")

    if progress:
        progress.add_log(f"Vision: {total} Seiten gefunden")

    if image_dir is None:
        image_dir = "/tmp/pipeline_images"  # noqa: S108

    if store_images:
        os.makedirs(image_dir, exist_ok=True)

    results = []
    doc_stem = Path(file_path).stem  # invariant: used for every saved image name

    for page in pages:
        num = page["page_number"]
        db.log("INFO", f"Analyzing page {num}/{total}")

        # Emit a progress line per page for full visibility.
        if progress:
            progress.add_log(f"Vision: Seite {num}/{total} wird analysiert...")

        # Optionally persist the raw page image.
        saved_path = None
        if store_images:
            saved_path = os.path.join(image_dir, f"{doc_stem}_page_{num:03d}.{IMAGE_FORMAT}")
            with open(saved_path, "wb") as fh:
                fh.write(page["image_bytes"])

        outcome = analyze_image_ollama(page["image_bytes"], model=model)
        ok = outcome["success"]

        results.append(
            {
                "page_number": num,
                "width": page["width"],
                "height": page["height"],
                "size_kb": page["size_kb"],
                "image_path": saved_path,
                "analysis": outcome.get("analysis", "") if ok else None,
                "error": None if ok else outcome.get("error"),
                "eval_tokens": outcome.get("eval_count", 0),
                "eval_duration_ms": outcome.get("eval_duration_ms", 0),
            }
        )

        if ok:
            db.log("INFO", f"Page {num} analyzed: {outcome.get('eval_count', 0)} tokens")
        else:
            db.log("WARNING", f"Page {num} analysis failed: {outcome.get('error')}")

    return results


def store_page_analysis(document_id, page_results):
    """
    Persist page analysis results into the document_pages table.

    Args:
        document_id: ID of the document in the documents table.
        page_results: Per-page result dicts from analyze_document().

    Returns:
        Number of pages successfully stored.
    """
    import json

    saved = 0

    for entry in page_results:
        try:
            # Serialize the analysis as JSON (required by a DB constraint);
            # pages without an analysis are stored with NULL.
            payload = None
            if entry["analysis"]:
                payload = json.dumps(
                    {
                        "analysis": entry["analysis"],
                        "tokens": entry["eval_tokens"],
                        "duration_ms": entry["eval_duration_ms"],
                        "width": entry["width"],
                        "height": entry["height"],
                    },
                    ensure_ascii=False,
                )

            # Upsert keyed on (document_id, page_number) so re-runs update in place.
            db.execute(
                """INSERT INTO document_pages
                   (document_id, page_number, image_path, vision_analysis, token_count)
                   VALUES (%s, %s, %s, %s, %s)
                   ON DUPLICATE KEY UPDATE
                   image_path = VALUES(image_path),
                   vision_analysis = VALUES(vision_analysis),
                   token_count = VALUES(token_count)""",
                (document_id, entry["page_number"], entry["image_path"], payload, entry["eval_tokens"]),
            )
            db.commit()
        except Exception as e:
            # Best-effort: a single failed page must not abort the batch.
            db.log("ERROR", f"Failed to store page {entry['page_number']}: {e}")
        else:
            saved += 1

    return saved


def run_vision_step(document_id, file_path, config=None, progress=None):
    """
    Run the vision analysis step for the pipeline.

    Args:
        document_id: Document ID in the database.
        file_path: Path to the PDF file.
        config: Step configuration dict. Recognized keys:
            - "model": vision model name (default: DEFAULT_VISION_MODEL)
            - "store_images": persist page images to disk (default: False)
        progress: Optional PipelineProgress instance for live updates.

    Returns:
        Dict with success status and statistics on success,
        or {"success": False, "error": ...} on failure.
    """
    if config is None:
        config = {}

    model = config.get("model", DEFAULT_VISION_MODEL)
    store_images = config.get("store_images", False)

    # NOTE: the previous implementation assembled a custom prompt from the
    # detect_images / detect_charts / detect_tables config flags but never
    # used it (analyze_document has no prompt parameter).  That dead code
    # was removed; the flags are still accepted and simply ignored, so
    # existing configs remain valid.
    # TODO: thread a custom prompt through analyze_document ->
    # analyze_image_ollama to make these flags effective.

    try:
        # Analyze every page, then persist the results.
        results = analyze_document(file_path, model=model, store_images=store_images, progress=progress)
        stored = store_page_analysis(document_id, results)

        # Aggregate statistics over all pages.
        successful = sum(1 for r in results if r["analysis"])
        total_tokens = sum(r["eval_tokens"] for r in results)
        total_time_ms = sum(r["eval_duration_ms"] for r in results)

        return {
            "success": True,
            "pages_total": len(results),
            "pages_analyzed": successful,
            "pages_stored": stored,
            "total_tokens": total_tokens,
            "total_time_ms": total_time_ms,
            "model": model,
        }

    except Exception as e:
        db.log("ERROR", f"Vision step failed: {e}")
        return {"success": False, "error": str(e)}


def main():
    """CLI entry point: analyze a whole PDF, or one page with --page N."""
    if len(sys.argv) < 2:
        print(__doc__)
        return

    file_path = sys.argv[1]

    if not os.path.exists(file_path):
        print(f"Error: File not found: {file_path}")
        return

    # Optional "--page N" selects a single 1-based page.
    page_num = None
    if "--page" in sys.argv:
        idx = sys.argv.index("--page")
        if idx + 1 < len(sys.argv):
            try:
                page_num = int(sys.argv[idx + 1])
            except ValueError:
                # Previously a non-numeric value crashed with a traceback.
                print(f"Error: Invalid page number: {sys.argv[idx + 1]}")
                return
            if page_num < 1:
                # Previously 0 silently fell through to whole-document mode
                # and negative values indexed pages from the end.
                print(f"Error: Page number must be >= 1, got {page_num}")
                return

    # Named once instead of repeating the magic width four times.
    separator = "=" * 60

    # Connect to database
    db.connect()

    try:
        if page_num:
            # Analyze a single page.
            pages = pdf_to_images(file_path)
            if page_num > len(pages):
                print(f"Error: Page {page_num} does not exist (max: {len(pages)})")
                return

            page = pages[page_num - 1]
            print(f"Analyzing page {page_num} ({page['size_kb']:.1f} KB)...")

            result = analyze_image_ollama(page["image_bytes"])

            if result["success"]:
                print(f"\n{separator}")
                print(f"Page {page_num} Analysis")
                print(separator)
                print(result["analysis"])
                print(f"\n[Tokens: {result.get('eval_count', 0)}, Time: {result.get('eval_duration_ms', 0):.0f}ms]")
            else:
                print(f"Error: {result['error']}")
        else:
            # Analyze the entire document.
            print(f"Analyzing document: {file_path}")
            results = analyze_document(file_path, store_images=True)

            print(f"\n{separator}")
            print("Document Analysis Summary")
            print(separator)

            for r in results:
                status = "✓" if r["analysis"] else "✗"
                print(f"Page {r['page_number']}: {status} ({r['size_kb']:.1f} KB, {r['eval_tokens']} tokens)")

            successful = sum(1 for r in results if r["analysis"])
            print(f"\nTotal: {successful}/{len(results)} pages analyzed")

    finally:
        db.disconnect()


# Script entry point: python vision.py <pdf_path> [--page N]
if __name__ == "__main__":
    main()
← Übersicht