pipeline.py
- Path: /var/www/scripts/pipeline/pipeline.py
- Namespace: pipeline
- Lines: 662 | Size: 23,805 bytes
- Modified: 2025-12-31 03:01:09 | Scanned: 2025-12-31 10:22:15
Code Hygiene Score: 55
- Dependencies: 0 (weight 25%)
- LOC: 0 (weight 20%)
- Methods: 100 (weight 20%)
- Secrets: 100 (weight 15%)
- Classes: 100 (weight 10%)
- Magic Numbers: 100 (weight 10%)
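The overall score is consistent with a weighted average of the sub-scores using the weights shown above; a minimal sketch of that aggregation (an assumption about how the scanner combines them, not its actual code):

```python
# Hedged sketch: reproduces the reported overall score of 55, assuming it is
# a weighted average of the sub-scores with the weights listed above.
subscores = {
    "Dependencies": (0, 0.25),
    "LOC": (0, 0.20),
    "Methods": (100, 0.20),
    "Secrets": (100, 0.15),
    "Classes": (100, 0.10),
    "Magic Numbers": (100, 0.10),
}

overall = sum(value * weight for value, weight in subscores.values())
print(overall)  # 55.0
```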
Issues 2
| Line | Type | Description |
| --- | --- | --- |
| - | complexity | File has 662 lines (max: 500) |
| - | coupling | Class has 26 dependencies (max: 15) |
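Both findings are plain threshold checks on file length and dependency fan-out. A minimal sketch of such checks follows; the thresholds come from the report, but the function name and message format are illustrative, not the scanner's actual API:

```python
# Hedged sketch of the two reported checks; illustrative only.
MAX_FILE_LINES = 500
MAX_CLASS_DEPENDENCIES = 15


def hygiene_issues(line_count, dependency_count):
    issues = []
    if line_count > MAX_FILE_LINES:
        issues.append(("complexity", f"File has {line_count} lines (max: {MAX_FILE_LINES})"))
    if dependency_count > MAX_CLASS_DEPENDENCIES:
        issues.append(("coupling", f"Class has {dependency_count} dependencies (max: {MAX_CLASS_DEPENDENCIES})"))
    return issues


print(hygiene_issues(662, 26))
# [('complexity', 'File has 662 lines (max: 500)'),
#  ('coupling', 'Class has 26 dependencies (max: 15)')]
```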
Dependencies 26
- use argparse
- use os
- use time
- use pathlib.Path
- use config.MAX_RETRIES
- use config.RETRY_BACKOFF_BASE
- use config.SEMANTIC_AUTO_QUEUE
- use config.SEMANTIC_SYNC
- use config.SEMANTIC_USE_ANTHROPIC
- use constants.DEFAULT_LIMIT
- use db.PipelineProgress
- use db.db
- use detect.queue_files
- use detect.scan_directory
- use pipeline_config.get_step_model
- use step_embed.EmbeddingStep
- use step_entity_enrich.EntityEnrichStep
- use step_extract.ExtractionStep
- use step_load.LoadStep
- use step_semantic.SemanticStep
- use step_semantic_extended.DuplicateCheckStep
- use step_semantic_extended.KnowledgeSemanticAnalyzeStep
- use step_semantic_extended.KnowledgeSemanticStoreStep
- use step_semantic_extended.TextSemanticAnalyzeStep
- use step_semantic_extended.TextSemanticStoreStep
- use step_transform.TransformationStep
Functions 6
- process_file() (line 51)
- process_file_v5() (line 159)
- process_queue() (line 309)
- run_scan() (line 341)
- run_full_pipeline() (line 353)
- main() (line 496)
Code
```python
#!/usr/bin/env python3
"""
KI-System Document Pipeline

Main orchestration script for document processing.

Usage:
    python pipeline.py scan                 # Scan for new documents
    python pipeline.py process              # Process queued documents
    python pipeline.py embed                # Embed pending chunks
    python pipeline.py semantic <id>        # Run semantic analysis on document
    python pipeline.py semantic-queue       # Process semantic queue
    python pipeline.py enrich-entities      # Enrich entity descriptions via Ollama
    python pipeline.py enrich-entities 50   # Limit to 50 entities
    python pipeline.py all                  # Full pipeline run
    python pipeline.py all --pipeline-id=1 --run-id=5  # With tracking
    python pipeline.py file <path>          # Process single file
    python pipeline.py status               # Show pipeline status
"""
import argparse
import os
import time
from pathlib import Path

from config import (
    MAX_RETRIES,
    RETRY_BACKOFF_BASE,
    SEMANTIC_AUTO_QUEUE,
    SEMANTIC_SYNC,
    SEMANTIC_USE_ANTHROPIC,
)
from constants import DEFAULT_LIMIT
from db import PipelineProgress, db
from detect import queue_files, scan_directory
from pipeline_config import get_step_model
from step_embed import EmbeddingStep
from step_entity_enrich import EntityEnrichStep
from step_extract import ExtractionStep
from step_load import LoadStep
from step_semantic import SemanticStep
from step_semantic_extended import (
    DuplicateCheckStep,
    KnowledgeSemanticAnalyzeStep,
    KnowledgeSemanticStoreStep,
    TextSemanticAnalyzeStep,
    TextSemanticStoreStep,
)
from step_transform import TransformationStep

def process_file(file_path, progress=None):
    """Process a single file through the pipeline."""
    file_name = Path(file_path).name
    if progress:
        progress.update_document(file_name)

    # Initialize pipeline steps
    extract_step = ExtractionStep(db, progress)
    load_step = LoadStep(db, progress)
    transform_step = TransformationStep(db, progress)
    embed_step = EmbeddingStep(db, progress)

    # Check if cancelled before starting
    if progress and progress.is_cancelled():
        return "cancelled", 0, 0

    # Step 1: Extract
    extract_result = extract_step.execute(file_path)
    if not extract_result["success"]:
        if extract_result.get("error") == "cancelled":
            return "cancelled", 0, 0
        return False, 0, 0

    extraction = extract_result["extraction"]
    file_info = extract_result["file_info"]
    total_pages = extract_result.get("total_pages", 0)

    # Check if cancelled after extraction
    if progress and progress.is_cancelled():
        return "cancelled", 0, 0

    # Step 2: Load document
    doc_id = load_step.create_document(file_info)

    # Step 3: Store pages (PDFs and multi-page documents)
    page_map = load_step.store_pages(doc_id, extraction)

    # Step 4: Vision analysis (PDFs only)
    if file_info["type"] == ".pdf":
        transform_step.execute_vision(doc_id, file_path, file_info["type"])

    # Check if cancelled after vision
    if progress and progress.is_cancelled():
        return "cancelled", 0, 0

    # Step 5: Chunking
    chunks = transform_step.execute_chunking(extraction, total_pages)

    # Step 6: Store chunks with page references
    chunks = load_step.store_chunks(doc_id, chunks, page_map)

    # Check if cancelled after chunking
    if progress and progress.is_cancelled():
        return "cancelled", len(chunks), 0

    # Step 7: Enrichment (PDFs only)
    if file_info["type"] == ".pdf":
        transform_step.execute_enrichment(doc_id, file_info["type"])

    # Check if cancelled after enrichment
    if progress and progress.is_cancelled():
        return "cancelled", len(chunks), 0

    # Step 8: Embeddings (Layer 3 - Document becomes searchable)
    embedded = embed_step.execute(chunks, doc_id, file_name, file_path)

    # Document is now searchable - update status to "embedded"
    load_step.update_document_status(doc_id, "embedded")
    if progress:
        progress.add_log(f"Layer 3 fertig: {file_name} ist jetzt suchbar")

    # Check if cancelled after embedding
    if progress and progress.is_cancelled():
        return "cancelled", len(chunks), embedded

    # Step 9: Semantic analysis (Layer 4 - Optional/Async)
    semantic_step = SemanticStep(db, progress)
    full_text = extract_step.get_full_text_from_extraction(extraction)

    if SEMANTIC_SYNC:
        # Run semantic analysis synchronously
        try:
            semantic_step.execute(doc_id, full_text, use_anthropic=SEMANTIC_USE_ANTHROPIC)
            # Update to done only after semantic completes
            load_step.update_document_status(doc_id, "done")
        except Exception as e:
            # Semantic failed but document is still searchable
            db.log("WARNING", f"Semantic analysis failed for {file_name}: {e}")
            if progress:
                progress.add_log(f"Semantik-Fehler (Dokument bleibt suchbar): {str(e)[:50]}")
    elif SEMANTIC_AUTO_QUEUE:
        # Queue for async processing
        semantic_step.queue(doc_id, priority=5)
        load_step.update_document_status(doc_id, "done")
        if progress:
            progress.add_log(f"Semantik in Queue: {file_name}")
    else:
        # No semantic analysis
        load_step.update_document_status(doc_id, "done")
        if progress:
            progress.add_log(f"Fertig: {file_name}")

    return True, len(chunks), embedded

def process_file_v5(file_path, progress=None):
    """Process a single file through Pipeline #5 (Scientific Pipeline).

    Key difference from process_file():
    - Semantic analysis happens BEFORE embedding (scientifically correct)
    - Uses extended semantic steps for text and knowledge semantics
    """
    file_name = Path(file_path).name
    if progress:
        progress.update_document(file_name)

    # Initialize pipeline steps
    extract_step = ExtractionStep(db, progress)
    load_step = LoadStep(db, progress)
    transform_step = TransformationStep(db, progress)
    embed_step = EmbeddingStep(db, progress)
    text_semantic_analyze = TextSemanticAnalyzeStep(db, progress)
    text_semantic_store = TextSemanticStoreStep(db, progress)
    knowledge_semantic_analyze = KnowledgeSemanticAnalyzeStep(db, progress)
    knowledge_semantic_store = KnowledgeSemanticStoreStep(db, progress)
    duplicate_check = DuplicateCheckStep(db, progress)

    # Check if cancelled before starting
    if progress and progress.is_cancelled():
        return "cancelled", 0, 0

    # Phase 1: Existence - Extract
    extract_result = extract_step.execute(file_path)
    if not extract_result["success"]:
        if extract_result.get("error") == "cancelled":
            return "cancelled", 0, 0
        return False, 0, 0

    extraction = extract_result["extraction"]
    file_info = extract_result["file_info"]
    total_pages = extract_result.get("total_pages", 0)
    content_hash = file_info.get("hash", "")

    # Check if cancelled after extraction
    if progress and progress.is_cancelled():
        return "cancelled", 0, 0

    # Phase 1: Existence - Load document
    doc_id = load_step.create_document(file_info)

    # Phase 1: Existence - Duplicate check
    dup_result = duplicate_check.execute(doc_id, content_hash)
    if dup_result["status"] == "abort":
        load_step.update_document_status(doc_id, "duplicate")
        if progress:
            progress.add_log(f"Duplikat: {file_name} = Doc #{dup_result['duplicate_id']}")
        return True, 0, 0  # Not an error, just skip

    # Phase 2: Text - Store pages
    page_map = load_step.store_pages(doc_id, extraction)

    # Phase 2: Text - Vision analysis (PDFs only)
    if file_info["type"] == ".pdf":
        transform_step.execute_vision(doc_id, file_path, file_info["type"])

    if progress and progress.is_cancelled():
        return "cancelled", 0, 0

    # Phase 3: Structure - Chunking
    chunks = transform_step.execute_chunking(extraction, total_pages)

    # Phase 3: Structure - Store chunks with page references
    chunks = load_step.store_chunks(doc_id, chunks, page_map)

    if progress and progress.is_cancelled():
        return "cancelled", len(chunks), 0

    # Phase 3: Structure - Enrichment (PDFs only)
    if file_info["type"] == ".pdf":
        transform_step.execute_enrichment(doc_id, file_info["type"])

    if progress and progress.is_cancelled():
        return "cancelled", len(chunks), 0

    # Phase 4: Text semantics - Analyze chunks
    if progress:
        progress.add_log("Phase 4: Textsemantik...")

    # Prepare chunks for analysis
    chunk_data = [{"id": c["id"], "content": c["content"]} for c in chunks]
    text_model = get_step_model("text_semantic_analyze")
    analyzed_chunks = text_semantic_analyze.execute(chunk_data, {"model": text_model})

    # Store text semantics
    text_semantic_store.execute(analyzed_chunks, {})

    if progress and progress.is_cancelled():
        return "cancelled", len(chunks), 0

    # Phase 5-6: Entity extraction + knowledge semantics
    if progress:
        progress.add_log("Phase 5-6: Entity-Extraktion + Wissenssemantik...")

    # Run standard semantic analysis (entities, relations, taxonomy)
    semantic_step = SemanticStep(db, progress)
    full_text = extract_step.get_full_text_from_extraction(extraction)
    try:
        semantic_step.execute(doc_id, full_text, use_anthropic=SEMANTIC_USE_ANTHROPIC)
    except Exception as e:
        db.log("WARNING", f"Semantic analysis failed for {file_name}: {e}")
        if progress:
            progress.add_log(f"Semantik-Warnung: {str(e)[:50]}")

    # Load entities for knowledge semantics
    cursor = db.execute(
        """SELECT e.id, e.name, e.type, c.content as context
           FROM entities e
           JOIN chunk_entities ce ON e.id = ce.entity_id
           JOIN chunks c ON ce.chunk_id = c.id
           WHERE c.document_id = %s
           GROUP BY e.id""",
        (doc_id,),
    )
    entities = cursor.fetchall()
    cursor.close()

    if entities:
        # Convert to list of dicts
        entity_list = [
            {"id": e["id"], "name": e["name"], "type": e["type"], "context": e["context"]}
            for e in entities
        ]

        # Analyze knowledge semantics
        knowledge_model = get_step_model("knowledge_semantic_analyze")
        analyzed_entities = knowledge_semantic_analyze.execute(entity_list, {"model": knowledge_model})

        # Store knowledge semantics
        knowledge_semantic_store.execute(analyzed_entities, {})

    if progress and progress.is_cancelled():
        return "cancelled", len(chunks), 0

    # Phase 10: Retrieval - Embeddings (AFTER semantics!)
    if progress:
        progress.add_log("Phase 10: Embeddings...")
    embedded = embed_step.execute(chunks, doc_id, file_name, file_path)

    # Document is now searchable and semantically analyzed
    load_step.update_document_status(doc_id, "done")
    if progress:
        progress.add_log(f"Pipeline #5 fertig: {file_name}")

    return True, len(chunks), embedded

def process_queue():
    """Process items from the queue."""
    items = db.get_pending_queue_items(limit=10)
    db.log("INFO", f"Found {len(items)} items in queue")

    for item in items:
        queue_id = item["id"]
        file_path = item["file_path"]
        retry_count = item["retry_count"]

        if retry_count >= MAX_RETRIES:
            db.update_queue_status(queue_id, "failed", "Max retries exceeded")
            continue

        db.update_queue_status(queue_id, "processing")
        try:
            # process_file() returns a (success, chunks, embedded) tuple
            result = process_file(file_path)
            success = result[0] if isinstance(result, tuple) else result
            if success:
                db.update_queue_status(queue_id, "completed")
            else:
                raise Exception("Processing returned False")
        except Exception as e:
            error_msg = str(e)
            db.update_queue_status(queue_id, "pending", error_msg)
            # Exponential backoff
            wait_time = RETRY_BACKOFF_BASE ** (retry_count + 1)
            db.log("INFO", f"Retry {retry_count + 1} in {wait_time}s: {file_path}")
            time.sleep(wait_time)

def run_scan():
    """Scan for new documents."""
    files = scan_directory()
    print(f"Found {len(files)} files")
    if files:
        queued = queue_files(files)
        print(f"Queued {queued} files")
    return files

def run_full_pipeline(run_id=None, pipeline_id=None):
    """Run complete pipeline: scan → process → embed.

    Pipeline selection:
    - pipeline_id=5: Scientific Pipeline (semantics BEFORE embedding)
    - pipeline_id=1-4 or None: Legacy Pipeline (semantics AFTER embedding)
    """
    progress = PipelineProgress(run_id) if run_id else None

    # Determine which processing function to use
    use_v5 = pipeline_id == 5
    pipeline_name = "Wissenschaftliche Pipeline v1" if use_v5 else "Standard Pipeline"

    print("=" * 50)
    print(f"KI-System Pipeline - {pipeline_name}")
    if run_id:
        print(f"Run ID: {run_id}, Pipeline ID: {pipeline_id}")
    if use_v5:
        print("Mode: Semantik VOR Embedding (wissenschaftlich korrekt)")
    print("=" * 50)

    try:
        # Phase 1: Scan
        if progress:
            progress.update_step("detect")
            progress.add_log("Scanne nach Dokumenten...")

        print("\n[1/3] Scanning for documents...")
        files = scan_directory()
        print(f"Found {len(files)} files")
        if progress:
            progress.add_log(f"{len(files)} neue Dokumente gefunden")

        if files:
            queued = queue_files(files)
            print(f"Queued {queued} files")

        # Phase 2: Process queue items (includes resume of previous runs)
        items = db.get_pending_queue_items(limit=DEFAULT_LIMIT)
        print(f"\n[2/3] Processing {len(items)} documents...")

        if items:
            # Update total with actual queue count (may include items from previous runs)
            if progress:
                progress.update_progress(total=len(items))
                progress.add_log(f"{len(items)} Dokumente in Queue")

            total_chunks = 0
            total_embeddings = 0
            processed = 0
            failed = 0

            for item in items:
                # Check if cancelled
                if progress and progress.is_cancelled():
                    progress.add_log("Pipeline abgebrochen durch Benutzer")
                    progress.complete("cancelled")
                    print("\nPipeline cancelled by user")
                    return

                queue_id = item["id"]
                file_path = item["file_path"]
                file_name = Path(file_path).name

                # Skip already-done documents (for resume capability)
                if db.document_is_done(file_path):
                    db.update_queue_status(queue_id, "completed")
                    processed += 1
                    if progress:
                        progress.add_log(f"Übersprungen (bereits fertig): {file_name}")
                        progress.update_progress(processed=processed)
                    continue

                if progress:
                    progress.update_document(file_name)

                db.update_queue_status(queue_id, "processing")
                try:
                    # Use Pipeline #5 if selected, otherwise legacy
                    if use_v5:
                        result = process_file_v5(file_path, progress)
                    else:
                        result = process_file(file_path, progress)
                    success, chunks, embedded = result if isinstance(result, tuple) else (result, 0, 0)

                    # Handle cancellation during file processing
                    if success == "cancelled":
                        progress.add_log("Pipeline abgebrochen durch Benutzer")
                        progress.complete("cancelled")
                        print("\nPipeline cancelled by user")
                        return

                    if success:
                        db.update_queue_status(queue_id, "completed")
                        processed += 1
                        total_chunks += chunks
                        total_embeddings += embedded
                    else:
                        db.update_queue_status(queue_id, "failed", "Processing failed")
                        failed += 1
                except Exception as e:
                    db.update_queue_status(queue_id, "failed", str(e))
                    failed += 1
                    if progress:
                        progress.add_log(f"FEHLER bei {file_name}: {str(e)[:50]}")

                if progress:
                    progress.update_progress(
                        processed=processed,
                        failed=failed,
                        chunks=total_chunks,
                        embeddings=total_embeddings,
                    )
        else:
            print("\n[2/3] No pending documents in queue")
            if progress:
                progress.add_log("Keine ausstehenden Dokumente in Queue")

        # Phase 3: Embed remaining
        print("\n[3/3] Embedding remaining chunks...")
        embed_step = EmbeddingStep(db, progress)
        embedded = embed_step.embed_pending()
        print(f"Embedded {embedded} chunks")

        # Complete
        print("\n" + "=" * 50)
        print("Pipeline complete!")
        if progress:
            progress.add_log("Pipeline erfolgreich abgeschlossen")
            progress.complete("completed")

    except Exception as e:
        db.log("ERROR", f"Pipeline error: {e}")
        print(f"Error: {e}")
        if progress:
            progress.add_log(f"FEHLER: {str(e)}")
            progress.complete("failed", str(e))
        raise

def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(description="KI-System Document Pipeline")
    parser.add_argument(
        "command",
        choices=["scan", "process", "embed", "semantic", "semantic-queue", "enrich-entities", "all", "file", "status"],
        help="Command to execute",
    )
    parser.add_argument("file_path", nargs="?", help="File path for 'file' command")
    parser.add_argument("--pipeline-id", type=int, help="Pipeline ID for tracking")
    parser.add_argument("--run-id", type=int, help="Run ID for progress tracking")
    args = parser.parse_args()

    db.connect()
    try:
        if args.command == "scan":
            run_scan()

        elif args.command == "process":
            process_queue()

        elif args.command == "embed":
            embed_step = EmbeddingStep(db)
            count = embed_step.embed_pending()
            print(f"Embedded {count} chunks")

        elif args.command == "semantic":
            # Run semantic analysis on a specific document
            if not args.file_path:
                print("Error: semantic command requires a document ID")
                return
            try:
                doc_id = int(args.file_path)
            except ValueError:
                print("Error: document ID must be an integer")
                return

            semantic_step = SemanticStep(db)
            # Get document text
            text = semantic_step._get_document_text(doc_id)
            if not text:
                print(f"No text found for document {doc_id}")
                return

            result = semantic_step.execute(doc_id, text, use_anthropic=SEMANTIC_USE_ANTHROPIC)
            print(f"Semantic analysis complete: {result}")

        elif args.command == "semantic-queue":
            # Process pending items from semantic queue
            semantic_step = SemanticStep(db)
            result = semantic_step.process_queue(
                limit=int(args.file_path) if args.file_path else 5,
                use_anthropic=SEMANTIC_USE_ANTHROPIC,
            )
            print(f"Semantic queue processed: {result}")

        elif args.command == "enrich-entities":
            # Enrich entity descriptions via Ollama
            limit = int(args.file_path) if args.file_path else DEFAULT_LIMIT
            model = get_step_model("enrich")
            print(f"Entity Description Enrichment (limit={limit}, model={model})")
            print("-" * 50)

            enrich_step = EntityEnrichStep()

            # Show current stats
            stats = enrich_step.get_stats()
            print(f"Entities total: {stats['total']}")
            print(f"Need enrichment: {stats['needs_enrichment']}")
            print(f"Already enriched: {stats['enriched']}")
            if stats["avg_length"]:
                print(f"Avg description length: {int(stats['avg_length'])} chars")
            print()

            # Run enrichment
            result = enrich_step.execute(limit=limit, model=model)

            print()
            print("=" * 50)
            print(f"Processed: {result['processed']}")
            print(f"Success: {result['success']}")
            print(f"Errors: {result['errors']}")

        elif args.command == "all":
            run_full_pipeline(run_id=args.run_id, pipeline_id=args.pipeline_id)

        elif args.command == "file":
            if not args.file_path:
                print("Error: file command requires a file path")
                return
            if os.path.exists(args.file_path):
                result = process_file(args.file_path)
                success = result[0] if isinstance(result, tuple) else result
                print(f"Processing {'successful' if success else 'failed'}")
            else:
                print(f"File not found: {args.file_path}")

        elif args.command == "status":
            # Show pipeline status
            cursor = db.execute(
                """SELECT status, COUNT(*) as count
                   FROM pipeline_queue
                   GROUP BY status"""
            )
            results = cursor.fetchall()
            cursor.close()

            print("\nQueue Status:")
            for r in results:
                print(f" {r['status']}: {r['count']}")

            cursor = db.execute("SELECT COUNT(*) as count FROM documents")
            doc_count = cursor.fetchone()["count"]
            cursor.close()

            cursor = db.execute("SELECT COUNT(*) as count FROM chunks")
            chunk_count = cursor.fetchone()["count"]
            cursor.close()

            cursor = db.execute("SELECT COUNT(*) as count FROM chunks WHERE qdrant_id IS NOT NULL")
            embedded_count = cursor.fetchone()["count"]
            cursor.close()

            print(f"\nDocuments: {doc_count}")
            print(f"Chunks: {chunk_count} ({embedded_count} embedded)")

            # Semantic status
            cursor = db.execute(
                """SELECT semantic_status, COUNT(*) as count
                   FROM documents
                   GROUP BY semantic_status"""
            )
            sem_results = cursor.fetchall()
            cursor.close()

            print("\nSemantic Status:")
            for r in sem_results:
                status = r["semantic_status"] or "null"
                print(f" {status}: {r['count']}")

            # Semantic queue
            cursor = db.execute(
                """SELECT status, COUNT(*) as count
                   FROM semantic_queue
                   GROUP BY status"""
            )
            queue_results = cursor.fetchall()
            cursor.close()

            if queue_results:
                print("\nSemantic Queue:")
                for r in queue_results:
                    print(f" {r['status']}: {r['count']}")

    except Exception as e:
        db.log("ERROR", f"Pipeline error: {e}")
        print(f"Error: {e}")
        raise
    finally:
        db.disconnect()
if __name__ == "__main__":
main()
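
For reference, a minimal sketch of driving the same flow from another script rather than the CLI. Only process_file(), run_full_pipeline(), and db.connect()/db.disconnect() come from the listing above; the import layout and the document path below are assumptions:

```python
# Hedged sketch: programmatic use of the pipeline, assuming it is run from
# /var/www/scripts/pipeline/ so the module imports resolve. The document
# path is a placeholder.
from db import db
from pipeline import process_file, run_full_pipeline

db.connect()
try:
    # Single document through the legacy pipeline (same as `pipeline.py file <path>`)
    success, chunks, embedded = process_file("/path/to/document.pdf")
    print(f"success={success}, chunks={chunks}, embeddings={embedded}")

    # Full run through Pipeline #5 (same as `pipeline.py all --pipeline-id=5`)
    run_full_pipeline(run_id=None, pipeline_id=5)
finally:
    db.disconnect()
```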