log_storage.py

Code Hygiene Score: 78

Issues 2

Zeile Typ Beschreibung
104 magic_number Magic Number gefunden: 1000
136 magic_number Magic Number gefunden: 1000

Dependencies 13

Funktionen 9

Code

#!/usr/bin/env python3
"""Log Storage Module - DB-Speicherung und Aktualisierung von Logs"""

import os
import json
import datetime
import pymysql
import sys
import tempfile
from pathlib import Path
from typing import Dict, Any, Optional, List
from dotenv import load_dotenv

# Load the .env file located next to this module. This call is placed before
# DB_CONFIG is built because DB_CONFIG reads its values from os.environ.
load_dotenv(Path(__file__).parent / '.env')

# Keyword arguments handed to pymysql.connect(**DB_CONFIG) by the functions
# below. Each value is read from a CLAUDE_DB_* environment variable and falls
# back to the literal default shown when that variable is not set.
DB_CONFIG = {
    'host': os.environ.get('CLAUDE_DB_HOST', 'localhost'),
    # int() is evaluated at import time; a non-numeric port value raises ValueError here.
    'port': int(os.environ.get('CLAUDE_DB_PORT', '3306')),
    'user': os.environ.get('CLAUDE_DB_USER', 'root'),
    'password': os.environ.get('CLAUDE_DB_PASSWORD', ''),
    'database': os.environ.get('CLAUDE_DB_NAME', 'ki_dev'),
    'charset': 'utf8mb4'
}

# Directory (inside the system temp directory) that holds one JSON tracking
# file per pending request; it is created at import time if it is missing.
TEMP_DIR = Path(tempfile.gettempdir()) / "claude_hooks"
TEMP_DIR.mkdir(exist_ok=True)


def get_session_tracking_key(data: Dict[str, Any]) -> str:
    """Build the key used to track a hook event within its session.

    Args:
        data: Hook payload; the keys 'session_id', 'hook_event_name' and
            'tool_name' are read, each defaulting to an empty string.

    Returns:
        "<session>_<tool>_<event>" for PreToolUse/PostToolUse events that
        carry a non-empty tool name, otherwise "<session>_<event>".
    """
    session_id = data.get('session_id', '')
    event_name = data.get('hook_event_name', '')
    tool_name = data.get('tool_name', '')

    is_tool_event = event_name in ('PreToolUse', 'PostToolUse')
    if not (is_tool_event and tool_name):
        # Non-tool events (or tool events without a tool name) are keyed
        # by session and event only.
        return f"{session_id}_{event_name}"
    return f"{session_id}_{tool_name}_{event_name}"


def save_pending_request(data: Dict[str, Any], db_id: int) -> None:
    """Persist a pending request so a later response event can be matched to it.

    A JSON file named after the session tracking key is written into
    TEMP_DIR. It records the database row id, the current local time and
    the event, tool name and session id taken from ``data``.

    Any exception is reported on stderr and swallowed; the function never
    raises.

    Args:
        data: Hook payload of the request event.
        db_id: Id of the database row created for this request.
    """
    try:
        target = TEMP_DIR / f"{get_session_tracking_key(data)}.json"
        payload = {
            'db_id': db_id,
            'timestamp': datetime.datetime.now().isoformat(),
            'event': data.get('hook_event_name'),
            'tool_name': data.get('tool_name', ''),
            'session_id': data.get('session_id', ''),
        }
        target.write_text(json.dumps(payload))
    except Exception as e:
        print(f"Session tracking save error: {e}", file=sys.stderr)


