db_documents.py

Code Hygiene Score: 95

Keine Issues gefunden.

Dependencies 2

Klassen 1

Code

"""
Database Documents Mixin

Single Responsibility: Document, Page, and Chunk CRUD operations.
"""

import os

from constants import DEFAULT_LIMIT


class DocumentsMixin:
    """Mixin for document, page, and chunk operations.

    Provides:
    - Document: exists, is_done, insert, update_status
    - Page: insert, get_id
    - Chunk: insert, get_for_embedding, update_qdrant_id
    """

    # ========== Document Operations ==========

    def document_exists(self, file_path: str) -> int | None:
        """Check if document already exists.

        Args:
            file_path: Source file path

        Returns:
            Document ID if exists, None otherwise
        """
        cursor = self.execute(
            "SELECT id FROM documents WHERE source_path = %s",
            (file_path,),
        )
        result = cursor.fetchone()
        cursor.close()
        return result["id"] if result else None

    def document_is_done(self, file_path: str) -> int | None:
        """Check if document is already fully processed (status='done').

        Args:
            file_path: Source file path

        Returns:
            Document ID if done, None otherwise
        """
        cursor = self.execute(
            "SELECT id, status FROM documents WHERE source_path = %s",
            (file_path,),
        )
        result = cursor.fetchone()
        cursor.close()
        if result and result["status"] == "done":
            return result["id"]
        return None

    def insert_document(
        self,
        file_path: str,
        title: str,
        file_type: str,
        file_size: int,
        file_hash: str,
    ) -> int | None:
        """Insert a new document or update existing one.

        Args:
            file_path: Source file path
            title: Document title (filename)
            file_type: MIME type
            file_size: File size in bytes
            file_hash: File hash for change detection

        Returns:
            Document ID
        """
        folder_path = os.path.dirname(file_path)
        cursor = self.execute(
            """INSERT INTO documents
               (source_path, folder_path, filename, mime_type, file_size, file_hash, status)
               VALUES (%s, %s, %s, %s, %s, %s, 'processing')
               ON DUPLICATE KEY UPDATE
               file_hash = VALUES(file_hash),
               file_size = VALUES(file_size),
               status = 'processing',
               processed_at = NULL,
               error_message = NULL""",
            (file_path, folder_path, title, file_type, file_size, file_hash),
        )
        self.commit()
        doc_id = cursor.lastrowid

        # If ON DUPLICATE KEY UPDATE was triggered, lastrowid is 0
        if doc_id == 0:
            cursor_select = self.execute(
                "SELECT id FROM documents WHERE source_path = %s",
                (file_path,),
            )
            result = cursor_select.fetchone()
            cursor_select.close()
            doc_id = result["id"] if result else None

        cursor.close()
        return doc_id

    def update_document_status(
        self,
        doc_id: int,
        status: str,
        error_message: str = None,
    ):
        """Update document processing status.

        Args:
            doc_id: Document ID
            status: New status (processing, done, error)
            error_message: Optional error message
        """
        if error_message:
            cursor = self.execute(
                """UPDATE documents
                   SET status = %s, error_message = %s, processed_at = NOW()
                   WHERE id = %s""",
                (status, error_message, doc_id),
            )
        else:
            cursor = self.execute(
                "UPDATE documents SET status = %s, processed_at = NOW() WHERE id = %s",
                (status, doc_id),
            )
        self.commit()
        cursor.close()

    # ========== Page Operations ==========

    def insert_page(
        self,
        doc_id: int,
        page_number: int,
        text_content: str,
        token_count: int = None,
    ) -> int | None:
        """Insert a document page.

        Args:
            doc_id: Document ID
            page_number: Page number (1-based)
            text_content: Page text content
            token_count: Optional token count

        Returns:
            Page ID
        """
        if token_count is None:
            token_count = len(text_content.split()) if text_content else 0

        cursor = self.execute(
            """INSERT INTO document_pages
               (document_id, page_number, text_content, token_count, created_at)
               VALUES (%s, %s, %s, %s, NOW())
               ON DUPLICATE KEY UPDATE
               text_content = VALUES(text_content),
               token_count = VALUES(token_count)""",
            (doc_id, page_number, text_content, token_count),
        )
        self.commit()
        page_id = cursor.lastrowid

        if page_id == 0:
            cursor_select = self.execute(
                "SELECT id FROM document_pages WHERE document_id = %s AND page_number = %s",
                (doc_id, page_number),
            )
            result = cursor_select.fetchone()
            cursor_select.close()
            page_id = result["id"] if result else None

        cursor.close()
        return page_id

    def get_page_id(self, doc_id: int, page_number: int) -> int | None:
        """Get page ID by document and page number.

        Args:
            doc_id: Document ID
            page_number: Page number

        Returns:
            Page ID or None
        """
        cursor = self.execute(
            "SELECT id FROM document_pages WHERE document_id = %s AND page_number = %s",
            (doc_id, page_number),
        )
        result = cursor.fetchone()
        cursor.close()
        return result["id"] if result else None

    # ========== Chunk Operations ==========

    def insert_chunk(
        self,
        doc_id: int,
        chunk_index: int,
        content: str,
        heading_path: str,
        position_start: int = None,
        position_end: int = None,
        metadata: str = None,
        page_id: int = None,
    ) -> int:
        """Insert a text chunk.

        Args:
            doc_id: Document ID
            chunk_index: Chunk index within document
            content: Chunk text content
            heading_path: Heading hierarchy path
            position_start: Optional start position
            position_end: Optional end position
            metadata: Optional JSON metadata
            page_id: Optional page ID

        Returns:
            Chunk ID
        """
        # Calculate token count (rough estimate: 4 chars per token)
        token_count = len(content) // 4

        cursor = self.execute(
            """INSERT INTO chunks
               (document_id, page_id, chunk_index, content, token_count, heading_path, metadata)
               VALUES (%s, %s, %s, %s, %s, %s, %s)""",
            (doc_id, page_id, chunk_index, content, token_count, heading_path, metadata),
        )
        self.commit()
        chunk_id = cursor.lastrowid
        cursor.close()
        return chunk_id

    def get_chunks_for_embedding(self, limit: int = DEFAULT_LIMIT) -> list:
        """Get chunks that need embeddings.

        Args:
            limit: Maximum number of chunks to return

        Returns:
            List of chunk dicts with id, content, document_id
        """
        cursor = self.execute(
            """SELECT c.id, c.content, c.document_id
               FROM chunks c
               WHERE c.qdrant_id IS NULL
               ORDER BY c.created_at
               LIMIT %s""",
            (limit,),
        )
        results = cursor.fetchall()
        cursor.close()
        return results

    def update_chunk_qdrant_id(self, chunk_id: int, qdrant_id: str):
        """Update chunk with Qdrant point ID.

        Args:
            chunk_id: Chunk ID
            qdrant_id: Qdrant point UUID
        """
        cursor = self.execute(
            "UPDATE chunks SET qdrant_id = %s WHERE id = %s",
            (qdrant_id, chunk_id),
        )
        self.commit()
        cursor.close()
← Übersicht Graph