Protokoll #5783

ID: 5783
Zeitstempel: 2025-12-23 04:05:22.422593
Client: root
IP: 145.224.96.190
Modell: claude-sonnet-4-20250514
Status: completed
Tokens: 5,675 (Input: 989, Output: 4,686)
Dauer: 49530 ms
Request-Zeit: 2025-12-23 04:05:22.422593
Response-Zeit: 2025-12-23 04:06:11.952921

Request

{
    "event": "PreToolUse",
    "tool_name": "Edit",
    "tool_input": {
        "file_path": "\/opt\/scripts\/pipeline\/db.py",
        "old_string": "# Global database instance\ndb = Database()",
        "new_string": "class PipelineProgress:\n    \"\"\"Tracks and updates pipeline run progress in ki_content.pipeline_runs.\"\"\"\n\n    def __init__(self, run_id: int):\n        self.run_id = run_id\n        self.log_lines = []\n        self.max_log_lines = 20\n\n    def _update(self, **kwargs):\n        \"\"\"Internal: Update database with progress.\"\"\"\n        from datetime import datetime\n\n        import mysql.connector\n\n        from config import DB_CONFIG\n\n        if not self.run_id:\n            return\n\n        kwargs[\"last_update_at\"] = datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")\n\n        sets = \", \".join(f\"{k} = %s\" for k in kwargs.keys())\n        values = list(kwargs.values())\n        values.append(self.run_id)\n\n        try:\n            conn = mysql.connector.connect(**DB_CONFIG)\n            cursor = conn.cursor()\n            cursor.execute(f\"UPDATE pipeline_runs SET {sets} WHERE id = %s\", values)\n            conn.commit()\n            cursor.close()\n            conn.close()\n        except Exception as e:\n            print(f\"Progress update error: {e}\")\n\n    def update_step(self, step_name: str):\n        \"\"\"Update current pipeline step.\"\"\"\n        self._update(current_step=step_name)\n\n    def update_document(self, filename: str):\n        \"\"\"Update current document being processed.\"\"\"\n        self._update(current_document=filename)\n\n    def update_progress(\n        self,\n        processed: int = None,\n        total: int = None,\n        failed: int = None,\n        chunks: int = None,\n        embeddings: int = None,\n    ):\n        \"\"\"Update numeric progress fields.\"\"\"\n        data = {}\n        if processed is not None:\n            data[\"documents_processed\"] = processed\n        if total is not None:\n            data[\"documents_total\"] = total\n        if failed is not None:\n            data[\"documents_failed\"] = failed\n        if chunks is not None:\n            
data[\"chunks_created\"] = chunks\n        if embeddings is not None:\n            data[\"embeddings_created\"] = embeddings\n        if data:\n            self._update(**data)\n\n    def add_log(self, message: str):\n        \"\"\"Add log line and update log_tail.\"\"\"\n        from datetime import datetime\n\n        timestamp = datetime.now().strftime(\"%H:%M:%S\")\n        self.log_lines.append(f\"[{timestamp}] {message}\")\n        if len(self.log_lines) > self.max_log_lines:\n            self.log_lines = self.log_lines[-self.max_log_lines :]\n        self._update(log_tail=\"\\n\".join(self.log_lines))\n\n    def complete(self, status: str = \"completed\", error: str = None):\n        \"\"\"Mark run as complete or failed.\"\"\"\n        from datetime import datetime\n\n        data = {\n            \"status\": status,\n            \"completed_at\": datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\"),\n            \"current_step\": None,\n            \"current_document\": None,\n        }\n        if error:\n            data[\"error_log\"] = error\n        self._update(**data)\n\n    def is_cancelled(self) -> bool:\n        \"\"\"Check if run has been cancelled by user.\"\"\"\n        import mysql.connector\n\n        from config import DB_CONFIG\n\n        if not self.run_id:\n            return False\n\n        try:\n            conn = mysql.connector.connect(**DB_CONFIG)\n            cursor = conn.cursor(dictionary=True)\n            cursor.execute(\"SELECT status FROM pipeline_runs WHERE id = %s\", (self.run_id,))\n            result = cursor.fetchone()\n            cursor.close()\n            conn.close()\n            return result and result[\"status\"] == \"cancelled\"\n        except Exception:\n            return False\n\n\n# Global database instance\ndb = Database()"
    }
}

