{
"tool_response": {
"type": "text",
"file": {
"filePath": "\/var\/www\/scripts\/pipeline\/db.py",
"content": "\"\"\"\nDatabase operations for KI-System Pipeline\n\"\"\"\n\nimport mysql.connector\nfrom mysql.connector import Error\n\nfrom config import DB_CONFIG\nfrom constants import DEFAULT_LIMIT\n\n\nclass Database:\n \"\"\"MariaDB connection wrapper with document and chunk operations.\"\"\"\n\n def __init__(self):\n self.connection = None\n\n def connect(self):\n \"\"\"Establish database connection.\"\"\"\n try:\n self.connection = mysql.connector.connect(**DB_CONFIG)\n return True\n except Error as e:\n print(f\"Database connection error: {e}\")\n return False\n\n def disconnect(self):\n \"\"\"Close database connection.\"\"\"\n if self.connection and self.connection.is_connected():\n self.connection.close()\n\n def execute(self, query, params=None):\n \"\"\"Execute a query and return the cursor.\"\"\"\n cursor = self.connection.cursor(dictionary=True)\n cursor.execute(query, params or ())\n return cursor\n\n def commit(self):\n \"\"\"Commit the current transaction.\"\"\"\n self.connection.commit()\n\n # Document Operations\n def document_exists(self, file_path):\n \"\"\"Check if document already exists.\"\"\"\n cursor = self.execute(\"SELECT id FROM documents WHERE source_path = %s\", (file_path,))\n result = cursor.fetchone()\n cursor.close()\n return result[\"id\"] if result else None\n\n def document_is_done(self, file_path):\n \"\"\"Check if document is already fully processed (status='done').\"\"\"\n cursor = self.execute(\"SELECT id, status FROM documents WHERE source_path = %s\", (file_path,))\n result = cursor.fetchone()\n cursor.close()\n if result and result[\"status\"] == \"done\":\n return result[\"id\"]\n return None\n\n def insert_document(self, file_path, title, file_type, file_size, file_hash):\n \"\"\"Insert a new document or update existing one.\"\"\"\n import os\n\n folder_path = os.path.dirname(file_path)\n cursor = self.execute(\n \"\"\"INSERT INTO documents\n (source_path, folder_path, filename, mime_type, file_size, file_hash, status)\n VALUES (%s, %s, %s, %s, %s, %s, 'processing')\n ON DUPLICATE KEY UPDATE\n file_hash = VALUES(file_hash),\n file_size = VALUES(file_size),\n status = 'processing',\n processed_at = NULL,\n error_message = NULL\"\"\",\n (file_path, folder_path, title, file_type, file_size, file_hash),\n )\n self.commit()\n doc_id = cursor.lastrowid\n # If ON DUPLICATE KEY UPDATE was triggered, lastrowid is 0\n if doc_id == 0:\n cursor_select = self.execute(\"SELECT id FROM documents WHERE source_path = %s\", (file_path,))\n result = cursor_select.fetchone()\n cursor_select.close()\n doc_id = result[\"id\"] if result else None\n cursor.close()\n return doc_id\n\n def update_document_status(self, doc_id, status, error_message=None):\n \"\"\"Update document processing status.\"\"\"\n if error_message:\n cursor = self.execute(\n \"\"\"UPDATE documents\n SET status = %s, error_message = %s, processed_at = NOW()\n WHERE id = %s\"\"\",\n (status, error_message, doc_id),\n )\n else:\n cursor = self.execute(\n \"UPDATE documents SET status = %s, processed_at = NOW() WHERE id = %s\", (status, doc_id)\n )\n self.commit()",
"numLines": 100,
"startLine": 1,
"totalLines": 834
}
}
}