{
"event": "PreToolUse",
"tool_name": "Write",
"tool_input": {
"file_path": "\/var\/www\/scripts\/pipeline\/knowledge\/entity_extractor.py",
"content": "\"\"\"Entitäten-Extraktion für Wissensextraktion.\"\"\"\nimport sys\nsys.path.insert(0, \"\/var\/www\/scripts\/pipeline\")\n\nfrom db import db\n\nfrom .models import KnowledgeLevel, KnowledgeType\nfrom .llm_service import LLMService\n\n\nclass EntityExtractor:\n \"\"\"Extrahiert Entitäten aus Text.\"\"\"\n\n def __init__(self, llm_service: LLMService, store_knowledge_fn):\n \"\"\"Initialisiere mit LLM-Service und Storage-Funktion.\"\"\"\n self.llm = llm_service\n self.store_knowledge = store_knowledge_fn\n\n def extract_entities(self, text: str, level: KnowledgeLevel, source_id: int) -> list[dict]:\n \"\"\"\n Extrahiere Entitäten aus Text.\n\n Args:\n text: Eingabetext\n level: Ebene (PAGE, SECTION, DOCUMENT)\n source_id: ID der Quelle (page_id, section_id, document_id)\n\n Returns:\n Liste von Entitäten mit DB-IDs\n \"\"\"\n prompt = f\"\"\"Analysiere den folgenden deutschen Text und extrahiere alle wichtigen Entitäten.\n\nKategorien:\n- PERSON: Namen von Personen, Autoren, Therapeuten\n- ORGANIZATION: Firmen, Institute, Verbände\n- CONCEPT: Fachbegriffe, Theorien, Modelle\n- METHOD: Methoden, Techniken, Verfahren\n- TOOL: Werkzeuge, Instrumente, Materialien\n- LOCATION: Orte, Länder, Regionen\n- EVENT: Ereignisse, Konferenzen\n\nAntworte NUR als JSON:\n{{\"entities\": [\n {{\"name\": \"...\", \"type\": \"CONCEPT\", \"context\": \"kurzer Kontext\", \"importance\": 0.0-1.0}}\n]}}\n\nText ({level.value}-Ebene):\n{text[:4000]}\"\"\"\n\n result = self.llm.call_llm(prompt)\n data = self.llm.parse_json(result)\n entities = data.get(\"entities\", [])\n\n # Speichere und gleiche mit DB ab\n stored_entities = []\n for entity in entities:\n stored = self._store_entity(entity, level, source_id)\n if stored:\n stored_entities.append(stored)\n\n # Speichere in Knowledge-Tabelle\n self.store_knowledge(\n level,\n source_id,\n KnowledgeType.ENTITY,\n {\"entities\": [e[\"name\"] for e in stored_entities], \"count\": len(stored_entities)},\n )\n\n return stored_entities\n\n def _store_entity(self, entity: dict, level: KnowledgeLevel, source_id: int) -> dict | None:\n \"\"\"Speichere Entität in DB mit Duplikatprüfung.\"\"\"\n try:\n name = entity.get(\"name\", \"\").strip()\n entity_type = entity.get(\"type\", \"OTHER\").upper()\n context = entity.get(\"context\", \"\")\n\n if not name:\n return None\n\n # Prüfe ob Entität existiert (case-insensitive)\n cursor = db.execute(\n \"\"\"SELECT id, name, type, description\n FROM entities\n WHERE LOWER(name) = LOWER(%s) OR LOWER(canonical_name) = LOWER(%s)\n LIMIT 1\"\"\",\n (name, name),\n )\n existing = cursor.fetchone()\n cursor.close()\n\n if existing:\n entity_id = existing[\"id\"]\n db.log(\"DEBUG\", f\"Entität '{name}' existiert bereits (ID: {entity_id})\")\n else:\n # Neue Entität anlegen\n cursor = db.execute(\n \"\"\"INSERT INTO entities (name, type, canonical_name, created_at)\n VALUES (%s, %s, %s, NOW())\"\"\",\n (name, entity_type, name.lower()),\n )\n db.commit()\n entity_id = cursor.lastrowid\n cursor.close()\n db.log(\"INFO\", f\"Neue Entität angelegt: '{name}' (ID: {entity_id})\")\n\n return {\"id\": entity_id, \"name\": name, \"type\": entity_type, \"context\": context, \"is_new\": existing is None}\n\n except Exception as e:\n db.log(\"ERROR\", f\"Fehler beim Speichern der Entität: {e}\")\n return None\n"
}
}
{
"tool_response": {
"type": "create",
"filePath": "\/var\/www\/scripts\/pipeline\/knowledge\/entity_extractor.py",
"content": "\"\"\"Entitäten-Extraktion für Wissensextraktion.\"\"\"\nimport sys\nsys.path.insert(0, \"\/var\/www\/scripts\/pipeline\")\n\nfrom db import db\n\nfrom .models import KnowledgeLevel, KnowledgeType\nfrom .llm_service import LLMService\n\n\nclass EntityExtractor:\n \"\"\"Extrahiert Entitäten aus Text.\"\"\"\n\n def __init__(self, llm_service: LLMService, store_knowledge_fn):\n \"\"\"Initialisiere mit LLM-Service und Storage-Funktion.\"\"\"\n self.llm = llm_service\n self.store_knowledge = store_knowledge_fn\n\n def extract_entities(self, text: str, level: KnowledgeLevel, source_id: int) -> list[dict]:\n \"\"\"\n Extrahiere Entitäten aus Text.\n\n Args:\n text: Eingabetext\n level: Ebene (PAGE, SECTION, DOCUMENT)\n source_id: ID der Quelle (page_id, section_id, document_id)\n\n Returns:\n Liste von Entitäten mit DB-IDs\n \"\"\"\n prompt = f\"\"\"Analysiere den folgenden deutschen Text und extrahiere alle wichtigen Entitäten.\n\nKategorien:\n- PERSON: Namen von Personen, Autoren, Therapeuten\n- ORGANIZATION: Firmen, Institute, Verbände\n- CONCEPT: Fachbegriffe, Theorien, Modelle\n- METHOD: Methoden, Techniken, Verfahren\n- TOOL: Werkzeuge, Instrumente, Materialien\n- LOCATION: Orte, Länder, Regionen\n- EVENT: Ereignisse, Konferenzen\n\nAntworte NUR als JSON:\n{{\"entities\": [\n {{\"name\": \"...\", \"type\": \"CONCEPT\", \"context\": \"kurzer Kontext\", \"importance\": 0.0-1.0}}\n]}}\n\nText ({level.value}-Ebene):\n{text[:4000]}\"\"\"\n\n result = self.llm.call_llm(prompt)\n data = self.llm.parse_json(result)\n entities = data.get(\"entities\", [])\n\n # Speichere und gleiche mit DB ab\n stored_entities = []\n for entity in entities:\n stored = self._store_entity(entity, level, source_id)\n if stored:\n stored_entities.append(stored)\n\n # Speichere in Knowledge-Tabelle\n self.store_knowledge(\n level,\n source_id,\n KnowledgeType.ENTITY,\n {\"entities\": [e[\"name\"] for e in stored_entities], \"count\": len(stored_entities)},\n )\n\n return stored_entities\n\n def _store_entity(self, entity: dict, level: KnowledgeLevel, source_id: int) -> dict | None:\n \"\"\"Speichere Entität in DB mit Duplikatprüfung.\"\"\"\n try:\n name = entity.get(\"name\", \"\").strip()\n entity_type = entity.get(\"type\", \"OTHER\").upper()\n context = entity.get(\"context\", \"\")\n\n if not name:\n return None\n\n # Prüfe ob Entität existiert (case-insensitive)\n cursor = db.execute(\n \"\"\"SELECT id, name, type, description\n FROM entities\n WHERE LOWER(name) = LOWER(%s) OR LOWER(canonical_name) = LOWER(%s)\n LIMIT 1\"\"\",\n (name, name),\n )\n existing = cursor.fetchone()\n cursor.close()\n\n if existing:\n entity_id = existing[\"id\"]\n db.log(\"DEBUG\", f\"Entität '{name}' existiert bereits (ID: {entity_id})\")\n else:\n # Neue Entität anlegen\n cursor = db.execute(\n \"\"\"INSERT INTO entities (name, type, canonical_name, created_at)\n VALUES (%s, %s, %s, NOW())\"\"\",\n (name, entity_type, name.lower()),\n )\n db.commit()\n entity_id = cursor.lastrowid\n cursor.close()\n db.log(\"INFO\", f\"Neue Entität angelegt: '{name}' (ID: {entity_id})\")\n\n return {\"id\": entity_id, \"name\": name, \"type\": entity_type, \"context\": context, \"is_new\": existing is None}\n\n except Exception as e:\n db.log(\"ERROR\", f\"Fehler beim Speichern der Entität: {e}\")\n return None\n",
"structuredPatch": [],
"originalFile": null
}
}