{
"event": "PreToolUse",
"tool_name": "Write",
"tool_input": {
"file_path": "\/var\/www\/scripts\/pipeline\/semantic_chunk\/repositories.py",
"content": "\"\"\"\nRepository classes for semantic chunk data persistence.\n\"\"\"\n\nimport json\nimport sys\n\nsys.path.insert(0, \"\/var\/www\/scripts\/pipeline\")\n\nfrom db import db\n\nfrom .models import ChunkSemantics, Relation\nfrom .ollama_service import ANALYSIS_MODEL\n\nBATCH_SIZE = 10\n\n\nclass ChunkRepository:\n \"\"\"Datenbankzugriff für Chunks.\"\"\"\n\n def get_pending_chunks(self, limit: int = BATCH_SIZE) -> list[dict]:\n \"\"\"Hole Chunks ohne Semantik-Analyse.\"\"\"\n cursor = db.execute(\n \"\"\"\n SELECT c.id, c.content, c.document_id\n FROM chunks c\n LEFT JOIN chunk_semantics cs ON c.id = cs.chunk_id\n WHERE cs.id IS NULL\n ORDER BY c.id\n LIMIT %s\n \"\"\",\n (limit,),\n )\n results = cursor.fetchall()\n cursor.close()\n return results\n\n def get_stats(self) -> dict:\n \"\"\"Hole Statistiken.\"\"\"\n cursor = db.execute(\"SELECT COUNT(*) as total FROM chunks\")\n total = cursor.fetchone()[\"total\"]\n cursor.close()\n\n cursor = db.execute(\"SELECT COUNT(*) as analyzed FROM chunk_semantics\")\n analyzed = cursor.fetchone()[\"analyzed\"]\n cursor.close()\n\n cursor = db.execute(\"SELECT COUNT(*) as entities FROM entities\")\n entities = cursor.fetchone()[\"entities\"]\n cursor.close()\n\n return {\"total\": total, \"analyzed\": analyzed, \"entities\": entities}\n\n\nclass SemanticsRepository:\n \"\"\"Datenbankzugriff für Semantik-Daten.\"\"\"\n\n def save_semantics(self, sem: ChunkSemantics) -> int:\n \"\"\"Speichere Chunk-Semantik.\"\"\"\n cursor = db.execute(\n \"\"\"\n INSERT INTO chunk_semantics\n (chunk_id, summary, keywords, sentiment, topics, language, analyzed_at, analysis_model)\n VALUES (%s, %s, %s, %s, %s, %s, NOW(), %s)\n ON DUPLICATE KEY UPDATE\n summary = VALUES(summary),\n keywords = VALUES(keywords),\n sentiment = VALUES(sentiment),\n topics = VALUES(topics),\n language = VALUES(language),\n analyzed_at = NOW()\n \"\"\",\n (\n sem.chunk_id,\n sem.summary,\n json.dumps(sem.keywords, ensure_ascii=False),\n sem.sentiment,\n json.dumps(sem.topics, ensure_ascii=False),\n sem.language,\n ANALYSIS_MODEL,\n ),\n )\n db.commit()\n sem_id = cursor.lastrowid\n cursor.close()\n return sem_id\n\n\nclass EntityRepository:\n \"\"\"Datenbankzugriff für Entitäten.\"\"\"\n\n def find_or_create(self, entity) -> int:\n \"\"\"Finde oder erstelle Entität.\"\"\"\n cursor = db.execute(\"SELECT id FROM entities WHERE name = %s AND type = %s\", (entity.name, entity.entity_type))\n existing = cursor.fetchone()\n cursor.close()\n\n if existing:\n return existing[\"id\"]\n\n cursor = db.execute(\n \"\"\"\n INSERT INTO entities (name, type, description, created_at)\n VALUES (%s, %s, %s, NOW())\n \"\"\",\n (entity.name, entity.entity_type, entity.description),\n )\n db.commit()\n entity_id = cursor.lastrowid\n cursor.close()\n return entity_id\n\n def link_to_chunk(self, chunk_id: int, entity_id: int, relevance: float = 1.0):\n \"\"\"Verknüpfe Entity mit Chunk.\"\"\"\n cursor = db.execute(\n \"\"\"\n INSERT INTO chunk_entities (chunk_id, entity_id, relevance_score, mention_count)\n VALUES (%s, %s, %s, 1)\n ON DUPLICATE KEY UPDATE\n mention_count = mention_count + 1\n \"\"\",\n (chunk_id, entity_id, relevance),\n )\n db.commit()\n cursor.close()\n\n def save_relation(self, source_id: int, target_id: int, rel: Relation, chunk_id: int):\n \"\"\"Speichere Relation.\"\"\"\n cursor = db.execute(\n \"\"\"\n INSERT INTO entity_relations\n (source_entity_id, target_entity_id, relation_type, strength, chunk_id)\n VALUES (%s, %s, %s, %s, %s)\n ON DUPLICATE KEY UPDATE\n strength = GREATEST(strength, VALUES(strength))\n \"\"\",\n (source_id, target_id, rel.relation_type, rel.strength, chunk_id),\n )\n db.commit()\n cursor.close()\n\n\nclass TaxonomyRepository:\n \"\"\"Datenbankzugriff für Taxonomie.\"\"\"\n\n def ensure_terms_exist(self, terms: list[str]):\n \"\"\"Stelle sicher, dass Taxonomie-Terms existieren.\"\"\"\n for term in terms:\n cursor = db.execute(\"SELECT id FROM taxonomy_terms WHERE name = %s\", (term,))\n if not cursor.fetchone():\n cursor.close()\n slug = term.lower().replace(\" \", \"-\")\n cursor = db.execute(\n \"\"\"\n INSERT INTO taxonomy_terms (name, slug, depth)\n VALUES (%s, %s, 0)\n \"\"\",\n (term, slug),\n )\n db.commit()\n cursor.close()\n\n def link_to_chunk(self, chunk_id: int, category: str, confidence: float):\n \"\"\"Verknüpfe Chunk mit Taxonomie-Term.\"\"\"\n cursor = db.execute(\"SELECT id FROM taxonomy_terms WHERE name = %s\", (category,))\n term = cursor.fetchone()\n cursor.close()\n\n if term:\n cursor = db.execute(\n \"\"\"\n INSERT INTO chunk_taxonomy (chunk_id, taxonomy_term_id, confidence)\n VALUES (%s, %s, %s)\n ON DUPLICATE KEY UPDATE confidence = VALUES(confidence)\n \"\"\",\n (chunk_id, term[\"id\"], confidence),\n )\n db.commit()\n cursor.close()\n"
}
}
{
"tool_response": {
"type": "create",
"filePath": "\/var\/www\/scripts\/pipeline\/semantic_chunk\/repositories.py",
"content": "\"\"\"\nRepository classes for semantic chunk data persistence.\n\"\"\"\n\nimport json\nimport sys\n\nsys.path.insert(0, \"\/var\/www\/scripts\/pipeline\")\n\nfrom db import db\n\nfrom .models import ChunkSemantics, Relation\nfrom .ollama_service import ANALYSIS_MODEL\n\nBATCH_SIZE = 10\n\n\nclass ChunkRepository:\n \"\"\"Datenbankzugriff für Chunks.\"\"\"\n\n def get_pending_chunks(self, limit: int = BATCH_SIZE) -> list[dict]:\n \"\"\"Hole Chunks ohne Semantik-Analyse.\"\"\"\n cursor = db.execute(\n \"\"\"\n SELECT c.id, c.content, c.document_id\n FROM chunks c\n LEFT JOIN chunk_semantics cs ON c.id = cs.chunk_id\n WHERE cs.id IS NULL\n ORDER BY c.id\n LIMIT %s\n \"\"\",\n (limit,),\n )\n results = cursor.fetchall()\n cursor.close()\n return results\n\n def get_stats(self) -> dict:\n \"\"\"Hole Statistiken.\"\"\"\n cursor = db.execute(\"SELECT COUNT(*) as total FROM chunks\")\n total = cursor.fetchone()[\"total\"]\n cursor.close()\n\n cursor = db.execute(\"SELECT COUNT(*) as analyzed FROM chunk_semantics\")\n analyzed = cursor.fetchone()[\"analyzed\"]\n cursor.close()\n\n cursor = db.execute(\"SELECT COUNT(*) as entities FROM entities\")\n entities = cursor.fetchone()[\"entities\"]\n cursor.close()\n\n return {\"total\": total, \"analyzed\": analyzed, \"entities\": entities}\n\n\nclass SemanticsRepository:\n \"\"\"Datenbankzugriff für Semantik-Daten.\"\"\"\n\n def save_semantics(self, sem: ChunkSemantics) -> int:\n \"\"\"Speichere Chunk-Semantik.\"\"\"\n cursor = db.execute(\n \"\"\"\n INSERT INTO chunk_semantics\n (chunk_id, summary, keywords, sentiment, topics, language, analyzed_at, analysis_model)\n VALUES (%s, %s, %s, %s, %s, %s, NOW(), %s)\n ON DUPLICATE KEY UPDATE\n summary = VALUES(summary),\n keywords = VALUES(keywords),\n sentiment = VALUES(sentiment),\n topics = VALUES(topics),\n language = VALUES(language),\n analyzed_at = NOW()\n \"\"\",\n (\n sem.chunk_id,\n sem.summary,\n json.dumps(sem.keywords, ensure_ascii=False),\n sem.sentiment,\n json.dumps(sem.topics, ensure_ascii=False),\n sem.language,\n ANALYSIS_MODEL,\n ),\n )\n db.commit()\n sem_id = cursor.lastrowid\n cursor.close()\n return sem_id\n\n\nclass EntityRepository:\n \"\"\"Datenbankzugriff für Entitäten.\"\"\"\n\n def find_or_create(self, entity) -> int:\n \"\"\"Finde oder erstelle Entität.\"\"\"\n cursor = db.execute(\"SELECT id FROM entities WHERE name = %s AND type = %s\", (entity.name, entity.entity_type))\n existing = cursor.fetchone()\n cursor.close()\n\n if existing:\n return existing[\"id\"]\n\n cursor = db.execute(\n \"\"\"\n INSERT INTO entities (name, type, description, created_at)\n VALUES (%s, %s, %s, NOW())\n \"\"\",\n (entity.name, entity.entity_type, entity.description),\n )\n db.commit()\n entity_id = cursor.lastrowid\n cursor.close()\n return entity_id\n\n def link_to_chunk(self, chunk_id: int, entity_id: int, relevance: float = 1.0):\n \"\"\"Verknüpfe Entity mit Chunk.\"\"\"\n cursor = db.execute(\n \"\"\"\n INSERT INTO chunk_entities (chunk_id, entity_id, relevance_score, mention_count)\n VALUES (%s, %s, %s, 1)\n ON DUPLICATE KEY UPDATE\n mention_count = mention_count + 1\n \"\"\",\n (chunk_id, entity_id, relevance),\n )\n db.commit()\n cursor.close()\n\n def save_relation(self, source_id: int, target_id: int, rel: Relation, chunk_id: int):\n \"\"\"Speichere Relation.\"\"\"\n cursor = db.execute(\n \"\"\"\n INSERT INTO entity_relations\n (source_entity_id, target_entity_id, relation_type, strength, chunk_id)\n VALUES (%s, %s, %s, %s, %s)\n ON DUPLICATE KEY UPDATE\n strength = GREATEST(strength, VALUES(strength))\n \"\"\",\n (source_id, target_id, rel.relation_type, rel.strength, chunk_id),\n )\n db.commit()\n cursor.close()\n\n\nclass TaxonomyRepository:\n \"\"\"Datenbankzugriff für Taxonomie.\"\"\"\n\n def ensure_terms_exist(self, terms: list[str]):\n \"\"\"Stelle sicher, dass Taxonomie-Terms existieren.\"\"\"\n for term in terms:\n cursor = db.execute(\"SELECT id FROM taxonomy_terms WHERE name = %s\", (term,))\n if not cursor.fetchone():\n cursor.close()\n slug = term.lower().replace(\" \", \"-\")\n cursor = db.execute(\n \"\"\"\n INSERT INTO taxonomy_terms (name, slug, depth)\n VALUES (%s, %s, 0)\n \"\"\",\n (term, slug),\n )\n db.commit()\n cursor.close()\n\n def link_to_chunk(self, chunk_id: int, category: str, confidence: float):\n \"\"\"Verknüpfe Chunk mit Taxonomie-Term.\"\"\"\n cursor = db.execute(\"SELECT id FROM taxonomy_terms WHERE name = %s\", (category,))\n term = cursor.fetchone()\n cursor.close()\n\n if term:\n cursor = db.execute(\n \"\"\"\n INSERT INTO chunk_taxonomy (chunk_id, taxonomy_term_id, confidence)\n VALUES (%s, %s, %s)\n ON DUPLICATE KEY UPDATE confidence = VALUES(confidence)\n \"\"\",\n (chunk_id, term[\"id\"], confidence),\n )\n db.commit()\n cursor.close()\n",
"structuredPatch": [],
"originalFile": null
}
}