db_semantic.py
- Pfad: /var/www/scripts/pipeline/db_semantic.py
- Namespace: pipeline
- Zeilen: 326 | Größe: 10,115 Bytes
- Geändert: 2025-12-31 03:01:09 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 92
- Dependencies: 100 (25%)
- LOC: 58 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
Keine Issues gefunden.
Dependencies 2
- use re
- use unicodedata
Klassen 1
- SemanticMixin (class), Zeile 11
Code
"""
Database Semantic Mixin
Single Responsibility: Semantic operations (entity types, stopwords, taxonomy, synonyms).
"""
import re
import unicodedata
class SemanticMixin:
"""Mixin for semantic operations.
Provides:
- Entity Types: get_entity_types, get_entity_type_codes, build_entity_prompt_categories
- Stopwords: get_stopwords, is_stopword, _normalize_stopword
- Synonyms: find_entity_by_synonym, add_synonym
- Chunk Taxonomy: add_chunk_taxonomy, get_chunk_taxonomies
- Entity Taxonomy: add_entity_taxonomy, get_entity_taxonomies, get_taxonomy_terms
"""
# ========== Entity Types ==========
def get_entity_types(self, active_only: bool = True) -> list[dict]:
"""Get all entity types from database.
Args:
active_only: Only return active types
Returns:
List of dicts with code, name, description, criteria, indicators, examples
"""
query = """SELECT code, name, description, criteria, indicators, examples
FROM entity_types"""
if active_only:
query += " WHERE is_active = 1"
query += " ORDER BY sort_order"
cursor = self.execute(query)
results = cursor.fetchall()
cursor.close()
return list(results) if results else []
def get_entity_type_codes(self) -> set[str]:
"""Get set of valid entity type codes.
Returns:
Set of active entity type codes
"""
cursor = self.execute("SELECT code FROM entity_types WHERE is_active = 1")
results = cursor.fetchall()
cursor.close()
return {r["code"] for r in results} if results else set()
def build_entity_prompt_categories(self) -> str:
"""Build categories section for entity extraction prompt from DB.
Returns:
Formatted string of entity categories for prompts
"""
types = self.get_entity_types()
lines = []
for t in types:
lines.append(f" {t['code']}: {t['criteria']}")
return "\n".join(lines)
# ========== Stopwords ==========
def get_stopwords(self, active_only: bool = True) -> list[str]:
"""Get list of stopword canonical forms for entity filtering.
Args:
active_only: Only return active stopwords
Returns:
List of canonical stopword strings (lowercase, normalized)
"""
query = "SELECT canonical_form FROM stopwords"
if active_only:
query += " WHERE is_active = 1"
cursor = self.execute(query)
results = cursor.fetchall()
cursor.close()
return [r["canonical_form"] for r in results] if results else []
def is_stopword(self, word: str) -> bool:
"""Check if a word is in the stopword list.
Args:
word: Word to check
Returns:
True if word is a stopword
"""
canonical = self._normalize_stopword(word)
stopwords = self.get_stopwords()
return canonical in stopwords
def _normalize_stopword(self, word: str) -> str:
"""Normalize word to canonical form for stopword matching.
Args:
word: Word to normalize
Returns:
Normalized canonical form
"""
result = word.lower().strip()
# German umlauts
replacements = {"ä": "ae", "ö": "oe", "ü": "ue", "ß": "ss"}
for old, new in replacements.items():
result = result.replace(old, new)
# Normalize unicode
result = unicodedata.normalize("NFKD", result)
result = result.encode("ascii", "ignore").decode("ascii")
# Keep only alphanumeric
result = re.sub(r"[^a-z0-9]", "", result)
return result
# ========== Entity Synonyms ==========
def find_entity_by_synonym(self, synonym: str) -> dict | None:
"""Find entity by synonym.
Args:
synonym: Synonym to search for
Returns:
Dict with entity_id or None
"""
cursor = self.execute(
"SELECT entity_id FROM entity_synonyms WHERE synonym = %s LIMIT 1",
(synonym,),
)
result = cursor.fetchone()
cursor.close()
return result
def add_synonym(
self,
entity_id: int,
synonym: str,
source: str = "extraction",
language: str = "de",
) -> int | None:
"""Add synonym to entity if not exists.
Args:
entity_id: Entity ID to add synonym to
synonym: The synonym text
source: How it was found (extraction, manual, merge)
language: Language code
Returns:
Synonym ID or None if already exists
"""
# Check if synonym already exists for this entity
cursor = self.execute(
"SELECT id FROM entity_synonyms WHERE entity_id = %s AND synonym = %s",
(entity_id, synonym),
)
existing = cursor.fetchone()
cursor.close()
if existing:
return None
try:
cursor = self.execute(
"""INSERT INTO entity_synonyms (entity_id, synonym, source, language, created_at)
VALUES (%s, %s, %s, %s, NOW())""",
(entity_id, synonym, source, language),
)
self.commit()
syn_id = cursor.lastrowid
cursor.close()
return syn_id
except Exception as e:
self.log("WARNING", f"Failed to add synonym: {e}")
return None
# ========== Chunk Taxonomy ==========
def add_chunk_taxonomy(
self,
chunk_id: int,
term_id: int,
confidence: float = 0.7,
source: str = "auto",
) -> int | None:
"""Add taxonomy mapping for a chunk.
Args:
chunk_id: Chunk ID
term_id: Taxonomy term ID
confidence: Confidence score (0.0-1.0)
source: 'auto' or 'manual'
Returns:
Mapping ID or None if already exists
"""
# Check if mapping already exists
cursor = self.execute(
"SELECT id FROM chunk_taxonomy WHERE chunk_id = %s AND taxonomy_term_id = %s",
(chunk_id, term_id),
)
existing = cursor.fetchone()
cursor.close()
if existing:
return None
try:
cursor = self.execute(
"""INSERT INTO chunk_taxonomy (chunk_id, taxonomy_term_id, confidence, source, created_at)
VALUES (%s, %s, %s, %s, NOW())""",
(chunk_id, term_id, confidence, source),
)
self.commit()
mapping_id = cursor.lastrowid
cursor.close()
return mapping_id
except Exception as e:
self.log("WARNING", f"Failed to add chunk taxonomy: {e}")
return None
def get_chunk_taxonomies(self, chunk_id: int) -> list:
"""Get all taxonomy mappings for a chunk.
Args:
chunk_id: Chunk ID
Returns:
List of taxonomy mappings with term details
"""
cursor = self.execute(
"""SELECT ct.*, tt.name as term_name, tt.path as term_path
FROM chunk_taxonomy ct
JOIN taxonomy_terms tt ON ct.taxonomy_term_id = tt.id
WHERE ct.chunk_id = %s
ORDER BY ct.confidence DESC""",
(chunk_id,),
)
results = cursor.fetchall()
cursor.close()
return results
# ========== Entity Taxonomy ==========
def add_entity_taxonomy(
self,
entity_id: int,
term_id: int,
relevance: float = 0.7,
validated: bool = False,
) -> int | None:
"""Add taxonomy mapping for an entity.
Args:
entity_id: Entity ID
term_id: Taxonomy term ID
relevance: Relevance score (0.0-1.0)
validated: Whether manually validated
Returns:
Mapping ID or None if already exists
"""
# Check if mapping already exists
cursor = self.execute(
"SELECT id FROM entity_taxonomy_mapping WHERE entity_id = %s AND taxonomy_term_id = %s",
(entity_id, term_id),
)
existing = cursor.fetchone()
cursor.close()
if existing:
return None
try:
cursor = self.execute(
"""INSERT INTO entity_taxonomy_mapping
(entity_id, taxonomy_term_id, confidence, validated, created_at)
VALUES (%s, %s, %s, %s, NOW())""",
(entity_id, term_id, relevance, 1 if validated else 0),
)
self.commit()
mapping_id = cursor.lastrowid
cursor.close()
return mapping_id
except Exception as e:
self.log("WARNING", f"Failed to add entity taxonomy: {e}")
return None
def get_entity_taxonomies(self, entity_id: int) -> list:
"""Get all taxonomy mappings for an entity.
Args:
entity_id: Entity ID
Returns:
List of taxonomy mappings with term details
"""
cursor = self.execute(
"""SELECT etm.*, tt.name as term_name, tt.path as term_path
FROM entity_taxonomy_mapping etm
JOIN taxonomy_terms tt ON etm.taxonomy_term_id = tt.id
WHERE etm.entity_id = %s
ORDER BY etm.relevance DESC""",
(entity_id,),
)
results = cursor.fetchall()
cursor.close()
return results
def get_taxonomy_terms(self) -> list:
"""Get all taxonomy terms for matching.
Returns:
List of taxonomy terms with id, name, path, parent_id
"""
cursor = self.execute("SELECT id, name, path, parent_id FROM taxonomy_terms ORDER BY path")
results = cursor.fetchall()
cursor.close()
return results