repositories.py
- Pfad:
/var/www/scripts/pipeline/semantic_chunk/repositories.py - Namespace: pipeline
- Zeilen: 180 | Größe: 5,779 Bytes
- Geändert: 2025-12-25 14:04:59 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 98
- Dependencies: 90 (25%)
- LOC: 100 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
Keine Issues gefunden.
Dependencies 6
- use json
- use sys
- use db.db
- use models.ChunkSemantics
- use models.Relation
- use ollama_service.ANALYSIS_MODEL
Klassen 4
- ChunkRepository (class, Zeile 18)
- SemanticsRepository (class, Zeile 55)
- EntityRepository (class, Zeile 89)
- TaxonomyRepository (class, Zeile 143)
Code
"""
Repository classes for semantic chunk data persistence.
"""
import json
import sys
sys.path.insert(0, "/var/www/scripts/pipeline")
from db import db
from .models import ChunkSemantics, Relation
from .ollama_service import ANALYSIS_MODEL
BATCH_SIZE = 10
class ChunkRepository:
    """Database access for chunks."""

    def get_pending_chunks(self, limit: int = BATCH_SIZE) -> list[dict]:
        """Fetch chunks that have no semantic analysis yet.

        Args:
            limit: Maximum number of rows to return (default: BATCH_SIZE).

        Returns:
            Rows with ``id``, ``content`` and ``document_id``, ordered by
            chunk id, for chunks without a ``chunk_semantics`` entry.
        """
        cursor = db.execute(
            """
            SELECT c.id, c.content, c.document_id
            FROM chunks c
            LEFT JOIN chunk_semantics cs ON c.id = cs.chunk_id
            WHERE cs.id IS NULL
            ORDER BY c.id
            LIMIT %s
            """,
            (limit,),
        )
        try:
            return cursor.fetchall()
        finally:
            # Release the cursor even if fetchall() raises.
            cursor.close()

    def get_stats(self) -> dict:
        """Return counts of total chunks, analyzed chunks and entities.

        Returns:
            Dict with keys ``total``, ``analyzed`` and ``entities``.
        """
        stats = {}
        # One COUNT(*) per table; the finally-block guarantees every
        # cursor is closed even when a fetch fails mid-way.
        for key, query in (
            ("total", "SELECT COUNT(*) as total FROM chunks"),
            ("analyzed", "SELECT COUNT(*) as analyzed FROM chunk_semantics"),
            ("entities", "SELECT COUNT(*) as entities FROM entities"),
        ):
            cursor = db.execute(query)
            try:
                stats[key] = cursor.fetchone()[key]
            finally:
                cursor.close()
        return stats
class SemanticsRepository:
    """Database access for chunk semantics."""

    def save_semantics(self, sem: ChunkSemantics) -> int:
        """Insert or update the semantic analysis for a chunk.

        Keywords and topics are stored as JSON (non-ASCII preserved).
        On a duplicate ``chunk_id`` the existing row is refreshed,
        including ``analysis_model`` — previously the model name was
        written only on first insert and went stale on re-analysis.

        Args:
            sem: Analysis result to persist.

        Returns:
            ``cursor.lastrowid`` after the statement. NOTE(review): on the
            duplicate-key/update path MySQL drivers may report 0 or the
            existing row id here — confirm callers tolerate this.
        """
        cursor = db.execute(
            """
            INSERT INTO chunk_semantics
            (chunk_id, summary, keywords, sentiment, topics, language, analyzed_at, analysis_model)
            VALUES (%s, %s, %s, %s, %s, %s, NOW(), %s)
            ON DUPLICATE KEY UPDATE
                summary = VALUES(summary),
                keywords = VALUES(keywords),
                sentiment = VALUES(sentiment),
                topics = VALUES(topics),
                language = VALUES(language),
                analysis_model = VALUES(analysis_model),
                analyzed_at = NOW()
            """,
            (
                sem.chunk_id,
                sem.summary,
                json.dumps(sem.keywords, ensure_ascii=False),
                sem.sentiment,
                json.dumps(sem.topics, ensure_ascii=False),
                sem.language,
                ANALYSIS_MODEL,
            ),
        )
        try:
            db.commit()
            return cursor.lastrowid
        finally:
            # Release the cursor even if commit() raises.
            cursor.close()
