log_parser.py
- Pfad: /var/www/tools/ki-protokoll/claude-hook/log_parser.py
- Namespace: claude-hook
- Zeilen: 117 | Größe: 4,896 Bytes
- Geändert: 2025-12-25 17:20:00 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 93
- Dependencies: 70 (25%)
- LOC: 100 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
Keine Issues gefunden.
Dependencies 8
- use json
- use os
- use re
- use pathlib.Path
- use typing.Dict
- use typing.Any
- use typing.List
- use hashlib
Funktionen 5
- sanitize_data() Zeile 19
- estimate_tokens() Zeile 43
- get_client_ip() Zeile 50
- parse_event_data() Zeile 61
- get_conversation_pairs_from_session() Zeile 80
Code
#!/usr/bin/env python3
"""Log Parser Module - Parsen und Extrahieren von Daten aus Hook-Events"""
import json
import os
import re
from pathlib import Path
from typing import Dict, Any, List
# Maximum number of characters kept from a single string value; longer
# strings are cut at this length by sanitize_data().
MAX_FIELD_LENGTH = 10000

# Dictionary keys containing any of these fragments (case-insensitive) are
# treated as sensitive; sanitize_data() masks their values.
SENSITIVE_KEY_PATTERNS = re.compile(
    r"(?i)(password|pass|secret|token|apikey|api_key|authorization|auth|bearer|credential)")

# A string value matching any of these patterns is masked as a whole.
SENSITIVE_VALUE_PATTERNS = [
    # "AKIA" followed by 16 alphanumerics (looks like the AWS access key ID
    # format).
    re.compile(r"(?i)\bAKIA[0-9A-Z]{16}\b"),
    # "sk" / "rk" / "pk" directly followed by at least 20 alphanumerics.
    re.compile(r"(?i)\b(?:sk|rk|pk)[0-9A-Za-z]{20,}\b"),
    # Three dot-separated segments starting with "eyJ" (JWT shape). All three
    # segments use the same character class, including "_": previously the
    # first segment lacked it, so tokens with "_" in the header slipped
    # through unmasked.
    re.compile(r"(?i)\beyJ[a-zA-Z0-9_-]{10,}\.[a-zA-Z0-9_-]{10,}\.[a-zA-Z0-9_-]{10,}\b"),
]
def sanitize_data(obj: Any) -> Any:
    """Return *obj* with sensitive content masked and long strings truncated.

    - dict: values whose key matches SENSITIVE_KEY_PATTERNS become
      '[REDACTED]'; all other values are sanitized recursively.
    - list: every element is sanitized recursively.
    - str: replaced by '[REDACTED]' if it matches any of
      SENSITIVE_VALUE_PATTERNS; otherwise cut to MAX_FIELD_LENGTH characters
      with a marker carrying a short SHA-256 prefix of the full text.
    - anything else is returned unchanged.
    """
    if isinstance(obj, str):
        if any(rx.search(obj) for rx in SENSITIVE_VALUE_PATTERNS):
            return '[REDACTED]'
        if len(obj) <= MAX_FIELD_LENGTH:
            return obj
        # Imported lazily: only needed for the rare over-long value.
        import hashlib
        digest = hashlib.sha256(obj.encode('utf-8', errors='ignore')).hexdigest()
        # The digest prefix lets truncated values still be told apart.
        return obj[:MAX_FIELD_LENGTH] + f'... [TRUNCATED-{digest[:16]}]'

    if isinstance(obj, dict):
        return {
            name: ('[REDACTED]' if SENSITIVE_KEY_PATTERNS.search(str(name))
                   else sanitize_data(entry))
            for name, entry in obj.items()
        }

    if isinstance(obj, list):
        return [sanitize_data(element) for element in obj]

    return obj
