{
"event": "PreToolUse",
"tool_name": "Edit",
"tool_input": {
"file_path": "\/var\/www\/tools\/ki-protokoll\/claude-hook\/log_parser.py",
"old_string": "def get_conversation_pairs_from_session(session_id: str) -> List[Dict[str, str]]:\n \"\"\"Liest User-Prompt → Assistant-Response Paare aus der Session-JSONL\"\"\"\n pairs = []\n try:\n claude_dir = Path.home() \/ '.claude' \/ 'projects'\n\n for project_dir in claude_dir.iterdir():\n if not project_dir.is_dir():\n continue\n\n session_file = project_dir \/ f\"{session_id}.jsonl\"\n if session_file.exists():\n lines = session_file.read_text().strip().split('\\n')\n\n current_user_prompt = None\n for line in lines:\n try:\n entry = json.loads(line)\n entry_type = entry.get('type', '')\n\n # User-Message gefunden\n if entry_type == 'user':\n message = entry.get('message', {})\n content = message.get('content', '')\n if isinstance(content, str) and content.strip():\n current_user_prompt = content.strip()\n\n # Assistant-Message gefunden - pair mit letztem User-Prompt\n elif entry_type == 'assistant' and current_user_prompt:\n message = entry.get('message', {})\n content_parts = message.get('content', [])\n text_parts = []\n for part in content_parts:\n if isinstance(part, dict) and part.get('type') == 'text':\n text_parts.append(part.get('text', ''))\n elif isinstance(part, str):\n text_parts.append(part)\n\n if text_parts:\n assistant_response = '\\n'.join(text_parts)\n pairs.append({\n 'prompt': current_user_prompt,\n 'response': sanitize_data(assistant_response)\n })\n current_user_prompt = None # Reset für nächstes Paar\n\n except json.JSONDecodeError:\n continue\n\n break # Session gefunden, aufhören zu suchen\n\n return pairs\n\n except Exception:\n return []",
"new_string": "def get_conversation_pairs_from_session(session_id: str) -> List[Dict[str, str]]:\n \"\"\"Liest User-Prompt → Assistant-Response Paare aus der Session-JSONL\"\"\"\n pairs = []\n try:\n for project_dir in (Path.home() \/ '.claude' \/ 'projects').iterdir():\n if not project_dir.is_dir():\n continue\n session_file = project_dir \/ f\"{session_id}.jsonl\"\n if session_file.exists():\n lines = session_file.read_text().strip().split('\\n')\n current_user_prompt = None\n for line in lines:\n try:\n entry = json.loads(line)\n entry_type = entry.get('type', '')\n if entry_type == 'user':\n content = entry.get('message', {}).get('content', '')\n if isinstance(content, str) and content.strip():\n current_user_prompt = content.strip()\n elif entry_type == 'assistant' and current_user_prompt:\n content_parts = entry.get('message', {}).get('content', [])\n text_parts = []\n for part in content_parts:\n if isinstance(part, dict) and part.get('type') == 'text':\n text_parts.append(part.get('text', ''))\n elif isinstance(part, str):\n text_parts.append(part)\n if text_parts:\n pairs.append({'prompt': current_user_prompt,\n 'response': sanitize_data('\\n'.join(text_parts))})\n current_user_prompt = None\n except json.JSONDecodeError:\n continue\n break\n return pairs\n except Exception:\n return []"
}
}
{
"tool_response": {
"filePath": "\/var\/www\/tools\/ki-protokoll\/claude-hook\/log_parser.py",
"oldString": "def get_conversation_pairs_from_session(session_id: str) -> List[Dict[str, str]]:\n \"\"\"Liest User-Prompt → Assistant-Response Paare aus der Session-JSONL\"\"\"\n pairs = []\n try:\n claude_dir = Path.home() \/ '.claude' \/ 'projects'\n\n for project_dir in claude_dir.iterdir():\n if not project_dir.is_dir():\n continue\n\n session_file = project_dir \/ f\"{session_id}.jsonl\"\n if session_file.exists():\n lines = session_file.read_text().strip().split('\\n')\n\n current_user_prompt = None\n for line in lines:\n try:\n entry = json.loads(line)\n entry_type = entry.get('type', '')\n\n # User-Message gefunden\n if entry_type == 'user':\n message = entry.get('message', {})\n content = message.get('content', '')\n if isinstance(content, str) and content.strip():\n current_user_prompt = content.strip()\n\n # Assistant-Message gefunden - pair mit letztem User-Prompt\n elif entry_type == 'assistant' and current_user_prompt:\n message = entry.get('message', {})\n content_parts = message.get('content', [])\n text_parts = []\n for part in content_parts:\n if isinstance(part, dict) and part.get('type') == 'text':\n text_parts.append(part.get('text', ''))\n elif isinstance(part, str):\n text_parts.append(part)\n\n if text_parts:\n assistant_response = '\\n'.join(text_parts)\n pairs.append({\n 'prompt': current_user_prompt,\n 'response': sanitize_data(assistant_response)\n })\n current_user_prompt = None # Reset für nächstes Paar\n\n except json.JSONDecodeError:\n continue\n\n break # Session gefunden, aufhören zu suchen\n\n return pairs\n\n except Exception:\n return []",
"newString": "def get_conversation_pairs_from_session(session_id: str) -> List[Dict[str, str]]:\n \"\"\"Liest User-Prompt → Assistant-Response Paare aus der Session-JSONL\"\"\"\n pairs = []\n try:\n for project_dir in (Path.home() \/ '.claude' \/ 'projects').iterdir():\n if not project_dir.is_dir():\n continue\n session_file = project_dir \/ f\"{session_id}.jsonl\"\n if session_file.exists():\n lines = session_file.read_text().strip().split('\\n')\n current_user_prompt = None\n for line in lines:\n try:\n entry = json.loads(line)\n entry_type = entry.get('type', '')\n if entry_type == 'user':\n content = entry.get('message', {}).get('content', '')\n if isinstance(content, str) and content.strip():\n current_user_prompt = content.strip()\n elif entry_type == 'assistant' and current_user_prompt:\n content_parts = entry.get('message', {}).get('content', [])\n text_parts = []\n for part in content_parts:\n if isinstance(part, dict) and part.get('type') == 'text':\n text_parts.append(part.get('text', ''))\n elif isinstance(part, str):\n text_parts.append(part)\n if text_parts:\n pairs.append({'prompt': current_user_prompt,\n 'response': sanitize_data('\\n'.join(text_parts))})\n current_user_prompt = None\n except json.JSONDecodeError:\n continue\n break\n return pairs\n except Exception:\n return []",
"originalFile": "#!\/usr\/bin\/env python3\n\"\"\"Log Parser Module - Parsen und Extrahieren von Daten aus Hook-Events\"\"\"\n\nimport json, os, re\nfrom pathlib import Path\nfrom typing import Dict, Any, List, Optional\n\nMAX_FIELD_LENGTH = 10000\nSENSITIVE_KEY_PATTERNS = re.compile(\n r\"(?i)(password|pass|secret|token|apikey|api_key|authorization|auth|bearer|credential)\")\nSENSITIVE_VALUE_PATTERNS = [\n re.compile(r\"(?i)\\bAKIA[0-9A-Z]{16}\\b\"),\n re.compile(r\"(?i)\\b(?:sk|rk|pk)[0-9A-Za-z]{20,}\\b\"),\n re.compile(r\"(?i)\\beyJ[a-zA-Z0-9-]{10,}\\.[a-zA-Z0-9_-]{10,}\\.[a-zA-Z0-9_-]{10,}\\b\")]\n\n\ndef sanitize_data(obj: Any) -> Any:\n \"\"\"Entfernt oder maskiert sensible Daten\"\"\"\n if isinstance(obj, dict):\n result = {}\n for key, value in obj.items():\n if SENSITIVE_KEY_PATTERNS.search(str(key)):\n result[key] = '[REDACTED]'\n else:\n result[key] = sanitize_data(value)\n return result\n elif isinstance(obj, list):\n return [sanitize_data(item) for item in obj]\n elif isinstance(obj, str):\n for pattern in SENSITIVE_VALUE_PATTERNS:\n if pattern.search(obj):\n return '[REDACTED]'\n if len(obj) > MAX_FIELD_LENGTH:\n import hashlib\n hash_value = hashlib.sha256(obj.encode('utf-8', errors='ignore')).hexdigest()[:16]\n return obj[:MAX_FIELD_LENGTH] + f'... [TRUNCATED-{hash_value}]'\n return obj\n return obj\n\n\ndef estimate_tokens(text: str) -> int:\n \"\"\"Grobe Token-Schätzung (4 Zeichen = 1 Token)\"\"\"\n if not text:\n return 0\n return max(1, len(text) \/\/ 4)\n\n\ndef get_client_ip() -> str:\n \"\"\"Ermittelt die Client-IP-Adresse\"\"\"\n ssh_client = os.environ.get('SSH_CLIENT', '')\n if ssh_client:\n return ssh_client.split()[0]\n ssh_connection = os.environ.get('SSH_CONNECTION', '')\n if ssh_connection:\n return ssh_connection.split()[0]\n return '127.0.0.1'\n\n\ndef parse_event_data(data: Dict[str, Any]) -> Dict[str, Any]:\n \"\"\"Extrahiert und strukturiert Event-Daten basierend auf Event-Typ\"\"\"\n evt = data.get('hook_event_name', 'Unknown')\n if evt == 'UserPromptSubmit':\n return {'event': evt, 'prompt': sanitize_data(data.get('prompt', ''))}\n elif evt == 'PreToolUse':\n return {'event': evt, 'tool_name': data.get('tool_name', ''),\n 'tool_input': sanitize_data(data.get('tool_input', {}))}\n elif evt == 'PostToolUse':\n return {'event': evt, 'tool_name': data.get('tool_name', ''),\n 'tool_response': sanitize_data(data.get('tool_response', {}))}\n elif evt in ['Stop', 'SubagentStop']:\n return {'event': evt, 'stop_hook_active': data.get('stop_hook_active', False)}\n elif evt in ['SessionStart', 'SessionEnd']:\n return {'event': evt, 'source': data.get('source', data.get('reason', ''))}\n else:\n return {'event': evt, 'raw_data': sanitize_data(data)}\n\n\ndef get_conversation_pairs_from_session(session_id: str) -> List[Dict[str, str]]:\n \"\"\"Liest User-Prompt → Assistant-Response Paare aus der Session-JSONL\"\"\"\n pairs = []\n try:\n claude_dir = Path.home() \/ '.claude' \/ 'projects'\n\n for project_dir in claude_dir.iterdir():\n if not project_dir.is_dir():\n continue\n\n session_file = project_dir \/ f\"{session_id}.jsonl\"\n if session_file.exists():\n lines = session_file.read_text().strip().split('\\n')\n\n current_user_prompt = None\n for line in lines:\n try:\n entry = json.loads(line)\n entry_type = entry.get('type', '')\n\n # User-Message gefunden\n if entry_type == 'user':\n message = entry.get('message', {})\n content = message.get('content', '')\n if isinstance(content, str) and content.strip():\n current_user_prompt = content.strip()\n\n # Assistant-Message gefunden - pair mit letztem User-Prompt\n elif entry_type == 'assistant' and current_user_prompt:\n message = entry.get('message', {})\n content_parts = message.get('content', [])\n text_parts = []\n for part in content_parts:\n if isinstance(part, dict) and part.get('type') == 'text':\n text_parts.append(part.get('text', ''))\n elif isinstance(part, str):\n text_parts.append(part)\n\n if text_parts:\n assistant_response = '\\n'.join(text_parts)\n pairs.append({\n 'prompt': current_user_prompt,\n 'response': sanitize_data(assistant_response)\n })\n current_user_prompt = None # Reset für nächstes Paar\n\n except json.JSONDecodeError:\n continue\n\n break # Session gefunden, aufhören zu suchen\n\n return pairs\n\n except Exception:\n return []\n",
"structuredPatch": [
{
"oldStart": 79,
"oldLines": 54,
"newStart": 79,
"newLines": 36,
"lines": [
" \"\"\"Liest User-Prompt → Assistant-Response Paare aus der Session-JSONL\"\"\"",
" pairs = []",
" try:",
"- claude_dir = Path.home() \/ '.claude' \/ 'projects'",
"-",
"- for project_dir in claude_dir.iterdir():",
"+ for project_dir in (Path.home() \/ '.claude' \/ 'projects').iterdir():",
" if not project_dir.is_dir():",
" continue",
"-",
" session_file = project_dir \/ f\"{session_id}.jsonl\"",
" if session_file.exists():",
" lines = session_file.read_text().strip().split('\\n')",
"-",
" current_user_prompt = None",
" for line in lines:",
" try:",
" entry = json.loads(line)",
" entry_type = entry.get('type', '')",
"-",
"- # User-Message gefunden",
" if entry_type == 'user':",
"- message = entry.get('message', {})",
"- content = message.get('content', '')",
"+ content = entry.get('message', {}).get('content', '')",
" if isinstance(content, str) and content.strip():",
" current_user_prompt = content.strip()",
"-",
"- # Assistant-Message gefunden - pair mit letztem User-Prompt",
" elif entry_type == 'assistant' and current_user_prompt:",
"- message = entry.get('message', {})",
"- content_parts = message.get('content', [])",
"+ content_parts = entry.get('message', {}).get('content', [])",
" text_parts = []",
" for part in content_parts:",
" if isinstance(part, dict) and part.get('type') == 'text':",
" text_parts.append(part.get('text', ''))",
" elif isinstance(part, str):",
" text_parts.append(part)",
"-",
" if text_parts:",
"- assistant_response = '\\n'.join(text_parts)",
"- pairs.append({",
"- 'prompt': current_user_prompt,",
"- 'response': sanitize_data(assistant_response)",
"- })",
"- current_user_prompt = None # Reset für nächstes Paar",
"-",
"+ pairs.append({'prompt': current_user_prompt,",
"+ 'response': sanitize_data('\\n'.join(text_parts))})",
"+ current_user_prompt = None",
" except json.JSONDecodeError:",
" continue",
"-",
"- break # Session gefunden, aufhören zu suchen",
"-",
"+ break",
" return pairs",
"-",
" except Exception:",
" return []"
]
}
],
"userModified": false,
"replaceAll": false
}
}