Log #25039

ID: 25039
Timestamp: 2025-12-28 02:00:59.295008
Client: root
IP: 145.224.72.140
Model: claude-sonnet-4-20250514
Status: completed
Tokens: 2,044 (Input: 29, Output: 2,015)
Duration: 107 ms
Request time: 2025-12-28 02:00:59.295008
Response time: 2025-12-28 02:00:59.402285

Request

{
    "event": "PreToolUse",
    "tool_name": "Read",
    "tool_input": {
        "file_path": "\/var\/www\/scripts\/pipeline\/step_semantic.py"
    }
}
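
This payload is what a PreToolUse hook consumer receives for a Read call. A minimal sketch of a handler in Python, assuming the event arrives as JSON on stdin (the delivery mechanism is an assumption; only the fields visible above are used):

import json
import sys

# Parse the hook event and report which file the Read tool is about to access.
event = json.load(sys.stdin)
if event.get("event") == "PreToolUse" and event.get("tool_name") == "Read":
    file_path = event.get("tool_input", {}).get("file_path")
    print(f"Read requested: {file_path}", file=sys.stderr)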

Response

Tool response (type: text): full contents of /var/www/scripts/pipeline/step_semantic.py (228 lines, returned starting at line 1; 228 lines total).

"""
Semantic Analysis Step Module
Handles Layer 4: Entity extraction, relations, taxonomy, ontology.

This step runs AFTER embedding (Layer 3) and is optional/async.
Documents are searchable without semantic analysis.

Part of Progressive Pipeline Architecture.
"""

from analyzers.document_analyzer import analyze_document


class SemanticStep:
    """Step: Semantic analysis of documents (Layer 4)."""

    def __init__(self, db, progress=None):
        """
        Initialize semantic step.

        Args:
            db: Database instance
            progress: Optional PipelineProgress instance
        """
        self.db = db
        self.progress = progress

    def execute(self, doc_id, text, use_anthropic=True):
        """
        Execute full semantic analysis on a document.

        Args:
            doc_id: Document database ID
            text: Full document text
            use_anthropic: Use Anthropic API (True) or Ollama (False)

        Returns:
            dict: Analysis results with entity/relation counts
        """
        if self.progress:
            self.progress.update_step("semantic")
            self.progress.add_log("Starte semantische Analyse...")

        # Update document semantic status
        self._update_semantic_status(doc_id, "processing")

        try:
            # Run full analysis (entities, relations, taxonomy, ontology)
            result = analyze_document(
                document_id=doc_id,
                text=text,
                use_anthropic=use_anthropic,
                progress=self.progress,
            )

            # Update status based on result
            if result.get("entities", 0) > 0 or result.get("categories", []):
                self._update_semantic_status(doc_id, "complete")
            else:
                self._update_semantic_status(doc_id, "partial")

            self.db.log("INFO", f"Semantic analysis complete for doc {doc_id}: {result}")

            if self.progress:
                self.progress.add_log(
                    f"Semantik: {result.get('entities', 0)} Entitäten, {result.get('relations', 0)} Relationen"
                )

            return result

        except Exception as e:
            self._update_semantic_status(doc_id, "error")
            self.db.log("ERROR", f"Semantic analysis failed for doc {doc_id}: {e}")
            if self.progress:
                self.progress.add_log(f"Semantik-Fehler: {str(e)[:50]}")
            raise

    def queue(self, doc_id, priority=5):
        """
        Queue document for async semantic analysis.

        Args:
            doc_id: Document database ID
            priority: Queue priority (1=highest, 10=lowest)

        Returns:
            int: Queue entry ID
        """
        cursor = self.db.execute(
            """INSERT INTO semantic_queue (document_id, priority, status, created_at)
               VALUES (%s, %s, 'pending', NOW())
               ON DUPLICATE KEY UPDATE
               priority = VALUES(priority),
               status = 'pending',
               retry_count = 0,
               error_message = NULL""",
            (doc_id, priority),
        )
        self.db.commit()
        queue_id = cursor.lastrowid
        cursor.close()

        self.db.log("INFO", f"Queued semantic analysis for doc {doc_id}, priority {priority}")
        return queue_id

    def process_queue(self, limit=5, use_anthropic=True):
        """
        Process pending items from semantic queue.

        Args:
            limit: Max items to process
            use_anthropic: Use Anthropic API

        Returns:
            dict: Processing summary
        """
        # Get pending items ordered by priority
        cursor = self.db.execute(
            """SELECT sq.id, sq.document_id, d.source_path
               FROM semantic_queue sq
               JOIN documents d ON sq.document_id = d.id
               WHERE sq.status = 'pending'
               ORDER BY sq.priority ASC, sq.created_at ASC
               LIMIT %s""",
            (limit,),
        )
        items = cursor.fetchall()
        cursor.close()

        processed = 0
        failed = 0

        for item in items:
            queue_id = item["id"]
            doc_id = item["document_id"]

            # Mark as processing
            self._update_queue_status(queue_id, "processing")

            try:
                # Get document text
                text = self._get_document_text(doc_id)
                if not text:
                    raise ValueError("No text found for document")

                # Run analysis
                self.execute(doc_id, text, use_anthropic)

                # Mark as completed
                self._update_queue_status(queue_id, "completed")
                processed += 1

            except Exception as e:
                # Mark as failed, increment retry
                self._update_queue_status(queue_id, "failed", str(e))
                failed += 1

        return {"processed": processed, "failed": failed, "remaining": self._get_queue_count()}

    def _update_semantic_status(self, doc_id, status):
        """Update document semantic_status."""
        cursor = self.db.execute(
            "UPDATE documents SET semantic_status = %s WHERE id = %s",
            (status, doc_id),
        )
        self.db.commit()
        cursor.close()

    def _update_queue_status(self, queue_id, status, error=None):
        """Update queue item status."""
        if status == "processing":
            cursor = self.db.execute(
                "UPDATE semantic_queue SET status = %s, started_at = NOW() WHERE id = %s",
                (status, queue_id),
            )
        elif status == "completed":
            cursor = self.db.execute(
                "UPDATE semantic_queue SET status = %s, completed_at = NOW() WHERE id = %s",
                (status, queue_id),
            )
        elif status == "failed":
            cursor = self.db.execute(
                """UPDATE semantic_queue
                   SET status = %s, error_message = %s, retry_count = retry_count + 1
                   WHERE id = %s""",
                (status, error, queue_id),
            )
        else:
            cursor = self.db.execute(
                "UPDATE semantic_queue SET status = %s WHERE id = %s",
                (status, queue_id),
            )
        self.db.commit()
        cursor.close()

    def _get_document_text(self, doc_id):
        """Get full document text from pages or chunks."""
        # Try pages first
        cursor = self.db.execute(
            "SELECT text_content FROM document_pages WHERE document_id = %s ORDER BY page_number",
            (doc_id,),
        )
        pages = cursor.fetchall()
        cursor.close()

        if pages:
            return "\n\n".join(p["text_content"] for p in pages if p["text_content"])

        # Fallback to chunks
        cursor = self.db.execute(
            "SELECT content FROM chunks WHERE document_id = %s ORDER BY chunk_index",
            (doc_id,),
        )
        chunks = cursor.fetchall()
        cursor.close()

        if chunks:
            return "\n\n".join(c["content"] for c in chunks if c["content"])

        return None

    def _get_queue_count(self):
        """Get count of pending queue items."""
        cursor = self.db.execute("SELECT COUNT(*) as cnt FROM semantic_queue WHERE status = 'pending'")
        result = cursor.fetchone()
        cursor.close()
        return result["cnt"] if result else 0
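
For context, a minimal sketch of how this step might be driven end to end. Everything outside the module is an assumption: the Database wrapper below only reproduces the execute/commit/log interface the code calls, mysql.connector with dictionary cursors is inferred from the %s placeholders and the dict-style row access (item["id"]), the import path mirrors the file's location on disk, and the semantic_queue DDL is reconstructed from the queries above (column types and the unique key implied by ON DUPLICATE KEY UPDATE are guesses):

import mysql.connector

from pipeline.step_semantic import SemanticStep  # hypothetical import path

# Hedged reconstruction of the table the queue() and _update_queue_status()
# statements imply; types and the unique key are assumptions.
SEMANTIC_QUEUE_DDL = """
CREATE TABLE IF NOT EXISTS semantic_queue (
    id            INT AUTO_INCREMENT PRIMARY KEY,
    document_id   INT NOT NULL,
    priority      TINYINT NOT NULL DEFAULT 5,
    status        ENUM('pending', 'processing', 'completed', 'failed') NOT NULL,
    retry_count   INT NOT NULL DEFAULT 0,
    error_message TEXT NULL,
    created_at    DATETIME NOT NULL,
    started_at    DATETIME NULL,
    completed_at  DATETIME NULL,
    UNIQUE KEY uq_semantic_queue_document (document_id)
)
"""


class Database:
    """Thin wrapper exposing the execute/commit/log interface used above."""

    def __init__(self, **conn_args):
        self.conn = mysql.connector.connect(**conn_args)

    def execute(self, sql, params=None):
        # dictionary=True matches the dict-style row access in the module.
        cursor = self.conn.cursor(dictionary=True)
        cursor.execute(sql, params or ())
        return cursor

    def commit(self):
        self.conn.commit()

    def log(self, level, message):
        print(f"[{level}] {message}")


if __name__ == "__main__":
    db = Database(host="localhost", user="pipeline",
                  password="...", database="documents")  # placeholders
    db.execute(SEMANTIC_QUEUE_DDL).close()

    step = SemanticStep(db)

    # Queue a document for async analysis (lower number = higher priority),
    # then drain up to five pending items.
    step.queue(doc_id=42, priority=3)
    summary = step.process_queue(limit=5, use_anthropic=True)
    print(summary)  # e.g. {'processed': 1, 'failed': 0, 'remaining': 0}

The assumed unique key on document_id is what lets queue() re-queue an already-queued document in place: the INSERT collides and the ON DUPLICATE KEY UPDATE branch resets status, retry_count, and error_message instead of creating a second row.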