{
"event": "PreToolUse",
"tool_name": "Edit",
"tool_input": {
"file_path": "\/var\/www\/scripts\/pipeline\/pipeline.py",
"old_string": " if progress:\n progress.add_log(f\"Fertig: {file_name}\")\n\n return True, len(chunks), embedded\n\n\ndef process_queue():",
"new_string": " if progress:\n progress.add_log(f\"Fertig: {file_name}\")\n\n return True, len(chunks), embedded\n\n\ndef process_file_v5(file_path, progress=None):\n \"\"\"Process a single file through Pipeline #5 (Scientific Pipeline).\n\n Key difference from process_file():\n - Semantic analysis happens BEFORE embedding (scientifically correct)\n - Uses extended semantic steps for text and knowledge semantics\n \"\"\"\n file_name = Path(file_path).name\n\n if progress:\n progress.update_document(file_name)\n\n # Initialize pipeline steps\n extract_step = ExtractionStep(db, progress)\n load_step = LoadStep(db, progress)\n transform_step = TransformationStep(db, progress)\n embed_step = EmbeddingStep(db, progress)\n text_semantic_analyze = TextSemanticAnalyzeStep(db, progress)\n text_semantic_store = TextSemanticStoreStep(db, progress)\n knowledge_semantic_analyze = KnowledgeSemanticAnalyzeStep(db, progress)\n knowledge_semantic_store = KnowledgeSemanticStoreStep(db, progress)\n duplicate_check = DuplicateCheckStep(db, progress)\n\n # Check if cancelled before starting\n if progress and progress.is_cancelled():\n return \"cancelled\", 0, 0\n\n # Phase 1: Existenz - Extract\n extract_result = extract_step.execute(file_path)\n if not extract_result[\"success\"]:\n if extract_result.get(\"error\") == \"cancelled\":\n return \"cancelled\", 0, 0\n return False, 0, 0\n\n extraction = extract_result[\"extraction\"]\n file_info = extract_result[\"file_info\"]\n total_pages = extract_result.get(\"total_pages\", 0)\n content_hash = file_info.get(\"hash\", \"\")\n\n # Check if cancelled after extraction\n if progress and progress.is_cancelled():\n return \"cancelled\", 0, 0\n\n # Phase 1: Existenz - Load document\n doc_id = load_step.create_document(file_info)\n\n # Phase 1: Existenz - Duplicate check\n dup_result = duplicate_check.execute(doc_id, content_hash)\n if dup_result[\"status\"] == \"abort\":\n load_step.update_document_status(doc_id, \"duplicate\")\n if progress:\n progress.add_log(f\"Duplikat: {file_name} = Doc #{dup_result['duplicate_id']}\")\n return True, 0, 0 # Not an error, just skip\n\n # Phase 2: Text - Store pages\n page_map = load_step.store_pages(doc_id, extraction)\n\n # Phase 2: Text - Vision analysis (PDFs only)\n if file_info[\"type\"] == \".pdf\":\n transform_step.execute_vision(doc_id, file_path, file_info[\"type\"])\n if progress and progress.is_cancelled():\n return \"cancelled\", 0, 0\n\n # Phase 3: Struktur - Chunking\n chunks = transform_step.execute_chunking(extraction, total_pages)\n\n # Phase 3: Struktur - Store chunks with page references\n chunks = load_step.store_chunks(doc_id, chunks, page_map)\n\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), 0\n\n # Phase 3: Struktur - Enrichment (PDFs only)\n if file_info[\"type\"] == \".pdf\":\n transform_step.execute_enrichment(doc_id, file_info[\"type\"])\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), 0\n\n # Phase 4: Textsemantik - Analyze chunks\n if progress:\n progress.add_log(\"Phase 4: Textsemantik...\")\n\n # Prepare chunks for analysis\n chunk_data = [{\"id\": c[\"id\"], \"content\": c[\"content\"]} for c in chunks]\n analyzed_chunks = text_semantic_analyze.execute(chunk_data, {\"model\": \"mistral\"})\n\n # Store text semantics\n text_semantic_store.execute(analyzed_chunks, {})\n\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), 0\n\n # Phase 5-6: Entity + Wissenssemantik\n if progress:\n progress.add_log(\"Phase 5-6: Entity-Extraktion + Wissenssemantik...\")\n\n # Run standard semantic analysis (entities, relations, taxonomy)\n semantic_step = SemanticStep(db, progress)\n full_text = extract_step.get_full_text_from_extraction(extraction)\n\n try:\n semantic_step.execute(doc_id, full_text, use_anthropic=SEMANTIC_USE_ANTHROPIC)\n except Exception as e:\n db.log(\"WARNING\", f\"Semantic analysis failed for {file_name}: {e}\")\n if progress:\n progress.add_log(f\"Semantik-Warnung: {str(e)[:50]}\")\n\n # Load entities for knowledge semantics\n cursor = db.execute(\n \"\"\"SELECT e.id, e.name, e.type, c.content as context\n FROM entities e\n JOIN chunk_entities ce ON e.id = ce.entity_id\n JOIN chunks c ON ce.chunk_id = c.id\n WHERE c.document_id = %s\n GROUP BY e.id\"\"\",\n (doc_id,),\n )\n entities = cursor.fetchall()\n cursor.close()\n\n if entities:\n # Convert to list of dicts\n entity_list = [\n {\"id\": e[\"id\"], \"name\": e[\"name\"], \"type\": e[\"type\"], \"context\": e[\"context\"]}\n for e in entities\n ]\n\n # Analyze knowledge semantics\n analyzed_entities = knowledge_semantic_analyze.execute(entity_list, {\"model\": \"mistral\"})\n\n # Store knowledge semantics\n knowledge_semantic_store.execute(analyzed_entities, {})\n\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), 0\n\n # Phase 10: Retrieval - Embeddings (AFTER semantics!)\n if progress:\n progress.add_log(\"Phase 10: Embeddings...\")\n\n embedded = embed_step.execute(chunks, doc_id, file_name, file_path)\n\n # Document is now searchable and semantically analyzed\n load_step.update_document_status(doc_id, \"done\")\n\n if progress:\n progress.add_log(f\"Pipeline #5 fertig: {file_name}\")\n\n return True, len(chunks), embedded\n\n\ndef process_queue():"
}
}
{
"tool_response": {
"filePath": "\/var\/www\/scripts\/pipeline\/pipeline.py",
"oldString": " if progress:\n progress.add_log(f\"Fertig: {file_name}\")\n\n return True, len(chunks), embedded\n\n\ndef process_queue():",
"newString": " if progress:\n progress.add_log(f\"Fertig: {file_name}\")\n\n return True, len(chunks), embedded\n\n\ndef process_file_v5(file_path, progress=None):\n \"\"\"Process a single file through Pipeline #5 (Scientific Pipeline).\n\n Key difference from process_file():\n - Semantic analysis happens BEFORE embedding (scientifically correct)\n - Uses extended semantic steps for text and knowledge semantics\n \"\"\"\n file_name = Path(file_path).name\n\n if progress:\n progress.update_document(file_name)\n\n # Initialize pipeline steps\n extract_step = ExtractionStep(db, progress)\n load_step = LoadStep(db, progress)\n transform_step = TransformationStep(db, progress)\n embed_step = EmbeddingStep(db, progress)\n text_semantic_analyze = TextSemanticAnalyzeStep(db, progress)\n text_semantic_store = TextSemanticStoreStep(db, progress)\n knowledge_semantic_analyze = KnowledgeSemanticAnalyzeStep(db, progress)\n knowledge_semantic_store = KnowledgeSemanticStoreStep(db, progress)\n duplicate_check = DuplicateCheckStep(db, progress)\n\n # Check if cancelled before starting\n if progress and progress.is_cancelled():\n return \"cancelled\", 0, 0\n\n # Phase 1: Existenz - Extract\n extract_result = extract_step.execute(file_path)\n if not extract_result[\"success\"]:\n if extract_result.get(\"error\") == \"cancelled\":\n return \"cancelled\", 0, 0\n return False, 0, 0\n\n extraction = extract_result[\"extraction\"]\n file_info = extract_result[\"file_info\"]\n total_pages = extract_result.get(\"total_pages\", 0)\n content_hash = file_info.get(\"hash\", \"\")\n\n # Check if cancelled after extraction\n if progress and progress.is_cancelled():\n return \"cancelled\", 0, 0\n\n # Phase 1: Existenz - Load document\n doc_id = load_step.create_document(file_info)\n\n # Phase 1: Existenz - Duplicate check\n dup_result = duplicate_check.execute(doc_id, content_hash)\n if dup_result[\"status\"] == \"abort\":\n load_step.update_document_status(doc_id, \"duplicate\")\n if progress:\n progress.add_log(f\"Duplikat: {file_name} = Doc #{dup_result['duplicate_id']}\")\n return True, 0, 0 # Not an error, just skip\n\n # Phase 2: Text - Store pages\n page_map = load_step.store_pages(doc_id, extraction)\n\n # Phase 2: Text - Vision analysis (PDFs only)\n if file_info[\"type\"] == \".pdf\":\n transform_step.execute_vision(doc_id, file_path, file_info[\"type\"])\n if progress and progress.is_cancelled():\n return \"cancelled\", 0, 0\n\n # Phase 3: Struktur - Chunking\n chunks = transform_step.execute_chunking(extraction, total_pages)\n\n # Phase 3: Struktur - Store chunks with page references\n chunks = load_step.store_chunks(doc_id, chunks, page_map)\n\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), 0\n\n # Phase 3: Struktur - Enrichment (PDFs only)\n if file_info[\"type\"] == \".pdf\":\n transform_step.execute_enrichment(doc_id, file_info[\"type\"])\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), 0\n\n # Phase 4: Textsemantik - Analyze chunks\n if progress:\n progress.add_log(\"Phase 4: Textsemantik...\")\n\n # Prepare chunks for analysis\n chunk_data = [{\"id\": c[\"id\"], \"content\": c[\"content\"]} for c in chunks]\n analyzed_chunks = text_semantic_analyze.execute(chunk_data, {\"model\": \"mistral\"})\n\n # Store text semantics\n text_semantic_store.execute(analyzed_chunks, {})\n\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), 0\n\n # Phase 5-6: Entity + Wissenssemantik\n if progress:\n progress.add_log(\"Phase 5-6: Entity-Extraktion + Wissenssemantik...\")\n\n # Run standard semantic analysis (entities, relations, taxonomy)\n semantic_step = SemanticStep(db, progress)\n full_text = extract_step.get_full_text_from_extraction(extraction)\n\n try:\n semantic_step.execute(doc_id, full_text, use_anthropic=SEMANTIC_USE_ANTHROPIC)\n except Exception as e:\n db.log(\"WARNING\", f\"Semantic analysis failed for {file_name}: {e}\")\n if progress:\n progress.add_log(f\"Semantik-Warnung: {str(e)[:50]}\")\n\n # Load entities for knowledge semantics\n cursor = db.execute(\n \"\"\"SELECT e.id, e.name, e.type, c.content as context\n FROM entities e\n JOIN chunk_entities ce ON e.id = ce.entity_id\n JOIN chunks c ON ce.chunk_id = c.id\n WHERE c.document_id = %s\n GROUP BY e.id\"\"\",\n (doc_id,),\n )\n entities = cursor.fetchall()\n cursor.close()\n\n if entities:\n # Convert to list of dicts\n entity_list = [\n {\"id\": e[\"id\"], \"name\": e[\"name\"], \"type\": e[\"type\"], \"context\": e[\"context\"]}\n for e in entities\n ]\n\n # Analyze knowledge semantics\n analyzed_entities = knowledge_semantic_analyze.execute(entity_list, {\"model\": \"mistral\"})\n\n # Store knowledge semantics\n knowledge_semantic_store.execute(analyzed_entities, {})\n\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), 0\n\n # Phase 10: Retrieval - Embeddings (AFTER semantics!)\n if progress:\n progress.add_log(\"Phase 10: Embeddings...\")\n\n embedded = embed_step.execute(chunks, doc_id, file_name, file_path)\n\n # Document is now searchable and semantically analyzed\n load_step.update_document_status(doc_id, \"done\")\n\n if progress:\n progress.add_log(f\"Pipeline #5 fertig: {file_name}\")\n\n return True, len(chunks), embedded\n\n\ndef process_queue():",
"originalFile": "#!\/usr\/bin\/env python3\n\"\"\"\nKI-System Document Pipeline\nMain orchestration script for document processing.\n\nUsage:\n python pipeline.py scan # Scan for new documents\n python pipeline.py process # Process queued documents\n python pipeline.py embed # Embed pending chunks\n python pipeline.py semantic <id> # Run semantic analysis on document\n python pipeline.py semantic-queue # Process semantic queue\n python pipeline.py enrich-entities # Enrich entity descriptions via Ollama\n python pipeline.py enrich-entities 50 # Limit to 50 entities\n python pipeline.py all # Full pipeline run\n python pipeline.py all --pipeline-id=1 --run-id=5 # With tracking\n python pipeline.py file <path> # Process single file\n python pipeline.py status # Show pipeline status\n\"\"\"\n\nimport argparse\nimport os\nimport time\nfrom pathlib import Path\n\nfrom config import (\n MAX_RETRIES,\n RETRY_BACKOFF_BASE,\n SEMANTIC_AUTO_QUEUE,\n SEMANTIC_SYNC,\n SEMANTIC_USE_ANTHROPIC,\n)\nfrom constants import DEFAULT_LIMIT\nfrom db import PipelineProgress, db\nfrom detect import queue_files, scan_directory\nfrom step_embed import EmbeddingStep\nfrom step_entity_enrich import EntityEnrichStep\nfrom step_extract import ExtractionStep\nfrom step_load import LoadStep\nfrom step_semantic import SemanticStep\nfrom step_semantic_extended import (\n DuplicateCheckStep,\n KnowledgeSemanticAnalyzeStep,\n KnowledgeSemanticStoreStep,\n TextSemanticAnalyzeStep,\n TextSemanticStoreStep,\n)\nfrom step_transform import TransformationStep\n\n\ndef process_file(file_path, progress=None):\n \"\"\"Process a single file through the pipeline.\"\"\"\n file_name = Path(file_path).name\n\n if progress:\n progress.update_document(file_name)\n\n # Initialize pipeline steps\n extract_step = ExtractionStep(db, progress)\n load_step = LoadStep(db, progress)\n transform_step = TransformationStep(db, progress)\n embed_step = EmbeddingStep(db, progress)\n\n # Check if cancelled before starting\n if progress and progress.is_cancelled():\n return \"cancelled\", 0, 0\n\n # Step 1: Extract\n extract_result = extract_step.execute(file_path)\n if not extract_result[\"success\"]:\n if extract_result.get(\"error\") == \"cancelled\":\n return \"cancelled\", 0, 0\n return False, 0, 0\n\n extraction = extract_result[\"extraction\"]\n file_info = extract_result[\"file_info\"]\n total_pages = extract_result.get(\"total_pages\", 0)\n\n # Check if cancelled after extraction\n if progress and progress.is_cancelled():\n return \"cancelled\", 0, 0\n\n # Step 2: Load document\n doc_id = load_step.create_document(file_info)\n\n # Step 3: Store pages (PDFs and multi-page documents)\n page_map = load_step.store_pages(doc_id, extraction)\n\n # Step 4: Vision analysis (PDFs only)\n if file_info[\"type\"] == \".pdf\":\n transform_step.execute_vision(doc_id, file_path, file_info[\"type\"])\n\n # Check if cancelled after vision\n if progress and progress.is_cancelled():\n return \"cancelled\", 0, 0\n\n # Step 5: Chunking\n chunks = transform_step.execute_chunking(extraction, total_pages)\n\n # Step 6: Store chunks with page references\n chunks = load_step.store_chunks(doc_id, chunks, page_map)\n\n # Check if cancelled after chunking\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), 0\n\n # Step 7: Enrichment (PDFs only)\n if file_info[\"type\"] == \".pdf\":\n transform_step.execute_enrichment(doc_id, file_info[\"type\"])\n\n # Check if cancelled after enrichment\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), 0\n\n # Step 8: Embeddings (Layer 3 - Document becomes searchable)\n embedded = embed_step.execute(chunks, doc_id, file_name, file_path)\n\n # Document is now searchable - update status to \"embedded\"\n load_step.update_document_status(doc_id, \"embedded\")\n\n if progress:\n progress.add_log(f\"Layer 3 fertig: {file_name} ist jetzt suchbar\")\n\n # Check if cancelled after embedding\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), embedded\n\n # Step 9: Semantic analysis (Layer 4 - Optional\/Async)\n semantic_step = SemanticStep(db, progress)\n full_text = extract_step.get_full_text_from_extraction(extraction)\n\n if SEMANTIC_SYNC:\n # Run semantic analysis synchronously\n try:\n semantic_step.execute(doc_id, full_text, use_anthropic=SEMANTIC_USE_ANTHROPIC)\n # Update to done only after semantic completes\n load_step.update_document_status(doc_id, \"done\")\n except Exception as e:\n # Semantic failed but document is still searchable\n db.log(\"WARNING\", f\"Semantic analysis failed for {file_name}: {e}\")\n if progress:\n progress.add_log(f\"Semantik-Fehler (Dokument bleibt suchbar): {str(e)[:50]}\")\n elif SEMANTIC_AUTO_QUEUE:\n # Queue for async processing\n semantic_step.queue(doc_id, priority=5)\n load_step.update_document_status(doc_id, \"done\")\n if progress:\n progress.add_log(f\"Semantik in Queue: {file_name}\")\n else:\n # No semantic analysis\n load_step.update_document_status(doc_id, \"done\")\n\n if progress:\n progress.add_log(f\"Fertig: {file_name}\")\n\n return True, len(chunks), embedded\n\n\ndef process_queue():\n \"\"\"Process items from the queue.\"\"\"\n items = db.get_pending_queue_items(limit=10)\n db.log(\"INFO\", f\"Found {len(items)} items in queue\")\n\n for item in items:\n queue_id = item[\"id\"]\n file_path = item[\"file_path\"]\n retry_count = item[\"retry_count\"]\n\n if retry_count >= MAX_RETRIES:\n db.update_queue_status(queue_id, \"failed\", \"Max retries exceeded\")\n continue\n\n db.update_queue_status(queue_id, \"processing\")\n\n try:\n success = process_file(file_path)\n if success:\n db.update_queue_status(queue_id, \"completed\")\n else:\n raise Exception(\"Processing returned False\")\n except Exception as e:\n error_msg = str(e)\n db.update_queue_status(queue_id, \"pending\", error_msg)\n\n # Exponential backoff\n wait_time = RETRY_BACKOFF_BASE ** (retry_count + 1)\n db.log(\"INFO\", f\"Retry {retry_count + 1} in {wait_time}s: {file_path}\")\n time.sleep(wait_time)\n\n\ndef run_scan():\n \"\"\"Scan for new documents.\"\"\"\n files = scan_directory()\n print(f\"Found {len(files)} files\")\n\n if files:\n queued = queue_files(files)\n print(f\"Queued {queued} files\")\n\n return files\n\n\ndef run_full_pipeline(run_id=None, pipeline_id=None):\n \"\"\"Run complete pipeline: scan → process → embed.\"\"\"\n progress = PipelineProgress(run_id) if run_id else None\n\n print(\"=\" * 50)\n print(\"KI-System Pipeline - Full Run\")\n if run_id:\n print(f\"Run ID: {run_id}, Pipeline ID: {pipeline_id}\")\n print(\"=\" * 50)\n\n try:\n # Phase 1: Scan\n if progress:\n progress.update_step(\"detect\")\n progress.add_log(\"Scanne nach Dokumenten...\")\n\n print(\"\\n[1\/3] Scanning for documents...\")\n files = scan_directory()\n print(f\"Found {len(files)} files\")\n\n if progress:\n progress.add_log(f\"{len(files)} neue Dokumente gefunden\")\n\n if files:\n queued = queue_files(files)\n print(f\"Queued {queued} files\")\n\n # Phase 2: Process queue items (includes resume of previous runs)\n items = db.get_pending_queue_items(limit=DEFAULT_LIMIT)\n print(f\"\\n[2\/3] Processing {len(items)} documents...\")\n\n if items:\n # Update total with actual queue count (may include items from previous runs)\n if progress:\n progress.update_progress(total=len(items))\n progress.add_log(f\"{len(items)} Dokumente in Queue\")\n\n total_chunks = 0\n total_embeddings = 0\n processed = 0\n failed = 0\n\n for item in items:\n # Check if cancelled\n if progress and progress.is_cancelled():\n progress.add_log(\"Pipeline abgebrochen durch Benutzer\")\n progress.complete(\"cancelled\")\n print(\"\\nPipeline cancelled by user\")\n return\n\n queue_id = item[\"id\"]\n file_path = item[\"file_path\"]\n file_name = Path(file_path).name\n\n # Skip already-done documents (for resume capability)\n if db.document_is_done(file_path):\n db.update_queue_status(queue_id, \"completed\")\n processed += 1\n if progress:\n progress.add_log(f\"Übersprungen (bereits fertig): {file_name}\")\n progress.update_progress(processed=processed)\n continue\n\n if progress:\n progress.update_document(file_name)\n\n db.update_queue_status(queue_id, \"processing\")\n\n try:\n result = process_file(file_path, progress)\n success, chunks, embedded = result if isinstance(result, tuple) else (result, 0, 0)\n\n # Handle cancellation during file processing\n if success == \"cancelled\":\n progress.add_log(\"Pipeline abgebrochen durch Benutzer\")\n progress.complete(\"cancelled\")\n print(\"\\nPipeline cancelled by user\")\n return\n\n if success:\n db.update_queue_status(queue_id, \"completed\")\n processed += 1\n total_chunks += chunks\n total_embeddings += embedded\n ... [TRUNCATED-abeef8b93a01b116]",
"structuredPatch": [
{
"oldStart": 155,
"oldLines": 6,
"newStart": 155,
"newLines": 157,
"lines": [
" return True, len(chunks), embedded",
" ",
" ",
"+def process_file_v5(file_path, progress=None):",
"+ \"\"\"Process a single file through Pipeline #5 (Scientific Pipeline).",
"+",
"+ Key difference from process_file():",
"+ - Semantic analysis happens BEFORE embedding (scientifically correct)",
"+ - Uses extended semantic steps for text and knowledge semantics",
"+ \"\"\"",
"+ file_name = Path(file_path).name",
"+",
"+ if progress:",
"+ progress.update_document(file_name)",
"+",
"+ # Initialize pipeline steps",
"+ extract_step = ExtractionStep(db, progress)",
"+ load_step = LoadStep(db, progress)",
"+ transform_step = TransformationStep(db, progress)",
"+ embed_step = EmbeddingStep(db, progress)",
"+ text_semantic_analyze = TextSemanticAnalyzeStep(db, progress)",
"+ text_semantic_store = TextSemanticStoreStep(db, progress)",
"+ knowledge_semantic_analyze = KnowledgeSemanticAnalyzeStep(db, progress)",
"+ knowledge_semantic_store = KnowledgeSemanticStoreStep(db, progress)",
"+ duplicate_check = DuplicateCheckStep(db, progress)",
"+",
"+ # Check if cancelled before starting",
"+ if progress and progress.is_cancelled():",
"+ return \"cancelled\", 0, 0",
"+",
"+ # Phase 1: Existenz - Extract",
"+ extract_result = extract_step.execute(file_path)",
"+ if not extract_result[\"success\"]:",
"+ if extract_result.get(\"error\") == \"cancelled\":",
"+ return \"cancelled\", 0, 0",
"+ return False, 0, 0",
"+",
"+ extraction = extract_result[\"extraction\"]",
"+ file_info = extract_result[\"file_info\"]",
"+ total_pages = extract_result.get(\"total_pages\", 0)",
"+ content_hash = file_info.get(\"hash\", \"\")",
"+",
"+ # Check if cancelled after extraction",
"+ if progress and progress.is_cancelled():",
"+ return \"cancelled\", 0, 0",
"+",
"+ # Phase 1: Existenz - Load document",
"+ doc_id = load_step.create_document(file_info)",
"+",
"+ # Phase 1: Existenz - Duplicate check",
"+ dup_result = duplicate_check.execute(doc_id, content_hash)",
"+ if dup_result[\"status\"] == \"abort\":",
"+ load_step.update_document_status(doc_id, \"duplicate\")",
"+ if progress:",
"+ progress.add_log(f\"Duplikat: {file_name} = Doc #{dup_result['duplicate_id']}\")",
"+ return True, 0, 0 # Not an error, just skip",
"+",
"+ # Phase 2: Text - Store pages",
"+ page_map = load_step.store_pages(doc_id, extraction)",
"+",
"+ # Phase 2: Text - Vision analysis (PDFs only)",
"+ if file_info[\"type\"] == \".pdf\":",
"+ transform_step.execute_vision(doc_id, file_path, file_info[\"type\"])",
"+ if progress and progress.is_cancelled():",
"+ return \"cancelled\", 0, 0",
"+",
"+ # Phase 3: Struktur - Chunking",
"+ chunks = transform_step.execute_chunking(extraction, total_pages)",
"+",
"+ # Phase 3: Struktur - Store chunks with page references",
"+ chunks = load_step.store_chunks(doc_id, chunks, page_map)",
"+",
"+ if progress and progress.is_cancelled():",
"+ return \"cancelled\", len(chunks), 0",
"+",
"+ # Phase 3: Struktur - Enrichment (PDFs only)",
"+ if file_info[\"type\"] == \".pdf\":",
"+ transform_step.execute_enrichment(doc_id, file_info[\"type\"])",
"+ if progress and progress.is_cancelled():",
"+ return \"cancelled\", len(chunks), 0",
"+",
"+ # Phase 4: Textsemantik - Analyze chunks",
"+ if progress:",
"+ progress.add_log(\"Phase 4: Textsemantik...\")",
"+",
"+ # Prepare chunks for analysis",
"+ chunk_data = [{\"id\": c[\"id\"], \"content\": c[\"content\"]} for c in chunks]",
"+ analyzed_chunks = text_semantic_analyze.execute(chunk_data, {\"model\": \"mistral\"})",
"+",
"+ # Store text semantics",
"+ text_semantic_store.execute(analyzed_chunks, {})",
"+",
"+ if progress and progress.is_cancelled():",
"+ return \"cancelled\", len(chunks), 0",
"+",
"+ # Phase 5-6: Entity + Wissenssemantik",
"+ if progress:",
"+ progress.add_log(\"Phase 5-6: Entity-Extraktion + Wissenssemantik...\")",
"+",
"+ # Run standard semantic analysis (entities, relations, taxonomy)",
"+ semantic_step = SemanticStep(db, progress)",
"+ full_text = extract_step.get_full_text_from_extraction(extraction)",
"+",
"+ try:",
"+ semantic_step.execute(doc_id, full_text, use_anthropic=SEMANTIC_USE_ANTHROPIC)",
"+ except Exception as e:",
"+ db.log(\"WARNING\", f\"Semantic analysis failed for {file_name}: {e}\")",
"+ if progress:",
"+ progress.add_log(f\"Semantik-Warnung: {str(e)[:50]}\")",
"+",
"+ # Load entities for knowledge semantics",
"+ cursor = db.execute(",
"+ \"\"\"SELECT e.id, e.name, e.type, c.content as context",
"+ FROM entities e",
"+ JOIN chunk_entities ce ON e.id = ce.entity_id",
"+ JOIN chunks c ON ce.chunk_id = c.id",
"+ WHERE c.document_id = %s",
"+ GROUP BY e.id\"\"\",",
"+ (doc_id,),",
"+ )",
"+ entities = cursor.fetchall()",
"+ cursor.close()",
"+",
"+ if entities:",
"+ # Convert to list of dicts",
"+ entity_list = [",
"+ {\"id\": e[\"id\"], \"name\": e[\"name\"], \"type\": e[\"type\"], \"context\": e[\"context\"]}",
"+ for e in entities",
"+ ]",
"+",
"+ # Analyze knowledge semantics",
"+ analyzed_entities = knowledge_semantic_analyze.execute(entity_list, {\"model\": \"mistral\"})",
"+",
"+ # Store knowledge semantics",
"+ knowledge_semantic_store.execute(analyzed_entities, {})",
"+",
"+ if progress and progress.is_cancelled():",
"+ return \"cancelled\", len(chunks), 0",
"+",
"+ # Phase 10: Retrieval - Embeddings (AFTER semantics!)",
"+ if progress:",
"+ progress.add_log(\"Phase 10: Embeddings...\")",
"+",
"+ embedded = embed_step.execute(chunks, doc_id, file_name, file_path)",
"+",
"+ # Document is now searchable and semantically analyzed",
"+ load_step.update_document_status(doc_id, \"done\")",
"+",
"+ if progress:",
"+ progress.add_log(f\"Pipeline #5 fertig: {file_name}\")",
"+",
"+ return True, len(chunks), embedded",
"+",
"+",
" def process_queue():",
" \"\"\"Process items from the queue.\"\"\"",
" items = db.get_pending_queue_items(limit=10)"
]
}
],
"userModified": false,
"replaceAll": false
}
}