Protokoll #19417

ID: 19417
Zeitstempel: 2025-12-27 00:18:26.446363
Client: root
IP: 145.224.72.140
Modell: claude-sonnet-4-20250514
Status: completed
Tokens: 5,343 (Input: 2,669, Output: 2,674)
Dauer: 104 ms
Request-Zeit: 2025-12-27 00:18:26.446363
Response-Zeit: 2025-12-27 00:18:26.550184

Request

{
    "event": "PreToolUse",
    "tool_name": "Write",
    "tool_input": {
        "file_path": "\/var\/www\/scripts\/pipeline\/semantic_worker.py",
        "content": "#!\/usr\/bin\/env python3\n\"\"\"\nSemantic Queue Worker\n\nProcesses documents from the semantic_queue table asynchronously.\nRuns as a daemon\/cron job to handle Layer 4 (semantic analysis) independently.\n\nUsage:\n    python semantic_worker.py              # Process batch (default: 5 items)\n    python semantic_worker.py --limit 10   # Process 10 items\n    python semantic_worker.py --daemon     # Run continuously\n    python semantic_worker.py --status     # Show queue status\n    python semantic_worker.py --retry      # Reset failed items for retry\n\"\"\"\n\nimport argparse\nimport signal\nimport sys\nimport time\nfrom datetime import datetime\n\nfrom config import (\n    SEMANTIC_QUEUE_BATCH_SIZE,\n    SEMANTIC_USE_ANTHROPIC,\n)\nfrom db import db\nfrom step_semantic import SemanticStep\n\n\nclass SemanticWorker:\n    \"\"\"Worker for processing semantic analysis queue.\"\"\"\n\n    def __init__(self, use_anthropic: bool = False):\n        \"\"\"\n        Initialize the semantic worker.\n\n        Args:\n            use_anthropic: Use Anthropic API (True) or Ollama (False)\n        \"\"\"\n        self.use_anthropic = use_anthropic\n        self.running = True\n        self.processed_count = 0\n        self.failed_count = 0\n\n        # Setup signal handlers for graceful shutdown\n        signal.signal(signal.SIGINT, self._signal_handler)\n        signal.signal(signal.SIGTERM, self._signal_handler)\n\n    def _signal_handler(self, signum, frame):\n        \"\"\"Handle shutdown signals gracefully.\"\"\"\n        print(f\"\\n[{self._timestamp()}] Received signal {signum}, shutting down...\")\n        self.running = False\n\n    def _timestamp(self) -> str:\n        \"\"\"Get current timestamp for logging.\"\"\"\n        return datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")\n\n    def process_batch(self, limit: int = 5) -> dict:\n        \"\"\"\n        Process a batch of items from the semantic queue.\n\n        Args:\n            limit: 
Maximum items to process\n\n        Returns:\n            dict: Processing summary\n        \"\"\"\n        semantic_step = SemanticStep(db)\n        result = semantic_step.process_queue(limit=limit, use_anthropic=self.use_anthropic)\n\n        self.processed_count += result.get(\"processed\", 0)\n        self.failed_count += result.get(\"failed\", 0)\n\n        return result\n\n    def run_daemon(self, batch_size: int = 5, sleep_interval: int = 30):\n        \"\"\"\n        Run as a daemon, continuously processing the queue.\n\n        Args:\n            batch_size: Items to process per batch\n            sleep_interval: Seconds to sleep when queue is empty\n        \"\"\"\n        print(f\"[{self._timestamp()}] Semantic Worker started (daemon mode)\")\n        print(f\"  Batch size: {batch_size}\")\n        print(f\"  Sleep interval: {sleep_interval}s\")\n        print(f\"  Model: {'Anthropic' if self.use_anthropic else 'Ollama'}\")\n        print(\"  Press Ctrl+C to stop\\n\")\n\n        while self.running:\n            try:\n                result = self.process_batch(limit=batch_size)\n\n                processed = result.get(\"processed\", 0)\n                failed = result.get(\"failed\", 0)\n                remaining = result.get(\"remaining\", 0)\n\n                if processed > 0 or failed > 0:\n                    print(\n                        f\"[{self._timestamp()}] Batch: {processed} processed, \"\n                        f\"{failed} failed, {remaining} remaining\"\n                    )\n\n                # If nothing was processed and queue is empty, sleep longer\n                if processed == 0 and remaining == 0:\n                    if self.running:\n                        time.sleep(sleep_interval)\n                else:\n                    # Small delay between batches to prevent overload\n                    time.sleep(2)\n\n            except Exception as e:\n                print(f\"[{self._timestamp()}] ERROR: {e}\")\n               
 db.log(\"ERROR\", f\"Semantic worker error: {e}\")\n                time.sleep(10)  # Wait before retrying after error\n\n        print(f\"\\n[{self._timestamp()}] Semantic Worker stopped\")\n        print(f\"  Total processed: {self.processed_count}\")\n        print(f\"  Total failed: {self.failed_count}\")\n\n    def get_status(self) -> dict:\n        \"\"\"\n        Get current queue status.\n\n        Returns:\n            dict: Queue statistics\n        \"\"\"\n        cursor = db.execute(\n            \"\"\"SELECT status, COUNT(*) as count\n               FROM semantic_queue\n               GROUP BY status\"\"\"\n        )\n        status_counts = {row[\"status\"]: row[\"count\"] for row in cursor.fetchall()}\n        cursor.close()\n\n        cursor = db.execute(\n            \"\"\"SELECT\n                  COUNT(*) as total,\n                  SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending,\n                  SUM(CASE WHEN status = 'processing' THEN 1 ELSE 0 END) as processing,\n                  SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,\n                  SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed,\n                  SUM(CASE WHEN status = 'cancelled' THEN 1 ELSE 0 END) as cancelled\n               FROM semantic_queue\"\"\"\n        )\n        totals = cursor.fetchone()\n        cursor.close()\n\n        # Get document semantic status distribution\n        cursor = db.execute(\n            \"\"\"SELECT semantic_status, COUNT(*) as count\n               FROM documents\n               GROUP BY semantic_status\"\"\"\n        )\n        doc_status = {row[\"semantic_status\"] or \"null\": row[\"count\"] for row in cursor.fetchall()}\n        cursor.close()\n\n        return {\n            \"queue\": totals,\n            \"queue_by_status\": status_counts,\n            \"documents\": doc_status,\n        }\n\n    def retry_failed(self, max_retries: int = 3) -> int:\n        \"\"\"\n        Reset 
failed items for retry (if under max retries).\n\n        Args:\n            max_retries: Maximum retry attempts\n\n        Returns:\n            int: Number of items reset\n        \"\"\"\n        cursor = db.execute(\n            \"\"\"UPDATE semantic_queue\n               SET status = 'pending', error_message = NULL\n               WHERE status = 'failed' AND retry_count < %s\"\"\",\n            (max_retries,),\n        )\n        db.commit()\n        affected = cursor.rowcount\n        cursor.close()\n\n        if affected > 0:\n            db.log(\"INFO\", f\"Reset {affected} failed semantic queue items for retry\")\n\n        return affected\n\n    def clear_stale(self, hours: int = 2) -> int:\n        \"\"\"\n        Reset items stuck in 'processing' state.\n\n        Args:\n            hours: Hours after which to consider stuck\n\n        Returns:\n            int: Number of items reset\n        \"\"\"\n        cursor = db.execute(\n            \"\"\"UPDATE semantic_queue\n               SET status = 'pending', started_at = NULL\n               WHERE status = 'processing'\n                 AND started_at < DATE_SUB(NOW(), INTERVAL %s HOUR)\"\"\",\n            (hours,),\n        )\n        db.commit()\n        affected = cursor.rowcount\n        cursor.close()\n\n        if affected > 0:\n            db.log(\"INFO\", f\"Reset {affected} stale semantic queue items\")\n\n        return affected\n\n\ndef main():\n    \"\"\"Main entry point.\"\"\"\n    parser = argparse.ArgumentParser(\n        description=\"Semantic Queue Worker - Process Layer 4 analysis asynchronously\"\n    )\n    parser.add_argument(\n        \"--limit\",\n        type=int,\n        default=SEMANTIC_QUEUE_BATCH_SIZE,\n        help=f\"Items to process per batch (default: {SEMANTIC_QUEUE_BATCH_SIZE})\",\n    )\n    parser.add_argument(\n        \"--daemon\",\n        action=\"store_true\",\n        help=\"Run continuously as daemon\",\n    )\n    parser.add_argument(\n        \"--sleep\",\n   
     type=int,\n        default=30,\n        help=\"Seconds to sleep when queue is empty (default: 30)\",\n    )\n    parser.add_argument(\n        \"--status\",\n        action=\"store_true\",\n        help=\"Show queue status and exit\",\n    )\n    parser.add_argument(\n        \"--retry\",\n        action=\"store_true\",\n        help=\"Reset failed items for retry\",\n    )\n    parser.add_argument(\n        \"--clear-stale\",\n        action=\"store_true\",\n        help=\"Reset items stuck in processing state\",\n    )\n    parser.add_argument(\n        \"--anthropic\",\n        action=\"store_true\",\n        default=SEMANTIC_USE_ANTHROPIC,\n        help=\"Use Anthropic API instead of Ollama\",\n    )\n\n    args = parser.parse_args()\n\n    db.connect()\n\n    try:\n        worker = SemanticWorker(use_anthropic=args.anthropic)\n\n        if args.status:\n            status = worker.get_status()\n            print(\"\\n=== Semantic Queue Status ===\\n\")\n\n            print(\"Queue:\")\n            queue = status[\"queue\"]\n            print(f\"  Total:      {queue['total'] or 0}\")\n            print(f\"  Pending:    {queue['pending'] or 0}\")\n            print(f\"  Processing: {queue['processing'] or 0}\")\n            print(f\"  Completed:  {queue['completed'] or 0}\")\n            print(f\"  Failed:     {queue['failed'] or 0}\")\n            print(f\"  Cancelled:  {queue['cancelled'] or 0}\")\n\n            print(\"\\nDocument Semantic Status:\")\n            for status_name, count in status[\"documents\"].items():\n                print(f\"  {status_name}: {count}\")\n\n        elif args.retry:\n            reset = worker.retry_failed()\n            print(f\"Reset {reset} failed items for retry\")\n\n        elif args.clear_stale:\n            reset = worker.clear_stale()\n            print(f\"Reset {reset} stale items\")\n\n        elif args.daemon:\n            worker.run_daemon(batch_size=args.limit, sleep_interval=args.sleep)\n\n        else:\n  
          # Single batch run\n            print(f\"Processing up to {args.limit} items...\")\n            result = worker.process_batch(limit=args.limit)\n            print(f\"Processed: {result.get('processed', 0)}\")\n            print(f\"Failed: {result.get('failed', 0)}\")\n            print(f\"Remaining: {result.get('remaining', 0)}\")\n\n    except Exception as e:\n        db.log(\"ERROR\", f\"Semantic worker error: {e}\")\n        print(f\"Error: {e}\")\n        sys.exit(1)\n    finally:\n        db.disconnect()\n\n\nif __name__ == \"__main__\":\n    main()\n"
    }
}

