Backup #414
| ID | 414 |
| Dateipfad | /var/www/dev.campus.systemische-tools.de/src/Infrastructure/Persistence/PipelineRepository.php |
| Version | 2 |
| Typ | modified |
| Größe | 10.8 KB |
| Hash | 2e7fbd449ef19e6b6eff98d5c5f579c05fc9b9ae98329efd8423a2d93849f832 |
| Datum | 2025-12-22 09:23:21 |
| Geändert von | claude-code-hook |
| Grund | Claude Code Pre-Hook Backup vor Edit-Operation |
| Datei existiert | Ja |
Dateiinhalt
<?php
namespace Infrastructure\Persistence;
use Domain\Repository\PipelineRepositoryInterface;
use Infrastructure\Config\DatabaseFactory;
/**
 * PDO-backed repository for pipeline configurations, their steps and their runs.
 *
 * Works on the tables `pipeline_configs`, `pipeline_steps` and `pipeline_runs`.
 * The JSON columns `pipeline_configs.extensions` and `pipeline_steps.config`
 * are decoded to PHP arrays on read; on write an array value is JSON-encoded
 * and any other value is stored as given.
 */
class PipelineRepository implements PipelineRepositoryInterface
{
    /** Source path stored by create() when the caller supplies none. */
    private const DEFAULT_SOURCE_PATH = '/var/www/nextcloud/data/root/files/Documents';

    /** Columns update() may write, in the order they appear in the SET clause. */
    private const PIPELINE_COLUMNS = ['name', 'description', 'is_default', 'source_path', 'extensions'];

    /** Columns updateStep() may write, in the order they appear in the SET clause. */
    private const STEP_COLUMNS = ['step_type', 'config', 'sort_order', 'enabled'];

    /** Columns updateRun() may write, in the order they appear in the SET clause. */
    private const RUN_COLUMNS = [
        'status',
        'completed_at',
        'documents_total',
        'documents_processed',
        'documents_failed',
        'chunks_created',
        'error_log',
    ];

    private \PDO $pdo;

    /**
     * @param \PDO|null $pdo Connection to use; when null, DatabaseFactory::content() supplies one.
     */
    public function __construct(?\PDO $pdo = null)
    {
        $this->pdo = $pdo ?? DatabaseFactory::content();
    }

    /**
     * Loads one pipeline configuration by primary key.
     *
     * @return array<string, mixed>|null Row with decoded `extensions` and a `steps` list, or null if not found.
     */
    public function findById(int $id): ?array
    {
        $stmt = $this->pdo->prepare('SELECT * FROM pipeline_configs WHERE id = ?');
        $stmt->execute([$id]);

        return $this->fetchPipeline($stmt);
    }

    /**
     * Loads one pipeline configuration by its name.
     *
     * @return array<string, mixed>|null Row with decoded `extensions` and a `steps` list, or null if not found.
     */
    public function findByName(string $name): ?array
    {
        $stmt = $this->pdo->prepare('SELECT * FROM pipeline_configs WHERE name = ?');
        $stmt->execute([$name]);

        return $this->fetchPipeline($stmt);
    }

    /**
     * Loads the first pipeline configuration whose `is_default` column is 1.
     *
     * @return array<string, mixed>|null Row with decoded `extensions` and a `steps` list, or null if
     *                                   the query fails or no row matches.
     */
    public function findDefault(): ?array
    {
        $stmt = $this->pdo->query('SELECT * FROM pipeline_configs WHERE is_default = 1 LIMIT 1');
        if ($stmt === false) {
            return null;
        }

        return $this->fetchPipeline($stmt);
    }

    /**
     * Lists pipeline configurations, default first, then by name.
     *
     * Each row additionally carries `active_steps` (steps with enabled = 1),
     * `total_runs` and `last_run_at` (latest `created_at` of its runs).
     * Steps themselves are not loaded here.
     *
     * @param int $limit Maximum number of rows to return.
     *
     * @return array<int, array<string, mixed>>
     */
    public function findAll(int $limit = 50): array
    {
        $stmt = $this->pdo->prepare(
            'SELECT p.*,
                (SELECT COUNT(*) FROM pipeline_steps WHERE pipeline_id = p.id AND enabled = 1) as active_steps,
                (SELECT COUNT(*) FROM pipeline_runs WHERE pipeline_id = p.id) as total_runs,
                (SELECT MAX(created_at) FROM pipeline_runs WHERE pipeline_id = p.id) as last_run_at
            FROM pipeline_configs p
            ORDER BY p.is_default DESC, p.name ASC
            LIMIT ?'
        );
        // Bind as integer: a string-bound LIMIT is rejected by MySQL under emulated prepares.
        $stmt->bindValue(1, $limit, \PDO::PARAM_INT);
        $stmt->execute();

        $rows = $stmt->fetchAll(\PDO::FETCH_ASSOC);
        foreach ($rows as $index => $row) {
            $rows[$index]['extensions'] = json_decode($row['extensions'] ?? '[]', true);
        }

        return $rows;
    }

