{
"event": "PreToolUse",
"tool_name": "Write",
"tool_input": {
"file_path": "\/var\/www\/scripts\/pipeline\/semantic_worker.py",
"content": "#!\/usr\/bin\/env python3\n\"\"\"\nSemantic Queue Worker\n\nProcesses documents from the semantic_queue table asynchronously.\nRuns as a daemon\/cron job to handle Layer 4 (semantic analysis) independently.\n\nUsage:\n python semantic_worker.py # Process batch (default: 5 items)\n python semantic_worker.py --limit 10 # Process 10 items\n python semantic_worker.py --daemon # Run continuously\n python semantic_worker.py --status # Show queue status\n python semantic_worker.py --retry # Reset failed items for retry\n\"\"\"\n\nimport argparse\nimport signal\nimport sys\nimport time\nfrom datetime import datetime\n\nfrom config import (\n SEMANTIC_QUEUE_BATCH_SIZE,\n SEMANTIC_USE_ANTHROPIC,\n)\nfrom db import db\nfrom step_semantic import SemanticStep\n\n\nclass SemanticWorker:\n \"\"\"Worker for processing semantic analysis queue.\"\"\"\n\n def __init__(self, use_anthropic: bool = False):\n \"\"\"\n Initialize the semantic worker.\n\n Args:\n use_anthropic: Use Anthropic API (True) or Ollama (False)\n \"\"\"\n self.use_anthropic = use_anthropic\n self.running = True\n self.processed_count = 0\n self.failed_count = 0\n\n # Setup signal handlers for graceful shutdown\n signal.signal(signal.SIGINT, self._signal_handler)\n signal.signal(signal.SIGTERM, self._signal_handler)\n\n def _signal_handler(self, signum, frame):\n \"\"\"Handle shutdown signals gracefully.\"\"\"\n print(f\"\\n[{self._timestamp()}] Received signal {signum}, shutting down...\")\n self.running = False\n\n def _timestamp(self) -> str:\n \"\"\"Get current timestamp for logging.\"\"\"\n return datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")\n\n def process_batch(self, limit: int = 5) -> dict:\n \"\"\"\n Process a batch of items from the semantic queue.\n\n Args:\n limit: Maximum items to process\n\n Returns:\n dict: Processing summary\n \"\"\"\n semantic_step = SemanticStep(db)\n result = semantic_step.process_queue(limit=limit, use_anthropic=self.use_anthropic)\n\n self.processed_count += result.get(\"processed\", 0)\n self.failed_count += result.get(\"failed\", 0)\n\n return result\n\n def run_daemon(self, batch_size: int = 5, sleep_interval: int = 30):\n \"\"\"\n Run as a daemon, continuously processing the queue.\n\n Args:\n batch_size: Items to process per batch\n sleep_interval: Seconds to sleep when queue is empty\n \"\"\"\n print(f\"[{self._timestamp()}] Semantic Worker started (daemon mode)\")\n print(f\" Batch size: {batch_size}\")\n print(f\" Sleep interval: {sleep_interval}s\")\n print(f\" Model: {'Anthropic' if self.use_anthropic else 'Ollama'}\")\n print(\" Press Ctrl+C to stop\\n\")\n\n while self.running:\n try:\n result = self.process_batch(limit=batch_size)\n\n processed = result.get(\"processed\", 0)\n failed = result.get(\"failed\", 0)\n remaining = result.get(\"remaining\", 0)\n\n if processed > 0 or failed > 0:\n print(\n f\"[{self._timestamp()}] Batch: {processed} processed, \"\n f\"{failed} failed, {remaining} remaining\"\n )\n\n # If nothing was processed and queue is empty, sleep longer\n if processed == 0 and remaining == 0:\n if self.running:\n time.sleep(sleep_interval)\n else:\n # Small delay between batches to prevent overload\n time.sleep(2)\n\n except Exception as e:\n print(f\"[{self._timestamp()}] ERROR: {e}\")\n db.log(\"ERROR\", f\"Semantic worker error: {e}\")\n time.sleep(10) # Wait before retrying after error\n\n print(f\"\\n[{self._timestamp()}] Semantic Worker stopped\")\n print(f\" Total processed: {self.processed_count}\")\n print(f\" Total failed: {self.failed_count}\")\n\n def get_status(self) -> dict:\n \"\"\"\n Get current queue status.\n\n Returns:\n dict: Queue statistics\n \"\"\"\n cursor = db.execute(\n \"\"\"SELECT status, COUNT(*) as count\n FROM semantic_queue\n GROUP BY status\"\"\"\n )\n status_counts = {row[\"status\"]: row[\"count\"] for row in cursor.fetchall()}\n cursor.close()\n\n cursor = db.execute(\n \"\"\"SELECT\n COUNT(*) as total,\n SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending,\n SUM(CASE WHEN status = 'processing' THEN 1 ELSE 0 END) as processing,\n SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,\n SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed,\n SUM(CASE WHEN status = 'cancelled' THEN 1 ELSE 0 END) as cancelled\n FROM semantic_queue\"\"\"\n )\n totals = cursor.fetchone()\n cursor.close()\n\n # Get document semantic status distribution\n cursor = db.execute(\n \"\"\"SELECT semantic_status, COUNT(*) as count\n FROM documents\n GROUP BY semantic_status\"\"\"\n )\n doc_status = {row[\"semantic_status\"] or \"null\": row[\"count\"] for row in cursor.fetchall()}\n cursor.close()\n\n return {\n \"queue\": totals,\n \"queue_by_status\": status_counts,\n \"documents\": doc_status,\n }\n\n def retry_failed(self, max_retries: int = 3) -> int:\n \"\"\"\n Reset failed items for retry (if under max retries).\n\n Args:\n max_retries: Maximum retry attempts\n\n Returns:\n int: Number of items reset\n \"\"\"\n cursor = db.execute(\n \"\"\"UPDATE semantic_queue\n SET status = 'pending', error_message = NULL\n WHERE status = 'failed' AND retry_count < %s\"\"\",\n (max_retries,),\n )\n db.commit()\n affected = cursor.rowcount\n cursor.close()\n\n if affected > 0:\n db.log(\"INFO\", f\"Reset {affected} failed semantic queue items for retry\")\n\n return affected\n\n def clear_stale(self, hours: int = 2) -> int:\n \"\"\"\n Reset items stuck in 'processing' state.\n\n Args:\n hours: Hours after which to consider stuck\n\n Returns:\n int: Number of items reset\n \"\"\"\n cursor = db.execute(\n \"\"\"UPDATE semantic_queue\n SET status = 'pending', started_at = NULL\n WHERE status = 'processing'\n AND started_at < DATE_SUB(NOW(), INTERVAL %s HOUR)\"\"\",\n (hours,),\n )\n db.commit()\n affected = cursor.rowcount\n cursor.close()\n\n if affected > 0:\n db.log(\"INFO\", f\"Reset {affected} stale semantic queue items\")\n\n return affected\n\n\ndef main():\n \"\"\"Main entry point.\"\"\"\n parser = argparse.ArgumentParser(\n description=\"Semantic Queue Worker - Process Layer 4 analysis asynchronously\"\n )\n parser.add_argument(\n \"--limit\",\n type=int,\n default=SEMANTIC_QUEUE_BATCH_SIZE,\n help=f\"Items to process per batch (default: {SEMANTIC_QUEUE_BATCH_SIZE})\",\n )\n parser.add_argument(\n \"--daemon\",\n action=\"store_true\",\n help=\"Run continuously as daemon\",\n )\n parser.add_argument(\n \"--sleep\",\n type=int,\n default=30,\n help=\"Seconds to sleep when queue is empty (default: 30)\",\n )\n parser.add_argument(\n \"--status\",\n action=\"store_true\",\n help=\"Show queue status and exit\",\n )\n parser.add_argument(\n \"--retry\",\n action=\"store_true\",\n help=\"Reset failed items for retry\",\n )\n parser.add_argument(\n \"--clear-stale\",\n action=\"store_true\",\n help=\"Reset items stuck in processing state\",\n )\n parser.add_argument(\n \"--anthropic\",\n action=\"store_true\",\n default=SEMANTIC_USE_ANTHROPIC,\n help=\"Use Anthropic API instead of Ollama\",\n )\n\n args = parser.parse_args()\n\n db.connect()\n\n try:\n worker = SemanticWorker(use_anthropic=args.anthropic)\n\n if args.status:\n status = worker.get_status()\n print(\"\\n=== Semantic Queue Status ===\\n\")\n\n print(\"Queue:\")\n queue = status[\"queue\"]\n print(f\" Total: {queue['total'] or 0}\")\n print(f\" Pending: {queue['pending'] or 0}\")\n print(f\" Processing: {queue['processing'] or 0}\")\n print(f\" Completed: {queue['completed'] or 0}\")\n print(f\" Failed: {queue['failed'] or 0}\")\n print(f\" Cancelled: {queue['cancelled'] or 0}\")\n\n print(\"\\nDocument Semantic Status:\")\n for status_name, count in status[\"documents\"].items():\n print(f\" {status_name}: {count}\")\n\n elif args.retry:\n reset = worker.retry_failed()\n print(f\"Reset {reset} failed items for retry\")\n\n elif args.clear_stale:\n reset = worker.clear_stale()\n print(f\"Reset {reset} stale items\")\n\n elif args.daemon:\n worker.run_daemon(batch_size=args.limit, sleep_interval=args.sleep)\n\n else:\n # Single batch run\n print(f\"Processing up to {args.limit} items...\")\n result = worker.process_batch(limit=args.limit)\n print(f\"Processed: {result.get('processed', 0)}\")\n print(f\"Failed: {result.get('failed', 0)}\")\n print(f\"Remaining: {result.get('remaining', 0)}\")\n\n except Exception as e:\n db.log(\"ERROR\", f\"Semantic worker error: {e}\")\n print(f\"Error: {e}\")\n sys.exit(1)\n finally:\n db.disconnect()\n\n\nif __name__ == \"__main__\":\n main()\n"
}
}
{
"tool_response": {
"type": "create",
"filePath": "\/var\/www\/scripts\/pipeline\/semantic_worker.py",
"content": "#!\/usr\/bin\/env python3\n\"\"\"\nSemantic Queue Worker\n\nProcesses documents from the semantic_queue table asynchronously.\nRuns as a daemon\/cron job to handle Layer 4 (semantic analysis) independently.\n\nUsage:\n python semantic_worker.py # Process batch (default: 5 items)\n python semantic_worker.py --limit 10 # Process 10 items\n python semantic_worker.py --daemon # Run continuously\n python semantic_worker.py --status # Show queue status\n python semantic_worker.py --retry # Reset failed items for retry\n\"\"\"\n\nimport argparse\nimport signal\nimport sys\nimport time\nfrom datetime import datetime\n\nfrom config import (\n SEMANTIC_QUEUE_BATCH_SIZE,\n SEMANTIC_USE_ANTHROPIC,\n)\nfrom db import db\nfrom step_semantic import SemanticStep\n\n\nclass SemanticWorker:\n \"\"\"Worker for processing semantic analysis queue.\"\"\"\n\n def __init__(self, use_anthropic: bool = False):\n \"\"\"\n Initialize the semantic worker.\n\n Args:\n use_anthropic: Use Anthropic API (True) or Ollama (False)\n \"\"\"\n self.use_anthropic = use_anthropic\n self.running = True\n self.processed_count = 0\n self.failed_count = 0\n\n # Setup signal handlers for graceful shutdown\n signal.signal(signal.SIGINT, self._signal_handler)\n signal.signal(signal.SIGTERM, self._signal_handler)\n\n def _signal_handler(self, signum, frame):\n \"\"\"Handle shutdown signals gracefully.\"\"\"\n print(f\"\\n[{self._timestamp()}] Received signal {signum}, shutting down...\")\n self.running = False\n\n def _timestamp(self) -> str:\n \"\"\"Get current timestamp for logging.\"\"\"\n return datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")\n\n def process_batch(self, limit: int = 5) -> dict:\n \"\"\"\n Process a batch of items from the semantic queue.\n\n Args:\n limit: Maximum items to process\n\n Returns:\n dict: Processing summary\n \"\"\"\n semantic_step = SemanticStep(db)\n result = semantic_step.process_queue(limit=limit, use_anthropic=self.use_anthropic)\n\n self.processed_count += result.get(\"processed\", 0)\n self.failed_count += result.get(\"failed\", 0)\n\n return result\n\n def run_daemon(self, batch_size: int = 5, sleep_interval: int = 30):\n \"\"\"\n Run as a daemon, continuously processing the queue.\n\n Args:\n batch_size: Items to process per batch\n sleep_interval: Seconds to sleep when queue is empty\n \"\"\"\n print(f\"[{self._timestamp()}] Semantic Worker started (daemon mode)\")\n print(f\" Batch size: {batch_size}\")\n print(f\" Sleep interval: {sleep_interval}s\")\n print(f\" Model: {'Anthropic' if self.use_anthropic else 'Ollama'}\")\n print(\" Press Ctrl+C to stop\\n\")\n\n while self.running:\n try:\n result = self.process_batch(limit=batch_size)\n\n processed = result.get(\"processed\", 0)\n failed = result.get(\"failed\", 0)\n remaining = result.get(\"remaining\", 0)\n\n if processed > 0 or failed > 0:\n print(\n f\"[{self._timestamp()}] Batch: {processed} processed, \"\n f\"{failed} failed, {remaining} remaining\"\n )\n\n # If nothing was processed and queue is empty, sleep longer\n if processed == 0 and remaining == 0:\n if self.running:\n time.sleep(sleep_interval)\n else:\n # Small delay between batches to prevent overload\n time.sleep(2)\n\n except Exception as e:\n print(f\"[{self._timestamp()}] ERROR: {e}\")\n db.log(\"ERROR\", f\"Semantic worker error: {e}\")\n time.sleep(10) # Wait before retrying after error\n\n print(f\"\\n[{self._timestamp()}] Semantic Worker stopped\")\n print(f\" Total processed: {self.processed_count}\")\n print(f\" Total failed: {self.failed_count}\")\n\n def get_status(self) -> dict:\n \"\"\"\n Get current queue status.\n\n Returns:\n dict: Queue statistics\n \"\"\"\n cursor = db.execute(\n \"\"\"SELECT status, COUNT(*) as count\n FROM semantic_queue\n GROUP BY status\"\"\"\n )\n status_counts = {row[\"status\"]: row[\"count\"] for row in cursor.fetchall()}\n cursor.close()\n\n cursor = db.execute(\n \"\"\"SELECT\n COUNT(*) as total,\n SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending,\n SUM(CASE WHEN status = 'processing' THEN 1 ELSE 0 END) as processing,\n SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,\n SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed,\n SUM(CASE WHEN status = 'cancelled' THEN 1 ELSE 0 END) as cancelled\n FROM semantic_queue\"\"\"\n )\n totals = cursor.fetchone()\n cursor.close()\n\n # Get document semantic status distribution\n cursor = db.execute(\n \"\"\"SELECT semantic_status, COUNT(*) as count\n FROM documents\n GROUP BY semantic_status\"\"\"\n )\n doc_status = {row[\"semantic_status\"] or \"null\": row[\"count\"] for row in cursor.fetchall()}\n cursor.close()\n\n return {\n \"queue\": totals,\n \"queue_by_status\": status_counts,\n \"documents\": doc_status,\n }\n\n def retry_failed(self, max_retries: int = 3) -> int:\n \"\"\"\n Reset failed items for retry (if under max retries).\n\n Args:\n max_retries: Maximum retry attempts\n\n Returns:\n int: Number of items reset\n \"\"\"\n cursor = db.execute(\n \"\"\"UPDATE semantic_queue\n SET status = 'pending', error_message = NULL\n WHERE status = 'failed' AND retry_count < %s\"\"\",\n (max_retries,),\n )\n db.commit()\n affected = cursor.rowcount\n cursor.close()\n\n if affected > 0:\n db.log(\"INFO\", f\"Reset {affected} failed semantic queue items for retry\")\n\n return affected\n\n def clear_stale(self, hours: int = 2) -> int:\n \"\"\"\n Reset items stuck in 'processing' state.\n\n Args:\n hours: Hours after which to consider stuck\n\n Returns:\n int: Number of items reset\n \"\"\"\n cursor = db.execute(\n \"\"\"UPDATE semantic_queue\n SET status = 'pending', started_at = NULL\n WHERE status = 'processing'\n AND started_at < DATE_SUB(NOW(), INTERVAL %s HOUR)\"\"\",\n (hours,),\n )\n db.commit()\n affected = cursor.rowcount\n cursor.close()\n\n if affected > 0:\n db.log(\"INFO\", f\"Reset {affected} stale semantic queue items\")\n\n return affected\n\n\ndef main():\n \"\"\"Main entry point.\"\"\"\n parser = argparse.ArgumentParser(\n description=\"Semantic Queue Worker - Process Layer 4 analysis asynchronously\"\n )\n parser.add_argument(\n \"--limit\",\n type=int,\n default=SEMANTIC_QUEUE_BATCH_SIZE,\n help=f\"Items to process per batch (default: {SEMANTIC_QUEUE_BATCH_SIZE})\",\n )\n parser.add_argument(\n \"--daemon\",\n action=\"store_true\",\n help=\"Run continuously as daemon\",\n )\n parser.add_argument(\n \"--sleep\",\n type=int,\n default=30,\n help=\"Seconds to sleep when queue is empty (default: 30)\",\n )\n parser.add_argument(\n \"--status\",\n action=\"store_true\",\n help=\"Show queue status and exit\",\n )\n parser.add_argument(\n \"--retry\",\n action=\"store_true\",\n help=\"Reset failed items for retry\",\n )\n parser.add_argument(\n \"--clear-stale\",\n action=\"store_true\",\n help=\"Reset items stuck in processing state\",\n )\n parser.add_argument(\n \"--anthropic\",\n action=\"store_true\",\n default=SEMANTIC_USE_ANTHROPIC,\n help=\"Use Anthropic API instead of Ollama\",\n )\n\n args = parser.parse_args()\n\n db.connect()\n\n try:\n worker = SemanticWorker(use_anthropic=args.anthropic)\n\n if args.status:\n status = worker.get_status()\n print(\"\\n=== Semantic Queue Status ===\\n\")\n\n print(\"Queue:\")\n queue = status[\"queue\"]\n print(f\" Total: {queue['total'] or 0}\")\n print(f\" Pending: {queue['pending'] or 0}\")\n print(f\" Processing: {queue['processing'] or 0}\")\n print(f\" Completed: {queue['completed'] or 0}\")\n print(f\" Failed: {queue['failed'] or 0}\")\n print(f\" Cancelled: {queue['cancelled'] or 0}\")\n\n print(\"\\nDocument Semantic Status:\")\n for status_name, count in status[\"documents\"].items():\n print(f\" {status_name}: {count}\")\n\n elif args.retry:\n reset = worker.retry_failed()\n print(f\"Reset {reset} failed items for retry\")\n\n elif args.clear_stale:\n reset = worker.clear_stale()\n print(f\"Reset {reset} stale items\")\n\n elif args.daemon:\n worker.run_daemon(batch_size=args.limit, sleep_interval=args.sleep)\n\n else:\n # Single batch run\n print(f\"Processing up to {args.limit} items...\")\n result = worker.process_batch(limit=args.limit)\n print(f\"Processed: {result.get('processed', 0)}\")\n print(f\"Failed: {result.get('failed', 0)}\")\n print(f\"Remaining: {result.get('remaining', 0)}\")\n\n except Exception as e:\n db.log(\"ERROR\", f\"Semantic worker error: {e}\")\n print(f\"Error: {e}\")\n sys.exit(1)\n finally:\n db.disconnect()\n\n\nif __name__ == \"__main__\":\n main()\n",
"structuredPatch": [],
"originalFile": null
}
}