Response

{
    "tool_response": {
        "filePath": "\/opt\/scripts\/pipeline\/db.py",
        "oldString": "# Global database instance\ndb = Database()",
        "newString": "class PipelineProgress:\n    \"\"\"Tracks and updates pipeline run progress in ki_content.pipeline_runs.\"\"\"\n\n    def __init__(self, run_id: int):\n        self.run_id = run_id\n        self.log_lines = []\n        self.max_log_lines = 20\n\n    def _update(self, **kwargs):\n        \"\"\"Internal: Update database with progress.\"\"\"\n        from datetime import datetime\n\n        import mysql.connector\n\n        from config import DB_CONFIG\n\n        if not self.run_id:\n            return\n\n        kwargs[\"last_update_at\"] = datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")\n\n        sets = \", \".join(f\"{k} = %s\" for k in kwargs.keys())\n        values = list(kwargs.values())\n        values.append(self.run_id)\n\n        try:\n            conn = mysql.connector.connect(**DB_CONFIG)\n            cursor = conn.cursor()\n            cursor.execute(f\"UPDATE pipeline_runs SET {sets} WHERE id = %s\", values)\n            conn.commit()\n            cursor.close()\n            conn.close()\n        except Exception as e:\n            print(f\"Progress update error: {e}\")\n\n    def update_step(self, step_name: str):\n        \"\"\"Update current pipeline step.\"\"\"\n        self._update(current_step=step_name)\n\n    def update_document(self, filename: str):\n        \"\"\"Update current document being processed.\"\"\"\n        self._update(current_document=filename)\n\n    def update_progress(\n        self,\n        processed: int = None,\n        total: int = None,\n        failed: int = None,\n        chunks: int = None,\n        embeddings: int = None,\n    ):\n        \"\"\"Update numeric progress fields.\"\"\"\n        data = {}\n        if processed is not None:\n            data[\"documents_processed\"] = processed\n        if total is not None:\n            data[\"documents_total\"] = total\n        if failed is not None:\n            data[\"documents_failed\"] = failed\n        if chunks is not None:\n            
data[\"chunks_created\"] = chunks\n        if embeddings is not None:\n            data[\"embeddings_created\"] = embeddings\n        if data:\n            self._update(**data)\n\n    def add_log(self, message: str):\n        \"\"\"Add log line and update log_tail.\"\"\"\n        from datetime import datetime\n\n        timestamp = datetime.now().strftime(\"%H:%M:%S\")\n        self.log_lines.append(f\"[{timestamp}] {message}\")\n        if len(self.log_lines) > self.max_log_lines:\n            self.log_lines = self.log_lines[-self.max_log_lines :]\n        self._update(log_tail=\"\\n\".join(self.log_lines))\n\n    def complete(self, status: str = \"completed\", error: str = None):\n        \"\"\"Mark run as complete or failed.\"\"\"\n        from datetime import datetime\n\n        data = {\n            \"status\": status,\n            \"completed_at\": datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\"),\n            \"current_step\": None,\n            \"current_document\": None,\n        }\n        if error:\n            data[\"error_log\"] = error\n        self._update(**data)\n\n    def is_cancelled(self) -> bool:\n        \"\"\"Check if run has been cancelled by user.\"\"\"\n        import mysql.connector\n\n        from config import DB_CONFIG\n\n        if not self.run_id:\n            return False\n\n        try:\n            conn = mysql.connector.connect(**DB_CONFIG)\n            cursor = conn.cursor(dictionary=True)\n            cursor.execute(\"SELECT status FROM pipeline_runs WHERE id = %s\", (self.run_id,))\n            result = cursor.fetchone()\n            cursor.close()\n            conn.close()\n            return result and result[\"status\"] == \"cancelled\"\n        except Exception:\n            return False\n\n\n# Global database instance\ndb = Database()",
        "originalFile": "\"\"\"\nDatabase operations for KI-System Pipeline\n\"\"\"\n\nimport mysql.connector\nfrom mysql.connector import Error\n\nfrom config import DB_CONFIG\n\n\nclass Database:\n    \"\"\"MariaDB connection wrapper with document and chunk operations.\"\"\"\n\n    def __init__(self):\n        self.connection = None\n\n    def connect(self):\n        \"\"\"Establish database connection.\"\"\"\n        try:\n            self.connection = mysql.connector.connect(**DB_CONFIG)\n            return True\n        except Error as e:\n            print(f\"Database connection error: {e}\")\n            return False\n\n    def disconnect(self):\n        \"\"\"Close database connection.\"\"\"\n        if self.connection and self.connection.is_connected():\n            self.connection.close()\n\n    def execute(self, query, params=None):\n        \"\"\"Execute a query and return the cursor.\"\"\"\n        cursor = self.connection.cursor(dictionary=True)\n        cursor.execute(query, params or ())\n        return cursor\n\n    def commit(self):\n        \"\"\"Commit the current transaction.\"\"\"\n        self.connection.commit()\n\n    # Document Operations\n    def document_exists(self, file_path):\n        \"\"\"Check if document already exists.\"\"\"\n        cursor = self.execute(\"SELECT id FROM documents WHERE source_path = %s\", (file_path,))\n        result = cursor.fetchone()\n        cursor.close()\n        return result[\"id\"] if result else None\n\n    def insert_document(self, file_path, title, file_type, file_size, file_hash):\n        \"\"\"Insert a new document or update existing one.\"\"\"\n        import os\n\n        folder_path = os.path.dirname(file_path)\n        cursor = self.execute(\n            \"\"\"INSERT INTO documents\n               (source_path, folder_path, filename, mime_type, file_size, file_hash, status)\n               VALUES (%s, %s, %s, %s, %s, %s, 'processing')\n               ON DUPLICATE KEY UPDATE\n               
file_hash = VALUES(file_hash),\n               file_size = VALUES(file_size),\n               status = 'processing',\n               processed_at = NULL,\n               error_message = NULL\"\"\",\n            (file_path, folder_path, title, file_type, file_size, file_hash),\n        )\n        self.commit()\n        doc_id = cursor.lastrowid\n        # If ON DUPLICATE KEY UPDATE was triggered, lastrowid is 0\n        if doc_id == 0:\n            cursor_select = self.execute(\"SELECT id FROM documents WHERE source_path = %s\", (file_path,))\n            result = cursor_select.fetchone()\n            cursor_select.close()\n            doc_id = result[\"id\"] if result else None\n        cursor.close()\n        return doc_id\n\n    def update_document_status(self, doc_id, status, error_message=None):\n        \"\"\"Update document processing status.\"\"\"\n        if error_message:\n            cursor = self.execute(\n                \"\"\"UPDATE documents\n                   SET status = %s, error_message = %s, processed_at = NOW()\n                   WHERE id = %s\"\"\",\n                (status, error_message, doc_id),\n            )\n        else:\n            cursor = self.execute(\n                \"UPDATE documents SET status = %s, processed_at = NOW() WHERE id = %s\", (status, doc_id)\n            )\n        self.commit()\n        cursor.close()\n\n    # Chunk Operations\n    def insert_chunk(\n        self, doc_id, chunk_index, content, heading_path, position_start=None, position_end=None, metadata=None\n    ):\n        \"\"\"Insert a text chunk.\"\"\"\n        # Calculate token count (rough estimate: 4 chars per token)\n        token_count = len(content) \/\/ 4\n        cursor = self.execute(\n            \"\"\"INSERT INTO chunks\n               (document_id, chunk_index, content, token_count, heading_path, metadata)\n               VALUES (%s, %s, %s, %s, %s, %s)\"\"\",\n            (doc_id, chunk_index, content, token_count, heading_path, metadata),\n    
    )\n        self.commit()\n        chunk_id = cursor.lastrowid\n        cursor.close()\n        return chunk_id\n\n    def get_chunks_for_embedding(self, limit=100):\n        \"\"\"Get chunks that need embeddings.\"\"\"\n        cursor = self.execute(\n            \"\"\"SELECT c.id, c.content, c.document_id\n               FROM chunks c\n               WHERE c.qdrant_id IS NULL\n               ORDER BY c.created_at\n               LIMIT %s\"\"\",\n            (limit,),\n        )\n        results = cursor.fetchall()\n        cursor.close()\n        return results\n\n    def update_chunk_qdrant_id(self, chunk_id, qdrant_id):\n        \"\"\"Update chunk with Qdrant point ID.\"\"\"\n        cursor = self.execute(\"UPDATE chunks SET qdrant_id = %s WHERE id = %s\", (qdrant_id, chunk_id))\n        self.commit()\n        cursor.close()\n\n    # Queue Operations\n    def add_to_queue(self, file_path, action=\"process\"):\n        \"\"\"Add item to pipeline queue.\"\"\"\n        cursor = self.execute(\n            \"\"\"INSERT INTO pipeline_queue\n               (file_path, action, status, retry_count, created_at)\n               VALUES (%s, %s, 'pending', 0, NOW())\"\"\",\n            (file_path, action),\n        )\n        self.commit()\n        queue_id = cursor.lastrowid\n        cursor.close()\n        return queue_id\n\n    def get_pending_queue_items(self, limit=10):\n        \"\"\"Get pending items from queue.\"\"\"\n        cursor = self.execute(\n            \"\"\"SELECT id, file_path, action, retry_count\n               FROM pipeline_queue\n               WHERE status = 'pending'\n               ORDER BY created_at\n               LIMIT %s\"\"\",\n            (limit,),\n        )\n        results = cursor.fetchall()\n        cursor.close()\n        return results\n\n    def update_queue_status(self, queue_id, status, error_message=None):\n        \"\"\"Update queue item status.\"\"\"\n        if status == \"error\" and error_message:\n            cursor = 
self.execute(\n                \"\"\"UPDATE pipeline_queue\n                   SET status = %s, error_message = %s,\n                       retry_count = retry_count + 1, updated_at = NOW()\n                   WHERE id = %s\"\"\",\n                (status, error_message, queue_id),\n            )\n        else:\n            cursor = self.execute(\n                \"UPDATE pipeline_queue SET status = %s, updated_at = NOW() WHERE id = %s\", (status, queue_id)\n            )\n        self.commit()\n        cursor.close()\n\n    # Logging\n    def log(self, level, message, context=None):\n        \"\"\"Write to pipeline log (ki_dev database).\"\"\"\n        import json\n\n        import mysql.connector\n\n        from config import DB_LOG_CONFIG\n\n        try:\n            # Context must be valid JSON\n            if context is not None:\n                if isinstance(context, str):\n                    context = json.dumps({\"info\": context})\n                elif isinstance(context, dict):\n                    context = json.dumps(context)\n                else:\n                    context = json.dumps({\"data\": str(context)})\n\n            # Use separate connection to ki_dev for logging\n            log_conn = mysql.connector.connect(**DB_LOG_CONFIG)\n            cursor = log_conn.cursor()\n            cursor.execute(\n                \"\"\"INSERT INTO pipeline_log\n                   (level, message, context, created_at)\n                   VALUES (%s, %s, %s, NOW())\"\"\",\n                (level, message, context),\n            )\n            log_conn.commit()\n            cursor.close()\n            log_conn.close()\n        except Exception:  # noqa: S110\n            # Logging should never break the pipeline\n            pass\n\n    # Prompts\n    def get_prompt(self, name, version=None):\n        \"\"\"Get a prompt by name (optionally specific version).\"\"\"\n        if version:\n            cursor = self.execute(\"SELECT content FROM prompts WHERE name 
= %s AND version = %s\", (name, version))\n        else:\n            cursor = self.execute(\n                \"\"\"SELECT content FROM prompts\n                   WHERE name = %s AND is_active = 1\n                   ORDER BY version DESC LIMIT 1\"\"\",\n                (name,),\n            )\n        result = cursor.fetchone()\n        cursor.close()\n        return result[\"content\"] if result else None\n\n    # Protokoll Logging (LLM calls)\n    def log_to_protokoll(\n        self,\n        client_name,\n        request,\n        response=None,\n        model_name=None,\n        tokens_input=0,\n        tokens_output=0,\n        duration_ms=0,\n        status=\"completed\",\n        error_message=None,\n    ):\n        \"\"\"\n        Log LLM call to ki_dev.protokoll table.\n\n        Args:\n            client_name: Caller identifier (e.g., 'content-studio', 'pipeline')\n            request: The prompt sent to the LLM\n            response: The LLM response\n            model_name: Model used (e.g., 'claude-opus-4-5-20251101')\n            tokens_input: Input token count\n            tokens_output: Output token count\n            duration_ms: Request duration in milliseconds\n            status: 'pending', 'completed', or 'error'\n            error_message: Error details if status is 'error'\n        \"\"\"\n        from datetime import datetime\n\n        import mysql.connector\n\n        from config import DB_PROTOKOLL_CONFIG\n\n        try:\n            conn = mysql.connector.connect(**DB_PROTOKOLL_CONFIG)\n            cursor = conn.cursor()\n\n            now = datetime.now()\n            tokens_total = tokens_input + tokens_output\n\n            cursor.execute(\n                \"\"\"INSERT INTO protokoll\n                   (request_ip, client_name, request, request_timestamp,\n                    response, response_timestamp, duration_ms,\n                    tokens_input, tokens_output, tokens_total,\n                    model_name, status, 
error_message)\n                   VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)\"\"\",\n                (\n                    \"127.0.0.1\",  # Local pipeline call\n                    client_name,\n                    request[:65000] if request else None,  # Truncate if too long\n                    now,\n                    response[:65000] if response else None,  # Truncate if too long\n                    now if response else None,\n                    duration_ms,\n                    tokens_input,\n... [TRUNCATED-8c65cd83e3db3f82]",
        "structuredPatch": [
            {
                "oldStart": 298,
                "oldLines": 5,
                "newStart": 298,
                "newLines": 116,
                "lines": [
                    "             return None",
                    " ",
                    " ",
                    "+class PipelineProgress:",
                    "+    \"\"\"Tracks and updates pipeline run progress in ki_content.pipeline_runs.\"\"\"",
                    "+",
                    "+    def __init__(self, run_id: int):",
                    "+        self.run_id = run_id",
                    "+        self.log_lines = []",
                    "+        self.max_log_lines = 20",
                    "+",
                    "+    def _update(self, **kwargs):",
                    "+        \"\"\"Internal: Update database with progress.\"\"\"",
                    "+        from datetime import datetime",
                    "+",
                    "+        import mysql.connector",
                    "+",
                    "+        from config import DB_CONFIG",
                    "+",
                    "+        if not self.run_id:",
                    "+            return",
                    "+",
                    "+        kwargs[\"last_update_at\"] = datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")",
                    "+",
                    "+        sets = \", \".join(f\"{k} = %s\" for k in kwargs.keys())",
                    "+        values = list(kwargs.values())",
                    "+        values.append(self.run_id)",
                    "+",
                    "+        try:",
                    "+            conn = mysql.connector.connect(**DB_CONFIG)",
                    "+            cursor = conn.cursor()",
                    "+            cursor.execute(f\"UPDATE pipeline_runs SET {sets} WHERE id = %s\", values)",
                    "+            conn.commit()",
                    "+            cursor.close()",
                    "+            conn.close()",
                    "+        except Exception as e:",
                    "+            print(f\"Progress update error: {e}\")",
                    "+",
                    "+    def update_step(self, step_name: str):",
                    "+        \"\"\"Update current pipeline step.\"\"\"",
                    "+        self._update(current_step=step_name)",
                    "+",
                    "+    def update_document(self, filename: str):",
                    "+        \"\"\"Update current document being processed.\"\"\"",
                    "+        self._update(current_document=filename)",
                    "+",
                    "+    def update_progress(",
                    "+        self,",
                    "+        processed: int = None,",
                    "+        total: int = None,",
                    "+        failed: int = None,",
                    "+        chunks: int = None,",
                    "+        embeddings: int = None,",
                    "+    ):",
                    "+        \"\"\"Update numeric progress fields.\"\"\"",
                    "+        data = {}",
                    "+        if processed is not None:",
                    "+            data[\"documents_processed\"] = processed",
                    "+        if total is not None:",
                    "+            data[\"documents_total\"] = total",
                    "+        if failed is not None:",
                    "+            data[\"documents_failed\"] = failed",
                    "+        if chunks is not None:",
                    "+            data[\"chunks_created\"] = chunks",
                    "+        if embeddings is not None:",
                    "+            data[\"embeddings_created\"] = embeddings",
                    "+        if data:",
                    "+            self._update(**data)",
                    "+",
                    "+    def add_log(self, message: str):",
                    "+        \"\"\"Add log line and update log_tail.\"\"\"",
                    "+        from datetime import datetime",
                    "+",
                    "+        timestamp = datetime.now().strftime(\"%H:%M:%S\")",
                    "+        self.log_lines.append(f\"[{timestamp}] {message}\")",
                    "+        if len(self.log_lines) > self.max_log_lines:",
                    "+            self.log_lines = self.log_lines[-self.max_log_lines :]",
                    "+        self._update(log_tail=\"\\n\".join(self.log_lines))",
                    "+",
                    "+    def complete(self, status: str = \"completed\", error: str = None):",
                    "+        \"\"\"Mark run as complete or failed.\"\"\"",
                    "+        from datetime import datetime",
                    "+",
                    "+        data = {",
                    "+            \"status\": status,",
                    "+            \"completed_at\": datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\"),",
                    "+            \"current_step\": None,",
                    "+            \"current_document\": None,",
                    "+        }",
                    "+        if error:",
                    "+            data[\"error_log\"] = error",
                    "+        self._update(**data)",
                    "+",
                    "+    def is_cancelled(self) -> bool:",
                    "+        \"\"\"Check if run has been cancelled by user.\"\"\"",
                    "+        import mysql.connector",
                    "+",
                    "+        from config import DB_CONFIG",
                    "+",
                    "+        if not self.run_id:",
                    "+            return False",
                    "+",
                    "+        try:",
                    "+            conn = mysql.connector.connect(**DB_CONFIG)",
                    "+            cursor = conn.cursor(dictionary=True)",
                    "+            cursor.execute(\"SELECT status FROM pipeline_runs WHERE id = %s\", (self.run_id,))",
                    "+            result = cursor.fetchone()",
                    "+            cursor.close()",
                    "+            conn.close()",
                    "+            return result and result[\"status\"] == \"cancelled\"",
                    "+        except Exception:",
                    "+            return False",
                    "+",
                    "+",
                    " # Global database instance",
                    " db = Database()"
                ]
            }
        ],
        "userModified": false,
        "replaceAll": false
    }
}
← Vorheriger Zur Liste Nächster →