{
"tool_response": {
"filePath": "\/var\/www\/scripts\/pipeline\/analyzers\/entity_normalizer.py",
"oldString": " # Stopwords\n self._stopwords = set(w.lower() for w in self.rules.get(\"stopwords\", []))",
"newString": " # Stopwords\n self._stopwords = {w.lower() for w in self.rules.get(\"stopwords\", [])}",
"originalFile": "\"\"\"\nEntity Type Normalizer\nDeterministic rules for entity type assignment.\nReads rules from config\/entity_type_rules.yaml.\n\"\"\"\n\nimport re\nfrom pathlib import Path\n\nimport yaml\n\n\nclass EntityNormalizer:\n \"\"\"Normalizes entity types based on deterministic rules.\"\"\"\n\n def __init__(self, rules_path: str | None = None):\n if rules_path is None:\n rules_path = Path(__file__).parent.parent \/ \"config\" \/ \"entity_type_rules.yaml\"\n\n self.rules_path = Path(rules_path)\n self.rules = self._load_rules()\n\n # Build lookup structures\n self._explicit_map: dict[str, str] = {}\n self._pattern_rules: list[tuple[re.Pattern, str]] = []\n self._stopwords: set[str] = set()\n self._default_type = \"CONCEPT\"\n\n self._build_lookups()\n\n def _load_rules(self) -> dict:\n \"\"\"Load rules from YAML file.\"\"\"\n if not self.rules_path.exists():\n return {}\n\n with open(self.rules_path, encoding=\"utf-8\") as f:\n return yaml.safe_load(f) or {}\n\n def _build_lookups(self) -> None:\n \"\"\"Build efficient lookup structures from rules.\"\"\"\n # Explicit mappings (case-insensitive lookup)\n for entity_type, names in self.rules.get(\"explicit_mappings\", {}).items():\n for name in names:\n self._explicit_map[name.lower()] = entity_type\n\n # Pattern rules (compile regexes)\n for entity_type, patterns in self.rules.get(\"pattern_rules\", {}).items():\n for pattern in patterns:\n try:\n compiled = re.compile(pattern, re.IGNORECASE)\n self._pattern_rules.append((compiled, entity_type))\n except re.error:\n pass\n\n # Stopwords\n self._stopwords = set(w.lower() for w in self.rules.get(\"stopwords\", []))\n\n # Default type\n self._default_type = self.rules.get(\"default_type\", \"CONCEPT\")\n\n def is_stopword(self, name: str) -> bool:\n \"\"\"Check if entity name is a stopword.\"\"\"\n return name.lower() in self._stopwords\n\n def normalize_type(self, name: str, llm_type: str | None = None) -> str:\n \"\"\"\n Determine the correct type for an entity.\n\n Priority:\n 1. Explicit mapping (highest)\n 2. Pattern rules\n 3. LLM suggestion (if valid)\n 4. Default type\n \"\"\"\n name_lower = name.lower()\n\n # 1. Check explicit mapping\n if name_lower in self._explicit_map:\n return self._explicit_map[name_lower]\n\n # 2. Check pattern rules\n for pattern, entity_type in self._pattern_rules:\n if pattern.search(name):\n return entity_type\n\n # 3. Use LLM type if valid\n valid_types = {\n \"PERSON\", \"ROLE\", \"ORGANIZATION\", \"LOCATION\",\n \"THEORY\", \"METHOD\", \"MODEL\", \"CONCEPT\",\n \"ARTIFACT\", \"METAPHOR\", \"PRINCIPLE\", \"TOOL\", \"EVENT\", \"OTHER\"\n }\n if llm_type and llm_type.upper() in valid_types:\n return llm_type.upper()\n\n # 4. Default\n return self._default_type\n\n def normalize_entity(self, entity: dict) -> dict | None:\n \"\"\"\n Normalize a single entity.\n Returns None if entity should be filtered (stopword).\n \"\"\"\n name = entity.get(\"name\", \"\")\n\n if not name or len(name) < 3:\n return None\n\n if self.is_stopword(name):\n return None\n\n llm_type = entity.get(\"type\")\n normalized_type = self.normalize_type(name, llm_type)\n\n return {\n \"name\": name,\n \"type\": normalized_type,\n \"description\": entity.get(\"description\"),\n }\n\n def normalize_entities(self, entities: list[dict]) -> list[dict]:\n \"\"\"Normalize a list of entities, filtering stopwords.\"\"\"\n result = []\n for entity in entities:\n normalized = self.normalize_entity(entity)\n if normalized:\n result.append(normalized)\n return result\n\n\n# Singleton instance\n_normalizer: EntityNormalizer | None = None\n\n\ndef get_normalizer() -> EntityNormalizer:\n \"\"\"Get or create the singleton normalizer instance.\"\"\"\n global _normalizer\n if _normalizer is None:\n _normalizer = EntityNormalizer()\n return _normalizer\n\n\ndef normalize_entity_type(name: str, llm_type: str | None = None) -> str:\n \"\"\"Convenience function to normalize a single entity type.\"\"\"\n return get_normalizer().normalize_type(name, llm_type)\n\n\ndef normalize_entities(entities: list[dict]) -> list[dict]:\n \"\"\"Convenience function to normalize a list of entities.\"\"\"\n return get_normalizer().normalize_entities(entities)\n\n\nif __name__ == \"__main__\":\n # Test\n normalizer = EntityNormalizer()\n\n test_cases = [\n {\"name\": \"Coach\", \"type\": \"PERSON\"},\n {\"name\": \"Klient\", \"type\": \"PERSON\"},\n {\"name\": \"Steve de Shazer\", \"type\": \"PERSON\"},\n {\"name\": \"Wunderfrage\", \"type\": \"CONCEPT\"},\n {\"name\": \"Systemische Therapie\", \"type\": \"CONCEPT\"},\n {\"name\": \"GROW-Modell\", \"type\": \"CONCEPT\"},\n {\"name\": \"Reframing\", \"type\": \"CONCEPT\"},\n {\"name\": \"Aspekte\", \"type\": \"CONCEPT\"}, # Should be filtered\n ]\n\n print(\"Entity Normalizer Test:\")\n print(\"-\" * 50)\n for entity in test_cases:\n result = normalizer.normalize_entity(entity)\n if result:\n print(f\"{entity['name']:30} {entity['type']:10} -> {result['type']}\")\n else:\n print(f\"{entity['name']:30} FILTERED (stopword)\")\n",
"structuredPatch": [
{
"oldStart": 53,
"oldLines": 7,
"newStart": 53,
"newLines": 7,
"lines": [
" pass",
" ",
" # Stopwords",
"- self._stopwords = set(w.lower() for w in self.rules.get(\"stopwords\", []))",
"+ self._stopwords = {w.lower() for w in self.rules.get(\"stopwords\", [])}",
" ",
" # Default type",
" self._default_type = self.rules.get(\"default_type\", \"CONCEPT\")"
]
}
],
"userModified": false,
"replaceAll": false
}
}