repository = $repository ?? new PipelineRepository(); $this->stepService = $stepService ?? new PipelineStepService($this->repository); $this->runUseCase = $runUseCase ?? new RunPipelineUseCase($this->repository); $this->statusUseCase = $statusUseCase ?? new PipelineRunStatusUseCase($this->repository); } /** * GET /content-pipeline */ public function index(): void { $this->view('content-pipeline.index', [ 'title' => 'Content Pipeline', 'pipelines' => $this->repository->findAll(), 'stats' => $this->repository->getStatistics(), ]); } /** * GET /content-pipeline/import */ public function import(): void { $pipeline = $this->repository->findDefault(); if ($pipeline === null) { $pipelines = $this->repository->findAll(1); $pipeline = $pipelines[0] ?? null; } $this->view('content-pipeline.import', [ 'title' => 'Import Pipeline', 'pipeline' => $pipeline, 'latestRun' => $pipeline !== null ? $this->repository->findLatestRun((int) $pipeline['id']) : null, ]); } /** * GET /content-pipeline/new */ public function pipelineNew(): void { $this->view('content-pipeline.form', [ 'title' => 'Neue Pipeline', 'pipeline' => null, 'stepTypes' => PipelineStepConfig::getStepTypes(), ]); } /** * GET /content-pipeline/{id} */ public function show(string $id): void { $pipeline = $this->repository->findById((int) $id); if ($pipeline === null) { $this->notFound('Pipeline nicht gefunden'); } $this->view('content-pipeline.show', [ 'title' => 'Pipeline: ' . $pipeline['name'], 'pipeline' => $pipeline, 'runs' => $this->repository->findRuns((int) $id, 10), 'stepTypes' => PipelineStepConfig::getStepTypes(), 'models' => ModelConfig::getAll(), 'defaultModel' => ModelConfig::DEFAULT_MODEL, 'collections' => PipelineStepConfig::getCollections(), ]); } /** * GET /content-pipeline/{id}/edit */ public function edit(string $id): void { $pipeline = $this->repository->findById((int) $id); if ($pipeline === null) { $this->notFound('Pipeline nicht gefunden'); } $this->view('content-pipeline.form', [ 'title' => 'Pipeline bearbeiten: ' . $pipeline['name'], 'pipeline' => $pipeline, 'stepTypes' => PipelineStepConfig::getStepTypes(), ]); } /** * POST /content-pipeline */ public function store(): void { $this->requireCsrf(); $name = trim($_POST['name'] ?? ''); if ($name === '') { $_SESSION['error'] = 'Name ist erforderlich.'; $this->redirect('/content-pipeline/new'); } $pipelineId = $this->repository->create([ 'name' => $name, 'description' => trim($_POST['description'] ?? ''), 'source_path' => trim($_POST['source_path'] ?? '/var/www/nextcloud/data/root/files/Documents'), 'extensions' => PipelineStepConfig::parseExtensions($_POST['extensions'] ?? ''), 'is_default' => isset($_POST['is_default']) ? 1 : 0, ]); $this->stepService->createDefaultSteps($pipelineId); $_SESSION['success'] = 'Pipeline erfolgreich erstellt.'; $this->redirect('/content-pipeline/' . $pipelineId); } /** * POST /content-pipeline/{id} */ public function update(string $id): void { $this->requireCsrf(); $pipeline = $this->repository->findById((int) $id); if ($pipeline === null) { $this->notFound('Pipeline nicht gefunden'); } $name = trim($_POST['name'] ?? ''); if ($name === '') { $_SESSION['error'] = 'Name ist erforderlich.'; $this->redirect('/content-pipeline/' . $id . '/edit'); } $this->repository->update((int) $id, [ 'name' => $name, 'description' => trim($_POST['description'] ?? ''), 'source_path' => trim($_POST['source_path'] ?? ''), 'extensions' => PipelineStepConfig::parseExtensions($_POST['extensions'] ?? ''), 'is_default' => isset($_POST['is_default']) ? 1 : 0, ]); $_SESSION['success'] = 'Pipeline aktualisiert.'; $this->redirect('/content-pipeline/' . $id); } /** * POST /content-pipeline/{id}/run */ public function run(string $id): void { $this->requireCsrf(); $pipeline = $this->repository->findById((int) $id); if ($pipeline === null) { $this->notFound('Pipeline nicht gefunden'); } $runId = $this->repository->createRun((int) $id); // Pipeline im Hintergrund starten $cmd = sprintf( 'nohup %s %s all --pipeline-id=%d --run-id=%d > %s 2>&1 &', escapeshellarg('/opt/scripts/pipeline/venv/bin/python'), escapeshellarg('/opt/scripts/pipeline/pipeline.py'), (int) $id, $runId, escapeshellarg('/tmp/pipeline_run_' . $runId . '.log') ); exec($cmd); // Redirect zur Live-Status-Seite $this->redirect('/content-pipeline/' . $id . '/run/' . $runId . '/status'); } /** * GET /content-pipeline/{id}/status (AJAX) */ public function status(string $id): void { $pipeline = $this->repository->findById((int) $id); if ($pipeline === null) { $this->json(['error' => 'Pipeline nicht gefunden'], 404); return; } $this->json([ 'pipeline_id' => (int) $id, 'run' => $this->repository->findLatestRun((int) $id), ]); } /** * GET /content-pipeline/{id}/run/{runId}/status */ public function runStatus(string $id, string $runId): void { $pipeline = $this->repository->findById((int) $id); if ($pipeline === null) { $this->notFound('Pipeline nicht gefunden'); } $run = $this->repository->findRunById((int) $runId); if ($run === null || (int) $run['pipeline_id'] !== (int) $id) { $this->notFound('Run nicht gefunden'); } $this->view('content-pipeline.run-status', [ 'title' => 'Pipeline Status: ' . $pipeline['name'], 'pipeline' => $pipeline, 'run' => $run, ]); } /** * GET /content-pipeline/{id}/run/{runId}/poll (AJAX/HTMX) */ public function runStatusPoll(string $id, string $runId): void { $run = $this->repository->findRunById((int) $runId); if ($run === null || (int) $run['pipeline_id'] !== (int) $id) { $this->json(['error' => 'Run nicht gefunden'], 404); return; } // Berechne verstrichene Zeit $startedAt = strtotime($run['started_at'] ?? 'now'); $elapsed = time() - $startedAt; // Fortschrittsberechnung $total = (int) ($run['documents_total'] ?? 0); $processed = (int) ($run['documents_processed'] ?? 0); $progress = $total > 0 ? round(($processed / $total) * 100) : 0; // Geschätzte Restzeit $estimatedRemaining = null; if ($processed > 0 && $total > $processed) { $avgTimePerDoc = $elapsed / $processed; $remaining = $total - $processed; $estimatedRemaining = (int) ($avgTimePerDoc * $remaining); } // Stall-Erkennung (keine Aktivität seit >60s) $lastUpdate = strtotime($run['last_update_at'] ?? $run['started_at'] ?? 'now'); $isStalled = (time() - $lastUpdate) > 60 && $run['status'] === 'running'; // Terminal-Status (Polling stoppen wenn fertig) $isTerminal = in_array($run['status'], ['completed', 'failed', 'cancelled'], true); $this->json([ 'status' => $run['status'], 'current_step' => $run['current_step'], 'current_document' => $run['current_document'], 'documents_total' => $total, 'documents_processed' => $processed, 'documents_failed' => (int) ($run['documents_failed'] ?? 0), 'chunks_created' => (int) ($run['chunks_created'] ?? 0), 'embeddings_created' => (int) ($run['embeddings_created'] ?? 0), 'progress' => $progress, 'elapsed' => $elapsed, 'elapsed_formatted' => gmdate('i:s', $elapsed), 'estimated_remaining' => $estimatedRemaining, 'estimated_formatted' => $estimatedRemaining !== null ? gmdate('i:s', $estimatedRemaining) : null, 'log_tail' => $run['log_tail'] ?? '', 'is_stalled' => $isStalled, 'is_terminal' => $isTerminal, 'completed_at' => $run['completed_at'], 'error_log' => $run['error_log'], ]); } /** * POST /content-pipeline/{id}/run/{runId}/cancel */ public function runCancel(string $id, string $runId): void { $this->requireCsrf(); $run = $this->repository->findRunById((int) $runId); if ($run === null || (int) $run['pipeline_id'] !== (int) $id) { $this->notFound('Run nicht gefunden'); } // Nur laufende Runs können abgebrochen werden if ($run['status'] !== 'running') { $_SESSION['error'] = 'Run kann nicht abgebrochen werden (Status: ' . $run['status'] . ')'; $this->redirect('/content-pipeline/' . $id . '/run/' . $runId . '/status'); } // Status auf cancelled setzen (Python-Script prüft das) $this->repository->updateRun((int) $runId, [ 'status' => 'cancelled', 'completed_at' => date('Y-m-d H:i:s'), ]); $_SESSION['success'] = 'Pipeline-Run wurde abgebrochen.'; $this->redirect('/content-pipeline/' . $id . '/run/' . $runId . '/status'); } /** * POST /content-pipeline/{id}/steps/{stepId}/toggle */ public function toggleStep(string $id, string $stepId): void { $this->requireCsrf(); if (!$this->stepService->toggleStep((int) $id, (int) $stepId)) { $this->notFound('Pipeline oder Schritt nicht gefunden'); } $this->redirect('/content-pipeline/' . $id); } /** * POST /content-pipeline/{id}/steps/{stepId}/model (AJAX) */ public function updateStepModel(string $id, string $stepId): void { $result = $this->stepService->updateModel( (int) $id, (int) $stepId, $_POST['model'] ?? '' ); if (!$result['success']) { $this->json(['error' => $result['error']], $result['error'] === 'Schritt nicht gefunden' ? 404 : 400); return; } $this->json($result); } /** * POST /content-pipeline/{id}/steps/{stepId}/collection (AJAX) */ public function updateStepCollection(string $id, string $stepId): void { $result = $this->stepService->updateCollection( (int) $id, (int) $stepId, $_POST['collection'] ?? '' ); if (!$result['success']) { $this->json(['error' => $result['error']], $result['error'] === 'Schritt nicht gefunden' ? 404 : 400); return; } $this->json($result); } /** * POST /content-pipeline/{id}/config (AJAX) */ public function updateConfig(string $id): void { $pipeline = $this->repository->findById((int) $id); if ($pipeline === null) { $this->json(['error' => 'Pipeline nicht gefunden'], 404); return; } $updateData = []; if (isset($_POST['source_path'])) { $sourcePath = trim($_POST['source_path']); if ($sourcePath === '') { $this->json(['error' => 'Quellpfad darf nicht leer sein'], 400); return; } $updateData['source_path'] = $sourcePath; } if (isset($_POST['extensions'])) { $extensions = PipelineStepConfig::parseExtensions($_POST['extensions']); if (empty($extensions)) { $this->json(['error' => 'Mindestens ein Dateityp erforderlich'], 400); return; } $updateData['extensions'] = $extensions; } if (empty($updateData)) { $this->json(['error' => 'Keine Änderungen'], 400); return; } $this->repository->update((int) $id, $updateData); $this->json(['success' => true]); } /** * POST /content-pipeline/{id}/delete */ public function delete(string $id): void { $this->requireCsrf(); $pipeline = $this->repository->findById((int) $id); if ($pipeline === null) { $this->notFound('Pipeline nicht gefunden'); } $this->repository->delete((int) $id); $_SESSION['success'] = 'Pipeline geloescht.'; $this->redirect('/content-pipeline'); } }