Content Pipeline
System zur Verarbeitung von Dokumenten aus Nextcloud mit konfigurierbaren Pipeline-Stufen.
Übersicht
Die Content Pipeline verarbeitet Dokumente in mehreren Schritten:
- Import - Dokumente aus Nextcloud scannen und in Queue aufnehmen
- Extract - Text aus PDF, DOCX, PPTX, MD, TXT extrahieren
- Chunk - Semantisches Chunking mit Struktur-Erhaltung
- Embed - Vektorisierung via Ollama → Qdrant
- Analyze - Entitäten, Relationen, Taxonomie extrahieren
Sub-Seiten
- /content-pipeline/import - Import-Konfiguration
Technische Basis
| Komponente | Technologie |
|---|---|
| Quelle | Nextcloud /var/www/nextcloud/data/root/files/Documents |
| Datenbank | MariaDB ki_content |
| Vektoren | Qdrant localhost:6333 |
| Embedding | Ollama mxbai-embed-large (1024 dims) |
| Analyse | Anthropic Claude / Ollama |
Import Pipeline - Planungsdokument
1. Bestehendes System (IST-Analyse)
1.1 Python-Skripte unter /var/www/scripts/pipeline/
| Datei | Funktion | Kernlogik |
|---|---|---|
| pipeline.py | Orchestrator | CLI mit scan, process, embed, all, file, status |
| config.py | Konfiguration | Hardcoded Pfade, Modelle, Limits |
| detect.py | Datei-Erkennung | Scan Nextcloud, Hash-Vergleich, Queue |
| extract.py | Text-Extraktion | PDF (OCR), DOCX, PPTX, MD, TXT |
| chunk.py | Chunking | Semantisch nach Typ, Heading-Pfad |
| embed.py | Embedding | Ollama → Qdrant |
| analyze.py | Semantische Analyse | Entitäten, Relationen, Taxonomie |
| db.py | Datenbank-Wrapper | CRUD für documents, chunks, queue |
1.2 Datenfluss
Nextcloud (Files)
↓
[detect.py] Scan + Hash
↓
documents (DB) status=pending
↓
[extract.py] PDF/DOCX/... → Text
↓
[chunk.py] Semantisches Chunking
↓
chunks (DB) + heading_path, metadata
↓
[embed.py] Ollama mxbai-embed-large
↓
Qdrant (Vektoren) + chunks.qdrant_id
↓
[analyze.py] Entity/Relation/Taxonomy
↓
entities, entity_relations, chunk_entities,
chunk_taxonomy, chunk_semantics (DB)
1.3 Aktuelle Konfiguration (config.py)
| Parameter | Wert |
|---|---|
| NEXTCLOUD_PATH | /var/www/nextcloud/data/root/files/Documents |
| SUPPORTED_EXTENSIONS | .pdf, .pptx, .docx, .md, .txt |
| QDRANT_HOST | localhost:6333 |
| QDRANT_COLLECTIONS | documents, mail, entities |
| OLLAMA_HOST | localhost:11434 |
| EMBED_MODEL | mxbai-embed-large (1024 dims) |
| MIN_CHUNK_SIZE | 100 Zeichen |
| MAX_CHUNK_SIZE | 2000 Zeichen |
| CHUNK_OVERLAP | 10% |
1.4 Datenbank-Struktur (ki_content)
documents
id INT PK AUTO
source_path VARCHAR(500) UNIQUE
folder_path VARCHAR(500)
filename VARCHAR(255)
mime_type VARCHAR(100)
file_hash VARCHAR(64) - SHA256 für Änderungserkennung
file_size INT
language VARCHAR(10) DEFAULT 'de'
imported_at DATETIME DEFAULT CURRENT_TIMESTAMP
processed_at DATETIME
status ENUM('pending','importing','imported','chunking','chunked',
'embedding','embedded','enriching','enriched','processing','done','error')
semantic_status ENUM('pending','processing','partial','complete','error','skipped')
error_message TEXT
authority_score FLOAT DEFAULT 0.5
chunks
id INT PK AUTO
document_id INT FK
page_id INT FK
chunk_index INT
content TEXT
token_count INT
heading_path JSON - ["H1", "H2", ...]
metadata JSON
qdrant_id VARCHAR(36) - UUID in Qdrant
status ENUM('created','embedding','embedded','error','deprecated')
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
section_id INT
entities
id INT PK AUTO
name VARCHAR(255)
type ENUM('PERSON','ORGANIZATION','LOCATION','EVENT','ROLE','TOOL',
'ARTIFACT','METAPHOR','METHOD','THEORY','MODEL','PRINCIPLE',
'DATE_TIME','QUANTITY_MEASURE','LAW_REGULATION','DIAGNOSIS_CONDITION',
'SYMPTOM_SIGN','ASSESSMENT_INSTRUMENT','PUBLICATION_WORK',
'DEMOGRAPHIC_GROUP','VALUE_NORM_RIGHT_DUTY','CONCEPT',
'INTERVENTION_EXERCISE','FACILITATION_FORMAT','PROCESS_PHASE_STEP',
'QUESTION_TYPE','EMOTION_FEELING','NEED_MOTIVE','TRAIT_ATTRIBUTE',
'RELATIONSHIP_TYPE','SYSTEM_CONTEXT','SYSTEM_TYPE','DIMENSION_AXIS',
'TYPOLOGY_CLASS','ORGANIZATIONAL_PROPERTY','FRAME_CONDITION_RESOURCE',
'SOURCE_CITATION_STUDY','QUOTE_STATEMENT','COMMUNICATION_PATTERN',
'RULE_SET_PROTOCOL','CONTACT_IDENTITY','OTHER') DEFAULT 'OTHER'
description TEXT
canonical_name VARCHAR(255) - Deduplizierung
status ENUM('extracted','normalized','validated','deprecated','merged')
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
updated_at DATETIME ON UPDATE CURRENT_TIMESTAMP
name_lower VARCHAR(255) STORED GENERATED - Lowercase für Suche
entity_relations
id INT PK AUTO source_entity_id INT FK target_entity_id INT FK relation_type VARCHAR(100) - z.B. DEVELOPED_BY, RELATED_TO strength FLOAT DEFAULT 1 context TEXT chunk_id INT FK - Herkunft created_at DATETIME
taxonomy_terms
id INT PK AUTO name VARCHAR(255) slug VARCHAR(255) UNIQUE parent_id INT FK (self-ref) description TEXT depth INT DEFAULT 0 path VARCHAR(1000) - z.B. "/Methoden/Systemisch" created_at DATETIME
chunk_entities
id INT PK AUTO chunk_id INT FK → chunks(id) entity_id INT FK relevance_score FLOAT DEFAULT 1 mention_count INT DEFAULT 1 UNIQUE KEY (chunk_id, entity_id)
chunk_taxonomy
id INT PK AUTO
chunk_id INT FK → chunks(id)
taxonomy_term_id INT FK → taxonomy_terms(id)
confidence FLOAT DEFAULT 1
source ENUM('auto','manual') DEFAULT 'auto'
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
UNIQUE KEY (chunk_id, taxonomy_term_id)
chunk_semantics
id INT PK AUTO
chunk_id INT FK → chunks(id) UNIQUE
summary TEXT
keywords JSON
sentiment ENUM('positive','negative','neutral','mixed') DEFAULT 'neutral'
topics JSON
language VARCHAR(10) DEFAULT 'de'
statement_form ENUM('assertion','question','command','conditional')
intent ENUM('explain','argue','define','compare','exemplify','warn','instruct')
frame ENUM('theoretical','practical','historical','methodological','critical')
is_negated TINYINT(1) DEFAULT 0
discourse_role ENUM('thesis','evidence','example','counter','summary','definition')
analyzed_at DATETIME
analysis_model VARCHAR(100)
prompt_id INT
prompt_version VARCHAR(20)
1.5 Ursprünglich geplante Tabellen (ersetzt)
Die ursprünglich im Code vorgesehenen Tabellen wurden durch das Pipeline-Management-System ersetzt:
| Ursprünglich geplant | Ersetzt durch | Status |
|---|---|---|
| processing_queue | pipeline_queue | Implementiert |
| processing_log | pipeline_runs | Implementiert |
| (neu) | pipeline_configs | Implementiert |
| (neu) | pipeline_steps | Implementiert |
Alle vier Pipeline-Tabellen sind Teil des SOLL-Konzepts (Abschnitt 2) und wurden vollständig implementiert.
2. SOLL-Konzept (GUI)
2.1 Anforderungen
- Visuelle Darstellung der Pipeline-Schritte
- Konfigurierbare Parameter pro Schritt
- Unterstützung mehrerer Pipeline-Definitionen
- Status-Übersicht für Dokumente
- Manuelle Trigger-Möglichkeit
2.2 Tabelle: pipeline_configs (ki_content) ✓
id INT PK AUTO
name VARCHAR(100) UNIQUE - z.B. "Standard", "Nur-Embedding"
description TEXT
is_default BOOLEAN DEFAULT FALSE
source_path VARCHAR(500) - Nextcloud-Ordner
extensions JSON - [".pdf", ".docx", ...]
steps JSON - Aktivierte Steps + Reihenfolge
created_at, updated_at DATETIME
Beispiel steps:
[
{"step": "detect", "enabled": true, "order": 1},
{"step": "extract", "enabled": true, "order": 2, "config": {"ocr": true}},
{"step": "chunk", "enabled": true, "order": 3, "config": {"min": 100, "max": 2000, "overlap": 0.1}},
{"step": "embed", "enabled": true, "order": 4, "config": {"model": "mxbai-embed-large", "collection": "documents"}},
{"step": "analyze", "enabled": false, "order": 5}
]
2.3 Tabelle: pipeline_steps (ki_content) ✓
id INT PK AUTO
pipeline_id INT FK
step_type ENUM('detect','extract','chunk','embed','analyze')
config JSON - Step-spezifische Einstellungen
sort_order INT
enabled BOOLEAN DEFAULT TRUE
created_at, updated_at DATETIME
2.4 Tabelle: pipeline_runs (ki_content) ✓
id INT PK AUTO
pipeline_id INT FK
status ENUM('pending','running','completed','failed','cancelled')
started_at DATETIME
completed_at DATETIME
documents_processed INT DEFAULT 0
documents_failed INT DEFAULT 0
error_log TEXT
created_at DATETIME
2.5 Tabelle: pipeline_queue (ki_content) ✓
id INT PK AUTO
pipeline_run_id INT FK
document_id INT FK
status ENUM('pending','processing','done','error')
step_index INT
error_message TEXT
started_at, completed_at DATETIME
2.6 URL-Struktur
/content-pipeline - Übersicht aller Pipelines
/content-pipeline/import - Import-Konfiguration (erste Seite)
/content-pipeline/{id} - Pipeline-Detail
/content-pipeline/{id}/run - Pipeline starten (POST)
/content-pipeline/{id}/status - Laufender Status
/content-pipeline/new - Neue Pipeline erstellen
2.7 View-Komponenten
┌─────────────────────────────────────────────────────────┐ │ Content Pipeline: Standard [Run] │ ├─────────────────────────────────────────────────────────┤ │ │ │ ┌──────┐ ┌─────────┐ ┌───────┐ ┌───────┐ ┌────┐│ │ │Detect│ → │ Extract │ → │ Chunk │ → │ Embed │ → │Anal││ │ │ ✓ │ │ ✓ │ │ ✓ │ │ ✓ │ │ ✗ ││ │ └──────┘ └─────────┘ └───────┘ └───────┘ └────┘│ │ │ │ Quelle: /Documents │ │ Formate: .pdf, .docx, .pptx, .md, .txt │ │ │ │ Letzte Ausführung: 2025-12-20 14:30 │ │ Verarbeitet: 2 Dokumente, 6 Chunks │ └─────────────────────────────────────────────────────────┘
3. Implementierungsplan
Phase 1: Tabellen + Repository ✓
- DDL für pipeline_configs, pipeline_steps, pipeline_runs, pipeline_queue
- Domain\Repository\PipelineRepositoryInterface
- Infrastructure\Persistence\PipelineRepository
Phase 2: Controller + Views
- Controller\ContentPipelineController
- View\content-pipeline\index.php (Übersicht)
- View\content-pipeline\show.php (Detail + Steps)
- View\content-pipeline\form.php (Create/Edit)
Phase 3: Integration Python
- Pipeline-Aufruf via Bash (mit Config-ID)
- Status-Polling via AJAX
- Log-Streaming
Prinzipien
- DRY: Wiederverwendung bestehender Python-Logik
- KISS: Minimale neue Tabellen, JSON für flexible Config
- SRP: Repository nur für Pipelines, Controller nur für HTTP
- YAGNI: Keine Features die nicht gefordert wurden