log_parser.py

Code Hygiene Score: 93

Keine Issues gefunden.

Dependencies 8

Funktionen 5

Code

#!/usr/bin/env python3
"""Log Parser Module - Parsen und Extrahieren von Daten aus Hook-Events"""

import json
import os
import re
from pathlib import Path
from typing import Dict, Any, List

# Maximum number of characters kept from a single string field before
# sanitize_data truncates it.
MAX_FIELD_LENGTH = 10000

# Dictionary keys that contain any of these words (case-insensitive substring
# match) get their whole value replaced by sanitize_data.
SENSITIVE_KEY_PATTERNS = re.compile(
    r"(?i)(password|pass|secret|token|apikey|api_key|authorization|auth|bearer|credential)")

# Credential-shaped values; sanitize_data redacts the entire string as soon as
# one of these patterns matches anywhere in it.
SENSITIVE_VALUE_PATTERNS = [
    # "AKIA" followed by 16 alphanumerics (AWS-style access key ID).
    re.compile(r"(?i)\bAKIA[0-9A-Z]{16}\b"),
    # sk/rk/pk-prefixed API keys. The optional "_live"/"_test" infix and the
    # optional separator are needed because real-world keys are written as
    # "sk_live_<random>" or "sk-<random>"; without them the pattern only
    # matched keys that had no separator at all.
    re.compile(r"(?i)\b(?:sk|rk|pk)(?:[_-](?:live|test))?[_-]?[0-9A-Za-z]{20,}\b"),
    # Three dot-separated base64url segments starting with "eyJ" (JWT shape).
    # All three segments accept the full base64url alphabet, including "_".
    re.compile(r"(?i)\beyJ[a-zA-Z0-9_-]{10,}\.[a-zA-Z0-9_-]{10,}\.[a-zA-Z0-9_-]{10,}\b"),
]


def sanitize_data(obj: Any) -> Any:
    """Return a copy of *obj* with sensitive content masked.

    Dicts and lists are walked recursively and rebuilt. A dict value whose key
    (converted with ``str``) matches ``SENSITIVE_KEY_PATTERNS`` is replaced by
    ``'[REDACTED]'`` without looking at the value. A string matching any of
    ``SENSITIVE_VALUE_PATTERNS`` is replaced by ``'[REDACTED]'`` as a whole.
    Strings longer than ``MAX_FIELD_LENGTH`` are cut to that length and get a
    ``... [TRUNCATED-<hash>]`` suffix. Every other value is returned unchanged.
    """
    if isinstance(obj, dict):
        return {
            name: ('[REDACTED]' if SENSITIVE_KEY_PATTERNS.search(str(name))
                   else sanitize_data(inner))
            for name, inner in obj.items()
        }

    if isinstance(obj, list):
        return list(map(sanitize_data, obj))

    if not isinstance(obj, str):
        # Numbers, booleans, None, tuples, ... pass through untouched.
        return obj

    if any(rx.search(obj) for rx in SENSITIVE_VALUE_PATTERNS):
        return '[REDACTED]'

    if len(obj) <= MAX_FIELD_LENGTH:
        return obj

    # Imported lazily: only oversized strings need hashing.
    import hashlib
    # The suffix carries the first 16 hex digits of the SHA-256 of the full
    # (untruncated) text.
    digest = hashlib.sha256(obj.encode('utf-8', errors='ignore')).hexdigest()
    return f'{obj[:MAX_FIELD_LENGTH]}... [TRUNCATED-{digest[:16]}]'


def estimate_tokens(text: str) -> int:
    """Roughly estimate the token count of *text* (4 characters = 1 token).

    Empty or otherwise falsy input yields 0; any non-empty text yields at
    least 1.
    """
    if text:
        approx = len(text) // 4
        return approx if approx >= 1 else 1
    return 0


def get_client_ip() -> str:
    """Determine the client IP address from the SSH environment variables.

    ``SSH_CLIENT`` is consulted first, then ``SSH_CONNECTION``; the first
    whitespace-separated field of the first usable variable is returned.

    Returns:
        The first field of the variable, or ``'127.0.0.1'`` when neither
        variable is set or both contain no field at all.
    """
    for variable in ('SSH_CLIENT', 'SSH_CONNECTION'):
        # Split before testing: a whitespace-only value is truthy but yields
        # no fields, so indexing [0] on it would raise IndexError.
        fields = os.environ.get(variable, '').split()
        if fields:
            return fields[0]
    return '127.0.0.1'


