semantic_worker.py
- Path: /var/www/scripts/pipeline/semantic_worker.py
- Namespace: pipeline
- Lines: 308 | Size: 9,890 bytes
- Modified: 2025-12-27 00:19:36 | Scanned: 2025-12-31 10:22:15
Code Hygiene Score: 83
- Dependencies: 60 (25%)
- LOC: 64 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
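These category scores are consistent with a weighted average: 60×0.25 + 64×0.20 + 100×0.20 + 100×0.15 + 100×0.10 + 100×0.10 = 82.8, which rounds to the reported 83. (The weighting is inferred from the listed percentages; the scanner does not document its formula here.)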
No issues found.
Dependencies (9)
- use argparse
- use signal
- use sys
- use time
- use datetime.datetime
- use config.SEMANTIC_QUEUE_BATCH_SIZE
- use config.SEMANTIC_USE_ANTHROPIC
- use db.db
- use step_semantic.SemanticStep
Classes (1)
- SemanticWorker (class), line 30
Functions (1)
- main(), line 213
Code
#!/usr/bin/env python3
"""
Semantic Queue Worker

Processes documents from the semantic_queue table asynchronously.
Runs as a daemon/cron job to handle Layer 4 (semantic analysis) independently.

Usage:
    python semantic_worker.py              # Process batch (default: 5 items)
    python semantic_worker.py --limit 10   # Process 10 items
    python semantic_worker.py --daemon     # Run continuously
    python semantic_worker.py --status     # Show queue status
    python semantic_worker.py --retry      # Reset failed items for retry
"""
import argparse
import signal
import sys
import time
from datetime import datetime

from config import (
    SEMANTIC_QUEUE_BATCH_SIZE,
    SEMANTIC_USE_ANTHROPIC,
)
from db import db
from step_semantic import SemanticStep


class SemanticWorker:
    """Worker for processing semantic analysis queue."""

    def __init__(self, use_anthropic: bool = False):
        """
        Initialize the semantic worker.

        Args:
            use_anthropic: Use Anthropic API (True) or Ollama (False)
        """
        self.use_anthropic = use_anthropic
        self.running = True
        self.processed_count = 0
        self.failed_count = 0
        # Setup signal handlers for graceful shutdown
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Handle shutdown signals gracefully."""
        print(f"\n[{self._timestamp()}] Received signal {signum}, shutting down...")
        self.running = False

    def _timestamp(self) -> str:
        """Get current timestamp for logging."""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def process_batch(self, limit: int = 5) -> dict:
        """
        Process a batch of items from the semantic queue.

        Args:
            limit: Maximum items to process

        Returns:
            dict: Processing summary
        """
        semantic_step = SemanticStep(db)
        result = semantic_step.process_queue(limit=limit, use_anthropic=self.use_anthropic)
        self.processed_count += result.get("processed", 0)
        self.failed_count += result.get("failed", 0)
        return result

    def run_daemon(self, batch_size: int = 5, sleep_interval: int = 30):
        """
        Run as a daemon, continuously processing the queue.

        Args:
            batch_size: Items to process per batch
            sleep_interval: Seconds to sleep when queue is empty
        """
        print(f"[{self._timestamp()}] Semantic Worker started (daemon mode)")
        print(f" Batch size: {batch_size}")
        print(f" Sleep interval: {sleep_interval}s")
        print(f" Model: {'Anthropic' if self.use_anthropic else 'Ollama'}")
        print(" Press Ctrl+C to stop\n")
        while self.running:
            try:
                result = self.process_batch(limit=batch_size)
                processed = result.get("processed", 0)
                failed = result.get("failed", 0)
                remaining = result.get("remaining", 0)
                if processed > 0 or failed > 0:
                    print(f"[{self._timestamp()}] Batch: {processed} processed, {failed} failed, {remaining} remaining")
                # If nothing was processed and queue is empty, sleep longer
                if processed == 0 and remaining == 0:
                    if self.running:
                        time.sleep(sleep_interval)
                else:
                    # Small delay between batches to prevent overload
                    time.sleep(2)
            except Exception as e:
                print(f"[{self._timestamp()}] ERROR: {e}")
                db.log("ERROR", f"Semantic worker error: {e}")
                time.sleep(10)  # Wait before retrying after error
        print(f"\n[{self._timestamp()}] Semantic Worker stopped")
        print(f" Total processed: {self.processed_count}")
        print(f" Total failed: {self.failed_count}")

    def get_status(self) -> dict:
        """
        Get current queue status.

        Returns:
            dict: Queue statistics
        """
        cursor = db.execute(
            """SELECT status, COUNT(*) as count
               FROM semantic_queue
               GROUP BY status"""
        )
        status_counts = {row["status"]: row["count"] for row in cursor.fetchall()}
        cursor.close()
        cursor = db.execute(
            """SELECT
                   COUNT(*) as total,
                   SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending,
                   SUM(CASE WHEN status = 'processing' THEN 1 ELSE 0 END) as processing,
                   SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
                   SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed,
                   SUM(CASE WHEN status = 'cancelled' THEN 1 ELSE 0 END) as cancelled
               FROM semantic_queue"""
        )
        totals = cursor.fetchone()
        cursor.close()
        # Get document semantic status distribution
        cursor = db.execute(
            """SELECT semantic_status, COUNT(*) as count
               FROM documents
               GROUP BY semantic_status"""
        )
        doc_status = {row["semantic_status"] or "null": row["count"] for row in cursor.fetchall()}
        cursor.close()
        return {
            "queue": totals,
            "queue_by_status": status_counts,
            "documents": doc_status,
        }

    def retry_failed(self, max_retries: int = 3) -> int:
        """
        Reset failed items for retry (if under max retries).

        Args:
            max_retries: Maximum retry attempts

        Returns:
            int: Number of items reset
        """
        cursor = db.execute(
            """UPDATE semantic_queue
               SET status = 'pending', error_message = NULL
               WHERE status = 'failed' AND retry_count < %s""",
            (max_retries,),
        )
        db.commit()
        affected = cursor.rowcount
        cursor.close()
        if affected > 0:
            db.log("INFO", f"Reset {affected} failed semantic queue items for retry")
        return affected

    def clear_stale(self, hours: int = 2) -> int:
        """
        Reset items stuck in 'processing' state.

        Args:
            hours: Hours after which to consider stuck

        Returns:
            int: Number of items reset
        """
        cursor = db.execute(
            """UPDATE semantic_queue
               SET status = 'pending', started_at = NULL
               WHERE status = 'processing'
                 AND started_at < DATE_SUB(NOW(), INTERVAL %s HOUR)""",
            (hours,),
        )
        db.commit()
        affected = cursor.rowcount
        cursor.close()
        if affected > 0:
            db.log("INFO", f"Reset {affected} stale semantic queue items")
        return affected


def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(description="Semantic Queue Worker - Process Layer 4 analysis asynchronously")
    parser.add_argument(
        "--limit",
        type=int,
        default=SEMANTIC_QUEUE_BATCH_SIZE,
        help=f"Items to process per batch (default: {SEMANTIC_QUEUE_BATCH_SIZE})",
    )
    parser.add_argument(
        "--daemon",
        action="store_true",
        help="Run continuously as daemon",
    )
    parser.add_argument(
        "--sleep",
        type=int,
        default=30,
        help="Seconds to sleep when queue is empty (default: 30)",
    )
    parser.add_argument(
        "--status",
        action="store_true",
        help="Show queue status and exit",
    )
    parser.add_argument(
        "--retry",
        action="store_true",
        help="Reset failed items for retry",
    )
    parser.add_argument(
        "--clear-stale",
        action="store_true",
        help="Reset items stuck in processing state",
    )
    parser.add_argument(
        "--anthropic",
        action="store_true",
        default=SEMANTIC_USE_ANTHROPIC,
        help="Use Anthropic API instead of Ollama",
    )
    args = parser.parse_args()
    db.connect()
    try:
        worker = SemanticWorker(use_anthropic=args.anthropic)
        if args.status:
            status = worker.get_status()
            print("\n=== Semantic Queue Status ===\n")
            print("Queue:")
            queue = status["queue"]
            print(f" Total: {queue['total'] or 0}")
            print(f" Pending: {queue['pending'] or 0}")
            print(f" Processing: {queue['processing'] or 0}")
            print(f" Completed: {queue['completed'] or 0}")
            print(f" Failed: {queue['failed'] or 0}")
            print(f" Cancelled: {queue['cancelled'] or 0}")
            print("\nDocument Semantic Status:")
            for status_name, count in status["documents"].items():
                print(f" {status_name}: {count}")
        elif args.retry:
            reset = worker.retry_failed()
            print(f"Reset {reset} failed items for retry")
        elif args.clear_stale:
            reset = worker.clear_stale()
            print(f"Reset {reset} stale items")
        elif args.daemon:
            worker.run_daemon(batch_size=args.limit, sleep_interval=args.sleep)
        else:
            # Single batch run
            print(f"Processing up to {args.limit} items...")
            result = worker.process_batch(limit=args.limit)
            print(f"Processed: {result.get('processed', 0)}")
            print(f"Failed: {result.get('failed', 0)}")
            print(f"Remaining: {result.get('remaining', 0)}")
    except Exception as e:
        db.log("ERROR", f"Semantic worker error: {e}")
        print(f"Error: {e}")
        sys.exit(1)
    finally:
        db.disconnect()


if __name__ == "__main__":
    main()
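
Usage note
The worker can also be driven from another Python script instead of the CLI. The sketch below is not part of the scanned file; it assumes the same pipeline layout (db and semantic_worker importable from the caller's path) and uses only calls that appear in the code above.

# Hypothetical caller script; not part of semantic_worker.py.
from db import db
from semantic_worker import SemanticWorker

db.connect()
try:
    # False selects the Ollama backend; True would use the Anthropic API.
    worker = SemanticWorker(use_anthropic=False)
    # Process one batch of up to 10 queue items and report the summary dict.
    summary = worker.process_batch(limit=10)
    print(f"processed={summary.get('processed', 0)}, "
          f"failed={summary.get('failed', 0)}, "
          f"remaining={summary.get('remaining', 0)}")
finally:
    db.disconnect()

For scheduled operation, the single-batch call from the module docstring (python semantic_worker.py --limit 10) fits a cron entry, while --daemon is the mode intended for continuous, supervised running.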