{
"event": "PreToolUse",
"tool_name": "Edit",
"tool_input": {
"file_path": "\/var\/www\/migration\/content-pipeline\/src\/json_utils.py",
"old_string": "def safe_get(data: dict, key: str, default: Any = None, valid_values: set = None) -> Any:\n \"\"\"\n Sicherer Zugriff auf dict-Werte mit Validierung.\n\n Args:\n data: Source dict\n key: Schlüssel\n default: Fallback-Wert\n valid_values: Erlaubte Werte (optional)\n\n Returns:\n Validierter Wert oder Default\n \"\"\"\n value = data.get(key, default)\n\n # Liste -> erstes Element\n if isinstance(value, list):\n value = value[0] if value else default\n\n # String normalisieren\n if isinstance(value, str):\n value = value.lower().strip()\n\n # Validierung\n if valid_values and value not in valid_values:\n return default\n\n return value",
"new_string": "def safe_get(data: dict, key: str, default: Any = None, valid_values: set = None) -> Any:\n \"\"\"\n Sicherer Zugriff auf dict-Werte mit Validierung.\n\n Args:\n data: Source dict\n key: Schlüssel\n default: Fallback-Wert\n valid_values: Erlaubte Werte (optional)\n\n Returns:\n Validierter Wert oder Default\n \"\"\"\n value = data.get(key, default)\n\n # Liste -> erstes Element\n if isinstance(value, list):\n value = value[0] if value else default\n\n # String normalisieren\n if isinstance(value, str):\n value = value.lower().strip()\n\n # Validierung\n if valid_values and value not in valid_values:\n return default\n\n return value\n\n\ndef extract_json_array(text: str) -> list | None:\n \"\"\"\n Extract first valid JSON array from text.\n\n Args:\n text: LLM response with JSON array\n\n Returns:\n Parsed list or None on error\n \"\"\"\n if not text:\n return None\n\n # Remove markdown code blocks\n text = re.sub(r\"```json\\s*\", \"\", text)\n text = re.sub(r\"```\\s*\", \"\", text)\n\n # Find first array\n start = text.find(\"[\")\n if start < 0:\n return None\n\n depth = 0\n end = start\n in_string = False\n escape_next = False\n\n for i, char in enumerate(text[start:], start):\n if escape_next:\n escape_next = False\n continue\n\n if char == \"\\\\\":\n escape_next = True\n continue\n\n if char == '\"' and not escape_next:\n in_string = not in_string\n continue\n\n if in_string:\n continue\n\n if char == \"[\":\n depth += 1\n elif char == \"]\":\n depth -= 1\n if depth == 0:\n end = i + 1\n break\n\n if end <= start:\n return None\n\n json_str = text[start:end]\n\n try:\n result = json.loads(json_str)\n return result if isinstance(result, list) else None\n except json.JSONDecodeError:\n json_str = repair_json(json_str)\n try:\n result = json.loads(json_str)\n return result if isinstance(result, list) else None\n except json.JSONDecodeError:\n return None\n\n\ndef parse_llm_response(\n text: str,\n expected_keys: list[str] | None = None,\n defaults: dict | None = None,\n) -> dict:\n \"\"\"\n Parse LLM response with validation and defaults.\n\n DRY: Single function for all LLM JSON response parsing.\n Replaces repeated json.loads() + regex patterns.\n\n Args:\n text: LLM response text\n expected_keys: Keys that must be present\n defaults: Default values for missing keys\n\n Returns:\n Parsed dict with defaults applied\n \"\"\"\n defaults = defaults or {}\n\n # Extract JSON\n result = extract_json(text)\n\n if result is None:\n # Return defaults if parsing failed\n return dict(defaults)\n\n # Validate expected keys\n if expected_keys:\n for key in expected_keys:\n if key not in result:\n result[key] = defaults.get(key)\n\n # Apply defaults for missing keys\n for key, value in defaults.items():\n if key not in result or result[key] is None:\n result[key] = value\n\n return result\n\n\ndef parse_critic_response(text: str) -> dict:\n \"\"\"\n Parse critic\/review LLM response.\n\n Standard format for all critic responses.\n\n Args:\n text: LLM response text\n\n Returns:\n Dict with rating, passed, issues, suggestions, summary\n \"\"\"\n defaults = {\n \"rating\": 5,\n \"passed\": False,\n \"issues\": [],\n \"suggestions\": [],\n \"summary\": \"\",\n }\n\n result = parse_llm_response(\n text,\n expected_keys=[\"rating\", \"passed\", \"issues\", \"suggestions\", \"summary\"],\n defaults=defaults,\n )\n\n # Normalize types\n if isinstance(result.get(\"rating\"), str):\n try:\n result[\"rating\"] = int(result[\"rating\"])\n except ValueError:\n result[\"rating\"] = 5\n\n if not 
isinstance(result.get(\"issues\"), list):\n result[\"issues\"] = [result[\"issues\"]] if result.get(\"issues\") else []\n\n if not isinstance(result.get(\"suggestions\"), list):\n result[\"suggestions\"] = [result[\"suggestions\"]] if result.get(\"suggestions\") else []\n\n return result\n\n\ndef parse_semantic_response(text: str) -> dict:\n \"\"\"\n Parse semantic analysis LLM response.\n\n Standard format for chunk semantics.\n\n Args:\n text: LLM response text\n\n Returns:\n Dict with summary, keywords, sentiment, topics\n \"\"\"\n defaults = {\n \"summary\": \"\",\n \"keywords\": [],\n \"sentiment\": \"neutral\",\n \"topics\": [],\n }\n\n result = parse_llm_response(\n text,\n expected_keys=[\"summary\", \"keywords\", \"sentiment\", \"topics\"],\n defaults=defaults,\n )\n\n # Validate sentiment\n valid_sentiments = {\"positive\", \"negative\", \"neutral\", \"mixed\"}\n if result.get(\"sentiment\", \"\").lower() not in valid_sentiments:\n result[\"sentiment\"] = \"neutral\"\n\n return result\n\n\ndef parse_entity_response(text: str) -> list[dict]:\n \"\"\"\n Parse entity extraction LLM response.\n\n Args:\n text: LLM response text\n\n Returns:\n List of entity dicts with name, type, description\n \"\"\"\n # Try array first\n result = extract_json_array(text)\n if result:\n return result\n\n # Try object with entities key\n obj = extract_json(text)\n if obj and \"entities\" in obj:\n return obj[\"entities\"] if isinstance(obj[\"entities\"], list) else []\n\n return []"
}
}
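
For orientation, a minimal usage sketch of the helpers this edit introduces (a sketch only: it assumes src/ is on PYTHONPATH so that json_utils is importable, and the sample LLM replies and printed results are invented for illustration):

# Minimal usage sketch for the helpers added above (assumes src/ is importable;
# the sample LLM replies below are made up for illustration).
from json_utils import extract_json_array, parse_llm_response

# extract_json_array strips markdown fences, bracket-matches the first array,
# and falls back to repair_json() for common mistakes such as trailing commas.
reply = '```json\n[{"name": "Ada Lovelace", "type": "person"},]\n```'
print(extract_json_array(reply))
# -> [{'name': 'Ada Lovelace', 'type': 'person'}]

# parse_llm_response extracts the first JSON object and fills in defaults for
# missing keys; type coercion (e.g. "7" -> 7) is left to the specific wrappers.
verdict = 'Here is my verdict: {"rating": "7", "passed": true}'
print(parse_llm_response(
    verdict,
    expected_keys=["rating", "passed", "summary"],
    defaults={"rating": 5, "passed": False, "summary": ""},
))
# -> {'rating': '7', 'passed': True, 'summary': ''}
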
{
"tool_response": {
"filePath": "\/var\/www\/migration\/content-pipeline\/src\/json_utils.py",
"oldString": "def safe_get(data: dict, key: str, default: Any = None, valid_values: set = None) -> Any:\n \"\"\"\n Sicherer Zugriff auf dict-Werte mit Validierung.\n\n Args:\n data: Source dict\n key: Schlüssel\n default: Fallback-Wert\n valid_values: Erlaubte Werte (optional)\n\n Returns:\n Validierter Wert oder Default\n \"\"\"\n value = data.get(key, default)\n\n # Liste -> erstes Element\n if isinstance(value, list):\n value = value[0] if value else default\n\n # String normalisieren\n if isinstance(value, str):\n value = value.lower().strip()\n\n # Validierung\n if valid_values and value not in valid_values:\n return default\n\n return value",
"newString": "def safe_get(data: dict, key: str, default: Any = None, valid_values: set = None) -> Any:\n \"\"\"\n Sicherer Zugriff auf dict-Werte mit Validierung.\n\n Args:\n data: Source dict\n key: Schlüssel\n default: Fallback-Wert\n valid_values: Erlaubte Werte (optional)\n\n Returns:\n Validierter Wert oder Default\n \"\"\"\n value = data.get(key, default)\n\n # Liste -> erstes Element\n if isinstance(value, list):\n value = value[0] if value else default\n\n # String normalisieren\n if isinstance(value, str):\n value = value.lower().strip()\n\n # Validierung\n if valid_values and value not in valid_values:\n return default\n\n return value\n\n\ndef extract_json_array(text: str) -> list | None:\n \"\"\"\n Extract first valid JSON array from text.\n\n Args:\n text: LLM response with JSON array\n\n Returns:\n Parsed list or None on error\n \"\"\"\n if not text:\n return None\n\n # Remove markdown code blocks\n text = re.sub(r\"```json\\s*\", \"\", text)\n text = re.sub(r\"```\\s*\", \"\", text)\n\n # Find first array\n start = text.find(\"[\")\n if start < 0:\n return None\n\n depth = 0\n end = start\n in_string = False\n escape_next = False\n\n for i, char in enumerate(text[start:], start):\n if escape_next:\n escape_next = False\n continue\n\n if char == \"\\\\\":\n escape_next = True\n continue\n\n if char == '\"' and not escape_next:\n in_string = not in_string\n continue\n\n if in_string:\n continue\n\n if char == \"[\":\n depth += 1\n elif char == \"]\":\n depth -= 1\n if depth == 0:\n end = i + 1\n break\n\n if end <= start:\n return None\n\n json_str = text[start:end]\n\n try:\n result = json.loads(json_str)\n return result if isinstance(result, list) else None\n except json.JSONDecodeError:\n json_str = repair_json(json_str)\n try:\n result = json.loads(json_str)\n return result if isinstance(result, list) else None\n except json.JSONDecodeError:\n return None\n\n\ndef parse_llm_response(\n text: str,\n expected_keys: list[str] | None = None,\n defaults: dict | None = None,\n) -> dict:\n \"\"\"\n Parse LLM response with validation and defaults.\n\n DRY: Single function for all LLM JSON response parsing.\n Replaces repeated json.loads() + regex patterns.\n\n Args:\n text: LLM response text\n expected_keys: Keys that must be present\n defaults: Default values for missing keys\n\n Returns:\n Parsed dict with defaults applied\n \"\"\"\n defaults = defaults or {}\n\n # Extract JSON\n result = extract_json(text)\n\n if result is None:\n # Return defaults if parsing failed\n return dict(defaults)\n\n # Validate expected keys\n if expected_keys:\n for key in expected_keys:\n if key not in result:\n result[key] = defaults.get(key)\n\n # Apply defaults for missing keys\n for key, value in defaults.items():\n if key not in result or result[key] is None:\n result[key] = value\n\n return result\n\n\ndef parse_critic_response(text: str) -> dict:\n \"\"\"\n Parse critic\/review LLM response.\n\n Standard format for all critic responses.\n\n Args:\n text: LLM response text\n\n Returns:\n Dict with rating, passed, issues, suggestions, summary\n \"\"\"\n defaults = {\n \"rating\": 5,\n \"passed\": False,\n \"issues\": [],\n \"suggestions\": [],\n \"summary\": \"\",\n }\n\n result = parse_llm_response(\n text,\n expected_keys=[\"rating\", \"passed\", \"issues\", \"suggestions\", \"summary\"],\n defaults=defaults,\n )\n\n # Normalize types\n if isinstance(result.get(\"rating\"), str):\n try:\n result[\"rating\"] = int(result[\"rating\"])\n except ValueError:\n result[\"rating\"] = 5\n\n if not 
isinstance(result.get(\"issues\"), list):\n result[\"issues\"] = [result[\"issues\"]] if result.get(\"issues\") else []\n\n if not isinstance(result.get(\"suggestions\"), list):\n result[\"suggestions\"] = [result[\"suggestions\"]] if result.get(\"suggestions\") else []\n\n return result\n\n\ndef parse_semantic_response(text: str) -> dict:\n \"\"\"\n Parse semantic analysis LLM response.\n\n Standard format for chunk semantics.\n\n Args:\n text: LLM response text\n\n Returns:\n Dict with summary, keywords, sentiment, topics\n \"\"\"\n defaults = {\n \"summary\": \"\",\n \"keywords\": [],\n \"sentiment\": \"neutral\",\n \"topics\": [],\n }\n\n result = parse_llm_response(\n text,\n expected_keys=[\"summary\", \"keywords\", \"sentiment\", \"topics\"],\n defaults=defaults,\n )\n\n # Validate sentiment\n valid_sentiments = {\"positive\", \"negative\", \"neutral\", \"mixed\"}\n if result.get(\"sentiment\", \"\").lower() not in valid_sentiments:\n result[\"sentiment\"] = \"neutral\"\n\n return result\n\n\ndef parse_entity_response(text: str) -> list[dict]:\n \"\"\"\n Parse entity extraction LLM response.\n\n Args:\n text: LLM response text\n\n Returns:\n List of entity dicts with name, type, description\n \"\"\"\n # Try array first\n result = extract_json_array(text)\n if result:\n return result\n\n # Try object with entities key\n obj = extract_json(text)\n if obj and \"entities\" in obj:\n return obj[\"entities\"] if isinstance(obj[\"entities\"], list) else []\n\n return []",
"originalFile": "#!\/usr\/bin\/env python3\n\"\"\"\nRobuste JSON-Extraktion für LLM-Responses.\n\nBehandelt häufige Probleme:\n- Mehrere JSON-Blöcke (nimmt den ersten)\n- Trailing Commas\n- Unescaped Quotes in Strings\n- Markdown Code-Blöcke\n\"\"\"\n\nimport json\nimport re\nfrom typing import Any\n\n\ndef extract_json(text: str) -> dict | None:\n \"\"\"\n Extrahiert erstes gültiges JSON-Objekt aus Text.\n\n Args:\n text: LLM-Response mit JSON\n\n Returns:\n Parsed dict oder None bei Fehler\n \"\"\"\n if not text:\n return None\n\n # 1. Markdown Code-Blöcke entfernen\n text = re.sub(r\"```json\\s*\", \"\", text)\n text = re.sub(r\"```\\s*\", \"\", text)\n\n # 2. Ersten JSON-Block finden (Brace-Matching)\n start = text.find(\"{\")\n if start < 0:\n return None\n\n depth = 0\n end = start\n in_string = False\n escape_next = False\n\n for i, char in enumerate(text[start:], start):\n if escape_next:\n escape_next = False\n continue\n\n if char == \"\\\\\":\n escape_next = True\n continue\n\n if char == '\"' and not escape_next:\n in_string = not in_string\n continue\n\n if in_string:\n continue\n\n if char == \"{\":\n depth += 1\n elif char == \"}\":\n depth -= 1\n if depth == 0:\n end = i + 1\n break\n\n if end <= start:\n return None\n\n json_str = text[start:end]\n\n # 3. Versuche direkt zu parsen\n try:\n return json.loads(json_str)\n except json.JSONDecodeError:\n pass\n\n # 4. JSON reparieren und erneut versuchen\n json_str = repair_json(json_str)\n\n try:\n return json.loads(json_str)\n except json.JSONDecodeError:\n return None\n\n\ndef repair_json(json_str: str) -> str:\n \"\"\"\n Repariert häufige JSON-Fehler von LLMs.\n\n Args:\n json_str: Möglicherweise fehlerhafter JSON-String\n\n Returns:\n Reparierter JSON-String\n \"\"\"\n # Trailing Commas vor } oder ] entfernen\n json_str = re.sub(r\",\\s*}\", \"}\", json_str)\n json_str = re.sub(r\",\\s*]\", \"]\", json_str)\n\n # Single Quotes zu Double Quotes (außerhalb von Strings)\n # Vorsicht: nur wenn es eindeutig ist\n if \"'\" in json_str and '\"' not in json_str:\n json_str = json_str.replace(\"'\", '\"')\n\n # Fehlende Quotes um Werte (simple Fälle)\n # z.B. {key: value} -> {\"key\": \"value\"}\n json_str = re.sub(r\"{\\s*(\\w+)\\s*:\", r'{\"\\1\":', json_str)\n json_str = re.sub(r\",\\s*(\\w+)\\s*:\", r', \"\\1\":', json_str)\n\n # Unescaped Newlines in Strings ersetzen\n # Zwischen Quotes: \\n -> \\\\n\n def escape_newlines(match: re.Match) -> str:\n content = match.group(1)\n content = content.replace(\"\\n\", \"\\\\n\")\n content = content.replace(\"\\r\", \"\\\\r\")\n content = content.replace(\"\\t\", \"\\\\t\")\n return f'\"{content}\"'\n\n # Strings mit Newlines finden und escapen\n json_str = re.sub(r'\"([^\"]*(?:\\n|\\r)[^\"]*)\"', escape_newlines, json_str)\n\n return json_str\n\n\ndef safe_get(data: dict, key: str, default: Any = None, valid_values: set = None) -> Any:\n \"\"\"\n Sicherer Zugriff auf dict-Werte mit Validierung.\n\n Args:\n data: Source dict\n key: Schlüssel\n default: Fallback-Wert\n valid_values: Erlaubte Werte (optional)\n\n Returns:\n Validierter Wert oder Default\n \"\"\"\n value = data.get(key, default)\n\n # Liste -> erstes Element\n if isinstance(value, list):\n value = value[0] if value else default\n\n # String normalisieren\n if isinstance(value, str):\n value = value.lower().strip()\n\n # Validierung\n if valid_values and value not in valid_values:\n return default\n\n return value\n",
"structuredPatch": [
{
"oldStart": 152,
"oldLines": 3,
"newStart": 152,
"newLines": 211,
"lines": [
" return default",
" ",
" return value",
"+",
"+",
"+def extract_json_array(text: str) -> list | None:",
"+ \"\"\"",
"+ Extract first valid JSON array from text.",
"+",
"+ Args:",
"+ text: LLM response with JSON array",
"+",
"+ Returns:",
"+ Parsed list or None on error",
"+ \"\"\"",
"+ if not text:",
"+ return None",
"+",
"+ # Remove markdown code blocks",
"+ text = re.sub(r\"```json\\s*\", \"\", text)",
"+ text = re.sub(r\"```\\s*\", \"\", text)",
"+",
"+ # Find first array",
"+ start = text.find(\"[\")",
"+ if start < 0:",
"+ return None",
"+",
"+ depth = 0",
"+ end = start",
"+ in_string = False",
"+ escape_next = False",
"+",
"+ for i, char in enumerate(text[start:], start):",
"+ if escape_next:",
"+ escape_next = False",
"+ continue",
"+",
"+ if char == \"\\\\\":",
"+ escape_next = True",
"+ continue",
"+",
"+ if char == '\"' and not escape_next:",
"+ in_string = not in_string",
"+ continue",
"+",
"+ if in_string:",
"+ continue",
"+",
"+ if char == \"[\":",
"+ depth += 1",
"+ elif char == \"]\":",
"+ depth -= 1",
"+ if depth == 0:",
"+ end = i + 1",
"+ break",
"+",
"+ if end <= start:",
"+ return None",
"+",
"+ json_str = text[start:end]",
"+",
"+ try:",
"+ result = json.loads(json_str)",
"+ return result if isinstance(result, list) else None",
"+ except json.JSONDecodeError:",
"+ json_str = repair_json(json_str)",
"+ try:",
"+ result = json.loads(json_str)",
"+ return result if isinstance(result, list) else None",
"+ except json.JSONDecodeError:",
"+ return None",
"+",
"+",
"+def parse_llm_response(",
"+ text: str,",
"+ expected_keys: list[str] | None = None,",
"+ defaults: dict | None = None,",
"+) -> dict:",
"+ \"\"\"",
"+ Parse LLM response with validation and defaults.",
"+",
"+ DRY: Single function for all LLM JSON response parsing.",
"+ Replaces repeated json.loads() + regex patterns.",
"+",
"+ Args:",
"+ text: LLM response text",
"+ expected_keys: Keys that must be present",
"+ defaults: Default values for missing keys",
"+",
"+ Returns:",
"+ Parsed dict with defaults applied",
"+ \"\"\"",
"+ defaults = defaults or {}",
"+",
"+ # Extract JSON",
"+ result = extract_json(text)",
"+",
"+ if result is None:",
"+ # Return defaults if parsing failed",
"+ return dict(defaults)",
"+",
"+ # Validate expected keys",
"+ if expected_keys:",
"+ for key in expected_keys:",
"+ if key not in result:",
"+ result[key] = defaults.get(key)",
"+",
"+ # Apply defaults for missing keys",
"+ for key, value in defaults.items():",
"+ if key not in result or result[key] is None:",
"+ result[key] = value",
"+",
"+ return result",
"+",
"+",
"+def parse_critic_response(text: str) -> dict:",
"+ \"\"\"",
"+ Parse critic\/review LLM response.",
"+",
"+ Standard format for all critic responses.",
"+",
"+ Args:",
"+ text: LLM response text",
"+",
"+ Returns:",
"+ Dict with rating, passed, issues, suggestions, summary",
"+ \"\"\"",
"+ defaults = {",
"+ \"rating\": 5,",
"+ \"passed\": False,",
"+ \"issues\": [],",
"+ \"suggestions\": [],",
"+ \"summary\": \"\",",
"+ }",
"+",
"+ result = parse_llm_response(",
"+ text,",
"+ expected_keys=[\"rating\", \"passed\", \"issues\", \"suggestions\", \"summary\"],",
"+ defaults=defaults,",
"+ )",
"+",
"+ # Normalize types",
"+ if isinstance(result.get(\"rating\"), str):",
"+ try:",
"+ result[\"rating\"] = int(result[\"rating\"])",
"+ except ValueError:",
"+ result[\"rating\"] = 5",
"+",
"+ if not isinstance(result.get(\"issues\"), list):",
"+ result[\"issues\"] = [result[\"issues\"]] if result.get(\"issues\") else []",
"+",
"+ if not isinstance(result.get(\"suggestions\"), list):",
"+ result[\"suggestions\"] = [result[\"suggestions\"]] if result.get(\"suggestions\") else []",
"+",
"+ return result",
"+",
"+",
"+def parse_semantic_response(text: str) -> dict:",
"+ \"\"\"",
"+ Parse semantic analysis LLM response.",
"+",
"+ Standard format for chunk semantics.",
"+",
"+ Args:",
"+ text: LLM response text",
"+",
"+ Returns:",
"+ Dict with summary, keywords, sentiment, topics",
"+ \"\"\"",
"+ defaults = {",
"+ \"summary\": \"\",",
"+ \"keywords\": [],",
"+ \"sentiment\": \"neutral\",",
"+ \"topics\": [],",
"+ }",
"+",
"+ result = parse_llm_response(",
"+ text,",
"+ expected_keys=[\"summary\", \"keywords\", \"sentiment\", \"topics\"],",
"+ defaults=defaults,",
"+ )",
"+",
"+ # Validate sentiment",
"+ valid_sentiments = {\"positive\", \"negative\", \"neutral\", \"mixed\"}",
"+ if result.get(\"sentiment\", \"\").lower() not in valid_sentiments:",
"+ result[\"sentiment\"] = \"neutral\"",
"+",
"+ return result",
"+",
"+",
"+def parse_entity_response(text: str) -> list[dict]:",
"+ \"\"\"",
"+ Parse entity extraction LLM response.",
"+",
"+ Args:",
"+ text: LLM response text",
"+",
"+ Returns:",
"+ List of entity dicts with name, type, description",
"+ \"\"\"",
"+ # Try array first",
"+ result = extract_json_array(text)",
"+ if result:",
"+ return result",
"+",
"+ # Try object with entities key",
"+ obj = extract_json(text)",
"+ if obj and \"entities\" in obj:",
"+ return obj[\"entities\"] if isinstance(obj[\"entities\"], list) else []",
"+",
"+ return []"
]
}
],
"userModified": false,
"replaceAll": false
}
}
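
A hedged test sketch of the normalization rules visible in the patch above; the file name tests/test_json_utils.py is hypothetical, and it assumes pytest plus an importable json_utils module, with invented sample responses:

# Hypothetical test sketch (e.g. tests/test_json_utils.py) for the normalization
# behaviour shown in the diff; assumes pytest and that json_utils is importable.
from json_utils import parse_critic_response, parse_entity_response, parse_semantic_response


def test_critic_coerces_rating_and_listifies_issues():
    raw = '{"rating": "8", "passed": true, "issues": "missing tests", "summary": "ok"}'
    result = parse_critic_response(raw)
    assert result["rating"] == 8                  # string rating coerced to int
    assert result["issues"] == ["missing tests"]  # scalar wrapped into a list
    assert result["suggestions"] == []            # default applied for the missing key


def test_semantic_falls_back_to_neutral_sentiment():
    raw = '{"summary": "fine", "keywords": ["budget"], "sentiment": "angry", "topics": []}'
    assert parse_semantic_response(raw)["sentiment"] == "neutral"


def test_entity_response_handles_object_wrapper():
    raw = '{"entities": [{"name": "Berlin", "type": "location"}]}'
    assert parse_entity_response(raw) == [{"name": "Berlin", "type": "location"}]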