Log Entry #15165

ID: 15165
Timestamp: 2025-12-25 14:05:00.001771
Client: root
IP: 145.224.72.140
Model: claude-sonnet-4-20250514
Status: completed
Tokens: 2,157 (input: 1,076, output: 1,081)
Duration: 419 ms
Request time: 2025-12-25 14:05:00.001771
Response time: 2025-12-25 14:05:00.420523
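
As a cross-check, the logged duration and token total can be reproduced from the fields above; a minimal Python sketch (the timestamp parse format is an assumption based on how the values are displayed):

from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S.%f"  # assumed format for the displayed timestamps

request_time = datetime.strptime("2025-12-25 14:05:00.001771", FMT)
response_time = datetime.strptime("2025-12-25 14:05:00.420523", FMT)

# 0.418752 s, which rounds to the logged 419 ms
duration_ms = (response_time - request_time).total_seconds() * 1000
print(f"{duration_ms:.0f} ms")

# Token total is input plus output: 1,076 + 1,081 = 2,157
assert 1076 + 1081 == 2157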

Request

{
    "event": "PreToolUse",
    "tool_name": "Write",
    "tool_input": {
        "file_path": "\/var\/www\/scripts\/pipeline\/semantic_chunk\/pipeline.py",
        "content": "\"\"\"\nMain pipeline orchestration for semantic chunk analysis.\n\"\"\"\n\nimport sys\n\nsys.path.insert(0, \"\/var\/www\/scripts\/pipeline\")\n\nfrom db import db\n\nfrom .analyzers import EntityExtractor, RelationExtractor, SemanticsAnalyzer, TaxonomyClassifier\nfrom .ollama_service import OllamaService\nfrom .repositories import ChunkRepository, EntityRepository, SemanticsRepository, TaxonomyRepository\n\nBATCH_SIZE = 10\n\n\nclass SemanticChunkPipeline:\n    \"\"\"Orchestriert die semantische Chunk-Analyse.\"\"\"\n\n    def __init__(self):\n        self.llm = OllamaService()\n        self.semantics_analyzer = SemanticsAnalyzer(self.llm)\n        self.entity_extractor = EntityExtractor(self.llm)\n        self.relation_extractor = RelationExtractor(self.llm)\n        self.taxonomy_classifier = TaxonomyClassifier(self.llm)\n\n        self.chunk_repo = ChunkRepository()\n        self.semantics_repo = SemanticsRepository()\n        self.entity_repo = EntityRepository()\n        self.taxonomy_repo = TaxonomyRepository()\n\n        self.taxonomy_repo.ensure_terms_exist(TaxonomyClassifier.CATEGORIES)\n\n    def analyze_chunk(self, chunk: dict) -> dict:\n        \"\"\"Analysiere einen einzelnen Chunk.\"\"\"\n        chunk_id = chunk[\"id\"]\n        text = chunk[\"content\"]\n\n        results = {\"chunk_id\": chunk_id, \"success\": False}\n\n        # 1. Semantik-Analyse\n        semantics = self.semantics_analyzer.analyze(chunk_id, text)\n        if semantics:\n            self.semantics_repo.save_semantics(semantics)\n            results[\"semantics\"] = True\n\n        # 2. Entity-Extraktion\n        entities = self.entity_extractor.extract(text)\n        entity_ids = {}\n        for entity in entities:\n            entity_id = self.entity_repo.find_or_create(entity)\n            self.entity_repo.link_to_chunk(chunk_id, entity_id)\n            entity_ids[entity.name] = entity_id\n        results[\"entities\"] = len(entities)\n\n        # 3. Relation-Extraktion\n        if len(entities) >= 2:\n            relations = self.relation_extractor.extract(text, entities)\n            for rel in relations:\n                source_id = entity_ids.get(rel.source)\n                target_id = entity_ids.get(rel.target)\n                if source_id and target_id:\n                    self.entity_repo.save_relation(source_id, target_id, rel, chunk_id)\n            results[\"relations\"] = len(relations)\n\n        # 4. 
Taxonomie-Klassifikation\n        categories, confidence = self.taxonomy_classifier.classify(text)\n        for cat in categories:\n            self.taxonomy_repo.link_to_chunk(chunk_id, cat, confidence)\n        results[\"categories\"] = categories\n\n        results[\"success\"] = True\n        return results\n\n    def run(self, limit: int = BATCH_SIZE) -> dict:\n        \"\"\"Führe Pipeline aus.\"\"\"\n        db.log(\"INFO\", f\"Starting semantic chunk analysis (limit={limit})\")\n\n        chunks = self.chunk_repo.get_pending_chunks(limit)\n        if not chunks:\n            print(\"Keine Chunks zur Analyse gefunden.\")\n            return {\"processed\": 0}\n\n        print(f\"Analysiere {len(chunks)} Chunks...\")\n\n        success = 0\n        failed = 0\n\n        for i, chunk in enumerate(chunks, 1):\n            print(f\"[{i}\/{len(chunks)}] Chunk #{chunk['id']}...\", end=\" \")\n\n            try:\n                result = self.analyze_chunk(chunk)\n                if result[\"success\"]:\n                    print(f\"✓ {result.get('entities', 0)} Entitäten, {result.get('categories', [])}\")\n                    success += 1\n                else:\n                    print(\"✗ Fehler\")\n                    failed += 1\n            except Exception as e:\n                print(f\"✗ {e}\")\n                db.log(\"ERROR\", f\"Chunk {chunk['id']} failed: {e}\")\n                failed += 1\n\n        db.log(\"INFO\", f\"Analysis complete: {success} success, {failed} failed\")\n        return {\"processed\": len(chunks), \"success\": success, \"failed\": failed}\n\n    def status(self) -> dict:\n        \"\"\"Zeige Status.\"\"\"\n        return self.chunk_repo.get_stats()\n"
    }
}
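
The request payload is a PreToolUse event for the Write tool, carrying the target path and the full file content. A minimal sketch of a consumer that inspects such a payload before the write happens, assuming the event JSON arrives on stdin and that a non-zero exit code is what tells the caller to reject the write (neither is confirmed by this log; the allowed directory is likewise an assumption):

import json
import sys

event = json.load(sys.stdin)  # a PreToolUse payload shaped like the request above

if event.get("event") == "PreToolUse" and event.get("tool_name") == "Write":
    tool_input = event.get("tool_input", {})
    path = tool_input.get("file_path", "")
    # Assumption: only writes under the pipeline directory should pass.
    if not path.startswith("/var/www/scripts/pipeline/"):
        print(f"Blocked write outside pipeline directory: {path}", file=sys.stderr)
        sys.exit(2)  # assumption: non-zero exit signals rejection to the caller

sys.exit(0)  # allow the write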

Response

{
    "tool_response": {
        "type": "create",
        "filePath": "\/var\/www\/scripts\/pipeline\/semantic_chunk\/pipeline.py",
        "content": "\"\"\"\nMain pipeline orchestration for semantic chunk analysis.\n\"\"\"\n\nimport sys\n\nsys.path.insert(0, \"\/var\/www\/scripts\/pipeline\")\n\nfrom db import db\n\nfrom .analyzers import EntityExtractor, RelationExtractor, SemanticsAnalyzer, TaxonomyClassifier\nfrom .ollama_service import OllamaService\nfrom .repositories import ChunkRepository, EntityRepository, SemanticsRepository, TaxonomyRepository\n\nBATCH_SIZE = 10\n\n\nclass SemanticChunkPipeline:\n    \"\"\"Orchestriert die semantische Chunk-Analyse.\"\"\"\n\n    def __init__(self):\n        self.llm = OllamaService()\n        self.semantics_analyzer = SemanticsAnalyzer(self.llm)\n        self.entity_extractor = EntityExtractor(self.llm)\n        self.relation_extractor = RelationExtractor(self.llm)\n        self.taxonomy_classifier = TaxonomyClassifier(self.llm)\n\n        self.chunk_repo = ChunkRepository()\n        self.semantics_repo = SemanticsRepository()\n        self.entity_repo = EntityRepository()\n        self.taxonomy_repo = TaxonomyRepository()\n\n        self.taxonomy_repo.ensure_terms_exist(TaxonomyClassifier.CATEGORIES)\n\n    def analyze_chunk(self, chunk: dict) -> dict:\n        \"\"\"Analysiere einen einzelnen Chunk.\"\"\"\n        chunk_id = chunk[\"id\"]\n        text = chunk[\"content\"]\n\n        results = {\"chunk_id\": chunk_id, \"success\": False}\n\n        # 1. Semantik-Analyse\n        semantics = self.semantics_analyzer.analyze(chunk_id, text)\n        if semantics:\n            self.semantics_repo.save_semantics(semantics)\n            results[\"semantics\"] = True\n\n        # 2. Entity-Extraktion\n        entities = self.entity_extractor.extract(text)\n        entity_ids = {}\n        for entity in entities:\n            entity_id = self.entity_repo.find_or_create(entity)\n            self.entity_repo.link_to_chunk(chunk_id, entity_id)\n            entity_ids[entity.name] = entity_id\n        results[\"entities\"] = len(entities)\n\n        # 3. Relation-Extraktion\n        if len(entities) >= 2:\n            relations = self.relation_extractor.extract(text, entities)\n            for rel in relations:\n                source_id = entity_ids.get(rel.source)\n                target_id = entity_ids.get(rel.target)\n                if source_id and target_id:\n                    self.entity_repo.save_relation(source_id, target_id, rel, chunk_id)\n            results[\"relations\"] = len(relations)\n\n        # 4. 
Taxonomie-Klassifikation\n        categories, confidence = self.taxonomy_classifier.classify(text)\n        for cat in categories:\n            self.taxonomy_repo.link_to_chunk(chunk_id, cat, confidence)\n        results[\"categories\"] = categories\n\n        results[\"success\"] = True\n        return results\n\n    def run(self, limit: int = BATCH_SIZE) -> dict:\n        \"\"\"Führe Pipeline aus.\"\"\"\n        db.log(\"INFO\", f\"Starting semantic chunk analysis (limit={limit})\")\n\n        chunks = self.chunk_repo.get_pending_chunks(limit)\n        if not chunks:\n            print(\"Keine Chunks zur Analyse gefunden.\")\n            return {\"processed\": 0}\n\n        print(f\"Analysiere {len(chunks)} Chunks...\")\n\n        success = 0\n        failed = 0\n\n        for i, chunk in enumerate(chunks, 1):\n            print(f\"[{i}\/{len(chunks)}] Chunk #{chunk['id']}...\", end=\" \")\n\n            try:\n                result = self.analyze_chunk(chunk)\n                if result[\"success\"]:\n                    print(f\"✓ {result.get('entities', 0)} Entitäten, {result.get('categories', [])}\")\n                    success += 1\n                else:\n                    print(\"✗ Fehler\")\n                    failed += 1\n            except Exception as e:\n                print(f\"✗ {e}\")\n                db.log(\"ERROR\", f\"Chunk {chunk['id']} failed: {e}\")\n                failed += 1\n\n        db.log(\"INFO\", f\"Analysis complete: {success} success, {failed} failed\")\n        return {\"processed\": len(chunks), \"success\": success, \"failed\": failed}\n\n    def status(self) -> dict:\n        \"\"\"Zeige Status.\"\"\"\n        return self.chunk_repo.get_stats()\n",
        "structuredPatch": [],
        "originalFile": null
    }
}
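
The response confirms that /var/www/scripts/pipeline/semantic_chunk/pipeline.py was created with the same content as the request (structuredPatch is empty and originalFile is null, consistent with a new file). A minimal usage sketch for the module it contains, assuming semantic_chunk is importable as a package from /var/www/scripts/pipeline; class, method, and constant names are taken from the file content above, while the wrapper itself is hypothetical:

import sys

# Mirror the path setup the module performs itself, so the package import
# resolves when this sketch runs from elsewhere (assumption about deployment).
sys.path.insert(0, "/var/www/scripts/pipeline")

from semantic_chunk.pipeline import BATCH_SIZE, SemanticChunkPipeline

pipeline = SemanticChunkPipeline()
print(pipeline.status())                   # repository stats via ChunkRepository.get_stats()
summary = pipeline.run(limit=BATCH_SIZE)   # analyzes up to 10 pending chunks
print(summary)                             # {'processed': ..., 'success': ..., 'failed': ...}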