db_documents.py
- Pfad: /var/www/scripts/pipeline/db_documents.py
- Namespace: pipeline
- Zeilen: 277 | Größe: 8,246 Bytes
- Geändert: 2025-12-28 08:58:04 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 95
- Dependencies: 100 (25%)
- LOC: 74 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
Keine Issues gefunden.
Dependencies 2
- use os
- use constants.DEFAULT_LIMIT
Klassen 1
- DocumentsMixin (class, Zeile 12)
Code
"""
Database Documents Mixin
Single Responsibility: Document, Page, and Chunk CRUD operations.
"""
import os
from constants import DEFAULT_LIMIT
class DocumentsMixin:
    """Mixin for document, page, and chunk CRUD operations.

    The host class is expected to provide ``execute(sql, params)``, returning a
    DB-API style cursor whose rows can be indexed by column name (e.g.
    ``row["id"]``), and ``commit()``. The SQL below uses MySQL/MariaDB syntax
    (``%s`` placeholders, ``ON DUPLICATE KEY UPDATE``, ``NOW()``).

    Provides:
        - Document: exists, is_done, insert, update_status
        - Page: insert, get_id
        - Chunk: insert, get_for_embedding, update_qdrant_id
    """

    # ========== Internal Helpers ==========
    # Each helper closes its cursor in a ``finally`` block so a failing fetch or
    # commit cannot leak it. The ``_docs_`` prefix reduces the chance of a name
    # clash with helpers from other mixins combined into the same host class.

    def _docs_fetch_one(self, sql: str, params: tuple):
        """Execute a query and return its first row, or None if there is none."""
        cursor = self.execute(sql, params)
        try:
            return cursor.fetchone()
        finally:
            cursor.close()

    def _docs_fetch_all(self, sql: str, params: tuple):
        """Execute a query and return all result rows."""
        cursor = self.execute(sql, params)
        try:
            return cursor.fetchall()
        finally:
            cursor.close()

    def _docs_write(self, sql: str, params: tuple):
        """Execute a write statement, commit it, and return ``cursor.lastrowid``."""
        cursor = self.execute(sql, params)
        try:
            self.commit()
            return cursor.lastrowid
        finally:
            cursor.close()

    # ========== Document Operations ==========

    def document_exists(self, file_path: str) -> int | None:
        """Check if a document with this source path already exists.

        Args:
            file_path: Source file path

        Returns:
            Document ID if it exists, None otherwise
        """
        row = self._docs_fetch_one(
            "SELECT id FROM documents WHERE source_path = %s",
            (file_path,),
        )
        return row["id"] if row else None

    def document_is_done(self, file_path: str) -> int | None:
        """Check if a document is already fully processed (status='done').

        Args:
            file_path: Source file path

        Returns:
            Document ID if its status is 'done'; None otherwise, including
            when no document with this source path exists
        """
        row = self._docs_fetch_one(
            "SELECT id, status FROM documents WHERE source_path = %s",
            (file_path,),
        )
        if row and row["status"] == "done":
            return row["id"]
        return None

    def insert_document(
        self,
        file_path: str,
        title: str,
        file_type: str,
        file_size: int,
        file_hash: str,
    ) -> int | None:
        """Insert a new document, or reset an existing one for reprocessing.

        The new row starts with status 'processing'. On a duplicate key
        (presumably the unique key on ``source_path`` - confirm against the
        schema) only ``file_hash`` and ``file_size`` are refreshed, ``status``
        is reset to 'processing', and ``processed_at`` / ``error_message`` are
        cleared; ``folder_path``, ``filename`` and ``mime_type`` keep their
        stored values.

        Args:
            file_path: Source file path
            title: Document title (filename)
            file_type: MIME type
            file_size: File size in bytes
            file_hash: File hash for change detection

        Returns:
            Document ID, or None if it could not be determined
        """
        folder_path = os.path.dirname(file_path)
        doc_id = self._docs_write(
            """INSERT INTO documents
            (source_path, folder_path, filename, mime_type, file_size, file_hash, status)
            VALUES (%s, %s, %s, %s, %s, %s, 'processing')
            ON DUPLICATE KEY UPDATE
            file_hash = VALUES(file_hash),
            file_size = VALUES(file_size),
            status = 'processing',
            processed_at = NULL,
            error_message = NULL""",
            (file_path, folder_path, title, file_type, file_size, file_hash),
        )
        # When ON DUPLICATE KEY UPDATE hits an existing row the driver reports
        # lastrowid 0 (PEP 249 also permits None), so look the id up instead.
        if not doc_id:
            doc_id = self.document_exists(file_path)
        return doc_id

    def update_document_status(
        self,
        doc_id: int,
        status: str,
        error_message: str | None = None,
    ):
        """Update a document's processing status and set processed_at = NOW().

        Args:
            doc_id: Document ID
            status: New status (processing, done, error)
            error_message: Optional error message. When falsy (None or ""),
                the stored error_message column is left unchanged.
        """
        if error_message:
            sql = """UPDATE documents
            SET status = %s, error_message = %s, processed_at = NOW()
            WHERE id = %s"""
            params = (status, error_message, doc_id)
        else:
            sql = "UPDATE documents SET status = %s, processed_at = NOW() WHERE id = %s"
            params = (status, doc_id)
        self._docs_write(sql, params)

    # ========== Page Operations ==========

    def insert_page(
        self,
        doc_id: int,
        page_number: int,
        text_content: str,
        token_count: int | None = None,
    ) -> int | None:
        """Insert a document page, or overwrite the text of an existing one.

        Args:
            doc_id: Document ID
            page_number: Page number (1-based)
            text_content: Page text content
            token_count: Optional token count; when omitted, the number of
                whitespace-separated words is used (0 for empty text)

        Returns:
            Page ID, or None if it could not be determined
        """
        if token_count is None:
            token_count = len(text_content.split()) if text_content else 0
        page_id = self._docs_write(
            """INSERT INTO document_pages
            (document_id, page_number, text_content, token_count, created_at)
            VALUES (%s, %s, %s, %s, NOW())
            ON DUPLICATE KEY UPDATE
            text_content = VALUES(text_content),
            token_count = VALUES(token_count)""",
            (doc_id, page_number, text_content, token_count),
        )
        # Same duplicate-key caveat as insert_document: fall back to a lookup.
        if not page_id:
            page_id = self.get_page_id(doc_id, page_number)
        return page_id

    def get_page_id(self, doc_id: int, page_number: int) -> int | None:
        """Get a page ID by document ID and page number.

        Args:
            doc_id: Document ID
            page_number: Page number

        Returns:
            Page ID or None
        """
        row = self._docs_fetch_one(
            "SELECT id FROM document_pages WHERE document_id = %s AND page_number = %s",
            (doc_id, page_number),
        )
        return row["id"] if row else None

    # ========== Chunk Operations ==========

    def insert_chunk(
        self,
        doc_id: int,
        chunk_index: int,
        content: str,
        heading_path: str,
        position_start: int | None = None,
        position_end: int | None = None,
        metadata: str | None = None,
        page_id: int | None = None,
    ) -> int:
        """Insert a text chunk.

        Args:
            doc_id: Document ID
            chunk_index: Chunk index within document
            content: Chunk text content
            heading_path: Heading hierarchy path
            position_start: Optional start position. NOTE(review): accepted
                but currently NOT written to the database - confirm whether
                the chunks table has a column for it.
            position_end: Optional end position. Same caveat as
                position_start.
            metadata: Optional JSON metadata
            page_id: Optional page ID

        Returns:
            Chunk ID (the cursor's lastrowid)
        """
        # Rough token estimate: about 4 characters per token.
        token_count = len(content) // 4
        return self._docs_write(
            """INSERT INTO chunks
            (document_id, page_id, chunk_index, content, token_count, heading_path, metadata)
            VALUES (%s, %s, %s, %s, %s, %s, %s)""",
            (doc_id, page_id, chunk_index, content, token_count, heading_path, metadata),
        )

    def get_chunks_for_embedding(self, limit: int = DEFAULT_LIMIT) -> list:
        """Get chunks that have no Qdrant ID yet, oldest (by created_at) first.

        Args:
            limit: Maximum number of chunks to return

        Returns:
            List of chunk rows with id, content, document_id
        """
        return self._docs_fetch_all(
            """SELECT c.id, c.content, c.document_id
            FROM chunks c
            WHERE c.qdrant_id IS NULL
            ORDER BY c.created_at
            LIMIT %s""",
            (limit,),
        )

    def update_chunk_qdrant_id(self, chunk_id: int, qdrant_id: str):
        """Store the Qdrant point ID for a chunk and commit.

        Args:
            chunk_id: Chunk ID
            qdrant_id: Qdrant point UUID
        """
        self._docs_write(
            "UPDATE chunks SET qdrant_id = %s WHERE id = %s",
            (qdrant_id, chunk_id),
        )