def find_matching_request(data: Dict[str, Any]) -> Optional[int]:
    """Look up the pending request that belongs to a PostToolUse event.

    The tracking key is computed as if the event were the corresponding
    PreToolUse event. If a tracking file with that key exists in TEMP_DIR,
    it is read, deleted, and its stored 'db_id' is returned.

    Args:
        data: Hook payload of the response event.

    Returns:
        The stored database id, or None when the event is not PostToolUse,
        no tracking file exists, or an error occurred (errors are reported
        on stderr).
    """
    try:
        if data.get('hook_event_name', '') != 'PostToolUse':
            return None

        # Same payload, but keyed as the request-side event.
        request_view = {**data, 'hook_event_name': 'PreToolUse'}
        track_file = TEMP_DIR / f"{get_session_tracking_key(request_view)}.json"
        if not track_file.exists():
            return None

        stored = json.loads(track_file.read_text())
        # The file is consumed: removed once it has been read successfully.
        track_file.unlink()
        return stored['db_id']
    except Exception as e:
        print(f"Session tracking find error: {e}", file=sys.stderr)
        return None


def insert_log_entry(request_str: str, client_ip: str, client_name: str, response_str: Optional[str],
                     tokens_input: int, tokens_output: int, model_name: str) -> Optional[int]:
    """Insert a new row into the ``protokoll`` table.

    The current local time is stored as both ``timestamp`` and
    ``request_timestamp``. When ``response_str`` is truthy the row is
    written with status 'completed' and the same time as
    ``response_timestamp``; otherwise the status is 'pending' and
    ``response_timestamp`` is NULL. ``duration_ms`` is always written as 0
    and ``tokens_total`` as the sum of both token counts.

    Returns:
        The id of the inserted row (``cursor.lastrowid``), or None if any
        error occurred (the error is reported on stderr).
    """
    connection = None
    try:
        connection = pymysql.connect(**DB_CONFIG)
        with connection.cursor() as cursor:
            now = datetime.datetime.now()
            tokens_total = tokens_input + tokens_output
            if response_str:
                status, responded_at = 'completed', now
            else:
                status, responded_at = 'pending', None
            sql = """INSERT INTO protokoll (timestamp, request_ip, client_name, request,
                request_timestamp, response, response_timestamp, duration_ms, tokens_input, tokens_output,
                tokens_total, model_name, status) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
            params = (
                now, client_ip, client_name, request_str, now, response_str, responded_at, 0,
                tokens_input, tokens_output, tokens_total, model_name, status,
            )
            cursor.execute(sql, params)
            connection.commit()
            return cursor.lastrowid
    except Exception as e:
        print(f"Database insert error: {e}", file=sys.stderr)
        return None
    finally:
        # Only close when the connection was actually established.
        if connection is not None:
            connection.close()


def update_request_with_response(db_id: int, response_data: str, tokens_output: int) -> None:
    """Attach response data to an existing ``protokoll`` row and compute its duration.

    Two UPDATE statements run in one transaction: the first stores the
    response, the current local time as ``response_timestamp``, the output
    token count, the new token total and status 'completed'; the second
    sets ``duration_ms`` from the difference between ``request_timestamp``
    and ``response_timestamp``.

    Errors are reported on stderr and swallowed.

    Args:
        db_id: Id of the row to update.
        response_data: Response text to store.
        tokens_output: Output token count for the response.
    """
    connection = None
    try:
        connection = pymysql.connect(**DB_CONFIG)
        with connection.cursor() as cursor:
            now = datetime.datetime.now()
            store_response_sql = """UPDATE protokoll SET response = %s, response_timestamp = %s, tokens_output = %s,
                tokens_total = tokens_input + %s, status = 'completed' WHERE id = %s"""
            cursor.execute(store_response_sql, (response_data, now, tokens_output, tokens_output, db_id))

            # Microsecond difference divided by 1000 yields milliseconds.
            set_duration_sql = """UPDATE protokoll SET duration_ms = ROUND(TIMESTAMPDIFF(MICROSECOND,
                request_timestamp, response_timestamp) / 1000, 3) WHERE id = %s"""
            cursor.execute(set_duration_sql, (db_id,))
            connection.commit()
    except Exception as e:
        print(f"Database update error: {e}", file=sys.stderr)
    finally:
        # Only close when the connection was actually established.
        if connection is not None:
            connection.close()


def _get_pending_prompts(cursor) -> List:
    """Fetch pending UserPromptSubmit rows from the last hour.

    Selects ``id`` and the unquoted ``$.prompt`` value of the ``request``
    JSON for rows whose status is 'pending', whose ``$.event`` equals
    'UserPromptSubmit' and whose ``timestamp`` is newer than one hour,
    ordered by ascending id.

    Args:
        cursor: Open database cursor used to run the query.

    Returns:
        Whatever ``cursor.fetchall()`` yields for that query.
    """
    query = """SELECT id, JSON_UNQUOTE(JSON_EXTRACT(request, '$.prompt')) as prompt
        FROM protokoll WHERE status = 'pending' AND JSON_EXTRACT(request, '$.event') = 'UserPromptSubmit'
        AND timestamp > DATE_SUB(NOW(), INTERVAL 1 HOUR) ORDER BY id ASC"""
    cursor.execute(query)
    rows = cursor.fetchall()
    return rows


def _find_matching_response(db_prompt: str, pairs: List[Dict[str, str]]) -> Optional[str]:
    """Return the response of the first pair whose prompt equals ``db_prompt``.

    Prompts are compared after stripping leading and trailing whitespace
    on both sides.

    Args:
        db_prompt: Prompt text to look for.
        pairs: Dicts with the keys 'prompt' and 'response'.

    Returns:
        The 'response' of the first matching pair, or None if none matches.
    """
    matches = (
        entry['response']
        for entry in pairs
        if entry['prompt'].strip() == db_prompt.strip()
    )
    # next() with a default stops at the first hit, like the early return of a loop.
    return next(matches, None)


def _update_prompt_with_response(cursor, db_id: int, response: str, current_time) -> None:
    """Complete a pending prompt row with its response.

    The response is wrapped as ``{"assistant_response": ...}`` JSON
    (non-ASCII characters kept as-is) and stored together with
    ``current_time`` as ``response_timestamp``, the estimated output token
    count, the updated token total, the duration in milliseconds relative
    to ``request_timestamp`` and status 'completed'.

    The statement is executed on ``cursor``; committing is left to the caller.

    Args:
        cursor: Open database cursor.
        db_id: Id of the row to update.
        response: Assistant response text.
        current_time: Timestamp used as the response time.
    """
    from log_parser import estimate_tokens

    token_count = estimate_tokens(response)
    response_json = json.dumps({'assistant_response': response}, ensure_ascii=False)
    sql = """UPDATE protokoll SET response = %s, response_timestamp = %s, tokens_output = %s,
        tokens_total = tokens_input + %s,
        duration_ms = ROUND(TIMESTAMPDIFF(MICROSECOND, request_timestamp, %s) / 1000, 3),
        status = 'completed' WHERE id = %s"""
    params = (response_json, current_time, token_count, token_count, current_time, db_id)
    cursor.execute(sql, params)


def close_pending_user_prompts(session_id: str, conversation_pairs: List[Dict[str, str]]) -> None:
    """Close open UserPromptSubmit rows that have a matching response.

    Every pending prompt returned by ``_get_pending_prompts`` is compared
    against ``conversation_pairs``; rows with a non-empty prompt and a
    non-empty matching response are updated, all within one commit.
    Nothing happens when ``conversation_pairs`` is empty. Errors are
    reported on stderr and swallowed.

    Args:
        session_id: Accepted for the caller's interface but not used by
            this function; pending prompts are not filtered by session here.
        conversation_pairs: Dicts with the keys 'prompt' and 'response'.
    """
    if not conversation_pairs:
        return

    connection = None
    try:
        connection = pymysql.connect(**DB_CONFIG)
        with connection.cursor() as cursor:
            pending_rows = _get_pending_prompts(cursor)
            # One timestamp is shared by all rows closed in this call.
            now = datetime.datetime.now()
            for row_id, prompt_text in pending_rows:
                if not prompt_text:
                    continue
                answer = _find_matching_response(prompt_text, conversation_pairs)
                if not answer:
                    continue
                _update_prompt_with_response(cursor, row_id, answer, now)
            connection.commit()
    except Exception as e:
        print(f"Close pending prompts error: {e}", file=sys.stderr)
    finally:
        # Only close when the connection was actually established.
        if connection is not None:
            connection.close()
← Übersicht