{
"event": "PostToolUse",
"tool_name": "Read",
"tool_response": {
"type": "text",
"file": {
"filePath": "\/var\/www\/migration\/content-pipeline\/src\/db_documents.py",
"content": "\"\"\"\nDatabase Documents Mixin\n\nSingle Responsibility: Document, Page, and Chunk CRUD operations.\n\"\"\"\n\nimport os\n\nfrom constants import DEFAULT_LIMIT\n\n\nclass DocumentsMixin:\n \"\"\"Mixin for document, page, and chunk operations.\n\n Provides:\n - Document: exists, is_done, insert, update_status\n - Page: insert, get_id\n - Chunk: insert, get_for_embedding, update_qdrant_id\n \"\"\"\n\n # ========== Document Operations ==========\n\n def document_exists(self, file_path: str) -> int | None:\n \"\"\"Check if document already exists.\n\n Args:\n file_path: Source file path\n\n Returns:\n Document ID if exists, None otherwise\n \"\"\"\n cursor = self.execute(\n \"SELECT id FROM documents WHERE source_path = %s\",\n (file_path,),\n )\n result = cursor.fetchone()\n cursor.close()\n return result[\"id\"] if result else None\n\n def document_is_done(self, file_path: str) -> int | None:\n \"\"\"Check if document is already fully processed (status='done').\n\n Args:\n file_path: Source file path\n\n Returns:\n Document ID if done, None otherwise\n \"\"\"\n cursor = self.execute(\n \"SELECT id, status FROM documents WHERE source_path = %s\",\n (file_path,),\n )\n result = cursor.fetchone()\n cursor.close()\n if result and result[\"status\"] == \"done\":\n return result[\"id\"]\n return None\n\n def insert_document(\n self,\n file_path: str,\n title: str,\n file_type: str,\n file_size: int,\n file_hash: str,\n ) -> int | None:\n \"\"\"Insert a new document or update existing one.\n\n Args:\n file_path: Source file path\n title: Document title (filename)\n file_type: MIME type\n file_size: File size in bytes\n file_hash: File hash for change detection\n\n Returns:\n Document ID\n \"\"\"\n folder_path = os.path.dirname(file_path)\n cursor = self.execute(\n \"\"\"INSERT INTO documents\n (source_path, folder_path, filename, mime_type, file_size, file_hash, status)\n VALUES (%s, %s, %s, %s, %s, %s, 'processing')\n ON DUPLICATE KEY UPDATE\n file_hash = VALUES(file_hash),\n file_size = VALUES(file_size),\n status = 'processing',\n processed_at = NULL,\n error_message = NULL\"\"\",\n (file_path, folder_path, title, file_type, file_size, file_hash),\n )\n self.commit()\n doc_id = cursor.lastrowid\n\n # If ON DUPLICATE KEY UPDATE was triggered, lastrowid is 0\n if doc_id == 0:\n cursor_select = self.execute(\n \"SELECT id FROM documents WHERE source_path = %s\",\n (file_path,),\n )\n result = cursor_select.fetchone()\n cursor_select.close()\n doc_id = result[\"id\"] if result else None\n\n cursor.close()\n return doc_id\n\n def update_document_status(\n self,\n doc_id: int,\n status: str,\n error_message: str = None,\n ):\n \"\"\"Update document processing status.\n\n Args:\n doc_id: Document ID\n status: New status (processing, done, error)\n error_message: Optional error message\n \"\"\"\n if error_message:\n cursor = self.execute(\n \"\"\"UPDATE documents\n SET status = %s, error_message = %s, processed_at = NOW()\n WHERE id = %s\"\"\",\n (status, error_message, doc_id),\n )\n else:\n cursor = self.execute(\n \"UPDATE documents SET status = %s, processed_at = NOW() WHERE id = %s\",\n (status, doc_id),\n )\n self.commit()\n cursor.close()\n\n # ========== Page Operations ==========\n\n def insert_page(\n self,\n doc_id: int,\n page_number: int,\n text_content: str,\n token_count: int = None,\n ) -> int | None:\n \"\"\"Insert a document page.\n\n Args:\n doc_id: Document ID\n page_number: Page number (1-based)\n text_content: Page text content\n token_count: Optional token count\n\n Returns:\n Page ID\n \"\"\"\n if token_count is None:\n token_count = len(text_content.split()) if text_content else 0\n\n cursor = self.execute(\n \"\"\"INSERT INTO document_pages\n (document_id, page_number, text_content, token_count, created_at)\n VALUES (%s, %s, %s, %s, NOW())\n ON DUPLICATE KEY UPDATE\n text_content = VALUES(text_content),\n token_count = VALUES(token_count)\"\"\",\n (doc_id, page_number, text_content, token_count),\n )\n self.commit()\n page_id = cursor.lastrowid\n\n if page_id == 0:\n cursor_select = self.execute(\n \"SELECT id FROM document_pages WHERE document_id = %s AND page_number = %s\",\n (doc_id, page_number),\n )\n result = cursor_select.fetchone()\n cursor_select.close()\n page_id = result[\"id\"] if result else None\n\n cursor.close()\n return page_id\n\n def get_page_id(self, doc_id: int, page_number: int) -> int | None:\n \"\"\"Get page ID by document and page number.\n\n Args:\n doc_id: Document ID\n page_number: Page number\n\n Returns:\n Page ID or None\n \"\"\"\n cursor = self.execute(\n \"SELECT id FROM document_pages WHERE document_id = %s AND page_number = %s\",\n (doc_id, page_number),\n )\n result = cursor.fetchone()\n cursor.close()\n return result[\"id\"] if result else None\n\n # ========== Chunk Operations ==========\n\n def insert_chunk(\n self,\n doc_id: int,\n chunk_index: int,\n content: str,\n heading_path: str,\n position_start: int = None,\n position_end: int = None,\n metadata: str = None,\n page_id: int = None,\n ) -> int:\n \"\"\"Insert a text chunk.\n\n Args:\n doc_id: Document ID\n chunk_index: Chunk index within document\n content: Chunk text content\n heading_path: Heading hierarchy path\n position_start: Optional start position\n position_end: Optional end position\n metadata: Optional JSON metadata\n page_id: Optional page ID\n\n Returns:\n Chunk ID\n \"\"\"\n # Calculate token count (rough estimate: 4 chars per token)\n token_count = len(content) \/\/ 4\n\n cursor = self.execute(\n \"\"\"INSERT INTO chunks\n (document_id, page_id, chunk_index, content, token_count, heading_path, metadata)\n VALUES (%s, %s, %s, %s, %s, %s, %s)\"\"\",\n (doc_id, page_id, chunk_index, content, token_count, heading_path, metadata),\n )\n self.commit()\n chunk_id = cursor.lastrowid\n cursor.close()\n return chunk_id\n\n def get_chunks_for_embedding(self, limit: int = DEFAULT_LIMIT) -> list:\n \"\"\"Get chunks that need embeddings.\n\n Args:\n limit: Maximum number of chunks to return\n\n Returns:\n List of chunk dicts with id, content, document_id\n \"\"\"\n cursor = self.execute(\n \"\"\"SELECT c.id, c.content, c.document_id\n FROM chunks c\n WHERE c.qdrant_id IS NULL\n ORDER BY c.created_at\n LIMIT %s\"\"\",\n (limit,),\n )\n results = cursor.fetchall()\n cursor.close()\n return results\n\n def update_chunk_qdrant_id(self, chunk_id: int, qdrant_id: str):\n \"\"\"Update chunk with Qdrant point ID.\n\n Args:\n chunk_id: Chunk ID\n qdrant_id: Qdrant point UUID\n \"\"\"\n cursor = self.execute(\n \"UPDATE chunks SET qdrant_id = %s WHERE id = %s\",\n (qdrant_id, chunk_id),\n )\n self.commit()\n cursor.close()\n",
"numLines": 277,
"startLine": 1,
"totalLines": 277
}
}
}