Protokoll #15817

ID: 15817
Zeitstempel: 2025-12-25 16:57:03.464160
Client: root
IP: 145.224.72.140
Modell: claude-sonnet-4-20250514
Status: completed
Tokens: 4,839 (Input: 2,417, Output: 2,422)
Dauer: 137 ms
Request-Zeit: 2025-12-25 16:57:03.464160
Response-Zeit: 2025-12-25 16:57:03.600923

Request

{
    "event": "PreToolUse",
    "tool_name": "Write",
    "tool_input": {
        "file_path": "\/var\/www\/tools\/ki-protokoll\/claude-hook\/log_storage.py",
        "content": "#!\/usr\/bin\/env python3\n\"\"\"\nLog Storage Module\nVerantwortlich für die Speicherung und Aktualisierung von Logs in der Datenbank\n\"\"\"\n\nimport os\nimport json\nimport datetime\nimport pymysql\nimport sys\nimport tempfile\nfrom pathlib import Path\nfrom typing import Dict, Any, Optional, List\nfrom dotenv import load_dotenv\n\n# .env aus Hook-Verzeichnis laden\nload_dotenv(Path(__file__).parent \/ '.env')\n\n# Konfiguration aus Environment-Variablen\nDB_CONFIG = {\n    'host': os.environ.get('CLAUDE_DB_HOST', 'localhost'),\n    'port': int(os.environ.get('CLAUDE_DB_PORT', '3306')),\n    'user': os.environ.get('CLAUDE_DB_USER', 'root'),\n    'password': os.environ.get('CLAUDE_DB_PASSWORD', ''),\n    'database': os.environ.get('CLAUDE_DB_NAME', 'ki_dev'),\n    'charset': 'utf8mb4'\n}\n\n# Session-Tracking im temporären Verzeichnis\nTEMP_DIR = Path(tempfile.gettempdir()) \/ \"claude_hooks\"\nTEMP_DIR.mkdir(exist_ok=True)\n\n\ndef get_session_tracking_key(data: Dict[str, Any]) -> str:\n    \"\"\"Erstellt einen eindeutigen Key für Session-Tracking\"\"\"\n    session_id = data.get('session_id', '')\n    event_name = data.get('hook_event_name', '')\n    tool_name = data.get('tool_name', '')\n\n    # Für Tool-Events: tool_name-spezifischer Key\n    if event_name in ['PreToolUse', 'PostToolUse'] and tool_name:\n        return f\"{session_id}_{tool_name}_{event_name}\"\n\n    # Für andere Events: event-spezifischer Key\n    return f\"{session_id}_{event_name}\"\n\n\ndef save_pending_request(data: Dict[str, Any], db_id: int) -> None:\n    \"\"\"Speichert pending Request für spätere Response-Zuordnung\"\"\"\n    try:\n        key = get_session_tracking_key(data)\n        tracking_file = TEMP_DIR \/ f\"{key}.json\"\n\n        tracking_data = {\n            'db_id': db_id,\n            'timestamp': datetime.datetime.now().isoformat(),\n            'event': data.get('hook_event_name'),\n            'tool_name': data.get('tool_name', ''),\n            
'session_id': data.get('session_id', '')\n        }\n\n        with open(tracking_file, 'w') as f:\n            json.dump(tracking_data, f)\n\n    except Exception as e:\n        print(f\"Session tracking save error: {e}\", file=sys.stderr)\n\n\ndef find_matching_request(data: Dict[str, Any]) -> Optional[int]:\n    \"\"\"Findet matching Request für Response-Event\"\"\"\n    try:\n        event_name = data.get('hook_event_name', '')\n\n        # PostToolUse sucht nach PreToolUse\n        if event_name == 'PostToolUse':\n            search_data = dict(data)\n            search_data['hook_event_name'] = 'PreToolUse'\n            key = get_session_tracking_key(search_data)\n        else:\n            return None\n\n        tracking_file = TEMP_DIR \/ f\"{key}.json\"\n\n        if tracking_file.exists():\n            with open(tracking_file, 'r') as f:\n                tracking_data = json.load(f)\n\n            # Cleanup\n            tracking_file.unlink()\n            return tracking_data['db_id']\n\n    except Exception as e:\n        print(f\"Session tracking find error: {e}\", file=sys.stderr)\n\n    return None\n\n\ndef insert_log_entry(\n    request_str: str,\n    client_ip: str,\n    client_name: str,\n    response_str: Optional[str],\n    tokens_input: int,\n    tokens_output: int,\n    model_name: str\n) -> Optional[int]:\n    \"\"\"Fügt einen neuen Log-Eintrag in die Datenbank ein\"\"\"\n    try:\n        connection = pymysql.connect(**DB_CONFIG)\n\n        with connection.cursor() as cursor:\n            current_time = datetime.datetime.now()\n            tokens_total = tokens_input + tokens_output\n            status = 'completed' if response_str else 'pending'\n            response_timestamp = current_time if response_str else None\n\n            sql = \"\"\"\n                INSERT INTO protokoll (\n                    timestamp, request_ip, client_name, request, request_timestamp,\n                    response, response_timestamp, duration_ms, 
tokens_input,\n                    tokens_output, tokens_total, model_name, status\n                ) VALUES (\n                    %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s\n                )\n            \"\"\"\n\n            cursor.execute(sql, (\n                current_time,           # timestamp\n                client_ip,              # request_ip\n                client_name,            # client_name\n                request_str,            # request\n                current_time,           # request_timestamp\n                response_str,           # response\n                response_timestamp,     # response_timestamp\n                0,                      # duration_ms (wird bei Updates gesetzt)\n                tokens_input,           # tokens_input\n                tokens_output,          # tokens_output\n                tokens_total,           # tokens_total\n                model_name,             # model_name\n                status                  # status\n            ))\n\n            connection.commit()\n            return cursor.lastrowid\n\n    except Exception as e:\n        print(f\"Database insert error: {e}\", file=sys.stderr)\n        return None\n\n    finally:\n        if 'connection' in locals():\n            connection.close()\n\n\ndef update_request_with_response(db_id: int, response_data: str, tokens_output: int) -> None:\n    \"\"\"Updated existing Request mit Response-Daten und berechnet Duration\"\"\"\n    try:\n        connection = pymysql.connect(**DB_CONFIG)\n\n        with connection.cursor() as cursor:\n            current_time = datetime.datetime.now()\n\n            # Response und Timestamp setzen\n            sql = \"\"\"\n                UPDATE protokoll\n                SET response = %s,\n                    response_timestamp = %s,\n                    tokens_output = %s,\n                    tokens_total = tokens_input + %s,\n                    status = 'completed'\n                WHERE id = %s\n      
      \"\"\"\n\n            cursor.execute(sql, (\n                response_data,\n                current_time,\n                tokens_output,\n                tokens_output,\n                db_id\n            ))\n\n            # Duration aus Timestamps berechnen (mit Microsekunden-Präzision)\n            duration_sql = \"\"\"\n                UPDATE protokoll\n                SET duration_ms = ROUND(TIMESTAMPDIFF(MICROSECOND, request_timestamp, response_timestamp) \/ 1000, 3)\n                WHERE id = %s\n            \"\"\"\n\n            cursor.execute(duration_sql, (db_id,))\n            connection.commit()\n\n    except Exception as e:\n        print(f\"Database update error: {e}\", file=sys.stderr)\n    finally:\n        if 'connection' in locals():\n            connection.close()\n\n\ndef close_pending_user_prompts(session_id: str, conversation_pairs: List[Dict[str, str]]) -> None:\n    \"\"\"Schließt offene UserPromptSubmit-Einträge mit ihren zugehörigen Responses\"\"\"\n    if not conversation_pairs:\n        return\n\n    try:\n        connection = pymysql.connect(**DB_CONFIG)\n\n        with connection.cursor() as cursor:\n            # Hole alle pending UserPromptSubmit-Einträge der letzten Stunde\n            cursor.execute(\"\"\"\n                SELECT id, JSON_UNQUOTE(JSON_EXTRACT(request, '$.prompt')) as prompt\n                FROM protokoll\n                WHERE status = 'pending'\n                  AND JSON_EXTRACT(request, '$.event') = 'UserPromptSubmit'\n                  AND timestamp > DATE_SUB(NOW(), INTERVAL 1 HOUR)\n                ORDER BY id ASC\n            \"\"\")\n            pending_prompts = cursor.fetchall()\n\n            current_time = datetime.datetime.now()\n\n            # Matche jeden pending Prompt mit seinem Response\n            for db_id, db_prompt in pending_prompts:\n                if not db_prompt:\n                    continue\n\n                # Finde das passende Paar (Prompt-Match)\n                
matching_response = None\n                for pair in conversation_pairs:\n                    if pair['prompt'].strip() == db_prompt.strip():\n                        matching_response = pair['response']\n                        break\n\n                if matching_response:\n                    from log_parser import estimate_tokens\n                    tokens_output = estimate_tokens(matching_response)\n                    response_json = json.dumps({\n                        'assistant_response': matching_response\n                    }, ensure_ascii=False)\n\n                    cursor.execute(\"\"\"\n                        UPDATE protokoll\n                        SET response = %s,\n                            response_timestamp = %s,\n                            tokens_output = %s,\n                            tokens_total = tokens_input + %s,\n                            duration_ms = ROUND(TIMESTAMPDIFF(MICROSECOND, request_timestamp, %s) \/ 1000, 3),\n                            status = 'completed'\n                        WHERE id = %s\n                    \"\"\", (\n                        response_json,\n                        current_time,\n                        tokens_output,\n                        tokens_output,\n                        current_time,\n                        db_id\n                    ))\n\n            connection.commit()\n\n    except Exception as e:\n        print(f\"Close pending prompts error: {e}\", file=sys.stderr)\n    finally:\n        if 'connection' in locals():\n            connection.close()\n"
    }
}