def estimate_tokens(text: str) -> int:
    """Roughly estimate the token count of *text* (4 characters ~ 1 token).

    Returns 0 for empty (falsy) input and at least 1 for any non-empty text.
    """
    return max(1, len(text) // 4) if text else 0
def get_client_ip() -> str:
    """Determine the client IP address from the SSH environment.

    Checks the SSH_CLIENT and then the SSH_CONNECTION environment variable
    and returns the first whitespace-separated field of the first one that
    has any content.

    Returns:
        The address taken from the environment, or '127.0.0.1' when neither
        variable yields a value.
    """
    for var_name in ('SSH_CLIENT', 'SSH_CONNECTION'):
        fields = os.environ.get(var_name, '').split()
        # A variable that is set but blank splits to an empty list; indexing
        # it would raise IndexError, so fall through to the next source.
        if fields:
            return fields[0]
    return '127.0.0.1'
def parse_event_data(data: Dict[str, Any]) -> Dict[str, Any]:
    """Extract the fields relevant to the event type from a hook payload.

    The event type is read from ``data['hook_event_name']`` ('Unknown' when
    absent). The result always starts with an ``'event'`` entry, followed by:

    - UserPromptSubmit: ``prompt`` (sanitized)
    - PreToolUse: ``tool_name`` and ``tool_input`` (sanitized)
    - PostToolUse: ``tool_name`` and ``tool_response`` (sanitized)
    - Stop / SubagentStop: ``stop_hook_active`` (default False)
    - SessionStart / SessionEnd: ``source``, falling back to ``reason``,
      then to ''
    - any other event: ``raw_data``, the whole payload sanitized
    """
    event_name = data.get('hook_event_name', 'Unknown')
    parsed: Dict[str, Any] = {'event': event_name}

    if event_name == 'UserPromptSubmit':
        parsed['prompt'] = sanitize_data(data.get('prompt', ''))
    elif event_name == 'PreToolUse':
        parsed['tool_name'] = data.get('tool_name', '')
        parsed['tool_input'] = sanitize_data(data.get('tool_input', {}))
    elif event_name == 'PostToolUse':
        parsed['tool_name'] = data.get('tool_name', '')
        parsed['tool_response'] = sanitize_data(data.get('tool_response', {}))
    elif event_name in ('Stop', 'SubagentStop'):
        parsed['stop_hook_active'] = data.get('stop_hook_active', False)
    elif event_name in ('SessionStart', 'SessionEnd'):
        fallback = data.get('reason', '')
        parsed['source'] = data.get('source', fallback)
    else:
        parsed['raw_data'] = sanitize_data(data)

    return parsed
def _assistant_text_parts(content: Any) -> List[str]:
    """Collect the plain-text pieces of an assistant message's content."""
    if isinstance(content, str):
        # A bare string is a single piece of text; iterating over it would
        # yield individual characters.
        return [content] if content else []
    if not isinstance(content, list):
        return []
    pieces = []
    for part in content:
        if isinstance(part, str):
            pieces.append(part)
        elif isinstance(part, dict) and part.get('type') == 'text':
            text = part.get('text', '')
            # Skip non-string text so the later join cannot fail.
            if isinstance(text, str):
                pieces.append(text)
    return pieces


def get_conversation_pairs_from_session(session_id: str) -> List[Dict[str, str]]:
    """Read user-prompt -> assistant-response pairs from the session JSONL.

    Looks through the directories under ``~/.claude/projects`` for a file
    named ``<session_id>.jsonl`` and parses the first one found, one JSON
    document per line. A 'user' entry whose message content is a non-blank
    string sets the pending prompt; the next 'assistant' entry that contains
    text closes the pair. Lines that are not valid JSON, or whose structure
    is not the expected object shape, are skipped.

    Args:
        session_id: Session identifier used as the file stem.
            NOTE(review): it is interpolated into the file name as-is;
            confirm callers only pass trusted identifiers.

    Returns:
        A list of ``{'prompt': ..., 'response': ...}`` dicts in file order;
        the response text is passed through sanitize_data(). Returns an empty
        list when no session file is found or any error occurs.
    """
    pairs: List[Dict[str, str]] = []
    try:
        projects_root = Path.home() / '.claude' / 'projects'
        for project_dir in projects_root.iterdir():
            if not project_dir.is_dir():
                continue
            session_file = project_dir / f"{session_id}.jsonl"
            if not session_file.exists():
                continue

            pending_prompt = None
            # Explicit UTF-8 so parsing does not depend on the process locale.
            # Split on '\n' only: str.splitlines() would also break on
            # separators such as U+2028 that may occur inside JSON strings.
            for line in session_file.read_text(encoding='utf-8').split('\n'):
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # One oddly shaped line must not abort the whole file and
                # throw away the pairs collected so far.
                if not isinstance(entry, dict):
                    continue
                message = entry.get('message', {})
                if not isinstance(message, dict):
                    continue

                entry_type = entry.get('type', '')
                if entry_type == 'user':
                    content = message.get('content', '')
                    if isinstance(content, str) and content.strip():
                        pending_prompt = content.strip()
                elif entry_type == 'assistant' and pending_prompt:
                    text_parts = _assistant_text_parts(message.get('content', []))
                    if text_parts:
                        pairs.append({'prompt': pending_prompt,
                                      'response': sanitize_data('\n'.join(text_parts))})
                        pending_prompt = None
            # Only the first matching session file is used.
            break
        return pairs
    except Exception:
        # Best-effort: callers get an empty list rather than an exception
        # (e.g. the projects directory is missing or the file is unreadable).
        return []