pipeline.py

Code Hygiene Score: 55

Issues 2

Line  Type        Description
-     complexity  File has 662 lines (max: 500)
-     coupling    Class has 26 dependencies (max: 15)

Dependencies 26

Functions 6

Code

#!/usr/bin/env python3
"""
KI-System Document Pipeline
Main orchestration script for document processing.

Usage:
    python pipeline.py scan             # Scan for new documents
    python pipeline.py process          # Process queued documents
    python pipeline.py embed            # Embed pending chunks
    python pipeline.py semantic <id>    # Run semantic analysis on document
    python pipeline.py semantic-queue   # Process semantic queue
    python pipeline.py enrich-entities  # Enrich entity descriptions via Ollama
    python pipeline.py enrich-entities 50  # Limit to 50 entities
    python pipeline.py all              # Full pipeline run
    python pipeline.py all --pipeline-id=1 --run-id=5  # With tracking
    python pipeline.py file <path>      # Process single file
    python pipeline.py status           # Show pipeline status
"""

import argparse
import os
import time
from pathlib import Path

from config import (
    MAX_RETRIES,
    RETRY_BACKOFF_BASE,
    SEMANTIC_AUTO_QUEUE,
    SEMANTIC_SYNC,
    SEMANTIC_USE_ANTHROPIC,
)
from constants import DEFAULT_LIMIT
from db import PipelineProgress, db
from detect import queue_files, scan_directory
from pipeline_config import get_step_model
from step_embed import EmbeddingStep
from step_entity_enrich import EntityEnrichStep
from step_extract import ExtractionStep
from step_load import LoadStep
from step_semantic import SemanticStep
from step_semantic_extended import (
    DuplicateCheckStep,
    KnowledgeSemanticAnalyzeStep,
    KnowledgeSemanticStoreStep,
    TextSemanticAnalyzeStep,
    TextSemanticStoreStep,
)
from step_transform import TransformationStep


def process_file(file_path, progress=None):
    """Process a single file through the pipeline."""
    file_name = Path(file_path).name

    if progress:
        progress.update_document(file_name)

    # Initialize pipeline steps
    extract_step = ExtractionStep(db, progress)
    load_step = LoadStep(db, progress)
    transform_step = TransformationStep(db, progress)
    embed_step = EmbeddingStep(db, progress)

    # Check if cancelled before starting
    if progress and progress.is_cancelled():
        return "cancelled", 0, 0

    # Step 1: Extract
    extract_result = extract_step.execute(file_path)
    if not extract_result["success"]:
        if extract_result.get("error") == "cancelled":
            return "cancelled", 0, 0
        return False, 0, 0

    extraction = extract_result["extraction"]
    file_info = extract_result["file_info"]
    total_pages = extract_result.get("total_pages", 0)

    # Check if cancelled after extraction
    if progress and progress.is_cancelled():
        return "cancelled", 0, 0

    # Step 2: Load document
    doc_id = load_step.create_document(file_info)

    # Step 3: Store pages (PDFs and multi-page documents)
    page_map = load_step.store_pages(doc_id, extraction)

    # Step 4: Vision analysis (PDFs only)
    if file_info["type"] == ".pdf":
        transform_step.execute_vision(doc_id, file_path, file_info["type"])

        # Check if cancelled after vision
        if progress and progress.is_cancelled():
            return "cancelled", 0, 0

    # Step 5: Chunking
    chunks = transform_step.execute_chunking(extraction, total_pages)

    # Step 6: Store chunks with page references
    chunks = load_step.store_chunks(doc_id, chunks, page_map)

    # Check if cancelled after chunking
    if progress and progress.is_cancelled():
        return "cancelled", len(chunks), 0

    # Step 7: Enrichment (PDFs only)
    if file_info["type"] == ".pdf":
        transform_step.execute_enrichment(doc_id, file_info["type"])

        # Check if cancelled after enrichment
        if progress and progress.is_cancelled():
            return "cancelled", len(chunks), 0

    # Step 8: Embeddings (Layer 3 - Document becomes searchable)
    embedded = embed_step.execute(chunks, doc_id, file_name, file_path)

    # Document is now searchable - update status to "embedded"
    load_step.update_document_status(doc_id, "embedded")

    if progress:
        progress.add_log(f"Layer 3 fertig: {file_name} ist jetzt suchbar")

    # Check if cancelled after embedding
    if progress and progress.is_cancelled():
        return "cancelled", len(chunks), embedded

    # Step 9: Semantic analysis (Layer 4 - Optional/Async)
    semantic_step = SemanticStep(db, progress)
    full_text = extract_step.get_full_text_from_extraction(extraction)

    if SEMANTIC_SYNC:
        # Run semantic analysis synchronously
        try:
            semantic_step.execute(doc_id, full_text, use_anthropic=SEMANTIC_USE_ANTHROPIC)
            # Update to done only after semantic completes
            load_step.update_document_status(doc_id, "done")
        except Exception as e:
            # Semantic failed but document is still searchable
            db.log("WARNING", f"Semantic analysis failed for {file_name}: {e}")
            if progress:
                progress.add_log(f"Semantik-Fehler (Dokument bleibt suchbar): {str(e)[:50]}")
    elif SEMANTIC_AUTO_QUEUE:
        # Queue for async processing
        semantic_step.queue(doc_id, priority=5)
        load_step.update_document_status(doc_id, "done")
        if progress:
            progress.add_log(f"Semantik in Queue: {file_name}")
    else:
        # No semantic analysis
        load_step.update_document_status(doc_id, "done")

    if progress:
        progress.add_log(f"Fertig: {file_name}")

    return True, len(chunks), embedded


def process_file_v5(file_path, progress=None):
    """Process a single file through Pipeline #5 (Scientific Pipeline).

    Key difference from process_file():
    - Semantic analysis happens BEFORE embedding (scientifically correct)
    - Uses extended semantic steps for text and knowledge semantics
    """
    file_name = Path(file_path).name

    if progress:
        progress.update_document(file_name)

    # Initialize pipeline steps
    extract_step = ExtractionStep(db, progress)
    load_step = LoadStep(db, progress)
    transform_step = TransformationStep(db, progress)
    embed_step = EmbeddingStep(db, progress)
    text_semantic_analyze = TextSemanticAnalyzeStep(db, progress)
    text_semantic_store = TextSemanticStoreStep(db, progress)
    knowledge_semantic_analyze = KnowledgeSemanticAnalyzeStep(db, progress)
    knowledge_semantic_store = KnowledgeSemanticStoreStep(db, progress)
    duplicate_check = DuplicateCheckStep(db, progress)

    # Check if cancelled before starting
    if progress and progress.is_cancelled():
        return "cancelled", 0, 0

    # Phase 1: Existenz - Extract
    extract_result = extract_step.execute(file_path)
    if not extract_result["success"]:
        if extract_result.get("error") == "cancelled":
            return "cancelled", 0, 0
        return False, 0, 0

    extraction = extract_result["extraction"]
    file_info = extract_result["file_info"]
    total_pages = extract_result.get("total_pages", 0)
    content_hash = file_info.get("hash", "")

    # Check if cancelled after extraction
    if progress and progress.is_cancelled():
        return "cancelled", 0, 0

    # Phase 1: Existenz - Load document
    doc_id = load_step.create_document(file_info)

    # Phase 1: Existenz - Duplicate check
    dup_result = duplicate_check.execute(doc_id, content_hash)
    if dup_result["status"] == "abort":
        load_step.update_document_status(doc_id, "duplicate")
        if progress:
            progress.add_log(f"Duplikat: {file_name} = Doc #{dup_result['duplicate_id']}")
        return True, 0, 0  # Not an error, just skip

    # Phase 2: Text - Store pages
    page_map = load_step.store_pages(doc_id, extraction)

    # Phase 2: Text - Vision analysis (PDFs only)
    if file_info["type"] == ".pdf":
        transform_step.execute_vision(doc_id, file_path, file_info["type"])
        if progress and progress.is_cancelled():
            return "cancelled", 0, 0

    # Phase 3: Struktur - Chunking
    chunks = transform_step.execute_chunking(extraction, total_pages)

    # Phase 3: Struktur - Store chunks with page references
    chunks = load_step.store_chunks(doc_id, chunks, page_map)

    if progress and progress.is_cancelled():
        return "cancelled", len(chunks), 0

    # Phase 3: Struktur - Enrichment (PDFs only)
    if file_info["type"] == ".pdf":
        transform_step.execute_enrichment(doc_id, file_info["type"])
        if progress and progress.is_cancelled():
            return "cancelled", len(chunks), 0

    # Phase 4: Textsemantik - Analyze chunks
    if progress:
        progress.add_log("Phase 4: Textsemantik...")

    # Prepare chunks for analysis
    chunk_data = [{"id": c["id"], "content": c["content"]} for c in chunks]
    text_model = get_step_model("text_semantic_analyze")
    analyzed_chunks = text_semantic_analyze.execute(chunk_data, {"model": text_model})

    # Store text semantics
    text_semantic_store.execute(analyzed_chunks, {})

    if progress and progress.is_cancelled():
        return "cancelled", len(chunks), 0

    # Phase 5-6: Entity + Wissenssemantik
    if progress:
        progress.add_log("Phase 5-6: Entity-Extraktion + Wissenssemantik...")

    # Run standard semantic analysis (entities, relations, taxonomy)
    semantic_step = SemanticStep(db, progress)
    full_text = extract_step.get_full_text_from_extraction(extraction)

    try:
        semantic_step.execute(doc_id, full_text, use_anthropic=SEMANTIC_USE_ANTHROPIC)
    except Exception as e:
        db.log("WARNING", f"Semantic analysis failed for {file_name}: {e}")
        if progress:
            progress.add_log(f"Semantik-Warnung: {str(e)[:50]}")

    # Load entities for knowledge semantics
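    # Note: GROUP BY e.id keeps one representative context chunk per entity; this
    # assumes the database accepts non-aggregated columns alongside GROUP BY.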
    cursor = db.execute(
        """SELECT e.id, e.name, e.type, c.content as context
           FROM entities e
           JOIN chunk_entities ce ON e.id = ce.entity_id
           JOIN chunks c ON ce.chunk_id = c.id
           WHERE c.document_id = %s
           GROUP BY e.id""",
        (doc_id,),
    )
    entities = cursor.fetchall()
    cursor.close()

    if entities:
        # Convert to list of dicts
        entity_list = [{"id": e["id"], "name": e["name"], "type": e["type"], "context": e["context"]} for e in entities]

        # Analyze knowledge semantics
        knowledge_model = get_step_model("knowledge_semantic_analyze")
        analyzed_entities = knowledge_semantic_analyze.execute(entity_list, {"model": knowledge_model})

        # Store knowledge semantics
        knowledge_semantic_store.execute(analyzed_entities, {})

    if progress and progress.is_cancelled():
        return "cancelled", len(chunks), 0

    # Phase 10: Retrieval - Embeddings (AFTER semantics!)
    if progress:
        progress.add_log("Phase 10: Embeddings...")

    embedded = embed_step.execute(chunks, doc_id, file_name, file_path)

    # Document is now searchable and semantically analyzed
    load_step.update_document_status(doc_id, "done")

    if progress:
        progress.add_log(f"Pipeline #5 fertig: {file_name}")

    return True, len(chunks), embedded


def process_queue():
    """Process items from the queue."""
    items = db.get_pending_queue_items(limit=10)
    db.log("INFO", f"Found {len(items)} items in queue")

    for item in items:
        queue_id = item["id"]
        file_path = item["file_path"]
        retry_count = item["retry_count"]

        if retry_count >= MAX_RETRIES:
            db.update_queue_status(queue_id, "failed", "Max retries exceeded")
            continue

        db.update_queue_status(queue_id, "processing")

        try:
            # process_file returns (status, chunks, embedded); a non-empty tuple
            # is always truthy, so unpack the status flag before checking it.
            success, _, _ = process_file(file_path)
            if success is True:
                db.update_queue_status(queue_id, "completed")
            else:
                raise Exception(f"Processing returned {success!r}")
        except Exception as e:
            error_msg = str(e)
            db.update_queue_status(queue_id, "pending", error_msg)

            # Exponential backoff
            wait_time = RETRY_BACKOFF_BASE ** (retry_count + 1)
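            # e.g. with RETRY_BACKOFF_BASE = 2 this yields waits of 2s, 4s, 8s, ...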
            db.log("INFO", f"Retry {retry_count + 1} in {wait_time}s: {file_path}")
            time.sleep(wait_time)


def run_scan():
    """Scan for new documents."""
    files = scan_directory()
    print(f"Found {len(files)} files")

    if files:
        queued = queue_files(files)
        print(f"Queued {queued} files")

    return files


def run_full_pipeline(run_id=None, pipeline_id=None):
    """Run complete pipeline: scan → process → embed.

    Pipeline selection:
    - pipeline_id=5: Scientific Pipeline (semantics BEFORE embedding)
    - pipeline_id=1-4 or None: Legacy Pipeline (semantics AFTER embedding)
    """
    progress = PipelineProgress(run_id) if run_id else None

    # Determine which processing function to use
    use_v5 = pipeline_id == 5
    pipeline_name = "Wissenschaftliche Pipeline v1" if use_v5 else "Standard Pipeline"

    print("=" * 50)
    print(f"KI-System Pipeline - {pipeline_name}")
    if run_id:
        print(f"Run ID: {run_id}, Pipeline ID: {pipeline_id}")
    if use_v5:
        print("Mode: Semantik VOR Embedding (wissenschaftlich korrekt)")
    print("=" * 50)

    try:
        # Phase 1: Scan
        if progress:
            progress.update_step("detect")
            progress.add_log("Scanne nach Dokumenten...")

        print("\n[1/3] Scanning for documents...")
        files = scan_directory()
        print(f"Found {len(files)} files")

        if progress:
            progress.add_log(f"{len(files)} neue Dokumente gefunden")

        if files:
            queued = queue_files(files)
            print(f"Queued {queued} files")

        # Phase 2: Process queue items (includes resume of previous runs)
        items = db.get_pending_queue_items(limit=DEFAULT_LIMIT)
        print(f"\n[2/3] Processing {len(items)} documents...")

        if items:
            # Update total with actual queue count (may include items from previous runs)
            if progress:
                progress.update_progress(total=len(items))
                progress.add_log(f"{len(items)} Dokumente in Queue")

            total_chunks = 0
            total_embeddings = 0
            processed = 0
            failed = 0

            for item in items:
                # Check if cancelled
                if progress and progress.is_cancelled():
                    progress.add_log("Pipeline abgebrochen durch Benutzer")
                    progress.complete("cancelled")
                    print("\nPipeline cancelled by user")
                    return

                queue_id = item["id"]
                file_path = item["file_path"]
                file_name = Path(file_path).name

                # Skip already-done documents (for resume capability)
                if db.document_is_done(file_path):
                    db.update_queue_status(queue_id, "completed")
                    processed += 1
                    if progress:
                        progress.add_log(f"Übersprungen (bereits fertig): {file_name}")
                        progress.update_progress(processed=processed)
                    continue

                if progress:
                    progress.update_document(file_name)

                db.update_queue_status(queue_id, "processing")

                try:
                    # Use Pipeline #5 if selected, otherwise legacy
                    if use_v5:
                        result = process_file_v5(file_path, progress)
                    else:
                        result = process_file(file_path, progress)
                    success, chunks, embedded = result if isinstance(result, tuple) else (result, 0, 0)

                    # Handle cancellation during file processing
                    if success == "cancelled":
                        if progress:
                            progress.add_log("Pipeline abgebrochen durch Benutzer")
                            progress.complete("cancelled")
                        print("\nPipeline cancelled by user")
                        return

                    if success:
                        db.update_queue_status(queue_id, "completed")
                        processed += 1
                        total_chunks += chunks
                        total_embeddings += embedded
                    else:
                        db.update_queue_status(queue_id, "failed", "Processing failed")
                        failed += 1
                except Exception as e:
                    db.update_queue_status(queue_id, "failed", str(e))
                    failed += 1
                    if progress:
                        progress.add_log(f"FEHLER bei {file_name}: {str(e)[:50]}")

                if progress:
                    progress.update_progress(
                        processed=processed,
                        failed=failed,
                        chunks=total_chunks,
                        embeddings=total_embeddings,
                    )
        else:
            print("\n[2/3] No pending documents in queue")
            if progress:
                progress.add_log("Keine ausstehenden Dokumente in Queue")

        # Phase 3: Embed remaining
        print("\n[3/3] Embedding remaining chunks...")
        embed_step = EmbeddingStep(db, progress)
        embedded = embed_step.embed_pending()
        print(f"Embedded {embedded} chunks")

        # Complete
        print("\n" + "=" * 50)
        print("Pipeline complete!")

        if progress:
            progress.add_log("Pipeline erfolgreich abgeschlossen")
            progress.complete("completed")

    except Exception as e:
        db.log("ERROR", f"Pipeline error: {e}")
        print(f"Error: {e}")
        if progress:
            progress.add_log(f"FEHLER: {str(e)}")
            progress.complete("failed", str(e))
        raise


def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(description="KI-System Document Pipeline")
    parser.add_argument(
        "command",
        choices=["scan", "process", "embed", "semantic", "semantic-queue", "enrich-entities", "all", "file", "status"],
        help="Command to execute",
    )
    parser.add_argument("file_path", nargs="?", help="File path for 'file' command")
    parser.add_argument("--pipeline-id", type=int, help="Pipeline ID for tracking")
    parser.add_argument("--run-id", type=int, help="Run ID for progress tracking")

    args = parser.parse_args()

    db.connect()

    try:
        if args.command == "scan":
            run_scan()

        elif args.command == "process":
            process_queue()

        elif args.command == "embed":
            embed_step = EmbeddingStep(db)
            count = embed_step.embed_pending()
            print(f"Embedded {count} chunks")

        elif args.command == "semantic":
            # Run semantic analysis on a specific document
            if not args.file_path:
                print("Error: semantic command requires a document ID")
                return
            try:
                doc_id = int(args.file_path)
            except ValueError:
                print("Error: document ID must be an integer")
                return

            semantic_step = SemanticStep(db)
            # Get document text
            text = semantic_step._get_document_text(doc_id)
            if not text:
                print(f"No text found for document {doc_id}")
                return

            result = semantic_step.execute(doc_id, text, use_anthropic=SEMANTIC_USE_ANTHROPIC)
            print(f"Semantic analysis complete: {result}")

        elif args.command == "semantic-queue":
            # Process pending items from semantic queue
            semantic_step = SemanticStep(db)
            result = semantic_step.process_queue(
                limit=int(args.file_path) if args.file_path else 5,
                use_anthropic=SEMANTIC_USE_ANTHROPIC,
            )
            print(f"Semantic queue processed: {result}")

        elif args.command == "enrich-entities":
            # Enrich entity descriptions via Ollama
            limit = int(args.file_path) if args.file_path else DEFAULT_LIMIT
            model = get_step_model("enrich")

            print(f"Entity Description Enrichment (limit={limit}, model={model})")
            print("-" * 50)

            enrich_step = EntityEnrichStep()

            # Show current stats
            stats = enrich_step.get_stats()
            print(f"Entities total: {stats['total']}")
            print(f"Need enrichment: {stats['needs_enrichment']}")
            print(f"Already enriched: {stats['enriched']}")
            if stats["avg_length"]:
                print(f"Avg description length: {int(stats['avg_length'])} chars")
            print()

            # Run enrichment
            result = enrich_step.execute(limit=limit, model=model)
            print()
            print("=" * 50)
            print(f"Processed: {result['processed']}")
            print(f"Success: {result['success']}")
            print(f"Errors: {result['errors']}")

        elif args.command == "all":
            run_full_pipeline(run_id=args.run_id, pipeline_id=args.pipeline_id)

        elif args.command == "file":
            if not args.file_path:
                print("Error: file command requires a file path")
                return
            if os.path.exists(args.file_path):
                result = process_file(args.file_path)
                success = result[0] if isinstance(result, tuple) else result
                print(f"Processing {'successful' if success else 'failed'}")
            else:
                print(f"File not found: {args.file_path}")

        elif args.command == "status":
            # Show pipeline status
            cursor = db.execute(
                """SELECT status, COUNT(*) as count
                   FROM pipeline_queue
                   GROUP BY status"""
            )
            results = cursor.fetchall()
            cursor.close()

            print("\nQueue Status:")
            for r in results:
                print(f"  {r['status']}: {r['count']}")

            cursor = db.execute("SELECT COUNT(*) as count FROM documents")
            doc_count = cursor.fetchone()["count"]
            cursor.close()

            cursor = db.execute("SELECT COUNT(*) as count FROM chunks")
            chunk_count = cursor.fetchone()["count"]
            cursor.close()

            cursor = db.execute("SELECT COUNT(*) as count FROM chunks WHERE qdrant_id IS NOT NULL")
            embedded_count = cursor.fetchone()["count"]
            cursor.close()

            print(f"\nDocuments: {doc_count}")
            print(f"Chunks: {chunk_count} ({embedded_count} embedded)")

            # Semantic status
            cursor = db.execute(
                """SELECT semantic_status, COUNT(*) as count
                   FROM documents
                   GROUP BY semantic_status"""
            )
            sem_results = cursor.fetchall()
            cursor.close()

            print("\nSemantic Status:")
            for r in sem_results:
                status = r["semantic_status"] or "null"
                print(f"  {status}: {r['count']}")

            # Semantic queue
            cursor = db.execute(
                """SELECT status, COUNT(*) as count
                   FROM semantic_queue
                   GROUP BY status"""
            )
            queue_results = cursor.fetchall()
            cursor.close()

            if queue_results:
                print("\nSemantic Queue:")
                for r in queue_results:
                    print(f"  {r['status']}: {r['count']}")

    except Exception as e:
        db.log("ERROR", f"Pipeline error: {e}")
        print(f"Error: {e}")
        raise
    finally:
        db.disconnect()


if __name__ == "__main__":
    main()