repository = $repository ?? new PipelineRepository(); } /** * Start a new pipeline run. * * @param int $pipelineId Pipeline ID * @return array{success: bool, run_id?: int, error?: string} */ public function start(int $pipelineId): array { $pipeline = $this->repository->findById($pipelineId); if ($pipeline === null) { return ['success' => false, 'error' => 'Pipeline nicht gefunden']; } $runId = $this->repository->createRun($pipelineId); $this->executeInBackground($pipelineId, $runId); return ['success' => true, 'run_id' => $runId]; } /** * Cancel a running pipeline. * * @param int $pipelineId Pipeline ID * @param int $runId Run ID * @return array{success: bool, error?: string} */ public function cancel(int $pipelineId, int $runId): array { $run = $this->repository->findRunById($runId); if ($run === null || (int) $run['pipeline_id'] !== $pipelineId) { return ['success' => false, 'error' => 'Run nicht gefunden']; } if ($run['status'] !== 'running') { return [ 'success' => false, 'error' => 'Run kann nicht abgebrochen werden (Status: ' . $run['status'] . ')', ]; } $this->repository->updateRun($runId, [ 'status' => 'cancelled', 'completed_at' => date('Y-m-d H:i:s'), ]); return ['success' => true]; } /** * Get simple status for a pipeline (latest run). * * @param int $pipelineId Pipeline ID * @return array{success: bool, error?: string, run?: array} */ public function getStatus(int $pipelineId): array { $pipeline = $this->repository->findById($pipelineId); if ($pipeline === null) { return ['success' => false, 'error' => 'Pipeline nicht gefunden']; } return [ 'success' => true, 'pipeline_id' => $pipelineId, 'run' => $this->repository->findLatestRun($pipelineId), ]; } /** * Execute pipeline script in background. */ private function executeInBackground(int $pipelineId, int $runId): void { $logFile = sprintf('%s/pipeline_run_%d.log', self::LOG_DIR, $runId); $cmd = sprintf( 'nohup %s %s all --pipeline-id=%d --run-id=%d > %s 2>&1 &', escapeshellarg(self::PYTHON_BIN), escapeshellarg(self::PIPELINE_SCRIPT), $pipelineId, $runId, escapeshellarg($logFile) ); exec($cmd); } }