semantic_worker.py

Code Hygiene Score: 83

Keine Issues gefunden.

Dependencies 9

Klassen 1

Funktionen 1

Code

#!/usr/bin/env python3
"""
Semantic Queue Worker

Processes documents from the semantic_queue table asynchronously.
Runs as a daemon/cron job to handle Layer 4 (semantic analysis) independently.

Usage:
    python semantic_worker.py              # Process batch (default: 5 items)
    python semantic_worker.py --limit 10   # Process 10 items
    python semantic_worker.py --daemon     # Run continuously
    python semantic_worker.py --status     # Show queue status
    python semantic_worker.py --retry      # Reset failed items for retry
"""

import argparse
import signal
import sys
import time
from datetime import datetime

from config import (
    SEMANTIC_QUEUE_BATCH_SIZE,
    SEMANTIC_USE_ANTHROPIC,
)
from db import db
from step_semantic import SemanticStep


class SemanticWorker:
    """Worker for processing semantic analysis queue."""

    def __init__(self, use_anthropic: bool = False):
        """
        Initialize the semantic worker.

        Args:
            use_anthropic: Use Anthropic API (True) or Ollama (False)
        """
        self.use_anthropic = use_anthropic
        self.running = True
        self.processed_count = 0
        self.failed_count = 0

        # Setup signal handlers for graceful shutdown
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Handle shutdown signals gracefully."""
        print(f"\n[{self._timestamp()}] Received signal {signum}, shutting down...")
        self.running = False

    def _timestamp(self) -> str:
        """Get current timestamp for logging."""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def process_batch(self, limit: int = 5) -> dict:
        """
        Process a batch of items from the semantic queue.

        Args:
            limit: Maximum items to process

        Returns:
            dict: Processing summary
        """
        semantic_step = SemanticStep(db)
        result = semantic_step.process_queue(limit=limit, use_anthropic=self.use_anthropic)

        self.processed_count += result.get("processed", 0)
        self.failed_count += result.get("failed", 0)

        return result

    def run_daemon(self, batch_size: int = 5, sleep_interval: int = 30):
        """
        Run as a daemon, continuously processing the queue.

        Args:
            batch_size: Items to process per batch
            sleep_interval: Seconds to sleep when queue is empty
        """
        print(f"[{self._timestamp()}] Semantic Worker started (daemon mode)")
        print(f"  Batch size: {batch_size}")
        print(f"  Sleep interval: {sleep_interval}s")
        print(f"  Model: {'Anthropic' if self.use_anthropic else 'Ollama'}")
        print("  Press Ctrl+C to stop\n")

        while self.running:
            try:
                result = self.process_batch(limit=batch_size)

                processed = result.get("processed", 0)
                failed = result.get("failed", 0)
                remaining = result.get("remaining", 0)

                if processed > 0 or failed > 0:
                    print(f"[{self._timestamp()}] Batch: {processed} processed, {failed} failed, {remaining} remaining")

                # If nothing was processed and queue is empty, sleep longer
                if processed == 0 and remaining == 0:
                    if self.running:
                        time.sleep(sleep_interval)
                else:
                    # Small delay between batches to prevent overload
                    time.sleep(2)

            except Exception as e:
                print(f"[{self._timestamp()}] ERROR: {e}")
                db.log("ERROR", f"Semantic worker error: {e}")
                time.sleep(10)  # Wait before retrying after error

        print(f"\n[{self._timestamp()}] Semantic Worker stopped")
        print(f"  Total processed: {self.processed_count}")
        print(f"  Total failed: {self.failed_count}")

    def get_status(self) -> dict:
        """
        Get current queue status.

        Returns:
            dict: Queue statistics
        """
        cursor = db.execute(
            """SELECT status, COUNT(*) as count
               FROM semantic_queue
               GROUP BY status"""
        )
        status_counts = {row["status"]: row["count"] for row in cursor.fetchall()}
        cursor.close()

        cursor = db.execute(
            """SELECT
                  COUNT(*) as total,
                  SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending,
                  SUM(CASE WHEN status = 'processing' THEN 1 ELSE 0 END) as processing,
                  SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
                  SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed,
                  SUM(CASE WHEN status = 'cancelled' THEN 1 ELSE 0 END) as cancelled
               FROM semantic_queue"""
        )
        totals = cursor.fetchone()
        cursor.close()

        # Get document semantic status distribution
        cursor = db.execute(
            """SELECT semantic_status, COUNT(*) as count
               FROM documents
               GROUP BY semantic_status"""
        )
        doc_status = {row["semantic_status"] or "null": row["count"] for row in cursor.fetchall()}
        cursor.close()

        return {
            "queue": totals,
            "queue_by_status": status_counts,
            "documents": doc_status,
        }

    def retry_failed(self, max_retries: int = 3) -> int:
        """
        Reset failed items for retry (if under max retries).

        Args:
            max_retries: Maximum retry attempts

        Returns:
            int: Number of items reset
        """
        cursor = db.execute(
            """UPDATE semantic_queue
               SET status = 'pending', error_message = NULL
               WHERE status = 'failed' AND retry_count < %s""",
            (max_retries,),
        )
        db.commit()
        affected = cursor.rowcount
        cursor.close()

        if affected > 0:
            db.log("INFO", f"Reset {affected} failed semantic queue items for retry")

        return affected

    def clear_stale(self, hours: int = 2) -> int:
        """
        Reset items stuck in 'processing' state.

        Args:
            hours: Hours after which to consider stuck

        Returns:
            int: Number of items reset
        """
        cursor = db.execute(
            """UPDATE semantic_queue
               SET status = 'pending', started_at = NULL
               WHERE status = 'processing'
                 AND started_at < DATE_SUB(NOW(), INTERVAL %s HOUR)""",
            (hours,),
        )
        db.commit()
        affected = cursor.rowcount
        cursor.close()

        if affected > 0:
            db.log("INFO", f"Reset {affected} stale semantic queue items")

        return affected


def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(description="Semantic Queue Worker - Process Layer 4 analysis asynchronously")
    parser.add_argument(
        "--limit",
        type=int,
        default=SEMANTIC_QUEUE_BATCH_SIZE,
        help=f"Items to process per batch (default: {SEMANTIC_QUEUE_BATCH_SIZE})",
    )
    parser.add_argument(
        "--daemon",
        action="store_true",
        help="Run continuously as daemon",
    )
    parser.add_argument(
        "--sleep",
        type=int,
        default=30,
        help="Seconds to sleep when queue is empty (default: 30)",
    )
    parser.add_argument(
        "--status",
        action="store_true",
        help="Show queue status and exit",
    )
    parser.add_argument(
        "--retry",
        action="store_true",
        help="Reset failed items for retry",
    )
    parser.add_argument(
        "--clear-stale",
        action="store_true",
        help="Reset items stuck in processing state",
    )
    parser.add_argument(
        "--anthropic",
        action="store_true",
        default=SEMANTIC_USE_ANTHROPIC,
        help="Use Anthropic API instead of Ollama",
    )

    args = parser.parse_args()

    db.connect()

    try:
        worker = SemanticWorker(use_anthropic=args.anthropic)

        if args.status:
            status = worker.get_status()
            print("\n=== Semantic Queue Status ===\n")

            print("Queue:")
            queue = status["queue"]
            print(f"  Total:      {queue['total'] or 0}")
            print(f"  Pending:    {queue['pending'] or 0}")
            print(f"  Processing: {queue['processing'] or 0}")
            print(f"  Completed:  {queue['completed'] or 0}")
            print(f"  Failed:     {queue['failed'] or 0}")
            print(f"  Cancelled:  {queue['cancelled'] or 0}")

            print("\nDocument Semantic Status:")
            for status_name, count in status["documents"].items():
                print(f"  {status_name}: {count}")

        elif args.retry:
            reset = worker.retry_failed()
            print(f"Reset {reset} failed items for retry")

        elif args.clear_stale:
            reset = worker.clear_stale()
            print(f"Reset {reset} stale items")

        elif args.daemon:
            worker.run_daemon(batch_size=args.limit, sleep_interval=args.sleep)

        else:
            # Single batch run
            print(f"Processing up to {args.limit} items...")
            result = worker.process_batch(limit=args.limit)
            print(f"Processed: {result.get('processed', 0)}")
            print(f"Failed: {result.get('failed', 0)}")
            print(f"Remaining: {result.get('remaining', 0)}")

    except Exception as e:
        db.log("ERROR", f"Semantic worker error: {e}")
        print(f"Error: {e}")
        sys.exit(1)
    finally:
        db.disconnect()


if __name__ == "__main__":
    main()
← Übersicht Graph