{
  "event": "PreToolUse",
  "tool_name": "Read",
  "tool_input": {
    "file_path": "/var/www/scripts/pipeline/pipeline.py",
    "limit": 100
  }
}
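For illustration only, here is a minimal sketch of how a hook script might consume the PreToolUse record above. It assumes each event arrives as a single JSON object on stdin, which this log does not show; every name in the sketch is hypothetical and only fields visible in the record are used.

    # Hypothetical consumer for the PreToolUse event above.
    # Assumption (not shown in this log): the event arrives as one
    # JSON object on stdin.
    import json
    import sys

    event = json.load(sys.stdin)
    if event.get("event") == "PreToolUse" and event.get("tool_name") == "Read":
        tool_input = event.get("tool_input", {})
        # Fields taken directly from the record above
        file_path = tool_input.get("file_path")  # e.g. /var/www/scripts/pipeline/pipeline.py
        limit = tool_input.get("limit")          # e.g. 100 lines requested
        print(f"Read requested: {file_path} (limit={limit})", file=sys.stderr)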
{
  "tool_response": {
    "type": "text",
    "file": {
      "filePath": "/var/www/scripts/pipeline/pipeline.py",
      "content": "#!/usr/bin/env python3\n\"\"\"\nKI-System Document Pipeline\nMain orchestration script for document processing.\n\nUsage:\n    python pipeline.py scan       # Scan for new documents\n    python pipeline.py process    # Process queued documents\n    python pipeline.py embed      # Embed pending chunks\n    python pipeline.py all        # Full pipeline run\n    python pipeline.py all --pipeline-id=1 --run-id=5  # With tracking\n    python pipeline.py file <path>  # Process single file\n\"\"\"\n\nimport argparse\nimport os\nimport time\nfrom pathlib import Path\n\nfrom config import (\n    MAX_RETRIES,\n    RETRY_BACKOFF_BASE,\n    SEMANTIC_AUTO_QUEUE,\n    SEMANTIC_SYNC,\n    SEMANTIC_USE_ANTHROPIC,\n)\nfrom db import PipelineProgress, db\nfrom detect import queue_files, scan_directory\nfrom step_embed import EmbeddingStep\nfrom step_extract import ExtractionStep\nfrom step_load import LoadStep\nfrom step_semantic import SemanticStep\nfrom step_transform import TransformationStep\n\n\ndef process_file(file_path, progress=None):\n    \"\"\"Process a single file through the pipeline.\"\"\"\n    file_name = Path(file_path).name\n\n    if progress:\n        progress.update_document(file_name)\n\n    # Initialize pipeline steps\n    extract_step = ExtractionStep(db, progress)\n    load_step = LoadStep(db, progress)\n    transform_step = TransformationStep(db, progress)\n    embed_step = EmbeddingStep(db, progress)\n\n    # Check if cancelled before starting\n    if progress and progress.is_cancelled():\n        return \"cancelled\", 0, 0\n\n    # Step 1: Extract\n    extract_result = extract_step.execute(file_path)\n    if not extract_result[\"success\"]:\n        if extract_result.get(\"error\") == \"cancelled\":\n            return \"cancelled\", 0, 0\n        return False, 0, 0\n\n    extraction = extract_result[\"extraction\"]\n    file_info = extract_result[\"file_info\"]\n    total_pages = extract_result.get(\"total_pages\", 0)\n\n    # Check if cancelled after extraction\n    if progress and progress.is_cancelled():\n        return \"cancelled\", 0, 0\n\n    # Step 2: Load document\n    doc_id = load_step.create_document(file_info)\n\n    # Step 3: Store pages (PDFs and multi-page documents)\n    page_map = load_step.store_pages(doc_id, extraction)\n\n    # Step 4: Vision analysis (PDFs only)\n    if file_info[\"type\"] == \".pdf\":\n        transform_step.execute_vision(doc_id, file_path, file_info[\"type\"])\n\n    # Check if cancelled after vision\n    if progress and progress.is_cancelled():\n        return \"cancelled\", 0, 0\n\n    # Step 5: Chunking\n    chunks = transform_step.execute_chunking(extraction, total_pages)\n\n    # Step 6: Store chunks with page references\n    chunks = load_step.store_chunks(doc_id, chunks, page_map)\n\n    # Check if cancelled after chunking\n    if progress and progress.is_cancelled():\n        return \"cancelled\", len(chunks), 0\n\n    # Step 7: Enrichment (PDFs only)\n    if file_info[\"type\"] == \".pdf\":\n        transform_step.execute_enrichment(doc_id, file_info[\"type\"])\n\n    # Check if cancelled after enrichment\n    if progress and progress.is_cancelled():\n        return \"cancelled\", len(chunks), 0\n\n    # Step 8: Embeddings (Layer 3 - Document becomes searchable)",
      "numLines": 100,
      "startLine": 1,
      "totalLines": 455
    }
  }
}
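The response is truncated at the requested limit (100 of 455 lines), so the CLI entry point described in the module docstring is not part of the excerpt. What follows is a hedged sketch of a dispatcher consistent with that usage text: main and its argument wiring are assumptions, only process_file comes from the file above, and the function's success return value is not visible in the excerpt.

    # Hypothetical driver matching the usage block in pipeline.py's
    # docstring. The real main() lies beyond the 100-line excerpt, so
    # this is a sketch of the documented interface, not the actual code.
    import argparse

    from pipeline import process_file  # defined in the excerpt above


    def main():
        parser = argparse.ArgumentParser(description="KI-System Document Pipeline")
        parser.add_argument("command", choices=["scan", "process", "embed", "all", "file"])
        parser.add_argument("path", nargs="?", help="file path for the 'file' command")
        parser.add_argument("--pipeline-id", type=int)  # tracking flags per the docstring
        parser.add_argument("--run-id", type=int)
        args = parser.parse_args()

        if args.command == "file":
            if not args.path:
                parser.error("the 'file' command requires a path")
            status, chunk_count, embedded_count = process_file(args.path)
            if status == "cancelled":
                print(f"Cancelled after storing {chunk_count} chunks")
            elif not status:
                print("Extraction failed")
            # The success return value is not shown in the excerpt.


    if __name__ == "__main__":
        main()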