{
"tool_response": {
"type": "text",
"file": {
"filePath": "\/var\/www\/scripts\/pipeline\/db.py",
"content": " def add_chunk_taxonomy(\n self,\n chunk_id: int,\n term_id: int,\n confidence: float = 0.7,\n source: str = \"auto\",\n ) -> int | None:\n \"\"\"\n Add taxonomy mapping for a chunk.\n\n Args:\n chunk_id: Chunk ID\n term_id: Taxonomy term ID\n confidence: Confidence score (0.0-1.0)\n source: 'auto' or 'manual'\n\n Returns:\n Mapping ID or None if already exists\n \"\"\"\n # Check if mapping already exists\n cursor = self.execute(\n \"SELECT id FROM chunk_taxonomy WHERE chunk_id = %s AND taxonomy_term_id = %s\",\n (chunk_id, term_id),\n )\n existing = cursor.fetchone()\n cursor.close()\n\n if existing:\n return None\n\n try:\n cursor = self.execute(\n \"\"\"INSERT INTO chunk_taxonomy (chunk_id, taxonomy_term_id, confidence, source, created_at)\n VALUES (%s, %s, %s, %s, NOW())\"\"\",\n (chunk_id, term_id, confidence, source),\n )\n self.commit()\n mapping_id = cursor.lastrowid\n cursor.close()\n return mapping_id\n except Exception as e:\n self.log(\"WARNING\", f\"Failed to add chunk taxonomy: {e}\")\n return None\n\n def get_chunk_taxonomies(self, chunk_id: int) -> list:\n \"\"\"\n Get all taxonomy mappings for a chunk.\n\n Args:\n chunk_id: Chunk ID\n\n Returns:\n List of taxonomy mappings with term details\n \"\"\"\n cursor = self.execute(\n \"\"\"SELECT ct.*, tt.name as term_name, tt.path as term_path\n FROM chunk_taxonomy ct\n JOIN taxonomy_terms tt ON ct.taxonomy_term_id = tt.id\n WHERE ct.chunk_id = %s\n ORDER BY ct.confidence DESC\"\"\",\n (chunk_id,),\n )\n results = cursor.fetchall()\n cursor.close()\n return results\n\n # Entity Taxonomy Mapping\n def add_entity_taxonomy(\n self,\n entity_id: int,\n term_id: int,\n relevance: float = 0.7,\n validated: bool = False,\n ) -> int | None:\n \"\"\"\n Add taxonomy mapping for an entity.\n\n Args:\n entity_id: Entity ID\n term_id: Taxonomy term ID\n relevance: Relevance score (0.0-1.0)\n validated: Whether manually validated\n\n Returns:\n Mapping ID or None if already exists\n \"\"\"\n # Check if mapping already exists\n cursor = self.execute(\n \"SELECT id FROM entity_taxonomy_mapping WHERE entity_id = %s AND taxonomy_term_id = %s\",\n (entity_id, term_id),\n )\n existing = cursor.fetchone()\n cursor.close()\n\n if existing:\n return None\n\n try:\n cursor = self.execute(\n \"\"\"INSERT INTO entity_taxonomy_mapping\n (entity_id, taxonomy_term_id, relevance, validated, created_at)\n VALUES (%s, %s, %s, %s, NOW())\"\"\",\n (entity_id, term_id, relevance, 1 if validated else 0),\n )\n self.commit()\n mapping_id = cursor.lastrowid\n cursor.close()\n return mapping_id\n except Exception as e:\n self.log(\"WARNING\", f\"Failed to add entity taxonomy: {e}\")\n return None\n\n def get_entity_taxonomies(self, entity_id: int) -> list:\n \"\"\"\n Get all taxonomy mappings for an entity.\n\n Args:\n entity_id: Entity ID\n\n Returns:\n List of taxonomy mappings with term details\n \"\"\"\n cursor = self.execute(\n \"\"\"SELECT etm.*, tt.name as term_name, tt.path as term_path\n FROM entity_taxonomy_mapping etm\n JOIN taxonomy_terms tt ON etm.taxonomy_term_id = tt.id\n WHERE etm.entity_id = %s\n ORDER BY etm.relevance DESC\"\"\",\n (entity_id,),\n )\n results = cursor.fetchall()\n cursor.close()\n return results\n\n def get_taxonomy_terms(self) -> list:\n \"\"\"\n Get all taxonomy terms for matching.\n\n Returns:\n List of taxonomy terms with id, name, path\n \"\"\"\n cursor = self.execute(\n \"SELECT id, name, path, parent_id FROM taxonomy_terms ORDER BY path\"\n )\n results = cursor.fetchall()\n cursor.close()\n return results\n\n # Provenance Logging\n def log_provenance(\n self,\n artifact_type: str,\n artifact_id: int,\n source_type: str,\n source_id: int = None,\n pipeline_run_id: int = None,\n pipeline_step: str = None,\n model_used: str = None,\n prompt_version: str = None,\n ) -> int:",
"numLines": 160,
"startLine": 403,
"totalLines": 718
}
}
}