{
"tool_response": {
"type": "update",
"filePath": "\/var\/www\/scripts\/pipeline\/semantic_chunk_analyzer.py",
"content": "#!\/usr\/bin\/env python3\n\"\"\"\nSemantic Chunk Analyzer - Chunk-Level Semantic Analysis Pipeline\n\nAnalysiert Chunks aus der ki_system.chunks Tabelle und befüllt:\n- chunk_semantics (summary, keywords, sentiment, topics, language)\n- entities (extrahierte Entitäten)\n- chunk_entities (Verknüpfung Chunk <-> Entity)\n- entity_relations (Beziehungen zwischen Entitäten)\n- taxonomy_terms + chunk_taxonomy (Kategorisierung)\n\nBACKWARD COMPATIBILITY WRAPPER - Logic moved to semantic_chunk\/ package.\n\nUsage:\n python semantic_chunk_analyzer.py analyze [--limit N]\n python semantic_chunk_analyzer.py status\n python semantic_chunk_analyzer.py reset\n\"\"\"\n\nimport sys\n\nfrom db import db\nfrom semantic_chunk import SemanticChunkPipeline, BATCH_SIZE\n\n# Re-export for backward compatibility\nfrom semantic_chunk import (\n ChunkSemantics,\n Entity,\n Relation,\n OllamaService,\n ANALYSIS_MODEL,\n SemanticsAnalyzer,\n EntityExtractor,\n RelationExtractor,\n TaxonomyClassifier,\n ChunkRepository,\n SemanticsRepository,\n EntityRepository,\n TaxonomyRepository,\n)\n\n__all__ = [\n \"ChunkSemantics\",\n \"Entity\",\n \"Relation\",\n \"OllamaService\",\n \"ANALYSIS_MODEL\",\n \"SemanticsAnalyzer\",\n \"EntityExtractor\",\n \"RelationExtractor\",\n \"TaxonomyClassifier\",\n \"ChunkRepository\",\n \"SemanticsRepository\",\n \"EntityRepository\",\n \"TaxonomyRepository\",\n \"SemanticChunkPipeline\",\n \"BATCH_SIZE\",\n]\n\n\ndef main():\n \"\"\"CLI Entry Point.\"\"\"\n if len(sys.argv) < 2:\n print(__doc__)\n return\n\n command = sys.argv[1].lower()\n\n db.connect()\n\n try:\n pipeline = SemanticChunkPipeline()\n\n if command == \"analyze\":\n limit = BATCH_SIZE\n if len(sys.argv) > 2 and sys.argv[2] == \"--limit\":\n limit = int(sys.argv[3]) if len(sys.argv) > 3 else BATCH_SIZE\n\n result = pipeline.run(limit)\n print(f\"\\nErgebnis: {result['success']}\/{result['processed']} erfolgreich\")\n\n elif command == \"status\":\n stats = pipeline.status()\n print(f\"Chunks gesamt: {stats['total']}\")\n print(f\"Analysiert: {stats['analyzed']}\")\n print(f\"Entitäten: {stats['entities']}\")\n print(f\"Ausstehend: {stats['total'] - stats['analyzed']}\")\n\n elif command == \"reset\":\n confirm = input(\"Alle Semantik-Daten löschen? (ja\/nein): \")\n if confirm.lower() == \"ja\":\n for table in [\n \"chunk_semantics\",\n \"chunk_entities\",\n \"chunk_taxonomy\",\n \"entity_relations\",\n \"entities\",\n \"taxonomy_terms\",\n ]:\n db.execute(f\"TRUNCATE TABLE {table}\")\n db.commit()\n print(\"Alle Semantik-Tabellen geleert.\")\n\n else:\n print(f\"Unbekannter Befehl: {command}\")\n print(__doc__)\n\n except Exception as e:\n db.log(\"ERROR\", f\"Pipeline error: {e}\")\n print(f\"Fehler: {e}\")\n raise\n finally:\n db.disconnect()\n\n\nif __name__ == \"__main__\":\n main()\n",
"structuredPatch": [
{
"oldStart": 9,
"oldLines": 10,
"newStart": 9,
"newLines": 7,
"lines": [
" - entity_relations (Beziehungen zwischen Entitäten)",
" - taxonomy_terms + chunk_taxonomy (Kategorisierung)",
" ",
"-Prinzipien: DRY, KISS, SRP, YAGNI",
"-- Eine Klasse pro Verantwortlichkeit",
"-- Ollama für LLM-Aufrufe (lokal, kostenfrei)",
"-- Batch-Verarbeitung für Effizienz",
"+BACKWARD COMPATIBILITY WRAPPER - Logic moved to semantic_chunk\/ package.",
" ",
" Usage:",
" python semantic_chunk_analyzer.py analyze [--limit N]"
]
},
{
"oldStart": 20,
"oldLines": 604,
"newStart": 17,
"newLines": 47,
"lines": [
" python semantic_chunk_analyzer.py reset",
" \"\"\"",
" ",
"-import json",
" import sys",
"-from dataclasses import dataclass",
" ",
"-import requests",
"-",
"-from config import OLLAMA_HOST",
" from db import db",
"+from semantic_chunk import SemanticChunkPipeline, BATCH_SIZE",
" ",
"-# === Configuration ===",
"-ANALYSIS_MODEL = \"gemma3:27b-it-qat\" # Beste JSON-Compliance und Qualität",
"-BATCH_SIZE = 10",
"+# Re-export for backward compatibility",
"+from semantic_chunk import (",
"+ ChunkSemantics,",
"+ Entity,",
"+ Relation,",
"+ OllamaService,",
"+ ANALYSIS_MODEL,",
"+ SemanticsAnalyzer,",
"+ EntityExtractor,",
"+ RelationExtractor,",
"+ TaxonomyClassifier,",
"+ ChunkRepository,",
"+ SemanticsRepository,",
"+ EntityRepository,",
"+ TaxonomyRepository,",
"+)",
" ",
"+__all__ = [",
"+ \"ChunkSemantics\",",
"+ \"Entity\",",
"+ \"Relation\",",
"+ \"OllamaService\",",
"+ \"ANALYSIS_MODEL\",",
"+ \"SemanticsAnalyzer\",",
"+ \"EntityExtractor\",",
"+ \"RelationExtractor\",",
"+ \"TaxonomyClassifier\",",
"+ \"ChunkRepository\",",
"+ \"SemanticsRepository\",",
"+ \"EntityRepository\",",
"+ \"TaxonomyRepository\",",
"+ \"SemanticChunkPipeline\",",
"+ \"BATCH_SIZE\",",
"+]",
" ",
"-# === Data Classes (SRP) ===",
"-@dataclass",
"-class ChunkSemantics:",
"- \"\"\"Semantische Analyse eines Chunks.\"\"\"",
" ",
"- chunk_id: int",
"- summary: str",
"- keywords: list[str]",
"- sentiment: str # positive, neutral, negative, mixed",
"- topics: list[str]",
"- language: str",
"-",
"-",
"-@dataclass",
"-class Entity:",
"- \"\"\"Extrahierte Entität.\"\"\"",
"-",
"- name: str",
"- entity_type: str # PERSON, ORGANIZATION, CONCEPT, LOCATION, OTHER",
"- description: str | None = None",
"-",
"-",
"-@dataclass",
"-class Relation:",
"- \"\"\"Beziehung zwischen Entitäten.\"\"\"",
"-",
"- source: str",
"- relation_type: str",
"- target: str",
"- strength: float = 0.5",
"-",
"-",
"-# === LLM Service (SRP) ===",
"-class OllamaService:",
"- \"\"\"Ollama API Wrapper - Single Responsibility: LLM Kommunikation.\"\"\"",
"-",
"- def __init__(self, host: str = OLLAMA_HOST, model: str = ANALYSIS_MODEL):",
"- self.host = host",
"- self.model = model",
"-",
"- def generate(self, prompt: str, json_format: bool = True) -> dict | None:",
"- \"\"\"Generiere Antwort von Ollama.\"\"\"",
"- try:",
"- payload = {",
"- \"model\": self.model,",
"- \"prompt\": prompt,",
"- \"stream\": False,",
"- \"options\": {\"temperature\": 0.3, \"num_predict\": 1000},",
"- }",
"- if json_format:",
"- payload[\"format\"] = \"json\"",
"-",
"- response = requests.post(f\"{self.host}\/api\/generate\", json=payload, timeout=120)",
"- response.raise_for_status()",
"-",
"- text = response.json().get(\"response\", \"{}\")",
"- if json_format:",
"- return self._parse_json(text)",
"- return {\"text\": text}",
"- except Exception as e:",
"- db.log(\"ERROR\", f\"Ollama error: {e}\")",
"- return None",
"-",
"- def _parse_json(self, text: str) -> dict | None:",
"- \"\"\"Parse JSON aus Antwort.\"\"\"",
"- try:",
"- return json.loads(text)",
"- except json.JSONDecodeError:",
"- # Versuche JSON aus Text zu extrahieren",
"- import re",
"-",
"- match = re.search(r\"\\{[\\s\\S]*\\}\", text)",
"- if match:",
"- try:",
"- return json.loads(match.group())",
"- except json.JSONDecodeError:",
"- pass",
"- return None",
"-",
"-",
"-# === Analyzer Classes (SRP) ===",
"-class SemanticsAnalyzer:",
"- \"\"\"Analysiert Chunk-Semantik: Summary, Keywords, Sentiment.\"\"\"",
"-",
"- PROMPT = \"\"\"Analysiere diesen deutschen Text und erstelle eine semantische Analyse.",
"-",
"-Text:",
"-{text}",
"-",
"-Antworte NUR als JSON:",
"-{{",
"- \"summary\": \"Zusammenfassung in 1-2 Sätzen\",",
"- \"keywords\": [\"keyword1\", \"keyword2\", \"keyword3\"],",
"- \"sentiment\": \"positive|neutral|negative|mixed\",",
"- \"topics\": [\"thema1\", \"thema2\"],",
"- \"language\": \"de|en\"",
"-}}\"\"\"",
"-",
"- def __init__(self, llm: OllamaService):",
"- self.llm = llm",
"-",
"- def analyze(self, chunk_id: int, text: str) -> ChunkSemantics | None:",
"- \"\"\"Analysiere einen Chunk.\"\"\"",
"- result = self.llm.generate(self.PROMPT.format(text=text[:2000]))",
"- if not result:",
"- return None",
"-",
"- # Robuste Extraktion mit Typ-Validierung",
"- summary = result.get(\"summary\", \"\")",
"- if isinstance(summary, list):",
"- summary = summary[0] if summary else \"\"",
"-",
"- keywords = result.get(\"keywords\", [])",
"- if not isinstance(keywords, list):",
"- keywords = [str(keywords)] if keywords else []",
"- keywords = [str(k) for k in keywords if k and not isinstance(k, (list, dict))][:10]",
"-",
"- topics = result.get(\"topics\", [])",
"- if not isinstance(topics, list):",
"- topics = [str(topics)] if topics else []",
"- topics = [str(t) for t in topics if t and not isinstance(t, (list, dict))][:5]",
"-",
"- language = result.get(\"language\", \"de\")",
"- if isinstance(language, list):",
"- language = language[0] if language else \"de\"",
"-",
"- return ChunkSemantics(",
"- chunk_id=chunk_id,",
"- summary=str(summary)[:1000],",
"- keywords=keywords,",
"- sentiment=self._validate_sentiment(result.get(\"sentiment\", \"neutral\")),",
"- topics=topics,",
"- language=str(language)[:5],",
"- )",
"-",
"- def _validate_sentiment(self, sentiment) -> str:",
"- \"\"\"Validiere Sentiment-Wert.\"\"\"",
"- if isinstance(sentiment, list):",
"- sentiment = sentiment[0] if sentiment else \"neutral\"",
"- if not isinstance(sentiment, str):",
"- return \"neutral\"",
"- valid = {\"positive\", \"neutral\", \"negative\", \"mixed\"}",
"- return sentiment.lower() if sentiment.lower() in valid else \"neutral\"",
"-",
"-",
"-class EntityExtractor:",
"- \"\"\"Extrahiert Entitäten aus Text.\"\"\"",
"-",
"- PROMPT = \"\"\"Extrahiere alle wichtigen Entitäten aus diesem deutschen Text.",
"-",
"-Kategorien:",
"-- PERSON: Namen von Personen",
"-- ORGANIZATION: Firmen, Institutionen",
"-- CONCEPT: Fachbegriffe, Methoden, Theorien",
"-- LOCATION: Orte, Länder",
"-- OTHER: Sonstiges",
"-",
"-Text:",
"-{text}",
"-",
"-Antworte NUR als JSON:",
"-{{",
"- \"entities\": [",
"- {{\"name\": \"Name\", \"type\": \"CONCEPT\", \"description\": \"Kurze Beschreibung\"}}",
"- ]",
"-}}\"\"\"",
"-",
"- def __init__(self, llm: OllamaService):",
"- self.llm = llm",
"-",
"- def extract(self, text: str) -> list[Entity]:",
"- \"\"\"Extrahiere Entitäten aus Text.\"\"\"",
"- result = self.llm.generate(self.PROMPT.format(text=text[:2000]))",
"- if not result:",
"- return []",
"-",
"- entities = []",
"- for e in result.get(\"entities\", []):",
"- name = e.get(\"name\")",
"- etype = e.get(\"type\")",
"- desc = e.get(\"description\")",
"-",
"- # Validiere: name und type müssen Strings sein",
"- if isinstance(name, list):",
"- name = name[0] if name else None",
"- if isinstance(etype, list):",
"- etype = etype[0] if etype else None",
"- if isinstance(desc, list):",
"- desc = desc[0] if desc else None",
"-",
"- if name and isinstance(name, str) and etype:",
"- entities.append(",
"- Entity(",
"- name=str(name)[:200], # Limit length",
"- entity_type=self._validate_type(str(etype)),",
"- description=str(desc)[:500] if desc else None,",
"- )",
"- )",
"- return entities[:20] # Max 20 pro Chunk",
"-",
"- def _validate_type(self, entity_type: str) -> str:",
"- \"\"\"Validiere Entity-Typ.\"\"\"",
"- valid = {\"PERSON\", \"ORGANIZATION\", \"CONCEPT\", \"LOCATION\", \"OTHER\"}",
"- return entity_type.upper() if entity_type.upper() in valid else \"OTHER\"",
"-",
"-",
"-class RelationExtractor:",
"- \"\"\"Extrahiert Beziehungen zwischen Entitäten.\"\"\"",
"-",
"- PROMPT = \"\"\"Finde Beziehungen zwischen diesen Entitäten im Text.",
"-",
"-Entitäten: {entities}",
"-",
"-Beziehungstypen:",
"-- RELATED_TO: steht in Beziehung zu",
"-- PART_OF: ist Teil von",
"-- DEVELOPED_BY: wurde entwickelt von",
"-- USED_IN: wird verwendet in",
"-- INFLUENCED_BY: wurde beeinflusst von",
"-",
"-Text:",
"-{text}",
"-",
"-Antworte NUR als JSON:",
"-{{",
"- \"relations\": [",
"- {{\"source\": \"Entity1\", \"relation\": \"RELATED_TO\", \"target\": \"Entity2\", \"strength\": 0.8}}",
"- ]",
"-}}\"\"\"",
"-",
"- def __init__(self, llm: OllamaService):",
"- self.llm = llm",
"-",
"- def extract(self, text: str, entities: list[Entity]) -> list[Relation]:",
"- \"\"\"Extrahiere Relationen zwischen Entitäten.\"\"\"",
"- if len(entities) < 2:",
"- return []",
"-",
"- entity_names = \", \".join([e.name for e in entities[:15]])",
"- result = self.llm.generate(self.PROMPT.format(entities=entity_names, text=text[:1500]))",
"-",
"- if not result:",
"- return []",
"-",
"- relations = []",
"- for r in result.get(\"relations\", []):",
"- source = r.get(\"source\")",
"- target = r.get(\"target\")",
"- rel_type = r.get(\"relation\")",
"- strength = r.get(\"strength\", 0.5)",
"-",
"- # Validiere Typen",
"- if isinstance(source, list):",
"- source = source[0] if source else None",
"- if isinstance(target, list):",
"- target = target[0] if target else None",
"- if isinstance(rel_type, list):",
"- rel_type = rel_type[0] if rel_type else None",
"-",
"- if source and target and rel_type and isinstance(source, str) and isinstance(target, str):",
"- try:",
"- strength_val = float(strength) if not isinstance(strength, list) else 0.5",
"- except (ValueError, TypeError):",
"- strength_val = 0.5",
"- relations.append(",
"- Relation(",
"- source=str(source)[:200],",
"- relation_type=str(rel_type)[:50],",
"- target=str(target)[:200],",
"- strength=min(1.0, max(0.0, strength_val)),",
"- )",
"- )",
"- return relations[:10] # Max 10 pro Chunk",
"-",
"-",
"-class TaxonomyClassifier:",
"- \"\"\"Klassifiziert Chunks in Taxonomie-Kategorien.\"\"\"",
"-",
"- # Standard-Taxonomie für systemische Inhalte",
"- CATEGORIES = [",
"- \"Methoden\",",
"- \"Theorie\",",
"- \"Praxis\",",
"- \"Kommunikation\",",
"- \"Organisation\",",
"- \"Entwicklung\",",
"- \"Coaching\",",
"- \"Therapie\",",
"- ]",
"-",
"- PROMPT = \"\"\"Klassifiziere diesen Text in passende Kategorien.",
"-",
"-Kategorien: {categories}",
"-",
"-Text:",
"-{text}",
"-",
"-Antworte NUR als JSON:",
"-{{",
"- \"categories\": [\"Kategorie1\", \"Kategorie2\"],",
"- \"confidence\": 0.8",
"-}}\"\"\"",
"-",
"- def __init__(self, llm: OllamaService):",
"- self.llm = llm",
"-",
"- def classify(self, text: str) -> tuple[list[str], float]:",
"- \"\"\"Klassifiziere Text in Kategorien.\"\"\"",
"- result = self.llm.generate(self.PROMPT.format(categories=\", \".join(self.CATEGORIES), text=text[:1500]))",
"-",
"- if not result:",
"- return [], 0.0",
"-",
"- categories = [c for c in result.get(\"categories\", []) if c in self.CATEGORIES]",
"- confidence = min(1.0, max(0.0, float(result.get(\"confidence\", 0.5))))",
"- return categories[:3], confidence",
"-",
"-",
"-# === Repository Classes (SRP) ===",
"-class ChunkRepository:",
"- \"\"\"Datenbankzugriff für Chunks.\"\"\"",
"-",
"- def get_pending_chunks(self, limit: int = BATCH_SIZE) -> list[dict]:",
"- \"\"\"Hole Chunks ohne Semantik-Analyse.\"\"\"",
"- cursor = db.execute(",
"- \"\"\"",
"- SELECT c.id, c.content, c.document_id",
"- FROM chunks c",
"- LEFT JOIN chunk_semantics cs ON c.id = cs.chunk_id",
"- WHERE cs.id IS NULL",
"- ORDER BY c.id",
"- LIMIT %s",
"- \"\"\",",
"- (limit,),",
"- )",
"- results = cursor.fetchall()",
"- cursor.close()",
"- return results",
"-",
"- def get_stats(self) -> dict:",
"- \"\"\"Hole Statistiken.\"\"\"",
"- cursor = db.execute(\"SELECT COUNT(*) as total FROM chunks\")",
"- total = cursor.fetchone()[\"total\"]",
"- cursor.close()",
"-",
"- cursor = db.execute(\"SELECT COUNT(*) as analyzed FROM chunk_semantics\")",
"- analyzed = cursor.fetchone()[\"analyzed\"]",
"- cursor.close()",
"-",
"- cursor = db.execute(\"SELECT COUNT(*) as entities FROM entities\")",
"- entities = cursor.fetchone()[\"entities\"]",
"- cursor.close()",
"-",
"- return {\"total\": total, \"analyzed\": analyzed, \"entities\": entities}",
"-",
"-",
"-class SemanticsRepository:",
"- \"\"\"Datenbankzugriff für Semantik-Daten.\"\"\"",
"-",
"- def save_semantics(self, sem: ChunkSemantics) -> int:",
"- \"\"\"Speichere Chunk-Semantik.\"\"\"",
"- cursor = db.execute(",
"- \"\"\"",
"- INSERT INTO chunk_semantics",
"- (chunk_id, summary, keywords, sentiment, topics, language, analyzed_at, analysis_model)",
"- VALUES (%s, %s, %s, %s, %s, %s, NOW(), %s)",
"- ON DUPLICATE KEY UPDATE",
"- summary = VALUES(summary),",
"- keywords = VALUES(keywords),",
"- sentiment = VALUES(sentiment),",
"- topics = VALUES(topics),",
"- language = VALUES(language),",
"- analyzed_at = NOW()",
"- \"\"\",",
"- (",
"- sem.chunk_id,",
"- sem.summary,",
"- json.dumps(sem.keywords, ensure_ascii=False),",
"- sem.sentiment,",
"- json.dumps(sem.topics, ensure_ascii=False),",
"- sem.language,",
"- ANALYSIS_MODEL,",
"- ),",
"- )",
"- db.commit()",
"- sem_id = cursor.lastrowid",
"- cursor.close()",
"- return sem_id",
"-",
"-",
"-class EntityRepository:",
"- \"\"\"Datenbankzugriff für Entitäten.\"\"\"",
"-",
"- def find_or_create(self, entity: Entity) -> int:",
"- \"\"\"Finde oder erstelle Entität.\"\"\"",
"- # Suche existierende",
"- cursor = db.execute(\"SELECT id FROM entities WHERE name = %s AND type = %s\", (entity.name, entity.entity_type))",
"- existing = cursor.fetchone()",
"- cursor.close()",
"-",
"- if existing:",
"- return existing[\"id\"]",
"-",
"- # Erstelle neue",
"- cursor = db.execute(",
"- \"\"\"",
"- INSERT INTO entities (name, type, description, created_at)",
"- VALUES (%s, %s, %s, NOW())",
"- \"\"\",",
"- (entity.name, entity.entity_type, entity.description),",
"- )",
"- db.commit()",
"- entity_id = cursor.lastrowid",
"- cursor.close()",
"- return entity_id",
"-",
"- def link_to_chunk(self, chunk_id: int, entity_id: int, relevance: float = 1.0):",
"- \"\"\"Verknüpfe Entity mit Chunk.\"\"\"",
"- cursor = db.execute(",
"- \"\"\"",
"- INSERT INTO chunk_entities (chunk_id, entity_id, relevance_score, mention_count)",
"- VALUES (%s, %s, %s, 1)",
"- ON DUPLICATE KEY UPDATE",
"- mention_count = mention_count + 1",
"- \"\"\",",
"- (chunk_id, entity_id, relevance),",
"- )",
"- db.commit()",
"- cursor.close()",
"-",
"- def save_relation(self, source_id: int, target_id: int, rel: Relation, chunk_id: int):",
"- \"\"\"Speichere Relation.\"\"\"",
"- cursor = db.execute(",
"- \"\"\"",
"- INSERT INTO entity_relations",
"- (source_entity_id, target_entity_id, relation_type, strength, chunk_id)",
"- VALUES (%s, %s, %s, %s, %s)",
"- ON DUPLICATE KEY UPDATE",
"- strength = GREATEST(strength, VALUES(strength))",
"- \"\"\",",
"- (source_id, target_id, rel.relation_type, rel.strength, chunk_id),",
"- )",
"- db.commit()",
"- cursor.close()",
"-",
"-",
"-class TaxonomyRepository:",
"- \"\"\"Datenbankzugriff für Taxonomie.\"\"\"",
"-",
"- def ensure_terms_exist(self, terms: list[str]):",
"- \"\"\"Stelle sicher, dass Taxonomie-Terms existieren.\"\"\"",
"- for term in terms:",
"- cursor = db.execute(\"SELECT id FROM taxonomy_terms WHERE name = %s\", (term,))",
"- if not cursor.fetchone():",
"- cursor.close()",
"- slug = term.lower().replace(\" \", \"-\")",
"- cursor = db.execute(",
"- \"\"\"",
"- INSERT INTO taxonomy_terms (name, slug, depth)",
"- VALUES (%s, %s, 0)",
"- \"\"\",",
"- (term, slug),",
"- )",
"- db.commit()",
"- cursor.close()",
"-",
"- def link_to_chunk(self, chunk_id: int, category: str, confidence: float):",
"- \"\"\"Verknüpfe Chunk mit Taxonomie-Term.\"\"\"",
"- cursor = db.execute(\"SELECT id FROM taxonomy_terms WHERE name = %s\", (category,))",
"- term = cursor.fetchone()",
"- cursor.close()",
"-",
"- if term:",
"- cursor = db.execute(",
"- \"\"\"",
"- INSERT INTO chunk_taxonomy (chunk_id, taxonomy_term_id, confidence)",
"- VALUES (%s, %s, %s)",
"- ON DUPLICATE KEY UPDATE confidence = VALUES(confidence)",
"- \"\"\",",
"- (chunk_id, term[\"id\"], confidence),",
"- )",
"- db.commit()",
"- cursor.close()",
"-",
"-",
"-# === Main Pipeline (Orchestration) ===",
"-class SemanticChunkPipeline:",
"- \"\"\"Orchestriert die semantische Chunk-Analyse.\"\"\"",
"-",
"- def __init__(self):",
"- self.llm = OllamaService()",
"- self.semantics_analyzer = SemanticsAnalyzer(self.llm)",
"- self.entity_extractor = EntityExtractor(self.llm)",
"- self.relation_extractor = RelationExtractor(self.llm)",
"- self.taxonomy_classifier = TaxonomyClassifier(self.llm)",
"-",
"- self.chunk_repo = ChunkRepository()",
"- self.semantics_repo = SemanticsRepository()",
"- self.entity_repo = EntityRepository()",
"- self.taxonomy_repo = TaxonomyRepository()",
"-",
"- # Ensure base taxonomy exists",
"- self.taxonomy_repo.ensure_terms_exist(TaxonomyClassifier.CATEGORIES)",
"-",
"- def analyze_chunk(self, chunk: dict) -> dict:",
"- \"\"\"Analysiere einen einzelnen Chunk.\"\"\"",
"- chunk_id = chunk[\"id\"]",
"- text = chunk[\"content\"]",
"-",
"- results = {\"chunk_id\": chunk_id, \"success\": False}",
"-",
"- # 1. Semantik-Analyse",
"- semantics = self.semantics_analyzer.analyze(chunk_id, text)",
"- if semantics:",
"- self.semantics_repo.save_semantics(semantics)",
"- results[\"semantics\"] = True",
"-",
"- # 2. Entity-Extraktion",
"- entities = self.entity_extractor.extract(text)",
"- entity_ids = {}",
"- for entity in entities:",
"- entity_id = self.entity_repo.find_or_create(entity)",
"- self.entity_repo.link_to_chunk(chunk_id, entity_id)",
"- entity_ids[entity.name] = entity_id",
"- results[\"entities\"] = len(entities)",
"-",
"- # 3. Relation-Extraktion",
"- if len(entities) >= 2:",
"- relations = self.relation_extractor.extract(text, entities)",
"- for rel in relations:",
"- source_id = entity_ids.get(rel.source)",
"- target_id = entity_ids.get(rel.target)",
"- if source_id and target_id:",
"- self.entity_repo.save_relation(source_id, target_id, rel, chunk_id)",
"- results[\"relations\"] = len(relations)",
"-",
"- # 4. Taxonomie-Klassifikation",
"- categories, confidence = self.taxonomy_classifier.classify(text)",
"- for cat in categories:",
"- self.taxonomy_repo.link_to_chunk(chunk_id, cat, confidence)",
"- results[\"categories\"] = categories",
"-",
"- results[\"success\"] = True",
"- return results",
"-",
"- def run(self, limit: int = BATCH_SIZE) -> dict:",
"- \"\"\"Führe Pipeline aus.\"\"\"",
"- db.log(\"INFO\", f\"Starting semantic chunk analysis (limit={limit})\")",
"-",
"- chunks = self.chunk_repo.get_pending_chunks(limit)",
"- if not chunks:",
"- print(\"Keine Chunks zur Analyse gefunden.\")",
"- return {\"processed\": 0}",
"-",
"- print(f\"Analysiere {len(chunks)} Chunks...\")",
"-",
"- success = 0",
"- failed = 0",
"-",
"- for i, chunk in enumerate(chunks, 1):",
"- print(f\"[{i}\/{len(chunks)}] Chunk #{chunk['id']}...\", end=\" \")",
"-",
"- try:",
"- result = self.analyze_chunk(chunk)",
"- if result[\"success\"]:",
"- print(f\"✓ {result.get('entities', 0)} Entitäten, {result.get('categories', [])}\")",
"- success += 1",
"- else:",
"- print(\"✗ Fehler\")",
"- failed += 1",
"- except Exception as e:",
"- print(f\"✗ {e}\")",
"- db.log(\"ERROR\", f\"Chunk {chunk['id']} failed: {e}\")",
"- failed += 1",
"-",
"- db.log(\"INFO\", f\"Analysis complete: {success} success, {failed} failed\")",
"- return {\"processed\": len(chunks), \"success\": success, \"failed\": failed}",
"-",
"- def status(self) -> dict:",
"- \"\"\"Zeige Status.\"\"\"",
"- return self.chunk_repo.get_stats()",
"-",
"-",
"-# === CLI ===",
" def main():",
" \"\"\"CLI Entry Point.\"\"\"",
" if len(sys.argv) < 2:"
]
}
]
}
}