step_transform.py

Code Hygiene Score: 100

Keine Issues gefunden.

Dependencies 4

Klassen 1

Code

"""
Transformation Step Module
Handles chunking and semantic analysis of extracted content.

Part of modularized pipeline architecture.
"""

from analyze import analyze_document
from chunk import chunk_by_structure
from enrich import run_enrichment_step
from vision import run_vision_step


class TransformationStep:
    """Step: Transform extracted content into chunks and analyze.

    Groups the vision, chunking, enrichment and analysis stages. Each stage
    reports through ``db.log`` and, when a progress tracker was supplied,
    through that tracker as well.
    """

    # Settings passed to run_vision_step unless the caller overrides them.
    DEFAULT_VISION_CONFIG = {
        "model": "llama3.2-vision:11b",
        "store_images": True,
        "detect_images": True,
        "detect_charts": True,
        "detect_tables": True,
    }

    def __init__(self, db, progress=None):
        """
        Initialize transformation step.

        Args:
            db: Database instance (must provide ``log(level, message)``)
            progress: Optional PipelineProgress instance
        """
        self.db = db
        self.progress = progress

    @staticmethod
    def _is_pdf(file_type):
        """Return True when file_type is the ".pdf" extension, ignoring case."""
        # Non-string values (e.g. None) are simply "not a PDF" instead of an error.
        return isinstance(file_type, str) and file_type.lower() == ".pdf"

    @classmethod
    def _build_vision_config(cls, overrides=None):
        """Return a fresh config dict: class defaults updated with overrides."""
        # Copy so neither the caller nor the vision step can mutate the defaults.
        config = dict(cls.DEFAULT_VISION_CONFIG)
        if overrides:
            config.update(overrides)
        return config

    def execute_vision(self, doc_id, file_path, file_type, vision_config=None):
        """
        Execute vision analysis for PDFs.

        Args:
            doc_id: Document database ID
            file_path: Path to file
            file_type: File extension (compared case-insensitively)
            vision_config: Optional dict of settings that override
                DEFAULT_VISION_CONFIG for this call

        Returns:
            dict: Vision analysis result, or
                {"success": False, "error": "Not a PDF"} for other file types
        """
        if not self._is_pdf(file_type):
            return {"success": False, "error": "Not a PDF"}

        if self.progress:
            self.progress.update_step("vision")
            self.progress.add_log("Vision-Analyse gestartet...")

        self.db.log("INFO", f"Running vision analysis for document {doc_id}")

        config = self._build_vision_config(vision_config)
        vision_result = run_vision_step(doc_id, file_path, config, progress=self.progress)

        if vision_result.get("success"):
            # .get keeps a missing counter from raising after the work succeeded.
            analyzed = vision_result.get("pages_analyzed", "?")
            total = vision_result.get("pages_total", "?")
            self.db.log("INFO", f"Vision: {analyzed}/{total} pages")
            if self.progress:
                self.progress.add_log(f"Vision: {analyzed} Seiten analysiert")
        else:
            self.db.log("WARNING", f"Vision analysis failed: {vision_result.get('error')}")

        return vision_result

    def execute_chunking(self, extraction, total_pages=0):
        """
        Chunk extracted content.

        Args:
            extraction: Extraction result dict
            total_pages: Number of pages (for logging only; None counts as 0)

        Returns:
            list: Chunk dictionaries
        """
        # Tolerate None so an unknown page count does not break the comparison.
        pages = total_pages or 0

        if self.progress:
            self.progress.update_step("chunk")
            if pages > 0:
                self.progress.add_log(f"Erstelle Chunks aus {pages} Seiten...")
            else:
                self.progress.add_log("Erstelle Chunks...")

        chunks = chunk_by_structure(extraction)
        self.db.log("INFO", f"Created {len(chunks)} chunks")

        if self.progress:
            msg = f"{len(chunks)} Chunks erstellt"
            if pages > 0:
                msg += f" (aus {pages} Seiten)"
            self.progress.add_log(msg)

        return chunks

    def execute_enrichment(self, doc_id, file_type):
        """
        Enrich chunks with vision context (PDFs only).

        Args:
            doc_id: Document database ID
            file_type: File extension (compared case-insensitively)

        Returns:
            dict: Enrichment result, or
                {"success": False, "error": "Not a PDF"} for other file types
        """
        if not self._is_pdf(file_type):
            return {"success": False, "error": "Not a PDF"}

        if self.progress:
            self.progress.update_step("enrich")

        self.db.log("INFO", f"Running vision enrichment for document {doc_id}")

        enrich_result = run_enrichment_step(doc_id)

        if enrich_result.get("success"):
            # .get keeps a missing counter from raising after the work succeeded.
            enriched = enrich_result.get("enriched", "?")
            total_chunks = enrich_result.get("total_chunks", "?")
            self.db.log("INFO", f"Enrichment: {enriched}/{total_chunks} chunks")
        else:
            self.db.log("WARNING", f"Enrichment failed: {enrich_result.get('error')}")

        return enrich_result

    def execute_analysis(self, doc_id, full_text):
        """
        Execute semantic analysis on document.

        Args:
            doc_id: Document database ID
            full_text: Full document text

        Returns:
            dict: Analysis result as returned by analyze_document
        """
        if self.progress:
            self.progress.update_step("analyze")

        analysis = analyze_document(doc_id, full_text, progress=self.progress)
        self.db.log("INFO", f"Analysis complete: {analysis}")

        return analysis
← Übersicht Graph