db_logging.py

Code Hygiene Score: 100

Keine Issues gefunden.

Dependencies 5

Klassen 1

Code

"""
Database Logging Mixin

Single Responsibility: All logging operations (pipeline log, protokoll, provenance).
"""

import json
from datetime import datetime

import mysql.connector

from config import DB_LOG_CONFIG, DB_PROTOKOLL_CONFIG


class LoggingMixin:
    """Mixin for all logging operations.

    Provides:
    - log(): Pipeline log entries to ki_dev.pipeline_log
    - log_to_protokoll(): LLM call logging to ki_dev.protokoll
    - log_provenance(): Artifact traceability to ki_content.provenance
    """

    def log(self, level: str, message: str, context=None):
        """Write to pipeline log (ki_dev database).

        Args:
            level: Log level (INFO, WARNING, ERROR, DEBUG)
            message: Log message
            context: Optional context (string, dict, or any serializable)
        """
        try:
            # Context must be valid JSON
            if context is not None:
                if isinstance(context, str):
                    context = json.dumps({"info": context})
                elif isinstance(context, dict):
                    context = json.dumps(context)
                else:
                    context = json.dumps({"data": str(context)})

            # Use separate connection to ki_dev for logging
            log_conn = mysql.connector.connect(**DB_LOG_CONFIG)
            cursor = log_conn.cursor()
            cursor.execute(
                """INSERT INTO pipeline_log
                   (level, message, context, created_at)
                   VALUES (%s, %s, %s, NOW())""",
                (level, message, context),
            )
            log_conn.commit()
            cursor.close()
            log_conn.close()
        except Exception:  # noqa: S110
            # Logging should never break the pipeline
            pass

    def log_to_protokoll(
        self,
        client_name: str,
        request: str,
        response: str = None,
        model_name: str = None,
        tokens_input: int = 0,
        tokens_output: int = 0,
        duration_ms: int = 0,
        status: str = "completed",
        error_message: str = None,
    ) -> int | None:
        """Log LLM call to ki_dev.protokoll table.

        Args:
            client_name: Caller identifier (e.g., 'content-studio', 'pipeline')
            request: The prompt sent to the LLM
            response: The LLM response
            model_name: Model used (e.g., 'claude-opus-4-5-20251101')
            tokens_input: Input token count
            tokens_output: Output token count
            duration_ms: Request duration in milliseconds
            status: 'pending', 'completed', or 'error'
            error_message: Error details if status is 'error'

        Returns:
            Protokoll record ID or None on error
        """
        try:
            conn = mysql.connector.connect(**DB_PROTOKOLL_CONFIG)
            cursor = conn.cursor()

            now = datetime.now()
            tokens_total = tokens_input + tokens_output

            cursor.execute(
                """INSERT INTO protokoll
                   (request_ip, client_name, request, request_timestamp,
                    response, response_timestamp, duration_ms,
                    tokens_input, tokens_output, tokens_total,
                    model_name, status, error_message)
                   VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
                (
                    "127.0.0.1",  # Local pipeline call
                    client_name,
                    request[:65000] if request else None,  # Truncate if too long
                    now,
                    response[:65000] if response else None,  # Truncate if too long
                    now if response else None,
                    duration_ms,
                    tokens_input,
                    tokens_output,
                    tokens_total,
                    model_name,
                    status,
                    error_message,
                ),
            )
            conn.commit()
            protokoll_id = cursor.lastrowid
            cursor.close()
            conn.close()
            return protokoll_id
        except Exception as e:
            # Logging should never break the pipeline
            print(f"Protokoll logging error: {e}")
            return None

    def log_provenance(
        self,
        artifact_type: str,
        artifact_id: int,
        source_type: str,
        source_id: int = None,
        pipeline_run_id: int = None,
        pipeline_step: str = None,
        model_used: str = None,
        prompt_version: str = None,
    ) -> int | None:
        """Log artifact provenance for traceability.

        Args:
            artifact_type: Type (document, chunk, entity, relation, statement, embedding)
            artifact_id: ID of the artifact
            source_type: How created (file, extraction, analysis, merge, manual)
            source_id: Optional source artifact ID
            pipeline_run_id: Optional pipeline run ID
            pipeline_step: Optional step name (e.g., 'entity_extract')
            model_used: Optional model name (e.g., 'claude-opus-4-5-20251101')
            prompt_version: Optional prompt version

        Returns:
            Provenance record ID or None on error
        """
        try:
            cursor = self.execute(
                """INSERT INTO provenance
                   (artifact_type, artifact_id, source_type, source_id,
                    pipeline_run_id, pipeline_step, model_used, prompt_version, created_at)
                   VALUES (%s, %s, %s, %s, %s, %s, %s, %s, NOW())""",
                (
                    artifact_type,
                    artifact_id,
                    source_type,
                    source_id,
                    pipeline_run_id,
                    pipeline_step,
                    model_used,
                    prompt_version,
                ),
            )
            self.commit()
            provenance_id = cursor.lastrowid
            cursor.close()
            return provenance_id
        except Exception as e:
            self.log("WARNING", f"Failed to log provenance: {e}")
            return None
← Übersicht Graph