{
  "event": "PreToolUse",
  "tool_name": "Write",
  "tool_input": {
    "file_path": "\/var\/www\/scripts\/pipeline\/db_logging.py",
    "content": "\"\"\"\nDatabase Logging Mixin\n\nSingle Responsibility: All logging operations (pipeline log, protokoll, provenance).\n\"\"\"\n\nimport json\nfrom datetime import datetime\n\nimport mysql.connector\n\nfrom config import DB_LOG_CONFIG, DB_PROTOKOLL_CONFIG\n\n\nclass LoggingMixin:\n    \"\"\"Mixin for all logging operations.\n\n    Provides:\n    - log(): Pipeline log entries to ki_dev.pipeline_log\n    - log_to_protokoll(): LLM call logging to ki_dev.protokoll\n    - log_provenance(): Artifact traceability to ki_content.provenance\n    \"\"\"\n\n    def log(self, level: str, message: str, context=None) -> None:\n        \"\"\"Write to pipeline log (ki_dev database).\n\n        Args:\n            level: Log level (INFO, WARNING, ERROR, DEBUG)\n            message: Log message\n            context: Optional context (string, dict, or any serializable)\n        \"\"\"\n        try:\n            # Context must be valid JSON\n            if context is not None:\n                if isinstance(context, str):\n                    context = json.dumps({\"info\": context})\n                elif isinstance(context, dict):\n                    context = json.dumps(context)\n                else:\n                    context = json.dumps({\"data\": str(context)})\n\n            # Use separate connection to ki_dev for logging\n            log_conn = mysql.connector.connect(**DB_LOG_CONFIG)\n            cursor = log_conn.cursor()\n            cursor.execute(\n                \"\"\"INSERT INTO pipeline_log\n                   (level, message, context, created_at)\n                   VALUES (%s, %s, %s, NOW())\"\"\",\n                (level, message, context),\n            )\n            log_conn.commit()\n            cursor.close()\n            log_conn.close()\n        except Exception:  # noqa: S110\n            # Logging should never break the pipeline\n            pass\n\n    def log_to_protokoll(\n        self,\n        client_name: str,\n        request: str,\n        response: str | None = None,\n        model_name: str | None = None,\n        tokens_input: int = 0,\n        tokens_output: int = 0,\n        duration_ms: int = 0,\n        status: str = \"completed\",\n        error_message: str | None = None,\n    ) -> int | None:\n        \"\"\"Log LLM call to ki_dev.protokoll table.\n\n        Args:\n            client_name: Caller identifier (e.g., 'content-studio', 'pipeline')\n            request: The prompt sent to the LLM\n            response: The LLM response\n            model_name: Model used (e.g., 'claude-opus-4-5-20251101')\n            tokens_input: Input token count\n            tokens_output: Output token count\n            duration_ms: Request duration in milliseconds\n            status: 'pending', 'completed', or 'error'\n            error_message: Error details if status is 'error'\n\n        Returns:\n            Protokoll record ID or None on error\n        \"\"\"\n        try:\n            conn = mysql.connector.connect(**DB_PROTOKOLL_CONFIG)\n            cursor = conn.cursor()\n\n            now = datetime.now()\n            tokens_total = tokens_input + tokens_output\n\n            cursor.execute(\n                \"\"\"INSERT INTO protokoll\n                   (request_ip, client_name, request, request_timestamp,\n                    response, response_timestamp, duration_ms,\n                    tokens_input, tokens_output, tokens_total,\n                    model_name, status, error_message)\n                   VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)\"\"\",\n                (\n                    \"127.0.0.1\",  # Local pipeline call\n                    client_name,\n                    request[:65000] if request else None,  # Truncate if too long\n                    now,\n                    response[:65000] if response else None,  # Truncate if too long\n                    now if response else None,\n                    duration_ms,\n                    tokens_input,\n                    tokens_output,\n                    tokens_total,\n                    model_name,\n                    status,\n                    error_message,\n                ),\n            )\n            conn.commit()\n            protokoll_id = cursor.lastrowid\n            cursor.close()\n            conn.close()\n            return protokoll_id\n        except Exception as e:\n            # Logging should never break the pipeline\n            print(f\"Protokoll logging error: {e}\")\n            return None\n\n    def log_provenance(\n        self,\n        artifact_type: str,\n        artifact_id: int,\n        source_type: str,\n        source_id: int | None = None,\n        pipeline_run_id: int | None = None,\n        pipeline_step: str | None = None,\n        model_used: str | None = None,\n        prompt_version: str | None = None,\n    ) -> int | None:\n        \"\"\"Log artifact provenance for traceability.\n\n        Args:\n            artifact_type: Type (document, chunk, entity, relation, statement, embedding)\n            artifact_id: ID of the artifact\n            source_type: How created (file, extraction, analysis, merge, manual)\n            source_id: Optional source artifact ID\n            pipeline_run_id: Optional pipeline run ID\n            pipeline_step: Optional step name (e.g., 'entity_extract')\n            model_used: Optional model name (e.g., 'claude-opus-4-5-20251101')\n            prompt_version: Optional prompt version\n\n        Returns:\n            Provenance record ID or None on error\n        \"\"\"\n        try:\n            cursor = self.execute(\n                \"\"\"INSERT INTO provenance\n                   (artifact_type, artifact_id, source_type, source_id,\n                    pipeline_run_id, pipeline_step, model_used, prompt_version, created_at)\n                   VALUES (%s, %s, %s, %s, %s, %s, %s, %s, NOW())\"\"\",\n                (\n                    artifact_type,\n                    artifact_id,\n                    source_type,\n                    source_id,\n                    pipeline_run_id,\n                    pipeline_step,\n                    model_used,\n                    prompt_version,\n                ),\n            )\n            self.commit()\n            provenance_id = cursor.lastrowid\n            cursor.close()\n            return provenance_id\n        except Exception as e:\n            self.log(\"WARNING\", f\"Failed to log provenance: {e}\")\n            return None\n"
  }
}
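
Note that log_provenance() calls self.execute() and self.commit(), which LoggingMixin does not define itself; the host class is expected to supply them along with the ki_content connection. A minimal sketch of how such a host might look; DatabaseMixin, DB_CONTENT_CONFIG, and Pipeline are illustrative assumptions, not part of the logged file:

# Sketch only: DatabaseMixin, DB_CONTENT_CONFIG, and Pipeline are assumed names
# illustrating the contract LoggingMixin expects from its host class.
import mysql.connector

from config import DB_CONTENT_CONFIG  # hypothetical config for ki_content
from db_logging import LoggingMixin


class DatabaseMixin:
    """Owns the ki_content connection and the execute()/commit() pair
    that LoggingMixin.log_provenance() reaches via self."""

    def __init__(self) -> None:
        self.conn = mysql.connector.connect(**DB_CONTENT_CONFIG)

    def execute(self, query: str, params: tuple = ()):
        # Return the open cursor so callers can read lastrowid before closing it
        cursor = self.conn.cursor()
        cursor.execute(query, params)
        return cursor

    def commit(self) -> None:
        self.conn.commit()


class Pipeline(DatabaseMixin, LoggingMixin):
    """Concrete pipeline class combining DB access with logging."""
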
{
  "tool_response": {
    "type": "create",
    "filePath": "\/var\/www\/scripts\/pipeline\/db_logging.py",
    "structuredPatch": [],
    "originalFile": null
  }
}
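
A usage sketch under the same assumptions (Pipeline is the hypothetical composition above; the prompt, response, token counts, and artifact ID are illustrative values, not data from the log):

import time

pipeline = Pipeline()

start = time.monotonic()
# ... the actual LLM call would happen here ...
prompt = "Extract entities from chunk 7."
answer = '[{"name": "..."}]'
duration_ms = int((time.monotonic() - start) * 1000)

# Record the LLM exchange in ki_dev.protokoll
protokoll_id = pipeline.log_to_protokoll(
    client_name="pipeline",
    request=prompt,
    response=answer,
    model_name="claude-opus-4-5-20251101",
    tokens_input=1200,
    tokens_output=350,
    duration_ms=duration_ms,
)

# Trace a hypothetical entity (ID 42) back to the extraction step that produced it
pipeline.log_provenance(
    artifact_type="entity",
    artifact_id=42,
    source_type="extraction",
    pipeline_step="entity_extract",
    model_used="claude-opus-4-5-20251101",
)

# Pipeline log entry; dict context is JSON-serialized by log()
pipeline.log("INFO", "Entity extraction finished", {"protokoll_id": protokoll_id})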