Protokoll #31127

ID: 31127
Zeitstempel: 2025-12-31 02:28:23.377465
Client: root
IP: 145.224.72.245
Modell: claude-sonnet-4-20250514
Status: completed
Tokens: 2,709 (Input: 29, Output: 2,680)
Dauer: 184 ms
Request-Zeit: 2025-12-31 02:28:23.377465
Response-Zeit: 2025-12-31 02:28:23.561595

Request

{
    "event": "PreToolUse",
    "tool_name": "Read",
    "tool_input": {
        "file_path": "\/var\/www\/scripts\/pipeline\/db_semantic.py"
    }
}

Response

{
    "tool_response": {
        "type": "text",
        "file": {
            "filePath": "\/var\/www\/scripts\/pipeline\/db_semantic.py",
            "content": "\"\"\"\nDatabase Semantic Mixin\n\nSingle Responsibility: Semantic operations (entity types, stopwords, taxonomy, synonyms).\n\"\"\"\n\nimport re\nimport unicodedata\n\n\nclass SemanticMixin:\n    \"\"\"Mixin for semantic operations.\n\n    Provides:\n    - Entity Types: get_entity_types, get_entity_type_codes, build_entity_prompt_categories\n    - Stopwords: get_stopwords, is_stopword, _normalize_stopword\n    - Synonyms: find_entity_by_synonym, add_synonym\n    - Chunk Taxonomy: add_chunk_taxonomy, get_chunk_taxonomies\n    - Entity Taxonomy: add_entity_taxonomy, get_entity_taxonomies, get_taxonomy_terms\n    \"\"\"\n\n    # ========== Entity Types ==========\n\n    def get_entity_types(self, active_only: bool = True) -> list[dict]:\n        \"\"\"Get all entity types from database.\n\n        Args:\n            active_only: Only return active types\n\n        Returns:\n            List of dicts with code, name, description, criteria, indicators, examples\n        \"\"\"\n        query = \"\"\"SELECT code, name, description, criteria, indicators, examples\n                   FROM entity_types\"\"\"\n        if active_only:\n            query += \" WHERE is_active = 1\"\n        query += \" ORDER BY sort_order\"\n\n        cursor = self.execute(query)\n        results = cursor.fetchall()\n        cursor.close()\n        return list(results) if results else []\n\n    def get_entity_type_codes(self) -> set[str]:\n        \"\"\"Get set of valid entity type codes.\n\n        Returns:\n            Set of active entity type codes\n        \"\"\"\n        cursor = self.execute(\"SELECT code FROM entity_types WHERE is_active = 1\")\n        results = cursor.fetchall()\n        cursor.close()\n        return {r[\"code\"] for r in results} if results else set()\n\n    def build_entity_prompt_categories(self) -> str:\n        \"\"\"Build categories section for entity extraction prompt from DB.\n\n        Returns:\n            Formatted string of 
entity categories for prompts\n        \"\"\"\n        types = self.get_entity_types()\n        lines = []\n        for t in types:\n            lines.append(f\"  {t['code']}: {t['criteria']}\")\n        return \"\\n\".join(lines)\n\n    # ========== Stopwords ==========\n\n    def get_stopwords(self, active_only: bool = True) -> list[str]:\n        \"\"\"Get list of stopword canonical forms for entity filtering.\n\n        Args:\n            active_only: Only return active stopwords\n\n        Returns:\n            List of canonical stopword strings (lowercase, normalized)\n        \"\"\"\n        query = \"SELECT canonical_form FROM stopwords\"\n        if active_only:\n            query += \" WHERE is_active = 1\"\n\n        cursor = self.execute(query)\n        results = cursor.fetchall()\n        cursor.close()\n        return [r[\"canonical_form\"] for r in results] if results else []\n\n    def is_stopword(self, word: str) -> bool:\n        \"\"\"Check if a word is in the stopword list.\n\n        Args:\n            word: Word to check\n\n        Returns:\n            True if word is a stopword\n        \"\"\"\n        canonical = self._normalize_stopword(word)\n        stopwords = self.get_stopwords()\n        return canonical in stopwords\n\n    def _normalize_stopword(self, word: str) -> str:\n        \"\"\"Normalize word to canonical form for stopword matching.\n\n        Args:\n            word: Word to normalize\n\n        Returns:\n            Normalized canonical form\n        \"\"\"\n        result = word.lower().strip()\n        # German umlauts\n        replacements = {\"ä\": \"ae\", \"ö\": \"oe\", \"ü\": \"ue\", \"ß\": \"ss\"}\n        for old, new in replacements.items():\n            result = result.replace(old, new)\n        # Normalize unicode\n        result = unicodedata.normalize(\"NFKD\", result)\n        result = result.encode(\"ascii\", \"ignore\").decode(\"ascii\")\n        # Keep only alphanumeric\n        result = 
re.sub(r\"[^a-z0-9]\", \"\", result)\n        return result\n\n    # ========== Entity Synonyms ==========\n\n    def find_entity_by_synonym(self, synonym: str) -> dict | None:\n        \"\"\"Find entity by synonym.\n\n        Args:\n            synonym: Synonym to search for\n\n        Returns:\n            Dict with entity_id or None\n        \"\"\"\n        cursor = self.execute(\n            \"SELECT entity_id FROM entity_synonyms WHERE synonym = %s LIMIT 1\",\n            (synonym,),\n        )\n        result = cursor.fetchone()\n        cursor.close()\n        return result\n\n    def add_synonym(\n        self,\n        entity_id: int,\n        synonym: str,\n        source: str = \"extraction\",\n        language: str = \"de\",\n    ) -> int | None:\n        \"\"\"Add synonym to entity if not exists.\n\n        Args:\n            entity_id: Entity ID to add synonym to\n            synonym: The synonym text\n            source: How it was found (extraction, manual, merge)\n            language: Language code\n\n        Returns:\n            Synonym ID or None if already exists\n        \"\"\"\n        # Check if synonym already exists for this entity\n        cursor = self.execute(\n            \"SELECT id FROM entity_synonyms WHERE entity_id = %s AND synonym = %s\",\n            (entity_id, synonym),\n        )\n        existing = cursor.fetchone()\n        cursor.close()\n\n        if existing:\n            return None\n\n        try:\n            cursor = self.execute(\n                \"\"\"INSERT INTO entity_synonyms (entity_id, synonym, source, language, created_at)\n                   VALUES (%s, %s, %s, %s, NOW())\"\"\",\n                (entity_id, synonym, source, language),\n            )\n            self.commit()\n            syn_id = cursor.lastrowid\n            cursor.close()\n            return syn_id\n        except Exception as e:\n            self.log(\"WARNING\", f\"Failed to add synonym: {e}\")\n            return None\n\n    # 
========== Chunk Taxonomy ==========\n\n    def add_chunk_taxonomy(\n        self,\n        chunk_id: int,\n        term_id: int,\n        confidence: float = 0.7,\n        source: str = \"auto\",\n    ) -> int | None:\n        \"\"\"Add taxonomy mapping for a chunk.\n\n        Args:\n            chunk_id: Chunk ID\n            term_id: Taxonomy term ID\n            confidence: Confidence score (0.0-1.0)\n            source: 'auto' or 'manual'\n\n        Returns:\n            Mapping ID or None if already exists\n        \"\"\"\n        # Check if mapping already exists\n        cursor = self.execute(\n            \"SELECT id FROM chunk_taxonomy WHERE chunk_id = %s AND taxonomy_term_id = %s\",\n            (chunk_id, term_id),\n        )\n        existing = cursor.fetchone()\n        cursor.close()\n\n        if existing:\n            return None\n\n        try:\n            cursor = self.execute(\n                \"\"\"INSERT INTO chunk_taxonomy (chunk_id, taxonomy_term_id, confidence, source, created_at)\n                   VALUES (%s, %s, %s, %s, NOW())\"\"\",\n                (chunk_id, term_id, confidence, source),\n            )\n            self.commit()\n            mapping_id = cursor.lastrowid\n            cursor.close()\n            return mapping_id\n        except Exception as e:\n            self.log(\"WARNING\", f\"Failed to add chunk taxonomy: {e}\")\n            return None\n\n    def get_chunk_taxonomies(self, chunk_id: int) -> list:\n        \"\"\"Get all taxonomy mappings for a chunk.\n\n        Args:\n            chunk_id: Chunk ID\n\n        Returns:\n            List of taxonomy mappings with term details\n        \"\"\"\n        cursor = self.execute(\n            \"\"\"SELECT ct.*, tt.name as term_name, tt.path as term_path\n               FROM chunk_taxonomy ct\n               JOIN taxonomy_terms tt ON ct.taxonomy_term_id = tt.id\n               WHERE ct.chunk_id = %s\n               ORDER BY ct.confidence DESC\"\"\",\n            
(chunk_id,),\n        )\n        results = cursor.fetchall()\n        cursor.close()\n        return results\n\n    # ========== Entity Taxonomy ==========\n\n    def add_entity_taxonomy(\n        self,\n        entity_id: int,\n        term_id: int,\n        relevance: float = 0.7,\n        validated: bool = False,\n    ) -> int | None:\n        \"\"\"Add taxonomy mapping for an entity.\n\n        Args:\n            entity_id: Entity ID\n            term_id: Taxonomy term ID\n            relevance: Relevance score (0.0-1.0)\n            validated: Whether manually validated\n\n        Returns:\n            Mapping ID or None if already exists\n        \"\"\"\n        # Check if mapping already exists\n        cursor = self.execute(\n            \"SELECT id FROM entity_taxonomy_mapping WHERE entity_id = %s AND taxonomy_term_id = %s\",\n            (entity_id, term_id),\n        )\n        existing = cursor.fetchone()\n        cursor.close()\n\n        if existing:\n            return None\n\n        try:\n            cursor = self.execute(\n                \"\"\"INSERT INTO entity_taxonomy_mapping\n                   (entity_id, taxonomy_term_id, confidence, validated, created_at)\n                   VALUES (%s, %s, %s, %s, NOW())\"\"\",\n                (entity_id, term_id, relevance, 1 if validated else 0),\n            )\n            self.commit()\n            mapping_id = cursor.lastrowid\n            cursor.close()\n            return mapping_id\n        except Exception as e:\n            self.log(\"WARNING\", f\"Failed to add entity taxonomy: {e}\")\n            return None\n\n    def get_entity_taxonomies(self, entity_id: int) -> list:\n        \"\"\"Get all taxonomy mappings for an entity.\n\n        Args:\n            entity_id: Entity ID\n\n        Returns:\n            List of taxonomy mappings with term details\n        \"\"\"\n        cursor = self.execute(\n            \"\"\"SELECT etm.*, tt.name as term_name, tt.path as term_path\n               
FROM entity_taxonomy_mapping etm\n               JOIN taxonomy_terms tt ON etm.taxonomy_term_id = tt.id\n               WHERE etm.entity_id = %s\n               ORDER BY etm.relevance DESC\"\"\",\n            (entity_id,),\n        )\n        results = cursor.fetchall()\n        cursor.close()\n        return results\n\n    def get_taxonomy_terms(self) -> list:\n        \"\"\"Get all taxonomy terms for matching.\n\n        Returns:\n            List of taxonomy terms with id, name, path, parent_id\n        \"\"\"\n        cursor = self.execute(\n            \"SELECT id, name, path, pare... [TRUNCATED-9b9a45d5292de671]",
            "numLines": 328,
            "startLine": 1,
            "totalLines": 328
        }
    }
}
← Vorheriger Zur Liste Nächster →