log_storage.py
- Pfad:
/var/www/tools/ki-protokoll/claude-hook/log_storage.py
- Namespace: claude-hook
- Zeilen: 160 | Größe: 6,874 Bytes
- Geändert: 2025-12-25 17:20:00 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 78
- Dependencies: 20 (25%)
- LOC: 100 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 80 (10%)
Issues 2
| Zeile |
Typ |
Beschreibung |
| 104 |
magic_number |
Magic Number gefunden: 1000 |
| 136 |
magic_number |
Magic Number gefunden: 1000 |
Dependencies 13
- use os
- use json
- use datetime
- use pymysql
- use sys
- use tempfile
- use pathlib.Path
- use typing.Dict
- use typing.Any
- use typing.Optional
- use typing.List
- use dotenv.load_dotenv
- use log_parser.estimate_tokens
Funktionen 9
-
get_session_tracking_key()
Zeile 29
-
save_pending_request()
Zeile 39
-
find_matching_request()
Zeile 51
-
insert_log_entry()
Zeile 69
-
update_request_with_response()
Zeile 94
-
_get_pending_prompts()
Zeile 113
-
_find_matching_response()
Zeile 121
-
_update_prompt_with_response()
Zeile 129
-
close_pending_user_prompts()
Zeile 140
Code
#!/usr/bin/env python3
"""Log Storage Module - DB-Speicherung und Aktualisierung von Logs"""
import os
import json
import datetime
import pymysql
import sys
import tempfile
from pathlib import Path
from typing import Dict, Any, Optional, List
from dotenv import load_dotenv
load_dotenv(Path(__file__).parent / '.env')
DB_CONFIG = {
'host': os.environ.get('CLAUDE_DB_HOST', 'localhost'),
'port': int(os.environ.get('CLAUDE_DB_PORT', '3306')),
'user': os.environ.get('CLAUDE_DB_USER', 'root'),
'password': os.environ.get('CLAUDE_DB_PASSWORD', ''),
'database': os.environ.get('CLAUDE_DB_NAME', 'ki_dev'),
'charset': 'utf8mb4'
}
TEMP_DIR = Path(tempfile.gettempdir()) / "claude_hooks"
TEMP_DIR.mkdir(exist_ok=True)
def get_session_tracking_key(data: Dict[str, Any]) -> str:
"""Erstellt einen eindeutigen Key für Session-Tracking"""
sid = data.get('session_id', '')
evt = data.get('hook_event_name', '')
tool = data.get('tool_name', '')
if evt in ['PreToolUse', 'PostToolUse'] and tool:
return f"{sid}_{tool}_{evt}"
return f"{sid}_{evt}"
def save_pending_request(data: Dict[str, Any], db_id: int) -> None:
"""Speichert pending Request für spätere Response-Zuordnung"""
try:
key = get_session_tracking_key(data)
tracking = {'db_id': db_id, 'timestamp': datetime.datetime.now().isoformat(),
'event': data.get('hook_event_name'), 'tool_name': data.get('tool_name', ''),
'session_id': data.get('session_id', '')}
(TEMP_DIR / f"{key}.json").write_text(json.dumps(tracking))
except Exception as e:
print(f"Session tracking save error: {e}", file=sys.stderr)
def find_matching_request(data: Dict[str, Any]) -> Optional[int]:
"""Findet matching Request für Response-Event"""
try:
if data.get('hook_event_name', '') != 'PostToolUse':
return None
search = dict(data)
search['hook_event_name'] = 'PreToolUse'
key = get_session_tracking_key(search)
track_file = TEMP_DIR / f"{key}.json"
if track_file.exists():
track_data = json.loads(track_file.read_text())
track_file.unlink()
return track_data['db_id']
except Exception as e:
print(f"Session tracking find error: {e}", file=sys.stderr)
return None
def insert_log_entry(request_str: str, client_ip: str, client_name: str, response_str: Optional[str],
tokens_input: int, tokens_output: int, model_name: str) -> Optional[int]:
"""Fügt einen neuen Log-Eintrag in die Datenbank ein"""
try:
connection = pymysql.connect(**DB_CONFIG)
with connection.cursor() as cursor:
ct = datetime.datetime.now()
tt = tokens_input + tokens_output
status = 'completed' if response_str else 'pending'
rt = ct if response_str else None
cursor.execute("""INSERT INTO protokoll (timestamp, request_ip, client_name, request,
request_timestamp, response, response_timestamp, duration_ms, tokens_input, tokens_output,
tokens_total, model_name, status) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
(ct, client_ip, client_name, request_str, ct, response_str, rt, 0,
tokens_input, tokens_output, tt, model_name, status))
connection.commit()
return cursor.lastrowid
except Exception as e:
print(f"Database insert error: {e}", file=sys.stderr)
return None
finally:
if 'connection' in locals():
connection.close()
def update_request_with_response(db_id: int, response_data: str, tokens_output: int) -> None:
"""Updated existing Request mit Response-Daten und berechnet Duration"""
try:
connection = pymysql.connect(**DB_CONFIG)
with connection.cursor() as cursor:
ct = datetime.datetime.now()
cursor.execute("""UPDATE protokoll SET response = %s, response_timestamp = %s, tokens_output = %s,
tokens_total = tokens_input + %s, status = 'completed' WHERE id = %s""",
(response_data, ct, tokens_output, tokens_output, db_id))
cursor.execute("""UPDATE protokoll SET duration_ms = ROUND(TIMESTAMPDIFF(MICROSECOND,
request_timestamp, response_timestamp) / 1000, 3) WHERE id = %s""", (db_id,))
connection.commit()
except Exception as e:
print(f"Database update error: {e}", file=sys.stderr)
finally:
if 'connection' in locals():
connection.close()
def _get_pending_prompts(cursor) -> List:
"""Holt pending UserPromptSubmit-Einträge"""
cursor.execute("""SELECT id, JSON_UNQUOTE(JSON_EXTRACT(request, '$.prompt')) as prompt
FROM protokoll WHERE status = 'pending' AND JSON_EXTRACT(request, '$.event') = 'UserPromptSubmit'
AND timestamp > DATE_SUB(NOW(), INTERVAL 1 HOUR) ORDER BY id ASC""")
return cursor.fetchall()
def _find_matching_response(db_prompt: str, pairs: List[Dict[str, str]]) -> Optional[str]:
"""Findet matching Response für einen Prompt"""
for pair in pairs:
if pair['prompt'].strip() == db_prompt.strip():
return pair['response']
return None
def _update_prompt_with_response(cursor, db_id: int, response: str, current_time) -> None:
"""Updated einen pending Prompt mit seiner Response"""
from log_parser import estimate_tokens
tokens = estimate_tokens(response)
resp_json = json.dumps({'assistant_response': response}, ensure_ascii=False)
cursor.execute("""UPDATE protokoll SET response = %s, response_timestamp = %s, tokens_output = %s,
tokens_total = tokens_input + %s,
duration_ms = ROUND(TIMESTAMPDIFF(MICROSECOND, request_timestamp, %s) / 1000, 3),
status = 'completed' WHERE id = %s""", (resp_json, current_time, tokens, tokens, current_time, db_id))
def close_pending_user_prompts(session_id: str, conversation_pairs: List[Dict[str, str]]) -> None:
"""Schließt offene UserPromptSubmit-Einträge mit ihren zugehörigen Responses"""
if not conversation_pairs:
return
try:
connection = pymysql.connect(**DB_CONFIG)
with connection.cursor() as cursor:
pending_prompts = _get_pending_prompts(cursor)
ct = datetime.datetime.now()
for db_id, db_prompt in pending_prompts:
if db_prompt:
resp = _find_matching_response(db_prompt, conversation_pairs)
if resp:
_update_prompt_with_response(cursor, db_id, resp, ct)
connection.commit()
except Exception as e:
print(f"Close pending prompts error: {e}", file=sys.stderr)
finally:
if 'connection' in locals():
connection.close()