Response

{
    "tool_response": {
        "type": "create",
        "filePath": "\/var\/www\/tools\/ki-protokoll\/claude-hook\/log_storage.py",
        "content": "#!\/usr\/bin\/env python3\n\"\"\"\nLog Storage Module\nVerantwortlich für die Speicherung und Aktualisierung von Logs in der Datenbank\n\"\"\"\n\nimport os\nimport json\nimport datetime\nimport pymysql\nimport sys\nimport tempfile\nfrom pathlib import Path\nfrom typing import Dict, Any, Optional, List\nfrom dotenv import load_dotenv\n\n# .env aus Hook-Verzeichnis laden\nload_dotenv(Path(__file__).parent \/ '.env')\n\n# Konfiguration aus Environment-Variablen\nDB_CONFIG = {\n    'host': os.environ.get('CLAUDE_DB_HOST', 'localhost'),\n    'port': int(os.environ.get('CLAUDE_DB_PORT', '3306')),\n    'user': os.environ.get('CLAUDE_DB_USER', 'root'),\n    'password': os.environ.get('CLAUDE_DB_PASSWORD', ''),\n    'database': os.environ.get('CLAUDE_DB_NAME', 'ki_dev'),\n    'charset': 'utf8mb4'\n}\n\n# Session-Tracking im temporären Verzeichnis\nTEMP_DIR = Path(tempfile.gettempdir()) \/ \"claude_hooks\"\nTEMP_DIR.mkdir(exist_ok=True)\n\n\ndef get_session_tracking_key(data: Dict[str, Any]) -> str:\n    \"\"\"Erstellt einen eindeutigen Key für Session-Tracking\"\"\"\n    session_id = data.get('session_id', '')\n    event_name = data.get('hook_event_name', '')\n    tool_name = data.get('tool_name', '')\n\n    # Für Tool-Events: tool_name-spezifischer Key\n    if event_name in ['PreToolUse', 'PostToolUse'] and tool_name:\n        return f\"{session_id}_{tool_name}_{event_name}\"\n\n    # Für andere Events: event-spezifischer Key\n    return f\"{session_id}_{event_name}\"\n\n\ndef save_pending_request(data: Dict[str, Any], db_id: int) -> None:\n    \"\"\"Speichert pending Request für spätere Response-Zuordnung\"\"\"\n    try:\n        key = get_session_tracking_key(data)\n        tracking_file = TEMP_DIR \/ f\"{key}.json\"\n\n        tracking_data = {\n            'db_id': db_id,\n            'timestamp': datetime.datetime.now().isoformat(),\n            'event': data.get('hook_event_name'),\n            'tool_name': data.get('tool_name', ''),\n            
'session_id': data.get('session_id', '')\n        }\n\n        with open(tracking_file, 'w') as f:\n            json.dump(tracking_data, f)\n\n    except Exception as e:\n        print(f\"Session tracking save error: {e}\", file=sys.stderr)\n\n\ndef find_matching_request(data: Dict[str, Any]) -> Optional[int]:\n    \"\"\"Findet matching Request für Response-Event\"\"\"\n    try:\n        event_name = data.get('hook_event_name', '')\n\n        # PostToolUse sucht nach PreToolUse\n        if event_name == 'PostToolUse':\n            search_data = dict(data)\n            search_data['hook_event_name'] = 'PreToolUse'\n            key = get_session_tracking_key(search_data)\n        else:\n            return None\n\n        tracking_file = TEMP_DIR \/ f\"{key}.json\"\n\n        if tracking_file.exists():\n            with open(tracking_file, 'r') as f:\n                tracking_data = json.load(f)\n\n            # Cleanup\n            tracking_file.unlink()\n            return tracking_data['db_id']\n\n    except Exception as e:\n        print(f\"Session tracking find error: {e}\", file=sys.stderr)\n\n    return None\n\n\ndef insert_log_entry(\n    request_str: str,\n    client_ip: str,\n    client_name: str,\n    response_str: Optional[str],\n    tokens_input: int,\n    tokens_output: int,\n    model_name: str\n) -> Optional[int]:\n    \"\"\"Fügt einen neuen Log-Eintrag in die Datenbank ein\"\"\"\n    try:\n        connection = pymysql.connect(**DB_CONFIG)\n\n        with connection.cursor() as cursor:\n            current_time = datetime.datetime.now()\n            tokens_total = tokens_input + tokens_output\n            status = 'completed' if response_str else 'pending'\n            response_timestamp = current_time if response_str else None\n\n            sql = \"\"\"\n                INSERT INTO protokoll (\n                    timestamp, request_ip, client_name, request, request_timestamp,\n                    response, response_timestamp, duration_ms, 
tokens_input,\n                    tokens_output, tokens_total, model_name, status\n                ) VALUES (\n                    %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s\n                )\n            \"\"\"\n\n            cursor.execute(sql, (\n                current_time,           # timestamp\n                client_ip,              # request_ip\n                client_name,            # client_name\n                request_str,            # request\n                current_time,           # request_timestamp\n                response_str,           # response\n                response_timestamp,     # response_timestamp\n                0,                      # duration_ms (wird bei Updates gesetzt)\n                tokens_input,           # tokens_input\n                tokens_output,          # tokens_output\n                tokens_total,           # tokens_total\n                model_name,             # model_name\n                status                  # status\n            ))\n\n            connection.commit()\n            return cursor.lastrowid\n\n    except Exception as e:\n        print(f\"Database insert error: {e}\", file=sys.stderr)\n        return None\n\n    finally:\n        if 'connection' in locals():\n            connection.close()\n\n\ndef update_request_with_response(db_id: int, response_data: str, tokens_output: int) -> None:\n    \"\"\"Updated existing Request mit Response-Daten und berechnet Duration\"\"\"\n    try:\n        connection = pymysql.connect(**DB_CONFIG)\n\n        with connection.cursor() as cursor:\n            current_time = datetime.datetime.now()\n\n            # Response und Timestamp setzen\n            sql = \"\"\"\n                UPDATE protokoll\n                SET response = %s,\n                    response_timestamp = %s,\n                    tokens_output = %s,\n                    tokens_total = tokens_input + %s,\n                    status = 'completed'\n                WHERE id = %s\n      
      \"\"\"\n\n            cursor.execute(sql, (\n                response_data,\n                current_time,\n                tokens_output,\n                tokens_output,\n                db_id\n            ))\n\n            # Duration aus Timestamps berechnen (mit Microsekunden-Präzision)\n            duration_sql = \"\"\"\n                UPDATE protokoll\n                SET duration_ms = ROUND(TIMESTAMPDIFF(MICROSECOND, request_timestamp, response_timestamp) \/ 1000, 3)\n                WHERE id = %s\n            \"\"\"\n\n            cursor.execute(duration_sql, (db_id,))\n            connection.commit()\n\n    except Exception as e:\n        print(f\"Database update error: {e}\", file=sys.stderr)\n    finally:\n        if 'connection' in locals():\n            connection.close()\n\n\ndef close_pending_user_prompts(session_id: str, conversation_pairs: List[Dict[str, str]]) -> None:\n    \"\"\"Schließt offene UserPromptSubmit-Einträge mit ihren zugehörigen Responses\"\"\"\n    if not conversation_pairs:\n        return\n\n    try:\n        connection = pymysql.connect(**DB_CONFIG)\n\n        with connection.cursor() as cursor:\n            # Hole alle pending UserPromptSubmit-Einträge der letzten Stunde\n            cursor.execute(\"\"\"\n                SELECT id, JSON_UNQUOTE(JSON_EXTRACT(request, '$.prompt')) as prompt\n                FROM protokoll\n                WHERE status = 'pending'\n                  AND JSON_EXTRACT(request, '$.event') = 'UserPromptSubmit'\n                  AND timestamp > DATE_SUB(NOW(), INTERVAL 1 HOUR)\n                ORDER BY id ASC\n            \"\"\")\n            pending_prompts = cursor.fetchall()\n\n            current_time = datetime.datetime.now()\n\n            # Matche jeden pending Prompt mit seinem Response\n            for db_id, db_prompt in pending_prompts:\n                if not db_prompt:\n                    continue\n\n                # Finde das passende Paar (Prompt-Match)\n                
matching_response = None\n                for pair in conversation_pairs:\n                    if pair['prompt'].strip() == db_prompt.strip():\n                        matching_response = pair['response']\n                        break\n\n                if matching_response:\n                    from log_parser import estimate_tokens\n                    tokens_output = estimate_tokens(matching_response)\n                    response_json = json.dumps({\n                        'assistant_response': matching_response\n                    }, ensure_ascii=False)\n\n                    cursor.execute(\"\"\"\n                        UPDATE protokoll\n                        SET response = %s,\n                            response_timestamp = %s,\n                            tokens_output = %s,\n                            tokens_total = tokens_input + %s,\n                            duration_ms = ROUND(TIMESTAMPDIFF(MICROSECOND, request_timestamp, %s) \/ 1000, 3),\n                            status = 'completed'\n                        WHERE id = %s\n                    \"\"\", (\n                        response_json,\n                        current_time,\n                        tokens_output,\n                        tokens_output,\n                        current_time,\n                        db_id\n                    ))\n\n            connection.commit()\n\n    except Exception as e:\n        print(f\"Close pending prompts error: {e}\", file=sys.stderr)\n    finally:\n        if 'connection' in locals():\n            connection.close()\n",
        "structuredPatch": [],
        "originalFile": null
    }
}
← Vorheriger Zur Liste Nächster →