{
"tool_response": {
"type": "text",
"file": {
"filePath": "\/var\/www\/scripts\/pipeline\/pipeline.py",
"content": "\n if progress:\n progress.update_document(file_name)\n\n db.update_queue_status(queue_id, \"processing\")\n\n try:\n result = process_file(file_path, progress)\n success, chunks, embedded = result if isinstance(result, tuple) else (result, 0, 0)\n\n # Handle cancellation during file processing\n if success == \"cancelled\":\n progress.add_log(\"Pipeline abgebrochen durch Benutzer\")\n progress.complete(\"cancelled\")\n print(\"\\nPipeline cancelled by user\")\n return\n\n if success:\n db.update_queue_status(queue_id, \"completed\")\n processed += 1\n total_chunks += chunks\n total_embeddings += embedded\n else:\n db.update_queue_status(queue_id, \"failed\", \"Processing failed\")\n failed += 1\n except Exception as e:\n db.update_queue_status(queue_id, \"failed\", str(e))\n failed += 1\n if progress:\n progress.add_log(f\"FEHLER bei {file_name}: {str(e)[:50]}\")\n\n if progress:\n progress.update_progress(\n processed=processed,\n failed=failed,\n chunks=total_chunks,\n embeddings=total_embeddings,\n )\n else:\n print(\"\\n[2\/3] No pending documents in queue\")\n if progress:\n progress.add_log(\"Keine ausstehenden Dokumente in Queue\")\n\n # Phase 3: Embed remaining\n print(\"\\n[3\/3] Embedding remaining chunks...\")\n embed_step = EmbeddingStep(db, progress)\n embedded = embed_step.embed_pending()\n print(f\"Embedded {embedded} chunks\")\n\n # Complete\n print(\"\\n\" + \"=\" * 50)\n print(\"Pipeline complete!\")\n\n if progress:\n progress.add_log(\"Pipeline erfolgreich abgeschlossen\")\n progress.complete(\"completed\")\n\n except Exception as e:\n db.log(\"ERROR\", f\"Pipeline error: {e}\")\n print(f\"Error: {e}\")\n if progress:\n progress.add_log(f\"FEHLER: {str(e)}\")\n progress.complete(\"failed\", str(e))\n raise\n\n\ndef main():\n \"\"\"Main entry point.\"\"\"\n parser = argparse.ArgumentParser(description=\"KI-System Document Pipeline\")\n parser.add_argument(\n \"command\",\n choices=[\"scan\", \"process\", \"embed\", \"semantic\", \"semantic-queue\", \"all\", \"file\", \"status\"],\n help=\"Command to execute\",\n )\n parser.add_argument(\"file_path\", nargs=\"?\", help=\"File path for 'file' command\")\n parser.add_argument(\"--pipeline-id\", type=int, help=\"Pipeline ID for tracking\")\n parser.add_argument(\"--run-id\", type=int, help=\"Run ID for progress tracking\")\n\n args = parser.parse_args()\n\n db.connect()\n\n try:\n if args.command == \"scan\":\n run_scan()\n\n elif args.command == \"process\":\n process_queue()\n\n elif args.command == \"embed\":\n embed_step = EmbeddingStep(db)\n count = embed_step.embed_pending()\n print(f\"Embedded {count} chunks\")\n\n elif args.command == \"semantic\":\n # Run semantic analysis on a specific document\n if not args.file_path:\n print(\"Error: semantic command requires a document ID\")\n return\n try:\n doc_id = int(args.file_path)\n except ValueError:\n print(\"Error: document ID must be an integer\")\n return\n\n semantic_step = SemanticStep(db)\n # Get document text\n text = semantic_step._get_document_text(doc_id)\n if not text:\n print(f\"No text found for document {doc_id}\")\n return\n\n result = semantic_step.execute(doc_id, text, use_anthropic=SEMANTIC_USE_ANTHROPIC)\n print(f\"Semantic analysis complete: {result}\")\n\n elif args.command == \"semantic-queue\":\n # Process pending items from semantic queue\n semantic_step = SemanticStep(db)\n result = semantic_step.process_queue(\n limit=int(args.file_path) if args.file_path else 5,\n use_anthropic=SEMANTIC_USE_ANTHROPIC,\n )\n print(f\"Semantic queue processed: {result}\")\n\n elif args.command == \"all\":\n run_full_pipeline(run_id=args.run_id, pipeline_id=args.pipeline_id)\n\n elif args.command == \"file\":\n if not args.file_path:\n print(\"Error: file command requires a file path\")\n return\n if os.path.exists(args.file_path):\n result = process_file(args.file_path)\n success = result[0] if isinstance(result, tuple) else result\n print(f\"Processing {'successful' if success else 'failed'}\")\n else:\n print(f\"File not found: {args.file_path}\")\n\n elif args.command == \"status\":\n # Show pipeline status\n cursor = db.execute(\n \"\"\"SELECT status, COUNT(*) as count\n FROM pipeline_queue\n GROUP BY status\"\"\"\n )\n results = cursor.fetchall()\n cursor.close()\n\n print(\"\\nQueue Status:\")\n for r in results:\n print(f\" {r['status']}: {r['count']}\")\n\n cursor = db.execute(\"SELECT COUNT(*) as count FROM documents\")\n doc_count = cursor.fetchone()[\"count\"]\n cursor.close()\n\n cursor = db.execute(\"SELECT COUNT(*) as count FROM chunks\")\n chunk_count = cursor.fetchone()[\"count\"]\n cursor.close()\n\n cursor = db.execute(\"SELECT COUNT(*) as count FROM chunks WHERE qdrant_id IS NOT NULL\")\n embedded_count = cursor.fetchone()[\"count\"]\n cursor.close()\n\n print(f\"\\nDocuments: {doc_count}\")\n print(f\"Chunks: {chunk_count} ({embedded_count} embedded)\")\n\n # Semantic status\n cursor = db.execute(\n \"\"\"SELECT semantic_status, COUNT(*) as count\n FROM documents\n GROUP BY semantic_status\"\"\"\n )\n sem_results = cursor.fetchall()\n cursor.close()\n\n print(\"\\nSemantic Status:\")\n for r in sem_results:\n status = r[\"semantic_status\"] or \"null\"\n print(f\" {status}: {r['count']}\")\n\n # Semantic queue\n cursor = db.execute(\n \"\"\"SELECT status, COUNT(*) as count\n FROM semantic_queue\n GROUP BY status\"\"\"\n )\n queue_results = cursor.fetchall()\n cursor.close()\n\n if queue_results:\n print(\"\\nSemantic Queue:\")\n for r in queue_results:\n print(f\" {r['status']}: {r['count']}\")\n\n except Exception as e:\n db.log(\"ERROR\", f\"Pipeline error: {e}\")\n print(f\"Error: {e}\")\n raise\n finally:\n db.disconnect()\n\n\nif __name__ == \"__main__\":\n main()\n",
"numLines": 206,
"startLine": 250,
"totalLines": 455
}
}
}