{
"event": "PreToolUse",
"tool_name": "Read",
"tool_input": {
"file_path": "\/var\/www\/migration\/content-pipeline\/src\/pipeline.py"
}
}
{
"tool_response": {
"type": "text",
"file": {
"filePath": "\/var\/www\/migration\/content-pipeline\/src\/db.py",
"content": "\"\"\"\nDatabase operations for KI-System Pipeline\n\nComposition layer that combines all database mixins into a single Database class.\nMaintains full backward compatibility with existing imports.\n\nArchitecture:\n db_core.py - Connection management (DatabaseCore)\n db_documents.py - Document\/Page\/Chunk CRUD (DocumentsMixin)\n db_queue.py - Pipeline queue operations (QueueMixin)\n db_logging.py - Logging operations (LoggingMixin)\n db_semantic.py - Semantic operations (SemanticMixin)\n db_prompts.py - Prompt operations (PromptsMixin)\n\"\"\"\n\nfrom datetime import datetime\n\nimport mysql.connector\n\nfrom config import DB_CONFIG\nfrom db_core import DatabaseCore\nfrom db_documents import DocumentsMixin\nfrom db_logging import LoggingMixin\nfrom db_prompts import PromptsMixin\nfrom db_queue import QueueMixin\nfrom db_semantic import SemanticMixin\n\n\nclass Database(\n DatabaseCore,\n DocumentsMixin,\n QueueMixin,\n LoggingMixin,\n SemanticMixin,\n PromptsMixin,\n):\n \"\"\"MariaDB connection wrapper with all database operations.\n\n Combines all mixins via multiple inheritance (MRO):\n - DatabaseCore: connect, disconnect, execute, commit\n - DocumentsMixin: document\/page\/chunk CRUD\n - QueueMixin: pipeline queue operations\n - LoggingMixin: log, log_to_protokoll, log_provenance\n - SemanticMixin: entity types, stopwords, taxonomy\n - PromptsMixin: prompt retrieval\n \"\"\"\n\n pass\n\n\nclass PipelineProgress:\n \"\"\"Tracks and updates pipeline run progress in ki_content.pipeline_runs.\"\"\"\n\n def __init__(self, run_id: int):\n self.run_id = run_id\n self.log_lines = []\n self.max_log_lines = 200 # Keep full log history\n\n def _update(self, **kwargs):\n \"\"\"Internal: Update database with progress.\"\"\"\n if not self.run_id:\n return\n\n kwargs[\"last_update_at\"] = datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")\n\n sets = \", \".join(f\"{k} = %s\" for k in kwargs)\n values = list(kwargs.values())\n values.append(self.run_id)\n\n try:\n conn = mysql.connector.connect(**DB_CONFIG)\n cursor = conn.cursor()\n cursor.execute(f\"UPDATE pipeline_runs SET {sets} WHERE id = %s\", values) # noqa: S608\n conn.commit()\n cursor.close()\n conn.close()\n except Exception as e:\n print(f\"Progress update error: {e}\")\n\n def update_step(self, step_name: str):\n \"\"\"Update current pipeline step.\"\"\"\n self._update(current_step=step_name)\n\n def update_document(self, filename: str):\n \"\"\"Update current document being processed.\"\"\"\n self._update(current_document=filename)\n\n def update_progress(\n self,\n processed: int = None,\n total: int = None,\n failed: int = None,\n chunks: int = None,\n embeddings: int = None,\n ):\n \"\"\"Update numeric progress fields.\"\"\"\n data = {}\n if processed is not None:\n data[\"documents_processed\"] = processed\n if total is not None:\n data[\"documents_total\"] = total\n if failed is not None:\n data[\"documents_failed\"] = failed\n if chunks is not None:\n data[\"chunks_created\"] = chunks\n if embeddings is not None:\n data[\"embeddings_created\"] = embeddings\n if data:\n self._update(**data)\n\n def add_log(self, message: str):\n \"\"\"Add log line and update log_tail.\"\"\"\n timestamp = datetime.now().strftime(\"%H:%M:%S\")\n self.log_lines.append(f\"[{timestamp}] {message}\")\n if len(self.log_lines) > self.max_log_lines:\n self.log_lines = self.log_lines[-self.max_log_lines :]\n self._update(log_tail=\"\\n\".join(self.log_lines))\n\n def complete(self, status: str = \"completed\", error: str = None):\n \"\"\"Mark run as complete or failed.\"\"\"\n data = {\n \"status\": status,\n \"completed_at\": datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\"),\n \"current_step\": None,\n \"current_document\": None,\n }\n if error:\n data[\"error_log\"] = error\n self._update(**data)\n\n def is_cancelled(self) -> bool:\n \"\"\"Check if run has been cancelled by user.\"\"\"\n if not self.run_id:\n return False\n\n try:\n conn = mysql.connector.connect(**DB_CONFIG)\n cursor = conn.cursor(dictionary=True)\n cursor.execute(\n \"SELECT status FROM pipeline_runs WHERE id = %s\",\n (self.run_id,),\n )\n result = cursor.fetchone()\n cursor.close()\n conn.close()\n return result and result[\"status\"] == \"cancelled\"\n except Exception:\n return False\n\n\n# Global database instance for backward compatibility\ndb = Database()\n",
"numLines": 153,
"startLine": 1,
"totalLines": 153
}
}
}