step_transform.py
- Path: /var/www/scripts/pipeline/step_transform.py | Namespace: pipeline
- Lines: 143 | Size: 4,296 bytes
- Modified: 2025-12-25 18:29:01 | Scanned: 2025-12-31 10:22:15
Code Hygiene Score: 100
- Dependencies: 100 (25%)
- LOC: 100 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
No issues found.
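The component weights sum to 100%, so the overall score reads as a weighted sum of the per-component scores; the exact formula is an assumption (it is not documented in the scan output), but under that reading:

hygiene = (0.25 * 100    # Dependencies
           + 0.20 * 100  # LOC
           + 0.20 * 100  # Methods
           + 0.15 * 100  # Secrets
           + 0.10 * 100  # Classes
           + 0.10 * 100) # Magic Numbers
print(hygiene)  # 100.0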
Dependencies (4)
- use analyze.analyze_document
- use chunk.chunk_by_structure
- use enrich.run_enrichment_step
- use vision.run_vision_step
Classes (1)
- TransformationStep (class), line 14
Code
"""
Transformation Step Module
Handles chunking and semantic analysis of extracted content.
Part of modularized pipeline architecture.
"""
from analyze import analyze_document
from chunk import chunk_by_structure
from enrich import run_enrichment_step
from vision import run_vision_step
class TransformationStep:
    """Step: Transform extracted content into chunks and analyze."""

    def __init__(self, db, progress=None):
        """
        Initialize transformation step.

        Args:
            db: Database instance
            progress: Optional PipelineProgress instance
        """
        self.db = db
        self.progress = progress
    def execute_vision(self, doc_id, file_path, file_type):
        """
        Execute vision analysis for PDFs.

        Args:
            doc_id: Document database ID
            file_path: Path to file
            file_type: File extension

        Returns:
            dict: Vision analysis result
        """
        if file_type != ".pdf":
            return {"success": False, "error": "Not a PDF"}

        if self.progress:
            self.progress.update_step("vision")
            self.progress.add_log("Vision-Analyse gestartet...")

        self.db.log("INFO", f"Running vision analysis for document {doc_id}")

        vision_config = {
            "model": "llama3.2-vision:11b",
            "store_images": True,
            "detect_images": True,
            "detect_charts": True,
            "detect_tables": True,
        }
        vision_result = run_vision_step(doc_id, file_path, vision_config, progress=self.progress)

        if vision_result["success"]:
            self.db.log("INFO", f"Vision: {vision_result['pages_analyzed']}/{vision_result['pages_total']} pages")
            if self.progress:
                self.progress.add_log(f"Vision: {vision_result['pages_analyzed']} Seiten analysiert")
        else:
            self.db.log("WARNING", f"Vision analysis failed: {vision_result.get('error')}")

        return vision_result
    def execute_chunking(self, extraction, total_pages=0):
        """
        Chunk extracted content.

        Args:
            extraction: Extraction result dict
            total_pages: Number of pages (for logging)

        Returns:
            list: Chunk dictionaries
        """
        if self.progress:
            self.progress.update_step("chunk")
            if total_pages > 0:
                self.progress.add_log(f"Erstelle Chunks aus {total_pages} Seiten...")
            else:
                self.progress.add_log("Erstelle Chunks...")

        chunks = chunk_by_structure(extraction)
        self.db.log("INFO", f"Created {len(chunks)} chunks")

        if self.progress:
            msg = f"{len(chunks)} Chunks erstellt"
            if total_pages > 0:
                msg += f" (aus {total_pages} Seiten)"
            self.progress.add_log(msg)

        return chunks
    def execute_enrichment(self, doc_id, file_type):
        """
        Enrich chunks with vision context (PDFs only).

        Args:
            doc_id: Document database ID
            file_type: File extension

        Returns:
            dict: Enrichment result
        """
        if file_type != ".pdf":
            return {"success": False, "error": "Not a PDF"}

        if self.progress:
            self.progress.update_step("enrich")

        self.db.log("INFO", f"Running vision enrichment for document {doc_id}")
        enrich_result = run_enrichment_step(doc_id)

        if enrich_result["success"]:
            self.db.log("INFO", f"Enrichment: {enrich_result['enriched']}/{enrich_result['total_chunks']} chunks")
        else:
            self.db.log("WARNING", f"Enrichment failed: {enrich_result.get('error')}")

        return enrich_result
    def execute_analysis(self, doc_id, full_text):
        """
        Execute semantic analysis on document.

        Args:
            doc_id: Document database ID
            full_text: Full document text

        Returns:
            dict: Analysis result
        """
        if self.progress:
            self.progress.update_step("analyze")

        analysis = analyze_document(doc_id, full_text, progress=self.progress)
        self.db.log("INFO", f"Analysis complete: {analysis}")

        return analysis
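
For orientation, a minimal usage sketch showing how an orchestrating pipeline might call the four methods in their declared order (vision, chunking, enrichment, analysis). The db and progress objects, doc_id, pdf_path, extraction dict and full_text are placeholders supplied by the surrounding pipeline, not part of the scanned file:

# Hypothetical driver; db is the Database instance and progress the optional
# PipelineProgress described in the docstrings above.
step = TransformationStep(db, progress=progress)

vision = step.execute_vision(doc_id, file_path=pdf_path, file_type=".pdf")
chunks = step.execute_chunking(extraction, total_pages=vision.get("pages_total", 0))
enrichment = step.execute_enrichment(doc_id, file_type=".pdf")
analysis = step.execute_analysis(doc_id, full_text)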