{
"event": "PreToolUse",
"tool_name": "Write",
"tool_input": {
"file_path": "\/var\/www\/migration\/content-pipeline\/docs\/ARCHITECTURE.md",
"content": "# Content Pipeline - Architecture Documentation\n\n## System Components\n\n### 1. Document Processing Layer\n\n```\nDocuments (PDF, DOCX, PPTX, MD, TXT)\n │\n ▼\n ┌─────────────┐\n │ DETECT │ → Scans directory, hashes files\n └─────────────┘\n │\n ▼\n ┌─────────────┐\n │ EXTRACT │ → PyMuPDF, python-docx, python-pptx\n └─────────────┘\n │\n ▼\n ┌─────────────┐\n │ VISION │ → Tesseract OCR for images\n └─────────────┘\n │\n ▼\n ┌─────────────┐\n │ CHUNK │ → Semantic splitting with overlap\n └─────────────┘\n```\n\n### 2. Embedding Layer\n\n```\n Chunks (text segments)\n │\n ▼\n ┌─────────────┐\n │ EMBED │ → Ollama mxbai-embed-large (1024 dims)\n └─────────────┘\n │\n ▼\n ┌─────────────┐\n │ QDRANT │ → Vector storage & similarity search\n └─────────────┘\n```\n\n### 3. Semantic Analysis Layer\n\n```\n Chunks + Embeddings\n │\n ├──────────────────────────────────┐\n ▼ ▼\n ┌─────────────┐ ┌─────────────┐\n │ ENTITY │ │ SEMANTIC │\n │ EXTRACTOR │ │ ANALYZER │\n └─────────────┘ └─────────────┘\n │ │\n ▼ ▼\n ┌─────────────┐ ┌─────────────┐\n │ RELATION │ │ TAXONOMY │\n │ EXTRACTOR │ │ CLASSIFIER │\n └─────────────┘ └─────────────┘\n │ │\n └──────────────┬───────────────────┘\n ▼\n ┌─────────────┐\n │ KNOWLEDGE │\n │ GRAPH │\n └─────────────┘\n```\n\n## Data Flow\n\n### Document Lifecycle\n\n```\n1. PENDING → Document detected, awaiting processing\n2. IMPORTING → Being read from filesystem\n3. IMPORTED → Text extracted\n4. CHUNKING → Being split into chunks\n5. CHUNKED → Chunks created\n6. EMBEDDING → Generating vectors\n7. EMBEDDED → Vectors stored in Qdrant\n8. ENRICHING → Semantic analysis running\n9. ENRICHED → Entities\/relations extracted\n10. DONE → Fully processed\n```\n\n### Chunk Processing\n\n```\nChunk Status:\n- created → Initial state\n- embedding → Vector generation in progress\n- embedded → Vector stored in Qdrant\n- error → Processing failed\n- deprecated → Superseded by newer version\n```\n\n## Database Design\n\n### Entity-Relationship Overview\n\n```\ndocuments ─────┬───── document_pages\n │\n └───── chunks ─────┬───── chunk_semantics\n │ │\n │ └───── chunk_entities ───── entities\n │ │\n │ ├── entity_relations\n │ │\n └───── chunk_taxonomy ─────────── taxonomy_terms\n │\n entity_taxonomy_mapping\n```\n\n### Key Relationships\n\n| From | To | Relationship |\n|------|-----|--------------|\n| documents | chunks | 1:N |\n| documents | document_pages | 1:N |\n| chunks | chunk_semantics | 1:1 |\n| chunks | entities | N:M (via chunk_entities) |\n| entities | entities | N:M (via entity_relations) |\n| entities | taxonomy_terms | N:M (via entity_taxonomy_mapping) |\n\n## LLM Integration\n\n### Embedding Pipeline\n\n```python\n# Ollama Embedding\ndef embed_text(text: str) -> list[float]:\n response = ollama.embed(\n model=\"mxbai-embed-large\",\n input=text\n )\n return response[\"embeddings\"][0] # 1024 dimensions\n```\n\n### Analysis Pipeline\n\n```python\n# Semantic Analysis with Ollama or Anthropic\ndef analyze_chunk(chunk: str) -> dict:\n prompt = f\"\"\"Analyze this text:\n {chunk}\n\n Extract: keywords, sentiment, topics, entities\"\"\"\n\n if USE_ANTHROPIC:\n response = anthropic.messages.create(...)\n else:\n response = ollama.chat(model=\"llama3.2:3b\", ...)\n\n return parse_response(response)\n```\n\n## Qdrant Vector Storage\n\n### Collection Structure\n\n```python\n# Documents collection\n{\n \"id\": \"uuid\",\n \"vector\": [float; 1024],\n \"payload\": {\n \"chunk_id\": int,\n \"document_id\": int,\n \"content\": str,\n \"heading_path\": list[str],\n 
\"page_number\": int\n }\n}\n```\n\n### Search Query\n\n```python\nresults = client.search(\n collection_name=\"documents\",\n query_vector=query_embedding,\n limit=10,\n query_filter={\n \"must\": [\n {\"key\": \"document_id\", \"match\": {\"value\": 123}}\n ]\n }\n)\n```\n\n## Scaling Considerations\n\n### Batch Processing\n\n```python\n# Process documents in batches\nBATCH_SIZE = 10\n\nfor batch in chunks(documents, BATCH_SIZE):\n for doc in batch:\n process_document(doc)\n\n # Commit after each batch\n connection.commit()\n```\n\n### Queue-Based Processing\n\n```python\n# Async semantic processing via queue\nif not SEMANTIC_SYNC:\n queue_for_semantic_analysis(chunk_id)\n # Worker processes queue separately\n```\n\n### Parallel Embedding\n\n```python\n# Thread pool for embedding\nfrom concurrent.futures import ThreadPoolExecutor\n\nwith ThreadPoolExecutor(max_workers=4) as executor:\n embeddings = list(executor.map(embed_text, chunks))\n```\n\n## Error Handling\n\n### Retry Logic\n\n```python\nMAX_RETRIES = 3\nBACKOFF_BASE = 2\n\nfor attempt in range(MAX_RETRIES):\n try:\n result = process_step(document)\n break\n except Exception as e:\n if attempt == MAX_RETRIES - 1:\n mark_as_failed(document, str(e))\n raise\n time.sleep(BACKOFF_BASE ** attempt)\n```\n\n### Provenance Tracking\n\n```python\n# Track processing history\nINSERT INTO provenance (\n artifact_type, artifact_id,\n source_type, pipeline_step,\n model_used, prompt_version\n) VALUES (\n 'chunk', chunk_id,\n 'extraction', 'semantic_analyze',\n 'llama3.2:3b', '1.2'\n)\n```\n\n## Configuration Hierarchy\n\n```\n1. Environment Variables (.env)\n ↓\n2. config\/settings.py (defaults)\n ↓\n3. pipeline_configs table (runtime)\n ↓\n4. pipeline_steps table (per-step config)\n```\n\n## Monitoring\n\n### Key Metrics\n\n| Metric | Table\/Query |\n|--------|-------------|\n| Documents processed | `SELECT COUNT(*) FROM documents WHERE status='done'` |\n| Chunks created | `SELECT COUNT(*) FROM chunks` |\n| Embeddings stored | `SELECT COUNT(*) FROM chunks WHERE qdrant_id IS NOT NULL` |\n| Entities extracted | `SELECT COUNT(*) FROM entities` |\n| Pipeline runs | `SELECT * FROM pipeline_runs ORDER BY created_at DESC` |\n\n### Health Checks\n\n```python\ndef health_check():\n checks = {\n \"database\": check_db_connection(),\n \"qdrant\": check_qdrant_connection(),\n \"ollama\": check_ollama_status(),\n }\n return all(checks.values()), checks\n```\n"
}
}
{
"tool_response": {
"type": "create",
"filePath": "\/var\/www\/migration\/content-pipeline\/docs\/ARCHITECTURE.md",
"content": "# Content Pipeline - Architecture Documentation\n\n## System Components\n\n### 1. Document Processing Layer\n\n```\nDocuments (PDF, DOCX, PPTX, MD, TXT)\n │\n ▼\n ┌─────────────┐\n │ DETECT │ → Scans directory, hashes files\n └─────────────┘\n │\n ▼\n ┌─────────────┐\n │ EXTRACT │ → PyMuPDF, python-docx, python-pptx\n └─────────────┘\n │\n ▼\n ┌─────────────┐\n │ VISION │ → Tesseract OCR for images\n └─────────────┘\n │\n ▼\n ┌─────────────┐\n │ CHUNK │ → Semantic splitting with overlap\n └─────────────┘\n```\n\n### 2. Embedding Layer\n\n```\n Chunks (text segments)\n │\n ▼\n ┌─────────────┐\n │ EMBED │ → Ollama mxbai-embed-large (1024 dims)\n └─────────────┘\n │\n ▼\n ┌─────────────┐\n │ QDRANT │ → Vector storage & similarity search\n └─────────────┘\n```\n\n### 3. Semantic Analysis Layer\n\n```\n Chunks + Embeddings\n │\n ├──────────────────────────────────┐\n ▼ ▼\n ┌─────────────┐ ┌─────────────┐\n │ ENTITY │ │ SEMANTIC │\n │ EXTRACTOR │ │ ANALYZER │\n └─────────────┘ └─────────────┘\n │ │\n ▼ ▼\n ┌─────────────┐ ┌─────────────┐\n │ RELATION │ │ TAXONOMY │\n │ EXTRACTOR │ │ CLASSIFIER │\n └─────────────┘ └─────────────┘\n │ │\n └──────────────┬───────────────────┘\n ▼\n ┌─────────────┐\n │ KNOWLEDGE │\n │ GRAPH │\n └─────────────┘\n```\n\n## Data Flow\n\n### Document Lifecycle\n\n```\n1. PENDING → Document detected, awaiting processing\n2. IMPORTING → Being read from filesystem\n3. IMPORTED → Text extracted\n4. CHUNKING → Being split into chunks\n5. CHUNKED → Chunks created\n6. EMBEDDING → Generating vectors\n7. EMBEDDED → Vectors stored in Qdrant\n8. ENRICHING → Semantic analysis running\n9. ENRICHED → Entities\/relations extracted\n10. DONE → Fully processed\n```\n\n### Chunk Processing\n\n```\nChunk Status:\n- created → Initial state\n- embedding → Vector generation in progress\n- embedded → Vector stored in Qdrant\n- error → Processing failed\n- deprecated → Superseded by newer version\n```\n\n## Database Design\n\n### Entity-Relationship Overview\n\n```\ndocuments ─────┬───── document_pages\n │\n └───── chunks ─────┬───── chunk_semantics\n │ │\n │ └───── chunk_entities ───── entities\n │ │\n │ ├── entity_relations\n │ │\n └───── chunk_taxonomy ─────────── taxonomy_terms\n │\n entity_taxonomy_mapping\n```\n\n### Key Relationships\n\n| From | To | Relationship |\n|------|-----|--------------|\n| documents | chunks | 1:N |\n| documents | document_pages | 1:N |\n| chunks | chunk_semantics | 1:1 |\n| chunks | entities | N:M (via chunk_entities) |\n| entities | entities | N:M (via entity_relations) |\n| entities | taxonomy_terms | N:M (via entity_taxonomy_mapping) |\n\n## LLM Integration\n\n### Embedding Pipeline\n\n```python\n# Ollama Embedding\ndef embed_text(text: str) -> list[float]:\n response = ollama.embed(\n model=\"mxbai-embed-large\",\n input=text\n )\n return response[\"embeddings\"][0] # 1024 dimensions\n```\n\n### Analysis Pipeline\n\n```python\n# Semantic Analysis with Ollama or Anthropic\ndef analyze_chunk(chunk: str) -> dict:\n prompt = f\"\"\"Analyze this text:\n {chunk}\n\n Extract: keywords, sentiment, topics, entities\"\"\"\n\n if USE_ANTHROPIC:\n response = anthropic.messages.create(...)\n else:\n response = ollama.chat(model=\"llama3.2:3b\", ...)\n\n return parse_response(response)\n```\n\n## Qdrant Vector Storage\n\n### Collection Structure\n\n```python\n# Documents collection\n{\n \"id\": \"uuid\",\n \"vector\": [float; 1024],\n \"payload\": {\n \"chunk_id\": int,\n \"document_id\": int,\n \"content\": str,\n \"heading_path\": list[str],\n 
\"page_number\": int\n }\n}\n```\n\n### Search Query\n\n```python\nresults = client.search(\n collection_name=\"documents\",\n query_vector=query_embedding,\n limit=10,\n query_filter={\n \"must\": [\n {\"key\": \"document_id\", \"match\": {\"value\": 123}}\n ]\n }\n)\n```\n\n## Scaling Considerations\n\n### Batch Processing\n\n```python\n# Process documents in batches\nBATCH_SIZE = 10\n\nfor batch in chunks(documents, BATCH_SIZE):\n for doc in batch:\n process_document(doc)\n\n # Commit after each batch\n connection.commit()\n```\n\n### Queue-Based Processing\n\n```python\n# Async semantic processing via queue\nif not SEMANTIC_SYNC:\n queue_for_semantic_analysis(chunk_id)\n # Worker processes queue separately\n```\n\n### Parallel Embedding\n\n```python\n# Thread pool for embedding\nfrom concurrent.futures import ThreadPoolExecutor\n\nwith ThreadPoolExecutor(max_workers=4) as executor:\n embeddings = list(executor.map(embed_text, chunks))\n```\n\n## Error Handling\n\n### Retry Logic\n\n```python\nMAX_RETRIES = 3\nBACKOFF_BASE = 2\n\nfor attempt in range(MAX_RETRIES):\n try:\n result = process_step(document)\n break\n except Exception as e:\n if attempt == MAX_RETRIES - 1:\n mark_as_failed(document, str(e))\n raise\n time.sleep(BACKOFF_BASE ** attempt)\n```\n\n### Provenance Tracking\n\n```python\n# Track processing history\nINSERT INTO provenance (\n artifact_type, artifact_id,\n source_type, pipeline_step,\n model_used, prompt_version\n) VALUES (\n 'chunk', chunk_id,\n 'extraction', 'semantic_analyze',\n 'llama3.2:3b', '1.2'\n)\n```\n\n## Configuration Hierarchy\n\n```\n1. Environment Variables (.env)\n ↓\n2. config\/settings.py (defaults)\n ↓\n3. pipeline_configs table (runtime)\n ↓\n4. pipeline_steps table (per-step config)\n```\n\n## Monitoring\n\n### Key Metrics\n\n| Metric | Table\/Query |\n|--------|-------------|\n| Documents processed | `SELECT COUNT(*) FROM documents WHERE status='done'` |\n| Chunks created | `SELECT COUNT(*) FROM chunks` |\n| Embeddings stored | `SELECT COUNT(*) FROM chunks WHERE qdrant_id IS NOT NULL` |\n| Entities extracted | `SELECT COUNT(*) FROM entities` |\n| Pipeline runs | `SELECT * FROM pipeline_runs ORDER BY created_at DESC` |\n\n### Health Checks\n\n```python\ndef health_check():\n checks = {\n \"database\": check_db_connection(),\n \"qdrant\": check_qdrant_connection(),\n \"ollama\": check_ollama_status(),\n }\n return all(checks.values()), checks\n```\n",
"structuredPatch": [],
"originalFile": null
}
}