class EntityRepository:
    """Database access for entities."""

    def find_or_create(self, entity) -> int:
        """Return the id of an entity by (name, type), creating it if absent.

        Args:
            entity: Object with ``name``, ``entity_type`` and ``description``.

        Returns:
            The existing or newly inserted entity id.
        """
        cursor = db.execute(
            "SELECT id FROM entities WHERE name = %s AND type = %s",
            (entity.name, entity.entity_type),
        )
        try:
            existing = cursor.fetchone()
        finally:
            cursor.close()
        if existing:
            return existing["id"]
        # NOTE(review): SELECT-then-INSERT is racy under concurrent writers;
        # with a unique index on (name, type) this could become a single
        # INSERT ... ON DUPLICATE KEY UPDATE — confirm schema before changing.
        cursor = db.execute(
            """
            INSERT INTO entities (name, type, description, created_at)
            VALUES (%s, %s, %s, NOW())
            """,
            (entity.name, entity.entity_type, entity.description),
        )
        try:
            db.commit()
            return cursor.lastrowid
        finally:
            cursor.close()

    def link_to_chunk(self, chunk_id: int, entity_id: int, relevance: float = 1.0):
        """Link an entity to a chunk; repeat links bump ``mention_count``.

        Args:
            chunk_id: Chunk to link to.
            entity_id: Entity being mentioned.
            relevance: Relevance score stored on first insert (default 1.0).
        """
        cursor = db.execute(
            """
            INSERT INTO chunk_entities (chunk_id, entity_id, relevance_score, mention_count)
            VALUES (%s, %s, %s, 1)
            ON DUPLICATE KEY UPDATE
                mention_count = mention_count + 1
            """,
            (chunk_id, entity_id, relevance),
        )
        try:
            db.commit()
        finally:
            cursor.close()

    def save_relation(self, source_id: int, target_id: int, rel: Relation, chunk_id: int):
        """Persist a relation between two entities.

        On a duplicate key the stored strength is kept at the maximum of
        the old and new values.

        Args:
            source_id: Source entity id.
            target_id: Target entity id.
            rel: Relation with ``relation_type`` and ``strength``.
            chunk_id: Chunk the relation was extracted from.
        """
        cursor = db.execute(
            """
            INSERT INTO entity_relations
            (source_entity_id, target_entity_id, relation_type, strength, chunk_id)
            VALUES (%s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                strength = GREATEST(strength, VALUES(strength))
            """,
            (source_id, target_id, rel.relation_type, rel.strength, chunk_id),
        )
        try:
            db.commit()
        finally:
            cursor.close()
class TaxonomyRepository:
    """Database access for the taxonomy."""

    def ensure_terms_exist(self, terms: list[str]):
        """Insert any taxonomy terms that do not exist yet (depth 0).

        Slugs are derived by lowercasing and replacing spaces with dashes.

        Args:
            terms: Term names to ensure; existing names are left untouched.
        """
        for term in terms:
            cursor = db.execute(
                "SELECT id FROM taxonomy_terms WHERE name = %s", (term,)
            )
            try:
                exists = cursor.fetchone() is not None
            finally:
                # Previously the lookup cursor leaked if fetchone() raised.
                cursor.close()
            if exists:
                continue
            slug = term.lower().replace(" ", "-")
            cursor = db.execute(
                """
                INSERT INTO taxonomy_terms (name, slug, depth)
                VALUES (%s, %s, 0)
                """,
                (term, slug),
            )
            try:
                db.commit()
            finally:
                cursor.close()

    def link_to_chunk(self, chunk_id: int, category: str, confidence: float):
        """Link a chunk to a taxonomy term by name.

        Silently does nothing when the term does not exist (best-effort,
        matching the original behavior). On a duplicate link the stored
        confidence is overwritten with the new value.

        Args:
            chunk_id: Chunk to link.
            category: Taxonomy term name to look up.
            confidence: Classification confidence to store.
        """
        cursor = db.execute(
            "SELECT id FROM taxonomy_terms WHERE name = %s", (category,)
        )
        try:
            term = cursor.fetchone()
        finally:
            cursor.close()
        if not term:
            return
        cursor = db.execute(
            """
            INSERT INTO chunk_taxonomy (chunk_id, taxonomy_term_id, confidence)
            VALUES (%s, %s, %s)
            ON DUPLICATE KEY UPDATE confidence = VALUES(confidence)
            """,
            (chunk_id, term["id"], confidence),
        )
        try:
            db.commit()
        finally:
            cursor.close()