{
"event": "PreToolUse",
"tool_name": "Read",
"tool_input": {
"file_path": "\/var\/www\/scripts\/pipeline\/pipeline.py",
"offset": 340,
"limit": 100
}
}
{
"tool_response": {
"type": "text",
"file": {
"filePath": "\/var\/www\/scripts\/pipeline\/pipeline.py",
"content": "\ndef run_scan():\n \"\"\"Scan for new documents.\"\"\"\n files = scan_directory()\n print(f\"Found {len(files)} files\")\n\n if files:\n queued = queue_files(files)\n print(f\"Queued {queued} files\")\n\n return files\n\n\ndef run_full_pipeline(run_id=None, pipeline_id=None):\n \"\"\"Run complete pipeline: scan → process → embed.\"\"\"\n progress = PipelineProgress(run_id) if run_id else None\n\n print(\"=\" * 50)\n print(\"KI-System Pipeline - Full Run\")\n if run_id:\n print(f\"Run ID: {run_id}, Pipeline ID: {pipeline_id}\")\n print(\"=\" * 50)\n\n try:\n # Phase 1: Scan\n if progress:\n progress.update_step(\"detect\")\n progress.add_log(\"Scanne nach Dokumenten...\")\n\n print(\"\\n[1\/3] Scanning for documents...\")\n files = scan_directory()\n print(f\"Found {len(files)} files\")\n\n if progress:\n progress.add_log(f\"{len(files)} neue Dokumente gefunden\")\n\n if files:\n queued = queue_files(files)\n print(f\"Queued {queued} files\")\n\n # Phase 2: Process queue items (includes resume of previous runs)\n items = db.get_pending_queue_items(limit=DEFAULT_LIMIT)\n print(f\"\\n[2\/3] Processing {len(items)} documents...\")\n\n if items:\n # Update total with actual queue count (may include items from previous runs)\n if progress:\n progress.update_progress(total=len(items))\n progress.add_log(f\"{len(items)} Dokumente in Queue\")\n\n total_chunks = 0\n total_embeddings = 0\n processed = 0\n failed = 0\n\n for item in items:\n # Check if cancelled\n if progress and progress.is_cancelled():\n progress.add_log(\"Pipeline abgebrochen durch Benutzer\")\n progress.complete(\"cancelled\")\n print(\"\\nPipeline cancelled by user\")\n return\n\n queue_id = item[\"id\"]\n file_path = item[\"file_path\"]\n file_name = Path(file_path).name\n\n # Skip already-done documents (for resume capability)\n if db.document_is_done(file_path):\n db.update_queue_status(queue_id, \"completed\")\n processed += 1\n if progress:\n progress.add_log(f\"Übersprungen (bereits fertig): {file_name}\")\n progress.update_progress(processed=processed)\n continue\n\n if progress:\n progress.update_document(file_name)\n\n db.update_queue_status(queue_id, \"processing\")\n\n try:\n result = process_file(file_path, progress)\n success, chunks, embedded = result if isinstance(result, tuple) else (result, 0, 0)\n\n # Handle cancellation during file processing\n if success == \"cancelled\":\n progress.add_log(\"Pipeline abgebrochen durch Benutzer\")\n progress.complete(\"cancelled\")\n print(\"\\nPipeline cancelled by user\")\n return\n\n if success:\n db.update_queue_status(queue_id, \"completed\")\n processed += 1\n total_chunks += chunks\n total_embeddings += embedded\n else:\n db.update_queue_status(queue_id, \"failed\", \"Processing failed\")\n failed += 1",
"numLines": 100,
"startLine": 340,
"totalLines": 647
}
}
}