def parse_event_data(data: Dict[str, Any]) -> Dict[str, Any]:
    """Extract and structure the fields of a hook event according to its type.

    The event type is read from ``data['hook_event_name']`` (``'Unknown'`` when
    absent). The result always contains ``'event'`` plus type-specific fields:

    * ``UserPromptSubmit``: ``'prompt'`` (sanitized)
    * ``PreToolUse``: ``'tool_name'`` and ``'tool_input'`` (sanitized)
    * ``PostToolUse``: ``'tool_name'`` and ``'tool_response'`` (sanitized)
    * ``Stop`` / ``SubagentStop``: ``'stop_hook_active'``
    * ``SessionStart`` / ``SessionEnd``: ``'source'``, falling back to
      ``data['reason']`` and then to ``''``
    * anything else: ``'raw_data'``, the sanitized input as a whole
    """
    kind = data.get('hook_event_name', 'Unknown')
    parsed: Dict[str, Any] = {'event': kind}

    if kind == 'UserPromptSubmit':
        parsed['prompt'] = sanitize_data(data.get('prompt', ''))
        return parsed

    if kind == 'PreToolUse':
        parsed['tool_name'] = data.get('tool_name', '')
        parsed['tool_input'] = sanitize_data(data.get('tool_input', {}))
        return parsed

    if kind == 'PostToolUse':
        parsed['tool_name'] = data.get('tool_name', '')
        parsed['tool_response'] = sanitize_data(data.get('tool_response', {}))
        return parsed

    if kind in ('Stop', 'SubagentStop'):
        parsed['stop_hook_active'] = data.get('stop_hook_active', False)
        return parsed

    if kind in ('SessionStart', 'SessionEnd'):
        fallback = data.get('reason', '')
        parsed['source'] = data.get('source', fallback)
        return parsed

    # Unrecognized event type: keep everything, but sanitized.
    parsed['raw_data'] = sanitize_data(data)
    return parsed


def _assistant_text_parts(content: Any) -> List[str]:
    """Collect the text pieces of an assistant message's ``content`` value."""
    if isinstance(content, str):
        # A bare string is one text part; iterating it would split it into
        # single characters.
        return [content] if content else []
    if not isinstance(content, list):
        return []
    parts = []
    for part in content:
        if isinstance(part, dict) and part.get('type') == 'text':
            text = part.get('text', '')
            if isinstance(text, str):
                parts.append(text)
        elif isinstance(part, str):
            parts.append(part)
    return parts


def get_conversation_pairs_from_session(session_id: str) -> List[Dict[str, str]]:
    """Read user-prompt -> assistant-response pairs from the session JSONL file.

    Every directory below ``~/.claude/projects`` is checked for a file named
    ``<session_id>.jsonl``; only the first one found is read. Each line is
    parsed as JSON. A ``user`` entry whose message content is a non-blank
    string becomes the pending prompt; the next ``assistant`` entry that
    contains text closes the pair. Lines that are not valid JSON, or that are
    not JSON objects, are skipped.

    Args:
        session_id: Session identifier, used as the file name stem.

    Returns:
        A list of ``{'prompt': ..., 'response': ...}`` dicts in file order,
        both values passed through ``sanitize_data``. An empty list is
        returned when no session file is found or when reading fails.
    """
    pairs: List[Dict[str, str]] = []
    try:
        for project_dir in (Path.home() / '.claude' / 'projects').iterdir():
            if not project_dir.is_dir():
                continue
            session_file = project_dir / f"{session_id}.jsonl"
            if not session_file.exists():
                continue

            # Decode explicitly as UTF-8 instead of relying on the platform
            # default encoding. Split on '\n' only: JSON strings may contain
            # other line-separator characters unescaped.
            lines = session_file.read_text(encoding='utf-8').strip().split('\n')
            pending_prompt = None
            for line in lines:
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if not isinstance(entry, dict):
                    # Valid JSON but not an object: skip it instead of losing
                    # every pair collected so far.
                    continue
                message = entry.get('message', {})
                if not isinstance(message, dict):
                    message = {}

                entry_type = entry.get('type', '')
                if entry_type == 'user':
                    content = message.get('content', '')
                    if isinstance(content, str) and content.strip():
                        pending_prompt = content.strip()
                elif entry_type == 'assistant' and pending_prompt:
                    text_parts = _assistant_text_parts(message.get('content', []))
                    if text_parts:
                        # The prompt is sanitized as well, matching how
                        # parse_event_data treats UserPromptSubmit prompts.
                        pairs.append({'prompt': sanitize_data(pending_prompt),
                                      'response': sanitize_data('\n'.join(text_parts))})
                        pending_prompt = None
            break  # only the first matching session file is used
        return pairs
    except Exception:
        # Deliberate best effort: a missing projects directory or an unreadable
        # file yields no pairs rather than an error for the caller.
        return []
← Übersicht