qdrant->ensureCollection(self::COLLECTION, self::VECTOR_SIZE); }

    /**
     * Embeds a single chunk and upserts it into the Qdrant collection.
     *
     * Only chunks whose analysis_status is 'completed' are synced. A chunk
     * that has no qdrant_id yet receives a newly generated one, which is
     * written back to the database after a successful upsert.
     *
     * @return bool False when the chunk does not exist or is not analyzed yet;
     *              otherwise the result reported by the upsert.
     */
    public function syncChunk(int $chunkId): bool
    {
        $row = $this->getChunk($chunkId);
        if ($row === null || $row['analysis_status'] !== 'completed') {
            return false;
        }

        $document = $this->getDocument((int) $row['dokumentation_id']);
        $vector = $this->ollama->getEmbedding($this->prepareTextForEmbedding($row, $document));
        $payload = $this->buildPayload($row, $document);

        // Reuse the stored point id so a re-sync overwrites the existing point.
        $pointId = $row['qdrant_id'] ?? $this->qdrant->generateUuid();
        $stored = $this->qdrant->upsertPoint(self::COLLECTION, $pointId, $vector, $payload);

        if ($stored && $row['qdrant_id'] === null) {
            $this->updateQdrantId($chunkId, $pointId);
        }

        return $stored;
    }

    /**
     * Syncs analyzed chunks that have not been synced yet, up to $limit.
     *
     * A chunk counts as failed when syncChunk() returns false or throws a
     * RuntimeException; both cases add a message to `errors`. A progress line
     * is echoed after every BATCH_SIZE successfully synced chunks.
     *
     * @return array{synced: int, failed: int, errors: array}
     */
    public function syncAllPending(int $limit = Constants::DEFAULT_LIMIT): array
    {
        $this->ensureCollection();

        $synced = 0;
        $failed = 0;
        $errors = [];

        foreach ($this->getUnsyncedChunks($limit) as $pending) {
            try {
                $ok = $this->syncChunk((int) $pending['id']);
            } catch (\RuntimeException $e) {
                $failed++;
                $errors[] = "Chunk #{$pending['id']}: " . $e->getMessage();
                continue;
            }

            if (!$ok) {
                $failed++;
                $errors[] = "Chunk #{$pending['id']}: Sync failed";
                continue;
            }

            $synced++;
            if ($synced % self::BATCH_SIZE === 0) {
                echo "Synced {$synced} chunks...\n";
            }
        }

        return ['synced' => $synced, 'failed' => $failed, 'errors' => $errors];
    }

    /**
     * Syncs all chunks (re-sync).
*
     * Every analyzed chunk is sent through syncChunk() again. A chunk counts
     * as failed when syncChunk() returns false or throws a RuntimeException;
     * both cases add a message to `errors`. A progress line is echoed after
     * every BATCH_SIZE successfully synced chunks.
     *
     * @return array{synced: int, failed: int, errors: list<string>}
     */
    public function syncAll(): array
    {
        $this->ensureCollection();

        $results = ['synced' => 0, 'failed' => 0, 'errors' => []];

        foreach ($this->getAllAnalyzedChunks() as $chunk) {
            try {
                if ($this->syncChunk((int) $chunk['id'])) {
                    $results['synced']++;
                    if ($results['synced'] % self::BATCH_SIZE === 0) {
                        echo "Synced {$results['synced']} chunks...\n";
                    }
                } else {
                    $results['failed']++;
                    // Record the failure so every failed chunk is identifiable
                    // in `errors`, matching what syncAllPending() reports.
                    $results['errors'][] = "Chunk #{$chunk['id']}: Sync failed";
                }
            } catch (\RuntimeException $e) {
                $results['failed']++;
                $results['errors'][] = "Chunk #{$chunk['id']}: " . $e->getMessage();
            }
        }

        return $results;
    }

    /**
     * Gets collection statistics as reported by the Qdrant client.
     *
     * @return array{points_count: int, status: string}|null
     */
    public function getStats(): ?array
    {
        return $this->qdrant->getCollectionStats(self::COLLECTION);
    }

    /**
     * Builds the text that is sent to the embedding model for a chunk.
     *
     * The text consists of labelled sections separated by blank lines:
     * document title, heading path, taxonomy path, keywords and content.
     * The optional sections are left out when their decoded list is empty.
     * The sanitized content is cut to 1000 characters and the assembled text
     * to 1800 characters; a cut is marked with a trailing '...'.
     *
     * @param array<string, mixed> $chunk Row from dokumentation_chunks
     * @param array<string, mixed> $doc   Row from dokumentation; may be empty
     */
    private function prepareTextForEmbedding(array $chunk, array $doc): string
    {
        $maxContentLength = 1000;
        $maxTextLength = 1800;

        $parts = ['Dokument: ' . ($doc['title'] ?? '')];

        $headingPath = $this->decodeJsonArray($chunk['heading_path'] ?? null);
        if (!empty($headingPath)) {
            $parts[] = 'Abschnitt: ' . implode(' > ', $headingPath);
        }

        $taxonomy = $this->decodeJsonArray($chunk['taxonomy_path'] ?? null);
        if (!empty($taxonomy)) {
            $parts[] = 'Kategorie: ' . implode(' > ', $taxonomy);
        }

        $keywords = $this->decodeJsonArray($chunk['keywords'] ?? null);
        if (!empty($keywords)) {
            $parts[] = 'Keywords: ' . implode(', ', $keywords);
        }

        // Prefer the cleaned content and fall back to the raw content.
        $content = $this->sanitizeForEmbedding($chunk['content_clean'] ?? $chunk['content'] ?? '');
        if (mb_strlen($content) > $maxContentLength) {
            $content = mb_substr($content, 0, $maxContentLength) . '...';
        }
        $parts[] = 'Inhalt: ' . $content;

        $text = implode("\n\n", $parts);
        if (mb_strlen($text) > $maxTextLength) {
            // Long metadata sections can still push the text over the limit.
            $text = mb_substr($text, 0, $maxTextLength) . '...';
        }

        return $text;
    }

    /**
     * Sanitizes text for embedding by removing problematic characters.
*
     * Malformed UTF-8 is repaired before any pattern runs. preg_replace()
     * returns null for a `/u` pattern on malformed input, so without the
     * repair the filters below would silently leave the text untouched.
     *
     * After filtering, runs of spaces/tabs collapse to one space, three or
     * more consecutive newlines collapse to two, and the result is trimmed.
     */
    private function sanitizeForEmbedding(string $text): string
    {
        // Re-encoding UTF-8 to UTF-8 substitutes invalid byte sequences, which
        // guarantees that the `/u` patterns below can actually match.
        $text = mb_convert_encoding($text, 'UTF-8', 'UTF-8');

        // U+2500-U+257F (Box Drawing), U+2580-U+259F (Block Elements) and
        // U+25A0-U+25FF (Geometric Shapes) are replaced by a space each.
        $text = preg_replace('/[\x{2500}-\x{257F}]/u', ' ', $text) ?? $text;
        $text = preg_replace('/[\x{2580}-\x{259F}]/u', ' ', $text) ?? $text;
        $text = preg_replace('/[\x{25A0}-\x{25FF}]/u', ' ', $text) ?? $text;

        // Drop control characters, but keep tab, line feed and carriage return.
        $text = preg_replace('/[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]/u', '', $text) ?? $text;

        // Normalize the whitespace left behind by the replacements above.
        $text = preg_replace('/[ \t]+/', ' ', $text) ?? $text;
        $text = preg_replace('/\n{3,}/', "\n\n", $text) ?? $text;

        return trim($text);
    }

    /**
     * Builds the payload that is stored next to the vector in Qdrant.
     *
     * The content preview is the sanitized content, cut to 300 characters
     * with a trailing '...' when it is longer.
     *
     * @param array<string, mixed> $chunk Row from dokumentation_chunks
     * @param array<string, mixed> $doc   Row from dokumentation; may be empty
     * @return array<string, mixed>
     */
    private function buildPayload(array $chunk, array $doc): array
    {
        $maxPreviewLength = 300;

        // Prefer the cleaned content and fall back to the raw content.
        $content = $this->sanitizeForEmbedding($chunk['content_clean'] ?? $chunk['content'] ?? '');
        $preview = mb_strlen($content) > $maxPreviewLength
            ? mb_substr($content, 0, $maxPreviewLength) . '...'
            : $content;

        return [
            'chunk_id' => (int) $chunk['id'],
            'doc_id' => (int) $chunk['dokumentation_id'],
            'chunk_index' => (int) $chunk['chunk_index'],
            'path' => $doc['path'] ?? '',
            'title' => $doc['title'] ?? '',
            'content_preview' => $preview,
            'heading_path' => $this->decodeJsonArray($chunk['heading_path'] ?? null),
            'taxonomy_category' => $chunk['taxonomy_category'] ?? null,
            'taxonomy' => $this->decodeJsonArray($chunk['taxonomy_path'] ?? null),
            'entities' => $this->decodeJsonArray($chunk['entities'] ?? null),
            'keywords' => $this->decodeJsonArray($chunk['keywords'] ?? null),
            'token_count' => (int) ($chunk['token_count'] ?? 0),
        ];
    }

    /**
     * Stores the Qdrant point id on the chunk row.
     */
    private function updateQdrantId(int $chunkId, string $qdrantId): void
    {
        $stmt = $this->pdo->prepare('UPDATE dokumentation_chunks SET qdrant_id = :qid WHERE id = :id');
        $stmt->execute(['id' => $chunkId, 'qid' => $qdrantId]);
    }

    /**
     * Gets a chunk by ID.
*
     * @return array<string, mixed>|null The row as an associative array, or null when no row matches
     */
    private function getChunk(int $id): ?array
    {
        $query = $this->pdo->prepare('SELECT * FROM dokumentation_chunks WHERE id = :id');
        $query->execute(['id' => $id]);

        $row = $query->fetch(PDO::FETCH_ASSOC);
        if ($row === false) {
            return null;
        }

        return $row;
    }

    /**
     * Gets a document by ID.
     *
     * @return array<string, mixed> The row as an associative array, or an empty array when no row matches
     */
    private function getDocument(int $id): array
    {
        $query = $this->pdo->prepare('SELECT * FROM dokumentation WHERE id = :id');
        $query->execute(['id' => $id]);

        $row = $query->fetch(PDO::FETCH_ASSOC);
        if ($row === false) {
            return [];
        }

        return $row;
    }

    /**
     * Gets unsynced chunks: analysis completed, but no qdrant_id stored yet.
     *
     * Rows are ordered by document and chunk index; at most $limit are returned.
     *
     * @return array<int, array<string, mixed>>
     */
    private function getUnsyncedChunks(int $limit): array
    {
        $sql = ' SELECT * FROM dokumentation_chunks'
            . " WHERE analysis_status = 'completed' AND qdrant_id IS NULL"
            . ' ORDER BY dokumentation_id, chunk_index'
            . ' LIMIT :limit ';

        $query = $this->pdo->prepare($sql);
        // Bound explicitly as an integer; execute() with an array would bind it as a string.
        $query->bindValue('limit', $limit, PDO::PARAM_INT);
        $query->execute();

        return $query->fetchAll(PDO::FETCH_ASSOC);
    }

    /**
     * Gets all chunks whose analysis is completed, ordered by document and chunk index.
     *
     * @return array<int, array<string, mixed>>
     */
    private function getAllAnalyzedChunks(): array
    {
        $sql = ' SELECT * FROM dokumentation_chunks'
            . " WHERE analysis_status = 'completed'"
            . ' ORDER BY dokumentation_id, chunk_index ';

        return $this->pdo->query($sql)->fetchAll(PDO::FETCH_ASSOC);
    }
}