Protocol #19151
| ID | 19151 |
|---|---|
| Timestamp | 2025-12-26 23:48:36.959230 |
| Client | root |
| IP | - |
| Model | claude-sonnet-4-20250514 |
| Status | completed |
| Tokens | 2,708 (Input: 0, Output: 0) |
| Duration | 88 ms |
| Request Time | - |
| Response Time | - |
Request
{
    "event": "PreToolUse",
    "tool_name": "Read",
    "tool_input": {
        "file_path": "\/var\/www\/scripts\/pipeline\/step_load.py"
    }
}
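The request body is a PreToolUse hook event for the Read tool. A minimal sketch of how such a payload could be consumed, assuming it arrives as JSON on stdin and that a non-zero exit status would veto the call; the handler name and the path allow-list are illustrative and not part of the logged system:

```python
import json
import sys


def handle_pre_tool_use(payload: dict) -> int:
    """Inspect a PreToolUse payload like the one logged above.

    Returns 0 to let the tool call proceed, non-zero to veto it.
    The allow-list policy below is hypothetical.
    """
    if payload.get("tool_name") != "Read":
        return 0  # this sketch only inspects Read calls
    file_path = payload.get("tool_input", {}).get("file_path", "")
    # Hypothetical policy: only allow reads under the pipeline scripts directory.
    if file_path.startswith("/var/www/scripts/pipeline/"):
        return 0
    print(f"Blocked read outside pipeline dir: {file_path}", file=sys.stderr)
    return 2


if __name__ == "__main__":
    sys.exit(handle_pre_tool_use(json.load(sys.stdin)))
```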
Response
{
"tool_response": {
"type": "text",
"file": {
"filePath": "\/var\/www\/scripts\/pipeline\/pipeline.py",
"content": "#!\/usr\/bin\/env python3\n\"\"\"\nKI-System Document Pipeline\nMain orchestration script for document processing.\n\nUsage:\n python pipeline.py scan # Scan for new documents\n python pipeline.py process # Process queued documents\n python pipeline.py embed # Embed pending chunks\n python pipeline.py all # Full pipeline run\n python pipeline.py all --pipeline-id=1 --run-id=5 # With tracking\n python pipeline.py file <path> # Process single file\n\"\"\"\n\nimport argparse\nimport os\nimport time\nfrom pathlib import Path\n\nfrom config import MAX_RETRIES, RETRY_BACKOFF_BASE\nfrom db import PipelineProgress, db\nfrom detect import queue_files, scan_directory\nfrom step_embed import EmbeddingStep\nfrom step_extract import ExtractionStep\nfrom step_load import LoadStep\nfrom step_transform import TransformationStep\n\n\ndef process_file(file_path, progress=None):\n \"\"\"Process a single file through the pipeline.\"\"\"\n file_name = Path(file_path).name\n\n if progress:\n progress.update_document(file_name)\n\n # Initialize pipeline steps\n extract_step = ExtractionStep(db, progress)\n load_step = LoadStep(db, progress)\n transform_step = TransformationStep(db, progress)\n embed_step = EmbeddingStep(db, progress)\n\n # Check if cancelled before starting\n if progress and progress.is_cancelled():\n return \"cancelled\", 0, 0\n\n # Step 1: Extract\n extract_result = extract_step.execute(file_path)\n if not extract_result[\"success\"]:\n if extract_result.get(\"error\") == \"cancelled\":\n return \"cancelled\", 0, 0\n return False, 0, 0\n\n extraction = extract_result[\"extraction\"]\n file_info = extract_result[\"file_info\"]\n total_pages = extract_result.get(\"total_pages\", 0)\n\n # Check if cancelled after extraction\n if progress and progress.is_cancelled():\n return \"cancelled\", 0, 0\n\n # Step 2: Load document\n doc_id = load_step.create_document(file_info)\n\n # Step 3: Vision analysis (PDFs only)\n if file_info[\"type\"] == \".pdf\":\n transform_step.execute_vision(doc_id, file_path, file_info[\"type\"])\n\n # Check if cancelled after vision\n if progress and progress.is_cancelled():\n return \"cancelled\", 0, 0\n\n # Step 4: Chunking\n chunks = transform_step.execute_chunking(extraction, total_pages)\n\n # Step 5: Store chunks\n chunks = load_step.store_chunks(doc_id, chunks)\n\n # Check if cancelled after chunking\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), 0\n\n # Step 6: Enrichment (PDFs only)\n if file_info[\"type\"] == \".pdf\":\n transform_step.execute_enrichment(doc_id, file_info[\"type\"])\n\n # Check if cancelled after enrichment\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), 0\n\n # Step 7: Embeddings\n embedded = embed_step.execute(chunks, doc_id, file_name, file_path)\n\n # Check if cancelled after embedding\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), embedded\n\n # Step 8: Semantic analysis\n full_text = extract_step.get_full_text_from_extraction(extraction)\n transform_step.execute_analysis(doc_id, full_text)\n\n # Step 9: Update status\n load_step.update_document_status(doc_id, \"done\")\n\n if progress:\n progress.add_log(f\"Fertig: {file_name}\")\n\n return True, len(chunks), embedded\n\n\ndef process_queue():\n \"\"\"Process items from the queue.\"\"\"\n items = db.get_pending_queue_items(limit=10)\n db.log(\"INFO\", f\"Found {len(items)} items in queue\")\n\n for item in items:\n queue_id = item[\"id\"]\n file_path = item[\"file_path\"]\n 
retry_count = item[\"retry_count\"]\n\n if retry_count >= MAX_RETRIES:\n db.update_queue_status(queue_id, \"failed\", \"Max retries exceeded\")\n continue\n\n db.update_queue_status(queue_id, \"processing\")\n\n try:\n success = process_file(file_path)\n if success:\n db.update_queue_status(queue_id, \"completed\")\n else:\n raise Exception(\"Processing returned False\")\n except Exception as e:\n error_msg = str(e)\n db.update_queue_status(queue_id, \"pending\", error_msg)\n\n # Exponential backoff\n wait_time = RETRY_BACKOFF_BASE ** (retry_count + 1)\n db.log(\"INFO\", f\"Retry {retry_count + 1} in {wait_time}s: {file_path}\")\n time.sleep(wait_time)\n\n\ndef run_scan():\n \"\"\"Scan for new documents.\"\"\"\n files = scan_directory()\n print(f\"Found {len(files)} files\")\n\n if files:\n queued = queue_files(files)\n print(f\"Queued {queued} files\")\n\n return files\n\n\ndef run_full_pipeline(run_id=None, pipeline_id=None):\n \"\"\"Run complete pipeline: scan → process → embed.\"\"\"\n progress = PipelineProgress(run_id) if run_id else None\n\n print(\"=\" * 50)\n print(\"KI-System Pipeline - Full Run\")\n if run_id:\n print(f\"Run ID: {run_id}, Pipeline ID: {pipeline_id}\")\n print(\"=\" * 50)\n\n try:\n # Phase 1: Scan\n if progress:\n progress.update_step(\"detect\")\n progress.add_log(\"Scanne nach Dokumenten...\")\n\n print(\"\\n[1\/3] Scanning for documents...\")\n files = scan_directory()\n print(f\"Found {len(files)} files\")\n\n if progress:\n progress.add_log(f\"{len(files)} neue Dokumente gefunden\")\n\n if files:\n queued = queue_files(files)\n print(f\"Queued {queued} files\")\n\n # Phase 2: Process queue items (includes resume of previous runs)\n items = db.get_pending_queue_items(limit=100)\n print(f\"\\n[2\/3] Processing {len(items)} documents...\")\n\n if items:\n # Update total with actual queue count (may include items from previous runs)\n if progress:\n progress.update_progress(total=len(items))\n progress.add_log(f\"{len(items)} Dokumente in Queue\")\n\n total_chunks = 0\n total_embeddings = 0\n processed = 0\n failed = 0\n\n for item in items:\n # Check if cancelled\n if progress and progress.is_cancelled():\n progress.add_log(\"Pipeline abgebrochen durch Benutzer\")\n progress.complete(\"cancelled\")\n print(\"\\nPipeline cancelled by user\")\n return\n\n queue_id = item[\"id\"]\n file_path = item[\"file_path\"]\n file_name = Path(file_path).name\n\n # Skip already-done documents (for resume capability)\n if db.document_is_done(file_path):\n db.update_queue_status(queue_id, \"completed\")\n processed += 1\n if progress:\n progress.add_log(f\"Übersprungen (bereits fertig): {file_name}\")\n progress.update_progress(processed=processed)\n continue\n\n if progress:\n progress.update_document(file_name)\n\n db.update_queue_status(queue_id, \"processing\")\n\n try:\n result = process_file(file_path, progress)\n success, chunks, embedded = result if isinstance(result, tuple) else (result, 0, 0)\n\n # Handle cancellation during file processing\n if success == \"cancelled\":\n progress.add_log(\"Pipeline abgebrochen durch Benutzer\")\n progress.complete(\"cancelled\")\n print(\"\\nPipeline cancelled by user\")\n return\n\n if success:\n db.update_queue_status(queue_id, \"completed\")\n processed += 1\n total_chunks += chunks\n total_embeddings += embedded\n else:\n db.update_queue_status(queue_id, \"failed\", \"Processing failed\")\n failed += 1\n except Exception as e:\n db.update_queue_status(queue_id, \"failed\", str(e))\n failed += 1\n if progress:\n 
progress.add_log(f\"FEHLER bei {file_name}: {str(e)[:50]}\")\n\n if progress:\n progress.update_progress(\n processed=processed,\n failed=failed,\n chunks=total_chunks,\n embeddings=total_embeddings,\n )\n else:\n print(\"\\n[2\/3] No pending documents in queue\")\n if progress:\n progress.add_log(\"Keine ausstehenden Dokumente in Queue\")\n\n # Phase 3: Embed remaining\n print(\"\\n[3\/3] Embedding remaining chunks...\")\n embed_step = EmbeddingStep(db, progress)\n embedded = embed_step.embed_pending()\n print(f\"Embedded {embedded} chunks\")\n\n # Complete\n print(\"\\n\" + \"=\" * 50)\n print(\"Pipeline complete!\")\n\n if progress:\n progress.add_log(\"Pipeline erfolgreich abgeschlossen\")\n progress.complete(\"completed\")\n\n except Exception as e:\n db.log(\"ERROR\", f\"Pipeline error: {e}\")\n print(f\"Error: {e}\")\n if progress:\n progress.add_log(f\"FEHLER: {str(e)}\")\n progress.complete(\"failed\", str(e))\n raise\n\n\ndef main():\n \"\"\"Main entry point.\"\"\"\n parser = argparse.ArgumentParser(description=\"KI-System Document Pipeline\")\n parser.add_argument(\n \"command\",\n choices=[\"scan\", \"process\", \"embed\", \"all\", \"file\", \"status\"],\n help=\"Command to execute\",\n )\n parser.add_argument(\"file_path\", nargs=\"?\", help=\"File path for 'file' command\")\n parser.add_argument(\"--pipeline-id\", type=int, help=\"Pipeline ID for tracking\")\n parser.add_argument(\"--run-id\", type=int, help=\"Run ID for progress tracking\")\n\n a... [TRUNCATED-9f9b2ced8306fe7d]",
"numLines": 363,
"startLine": 1,
"totalLines": 363
}
}
}