Backup #1182

ID1182
Dateipfad/var/www/dev.campus.systemische-tools.de/src/Infrastructure/Persistence/PipelineRepository.php
Version7
Typ modified
Größe10.9 KB
Hash133bc724fa2e285ebbd4fd74ed8229172bd7b7b1e1606829cfcc2ddecc6b31c8
Datum2025-12-25 10:32:37
Geändert vonclaude-code-hook
GrundClaude Code Pre-Hook Backup vor Edit-Operation
Datei existiert Ja

Dateiinhalt

<?php

declare(strict_types=1);

namespace Infrastructure\Persistence;

// @responsibility: Persistenz für Pipeline-Konfigurationen und Runs

use Domain\Repository\PipelineRepositoryInterface;
use Infrastructure\Config\DatabaseFactory;

class PipelineRepository implements PipelineRepositoryInterface
{
    private \PDO $pdo;

    public function __construct(?\PDO $pdo = null)
    {
        $this->pdo = $pdo ?? DatabaseFactory::content();
    }

    public function findById(int $id): ?array
    {
        $stmt = $this->pdo->prepare('SELECT * FROM pipeline_configs WHERE id = ?');
        $stmt->execute([$id]);
        $result = $stmt->fetch(\PDO::FETCH_ASSOC);

        if ($result === false) {
            return null;
        }

        $result['extensions'] = json_decode($result['extensions'] ?? '[]', true);
        $result['steps'] = $this->findSteps($id);

        return $result;
    }

    public function findByName(string $name): ?array
    {
        $stmt = $this->pdo->prepare('SELECT * FROM pipeline_configs WHERE name = ?');
        $stmt->execute([$name]);
        $result = $stmt->fetch(\PDO::FETCH_ASSOC);

        if ($result === false) {
            return null;
        }

        $result['extensions'] = json_decode($result['extensions'] ?? '[]', true);
        $result['steps'] = $this->findSteps((int) $result['id']);

        return $result;
    }

    public function findDefault(): ?array
    {
        $stmt = $this->pdo->query('SELECT * FROM pipeline_configs WHERE is_default = 1 LIMIT 1');
        if ($stmt === false) {
            return null;
        }
        $result = $stmt->fetch(\PDO::FETCH_ASSOC);

        if ($result === false) {
            return null;
        }

        $result['extensions'] = json_decode($result['extensions'] ?? '[]', true);
        $result['steps'] = $this->findSteps((int) $result['id']);

        return $result;
    }

    public function findAll(int $limit = 50): array
    {
        $stmt = $this->pdo->query(
            'SELECT p.*,
                    (SELECT COUNT(*) FROM pipeline_steps WHERE pipeline_id = p.id AND enabled = 1) as active_steps,
                    (SELECT COUNT(*) FROM pipeline_runs WHERE pipeline_id = p.id) as total_runs,
                    (SELECT MAX(created_at) FROM pipeline_runs WHERE pipeline_id = p.id) as last_run_at
             FROM pipeline_configs p
             ORDER BY p.is_default DESC, p.name ASC
             LIMIT ' . $limit
        );

        if ($stmt === false) {
            return [];
        }

        $results = $stmt->fetchAll(\PDO::FETCH_ASSOC);

        foreach ($results as &$row) {
            $row['extensions'] = json_decode($row['extensions'] ?? '[]', true);
        }

        return $results;
    }

    public function findSteps(int $pipelineId): array
    {
        $stmt = $this->pdo->prepare(
            'SELECT * FROM pipeline_steps WHERE pipeline_id = ? ORDER BY sort_order ASC'
        );
        $stmt->execute([$pipelineId]);
        $results = $stmt->fetchAll(\PDO::FETCH_ASSOC);

        foreach ($results as &$row) {
            $row['config'] = json_decode($row['config'] ?? '{}', true);
        }

        return $results;
    }

