{
"tool_response": {
"type": "text",
"file": {
"filePath": "\/opt\/scripts\/pipeline\/pipeline.py",
"content": "\n\ndef process_file(file_path, progress=None):\n \"\"\"Process a single file through the pipeline.\"\"\"\n db.log(\"INFO\", f\"Processing: {file_path}\")\n file_name = Path(file_path).name\n\n if progress:\n progress.update_document(file_name)\n\n # Extract text\n if progress:\n progress.update_step(\"extract\")\n progress.add_log(f\"Extrahiere Text: {file_name}\")\n\n extraction = extract(file_path)\n if not extraction[\"success\"]:\n db.log(\"ERROR\", f\"Extraction failed: {extraction.get('error')}\")\n if progress:\n progress.add_log(\"FEHLER: Extraktion fehlgeschlagen\")\n return False, 0, 0\n\n # Get document info\n file_stat = os.stat(file_path)\n\n import hashlib\n\n with open(file_path, \"rb\") as f:\n file_hash = hashlib.sha256(f.read()).hexdigest()\n\n # Insert document\n doc_id = db.insert_document(\n file_path=file_path,\n title=file_name,\n file_type=extraction[\"file_type\"],\n file_size=file_stat.st_size,\n file_hash=file_hash,\n )\n db.log(\"INFO\", f\"Created document: {doc_id}\")\n\n # Vision analysis for PDFs\n if extraction[\"file_type\"] == \".pdf\":\n if progress:\n progress.update_step(\"vision\")\n progress.add_log(\"Vision-Analyse gestartet...\")\n\n db.log(\"INFO\", f\"Running vision analysis for document {doc_id}\")\n vision_config = {\n \"model\": \"minicpm-v:latest\",\n \"store_images\": True,\n \"detect_images\": True,\n \"detect_charts\": True,\n \"detect_tables\": True,\n }\n vision_result = run_vision_step(doc_id, file_path, vision_config, progress=progress)\n if vision_result[\"success\"]:\n db.log(\"INFO\", f\"Vision: {vision_result['pages_analyzed']}\/{vision_result['pages_total']} pages analyzed\")\n if progress:\n progress.add_log(f\"Vision: {vision_result['pages_analyzed']} Seiten analysiert\")\n else:\n db.log(\"WARNING\", f\"Vision analysis failed: {vision_result.get('error')}\")\n\n # Chunk content\n if progress:\n progress.update_step(\"chunk\")\n progress.add_log(\"Erstelle Chunks...\")\n\n chunks = chunk_by_structure(extraction)\n db.log(\"INFO\", f\"Created {len(chunks)} chunks\")\n\n # Store chunks\n for i, chunk in enumerate(chunks):\n chunk_id = db.insert_chunk(\n doc_id=doc_id,\n chunk_index=i,\n content=chunk[\"content\"],\n heading_path=json.dumps(chunk.get(\"heading_path\", [])),\n position_start=chunk.get(\"position_start\", 0),\n position_end=chunk.get(\"position_end\", 0),\n metadata=json.dumps(chunk.get(\"metadata\", {})),\n )\n chunk[\"db_id\"] = chunk_id\n\n if progress:\n progress.add_log(f\"{len(chunks)} Chunks erstellt\")\n\n # Enrich chunks with vision context (for PDFs)\n if extraction[\"file_type\"] == \".pdf\":\n if progress:\n progress.update_step(\"enrich\")\n\n db.log(\"INFO\", f\"Running vision enrichment for document {doc_id}\")\n enrich_result = run_enrichment_step(doc_id)\n if enrich_result[\"success\"]:\n db.log(\"INFO\", f\"Enrichment: {enrich_result['enriched']}\/{enrich_result['total_chunks']} chunks enriched\")\n else:\n db.log(\"WARNING\", f\"Enrichment failed: {enrich_result.get('error')}\")\n\n # Generate embeddings\n if progress:\n progress.update_step(\"embed\")\n progress.add_log(\"Erstelle Embeddings...\")\n\n embedded = embed_chunks(chunks, doc_id, file_name, file_path)\n db.log(\"INFO\", f\"Embedded {embedded}\/{len(chunks)} chunks\")\n\n if progress:\n progress.add_log(f\"{embedded} Embeddings erstellt\")\n\n # Semantic analysis\n if progress:\n progress.update_step(\"analyze\")\n progress.add_log(\"Semantische Analyse...\")\n\n full_text = get_full_text(extraction)\n analysis = analyze_document(doc_id, full_text)\n db.log(\"INFO\", f\"Analysis complete: {analysis}\")\n\n # Update status\n db.update_document_status(doc_id, \"done\")\n\n if progress:\n progress.add_log(f\"Fertig: {file_name}\")\n\n return True, len(chunks), embedded\n\n\ndef process_queue():\n \"\"\"Process items from the queue.\"\"\"\n items = db.get_pending_queue_items(limit=10)",
"numLines": 130,
"startLine": 30,
"totalLines": 396
}
}
}