{
"event": "PreToolUse",
"tool_name": "mcp__mcp-tasks__tasks_create",
"tool_input": {
"title": "Progressive Pipeline Architecture: Layer-basierte Verarbeitung",
"type": "ai_task",
"description": "## Ziel\nUmstellung auf Progressive Enhancement Pipeline: Dokumente sind sofort nach Embedding durchsuchbar, Semantic-Layer läuft asynchron danach.\n\n## Architektur\n\n### Layer-Modell\n```\nLayer 1: IMPORT → documents (1:1 Datei-Import)\nLayer 2: STRUKTUR → document_pages, chunks (Zerlegung)\nLayer 3: VEKTOR → Qdrant Embeddings (AB HIER SUCHBAR!)\nLayer 4: SEMANTIK → entities, relations, ontology, taxonomy (async)\n```\n\n### Status-Flow\n```\nDocument: pending → imported → chunked → embedded → enriched\nChunk: created → embedded → (enriched via chunk_entities etc.)\n```\n\n## Implementierungsplan\n\n### Phase 1: Schema-Erweiterung\n\n**1.1 documents.status erweitern**\n```sql\nALTER TABLE documents MODIFY COLUMN status \n ENUM('pending','importing','imported','chunking','chunked',\n 'embedding','embedded','enriching','enriched','error') \n DEFAULT 'pending';\n```\n\n**1.2 documents.semantic_status hinzufügen (optional, für Klarheit)**\n```sql\nALTER TABLE documents ADD COLUMN semantic_status \n ENUM('pending','processing','partial','complete','error') \n DEFAULT 'pending' AFTER status;\n```\n\n### Phase 2: Pipeline-Refactoring\n\n**2.1 Neue Pipeline-Steps (modular)**\n```\n\/var\/www\/scripts\/pipeline\/\n├── step_import.py # Layer 1: Datei → documents\n├── step_structure.py # Layer 2: pages + chunks erstellen \n├── step_embed.py # Layer 3: Qdrant Embeddings (existiert)\n├── step_semantic.py # Layer 4: Entity\/Relation\/Taxonomy (NEU)\n└── pipeline.py # Orchestrierung\n```\n\n**2.2 pipeline.py Änderungen**\n```python\ndef process_file(file_path, progress=None):\n # Layer 1-3: Schneller Pfad (blockierend)\n doc_id = import_document(file_path) # Layer 1\n page_map = create_structure(doc_id) # Layer 2\n embed_chunks(doc_id) # Layer 3\n update_status(doc_id, 'embedded') # <-- AB HIER SUCHBAR\n \n # Layer 4: Semantic (async oder direkt)\n if config.semantic_sync:\n run_semantic_analysis(doc_id) # Layer 4\n update_status(doc_id, 'enriched')\n else:\n queue_semantic_analysis(doc_id) # Async Queue\n```\n\n**2.3 step_semantic.py (NEU)**\n```python\nclass SemanticStep:\n def execute(self, doc_id):\n # Entity Extraction\n entities = extract_entities(doc_id)\n store_entities(doc_id, entities)\n \n # Relation Extraction \n relations = extract_relations(doc_id, entities)\n store_relations(relations)\n \n # Taxonomy Classification\n classify_taxonomy(doc_id)\n \n # Ontology Classification\n classify_ontology(doc_id)\n \n # Chunk-Entity Linking\n link_chunk_entities(doc_id)\n \n return {\"entities\": len(entities), \"relations\": len(relations)}\n```\n\n### Phase 3: Query-Logik (Chat\/Content Studio)\n\n**3.1 ChatService.php anpassen**\n```php\npublic function getContext(string $query, array $collections): array\n{\n \/\/ Layer 3: Immer - Vektor-Suche\n $chunks = $this->qdrantSearch($query, $collections);\n \n \/\/ Layer 4: Wenn verfügbar - Semantic Enrichment\n foreach ($chunks as &$chunk) {\n $chunk['entities'] = $this->getChunkEntities($chunk['id']) ?? [];\n $chunk['taxonomy'] = $this->getChunkTaxonomy($chunk['id']) ?? [];\n }\n \n \/\/ Optional: Entity-basierte Erweiterung\n $queryEntities = $this->extractQueryEntities($query);\n if ($queryEntities) {\n $relatedChunks = $this->findViaOntology($queryEntities);\n $chunks = array_merge($chunks, $relatedChunks);\n }\n \n return $chunks;\n}\n```\n\n**3.2 Neue Repository-Methoden**\n```php\n\/\/ ChunkRepository.php\npublic function getChunkEntities(int $chunkId): array;\npublic function getChunkTaxonomy(int $chunkId): array;\npublic function findChunksByEntity(int $entityId): array;\n```\n\n### Phase 4: Async Semantic Queue\n\n**4.1 semantic_queue Tabelle**\n```sql\nCREATE TABLE semantic_queue (\n id INT AUTO_INCREMENT PRIMARY KEY,\n document_id INT NOT NULL,\n priority INT DEFAULT 5,\n status ENUM('pending','processing','completed','failed') DEFAULT 'pending',\n retry_count INT DEFAULT 0,\n created_at DATETIME DEFAULT CURRENT_TIMESTAMP,\n started_at DATETIME NULL,\n completed_at DATETIME NULL,\n error_message TEXT NULL,\n FOREIGN KEY (document_id) REFERENCES documents(id)\n);\n```\n\n**4.2 Semantic Worker (Cron\/Service)**\n```python\n# semantic_worker.py\ndef process_queue():\n items = db.get_pending_semantic_items(limit=5)\n for item in items:\n try:\n db.update_semantic_status(item['id'], 'processing')\n run_semantic_analysis(item['document_id'])\n db.update_semantic_status(item['id'], 'completed')\n except Exception as e:\n db.update_semantic_status(item['id'], 'failed', str(e))\n```\n\n### Phase 5: CLI Commands\n\n```bash\n# Manueller Semantic-Run\npython pipeline.py semantic <document_id>\npython pipeline.py semantic --all-pending\n\n# Status prüfen\npython pipeline.py status\n# Output:\n# Documents: 10 (8 embedded, 5 enriched)\n# Semantic Queue: 3 pending\n```\n\n## Dateien zu ändern\n\n| Datei | Änderung |\n|-------|----------|\n| `db.py` | Status-Enum, Queue-Methoden |\n| `pipeline.py` | Layer-Trennung, Async-Option |\n| `step_semantic.py` | NEU: Semantic-Step |\n| `ChatService.php` | Graceful Degradation Query |\n| `ChunkRepository.php` | Entity\/Taxonomy Getter |\n\n## Qualitätskriterien\n\n1. ✅ Chat funktioniert ab Layer 3 (nur Chunks)\n2. ✅ Chat nutzt Semantic-Daten wenn verfügbar\n3. ✅ Semantic-Analyse kann async laufen\n4. ✅ Manueller Trigger möglich\n5. ✅ Fehler in Layer 4 blockieren nicht Layer 1-3\n\n## Testfälle\n\n```python\n# Test 1: Chat mit nur Chunks (Layer 3)\n# Erwartung: Antwort basiert auf Chunk-Suche\n\n# Test 2: Chat mit Entities (Layer 4)\n# Erwartung: Antwort enthält Entity-Referenzen\n\n# Test 3: Partial Semantic\n# Erwartung: Entities da, Taxonomy fehlt → Entities werden genutzt\n```\n\n## Abhängigkeiten\n- Task #419 (document_pages Integration) ✅ abgeschlossen\n- Qdrant Collection muss existieren\n- Anthropic API für Entity-Extraktion"
}
}
{
"tool_response": [
{
"type": "text",
"text": "{\n \"success\": true,\n \"task\": {\n \"id\": 420,\n \"uuid\": \"71a52c06-1766-46e5-8b33-56a5ac335522\",\n \"title\": \"Progressive Pipeline Architecture: Layer-basierte Verarbeitung\",\n \"description\": \"## Ziel\\nUmstellung auf Progressive Enhancement Pipeline: Dokumente sind sofort nach Embedding durchsuchbar, Semantic-Layer läuft asynchron danach.\\n\\n## Architektur\\n\\n### Layer-Modell\\n```\\nLayer 1: IMPORT → documents (1:1 Datei-Import)\\nLayer 2: STRUKTUR → document_pages, chunks (Zerlegung)\\nLayer 3: VEKTOR → Qdrant Embeddings (AB HIER SUCHBAR!)\\nLayer 4: SEMANTIK → entities, relations, ontology, taxonomy (async)\\n```\\n\\n### Status-Flow\\n```\\nDocument: pending → imported → chunked → embedded → enriched\\nChunk: created → embedded → (enriched via chunk_entities etc.)\\n```\\n\\n## Implementierungsplan\\n\\n### Phase 1: Schema-Erweiterung\\n\\n**1.1 documents.status erweitern**\\n```sql\\nALTER TABLE documents MODIFY COLUMN status \\n ENUM('pending','importing','imported','chunking','chunked',\\n 'embedding','embedded','enriching','enriched','error') \\n DEFAULT 'pending';\\n```\\n\\n**1.2 documents.semantic_status hinzufügen (optional, für Klarheit)**\\n```sql\\nALTER TABLE documents ADD COLUMN semantic_status \\n ENUM('pending','processing','partial','complete','error') \\n DEFAULT 'pending' AFTER status;\\n```\\n\\n### Phase 2: Pipeline-Refactoring\\n\\n**2.1 Neue Pipeline-Steps (modular)**\\n```\\n\/var\/www\/scripts\/pipeline\/\\n├── step_import.py # Layer 1: Datei → documents\\n├── step_structure.py # Layer 2: pages + chunks erstellen \\n├── step_embed.py # Layer 3: Qdrant Embeddings (existiert)\\n├── step_semantic.py # Layer 4: Entity\/Relation\/Taxonomy (NEU)\\n└── pipeline.py # Orchestrierung\\n```\\n\\n**2.2 pipeline.py Änderungen**\\n```python\\ndef process_file(file_path, progress=None):\\n # Layer 1-3: Schneller Pfad (blockierend)\\n doc_id = import_document(file_path) # Layer 1\\n page_map = create_structure(doc_id) # Layer 2\\n embed_chunks(doc_id) # Layer 3\\n update_status(doc_id, 'embedded') # <-- AB HIER SUCHBAR\\n \\n # Layer 4: Semantic (async oder direkt)\\n if config.semantic_sync:\\n run_semantic_analysis(doc_id) # Layer 4\\n update_status(doc_id, 'enriched')\\n else:\\n queue_semantic_analysis(doc_id) # Async Queue\\n```\\n\\n**2.3 step_semantic.py (NEU)**\\n```python\\nclass SemanticStep:\\n def execute(self, doc_id):\\n # Entity Extraction\\n entities = extract_entities(doc_id)\\n store_entities(doc_id, entities)\\n \\n # Relation Extraction \\n relations = extract_relations(doc_id, entities)\\n store_relations(relations)\\n \\n # Taxonomy Classification\\n classify_taxonomy(doc_id)\\n \\n # Ontology Classification\\n classify_ontology(doc_id)\\n \\n # Chunk-Entity Linking\\n link_chunk_entities(doc_id)\\n \\n return {\\\"entities\\\": len(entities), \\\"relations\\\": len(relations)}\\n```\\n\\n### Phase 3: Query-Logik (Chat\/Content Studio)\\n\\n**3.1 ChatService.php anpassen**\\n```php\\npublic function getContext(string $query, array $collections): array\\n{\\n \/\/ Layer 3: Immer - Vektor-Suche\\n $chunks = $this->qdrantSearch($query, $collections);\\n \\n \/\/ Layer 4: Wenn verfügbar - Semantic Enrichment\\n foreach ($chunks as &$chunk) {\\n $chunk['entities'] = $this->getChunkEntities($chunk['id']) ?? [];\\n $chunk['taxonomy'] = $this->getChunkTaxonomy($chunk['id']) ?? [];\\n }\\n \\n \/\/ Optional: Entity-basierte Erweiterung\\n $queryEntities = $this->extractQueryEntities($query);\\n if ($queryEntities) {\\n $relatedChunks = $this->findViaOntology($queryEntities);\\n $chunks = array_merge($chunks, $relatedChunks);\\n }\\n \\n return $chunks;\\n}\\n```\\n\\n**3.2 Neue Repository-Methoden**\\n```php\\n\/\/ ChunkRepository.php\\npublic function getChunkEntities(int $chunkId): array;\\npublic function getChunkTaxonomy(int $chunkId): array;\\npublic function findChunksByEntity(int $entityId): array;\\n```\\n\\n### Phase 4: Async Semantic Queue\\n\\n**4.1 semantic_queue Tabelle**\\n```sql\\nCREATE TABLE semantic_queue (\\n id INT AUTO_INCREMENT PRIMARY KEY,\\n document_id INT NOT NULL,\\n priority INT DEFAULT 5,\\n status ENUM('pending','processing','completed','failed') DEFAULT 'pending',\\n retry_count INT DEFAULT 0,\\n created_at DATETIME DEFAULT CURRENT_TIMESTAMP,\\n started_at DATETIME NULL,\\n completed_at DATETIME NULL,\\n error_message TEXT NULL,\\n FOREIGN KEY (document_id) REFERENCES documents(id)\\n);\\n```\\n\\n**4.2 Semantic Worker (Cron\/Service)**\\n```python\\n# semantic_worker.py\\ndef process_queue():\\n items = db.get_pending_semantic_items(limit=5)\\n for item in items:\\n try:\\n db.update_semantic_status(item['id'], 'processing')\\n run_semantic_analysis(item['document_id'])\\n db.update_semantic_status(item['id'], 'completed')\\n except Exception as e:\\n db.update_semantic_status(item['id'], 'failed', str(e))\\n```\\n\\n### Phase 5: CLI Commands\\n\\n```bash\\n# Manueller Semantic-Run\\npython pipeline.py semantic <document_id>\\npython pipeline.py semantic --all-pending\\n\\n# Status prüfen\\npython pipeline.py status\\n# Output:\\n# Documents: 10 (8 embedded, 5 enriched)\\n# Semantic Queue: 3 pending\\n```\\n\\n## Dateien zu ändern\\n\\n| Datei | Änderung |\\n|-------|----------|\\n| `db.py` | Status-Enum, Queue-Methoden |\\n| `pipeline.py` | Layer-Trennung, Async-Option |\\n| `step_semantic.py` | NEU: Semantic-Step |\\n| `ChatService.php` | Graceful Degradation Query |\\n| `ChunkRepository.php` | Entity\/Taxonomy Getter |\\n\\n## Qualitätskriterien\\n\\n1. ✅ Chat funktioniert ab Layer 3 (nur Chunks)\\n2. ✅ Chat nutzt Semantic-Daten wenn verfügbar\\n3. ✅ Semantic-Analyse kann async laufen\\n4. ✅ Manueller Trigger möglich\\n5. ✅ Fehler in Layer 4 blockieren nicht Layer 1-3\\n\\n## Testfälle\\n\\n```python\\n# Test 1: Chat mit nur Chunks (Layer 3)\\n# Erwartung: Antwort basiert auf Chunk-Suche\\n\\n# Test 2: Chat mit Entities (Layer 4)\\n# Erwartung: Antwort enthält Entity-Referenzen\\n\\n# Test 3: Partial Semantic\\n# Erwartung: Entities da, Taxonomy fehlt → Entities werden genutzt\\n```\\n\\n## Abhängigkeiten\\n- Task #419 (document_pages Integration) ✅ abgeschlossen\\n- Qdrant Collection muss existieren\\n- Anthropic API für Entity-Extraktion\",\n \"type\": \"ai_task\",\n \"status\": \"pending\",\n \"created_by\": \"mcp-tasks\",\n \"created_by_type\": \"ai\",\n \"parent_task_id\": null,\n \"due_date\": null,\n \"created_at\": \"2025-12-27T00:01:52.075843\",\n \"updated_at\": \"2025-12-27T00:01:52.075845\",\n \"completed_at\": null,\n \"metadata\": {}\n },\n \"message\": \"Task #420 created\"\n}"
}
]
}