    public function create(array $data): int
    {
        $extensions = is_array($data['extensions'] ?? null)
            ? json_encode($data['extensions'])
            : ($data['extensions'] ?? '[]');

        $stmt = $this->pdo->prepare(
            'INSERT INTO pipeline_configs (name, description, is_default, source_path, extensions)
             VALUES (?, ?, ?, ?, ?)'
        );
        $stmt->execute([
            $data['name'],
            $data['description'] ?? null,
            $data['is_default'] ?? 0,
            $data['source_path'] ?? '/var/www/nextcloud/data/root/files/Documents',
            $extensions,
        ]);

        return (int) $this->pdo->lastInsertId();
    }

    public function update(int $id, array $data): void
    {
        $sets = [];
        $params = [];

        if (isset($data['name'])) {
            $sets[] = 'name = ?';
            $params[] = $data['name'];
        }
        if (isset($data['description'])) {
            $sets[] = 'description = ?';
            $params[] = $data['description'];
        }
        if (isset($data['is_default'])) {
            $sets[] = 'is_default = ?';
            $params[] = $data['is_default'];
        }
        if (isset($data['source_path'])) {
            $sets[] = 'source_path = ?';
            $params[] = $data['source_path'];
        }
        if (isset($data['extensions'])) {
            $sets[] = 'extensions = ?';
            $params[] = is_array($data['extensions'])
                ? json_encode($data['extensions'])
                : $data['extensions'];
        }

        if ($sets === []) {
            return;
        }

        $params[] = $id;
        $stmt = $this->pdo->prepare(
            'UPDATE pipeline_configs SET ' . implode(', ', $sets) . ' WHERE id = ?'
        );
        $stmt->execute($params);
    }

    public function delete(int $id): void
    {
        $stmt = $this->pdo->prepare('DELETE FROM pipeline_configs WHERE id = ?');
        $stmt->execute([$id]);
    }

    public function addStep(int $pipelineId, array $stepData): int
    {
        $config = is_array($stepData['config'] ?? null)
            ? json_encode($stepData['config'])
            : ($stepData['config'] ?? '{}');

        $stmt = $this->pdo->prepare(
            'INSERT INTO pipeline_steps (pipeline_id, step_type, config, sort_order, enabled)
             VALUES (?, ?, ?, ?, ?)'
        );
        $stmt->execute([
            $pipelineId,
            $stepData['step_type'],
            $config,
            $stepData['sort_order'] ?? 0,
            $stepData['enabled'] ?? 1,
        ]);

        return (int) $this->pdo->lastInsertId();
    }

    public function updateStep(int $stepId, array $stepData): void
    {
        $sets = [];
        $params = [];

        if (isset($stepData['step_type'])) {
            $sets[] = 'step_type = ?';
            $params[] = $stepData['step_type'];
        }
        if (isset($stepData['config'])) {
            $sets[] = 'config = ?';
            $params[] = is_array($stepData['config'])
                ? json_encode($stepData['config'])
                : $stepData['config'];
        }
        if (isset($stepData['sort_order'])) {
            $sets[] = 'sort_order = ?';
            $params[] = $stepData['sort_order'];
        }
        if (isset($stepData['enabled'])) {
            $sets[] = 'enabled = ?';
            $params[] = $stepData['enabled'];
        }

        if ($sets === []) {
            return;
        }

        $params[] = $stepId;
        $stmt = $this->pdo->prepare(
            'UPDATE pipeline_steps SET ' . implode(', ', $sets) . ' WHERE id = ?'
        );
        $stmt->execute($params);
    }

    public function deleteStep(int $stepId): void
    {
        $stmt = $this->pdo->prepare('DELETE FROM pipeline_steps WHERE id = ?');
        $stmt->execute([$stepId]);
    }

    /**
     * @param array<int> $stepIds
     */
    public function reorderSteps(int $pipelineId, array $stepIds): void
    {
        $stmt = $this->pdo->prepare(
            'UPDATE pipeline_steps SET sort_order = ? WHERE id = ? AND pipeline_id = ?'
        );

        foreach ($stepIds as $order => $stepId) {
            $stmt->execute([$order + 1, $stepId, $pipelineId]);
        }
    }

    public function findRuns(int $pipelineId, int $limit = 20): array
    {
        $stmt = $this->pdo->prepare(
            'SELECT * FROM pipeline_runs WHERE pipeline_id = ? ORDER BY created_at DESC LIMIT ?'
        );
        $stmt->execute([$pipelineId, $limit]);

        return $stmt->fetchAll(\PDO::FETCH_ASSOC);
    }

