Protokoll #27409

ID: 27409
Zeitstempel: 2025-12-28 09:00:20.487587
Client: root
IP: 145.224.72.140
Modell: claude-sonnet-4-20250514
Status: completed
Tokens: 13,237 (Input: 1,308, Output: 11,929)
Dauer: 96 ms
Request-Zeit: 2025-12-28 09:00:20.487587
Response-Zeit: 2025-12-28 09:00:20.583301

Request

{
    "event": "PreToolUse",
    "tool_name": "Write",
    "tool_input": {
        "file_path": "\/var\/www\/scripts\/pipeline\/db.py",
        "content": "\"\"\"\nDatabase operations for KI-System Pipeline\n\nComposition layer that combines all database mixins into a single Database class.\nMaintains full backward compatibility with existing imports.\n\nArchitecture:\n    db_core.py      - Connection management (DatabaseCore)\n    db_documents.py - Document\/Page\/Chunk CRUD (DocumentsMixin)\n    db_queue.py     - Pipeline queue operations (QueueMixin)\n    db_logging.py   - Logging operations (LoggingMixin)\n    db_semantic.py  - Semantic operations (SemanticMixin)\n    db_prompts.py   - Prompt operations (PromptsMixin)\n\"\"\"\n\nfrom datetime import datetime\n\nimport mysql.connector\n\nfrom config import DB_CONFIG\nfrom db_core import DatabaseCore\nfrom db_documents import DocumentsMixin\nfrom db_logging import LoggingMixin\nfrom db_prompts import PromptsMixin\nfrom db_queue import QueueMixin\nfrom db_semantic import SemanticMixin\n\n\nclass Database(\n    DatabaseCore,\n    DocumentsMixin,\n    QueueMixin,\n    LoggingMixin,\n    SemanticMixin,\n    PromptsMixin,\n):\n    \"\"\"MariaDB connection wrapper with all database operations.\n\n    Combines all mixins via multiple inheritance (MRO):\n    - DatabaseCore: connect, disconnect, execute, commit\n    - DocumentsMixin: document\/page\/chunk CRUD\n    - QueueMixin: pipeline queue operations\n    - LoggingMixin: log, log_to_protokoll, log_provenance\n    - SemanticMixin: entity types, stopwords, taxonomy\n    - PromptsMixin: prompt retrieval\n    \"\"\"\n\n    pass\n\n\nclass PipelineProgress:\n    \"\"\"Tracks and updates pipeline run progress in ki_content.pipeline_runs.\"\"\"\n\n    def __init__(self, run_id: int):\n        self.run_id = run_id\n        self.log_lines = []\n        self.max_log_lines = 200  # Keep full log history\n\n    def _update(self, **kwargs):\n        \"\"\"Internal: Update database with progress.\"\"\"\n        if not self.run_id:\n            return\n\n        kwargs[\"last_update_at\"] = 
datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")\n\n        sets = \", \".join(f\"{k} = %s\" for k in kwargs)\n        values = list(kwargs.values())\n        values.append(self.run_id)\n\n        try:\n            conn = mysql.connector.connect(**DB_CONFIG)\n            cursor = conn.cursor()\n            cursor.execute(f\"UPDATE pipeline_runs SET {sets} WHERE id = %s\", values)  # noqa: S608\n            conn.commit()\n            cursor.close()\n            conn.close()\n        except Exception as e:\n            print(f\"Progress update error: {e}\")\n\n    def update_step(self, step_name: str):\n        \"\"\"Update current pipeline step.\"\"\"\n        self._update(current_step=step_name)\n\n    def update_document(self, filename: str):\n        \"\"\"Update current document being processed.\"\"\"\n        self._update(current_document=filename)\n\n    def update_progress(\n        self,\n        processed: int = None,\n        total: int = None,\n        failed: int = None,\n        chunks: int = None,\n        embeddings: int = None,\n    ):\n        \"\"\"Update numeric progress fields.\"\"\"\n        data = {}\n        if processed is not None:\n            data[\"documents_processed\"] = processed\n        if total is not None:\n            data[\"documents_total\"] = total\n        if failed is not None:\n            data[\"documents_failed\"] = failed\n        if chunks is not None:\n            data[\"chunks_created\"] = chunks\n        if embeddings is not None:\n            data[\"embeddings_created\"] = embeddings\n        if data:\n            self._update(**data)\n\n    def add_log(self, message: str):\n        \"\"\"Add log line and update log_tail.\"\"\"\n        timestamp = datetime.now().strftime(\"%H:%M:%S\")\n        self.log_lines.append(f\"[{timestamp}] {message}\")\n        if len(self.log_lines) > self.max_log_lines:\n            self.log_lines = self.log_lines[-self.max_log_lines:]\n        
self._update(log_tail=\"\\n\".join(self.log_lines))\n\n    def complete(self, status: str = \"completed\", error: str = None):\n        \"\"\"Mark run as complete or failed.\"\"\"\n        data = {\n            \"status\": status,\n            \"completed_at\": datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\"),\n            \"current_step\": None,\n            \"current_document\": None,\n        }\n        if error:\n            data[\"error_log\"] = error\n        self._update(**data)\n\n    def is_cancelled(self) -> bool:\n        \"\"\"Check if run has been cancelled by user.\"\"\"\n        if not self.run_id:\n            return False\n\n        try:\n            conn = mysql.connector.connect(**DB_CONFIG)\n            cursor = conn.cursor(dictionary=True)\n            cursor.execute(\n                \"SELECT status FROM pipeline_runs WHERE id = %s\",\n                (self.run_id,),\n            )\n            result = cursor.fetchone()\n            cursor.close()\n            conn.close()\n            return result and result[\"status\"] == \"cancelled\"\n        except Exception:\n            return False\n\n\n# Global database instance for backward compatibility\ndb = Database()\n"
    }
}

Response

{
    "tool_response": {
        "type": "update",
        "filePath": "\/var\/www\/scripts\/pipeline\/db.py",
        "content": "\"\"\"\nDatabase operations for KI-System Pipeline\n\nComposition layer that combines all database mixins into a single Database class.\nMaintains full backward compatibility with existing imports.\n\nArchitecture:\n    db_core.py      - Connection management (DatabaseCore)\n    db_documents.py - Document\/Page\/Chunk CRUD (DocumentsMixin)\n    db_queue.py     - Pipeline queue operations (QueueMixin)\n    db_logging.py   - Logging operations (LoggingMixin)\n    db_semantic.py  - Semantic operations (SemanticMixin)\n    db_prompts.py   - Prompt operations (PromptsMixin)\n\"\"\"\n\nfrom datetime import datetime\n\nimport mysql.connector\n\nfrom config import DB_CONFIG\nfrom db_core import DatabaseCore\nfrom db_documents import DocumentsMixin\nfrom db_logging import LoggingMixin\nfrom db_prompts import PromptsMixin\nfrom db_queue import QueueMixin\nfrom db_semantic import SemanticMixin\n\n\nclass Database(\n    DatabaseCore,\n    DocumentsMixin,\n    QueueMixin,\n    LoggingMixin,\n    SemanticMixin,\n    PromptsMixin,\n):\n    \"\"\"MariaDB connection wrapper with all database operations.\n\n    Combines all mixins via multiple inheritance (MRO):\n    - DatabaseCore: connect, disconnect, execute, commit\n    - DocumentsMixin: document\/page\/chunk CRUD\n    - QueueMixin: pipeline queue operations\n    - LoggingMixin: log, log_to_protokoll, log_provenance\n    - SemanticMixin: entity types, stopwords, taxonomy\n    - PromptsMixin: prompt retrieval\n    \"\"\"\n\n    pass\n\n\nclass PipelineProgress:\n    \"\"\"Tracks and updates pipeline run progress in ki_content.pipeline_runs.\"\"\"\n\n    def __init__(self, run_id: int):\n        self.run_id = run_id\n        self.log_lines = []\n        self.max_log_lines = 200  # Keep full log history\n\n    def _update(self, **kwargs):\n        \"\"\"Internal: Update database with progress.\"\"\"\n        if not self.run_id:\n            return\n\n        kwargs[\"last_update_at\"] = 
datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")\n\n        sets = \", \".join(f\"{k} = %s\" for k in kwargs)\n        values = list(kwargs.values())\n        values.append(self.run_id)\n\n        try:\n            conn = mysql.connector.connect(**DB_CONFIG)\n            cursor = conn.cursor()\n            cursor.execute(f\"UPDATE pipeline_runs SET {sets} WHERE id = %s\", values)  # noqa: S608\n            conn.commit()\n            cursor.close()\n            conn.close()\n        except Exception as e:\n            print(f\"Progress update error: {e}\")\n\n    def update_step(self, step_name: str):\n        \"\"\"Update current pipeline step.\"\"\"\n        self._update(current_step=step_name)\n\n    def update_document(self, filename: str):\n        \"\"\"Update current document being processed.\"\"\"\n        self._update(current_document=filename)\n\n    def update_progress(\n        self,\n        processed: int = None,\n        total: int = None,\n        failed: int = None,\n        chunks: int = None,\n        embeddings: int = None,\n    ):\n        \"\"\"Update numeric progress fields.\"\"\"\n        data = {}\n        if processed is not None:\n            data[\"documents_processed\"] = processed\n        if total is not None:\n            data[\"documents_total\"] = total\n        if failed is not None:\n            data[\"documents_failed\"] = failed\n        if chunks is not None:\n            data[\"chunks_created\"] = chunks\n        if embeddings is not None:\n            data[\"embeddings_created\"] = embeddings\n        if data:\n            self._update(**data)\n\n    def add_log(self, message: str):\n        \"\"\"Add log line and update log_tail.\"\"\"\n        timestamp = datetime.now().strftime(\"%H:%M:%S\")\n        self.log_lines.append(f\"[{timestamp}] {message}\")\n        if len(self.log_lines) > self.max_log_lines:\n            self.log_lines = self.log_lines[-self.max_log_lines:]\n        
self._update(log_tail=\"\\n\".join(self.log_lines))\n\n    def complete(self, status: str = \"completed\", error: str = None):\n        \"\"\"Mark run as complete or failed.\"\"\"\n        data = {\n            \"status\": status,\n            \"completed_at\": datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\"),\n            \"current_step\": None,\n            \"current_document\": None,\n        }\n        if error:\n            data[\"error_log\"] = error\n        self._update(**data)\n\n    def is_cancelled(self) -> bool:\n        \"\"\"Check if run has been cancelled by user.\"\"\"\n        if not self.run_id:\n            return False\n\n        try:\n            conn = mysql.connector.connect(**DB_CONFIG)\n            cursor = conn.cursor(dictionary=True)\n            cursor.execute(\n                \"SELECT status FROM pipeline_runs WHERE id = %s\",\n                (self.run_id,),\n            )\n            result = cursor.fetchone()\n            cursor.close()\n            conn.close()\n            return result and result[\"status\"] == \"cancelled\"\n        except Exception:\n            return False\n\n\n# Global database instance for backward compatibility\ndb = Database()\n",
        "structuredPatch": [
            {
                "oldStart": 1,
                "oldLines": 723,
                "newStart": 1,
                "newLines": 53,
                "lines": [
                    " \"\"\"",
                    " Database operations for KI-System Pipeline",
                    "+",
                    "+Composition layer that combines all database mixins into a single Database class.",
                    "+Maintains full backward compatibility with existing imports.",
                    "+",
                    "+Architecture:",
                    "+    db_core.py      - Connection management (DatabaseCore)",
                    "+    db_documents.py - Document\/Page\/Chunk CRUD (DocumentsMixin)",
                    "+    db_queue.py     - Pipeline queue operations (QueueMixin)",
                    "+    db_logging.py   - Logging operations (LoggingMixin)",
                    "+    db_semantic.py  - Semantic operations (SemanticMixin)",
                    "+    db_prompts.py   - Prompt operations (PromptsMixin)",
                    " \"\"\"",
                    " ",
                    "+from datetime import datetime",
                    "+",
                    " import mysql.connector",
                    "-from mysql.connector import Error",
                    " ",
                    " from config import DB_CONFIG",
                    "-from constants import DEFAULT_LIMIT",
                    "+from db_core import DatabaseCore",
                    "+from db_documents import DocumentsMixin",
                    "+from db_logging import LoggingMixin",
                    "+from db_prompts import PromptsMixin",
                    "+from db_queue import QueueMixin",
                    "+from db_semantic import SemanticMixin",
                    " ",
                    " ",
                    "-class Database:",
                    "-    \"\"\"MariaDB connection wrapper with document and chunk operations.\"\"\"",
                    "+class Database(",
                    "+    DatabaseCore,",
                    "+    DocumentsMixin,",
                    "+    QueueMixin,",
                    "+    LoggingMixin,",
                    "+    SemanticMixin,",
                    "+    PromptsMixin,",
                    "+):",
                    "+    \"\"\"MariaDB connection wrapper with all database operations.",
                    " ",
                    "-    def __init__(self):",
                    "-        self.connection = None",
                    "+    Combines all mixins via multiple inheritance (MRO):",
                    "+    - DatabaseCore: connect, disconnect, execute, commit",
                    "+    - DocumentsMixin: document\/page\/chunk CRUD",
                    "+    - QueueMixin: pipeline queue operations",
                    "+    - LoggingMixin: log, log_to_protokoll, log_provenance",
                    "+    - SemanticMixin: entity types, stopwords, taxonomy",
                    "+    - PromptsMixin: prompt retrieval",
                    "+    \"\"\"",
                    " ",
                    "-    def connect(self):",
                    "-        \"\"\"Establish database connection.\"\"\"",
                    "-        try:",
                    "-            self.connection = mysql.connector.connect(**DB_CONFIG)",
                    "-            return True",
                    "-        except Error as e:",
                    "-            print(f\"Database connection error: {e}\")",
                    "-            return False",
                    "+    pass",
                    " ",
                    "-    def disconnect(self):",
                    "-        \"\"\"Close database connection.\"\"\"",
                    "-        if self.connection and self.connection.is_connected():",
                    "-            self.connection.close()",
                    " ",
                    "-    def execute(self, query, params=None):",
                    "-        \"\"\"Execute a query and return the cursor.\"\"\"",
                    "-        cursor = self.connection.cursor(dictionary=True)",
                    "-        cursor.execute(query, params or ())",
                    "-        return cursor",
                    "-",
                    "-    def commit(self):",
                    "-        \"\"\"Commit the current transaction.\"\"\"",
                    "-        self.connection.commit()",
                    "-",
                    "-    # Document Operations",
                    "-    def document_exists(self, file_path):",
                    "-        \"\"\"Check if document already exists.\"\"\"",
                    "-        cursor = self.execute(\"SELECT id FROM documents WHERE source_path = %s\", (file_path,))",
                    "-        result = cursor.fetchone()",
                    "-        cursor.close()",
                    "-        return result[\"id\"] if result else None",
                    "-",
                    "-    def document_is_done(self, file_path):",
                    "-        \"\"\"Check if document is already fully processed (status='done').\"\"\"",
                    "-        cursor = self.execute(\"SELECT id, status FROM documents WHERE source_path = %s\", (file_path,))",
                    "-        result = cursor.fetchone()",
                    "-        cursor.close()",
                    "-        if result and result[\"status\"] == \"done\":",
                    "-            return result[\"id\"]",
                    "-        return None",
                    "-",
                    "-    def insert_document(self, file_path, title, file_type, file_size, file_hash):",
                    "-        \"\"\"Insert a new document or update existing one.\"\"\"",
                    "-        import os",
                    "-",
                    "-        folder_path = os.path.dirname(file_path)",
                    "-        cursor = self.execute(",
                    "-            \"\"\"INSERT INTO documents",
                    "-               (source_path, folder_path, filename, mime_type, file_size, file_hash, status)",
                    "-               VALUES (%s, %s, %s, %s, %s, %s, 'processing')",
                    "-               ON DUPLICATE KEY UPDATE",
                    "-               file_hash = VALUES(file_hash),",
                    "-               file_size = VALUES(file_size),",
                    "-               status = 'processing',",
                    "-               processed_at = NULL,",
                    "-               error_message = NULL\"\"\",",
                    "-            (file_path, folder_path, title, file_type, file_size, file_hash),",
                    "-        )",
                    "-        self.commit()",
                    "-        doc_id = cursor.lastrowid",
                    "-        # If ON DUPLICATE KEY UPDATE was triggered, lastrowid is 0",
                    "-        if doc_id == 0:",
                    "-            cursor_select = self.execute(\"SELECT id FROM documents WHERE source_path = %s\", (file_path,))",
                    "-            result = cursor_select.fetchone()",
                    "-            cursor_select.close()",
                    "-            doc_id = result[\"id\"] if result else None",
                    "-        cursor.close()",
                    "-        return doc_id",
                    "-",
                    "-    def update_document_status(self, doc_id, status, error_message=None):",
                    "-        \"\"\"Update document processing status.\"\"\"",
                    "-        if error_message:",
                    "-            cursor = self.execute(",
                    "-                \"\"\"UPDATE documents",
                    "-                   SET status = %s, error_message = %s, processed_at = NOW()",
                    "-                   WHERE id = %s\"\"\",",
                    "-                (status, error_message, doc_id),",
                    "-            )",
                    "-        else:",
                    "-            cursor = self.execute(",
                    "-                \"UPDATE documents SET status = %s, processed_at = NOW() WHERE id = %s\", (status, doc_id)",
                    "-            )",
                    "-        self.commit()",
                    "-        cursor.close()",
                    "-",
                    "-    # Page Operations",
                    "-    def insert_page(self, doc_id, page_number, text_content, token_count=None):",
                    "-        \"\"\"Insert a document page.\"\"\"",
                    "-        if token_count is None:",
                    "-            token_count = len(text_content.split()) if text_content else 0",
                    "-        cursor = self.execute(",
                    "-            \"\"\"INSERT INTO document_pages",
                    "-               (document_id, page_number, text_content, token_count, created_at)",
                    "-               VALUES (%s, %s, %s, %s, NOW())",
                    "-               ON DUPLICATE KEY UPDATE",
                    "-               text_content = VALUES(text_content),",
                    "-               token_count = VALUES(token_count)\"\"\",",
                    "-            (doc_id, page_number, text_content, token_count),",
                    "-        )",
                    "-        self.commit()",
                    "-        page_id = cursor.lastrowid",
                    "-        if page_id == 0:",
                    "-            cursor_select = self.execute(",
                    "-                \"SELECT id FROM document_pages WHERE document_id = %s AND page_number = %s\",",
                    "-                (doc_id, page_number),",
                    "-            )",
                    "-            result = cursor_select.fetchone()",
                    "-            cursor_select.close()",
                    "-            page_id = result[\"id\"] if result else None",
                    "-        cursor.close()",
                    "-        return page_id",
                    "-",
                    "-    def get_page_id(self, doc_id, page_number):",
                    "-        \"\"\"Get page ID by document and page number.\"\"\"",
                    "-        cursor = self.execute(",
                    "-            \"SELECT id FROM document_pages WHERE document_id = %s AND page_number = %s\",",
                    "-            (doc_id, page_number),",
                    "-        )",
                    "-        result = cursor.fetchone()",
                    "-        cursor.close()",
                    "-        return result[\"id\"] if result else None",
                    "-",
                    "-    # Chunk Operations",
                    "-    def insert_chunk(",
                    "-        self,",
                    "-        doc_id,",
                    "-        chunk_index,",
                    "-        content,",
                    "-        heading_path,",
                    "-        position_start=None,",
                    "-        position_end=None,",
                    "-        metadata=None,",
                    "-        page_id=None,",
                    "-    ):",
                    "-        \"\"\"Insert a text chunk.\"\"\"",
                    "-        # Calculate token count (rough estimate: 4 chars per token)",
                    "-        token_count = len(content) \/\/ 4",
                    "-        cursor = self.execute(",
                    "-            \"\"\"INSERT INTO chunks",
                    "-               (document_id, page_id, chunk_index, content, token_count, heading_path, metadata)",
                    "-               VALUES (%s, %s, %s, %s, %s, %s, %s)\"\"\",",
                    "-            (doc_id, page_id, chunk_index, content, token_count, heading_path, metadata),",
                    "-        )",
                    "-        self.commit()",
                    "-        chunk_id = cursor.lastrowid",
                    "-        cursor.close()",
                    "-        return chunk_id",
                    "-",
                    "-    def get_chunks_for_embedding(self, limit=DEFAULT_LIMIT):",
                    "-        \"\"\"Get chunks that need embeddings.\"\"\"",
                    "-        cursor = self.execute(",
                    "-            \"\"\"SELECT c.id, c.content, c.document_id",
                    "-               FROM chunks c",
                    "-               WHERE c.qdrant_id IS NULL",
                    "-               ORDER BY c.created_at",
                    "-               LIMIT %s\"\"\",",
                    "-            (limit,),",
                    "-        )",
                    "-        results = cursor.fetchall()",
                    "-        cursor.close()",
                    "-        return results",
                    "-",
                    "-    def update_chunk_qdrant_id(self, chunk_id, qdrant_id):",
                    "-        \"\"\"Update chunk with Qdrant point ID.\"\"\"",
                    "-        cursor = self.execute(\"UPDATE chunks SET qdrant_id = %s WHERE id = %s\", (qdrant_id, chunk_id))",
                    "-        self.commit()",
                    "-        cursor.close()",
                    "-",
                    "-    # Queue Operations",
                    "-    def add_to_queue(self, file_path, action=\"process\"):",
                    "-        \"\"\"Add item to pipeline queue.\"\"\"",
                    "-        cursor = self.execute(",
                    "-            \"\"\"INSERT INTO pipeline_queue",
                    "-               (file_path, action, status, retry_count, created_at)",
                    "-               VALUES (%s, %s, 'pending', 0, NOW())\"\"\",",
                    "-            (file_path, action),",
                    "-        )",
                    "-        self.commit()",
                    "-        queue_id = cursor.lastrowid",
                    "-        cursor.close()",
                    "-        return queue_id",
                    "-",
                    "-    def get_pending_queue_items(self, limit=10):",
                    "-        \"\"\"Get pending items from queue.\"\"\"",
                    "-        cursor = self.execute(",
                    "-            \"\"\"SELECT id, file_path, action, retry_count",
                    "-               FROM pipeline_queue",
                    "-               WHERE status = 'pending'",
                    "-               ORDER BY created_at",
                    "-               LIMIT %s\"\"\",",
                    "-            (limit,),",
                    "-        )",
                    "-        results = cursor.fetchall()",
                    "-        cursor.close()",
                    "-        return results",
                    "-",
                    "-    def update_queue_status(self, queue_id, status, error_message=None):",
                    "-        \"\"\"Update queue item status.\"\"\"",
                    "-        if status == \"error\" and error_message:",
                    "-            cursor = self.execute(",
                    "-                \"\"\"UPDATE pipeline_queue",
                    "-                   SET status = %s, error_message = %s,",
                    "-                       retry_count = retry_count + 1, updated_at = NOW()",
                    "-                   WHERE id = %s\"\"\",",
                    "-                (status, error_message, queue_id),",
                    "-            )",
                    "-        else:",
                    "-            cursor = self.execute(",
                    "-                \"UPDATE pipeline_queue SET status = %s, updated_at = NOW() WHERE id = %s\", (status, queue_id)",
                    "-            )",
                    "-        self.commit()",
                    "-        cursor.close()",
                    "-",
                    "-    # Logging",
                    "-    def log(self, level, message, context=None):",
                    "-        \"\"\"Write to pipeline log (ki_dev database).\"\"\"",
                    "-        import json",
                    "-",
                    "-        import mysql.connector",
                    "-",
                    "-        from config import DB_LOG_CONFIG",
                    "-",
                    "-        try:",
                    "-            # Context must be valid JSON",
                    "-            if context is not None:",
                    "-                if isinstance(context, str):",
                    "-                    context = json.dumps({\"info\": context})",
                    "-                elif isinstance(context, dict):",
                    "-                    context = json.dumps(context)",
                    "-                else:",
                    "-                    context = json.dumps({\"data\": str(context)})",
                    "-",
                    "-            # Use separate connection to ki_dev for logging",
                    "-            log_conn = mysql.connector.connect(**DB_LOG_CONFIG)",
                    "-            cursor = log_conn.cursor()",
                    "-            cursor.execute(",
                    "-                \"\"\"INSERT INTO pipeline_log",
                    "-                   (level, message, context, created_at)",
                    "-                   VALUES (%s, %s, %s, NOW())\"\"\",",
                    "-                (level, message, context),",
                    "-            )",
                    "-            log_conn.commit()",
                    "-            cursor.close()",
                    "-            log_conn.close()",
                    "-        except Exception:  # noqa: S110",
                    "-            # Logging should never break the pipeline",
                    "-            pass",
                    "-",
                    "-    # Prompts",
                    "-    def get_prompt(self, name, version=None):",
                    "-        \"\"\"Get a prompt by name (optionally specific version).\"\"\"",
                    "-        if version:",
                    "-            cursor = self.execute(\"SELECT content FROM prompts WHERE name = %s AND version = %s\", (name, version))",
                    "-        else:",
                    "-            cursor = self.execute(",
                    "-                \"\"\"SELECT content FROM prompts",
                    "-                   WHERE name = %s AND is_active = 1",
                    "-                   ORDER BY version DESC LIMIT 1\"\"\",",
                    "-                (name,),",
                    "-            )",
                    "-        result = cursor.fetchone()",
                    "-        cursor.close()",
                    "-        return result[\"content\"] if result else None",
                    "-",
                    "-    def get_prompt_by_use_case(self, use_case: str, version: str = None) -> dict | None:",
                    "-        \"\"\"",
                    "-        Get prompt by use_case with full metadata for provenance tracking.",
                    "-",
                    "-        Args:",
                    "-            use_case: The use case (entity_extraction, semantic_analysis, statement_extraction, etc.)",
                    "-            version: Optional specific version (otherwise latest active)",
                    "-",
                    "-        Returns:",
                    "-            Dict with id, name, version, content, use_case or None",
                    "-        \"\"\"",
                    "-        if version:",
                    "-            cursor = self.execute(",
                    "-                \"\"\"SELECT id, name, version, content, use_case",
                    "-                   FROM prompts",
                    "-                   WHERE use_case = %s AND version = %s\"\"\",",
                    "-                (use_case, version),",
                    "-            )",
                    "-        else:",
                    "-            cursor = self.execute(",
                    "-                \"\"\"SELECT id, name, version, content, use_case",
                    "-                   FROM prompts",
                    "-                   WHERE use_case = %s AND is_active = 1",
                    "-                   ORDER BY version DESC LIMIT 1\"\"\",",
                    "-                (use_case,),",
                    "-            )",
                    "-        result = cursor.fetchone()",
                    "-        cursor.close()",
                    "-        return result",
                    "-",
                    "-    # Entity Types",
                    "-    def get_entity_types(self, active_only: bool = True) -> list[dict]:",
                    "-        \"\"\"Get all entity types from database.",
                    "-",
                    "-        Returns:",
                    "-            List of dicts with code, name, description, criteria, indicators, examples",
                    "-        \"\"\"",
                    "-        query = \"\"\"SELECT code, name, description, criteria, indicators, examples",
                    "-                   FROM entity_types\"\"\"",
                    "-        if active_only:",
                    "-            query += \" WHERE is_active = 1\"",
                    "-        query += \" ORDER BY sort_order\"",
                    "-",
                    "-        cursor = self.execute(query)",
                    "-        results = cursor.fetchall()",
                    "-        cursor.close()",
                    "-        return list(results) if results else []",
                    "-",
                    "-    def get_entity_type_codes(self) -> set[str]:",
                    "-        \"\"\"Get set of valid entity type codes.\"\"\"",
                    "-        cursor = self.execute(\"SELECT code FROM entity_types WHERE is_active = 1\")",
                    "-        results = cursor.fetchall()",
                    "-        cursor.close()",
                    "-        return {r[\"code\"] for r in results} if results else set()",
                    "-",
                    "-    def build_entity_prompt_categories(self) -> str:",
                    "-        \"\"\"Build categories section for entity extraction prompt from DB.\"\"\"",
                    "-        types = self.get_entity_types()",
                    "-        lines = []",
                    "-        for t in types:",
                    "-            lines.append(f\"  {t['code']}: {t['criteria']}\")",
                    "-        return \"\\n\".join(lines)",
                    "-",
                    "-    # Stopwords",
                    "-    def get_stopwords(self, active_only: bool = True) -> list[str]:",
                    "-        \"\"\"Get list of stopword canonical forms for entity filtering.",
                    "-",
                    "-        Returns:",
                    "-            List of canonical stopword strings (lowercase, normalized)",
                    "-        \"\"\"",
                    "-        query = \"SELECT canonical_form FROM stopwords\"",
                    "-        if active_only:",
                    "-            query += \" WHERE is_active = 1\"",
                    "-",
                    "-        cursor = self.execute(query)",
                    "-        results = cursor.fetchall()",
                    "-        cursor.close()",
                    "-        return [r[\"canonical_form\"] for r in results] if results else []",
                    "-",
                    "-    def is_stopword(self, word: str) -> bool:",
                    "-        \"\"\"Check if a word is in the stopword list.\"\"\"",
                    "-        canonical = self._normalize_stopword(word)",
                    "-        stopwords = self.get_stopwords()",
                    "-        return canonical in stopwords",
                    "-",
                    "-    def _normalize_stopword(self, word: str) -> str:",
                    "-        \"\"\"Normalize word to canonical form for stopword matching.\"\"\"",
                    "-        import re",
                    "-        import unicodedata",
                    "-",
                    "-        result = word.lower().strip()",
                    "-        # German umlauts",
                    "-        replacements = {\"ä\": \"ae\", \"ö\": \"oe\", \"ü\": \"ue\", \"ß\": \"ss\"}",
                    "-        for old, new in replacements.items():",
                    "-            result = result.replace(old, new)",
                    "-        # Normalize unicode",
                    "-        result = unicodedata.normalize(\"NFKD\", result)",
                    "-        result = result.encode(\"ascii\", \"ignore\").decode(\"ascii\")",
                    "-        # Keep only alphanumeric",
                    "-        result = re.sub(r\"[^a-z0-9]\", \"\", result)",
                    "-        return result",
                    "-",
                    "-    # Protokoll Logging (LLM calls)",
                    "-    def log_to_protokoll(",
                    "-        self,",
                    "-        client_name,",
                    "-        request,",
                    "-        response=None,",
                    "-        model_name=None,",
                    "-        tokens_input=0,",
                    "-        tokens_output=0,",
                    "-        duration_ms=0,",
                    "-        status=\"completed\",",
                    "-        error_message=None,",
                    "-    ):",
                    "-        \"\"\"",
                    "-        Log LLM call to ki_dev.protokoll table.",
                    "-",
                    "-        Args:",
                    "-            client_name: Caller identifier (e.g., 'content-studio', 'pipeline')",
                    "-            request: The prompt sent to the LLM",
                    "-            response: The LLM response",
                    "-            model_name: Model used (e.g., 'claude-opus-4-5-20251101')",
                    "-            tokens_input: Input token count",
                    "-            tokens_output: Output token count",
                    "-            duration_ms: Request duration in milliseconds",
                    "-            status: 'pending', 'completed', or 'error'",
                    "-            error_message: Error details if status is 'error'",
                    "-        \"\"\"",
                    "-        from datetime import datetime",
                    "-",
                    "-        import mysql.connector",
                    "-",
                    "-        from config import DB_PROTOKOLL_CONFIG",
                    "-",
                    "-        try:",
                    "-            conn = mysql.connector.connect(**DB_PROTOKOLL_CONFIG)",
                    "-            cursor = conn.cursor()",
                    "-",
                    "-            now = datetime.now()",
                    "-            tokens_total = tokens_input + tokens_output",
                    "-",
                    "-            cursor.execute(",
                    "-                \"\"\"INSERT INTO protokoll",
                    "-                   (request_ip, client_name, request, request_timestamp,",
                    "-                    response, response_timestamp, duration_ms,",
                    "-                    tokens_input, tokens_output, tokens_total,",
                    "-                    model_name, status, error_message)",
                    "-                   VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)\"\"\",",
                    "-                (",
                    "-                    \"127.0.0.1\",  # Local pipeline call",
                    "-                    client_name,",
                    "-                    request[:65000] if request else None,  # Truncate if too long",
                    "-                    now,",
                    "-                    response[:65000] if response else None,  # Truncate if too long",
                    "-                    now if response else None,",
                    "-                    duration_ms,",
                    "-                    tokens_input,",
                    "-                    tokens_output,",
                    "-                    tokens_total,",
                    "-                    model_name,",
                    "-                    status,",
                    "-                    error_message,",
                    "-                ),",
                    "-            )",
                    "-            conn.commit()",
                    "-            protokoll_id = cursor.lastrowid",
                    "-            cursor.close()",
                    "-            conn.close()",
                    "-            return protokoll_id",
                    "-        except Exception as e:",
                    "-            # Logging should never break the pipeline",
                    "-            print(f\"Protokoll logging error: {e}\")",
                    "-            return None",
                    "-",
                    "-    # Entity Synonyms",
                    "-    def find_entity_by_synonym(self, synonym: str) -> dict | None:",
                    "-        \"\"\"",
                    "-        Find entity by synonym.",
                    "-",
                    "-        Args:",
                    "-            synonym: Synonym to search for",
                    "-",
                    "-        Returns:",
                    "-            Dict with entity_id or None",
                    "-        \"\"\"",
                    "-        cursor = self.execute(",
                    "-            \"SELECT entity_id FROM entity_synonyms WHERE synonym = %s LIMIT 1\",",
                    "-            (synonym,),",
                    "-        )",
                    "-        result = cursor.fetchone()",
                    "-        cursor.close()",
                    "-        return result",
                    "-",
                    "-    def add_synonym(",
                    "-        self,",
                    "-        entity_id: int,",
                    "-        synonym: str,",
                    "-        source: str = \"extraction\",",
                    "-        language: str = \"de\",",
                    "-    ) -> int | None:",
                    "-        \"\"\"",
                    "-        Add synonym to entity if not exists.",
                    "-",
                    "-        Args:",
                    "-            entity_id: Entity ID to add synonym to",
                    "-            synonym: The synonym text",
                    "-            source: How it was found (extraction, manual, merge)",
                    "-            language: Language code",
                    "-",
                    "-        Returns:",
                    "-            Synonym ID or None if already exists",
                    "-        \"\"\"",
                    "-        # Check if synonym already exists for this entity",
                    "-        cursor = self.execute(",
                    "-            \"SELECT id FROM entity_synonyms WHERE entity_id = %s AND synonym = %s\",",
                    "-            (entity_id, synonym),",
                    "-        )",
                    "-        existing = cursor.fetchone()",
                    "-        cursor.close()",
                    "-",
                    "-        if existing:",
                    "-            return None",
                    "-",
                    "-        try:",
                    "-            cursor = self.execute(",
                    "-                \"\"\"INSERT INTO entity_synonyms (entity_id, synonym, source, language, created_at)",
                    "-                   VALUES (%s, %s, %s, %s, NOW())\"\"\",",
                    "-                (entity_id, synonym, source, language),",
                    "-            )",
                    "-            self.commit()",
                    "-            syn_id = cursor.lastrowid",
                    "-            cursor.close()",
                    "-            return syn_id",
                    "-        except Exception as e:",
                    "-            self.log(\"WARNING\", f\"Failed to add synonym: {e}\")",
                    "-            return None",
                    "-",
                    "-    # Chunk Taxonomy Mapping",
                    "-    def add_chunk_taxonomy(",
                    "-        self,",
                    "-        chunk_id: int,",
                    "-        term_id: int,",
                    "-        confidence: float = 0.7,",
                    "-        source: str = \"auto\",",
                    "-    ) -> int | None:",
                    "-        \"\"\"",
                    "-        Add taxonomy mapping for a chunk.",
                    "-",
                    "-        Args:",
                    "-            chunk_id: Chunk ID",
                    "-            term_id: Taxonomy term ID",
                    "-            confidence: Confidence score (0.0-1.0)",
                    "-            source: 'auto' or 'manual'",
                    "-",
                    "-        Returns:",
                    "-            Mapping ID or None if already exists",
                    "-        \"\"\"",
                    "-        # Check if mapping already exists",
                    "-        cursor = self.execute(",
                    "-            \"SELECT id FROM chunk_taxonomy WHERE chunk_id = %s AND taxonomy_term_id = %s\",",
                    "-            (chunk_id, term_id),",
                    "-        )",
                    "-        existing = cursor.fetchone()",
                    "-        cursor.close()",
                    "-",
                    "-        if existing:",
                    "-            return None",
                    "-",
                    "-        try:",
                    "-            cursor = self.execute(",
                    "-                \"\"\"INSERT INTO chunk_taxonomy (chunk_id, taxonomy_term_id, confidence, source, created_at)",
                    "-                   VALUES (%s, %s, %s, %s, NOW())\"\"\",",
                    "-                (chunk_id, term_id, confidence, source),",
                    "-            )",
                    "-            self.commit()",
                    "-            mapping_id = cursor.lastrowid",
                    "-            cursor.close()",
                    "-            return mapping_id",
                    "-        except Exception as e:",
                    "-            self.log(\"WARNING\", f\"Failed to add chunk taxonomy: {e}\")",
                    "-            return None",
                    "-",
                    "-    def get_chunk_taxonomies(self, chunk_id: int) -> list:",
                    "-        \"\"\"",
                    "-        Get all taxonomy mappings for a chunk.",
                    "-",
                    "-        Args:",
                    "-            chunk_id: Chunk ID",
                    "-",
                    "-        Returns:",
                    "-            List of taxonomy mappings with term details",
                    "-        \"\"\"",
                    "-        cursor = self.execute(",
                    "-            \"\"\"SELECT ct.*, tt.name as term_name, tt.path as term_path",
                    "-               FROM chunk_taxonomy ct",
                    "-               JOIN taxonomy_terms tt ON ct.taxonomy_term_id = tt.id",
                    "-               WHERE ct.chunk_id = %s",
                    "-               ORDER BY ct.confidence DESC\"\"\",",
                    "-            (chunk_id,),",
                    "-        )",
                    "-        results = cursor.fetchall()",
                    "-        cursor.close()",
                    "-        return results",
                    "-",
                    "-    # Entity Taxonomy Mapping",
                    "-    def add_entity_taxonomy(",
                    "-        self,",
                    "-        entity_id: int,",
                    "-        term_id: int,",
                    "-        relevance: float = 0.7,",
                    "-        validated: bool = False,",
                    "-    ) -> int | None:",
                    "-        \"\"\"",
                    "-        Add taxonomy mapping for an entity.",
                    "-",
                    "-        Args:",
                    "-            entity_id: Entity ID",
                    "-            term_id: Taxonomy term ID",
                    "-            relevance: Relevance score (0.0-1.0)",
                    "-            validated: Whether manually validated",
                    "-",
                    "-        Returns:",
                    "-            Mapping ID or None if already exists",
                    "-        \"\"\"",
                    "-        # Check if mapping already exists",
                    "-        cursor = self.execute(",
                    "-            \"SELECT id FROM entity_taxonomy_mapping WHERE entity_id = %s AND taxonomy_term_id = %s\",",
                    "-            (entity_id, term_id),",
                    "-        )",
                    "-        existing = cursor.fetchone()",
                    "-        cursor.close()",
                    "-",
                    "-        if existing:",
                    "-            return None",
                    "-",
                    "-        try:",
                    "-            cursor = self.execute(",
                    "-                \"\"\"INSERT INTO entity_taxonomy_mapping",
                    "-                   (entity_id, taxonomy_term_id, confidence, validated, created_at)",
                    "-                   VALUES (%s, %s, %s, %s, NOW())\"\"\",",
                    "-                (entity_id, term_id, relevance, 1 if validated else 0),",
                    "-            )",
                    "-            self.commit()",
                    "-            mapping_id = cursor.lastrowid",
                    "-            cursor.close()",
                    "-            return mapping_id",
                    "-        except Exception as e:",
                    "-            self.log(\"WARNING\", f\"Failed to add entity taxonomy: {e}\")",
                    "-            return None",
                    "-",
                    "-    def get_entity_taxonomies(self, entity_id: int) -> list:",
                    "-        \"\"\"",
                    "-        Get all taxonomy mappings for an entity.",
                    "-",
                    "-        Args:",
                    "-            entity_id: Entity ID",
                    "-",
                    "-        Returns:",
                    "-            List of taxonomy mappings with term details",
                    "-        \"\"\"",
                    "-        cursor = self.execute(",
                    "-            \"\"\"SELECT etm.*, tt.name as term_name, tt.path as term_path",
                    "-               FROM entity_taxonomy_mapping etm",
                    "-               JOIN taxonomy_terms tt ON etm.taxonomy_term_id = tt.id",
                    "-               WHERE etm.entity_id = %s",
                    "-               ORDER BY etm.relevance DESC\"\"\",",
                    "-            (entity_id,),",
                    "-        )",
                    "-        results = cursor.fetchall()",
                    "-        cursor.close()",
                    "-        return results",
                    "-",
                    "-    def get_taxonomy_terms(self) -> list:",
                    "-        \"\"\"",
                    "-        Get all taxonomy terms for matching.",
                    "-",
                    "-        Returns:",
                    "-            List of taxonomy terms with id, name, path",
                    "-        \"\"\"",
                    "-        cursor = self.execute(\"SELECT id, name, path, parent_id FROM taxonomy_terms ORDER BY path\")",
                    "-        results = cursor.fetchall()",
                    "-        cursor.close()",
                    "-        return results",
                    "-",
                    "-    # Provenance Logging",
                    "-    def log_provenance(",
                    "-        self,",
                    "-        artifact_type: str,",
                    "-        artifact_id: int,",
                    "-        source_type: str,",
                    "-        source_id: int = None,",
                    "-        pipeline_run_id: int = None,",
                    "-        pipeline_step: str = None,",
                    "-        model_used: str = None,",
                    "-        prompt_version: str = None,",
                    "-    ) -> int:",
                    "-        \"\"\"",
                    "-        Log artifact provenance for traceability.",
                    "-",
                    "-        Args:",
                    "-            artifact_type: Type of artifact (document, chunk, entity, relation, statement, embedding)",
                    "-            artifact_id: ID of the artifact",
                    "-            source_type: How it was created (file, extraction, analysis, merge, manual)",
                    "-            source_id: Optional source artifact ID",
                    "-            pipeline_run_id: Optional pipeline run ID",
                    "-            pipeline_step: Optional step name (e.g., 'entity_extract')",
                    "-            model_used: Optional model name (e.g., 'claude-opus-4-5-20251101')",
                    "-            prompt_version: Optional prompt version",
                    "-",
                    "-        Returns:",
                    "-            Provenance record ID",
                    "-        \"\"\"",
                    "-        try:",
                    "-            cursor = self.execute(",
                    "-                \"\"\"INSERT INTO provenance",
                    "-                   (artifact_type, artifact_id, source_type, source_id,",
                    "-                    pipeline_run_id, pipeline_step, model_used, prompt_version, created_at)",
                    "-                   VALUES (%s, %s, %s, %s, %s, %s, %s, %s, NOW())\"\"\",",
                    "-                (",
                    "-                    artifact_type,",
                    "-                    artifact_id,",
                    "-                    source_type,",
                    "-                    source_id,",
                    "-                    pipeline_run_id,",
                    "-                    pipeline_step,",
                    "-                    model_used,",
                    "-                    prompt_version,",
                    "-                ),",
                    "-            )",
                    "-            self.commit()",
                    "-            provenance_id = cursor.lastrowid",
                    "-            cursor.close()",
                    "-            return provenance_id",
                    "-        except Exception as e:",
                    "-            self.log(\"WARNING\", f\"Failed to log provenance: {e}\")",
                    "-            return None",
                    "-",
                    "-",
                    " class PipelineProgress:",
                    "     \"\"\"Tracks and updates pipeline run progress in ki_content.pipeline_runs.\"\"\"",
                    " "
                ]
            },
            {
                "oldStart": 728,
                "oldLines": 12,
                "newStart": 58,
                "newLines": 6,
                "lines": [
                    " ",
                    "     def _update(self, **kwargs):",
                    "         \"\"\"Internal: Update database with progress.\"\"\"",
                    "-        from datetime import datetime",
                    "-",
                    "-        import mysql.connector",
                    "-",
                    "-        from config import DB_CONFIG",
                    "-",
                    "         if not self.run_id:",
                    "             return",
                    " "
                ]
            },
            {
                "oldStart": 786,
                "oldLines": 18,
                "newStart": 110,
                "newLines": 14,
                "lines": [
                    " ",
                    "     def add_log(self, message: str):",
                    "         \"\"\"Add log line and update log_tail.\"\"\"",
                    "-        from datetime import datetime",
                    "-",
                    "         timestamp = datetime.now().strftime(\"%H:%M:%S\")",
                    "         self.log_lines.append(f\"[{timestamp}] {message}\")",
                    "         if len(self.log_lines) > self.max_log_lines:",
                    "-            self.log_lines = self.log_lines[-self.max_log_lines :]",
                    "+            self.log_lines = self.log_lines[-self.max_log_lines:]",
                    "         self._update(log_tail=\"\\n\".join(self.log_lines))",
                    " ",
                    "     def complete(self, status: str = \"completed\", error: str = None):",
                    "         \"\"\"Mark run as complete or failed.\"\"\"",
                    "-        from datetime import datetime",
                    "-",
                    "         data = {",
                    "             \"status\": status,",
                    "             \"completed_at\": datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\"),"
                ]
            },
            {
                "oldStart": 810,
                "oldLines": 17,
                "newStart": 130,
                "newLines": 16,
                "lines": [
                    " ",
                    "     def is_cancelled(self) -> bool:",
                    "         \"\"\"Check if run has been cancelled by user.\"\"\"",
                    "-        import mysql.connector",
                    "-",
                    "-        from config import DB_CONFIG",
                    "-",
                    "         if not self.run_id:",
                    "             return False",
                    " ",
                    "         try:",
                    "             conn = mysql.connector.connect(**DB_CONFIG)",
                    "             cursor = conn.cursor(dictionary=True)",
                    "-            cursor.execute(\"SELECT status FROM pipeline_runs WHERE id = %s\", (self.run_id,))",
                    "+            cursor.execute(",
                    "+                \"SELECT status FROM pipeline_runs WHERE id = %s\",",
                    "+                (self.run_id,),",
                    "+            )",
                    "             result = cursor.fetchone()",
                    "             cursor.close()",
                    "             conn.close()"
                ]
            },
            {
                "oldStart": 829,
                "oldLines": 5,
                "newStart": 148,
                "newLines": 5,
                "lines": [
                    "             return False",
                    " ",
                    " ",
                    "-# Global database instance",
                    "+# Global database instance for backward compatibility",
                    " db = Database()"
                ]
            }
        ],
        "originalFile": "\"\"\"\nDatabase operations for KI-System Pipeline\n\"\"\"\n\nimport mysql.connector\nfrom mysql.connector import Error\n\nfrom config import DB_CONFIG\nfrom constants import DEFAULT_LIMIT\n\n\nclass Database:\n    \"\"\"MariaDB connection wrapper with document and chunk operations.\"\"\"\n\n    def __init__(self):\n        self.connection = None\n\n    def connect(self):\n        \"\"\"Establish database connection.\"\"\"\n        try:\n            self.connection = mysql.connector.connect(**DB_CONFIG)\n            return True\n        except Error as e:\n            print(f\"Database connection error: {e}\")\n            return False\n\n    def disconnect(self):\n        \"\"\"Close database connection.\"\"\"\n        if self.connection and self.connection.is_connected():\n            self.connection.close()\n\n    def execute(self, query, params=None):\n        \"\"\"Execute a query and return the cursor.\"\"\"\n        cursor = self.connection.cursor(dictionary=True)\n        cursor.execute(query, params or ())\n        return cursor\n\n    def commit(self):\n        \"\"\"Commit the current transaction.\"\"\"\n        self.connection.commit()\n\n    # Document Operations\n    def document_exists(self, file_path):\n        \"\"\"Check if document already exists.\"\"\"\n        cursor = self.execute(\"SELECT id FROM documents WHERE source_path = %s\", (file_path,))\n        result = cursor.fetchone()\n        cursor.close()\n        return result[\"id\"] if result else None\n\n    def document_is_done(self, file_path):\n        \"\"\"Check if document is already fully processed (status='done').\"\"\"\n        cursor = self.execute(\"SELECT id, status FROM documents WHERE source_path = %s\", (file_path,))\n        result = cursor.fetchone()\n        cursor.close()\n        if result and result[\"status\"] == \"done\":\n            return result[\"id\"]\n        return None\n\n    def insert_document(self, file_path, title, file_type, 
file_size, file_hash):\n        \"\"\"Insert a new document or update existing one.\"\"\"\n        import os\n\n        folder_path = os.path.dirname(file_path)\n        cursor = self.execute(\n            \"\"\"INSERT INTO documents\n               (source_path, folder_path, filename, mime_type, file_size, file_hash, status)\n               VALUES (%s, %s, %s, %s, %s, %s, 'processing')\n               ON DUPLICATE KEY UPDATE\n               file_hash = VALUES(file_hash),\n               file_size = VALUES(file_size),\n               status = 'processing',\n               processed_at = NULL,\n               error_message = NULL\"\"\",\n            (file_path, folder_path, title, file_type, file_size, file_hash),\n        )\n        self.commit()\n        doc_id = cursor.lastrowid\n        # If ON DUPLICATE KEY UPDATE was triggered, lastrowid is 0\n        if doc_id == 0:\n            cursor_select = self.execute(\"SELECT id FROM documents WHERE source_path = %s\", (file_path,))\n            result = cursor_select.fetchone()\n            cursor_select.close()\n            doc_id = result[\"id\"] if result else None\n        cursor.close()\n        return doc_id\n\n    def update_document_status(self, doc_id, status, error_message=None):\n        \"\"\"Update document processing status.\"\"\"\n        if error_message:\n            cursor = self.execute(\n                \"\"\"UPDATE documents\n                   SET status = %s, error_message = %s, processed_at = NOW()\n                   WHERE id = %s\"\"\",\n                (status, error_message, doc_id),\n            )\n        else:\n            cursor = self.execute(\n                \"UPDATE documents SET status = %s, processed_at = NOW() WHERE id = %s\", (status, doc_id)\n            )\n        self.commit()\n        cursor.close()\n\n    # Page Operations\n    def insert_page(self, doc_id, page_number, text_content, token_count=None):\n        \"\"\"Insert a document page.\"\"\"\n        if token_count is 
None:\n            token_count = len(text_content.split()) if text_content else 0\n        cursor = self.execute(\n            \"\"\"INSERT INTO document_pages\n               (document_id, page_number, text_content, token_count, created_at)\n               VALUES (%s, %s, %s, %s, NOW())\n               ON DUPLICATE KEY UPDATE\n               text_content = VALUES(text_content),\n               token_count = VALUES(token_count)\"\"\",\n            (doc_id, page_number, text_content, token_count),\n        )\n        self.commit()\n        page_id = cursor.lastrowid\n        if page_id == 0:\n            cursor_select = self.execute(\n                \"SELECT id FROM document_pages WHERE document_id = %s AND page_number = %s\",\n                (doc_id, page_number),\n            )\n            result = cursor_select.fetchone()\n            cursor_select.close()\n            page_id = result[\"id\"] if result else None\n        cursor.close()\n        return page_id\n\n    def get_page_id(self, doc_id, page_number):\n        \"\"\"Get page ID by document and page number.\"\"\"\n        cursor = self.execute(\n            \"SELECT id FROM document_pages WHERE document_id = %s AND page_number = %s\",\n            (doc_id, page_number),\n        )\n        result = cursor.fetchone()\n        cursor.close()\n        return result[\"id\"] if result else None\n\n    # Chunk Operations\n    def insert_chunk(\n        self,\n        doc_id,\n        chunk_index,\n        content,\n        heading_path,\n        position_start=None,\n        position_end=None,\n        metadata=None,\n        page_id=None,\n    ):\n        \"\"\"Insert a text chunk.\"\"\"\n        # Calculate token count (rough estimate: 4 chars per token)\n        token_count = len(content) \/\/ 4\n        cursor = self.execute(\n            \"\"\"INSERT INTO chunks\n               (document_id, page_id, chunk_index, content, token_count, heading_path, metadata)\n               VALUES (%s, %s, %s, %s, %s, 
%s, %s)\"\"\",\n            (doc_id, page_id, chunk_index, content, token_count, heading_path, metadata),\n        )\n        self.commit()\n        chunk_id = cursor.lastrowid\n        cursor.close()\n        return chunk_id\n\n    def get_chunks_for_embedding(self, limit=DEFAULT_LIMIT):\n        \"\"\"Get chunks that need embeddings.\"\"\"\n        cursor = self.execute(\n            \"\"\"SELECT c.id, c.content, c.document_id\n               FROM chunks c\n               WHERE c.qdrant_id IS NULL\n               ORDER BY c.created_at\n               LIMIT %s\"\"\",\n            (limit,),\n        )\n        results = cursor.fetchall()\n        cursor.close()\n        return results\n\n    def update_chunk_qdrant_id(self, chunk_id, qdrant_id):\n        \"\"\"Update chunk with Qdrant point ID.\"\"\"\n        cursor = self.execute(\"UPDATE chunks SET qdrant_id = %s WHERE id = %s\", (qdrant_id, chunk_id))\n        self.commit()\n        cursor.close()\n\n    # Queue Operations\n    def add_to_queue(self, file_path, action=\"process\"):\n        \"\"\"Add item to pipeline queue.\"\"\"\n        cursor = self.execute(\n            \"\"\"INSERT INTO pipeline_queue\n               (file_path, action, status, retry_count, created_at)\n               VALUES (%s, %s, 'pending', 0, NOW())\"\"\",\n            (file_path, action),\n        )\n        self.commit()\n        queue_id = cursor.lastrowid\n        cursor.close()\n        return queue_id\n\n    def get_pending_queue_items(self, limit=10):\n        \"\"\"Get pending items from queue.\"\"\"\n        cursor = self.execute(\n            \"\"\"SELECT id, file_path, action, retry_count\n               FROM pipeline_queue\n               WHERE status = 'pending'\n               ORDER BY created_at\n               LIMIT %s\"\"\",\n            (limit,),\n        )\n        results = cursor.fetchall()\n        cursor.close()\n        return results\n\n    def update_queue_status(self, queue_id, status, error_message=None):\n  
      \"\"\"Update queue item status.\"\"\"\n        if status == \"error\" and error_message:\n            cursor = self.execute(\n                \"\"\"UPDATE pipeline_queue\n                   SET status = %s, error_message = %s,\n                       retry_count = retry_count + 1, updated_at = NOW()\n                   WHERE id = %s\"\"\",\n                (status, error_message, queue_id),\n            )\n        else:\n            cursor = self.execute(\n                \"UPDATE pipeline_queue SET status = %s, updated_at = NOW() WHERE id = %s\", (status, queue_id)\n            )\n        self.commit()\n        cursor.close()\n\n    # Logging\n    def log(self, level, message, context=None):\n        \"\"\"Write to pipeline log (ki_dev database).\"\"\"\n        import json\n\n        import mysql.connector\n\n        from config import DB_LOG_CONFIG\n\n        try:\n            # Context must be valid JSON\n            if context is not None:\n                if isinstance(context, str):\n                    context = json.dumps({\"info\": context})\n                elif isinstance(context, dict):\n                    context = json.dumps(context)\n                else:\n                    context = json.dumps({\"data\": str(context)})\n\n            # Use separate connection to ki_dev for logging\n            log_conn = mysql.connector.connect(**DB_LOG_CONFIG)\n            cursor = log_conn.cursor()\n            cursor.execute(\n                \"\"\"INSERT INTO pipeline_log\n                   (level, message, context, created_at)\n                   VALUES (%s, %s, %s, NOW())\"\"\",\n                (level, message, context),\n            )\n            log_conn.commit()\n            cursor.close()\n            log_conn.close()\n        except Exception:  # noqa: S110\n            # Logging should never break the pipeline\n            pass\n\n    # Prompts\n    def get_prompt(self, name, version=None):\n        \"\"\"Get a prompt by name (optionally 
specific version).\"\"\"\n        if version:\n            cursor = self.execute(\"SELECT content FROM prompts WHERE name = %s AND version = %s\", (name, version))\n        else:\n            cursor = self.execute(\n                \"\"\"SELECT content FROM prompts\n                   WHERE name = %s AND is_active = 1\n                   ORDER BY version DESC LIMIT 1\"\"\",\n                (name,),\n            )\n        result = cursor.fetchone()\n        cursor.close()\n        return result[\"content\"] if result else None\n\n    def ... [TRUNCATED-6133b14c29ef88bd]"
    }
}
← Vorheriger Zur Liste Nächster →