step_embed.py
- Path: /var/www/scripts/pipeline/step_embed.py
- Namespace: pipeline
- Lines: 70 | Size: 1,960 bytes
- Modified: 2025-12-25 16:56:34 | Scanned: 2025-12-31 10:22:15
Code Hygiene Score: 100
- Dependencies: 100 (25%)
- LOC: 100 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
No issues found.
Dependencies 2
- use embed.embed_chunks
- use embed.embed_pending_chunks
Classes 1
- EmbeddingStep (class, line 11)
Code
"""
Embedding Step Module

Handles vector embedding generation and storage.
Part of modularized pipeline architecture.
"""

from embed import embed_chunks, embed_pending_chunks


class EmbeddingStep:
    """Step: Generate and store vector embeddings."""

    def __init__(self, db, progress=None):
        """
        Initialize embedding step.

        Args:
            db: Database instance
            progress: Optional PipelineProgress instance
        """
        self.db = db
        self.progress = progress

    def execute(self, chunks, doc_id, file_name, file_path):
        """
        Generate embeddings for chunks.

        Args:
            chunks: List of chunk dictionaries with db_id
            doc_id: Document ID
            file_name: Document filename (for logging)
            file_path: Document file path (for metadata)

        Returns:
            int: Number of chunks successfully embedded
        """
        if self.progress:
            self.progress.update_step("embed")
            self.progress.add_log(f"Erstelle Embeddings für {len(chunks)} Chunks...")

        embedded = embed_chunks(chunks, doc_id, file_name, file_path, progress=self.progress)

        self.db.log("INFO", f"Embedded {embedded}/{len(chunks)} chunks")
        if self.progress:
            self.progress.add_log(f"{embedded} Embeddings erstellt")

        return embedded

    def embed_pending(self):
        """
        Process any pending chunks that need embeddings.

        Returns:
            int: Number of chunks embedded
        """
        if self.progress:
            self.progress.update_step("embed")
            self.progress.add_log("Verarbeite ausstehende Embeddings...")

        embedded = embed_pending_chunks()

        self.db.log("INFO", f"Embedded {embedded} pending chunks")
        if self.progress and embedded > 0:
            self.progress.add_log(f"{embedded} weitere Embeddings erstellt")

        return embedded
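
Usage sketch

The class only calls three methods on its collaborators: `db.log(level, message)`, `progress.update_step(name)` and `progress.add_log(message)`. The following is a minimal sketch of how the `execute` flow could be exercised without a real database or embedding backend. Everything here is an assumption made for illustration: `RecordingDB`, `RecordingProgress`, `fake_embed_chunks` and `EmbeddingStepSketch` are hypothetical names that do not exist in the pipeline, and `EmbeddingStepSketch` is a condensed stand-in that takes the embedding function as a parameter, so the block runs without the `embed` module.

```python
class RecordingDB:
    """Hypothetical stand-in for the database; records log() calls."""

    def __init__(self):
        self.entries = []

    def log(self, level, message):
        self.entries.append((level, message))


class RecordingProgress:
    """Hypothetical stand-in for PipelineProgress; records steps and log lines."""

    def __init__(self):
        self.steps = []
        self.logs = []

    def update_step(self, name):
        self.steps.append(name)

    def add_log(self, message):
        self.logs.append(message)


def fake_embed_chunks(chunks, doc_id, file_name, file_path, progress=None):
    """Hypothetical replacement for embed.embed_chunks.

    Pretends that only chunks carrying a non-None db_id get embedded.
    """
    return sum(1 for chunk in chunks if chunk.get("db_id") is not None)


class EmbeddingStepSketch:
    """Condensed stand-in for EmbeddingStep.execute (assumption, not the original class)."""

    def __init__(self, db, progress=None, embed_fn=fake_embed_chunks):
        self.db = db
        self.progress = progress
        self.embed_fn = embed_fn  # injected so no real backend is needed

    def execute(self, chunks, doc_id, file_name, file_path):
        if self.progress:
            self.progress.update_step("embed")
            self.progress.add_log(f"Erstelle Embeddings für {len(chunks)} Chunks...")

        embedded = self.embed_fn(chunks, doc_id, file_name, file_path, progress=self.progress)

        self.db.log("INFO", f"Embedded {embedded}/{len(chunks)} chunks")
        if self.progress:
            self.progress.add_log(f"{embedded} Embeddings erstellt")
        return embedded


db = RecordingDB()
progress = RecordingProgress()
step = EmbeddingStepSketch(db, progress)

# The third chunk has no db_id, so the fake backend skips it.
chunks = [{"db_id": 1}, {"db_id": 2}, {"db_id": None}]
count = step.execute(chunks, doc_id=7, file_name="a.pdf", file_path="/tmp/a.pdf")

print(count)
print(db.entries)
print(progress.steps, progress.logs)
```

Because `progress` defaults to `None`, the same step can also run silently: `EmbeddingStepSketch(RecordingDB()).execute([], 1, "x", "y")` still writes the database log entry but skips every progress call.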