{
"tool_response": {
"filePath": "\/var\/www\/scripts\/pipeline\/db.py",
"oldString": " # Provenance Logging\n def log_provenance(",
"newString": " # Chunk Taxonomy Mapping\n def add_chunk_taxonomy(\n self,\n chunk_id: int,\n term_id: int,\n confidence: float = 0.7,\n source: str = \"auto\",\n ) -> int | None:\n \"\"\"\n Add taxonomy mapping for a chunk.\n\n Args:\n chunk_id: Chunk ID\n term_id: Taxonomy term ID\n confidence: Confidence score (0.0-1.0)\n source: 'auto' or 'manual'\n\n Returns:\n Mapping ID or None if already exists\n \"\"\"\n # Check if mapping already exists\n cursor = self.execute(\n \"SELECT id FROM chunk_taxonomy WHERE chunk_id = %s AND taxonomy_term_id = %s\",\n (chunk_id, term_id),\n )\n existing = cursor.fetchone()\n cursor.close()\n\n if existing:\n return None\n\n try:\n cursor = self.execute(\n \"\"\"INSERT INTO chunk_taxonomy (chunk_id, taxonomy_term_id, confidence, source, created_at)\n VALUES (%s, %s, %s, %s, NOW())\"\"\",\n (chunk_id, term_id, confidence, source),\n )\n self.commit()\n mapping_id = cursor.lastrowid\n cursor.close()\n return mapping_id\n except Exception as e:\n self.log(\"WARNING\", f\"Failed to add chunk taxonomy: {e}\")\n return None\n\n def get_chunk_taxonomies(self, chunk_id: int) -> list:\n \"\"\"\n Get all taxonomy mappings for a chunk.\n\n Args:\n chunk_id: Chunk ID\n\n Returns:\n List of taxonomy mappings with term details\n \"\"\"\n cursor = self.execute(\n \"\"\"SELECT ct.*, tt.name as term_name, tt.path as term_path\n FROM chunk_taxonomy ct\n JOIN taxonomy_terms tt ON ct.taxonomy_term_id = tt.id\n WHERE ct.chunk_id = %s\n ORDER BY ct.confidence DESC\"\"\",\n (chunk_id,),\n )\n results = cursor.fetchall()\n cursor.close()\n return results\n\n # Entity Taxonomy Mapping\n def add_entity_taxonomy(\n self,\n entity_id: int,\n term_id: int,\n relevance: float = 0.7,\n validated: bool = False,\n ) -> int | None:\n \"\"\"\n Add taxonomy mapping for an entity.\n\n Args:\n entity_id: Entity ID\n term_id: Taxonomy term ID\n relevance: Relevance score (0.0-1.0)\n validated: Whether manually validated\n\n Returns:\n Mapping ID or None if already exists\n \"\"\"\n # Check if mapping already exists\n cursor = self.execute(\n \"SELECT id FROM entity_taxonomy_mapping WHERE entity_id = %s AND taxonomy_term_id = %s\",\n (entity_id, term_id),\n )\n existing = cursor.fetchone()\n cursor.close()\n\n if existing:\n return None\n\n try:\n cursor = self.execute(\n \"\"\"INSERT INTO entity_taxonomy_mapping\n (entity_id, taxonomy_term_id, relevance, validated, created_at)\n VALUES (%s, %s, %s, %s, NOW())\"\"\",\n (entity_id, term_id, relevance, 1 if validated else 0),\n )\n self.commit()\n mapping_id = cursor.lastrowid\n cursor.close()\n return mapping_id\n except Exception as e:\n self.log(\"WARNING\", f\"Failed to add entity taxonomy: {e}\")\n return None\n\n def get_entity_taxonomies(self, entity_id: int) -> list:\n \"\"\"\n Get all taxonomy mappings for an entity.\n\n Args:\n entity_id: Entity ID\n\n Returns:\n List of taxonomy mappings with term details\n \"\"\"\n cursor = self.execute(\n \"\"\"SELECT etm.*, tt.name as term_name, tt.path as term_path\n FROM entity_taxonomy_mapping etm\n JOIN taxonomy_terms tt ON etm.taxonomy_term_id = tt.id\n WHERE etm.entity_id = %s\n ORDER BY etm.relevance DESC\"\"\",\n (entity_id,),\n )\n results = cursor.fetchall()\n cursor.close()\n return results\n\n def get_taxonomy_terms(self) -> list:\n \"\"\"\n Get all taxonomy terms for matching.\n\n Returns:\n List of taxonomy terms with id, name, path\n \"\"\"\n cursor = self.execute(\n \"SELECT id, name, path, parent_id FROM taxonomy_terms ORDER BY path\"\n )\n results = cursor.fetchall()\n cursor.close()\n return results\n\n # Provenance Logging\n def log_provenance(",
"originalFile": "\"\"\"\nDatabase operations for KI-System Pipeline\n\"\"\"\n\nimport mysql.connector\nfrom mysql.connector import Error\n\nfrom config import DB_CONFIG\n\n\nclass Database:\n \"\"\"MariaDB connection wrapper with document and chunk operations.\"\"\"\n\n def __init__(self):\n self.connection = None\n\n def connect(self):\n \"\"\"Establish database connection.\"\"\"\n try:\n self.connection = mysql.connector.connect(**DB_CONFIG)\n return True\n except Error as e:\n print(f\"Database connection error: {e}\")\n return False\n\n def disconnect(self):\n \"\"\"Close database connection.\"\"\"\n if self.connection and self.connection.is_connected():\n self.connection.close()\n\n def execute(self, query, params=None):\n \"\"\"Execute a query and return the cursor.\"\"\"\n cursor = self.connection.cursor(dictionary=True)\n cursor.execute(query, params or ())\n return cursor\n\n def commit(self):\n \"\"\"Commit the current transaction.\"\"\"\n self.connection.commit()\n\n # Document Operations\n def document_exists(self, file_path):\n \"\"\"Check if document already exists.\"\"\"\n cursor = self.execute(\"SELECT id FROM documents WHERE source_path = %s\", (file_path,))\n result = cursor.fetchone()\n cursor.close()\n return result[\"id\"] if result else None\n\n def document_is_done(self, file_path):\n \"\"\"Check if document is already fully processed (status='done').\"\"\"\n cursor = self.execute(\"SELECT id, status FROM documents WHERE source_path = %s\", (file_path,))\n result = cursor.fetchone()\n cursor.close()\n if result and result[\"status\"] == \"done\":\n return result[\"id\"]\n return None\n\n def insert_document(self, file_path, title, file_type, file_size, file_hash):\n \"\"\"Insert a new document or update existing one.\"\"\"\n import os\n\n folder_path = os.path.dirname(file_path)\n cursor = self.execute(\n \"\"\"INSERT INTO documents\n (source_path, folder_path, filename, mime_type, file_size, file_hash, status)\n VALUES (%s, %s, %s, %s, %s, %s, 'processing')\n ON DUPLICATE KEY UPDATE\n file_hash = VALUES(file_hash),\n file_size = VALUES(file_size),\n status = 'processing',\n processed_at = NULL,\n error_message = NULL\"\"\",\n (file_path, folder_path, title, file_type, file_size, file_hash),\n )\n self.commit()\n doc_id = cursor.lastrowid\n # If ON DUPLICATE KEY UPDATE was triggered, lastrowid is 0\n if doc_id == 0:\n cursor_select = self.execute(\"SELECT id FROM documents WHERE source_path = %s\", (file_path,))\n result = cursor_select.fetchone()\n cursor_select.close()\n doc_id = result[\"id\"] if result else None\n cursor.close()\n return doc_id\n\n def update_document_status(self, doc_id, status, error_message=None):\n \"\"\"Update document processing status.\"\"\"\n if error_message:\n cursor = self.execute(\n \"\"\"UPDATE documents\n SET status = %s, error_message = %s, processed_at = NOW()\n WHERE id = %s\"\"\",\n (status, error_message, doc_id),\n )\n else:\n cursor = self.execute(\n \"UPDATE documents SET status = %s, processed_at = NOW() WHERE id = %s\", (status, doc_id)\n )\n self.commit()\n cursor.close()\n\n # Chunk Operations\n def insert_chunk(\n self, doc_id, chunk_index, content, heading_path, position_start=None, position_end=None, metadata=None\n ):\n \"\"\"Insert a text chunk.\"\"\"\n # Calculate token count (rough estimate: 4 chars per token)\n token_count = len(content) \/\/ 4\n cursor = self.execute(\n \"\"\"INSERT INTO chunks\n (document_id, chunk_index, content, token_count, heading_path, metadata)\n VALUES (%s, %s, %s, %s, %s, %s)\"\"\",\n (doc_id, chunk_index, content, token_count, heading_path, metadata),\n )\n self.commit()\n chunk_id = cursor.lastrowid\n cursor.close()\n return chunk_id\n\n def get_chunks_for_embedding(self, limit=100):\n \"\"\"Get chunks that need embeddings.\"\"\"\n cursor = self.execute(\n \"\"\"SELECT c.id, c.content, c.document_id\n FROM chunks c\n WHERE c.qdrant_id IS NULL\n ORDER BY c.created_at\n LIMIT %s\"\"\",\n (limit,),\n )\n results = cursor.fetchall()\n cursor.close()\n return results\n\n def update_chunk_qdrant_id(self, chunk_id, qdrant_id):\n \"\"\"Update chunk with Qdrant point ID.\"\"\"\n cursor = self.execute(\"UPDATE chunks SET qdrant_id = %s WHERE id = %s\", (qdrant_id, chunk_id))\n self.commit()\n cursor.close()\n\n # Queue Operations\n def add_to_queue(self, file_path, action=\"process\"):\n \"\"\"Add item to pipeline queue.\"\"\"\n cursor = self.execute(\n \"\"\"INSERT INTO pipeline_queue\n (file_path, action, status, retry_count, created_at)\n VALUES (%s, %s, 'pending', 0, NOW())\"\"\",\n (file_path, action),\n )\n self.commit()\n queue_id = cursor.lastrowid\n cursor.close()\n return queue_id\n\n def get_pending_queue_items(self, limit=10):\n \"\"\"Get pending items from queue.\"\"\"\n cursor = self.execute(\n \"\"\"SELECT id, file_path, action, retry_count\n FROM pipeline_queue\n WHERE status = 'pending'\n ORDER BY created_at\n LIMIT %s\"\"\",\n (limit,),\n )\n results = cursor.fetchall()\n cursor.close()\n return results\n\n def update_queue_status(self, queue_id, status, error_message=None):\n \"\"\"Update queue item status.\"\"\"\n if status == \"error\" and error_message:\n cursor = self.execute(\n \"\"\"UPDATE pipeline_queue\n SET status = %s, error_message = %s,\n retry_count = retry_count + 1, updated_at = NOW()\n WHERE id = %s\"\"\",\n (status, error_message, queue_id),\n )\n else:\n cursor = self.execute(\n \"UPDATE pipeline_queue SET status = %s, updated_at = NOW() WHERE id = %s\", (status, queue_id)\n )\n self.commit()\n cursor.close()\n\n # Logging\n def log(self, level, message, context=None):\n \"\"\"Write to pipeline log (ki_dev database).\"\"\"\n import json\n\n import mysql.connector\n\n from config import DB_LOG_CONFIG\n\n try:\n # Context must be valid JSON\n if context is not None:\n if isinstance(context, str):\n context = json.dumps({\"info\": context})\n elif isinstance(context, dict):\n context = json.dumps(context)\n else:\n context = json.dumps({\"data\": str(context)})\n\n # Use separate connection to ki_dev for logging\n log_conn = mysql.connector.connect(**DB_LOG_CONFIG)\n cursor = log_conn.cursor()\n cursor.execute(\n \"\"\"INSERT INTO pipeline_log\n (level, message, context, created_at)\n VALUES (%s, %s, %s, NOW())\"\"\",\n (level, message, context),\n )\n log_conn.commit()\n cursor.close()\n log_conn.close()\n except Exception: # noqa: S110\n # Logging should never break the pipeline\n pass\n\n # Prompts\n def get_prompt(self, name, version=None):\n \"\"\"Get a prompt by name (optionally specific version).\"\"\"\n if version:\n cursor = self.execute(\"SELECT content FROM prompts WHERE name = %s AND version = %s\", (name, version))\n else:\n cursor = self.execute(\n \"\"\"SELECT content FROM prompts\n WHERE name = %s AND is_active = 1\n ORDER BY version DESC LIMIT 1\"\"\",\n (name,),\n )\n result = cursor.fetchone()\n cursor.close()\n return result[\"content\"] if result else None\n\n def get_prompt_by_use_case(self, use_case: str, version: str = None) -> dict | None:\n \"\"\"\n Get prompt by use_case with full metadata for provenance tracking.\n\n Args:\n use_case: The use case (entity_extraction, semantic_analysis, statement_extraction, etc.)\n version: Optional specific version (otherwise latest active)\n\n Returns:\n Dict with id, name, version, content, use_case or None\n \"\"\"\n if version:\n cursor = self.execute(\n \"\"\"SELECT id, name, version, content, use_case\n FROM prompts\n WHERE use_case = %s AND version = %s\"\"\",\n (use_case, version),\n )\n else:\n cursor = self.execute(\n \"\"\"SELECT id, name, version, content, use_case\n FROM prompts\n WHERE use_case = %s AND is_active = 1\n ORDER BY version DESC LIMIT 1\"\"\",\n (use_case,),\n )\n result = cursor.fetchone()\n cursor.close()\n return result\n\n # Protokoll Logging (LLM calls)\n def log_to_protokoll(\n self,\n client_name,\n request,\n response=None,\n model_name=None,\n tokens_input=0,\n tokens_output=0,\n duration_ms=0,\n status=\"completed\",\n error_message=None,\n ):\n \"\"\"\n Log LLM call to ki_dev.protokoll table.\n\n Args:\n client_name: Caller identifier (e.g., 'content-studio', 'pipeline')\n request: The prompt sent to the LLM\n response: The LLM response\n ... [TRUNCATED-11cc21e6c5a68baa]",
"structuredPatch": [
{
"oldStart": 399,
"oldLines": 6,
"newStart": 399,
"newLines": 155,
"lines": [
" self.log(\"WARNING\", f\"Failed to add synonym: {e}\")",
" return None",
" ",
"+ # Chunk Taxonomy Mapping",
"+ def add_chunk_taxonomy(",
"+ self,",
"+ chunk_id: int,",
"+ term_id: int,",
"+ confidence: float = 0.7,",
"+ source: str = \"auto\",",
"+ ) -> int | None:",
"+ \"\"\"",
"+ Add taxonomy mapping for a chunk.",
"+",
"+ Args:",
"+ chunk_id: Chunk ID",
"+ term_id: Taxonomy term ID",
"+ confidence: Confidence score (0.0-1.0)",
"+ source: 'auto' or 'manual'",
"+",
"+ Returns:",
"+ Mapping ID or None if already exists",
"+ \"\"\"",
"+ # Check if mapping already exists",
"+ cursor = self.execute(",
"+ \"SELECT id FROM chunk_taxonomy WHERE chunk_id = %s AND taxonomy_term_id = %s\",",
"+ (chunk_id, term_id),",
"+ )",
"+ existing = cursor.fetchone()",
"+ cursor.close()",
"+",
"+ if existing:",
"+ return None",
"+",
"+ try:",
"+ cursor = self.execute(",
"+ \"\"\"INSERT INTO chunk_taxonomy (chunk_id, taxonomy_term_id, confidence, source, created_at)",
"+ VALUES (%s, %s, %s, %s, NOW())\"\"\",",
"+ (chunk_id, term_id, confidence, source),",
"+ )",
"+ self.commit()",
"+ mapping_id = cursor.lastrowid",
"+ cursor.close()",
"+ return mapping_id",
"+ except Exception as e:",
"+ self.log(\"WARNING\", f\"Failed to add chunk taxonomy: {e}\")",
"+ return None",
"+",
"+ def get_chunk_taxonomies(self, chunk_id: int) -> list:",
"+ \"\"\"",
"+ Get all taxonomy mappings for a chunk.",
"+",
"+ Args:",
"+ chunk_id: Chunk ID",
"+",
"+ Returns:",
"+ List of taxonomy mappings with term details",
"+ \"\"\"",
"+ cursor = self.execute(",
"+ \"\"\"SELECT ct.*, tt.name as term_name, tt.path as term_path",
"+ FROM chunk_taxonomy ct",
"+ JOIN taxonomy_terms tt ON ct.taxonomy_term_id = tt.id",
"+ WHERE ct.chunk_id = %s",
"+ ORDER BY ct.confidence DESC\"\"\",",
"+ (chunk_id,),",
"+ )",
"+ results = cursor.fetchall()",
"+ cursor.close()",
"+ return results",
"+",
"+ # Entity Taxonomy Mapping",
"+ def add_entity_taxonomy(",
"+ self,",
"+ entity_id: int,",
"+ term_id: int,",
"+ relevance: float = 0.7,",
"+ validated: bool = False,",
"+ ) -> int | None:",
"+ \"\"\"",
"+ Add taxonomy mapping for an entity.",
"+",
"+ Args:",
"+ entity_id: Entity ID",
"+ term_id: Taxonomy term ID",
"+ relevance: Relevance score (0.0-1.0)",
"+ validated: Whether manually validated",
"+",
"+ Returns:",
"+ Mapping ID or None if already exists",
"+ \"\"\"",
"+ # Check if mapping already exists",
"+ cursor = self.execute(",
"+ \"SELECT id FROM entity_taxonomy_mapping WHERE entity_id = %s AND taxonomy_term_id = %s\",",
"+ (entity_id, term_id),",
"+ )",
"+ existing = cursor.fetchone()",
"+ cursor.close()",
"+",
"+ if existing:",
"+ return None",
"+",
"+ try:",
"+ cursor = self.execute(",
"+ \"\"\"INSERT INTO entity_taxonomy_mapping",
"+ (entity_id, taxonomy_term_id, relevance, validated, created_at)",
"+ VALUES (%s, %s, %s, %s, NOW())\"\"\",",
"+ (entity_id, term_id, relevance, 1 if validated else 0),",
"+ )",
"+ self.commit()",
"+ mapping_id = cursor.lastrowid",
"+ cursor.close()",
"+ return mapping_id",
"+ except Exception as e:",
"+ self.log(\"WARNING\", f\"Failed to add entity taxonomy: {e}\")",
"+ return None",
"+",
"+ def get_entity_taxonomies(self, entity_id: int) -> list:",
"+ \"\"\"",
"+ Get all taxonomy mappings for an entity.",
"+",
"+ Args:",
"+ entity_id: Entity ID",
"+",
"+ Returns:",
"+ List of taxonomy mappings with term details",
"+ \"\"\"",
"+ cursor = self.execute(",
"+ \"\"\"SELECT etm.*, tt.name as term_name, tt.path as term_path",
"+ FROM entity_taxonomy_mapping etm",
"+ JOIN taxonomy_terms tt ON etm.taxonomy_term_id = tt.id",
"+ WHERE etm.entity_id = %s",
"+ ORDER BY etm.relevance DESC\"\"\",",
"+ (entity_id,),",
"+ )",
"+ results = cursor.fetchall()",
"+ cursor.close()",
"+ return results",
"+",
"+ def get_taxonomy_terms(self) -> list:",
"+ \"\"\"",
"+ Get all taxonomy terms for matching.",
"+",
"+ Returns:",
"+ List of taxonomy terms with id, name, path",
"+ \"\"\"",
"+ cursor = self.execute(",
"+ \"SELECT id, name, path, parent_id FROM taxonomy_terms ORDER BY path\"",
"+ )",
"+ results = cursor.fetchall()",
"+ cursor.close()",
"+ return results",
"+",
" # Provenance Logging",
" def log_provenance(",
" self,"
]
}
],
"userModified": false,
"replaceAll": false
}
}