step_load.py
- Path: /var/www/scripts/pipeline/step_load.py
- Namespace: pipeline
- Lines: 152 | Size: 4,203 bytes
- Modified: 2025-12-26 23:50:34 | Scanned: 2025-12-31 10:22:15
Code Hygiene Score: 100
- Dependencies: 100 (25%)
- LOC: 100 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
No issues found.
Dependencies 1
- import json
Classes 1
- LoadStep (class, line 11)
Code
"""
Load Step Module
Handles database persistence operations for documents and chunks.
Part of modularized pipeline architecture.
"""
import json
class LoadStep:
"""Step: Load documents and chunks into database."""
def __init__(self, db, progress=None):
"""
Initialize load step.
Args:
db: Database instance
progress: Optional PipelineProgress instance
"""
self.db = db
self.progress = progress

    def create_document(self, file_info):
        """
        Create document record in database.

        Args:
            file_info: Dict with path, name, type, size, hash

        Returns:
            int: Document ID
        """
        doc_id = self.db.insert_document(
            file_path=file_info["path"],
            title=file_info["name"],
            file_type=file_info["type"],
            file_size=file_info["size"],
            file_hash=file_info["hash"],
        )
        self.db.log("INFO", f"Created document: {doc_id}")
        return doc_id
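
    # Illustrative only: a file_info dict in the shape create_document reads
    # above (keys taken from the accesses; the values here are invented):
    #   {"path": "/var/www/docs/report.pdf", "name": "report.pdf",
    #    "type": "pdf", "size": 4203, "hash": "e3b0c442..."}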

    def store_pages(self, doc_id, extraction):
        """
        Store document pages in database.

        Args:
            doc_id: Document ID
            extraction: Extraction result with content list (for PDFs)

        Returns:
            dict: Mapping of page_number to page_id
        """
        page_map = {}
        content = extraction.get("content", [])
        if not isinstance(content, list):
            # Not a multi-page document
            return page_map
        for i, page_content in enumerate(content):
            page_number = i + 1
            text = page_content.get("text", "") if isinstance(page_content, dict) else str(page_content)
            page_id = self.db.insert_page(
                doc_id=doc_id,
                page_number=page_number,
                text_content=text,
            )
            page_map[page_number] = page_id
        if self.progress and page_map:
            self.progress.add_log(f"{len(page_map)} pages stored")
        self.db.log("INFO", f"Stored {len(page_map)} pages for document {doc_id}")
        return page_map

    def store_chunks(self, doc_id, chunks, page_map=None):
        """
        Store chunks in database.

        Args:
            doc_id: Document ID
            chunks: List of chunk dictionaries
            page_map: Optional dict mapping page_number to page_id

        Returns:
            list: Chunks with db_id added
        """
        page_map = page_map or {}
        for i, chunk in enumerate(chunks):
            # Determine page_id from chunk metadata
            page_id = None
            metadata = chunk.get("metadata", {})
            page_number = metadata.get("page")
            if page_number and page_map:
                page_id = page_map.get(page_number)
            chunk_id = self.db.insert_chunk(
                doc_id=doc_id,
                chunk_index=i,
                content=chunk["content"],
                heading_path=json.dumps(chunk.get("heading_path", [])),
                position_start=chunk.get("position_start", 0),
                position_end=chunk.get("position_end", 0),
                metadata=json.dumps(metadata),
                page_id=page_id,
            )
            chunk["db_id"] = chunk_id
        return chunks
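
    # Illustrative only: the chunk shape store_chunks consumes above; per the
    # lookups in the loop, only "content" is required, every other key falls
    # back to a default:
    #   {"content": "Section text ...", "heading_path": ["Intro", "Scope"],
    #    "position_start": 0, "position_end": 512, "metadata": {"page": 3}}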

    def update_document_status(self, doc_id, status):
        """
        Update document processing status.

        Args:
            doc_id: Document ID
            status: New status (processing, done, failed)
        """
        self.db.update_document_status(doc_id, status)

    def check_document_exists(self, file_path):
        """
        Check if document already exists.

        Args:
            file_path: Path to file

        Returns:
            int|None: Document ID if exists, None otherwise
        """
        return self.db.document_exists(file_path)

    def check_document_done(self, file_path):
        """
        Check if document is already fully processed.

        Args:
            file_path: Path to file

        Returns:
            bool: True if document is done
        """
        return self.db.document_is_done(file_path)
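
Usage example
A minimal driver sketch, not part of the scanned module: it assumes the package import path pipeline.step_load (derived from the file path and namespace above) and a db object exposing the insert_*/log/update_document_status methods called in the class; the status values come from the update_document_status docstring.

from pipeline.step_load import LoadStep

def load_document(db, file_info, extraction, chunks):
    """Hypothetical pipeline driver wiring the LoadStep calls together."""
    step = LoadStep(db)
    # Skip files that were already fully processed.
    if step.check_document_done(file_info["path"]):
        return None
    # Reuse an existing document record or create a new one.
    doc_id = step.check_document_exists(file_info["path"]) or step.create_document(file_info)
    step.update_document_status(doc_id, "processing")
    page_map = step.store_pages(doc_id, extraction)  # {} for single-page inputs
    chunks = step.store_chunks(doc_id, chunks, page_map)
    step.update_document_status(doc_id, "done")
    return doc_id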