RunPipelineUseCase.php

Code Hygiene Score: 100

Keine Issues gefunden.

Dependencies 2

Klassen 1

Funktionen 5

Verwendet von 2

Versionen 3

Code

<?php

declare(strict_types=1);

namespace UseCases\Pipeline;

// @responsibility: Startet und verwaltet Pipeline-Ausführungen

use Domain\Repository\PipelineRepositoryInterface;

/**
 * Use case for starting, cancelling and querying pipeline runs.
 *
 * Runs are persisted through the injected repository; the actual work is
 * delegated to an external Python script that is launched as a detached
 * background process.
 */
class RunPipelineUseCase
{
    private const PYTHON_BIN = '/var/www/scripts/pipeline/venv/bin/python';
    private const PIPELINE_SCRIPT = '/var/www/scripts/pipeline/pipeline.py';
    private const LOG_DIR = '/tmp';

    /** Fallback .env file consulted when $_ENV carries no database password. */
    private const ENV_FILE = '/var/www/dev.campus.systemische-tools.de/.env';

    /** Name of the environment variable through which the script receives the password. */
    private const PASSWORD_ENV_VAR = 'DB_PASSWORD';

    public function __construct(
        private PipelineRepositoryInterface $repository
    ) {
    }

    /**
     * Start a new pipeline run.
     *
     * Creates the run record first and then launches the pipeline script in
     * the background. If the background process cannot be spawned, an error
     * result is returned instead of a false success.
     *
     * @param int $pipelineId Pipeline ID
     * @return array{success: bool, run_id?: int, error?: string}
     */
    public function start(int $pipelineId): array
    {
        $pipeline = $this->repository->findById($pipelineId);

        if ($pipeline === null) {
            return ['success' => false, 'error' => 'Pipeline nicht gefunden'];
        }

        $runId = $this->repository->createRun($pipelineId);

        if (!$this->executeInBackground($pipelineId, $runId)) {
            // NOTE(review): the run record created above is left untouched because
            // the set of valid failure status values is not visible from this class.
            return ['success' => false, 'error' => 'Pipeline konnte nicht gestartet werden'];
        }

        return ['success' => true, 'run_id' => $runId];
    }

    /**
     * Cancel a running pipeline.
     *
     * Only runs that belong to the given pipeline and whose status is
     * 'running' can be cancelled. This method updates the run record only;
     * it does not itself send any signal to the background process.
     *
     * @param int $pipelineId Pipeline ID
     * @param int $runId Run ID
     * @return array{success: bool, error?: string}
     */
    public function cancel(int $pipelineId, int $runId): array
    {
        $run = $this->repository->findRunById($runId);

        // Treat a run of a different pipeline the same as a missing run.
        if ($run === null || (int) $run['pipeline_id'] !== $pipelineId) {
            return ['success' => false, 'error' => 'Run nicht gefunden'];
        }

        if ($run['status'] !== 'running') {
            return [
                'success' => false,
                'error' => 'Run kann nicht abgebrochen werden (Status: ' . $run['status'] . ')',
            ];
        }

        $this->repository->updateRun($runId, [
            'status' => 'cancelled',
            'completed_at' => date('Y-m-d H:i:s'),
        ]);

        return ['success' => true];
    }

    /**
     * Get simple status for a pipeline (latest run).
     *
     * On success the result carries the pipeline ID and whatever the
     * repository returns as the latest run.
     *
     * @param int $pipelineId Pipeline ID
     * @return array{success: bool, error?: string, pipeline_id?: int, run?: mixed}
     */
    public function getStatus(int $pipelineId): array
    {
        $pipeline = $this->repository->findById($pipelineId);

        if ($pipeline === null) {
            return ['success' => false, 'error' => 'Pipeline nicht gefunden'];
        }

        return [
            'success' => true,
            'pipeline_id' => $pipelineId,
            'run' => $this->repository->findLatestRun($pipelineId),
        ];
    }

    /**
     * Execute pipeline script in background.
     *
     * The database password is handed to the child through the process
     * environment rather than the command line, so it does not show up in the
     * argument list of the spawned shell/nohup processes. Output of the script
     * is redirected to a per-run log file in LOG_DIR.
     *
     * @return bool false when the password could not be exported or the shell
     *              could not be spawned; true otherwise. A true result does not
     *              imply the script itself ran successfully.
     */
    private function executeInBackground(int $pipelineId, int $runId): bool
    {
        $logFile = sprintf('%s/pipeline_run_%d.log', self::LOG_DIR, $runId);

        $cmd = sprintf(
            'nohup %s %s all --pipeline-id=%d --run-id=%d > %s 2>&1 &',
            escapeshellarg(self::PYTHON_BIN),
            escapeshellarg(self::PIPELINE_SCRIPT),
            $pipelineId,
            $runId,
            escapeshellarg($logFile)
        );

        // Remember the current value (real process environment only) so it can
        // be restored once the child has been spawned.
        $previous = getenv(self::PASSWORD_ENV_VAR, true);

        if (!putenv(self::PASSWORD_ENV_VAR . '=' . $this->resolveDbPassword())) {
            return false;
        }

        try {
            // exec() returns false only when the command could not be started;
            // with all output redirected a successful launch yields ''.
            return exec($cmd) !== false;
        } finally {
            putenv(
                $previous === false
                    ? self::PASSWORD_ENV_VAR
                    : self::PASSWORD_ENV_VAR . '=' . $previous
            );
        }
    }

    /**
     * Determine the database password for the pipeline script.
     *
     * Lookup order: $_ENV['DB_PASSWORD'], $_ENV['MARIADB_ROOT_PASSWORD'], then
     * the MARIADB_ROOT_PASSWORD entry of the fallback .env file. Returns an
     * empty string when none of these yields a value.
     */
    private function resolveDbPassword(): string
    {
        $dbPassword = $_ENV['DB_PASSWORD'] ?? $_ENV['MARIADB_ROOT_PASSWORD'] ?? '';

        if ($dbPassword !== '' || !is_readable(self::ENV_FILE)) {
            return $dbPassword;
        }

        $envContent = file_get_contents(self::ENV_FILE);

        // file_get_contents() yields false on failure; passing that to
        // preg_match() would raise a TypeError under strict_types.
        if (!is_string($envContent)) {
            return $dbPassword;
        }

        if (preg_match('/^MARIADB_ROOT_PASSWORD=(.+)$/m', $envContent, $matches) === 1) {
            $dbPassword = trim($matches[1]);
        }

        return $dbPassword;
    }
}
← Übersicht Graph