db_queue.py

Code Hygiene Score: 100

Keine Issues gefunden.

Klassen 1

Code

"""
Database Queue Mixin

Single Responsibility: Pipeline queue operations.
"""


class QueueMixin:
    """Mixin providing operations on the ``pipeline_queue`` table.

    The host class must supply two members that this mixin calls:

    - ``execute(sql, params)``: runs a parameterized statement and returns
      a cursor object exposing ``close()`` (plus ``lastrowid`` /
      ``fetchall()`` where used below).
    - ``commit()``: commits the current transaction.

    Provides:
    - add_to_queue(): Add item to pipeline queue
    - get_pending_queue_items(): Get pending items
    - update_queue_status(): Update item status

    Every method closes its cursor in a ``finally`` clause so the cursor
    is released even when the commit or fetch raises.
    """

    def add_to_queue(self, file_path: str, action: str = "process") -> int:
        """Insert a new item into the pipeline queue and commit.

        The row is created with status ``'pending'`` and ``retry_count`` 0.

        Args:
            file_path: File path to process
            action: Action type (process, reprocess, etc.)

        Returns:
            Queue item ID, taken from the cursor's ``lastrowid``.
        """
        cursor = self.execute(
            """INSERT INTO pipeline_queue
               (file_path, action, status, retry_count, created_at)
               VALUES (%s, %s, 'pending', 0, NOW())""",
            (file_path, action),
        )
        try:
            self.commit()
            # Read the generated ID before the cursor is closed.
            return cursor.lastrowid
        finally:
            cursor.close()

    def get_pending_queue_items(self, limit: int = 10) -> list:
        """Fetch queue items whose status is ``'pending'``, oldest first.

        Args:
            limit: Maximum number of items to return

        Returns:
            Whatever ``cursor.fetchall()`` yields for the columns
            id, file_path, action, retry_count (row type depends on the
            cursor supplied by the host class), ordered by ``created_at``.
        """
        cursor = self.execute(
            """SELECT id, file_path, action, retry_count
               FROM pipeline_queue
               WHERE status = 'pending'
               ORDER BY created_at
               LIMIT %s""",
            (limit,),
        )
        try:
            return cursor.fetchall()
        finally:
            cursor.close()

    def update_queue_status(
        self,
        queue_id: int,
        status: str,
        error_message: str = None,
    ) -> None:
        """Update the status of a queue item and commit.

        Args:
            queue_id: Queue item ID
            status: New status value; written to the row as given. The
                literal value ``"error"`` selects the error-recording
                path described below.
            error_message: Optional error message. It is stored, and
                ``retry_count`` is incremented, only when ``status`` is
                ``"error"`` and the message is non-empty; in every other
                case it is ignored and only ``status`` / ``updated_at``
                are written.

        NOTE(review): other statuses such as ``"failed"`` do not record the
        message or bump ``retry_count`` -- confirm callers use ``"error"``.
        """
        if status == "error" and error_message:
            cursor = self.execute(
                """UPDATE pipeline_queue
                   SET status = %s, error_message = %s,
                       retry_count = retry_count + 1, updated_at = NOW()
                   WHERE id = %s""",
                (status, error_message, queue_id),
            )
        else:
            cursor = self.execute(
                "UPDATE pipeline_queue SET status = %s, updated_at = NOW() WHERE id = %s",
                (status, queue_id),
            )
        try:
            self.commit()
        finally:
            cursor.close()
← Übersicht Graph