db_queue.py
- Pfad:
/var/www/scripts/pipeline/db_queue.py - Namespace: pipeline
- Zeilen: 87 | Größe: 2,547 Bytes
- Geändert: 2025-12-28 08:58:37 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 100
- Dependencies: 100 (25%)
- LOC: 100 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
Keine Issues gefunden.
Klassen 1
-
QueueMixinclass Zeile 8
Code
"""
Database Queue Mixin
Single Responsibility: Pipeline queue operations.
"""
class QueueMixin:
"""Mixin for pipeline queue operations.
Provides:
- add_to_queue(): Add item to pipeline queue
- get_pending_queue_items(): Get pending items
- update_queue_status(): Update item status
"""
def add_to_queue(self, file_path: str, action: str = "process") -> int:
"""Add item to pipeline queue.
Args:
file_path: File path to process
action: Action type (process, reprocess, etc.)
Returns:
Queue item ID
"""
cursor = self.execute(
"""INSERT INTO pipeline_queue
(file_path, action, status, retry_count, created_at)
VALUES (%s, %s, 'pending', 0, NOW())""",
(file_path, action),
)
self.commit()
queue_id = cursor.lastrowid
cursor.close()
return queue_id
def get_pending_queue_items(self, limit: int = 10) -> list:
"""Get pending items from queue.
Args:
limit: Maximum number of items to return
Returns:
List of queue item dicts with id, file_path, action, retry_count
"""
cursor = self.execute(
"""SELECT id, file_path, action, retry_count
FROM pipeline_queue
WHERE status = 'pending'
ORDER BY created_at
LIMIT %s""",
(limit,),
)
results = cursor.fetchall()
cursor.close()
return results
def update_queue_status(
self,
queue_id: int,
status: str,
error_message: str = None,
):
"""Update queue item status.
Args:
queue_id: Queue item ID
status: New status (pending, processing, completed, failed)
error_message: Optional error message (increments retry_count on error)
"""
if status == "error" and error_message:
cursor = self.execute(
"""UPDATE pipeline_queue
SET status = %s, error_message = %s,
retry_count = retry_count + 1, updated_at = NOW()
WHERE id = %s""",
(status, error_message, queue_id),
)
else:
cursor = self.execute(
"UPDATE pipeline_queue SET status = %s, updated_at = NOW() WHERE id = %s",
(status, queue_id),
)
self.commit()
cursor.close()