PipelineRunRepository.php
- Pfad:
src/Infrastructure/Persistence/PipelineRunRepository.php - Namespace: Infrastructure\Persistence
- Zeilen: 98 | Größe: 2,604 Bytes
- Geändert: 2025-12-25 16:57:55 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 100
- Dependencies: 100 (25%)
- LOC: 100 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
Keine Issues gefunden.
Dependencies 3
- implements Domain\Repository\PipelineRunRepositoryInterface
- constructor PDO
- use Domain\Repository\PipelineRunRepositoryInterface
Klassen 1
-
PipelineRunRepositoryclass Zeile 11
Funktionen 6
-
__construct()public Zeile 15 -
findRuns()public Zeile 20 -
findRunById()public Zeile 30 -
findLatestRun()public Zeile 39 -
createRun()public Zeile 50 -
updateRun()public Zeile 60
Verwendet von 1
Code
<?php
declare(strict_types=1);
namespace Infrastructure\Persistence;
// @responsibility: Persistenz für Pipeline-Runs
use Domain\Repository\PipelineRunRepositoryInterface;
class PipelineRunRepository implements PipelineRunRepositoryInterface
{
private \PDO $pdo;
public function __construct(\PDO $pdo)
{
$this->pdo = $pdo;
}
public function findRuns(int $pipelineId, int $limit = 20): array
{
$stmt = $this->pdo->prepare(
'SELECT * FROM pipeline_runs WHERE pipeline_id = ? ORDER BY created_at DESC LIMIT ?'
);
$stmt->execute([$pipelineId, $limit]);
return $stmt->fetchAll(\PDO::FETCH_ASSOC);
}
public function findRunById(int $runId): ?array
{
$stmt = $this->pdo->prepare('SELECT * FROM pipeline_runs WHERE id = ?');
$stmt->execute([$runId]);
$result = $stmt->fetch(\PDO::FETCH_ASSOC);
return $result !== false ? $result : null;
}
public function findLatestRun(int $pipelineId): ?array
{
$stmt = $this->pdo->prepare(
'SELECT * FROM pipeline_runs WHERE pipeline_id = ? ORDER BY created_at DESC LIMIT 1'
);
$stmt->execute([$pipelineId]);
$result = $stmt->fetch(\PDO::FETCH_ASSOC);
return $result !== false ? $result : null;
}
public function createRun(int $pipelineId): int
{
$stmt = $this->pdo->prepare(
'INSERT INTO pipeline_runs (pipeline_id, status, started_at) VALUES (?, ?, NOW())'
);
$stmt->execute([$pipelineId, 'running']);
return (int) $this->pdo->lastInsertId();
}
public function updateRun(int $runId, array $data): void
{
$allowedColumns = [
'status',
'completed_at',
'documents_total',
'documents_processed',
'documents_failed',
'chunks_created',
'embeddings_created',
'error_log',
'current_step',
'current_document',
'last_update_at',
'log_tail',
];
$sets = [];
$params = [];
foreach ($data as $column => $value) {
if (in_array($column, $allowedColumns, true)) {
$sets[] = $column . ' = ?';
$params[] = $value;
}
}
if ($sets === []) {
return;
}
$params[] = $runId;
$stmt = $this->pdo->prepare(
'UPDATE pipeline_runs SET ' . implode(', ', $sets) . ' WHERE id = ?'
);
$stmt->execute($params);
}
}