db_semantic.py

Code Hygiene Score: 92

Keine Issues gefunden.

Dependencies 2

Klassen 1

Code

"""
Database Semantic Mixin

Single Responsibility: Semantic operations (entity types, stopwords, taxonomy, synonyms).
"""

import re
import unicodedata


class SemanticMixin:
    """Mixin for semantic operations.

    Provides:
    - Entity Types: get_entity_types, get_entity_type_codes, build_entity_prompt_categories
    - Stopwords: get_stopwords, is_stopword, _normalize_stopword
    - Synonyms: find_entity_by_synonym, add_synonym
    - Chunk Taxonomy: add_chunk_taxonomy, get_chunk_taxonomies
    - Entity Taxonomy: add_entity_taxonomy, get_entity_taxonomies, get_taxonomy_terms
    """

    # ========== Entity Types ==========

    def get_entity_types(self, active_only: bool = True) -> list[dict]:
        """Get all entity types from database.

        Args:
            active_only: Only return active types

        Returns:
            List of dicts with code, name, description, criteria, indicators, examples
        """
        query = """SELECT code, name, description, criteria, indicators, examples
                   FROM entity_types"""
        if active_only:
            query += " WHERE is_active = 1"
        query += " ORDER BY sort_order"

        cursor = self.execute(query)
        results = cursor.fetchall()
        cursor.close()
        return list(results) if results else []

    def get_entity_type_codes(self) -> set[str]:
        """Get set of valid entity type codes.

        Returns:
            Set of active entity type codes
        """
        cursor = self.execute("SELECT code FROM entity_types WHERE is_active = 1")
        results = cursor.fetchall()
        cursor.close()
        return {r["code"] for r in results} if results else set()

    def build_entity_prompt_categories(self) -> str:
        """Build categories section for entity extraction prompt from DB.

        Returns:
            Formatted string of entity categories for prompts
        """
        types = self.get_entity_types()
        lines = []
        for t in types:
            lines.append(f"  {t['code']}: {t['criteria']}")
        return "\n".join(lines)

    # ========== Stopwords ==========

    def get_stopwords(self, active_only: bool = True) -> list[str]:
        """Get list of stopword canonical forms for entity filtering.

        Args:
            active_only: Only return active stopwords

        Returns:
            List of canonical stopword strings (lowercase, normalized)
        """
        query = "SELECT canonical_form FROM stopwords"
        if active_only:
            query += " WHERE is_active = 1"

        cursor = self.execute(query)
        results = cursor.fetchall()
        cursor.close()
        return [r["canonical_form"] for r in results] if results else []

    def is_stopword(self, word: str) -> bool:
        """Check if a word is in the stopword list.

        Args:
            word: Word to check

        Returns:
            True if word is a stopword
        """
        canonical = self._normalize_stopword(word)
        stopwords = self.get_stopwords()
        return canonical in stopwords

    def _normalize_stopword(self, word: str) -> str:
        """Normalize word to canonical form for stopword matching.

        Args:
            word: Word to normalize

        Returns:
            Normalized canonical form
        """
        result = word.lower().strip()
        # German umlauts
        replacements = {"ä": "ae", "ö": "oe", "ü": "ue", "ß": "ss"}
        for old, new in replacements.items():
            result = result.replace(old, new)
        # Normalize unicode
        result = unicodedata.normalize("NFKD", result)
        result = result.encode("ascii", "ignore").decode("ascii")
        # Keep only alphanumeric
        result = re.sub(r"[^a-z0-9]", "", result)
        return result

    # ========== Entity Synonyms ==========

    def find_entity_by_synonym(self, synonym: str) -> dict | None:
        """Find entity by synonym.

        Args:
            synonym: Synonym to search for

        Returns:
            Dict with entity_id or None
        """
        cursor = self.execute(
            "SELECT entity_id FROM entity_synonyms WHERE synonym = %s LIMIT 1",
            (synonym,),
        )
        result = cursor.fetchone()
        cursor.close()
        return result

    def add_synonym(
        self,
        entity_id: int,
        synonym: str,
        source: str = "extraction",
        language: str = "de",
    ) -> int | None:
        """Add synonym to entity if not exists.

        Args:
            entity_id: Entity ID to add synonym to
            synonym: The synonym text
            source: How it was found (extraction, manual, merge)
            language: Language code

        Returns:
            Synonym ID or None if already exists
        """
        # Check if synonym already exists for this entity
        cursor = self.execute(
            "SELECT id FROM entity_synonyms WHERE entity_id = %s AND synonym = %s",
            (entity_id, synonym),
        )
        existing = cursor.fetchone()
        cursor.close()

        if existing:
            return None

        try:
            cursor = self.execute(
                """INSERT INTO entity_synonyms (entity_id, synonym, source, language, created_at)
                   VALUES (%s, %s, %s, %s, NOW())""",
                (entity_id, synonym, source, language),
            )
            self.commit()
            syn_id = cursor.lastrowid
            cursor.close()
            return syn_id
        except Exception as e:
            self.log("WARNING", f"Failed to add synonym: {e}")
            return None

    # ========== Chunk Taxonomy ==========

    def add_chunk_taxonomy(
        self,
        chunk_id: int,
        term_id: int,
        confidence: float = 0.7,
        source: str = "auto",
    ) -> int | None:
        """Add taxonomy mapping for a chunk.

        Args:
            chunk_id: Chunk ID
            term_id: Taxonomy term ID
            confidence: Confidence score (0.0-1.0)
            source: 'auto' or 'manual'

        Returns:
            Mapping ID or None if already exists
        """
        # Check if mapping already exists
        cursor = self.execute(
            "SELECT id FROM chunk_taxonomy WHERE chunk_id = %s AND taxonomy_term_id = %s",
            (chunk_id, term_id),
        )
        existing = cursor.fetchone()
        cursor.close()

        if existing:
            return None

        try:
            cursor = self.execute(
                """INSERT INTO chunk_taxonomy (chunk_id, taxonomy_term_id, confidence, source, created_at)
                   VALUES (%s, %s, %s, %s, NOW())""",
                (chunk_id, term_id, confidence, source),
            )
            self.commit()
            mapping_id = cursor.lastrowid
            cursor.close()
            return mapping_id
        except Exception as e:
            self.log("WARNING", f"Failed to add chunk taxonomy: {e}")
            return None

    def get_chunk_taxonomies(self, chunk_id: int) -> list:
        """Get all taxonomy mappings for a chunk.

        Args:
            chunk_id: Chunk ID

        Returns:
            List of taxonomy mappings with term details
        """
        cursor = self.execute(
            """SELECT ct.*, tt.name as term_name, tt.path as term_path
               FROM chunk_taxonomy ct
               JOIN taxonomy_terms tt ON ct.taxonomy_term_id = tt.id
               WHERE ct.chunk_id = %s
               ORDER BY ct.confidence DESC""",
            (chunk_id,),
        )
        results = cursor.fetchall()
        cursor.close()
        return results

    # ========== Entity Taxonomy ==========

    def add_entity_taxonomy(
        self,
        entity_id: int,
        term_id: int,
        relevance: float = 0.7,
        validated: bool = False,
    ) -> int | None:
        """Add taxonomy mapping for an entity.

        Args:
            entity_id: Entity ID
            term_id: Taxonomy term ID
            relevance: Relevance score (0.0-1.0)
            validated: Whether manually validated

        Returns:
            Mapping ID or None if already exists
        """
        # Check if mapping already exists
        cursor = self.execute(
            "SELECT id FROM entity_taxonomy_mapping WHERE entity_id = %s AND taxonomy_term_id = %s",
            (entity_id, term_id),
        )
        existing = cursor.fetchone()
        cursor.close()

        if existing:
            return None

        try:
            cursor = self.execute(
                """INSERT INTO entity_taxonomy_mapping
                   (entity_id, taxonomy_term_id, confidence, validated, created_at)
                   VALUES (%s, %s, %s, %s, NOW())""",
                (entity_id, term_id, relevance, 1 if validated else 0),
            )
            self.commit()
            mapping_id = cursor.lastrowid
            cursor.close()
            return mapping_id
        except Exception as e:
            self.log("WARNING", f"Failed to add entity taxonomy: {e}")
            return None

    def get_entity_taxonomies(self, entity_id: int) -> list:
        """Get all taxonomy mappings for an entity.

        Args:
            entity_id: Entity ID

        Returns:
            List of taxonomy mappings with term details
        """
        cursor = self.execute(
            """SELECT etm.*, tt.name as term_name, tt.path as term_path
               FROM entity_taxonomy_mapping etm
               JOIN taxonomy_terms tt ON etm.taxonomy_term_id = tt.id
               WHERE etm.entity_id = %s
               ORDER BY etm.relevance DESC""",
            (entity_id,),
        )
        results = cursor.fetchall()
        cursor.close()
        return results

    def get_taxonomy_terms(self) -> list:
        """Get all taxonomy terms for matching.

        Returns:
            List of taxonomy terms with id, name, path, parent_id
        """
        cursor = self.execute("SELECT id, name, path, parent_id FROM taxonomy_terms ORDER BY path")
        results = cursor.fetchall()
        cursor.close()
        return results
← Übersicht Graph