{
"tool_response": {
"type": "text",
"file": {
"filePath": "\/var\/www\/scripts\/pipeline\/analyzers\/entity_extractor.py",
"""
Entity Extraction - Extract and store entities from text.
"""

import json
import re
import sys
import time
import unicodedata

import requests

sys.path.insert(0, "/var/www/scripts/pipeline")

from config import ANTHROPIC_MODEL, OLLAMA_HOST
from db import db
from protokoll import protokoll


def normalize_name(name: str) -> str:
    """Generate canonical_name from entity name.

    Rules:
    - Lowercase
    - German umlauts: ä→ae, ö→oe, ü→ue, ß→ss
    - Replace spaces with underscores
    - Remove special characters except underscores
    - Collapse multiple underscores
    """
    if not name:
        return ""

    result = name.lower()

    # Only lowercase forms are needed: lower() has already run, so Ä/Ö/Ü
    # cannot occur, and ß lowercases to itself. The explicit mapping must
    # happen BEFORE NFKD, which would strip the dots (ä→a) instead of
    # producing the conventional German transliteration (ä→ae).
    for old, new in (("ä", "ae"), ("ö", "oe"), ("ü", "ue"), ("ß", "ss")):
        result = result.replace(old, new)

    # Strip any remaining accents/diacritics, then reduce to [a-z0-9_].
    result = unicodedata.normalize("NFKD", result)
    result = result.encode("ascii", "ignore").decode("ascii")
    result = re.sub(r"[\s\-]+", "_", result)
    result = re.sub(r"[^a-z0-9_]", "", result)
    result = re.sub(r"_+", "_", result)
    return result.strip("_")


def extract_entities_ollama(text: str, model: str = "gemma3:27b-it-qat") -> list[dict]:
    """Extract entities from *text* using a local Ollama model.

    Loads the extraction prompt from the DB (use case "entity_extraction"),
    falling back to a built-in German prompt. Logs the call via protokoll.

    Args:
        text: Source text; truncated to 3000 chars before prompting.
        model: Ollama model tag to use.

    Returns:
        List of entity dicts ({"name", "type", "description"}), or [] on
        any failure (HTTP error, timeout, unparseable response).
    """
    prompt_data = db.get_prompt_by_use_case("entity_extraction")
    prompt_template = prompt_data["content"] if prompt_data else None

    if not prompt_template:
        db.log("WARNING", "entity_extraction prompt not found in DB, using fallback")
        prompt_template = """Analysiere den Text und extrahiere wichtige Entitäten.
Kategorisiere als: PERSON, ORGANIZATION, CONCEPT, LOCATION
Antworte NUR im JSON-Format:
{"entities": [{"name": "...", "type": "...", "description": "..."}]}

Text:
{{TEXT}}"""

    # Truncate input to keep the prompt within the model's context budget.
    prompt = prompt_template.replace("{{TEXT}}", text[:3000])

    try:
        start_time = time.time()
        response = requests.post(
            f"{OLLAMA_HOST}/api/generate",
            json={"model": model, "prompt": prompt, "stream": False, "format": "json"},
            timeout=120,
        )
        response.raise_for_status()
        data = response.json()
        duration_ms = int((time.time() - start_time) * 1000)

        response_text = data.get("response", "{}")

        protokoll.log_llm_call(
            request=f"[entity_extraction] {prompt[:500]}...",
            response=response_text[:2000],
            model_name=f"ollama:{model}",
            tokens_input=data.get("prompt_eval_count", 0),
            tokens_output=data.get("eval_count", 0),
            duration_ms=duration_ms,
            status="completed",
        )

        try:
            entities = json.loads(response_text)
        except json.JSONDecodeError:
            db.log("WARNING", "Failed to parse entity JSON from Ollama")
            return []
        # The model may emit valid JSON that is not an object (e.g. a bare
        # list). Guard before .get() so this is logged as a parse problem
        # instead of surfacing as an AttributeError below.
        if not isinstance(entities, dict):
            db.log("WARNING", "Failed to parse entity JSON from Ollama")
            return []
        return entities.get("entities", [])
    except Exception as e:
        db.log("ERROR", f"Ollama entity extraction failed: {e}")
        protokoll.log_llm_call(
            request=f"[entity_extraction] {prompt[:500]}...",
            model_name=f"ollama:{model}",
            status="error",
            error_message=str(e),
        )
        return []


def extract_entities_anthropic(text: str, client) -> list[dict]:
    """Extract entities from *text* using Anthropic Claude.

    Loads the extraction prompt from the DB (use case "entity_extraction"),
    falling back to a built-in German prompt. Logs the call via protokoll.

    Args:
        text: Source text; truncated to 4000 chars before prompting.
        client: Initialized Anthropic client (provides .messages.create).

    Returns:
        List of entity dicts ({"name", "type", "context"}), or [] on any
        failure or when no JSON object is found in the response.
    """
    prompt_data = db.get_prompt_by_use_case("entity_extraction")
    prompt_template = prompt_data["content"] if prompt_data else None

    if not prompt_template:
        prompt_template = """Analysiere den folgenden deutschen Text und extrahiere alle wichtigen Entitäten.

Kategorisiere jede Entität als:
- PERSON (Namen von Personen)
- ORGANIZATION (Firmen, Institutionen, Gruppen)
- CONCEPT (Fachbegriffe, Methoden, Theorien)
- LOCATION (Orte, Länder)
- DATE (Zeitangaben)
- OTHER (Sonstiges)

Antworte NUR im JSON-Format:
{"entities": [{"name": "...", "type": "...", "context": "kurzer Kontext der Erwähnung"}]}

Text:
{{TEXT}}"""

    prompt = prompt_template.replace("{{TEXT}}", text[:4000])

    try:
        start_time = time.time()
        message = client.messages.create(
            model=ANTHROPIC_MODEL, max_tokens=2000, messages=[{"role": "user", "content": prompt}]
        )
        duration_ms = int((time.time() - start_time) * 1000)

        response_text = message.content[0].text

        protokoll.log_llm_call(
            request=f"[entity_extraction] {prompt[:500]}...",
            response=response_text[:2000],
            model_name=ANTHROPIC_MODEL,
            tokens_input=message.usage.input_tokens,
            tokens_output=message.usage.output_tokens,
            duration_ms=duration_ms,
            status="completed",
        )

        # Claude may wrap the JSON in prose; grab the outermost {...} span.
        json_match = re.search(r"\{[\s\S]*\}", response_text)
        if json_match:
            entities = json.loads(json_match.group())
            # Guard against valid-JSON non-object payloads (see Ollama path).
            if isinstance(entities, dict):
                return entities.get("entities", [])
        return []
    except Exception as e:
        db.log("ERROR", f"Anthropic entity extraction failed: {e}")
        protokoll.log_llm_call(
            request=f"[entity_extraction] {prompt[:500]}...",
            model_name=ANTHROPIC_MODEL,
            status="error",
            error_message=str(e),
        )
        return []


def store_entities(document_id: int, entities: list[dict]) -> int:
    """Store extracted entities in database with deduplication via canonical_name.

    For each entity: reuse an existing row matched by canonical_name (then by
    exact name) or insert a new one, record provenance, and link the entity to
    *document_id* via document_entities.

    Args:
        document_id: ID of the source document for provenance/linking.
        entities: Entity dicts as returned by the extract_* functions.

    Returns:
        Number of entities successfully stored/linked.
    """
    stored = 0

    for entity in entities:
        try:
            # Validate LLM output up front instead of letting a KeyError
            # surface below as an opaque "Failed to store entity" warning.
            name = entity.get("name")
            entity_type = entity.get("type")
            if not name or not entity_type:
                db.log("WARNING", f"Skipping malformed entity (missing name/type): {entity}")
                continue

            description = entity.get("description") or entity.get("context") or None
            canonical = normalize_name(name)

            # Check for existing entity by canonical_name first (deduplication)
            cursor = db.execute(
                "SELECT id, description FROM entities WHERE canonical_name = %s AND type = %s",
                (canonical, entity_type),
            )
            existing = cursor.fetchone()
            cursor.close()

            # Fallback: check by exact name
            if not existing:
                cursor = db.execute(
                    "SELECT id, description FROM entities WHERE name = %s AND type = %s",
                    (name, entity_type),
                )
                existing = cursor.fetchone()
                cursor.close()

            if existing:
                entity_id = existing["id"]
                # Update description and canonical_name if missing
                if description and not existing["description"]:
                    cursor = db.execute(
                        "UPDATE entities SET description = %s, canonical_name = %s WHERE id = %s",
                        (description, canonical, entity_id),
                    )
                    db.commit()
                    cursor.close()
                else:
                    # Ensure canonical_name is set
                    cursor = db.execute(
                        "UPDATE entities SET canonical_name = %s WHERE id = %s AND canonical_name IS NULL",
                        (canonical, entity_id),
                    )
                    db.commit()
                    cursor.close()
            else:
                # Insert new entity with canonical_name and status='normalized'
                cursor = db.execute(
                    """INSERT INTO entities (name, type, description, canonical_name, status, created_at)
                    VALUES (%s, %s, %s, %s, 'normalized', NOW())""",
                    (name, entity_type, description, canonical),
                )
                db.commit()
                entity_id = cursor.lastrowid
                cursor.close()

            db.log_provenance(
                artifact_type="entity",
                artifact_id=entity_id,
                source_type="extraction",
                source_id=document_id,
                pipeline_step="entity_extract",
            )

            # Link entity to document via document_entities
            try:
                cursor = db.execute(
                    """INSERT IGNORE INTO document_entities (document_id, entity_id, relevance, created_at)
                    VALUES (%s, %s, 0.8, NOW())""",
                    (document_id, entity_id),
                )
                db.commit()
                cursor.close()
            except Exception as link_err:
                db.log("WARNING", f"Failed to link entity {entity_id} to document {document_id}: {link_err}")

            stored += 1

        except Exception as e:
            db.log("WARNING", f"Failed to store entity: {e}")

    return stored


def find_entity_by_name(name: str) -> dict | None:
    """Find entity by name or canonical_name.

    Lookup order: exact canonical_name match, then case-insensitive exact
    name match, then canonical_name substring (LIKE) match.

    Returns:
        The matching row dict, or None if nothing matches.
    """
    name_lower = name.lower().strip()
    canonical = normalize_name(name)

    cursor = db.execute(
        "SELECT id, name, canonical_name FROM entities WHERE canonical_name = %s LIMIT 1",
        (canonical,),
    )
    result = cursor.fetchone()
    cursor.close()
    if result:
        return result

    cursor = db.execute(
        "SELECT id, name, canonical_name FROM entities WHERE LOWER(name) = %s LIMIT 1",
        (name_lower,),
    )
    result = cursor.fetchone()
    cursor.close()
    if result:
        return result

    # An empty canonical would turn the pattern below into '%%', which
    # matches every row and returns an arbitrary entity — bail out instead.
    if not canonical:
        return None

    cursor = db.execute(
        "SELECT id, name, canonical_name FROM entities WHERE canonical_name LIKE %s LIMIT 1",
        (f"%{canonical}%",),
    )
    result = cursor.fetchone()
    cursor.close()
    return result
"numLines": 281,
"startLine": 1,
"totalLines": 281
}
}
}