ollama = new OllamaService();
        $this->pdo = $this->createConnection();
    }

    /**
     * Ensures the Qdrant collection exists, creating it when it is missing.
     *
     * A GET on the collection endpoint whose response contains a `result` key is
     * treated as "collection exists". Any RuntimeException from that GET falls
     * through to a PUT that creates the collection with VECTOR_SIZE-dimensional
     * vectors and cosine distance.
     *
     * @return bool Always true when the method returns normally.
     * @throws RuntimeException When the collection cannot be created; the
     *                          underlying request error is chained as previous.
     */
    public function ensureCollection(): bool
    {
        $url = sprintf('%s/collections/%s', self::QDRANT_HOST, self::COLLECTION);

        try {
            $response = $this->makeRequest($url, [], 'GET');
            if (isset($response['result'])) {
                return true; // Collection exists
            }
        } catch (RuntimeException) {
            // Collection doesn't exist (or could not be read): try to create it below.
        }

        $payload = [
            'vectors' => [
                'size' => self::VECTOR_SIZE,
                'distance' => 'Cosine',
            ],
        ];

        try {
            $this->makeRequest($url, $payload, 'PUT');
        } catch (RuntimeException $e) {
            // Chain the original exception so the HTTP/cURL cause stays inspectable.
            throw new RuntimeException('Failed to create collection: ' . $e->getMessage(), previous: $e);
        }

        return true;
    }

    /**
     * Embeds a single analysed chunk and upserts it into Qdrant.
     *
     * Missing chunks and chunks whose analysis_status is not 'completed' are
     * skipped. A chunk without a stored qdrant_id (NULL or empty string) gets a
     * freshly generated UUID v4. That ID is written back to the database only
     * after a successful upsert, so later syncs reuse the same point ID.
     *
     * Errors raised by the embedding call or the database layer are not caught
     * here; the batch methods of this class wrap calls in a RuntimeException catch.
     *
     * @param int $chunkId Primary key in dokumentation_chunks.
     * @return bool True when the point was upserted; false when skipped or when the upsert failed.
     */
    public function syncChunk(int $chunkId): bool
    {
        $chunk = $this->getChunk($chunkId);

        // Only sync chunks that exist and whose analysis is completed.
        if ($chunk === null || $chunk['analysis_status'] !== 'completed') {
            return false;
        }

        $doc = $this->getDocument((int) $chunk['dokumentation_id']);

        $text = $this->prepareTextForEmbedding($chunk, $doc);
        $embedding = $this->ollama->getEmbedding($text);
        $payload = $this->buildPayload($chunk, $doc);

        // An empty string is not a usable Qdrant point ID; treat it like NULL so
        // the chunk gets a real UUID that is then persisted.
        $storedId = $chunk['qdrant_id'] ?? null;
        $hasStoredId = $storedId !== null && $storedId !== '';
        $qdrantId = $hasStoredId ? (string) $storedId : $this->generateUuid();

        $success = $this->upsertPoint($qdrantId, $embedding, $payload);

        if ($success && !$hasStoredId) {
            $this->updateQdrantId($chunkId, $qdrantId);
        }

        return $success;
    }

    /**
     * Syncs all analyzed chunks that haven't been synced yet.
*
     * Picks up at most $limit chunks with analysis_status 'completed' and no
     * qdrant_id, ordered by document and chunk index, after making sure the
     * collection exists. A progress line is echoed every BATCH_SIZE successes.
     *
     * @param int $limit Maximum number of chunks to process in this call.
     * @return array{synced: int, failed: int, errors: array<int, string>}
     * @throws RuntimeException When the collection cannot be ensured.
     */
    public function syncAllPending(int $limit = 100): array
    {
        $this->ensureCollection();

        return $this->syncChunkRows($this->getUnsyncedChunks($limit));
    }

    /**
     * Re-syncs every chunk whose analysis is completed, including already-synced ones.
     *
     * Chunks that already carry a qdrant_id are upserted under that same ID.
     * Failures are reported exactly like in syncAllPending().
     *
     * @return array{synced: int, failed: int, errors: array<int, string>}
     * @throws RuntimeException When the collection cannot be ensured.
     */
    public function syncAll(): array
    {
        $this->ensureCollection();

        return $this->syncChunkRows($this->getAllAnalyzedChunks());
    }

    /**
     * Runs syncChunk() for each row and tallies the outcome.
     *
     * A `false` return and a RuntimeException both count as a failure and add a
     * "Chunk #<id>: ..." entry to `errors`, so every batch entry point reports
     * failures with a reason.
     *
     * @param array<int, array<string, mixed>> $chunks Rows that contain at least an `id` column.
     * @return array{synced: int, failed: int, errors: array<int, string>}
     */
    private function syncChunkRows(array $chunks): array
    {
        $results = ['synced' => 0, 'failed' => 0, 'errors' => []];

        foreach ($chunks as $chunk) {
            try {
                if ($this->syncChunk((int) $chunk['id'])) {
                    $results['synced']++;
                    if ($results['synced'] % self::BATCH_SIZE === 0) {
                        echo "Synced {$results['synced']} chunks...\n";
                    }
                } else {
                    $results['failed']++;
                    $results['errors'][] = "Chunk #{$chunk['id']}: Sync failed";
                }
            } catch (RuntimeException $e) {
                $results['failed']++;
                $results['errors'][] = "Chunk #{$chunk['id']}: " . $e->getMessage();
            }
        }

        return $results;
    }

    /**
     * Searches for similar chunks using semantic search.
*
     * The query is embedded with the same embedding service used for chunks, and
     * each Qdrant hit is mapped from its payload. Missing payload fields fall back
     * to 0 / '' / [], so every result has the same shape.
     *
     * @param string $query Free-text query.
     * @param int $limit Maximum number of hits requested from Qdrant.
     * @param array<string, mixed>|null $filter Optional Qdrant filter for taxonomy/entities
     * @return list<array{id: int, doc_id: int, path: string, title: string, content: string, score: float, taxonomy: array<mixed>, entities: array<mixed>}>
     * @throws RuntimeException When the Qdrant request fails.
     */
    public function search(string $query, int $limit = 5, ?array $filter = null): array
    {
        $embedding = $this->ollama->getEmbedding($query);

        $url = sprintf('%s/collections/%s/points/search', self::QDRANT_HOST, self::COLLECTION);

        $payload = [
            'vector' => array_values($embedding),
            'limit' => $limit,
            'with_payload' => true,
        ];

        if ($filter !== null) {
            $payload['filter'] = $filter;
        }

        $response = $this->makeRequest($url, $payload, 'POST');

        if (!isset($response['result']) || !is_array($response['result'])) {
            return [];
        }

        // Skip malformed hits instead of failing on the array type hint below.
        $hits = array_values(array_filter(
            $response['result'],
            static fn ($hit): bool => is_array($hit),
        ));

        return array_map(static function (array $item): array {
            $payload = is_array($item['payload'] ?? null) ? $item['payload'] : [];

            return [
                'id' => (int) ($payload['chunk_id'] ?? 0),
                'doc_id' => (int) ($payload['doc_id'] ?? 0),
                'path' => (string) ($payload['path'] ?? ''),
                'title' => (string) ($payload['title'] ?? ''),
                'content' => (string) ($payload['content_preview'] ?? ''),
                'score' => (float) ($item['score'] ?? 0),
                'taxonomy' => is_array($payload['taxonomy'] ?? null) ? $payload['taxonomy'] : [],
                'entities' => is_array($payload['entities'] ?? null) ? $payload['entities'] : [],
            ];
        }, $hits);
    }

    /**
     * Searches with a filter on the payload's taxonomy_category field.
     *
     * @return list<array<string, mixed>>
     */
    public function searchByTaxonomy(string $query, string $category, int $limit = 5): array
    {
        $filter = [
            'must' => [
                [
                    'key' => 'taxonomy_category',
                    'match' => ['value' => $category],
                ],
            ],
        ];

        return $this->search($query, $limit, $filter);
    }

    /**
     * Gets collection statistics.
     *
     * @return array{points_count: int, status: string}|null Null when the collection
     *         cannot be read or the response has no `result`.
     */
    public function getStats(): ?array
    {
        $url = sprintf('%s/collections/%s', self::QDRANT_HOST, self::COLLECTION);

        try {
            $response = $this->makeRequest($url, [], 'GET');
        } catch (RuntimeException) {
            return null;
        }

        if (!isset($response['result'])) {
            return null;
        }

        return [
            'points_count' => (int) ($response['result']['points_count'] ?? 0),
            'status' => (string) ($response['result']['status'] ?? 'unknown'),
        ];
    }

    /**
     * Builds the text that is sent to the embedding model.
     *
     * Sections (separated by blank lines): document title, heading path,
     * taxonomy path, keywords and the sanitised content. Content is capped at
     * 1000 characters and the whole text at 1800 characters (each followed by
     * '...' when cut) to stay within the embedding model's context.
     *
     * @param array<string, mixed> $chunk Row from dokumentation_chunks.
     * @param array<string, mixed> $doc   Row from dokumentation (may be empty).
     */
    private function prepareTextForEmbedding(array $chunk, array $doc): string
    {
        $parts = ['Dokument: ' . ($doc['title'] ?? '')];

        $headingPath = $this->decodeJsonLabels($chunk['heading_path'] ?? null);
        if ($headingPath !== []) {
            $parts[] = 'Abschnitt: ' . implode(' > ', $headingPath);
        }

        $taxonomy = $this->decodeJsonLabels($chunk['taxonomy_path'] ?? null);
        if ($taxonomy !== []) {
            $parts[] = 'Kategorie: ' . implode(' > ', $taxonomy);
        }

        $keywords = $this->decodeJsonLabels($chunk['keywords'] ?? null);
        if ($keywords !== []) {
            $parts[] = 'Keywords: ' . implode(', ', $keywords);
        }

        $content = $this->sanitizeForEmbedding($chunk['content_clean'] ?? $chunk['content'] ?? '');
        if (mb_strlen($content) > 1000) {
            $content = mb_substr($content, 0, 1000) . '...';
        }
        $parts[] = 'Inhalt: ' . $content;

        $text = implode("\n\n", $parts);

        // Final safety limit for embedding model context
        if (mb_strlen($text) > 1800) {
            $text = mb_substr($text, 0, 1800) . '...';
        }

        return $text;
    }

    /**
     * Sanitizes text for embedding by removing problematic characters.
     *
     * Steps: repair invalid UTF-8, replace box-drawing / block / geometric-shape
     * characters (U+2500–U+25FF) with spaces, drop control characters other than
     * tab, LF and CR, collapse runs of spaces/tabs, limit blank-line runs to one
     * empty line, then trim.
     */
    private function sanitizeForEmbedding(string $text): string
    {
        // Repair invalid UTF-8 FIRST: preg_replace() with the /u modifier returns
        // null on malformed input, which would silently skip the cleanup below.
        $valid = mb_convert_encoding($text, 'UTF-8', 'UTF-8');
        if (is_string($valid)) {
            $text = $valid;
        }

        // Box Drawing (2500–257F), Block Elements (2580–259F), Geometric Shapes (25A0–25FF)
        $text = preg_replace('/[\x{2500}-\x{25FF}]/u', ' ', $text) ?? $text;

        // Remove control characters except tab, LF and CR
        $text = preg_replace('/[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]/u', '', $text) ?? $text;

        // Normalize whitespace
        $text = preg_replace('/[ \t]+/', ' ', $text) ?? $text;
        $text = preg_replace('/\n{3,}/', "\n\n", $text) ?? $text;

        return trim($text);
    }

    /**
     * Builds the Qdrant payload stored alongside a chunk's vector.
     *
     * JSON columns are decoded into arrays; NULL, invalid JSON and JSON that is
     * not an array/object become []. content_preview is the sanitised content,
     * cut at 300 characters plus '...'.
     *
     * @param array<string, mixed> $chunk Row from dokumentation_chunks.
     * @param array<string, mixed> $doc   Row from dokumentation (may be empty).
     * @return array<string, mixed>
     */
    private function buildPayload(array $chunk, array $doc): array
    {
        $content = $this->sanitizeForEmbedding($chunk['content_clean'] ?? $chunk['content'] ?? '');
        $preview = mb_strlen($content) > 300 ? mb_substr($content, 0, 300) . '...' : $content;

        return [
            'chunk_id' => (int) $chunk['id'],
            'doc_id' => (int) $chunk['dokumentation_id'],
            'chunk_index' => (int) $chunk['chunk_index'],
            'path' => $doc['path'] ?? '',
            'title' => $doc['title'] ?? '',
            'content_preview' => $preview,
            'heading_path' => $this->decodeJsonField($chunk['heading_path'] ?? null),
            'taxonomy_category' => $chunk['taxonomy_category'] ?? null,
            'taxonomy' => $this->decodeJsonField($chunk['taxonomy_path'] ?? null),
            'entities' => $this->decodeJsonField($chunk['entities'] ?? null),
            'keywords' => $this->decodeJsonField($chunk['keywords'] ?? null),
            'token_count' => (int) ($chunk['token_count'] ?? 0),
        ];
    }

    /**
     * Decodes a JSON column value into an array.
     *
     * Arrays are returned unchanged; NULL, non-strings, empty strings, invalid
     * JSON and JSON that does not decode to an array/object all yield [].
     *
     * @return array<mixed>
     */
    private function decodeJsonField(mixed $value): array
    {
        if (is_array($value)) {
            return $value;
        }
        if (!is_string($value) || $value === '') {
            return [];
        }

        $decoded = json_decode($value, true);

        return is_array($decoded) ? $decoded : [];
    }

    /**
     * Decodes a JSON column and keeps only its scalar entries, cast to strings,
     * so they can be joined into embedding text without array-to-string warnings.
     *
     * @return list<string>
     */
    private function decodeJsonLabels(mixed $value): array
    {
        $labels = [];
        foreach ($this->decodeJsonField($value) as $item) {
            if (is_scalar($item)) {
                $labels[] = (string) $item;
            }
        }

        return $labels;
    }

    /**
     * Upserts a single point into the Qdrant collection.
     *
     * Request failures are reported as false; the underlying exception message
     * is not propagated.
     *
     * @param array<int|string, float> $vector  Embedding values (re-indexed before sending).
     * @param array<string, mixed>     $payload Payload stored with the point.
     */
    private function upsertPoint(string $id, array $vector, array $payload): bool
    {
        $url = sprintf('%s/collections/%s/points', self::QDRANT_HOST, self::COLLECTION);

        $data = [
            'points' => [
                [
                    'id' => $id,
                    'vector' => array_values($vector),
                    'payload' => $payload,
                ],
            ],
        ];

        try {
            $this->makeRequest($url, $data, 'PUT');
            return true;
        } catch (RuntimeException) {
            return false;
        }
    }

    /**
     * Stores the Qdrant point ID for a chunk in dokumentation_chunks.qdrant_id.
     */
    private function updateQdrantId(int $chunkId, string $qdrantId): void
    {
        $stmt = $this->pdo->prepare('UPDATE dokumentation_chunks SET qdrant_id = :qid WHERE id = :id');
        $stmt->execute(['id' => $chunkId, 'qid' => $qdrantId]);
    }

    /**
     * Gets a chunk by ID.
*
     * @return array<string, mixed>|null The dokumentation_chunks row, or null when no row has this ID.
     */
    private function getChunk(int $id): ?array
    {
        $stmt = $this->pdo->prepare('SELECT * FROM dokumentation_chunks WHERE id = :id');
        $stmt->execute(['id' => $id]);
        $row = $stmt->fetch(PDO::FETCH_ASSOC);

        return $row === false ? null : $row;
    }

    /**
     * Gets a document by ID.
     *
     * @return array<string, mixed> The dokumentation row, or [] when no row has this ID.
     */
    private function getDocument(int $id): array
    {
        $stmt = $this->pdo->prepare('SELECT * FROM dokumentation WHERE id = :id');
        $stmt->execute(['id' => $id]);
        $row = $stmt->fetch(PDO::FETCH_ASSOC);

        return $row === false ? [] : $row;
    }

    /**
     * Gets chunks whose analysis is completed but that have no qdrant_id yet.
     *
     * @param int $limit Maximum number of rows, bound as an integer LIMIT parameter.
     * @return array<int, array<string, mixed>> Ordered by dokumentation_id, then chunk_index.
     */
    private function getUnsyncedChunks(int $limit): array
    {
        $stmt = $this->pdo->prepare("
            SELECT * FROM dokumentation_chunks
            WHERE analysis_status = 'completed'
              AND qdrant_id IS NULL
            ORDER BY dokumentation_id, chunk_index
            LIMIT :limit
        ");
        // Bound explicitly as INT so emulated prepares do not quote the LIMIT value.
        $stmt->bindValue('limit', $limit, PDO::PARAM_INT);
        $stmt->execute();

        return $stmt->fetchAll(PDO::FETCH_ASSOC);
    }

    /**
     * Gets all chunks whose analysis is completed.
     *
     * @return array<int, array<string, mixed>> Ordered by dokumentation_id, then chunk_index.
     * @throws RuntimeException When the query fails without PDO throwing itself
     *                          (i.e. PDO is not in exception error mode).
     */
    private function getAllAnalyzedChunks(): array
    {
        $stmt = $this->pdo->query("
            SELECT * FROM dokumentation_chunks
            WHERE analysis_status = 'completed'
            ORDER BY dokumentation_id, chunk_index
        ");

        // PDO::query() returns false on failure in silent/warning error modes;
        // calling fetchAll() on that would be a fatal Error.
        if ($stmt === false) {
            throw new RuntimeException('Failed to query analyzed chunks');
        }

        return $stmt->fetchAll(PDO::FETCH_ASSOC);
    }

    /**
     * Generates a random (version 4) UUID string in 8-4-4-4-12 hex form.
     */
    private function generateUuid(): string
    {
        $data = random_bytes(16);
        // Set version nibble to 0100 (v4) and variant bits to 10xx (RFC 4122).
        $data[6] = chr((ord($data[6]) & 0x0f) | 0x40);
        $data[8] = chr((ord($data[8]) & 0x3f) | 0x80);

        return vsprintf('%s%s-%s-%s-%s-%s%s%s', str_split(bin2hex($data), 4));
    }

    /**
     * Makes an HTTP request to Qdrant.
*
     * For any method other than GET, $payload is JSON-encoded and sent as the
     * request body; GET requests send no body.
     *
     * @param string               $url     Absolute Qdrant endpoint URL.
     * @param array<string, mixed> $payload Request body for non-GET methods (ignored for GET).
     * @param string               $method  HTTP method, e.g. 'GET', 'PUT', 'POST'.
     * @return array<string, mixed> Decoded JSON response, or [] when the body does not decode to an array.
     * @throws RuntimeException On JSON encoding failure, cURL failure, or HTTP status >= 400.
     */
    private function makeRequest(string $url, array $payload, string $method): array
    {
        $ch = curl_init($url);
        if ($ch === false) {
            throw new RuntimeException('Failed to initialize cURL');
        }

        $headers = ['Content-Type: application/json'];
        $options = [
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_TIMEOUT => self::TIMEOUT,
            CURLOPT_CONNECTTIMEOUT => 10,
            CURLOPT_CUSTOMREQUEST => $method,
        ];

        if ($method !== 'GET') {
            $jsonPayload = json_encode($payload);
            if ($jsonPayload === false) {
                // Capture the reason before anything else can reset JSON error state.
                $reason = json_last_error_msg();
                curl_close($ch);
                throw new RuntimeException('Failed to encode JSON payload: ' . $reason);
            }
            $options[CURLOPT_POSTFIELDS] = $jsonPayload;
            $headers[] = 'Content-Length: ' . strlen($jsonPayload);
        }

        $options[CURLOPT_HTTPHEADER] = $headers;
        curl_setopt_array($ch, $options);

        $result = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        $curlError = curl_error($ch);
        curl_close($ch);

        if ($result === false) {
            throw new RuntimeException(sprintf('cURL request failed: %s', $curlError ?: 'Unknown error'));
        }

        if ($httpCode >= 400) {
            throw new RuntimeException(sprintf('Qdrant API returned HTTP %d: %s', $httpCode, $result));
        }

        $decoded = json_decode((string) $result, true);

        return is_array($decoded) ? $decoded : [];
    }

    /**
     * Creates the PDO connection used by this service (dev database).
     */
    private function createConnection(): PDO
    {
        return \Infrastructure\Config\DatabaseFactory::dev();
    }
}