{
"tool_response": {
"type": "text",
"file": {
"filePath": "\/var\/www\/scripts\/pipeline\/step_semantic.py",
"content": "\"\"\"\nSemantic Analysis Step Module\nHandles Layer 4: Entity extraction, relations, taxonomy, ontology.\n\nThis step runs AFTER embedding (Layer 3) and is optional\/async.\nDocuments are searchable without semantic analysis.\n\nPart of Progressive Pipeline Architecture.\n\"\"\"\n\nfrom analyzers.document_analyzer import analyze_document\n\n\nclass SemanticStep:\n \"\"\"Step: Semantic analysis of documents (Layer 4).\"\"\"\n\n def __init__(self, db, progress=None):\n \"\"\"\n Initialize semantic step.\n\n Args:\n db: Database instance\n progress: Optional PipelineProgress instance\n \"\"\"\n self.db = db\n self.progress = progress\n\n def execute(self, doc_id, text, use_anthropic=True):\n \"\"\"\n Execute full semantic analysis on a document.\n\n Args:\n doc_id: Document database ID\n text: Full document text\n use_anthropic: Use Anthropic API (True) or Ollama (False)\n\n Returns:\n dict: Analysis results with entity\/relation counts\n \"\"\"\n if self.progress:\n self.progress.update_step(\"semantic\")\n self.progress.add_log(\"Starte semantische Analyse...\")\n\n # Update document semantic status\n self._update_semantic_status(doc_id, \"processing\")\n\n try:\n # Run full analysis (entities, relations, taxonomy, ontology)\n result = analyze_document(\n document_id=doc_id,\n text=text,\n use_anthropic=use_anthropic,\n progress=self.progress,\n )\n\n # Update status based on result\n if result.get(\"entities\", 0) > 0 or result.get(\"categories\", []):\n self._update_semantic_status(doc_id, \"complete\")\n else:\n self._update_semantic_status(doc_id, \"partial\")\n\n self.db.log(\"INFO\", f\"Semantic analysis complete for doc {doc_id}: {result}\")\n\n if self.progress:\n self.progress.add_log(\n f\"Semantik: {result.get('entities', 0)} Entitäten, {result.get('relations', 0)} Relationen\"\n )\n\n return result\n\n except Exception as e:\n self._update_semantic_status(doc_id, \"error\")\n self.db.log(\"ERROR\", f\"Semantic analysis failed for doc {doc_id}: {e}\")\n if self.progress:\n self.progress.add_log(f\"Semantik-Fehler: {str(e)[:50]}\")\n raise\n\n def queue(self, doc_id, priority=5):\n \"\"\"\n Queue document for async semantic analysis.\n\n Args:\n doc_id: Document database ID\n priority: Queue priority (1=highest, 10=lowest)\n\n Returns:\n int: Queue entry ID\n \"\"\"\n cursor = self.db.execute(\n \"\"\"INSERT INTO semantic_queue (document_id, priority, status, created_at)\n VALUES (%s, %s, 'pending', NOW())\n ON DUPLICATE KEY UPDATE\n priority = VALUES(priority),\n status = 'pending',\n retry_count = 0,\n error_message = NULL\"\"\",\n (doc_id, priority),\n )\n self.db.commit()\n queue_id = cursor.lastrowid\n cursor.close()\n\n self.db.log(\"INFO\", f\"Queued semantic analysis for doc {doc_id}, priority {priority}\")\n return queue_id\n\n def process_queue(self, limit=5, use_anthropic=True):\n \"\"\"\n Process pending items from semantic queue.\n\n Args:\n limit: Max items to process\n use_anthropic: Use Anthropic API\n\n Returns:\n dict: Processing summary\n \"\"\"\n # Get pending items ordered by priority\n cursor = self.db.execute(\n \"\"\"SELECT sq.id, sq.document_id, d.source_path\n FROM semantic_queue sq\n JOIN documents d ON sq.document_id = d.id\n WHERE sq.status = 'pending'\n ORDER BY sq.priority ASC, sq.created_at ASC\n LIMIT %s\"\"\",\n (limit,),\n )\n items = cursor.fetchall()\n cursor.close()\n\n processed = 0\n failed = 0\n\n for item in items:\n queue_id = item[\"id\"]\n doc_id = item[\"document_id\"]\n\n # Mark as processing\n self._update_queue_status(queue_id, \"processing\")\n\n try:\n # Get document text\n text = self._get_document_text(doc_id)\n if not text:\n raise ValueError(\"No text found for document\")\n\n # Run analysis\n self.execute(doc_id, text, use_anthropic)\n\n # Mark as completed\n self._update_queue_status(queue_id, \"completed\")\n processed += 1\n\n except Exception as e:\n # Mark as failed, increment retry\n self._update_queue_status(queue_id, \"failed\", str(e))\n failed += 1\n\n return {\"processed\": processed, \"failed\": failed, \"remaining\": self._get_queue_count()}\n\n def _update_semantic_status(self, doc_id, status):\n \"\"\"Update document semantic_status.\"\"\"\n cursor = self.db.execute(\n \"UPDATE documents SET semantic_status = %s WHERE id = %s\",\n (status, doc_id),\n )\n self.db.commit()\n cursor.close()\n\n def _update_queue_status(self, queue_id, status, error=None):\n \"\"\"Update queue item status.\"\"\"\n if status == \"processing\":\n cursor = self.db.execute(\n \"UPDATE semantic_queue SET status = %s, started_at = NOW() WHERE id = %s\",\n (status, queue_id),\n )\n elif status == \"completed\":\n cursor = self.db.execute(\n \"UPDATE semantic_queue SET status = %s, completed_at = NOW() WHERE id = %s\",\n (status, queue_id),\n )\n elif status == \"failed\":\n cursor = self.db.execute(\n \"\"\"UPDATE semantic_queue\n SET status = %s, error_message = %s, retry_count = retry_count + 1\n WHERE id = %s\"\"\",\n (status, error, queue_id),\n )\n else:\n cursor = self.db.execute(\n \"UPDATE semantic_queue SET status = %s WHERE id = %s\",\n (status, queue_id),\n )\n self.db.commit()\n cursor.close()\n\n def _get_document_text(self, doc_id):\n \"\"\"Get full document text from pages or chunks.\"\"\"\n # Try pages first\n cursor = self.db.execute(\n \"SELECT text_content FROM document_pages WHERE document_id = %s ORDER BY page_number\",\n (doc_id,),\n )\n pages = cursor.fetchall()\n cursor.close()\n\n if pages:\n return \"\\n\\n\".join(p[\"text_content\"] for p in pages if p[\"text_content\"])\n\n # Fallback to chunks\n cursor = self.db.execute(\n \"SELECT content FROM chunks WHERE document_id = %s ORDER BY chunk_index\",\n (doc_id,),\n )\n chunks = cursor.fetchall()\n cursor.close()\n\n if chunks:\n return \"\\n\\n\".join(c[\"content\"] for c in chunks if c[\"content\"])\n\n return None\n\n def _get_queue_count(self):\n \"\"\"Get count of pending queue items.\"\"\"\n cursor = self.db.execute(\"SELECT COUNT(*) as cnt FROM semantic_queue WHERE status = 'pending'\")\n result = cursor.fetchone()\n cursor.close()\n return result[\"cnt\"] if result else 0\n",
"numLines": 228,
"startLine": 1,
"totalLines": 228
}
}
}