{
"event": "PreToolUse",
"tool_name": "Edit",
"tool_input": {
"file_path": "\/var\/www\/scripts\/pipeline\/pipeline.py",
"old_string": "def process_file(file_path, progress=None):\n \"\"\"Process a single file through the pipeline.\"\"\"\n db.log(\"INFO\", f\"Processing: {file_path}\")\n file_name = Path(file_path).name\n\n if progress:\n progress.update_document(file_name)\n\n # Check if cancelled before starting\n if progress and progress.is_cancelled():\n return \"cancelled\", 0, 0\n\n # Extract text\n if progress:\n progress.update_step(\"extract\")\n progress.add_log(f\"Extrahiere Text: {file_name}\")\n\n extraction = extract(file_path)\n if not extraction[\"success\"]:\n db.log(\"ERROR\", f\"Extraction failed: {extraction.get('error')}\")\n if progress:\n progress.add_log(\"FEHLER: Extraktion fehlgeschlagen\")\n return False, 0, 0\n\n # Check if cancelled after extraction\n if progress and progress.is_cancelled():\n return \"cancelled\", 0, 0\n\n # Get page count for PDFs\n total_pages = 0\n if extraction[\"file_type\"] == \".pdf\" and isinstance(extraction.get(\"content\"), list):\n total_pages = len(extraction[\"content\"])\n if progress:\n progress.add_log(f\"{total_pages} Seiten extrahiert\")\n\n # Get document info\n file_stat = os.stat(file_path)\n\n import hashlib\n\n with open(file_path, \"rb\") as f:\n file_hash = hashlib.sha256(f.read()).hexdigest()\n\n # Insert document\n doc_id = db.insert_document(\n file_path=file_path,\n title=file_name,\n file_type=extraction[\"file_type\"],\n file_size=file_stat.st_size,\n file_hash=file_hash,\n )\n db.log(\"INFO\", f\"Created document: {doc_id}\")\n\n # Vision analysis for PDFs\n if extraction[\"file_type\"] == \".pdf\":\n if progress:\n progress.update_step(\"vision\")\n progress.add_log(\"Vision-Analyse gestartet...\")\n\n db.log(\"INFO\", f\"Running vision analysis for document {doc_id}\")\n vision_config = {\n \"model\": \"llama3.2-vision:11b\",\n \"store_images\": True,\n \"detect_images\": True,\n \"detect_charts\": True,\n \"detect_tables\": True,\n }\n vision_result = run_vision_step(doc_id, file_path, vision_config, progress=progress)\n if vision_result[\"success\"]:\n db.log(\"INFO\", f\"Vision: {vision_result['pages_analyzed']}\/{vision_result['pages_total']} pages analyzed\")\n if progress:\n progress.add_log(f\"Vision: {vision_result['pages_analyzed']} Seiten analysiert\")\n else:\n db.log(\"WARNING\", f\"Vision analysis failed: {vision_result.get('error')}\")\n\n # Check if cancelled after vision\n if progress and progress.is_cancelled():\n return \"cancelled\", 0, 0\n\n # Chunk content\n if progress:\n progress.update_step(\"chunk\")\n if total_pages > 0:\n progress.add_log(f\"Erstelle Chunks aus {total_pages} Seiten...\")\n else:\n progress.add_log(\"Erstelle Chunks...\")\n\n chunks = chunk_by_structure(extraction)\n db.log(\"INFO\", f\"Created {len(chunks)} chunks\")\n\n # Store chunks\n for i, chunk in enumerate(chunks):\n chunk_id = db.insert_chunk(\n doc_id=doc_id,\n chunk_index=i,\n content=chunk[\"content\"],\n heading_path=json.dumps(chunk.get(\"heading_path\", [])),\n position_start=chunk.get(\"position_start\", 0),\n position_end=chunk.get(\"position_end\", 0),\n metadata=json.dumps(chunk.get(\"metadata\", {})),\n )\n chunk[\"db_id\"] = chunk_id\n\n if progress:\n progress.add_log(\n f\"{len(chunks)} Chunks erstellt (aus {total_pages} Seiten)\"\n if total_pages > 0\n else f\"{len(chunks)} Chunks erstellt\"\n )\n\n # Check if cancelled after chunking\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), 0\n\n # Enrich chunks with vision context (for PDFs)\n if extraction[\"file_type\"] == \".pdf\":\n if progress:\n progress.update_step(\"enrich\")\n\n db.log(\"INFO\", f\"Running vision enrichment for document {doc_id}\")\n enrich_result = run_enrichment_step(doc_id)\n if enrich_result[\"success\"]:\n db.log(\"INFO\", f\"Enrichment: {enrich_result['enriched']}\/{enrich_result['total_chunks']} chunks enriched\")\n else:\n db.log(\"WARNING\", f\"Enrichment failed: {enrich_result.get('error')}\")\n\n # Check if cancelled after enrichment\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), 0\n\n # Generate embeddings\n if progress:\n progress.update_step(\"embed\")\n progress.add_log(f\"Erstelle Embeddings für {len(chunks)} Chunks...\")\n\n embedded = embed_chunks(chunks, doc_id, file_name, file_path, progress=progress)\n db.log(\"INFO\", f\"Embedded {embedded}\/{len(chunks)} chunks\")\n\n if progress:\n progress.add_log(f\"{embedded} Embeddings erstellt\")\n\n # Check if cancelled after embedding\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), embedded\n\n # Semantic analysis\n if progress:\n progress.update_step(\"analyze\")\n\n full_text = get_full_text(extraction)\n analysis = analyze_document(doc_id, full_text, progress=progress)\n db.log(\"INFO\", f\"Analysis complete: {analysis}\")\n\n # Update status\n db.update_document_status(doc_id, \"done\")\n\n if progress:\n progress.add_log(f\"Fertig: {file_name}\")\n\n return True, len(chunks), embedded",
"new_string": "def process_file(file_path, progress=None):\n \"\"\"Process a single file through the pipeline.\"\"\"\n file_name = Path(file_path).name\n\n if progress:\n progress.update_document(file_name)\n\n # Initialize pipeline steps\n extract_step = ExtractionStep(db, progress)\n load_step = LoadStep(db, progress)\n transform_step = TransformationStep(db, progress)\n embed_step = EmbeddingStep(db, progress)\n\n # Check if cancelled before starting\n if progress and progress.is_cancelled():\n return \"cancelled\", 0, 0\n\n # Step 1: Extract\n extract_result = extract_step.execute(file_path)\n if not extract_result[\"success\"]:\n if extract_result.get(\"error\") == \"cancelled\":\n return \"cancelled\", 0, 0\n return False, 0, 0\n\n extraction = extract_result[\"extraction\"]\n file_info = extract_result[\"file_info\"]\n total_pages = extract_result.get(\"total_pages\", 0)\n\n # Check if cancelled after extraction\n if progress and progress.is_cancelled():\n return \"cancelled\", 0, 0\n\n # Step 2: Load document\n doc_id = load_step.create_document(file_info)\n\n # Step 3: Vision analysis (PDFs only)\n if file_info[\"type\"] == \".pdf\":\n transform_step.execute_vision(doc_id, file_path, file_info[\"type\"])\n\n # Check if cancelled after vision\n if progress and progress.is_cancelled():\n return \"cancelled\", 0, 0\n\n # Step 4: Chunking\n chunks = transform_step.execute_chunking(extraction, total_pages)\n\n # Step 5: Store chunks\n chunks = load_step.store_chunks(doc_id, chunks)\n\n # Check if cancelled after chunking\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), 0\n\n # Step 6: Enrichment (PDFs only)\n if file_info[\"type\"] == \".pdf\":\n transform_step.execute_enrichment(doc_id, file_info[\"type\"])\n\n # Check if cancelled after enrichment\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), 0\n\n # Step 7: Embeddings\n embedded = embed_step.execute(chunks, doc_id, file_name, file_path)\n\n # Check if cancelled after embedding\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), embedded\n\n # Step 8: Semantic analysis\n full_text = extract_step.get_full_text_from_extraction(extraction)\n transform_step.execute_analysis(doc_id, full_text)\n\n # Step 9: Update status\n load_step.update_document_status(doc_id, \"done\")\n\n if progress:\n progress.add_log(f\"Fertig: {file_name}\")\n\n return True, len(chunks), embedded"
}
}
{
"tool_response": {
"filePath": "\/var\/www\/scripts\/pipeline\/pipeline.py",
"oldString": "def process_file(file_path, progress=None):\n \"\"\"Process a single file through the pipeline.\"\"\"\n db.log(\"INFO\", f\"Processing: {file_path}\")\n file_name = Path(file_path).name\n\n if progress:\n progress.update_document(file_name)\n\n # Check if cancelled before starting\n if progress and progress.is_cancelled():\n return \"cancelled\", 0, 0\n\n # Extract text\n if progress:\n progress.update_step(\"extract\")\n progress.add_log(f\"Extrahiere Text: {file_name}\")\n\n extraction = extract(file_path)\n if not extraction[\"success\"]:\n db.log(\"ERROR\", f\"Extraction failed: {extraction.get('error')}\")\n if progress:\n progress.add_log(\"FEHLER: Extraktion fehlgeschlagen\")\n return False, 0, 0\n\n # Check if cancelled after extraction\n if progress and progress.is_cancelled():\n return \"cancelled\", 0, 0\n\n # Get page count for PDFs\n total_pages = 0\n if extraction[\"file_type\"] == \".pdf\" and isinstance(extraction.get(\"content\"), list):\n total_pages = len(extraction[\"content\"])\n if progress:\n progress.add_log(f\"{total_pages} Seiten extrahiert\")\n\n # Get document info\n file_stat = os.stat(file_path)\n\n import hashlib\n\n with open(file_path, \"rb\") as f:\n file_hash = hashlib.sha256(f.read()).hexdigest()\n\n # Insert document\n doc_id = db.insert_document(\n file_path=file_path,\n title=file_name,\n file_type=extraction[\"file_type\"],\n file_size=file_stat.st_size,\n file_hash=file_hash,\n )\n db.log(\"INFO\", f\"Created document: {doc_id}\")\n\n # Vision analysis for PDFs\n if extraction[\"file_type\"] == \".pdf\":\n if progress:\n progress.update_step(\"vision\")\n progress.add_log(\"Vision-Analyse gestartet...\")\n\n db.log(\"INFO\", f\"Running vision analysis for document {doc_id}\")\n vision_config = {\n \"model\": \"llama3.2-vision:11b\",\n \"store_images\": True,\n \"detect_images\": True,\n \"detect_charts\": True,\n \"detect_tables\": True,\n }\n vision_result = run_vision_step(doc_id, file_path, vision_config, progress=progress)\n if vision_result[\"success\"]:\n db.log(\"INFO\", f\"Vision: {vision_result['pages_analyzed']}\/{vision_result['pages_total']} pages analyzed\")\n if progress:\n progress.add_log(f\"Vision: {vision_result['pages_analyzed']} Seiten analysiert\")\n else:\n db.log(\"WARNING\", f\"Vision analysis failed: {vision_result.get('error')}\")\n\n # Check if cancelled after vision\n if progress and progress.is_cancelled():\n return \"cancelled\", 0, 0\n\n # Chunk content\n if progress:\n progress.update_step(\"chunk\")\n if total_pages > 0:\n progress.add_log(f\"Erstelle Chunks aus {total_pages} Seiten...\")\n else:\n progress.add_log(\"Erstelle Chunks...\")\n\n chunks = chunk_by_structure(extraction)\n db.log(\"INFO\", f\"Created {len(chunks)} chunks\")\n\n # Store chunks\n for i, chunk in enumerate(chunks):\n chunk_id = db.insert_chunk(\n doc_id=doc_id,\n chunk_index=i,\n content=chunk[\"content\"],\n heading_path=json.dumps(chunk.get(\"heading_path\", [])),\n position_start=chunk.get(\"position_start\", 0),\n position_end=chunk.get(\"position_end\", 0),\n metadata=json.dumps(chunk.get(\"metadata\", {})),\n )\n chunk[\"db_id\"] = chunk_id\n\n if progress:\n progress.add_log(\n f\"{len(chunks)} Chunks erstellt (aus {total_pages} Seiten)\"\n if total_pages > 0\n else f\"{len(chunks)} Chunks erstellt\"\n )\n\n # Check if cancelled after chunking\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), 0\n\n # Enrich chunks with vision context (for PDFs)\n if extraction[\"file_type\"] == \".pdf\":\n if progress:\n progress.update_step(\"enrich\")\n\n db.log(\"INFO\", f\"Running vision enrichment for document {doc_id}\")\n enrich_result = run_enrichment_step(doc_id)\n if enrich_result[\"success\"]:\n db.log(\"INFO\", f\"Enrichment: {enrich_result['enriched']}\/{enrich_result['total_chunks']} chunks enriched\")\n else:\n db.log(\"WARNING\", f\"Enrichment failed: {enrich_result.get('error')}\")\n\n # Check if cancelled after enrichment\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), 0\n\n # Generate embeddings\n if progress:\n progress.update_step(\"embed\")\n progress.add_log(f\"Erstelle Embeddings für {len(chunks)} Chunks...\")\n\n embedded = embed_chunks(chunks, doc_id, file_name, file_path, progress=progress)\n db.log(\"INFO\", f\"Embedded {embedded}\/{len(chunks)} chunks\")\n\n if progress:\n progress.add_log(f\"{embedded} Embeddings erstellt\")\n\n # Check if cancelled after embedding\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), embedded\n\n # Semantic analysis\n if progress:\n progress.update_step(\"analyze\")\n\n full_text = get_full_text(extraction)\n analysis = analyze_document(doc_id, full_text, progress=progress)\n db.log(\"INFO\", f\"Analysis complete: {analysis}\")\n\n # Update status\n db.update_document_status(doc_id, \"done\")\n\n if progress:\n progress.add_log(f\"Fertig: {file_name}\")\n\n return True, len(chunks), embedded",
"newString": "def process_file(file_path, progress=None):\n \"\"\"Process a single file through the pipeline.\"\"\"\n file_name = Path(file_path).name\n\n if progress:\n progress.update_document(file_name)\n\n # Initialize pipeline steps\n extract_step = ExtractionStep(db, progress)\n load_step = LoadStep(db, progress)\n transform_step = TransformationStep(db, progress)\n embed_step = EmbeddingStep(db, progress)\n\n # Check if cancelled before starting\n if progress and progress.is_cancelled():\n return \"cancelled\", 0, 0\n\n # Step 1: Extract\n extract_result = extract_step.execute(file_path)\n if not extract_result[\"success\"]:\n if extract_result.get(\"error\") == \"cancelled\":\n return \"cancelled\", 0, 0\n return False, 0, 0\n\n extraction = extract_result[\"extraction\"]\n file_info = extract_result[\"file_info\"]\n total_pages = extract_result.get(\"total_pages\", 0)\n\n # Check if cancelled after extraction\n if progress and progress.is_cancelled():\n return \"cancelled\", 0, 0\n\n # Step 2: Load document\n doc_id = load_step.create_document(file_info)\n\n # Step 3: Vision analysis (PDFs only)\n if file_info[\"type\"] == \".pdf\":\n transform_step.execute_vision(doc_id, file_path, file_info[\"type\"])\n\n # Check if cancelled after vision\n if progress and progress.is_cancelled():\n return \"cancelled\", 0, 0\n\n # Step 4: Chunking\n chunks = transform_step.execute_chunking(extraction, total_pages)\n\n # Step 5: Store chunks\n chunks = load_step.store_chunks(doc_id, chunks)\n\n # Check if cancelled after chunking\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), 0\n\n # Step 6: Enrichment (PDFs only)\n if file_info[\"type\"] == \".pdf\":\n transform_step.execute_enrichment(doc_id, file_info[\"type\"])\n\n # Check if cancelled after enrichment\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), 0\n\n # Step 7: Embeddings\n embedded = embed_step.execute(chunks, doc_id, file_name, file_path)\n\n # Check if cancelled after embedding\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), embedded\n\n # Step 8: Semantic analysis\n full_text = extract_step.get_full_text_from_extraction(extraction)\n transform_step.execute_analysis(doc_id, full_text)\n\n # Step 9: Update status\n load_step.update_document_status(doc_id, \"done\")\n\n if progress:\n progress.add_log(f\"Fertig: {file_name}\")\n\n return True, len(chunks), embedded",
"originalFile": "#!\/usr\/bin\/env python3\n\"\"\"\nKI-System Document Pipeline\nMain orchestration script for document processing.\n\nUsage:\n python pipeline.py scan # Scan for new documents\n python pipeline.py process # Process queued documents\n python pipeline.py embed # Embed pending chunks\n python pipeline.py all # Full pipeline run\n python pipeline.py all --pipeline-id=1 --run-id=5 # With tracking\n python pipeline.py file <path> # Process single file\n\"\"\"\n\nimport argparse\nimport os\nimport time\nfrom pathlib import Path\n\nfrom config import MAX_RETRIES, RETRY_BACKOFF_BASE\nfrom db import PipelineProgress, db\nfrom detect import queue_files, scan_directory\nfrom step_embed import EmbeddingStep\nfrom step_extract import ExtractionStep\nfrom step_load import LoadStep\nfrom step_transform import TransformationStep\n\n\ndef process_file(file_path, progress=None):\n \"\"\"Process a single file through the pipeline.\"\"\"\n db.log(\"INFO\", f\"Processing: {file_path}\")\n file_name = Path(file_path).name\n\n if progress:\n progress.update_document(file_name)\n\n # Check if cancelled before starting\n if progress and progress.is_cancelled():\n return \"cancelled\", 0, 0\n\n # Extract text\n if progress:\n progress.update_step(\"extract\")\n progress.add_log(f\"Extrahiere Text: {file_name}\")\n\n extraction = extract(file_path)\n if not extraction[\"success\"]:\n db.log(\"ERROR\", f\"Extraction failed: {extraction.get('error')}\")\n if progress:\n progress.add_log(\"FEHLER: Extraktion fehlgeschlagen\")\n return False, 0, 0\n\n # Check if cancelled after extraction\n if progress and progress.is_cancelled():\n return \"cancelled\", 0, 0\n\n # Get page count for PDFs\n total_pages = 0\n if extraction[\"file_type\"] == \".pdf\" and isinstance(extraction.get(\"content\"), list):\n total_pages = len(extraction[\"content\"])\n if progress:\n progress.add_log(f\"{total_pages} Seiten extrahiert\")\n\n # Get document info\n file_stat = os.stat(file_path)\n\n import hashlib\n\n with open(file_path, \"rb\") as f:\n file_hash = hashlib.sha256(f.read()).hexdigest()\n\n # Insert document\n doc_id = db.insert_document(\n file_path=file_path,\n title=file_name,\n file_type=extraction[\"file_type\"],\n file_size=file_stat.st_size,\n file_hash=file_hash,\n )\n db.log(\"INFO\", f\"Created document: {doc_id}\")\n\n # Vision analysis for PDFs\n if extraction[\"file_type\"] == \".pdf\":\n if progress:\n progress.update_step(\"vision\")\n progress.add_log(\"Vision-Analyse gestartet...\")\n\n db.log(\"INFO\", f\"Running vision analysis for document {doc_id}\")\n vision_config = {\n \"model\": \"llama3.2-vision:11b\",\n \"store_images\": True,\n \"detect_images\": True,\n \"detect_charts\": True,\n \"detect_tables\": True,\n }\n vision_result = run_vision_step(doc_id, file_path, vision_config, progress=progress)\n if vision_result[\"success\"]:\n db.log(\"INFO\", f\"Vision: {vision_result['pages_analyzed']}\/{vision_result['pages_total']} pages analyzed\")\n if progress:\n progress.add_log(f\"Vision: {vision_result['pages_analyzed']} Seiten analysiert\")\n else:\n db.log(\"WARNING\", f\"Vision analysis failed: {vision_result.get('error')}\")\n\n # Check if cancelled after vision\n if progress and progress.is_cancelled():\n return \"cancelled\", 0, 0\n\n # Chunk content\n if progress:\n progress.update_step(\"chunk\")\n if total_pages > 0:\n progress.add_log(f\"Erstelle Chunks aus {total_pages} Seiten...\")\n else:\n progress.add_log(\"Erstelle Chunks...\")\n\n chunks = chunk_by_structure(extraction)\n db.log(\"INFO\", f\"Created {len(chunks)} chunks\")\n\n # Store chunks\n for i, chunk in enumerate(chunks):\n chunk_id = db.insert_chunk(\n doc_id=doc_id,\n chunk_index=i,\n content=chunk[\"content\"],\n heading_path=json.dumps(chunk.get(\"heading_path\", [])),\n position_start=chunk.get(\"position_start\", 0),\n position_end=chunk.get(\"position_end\", 0),\n metadata=json.dumps(chunk.get(\"metadata\", {})),\n )\n chunk[\"db_id\"] = chunk_id\n\n if progress:\n progress.add_log(\n f\"{len(chunks)} Chunks erstellt (aus {total_pages} Seiten)\"\n if total_pages > 0\n else f\"{len(chunks)} Chunks erstellt\"\n )\n\n # Check if cancelled after chunking\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), 0\n\n # Enrich chunks with vision context (for PDFs)\n if extraction[\"file_type\"] == \".pdf\":\n if progress:\n progress.update_step(\"enrich\")\n\n db.log(\"INFO\", f\"Running vision enrichment for document {doc_id}\")\n enrich_result = run_enrichment_step(doc_id)\n if enrich_result[\"success\"]:\n db.log(\"INFO\", f\"Enrichment: {enrich_result['enriched']}\/{enrich_result['total_chunks']} chunks enriched\")\n else:\n db.log(\"WARNING\", f\"Enrichment failed: {enrich_result.get('error')}\")\n\n # Check if cancelled after enrichment\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), 0\n\n # Generate embeddings\n if progress:\n progress.update_step(\"embed\")\n progress.add_log(f\"Erstelle Embeddings für {len(chunks)} Chunks...\")\n\n embedded = embed_chunks(chunks, doc_id, file_name, file_path, progress=progress)\n db.log(\"INFO\", f\"Embedded {embedded}\/{len(chunks)} chunks\")\n\n if progress:\n progress.add_log(f\"{embedded} Embeddings erstellt\")\n\n # Check if cancelled after embedding\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), embedded\n\n # Semantic analysis\n if progress:\n progress.update_step(\"analyze\")\n\n full_text = get_full_text(extraction)\n analysis = analyze_document(doc_id, full_text, progress=progress)\n db.log(\"INFO\", f\"Analysis complete: {analysis}\")\n\n # Update status\n db.update_document_status(doc_id, \"done\")\n\n if progress:\n progress.add_log(f\"Fertig: {file_name}\")\n\n return True, len(chunks), embedded\n\n\ndef process_queue():\n \"\"\"Process items from the queue.\"\"\"\n items = db.get_pending_queue_items(limit=10)\n db.log(\"INFO\", f\"Found {len(items)} items in queue\")\n\n for item in items:\n queue_id = item[\"id\"]\n file_path = item[\"file_path\"]\n retry_count = item[\"retry_count\"]\n\n if retry_count >= MAX_RETRIES:\n db.update_queue_status(queue_id, \"failed\", \"Max retries exceeded\")\n continue\n\n db.update_queue_status(queue_id, \"processing\")\n\n try:\n success = process_file(file_path)\n if success:\n db.update_queue_status(queue_id, \"completed\")\n else:\n raise Exception(\"Processing returned False\")\n except Exception as e:\n error_msg = str(e)\n db.update_queue_status(queue_id, \"pending\", error_msg)\n\n # Exponential backoff\n wait_time = RETRY_BACKOFF_BASE ** (retry_count + 1)\n db.log(\"INFO\", f\"Retry {retry_count + 1} in {wait_time}s: {file_path}\")\n time.sleep(wait_time)\n\n\ndef run_scan():\n \"\"\"Scan for new documents.\"\"\"\n files = scan_directory()\n print(f\"Found {len(files)} files\")\n\n if files:\n queued = queue_files(files)\n print(f\"Queued {queued} files\")\n\n return files\n\n\ndef run_full_pipeline(run_id=None, pipeline_id=None):\n \"\"\"Run complete pipeline: scan → process → embed.\"\"\"\n progress = PipelineProgress(run_id) if run_id else None\n\n print(\"=\" * 50)\n print(\"KI-System Pipeline - Full Run\")\n if run_id:\n print(f\"Run ID: {run_id}, Pipeline ID: {pipeline_id}\")\n print(\"=\" * 50)\n\n try:\n # Phase 1: Scan\n if progress:\n progress.update_step(\"detect\")\n progress.add_log(\"Scanne nach Dokumenten...\")\n\n print(\"\\n[1\/3] Scanning for documents...\")\n files = scan_directory()\n print(f\"Found {len(files)} files\")\n\n if progress:\n progress.add_log(f\"{len(files)} neue Dokumente gefunden\")\n\n if files:\n queued = queue_files(files)\n print(f\"Queued {queued} files\")\n\n # Phase 2: Process queue items (includes resume of previous runs)\n items = db.get_pending_queue_items(limit=100)\n print(f\"\\n[2\/3] Processing {len(items)} documents...\")\n\n if items:\n # Update total with actual queue count (may include items from previous runs)\n if progress:\n progress.update_progress(total=len(items))\n progress.add_log(f\"{len(items)} Dokumente in Queue\")\n\n total_chunks = 0\n total_embeddings = 0\n processed = 0\n failed = 0\n\n for item in items:\n # Check if cancelled\n if progress and progress.is_cancelled():\n progress.add_log(\"Pipeline abgebrochen durch Benutzer\")\n progress.complete(\"cancelled\")\n print(\"\\nPipeline cancelled by user\")\n return\n\n queue_id = item[\"id\"]\n file_path = item[\"file_path\"]\n file_name = Path(file_path).name\n\n # Skip already-done documents (for resume capability)\n if db.document_is_done(file_path):\n db.update_queue_status(queue_id, \"completed\")\n processed += 1\n if progress:\n progress.add_log(f\"Übersprungen (bereits fertig): {file_name}\")\n progress.update_progress(processed=processed)\n continue\n\n if progress:\n progress.update_document(file_name)\n\n ... [TRUNCATED-936a7f517f4e4357]",
"structuredPatch": [
{
"oldStart": 28,
"oldLines": 160,
"newStart": 28,
"newLines": 79,
"lines": [
" ",
" def process_file(file_path, progress=None):",
" \"\"\"Process a single file through the pipeline.\"\"\"",
"- db.log(\"INFO\", f\"Processing: {file_path}\")",
" file_name = Path(file_path).name",
" ",
" if progress:",
" progress.update_document(file_name)",
" ",
"+ # Initialize pipeline steps",
"+ extract_step = ExtractionStep(db, progress)",
"+ load_step = LoadStep(db, progress)",
"+ transform_step = TransformationStep(db, progress)",
"+ embed_step = EmbeddingStep(db, progress)",
"+",
" # Check if cancelled before starting",
" if progress and progress.is_cancelled():",
" return \"cancelled\", 0, 0",
" ",
"- # Extract text",
"- if progress:",
"- progress.update_step(\"extract\")",
"- progress.add_log(f\"Extrahiere Text: {file_name}\")",
"-",
"- extraction = extract(file_path)",
"- if not extraction[\"success\"]:",
"- db.log(\"ERROR\", f\"Extraction failed: {extraction.get('error')}\")",
"- if progress:",
"- progress.add_log(\"FEHLER: Extraktion fehlgeschlagen\")",
"+ # Step 1: Extract",
"+ extract_result = extract_step.execute(file_path)",
"+ if not extract_result[\"success\"]:",
"+ if extract_result.get(\"error\") == \"cancelled\":",
"+ return \"cancelled\", 0, 0",
" return False, 0, 0",
" ",
"+ extraction = extract_result[\"extraction\"]",
"+ file_info = extract_result[\"file_info\"]",
"+ total_pages = extract_result.get(\"total_pages\", 0)",
"+",
" # Check if cancelled after extraction",
" if progress and progress.is_cancelled():",
" return \"cancelled\", 0, 0",
" ",
"- # Get page count for PDFs",
"- total_pages = 0",
"- if extraction[\"file_type\"] == \".pdf\" and isinstance(extraction.get(\"content\"), list):",
"- total_pages = len(extraction[\"content\"])",
"- if progress:",
"- progress.add_log(f\"{total_pages} Seiten extrahiert\")",
"+ # Step 2: Load document",
"+ doc_id = load_step.create_document(file_info)",
" ",
"- # Get document info",
"- file_stat = os.stat(file_path)",
"+ # Step 3: Vision analysis (PDFs only)",
"+ if file_info[\"type\"] == \".pdf\":",
"+ transform_step.execute_vision(doc_id, file_path, file_info[\"type\"])",
" ",
"- import hashlib",
"-",
"- with open(file_path, \"rb\") as f:",
"- file_hash = hashlib.sha256(f.read()).hexdigest()",
"-",
"- # Insert document",
"- doc_id = db.insert_document(",
"- file_path=file_path,",
"- title=file_name,",
"- file_type=extraction[\"file_type\"],",
"- file_size=file_stat.st_size,",
"- file_hash=file_hash,",
"- )",
"- db.log(\"INFO\", f\"Created document: {doc_id}\")",
"-",
"- # Vision analysis for PDFs",
"- if extraction[\"file_type\"] == \".pdf\":",
"- if progress:",
"- progress.update_step(\"vision\")",
"- progress.add_log(\"Vision-Analyse gestartet...\")",
"-",
"- db.log(\"INFO\", f\"Running vision analysis for document {doc_id}\")",
"- vision_config = {",
"- \"model\": \"llama3.2-vision:11b\",",
"- \"store_images\": True,",
"- \"detect_images\": True,",
"- \"detect_charts\": True,",
"- \"detect_tables\": True,",
"- }",
"- vision_result = run_vision_step(doc_id, file_path, vision_config, progress=progress)",
"- if vision_result[\"success\"]:",
"- db.log(\"INFO\", f\"Vision: {vision_result['pages_analyzed']}\/{vision_result['pages_total']} pages analyzed\")",
"- if progress:",
"- progress.add_log(f\"Vision: {vision_result['pages_analyzed']} Seiten analysiert\")",
"- else:",
"- db.log(\"WARNING\", f\"Vision analysis failed: {vision_result.get('error')}\")",
"-",
" # Check if cancelled after vision",
" if progress and progress.is_cancelled():",
" return \"cancelled\", 0, 0",
" ",
"- # Chunk content",
"- if progress:",
"- progress.update_step(\"chunk\")",
"- if total_pages > 0:",
"- progress.add_log(f\"Erstelle Chunks aus {total_pages} Seiten...\")",
"- else:",
"- progress.add_log(\"Erstelle Chunks...\")",
"+ # Step 4: Chunking",
"+ chunks = transform_step.execute_chunking(extraction, total_pages)",
" ",
"- chunks = chunk_by_structure(extraction)",
"- db.log(\"INFO\", f\"Created {len(chunks)} chunks\")",
"+ # Step 5: Store chunks",
"+ chunks = load_step.store_chunks(doc_id, chunks)",
" ",
"- # Store chunks",
"- for i, chunk in enumerate(chunks):",
"- chunk_id = db.insert_chunk(",
"- doc_id=doc_id,",
"- chunk_index=i,",
"- content=chunk[\"content\"],",
"- heading_path=json.dumps(chunk.get(\"heading_path\", [])),",
"- position_start=chunk.get(\"position_start\", 0),",
"- position_end=chunk.get(\"position_end\", 0),",
"- metadata=json.dumps(chunk.get(\"metadata\", {})),",
"- )",
"- chunk[\"db_id\"] = chunk_id",
"-",
"- if progress:",
"- progress.add_log(",
"- f\"{len(chunks)} Chunks erstellt (aus {total_pages} Seiten)\"",
"- if total_pages > 0",
"- else f\"{len(chunks)} Chunks erstellt\"",
"- )",
"-",
" # Check if cancelled after chunking",
" if progress and progress.is_cancelled():",
" return \"cancelled\", len(chunks), 0",
" ",
"- # Enrich chunks with vision context (for PDFs)",
"- if extraction[\"file_type\"] == \".pdf\":",
"- if progress:",
"- progress.update_step(\"enrich\")",
"+ # Step 6: Enrichment (PDFs only)",
"+ if file_info[\"type\"] == \".pdf\":",
"+ transform_step.execute_enrichment(doc_id, file_info[\"type\"])",
" ",
"- db.log(\"INFO\", f\"Running vision enrichment for document {doc_id}\")",
"- enrich_result = run_enrichment_step(doc_id)",
"- if enrich_result[\"success\"]:",
"- db.log(\"INFO\", f\"Enrichment: {enrich_result['enriched']}\/{enrich_result['total_chunks']} chunks enriched\")",
"- else:",
"- db.log(\"WARNING\", f\"Enrichment failed: {enrich_result.get('error')}\")",
"-",
" # Check if cancelled after enrichment",
" if progress and progress.is_cancelled():",
" return \"cancelled\", len(chunks), 0",
" ",
"- # Generate embeddings",
"- if progress:",
"- progress.update_step(\"embed\")",
"- progress.add_log(f\"Erstelle Embeddings für {len(chunks)} Chunks...\")",
"+ # Step 7: Embeddings",
"+ embedded = embed_step.execute(chunks, doc_id, file_name, file_path)",
" ",
"- embedded = embed_chunks(chunks, doc_id, file_name, file_path, progress=progress)",
"- db.log(\"INFO\", f\"Embedded {embedded}\/{len(chunks)} chunks\")",
"-",
"- if progress:",
"- progress.add_log(f\"{embedded} Embeddings erstellt\")",
"-",
" # Check if cancelled after embedding",
" if progress and progress.is_cancelled():",
" return \"cancelled\", len(chunks), embedded",
" ",
"- # Semantic analysis",
"- if progress:",
"- progress.update_step(\"analyze\")",
"+ # Step 8: Semantic analysis",
"+ full_text = extract_step.get_full_text_from_extraction(extraction)",
"+ transform_step.execute_analysis(doc_id, full_text)",
" ",
"- full_text = get_full_text(extraction)",
"- analysis = analyze_document(doc_id, full_text, progress=progress)",
"- db.log(\"INFO\", f\"Analysis complete: {analysis}\")",
"+ # Step 9: Update status",
"+ load_step.update_document_status(doc_id, \"done\")",
" ",
"- # Update status",
"- db.update_document_status(doc_id, \"done\")",
"-",
" if progress:",
" progress.add_log(f\"Fertig: {file_name}\")",
" "
]
}
],
"userModified": false,
"replaceAll": false
}
}