{
"event": "PreToolUse",
"tool_name": "Write",
"tool_input": {
"file_path": "\/var\/www\/scripts\/pipeline\/semantic_chunk\/pipeline.py",
"content": "\"\"\"\nMain pipeline orchestration for semantic chunk analysis.\n\"\"\"\n\nimport sys\n\nsys.path.insert(0, \"\/var\/www\/scripts\/pipeline\")\n\nfrom db import db\n\nfrom .analyzers import EntityExtractor, RelationExtractor, SemanticsAnalyzer, TaxonomyClassifier\nfrom .ollama_service import OllamaService\nfrom .repositories import ChunkRepository, EntityRepository, SemanticsRepository, TaxonomyRepository\n\nBATCH_SIZE = 10\n\n\nclass SemanticChunkPipeline:\n \"\"\"Orchestriert die semantische Chunk-Analyse.\"\"\"\n\n def __init__(self):\n self.llm = OllamaService()\n self.semantics_analyzer = SemanticsAnalyzer(self.llm)\n self.entity_extractor = EntityExtractor(self.llm)\n self.relation_extractor = RelationExtractor(self.llm)\n self.taxonomy_classifier = TaxonomyClassifier(self.llm)\n\n self.chunk_repo = ChunkRepository()\n self.semantics_repo = SemanticsRepository()\n self.entity_repo = EntityRepository()\n self.taxonomy_repo = TaxonomyRepository()\n\n self.taxonomy_repo.ensure_terms_exist(TaxonomyClassifier.CATEGORIES)\n\n def analyze_chunk(self, chunk: dict) -> dict:\n \"\"\"Analysiere einen einzelnen Chunk.\"\"\"\n chunk_id = chunk[\"id\"]\n text = chunk[\"content\"]\n\n results = {\"chunk_id\": chunk_id, \"success\": False}\n\n # 1. Semantik-Analyse\n semantics = self.semantics_analyzer.analyze(chunk_id, text)\n if semantics:\n self.semantics_repo.save_semantics(semantics)\n results[\"semantics\"] = True\n\n # 2. Entity-Extraktion\n entities = self.entity_extractor.extract(text)\n entity_ids = {}\n for entity in entities:\n entity_id = self.entity_repo.find_or_create(entity)\n self.entity_repo.link_to_chunk(chunk_id, entity_id)\n entity_ids[entity.name] = entity_id\n results[\"entities\"] = len(entities)\n\n # 3. Relation-Extraktion\n if len(entities) >= 2:\n relations = self.relation_extractor.extract(text, entities)\n for rel in relations:\n source_id = entity_ids.get(rel.source)\n target_id = entity_ids.get(rel.target)\n if source_id and target_id:\n self.entity_repo.save_relation(source_id, target_id, rel, chunk_id)\n results[\"relations\"] = len(relations)\n\n # 4. Taxonomie-Klassifikation\n categories, confidence = self.taxonomy_classifier.classify(text)\n for cat in categories:\n self.taxonomy_repo.link_to_chunk(chunk_id, cat, confidence)\n results[\"categories\"] = categories\n\n results[\"success\"] = True\n return results\n\n def run(self, limit: int = BATCH_SIZE) -> dict:\n \"\"\"Führe Pipeline aus.\"\"\"\n db.log(\"INFO\", f\"Starting semantic chunk analysis (limit={limit})\")\n\n chunks = self.chunk_repo.get_pending_chunks(limit)\n if not chunks:\n print(\"Keine Chunks zur Analyse gefunden.\")\n return {\"processed\": 0}\n\n print(f\"Analysiere {len(chunks)} Chunks...\")\n\n success = 0\n failed = 0\n\n for i, chunk in enumerate(chunks, 1):\n print(f\"[{i}\/{len(chunks)}] Chunk #{chunk['id']}...\", end=\" \")\n\n try:\n result = self.analyze_chunk(chunk)\n if result[\"success\"]:\n print(f\"✓ {result.get('entities', 0)} Entitäten, {result.get('categories', [])}\")\n success += 1\n else:\n print(\"✗ Fehler\")\n failed += 1\n except Exception as e:\n print(f\"✗ {e}\")\n db.log(\"ERROR\", f\"Chunk {chunk['id']} failed: {e}\")\n failed += 1\n\n db.log(\"INFO\", f\"Analysis complete: {success} success, {failed} failed\")\n return {\"processed\": len(chunks), \"success\": success, \"failed\": failed}\n\n def status(self) -> dict:\n \"\"\"Zeige Status.\"\"\"\n return self.chunk_repo.get_stats()\n"
}
}
{
"tool_response": {
"type": "create",
"filePath": "\/var\/www\/scripts\/pipeline\/semantic_chunk\/pipeline.py",
"content": "\"\"\"\nMain pipeline orchestration for semantic chunk analysis.\n\"\"\"\n\nimport sys\n\nsys.path.insert(0, \"\/var\/www\/scripts\/pipeline\")\n\nfrom db import db\n\nfrom .analyzers import EntityExtractor, RelationExtractor, SemanticsAnalyzer, TaxonomyClassifier\nfrom .ollama_service import OllamaService\nfrom .repositories import ChunkRepository, EntityRepository, SemanticsRepository, TaxonomyRepository\n\nBATCH_SIZE = 10\n\n\nclass SemanticChunkPipeline:\n \"\"\"Orchestriert die semantische Chunk-Analyse.\"\"\"\n\n def __init__(self):\n self.llm = OllamaService()\n self.semantics_analyzer = SemanticsAnalyzer(self.llm)\n self.entity_extractor = EntityExtractor(self.llm)\n self.relation_extractor = RelationExtractor(self.llm)\n self.taxonomy_classifier = TaxonomyClassifier(self.llm)\n\n self.chunk_repo = ChunkRepository()\n self.semantics_repo = SemanticsRepository()\n self.entity_repo = EntityRepository()\n self.taxonomy_repo = TaxonomyRepository()\n\n self.taxonomy_repo.ensure_terms_exist(TaxonomyClassifier.CATEGORIES)\n\n def analyze_chunk(self, chunk: dict) -> dict:\n \"\"\"Analysiere einen einzelnen Chunk.\"\"\"\n chunk_id = chunk[\"id\"]\n text = chunk[\"content\"]\n\n results = {\"chunk_id\": chunk_id, \"success\": False}\n\n # 1. Semantik-Analyse\n semantics = self.semantics_analyzer.analyze(chunk_id, text)\n if semantics:\n self.semantics_repo.save_semantics(semantics)\n results[\"semantics\"] = True\n\n # 2. Entity-Extraktion\n entities = self.entity_extractor.extract(text)\n entity_ids = {}\n for entity in entities:\n entity_id = self.entity_repo.find_or_create(entity)\n self.entity_repo.link_to_chunk(chunk_id, entity_id)\n entity_ids[entity.name] = entity_id\n results[\"entities\"] = len(entities)\n\n # 3. Relation-Extraktion\n if len(entities) >= 2:\n relations = self.relation_extractor.extract(text, entities)\n for rel in relations:\n source_id = entity_ids.get(rel.source)\n target_id = entity_ids.get(rel.target)\n if source_id and target_id:\n self.entity_repo.save_relation(source_id, target_id, rel, chunk_id)\n results[\"relations\"] = len(relations)\n\n # 4. Taxonomie-Klassifikation\n categories, confidence = self.taxonomy_classifier.classify(text)\n for cat in categories:\n self.taxonomy_repo.link_to_chunk(chunk_id, cat, confidence)\n results[\"categories\"] = categories\n\n results[\"success\"] = True\n return results\n\n def run(self, limit: int = BATCH_SIZE) -> dict:\n \"\"\"Führe Pipeline aus.\"\"\"\n db.log(\"INFO\", f\"Starting semantic chunk analysis (limit={limit})\")\n\n chunks = self.chunk_repo.get_pending_chunks(limit)\n if not chunks:\n print(\"Keine Chunks zur Analyse gefunden.\")\n return {\"processed\": 0}\n\n print(f\"Analysiere {len(chunks)} Chunks...\")\n\n success = 0\n failed = 0\n\n for i, chunk in enumerate(chunks, 1):\n print(f\"[{i}\/{len(chunks)}] Chunk #{chunk['id']}...\", end=\" \")\n\n try:\n result = self.analyze_chunk(chunk)\n if result[\"success\"]:\n print(f\"✓ {result.get('entities', 0)} Entitäten, {result.get('categories', [])}\")\n success += 1\n else:\n print(\"✗ Fehler\")\n failed += 1\n except Exception as e:\n print(f\"✗ {e}\")\n db.log(\"ERROR\", f\"Chunk {chunk['id']} failed: {e}\")\n failed += 1\n\n db.log(\"INFO\", f\"Analysis complete: {success} success, {failed} failed\")\n return {\"processed\": len(chunks), \"success\": success, \"failed\": failed}\n\n def status(self) -> dict:\n \"\"\"Zeige Status.\"\"\"\n return self.chunk_repo.get_stats()\n",
"structuredPatch": [],
"originalFile": null
}
}