Protocol #27216

ID: 27216
Timestamp: 2025-12-28 02:44:55.034097
Client: root
IP: 145.224.72.140
Model: claude-sonnet-4-20250514
Status: completed
Tokens: 7,328 (Input: 1,541, Output: 5,787)
Duration: 121 ms
Request time: 2025-12-28 02:44:55.034097
Response time: 2025-12-28 02:44:55.155555
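
For reference, a minimal sketch of how a record like this could be modeled in Python. The class and field names (ProtocolEntry, input_tokens, and so on) are assumptions for illustration, not the actual schema of the logging backend; the duration is simply the delta between the two timestamps, about 121 ms here.

# Hypothetical model of the protocol record above; names are illustrative,
# not the real schema of the logging backend.
from dataclasses import dataclass
from datetime import datetime

@dataclass
class ProtocolEntry:
    id: int                    # 27216
    client: str                # "root"
    ip: str                    # "145.224.72.140"
    model: str                 # "claude-sonnet-4-20250514"
    status: str                # "completed"
    input_tokens: int          # 1,541
    output_tokens: int         # 5,787
    request_time: datetime
    response_time: datetime

    @property
    def total_tokens(self) -> int:
        # 1,541 + 5,787 = 7,328, matching the header above
        return self.input_tokens + self.output_tokens

    @property
    def duration_ms(self) -> float:
        # Derived from the two timestamps: .155555 s - .034097 s = 121.458 ms
        return (self.response_time - self.request_time).total_seconds() * 1000.0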

Request

{
    "event": "PreToolUse",
    "tool_name": "Edit",
    "tool_input": {
        "file_path": "\/var\/www\/scripts\/pipeline\/pipeline.py",
        "old_string": "    if progress:\n        progress.add_log(f\"Fertig: {file_name}\")\n\n    return True, len(chunks), embedded\n\n\ndef process_queue():",
        "new_string": "    if progress:\n        progress.add_log(f\"Fertig: {file_name}\")\n\n    return True, len(chunks), embedded\n\n\ndef process_file_v5(file_path, progress=None):\n    \"\"\"Process a single file through Pipeline #5 (Scientific Pipeline).\n\n    Key difference from process_file():\n    - Semantic analysis happens BEFORE embedding (scientifically correct)\n    - Uses extended semantic steps for text and knowledge semantics\n    \"\"\"\n    file_name = Path(file_path).name\n\n    if progress:\n        progress.update_document(file_name)\n\n    # Initialize pipeline steps\n    extract_step = ExtractionStep(db, progress)\n    load_step = LoadStep(db, progress)\n    transform_step = TransformationStep(db, progress)\n    embed_step = EmbeddingStep(db, progress)\n    text_semantic_analyze = TextSemanticAnalyzeStep(db, progress)\n    text_semantic_store = TextSemanticStoreStep(db, progress)\n    knowledge_semantic_analyze = KnowledgeSemanticAnalyzeStep(db, progress)\n    knowledge_semantic_store = KnowledgeSemanticStoreStep(db, progress)\n    duplicate_check = DuplicateCheckStep(db, progress)\n\n    # Check if cancelled before starting\n    if progress and progress.is_cancelled():\n        return \"cancelled\", 0, 0\n\n    # Phase 1: Existenz - Extract\n    extract_result = extract_step.execute(file_path)\n    if not extract_result[\"success\"]:\n        if extract_result.get(\"error\") == \"cancelled\":\n            return \"cancelled\", 0, 0\n        return False, 0, 0\n\n    extraction = extract_result[\"extraction\"]\n    file_info = extract_result[\"file_info\"]\n    total_pages = extract_result.get(\"total_pages\", 0)\n    content_hash = file_info.get(\"hash\", \"\")\n\n    # Check if cancelled after extraction\n    if progress and progress.is_cancelled():\n        return \"cancelled\", 0, 0\n\n    # Phase 1: Existenz - Load document\n    doc_id = load_step.create_document(file_info)\n\n    # Phase 1: Existenz - Duplicate check\n    dup_result = duplicate_check.execute(doc_id, content_hash)\n    if dup_result[\"status\"] == \"abort\":\n        load_step.update_document_status(doc_id, \"duplicate\")\n        if progress:\n            progress.add_log(f\"Duplikat: {file_name} = Doc #{dup_result['duplicate_id']}\")\n        return True, 0, 0  # Not an error, just skip\n\n    # Phase 2: Text - Store pages\n    page_map = load_step.store_pages(doc_id, extraction)\n\n    # Phase 2: Text - Vision analysis (PDFs only)\n    if file_info[\"type\"] == \".pdf\":\n        transform_step.execute_vision(doc_id, file_path, file_info[\"type\"])\n        if progress and progress.is_cancelled():\n            return \"cancelled\", 0, 0\n\n    # Phase 3: Struktur - Chunking\n    chunks = transform_step.execute_chunking(extraction, total_pages)\n\n    # Phase 3: Struktur - Store chunks with page references\n    chunks = load_step.store_chunks(doc_id, chunks, page_map)\n\n    if progress and progress.is_cancelled():\n        return \"cancelled\", len(chunks), 0\n\n    # Phase 3: Struktur - Enrichment (PDFs only)\n    if file_info[\"type\"] == \".pdf\":\n        transform_step.execute_enrichment(doc_id, file_info[\"type\"])\n        if progress and progress.is_cancelled():\n            return \"cancelled\", len(chunks), 0\n\n    # Phase 4: Textsemantik - Analyze chunks\n    if progress:\n        progress.add_log(\"Phase 4: Textsemantik...\")\n\n    # Prepare chunks for analysis\n    chunk_data = [{\"id\": c[\"id\"], \"content\": c[\"content\"]} for c in chunks]\n    analyzed_chunks = 
text_semantic_analyze.execute(chunk_data, {\"model\": \"mistral\"})\n\n    # Store text semantics\n    text_semantic_store.execute(analyzed_chunks, {})\n\n    if progress and progress.is_cancelled():\n        return \"cancelled\", len(chunks), 0\n\n    # Phase 5-6: Entity + Wissenssemantik\n    if progress:\n        progress.add_log(\"Phase 5-6: Entity-Extraktion + Wissenssemantik...\")\n\n    # Run standard semantic analysis (entities, relations, taxonomy)\n    semantic_step = SemanticStep(db, progress)\n    full_text = extract_step.get_full_text_from_extraction(extraction)\n\n    try:\n        semantic_step.execute(doc_id, full_text, use_anthropic=SEMANTIC_USE_ANTHROPIC)\n    except Exception as e:\n        db.log(\"WARNING\", f\"Semantic analysis failed for {file_name}: {e}\")\n        if progress:\n            progress.add_log(f\"Semantik-Warnung: {str(e)[:50]}\")\n\n    # Load entities for knowledge semantics\n    cursor = db.execute(\n        \"\"\"SELECT e.id, e.name, e.type, c.content as context\n           FROM entities e\n           JOIN chunk_entities ce ON e.id = ce.entity_id\n           JOIN chunks c ON ce.chunk_id = c.id\n           WHERE c.document_id = %s\n           GROUP BY e.id\"\"\",\n        (doc_id,),\n    )\n    entities = cursor.fetchall()\n    cursor.close()\n\n    if entities:\n        # Convert to list of dicts\n        entity_list = [\n            {\"id\": e[\"id\"], \"name\": e[\"name\"], \"type\": e[\"type\"], \"context\": e[\"context\"]}\n            for e in entities\n        ]\n\n        # Analyze knowledge semantics\n        analyzed_entities = knowledge_semantic_analyze.execute(entity_list, {\"model\": \"mistral\"})\n\n        # Store knowledge semantics\n        knowledge_semantic_store.execute(analyzed_entities, {})\n\n    if progress and progress.is_cancelled():\n        return \"cancelled\", len(chunks), 0\n\n    # Phase 10: Retrieval - Embeddings (AFTER semantics!)\n    if progress:\n        progress.add_log(\"Phase 10: Embeddings...\")\n\n    embedded = embed_step.execute(chunks, doc_id, file_name, file_path)\n\n    # Document is now searchable and semantically analyzed\n    load_step.update_document_status(doc_id, \"done\")\n\n    if progress:\n        progress.add_log(f\"Pipeline #5 fertig: {file_name}\")\n\n    return True, len(chunks), embedded\n\n\ndef process_queue():"
    }
}
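
The request is the JSON a PreToolUse hook receives before the Edit tool runs. Below is a minimal sketch of a receiver for exactly the fields visible in this record ("event", "tool_name", "tool_input"); reading the payload from stdin and the exit-code convention are assumptions for illustration, not confirmed behavior of this particular setup.

#!/usr/bin/env python3
# Hypothetical PreToolUse receiver; uses only the fields visible in the
# record above. Stdin delivery and exit-code semantics are assumptions.
import json
import sys

def main() -> int:
    payload = json.load(sys.stdin)
    if payload.get("event") != "PreToolUse":
        return 0  # not an event we care about
    if payload.get("tool_name") == "Edit":
        file_path = payload.get("tool_input", {}).get("file_path", "")
        # Example policy: note edits to pipeline code on stderr.
        if file_path.endswith("pipeline.py"):
            print(f"Edit to {file_path} observed", file=sys.stderr)
    return 0  # 0 = allow the tool call to proceed

if __name__ == "__main__":
    sys.exit(main())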

Response

{
    "tool_response": {
        "filePath": "\/var\/www\/scripts\/pipeline\/pipeline.py",
        "oldString": "    if progress:\n        progress.add_log(f\"Fertig: {file_name}\")\n\n    return True, len(chunks), embedded\n\n\ndef process_queue():",
        "newString": "    if progress:\n        progress.add_log(f\"Fertig: {file_name}\")\n\n    return True, len(chunks), embedded\n\n\ndef process_file_v5(file_path, progress=None):\n    \"\"\"Process a single file through Pipeline #5 (Scientific Pipeline).\n\n    Key difference from process_file():\n    - Semantic analysis happens BEFORE embedding (scientifically correct)\n    - Uses extended semantic steps for text and knowledge semantics\n    \"\"\"\n    file_name = Path(file_path).name\n\n    if progress:\n        progress.update_document(file_name)\n\n    # Initialize pipeline steps\n    extract_step = ExtractionStep(db, progress)\n    load_step = LoadStep(db, progress)\n    transform_step = TransformationStep(db, progress)\n    embed_step = EmbeddingStep(db, progress)\n    text_semantic_analyze = TextSemanticAnalyzeStep(db, progress)\n    text_semantic_store = TextSemanticStoreStep(db, progress)\n    knowledge_semantic_analyze = KnowledgeSemanticAnalyzeStep(db, progress)\n    knowledge_semantic_store = KnowledgeSemanticStoreStep(db, progress)\n    duplicate_check = DuplicateCheckStep(db, progress)\n\n    # Check if cancelled before starting\n    if progress and progress.is_cancelled():\n        return \"cancelled\", 0, 0\n\n    # Phase 1: Existenz - Extract\n    extract_result = extract_step.execute(file_path)\n    if not extract_result[\"success\"]:\n        if extract_result.get(\"error\") == \"cancelled\":\n            return \"cancelled\", 0, 0\n        return False, 0, 0\n\n    extraction = extract_result[\"extraction\"]\n    file_info = extract_result[\"file_info\"]\n    total_pages = extract_result.get(\"total_pages\", 0)\n    content_hash = file_info.get(\"hash\", \"\")\n\n    # Check if cancelled after extraction\n    if progress and progress.is_cancelled():\n        return \"cancelled\", 0, 0\n\n    # Phase 1: Existenz - Load document\n    doc_id = load_step.create_document(file_info)\n\n    # Phase 1: Existenz - Duplicate check\n    dup_result = duplicate_check.execute(doc_id, content_hash)\n    if dup_result[\"status\"] == \"abort\":\n        load_step.update_document_status(doc_id, \"duplicate\")\n        if progress:\n            progress.add_log(f\"Duplikat: {file_name} = Doc #{dup_result['duplicate_id']}\")\n        return True, 0, 0  # Not an error, just skip\n\n    # Phase 2: Text - Store pages\n    page_map = load_step.store_pages(doc_id, extraction)\n\n    # Phase 2: Text - Vision analysis (PDFs only)\n    if file_info[\"type\"] == \".pdf\":\n        transform_step.execute_vision(doc_id, file_path, file_info[\"type\"])\n        if progress and progress.is_cancelled():\n            return \"cancelled\", 0, 0\n\n    # Phase 3: Struktur - Chunking\n    chunks = transform_step.execute_chunking(extraction, total_pages)\n\n    # Phase 3: Struktur - Store chunks with page references\n    chunks = load_step.store_chunks(doc_id, chunks, page_map)\n\n    if progress and progress.is_cancelled():\n        return \"cancelled\", len(chunks), 0\n\n    # Phase 3: Struktur - Enrichment (PDFs only)\n    if file_info[\"type\"] == \".pdf\":\n        transform_step.execute_enrichment(doc_id, file_info[\"type\"])\n        if progress and progress.is_cancelled():\n            return \"cancelled\", len(chunks), 0\n\n    # Phase 4: Textsemantik - Analyze chunks\n    if progress:\n        progress.add_log(\"Phase 4: Textsemantik...\")\n\n    # Prepare chunks for analysis\n    chunk_data = [{\"id\": c[\"id\"], \"content\": c[\"content\"]} for c in chunks]\n    analyzed_chunks = 
text_semantic_analyze.execute(chunk_data, {\"model\": \"mistral\"})\n\n    # Store text semantics\n    text_semantic_store.execute(analyzed_chunks, {})\n\n    if progress and progress.is_cancelled():\n        return \"cancelled\", len(chunks), 0\n\n    # Phase 5-6: Entity + Wissenssemantik\n    if progress:\n        progress.add_log(\"Phase 5-6: Entity-Extraktion + Wissenssemantik...\")\n\n    # Run standard semantic analysis (entities, relations, taxonomy)\n    semantic_step = SemanticStep(db, progress)\n    full_text = extract_step.get_full_text_from_extraction(extraction)\n\n    try:\n        semantic_step.execute(doc_id, full_text, use_anthropic=SEMANTIC_USE_ANTHROPIC)\n    except Exception as e:\n        db.log(\"WARNING\", f\"Semantic analysis failed for {file_name}: {e}\")\n        if progress:\n            progress.add_log(f\"Semantik-Warnung: {str(e)[:50]}\")\n\n    # Load entities for knowledge semantics\n    cursor = db.execute(\n        \"\"\"SELECT e.id, e.name, e.type, c.content as context\n           FROM entities e\n           JOIN chunk_entities ce ON e.id = ce.entity_id\n           JOIN chunks c ON ce.chunk_id = c.id\n           WHERE c.document_id = %s\n           GROUP BY e.id\"\"\",\n        (doc_id,),\n    )\n    entities = cursor.fetchall()\n    cursor.close()\n\n    if entities:\n        # Convert to list of dicts\n        entity_list = [\n            {\"id\": e[\"id\"], \"name\": e[\"name\"], \"type\": e[\"type\"], \"context\": e[\"context\"]}\n            for e in entities\n        ]\n\n        # Analyze knowledge semantics\n        analyzed_entities = knowledge_semantic_analyze.execute(entity_list, {\"model\": \"mistral\"})\n\n        # Store knowledge semantics\n        knowledge_semantic_store.execute(analyzed_entities, {})\n\n    if progress and progress.is_cancelled():\n        return \"cancelled\", len(chunks), 0\n\n    # Phase 10: Retrieval - Embeddings (AFTER semantics!)\n    if progress:\n        progress.add_log(\"Phase 10: Embeddings...\")\n\n    embedded = embed_step.execute(chunks, doc_id, file_name, file_path)\n\n    # Document is now searchable and semantically analyzed\n    load_step.update_document_status(doc_id, \"done\")\n\n    if progress:\n        progress.add_log(f\"Pipeline #5 fertig: {file_name}\")\n\n    return True, len(chunks), embedded\n\n\ndef process_queue():",
        "originalFile": "#!\/usr\/bin\/env python3\n\"\"\"\nKI-System Document Pipeline\nMain orchestration script for document processing.\n\nUsage:\n    python pipeline.py scan             # Scan for new documents\n    python pipeline.py process          # Process queued documents\n    python pipeline.py embed            # Embed pending chunks\n    python pipeline.py semantic <id>    # Run semantic analysis on document\n    python pipeline.py semantic-queue   # Process semantic queue\n    python pipeline.py enrich-entities  # Enrich entity descriptions via Ollama\n    python pipeline.py enrich-entities 50  # Limit to 50 entities\n    python pipeline.py all              # Full pipeline run\n    python pipeline.py all --pipeline-id=1 --run-id=5  # With tracking\n    python pipeline.py file <path>      # Process single file\n    python pipeline.py status           # Show pipeline status\n\"\"\"\n\nimport argparse\nimport os\nimport time\nfrom pathlib import Path\n\nfrom config import (\n    MAX_RETRIES,\n    RETRY_BACKOFF_BASE,\n    SEMANTIC_AUTO_QUEUE,\n    SEMANTIC_SYNC,\n    SEMANTIC_USE_ANTHROPIC,\n)\nfrom constants import DEFAULT_LIMIT\nfrom db import PipelineProgress, db\nfrom detect import queue_files, scan_directory\nfrom step_embed import EmbeddingStep\nfrom step_entity_enrich import EntityEnrichStep\nfrom step_extract import ExtractionStep\nfrom step_load import LoadStep\nfrom step_semantic import SemanticStep\nfrom step_semantic_extended import (\n    DuplicateCheckStep,\n    KnowledgeSemanticAnalyzeStep,\n    KnowledgeSemanticStoreStep,\n    TextSemanticAnalyzeStep,\n    TextSemanticStoreStep,\n)\nfrom step_transform import TransformationStep\n\n\ndef process_file(file_path, progress=None):\n    \"\"\"Process a single file through the pipeline.\"\"\"\n    file_name = Path(file_path).name\n\n    if progress:\n        progress.update_document(file_name)\n\n    # Initialize pipeline steps\n    extract_step = ExtractionStep(db, progress)\n    load_step = LoadStep(db, progress)\n    transform_step = TransformationStep(db, progress)\n    embed_step = EmbeddingStep(db, progress)\n\n    # Check if cancelled before starting\n    if progress and progress.is_cancelled():\n        return \"cancelled\", 0, 0\n\n    # Step 1: Extract\n    extract_result = extract_step.execute(file_path)\n    if not extract_result[\"success\"]:\n        if extract_result.get(\"error\") == \"cancelled\":\n            return \"cancelled\", 0, 0\n        return False, 0, 0\n\n    extraction = extract_result[\"extraction\"]\n    file_info = extract_result[\"file_info\"]\n    total_pages = extract_result.get(\"total_pages\", 0)\n\n    # Check if cancelled after extraction\n    if progress and progress.is_cancelled():\n        return \"cancelled\", 0, 0\n\n    # Step 2: Load document\n    doc_id = load_step.create_document(file_info)\n\n    # Step 3: Store pages (PDFs and multi-page documents)\n    page_map = load_step.store_pages(doc_id, extraction)\n\n    # Step 4: Vision analysis (PDFs only)\n    if file_info[\"type\"] == \".pdf\":\n        transform_step.execute_vision(doc_id, file_path, file_info[\"type\"])\n\n        # Check if cancelled after vision\n        if progress and progress.is_cancelled():\n            return \"cancelled\", 0, 0\n\n    # Step 5: Chunking\n    chunks = transform_step.execute_chunking(extraction, total_pages)\n\n    # Step 6: Store chunks with page references\n    chunks = load_step.store_chunks(doc_id, chunks, page_map)\n\n    # Check if cancelled after chunking\n    if progress and 
progress.is_cancelled():\n        return \"cancelled\", len(chunks), 0\n\n    # Step 7: Enrichment (PDFs only)\n    if file_info[\"type\"] == \".pdf\":\n        transform_step.execute_enrichment(doc_id, file_info[\"type\"])\n\n        # Check if cancelled after enrichment\n        if progress and progress.is_cancelled():\n            return \"cancelled\", len(chunks), 0\n\n    # Step 8: Embeddings (Layer 3 - Document becomes searchable)\n    embedded = embed_step.execute(chunks, doc_id, file_name, file_path)\n\n    # Document is now searchable - update status to \"embedded\"\n    load_step.update_document_status(doc_id, \"embedded\")\n\n    if progress:\n        progress.add_log(f\"Layer 3 fertig: {file_name} ist jetzt suchbar\")\n\n    # Check if cancelled after embedding\n    if progress and progress.is_cancelled():\n        return \"cancelled\", len(chunks), embedded\n\n    # Step 9: Semantic analysis (Layer 4 - Optional\/Async)\n    semantic_step = SemanticStep(db, progress)\n    full_text = extract_step.get_full_text_from_extraction(extraction)\n\n    if SEMANTIC_SYNC:\n        # Run semantic analysis synchronously\n        try:\n            semantic_step.execute(doc_id, full_text, use_anthropic=SEMANTIC_USE_ANTHROPIC)\n            # Update to done only after semantic completes\n            load_step.update_document_status(doc_id, \"done\")\n        except Exception as e:\n            # Semantic failed but document is still searchable\n            db.log(\"WARNING\", f\"Semantic analysis failed for {file_name}: {e}\")\n            if progress:\n                progress.add_log(f\"Semantik-Fehler (Dokument bleibt suchbar): {str(e)[:50]}\")\n    elif SEMANTIC_AUTO_QUEUE:\n        # Queue for async processing\n        semantic_step.queue(doc_id, priority=5)\n        load_step.update_document_status(doc_id, \"done\")\n        if progress:\n            progress.add_log(f\"Semantik in Queue: {file_name}\")\n    else:\n        # No semantic analysis\n        load_step.update_document_status(doc_id, \"done\")\n\n    if progress:\n        progress.add_log(f\"Fertig: {file_name}\")\n\n    return True, len(chunks), embedded\n\n\ndef process_queue():\n    \"\"\"Process items from the queue.\"\"\"\n    items = db.get_pending_queue_items(limit=10)\n    db.log(\"INFO\", f\"Found {len(items)} items in queue\")\n\n    for item in items:\n        queue_id = item[\"id\"]\n        file_path = item[\"file_path\"]\n        retry_count = item[\"retry_count\"]\n\n        if retry_count >= MAX_RETRIES:\n            db.update_queue_status(queue_id, \"failed\", \"Max retries exceeded\")\n            continue\n\n        db.update_queue_status(queue_id, \"processing\")\n\n        try:\n            success = process_file(file_path)\n            if success:\n                db.update_queue_status(queue_id, \"completed\")\n            else:\n                raise Exception(\"Processing returned False\")\n        except Exception as e:\n            error_msg = str(e)\n            db.update_queue_status(queue_id, \"pending\", error_msg)\n\n            # Exponential backoff\n            wait_time = RETRY_BACKOFF_BASE ** (retry_count + 1)\n            db.log(\"INFO\", f\"Retry {retry_count + 1} in {wait_time}s: {file_path}\")\n            time.sleep(wait_time)\n\n\ndef run_scan():\n    \"\"\"Scan for new documents.\"\"\"\n    files = scan_directory()\n    print(f\"Found {len(files)} files\")\n\n    if files:\n        queued = queue_files(files)\n        print(f\"Queued {queued} files\")\n\n    return files\n\n\ndef 
run_full_pipeline(run_id=None, pipeline_id=None):\n    \"\"\"Run complete pipeline: scan → process → embed.\"\"\"\n    progress = PipelineProgress(run_id) if run_id else None\n\n    print(\"=\" * 50)\n    print(\"KI-System Pipeline - Full Run\")\n    if run_id:\n        print(f\"Run ID: {run_id}, Pipeline ID: {pipeline_id}\")\n    print(\"=\" * 50)\n\n    try:\n        # Phase 1: Scan\n        if progress:\n            progress.update_step(\"detect\")\n            progress.add_log(\"Scanne nach Dokumenten...\")\n\n        print(\"\\n[1\/3] Scanning for documents...\")\n        files = scan_directory()\n        print(f\"Found {len(files)} files\")\n\n        if progress:\n            progress.add_log(f\"{len(files)} neue Dokumente gefunden\")\n\n        if files:\n            queued = queue_files(files)\n            print(f\"Queued {queued} files\")\n\n        # Phase 2: Process queue items (includes resume of previous runs)\n        items = db.get_pending_queue_items(limit=DEFAULT_LIMIT)\n        print(f\"\\n[2\/3] Processing {len(items)} documents...\")\n\n        if items:\n            # Update total with actual queue count (may include items from previous runs)\n            if progress:\n                progress.update_progress(total=len(items))\n                progress.add_log(f\"{len(items)} Dokumente in Queue\")\n\n            total_chunks = 0\n            total_embeddings = 0\n            processed = 0\n            failed = 0\n\n            for item in items:\n                # Check if cancelled\n                if progress and progress.is_cancelled():\n                    progress.add_log(\"Pipeline abgebrochen durch Benutzer\")\n                    progress.complete(\"cancelled\")\n                    print(\"\\nPipeline cancelled by user\")\n                    return\n\n                queue_id = item[\"id\"]\n                file_path = item[\"file_path\"]\n                file_name = Path(file_path).name\n\n                # Skip already-done documents (for resume capability)\n                if db.document_is_done(file_path):\n                    db.update_queue_status(queue_id, \"completed\")\n                    processed += 1\n                    if progress:\n                        progress.add_log(f\"Übersprungen (bereits fertig): {file_name}\")\n                        progress.update_progress(processed=processed)\n                    continue\n\n                if progress:\n                    progress.update_document(file_name)\n\n                db.update_queue_status(queue_id, \"processing\")\n\n                try:\n                    result = process_file(file_path, progress)\n                    success, chunks, embedded = result if isinstance(result, tuple) else (result, 0, 0)\n\n                    # Handle cancellation during file processing\n                    if success == \"cancelled\":\n                        progress.add_log(\"Pipeline abgebrochen durch Benutzer\")\n                        progress.complete(\"cancelled\")\n                        print(\"\\nPipeline cancelled by user\")\n                        return\n\n                    if success:\n                        db.update_queue_status(queue_id, \"completed\")\n                        processed += 1\n                        total_chunks += chunks\n                        total_embeddings += embedded\n                    ... [TRUNCATED-abeef8b93a01b116]",
        "structuredPatch": [
            {
                "oldStart": 155,
                "oldLines": 6,
                "newStart": 155,
                "newLines": 157,
                "lines": [
                    "     return True, len(chunks), embedded",
                    " ",
                    " ",
                    "+def process_file_v5(file_path, progress=None):",
                    "+    \"\"\"Process a single file through Pipeline #5 (Scientific Pipeline).",
                    "+",
                    "+    Key difference from process_file():",
                    "+    - Semantic analysis happens BEFORE embedding (scientifically correct)",
                    "+    - Uses extended semantic steps for text and knowledge semantics",
                    "+    \"\"\"",
                    "+    file_name = Path(file_path).name",
                    "+",
                    "+    if progress:",
                    "+        progress.update_document(file_name)",
                    "+",
                    "+    # Initialize pipeline steps",
                    "+    extract_step = ExtractionStep(db, progress)",
                    "+    load_step = LoadStep(db, progress)",
                    "+    transform_step = TransformationStep(db, progress)",
                    "+    embed_step = EmbeddingStep(db, progress)",
                    "+    text_semantic_analyze = TextSemanticAnalyzeStep(db, progress)",
                    "+    text_semantic_store = TextSemanticStoreStep(db, progress)",
                    "+    knowledge_semantic_analyze = KnowledgeSemanticAnalyzeStep(db, progress)",
                    "+    knowledge_semantic_store = KnowledgeSemanticStoreStep(db, progress)",
                    "+    duplicate_check = DuplicateCheckStep(db, progress)",
                    "+",
                    "+    # Check if cancelled before starting",
                    "+    if progress and progress.is_cancelled():",
                    "+        return \"cancelled\", 0, 0",
                    "+",
                    "+    # Phase 1: Existenz - Extract",
                    "+    extract_result = extract_step.execute(file_path)",
                    "+    if not extract_result[\"success\"]:",
                    "+        if extract_result.get(\"error\") == \"cancelled\":",
                    "+            return \"cancelled\", 0, 0",
                    "+        return False, 0, 0",
                    "+",
                    "+    extraction = extract_result[\"extraction\"]",
                    "+    file_info = extract_result[\"file_info\"]",
                    "+    total_pages = extract_result.get(\"total_pages\", 0)",
                    "+    content_hash = file_info.get(\"hash\", \"\")",
                    "+",
                    "+    # Check if cancelled after extraction",
                    "+    if progress and progress.is_cancelled():",
                    "+        return \"cancelled\", 0, 0",
                    "+",
                    "+    # Phase 1: Existenz - Load document",
                    "+    doc_id = load_step.create_document(file_info)",
                    "+",
                    "+    # Phase 1: Existenz - Duplicate check",
                    "+    dup_result = duplicate_check.execute(doc_id, content_hash)",
                    "+    if dup_result[\"status\"] == \"abort\":",
                    "+        load_step.update_document_status(doc_id, \"duplicate\")",
                    "+        if progress:",
                    "+            progress.add_log(f\"Duplikat: {file_name} = Doc #{dup_result['duplicate_id']}\")",
                    "+        return True, 0, 0  # Not an error, just skip",
                    "+",
                    "+    # Phase 2: Text - Store pages",
                    "+    page_map = load_step.store_pages(doc_id, extraction)",
                    "+",
                    "+    # Phase 2: Text - Vision analysis (PDFs only)",
                    "+    if file_info[\"type\"] == \".pdf\":",
                    "+        transform_step.execute_vision(doc_id, file_path, file_info[\"type\"])",
                    "+        if progress and progress.is_cancelled():",
                    "+            return \"cancelled\", 0, 0",
                    "+",
                    "+    # Phase 3: Struktur - Chunking",
                    "+    chunks = transform_step.execute_chunking(extraction, total_pages)",
                    "+",
                    "+    # Phase 3: Struktur - Store chunks with page references",
                    "+    chunks = load_step.store_chunks(doc_id, chunks, page_map)",
                    "+",
                    "+    if progress and progress.is_cancelled():",
                    "+        return \"cancelled\", len(chunks), 0",
                    "+",
                    "+    # Phase 3: Struktur - Enrichment (PDFs only)",
                    "+    if file_info[\"type\"] == \".pdf\":",
                    "+        transform_step.execute_enrichment(doc_id, file_info[\"type\"])",
                    "+        if progress and progress.is_cancelled():",
                    "+            return \"cancelled\", len(chunks), 0",
                    "+",
                    "+    # Phase 4: Textsemantik - Analyze chunks",
                    "+    if progress:",
                    "+        progress.add_log(\"Phase 4: Textsemantik...\")",
                    "+",
                    "+    # Prepare chunks for analysis",
                    "+    chunk_data = [{\"id\": c[\"id\"], \"content\": c[\"content\"]} for c in chunks]",
                    "+    analyzed_chunks = text_semantic_analyze.execute(chunk_data, {\"model\": \"mistral\"})",
                    "+",
                    "+    # Store text semantics",
                    "+    text_semantic_store.execute(analyzed_chunks, {})",
                    "+",
                    "+    if progress and progress.is_cancelled():",
                    "+        return \"cancelled\", len(chunks), 0",
                    "+",
                    "+    # Phase 5-6: Entity + Wissenssemantik",
                    "+    if progress:",
                    "+        progress.add_log(\"Phase 5-6: Entity-Extraktion + Wissenssemantik...\")",
                    "+",
                    "+    # Run standard semantic analysis (entities, relations, taxonomy)",
                    "+    semantic_step = SemanticStep(db, progress)",
                    "+    full_text = extract_step.get_full_text_from_extraction(extraction)",
                    "+",
                    "+    try:",
                    "+        semantic_step.execute(doc_id, full_text, use_anthropic=SEMANTIC_USE_ANTHROPIC)",
                    "+    except Exception as e:",
                    "+        db.log(\"WARNING\", f\"Semantic analysis failed for {file_name}: {e}\")",
                    "+        if progress:",
                    "+            progress.add_log(f\"Semantik-Warnung: {str(e)[:50]}\")",
                    "+",
                    "+    # Load entities for knowledge semantics",
                    "+    cursor = db.execute(",
                    "+        \"\"\"SELECT e.id, e.name, e.type, c.content as context",
                    "+           FROM entities e",
                    "+           JOIN chunk_entities ce ON e.id = ce.entity_id",
                    "+           JOIN chunks c ON ce.chunk_id = c.id",
                    "+           WHERE c.document_id = %s",
                    "+           GROUP BY e.id\"\"\",",
                    "+        (doc_id,),",
                    "+    )",
                    "+    entities = cursor.fetchall()",
                    "+    cursor.close()",
                    "+",
                    "+    if entities:",
                    "+        # Convert to list of dicts",
                    "+        entity_list = [",
                    "+            {\"id\": e[\"id\"], \"name\": e[\"name\"], \"type\": e[\"type\"], \"context\": e[\"context\"]}",
                    "+            for e in entities",
                    "+        ]",
                    "+",
                    "+        # Analyze knowledge semantics",
                    "+        analyzed_entities = knowledge_semantic_analyze.execute(entity_list, {\"model\": \"mistral\"})",
                    "+",
                    "+        # Store knowledge semantics",
                    "+        knowledge_semantic_store.execute(analyzed_entities, {})",
                    "+",
                    "+    if progress and progress.is_cancelled():",
                    "+        return \"cancelled\", len(chunks), 0",
                    "+",
                    "+    # Phase 10: Retrieval - Embeddings (AFTER semantics!)",
                    "+    if progress:",
                    "+        progress.add_log(\"Phase 10: Embeddings...\")",
                    "+",
                    "+    embedded = embed_step.execute(chunks, doc_id, file_name, file_path)",
                    "+",
                    "+    # Document is now searchable and semantically analyzed",
                    "+    load_step.update_document_status(doc_id, \"done\")",
                    "+",
                    "+    if progress:",
                    "+        progress.add_log(f\"Pipeline #5 fertig: {file_name}\")",
                    "+",
                    "+    return True, len(chunks), embedded",
                    "+",
                    "+",
                    " def process_queue():",
                    "     \"\"\"Process items from the queue.\"\"\"",
                    "     items = db.get_pending_queue_items(limit=10)"
                ]
            }
        ],
        "userModified": false,
        "replaceAll": false
    }
}
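
The structuredPatch in the response is a unified-diff style hunk: oldStart/newStart are 1-based line positions, oldLines/newLines are span lengths, and each entry in lines carries a one-character prefix (" " context, "+" addition, "-" removal). Below is a minimal sketch of applying one such hunk to a file's lines; apply_hunk is an illustrative helper with deliberately thin error handling, not part of the pipeline code.

# Sketch: apply one structuredPatch hunk (as in the response above) to a
# list of file lines. apply_hunk is illustrative, not pipeline code.
def apply_hunk(lines: list[str], hunk: dict) -> list[str]:
    cursor = hunk["oldStart"] - 1        # hunk positions are 1-based
    out = lines[:cursor]                 # lines before the hunk, unchanged
    for entry in hunk["lines"]:
        tag, text = entry[:1], entry[1:]
        if tag == " ":                   # context line: keep the original
            out.append(lines[cursor])
            cursor += 1
        elif tag == "-":                 # removal: skip the original line
            cursor += 1
        elif tag == "+":                 # addition: emit the new line
            out.append(text)
    out.extend(lines[cursor:])           # rest of the file, unchanged
    return out

# For this record the hunk consumes oldLines=6 original lines (3 context,
# then 3 context at the end) and produces newLines=157 lines, i.e. the
# 151 "+" lines inserting process_file_v5 before def process_queue().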