PipelineRunRepository.php

Code Hygiene Score: 100

Keine Issues gefunden.

Dependencies 3

Klassen 1

Funktionen 6

Verwendet von 1

Code

<?php

declare(strict_types=1);

namespace Infrastructure\Persistence;

// @responsibility: Persistenz für Pipeline-Runs

use Domain\Repository\PipelineRunRepositoryInterface;

class PipelineRunRepository implements PipelineRunRepositoryInterface
{
    private \PDO $pdo;

    public function __construct(\PDO $pdo)
    {
        $this->pdo = $pdo;
    }

    public function findRuns(int $pipelineId, int $limit = 20): array
    {
        $stmt = $this->pdo->prepare(
            'SELECT * FROM pipeline_runs WHERE pipeline_id = ? ORDER BY created_at DESC LIMIT ?'
        );
        $stmt->execute([$pipelineId, $limit]);

        return $stmt->fetchAll(\PDO::FETCH_ASSOC);
    }

    public function findRunById(int $runId): ?array
    {
        $stmt = $this->pdo->prepare('SELECT * FROM pipeline_runs WHERE id = ?');
        $stmt->execute([$runId]);
        $result = $stmt->fetch(\PDO::FETCH_ASSOC);

        return $result !== false ? $result : null;
    }

    public function findLatestRun(int $pipelineId): ?array
    {
        $stmt = $this->pdo->prepare(
            'SELECT * FROM pipeline_runs WHERE pipeline_id = ? ORDER BY created_at DESC LIMIT 1'
        );
        $stmt->execute([$pipelineId]);
        $result = $stmt->fetch(\PDO::FETCH_ASSOC);

        return $result !== false ? $result : null;
    }

    public function createRun(int $pipelineId): int
    {
        $stmt = $this->pdo->prepare(
            'INSERT INTO pipeline_runs (pipeline_id, status, started_at) VALUES (?, ?, NOW())'
        );
        $stmt->execute([$pipelineId, 'running']);

        return (int) $this->pdo->lastInsertId();
    }

    public function updateRun(int $runId, array $data): void
    {
        $allowedColumns = [
            'status',
            'completed_at',
            'documents_total',
            'documents_processed',
            'documents_failed',
            'chunks_created',
            'embeddings_created',
            'error_log',
            'current_step',
            'current_document',
            'last_update_at',
            'log_tail',
        ];

        $sets = [];
        $params = [];

        foreach ($data as $column => $value) {
            if (in_array($column, $allowedColumns, true)) {
                $sets[] = $column . ' = ?';
                $params[] = $value;
            }
        }

        if ($sets === []) {
            return;
        }

        $params[] = $runId;
        $stmt = $this->pdo->prepare(
            'UPDATE pipeline_runs SET ' . implode(', ', $sets) . ' WHERE id = ?'
        );
        $stmt->execute($params);
    }
}
← Übersicht Graph