{
"tool_response": {
"type": "update",
"filePath": "\/var\/www\/scripts\/pipeline\/db.py",
"content": "\"\"\"\nDatabase operations for KI-System Pipeline\n\nComposition layer that combines all database mixins into a single Database class.\nMaintains full backward compatibility with existing imports.\n\nArchitecture:\n db_core.py - Connection management (DatabaseCore)\n db_documents.py - Document\/Page\/Chunk CRUD (DocumentsMixin)\n db_queue.py - Pipeline queue operations (QueueMixin)\n db_logging.py - Logging operations (LoggingMixin)\n db_semantic.py - Semantic operations (SemanticMixin)\n db_prompts.py - Prompt operations (PromptsMixin)\n\"\"\"\n\nfrom datetime import datetime\n\nimport mysql.connector\n\nfrom config import DB_CONFIG\nfrom db_core import DatabaseCore\nfrom db_documents import DocumentsMixin\nfrom db_logging import LoggingMixin\nfrom db_prompts import PromptsMixin\nfrom db_queue import QueueMixin\nfrom db_semantic import SemanticMixin\n\n\nclass Database(\n DatabaseCore,\n DocumentsMixin,\n QueueMixin,\n LoggingMixin,\n SemanticMixin,\n PromptsMixin,\n):\n \"\"\"MariaDB connection wrapper with all database operations.\n\n Combines all mixins via multiple inheritance (MRO):\n - DatabaseCore: connect, disconnect, execute, commit\n - DocumentsMixin: document\/page\/chunk CRUD\n - QueueMixin: pipeline queue operations\n - LoggingMixin: log, log_to_protokoll, log_provenance\n - SemanticMixin: entity types, stopwords, taxonomy\n - PromptsMixin: prompt retrieval\n \"\"\"\n\n pass\n\n\nclass PipelineProgress:\n \"\"\"Tracks and updates pipeline run progress in ki_content.pipeline_runs.\"\"\"\n\n def __init__(self, run_id: int):\n self.run_id = run_id\n self.log_lines = []\n self.max_log_lines = 200 # Keep full log history\n\n def _update(self, **kwargs):\n \"\"\"Internal: Update database with progress.\"\"\"\n if not self.run_id:\n return\n\n kwargs[\"last_update_at\"] = datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")\n\n sets = \", \".join(f\"{k} = %s\" for k in kwargs)\n values = list(kwargs.values())\n values.append(self.run_id)\n\n try:\n conn = mysql.connector.connect(**DB_CONFIG)\n cursor = conn.cursor()\n cursor.execute(f\"UPDATE pipeline_runs SET {sets} WHERE id = %s\", values) # noqa: S608\n conn.commit()\n cursor.close()\n conn.close()\n except Exception as e:\n print(f\"Progress update error: {e}\")\n\n def update_step(self, step_name: str):\n \"\"\"Update current pipeline step.\"\"\"\n self._update(current_step=step_name)\n\n def update_document(self, filename: str):\n \"\"\"Update current document being processed.\"\"\"\n self._update(current_document=filename)\n\n def update_progress(\n self,\n processed: int = None,\n total: int = None,\n failed: int = None,\n chunks: int = None,\n embeddings: int = None,\n ):\n \"\"\"Update numeric progress fields.\"\"\"\n data = {}\n if processed is not None:\n data[\"documents_processed\"] = processed\n if total is not None:\n data[\"documents_total\"] = total\n if failed is not None:\n data[\"documents_failed\"] = failed\n if chunks is not None:\n data[\"chunks_created\"] = chunks\n if embeddings is not None:\n data[\"embeddings_created\"] = embeddings\n if data:\n self._update(**data)\n\n def add_log(self, message: str):\n \"\"\"Add log line and update log_tail.\"\"\"\n timestamp = datetime.now().strftime(\"%H:%M:%S\")\n self.log_lines.append(f\"[{timestamp}] {message}\")\n if len(self.log_lines) > self.max_log_lines:\n self.log_lines = self.log_lines[-self.max_log_lines:]\n self._update(log_tail=\"\\n\".join(self.log_lines))\n\n def complete(self, status: str = \"completed\", error: str = None):\n \"\"\"Mark run as complete or failed.\"\"\"\n data = {\n \"status\": status,\n \"completed_at\": datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\"),\n \"current_step\": None,\n \"current_document\": None,\n }\n if error:\n data[\"error_log\"] = error\n self._update(**data)\n\n def is_cancelled(self) -> bool:\n \"\"\"Check if run has been cancelled by user.\"\"\"\n if not self.run_id:\n return False\n\n try:\n conn = mysql.connector.connect(**DB_CONFIG)\n cursor = conn.cursor(dictionary=True)\n cursor.execute(\n \"SELECT status FROM pipeline_runs WHERE id = %s\",\n (self.run_id,),\n )\n result = cursor.fetchone()\n cursor.close()\n conn.close()\n return result and result[\"status\"] == \"cancelled\"\n except Exception:\n return False\n\n\n# Global database instance for backward compatibility\ndb = Database()\n",
"structuredPatch": [
{
"oldStart": 1,
"oldLines": 723,
"newStart": 1,
"newLines": 53,
"lines": [
" \"\"\"",
" Database operations for KI-System Pipeline",
"+",
"+Composition layer that combines all database mixins into a single Database class.",
"+Maintains full backward compatibility with existing imports.",
"+",
"+Architecture:",
"+ db_core.py - Connection management (DatabaseCore)",
"+ db_documents.py - Document\/Page\/Chunk CRUD (DocumentsMixin)",
"+ db_queue.py - Pipeline queue operations (QueueMixin)",
"+ db_logging.py - Logging operations (LoggingMixin)",
"+ db_semantic.py - Semantic operations (SemanticMixin)",
"+ db_prompts.py - Prompt operations (PromptsMixin)",
" \"\"\"",
" ",
"+from datetime import datetime",
"+",
" import mysql.connector",
"-from mysql.connector import Error",
" ",
" from config import DB_CONFIG",
"-from constants import DEFAULT_LIMIT",
"+from db_core import DatabaseCore",
"+from db_documents import DocumentsMixin",
"+from db_logging import LoggingMixin",
"+from db_prompts import PromptsMixin",
"+from db_queue import QueueMixin",
"+from db_semantic import SemanticMixin",
" ",
" ",
"-class Database:",
"- \"\"\"MariaDB connection wrapper with document and chunk operations.\"\"\"",
"+class Database(",
"+ DatabaseCore,",
"+ DocumentsMixin,",
"+ QueueMixin,",
"+ LoggingMixin,",
"+ SemanticMixin,",
"+ PromptsMixin,",
"+):",
"+ \"\"\"MariaDB connection wrapper with all database operations.",
" ",
"- def __init__(self):",
"- self.connection = None",
"+ Combines all mixins via multiple inheritance (MRO):",
"+ - DatabaseCore: connect, disconnect, execute, commit",
"+ - DocumentsMixin: document\/page\/chunk CRUD",
"+ - QueueMixin: pipeline queue operations",
"+ - LoggingMixin: log, log_to_protokoll, log_provenance",
"+ - SemanticMixin: entity types, stopwords, taxonomy",
"+ - PromptsMixin: prompt retrieval",
"+ \"\"\"",
" ",
"- def connect(self):",
"- \"\"\"Establish database connection.\"\"\"",
"- try:",
"- self.connection = mysql.connector.connect(**DB_CONFIG)",
"- return True",
"- except Error as e:",
"- print(f\"Database connection error: {e}\")",
"- return False",
"+ pass",
" ",
"- def disconnect(self):",
"- \"\"\"Close database connection.\"\"\"",
"- if self.connection and self.connection.is_connected():",
"- self.connection.close()",
" ",
"- def execute(self, query, params=None):",
"- \"\"\"Execute a query and return the cursor.\"\"\"",
"- cursor = self.connection.cursor(dictionary=True)",
"- cursor.execute(query, params or ())",
"- return cursor",
"-",
"- def commit(self):",
"- \"\"\"Commit the current transaction.\"\"\"",
"- self.connection.commit()",
"-",
"- # Document Operations",
"- def document_exists(self, file_path):",
"- \"\"\"Check if document already exists.\"\"\"",
"- cursor = self.execute(\"SELECT id FROM documents WHERE source_path = %s\", (file_path,))",
"- result = cursor.fetchone()",
"- cursor.close()",
"- return result[\"id\"] if result else None",
"-",
"- def document_is_done(self, file_path):",
"- \"\"\"Check if document is already fully processed (status='done').\"\"\"",
"- cursor = self.execute(\"SELECT id, status FROM documents WHERE source_path = %s\", (file_path,))",
"- result = cursor.fetchone()",
"- cursor.close()",
"- if result and result[\"status\"] == \"done\":",
"- return result[\"id\"]",
"- return None",
"-",
"- def insert_document(self, file_path, title, file_type, file_size, file_hash):",
"- \"\"\"Insert a new document or update existing one.\"\"\"",
"- import os",
"-",
"- folder_path = os.path.dirname(file_path)",
"- cursor = self.execute(",
"- \"\"\"INSERT INTO documents",
"- (source_path, folder_path, filename, mime_type, file_size, file_hash, status)",
"- VALUES (%s, %s, %s, %s, %s, %s, 'processing')",
"- ON DUPLICATE KEY UPDATE",
"- file_hash = VALUES(file_hash),",
"- file_size = VALUES(file_size),",
"- status = 'processing',",
"- processed_at = NULL,",
"- error_message = NULL\"\"\",",
"- (file_path, folder_path, title, file_type, file_size, file_hash),",
"- )",
"- self.commit()",
"- doc_id = cursor.lastrowid",
"- # If ON DUPLICATE KEY UPDATE was triggered, lastrowid is 0",
"- if doc_id == 0:",
"- cursor_select = self.execute(\"SELECT id FROM documents WHERE source_path = %s\", (file_path,))",
"- result = cursor_select.fetchone()",
"- cursor_select.close()",
"- doc_id = result[\"id\"] if result else None",
"- cursor.close()",
"- return doc_id",
"-",
"- def update_document_status(self, doc_id, status, error_message=None):",
"- \"\"\"Update document processing status.\"\"\"",
"- if error_message:",
"- cursor = self.execute(",
"- \"\"\"UPDATE documents",
"- SET status = %s, error_message = %s, processed_at = NOW()",
"- WHERE id = %s\"\"\",",
"- (status, error_message, doc_id),",
"- )",
"- else:",
"- cursor = self.execute(",
"- \"UPDATE documents SET status = %s, processed_at = NOW() WHERE id = %s\", (status, doc_id)",
"- )",
"- self.commit()",
"- cursor.close()",
"-",
"- # Page Operations",
"- def insert_page(self, doc_id, page_number, text_content, token_count=None):",
"- \"\"\"Insert a document page.\"\"\"",
"- if token_count is None:",
"- token_count = len(text_content.split()) if text_content else 0",
"- cursor = self.execute(",
"- \"\"\"INSERT INTO document_pages",
"- (document_id, page_number, text_content, token_count, created_at)",
"- VALUES (%s, %s, %s, %s, NOW())",
"- ON DUPLICATE KEY UPDATE",
"- text_content = VALUES(text_content),",
"- token_count = VALUES(token_count)\"\"\",",
"- (doc_id, page_number, text_content, token_count),",
"- )",
"- self.commit()",
"- page_id = cursor.lastrowid",
"- if page_id == 0:",
"- cursor_select = self.execute(",
"- \"SELECT id FROM document_pages WHERE document_id = %s AND page_number = %s\",",
"- (doc_id, page_number),",
"- )",
"- result = cursor_select.fetchone()",
"- cursor_select.close()",
"- page_id = result[\"id\"] if result else None",
"- cursor.close()",
"- return page_id",
"-",
"- def get_page_id(self, doc_id, page_number):",
"- \"\"\"Get page ID by document and page number.\"\"\"",
"- cursor = self.execute(",
"- \"SELECT id FROM document_pages WHERE document_id = %s AND page_number = %s\",",
"- (doc_id, page_number),",
"- )",
"- result = cursor.fetchone()",
"- cursor.close()",
"- return result[\"id\"] if result else None",
"-",
"- # Chunk Operations",
"- def insert_chunk(",
"- self,",
"- doc_id,",
"- chunk_index,",
"- content,",
"- heading_path,",
"- position_start=None,",
"- position_end=None,",
"- metadata=None,",
"- page_id=None,",
"- ):",
"- \"\"\"Insert a text chunk.\"\"\"",
"- # Calculate token count (rough estimate: 4 chars per token)",
"- token_count = len(content) \/\/ 4",
"- cursor = self.execute(",
"- \"\"\"INSERT INTO chunks",
"- (document_id, page_id, chunk_index, content, token_count, heading_path, metadata)",
"- VALUES (%s, %s, %s, %s, %s, %s, %s)\"\"\",",
"- (doc_id, page_id, chunk_index, content, token_count, heading_path, metadata),",
"- )",
"- self.commit()",
"- chunk_id = cursor.lastrowid",
"- cursor.close()",
"- return chunk_id",
"-",
"- def get_chunks_for_embedding(self, limit=DEFAULT_LIMIT):",
"- \"\"\"Get chunks that need embeddings.\"\"\"",
"- cursor = self.execute(",
"- \"\"\"SELECT c.id, c.content, c.document_id",
"- FROM chunks c",
"- WHERE c.qdrant_id IS NULL",
"- ORDER BY c.created_at",
"- LIMIT %s\"\"\",",
"- (limit,),",
"- )",
"- results = cursor.fetchall()",
"- cursor.close()",
"- return results",
"-",
"- def update_chunk_qdrant_id(self, chunk_id, qdrant_id):",
"- \"\"\"Update chunk with Qdrant point ID.\"\"\"",
"- cursor = self.execute(\"UPDATE chunks SET qdrant_id = %s WHERE id = %s\", (qdrant_id, chunk_id))",
"- self.commit()",
"- cursor.close()",
"-",
"- # Queue Operations",
"- def add_to_queue(self, file_path, action=\"process\"):",
"- \"\"\"Add item to pipeline queue.\"\"\"",
"- cursor = self.execute(",
"- \"\"\"INSERT INTO pipeline_queue",
"- (file_path, action, status, retry_count, created_at)",
"- VALUES (%s, %s, 'pending', 0, NOW())\"\"\",",
"- (file_path, action),",
"- )",
"- self.commit()",
"- queue_id = cursor.lastrowid",
"- cursor.close()",
"- return queue_id",
"-",
"- def get_pending_queue_items(self, limit=10):",
"- \"\"\"Get pending items from queue.\"\"\"",
"- cursor = self.execute(",
"- \"\"\"SELECT id, file_path, action, retry_count",
"- FROM pipeline_queue",
"- WHERE status = 'pending'",
"- ORDER BY created_at",
"- LIMIT %s\"\"\",",
"- (limit,),",
"- )",
"- results = cursor.fetchall()",
"- cursor.close()",
"- return results",
"-",
"- def update_queue_status(self, queue_id, status, error_message=None):",
"- \"\"\"Update queue item status.\"\"\"",
"- if status == \"error\" and error_message:",
"- cursor = self.execute(",
"- \"\"\"UPDATE pipeline_queue",
"- SET status = %s, error_message = %s,",
"- retry_count = retry_count + 1, updated_at = NOW()",
"- WHERE id = %s\"\"\",",
"- (status, error_message, queue_id),",
"- )",
"- else:",
"- cursor = self.execute(",
"- \"UPDATE pipeline_queue SET status = %s, updated_at = NOW() WHERE id = %s\", (status, queue_id)",
"- )",
"- self.commit()",
"- cursor.close()",
"-",
"- # Logging",
"- def log(self, level, message, context=None):",
"- \"\"\"Write to pipeline log (ki_dev database).\"\"\"",
"- import json",
"-",
"- import mysql.connector",
"-",
"- from config import DB_LOG_CONFIG",
"-",
"- try:",
"- # Context must be valid JSON",
"- if context is not None:",
"- if isinstance(context, str):",
"- context = json.dumps({\"info\": context})",
"- elif isinstance(context, dict):",
"- context = json.dumps(context)",
"- else:",
"- context = json.dumps({\"data\": str(context)})",
"-",
"- # Use separate connection to ki_dev for logging",
"- log_conn = mysql.connector.connect(**DB_LOG_CONFIG)",
"- cursor = log_conn.cursor()",
"- cursor.execute(",
"- \"\"\"INSERT INTO pipeline_log",
"- (level, message, context, created_at)",
"- VALUES (%s, %s, %s, NOW())\"\"\",",
"- (level, message, context),",
"- )",
"- log_conn.commit()",
"- cursor.close()",
"- log_conn.close()",
"- except Exception: # noqa: S110",
"- # Logging should never break the pipeline",
"- pass",
"-",
"- # Prompts",
"- def get_prompt(self, name, version=None):",
"- \"\"\"Get a prompt by name (optionally specific version).\"\"\"",
"- if version:",
"- cursor = self.execute(\"SELECT content FROM prompts WHERE name = %s AND version = %s\", (name, version))",
"- else:",
"- cursor = self.execute(",
"- \"\"\"SELECT content FROM prompts",
"- WHERE name = %s AND is_active = 1",
"- ORDER BY version DESC LIMIT 1\"\"\",",
"- (name,),",
"- )",
"- result = cursor.fetchone()",
"- cursor.close()",
"- return result[\"content\"] if result else None",
"-",
"- def get_prompt_by_use_case(self, use_case: str, version: str = None) -> dict | None:",
"- \"\"\"",
"- Get prompt by use_case with full metadata for provenance tracking.",
"-",
"- Args:",
"- use_case: The use case (entity_extraction, semantic_analysis, statement_extraction, etc.)",
"- version: Optional specific version (otherwise latest active)",
"-",
"- Returns:",
"- Dict with id, name, version, content, use_case or None",
"- \"\"\"",
"- if version:",
"- cursor = self.execute(",
"- \"\"\"SELECT id, name, version, content, use_case",
"- FROM prompts",
"- WHERE use_case = %s AND version = %s\"\"\",",
"- (use_case, version),",
"- )",
"- else:",
"- cursor = self.execute(",
"- \"\"\"SELECT id, name, version, content, use_case",
"- FROM prompts",
"- WHERE use_case = %s AND is_active = 1",
"- ORDER BY version DESC LIMIT 1\"\"\",",
"- (use_case,),",
"- )",
"- result = cursor.fetchone()",
"- cursor.close()",
"- return result",
"-",
"- # Entity Types",
"- def get_entity_types(self, active_only: bool = True) -> list[dict]:",
"- \"\"\"Get all entity types from database.",
"-",
"- Returns:",
"- List of dicts with code, name, description, criteria, indicators, examples",
"- \"\"\"",
"- query = \"\"\"SELECT code, name, description, criteria, indicators, examples",
"- FROM entity_types\"\"\"",
"- if active_only:",
"- query += \" WHERE is_active = 1\"",
"- query += \" ORDER BY sort_order\"",
"-",
"- cursor = self.execute(query)",
"- results = cursor.fetchall()",
"- cursor.close()",
"- return list(results) if results else []",
"-",
"- def get_entity_type_codes(self) -> set[str]:",
"- \"\"\"Get set of valid entity type codes.\"\"\"",
"- cursor = self.execute(\"SELECT code FROM entity_types WHERE is_active = 1\")",
"- results = cursor.fetchall()",
"- cursor.close()",
"- return {r[\"code\"] for r in results} if results else set()",
"-",
"- def build_entity_prompt_categories(self) -> str:",
"- \"\"\"Build categories section for entity extraction prompt from DB.\"\"\"",
"- types = self.get_entity_types()",
"- lines = []",
"- for t in types:",
"- lines.append(f\" {t['code']}: {t['criteria']}\")",
"- return \"\\n\".join(lines)",
"-",
"- # Stopwords",
"- def get_stopwords(self, active_only: bool = True) -> list[str]:",
"- \"\"\"Get list of stopword canonical forms for entity filtering.",
"-",
"- Returns:",
"- List of canonical stopword strings (lowercase, normalized)",
"- \"\"\"",
"- query = \"SELECT canonical_form FROM stopwords\"",
"- if active_only:",
"- query += \" WHERE is_active = 1\"",
"-",
"- cursor = self.execute(query)",
"- results = cursor.fetchall()",
"- cursor.close()",
"- return [r[\"canonical_form\"] for r in results] if results else []",
"-",
"- def is_stopword(self, word: str) -> bool:",
"- \"\"\"Check if a word is in the stopword list.\"\"\"",
"- canonical = self._normalize_stopword(word)",
"- stopwords = self.get_stopwords()",
"- return canonical in stopwords",
"-",
"- def _normalize_stopword(self, word: str) -> str:",
"- \"\"\"Normalize word to canonical form for stopword matching.\"\"\"",
"- import re",
"- import unicodedata",
"-",
"- result = word.lower().strip()",
"- # German umlauts",
"- replacements = {\"ä\": \"ae\", \"ö\": \"oe\", \"ü\": \"ue\", \"ß\": \"ss\"}",
"- for old, new in replacements.items():",
"- result = result.replace(old, new)",
"- # Normalize unicode",
"- result = unicodedata.normalize(\"NFKD\", result)",
"- result = result.encode(\"ascii\", \"ignore\").decode(\"ascii\")",
"- # Keep only alphanumeric",
"- result = re.sub(r\"[^a-z0-9]\", \"\", result)",
"- return result",
"-",
"- # Protokoll Logging (LLM calls)",
"- def log_to_protokoll(",
"- self,",
"- client_name,",
"- request,",
"- response=None,",
"- model_name=None,",
"- tokens_input=0,",
"- tokens_output=0,",
"- duration_ms=0,",
"- status=\"completed\",",
"- error_message=None,",
"- ):",
"- \"\"\"",
"- Log LLM call to ki_dev.protokoll table.",
"-",
"- Args:",
"- client_name: Caller identifier (e.g., 'content-studio', 'pipeline')",
"- request: The prompt sent to the LLM",
"- response: The LLM response",
"- model_name: Model used (e.g., 'claude-opus-4-5-20251101')",
"- tokens_input: Input token count",
"- tokens_output: Output token count",
"- duration_ms: Request duration in milliseconds",
"- status: 'pending', 'completed', or 'error'",
"- error_message: Error details if status is 'error'",
"- \"\"\"",
"- from datetime import datetime",
"-",
"- import mysql.connector",
"-",
"- from config import DB_PROTOKOLL_CONFIG",
"-",
"- try:",
"- conn = mysql.connector.connect(**DB_PROTOKOLL_CONFIG)",
"- cursor = conn.cursor()",
"-",
"- now = datetime.now()",
"- tokens_total = tokens_input + tokens_output",
"-",
"- cursor.execute(",
"- \"\"\"INSERT INTO protokoll",
"- (request_ip, client_name, request, request_timestamp,",
"- response, response_timestamp, duration_ms,",
"- tokens_input, tokens_output, tokens_total,",
"- model_name, status, error_message)",
"- VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)\"\"\",",
"- (",
"- \"127.0.0.1\", # Local pipeline call",
"- client_name,",
"- request[:65000] if request else None, # Truncate if too long",
"- now,",
"- response[:65000] if response else None, # Truncate if too long",
"- now if response else None,",
"- duration_ms,",
"- tokens_input,",
"- tokens_output,",
"- tokens_total,",
"- model_name,",
"- status,",
"- error_message,",
"- ),",
"- )",
"- conn.commit()",
"- protokoll_id = cursor.lastrowid",
"- cursor.close()",
"- conn.close()",
"- return protokoll_id",
"- except Exception as e:",
"- # Logging should never break the pipeline",
"- print(f\"Protokoll logging error: {e}\")",
"- return None",
"-",
"- # Entity Synonyms",
"- def find_entity_by_synonym(self, synonym: str) -> dict | None:",
"- \"\"\"",
"- Find entity by synonym.",
"-",
"- Args:",
"- synonym: Synonym to search for",
"-",
"- Returns:",
"- Dict with entity_id or None",
"- \"\"\"",
"- cursor = self.execute(",
"- \"SELECT entity_id FROM entity_synonyms WHERE synonym = %s LIMIT 1\",",
"- (synonym,),",
"- )",
"- result = cursor.fetchone()",
"- cursor.close()",
"- return result",
"-",
"- def add_synonym(",
"- self,",
"- entity_id: int,",
"- synonym: str,",
"- source: str = \"extraction\",",
"- language: str = \"de\",",
"- ) -> int | None:",
"- \"\"\"",
"- Add synonym to entity if not exists.",
"-",
"- Args:",
"- entity_id: Entity ID to add synonym to",
"- synonym: The synonym text",
"- source: How it was found (extraction, manual, merge)",
"- language: Language code",
"-",
"- Returns:",
"- Synonym ID or None if already exists",
"- \"\"\"",
"- # Check if synonym already exists for this entity",
"- cursor = self.execute(",
"- \"SELECT id FROM entity_synonyms WHERE entity_id = %s AND synonym = %s\",",
"- (entity_id, synonym),",
"- )",
"- existing = cursor.fetchone()",
"- cursor.close()",
"-",
"- if existing:",
"- return None",
"-",
"- try:",
"- cursor = self.execute(",
"- \"\"\"INSERT INTO entity_synonyms (entity_id, synonym, source, language, created_at)",
"- VALUES (%s, %s, %s, %s, NOW())\"\"\",",
"- (entity_id, synonym, source, language),",
"- )",
"- self.commit()",
"- syn_id = cursor.lastrowid",
"- cursor.close()",
"- return syn_id",
"- except Exception as e:",
"- self.log(\"WARNING\", f\"Failed to add synonym: {e}\")",
"- return None",
"-",
"- # Chunk Taxonomy Mapping",
"- def add_chunk_taxonomy(",
"- self,",
"- chunk_id: int,",
"- term_id: int,",
"- confidence: float = 0.7,",
"- source: str = \"auto\",",
"- ) -> int | None:",
"- \"\"\"",
"- Add taxonomy mapping for a chunk.",
"-",
"- Args:",
"- chunk_id: Chunk ID",
"- term_id: Taxonomy term ID",
"- confidence: Confidence score (0.0-1.0)",
"- source: 'auto' or 'manual'",
"-",
"- Returns:",
"- Mapping ID or None if already exists",
"- \"\"\"",
"- # Check if mapping already exists",
"- cursor = self.execute(",
"- \"SELECT id FROM chunk_taxonomy WHERE chunk_id = %s AND taxonomy_term_id = %s\",",
"- (chunk_id, term_id),",
"- )",
"- existing = cursor.fetchone()",
"- cursor.close()",
"-",
"- if existing:",
"- return None",
"-",
"- try:",
"- cursor = self.execute(",
"- \"\"\"INSERT INTO chunk_taxonomy (chunk_id, taxonomy_term_id, confidence, source, created_at)",
"- VALUES (%s, %s, %s, %s, NOW())\"\"\",",
"- (chunk_id, term_id, confidence, source),",
"- )",
"- self.commit()",
"- mapping_id = cursor.lastrowid",
"- cursor.close()",
"- return mapping_id",
"- except Exception as e:",
"- self.log(\"WARNING\", f\"Failed to add chunk taxonomy: {e}\")",
"- return None",
"-",
"- def get_chunk_taxonomies(self, chunk_id: int) -> list:",
"- \"\"\"",
"- Get all taxonomy mappings for a chunk.",
"-",
"- Args:",
"- chunk_id: Chunk ID",
"-",
"- Returns:",
"- List of taxonomy mappings with term details",
"- \"\"\"",
"- cursor = self.execute(",
"- \"\"\"SELECT ct.*, tt.name as term_name, tt.path as term_path",
"- FROM chunk_taxonomy ct",
"- JOIN taxonomy_terms tt ON ct.taxonomy_term_id = tt.id",
"- WHERE ct.chunk_id = %s",
"- ORDER BY ct.confidence DESC\"\"\",",
"- (chunk_id,),",
"- )",
"- results = cursor.fetchall()",
"- cursor.close()",
"- return results",
"-",
"- # Entity Taxonomy Mapping",
"- def add_entity_taxonomy(",
"- self,",
"- entity_id: int,",
"- term_id: int,",
"- relevance: float = 0.7,",
"- validated: bool = False,",
"- ) -> int | None:",
"- \"\"\"",
"- Add taxonomy mapping for an entity.",
"-",
"- Args:",
"- entity_id: Entity ID",
"- term_id: Taxonomy term ID",
"- relevance: Relevance score (0.0-1.0)",
"- validated: Whether manually validated",
"-",
"- Returns:",
"- Mapping ID or None if already exists",
"- \"\"\"",
"- # Check if mapping already exists",
"- cursor = self.execute(",
"- \"SELECT id FROM entity_taxonomy_mapping WHERE entity_id = %s AND taxonomy_term_id = %s\",",
"- (entity_id, term_id),",
"- )",
"- existing = cursor.fetchone()",
"- cursor.close()",
"-",
"- if existing:",
"- return None",
"-",
"- try:",
"- cursor = self.execute(",
"- \"\"\"INSERT INTO entity_taxonomy_mapping",
"- (entity_id, taxonomy_term_id, confidence, validated, created_at)",
"- VALUES (%s, %s, %s, %s, NOW())\"\"\",",
"- (entity_id, term_id, relevance, 1 if validated else 0),",
"- )",
"- self.commit()",
"- mapping_id = cursor.lastrowid",
"- cursor.close()",
"- return mapping_id",
"- except Exception as e:",
"- self.log(\"WARNING\", f\"Failed to add entity taxonomy: {e}\")",
"- return None",
"-",
"- def get_entity_taxonomies(self, entity_id: int) -> list:",
"- \"\"\"",
"- Get all taxonomy mappings for an entity.",
"-",
"- Args:",
"- entity_id: Entity ID",
"-",
"- Returns:",
"- List of taxonomy mappings with term details",
"- \"\"\"",
"- cursor = self.execute(",
"- \"\"\"SELECT etm.*, tt.name as term_name, tt.path as term_path",
"- FROM entity_taxonomy_mapping etm",
"- JOIN taxonomy_terms tt ON etm.taxonomy_term_id = tt.id",
"- WHERE etm.entity_id = %s",
"- ORDER BY etm.relevance DESC\"\"\",",
"- (entity_id,),",
"- )",
"- results = cursor.fetchall()",
"- cursor.close()",
"- return results",
"-",
"- def get_taxonomy_terms(self) -> list:",
"- \"\"\"",
"- Get all taxonomy terms for matching.",
"-",
"- Returns:",
"- List of taxonomy terms with id, name, path",
"- \"\"\"",
"- cursor = self.execute(\"SELECT id, name, path, parent_id FROM taxonomy_terms ORDER BY path\")",
"- results = cursor.fetchall()",
"- cursor.close()",
"- return results",
"-",
"- # Provenance Logging",
"- def log_provenance(",
"- self,",
"- artifact_type: str,",
"- artifact_id: int,",
"- source_type: str,",
"- source_id: int = None,",
"- pipeline_run_id: int = None,",
"- pipeline_step: str = None,",
"- model_used: str = None,",
"- prompt_version: str = None,",
"- ) -> int:",
"- \"\"\"",
"- Log artifact provenance for traceability.",
"-",
"- Args:",
"- artifact_type: Type of artifact (document, chunk, entity, relation, statement, embedding)",
"- artifact_id: ID of the artifact",
"- source_type: How it was created (file, extraction, analysis, merge, manual)",
"- source_id: Optional source artifact ID",
"- pipeline_run_id: Optional pipeline run ID",
"- pipeline_step: Optional step name (e.g., 'entity_extract')",
"- model_used: Optional model name (e.g., 'claude-opus-4-5-20251101')",
"- prompt_version: Optional prompt version",
"-",
"- Returns:",
"- Provenance record ID",
"- \"\"\"",
"- try:",
"- cursor = self.execute(",
"- \"\"\"INSERT INTO provenance",
"- (artifact_type, artifact_id, source_type, source_id,",
"- pipeline_run_id, pipeline_step, model_used, prompt_version, created_at)",
"- VALUES (%s, %s, %s, %s, %s, %s, %s, %s, NOW())\"\"\",",
"- (",
"- artifact_type,",
"- artifact_id,",
"- source_type,",
"- source_id,",
"- pipeline_run_id,",
"- pipeline_step,",
"- model_used,",
"- prompt_version,",
"- ),",
"- )",
"- self.commit()",
"- provenance_id = cursor.lastrowid",
"- cursor.close()",
"- return provenance_id",
"- except Exception as e:",
"- self.log(\"WARNING\", f\"Failed to log provenance: {e}\")",
"- return None",
"-",
"-",
" class PipelineProgress:",
" \"\"\"Tracks and updates pipeline run progress in ki_content.pipeline_runs.\"\"\"",
" "
]
},
{
"oldStart": 728,
"oldLines": 12,
"newStart": 58,
"newLines": 6,
"lines": [
" ",
" def _update(self, **kwargs):",
" \"\"\"Internal: Update database with progress.\"\"\"",
"- from datetime import datetime",
"-",
"- import mysql.connector",
"-",
"- from config import DB_CONFIG",
"-",
" if not self.run_id:",
" return",
" "
]
},
{
"oldStart": 786,
"oldLines": 18,
"newStart": 110,
"newLines": 14,
"lines": [
" ",
" def add_log(self, message: str):",
" \"\"\"Add log line and update log_tail.\"\"\"",
"- from datetime import datetime",
"-",
" timestamp = datetime.now().strftime(\"%H:%M:%S\")",
" self.log_lines.append(f\"[{timestamp}] {message}\")",
" if len(self.log_lines) > self.max_log_lines:",
"- self.log_lines = self.log_lines[-self.max_log_lines :]",
"+ self.log_lines = self.log_lines[-self.max_log_lines:]",
" self._update(log_tail=\"\\n\".join(self.log_lines))",
" ",
" def complete(self, status: str = \"completed\", error: str = None):",
" \"\"\"Mark run as complete or failed.\"\"\"",
"- from datetime import datetime",
"-",
" data = {",
" \"status\": status,",
" \"completed_at\": datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\"),"
]
},
{
"oldStart": 810,
"oldLines": 17,
"newStart": 130,
"newLines": 16,
"lines": [
" ",
" def is_cancelled(self) -> bool:",
" \"\"\"Check if run has been cancelled by user.\"\"\"",
"- import mysql.connector",
"-",
"- from config import DB_CONFIG",
"-",
" if not self.run_id:",
" return False",
" ",
" try:",
" conn = mysql.connector.connect(**DB_CONFIG)",
" cursor = conn.cursor(dictionary=True)",
"- cursor.execute(\"SELECT status FROM pipeline_runs WHERE id = %s\", (self.run_id,))",
"+ cursor.execute(",
"+ \"SELECT status FROM pipeline_runs WHERE id = %s\",",
"+ (self.run_id,),",
"+ )",
" result = cursor.fetchone()",
" cursor.close()",
" conn.close()"
]
},
{
"oldStart": 829,
"oldLines": 5,
"newStart": 148,
"newLines": 5,
"lines": [
" return False",
" ",
" ",
"-# Global database instance",
"+# Global database instance for backward compatibility",
" db = Database()"
]
}
],
"originalFile": "\"\"\"\nDatabase operations for KI-System Pipeline\n\"\"\"\n\nimport mysql.connector\nfrom mysql.connector import Error\n\nfrom config import DB_CONFIG\nfrom constants import DEFAULT_LIMIT\n\n\nclass Database:\n \"\"\"MariaDB connection wrapper with document and chunk operations.\"\"\"\n\n def __init__(self):\n self.connection = None\n\n def connect(self):\n \"\"\"Establish database connection.\"\"\"\n try:\n self.connection = mysql.connector.connect(**DB_CONFIG)\n return True\n except Error as e:\n print(f\"Database connection error: {e}\")\n return False\n\n def disconnect(self):\n \"\"\"Close database connection.\"\"\"\n if self.connection and self.connection.is_connected():\n self.connection.close()\n\n def execute(self, query, params=None):\n \"\"\"Execute a query and return the cursor.\"\"\"\n cursor = self.connection.cursor(dictionary=True)\n cursor.execute(query, params or ())\n return cursor\n\n def commit(self):\n \"\"\"Commit the current transaction.\"\"\"\n self.connection.commit()\n\n # Document Operations\n def document_exists(self, file_path):\n \"\"\"Check if document already exists.\"\"\"\n cursor = self.execute(\"SELECT id FROM documents WHERE source_path = %s\", (file_path,))\n result = cursor.fetchone()\n cursor.close()\n return result[\"id\"] if result else None\n\n def document_is_done(self, file_path):\n \"\"\"Check if document is already fully processed (status='done').\"\"\"\n cursor = self.execute(\"SELECT id, status FROM documents WHERE source_path = %s\", (file_path,))\n result = cursor.fetchone()\n cursor.close()\n if result and result[\"status\"] == \"done\":\n return result[\"id\"]\n return None\n\n def insert_document(self, file_path, title, file_type, file_size, file_hash):\n \"\"\"Insert a new document or update existing one.\"\"\"\n import os\n\n folder_path = os.path.dirname(file_path)\n cursor = self.execute(\n \"\"\"INSERT INTO documents\n (source_path, folder_path, filename, mime_type, file_size, file_hash, 
status)\n VALUES (%s, %s, %s, %s, %s, %s, 'processing')\n ON DUPLICATE KEY UPDATE\n file_hash = VALUES(file_hash),\n file_size = VALUES(file_size),\n status = 'processing',\n processed_at = NULL,\n error_message = NULL\"\"\",\n (file_path, folder_path, title, file_type, file_size, file_hash),\n )\n self.commit()\n doc_id = cursor.lastrowid\n # If ON DUPLICATE KEY UPDATE was triggered, lastrowid is 0\n if doc_id == 0:\n cursor_select = self.execute(\"SELECT id FROM documents WHERE source_path = %s\", (file_path,))\n result = cursor_select.fetchone()\n cursor_select.close()\n doc_id = result[\"id\"] if result else None\n cursor.close()\n return doc_id\n\n def update_document_status(self, doc_id, status, error_message=None):\n \"\"\"Update document processing status.\"\"\"\n if error_message:\n cursor = self.execute(\n \"\"\"UPDATE documents\n SET status = %s, error_message = %s, processed_at = NOW()\n WHERE id = %s\"\"\",\n (status, error_message, doc_id),\n )\n else:\n cursor = self.execute(\n \"UPDATE documents SET status = %s, processed_at = NOW() WHERE id = %s\", (status, doc_id)\n )\n self.commit()\n cursor.close()\n\n # Page Operations\n def insert_page(self, doc_id, page_number, text_content, token_count=None):\n \"\"\"Insert a document page.\"\"\"\n if token_count is None:\n token_count = len(text_content.split()) if text_content else 0\n cursor = self.execute(\n \"\"\"INSERT INTO document_pages\n (document_id, page_number, text_content, token_count, created_at)\n VALUES (%s, %s, %s, %s, NOW())\n ON DUPLICATE KEY UPDATE\n text_content = VALUES(text_content),\n token_count = VALUES(token_count)\"\"\",\n (doc_id, page_number, text_content, token_count),\n )\n self.commit()\n page_id = cursor.lastrowid\n if page_id == 0:\n cursor_select = self.execute(\n \"SELECT id FROM document_pages WHERE document_id = %s AND page_number = %s\",\n (doc_id, page_number),\n )\n result = cursor_select.fetchone()\n cursor_select.close()\n page_id = result[\"id\"] if result else 
None\n cursor.close()\n return page_id\n\n def get_page_id(self, doc_id, page_number):\n \"\"\"Get page ID by document and page number.\"\"\"\n cursor = self.execute(\n \"SELECT id FROM document_pages WHERE document_id = %s AND page_number = %s\",\n (doc_id, page_number),\n )\n result = cursor.fetchone()\n cursor.close()\n return result[\"id\"] if result else None\n\n # Chunk Operations\n def insert_chunk(\n self,\n doc_id,\n chunk_index,\n content,\n heading_path,\n position_start=None,\n position_end=None,\n metadata=None,\n page_id=None,\n ):\n \"\"\"Insert a text chunk.\"\"\"\n # Calculate token count (rough estimate: 4 chars per token)\n token_count = len(content) \/\/ 4\n cursor = self.execute(\n \"\"\"INSERT INTO chunks\n (document_id, page_id, chunk_index, content, token_count, heading_path, metadata)\n VALUES (%s, %s, %s, %s, %s, %s, %s)\"\"\",\n (doc_id, page_id, chunk_index, content, token_count, heading_path, metadata),\n )\n self.commit()\n chunk_id = cursor.lastrowid\n cursor.close()\n return chunk_id\n\n def get_chunks_for_embedding(self, limit=DEFAULT_LIMIT):\n \"\"\"Get chunks that need embeddings.\"\"\"\n cursor = self.execute(\n \"\"\"SELECT c.id, c.content, c.document_id\n FROM chunks c\n WHERE c.qdrant_id IS NULL\n ORDER BY c.created_at\n LIMIT %s\"\"\",\n (limit,),\n )\n results = cursor.fetchall()\n cursor.close()\n return results\n\n def update_chunk_qdrant_id(self, chunk_id, qdrant_id):\n \"\"\"Update chunk with Qdrant point ID.\"\"\"\n cursor = self.execute(\"UPDATE chunks SET qdrant_id = %s WHERE id = %s\", (qdrant_id, chunk_id))\n self.commit()\n cursor.close()\n\n # Queue Operations\n def add_to_queue(self, file_path, action=\"process\"):\n \"\"\"Add item to pipeline queue.\"\"\"\n cursor = self.execute(\n \"\"\"INSERT INTO pipeline_queue\n (file_path, action, status, retry_count, created_at)\n VALUES (%s, %s, 'pending', 0, NOW())\"\"\",\n (file_path, action),\n )\n self.commit()\n queue_id = cursor.lastrowid\n cursor.close()\n return 
queue_id\n\n def get_pending_queue_items(self, limit=10):\n \"\"\"Get pending items from queue.\"\"\"\n cursor = self.execute(\n \"\"\"SELECT id, file_path, action, retry_count\n FROM pipeline_queue\n WHERE status = 'pending'\n ORDER BY created_at\n LIMIT %s\"\"\",\n (limit,),\n )\n results = cursor.fetchall()\n cursor.close()\n return results\n\n def update_queue_status(self, queue_id, status, error_message=None):\n \"\"\"Update queue item status.\"\"\"\n if status == \"error\" and error_message:\n cursor = self.execute(\n \"\"\"UPDATE pipeline_queue\n SET status = %s, error_message = %s,\n retry_count = retry_count + 1, updated_at = NOW()\n WHERE id = %s\"\"\",\n (status, error_message, queue_id),\n )\n else:\n cursor = self.execute(\n \"UPDATE pipeline_queue SET status = %s, updated_at = NOW() WHERE id = %s\", (status, queue_id)\n )\n self.commit()\n cursor.close()\n\n # Logging\n def log(self, level, message, context=None):\n \"\"\"Write to pipeline log (ki_dev database).\"\"\"\n import json\n\n import mysql.connector\n\n from config import DB_LOG_CONFIG\n\n try:\n # Context must be valid JSON\n if context is not None:\n if isinstance(context, str):\n context = json.dumps({\"info\": context})\n elif isinstance(context, dict):\n context = json.dumps(context)\n else:\n context = json.dumps({\"data\": str(context)})\n\n # Use separate connection to ki_dev for logging\n log_conn = mysql.connector.connect(**DB_LOG_CONFIG)\n cursor = log_conn.cursor()\n cursor.execute(\n \"\"\"INSERT INTO pipeline_log\n (level, message, context, created_at)\n VALUES (%s, %s, %s, NOW())\"\"\",\n (level, message, context),\n )\n log_conn.commit()\n cursor.close()\n log_conn.close()\n except Exception: # noqa: S110\n # Logging should never break the pipeline\n pass\n\n # Prompts\n def get_prompt(self, name, version=None):\n \"\"\"Get a prompt by name (optionally specific version).\"\"\"\n if version:\n cursor = self.execute(\"SELECT content FROM prompts WHERE name = %s AND version = 
%s\", (name, version))\n else:\n cursor = self.execute(\n \"\"\"SELECT content FROM prompts\n WHERE name = %s AND is_active = 1\n ORDER BY version DESC LIMIT 1\"\"\",\n (name,),\n )\n result = cursor.fetchone()\n cursor.close()\n return result[\"content\"] if result else None\n\n def ... [TRUNCATED-6133b14c29ef88bd]"
}
}