{
"tool_response": {
"filePath": "\/opt\/scripts\/pipeline\/db.py",
"oldString": "# Global database instance\ndb = Database()",
"newString": "class PipelineProgress:\n \"\"\"Tracks and updates pipeline run progress in ki_content.pipeline_runs.\"\"\"\n\n def __init__(self, run_id: int):\n self.run_id = run_id\n self.log_lines = []\n self.max_log_lines = 20\n\n def _update(self, **kwargs):\n \"\"\"Internal: Update database with progress.\"\"\"\n from datetime import datetime\n\n import mysql.connector\n\n from config import DB_CONFIG\n\n if not self.run_id:\n return\n\n kwargs[\"last_update_at\"] = datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")\n\n sets = \", \".join(f\"{k} = %s\" for k in kwargs.keys())\n values = list(kwargs.values())\n values.append(self.run_id)\n\n try:\n conn = mysql.connector.connect(**DB_CONFIG)\n cursor = conn.cursor()\n cursor.execute(f\"UPDATE pipeline_runs SET {sets} WHERE id = %s\", values)\n conn.commit()\n cursor.close()\n conn.close()\n except Exception as e:\n print(f\"Progress update error: {e}\")\n\n def update_step(self, step_name: str):\n \"\"\"Update current pipeline step.\"\"\"\n self._update(current_step=step_name)\n\n def update_document(self, filename: str):\n \"\"\"Update current document being processed.\"\"\"\n self._update(current_document=filename)\n\n def update_progress(\n self,\n processed: int = None,\n total: int = None,\n failed: int = None,\n chunks: int = None,\n embeddings: int = None,\n ):\n \"\"\"Update numeric progress fields.\"\"\"\n data = {}\n if processed is not None:\n data[\"documents_processed\"] = processed\n if total is not None:\n data[\"documents_total\"] = total\n if failed is not None:\n data[\"documents_failed\"] = failed\n if chunks is not None:\n data[\"chunks_created\"] = chunks\n if embeddings is not None:\n data[\"embeddings_created\"] = embeddings\n if data:\n self._update(**data)\n\n def add_log(self, message: str):\n \"\"\"Add log line and update log_tail.\"\"\"\n from datetime import datetime\n\n timestamp = datetime.now().strftime(\"%H:%M:%S\")\n self.log_lines.append(f\"[{timestamp}] {message}\")\n if len(self.log_lines) > self.max_log_lines:\n self.log_lines = self.log_lines[-self.max_log_lines :]\n self._update(log_tail=\"\\n\".join(self.log_lines))\n\n def complete(self, status: str = \"completed\", error: str = None):\n \"\"\"Mark run as complete or failed.\"\"\"\n from datetime import datetime\n\n data = {\n \"status\": status,\n \"completed_at\": datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\"),\n \"current_step\": None,\n \"current_document\": None,\n }\n if error:\n data[\"error_log\"] = error\n self._update(**data)\n\n def is_cancelled(self) -> bool:\n \"\"\"Check if run has been cancelled by user.\"\"\"\n import mysql.connector\n\n from config import DB_CONFIG\n\n if not self.run_id:\n return False\n\n try:\n conn = mysql.connector.connect(**DB_CONFIG)\n cursor = conn.cursor(dictionary=True)\n cursor.execute(\"SELECT status FROM pipeline_runs WHERE id = %s\", (self.run_id,))\n result = cursor.fetchone()\n cursor.close()\n conn.close()\n return result and result[\"status\"] == \"cancelled\"\n except Exception:\n return False\n\n\n# Global database instance\ndb = Database()",
"originalFile": "\"\"\"\nDatabase operations for KI-System Pipeline\n\"\"\"\n\nimport mysql.connector\nfrom mysql.connector import Error\n\nfrom config import DB_CONFIG\n\n\nclass Database:\n \"\"\"MariaDB connection wrapper with document and chunk operations.\"\"\"\n\n def __init__(self):\n self.connection = None\n\n def connect(self):\n \"\"\"Establish database connection.\"\"\"\n try:\n self.connection = mysql.connector.connect(**DB_CONFIG)\n return True\n except Error as e:\n print(f\"Database connection error: {e}\")\n return False\n\n def disconnect(self):\n \"\"\"Close database connection.\"\"\"\n if self.connection and self.connection.is_connected():\n self.connection.close()\n\n def execute(self, query, params=None):\n \"\"\"Execute a query and return the cursor.\"\"\"\n cursor = self.connection.cursor(dictionary=True)\n cursor.execute(query, params or ())\n return cursor\n\n def commit(self):\n \"\"\"Commit the current transaction.\"\"\"\n self.connection.commit()\n\n # Document Operations\n def document_exists(self, file_path):\n \"\"\"Check if document already exists.\"\"\"\n cursor = self.execute(\"SELECT id FROM documents WHERE source_path = %s\", (file_path,))\n result = cursor.fetchone()\n cursor.close()\n return result[\"id\"] if result else None\n\n def insert_document(self, file_path, title, file_type, file_size, file_hash):\n \"\"\"Insert a new document or update existing one.\"\"\"\n import os\n\n folder_path = os.path.dirname(file_path)\n cursor = self.execute(\n \"\"\"INSERT INTO documents\n (source_path, folder_path, filename, mime_type, file_size, file_hash, status)\n VALUES (%s, %s, %s, %s, %s, %s, 'processing')\n ON DUPLICATE KEY UPDATE\n file_hash = VALUES(file_hash),\n file_size = VALUES(file_size),\n status = 'processing',\n processed_at = NULL,\n error_message = NULL\"\"\",\n (file_path, folder_path, title, file_type, file_size, file_hash),\n )\n self.commit()\n doc_id = cursor.lastrowid\n # If ON DUPLICATE KEY UPDATE was triggered, lastrowid is 0\n if doc_id == 0:\n cursor_select = self.execute(\"SELECT id FROM documents WHERE source_path = %s\", (file_path,))\n result = cursor_select.fetchone()\n cursor_select.close()\n doc_id = result[\"id\"] if result else None\n cursor.close()\n return doc_id\n\n def update_document_status(self, doc_id, status, error_message=None):\n \"\"\"Update document processing status.\"\"\"\n if error_message:\n cursor = self.execute(\n \"\"\"UPDATE documents\n SET status = %s, error_message = %s, processed_at = NOW()\n WHERE id = %s\"\"\",\n (status, error_message, doc_id),\n )\n else:\n cursor = self.execute(\n \"UPDATE documents SET status = %s, processed_at = NOW() WHERE id = %s\", (status, doc_id)\n )\n self.commit()\n cursor.close()\n\n # Chunk Operations\n def insert_chunk(\n self, doc_id, chunk_index, content, heading_path, position_start=None, position_end=None, metadata=None\n ):\n \"\"\"Insert a text chunk.\"\"\"\n # Calculate token count (rough estimate: 4 chars per token)\n token_count = len(content) \/\/ 4\n cursor = self.execute(\n \"\"\"INSERT INTO chunks\n (document_id, chunk_index, content, token_count, heading_path, metadata)\n VALUES (%s, %s, %s, %s, %s, %s)\"\"\",\n (doc_id, chunk_index, content, token_count, heading_path, metadata),\n )\n self.commit()\n chunk_id = cursor.lastrowid\n cursor.close()\n return chunk_id\n\n def get_chunks_for_embedding(self, limit=100):\n \"\"\"Get chunks that need embeddings.\"\"\"\n cursor = self.execute(\n \"\"\"SELECT c.id, c.content, c.document_id\n FROM chunks c\n WHERE c.qdrant_id IS NULL\n ORDER BY c.created_at\n LIMIT %s\"\"\",\n (limit,),\n )\n results = cursor.fetchall()\n cursor.close()\n return results\n\n def update_chunk_qdrant_id(self, chunk_id, qdrant_id):\n \"\"\"Update chunk with Qdrant point ID.\"\"\"\n cursor = self.execute(\"UPDATE chunks SET qdrant_id = %s WHERE id = %s\", (qdrant_id, chunk_id))\n self.commit()\n cursor.close()\n\n # Queue Operations\n def add_to_queue(self, file_path, action=\"process\"):\n \"\"\"Add item to pipeline queue.\"\"\"\n cursor = self.execute(\n \"\"\"INSERT INTO pipeline_queue\n (file_path, action, status, retry_count, created_at)\n VALUES (%s, %s, 'pending', 0, NOW())\"\"\",\n (file_path, action),\n )\n self.commit()\n queue_id = cursor.lastrowid\n cursor.close()\n return queue_id\n\n def get_pending_queue_items(self, limit=10):\n \"\"\"Get pending items from queue.\"\"\"\n cursor = self.execute(\n \"\"\"SELECT id, file_path, action, retry_count\n FROM pipeline_queue\n WHERE status = 'pending'\n ORDER BY created_at\n LIMIT %s\"\"\",\n (limit,),\n )\n results = cursor.fetchall()\n cursor.close()\n return results\n\n def update_queue_status(self, queue_id, status, error_message=None):\n \"\"\"Update queue item status.\"\"\"\n if status == \"error\" and error_message:\n cursor = self.execute(\n \"\"\"UPDATE pipeline_queue\n SET status = %s, error_message = %s,\n retry_count = retry_count + 1, updated_at = NOW()\n WHERE id = %s\"\"\",\n (status, error_message, queue_id),\n )\n else:\n cursor = self.execute(\n \"UPDATE pipeline_queue SET status = %s, updated_at = NOW() WHERE id = %s\", (status, queue_id)\n )\n self.commit()\n cursor.close()\n\n # Logging\n def log(self, level, message, context=None):\n \"\"\"Write to pipeline log (ki_dev database).\"\"\"\n import json\n\n import mysql.connector\n\n from config import DB_LOG_CONFIG\n\n try:\n # Context must be valid JSON\n if context is not None:\n if isinstance(context, str):\n context = json.dumps({\"info\": context})\n elif isinstance(context, dict):\n context = json.dumps(context)\n else:\n context = json.dumps({\"data\": str(context)})\n\n # Use separate connection to ki_dev for logging\n log_conn = mysql.connector.connect(**DB_LOG_CONFIG)\n cursor = log_conn.cursor()\n cursor.execute(\n \"\"\"INSERT INTO pipeline_log\n (level, message, context, created_at)\n VALUES (%s, %s, %s, NOW())\"\"\",\n (level, message, context),\n )\n log_conn.commit()\n cursor.close()\n log_conn.close()\n except Exception: # noqa: S110\n # Logging should never break the pipeline\n pass\n\n # Prompts\n def get_prompt(self, name, version=None):\n \"\"\"Get a prompt by name (optionally specific version).\"\"\"\n if version:\n cursor = self.execute(\"SELECT content FROM prompts WHERE name = %s AND version = %s\", (name, version))\n else:\n cursor = self.execute(\n \"\"\"SELECT content FROM prompts\n WHERE name = %s AND is_active = 1\n ORDER BY version DESC LIMIT 1\"\"\",\n (name,),\n )\n result = cursor.fetchone()\n cursor.close()\n return result[\"content\"] if result else None\n\n # Protokoll Logging (LLM calls)\n def log_to_protokoll(\n self,\n client_name,\n request,\n response=None,\n model_name=None,\n tokens_input=0,\n tokens_output=0,\n duration_ms=0,\n status=\"completed\",\n error_message=None,\n ):\n \"\"\"\n Log LLM call to ki_dev.protokoll table.\n\n Args:\n client_name: Caller identifier (e.g., 'content-studio', 'pipeline')\n request: The prompt sent to the LLM\n response: The LLM response\n model_name: Model used (e.g., 'claude-opus-4-5-20251101')\n tokens_input: Input token count\n tokens_output: Output token count\n duration_ms: Request duration in milliseconds\n status: 'pending', 'completed', or 'error'\n error_message: Error details if status is 'error'\n \"\"\"\n from datetime import datetime\n\n import mysql.connector\n\n from config import DB_PROTOKOLL_CONFIG\n\n try:\n conn = mysql.connector.connect(**DB_PROTOKOLL_CONFIG)\n cursor = conn.cursor()\n\n now = datetime.now()\n tokens_total = tokens_input + tokens_output\n\n cursor.execute(\n \"\"\"INSERT INTO protokoll\n (request_ip, client_name, request, request_timestamp,\n response, response_timestamp, duration_ms,\n tokens_input, tokens_output, tokens_total,\n model_name, status, error_message)\n VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)\"\"\",\n (\n \"127.0.0.1\", # Local pipeline call\n client_name,\n request[:65000] if request else None, # Truncate if too long\n now,\n response[:65000] if response else None, # Truncate if too long\n now if response else None,\n duration_ms,\n tokens_input,\n... [TRUNCATED-8c65cd83e3db3f82]",
"structuredPatch": [
{
"oldStart": 298,
"oldLines": 5,
"newStart": 298,
"newLines": 116,
"lines": [
" return None",
" ",
" ",
"+class PipelineProgress:",
"+ \"\"\"Tracks and updates pipeline run progress in ki_content.pipeline_runs.\"\"\"",
"+",
"+ def __init__(self, run_id: int):",
"+ self.run_id = run_id",
"+ self.log_lines = []",
"+ self.max_log_lines = 20",
"+",
"+ def _update(self, **kwargs):",
"+ \"\"\"Internal: Update database with progress.\"\"\"",
"+ from datetime import datetime",
"+",
"+ import mysql.connector",
"+",
"+ from config import DB_CONFIG",
"+",
"+ if not self.run_id:",
"+ return",
"+",
"+ kwargs[\"last_update_at\"] = datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")",
"+",
"+ sets = \", \".join(f\"{k} = %s\" for k in kwargs.keys())",
"+ values = list(kwargs.values())",
"+ values.append(self.run_id)",
"+",
"+ try:",
"+ conn = mysql.connector.connect(**DB_CONFIG)",
"+ cursor = conn.cursor()",
"+ cursor.execute(f\"UPDATE pipeline_runs SET {sets} WHERE id = %s\", values)",
"+ conn.commit()",
"+ cursor.close()",
"+ conn.close()",
"+ except Exception as e:",
"+ print(f\"Progress update error: {e}\")",
"+",
"+ def update_step(self, step_name: str):",
"+ \"\"\"Update current pipeline step.\"\"\"",
"+ self._update(current_step=step_name)",
"+",
"+ def update_document(self, filename: str):",
"+ \"\"\"Update current document being processed.\"\"\"",
"+ self._update(current_document=filename)",
"+",
"+ def update_progress(",
"+ self,",
"+ processed: int = None,",
"+ total: int = None,",
"+ failed: int = None,",
"+ chunks: int = None,",
"+ embeddings: int = None,",
"+ ):",
"+ \"\"\"Update numeric progress fields.\"\"\"",
"+ data = {}",
"+ if processed is not None:",
"+ data[\"documents_processed\"] = processed",
"+ if total is not None:",
"+ data[\"documents_total\"] = total",
"+ if failed is not None:",
"+ data[\"documents_failed\"] = failed",
"+ if chunks is not None:",
"+ data[\"chunks_created\"] = chunks",
"+ if embeddings is not None:",
"+ data[\"embeddings_created\"] = embeddings",
"+ if data:",
"+ self._update(**data)",
"+",
"+ def add_log(self, message: str):",
"+ \"\"\"Add log line and update log_tail.\"\"\"",
"+ from datetime import datetime",
"+",
"+ timestamp = datetime.now().strftime(\"%H:%M:%S\")",
"+ self.log_lines.append(f\"[{timestamp}] {message}\")",
"+ if len(self.log_lines) > self.max_log_lines:",
"+ self.log_lines = self.log_lines[-self.max_log_lines :]",
"+ self._update(log_tail=\"\\n\".join(self.log_lines))",
"+",
"+ def complete(self, status: str = \"completed\", error: str = None):",
"+ \"\"\"Mark run as complete or failed.\"\"\"",
"+ from datetime import datetime",
"+",
"+ data = {",
"+ \"status\": status,",
"+ \"completed_at\": datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\"),",
"+ \"current_step\": None,",
"+ \"current_document\": None,",
"+ }",
"+ if error:",
"+ data[\"error_log\"] = error",
"+ self._update(**data)",
"+",
"+ def is_cancelled(self) -> bool:",
"+ \"\"\"Check if run has been cancelled by user.\"\"\"",
"+ import mysql.connector",
"+",
"+ from config import DB_CONFIG",
"+",
"+ if not self.run_id:",
"+ return False",
"+",
"+ try:",
"+ conn = mysql.connector.connect(**DB_CONFIG)",
"+ cursor = conn.cursor(dictionary=True)",
"+ cursor.execute(\"SELECT status FROM pipeline_runs WHERE id = %s\", (self.run_id,))",
"+ result = cursor.fetchone()",
"+ cursor.close()",
"+ conn.close()",
"+ return result and result[\"status\"] == \"cancelled\"",
"+ except Exception:",
"+ return False",
"+",
"+",
" # Global database instance",
" db = Database()"
]
}
],
"userModified": false,
"replaceAll": false
}
}