{
"tool_response": {
"filePath": "\/var\/www\/tools\/ki-protokoll\/claude-hook\/log_to_db.py",
"oldString": "#!\/usr\/bin\/env python3\n\"\"\"\nClaude Code Hook Logger v2 für KI-Protokoll-Datenbank\nVerbesserte Version mit Request\/Response-Pairing und Duration-Tracking\n\"\"\"\n\nimport json\nimport os\nimport sys\nimport re\nimport datetime\nimport hashlib\nimport pymysql\nimport tempfile\nfrom pathlib import Path\nfrom typing import Dict, Any, Optional\n\n# .env aus Hook-Verzeichnis laden\nfrom dotenv import load_dotenv\nload_dotenv(Path(__file__).parent \/ '.env')\n\n# Konfiguration aus Environment-Variablen\nDB_CONFIG = {\n 'host': os.environ.get('CLAUDE_DB_HOST', 'localhost'),\n 'port': int(os.environ.get('CLAUDE_DB_PORT', '3306')),\n 'user': os.environ.get('CLAUDE_DB_USER', 'root'),\n 'password': os.environ.get('CLAUDE_DB_PASSWORD', ''),\n 'database': os.environ.get('CLAUDE_DB_NAME', 'ki_dev'),\n 'charset': 'utf8mb4'\n}\n\n# Session-Tracking im temporären Verzeichnis\nTEMP_DIR = Path(tempfile.gettempdir()) \/ \"claude_hooks\"\nTEMP_DIR.mkdir(exist_ok=True)\n\n# Sicherheitseinstellungen\nMAX_FIELD_LENGTH = 10000\nSENSITIVE_KEY_PATTERNS = re.compile(r\"(?i)(password|pass|secret|token|apikey|api_key|authorization|auth|bearer|credential)\")\nSENSITIVE_VALUE_PATTERNS = [\n re.compile(r\"(?i)\\bAKIA[0-9A-Z]{16}\\b\"),\n re.compile(r\"(?i)\\b(?:sk|rk|pk)[0-9A-Za-z]{20,}\\b\"),\n re.compile(r\"(?i)\\beyJ[a-zA-Z0-9-]{10,}\\.[a-zA-Z0-9_-]{10,}\\.[a-zA-Z0-9_-]{10,}\\b\")\n]\n\ndef get_client_ip() -> str:\n \"\"\"Ermittelt die Client-IP-Adresse\"\"\"\n ssh_client = os.environ.get('SSH_CLIENT', '')\n if ssh_client:\n return ssh_client.split()[0]\n ssh_connection = os.environ.get('SSH_CONNECTION', '')\n if ssh_connection:\n return ssh_connection.split()[0]\n return '127.0.0.1'\n\ndef sanitize_data(obj: Any) -> Any:\n \"\"\"Entfernt oder maskiert sensible Daten\"\"\"\n if isinstance(obj, dict):\n result = {}\n for key, value in obj.items():\n if SENSITIVE_KEY_PATTERNS.search(str(key)):\n result[key] = '[REDACTED]'\n else:\n result[key] = sanitize_data(value)\n return result\n elif isinstance(obj, list):\n return [sanitize_data(item) for item in obj]\n elif isinstance(obj, str):\n for pattern in SENSITIVE_VALUE_PATTERNS:\n if pattern.search(obj):\n return '[REDACTED]'\n if len(obj) > MAX_FIELD_LENGTH:\n hash_value = hashlib.sha256(obj.encode('utf-8', errors='ignore')).hexdigest()[:16]\n return obj[:MAX_FIELD_LENGTH] + f'... [TRUNCATED-{hash_value}]'\n return obj\n return obj\n\ndef estimate_tokens(text: str) -> int:\n \"\"\"Grobe Token-Schätzung (4 Zeichen = 1 Token)\"\"\"\n if not text:\n return 0\n return max(1, len(text) \/\/ 4)\n\ndef get_session_tracking_key(data: Dict[str, Any]) -> str:\n \"\"\"Erstellt einen eindeutigen Key für Session-Tracking\"\"\"\n session_id = data.get('session_id', '')\n event_name = data.get('hook_event_name', '')\n tool_name = data.get('tool_name', '')\n\n # Für Tool-Events: tool_name-spezifischer Key\n if event_name in ['PreToolUse', 'PostToolUse'] and tool_name:\n return f\"{session_id}_{tool_name}_{event_name}\"\n\n # Für andere Events: event-spezifischer Key\n return f\"{session_id}_{event_name}\"\n\ndef save_pending_request(data: Dict[str, Any], db_id: int) -> None:\n \"\"\"Speichert pending Request für spätere Response-Zuordnung\"\"\"\n try:\n key = get_session_tracking_key(data)\n tracking_file = TEMP_DIR \/ f\"{key}.json\"\n\n tracking_data = {\n 'db_id': db_id,\n 'timestamp': datetime.datetime.now().isoformat(),\n 'event': data.get('hook_event_name'),\n 'tool_name': data.get('tool_name', ''),\n 'session_id': data.get('session_id', '')\n }\n\n with open(tracking_file, 'w') as f:\n json.dump(tracking_data, f)\n\n except Exception as e:\n print(f\"Session tracking save error: {e}\", file=sys.stderr)\n\ndef find_matching_request(data: Dict[str, Any]) -> Optional[int]:\n \"\"\"Findet matching Request für Response-Event\"\"\"\n try:\n event_name = data.get('hook_event_name', '')\n\n # PostToolUse sucht nach PreToolUse\n if event_name == 'PostToolUse':\n search_data = dict(data)\n search_data['hook_event_name'] = 'PreToolUse'\n key = get_session_tracking_key(search_data)\n else:\n return None\n\n tracking_file = TEMP_DIR \/ f\"{key}.json\"\n\n if tracking_file.exists():\n with open(tracking_file, 'r') as f:\n tracking_data = json.load(f)\n\n # Cleanup\n tracking_file.unlink()\n return tracking_data['db_id']\n\n except Exception as e:\n print(f\"Session tracking find error: {e}\", file=sys.stderr)\n\n return None\n\ndef update_request_with_response(db_id: int, response_data: str) -> None:\n \"\"\"Updated existing Request mit Response-Daten und berechnet Duration\"\"\"\n try:\n connection = pymysql.connect(**DB_CONFIG)\n\n with connection.cursor() as cursor:\n current_time = datetime.datetime.now()\n\n # Erst Response und Timestamp setzen\n tokens_output = estimate_tokens(response_data)\n\n sql = \"\"\"\n UPDATE protokoll\n SET response = %s,\n response_timestamp = %s,\n tokens_output = %s,\n tokens_total = tokens_input + %s,\n status = 'completed'\n WHERE id = %s\n \"\"\"\n\n cursor.execute(sql, (\n response_data,\n current_time,\n tokens_output,\n tokens_output,\n db_id\n ))\n\n # Dann Duration aus Timestamps berechnen und setzen (mit Microsekunden-Präzision)\n duration_sql = \"\"\"\n UPDATE protokoll\n SET duration_ms = ROUND(TIMESTAMPDIFF(MICROSECOND, request_timestamp, response_timestamp) \/ 1000, 3)\n WHERE id = %s\n \"\"\"\n\n cursor.execute(duration_sql, (db_id,))\n\n connection.commit()\n\n except Exception as e:\n print(f\"Database update error: {e}\", file=sys.stderr)\n finally:\n if 'connection' in locals():\n connection.close()\n\n\ndef get_conversation_pairs_from_session(session_id: str) -> list:\n \"\"\"Liest User-Prompt → Assistant-Response Paare aus der Session-JSONL\"\"\"\n pairs = []\n try:\n claude_dir = Path.home() \/ '.claude' \/ 'projects'\n\n for project_dir in claude_dir.iterdir():\n if not project_dir.is_dir():\n continue\n\n session_file = project_dir \/ f\"{session_id}.jsonl\"\n if session_file.exists():\n lines = session_file.read_text().strip().split('\\n')\n\n current_user_prompt = None\n for line in lines:\n try:\n entry = json.loads(line)\n entry_type = entry.get('type', '')\n\n # User-Message gefunden\n if entry_type == 'user':\n message = entry.get('message', {})\n content = message.get('content', '')\n if isinstance(content, str) and content.strip():\n current_user_prompt = content.strip()\n\n # Assistant-Message gefunden - pair mit letztem User-Prompt\n elif entry_type == 'assistant' and current_user_prompt:\n message = entry.get('message', {})\n content_parts = message.get('content', [])\n text_parts = []\n for part in content_parts:\n if isinstance(part, dict) and part.get('type') == 'text':\n text_parts.append(part.get('text', ''))\n elif isinstance(part, str):\n text_parts.append(part)\n\n if text_parts:\n assistant_response = '\\n'.join(text_parts)\n pairs.append({\n 'prompt': current_user_prompt,\n 'response': sanitize_data(assistant_response)\n })\n current_user_prompt = None # Reset für nächstes Paar\n\n except json.JSONDecodeError:\n continue\n\n break # Session gefunden, aufhören zu suchen\n\n return pairs\n\n except Exception as e:\n print(f\"Session read error: {e}\", file=sys.stderr)\n return []\n\n\ndef close_pending_user_prompts(session_id: str) -> None:\n \"\"\"Schließt offene UserPromptSubmit-Einträge mit ihren zugehörigen Responses\"\"\"\n try:\n # Hole alle Conversation-Paare aus der Session\n pairs = get_conversation_pairs_from_session(session_id)\n if not pairs:\n return\n\n connection = pymysql.connect(**DB_CONFIG)\n\n with connection.cursor() as cursor:\n # Hole alle pending UserPromptSubmit-Einträge der letzten Stunde\n cursor.execute(\"\"\"\n SELECT id, JSON_UNQUOTE(JSON_EXTRACT(request, '$.prompt')) as prompt\n FROM protokoll\n WHERE status = 'pending'\n AND JSON_EXTRACT(request, '$.event') = 'UserPromptSubmit'\n AND timestamp > DATE_SUB(NOW(), INTERVAL 1 HOUR)\n ORDER BY id ASC\n \"\"\")\n pending_prompts = cursor.fetchall()\n\n current_time = datetime.datetime.now()\n\n # Matche jeden pending Prompt mit seinem Response\n for db_id, db_prompt in pending_prompts:\n if not db_prompt:\n continue\n\n # Finde das passende Paar (Prompt-Match)\n matching_response = None\n for... [TRUNCATED-de9df41b21569d88]",
"newString": "#!\/usr\/bin\/env python3\n\"\"\"\nClaude Code Hook Logger v3 - Modularisiert\nOrchestriert Parsing, Formatting und Storage von Hook-Events\n\"\"\"\n\nimport json\nimport sys\nfrom typing import Dict, Any, Optional\n\n# Importiere modularisierte Komponenten\nfrom log_parser import (\n parse_event_data,\n sanitize_data,\n get_conversation_pairs_from_session\n)\nfrom log_formatter import (\n format_request_to_json,\n format_response_to_json,\n create_response_data,\n calculate_tokens,\n get_client_metadata,\n get_model_name,\n format_tool_response,\n should_suppress_output\n)\nfrom log_storage import (\n find_matching_request,\n save_pending_request,\n insert_log_entry,\n update_request_with_response,\n close_pending_user_prompts\n)\n\n\ndef log_to_database(data: Dict[str, Any]) -> Optional[int]:\n \"\"\"\n Hauptfunktion: Orchestriert das Logging eines Events\n\n Args:\n data: Event-Daten vom Hook\n\n Returns:\n Database ID des Log-Eintrags oder None bei Fehler\n \"\"\"\n try:\n event_name = data.get('hook_event_name', 'Unknown')\n session_id = data.get('session_id', '')\n\n # Prüfe auf matching Request für Response-Events (PostToolUse)\n if event_name == 'PostToolUse':\n matching_request_id = find_matching_request(data)\n if matching_request_id:\n # Update existing request mit response\n tool_response = sanitize_data(data.get('tool_response', {}))\n response_str = format_tool_response(tool_response)\n tokens = calculate_tokens('', response_str)\n\n update_request_with_response(\n matching_request_id,\n response_str,\n tokens['tokens_output']\n )\n return matching_request_id\n\n # Bei Stop-Events: Schließe alle pending UserPrompts\n if event_name in ['Stop', 'SubagentStop'] and session_id:\n pairs = get_conversation_pairs_from_session(session_id)\n close_pending_user_prompts(session_id, pairs)\n\n # Parse Event-Daten\n request_data = parse_event_data(data)\n\n # Erstelle Response-Daten (falls direkt verfügbar)\n response_data = create_response_data(event_name, data)\n\n # Formatiere zu JSON\n request_str = format_request_to_json(request_data)\n response_str = format_response_to_json(response_data)\n\n # Berechne Tokens\n tokens = calculate_tokens(request_str, response_str)\n\n # Hole Client-Metadaten\n client_meta = get_client_metadata()\n model_name = get_model_name()\n\n # Insert in DB\n db_id = insert_log_entry(\n request_str=request_str,\n client_ip=client_meta['client_ip'],\n client_name=client_meta['client_name'],\n response_str=response_str,\n tokens_input=tokens['tokens_input'],\n tokens_output=tokens['tokens_output'],\n model_name=model_name\n )\n\n # Speichere pending Requests für spätere Response-Zuordnung\n if db_id and not response_str:\n save_pending_request(data, db_id)\n\n return db_id\n\n except Exception as e:\n print(f\"Database logging error: {e}\", file=sys.stderr)\n return None",
"originalFile": "#!\/usr\/bin\/env python3\n\"\"\"\nClaude Code Hook Logger v2 für KI-Protokoll-Datenbank\nVerbesserte Version mit Request\/Response-Pairing und Duration-Tracking\n\"\"\"\n\nimport json\nimport os\nimport sys\nimport re\nimport datetime\nimport hashlib\nimport pymysql\nimport tempfile\nfrom pathlib import Path\nfrom typing import Dict, Any, Optional\n\n# .env aus Hook-Verzeichnis laden\nfrom dotenv import load_dotenv\nload_dotenv(Path(__file__).parent \/ '.env')\n\n# Konfiguration aus Environment-Variablen\nDB_CONFIG = {\n 'host': os.environ.get('CLAUDE_DB_HOST', 'localhost'),\n 'port': int(os.environ.get('CLAUDE_DB_PORT', '3306')),\n 'user': os.environ.get('CLAUDE_DB_USER', 'root'),\n 'password': os.environ.get('CLAUDE_DB_PASSWORD', ''),\n 'database': os.environ.get('CLAUDE_DB_NAME', 'ki_dev'),\n 'charset': 'utf8mb4'\n}\n\n# Session-Tracking im temporären Verzeichnis\nTEMP_DIR = Path(tempfile.gettempdir()) \/ \"claude_hooks\"\nTEMP_DIR.mkdir(exist_ok=True)\n\n# Sicherheitseinstellungen\nMAX_FIELD_LENGTH = 10000\nSENSITIVE_KEY_PATTERNS = re.compile(r\"(?i)(password|pass|secret|token|apikey|api_key|authorization|auth|bearer|credential)\")\nSENSITIVE_VALUE_PATTERNS = [\n re.compile(r\"(?i)\\bAKIA[0-9A-Z]{16}\\b\"),\n re.compile(r\"(?i)\\b(?:sk|rk|pk)[0-9A-Za-z]{20,}\\b\"),\n re.compile(r\"(?i)\\beyJ[a-zA-Z0-9-]{10,}\\.[a-zA-Z0-9_-]{10,}\\.[a-zA-Z0-9_-]{10,}\\b\")\n]\n\ndef get_client_ip() -> str:\n \"\"\"Ermittelt die Client-IP-Adresse\"\"\"\n ssh_client = os.environ.get('SSH_CLIENT', '')\n if ssh_client:\n return ssh_client.split()[0]\n ssh_connection = os.environ.get('SSH_CONNECTION', '')\n if ssh_connection:\n return ssh_connection.split()[0]\n return '127.0.0.1'\n\ndef sanitize_data(obj: Any) -> Any:\n \"\"\"Entfernt oder maskiert sensible Daten\"\"\"\n if isinstance(obj, dict):\n result = {}\n for key, value in obj.items():\n if SENSITIVE_KEY_PATTERNS.search(str(key)):\n result[key] = '[REDACTED]'\n else:\n result[key] = sanitize_data(value)\n return result\n elif isinstance(obj, list):\n return [sanitize_data(item) for item in obj]\n elif isinstance(obj, str):\n for pattern in SENSITIVE_VALUE_PATTERNS:\n if pattern.search(obj):\n return '[REDACTED]'\n if len(obj) > MAX_FIELD_LENGTH:\n hash_value = hashlib.sha256(obj.encode('utf-8', errors='ignore')).hexdigest()[:16]\n return obj[:MAX_FIELD_LENGTH] + f'... [TRUNCATED-{hash_value}]'\n return obj\n return obj\n\ndef estimate_tokens(text: str) -> int:\n \"\"\"Grobe Token-Schätzung (4 Zeichen = 1 Token)\"\"\"\n if not text:\n return 0\n return max(1, len(text) \/\/ 4)\n\ndef get_session_tracking_key(data: Dict[str, Any]) -> str:\n \"\"\"Erstellt einen eindeutigen Key für Session-Tracking\"\"\"\n session_id = data.get('session_id', '')\n event_name = data.get('hook_event_name', '')\n tool_name = data.get('tool_name', '')\n\n # Für Tool-Events: tool_name-spezifischer Key\n if event_name in ['PreToolUse', 'PostToolUse'] and tool_name:\n return f\"{session_id}_{tool_name}_{event_name}\"\n\n # Für andere Events: event-spezifischer Key\n return f\"{session_id}_{event_name}\"\n\ndef save_pending_request(data: Dict[str, Any], db_id: int) -> None:\n \"\"\"Speichert pending Request für spätere Response-Zuordnung\"\"\"\n try:\n key = get_session_tracking_key(data)\n tracking_file = TEMP_DIR \/ f\"{key}.json\"\n\n tracking_data = {\n 'db_id': db_id,\n 'timestamp': datetime.datetime.now().isoformat(),\n 'event': data.get('hook_event_name'),\n 'tool_name': data.get('tool_name', ''),\n 'session_id': data.get('session_id', '')\n }\n\n with open(tracking_file, 'w') as f:\n json.dump(tracking_data, f)\n\n except Exception as e:\n print(f\"Session tracking save error: {e}\", file=sys.stderr)\n\ndef find_matching_request(data: Dict[str, Any]) -> Optional[int]:\n \"\"\"Findet matching Request für Response-Event\"\"\"\n try:\n event_name = data.get('hook_event_name', '')\n\n # PostToolUse sucht nach PreToolUse\n if event_name == 'PostToolUse':\n search_data = dict(data)\n search_data['hook_event_name'] = 'PreToolUse'\n key = get_session_tracking_key(search_data)\n else:\n return None\n\n tracking_file = TEMP_DIR \/ f\"{key}.json\"\n\n if tracking_file.exists():\n with open(tracking_file, 'r') as f:\n tracking_data = json.load(f)\n\n # Cleanup\n tracking_file.unlink()\n return tracking_data['db_id']\n\n except Exception as e:\n print(f\"Session tracking find error: {e}\", file=sys.stderr)\n\n return None\n\ndef update_request_with_response(db_id: int, response_data: str) -> None:\n \"\"\"Updated existing Request mit Response-Daten und berechnet Duration\"\"\"\n try:\n connection = pymysql.connect(**DB_CONFIG)\n\n with connection.cursor() as cursor:\n current_time = datetime.datetime.now()\n\n # Erst Response und Timestamp setzen\n tokens_output = estimate_tokens(response_data)\n\n sql = \"\"\"\n UPDATE protokoll\n SET response = %s,\n response_timestamp = %s,\n tokens_output = %s,\n tokens_total = tokens_input + %s,\n status = 'completed'\n WHERE id = %s\n \"\"\"\n\n cursor.execute(sql, (\n response_data,\n current_time,\n tokens_output,\n tokens_output,\n db_id\n ))\n\n # Dann Duration aus Timestamps berechnen und setzen (mit Microsekunden-Präzision)\n duration_sql = \"\"\"\n UPDATE protokoll\n SET duration_ms = ROUND(TIMESTAMPDIFF(MICROSECOND, request_timestamp, response_timestamp) \/ 1000, 3)\n WHERE id = %s\n \"\"\"\n\n cursor.execute(duration_sql, (db_id,))\n\n connection.commit()\n\n except Exception as e:\n print(f\"Database update error: {e}\", file=sys.stderr)\n finally:\n if 'connection' in locals():\n connection.close()\n\n\ndef get_conversation_pairs_from_session(session_id: str) -> list:\n \"\"\"Liest User-Prompt → Assistant-Response Paare aus der Session-JSONL\"\"\"\n pairs = []\n try:\n claude_dir = Path.home() \/ '.claude' \/ 'projects'\n\n for project_dir in claude_dir.iterdir():\n if not project_dir.is_dir():\n continue\n\n session_file = project_dir \/ f\"{session_id}.jsonl\"\n if session_file.exists():\n lines = session_file.read_text().strip().split('\\n')\n\n current_user_prompt = None\n for line in lines:\n try:\n entry = json.loads(line)\n entry_type = entry.get('type', '')\n\n # User-Message gefunden\n if entry_type == 'user':\n message = entry.get('message', {})\n content = message.get('content', '')\n if isinstance(content, str) and content.strip():\n current_user_prompt = content.strip()\n\n # Assistant-Message gefunden - pair mit letztem User-Prompt\n elif entry_type == 'assistant' and current_user_prompt:\n message = entry.get('message', {})\n content_parts = message.get('content', [])\n text_parts = []\n for part in content_parts:\n if isinstance(part, dict) and part.get('type') == 'text':\n text_parts.append(part.get('text', ''))\n elif isinstance(part, str):\n text_parts.append(part)\n\n if text_parts:\n assistant_response = '\\n'.join(text_parts)\n pairs.append({\n 'prompt': current_user_prompt,\n 'response': sanitize_data(assistant_response)\n })\n current_user_prompt = None # Reset für nächstes Paar\n\n except json.JSONDecodeError:\n continue\n\n break # Session gefunden, aufhören zu suchen\n\n return pairs\n\n except Exception as e:\n print(f\"Session read error: {e}\", file=sys.stderr)\n return []\n\n\ndef close_pending_user_prompts(session_id: str) -> None:\n \"\"\"Schließt offene UserPromptSubmit-Einträge mit ihren zugehörigen Responses\"\"\"\n try:\n # Hole alle Conversation-Paare aus der Session\n pairs = get_conversation_pairs_from_session(session_id)\n if not pairs:\n return\n\n connection = pymysql.connect(**DB_CONFIG)\n\n with connection.cursor() as cursor:\n # Hole alle pending UserPromptSubmit-Einträge der letzten Stunde\n cursor.execute(\"\"\"\n SELECT id, JSON_UNQUOTE(JSON_EXTRACT(request, '$.prompt')) as prompt\n FROM protokoll\n WHERE status = 'pending'\n AND JSON_EXTRACT(request, '$.event') = 'UserPromptSubmit'\n AND timestamp > DATE_SUB(NOW(), INTERVAL 1 HOUR)\n ORDER BY id ASC\n \"\"\")\n pending_prompts = cursor.fetchall()\n\n current_time = datetime.datetime.now()\n\n # Matche jeden pending Prompt mit seinem Response\n for db_id, db_prompt in pending_prompts:\n if not db_prompt:\n continue\n\n # Finde das passende Paar (Prompt-Match)\n matching_response = None\n for... [TRUNCATED-e3595077ff4c5e19]",
"structuredPatch": [
{
"oldStart": 1,
"oldLines": 448,
"newStart": 1,
"newLines": 111,
"lines": [
" #!\/usr\/bin\/env python3",
" \"\"\"",
"-Claude Code Hook Logger v2 für KI-Protokoll-Datenbank",
"-Verbesserte Version mit Request\/Response-Pairing und Duration-Tracking",
"+Claude Code Hook Logger v3 - Modularisiert",
"+Orchestriert Parsing, Formatting und Storage von Hook-Events",
" \"\"\"",
" ",
" import json",
"-import os",
" import sys",
"-import re",
"-import datetime",
"-import hashlib",
"-import pymysql",
"-import tempfile",
"-from pathlib import Path",
" from typing import Dict, Any, Optional",
" ",
"-# .env aus Hook-Verzeichnis laden",
"-from dotenv import load_dotenv",
"-load_dotenv(Path(__file__).parent \/ '.env')",
"+# Importiere modularisierte Komponenten",
"+from log_parser import (",
"+ parse_event_data,",
"+ sanitize_data,",
"+ get_conversation_pairs_from_session",
"+)",
"+from log_formatter import (",
"+ format_request_to_json,",
"+ format_response_to_json,",
"+ create_response_data,",
"+ calculate_tokens,",
"+ get_client_metadata,",
"+ get_model_name,",
"+ format_tool_response,",
"+ should_suppress_output",
"+)",
"+from log_storage import (",
"+ find_matching_request,",
"+ save_pending_request,",
"+ insert_log_entry,",
"+ update_request_with_response,",
"+ close_pending_user_prompts",
"+)",
" ",
"-# Konfiguration aus Environment-Variablen",
"-DB_CONFIG = {",
"- 'host': os.environ.get('CLAUDE_DB_HOST', 'localhost'),",
"- 'port': int(os.environ.get('CLAUDE_DB_PORT', '3306')),",
"- 'user': os.environ.get('CLAUDE_DB_USER', 'root'),",
"- 'password': os.environ.get('CLAUDE_DB_PASSWORD', ''),",
"- 'database': os.environ.get('CLAUDE_DB_NAME', 'ki_dev'),",
"- 'charset': 'utf8mb4'",
"-}",
" ",
"-# Session-Tracking im temporären Verzeichnis",
"-TEMP_DIR = Path(tempfile.gettempdir()) \/ \"claude_hooks\"",
"-TEMP_DIR.mkdir(exist_ok=True)",
"+def log_to_database(data: Dict[str, Any]) -> Optional[int]:",
"+ \"\"\"",
"+ Hauptfunktion: Orchestriert das Logging eines Events",
" ",
"-# Sicherheitseinstellungen",
"-MAX_FIELD_LENGTH = 10000",
"-SENSITIVE_KEY_PATTERNS = re.compile(r\"(?i)(password|pass|secret|token|apikey|api_key|authorization|auth|bearer|credential)\")",
"-SENSITIVE_VALUE_PATTERNS = [",
"- re.compile(r\"(?i)\\bAKIA[0-9A-Z]{16}\\b\"),",
"- re.compile(r\"(?i)\\b(?:sk|rk|pk)[0-9A-Za-z]{20,}\\b\"),",
"- re.compile(r\"(?i)\\beyJ[a-zA-Z0-9-]{10,}\\.[a-zA-Z0-9_-]{10,}\\.[a-zA-Z0-9_-]{10,}\\b\")",
"-]",
"+ Args:",
"+ data: Event-Daten vom Hook",
" ",
"-def get_client_ip() -> str:",
"- \"\"\"Ermittelt die Client-IP-Adresse\"\"\"",
"- ssh_client = os.environ.get('SSH_CLIENT', '')",
"- if ssh_client:",
"- return ssh_client.split()[0]",
"- ssh_connection = os.environ.get('SSH_CONNECTION', '')",
"- if ssh_connection:",
"- return ssh_connection.split()[0]",
"- return '127.0.0.1'",
"-",
"-def sanitize_data(obj: Any) -> Any:",
"- \"\"\"Entfernt oder maskiert sensible Daten\"\"\"",
"- if isinstance(obj, dict):",
"- result = {}",
"- for key, value in obj.items():",
"- if SENSITIVE_KEY_PATTERNS.search(str(key)):",
"- result[key] = '[REDACTED]'",
"- else:",
"- result[key] = sanitize_data(value)",
"- return result",
"- elif isinstance(obj, list):",
"- return [sanitize_data(item) for item in obj]",
"- elif isinstance(obj, str):",
"- for pattern in SENSITIVE_VALUE_PATTERNS:",
"- if pattern.search(obj):",
"- return '[REDACTED]'",
"- if len(obj) > MAX_FIELD_LENGTH:",
"- hash_value = hashlib.sha256(obj.encode('utf-8', errors='ignore')).hexdigest()[:16]",
"- return obj[:MAX_FIELD_LENGTH] + f'... [TRUNCATED-{hash_value}]'",
"- return obj",
"- return obj",
"-",
"-def estimate_tokens(text: str) -> int:",
"- \"\"\"Grobe Token-Schätzung (4 Zeichen = 1 Token)\"\"\"",
"- if not text:",
"- return 0",
"- return max(1, len(text) \/\/ 4)",
"-",
"-def get_session_tracking_key(data: Dict[str, Any]) -> str:",
"- \"\"\"Erstellt einen eindeutigen Key für Session-Tracking\"\"\"",
"- session_id = data.get('session_id', '')",
"- event_name = data.get('hook_event_name', '')",
"- tool_name = data.get('tool_name', '')",
"-",
"- # Für Tool-Events: tool_name-spezifischer Key",
"- if event_name in ['PreToolUse', 'PostToolUse'] and tool_name:",
"- return f\"{session_id}_{tool_name}_{event_name}\"",
"-",
"- # Für andere Events: event-spezifischer Key",
"- return f\"{session_id}_{event_name}\"",
"-",
"-def save_pending_request(data: Dict[str, Any], db_id: int) -> None:",
"- \"\"\"Speichert pending Request für spätere Response-Zuordnung\"\"\"",
"+ Returns:",
"+ Database ID des Log-Eintrags oder None bei Fehler",
"+ \"\"\"",
" try:",
"- key = get_session_tracking_key(data)",
"- tracking_file = TEMP_DIR \/ f\"{key}.json\"",
"+ event_name = data.get('hook_event_name', 'Unknown')",
"+ session_id = data.get('session_id', '')",
" ",
"- tracking_data = {",
"- 'db_id': db_id,",
"- 'timestamp': datetime.datetime.now().isoformat(),",
"- 'event': data.get('hook_event_name'),",
"- 'tool_name': data.get('tool_name', ''),",
"- 'session_id': data.get('session_id', '')",
"- }",
"-",
"- with open(tracking_file, 'w') as f:",
"- json.dump(tracking_data, f)",
"-",
"- except Exception as e:",
"- print(f\"Session tracking save error: {e}\", file=sys.stderr)",
"-",
"-def find_matching_request(data: Dict[str, Any]) -> Optional[int]:",
"- \"\"\"Findet matching Request für Response-Event\"\"\"",
"- try:",
"- event_name = data.get('hook_event_name', '')",
"-",
"- # PostToolUse sucht nach PreToolUse",
"+ # Prüfe auf matching Request für Response-Events (PostToolUse)",
" if event_name == 'PostToolUse':",
"- search_data = dict(data)",
"- search_data['hook_event_name'] = 'PreToolUse'",
"- key = get_session_tracking_key(search_data)",
"- else:",
"- return None",
"+ matching_request_id = find_matching_request(data)",
"+ if matching_request_id:",
"+ # Update existing request mit response",
"+ tool_response = sanitize_data(data.get('tool_response', {}))",
"+ response_str = format_tool_response(tool_response)",
"+ tokens = calculate_tokens('', response_str)",
" ",
"- tracking_file = TEMP_DIR \/ f\"{key}.json\"",
"+ update_request_with_response(",
"+ matching_request_id,",
"+ response_str,",
"+ tokens['tokens_output']",
"+ )",
"+ return matching_request_id",
" ",
"- if tracking_file.exists():",
"- with open(tracking_file, 'r') as f:",
"- tracking_data = json.load(f)",
"+ # Bei Stop-Events: Schließe alle pending UserPrompts",
"+ if event_name in ['Stop', 'SubagentStop'] and session_id:",
"+ pairs = get_conversation_pairs_from_session(session_id)",
"+ close_pending_user_prompts(session_id, pairs)",
" ",
"- # Cleanup",
"- tracking_file.unlink()",
"- return tracking_data['db_id']",
"+ # Parse Event-Daten",
"+ request_data = parse_event_data(data)",
" ",
"- except Exception as e:",
"- print(f\"Session tracking find error: {e}\", file=sys.stderr)",
"+ # Erstelle Response-Daten (falls direkt verfügbar)",
"+ response_data = create_response_data(event_name, data)",
" ",
"- return None",
"+ # Formatiere zu JSON",
"+ request_str = format_request_to_json(request_data)",
"+ response_str = format_response_to_json(response_data)",
" ",
"-def update_request_with_response(db_id: int, response_data: str) -> None:",
"- \"\"\"Updated existing Request mit Response-Daten und berechnet Duration\"\"\"",
"- try:",
"- connection = pymysql.connect(**DB_CONFIG)",
"+ # Berechne Tokens",
"+ tokens = calculate_tokens(request_str, response_str)",
" ",
"- with connection.cursor() as cursor:",
"- current_time = datetime.datetime.now()",
"+ # Hole Client-Metadaten",
"+ client_meta = get_client_metadata()",
"+ model_name = get_model_name()",
" ",
"- # Erst Response und Timestamp setzen",
"- tokens_output = estimate_tokens(response_data)",
"+ # Insert in DB",
"+ db_id = insert_log_entry(",
"+ request_str=request_str,",
"+ client_ip=client_meta['client_ip'],",
"+ client_name=client_meta['client_name'],",
"+ response_str=response_str,",
"+ tokens_input=tokens['tokens_input'],",
"+ tokens_output=tokens['tokens_output'],",
"+ model_name=model_name",
"+ )",
" ",
"- sql = \"\"\"",
"- UPDATE protokoll",
"- SET response = %s,",
"- response_timestamp = %s,",
"- tokens_output = %s,",
"- tokens_total = tokens_input + %s,",
"- status = 'completed'",
"- WHERE id = %s",
"- \"\"\"",
"+ # Speichere pending Requests für spätere Response-Zuordnung",
"+ if db_id and not response_str:",
"+ save_pending_request(data, db_id)",
" ",
"- cursor.execute(sql, (",
"- response_data,",
"- current_time,",
"- tokens_output,",
"- tokens_output,",
"- db_id",
"- ))",
"+ return db_id",
" ",
"- # Dann Duration aus Timestamps berechnen und setzen (mit Microsekunden-Präzision)",
"- duration_sql = \"\"\"",
"- UPDATE protokoll",
"- SET duration_ms = ROUND(TIMESTAMPDIFF(MICROSECOND, request_timestamp, response_timestamp) \/ 1000, 3)",
"- WHERE id = %s",
"- \"\"\"",
"-",
"- cursor.execute(duration_sql, (db_id,))",
"-",
"- connection.commit()",
"-",
" except Exception as e:",
"- print(f\"Database update error: {e}\", file=sys.stderr)",
"- finally:",
"- if 'connection' in locals():",
"- connection.close()",
"-",
"-",
"-def get_conversation_pairs_from_session(session_id: str) -> list:",
"- \"\"\"Liest User-Prompt → Assistant-Response Paare aus der Session-JSONL\"\"\"",
"- pairs = []",
"- try:",
"- claude_dir = Path.home() \/ '.claude' \/ 'projects'",
"-",
"- for project_dir in claude_dir.iterdir():",
"- if not project_dir.is_dir():",
"- continue",
"-",
"- session_file = project_dir \/ f\"{session_id}.jsonl\"",
"- if session_file.exists():",
"- lines = session_file.read_text().strip().split('\\n')",
"-",
"- current_user_prompt = None",
"- for line in lines:",
"- try:",
"- entry = json.loads(line)",
"- entry_type = entry.get('type', '')",
"-",
"- # User-Message gefunden",
"- if entry_type == 'user':",
"- message = entry.get('message', {})",
"- content = message.get('content', '')",
"- if isinstance(content, str) and content.strip():",
"- current_user_prompt = content.strip()",
"-",
"- # Assistant-Message gefunden - pair mit letztem User-Prompt",
"- elif entry_type == 'assistant' and current_user_prompt:",
"- message = entry.get('message', {})",
"- content_parts = message.get('content', [])",
"- text_parts = []",
"- for part in content_parts:",
"- if isinstance(part, dict) and part.get('type') == 'text':",
"- text_parts.append(part.get('text', ''))",
"- elif isinstance(part, str):",
"- text_parts.append(part)",
"-",
"- if text_parts:",
"- assistant_response = '\\n'.join(text_parts)",
"- pairs.append({",
"- 'prompt': current_user_prompt,",
"- 'response': sanitize_data(assistant_response)",
"- })",
"- current_user_prompt = None # Reset für nächstes Paar",
"-",
"- except json.JSONDecodeError:",
"- continue",
"-",
"- break # Session gefunden, aufhören zu suchen",
"-",
"- return pairs",
"-",
"- except Exception as e:",
"- print(f\"Session read error: {e}\", file=sys.stderr)",
"- return []",
"-",
"-",
"-def close_pending_user_prompts(session_id: str) -> None:",
"- \"\"\"Schließt offene UserPromptSubmit-Einträge mit ihren zugehörigen Responses\"\"\"",
"- try:",
"- # Hole alle Conversation-Paare aus der Session",
"- pairs = get_conversation_pairs_from_session(session_id)",
"- if not pairs:",
"- return",
"-",
"- connection = pymysql.connect(**DB_CONFIG)",
"-",
"- with connection.cursor() as cursor:",
"- # Hole alle pending UserPromptSubmit-Einträge der letzten Stunde",
"- cursor.execute(\"\"\"",
"- SELECT id, JSON_UNQUOTE(JSON_EXTRACT(request, '$.prompt')) as prompt",
"- FROM protokoll",
"- WHERE status = 'pending'",
"- AND JSON_EXTRACT(request, '$.event') = 'UserPromptSubmit'",
"- AND timestamp > DATE_SUB(NOW(), INTERVAL 1 HOUR)",
"- ORDER BY id ASC",
"- \"\"\")",
"- pending_prompts = cursor.fetchall()",
"-",
"- current_time = datetime.datetime.now()",
"-",
"- # Matche jeden pending Prompt mit seinem Response",
"- for db_id, db_prompt in pending_prompts:",
"- if not db_prompt:",
"- continue",
"-",
"- # Finde das passende Paar (Prompt-Match)",
"- matching_response = None",
"- for pair in pairs:",
"- # Vergleiche die Prompts (normalisiert)",
"- if pair['prompt'].strip() == db_prompt.strip():",
"- matching_response = pair['response']",
"- break",
"-",
"- if matching_response:",
"- tokens_output = estimate_tokens(matching_response)",
"- response_json = json.dumps({",
"- 'assistant_response': matching_response",
"- }, ensure_ascii=False)",
"-",
"- cursor.execute(\"\"\"",
"- UPDATE protokoll",
"- SET response = %s,",
"- response_timestamp = %s,",
"- tokens_output = %s,",
"- tokens_total = tokens_input + %s,",
"- duration_ms = ROUND(TIMESTAMPDIFF(MICROSECOND, request_timestamp, %s) \/ 1000, 3),",
"- status = 'completed'",
"- WHERE id = %s",
"- \"\"\", (",
"- response_json,",
"- current_time,",
"- tokens_output,",
"- tokens_output,",
"- current_time,",
"- db_id",
"- ))",
"-",
"- connection.commit()",
"-",
"- except Exception as e:",
"- print(f\"Close pending prompts error: {e}\", file=sys.stderr)",
"- finally:",
"- if 'connection' in locals():",
"- connection.close()",
"-",
"-# Entfernt - Duration wird jetzt direkt in SQL berechnet",
"-",
"-def log_to_database(data: Dict[str, Any]) -> Optional[int]:",
"- \"\"\"Schreibt Protokoll-Eintrag in die Datenbank\"\"\"",
"- try:",
"- connection = pymysql.connect(**DB_CONFIG)",
"-",
"- with connection.cursor() as cursor:",
"- event_name = data.get('hook_event_name', 'Unknown')",
"- session_id = data.get('session_id', '')",
"- client_ip = get_client_ip()",
"- client_name = os.environ.get('USER', 'unknown')",
"- current_time = datetime.datetime.now()",
"-",
"- # Prüfe auf matching Request für Response-Events",
"- if event_name == 'PostToolUse':",
"- matching_request_id = find_matching_request(data)",
"- if matching_request_id:",
"- # Update existing request mit response",
"- tool_response = sanitize_data(data.get('tool_response', {}))",
"- response_str = json.dumps({'tool_response': tool_response}, ensure_ascii=False)",
"-",
"- # Duration wird automatisch aus Timestamps berechnet",
"- update_request_with_response(matching_request_id, response_str)",
"- return matching_request_id",
"-",
"- # Normale Event-Verarbeitung für neue Einträge",
"- request_data = {}",
"- response_data = None",
"- model_name = 'claude-sonnet-4-20250514'",
"-",
"- if event_name == 'UserPromptSubmit':",
"- request_data = {",
"- 'event': event_name,",
"- 'prompt': sanitize_data(data.get('prompt', ''))",
"- }",
"-",
"- elif event_name == 'PreToolUse':",
"- tool_name = data.get('tool_name', '')",
"- tool_input = sanitize_data(data.get('tool_input', {}))",
"- request_data = {",
"- 'event': event_name,",
"- 'tool_name': tool_name,",
"- 'tool_input': tool_input",
"- }",
"-",
"- elif event_name in ['Stop', 'SubagentStop']:",
"- # Bei Stop: Matche alle pending Prompts mit ihren Responses",
"- session_id = data.get('session_id', '')",
"- if session_id:",
"- close_pending_user_prompts(session_id)",
"-",
"- request_data = {",
"- 'event': event_name,",
"- 'stop_hook_active': data.get('stop_hook_active', False)",
"- }",
"- response_data = {'status': 'completed'}",
"-",
"- elif event_name in ['SessionStart', 'SessionEnd']:",
"- request_data = {",
"- 'event': event_name,",
"- 'source': data.get('source', data.get('reason', ''))",
"- }",
"- response_data = {'status': 'logged'}",
"-",
"- else:",
"- request_data = {",
"- 'event': event_name,",
"- 'raw_data': sanitize_data(data)",
"- }",
"-",
"- # JSON zu String konvertieren",
"- request_str = json.dumps(request_data, ensure_ascii=False)",
"- response_str = json.dumps(response_data, ensure_ascii=False) if response_data else None",
"-",
"- # Token-Schätzung",
"- tokens_input = estimate_tokens(request_str)",
"- tokens_output = estimate_tokens(response_str) if response_str else 0",
"- tokens_total = tokens_input + tokens_output",
"-",
"- # Status bestimmen",
"- status = 'completed' if response_str else 'pending'",
"- response_timestamp = current_time if response_str else None",
"-",
"- # SQL Insert",
"- sql = \"\"\"",
"- INSERT INTO protokoll (",
"- timestamp, request_ip, client_name, request, request_timestamp,",
"- response, response_timestamp, duration_ms, tokens_input,",
"- tokens_output, tokens_total, model_name, status",
"- ) VALUES (",
"- %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
"- )",
"- \"\"\"",
"-",
"- cursor.execute(sql, (",
"- current_time, # timestamp",
"- client_ip, # request_ip",
"- client_name, # client_name",
"- request_str, # request",
"- current_time, # request_timestamp",
"- response_str, # response",
"- response_timestamp, # response_timestamp",
"- 0, # duration_ms (wird bei Updates gesetzt)",
"- tokens_input, # tokens_input",
"- tokens_output, # tokens_output",
"- tokens_total, # tokens_total",
"- model_name, # model_name",
"- status # status",
"- ))",
"-",
"- connection.commit()",
"- db_id = cursor.lastrowid",
"-",
"- # Speichere pending Requests für spätere Response-Zuordnung",
"- if status == 'pending':",
"- save_pending_request(data, db_id)",
"-",
"- return db_id",
"-",
"- except Exception as e:",
" print(f\"Database logging error: {e}\", file=sys.stderr)",
" return None",
" ",
"- finally:",
"- if 'connection' in locals():",
"- connection.close()",
"-",
" def main():",
" \"\"\"Hauptfunktion des Hook-Skripts\"\"\"",
" try:"
]
}
],
"userModified": false,
"replaceAll": false
}
}