Response

{
    "tool_response": {
        "type": "create",
        "filePath": "\/var\/www\/scripts\/pipeline\/semantic_worker.py",
        "content": "#!\/usr\/bin\/env python3\n\"\"\"\nSemantic Queue Worker\n\nProcesses documents from the semantic_queue table asynchronously.\nRuns as a daemon\/cron job to handle Layer 4 (semantic analysis) independently.\n\nUsage:\n    python semantic_worker.py              # Process batch (default: 5 items)\n    python semantic_worker.py --limit 10   # Process 10 items\n    python semantic_worker.py --daemon     # Run continuously\n    python semantic_worker.py --status     # Show queue status\n    python semantic_worker.py --retry      # Reset failed items for retry\n\"\"\"\n\nimport argparse\nimport signal\nimport sys\nimport time\nfrom datetime import datetime\n\nfrom config import (\n    SEMANTIC_QUEUE_BATCH_SIZE,\n    SEMANTIC_USE_ANTHROPIC,\n)\nfrom db import db\nfrom step_semantic import SemanticStep\n\n\nclass SemanticWorker:\n    \"\"\"Worker for processing semantic analysis queue.\"\"\"\n\n    def __init__(self, use_anthropic: bool = False):\n        \"\"\"\n        Initialize the semantic worker.\n\n        Args:\n            use_anthropic: Use Anthropic API (True) or Ollama (False)\n        \"\"\"\n        self.use_anthropic = use_anthropic\n        self.running = True\n        self.processed_count = 0\n        self.failed_count = 0\n\n        # Setup signal handlers for graceful shutdown\n        signal.signal(signal.SIGINT, self._signal_handler)\n        signal.signal(signal.SIGTERM, self._signal_handler)\n\n    def _signal_handler(self, signum, frame):\n        \"\"\"Handle shutdown signals gracefully.\"\"\"\n        print(f\"\\n[{self._timestamp()}] Received signal {signum}, shutting down...\")\n        self.running = False\n\n    def _timestamp(self) -> str:\n        \"\"\"Get current timestamp for logging.\"\"\"\n        return datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")\n\n    def process_batch(self, limit: int = 5) -> dict:\n        \"\"\"\n        Process a batch of items from the semantic queue.\n\n        Args:\n            limit: 
Maximum items to process\n\n        Returns:\n            dict: Processing summary\n        \"\"\"\n        semantic_step = SemanticStep(db)\n        result = semantic_step.process_queue(limit=limit, use_anthropic=self.use_anthropic)\n\n        self.processed_count += result.get(\"processed\", 0)\n        self.failed_count += result.get(\"failed\", 0)\n\n        return result\n\n    def run_daemon(self, batch_size: int = 5, sleep_interval: int = 30):\n        \"\"\"\n        Run as a daemon, continuously processing the queue.\n\n        Args:\n            batch_size: Items to process per batch\n            sleep_interval: Seconds to sleep when queue is empty\n        \"\"\"\n        print(f\"[{self._timestamp()}] Semantic Worker started (daemon mode)\")\n        print(f\"  Batch size: {batch_size}\")\n        print(f\"  Sleep interval: {sleep_interval}s\")\n        print(f\"  Model: {'Anthropic' if self.use_anthropic else 'Ollama'}\")\n        print(\"  Press Ctrl+C to stop\\n\")\n\n        while self.running:\n            try:\n                result = self.process_batch(limit=batch_size)\n\n                processed = result.get(\"processed\", 0)\n                failed = result.get(\"failed\", 0)\n                remaining = result.get(\"remaining\", 0)\n\n                if processed > 0 or failed > 0:\n                    print(\n                        f\"[{self._timestamp()}] Batch: {processed} processed, \"\n                        f\"{failed} failed, {remaining} remaining\"\n                    )\n\n                # If nothing was processed and queue is empty, sleep longer\n                if processed == 0 and remaining == 0:\n                    if self.running:\n                        time.sleep(sleep_interval)\n                else:\n                    # Small delay between batches to prevent overload\n                    time.sleep(2)\n\n            except Exception as e:\n                print(f\"[{self._timestamp()}] ERROR: {e}\")\n               
 db.log(\"ERROR\", f\"Semantic worker error: {e}\")\n                time.sleep(10)  # Wait before retrying after error\n\n        print(f\"\\n[{self._timestamp()}] Semantic Worker stopped\")\n        print(f\"  Total processed: {self.processed_count}\")\n        print(f\"  Total failed: {self.failed_count}\")\n\n    def get_status(self) -> dict:\n        \"\"\"\n        Get current queue status.\n\n        Returns:\n            dict: Queue statistics\n        \"\"\"\n        cursor = db.execute(\n            \"\"\"SELECT status, COUNT(*) as count\n               FROM semantic_queue\n               GROUP BY status\"\"\"\n        )\n        status_counts = {row[\"status\"]: row[\"count\"] for row in cursor.fetchall()}\n        cursor.close()\n\n        cursor = db.execute(\n            \"\"\"SELECT\n                  COUNT(*) as total,\n                  SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending,\n                  SUM(CASE WHEN status = 'processing' THEN 1 ELSE 0 END) as processing,\n                  SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,\n                  SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed,\n                  SUM(CASE WHEN status = 'cancelled' THEN 1 ELSE 0 END) as cancelled\n               FROM semantic_queue\"\"\"\n        )\n        totals = cursor.fetchone()\n        cursor.close()\n\n        # Get document semantic status distribution\n        cursor = db.execute(\n            \"\"\"SELECT semantic_status, COUNT(*) as count\n               FROM documents\n               GROUP BY semantic_status\"\"\"\n        )\n        doc_status = {row[\"semantic_status\"] or \"null\": row[\"count\"] for row in cursor.fetchall()}\n        cursor.close()\n\n        return {\n            \"queue\": totals,\n            \"queue_by_status\": status_counts,\n            \"documents\": doc_status,\n        }\n\n    def retry_failed(self, max_retries: int = 3) -> int:\n        \"\"\"\n        Reset 
failed items for retry (if under max retries).\n\n        Args:\n            max_retries: Maximum retry attempts\n\n        Returns:\n            int: Number of items reset\n        \"\"\"\n        cursor = db.execute(\n            \"\"\"UPDATE semantic_queue\n               SET status = 'pending', error_message = NULL\n               WHERE status = 'failed' AND retry_count < %s\"\"\",\n            (max_retries,),\n        )\n        db.commit()\n        affected = cursor.rowcount\n        cursor.close()\n\n        if affected > 0:\n            db.log(\"INFO\", f\"Reset {affected} failed semantic queue items for retry\")\n\n        return affected\n\n    def clear_stale(self, hours: int = 2) -> int:\n        \"\"\"\n        Reset items stuck in 'processing' state.\n\n        Args:\n            hours: Hours after which to consider stuck\n\n        Returns:\n            int: Number of items reset\n        \"\"\"\n        cursor = db.execute(\n            \"\"\"UPDATE semantic_queue\n               SET status = 'pending', started_at = NULL\n               WHERE status = 'processing'\n                 AND started_at < DATE_SUB(NOW(), INTERVAL %s HOUR)\"\"\",\n            (hours,),\n        )\n        db.commit()\n        affected = cursor.rowcount\n        cursor.close()\n\n        if affected > 0:\n            db.log(\"INFO\", f\"Reset {affected} stale semantic queue items\")\n\n        return affected\n\n\ndef main():\n    \"\"\"Main entry point.\"\"\"\n    parser = argparse.ArgumentParser(\n        description=\"Semantic Queue Worker - Process Layer 4 analysis asynchronously\"\n    )\n    parser.add_argument(\n        \"--limit\",\n        type=int,\n        default=SEMANTIC_QUEUE_BATCH_SIZE,\n        help=f\"Items to process per batch (default: {SEMANTIC_QUEUE_BATCH_SIZE})\",\n    )\n    parser.add_argument(\n        \"--daemon\",\n        action=\"store_true\",\n        help=\"Run continuously as daemon\",\n    )\n    parser.add_argument(\n        \"--sleep\",\n   
     type=int,\n        default=30,\n        help=\"Seconds to sleep when queue is empty (default: 30)\",\n    )\n    parser.add_argument(\n        \"--status\",\n        action=\"store_true\",\n        help=\"Show queue status and exit\",\n    )\n    parser.add_argument(\n        \"--retry\",\n        action=\"store_true\",\n        help=\"Reset failed items for retry\",\n    )\n    parser.add_argument(\n        \"--clear-stale\",\n        action=\"store_true\",\n        help=\"Reset items stuck in processing state\",\n    )\n    parser.add_argument(\n        \"--anthropic\",\n        action=\"store_true\",\n        default=SEMANTIC_USE_ANTHROPIC,\n        help=\"Use Anthropic API instead of Ollama\",\n    )\n\n    args = parser.parse_args()\n\n    db.connect()\n\n    try:\n        worker = SemanticWorker(use_anthropic=args.anthropic)\n\n        if args.status:\n            status = worker.get_status()\n            print(\"\\n=== Semantic Queue Status ===\\n\")\n\n            print(\"Queue:\")\n            queue = status[\"queue\"]\n            print(f\"  Total:      {queue['total'] or 0}\")\n            print(f\"  Pending:    {queue['pending'] or 0}\")\n            print(f\"  Processing: {queue['processing'] or 0}\")\n            print(f\"  Completed:  {queue['completed'] or 0}\")\n            print(f\"  Failed:     {queue['failed'] or 0}\")\n            print(f\"  Cancelled:  {queue['cancelled'] or 0}\")\n\n            print(\"\\nDocument Semantic Status:\")\n            for status_name, count in status[\"documents\"].items():\n                print(f\"  {status_name}: {count}\")\n\n        elif args.retry:\n            reset = worker.retry_failed()\n            print(f\"Reset {reset} failed items for retry\")\n\n        elif args.clear_stale:\n            reset = worker.clear_stale()\n            print(f\"Reset {reset} stale items\")\n\n        elif args.daemon:\n            worker.run_daemon(batch_size=args.limit, sleep_interval=args.sleep)\n\n        else:\n  
          # Single batch run\n            print(f\"Processing up to {args.limit} items...\")\n            result = worker.process_batch(limit=args.limit)\n            print(f\"Processed: {result.get('processed', 0)}\")\n            print(f\"Failed: {result.get('failed', 0)}\")\n            print(f\"Remaining: {result.get('remaining', 0)}\")\n\n    except Exception as e:\n        db.log(\"ERROR\", f\"Semantic worker error: {e}\")\n        print(f\"Error: {e}\")\n        sys.exit(1)\n    finally:\n        db.disconnect()\n\n\nif __name__ == \"__main__\":\n    main()\n",
        "structuredPatch": [],
        "originalFile": null
    }
}
← Vorheriger Zur Liste Nächster →