PipelineConfigRepository.php

Code Hygiene Score: 100

Keine Issues gefunden.

Dependencies 3

Klassen 1

Funktionen 9

Verwendet von 1

Code

<?php

declare(strict_types=1);

namespace Infrastructure\Persistence;

// @responsibility: Persistenz für Pipeline-Konfigurationen

use Domain\Repository\PipelineConfigRepositoryInterface;

class PipelineConfigRepository implements PipelineConfigRepositoryInterface
{
    private \PDO $pdo;

    public function __construct(\PDO $pdo)
    {
        $this->pdo = $pdo;
    }

    public function findById(int $id): ?array
    {
        $stmt = $this->pdo->prepare('SELECT * FROM pipeline_configs WHERE id = ?');
        $stmt->execute([$id]);
        $result = $stmt->fetch(\PDO::FETCH_ASSOC);

        if ($result === false) {
            return null;
        }

        $result['extensions'] = json_decode($result['extensions'] ?? '[]', true);

        return $result;
    }

    public function findByName(string $name): ?array
    {
        $stmt = $this->pdo->prepare('SELECT * FROM pipeline_configs WHERE name = ?');
        $stmt->execute([$name]);
        $result = $stmt->fetch(\PDO::FETCH_ASSOC);

        if ($result === false) {
            return null;
        }

        $result['extensions'] = json_decode($result['extensions'] ?? '[]', true);

        return $result;
    }

    public function findDefault(): ?array
    {
        $stmt = $this->pdo->query('SELECT * FROM pipeline_configs WHERE is_default = 1 LIMIT 1');
        if ($stmt === false) {
            return null;
        }
        $result = $stmt->fetch(\PDO::FETCH_ASSOC);

        if ($result === false) {
            return null;
        }

        $result['extensions'] = json_decode($result['extensions'] ?? '[]', true);

        return $result;
    }

    public function findAll(int $limit = 50): array
    {
        $stmt = $this->pdo->query(
            'SELECT p.*,
                    (SELECT COUNT(*) FROM pipeline_steps WHERE pipeline_id = p.id AND enabled = 1) as active_steps,
                    (SELECT COUNT(*) FROM pipeline_runs WHERE pipeline_id = p.id) as total_runs,
                    (SELECT MAX(created_at) FROM pipeline_runs WHERE pipeline_id = p.id) as last_run_at
             FROM pipeline_configs p
             ORDER BY p.is_default DESC, p.name ASC
             LIMIT ' . $limit
        );

        if ($stmt === false) {
            return [];
        }

        $results = $stmt->fetchAll(\PDO::FETCH_ASSOC);

        foreach ($results as &$row) {
            $row['extensions'] = json_decode($row['extensions'] ?? '[]', true);
        }

        return $results;
    }

    public function create(array $data): int
    {
        $extensions = is_array($data['extensions'] ?? null)
            ? json_encode($data['extensions'])
            : ($data['extensions'] ?? '[]');

        $stmt = $this->pdo->prepare(
            'INSERT INTO pipeline_configs (name, description, is_default, source_path, extensions)
             VALUES (?, ?, ?, ?, ?)'
        );
        $stmt->execute([
            $data['name'],
            $data['description'] ?? null,
            $data['is_default'] ?? 0,
            $data['source_path'] ?? '/var/www/nextcloud/data/root/files/Documents',
            $extensions,
        ]);

        return (int) $this->pdo->lastInsertId();
    }

    public function update(int $id, array $data): void
    {
        $sets = [];
        $params = [];

        if (isset($data['name'])) {
            $sets[] = 'name = ?';
            $params[] = $data['name'];
        }
        if (isset($data['description'])) {
            $sets[] = 'description = ?';
            $params[] = $data['description'];
        }
        if (isset($data['is_default'])) {
            $sets[] = 'is_default = ?';
            $params[] = $data['is_default'];
        }
        if (isset($data['source_path'])) {
            $sets[] = 'source_path = ?';
            $params[] = $data['source_path'];
        }
        if (isset($data['extensions'])) {
            $sets[] = 'extensions = ?';
            $params[] = is_array($data['extensions'])
                ? json_encode($data['extensions'])
                : $data['extensions'];
        }

        if ($sets === []) {
            return;
        }

        $params[] = $id;
        $stmt = $this->pdo->prepare(
            'UPDATE pipeline_configs SET ' . implode(', ', $sets) . ' WHERE id = ?'
        );
        $stmt->execute($params);
    }

    public function delete(int $id): void
    {
        $stmt = $this->pdo->prepare('DELETE FROM pipeline_configs WHERE id = ?');
        $stmt->execute([$id]);
    }

    public function getStatistics(): array
    {
        $stmt = $this->pdo->query(
            'SELECT
                (SELECT COUNT(*) FROM pipeline_configs) as pipelines,
                (SELECT COUNT(*) FROM pipeline_runs) as runs_total,
                (SELECT COUNT(*) FROM pipeline_runs WHERE status = "completed") as runs_completed,
                (SELECT COUNT(*) FROM pipeline_runs WHERE status = "failed") as runs_failed,
                (SELECT COALESCE(SUM(documents_processed), 0) FROM pipeline_runs) as documents_processed,
                (SELECT COALESCE(SUM(chunks_created), 0) FROM pipeline_runs) as chunks_created'
        );

        if ($stmt === false) {
            return [
                'pipelines' => 0,
                'runs_total' => 0,
                'runs_completed' => 0,
                'runs_failed' => 0,
                'documents_processed' => 0,
                'chunks_created' => 0,
            ];
        }

        $result = $stmt->fetch(\PDO::FETCH_ASSOC);

        return [
            'pipelines' => (int) ($result['pipelines'] ?? 0),
            'runs_total' => (int) ($result['runs_total'] ?? 0),
            'runs_completed' => (int) ($result['runs_completed'] ?? 0),
            'runs_failed' => (int) ($result['runs_failed'] ?? 0),
            'documents_processed' => (int) ($result['documents_processed'] ?? 0),
            'chunks_created' => (int) ($result['chunks_created'] ?? 0),
        ];
    }
}
← Übersicht Graph