{
"tool_response": {
"type": "text",
"file": {
"filePath": "\/var\/www\/scripts\/pipeline\/db_semantic.py",
"content": "\"\"\"\nDatabase Semantic Mixin\n\nSingle Responsibility: Semantic operations (entity types, stopwords, taxonomy, synonyms).\n\"\"\"\n\nimport re\nimport unicodedata\n\n\nclass SemanticMixin:\n \"\"\"Mixin for semantic operations.\n\n Provides:\n - Entity Types: get_entity_types, get_entity_type_codes, build_entity_prompt_categories\n - Stopwords: get_stopwords, is_stopword, _normalize_stopword\n - Synonyms: find_entity_by_synonym, add_synonym\n - Chunk Taxonomy: add_chunk_taxonomy, get_chunk_taxonomies\n - Entity Taxonomy: add_entity_taxonomy, get_entity_taxonomies, get_taxonomy_terms\n \"\"\"\n\n # ========== Entity Types ==========\n\n def get_entity_types(self, active_only: bool = True) -> list[dict]:\n \"\"\"Get all entity types from database.\n\n Args:\n active_only: Only return active types\n\n Returns:\n List of dicts with code, name, description, criteria, indicators, examples\n \"\"\"\n query = \"\"\"SELECT code, name, description, criteria, indicators, examples\n FROM entity_types\"\"\"\n if active_only:\n query += \" WHERE is_active = 1\"\n query += \" ORDER BY sort_order\"\n\n cursor = self.execute(query)\n results = cursor.fetchall()\n cursor.close()\n return list(results) if results else []\n\n def get_entity_type_codes(self) -> set[str]:\n \"\"\"Get set of valid entity type codes.\n\n Returns:\n Set of active entity type codes\n \"\"\"\n cursor = self.execute(\"SELECT code FROM entity_types WHERE is_active = 1\")\n results = cursor.fetchall()\n cursor.close()\n return {r[\"code\"] for r in results} if results else set()\n\n def build_entity_prompt_categories(self) -> str:\n \"\"\"Build categories section for entity extraction prompt from DB.\n\n Returns:\n Formatted string of entity categories for prompts\n \"\"\"\n types = self.get_entity_types()\n lines = []\n for t in types:\n lines.append(f\" {t['code']}: {t['criteria']}\")\n return \"\\n\".join(lines)\n\n # ========== Stopwords ==========\n\n def get_stopwords(self, active_only: bool = True) -> list[str]:\n \"\"\"Get list of stopword canonical forms for entity filtering.\n\n Args:\n active_only: Only return active stopwords\n\n Returns:\n List of canonical stopword strings (lowercase, normalized)\n \"\"\"\n query = \"SELECT canonical_form FROM stopwords\"\n if active_only:\n query += \" WHERE is_active = 1\"\n\n cursor = self.execute(query)\n results = cursor.fetchall()\n cursor.close()\n return [r[\"canonical_form\"] for r in results] if results else []\n\n def is_stopword(self, word: str) -> bool:\n \"\"\"Check if a word is in the stopword list.\n\n Args:\n word: Word to check\n\n Returns:\n True if word is a stopword\n \"\"\"\n canonical = self._normalize_stopword(word)\n stopwords = self.get_stopwords()\n return canonical in stopwords\n\n def _normalize_stopword(self, word: str) -> str:\n \"\"\"Normalize word to canonical form for stopword matching.\n\n Args:\n word: Word to normalize\n\n Returns:\n Normalized canonical form\n \"\"\"\n result = word.lower().strip()\n # German umlauts\n replacements = {\"ä\": \"ae\", \"ö\": \"oe\", \"ü\": \"ue\", \"ß\": \"ss\"}\n for old, new in replacements.items():\n result = result.replace(old, new)\n # Normalize unicode\n result = unicodedata.normalize(\"NFKD\", result)\n result = result.encode(\"ascii\", \"ignore\").decode(\"ascii\")\n # Keep only alphanumeric\n result = re.sub(r\"[^a-z0-9]\", \"\", result)\n return result\n\n # ========== Entity Synonyms ==========\n\n def find_entity_by_synonym(self, synonym: str) -> dict | None:\n \"\"\"Find entity by synonym.\n\n Args:\n synonym: Synonym to search for\n\n Returns:\n Dict with entity_id or None\n \"\"\"\n cursor = self.execute(\n \"SELECT entity_id FROM entity_synonyms WHERE synonym = %s LIMIT 1\",\n (synonym,),\n )\n result = cursor.fetchone()\n cursor.close()\n return result\n\n def add_synonym(\n self,\n entity_id: int,\n synonym: str,\n source: str = \"extraction\",\n language: str = \"de\",\n ) -> int | None:\n \"\"\"Add synonym to entity if not exists.\n\n Args:\n entity_id: Entity ID to add synonym to\n synonym: The synonym text\n source: How it was found (extraction, manual, merge)\n language: Language code\n\n Returns:\n Synonym ID or None if already exists\n \"\"\"\n # Check if synonym already exists for this entity\n cursor = self.execute(\n \"SELECT id FROM entity_synonyms WHERE entity_id = %s AND synonym = %s\",\n (entity_id, synonym),\n )\n existing = cursor.fetchone()\n cursor.close()\n\n if existing:\n return None\n\n try:\n cursor = self.execute(\n \"\"\"INSERT INTO entity_synonyms (entity_id, synonym, source, language, created_at)\n VALUES (%s, %s, %s, %s, NOW())\"\"\",\n (entity_id, synonym, source, language),\n )\n self.commit()\n syn_id = cursor.lastrowid\n cursor.close()\n return syn_id\n except Exception as e:\n self.log(\"WARNING\", f\"Failed to add synonym: {e}\")\n return None\n\n # ========== Chunk Taxonomy ==========\n\n def add_chunk_taxonomy(\n self,\n chunk_id: int,\n term_id: int,\n confidence: float = 0.7,\n source: str = \"auto\",\n ) -> int | None:\n \"\"\"Add taxonomy mapping for a chunk.\n\n Args:\n chunk_id: Chunk ID\n term_id: Taxonomy term ID\n confidence: Confidence score (0.0-1.0)\n source: 'auto' or 'manual'\n\n Returns:\n Mapping ID or None if already exists\n \"\"\"\n # Check if mapping already exists\n cursor = self.execute(\n \"SELECT id FROM chunk_taxonomy WHERE chunk_id = %s AND taxonomy_term_id = %s\",\n (chunk_id, term_id),\n )\n existing = cursor.fetchone()\n cursor.close()\n\n if existing:\n return None\n\n try:\n cursor = self.execute(\n \"\"\"INSERT INTO chunk_taxonomy (chunk_id, taxonomy_term_id, confidence, source, created_at)\n VALUES (%s, %s, %s, %s, NOW())\"\"\",\n (chunk_id, term_id, confidence, source),\n )\n self.commit()\n mapping_id = cursor.lastrowid\n cursor.close()\n return mapping_id\n except Exception as e:\n self.log(\"WARNING\", f\"Failed to add chunk taxonomy: {e}\")\n return None\n\n def get_chunk_taxonomies(self, chunk_id: int) -> list:\n \"\"\"Get all taxonomy mappings for a chunk.\n\n Args:\n chunk_id: Chunk ID\n\n Returns:\n List of taxonomy mappings with term details\n \"\"\"\n cursor = self.execute(\n \"\"\"SELECT ct.*, tt.name as term_name, tt.path as term_path\n FROM chunk_taxonomy ct\n JOIN taxonomy_terms tt ON ct.taxonomy_term_id = tt.id\n WHERE ct.chunk_id = %s\n ORDER BY ct.confidence DESC\"\"\",\n (chunk_id,),\n )\n results = cursor.fetchall()\n cursor.close()\n return results\n\n # ========== Entity Taxonomy ==========\n\n def add_entity_taxonomy(\n self,\n entity_id: int,\n term_id: int,\n relevance: float = 0.7,\n validated: bool = False,\n ) -> int | None:\n \"\"\"Add taxonomy mapping for an entity.\n\n Args:\n entity_id: Entity ID\n term_id: Taxonomy term ID\n relevance: Relevance score (0.0-1.0)\n validated: Whether manually validated\n\n Returns:\n Mapping ID or None if already exists\n \"\"\"\n # Check if mapping already exists\n cursor = self.execute(\n \"SELECT id FROM entity_taxonomy_mapping WHERE entity_id = %s AND taxonomy_term_id = %s\",\n (entity_id, term_id),\n )\n existing = cursor.fetchone()\n cursor.close()\n\n if existing:\n return None\n\n try:\n cursor = self.execute(\n \"\"\"INSERT INTO entity_taxonomy_mapping\n (entity_id, taxonomy_term_id, confidence, validated, created_at)\n VALUES (%s, %s, %s, %s, NOW())\"\"\",\n (entity_id, term_id, relevance, 1 if validated else 0),\n )\n self.commit()\n mapping_id = cursor.lastrowid\n cursor.close()\n return mapping_id\n except Exception as e:\n self.log(\"WARNING\", f\"Failed to add entity taxonomy: {e}\")\n return None\n\n def get_entity_taxonomies(self, entity_id: int) -> list:\n \"\"\"Get all taxonomy mappings for an entity.\n\n Args:\n entity_id: Entity ID\n\n Returns:\n List of taxonomy mappings with term details\n \"\"\"\n cursor = self.execute(\n \"\"\"SELECT etm.*, tt.name as term_name, tt.path as term_path\n FROM entity_taxonomy_mapping etm\n JOIN taxonomy_terms tt ON etm.taxonomy_term_id = tt.id\n WHERE etm.entity_id = %s\n ORDER BY etm.relevance DESC\"\"\",\n (entity_id,),\n )\n results = cursor.fetchall()\n cursor.close()\n return results\n\n def get_taxonomy_terms(self) -> list:\n \"\"\"Get all taxonomy terms for matching.\n\n Returns:\n List of taxonomy terms with id, name, path, parent_id\n \"\"\"\n cursor = self.execute(\n \"SELECT id, name, path, pare... [TRUNCATED-9b9a45d5292de671]",
"numLines": 328,
"startLine": 1,
"totalLines": 328
}
}
}