db_logging.py
- Path: /var/www/scripts/pipeline/db_logging.py - Namespace: pipeline
- Lines: 176 | Size: 6,194 bytes
- Modified: 2025-12-28 08:56:40 | Scanned: 2025-12-31 10:22:15
Code Hygiene Score: 100
- Dependencies: 100 (25%)
- LOC: 100 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
No issues found.
Dependencies: 5
- use json
- use datetime.datetime
- use mysql.connector
- use config.DB_LOG_CONFIG
- use config.DB_PROTOKOLL_CONFIG
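The two config imports are consumed as keyword-argument dicts unpacked into mysql.connector.connect() (see the code below). A minimal sketch of what config.py might define follows; only the names DB_LOG_CONFIG and DB_PROTOKOLL_CONFIG come from the scanned file, while the host, user, password, and database values are hypothetical placeholders:

# config.py - minimal sketch; keys are standard mysql.connector.connect()
# parameters, values are placeholders and NOT taken from the scanned file.
DB_LOG_CONFIG = {
    "host": "localhost",
    "user": "pipeline",
    "password": "change-me",
    "database": "ki_dev",   # pipeline_log lives in ki_dev per the docstrings
}
DB_PROTOKOLL_CONFIG = {
    "host": "localhost",
    "user": "pipeline",
    "password": "change-me",
    "database": "ki_dev",   # protokoll lives in ki_dev per the docstrings
}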
Classes: 1
- LoggingMixin (class, line 15)
Code
"""
Database Logging Mixin
Single Responsibility: All logging operations (pipeline log, protokoll, provenance).
"""
import json
from datetime import datetime
import mysql.connector
from config import DB_LOG_CONFIG, DB_PROTOKOLL_CONFIG
class LoggingMixin:
"""Mixin for all logging operations.
Provides:
- log(): Pipeline log entries to ki_dev.pipeline_log
- log_to_protokoll(): LLM call logging to ki_dev.protokoll
- log_provenance(): Artifact traceability to ki_content.provenance
"""
def log(self, level: str, message: str, context=None):
"""Write to pipeline log (ki_dev database).
Args:
level: Log level (INFO, WARNING, ERROR, DEBUG)
message: Log message
context: Optional context (string, dict, or any serializable)
"""
try:
# Context must be valid JSON
if context is not None:
if isinstance(context, str):
context = json.dumps({"info": context})
elif isinstance(context, dict):
context = json.dumps(context)
else:
context = json.dumps({"data": str(context)})
# Use separate connection to ki_dev for logging
log_conn = mysql.connector.connect(**DB_LOG_CONFIG)
cursor = log_conn.cursor()
cursor.execute(
"""INSERT INTO pipeline_log
(level, message, context, created_at)
VALUES (%s, %s, %s, NOW())""",
(level, message, context),
)
log_conn.commit()
cursor.close()
log_conn.close()
except Exception: # noqa: S110
# Logging should never break the pipeline
pass
def log_to_protokoll(
self,
client_name: str,
request: str,
response: str = None,
model_name: str = None,
tokens_input: int = 0,
tokens_output: int = 0,
duration_ms: int = 0,
status: str = "completed",
error_message: str = None,
) -> int | None:
"""Log LLM call to ki_dev.protokoll table.
Args:
client_name: Caller identifier (e.g., 'content-studio', 'pipeline')
request: The prompt sent to the LLM
response: The LLM response
model_name: Model used (e.g., 'claude-opus-4-5-20251101')
tokens_input: Input token count
tokens_output: Output token count
duration_ms: Request duration in milliseconds
status: 'pending', 'completed', or 'error'
error_message: Error details if status is 'error'
Returns:
Protokoll record ID or None on error
"""
try:
conn = mysql.connector.connect(**DB_PROTOKOLL_CONFIG)
cursor = conn.cursor()
now = datetime.now()
tokens_total = tokens_input + tokens_output
cursor.execute(
"""INSERT INTO protokoll
(request_ip, client_name, request, request_timestamp,
response, response_timestamp, duration_ms,
tokens_input, tokens_output, tokens_total,
model_name, status, error_message)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
(
"127.0.0.1", # Local pipeline call
client_name,
request[:65000] if request else None, # Truncate if too long
now,
response[:65000] if response else None, # Truncate if too long
now if response else None,
duration_ms,
tokens_input,
tokens_output,
tokens_total,
model_name,
status,
error_message,
),
)
conn.commit()
protokoll_id = cursor.lastrowid
cursor.close()
conn.close()
return protokoll_id
except Exception as e:
# Logging should never break the pipeline
print(f"Protokoll logging error: {e}")
return None
def log_provenance(
self,
artifact_type: str,
artifact_id: int,
source_type: str,
source_id: int = None,
pipeline_run_id: int = None,
pipeline_step: str = None,
model_used: str = None,
prompt_version: str = None,
) -> int | None:
"""Log artifact provenance for traceability.
Args:
artifact_type: Type (document, chunk, entity, relation, statement, embedding)
artifact_id: ID of the artifact
source_type: How created (file, extraction, analysis, merge, manual)
source_id: Optional source artifact ID
pipeline_run_id: Optional pipeline run ID
pipeline_step: Optional step name (e.g., 'entity_extract')
model_used: Optional model name (e.g., 'claude-opus-4-5-20251101')
prompt_version: Optional prompt version
Returns:
Provenance record ID or None on error
"""
try:
cursor = self.execute(
"""INSERT INTO provenance
(artifact_type, artifact_id, source_type, source_id,
pipeline_run_id, pipeline_step, model_used, prompt_version, created_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, NOW())""",
(
artifact_type,
artifact_id,
source_type,
source_id,
pipeline_run_id,
pipeline_step,
model_used,
prompt_version,
),
)
self.commit()
provenance_id = cursor.lastrowid
cursor.close()
return provenance_id
except Exception as e:
self.log("WARNING", f"Failed to log provenance: {e}")
return None
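Because log_provenance() calls self.execute() and self.commit(), the mixin is meant to be composed with a class that owns a database connection. The sketch below shows one possible host; the Pipeline class name and the execute()/commit() bodies are assumptions for illustration, while the method names the mixin relies on (execute, commit) and the three logging calls come from the scanned file:

# usage sketch - hypothetical host class for LoggingMixin
import mysql.connector

from config import DB_LOG_CONFIG  # reused here purely for illustration
from db_logging import LoggingMixin


class Pipeline(LoggingMixin):
    """Hypothetical host class providing the execute()/commit() contract."""

    def __init__(self):
        # Assumption: the real pipeline would connect to ki_content here.
        self.conn = mysql.connector.connect(**DB_LOG_CONFIG)

    def execute(self, sql, params=None):
        # Returns an open cursor, as log_provenance() expects (lastrowid, close()).
        cursor = self.conn.cursor()
        cursor.execute(sql, params)
        return cursor

    def commit(self):
        self.conn.commit()


pipeline = Pipeline()
pipeline.log("INFO", "run started", {"step": "init"})
pipeline.log_to_protokoll(
    client_name="pipeline",
    request="Extract entities from chunk 42",
    response="entity list ...",
    model_name="claude-opus-4-5-20251101",
    tokens_input=1200,
    tokens_output=300,
    duration_ms=2500,
)
pipeline.log_provenance(
    artifact_type="entity",
    artifact_id=7,
    source_type="extraction",
    pipeline_step="entity_extract",
    model_used="claude-opus-4-5-20251101",
)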