{
"event": "PreToolUse",
"tool_name": "Edit",
"tool_input": {
"file_path": "\/opt\/scripts\/pipeline\/pipeline.py",
"old_string": "def process_file(file_path):\n \"\"\"Process a single file through the pipeline.\"\"\"\n db.log(\"INFO\", f\"Processing: {file_path}\")\n\n # Extract text\n extraction = extract(file_path)\n if not extraction[\"success\"]:\n db.log(\"ERROR\", f\"Extraction failed: {extraction.get('error')}\")\n return False\n\n # Get document info\n file_name = Path(file_path).name\n file_stat = os.stat(file_path)\n\n import hashlib\n\n with open(file_path, \"rb\") as f:\n file_hash = hashlib.sha256(f.read()).hexdigest()\n\n # Insert document\n doc_id = db.insert_document(\n file_path=file_path,\n title=file_name,\n file_type=extraction[\"file_type\"],\n file_size=file_stat.st_size,\n file_hash=file_hash,\n )\n db.log(\"INFO\", f\"Created document: {doc_id}\")\n\n # Vision analysis for PDFs\n if extraction[\"file_type\"] == \".pdf\":\n db.log(\"INFO\", f\"Running vision analysis for document {doc_id}\")\n vision_config = {\n \"model\": \"minicpm-v:latest\",\n \"store_images\": True,\n \"detect_images\": True,\n \"detect_charts\": True,\n \"detect_tables\": True,\n }\n vision_result = run_vision_step(doc_id, file_path, vision_config)\n if vision_result[\"success\"]:\n db.log(\"INFO\", f\"Vision: {vision_result['pages_analyzed']}\/{vision_result['pages_total']} pages analyzed\")\n else:\n db.log(\"WARNING\", f\"Vision analysis failed: {vision_result.get('error')}\")\n\n # Chunk content\n chunks = chunk_by_structure(extraction)\n db.log(\"INFO\", f\"Created {len(chunks)} chunks\")\n\n # Store chunks\n for i, chunk in enumerate(chunks):\n chunk_id = db.insert_chunk(\n doc_id=doc_id,\n chunk_index=i,\n content=chunk[\"content\"],\n heading_path=json.dumps(chunk.get(\"heading_path\", [])),\n position_start=chunk.get(\"position_start\", 0),\n position_end=chunk.get(\"position_end\", 0),\n metadata=json.dumps(chunk.get(\"metadata\", {})),\n )\n chunk[\"db_id\"] = chunk_id\n\n # Enrich chunks with vision context (for PDFs)\n if extraction[\"file_type\"] == \".pdf\":\n db.log(\"INFO\", f\"Running vision enrichment for document {doc_id}\")\n enrich_result = run_enrichment_step(doc_id)\n if enrich_result[\"success\"]:\n db.log(\"INFO\", f\"Enrichment: {enrich_result['enriched']}\/{enrich_result['total_chunks']} chunks enriched\")\n else:\n db.log(\"WARNING\", f\"Enrichment failed: {enrich_result.get('error')}\")\n\n # Generate embeddings\n embedded = embed_chunks(chunks, doc_id, file_name, file_path)\n db.log(\"INFO\", f\"Embedded {embedded}\/{len(chunks)} chunks\")\n\n # Semantic analysis\n full_text = get_full_text(extraction)\n analysis = analyze_document(doc_id, full_text)\n db.log(\"INFO\", f\"Analysis complete: {analysis}\")\n\n # Update status\n db.update_document_status(doc_id, \"done\")\n return True",
"new_string": "def process_file(file_path, progress=None):\n \"\"\"Process a single file through the pipeline.\"\"\"\n db.log(\"INFO\", f\"Processing: {file_path}\")\n file_name = Path(file_path).name\n\n if progress:\n progress.update_document(file_name)\n\n # Extract text\n if progress:\n progress.update_step(\"extract\")\n progress.add_log(f\"Extrahiere Text: {file_name}\")\n\n extraction = extract(file_path)\n if not extraction[\"success\"]:\n db.log(\"ERROR\", f\"Extraction failed: {extraction.get('error')}\")\n if progress:\n progress.add_log(f\"FEHLER: Extraktion fehlgeschlagen\")\n return False, 0, 0\n\n # Get document info\n file_stat = os.stat(file_path)\n\n import hashlib\n\n with open(file_path, \"rb\") as f:\n file_hash = hashlib.sha256(f.read()).hexdigest()\n\n # Insert document\n doc_id = db.insert_document(\n file_path=file_path,\n title=file_name,\n file_type=extraction[\"file_type\"],\n file_size=file_stat.st_size,\n file_hash=file_hash,\n )\n db.log(\"INFO\", f\"Created document: {doc_id}\")\n\n # Vision analysis for PDFs\n if extraction[\"file_type\"] == \".pdf\":\n if progress:\n progress.update_step(\"vision\")\n progress.add_log(\"Vision-Analyse gestartet...\")\n\n db.log(\"INFO\", f\"Running vision analysis for document {doc_id}\")\n vision_config = {\n \"model\": \"minicpm-v:latest\",\n \"store_images\": True,\n \"detect_images\": True,\n \"detect_charts\": True,\n \"detect_tables\": True,\n }\n vision_result = run_vision_step(doc_id, file_path, vision_config)\n if vision_result[\"success\"]:\n db.log(\"INFO\", f\"Vision: {vision_result['pages_analyzed']}\/{vision_result['pages_total']} pages analyzed\")\n if progress:\n progress.add_log(f\"Vision: {vision_result['pages_analyzed']} Seiten analysiert\")\n else:\n db.log(\"WARNING\", f\"Vision analysis failed: {vision_result.get('error')}\")\n\n # Chunk content\n if progress:\n progress.update_step(\"chunk\")\n progress.add_log(\"Erstelle Chunks...\")\n\n chunks = chunk_by_structure(extraction)\n db.log(\"INFO\", f\"Created {len(chunks)} chunks\")\n\n # Store chunks\n for i, chunk in enumerate(chunks):\n chunk_id = db.insert_chunk(\n doc_id=doc_id,\n chunk_index=i,\n content=chunk[\"content\"],\n heading_path=json.dumps(chunk.get(\"heading_path\", [])),\n position_start=chunk.get(\"position_start\", 0),\n position_end=chunk.get(\"position_end\", 0),\n metadata=json.dumps(chunk.get(\"metadata\", {})),\n )\n chunk[\"db_id\"] = chunk_id\n\n if progress:\n progress.add_log(f\"{len(chunks)} Chunks erstellt\")\n\n # Enrich chunks with vision context (for PDFs)\n if extraction[\"file_type\"] == \".pdf\":\n if progress:\n progress.update_step(\"enrich\")\n\n db.log(\"INFO\", f\"Running vision enrichment for document {doc_id}\")\n enrich_result = run_enrichment_step(doc_id)\n if enrich_result[\"success\"]:\n db.log(\"INFO\", f\"Enrichment: {enrich_result['enriched']}\/{enrich_result['total_chunks']} chunks enriched\")\n else:\n db.log(\"WARNING\", f\"Enrichment failed: {enrich_result.get('error')}\")\n\n # Generate embeddings\n if progress:\n progress.update_step(\"embed\")\n progress.add_log(\"Erstelle Embeddings...\")\n\n embedded = embed_chunks(chunks, doc_id, file_name, file_path)\n db.log(\"INFO\", f\"Embedded {embedded}\/{len(chunks)} chunks\")\n\n if progress:\n progress.add_log(f\"{embedded} Embeddings erstellt\")\n\n # Semantic analysis\n if progress:\n progress.update_step(\"analyze\")\n progress.add_log(\"Semantische Analyse...\")\n\n full_text = get_full_text(extraction)\n analysis = analyze_document(doc_id, full_text)\n db.log(\"INFO\", f\"Analysis complete: {analysis}\")\n\n # Update status\n db.update_document_status(doc_id, \"done\")\n\n if progress:\n progress.add_log(f\"Fertig: {file_name}\")\n\n return True, len(chunks), embedded"
}
}
{
"tool_response": {
"filePath": "\/opt\/scripts\/pipeline\/pipeline.py",
"oldString": "def process_file(file_path):\n \"\"\"Process a single file through the pipeline.\"\"\"\n db.log(\"INFO\", f\"Processing: {file_path}\")\n\n # Extract text\n extraction = extract(file_path)\n if not extraction[\"success\"]:\n db.log(\"ERROR\", f\"Extraction failed: {extraction.get('error')}\")\n return False\n\n # Get document info\n file_name = Path(file_path).name\n file_stat = os.stat(file_path)\n\n import hashlib\n\n with open(file_path, \"rb\") as f:\n file_hash = hashlib.sha256(f.read()).hexdigest()\n\n # Insert document\n doc_id = db.insert_document(\n file_path=file_path,\n title=file_name,\n file_type=extraction[\"file_type\"],\n file_size=file_stat.st_size,\n file_hash=file_hash,\n )\n db.log(\"INFO\", f\"Created document: {doc_id}\")\n\n # Vision analysis for PDFs\n if extraction[\"file_type\"] == \".pdf\":\n db.log(\"INFO\", f\"Running vision analysis for document {doc_id}\")\n vision_config = {\n \"model\": \"minicpm-v:latest\",\n \"store_images\": True,\n \"detect_images\": True,\n \"detect_charts\": True,\n \"detect_tables\": True,\n }\n vision_result = run_vision_step(doc_id, file_path, vision_config)\n if vision_result[\"success\"]:\n db.log(\"INFO\", f\"Vision: {vision_result['pages_analyzed']}\/{vision_result['pages_total']} pages analyzed\")\n else:\n db.log(\"WARNING\", f\"Vision analysis failed: {vision_result.get('error')}\")\n\n # Chunk content\n chunks = chunk_by_structure(extraction)\n db.log(\"INFO\", f\"Created {len(chunks)} chunks\")\n\n # Store chunks\n for i, chunk in enumerate(chunks):\n chunk_id = db.insert_chunk(\n doc_id=doc_id,\n chunk_index=i,\n content=chunk[\"content\"],\n heading_path=json.dumps(chunk.get(\"heading_path\", [])),\n position_start=chunk.get(\"position_start\", 0),\n position_end=chunk.get(\"position_end\", 0),\n metadata=json.dumps(chunk.get(\"metadata\", {})),\n )\n chunk[\"db_id\"] = chunk_id\n\n # Enrich chunks with vision context (for PDFs)\n if extraction[\"file_type\"] == \".pdf\":\n db.log(\"INFO\", f\"Running vision enrichment for document {doc_id}\")\n enrich_result = run_enrichment_step(doc_id)\n if enrich_result[\"success\"]:\n db.log(\"INFO\", f\"Enrichment: {enrich_result['enriched']}\/{enrich_result['total_chunks']} chunks enriched\")\n else:\n db.log(\"WARNING\", f\"Enrichment failed: {enrich_result.get('error')}\")\n\n # Generate embeddings\n embedded = embed_chunks(chunks, doc_id, file_name, file_path)\n db.log(\"INFO\", f\"Embedded {embedded}\/{len(chunks)} chunks\")\n\n # Semantic analysis\n full_text = get_full_text(extraction)\n analysis = analyze_document(doc_id, full_text)\n db.log(\"INFO\", f\"Analysis complete: {analysis}\")\n\n # Update status\n db.update_document_status(doc_id, \"done\")\n return True",
"newString": "def process_file(file_path, progress=None):\n \"\"\"Process a single file through the pipeline.\"\"\"\n db.log(\"INFO\", f\"Processing: {file_path}\")\n file_name = Path(file_path).name\n\n if progress:\n progress.update_document(file_name)\n\n # Extract text\n if progress:\n progress.update_step(\"extract\")\n progress.add_log(f\"Extrahiere Text: {file_name}\")\n\n extraction = extract(file_path)\n if not extraction[\"success\"]:\n db.log(\"ERROR\", f\"Extraction failed: {extraction.get('error')}\")\n if progress:\n progress.add_log(f\"FEHLER: Extraktion fehlgeschlagen\")\n return False, 0, 0\n\n # Get document info\n file_stat = os.stat(file_path)\n\n import hashlib\n\n with open(file_path, \"rb\") as f:\n file_hash = hashlib.sha256(f.read()).hexdigest()\n\n # Insert document\n doc_id = db.insert_document(\n file_path=file_path,\n title=file_name,\n file_type=extraction[\"file_type\"],\n file_size=file_stat.st_size,\n file_hash=file_hash,\n )\n db.log(\"INFO\", f\"Created document: {doc_id}\")\n\n # Vision analysis for PDFs\n if extraction[\"file_type\"] == \".pdf\":\n if progress:\n progress.update_step(\"vision\")\n progress.add_log(\"Vision-Analyse gestartet...\")\n\n db.log(\"INFO\", f\"Running vision analysis for document {doc_id}\")\n vision_config = {\n \"model\": \"minicpm-v:latest\",\n \"store_images\": True,\n \"detect_images\": True,\n \"detect_charts\": True,\n \"detect_tables\": True,\n }\n vision_result = run_vision_step(doc_id, file_path, vision_config)\n if vision_result[\"success\"]:\n db.log(\"INFO\", f\"Vision: {vision_result['pages_analyzed']}\/{vision_result['pages_total']} pages analyzed\")\n if progress:\n progress.add_log(f\"Vision: {vision_result['pages_analyzed']} Seiten analysiert\")\n else:\n db.log(\"WARNING\", f\"Vision analysis failed: {vision_result.get('error')}\")\n\n # Chunk content\n if progress:\n progress.update_step(\"chunk\")\n progress.add_log(\"Erstelle Chunks...\")\n\n chunks = chunk_by_structure(extraction)\n db.log(\"INFO\", f\"Created {len(chunks)} chunks\")\n\n # Store chunks\n for i, chunk in enumerate(chunks):\n chunk_id = db.insert_chunk(\n doc_id=doc_id,\n chunk_index=i,\n content=chunk[\"content\"],\n heading_path=json.dumps(chunk.get(\"heading_path\", [])),\n position_start=chunk.get(\"position_start\", 0),\n position_end=chunk.get(\"position_end\", 0),\n metadata=json.dumps(chunk.get(\"metadata\", {})),\n )\n chunk[\"db_id\"] = chunk_id\n\n if progress:\n progress.add_log(f\"{len(chunks)} Chunks erstellt\")\n\n # Enrich chunks with vision context (for PDFs)\n if extraction[\"file_type\"] == \".pdf\":\n if progress:\n progress.update_step(\"enrich\")\n\n db.log(\"INFO\", f\"Running vision enrichment for document {doc_id}\")\n enrich_result = run_enrichment_step(doc_id)\n if enrich_result[\"success\"]:\n db.log(\"INFO\", f\"Enrichment: {enrich_result['enriched']}\/{enrich_result['total_chunks']} chunks enriched\")\n else:\n db.log(\"WARNING\", f\"Enrichment failed: {enrich_result.get('error')}\")\n\n # Generate embeddings\n if progress:\n progress.update_step(\"embed\")\n progress.add_log(\"Erstelle Embeddings...\")\n\n embedded = embed_chunks(chunks, doc_id, file_name, file_path)\n db.log(\"INFO\", f\"Embedded {embedded}\/{len(chunks)} chunks\")\n\n if progress:\n progress.add_log(f\"{embedded} Embeddings erstellt\")\n\n # Semantic analysis\n if progress:\n progress.update_step(\"analyze\")\n progress.add_log(\"Semantische Analyse...\")\n\n full_text = get_full_text(extraction)\n analysis = analyze_document(doc_id, full_text)\n db.log(\"INFO\", f\"Analysis complete: {analysis}\")\n\n # Update status\n db.update_document_status(doc_id, \"done\")\n\n if progress:\n progress.add_log(f\"Fertig: {file_name}\")\n\n return True, len(chunks), embedded",
"originalFile": "#!\/usr\/bin\/env python3\n\"\"\"\nKI-System Document Pipeline\nMain orchestration script for document processing.\n\nUsage:\n python pipeline.py scan # Scan for new documents\n python pipeline.py process # Process queued documents\n python pipeline.py embed # Embed pending chunks\n python pipeline.py all # Full pipeline run\n python pipeline.py all --pipeline-id=1 --run-id=5 # With tracking\n python pipeline.py file <path> # Process single file\n\"\"\"\n\nimport argparse\nimport json # noqa: I001\nimport os\nimport sys\nimport time\nfrom pathlib import Path\n\nfrom analyze import analyze_document\nfrom chunk import chunk_by_structure\nfrom config import MAX_RETRIES, RETRY_BACKOFF_BASE\nfrom db import db, PipelineProgress\nfrom detect import queue_files, scan_directory\nfrom embed import embed_chunks, embed_pending_chunks\nfrom enrich import run_enrichment_step\nfrom extract import extract, get_full_text\nfrom vision import run_vision_step\n\n\ndef process_file(file_path):\n \"\"\"Process a single file through the pipeline.\"\"\"\n db.log(\"INFO\", f\"Processing: {file_path}\")\n\n # Extract text\n extraction = extract(file_path)\n if not extraction[\"success\"]:\n db.log(\"ERROR\", f\"Extraction failed: {extraction.get('error')}\")\n return False\n\n # Get document info\n file_name = Path(file_path).name\n file_stat = os.stat(file_path)\n\n import hashlib\n\n with open(file_path, \"rb\") as f:\n file_hash = hashlib.sha256(f.read()).hexdigest()\n\n # Insert document\n doc_id = db.insert_document(\n file_path=file_path,\n title=file_name,\n file_type=extraction[\"file_type\"],\n file_size=file_stat.st_size,\n file_hash=file_hash,\n )\n db.log(\"INFO\", f\"Created document: {doc_id}\")\n\n # Vision analysis for PDFs\n if extraction[\"file_type\"] == \".pdf\":\n db.log(\"INFO\", f\"Running vision analysis for document {doc_id}\")\n vision_config = {\n \"model\": \"minicpm-v:latest\",\n \"store_images\": True,\n \"detect_images\": True,\n \"detect_charts\": True,\n \"detect_tables\": True,\n }\n vision_result = run_vision_step(doc_id, file_path, vision_config)\n if vision_result[\"success\"]:\n db.log(\"INFO\", f\"Vision: {vision_result['pages_analyzed']}\/{vision_result['pages_total']} pages analyzed\")\n else:\n db.log(\"WARNING\", f\"Vision analysis failed: {vision_result.get('error')}\")\n\n # Chunk content\n chunks = chunk_by_structure(extraction)\n db.log(\"INFO\", f\"Created {len(chunks)} chunks\")\n\n # Store chunks\n for i, chunk in enumerate(chunks):\n chunk_id = db.insert_chunk(\n doc_id=doc_id,\n chunk_index=i,\n content=chunk[\"content\"],\n heading_path=json.dumps(chunk.get(\"heading_path\", [])),\n position_start=chunk.get(\"position_start\", 0),\n position_end=chunk.get(\"position_end\", 0),\n metadata=json.dumps(chunk.get(\"metadata\", {})),\n )\n chunk[\"db_id\"] = chunk_id\n\n # Enrich chunks with vision context (for PDFs)\n if extraction[\"file_type\"] == \".pdf\":\n db.log(\"INFO\", f\"Running vision enrichment for document {doc_id}\")\n enrich_result = run_enrichment_step(doc_id)\n if enrich_result[\"success\"]:\n db.log(\"INFO\", f\"Enrichment: {enrich_result['enriched']}\/{enrich_result['total_chunks']} chunks enriched\")\n else:\n db.log(\"WARNING\", f\"Enrichment failed: {enrich_result.get('error')}\")\n\n # Generate embeddings\n embedded = embed_chunks(chunks, doc_id, file_name, file_path)\n db.log(\"INFO\", f\"Embedded {embedded}\/{len(chunks)} chunks\")\n\n # Semantic analysis\n full_text = get_full_text(extraction)\n analysis = analyze_document(doc_id, full_text)\n db.log(\"INFO\", f\"Analysis complete: {analysis}\")\n\n # Update status\n db.update_document_status(doc_id, \"done\")\n return True\n\n\ndef process_queue():\n \"\"\"Process items from the queue.\"\"\"\n items = db.get_pending_queue_items(limit=10)\n db.log(\"INFO\", f\"Found {len(items)} items in queue\")\n\n for item in items:\n queue_id = item[\"id\"]\n file_path = item[\"file_path\"]\n retry_count = item[\"retry_count\"]\n\n if retry_count >= MAX_RETRIES:\n db.update_queue_status(queue_id, \"failed\", \"Max retries exceeded\")\n continue\n\n db.update_queue_status(queue_id, \"processing\")\n\n try:\n success = process_file(file_path)\n if success:\n db.update_queue_status(queue_id, \"done\")\n else:\n raise Exception(\"Processing returned False\")\n except Exception as e:\n error_msg = str(e)\n db.update_queue_status(queue_id, \"pending\", error_msg)\n\n # Exponential backoff\n wait_time = RETRY_BACKOFF_BASE ** (retry_count + 1)\n db.log(\"INFO\", f\"Retry {retry_count + 1} in {wait_time}s: {file_path}\")\n time.sleep(wait_time)\n\n\ndef run_scan():\n \"\"\"Scan for new documents.\"\"\"\n files = scan_directory()\n print(f\"Found {len(files)} files\")\n\n if files:\n queued = queue_files(files)\n print(f\"Queued {queued} files\")\n\n return files\n\n\ndef run_full_pipeline():\n \"\"\"Run complete pipeline: scan → process → embed.\"\"\"\n print(\"=\" * 50)\n print(\"KI-System Pipeline - Full Run\")\n print(\"=\" * 50)\n\n print(\"\\n[1\/3] Scanning for documents...\")\n files = run_scan()\n\n if files:\n print(f\"\\n[2\/3] Processing {len(files)} documents...\")\n process_queue()\n else:\n print(\"\\n[2\/3] No new documents to process\")\n\n print(\"\\n[3\/3] Embedding remaining chunks...\")\n embedded = embed_pending_chunks()\n print(f\"Embedded {embedded} chunks\")\n\n print(\"\\n\" + \"=\" * 50)\n print(\"Pipeline complete!\")\n\n\ndef main():\n \"\"\"Main entry point.\"\"\"\n if len(sys.argv) < 2:\n print(__doc__)\n return\n\n command = sys.argv[1].lower()\n\n db.connect()\n\n try:\n if command == \"scan\":\n run_scan()\n\n elif command == \"process\":\n process_queue()\n\n elif command == \"embed\":\n count = embed_pending_chunks()\n print(f\"Embedded {count} chunks\")\n\n elif command == \"all\":\n run_full_pipeline()\n\n elif command == \"file\" and len(sys.argv) > 2:\n file_path = sys.argv[2]\n if os.path.exists(file_path):\n success = process_file(file_path)\n print(f\"Processing {'successful' if success else 'failed'}\")\n else:\n print(f\"File not found: {file_path}\")\n\n elif command == \"status\":\n # Show pipeline status\n cursor = db.execute(\n \"\"\"SELECT status, COUNT(*) as count\n FROM pipeline_queue\n GROUP BY status\"\"\"\n )\n results = cursor.fetchall()\n cursor.close()\n\n print(\"\\nQueue Status:\")\n for r in results:\n print(f\" {r['status']}: {r['count']}\")\n\n cursor = db.execute(\"SELECT COUNT(*) as count FROM documents\")\n doc_count = cursor.fetchone()[\"count\"]\n cursor.close()\n\n cursor = db.execute(\"SELECT COUNT(*) as count FROM chunks\")\n chunk_count = cursor.fetchone()[\"count\"]\n cursor.close()\n\n cursor = db.execute(\"SELECT COUNT(*) as count FROM chunks WHERE qdrant_id IS NOT NULL\")\n embedded_count = cursor.fetchone()[\"count\"]\n cursor.close()\n\n print(f\"\\nDocuments: {doc_count}\")\n print(f\"Chunks: {chunk_count} ({embedded_count} embedded)\")\n\n else:\n print(f\"Unknown command: {command}\")\n print(__doc__)\n\n except Exception as e:\n db.log(\"ERROR\", f\"Pipeline error: {e}\")\n print(f\"Error: {e}\")\n raise\n finally:\n db.disconnect()\n\n\nif __name__ == \"__main__\":\n main()\n",
"structuredPatch": [
{
"oldStart": 30,
"oldLines": 18,
"newStart": 30,
"newLines": 27,
"lines": [
" from vision import run_vision_step",
" ",
" ",
"-def process_file(file_path):",
"+def process_file(file_path, progress=None):",
" \"\"\"Process a single file through the pipeline.\"\"\"",
" db.log(\"INFO\", f\"Processing: {file_path}\")",
"+ file_name = Path(file_path).name",
" ",
"+ if progress:",
"+ progress.update_document(file_name)",
"+",
" # Extract text",
"+ if progress:",
"+ progress.update_step(\"extract\")",
"+ progress.add_log(f\"Extrahiere Text: {file_name}\")",
"+",
" extraction = extract(file_path)",
" if not extraction[\"success\"]:",
" db.log(\"ERROR\", f\"Extraction failed: {extraction.get('error')}\")",
"- return False",
"+ if progress:",
"+ progress.add_log(f\"FEHLER: Extraktion fehlgeschlagen\")",
"+ return False, 0, 0",
" ",
" # Get document info",
"- file_name = Path(file_path).name",
" file_stat = os.stat(file_path)",
" ",
" import hashlib"
]
},
{
"oldStart": 61,
"oldLines": 6,
"newStart": 70,
"newLines": 10,
"lines": [
" ",
" # Vision analysis for PDFs",
" if extraction[\"file_type\"] == \".pdf\":",
"+ if progress:",
"+ progress.update_step(\"vision\")",
"+ progress.add_log(\"Vision-Analyse gestartet...\")",
"+",
" db.log(\"INFO\", f\"Running vision analysis for document {doc_id}\")",
" vision_config = {",
" \"model\": \"minicpm-v:latest\","
]
},
{
"oldStart": 72,
"oldLines": 10,
"newStart": 85,
"newLines": 16,
"lines": [
" vision_result = run_vision_step(doc_id, file_path, vision_config)",
" if vision_result[\"success\"]:",
" db.log(\"INFO\", f\"Vision: {vision_result['pages_analyzed']}\/{vision_result['pages_total']} pages analyzed\")",
"+ if progress:",
"+ progress.add_log(f\"Vision: {vision_result['pages_analyzed']} Seiten analysiert\")",
" else:",
" db.log(\"WARNING\", f\"Vision analysis failed: {vision_result.get('error')}\")",
" ",
" # Chunk content",
"+ if progress:",
"+ progress.update_step(\"chunk\")",
"+ progress.add_log(\"Erstelle Chunks...\")",
"+",
" chunks = chunk_by_structure(extraction)",
" db.log(\"INFO\", f\"Created {len(chunks)} chunks\")",
" "
]
},
{
"oldStart": 92,
"oldLines": 8,
"newStart": 111,
"newLines": 14,
"lines": [
" )",
" chunk[\"db_id\"] = chunk_id",
" ",
"+ if progress:",
"+ progress.add_log(f\"{len(chunks)} Chunks erstellt\")",
"+",
" # Enrich chunks with vision context (for PDFs)",
" if extraction[\"file_type\"] == \".pdf\":",
"+ if progress:",
"+ progress.update_step(\"enrich\")",
"+",
" db.log(\"INFO\", f\"Running vision enrichment for document {doc_id}\")",
" enrich_result = run_enrichment_step(doc_id)",
" if enrich_result[\"success\"]:"
]
},
{
"oldStart": 102,
"oldLines": 19,
"newStart": 127,
"newLines": 34,
"lines": [
" db.log(\"WARNING\", f\"Enrichment failed: {enrich_result.get('error')}\")",
" ",
" # Generate embeddings",
"+ if progress:",
"+ progress.update_step(\"embed\")",
"+ progress.add_log(\"Erstelle Embeddings...\")",
"+",
" embedded = embed_chunks(chunks, doc_id, file_name, file_path)",
" db.log(\"INFO\", f\"Embedded {embedded}\/{len(chunks)} chunks\")",
" ",
"+ if progress:",
"+ progress.add_log(f\"{embedded} Embeddings erstellt\")",
"+",
" # Semantic analysis",
"+ if progress:",
"+ progress.update_step(\"analyze\")",
"+ progress.add_log(\"Semantische Analyse...\")",
"+",
" full_text = get_full_text(extraction)",
" analysis = analyze_document(doc_id, full_text)",
" db.log(\"INFO\", f\"Analysis complete: {analysis}\")",
" ",
" # Update status",
" db.update_document_status(doc_id, \"done\")",
"- return True",
" ",
"+ if progress:",
"+ progress.add_log(f\"Fertig: {file_name}\")",
" ",
"+ return True, len(chunks), embedded",
"+",
"+",
" def process_queue():",
" \"\"\"Process items from the queue.\"\"\"",
" items = db.get_pending_queue_items(limit=10)"
]
}
],
"userModified": false,
"replaceAll": false
}
}