{
"tool_response": {
"filePath": "\/var\/www\/scripts\/pipeline\/analyzers\/entity_extractor.py",
"oldString": " # Build prompt from YAML or plain text\n prompt = _build_prompt_from_yaml(prompt_content, text[:4000])\n\n try:\n start_time = time.time()\n message = client.messages.create(\n model=ANTHROPIC_MODEL, max_tokens=2000, messages=[{\"role\": \"user\", \"content\": prompt}]\n )\n duration_ms = int((time.time() - start_time) * 1000)",
"newString": " # Build prompt from YAML or plain text\n prompt = _build_prompt_from_yaml(prompt_content, text[:PROMPT_TEXT_LIMIT])\n\n try:\n start_time = time.time()\n message = client.messages.create(\n model=ANTHROPIC_MODEL, max_tokens=LLM_MAX_TOKENS, messages=[{\"role\": \"user\", \"content\": prompt}]\n )\n duration_ms = int((time.time() - start_time) * MS_PER_SECOND)",
"originalFile": "\"\"\"\nEntity Extraction - Extract and store entities from text.\n\"\"\"\n\nimport json\nimport re\nimport sys\nimport time\nimport unicodedata\n\nimport requests\n\nsys.path.insert(0, \"\/var\/www\/scripts\/pipeline\")\n\nfrom config import ANTHROPIC_MODEL, OLLAMA_HOST\nfrom constants import LLM_MAX_TOKENS, LLM_TIMEOUT, MS_PER_SECOND, PROMPT_TEXT_LIMIT\nfrom db import db\nfrom protokoll import protokoll\n\n\ndef _build_prompt_from_yaml(yaml_content: str, text: str) -> str:\n \"\"\"Pass YAML prompt directly to LLM with text placeholder replaced.\"\"\"\n return yaml_content.replace(\"{{TEXT}}\", text[:PROMPT_TEXT_LIMIT])\n\n\ndef normalize_name(name: str) -> str:\n \"\"\"Generate canonical_name from entity name.\n\n Rules:\n - Lowercase\n - German umlauts: ä→ae, ö→oe, ü→ue, ß→ss\n - Replace spaces with underscores\n - Remove special characters except underscores\n - Collapse multiple underscores\n \"\"\"\n if not name:\n return \"\"\n\n result = name.lower()\n\n replacements = {\n \"ä\": \"ae\",\n \"ö\": \"oe\",\n \"ü\": \"ue\",\n \"ß\": \"ss\",\n \"Ä\": \"ae\",\n \"Ö\": \"oe\",\n \"Ü\": \"ue\",\n }\n for old, new in replacements.items():\n result = result.replace(old, new)\n\n result = unicodedata.normalize(\"NFKD\", result)\n result = result.encode(\"ascii\", \"ignore\").decode(\"ascii\")\n result = re.sub(r\"[\\s\\-]+\", \"_\", result)\n result = re.sub(r\"[^a-z0-9_]\", \"\", result)\n result = re.sub(r\"_+\", \"_\", result)\n result = result.strip(\"_\")\n\n return result\n\n\n# Category to type mapping - loaded dynamically from DB\ndef _get_category_type_map() -> dict[str, str]:\n \"\"\"Build category mapping from entity_types table.\"\"\"\n types = db.get_entity_types()\n mapping = {}\n for t in types:\n # Map plural lowercase to uppercase code\n plural = t[\"code\"].lower() + \"s\"\n mapping[plural] = t[\"code\"]\n # Also map singular\n mapping[t[\"code\"].lower()] = t[\"code\"]\n return mapping\n\n\ndef _get_valid_type_codes() -> set[str]:\n \"\"\"Get valid entity type codes from DB.\"\"\"\n return db.get_entity_type_codes()\n\n\n# Stopword cache\n_stopword_cache: set[str] | None = None\n\n\ndef _get_stopwords() -> set[str]:\n \"\"\"Get stopwords from DB with caching.\"\"\"\n global _stopword_cache\n if _stopword_cache is None:\n _stopword_cache = set(db.get_stopwords())\n return _stopword_cache\n\n\ndef _is_stopword(entity_name: str) -> bool:\n \"\"\"Check if entity is a stopword (should be filtered out).\"\"\"\n canonical = normalize_name(entity_name)\n stopwords = _get_stopwords()\n return canonical in stopwords\n\n\ndef _validate_entity_in_text(entity_name: str, source_text: str) -> bool:\n \"\"\"Strictly validate that entity appears EXACTLY in source text.\"\"\"\n if not entity_name or len(entity_name) < 3:\n return False\n # Exact match required\n return entity_name in source_text\n\n\ndef _normalize_entity_response(result: dict, source_text: str) -> list[dict]:\n \"\"\"Normalize entity response to standard format with validation.\n\n Handles two formats:\n 1. New: {\"persons\":[], \"roles\":[], ...}\n 2. 
Legacy: {\"entities\": [...]}\n\n Also validates entities against source text to filter hallucinations.\n \"\"\"\n entities = []\n\n # Check for legacy format\n if \"entities\" in result:\n legacy_entities = result.get(\"entities\", [])\n # Validate legacy entities too\n for e in legacy_entities:\n if isinstance(e, dict) and \"name\" in e:\n if _validate_entity_in_text(e[\"name\"], source_text):\n entities.append(e)\n return entities\n\n # New categorized format\n category_map = _get_category_type_map()\n for category, items in result.items():\n if not isinstance(items, list):\n continue\n\n entity_type = category_map.get(category.lower(), category.upper())\n\n for item in items:\n if not item or not isinstance(item, str):\n continue\n\n # Strict validation: entity must appear EXACTLY in source text\n if not _validate_entity_in_text(item, source_text):\n continue # Skip hallucinations\n\n entities.append(\n {\n \"name\": item,\n \"type\": entity_type,\n \"description\": None,\n }\n )\n\n return entities\n\n\ndef _build_pass2_categories() -> str:\n \"\"\"Build categories section for pass2 prompt from entity_types table.\"\"\"\n types = db.get_entity_types()\n lines = []\n for t in types:\n lines.append(f\" {t['code']}: {t['criteria']}\")\n return \"\\n\".join(lines)\n\n\ndef _call_ollama(prompt: str, model: str, timeout: int = LLM_TIMEOUT) -> tuple[str, int, int, int]:\n \"\"\"Call Ollama API and return (response_text, tokens_in, tokens_out, duration_ms).\"\"\"\n start_time = time.time()\n response = requests.post(\n f\"{OLLAMA_HOST}\/api\/generate\",\n json={\"model\": model, \"prompt\": prompt, \"stream\": False, \"format\": \"json\"},\n timeout=timeout,\n )\n response.raise_for_status()\n data = response.json()\n duration_ms = int((time.time() - start_time) * MS_PER_SECOND)\n return (\n data.get(\"response\", \"{}\"),\n data.get(\"prompt_eval_count\", 0),\n data.get(\"eval_count\", 0),\n duration_ms,\n )\n\n\ndef extract_entities_ollama(text: str, model: str = \"gemma3:27b-it-qat\") -> list[dict]:\n \"\"\"Extract entities using 2-pass approach for better categorization.\n\n Pass 1: Extract entity names from text\n Pass 2: Categorize extracted entities\n Post: Normalize types using deterministic rules\n\n Falls back to single-pass if 2-pass prompts not available.\n \"\"\"\n # Try 2-pass approach first\n pass1_template = db.get_prompt(\"entity_extraction_pass1\")\n pass2_template = db.get_prompt(\"entity_extraction_pass2\")\n\n if pass1_template and pass2_template:\n entities = _extract_entities_2pass(text, pass1_template, pass2_template, model)\n else:\n # Fallback to single-pass\n entities = _extract_entities_single_pass(text, model)\n\n return entities\n\n\ndef _extract_entities_2pass(text: str, pass1_template: str, pass2_template: str, model: str) -> list[dict]:\n \"\"\"2-pass entity extraction: extract then categorize.\"\"\"\n try:\n # PASS 1: Extract entity names\n prompt1 = pass1_template.replace(\"{text}\", text[:PROMPT_TEXT_LIMIT])\n resp1, tok_in1, tok_out1, dur1 = _call_ollama(prompt1, model)\n\n try:\n result1 = json.loads(resp1)\n raw_entities = result1.get(\"entities\", [])\n except json.JSONDecodeError:\n db.log(\"WARNING\", \"Failed to parse Pass 1 JSON\")\n return []\n\n # Validate: only keep entities that appear in text and are not stopwords\n valid_entities = [e for e in raw_entities if _validate_entity_in_text(e, text) and not _is_stopword(e)]\n\n if not valid_entities:\n return []\n\n protokoll.log_llm_call(\n request=f\"[entity_extraction_pass1] {len(valid_entities)} 
entities\",\n response=json.dumps(valid_entities[:10], ensure_ascii=False),\n model_name=f\"ollama:{model}\",\n tokens_input=tok_in1,\n tokens_output=tok_out1,\n duration_ms=dur1,\n status=\"completed\",\n )\n\n # PASS 2: Categorize entities (with dynamic categories from DB)\n entities_json = json.dumps(valid_entities, ensure_ascii=False)\n categories_text = _build_pass2_categories()\n prompt2 = pass2_template.replace(\"{categories}\", categories_text)\n prompt2 = prompt2.replace(\"{entities}\", entities_json)\n resp2, tok_in2, tok_out2, dur2 = _call_ollama(prompt2, model)\n\n try:\n result2 = json.loads(resp2)\n categorized = result2.get(\"kategorisiert\", [])\n except json.JSONDecodeError:\n db.log(\"WARNING\", \"Failed to parse Pass 2 JSON\")\n # Fallback: return uncategorized entities\n return [{\"name\": e, \"type\": \"CONCEPT\", \"description\": None} for e in valid_entities]\n\n protokoll.log_llm_call(\n request=f\"[entity_extraction_pass2] categorize {len(valid_entities)} entities\",\n response=resp2[:1000],\n model_name=f\"ollama:{model}\",\n tokens_input=tok_in2,\n tokens_output=tok_out2,\n duration_ms=dur2,\n status=\"completed\",\n )\n\n # Normalize output (validate types against DB)\n valid_types = _get_valid_type_codes()\n entities = []\n for e in categorized:\n if isinstance(e, dict) and \"name\" in e and \"type\" in e:\n # Final validation: in text, not stopword\n name = e[\"name\"]\n if _validate_entity_in_text(name, text) and not _is_stopword(name):\n entity_type = e[\"type\"].upper()\n # Fallback to CONCEPT if type not in DB\n if entity_type not in valid_types:\n entity_type = \"CONCEPT\"\n entities.append(\n {\n \"name\": name,\n \"type\": entity_type,\n \"description\": e.get(\"description\"),\n }\n )\n\n return entities\n\n except Exception as e:\n db.log(\"ERROR\", f\"2-pass entity extraction failed: {e}\")\n return []\n\n\ndef _extract_entities_single_pass(text: str, model: str) -> list[dict]:\n \"\"\"Single-pass entity extraction (legacy fallback).\"\"\"\n prompt_data = db.get_prompt_by_use_case(\"entity_extraction\")\n prompt_content = prompt_data[\"content\"] if prompt_data else None\n\n if not prompt_content:\n db.log(\"WARNING\", \"entity_extraction prompt not found in DB, using fallback\")\n prompt_content = \"\"\"Analysiere den Text und extrahiere wichtige Entitäten.\nKategorisiere als: PERSON, ORGANIZATION, CONCEPT, LOCATION\nAntworte NUR im JSON-Format:\n{\"entities\": [{\"name\":... [TRUNCATED-372de649de5f3cfb]",
"structuredPatch": [
{
"oldStart": 351,
"oldLines": 14,
"newStart": 351,
"newLines": 14,
"lines": [
" {{TEXT}}\"\"\"",
" ",
" # Build prompt from YAML or plain text",
"- prompt = _build_prompt_from_yaml(prompt_content, text[:4000])",
"+ prompt = _build_prompt_from_yaml(prompt_content, text[:PROMPT_TEXT_LIMIT])",
" ",
" try:",
" start_time = time.time()",
" message = client.messages.create(",
"- model=ANTHROPIC_MODEL, max_tokens=2000, messages=[{\"role\": \"user\", \"content\": prompt}]",
"+ model=ANTHROPIC_MODEL, max_tokens=LLM_MAX_TOKENS, messages=[{\"role\": \"user\", \"content\": prompt}]",
" )",
"- duration_ms = int((time.time() - start_time) * 1000)",
"+ duration_ms = int((time.time() - start_time) * MS_PER_SECOND)",
" ",
" response_text = message.content[0].text",
" "
]
}
],
"userModified": false,
"replaceAll": false
}
}