step_load.py

Code Hygiene Score: 100

Keine Issues gefunden.

Dependencies 1

Klassen 1

Code

"""
Load Step Module
Handles database persistence operations for documents and chunks.

Part of modularized pipeline architecture.
"""

import json


class LoadStep:
    """Step: Load documents and chunks into database."""

    def __init__(self, db, progress=None):
        """
        Initialize load step.

        Args:
            db: Database instance
            progress: Optional PipelineProgress instance
        """
        self.db = db
        self.progress = progress

    def create_document(self, file_info):
        """
        Create document record in database.

        Args:
            file_info: Dict with path, name, type, size, hash

        Returns:
            int: Document ID
        """
        doc_id = self.db.insert_document(
            file_path=file_info["path"],
            title=file_info["name"],
            file_type=file_info["type"],
            file_size=file_info["size"],
            file_hash=file_info["hash"],
        )

        self.db.log("INFO", f"Created document: {doc_id}")

        return doc_id

    def store_pages(self, doc_id, extraction):
        """
        Store document pages in database.

        Args:
            doc_id: Document ID
            extraction: Extraction result with content list (for PDFs)

        Returns:
            dict: Mapping of page_number to page_id
        """
        page_map = {}
        content = extraction.get("content", [])

        if not isinstance(content, list):
            # Not a multi-page document
            return page_map

        for i, page_content in enumerate(content):
            page_number = i + 1
            text = page_content.get("text", "") if isinstance(page_content, dict) else str(page_content)

            page_id = self.db.insert_page(
                doc_id=doc_id,
                page_number=page_number,
                text_content=text,
            )
            page_map[page_number] = page_id

        if self.progress and page_map:
            self.progress.add_log(f"{len(page_map)} Seiten gespeichert")

        self.db.log("INFO", f"Stored {len(page_map)} pages for document {doc_id}")

        return page_map

    def store_chunks(self, doc_id, chunks, page_map=None):
        """
        Store chunks in database.

        Args:
            doc_id: Document ID
            chunks: List of chunk dictionaries
            page_map: Optional dict mapping page_number to page_id

        Returns:
            list: Chunks with db_id added
        """
        page_map = page_map or {}

        for i, chunk in enumerate(chunks):
            # Determine page_id from chunk metadata
            page_id = None
            metadata = chunk.get("metadata", {})
            page_number = metadata.get("page")
            if page_number and page_map:
                page_id = page_map.get(page_number)

            chunk_id = self.db.insert_chunk(
                doc_id=doc_id,
                chunk_index=i,
                content=chunk["content"],
                heading_path=json.dumps(chunk.get("heading_path", [])),
                position_start=chunk.get("position_start", 0),
                position_end=chunk.get("position_end", 0),
                metadata=json.dumps(metadata),
                page_id=page_id,
            )
            chunk["db_id"] = chunk_id

        return chunks

    def update_document_status(self, doc_id, status):
        """
        Update document processing status.

        Args:
            doc_id: Document ID
            status: New status (processing, done, failed)
        """
        self.db.update_document_status(doc_id, status)

    def check_document_exists(self, file_path):
        """
        Check if document already exists.

        Args:
            file_path: Path to file

        Returns:
            int|None: Document ID if exists, None otherwise
        """
        return self.db.document_exists(file_path)

    def check_document_done(self, file_path):
        """
        Check if document is already fully processed.

        Args:
            file_path: Path to file

        Returns:
            bool: True if document is done
        """
        return self.db.document_is_done(file_path)
← Übersicht Graph