#!/usr/bin/env python3
"""
Vision analysis module for KI-System Pipeline.
Extracts PDF pages as images and analyzes them with vision models.
Usage:
python vision.py <pdf_path> # Analyze all pages
python vision.py <pdf_path> --page 1 # Analyze specific page
"""
import base64
import os
import sys
from pathlib import Path
import requests
from config import OLLAMA_HOST
from constants import LLM_TIMEOUT
from db import db
# Default vision model (can be overridden by pipeline config)
DEFAULT_VISION_MODEL = "llama3.2-vision:11b"
# Image settings
IMAGE_DPI = 150 # Balance between quality and size
IMAGE_FORMAT = "png"
MAX_IMAGE_SIZE_MB = 10  # Intended upper bound per page image; not currently enforced in this module
def pdf_to_images(file_path, dpi=IMAGE_DPI):
"""
Convert PDF pages to images with automatic rotation correction.
Args:
file_path: Path to PDF file
dpi: Resolution for image extraction
Returns:
List of dicts with page_number, image_bytes, width, height, rotation
"""
import fitz # PyMuPDF
from orientation import get_page_rotation, rotate_image
doc = fitz.open(file_path)
pages = []
for page_num in range(len(doc)):
page = doc[page_num]
# Detect page rotation
rotation = get_page_rotation(page)
# Render page to image
mat = fitz.Matrix(dpi / 72, dpi / 72) # 72 is default PDF DPI
pix = page.get_pixmap(matrix=mat)
# Convert to PNG bytes
img_bytes = pix.tobytes(IMAGE_FORMAT)
# Apply rotation correction if needed
if rotation != 0:
img_bytes = rotate_image(img_bytes, rotation)
db.log("INFO", f"Page {page_num + 1}: Rotated image by {rotation}°")
# Update dimensions after rotation
import io
from PIL import Image
rotated_img = Image.open(io.BytesIO(img_bytes))
width, height = rotated_img.size
else:
width, height = pix.width, pix.height
pages.append(
{
"page_number": page_num + 1,
"image_bytes": img_bytes,
"width": width,
"height": height,
"size_kb": len(img_bytes) / 1024,
"rotation": rotation,
}
)
doc.close()
return pages
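# Example (sketch): render a local PDF and inspect per-page metadata. The file name
# "example.pdf" is hypothetical and PyMuPDF must be installed; kept as comments so
# importing this module stays free of side effects.
#
#   pages = pdf_to_images("example.pdf", dpi=150)
#   for p in pages:
#       print(p["page_number"], p["rotation"], f"{p['size_kb']:.1f} KB")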
def analyze_image_ollama(image_bytes, model=DEFAULT_VISION_MODEL, prompt=None):
"""
Analyze an image using Ollama vision model.
Args:
image_bytes: PNG/JPEG image as bytes
model: Vision model name (e.g., minicpm-v:latest)
prompt: Custom prompt (default: document analysis prompt)
Returns:
dict with analysis results
"""
if prompt is None:
prompt = """Analysiere diese Seite aus einem Schulungsdokument.
Beschreibe strukturiert:
1. **Überschriften/Titel**: Welche Überschriften gibt es?
2. **Hauptinhalt**: Worum geht es auf dieser Seite?
3. **Visuelle Elemente**:
- Gibt es Bilder/Fotos? Was zeigen sie?
- Gibt es Diagramme/Charts? Was stellen sie dar?
- Gibt es Tabellen? Was enthalten sie?
4. **Layout**: Wie ist die Seite aufgebaut (Spalten, Boxen, etc.)?
5. **Besonderheiten**: Gibt es Hervorhebungen, Zitate oder Callouts?
Antworte auf Deutsch und sei präzise."""
# Encode image as base64
image_base64 = base64.b64encode(image_bytes).decode("utf-8")
try:
response = requests.post(
f"{OLLAMA_HOST}/api/generate",
json={
"model": model,
"prompt": prompt,
"images": [image_base64],
"stream": False,
"options": {"temperature": 0.3, "num_predict": 2048, "num_ctx": 4096},
},
timeout=LLM_TIMEOUT,
)
response.raise_for_status()
result = response.json()
return {
"success": True,
"analysis": result.get("response", ""),
"model": model,
"eval_count": result.get("eval_count", 0),
"eval_duration_ms": result.get("eval_duration", 0) / 1_000_000,
}
    except requests.exceptions.Timeout:
        return {"success": False, "error": f"Vision model timeout after {LLM_TIMEOUT}s"}
    except requests.exceptions.RequestException as e:
        return {"success": False, "error": str(e)}
    except Exception as e:
        return {"success": False, "error": f"Unexpected error: {e}"}
def analyze_document(file_path, model=DEFAULT_VISION_MODEL, store_images=False, image_dir=None, progress=None):
"""
Analyze all pages of a PDF document.
Args:
file_path: Path to PDF file
model: Vision model to use
store_images: Whether to save images to disk
image_dir: Directory for saved images (default: /tmp/pipeline_images)
progress: PipelineProgress instance for live updates
Returns:
List of page analysis results
"""
db.log("INFO", f"Vision analysis starting: {file_path}", f"model={model}")
# Convert PDF to images
pages = pdf_to_images(file_path)
db.log("INFO", f"Extracted {len(pages)} pages from PDF")
if progress:
progress.add_log(f"Vision: {len(pages)} Seiten gefunden")
if image_dir is None:
image_dir = "/tmp/pipeline_images" # noqa: S108
if store_images:
os.makedirs(image_dir, exist_ok=True)
results = []
for page in pages:
page_num = page["page_number"]
db.log("INFO", f"Analyzing page {page_num}/{len(pages)}")
# Log every page for full visibility
if progress:
progress.add_log(f"Vision: Seite {page_num}/{len(pages)} wird analysiert...")
# Optional: Save image to disk
image_path = None
if store_images:
filename = f"{Path(file_path).stem}_page_{page_num:03d}.{IMAGE_FORMAT}"
image_path = os.path.join(image_dir, filename)
with open(image_path, "wb") as f:
f.write(page["image_bytes"])
# Analyze with vision model
analysis = analyze_image_ollama(page["image_bytes"], model=model)
results.append(
{
"page_number": page_num,
"width": page["width"],
"height": page["height"],
"size_kb": page["size_kb"],
"image_path": image_path,
"analysis": analysis.get("analysis", "") if analysis["success"] else None,
"error": analysis.get("error") if not analysis["success"] else None,
"eval_tokens": analysis.get("eval_count", 0),
"eval_duration_ms": analysis.get("eval_duration_ms", 0),
}
)
if analysis["success"]:
db.log("INFO", f"Page {page_num} analyzed: {analysis.get('eval_count', 0)} tokens")
else:
db.log("WARNING", f"Page {page_num} analysis failed: {analysis.get('error')}")
return results
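# Example (sketch): analyze a whole document and summarize failures. The path
# "manual.pdf" is hypothetical; db.log() assumes a connected database (see main()).
#
#   results = analyze_document("manual.pdf", store_images=True)
#   failed = [r["page_number"] for r in results if r["error"]]
#   print(f"{len(results) - len(failed)}/{len(results)} pages analyzed, failed: {failed}")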
def store_page_analysis(document_id, page_results):
"""
Store page analysis results in database.
Args:
document_id: ID of the document in documents table
page_results: List of page analysis results from analyze_document()
Returns:
Number of pages stored
"""
import json
stored = 0
for page in page_results:
try:
# Convert analysis to JSON (required by DB constraint)
vision_json = None
if page["analysis"]:
vision_json = json.dumps(
{
"analysis": page["analysis"],
"tokens": page["eval_tokens"],
"duration_ms": page["eval_duration_ms"],
"width": page["width"],
"height": page["height"],
},
ensure_ascii=False,
)
db.execute(
"""INSERT INTO document_pages
(document_id, page_number, image_path, vision_analysis, token_count)
VALUES (%s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
image_path = VALUES(image_path),
vision_analysis = VALUES(vision_analysis),
token_count = VALUES(token_count)""",
(document_id, page["page_number"], page["image_path"], vision_json, page["eval_tokens"]),
)
db.commit()
stored += 1
except Exception as e:
db.log("ERROR", f"Failed to store page {page['page_number']}: {e}")
return stored
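# Example (sketch): persist analysis results for a document already present in the
# documents table. The document_id 42 and "manual.pdf" are hypothetical.
#
#   results = analyze_document("manual.pdf")
#   print(f"Stored {store_page_analysis(42, results)} pages")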
def run_vision_step(document_id, file_path, config=None, progress=None):
"""
Run vision analysis step for pipeline.
Args:
document_id: Document ID in database
file_path: Path to PDF file
config: Step configuration dict
progress: PipelineProgress instance for live updates
Returns:
dict with success status and statistics
"""
if config is None:
config = {}
model = config.get("model", DEFAULT_VISION_MODEL)
store_images = config.get("store_images", False)
detect_images = config.get("detect_images", True)
detect_charts = config.get("detect_charts", True)
detect_tables = config.get("detect_tables", True)
# Build custom prompt based on config
prompt_parts = ["Analysiere diese Seite aus einem Schulungsdokument.\n\nBeschreibe strukturiert:"]
prompt_parts.append("1. **Überschriften/Titel**: Welche Überschriften gibt es?")
prompt_parts.append("2. **Hauptinhalt**: Worum geht es auf dieser Seite?")
visual_parts = []
if detect_images:
visual_parts.append("Gibt es Bilder/Fotos? Was zeigen sie?")
if detect_charts:
visual_parts.append("Gibt es Diagramme/Charts? Was stellen sie dar?")
if detect_tables:
visual_parts.append("Gibt es Tabellen? Was enthalten sie?")
if visual_parts:
prompt_parts.append("3. **Visuelle Elemente**:\n - " + "\n - ".join(visual_parts))
prompt_parts.append("4. **Layout**: Wie ist die Seite aufgebaut?")
prompt_parts.append("5. **Schlüsselbegriffe**: Welche wichtigen Begriffe/Konzepte werden genannt?")
prompt_parts.append("\nAntworte auf Deutsch und sei präzise.")
    # Note: the assembled prompt is not yet passed through to analyze_document(),
    # so the detect_* flags currently have no effect on the model call.
    _ = "\n".join(prompt_parts)  # Reserved for future custom prompt support
try:
# Analyze document
results = analyze_document(file_path, model=model, store_images=store_images, progress=progress)
# Store results
stored = store_page_analysis(document_id, results)
# Calculate statistics
successful = sum(1 for r in results if r["analysis"])
total_tokens = sum(r["eval_tokens"] for r in results)
total_time_ms = sum(r["eval_duration_ms"] for r in results)
return {
"success": True,
"pages_total": len(results),
"pages_analyzed": successful,
"pages_stored": stored,
"total_tokens": total_tokens,
"total_time_ms": total_time_ms,
"model": model,
}
except Exception as e:
db.log("ERROR", f"Vision step failed: {e}")
return {"success": False, "error": str(e)}
def main():
"""CLI entry point."""
if len(sys.argv) < 2:
print(__doc__)
return
file_path = sys.argv[1]
if not os.path.exists(file_path):
print(f"Error: File not found: {file_path}")
return
# Check for specific page
    page_num = None
    if "--page" in sys.argv:
        idx = sys.argv.index("--page")
        if idx + 1 < len(sys.argv):
            try:
                page_num = int(sys.argv[idx + 1])
            except ValueError:
                print(f"Error: --page expects an integer, got {sys.argv[idx + 1]!r}")
                return
# Connect to database
db.connect()
try:
if page_num:
# Analyze single page
pages = pdf_to_images(file_path)
            if page_num < 1 or page_num > len(pages):
                print(f"Error: Page {page_num} does not exist (valid range: 1-{len(pages)})")
                return
page = pages[page_num - 1]
print(f"Analyzing page {page_num} ({page['size_kb']:.1f} KB)...")
result = analyze_image_ollama(page["image_bytes"])
if result["success"]:
print(f"\n{'=' * 60}")
print(f"Page {page_num} Analysis")
print(f"{'=' * 60}")
print(result["analysis"])
print(f"\n[Tokens: {result.get('eval_count', 0)}, Time: {result.get('eval_duration_ms', 0):.0f}ms]")
else:
print(f"Error: {result['error']}")
else:
# Analyze entire document
print(f"Analyzing document: {file_path}")
results = analyze_document(file_path, store_images=True)
print(f"\n{'=' * 60}")
print("Document Analysis Summary")
print(f"{'=' * 60}")
for r in results:
status = "✓" if r["analysis"] else "✗"
print(f"Page {r['page_number']}: {status} ({r['size_kb']:.1f} KB, {r['eval_tokens']} tokens)")
successful = sum(1 for r in results if r["analysis"])
print(f"\nTotal: {successful}/{len(results)} pages analyzed")
finally:
db.disconnect()
if __name__ == "__main__":
main()