{
  "event": "PreToolUse",
  "tool_name": "Read",
  "tool_input": {
    "file_path": "\/var\/www\/scripts\/pipeline\/pipeline.py",
    "offset": 100,
    "limit": 100
  }
}
{
  "tool_response": {
    "type": "text",
    "file": {
      "filePath": "\/var\/www\/scripts\/pipeline\/pipeline.py",
"content": " # Step 8: Embeddings (Layer 3 - Document becomes searchable)\n embedded = embed_step.execute(chunks, doc_id, file_name, file_path)\n\n # Document is now searchable - update status to \"embedded\"\n load_step.update_document_status(doc_id, \"embedded\")\n\n if progress:\n progress.add_log(f\"Layer 3 fertig: {file_name} ist jetzt suchbar\")\n\n # Check if cancelled after embedding\n if progress and progress.is_cancelled():\n return \"cancelled\", len(chunks), embedded\n\n # Step 9: Semantic analysis (Layer 4 - Optional\/Async)\n semantic_step = SemanticStep(db, progress)\n full_text = extract_step.get_full_text_from_extraction(extraction)\n\n if SEMANTIC_SYNC:\n # Run semantic analysis synchronously\n try:\n semantic_step.execute(doc_id, full_text, use_anthropic=SEMANTIC_USE_ANTHROPIC)\n # Update to done only after semantic completes\n load_step.update_document_status(doc_id, \"done\")\n except Exception as e:\n # Semantic failed but document is still searchable\n db.log(\"WARNING\", f\"Semantic analysis failed for {file_name}: {e}\")\n if progress:\n progress.add_log(f\"Semantik-Fehler (Dokument bleibt suchbar): {str(e)[:50]}\")\n elif SEMANTIC_AUTO_QUEUE:\n # Queue for async processing\n semantic_step.queue(doc_id, priority=5)\n load_step.update_document_status(doc_id, \"done\")\n if progress:\n progress.add_log(f\"Semantik in Queue: {file_name}\")\n else:\n # No semantic analysis\n load_step.update_document_status(doc_id, \"done\")\n\n if progress:\n progress.add_log(f\"Fertig: {file_name}\")\n\n return True, len(chunks), embedded\n\n\ndef process_queue():\n \"\"\"Process items from the queue.\"\"\"\n items = db.get_pending_queue_items(limit=10)\n db.log(\"INFO\", f\"Found {len(items)} items in queue\")\n\n for item in items:\n queue_id = item[\"id\"]\n file_path = item[\"file_path\"]\n retry_count = item[\"retry_count\"]\n\n if retry_count >= MAX_RETRIES:\n db.update_queue_status(queue_id, \"failed\", \"Max retries exceeded\")\n continue\n\n db.update_queue_status(queue_id, \"processing\")\n\n try:\n success = process_file(file_path)\n if success:\n db.update_queue_status(queue_id, \"completed\")\n else:\n raise Exception(\"Processing returned False\")\n except Exception as e:\n error_msg = str(e)\n db.update_queue_status(queue_id, \"pending\", error_msg)\n\n # Exponential backoff\n wait_time = RETRY_BACKOFF_BASE ** (retry_count + 1)\n db.log(\"INFO\", f\"Retry {retry_count + 1} in {wait_time}s: {file_path}\")\n time.sleep(wait_time)\n\n\ndef run_scan():\n \"\"\"Scan for new documents.\"\"\"\n files = scan_directory()\n print(f\"Found {len(files)} files\")\n\n if files:\n queued = queue_files(files)\n print(f\"Queued {queued} files\")\n\n return files\n\n\ndef run_full_pipeline(run_id=None, pipeline_id=None):\n \"\"\"Run complete pipeline: scan → process → embed.\"\"\"\n progress = PipelineProgress(run_id) if run_id else None\n\n print(\"=\" * 50)\n print(\"KI-System Pipeline - Full Run\")\n if run_id:\n print(f\"Run ID: {run_id}, Pipeline ID: {pipeline_id}\")\n print(\"=\" * 50)\n\n try:\n # Phase 1: Scan",
"numLines": 100,
"startLine": 100,
"totalLines": 455
}
}
}