    public function findRunById(int $runId): ?array
    {
        $stmt = $this->pdo->prepare('SELECT * FROM pipeline_runs WHERE id = ?');
        $stmt->execute([$runId]);
        $result = $stmt->fetch(\PDO::FETCH_ASSOC);

        return $result !== false ? $result : null;
    }

    public function findLatestRun(int $pipelineId): ?array
    {
        $stmt = $this->pdo->prepare(
            'SELECT * FROM pipeline_runs WHERE pipeline_id = ? ORDER BY created_at DESC LIMIT 1'
        );
        $stmt->execute([$pipelineId]);
        $result = $stmt->fetch(\PDO::FETCH_ASSOC);

        return $result !== false ? $result : null;
    }

    public function createRun(int $pipelineId): int
    {
        $stmt = $this->pdo->prepare(
            'INSERT INTO pipeline_runs (pipeline_id, status, started_at) VALUES (?, ?, NOW())'
        );
        $stmt->execute([$pipelineId, 'running']);

        return (int) $this->pdo->lastInsertId();
    }

    public function updateRun(int $runId, array $data): void
    {
        $allowedColumns = [
            'status',
            'completed_at',
            'documents_total',
            'documents_processed',
            'documents_failed',
            'chunks_created',
            'embeddings_created',
            'error_log',
            'current_step',
            'current_document',
            'last_update_at',
            'log_tail',
        ];

        $sets = [];
        $params = [];

        foreach ($data as $column => $value) {
            if (in_array($column, $allowedColumns, true)) {
                $sets[] = $column . ' = ?';
                $params[] = $value;
            }
        }

        if ($sets === []) {
            return;
        }

        $params[] = $runId;
        $stmt = $this->pdo->prepare(
            'UPDATE pipeline_runs SET ' . implode(', ', $sets) . ' WHERE id = ?'
        );
        $stmt->execute($params);
    }

    public function getStatistics(): array
    {
        $stmt = $this->pdo->query(
            'SELECT
                (SELECT COUNT(*) FROM pipeline_configs) as pipelines,
                (SELECT COUNT(*) FROM pipeline_runs) as runs_total,
                (SELECT COUNT(*) FROM pipeline_runs WHERE status = "completed") as runs_completed,
                (SELECT COUNT(*) FROM pipeline_runs WHERE status = "failed") as runs_failed,
                (SELECT COALESCE(SUM(documents_processed), 0) FROM pipeline_runs) as documents_processed,
                (SELECT COALESCE(SUM(chunks_created), 0) FROM pipeline_runs) as chunks_created'
        );

        if ($stmt === false) {
            return [
                'pipelines' => 0,
                'runs_total' => 0,
                'runs_completed' => 0,
                'runs_failed' => 0,
                'documents_processed' => 0,
                'chunks_created' => 0,
            ];
        }

        $result = $stmt->fetch(\PDO::FETCH_ASSOC);

        return [
            'pipelines' => (int) ($result['pipelines'] ?? 0),
            'runs_total' => (int) ($result['runs_total'] ?? 0),
            'runs_completed' => (int) ($result['runs_completed'] ?? 0),
            'runs_failed' => (int) ($result['runs_failed'] ?? 0),
            'documents_processed' => (int) ($result['documents_processed'] ?? 0),
            'chunks_created' => (int) ($result['chunks_created'] ?? 0),
        ];
    }
}

Vollständig herunterladen

Aktionen

Herunterladen

Andere Versionen dieser Datei

ID Version Typ Größe Datum
1199 8 modified 10.8 KB 2025-12-25 10:34
1182 7 modified 10.9 KB 2025-12-25 10:32
770 6 modified 10.8 KB 2025-12-23 08:04
571 5 modified 11.3 KB 2025-12-23 04:08
417 4 modified 11.0 KB 2025-12-22 09:23
415 3 modified 10.9 KB 2025-12-22 09:23
414 2 modified 10.8 KB 2025-12-22 09:23
413 1 modified 10.8 KB 2025-12-22 09:23

← Zurück zur Übersicht