    /**
     * Lists the steps of a pipeline ordered by `sort_order` ascending.
     *
     * @return array<int, array<string, mixed>> Step rows with `config` decoded from JSON.
     */
    public function findSteps(int $pipelineId): array
    {
        $stmt = $this->pdo->prepare(
            'SELECT * FROM pipeline_steps WHERE pipeline_id = ? ORDER BY sort_order ASC'
        );
        $stmt->execute([$pipelineId]);

        $rows = $stmt->fetchAll(\PDO::FETCH_ASSOC);
        foreach ($rows as $index => $row) {
            $rows[$index]['config'] = json_decode($row['config'] ?? '{}', true);
        }

        return $rows;
    }

    /**
     * Inserts a new pipeline configuration.
     *
     * @param array<string, mixed> $data Requires `name`; optional `description` (default null),
     *                                   `is_default` (default 0), `source_path` and `extensions`
     *                                   (array or pre-encoded JSON string, default '[]').
     *
     * @return int Value of lastInsertId() after the insert.
     */
    public function create(array $data): int
    {
        $stmt = $this->pdo->prepare(
            'INSERT INTO pipeline_configs (name, description, is_default, source_path, extensions)
            VALUES (?, ?, ?, ?, ?)'
        );
        $stmt->execute([
            $data['name'],
            $data['description'] ?? null,
            $data['is_default'] ?? 0,
            $data['source_path'] ?? self::DEFAULT_SOURCE_PATH,
            $this->encodeJson($data['extensions'] ?? '[]'),
        ]);

        return (int) $this->pdo->lastInsertId();
    }

    /**
     * Updates the given columns of a pipeline configuration.
     *
     * Only keys that are set and non-null are written; an empty change set is a no-op.
     *
     * @param array<string, mixed> $data Any of name, description, is_default, source_path, extensions.
     */
    public function update(int $id, array $data): void
    {
        $this->updateRow('pipeline_configs', $id, $data, self::PIPELINE_COLUMNS, ['extensions']);
    }

    /**
     * Deletes a pipeline configuration by primary key.
     */
    public function delete(int $id): void
    {
        $stmt = $this->pdo->prepare('DELETE FROM pipeline_configs WHERE id = ?');
        $stmt->execute([$id]);
    }

    /**
     * Inserts a step for the given pipeline.
     *
     * @param array<string, mixed> $stepData Requires `step_type`; optional `config` (array or
     *                                       pre-encoded JSON string, default '{}'), `sort_order`
     *                                       (default 0) and `enabled` (default 1).
     *
     * @return int Value of lastInsertId() after the insert.
     */
    public function addStep(int $pipelineId, array $stepData): int
    {
        $stmt = $this->pdo->prepare(
            'INSERT INTO pipeline_steps (pipeline_id, step_type, config, sort_order, enabled)
            VALUES (?, ?, ?, ?, ?)'
        );
        $stmt->execute([
            $pipelineId,
            $stepData['step_type'],
            $this->encodeJson($stepData['config'] ?? '{}'),
            $stepData['sort_order'] ?? 0,
            $stepData['enabled'] ?? 1,
        ]);

        return (int) $this->pdo->lastInsertId();
    }

    /**
     * Updates the given columns of a step.
     *
     * Only keys that are set and non-null are written; an empty change set is a no-op.
     *
     * @param array<string, mixed> $stepData Any of step_type, config, sort_order, enabled.
     */
    public function updateStep(int $stepId, array $stepData): void
    {
        $this->updateRow('pipeline_steps', $stepId, $stepData, self::STEP_COLUMNS, ['config']);
    }

    /**
     * Deletes a step by primary key.
     */
    public function deleteStep(int $stepId): void
    {
        $stmt = $this->pdo->prepare('DELETE FROM pipeline_steps WHERE id = ?');
        $stmt->execute([$stepId]);
    }

