pdo = $pdo ?? DatabaseFactory::content(); }

    /**
     * Loads one pipeline configuration by primary key.
     *
     * @return array|null The row with `extensions` JSON-decoded and a `steps` list attached,
     *                    or null when no row matches.
     */
    public function findById(int $id): ?array
    {
        $stmt = $this->pdo->prepare('SELECT * FROM pipeline_configs WHERE id = ?');
        $stmt->execute([$id]);

        return $this->hydrateConfigRow($stmt->fetch(\PDO::FETCH_ASSOC));
    }

    /**
     * Loads one pipeline configuration by its name.
     *
     * @return array|null The row with `extensions` JSON-decoded and a `steps` list attached,
     *                    or null when no row matches.
     */
    public function findByName(string $name): ?array
    {
        $stmt = $this->pdo->prepare('SELECT * FROM pipeline_configs WHERE name = ?');
        $stmt->execute([$name]);

        return $this->hydrateConfigRow($stmt->fetch(\PDO::FETCH_ASSOC));
    }

    /**
     * Loads the first pipeline configuration flagged with `is_default = 1`.
     *
     * @return array|null The hydrated row, or null when the query fails or no row is flagged.
     */
    public function findDefault(): ?array
    {
        $stmt = $this->pdo->query('SELECT * FROM pipeline_configs WHERE is_default = 1 LIMIT 1');
        if ($stmt === false) {
            return null;
        }

        return $this->hydrateConfigRow($stmt->fetch(\PDO::FETCH_ASSOC));
    }

    /**
     * Lists pipeline configurations (default first, then by name) with per-pipeline aggregates.
     *
     * Every row carries `active_steps`, `total_runs` and `last_run_at` in addition to the
     * table columns; `extensions` is JSON-decoded. Steps are not attached here.
     *
     * @param int $limit Maximum number of rows; negative values are treated as 0.
     *
     * @return array List of rows, or an empty list when the statement cannot be prepared or run.
     */
    public function findAll(int $limit = 50): array
    {
        $stmt = $this->pdo->prepare(
            'SELECT p.*,
                    (SELECT COUNT(*) FROM pipeline_steps WHERE pipeline_id = p.id AND enabled = 1) as active_steps,
                    (SELECT COUNT(*) FROM pipeline_runs WHERE pipeline_id = p.id) as total_runs,
                    (SELECT MAX(created_at) FROM pipeline_runs WHERE pipeline_id = p.id) as last_run_at
             FROM pipeline_configs p
             ORDER BY p.is_default DESC, p.name ASC
             LIMIT ?'
        );
        if ($stmt === false) {
            return [];
        }

        // LIMIT must be bound as an integer; a string-bound value is rendered quoted
        // under emulated prepares and rejected by the server.
        $stmt->bindValue(1, max(0, $limit), \PDO::PARAM_INT);
        if ($stmt->execute() === false) {
            return [];
        }

        $rows = $stmt->fetchAll(\PDO::FETCH_ASSOC);
        foreach ($rows as $index => $row) {
            $rows[$index]['extensions'] = json_decode($row['extensions'] ?? '[]', true);
        }

        return $rows;
    }

    /**
     * Returns all steps of a pipeline ordered by `sort_order`, with `config` JSON-decoded.
     *
     * @return array List of step rows (empty when the pipeline has no steps).
     */
    public function findSteps(int $pipelineId): array
    {
        $stmt = $this->pdo->prepare(
            'SELECT * FROM pipeline_steps WHERE pipeline_id = ? ORDER BY sort_order ASC'
        );
        $stmt->execute([$pipelineId]);

        $steps = $stmt->fetchAll(\PDO::FETCH_ASSOC);
        foreach ($steps as $index => $step) {
            $steps[$index]['config'] = json_decode($step['config'] ?? '{}', true);
        }

        return $steps;
    }

    /**
     * Inserts a new pipeline configuration.
     *
     * Reads `name` (required) plus the optional keys `description`, `is_default`,
     * `source_path` and `extensions` from $data. An array given for `extensions`
     * is stored JSON-encoded; a missing value is stored as '[]'.
     *
     * @return int The value reported by PDO::lastInsertId() after the insert.
     */
    public function create(array $data): int
    {
        $stmt = $this->pdo->prepare(
            'INSERT INTO pipeline_configs (name, description, is_default, source_path, extensions)
             VALUES (?, ?, ?, ?, ?)'
        );
        $stmt->execute([
            $data['name'],
            $data['description'] ?? null,
            $data['is_default'] ?? 0,
            $data['source_path'] ?? '/var/www/nextcloud/data/root/files/Documents',
            $this->encodeJsonField($data['extensions'] ?? '[]'),
        ]);

        return (int) $this->pdo->lastInsertId();
    }

    /**
     * Updates the given columns of a pipeline configuration.
     *
     * Only `name`, `description`, `is_default`, `source_path` and `extensions` are
     * considered. Keys that are missing or null are skipped, so a column cannot be
     * reset to NULL through this method. Nothing is executed when no column qualifies.
     */
    public function update(int $id, array $data): void
    {
        if (isset($data['extensions'])) {
            $data['extensions'] = $this->encodeJsonField($data['extensions']);
        }

        $this->executePartialUpdate(
            'pipeline_configs',
            ['name', 'description', 'is_default', 'source_path', 'extensions'],
            $id,
            $data
        );
    }

    /**
     * Deletes a pipeline configuration row by primary key.
     */
    public function delete(int $id): void
    {
        $stmt = $this->pdo->prepare('DELETE FROM pipeline_configs WHERE id = ?');
        $stmt->execute([$id]);
    }

    /**
     * Inserts a step for the given pipeline.
     *
     * Reads `step_type` (required) plus the optional keys `config` (array or JSON
     * string, default '{}'), `sort_order` (default 0) and `enabled` (default 1).
     *
     * @return int The value reported by PDO::lastInsertId() after the insert.
     */
    public function addStep(int $pipelineId, array $stepData): int
    {
        $stmt = $this->pdo->prepare(
            'INSERT INTO pipeline_steps (pipeline_id, step_type, config, sort_order, enabled)
             VALUES (?, ?, ?, ?, ?)'
        );
        $stmt->execute([
            $pipelineId,
            $stepData['step_type'],
            $this->encodeJsonField($stepData['config'] ?? '{}'),
            $stepData['sort_order'] ?? 0,
            $stepData['enabled'] ?? 1,
        ]);

        return (int) $this->pdo->lastInsertId();
    }

    /**
     * Updates the given columns of a pipeline step.
     *
     * Only `step_type`, `config`, `sort_order` and `enabled` are considered; keys
     * that are missing or null are skipped. An array given for `config` is stored
     * JSON-encoded.
     */
    public function updateStep(int $stepId, array $stepData): void
    {
        if (isset($stepData['config'])) {
            $stepData['config'] = $this->encodeJsonField($stepData['config']);
        }

        $this->executePartialUpdate(
            'pipeline_steps',
            ['step_type', 'config', 'sort_order', 'enabled'],
            $stepId,
            $stepData
        );
    }

    /**
     * Deletes a pipeline step row by primary key.
     */
    public function deleteStep(int $stepId): void
    {
        $stmt = $this->pdo->prepare('DELETE FROM pipeline_steps WHERE id = ?');
        $stmt->execute([$stepId]);
    }

    /**
     * Rewrites `sort_order` so the steps follow the order of $stepIds (1-based).
     *
     * Ids that do not belong to $pipelineId match no row and are therefore ignored.
     *
     * @param int[] $stepIds Step ids in the desired order; array keys are ignored.
     */
    public function reorderSteps(int $pipelineId, array $stepIds): void
    {
        $stmt = $this->pdo->prepare(
            'UPDATE pipeline_steps SET sort_order = ? WHERE id = ? AND pipeline_id = ?'
        );

        // Re-index so the position in the list, not the caller's array keys, defines the order.
        foreach (array_values($stepIds) as $position => $stepId) {
            $stmt->execute([$position + 1, $stepId, $pipelineId]);
        }
    }

    /**
     * Returns the most recent runs of a pipeline, newest first.
     *
     * @param int $limit Maximum number of rows; negative values are treated as 0.
     *
     * @return array List of `pipeline_runs` rows.
     */
    public function findRuns(int $pipelineId, int $limit = 20): array
    {
        $stmt = $this->pdo->prepare(
            'SELECT * FROM pipeline_runs WHERE pipeline_id = ? ORDER BY created_at DESC LIMIT ?'
        );

        // execute([...]) binds every value as a string, which breaks LIMIT under
        // emulated prepares; bind both placeholders explicitly as integers instead.
        $stmt->bindValue(1, $pipelineId, \PDO::PARAM_INT);
        $stmt->bindValue(2, max(0, $limit), \PDO::PARAM_INT);
        $stmt->execute();

        return $stmt->fetchAll(\PDO::FETCH_ASSOC);
    }

    /**
     * Loads a single run by primary key.
     *
     * @return array|null The `pipeline_runs` row, or null when no row matches.
     */
    public function findRunById(int $runId): ?array
    {
        $stmt = $this->pdo->prepare('SELECT * FROM pipeline_runs WHERE id = ?');
        $stmt->execute([$runId]);

        $run = $stmt->fetch(\PDO::FETCH_ASSOC);

        return $run === false ? null : $run;
    }

    /**
     * Loads the run of a pipeline with the latest `created_at`.
     *
     * @return array|null The `pipeline_runs` row, or null when the pipeline has no runs.
     */
    public function findLatestRun(int $pipelineId): ?array
    {
        $stmt = $this->pdo->prepare(
            'SELECT * FROM pipeline_runs WHERE pipeline_id = ? ORDER BY created_at DESC LIMIT 1'
        );
        $stmt->execute([$pipelineId]);

        $run = $stmt->fetch(\PDO::FETCH_ASSOC);

        return $run === false ? null : $run;
    }

    /**
     * Inserts a run for the pipeline with status 'running' and `started_at` set to NOW().
     *
     * @return int The value reported by PDO::lastInsertId() after the insert.
     */
    public function createRun(int $pipelineId): int
    {
        $stmt = $this->pdo->prepare(
            'INSERT INTO pipeline_runs (pipeline_id, status, started_at) VALUES (?, ?, NOW())'
        );
        $stmt->execute([$pipelineId, 'running']);

        return (int) $this->pdo->lastInsertId();
    }

    /**
     * Updates the given columns of a run.
     *
     * Only `status`, `completed_at`, `documents_total`, `documents_processed`,
     * `documents_failed`, `chunks_created` and `error_log` are considered; keys that
     * are missing or null are skipped. Nothing is executed when no column qualifies.
     */
    public function updateRun(int $runId, array $data): void
    {
        $this->executePartialUpdate(
            'pipeline_runs',
            [
                'status',
                'completed_at',
                'documents_total',
                'documents_processed',
                'documents_failed',
                'chunks_created',
                'error_log',
            ],
            $runId,
            $data
        );
    }

    /**
     * Returns global counters over pipelines and runs.
     *
     * @return array Integer values keyed by `pipelines`, `runs_total`, `runs_completed`,
     *               `runs_failed`, `documents_processed` and `chunks_created`; all zero
     *               when the query fails or yields no row.
     */
    public function getStatistics(): array
    {
        $statistics = [
            'pipelines' => 0,
            'runs_total' => 0,
            'runs_completed' => 0,
            'runs_failed' => 0,
            'documents_processed' => 0,
            'chunks_created' => 0,
        ];

        // String literals use single quotes: double-quoted tokens are identifiers in
        // standard SQL (and in MySQL when ANSI_QUOTES is enabled).
        $stmt = $this->pdo->query(
            "SELECT
                (SELECT COUNT(*) FROM pipeline_configs) as pipelines,
                (SELECT COUNT(*) FROM pipeline_runs) as runs_total,
                (SELECT COUNT(*) FROM pipeline_runs WHERE status = 'completed') as runs_completed,
                (SELECT COUNT(*) FROM pipeline_runs WHERE status = 'failed') as runs_failed,
                (SELECT COALESCE(SUM(documents_processed), 0) FROM pipeline_runs) as documents_processed,
                (SELECT COALESCE(SUM(chunks_created), 0) FROM pipeline_runs) as chunks_created"
        );
        if ($stmt === false) {
            return $statistics;
        }

        $row = $stmt->fetch(\PDO::FETCH_ASSOC);
        foreach (array_keys($statistics) as $key) {
            // `??` also covers the case where fetch() returned false instead of a row.
            $statistics[$key] = (int) ($row[$key] ?? 0);
        }

        return $statistics;
    }

    /**
     * Turns a fetched `pipeline_configs` row into the shape returned by the finders.
     *
     * @param array|false $row Result of PDOStatement::fetch().
     *
     * @return array|null Null when $row is false; otherwise the row with `extensions`
     *                    JSON-decoded and `steps` loaded via findSteps().
     */
    private function hydrateConfigRow($row): ?array
    {
        if ($row === false) {
            return null;
        }

        $row['extensions'] = json_decode($row['extensions'] ?? '[]', true);
        $row['steps'] = $this->findSteps((int) $row['id']);

        return $row;
    }

    /**
     * JSON-encodes array values for storage and passes any other value through unchanged.
     *
     * @param mixed $value
     *
     * @return mixed The json_encode() result for arrays, otherwise $value itself.
     */
    private function encodeJsonField($value)
    {
        return is_array($value) ? json_encode($value) : $value;
    }

    /**
     * Builds and runs `UPDATE <table> SET ... WHERE id = ?` for the allowed columns
     * that are set (non-null) in $data; does nothing when none is set.
     *
     * @param string   $table   Table name; only ever a literal supplied by this class.
     * @param string[] $columns Updatable column names, in SET-clause order.
     * @param int      $id      Primary key of the row to update.
     * @param array    $data    Column => value map; keys outside $columns are ignored.
     */
    private function executePartialUpdate(string $table, array $columns, int $id, array $data): void
    {
        $sets = [];
        $params = [];

        foreach ($columns as $column) {
            // isset() skips missing keys as well as explicit nulls.
            if (!isset($data[$column])) {
                continue;
            }

            $sets[] = $column . ' = ?';
            $params[] = $data[$column];
        }

        if ($sets === []) {
            return;
        }

        $params[] = $id;
        $stmt = $this->pdo->prepare(
            'UPDATE ' . $table . ' SET ' . implode(', ', $sets) . ' WHERE id = ?'
        );
        $stmt->execute($params);
    }
}