step_semantic.py

Code Hygiene Score: 98

No issues found.

Dependencies: 1

Classes: 1

Code

"""
Semantic Analysis Step Module
Handles Layer 4: Entity extraction, relations, taxonomy, ontology.

This step runs AFTER embedding (Layer 3) and is optional/async.
Documents are searchable without semantic analysis.

Part of Progressive Pipeline Architecture.
"""

from analyzers.document_analyzer import analyze_document


class SemanticStep:
    """Step: Semantic analysis of documents (Layer 4)."""

    def __init__(self, db, progress=None):
        """
        Initialize semantic step.

        Args:
            db: Database wrapper exposing execute(), commit(), and log();
                queries are expected to return dict-style rows
            progress: Optional PipelineProgress instance for progress logging
        """
        self.db = db
        self.progress = progress

    def execute(self, doc_id, text, use_anthropic=True):
        """
        Execute full semantic analysis on a document.

        Args:
            doc_id: Document database ID
            text: Full document text
            use_anthropic: Use Anthropic API (True) or Ollama (False)

        Returns:
            dict: Analysis results with entity/relation counts
        """
        if self.progress:
            self.progress.update_step("semantic")
            self.progress.add_log("Starte semantische Analyse...")

        # Update document semantic status
        self._update_semantic_status(doc_id, "processing")

        try:
            # Run full analysis (entities, relations, taxonomy, ontology)
            result = analyze_document(
                document_id=doc_id,
                text=text,
                use_anthropic=use_anthropic,
                progress=self.progress,
            )
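            # analyze_document is expected to return a dict with at least
            # "entities"/"relations" counts and a "categories" list; those
            # are the keys read below.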

            # Update status based on result
            if result.get("entities", 0) > 0 or result.get("categories", []):
                self._update_semantic_status(doc_id, "complete")
            else:
                self._update_semantic_status(doc_id, "partial")

            self.db.log("INFO", f"Semantic analysis complete for doc {doc_id}: {result}")

            if self.progress:
                self.progress.add_log(
                    f"Semantik: {result.get('entities', 0)} Entitäten, {result.get('relations', 0)} Relationen"
                )

            return result

        except Exception as e:
            self._update_semantic_status(doc_id, "error")
            self.db.log("ERROR", f"Semantic analysis failed for doc {doc_id}: {e}")
            if self.progress:
                self.progress.add_log(f"Semantik-Fehler: {str(e)[:50]}")
            raise

    def queue(self, doc_id, priority=5):
        """
        Queue document for async semantic analysis.

        Args:
            doc_id: Document database ID
            priority: Queue priority (1=highest, 10=lowest)

        Returns:
            int: Queue entry ID
        """
        cursor = self.db.execute(
            """INSERT INTO semantic_queue (document_id, priority, status, created_at)
               VALUES (%s, %s, 'pending', NOW())
               ON DUPLICATE KEY UPDATE
               priority = VALUES(priority),
               status = 'pending',
               retry_count = 0,
               error_message = NULL""",
            (doc_id, priority),
        )
        self.db.commit()
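        # Caveat: with ON DUPLICATE KEY UPDATE, MySQL guarantees a meaningful
        # lastrowid only on a fresh insert; on the update path it can be 0
        # unless the statement uses the id = LAST_INSERT_ID(id) idiom.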
        queue_id = cursor.lastrowid
        cursor.close()

        self.db.log("INFO", f"Queued semantic analysis for doc {doc_id}, priority {priority}")
        return queue_id

    def process_queue(self, limit=5, use_anthropic=True):
        """
        Process pending items from semantic queue.

        Args:
            limit: Max items to process
            use_anthropic: Use Anthropic API

        Returns:
            dict: Processing summary
        """
        # Get pending items ordered by priority
        cursor = self.db.execute(
            """SELECT sq.id, sq.document_id, d.source_path
               FROM semantic_queue sq
               JOIN documents d ON sq.document_id = d.id
               WHERE sq.status = 'pending'
               ORDER BY sq.priority ASC, sq.created_at ASC
               LIMIT %s""",
            (limit,),
        )
        items = cursor.fetchall()
        cursor.close()

        processed = 0
        failed = 0

        for item in items:
            queue_id = item["id"]
            doc_id = item["document_id"]

            # Mark as processing
            self._update_queue_status(queue_id, "processing")

            try:
                # Get document text
                text = self._get_document_text(doc_id)
                if not text:
                    raise ValueError("No text found for document")

                # Run analysis
                self.execute(doc_id, text, use_anthropic)

                # Mark as completed
                self._update_queue_status(queue_id, "completed")
                processed += 1

            except Exception as e:
                # Mark as failed, increment retry
                self._update_queue_status(queue_id, "failed", str(e))
                failed += 1

        return {"processed": processed, "failed": failed, "remaining": self._get_queue_count()}

    def _update_semantic_status(self, doc_id, status):
        """Update document semantic_status."""
        cursor = self.db.execute(
            "UPDATE documents SET semantic_status = %s WHERE id = %s",
            (status, doc_id),
        )
        self.db.commit()
        cursor.close()

    def _update_queue_status(self, queue_id, status, error=None):
        """Update queue item status."""
        if status == "processing":
            cursor = self.db.execute(
                "UPDATE semantic_queue SET status = %s, started_at = NOW() WHERE id = %s",
                (status, queue_id),
            )
        elif status == "completed":
            cursor = self.db.execute(
                "UPDATE semantic_queue SET status = %s, completed_at = NOW() WHERE id = %s",
                (status, queue_id),
            )
        elif status == "failed":
            cursor = self.db.execute(
                """UPDATE semantic_queue
                   SET status = %s, error_message = %s, retry_count = retry_count + 1
                   WHERE id = %s""",
                (status, error, queue_id),
            )
        else:
            cursor = self.db.execute(
                "UPDATE semantic_queue SET status = %s WHERE id = %s",
                (status, queue_id),
            )
        self.db.commit()
        cursor.close()

    def _get_document_text(self, doc_id):
        """Get full document text from pages or chunks."""
        # Try pages first
        cursor = self.db.execute(
            "SELECT text_content FROM document_pages WHERE document_id = %s ORDER BY page_number",
            (doc_id,),
        )
        pages = cursor.fetchall()
        cursor.close()

        if pages:
            return "\n\n".join(p["text_content"] for p in pages if p["text_content"])

        # Fallback to chunks
        cursor = self.db.execute(
            "SELECT content FROM chunks WHERE document_id = %s ORDER BY chunk_index",
            (doc_id,),
        )
        chunks = cursor.fetchall()
        cursor.close()

        if chunks:
            return "\n\n".join(c["content"] for c in chunks if c["content"])

        return None

    def _get_queue_count(self):
        """Get count of pending queue items."""
        cursor = self.db.execute("SELECT COUNT(*) as cnt FROM semantic_queue WHERE status = 'pending'")
        result = cursor.fetchone()
        cursor.close()
        return result["cnt"] if result else 0