    /**
     * Rewrites `sort_order` of the listed steps to (array key + 1).
     *
     * Steps that do not belong to $pipelineId are left untouched by the WHERE clause.
     * The updates run inside a transaction unless the caller already opened one,
     * so a failure part-way through does not leave a half-applied order.
     *
     * @param array<int, int|string> $stepIds Step ids keyed by their zero-based position.
     *
     * @throws \Throwable Re-thrown after rollback when an update fails.
     */
    public function reorderSteps(int $pipelineId, array $stepIds): void
    {
        if ($stepIds === []) {
            return;
        }

        $stmt = $this->pdo->prepare(
            'UPDATE pipeline_steps SET sort_order = ? WHERE id = ? AND pipeline_id = ?'
        );

        // Do not nest: PDO throws when beginTransaction() is called inside an open transaction.
        $ownsTransaction = !$this->pdo->inTransaction();
        if ($ownsTransaction) {
            $this->pdo->beginTransaction();
        }

        try {
            foreach ($stepIds as $order => $stepId) {
                $stmt->execute([$order + 1, $stepId, $pipelineId]);
            }
            if ($ownsTransaction) {
                $this->pdo->commit();
            }
        } catch (\Throwable $e) {
            if ($ownsTransaction && $this->pdo->inTransaction()) {
                $this->pdo->rollBack();
            }
            throw $e;
        }
    }

    /**
     * Lists the most recently created runs of a pipeline.
     *
     * @param int $limit Maximum number of rows to return.
     *
     * @return array<int, array<string, mixed>>
     */
    public function findRuns(int $pipelineId, int $limit = 20): array
    {
        $stmt = $this->pdo->prepare(
            'SELECT * FROM pipeline_runs WHERE pipeline_id = ? ORDER BY created_at DESC LIMIT ?'
        );
        // execute([...]) would bind both values as strings; with emulated prepares
        // that yields LIMIT '20', which MySQL rejects. Bind explicitly as integers.
        $stmt->bindValue(1, $pipelineId, \PDO::PARAM_INT);
        $stmt->bindValue(2, $limit, \PDO::PARAM_INT);
        $stmt->execute();

        return $stmt->fetchAll(\PDO::FETCH_ASSOC);
    }

    /**
     * Loads one run by primary key.
     *
     * @return array<string, mixed>|null The run row, or null if not found.
     */
    public function findRunById(int $runId): ?array
    {
        $stmt = $this->pdo->prepare('SELECT * FROM pipeline_runs WHERE id = ?');
        $stmt->execute([$runId]);
        $run = $stmt->fetch(\PDO::FETCH_ASSOC);

        return $run !== false ? $run : null;
    }

    /**
     * Loads the run of a pipeline with the latest `created_at`.
     *
     * @return array<string, mixed>|null The run row, or null if the pipeline has no runs.
     */
    public function findLatestRun(int $pipelineId): ?array
    {
        $stmt = $this->pdo->prepare(
            'SELECT * FROM pipeline_runs WHERE pipeline_id = ? ORDER BY created_at DESC LIMIT 1'
        );
        $stmt->execute([$pipelineId]);
        $run = $stmt->fetch(\PDO::FETCH_ASSOC);

        return $run !== false ? $run : null;
    }

    /**
     * Inserts a run with status 'running' and `started_at` set by the database's NOW().
     *
     * @return int Value of lastInsertId() after the insert.
     */
    public function createRun(int $pipelineId): int
    {
        $stmt = $this->pdo->prepare(
            'INSERT INTO pipeline_runs (pipeline_id, status, started_at) VALUES (?, ?, NOW())'
        );
        $stmt->execute([$pipelineId, 'running']);

        return (int) $this->pdo->lastInsertId();
    }

    /**
     * Updates the given columns of a run.
     *
     * Only keys that are set and non-null are written; an empty change set is a no-op.
     *
     * @param array<string, mixed> $data Any of status, completed_at, documents_total,
     *                                   documents_processed, documents_failed, chunks_created, error_log.
     */
    public function updateRun(int $runId, array $data): void
    {
        $this->updateRow('pipeline_runs', $runId, $data, self::RUN_COLUMNS);
    }

    /**
     * Returns aggregate counters over all pipelines and runs.
     *
     * @return array<string, int> Keys: pipelines, runs_total, runs_completed, runs_failed,
     *                            documents_processed, chunks_created. All zero when the query
     *                            yields no row.
     */
    public function getStatistics(): array
    {
        // String literals use single quotes: double quotes denote identifiers in
        // standard SQL and in MySQL when ANSI_QUOTES is enabled.
        $stmt = $this->pdo->query(
            "SELECT
                (SELECT COUNT(*) FROM pipeline_configs) as pipelines,
                (SELECT COUNT(*) FROM pipeline_runs) as runs_total,
                (SELECT COUNT(*) FROM pipeline_runs WHERE status = 'completed') as runs_completed,
                (SELECT COUNT(*) FROM pipeline_runs WHERE status = 'failed') as runs_failed,
                (SELECT COALESCE(SUM(documents_processed), 0) FROM pipeline_runs) as documents_processed,
                (SELECT COALESCE(SUM(chunks_created), 0) FROM pipeline_runs) as chunks_created"
        );

        // query() returns false on failure in non-exception error modes.
        $fetched = $stmt !== false ? $stmt->fetch(\PDO::FETCH_ASSOC) : false;
        $row = is_array($fetched) ? $fetched : [];

        return [
            'pipelines' => (int) ($row['pipelines'] ?? 0),
            'runs_total' => (int) ($row['runs_total'] ?? 0),
            'runs_completed' => (int) ($row['runs_completed'] ?? 0),
            'runs_failed' => (int) ($row['runs_failed'] ?? 0),
            'documents_processed' => (int) ($row['documents_processed'] ?? 0),
            'chunks_created' => (int) ($row['chunks_created'] ?? 0),
        ];
    }

    /**
     * Fetches the next pipeline row from an executed statement and attaches its steps.
     *
     * @return array<string, mixed>|null Row with decoded `extensions` and a `steps` list, or null
     *                                   when the statement has no row.
     */
    private function fetchPipeline(\PDOStatement $stmt): ?array
    {
        $pipeline = $stmt->fetch(\PDO::FETCH_ASSOC);
        if ($pipeline === false) {
            return null;
        }

        $pipeline['extensions'] = json_decode($pipeline['extensions'] ?? '[]', true);
        $pipeline['steps'] = $this->findSteps((int) $pipeline['id']);

        return $pipeline;
    }

    /**
     * JSON-encodes array values; every other value is returned unchanged.
     *
     * @param mixed $value
     *
     * @return mixed
     */
    private function encodeJson($value)
    {
        return is_array($value) ? json_encode($value) : $value;
    }

    /**
     * Builds and runs `UPDATE <table> SET ... WHERE id = ?` from the whitelisted keys of $data.
     *
     * Table and column names must come from class constants or literals only;
     * they are concatenated into the SQL text. Values are always bound.
     * Keys whose value is null are skipped (isset semantics), so a column
     * cannot be set to NULL through this helper.
     *
     * @param array<string, mixed> $data
     * @param array<int, string>   $columns     Writable columns, in SET-clause order.
     * @param array<int, string>   $jsonColumns Subset of $columns whose array values are JSON-encoded.
     */
    private function updateRow(string $table, int $id, array $data, array $columns, array $jsonColumns = []): void
    {
        $assignments = [];
        $params = [];

        foreach ($columns as $column) {
            if (!isset($data[$column])) {
                continue;
            }
            $assignments[] = $column . ' = ?';
            $params[] = in_array($column, $jsonColumns, true)
                ? $this->encodeJson($data[$column])
                : $data[$column];
        }

        if ($assignments === []) {
            return;
        }

        $params[] = $id;
        $stmt = $this->pdo->prepare(
            'UPDATE ' . $table . ' SET ' . implode(', ', $assignments) . ' WHERE id = ?'
        );
        $stmt->execute($params);
    }
}
Vollständig herunterladen
Aktionen
Andere Versionen dieser Datei
| ID | Version | Typ | Größe | Datum |
| 1199 | 8 | modified | 10.8 KB | 2025-12-25 10:34 |
| 1182 | 7 | modified | 10.9 KB | 2025-12-25 10:32 |
| 770 | 6 | modified | 10.8 KB | 2025-12-23 08:04 |
| 571 | 5 | modified | 11.3 KB | 2025-12-23 04:08 |
| 417 | 4 | modified | 11.0 KB | 2025-12-22 09:23 |
| 415 | 3 | modified | 10.9 KB | 2025-12-22 09:23 |
| 414 | 2 | modified | 10.8 KB | 2025-12-22 09:23 |
| 413 | 1 | modified | 10.8 KB | 2025-12-22 09:23 |
← Zurück zur Übersicht