Protokoll #15850

ID: 15850
Zeitstempel: 2025-12-25 16:57:20.797730
Client: root
IP: 145.224.72.140
Modell: claude-sonnet-4-20250514
Status: completed
Tokens: 8,953 (Input: 2,158, Output: 6,795)
Dauer: 111 ms
Request-Zeit: 2025-12-25 16:57:20.797730
Response-Zeit: 2025-12-25 16:57:20.908849

Request

{
    "event": "PreToolUse",
    "tool_name": "Edit",
    "tool_input": {
        "file_path": "\/var\/www\/scripts\/pipeline\/pipeline.py",
        "old_string": "def process_file(file_path, progress=None):\n    \"\"\"Process a single file through the pipeline.\"\"\"\n    db.log(\"INFO\", f\"Processing: {file_path}\")\n    file_name = Path(file_path).name\n\n    if progress:\n        progress.update_document(file_name)\n\n    # Check if cancelled before starting\n    if progress and progress.is_cancelled():\n        return \"cancelled\", 0, 0\n\n    # Extract text\n    if progress:\n        progress.update_step(\"extract\")\n        progress.add_log(f\"Extrahiere Text: {file_name}\")\n\n    extraction = extract(file_path)\n    if not extraction[\"success\"]:\n        db.log(\"ERROR\", f\"Extraction failed: {extraction.get('error')}\")\n        if progress:\n            progress.add_log(\"FEHLER: Extraktion fehlgeschlagen\")\n        return False, 0, 0\n\n    # Check if cancelled after extraction\n    if progress and progress.is_cancelled():\n        return \"cancelled\", 0, 0\n\n    # Get page count for PDFs\n    total_pages = 0\n    if extraction[\"file_type\"] == \".pdf\" and isinstance(extraction.get(\"content\"), list):\n        total_pages = len(extraction[\"content\"])\n        if progress:\n            progress.add_log(f\"{total_pages} Seiten extrahiert\")\n\n    # Get document info\n    file_stat = os.stat(file_path)\n\n    import hashlib\n\n    with open(file_path, \"rb\") as f:\n        file_hash = hashlib.sha256(f.read()).hexdigest()\n\n    # Insert document\n    doc_id = db.insert_document(\n        file_path=file_path,\n        title=file_name,\n        file_type=extraction[\"file_type\"],\n        file_size=file_stat.st_size,\n        file_hash=file_hash,\n    )\n    db.log(\"INFO\", f\"Created document: {doc_id}\")\n\n    # Vision analysis for PDFs\n    if extraction[\"file_type\"] == \".pdf\":\n        if progress:\n            progress.update_step(\"vision\")\n            progress.add_log(\"Vision-Analyse gestartet...\")\n\n        db.log(\"INFO\", f\"Running vision analysis for 
document {doc_id}\")\n        vision_config = {\n            \"model\": \"llama3.2-vision:11b\",\n            \"store_images\": True,\n            \"detect_images\": True,\n            \"detect_charts\": True,\n            \"detect_tables\": True,\n        }\n        vision_result = run_vision_step(doc_id, file_path, vision_config, progress=progress)\n        if vision_result[\"success\"]:\n            db.log(\"INFO\", f\"Vision: {vision_result['pages_analyzed']}\/{vision_result['pages_total']} pages analyzed\")\n            if progress:\n                progress.add_log(f\"Vision: {vision_result['pages_analyzed']} Seiten analysiert\")\n        else:\n            db.log(\"WARNING\", f\"Vision analysis failed: {vision_result.get('error')}\")\n\n        # Check if cancelled after vision\n        if progress and progress.is_cancelled():\n            return \"cancelled\", 0, 0\n\n    # Chunk content\n    if progress:\n        progress.update_step(\"chunk\")\n        if total_pages > 0:\n            progress.add_log(f\"Erstelle Chunks aus {total_pages} Seiten...\")\n        else:\n            progress.add_log(\"Erstelle Chunks...\")\n\n    chunks = chunk_by_structure(extraction)\n    db.log(\"INFO\", f\"Created {len(chunks)} chunks\")\n\n    # Store chunks\n    for i, chunk in enumerate(chunks):\n        chunk_id = db.insert_chunk(\n            doc_id=doc_id,\n            chunk_index=i,\n            content=chunk[\"content\"],\n            heading_path=json.dumps(chunk.get(\"heading_path\", [])),\n            position_start=chunk.get(\"position_start\", 0),\n            position_end=chunk.get(\"position_end\", 0),\n            metadata=json.dumps(chunk.get(\"metadata\", {})),\n        )\n        chunk[\"db_id\"] = chunk_id\n\n    if progress:\n        progress.add_log(\n            f\"{len(chunks)} Chunks erstellt (aus {total_pages} Seiten)\"\n            if total_pages > 0\n            else f\"{len(chunks)} Chunks erstellt\"\n        )\n\n    # Check if cancelled after 
chunking\n    if progress and progress.is_cancelled():\n        return \"cancelled\", len(chunks), 0\n\n    # Enrich chunks with vision context (for PDFs)\n    if extraction[\"file_type\"] == \".pdf\":\n        if progress:\n            progress.update_step(\"enrich\")\n\n        db.log(\"INFO\", f\"Running vision enrichment for document {doc_id}\")\n        enrich_result = run_enrichment_step(doc_id)\n        if enrich_result[\"success\"]:\n            db.log(\"INFO\", f\"Enrichment: {enrich_result['enriched']}\/{enrich_result['total_chunks']} chunks enriched\")\n        else:\n            db.log(\"WARNING\", f\"Enrichment failed: {enrich_result.get('error')}\")\n\n        # Check if cancelled after enrichment\n        if progress and progress.is_cancelled():\n            return \"cancelled\", len(chunks), 0\n\n    # Generate embeddings\n    if progress:\n        progress.update_step(\"embed\")\n        progress.add_log(f\"Erstelle Embeddings für {len(chunks)} Chunks...\")\n\n    embedded = embed_chunks(chunks, doc_id, file_name, file_path, progress=progress)\n    db.log(\"INFO\", f\"Embedded {embedded}\/{len(chunks)} chunks\")\n\n    if progress:\n        progress.add_log(f\"{embedded} Embeddings erstellt\")\n\n    # Check if cancelled after embedding\n    if progress and progress.is_cancelled():\n        return \"cancelled\", len(chunks), embedded\n\n    # Semantic analysis\n    if progress:\n        progress.update_step(\"analyze\")\n\n    full_text = get_full_text(extraction)\n    analysis = analyze_document(doc_id, full_text, progress=progress)\n    db.log(\"INFO\", f\"Analysis complete: {analysis}\")\n\n    # Update status\n    db.update_document_status(doc_id, \"done\")\n\n    if progress:\n        progress.add_log(f\"Fertig: {file_name}\")\n\n    return True, len(chunks), embedded",
        "new_string": "def process_file(file_path, progress=None):\n    \"\"\"Process a single file through the pipeline.\"\"\"\n    file_name = Path(file_path).name\n\n    if progress:\n        progress.update_document(file_name)\n\n    # Initialize pipeline steps\n    extract_step = ExtractionStep(db, progress)\n    load_step = LoadStep(db, progress)\n    transform_step = TransformationStep(db, progress)\n    embed_step = EmbeddingStep(db, progress)\n\n    # Check if cancelled before starting\n    if progress and progress.is_cancelled():\n        return \"cancelled\", 0, 0\n\n    # Step 1: Extract\n    extract_result = extract_step.execute(file_path)\n    if not extract_result[\"success\"]:\n        if extract_result.get(\"error\") == \"cancelled\":\n            return \"cancelled\", 0, 0\n        return False, 0, 0\n\n    extraction = extract_result[\"extraction\"]\n    file_info = extract_result[\"file_info\"]\n    total_pages = extract_result.get(\"total_pages\", 0)\n\n    # Check if cancelled after extraction\n    if progress and progress.is_cancelled():\n        return \"cancelled\", 0, 0\n\n    # Step 2: Load document\n    doc_id = load_step.create_document(file_info)\n\n    # Step 3: Vision analysis (PDFs only)\n    if file_info[\"type\"] == \".pdf\":\n        transform_step.execute_vision(doc_id, file_path, file_info[\"type\"])\n\n        # Check if cancelled after vision\n        if progress and progress.is_cancelled():\n            return \"cancelled\", 0, 0\n\n    # Step 4: Chunking\n    chunks = transform_step.execute_chunking(extraction, total_pages)\n\n    # Step 5: Store chunks\n    chunks = load_step.store_chunks(doc_id, chunks)\n\n    # Check if cancelled after chunking\n    if progress and progress.is_cancelled():\n        return \"cancelled\", len(chunks), 0\n\n    # Step 6: Enrichment (PDFs only)\n    if file_info[\"type\"] == \".pdf\":\n        transform_step.execute_enrichment(doc_id, file_info[\"type\"])\n\n        # Check if cancelled 
after enrichment\n        if progress and progress.is_cancelled():\n            return \"cancelled\", len(chunks), 0\n\n    # Step 7: Embeddings\n    embedded = embed_step.execute(chunks, doc_id, file_name, file_path)\n\n    # Check if cancelled after embedding\n    if progress and progress.is_cancelled():\n        return \"cancelled\", len(chunks), embedded\n\n    # Step 8: Semantic analysis\n    full_text = extract_step.get_full_text_from_extraction(extraction)\n    transform_step.execute_analysis(doc_id, full_text)\n\n    # Step 9: Update status\n    load_step.update_document_status(doc_id, \"done\")\n\n    if progress:\n        progress.add_log(f\"Fertig: {file_name}\")\n\n    return True, len(chunks), embedded"
    }
}

Response

{
    "tool_response": {
        "filePath": "\/var\/www\/scripts\/pipeline\/pipeline.py",
        "oldString": "def process_file(file_path, progress=None):\n    \"\"\"Process a single file through the pipeline.\"\"\"\n    db.log(\"INFO\", f\"Processing: {file_path}\")\n    file_name = Path(file_path).name\n\n    if progress:\n        progress.update_document(file_name)\n\n    # Check if cancelled before starting\n    if progress and progress.is_cancelled():\n        return \"cancelled\", 0, 0\n\n    # Extract text\n    if progress:\n        progress.update_step(\"extract\")\n        progress.add_log(f\"Extrahiere Text: {file_name}\")\n\n    extraction = extract(file_path)\n    if not extraction[\"success\"]:\n        db.log(\"ERROR\", f\"Extraction failed: {extraction.get('error')}\")\n        if progress:\n            progress.add_log(\"FEHLER: Extraktion fehlgeschlagen\")\n        return False, 0, 0\n\n    # Check if cancelled after extraction\n    if progress and progress.is_cancelled():\n        return \"cancelled\", 0, 0\n\n    # Get page count for PDFs\n    total_pages = 0\n    if extraction[\"file_type\"] == \".pdf\" and isinstance(extraction.get(\"content\"), list):\n        total_pages = len(extraction[\"content\"])\n        if progress:\n            progress.add_log(f\"{total_pages} Seiten extrahiert\")\n\n    # Get document info\n    file_stat = os.stat(file_path)\n\n    import hashlib\n\n    with open(file_path, \"rb\") as f:\n        file_hash = hashlib.sha256(f.read()).hexdigest()\n\n    # Insert document\n    doc_id = db.insert_document(\n        file_path=file_path,\n        title=file_name,\n        file_type=extraction[\"file_type\"],\n        file_size=file_stat.st_size,\n        file_hash=file_hash,\n    )\n    db.log(\"INFO\", f\"Created document: {doc_id}\")\n\n    # Vision analysis for PDFs\n    if extraction[\"file_type\"] == \".pdf\":\n        if progress:\n            progress.update_step(\"vision\")\n            progress.add_log(\"Vision-Analyse gestartet...\")\n\n        db.log(\"INFO\", f\"Running vision analysis for 
document {doc_id}\")\n        vision_config = {\n            \"model\": \"llama3.2-vision:11b\",\n            \"store_images\": True,\n            \"detect_images\": True,\n            \"detect_charts\": True,\n            \"detect_tables\": True,\n        }\n        vision_result = run_vision_step(doc_id, file_path, vision_config, progress=progress)\n        if vision_result[\"success\"]:\n            db.log(\"INFO\", f\"Vision: {vision_result['pages_analyzed']}\/{vision_result['pages_total']} pages analyzed\")\n            if progress:\n                progress.add_log(f\"Vision: {vision_result['pages_analyzed']} Seiten analysiert\")\n        else:\n            db.log(\"WARNING\", f\"Vision analysis failed: {vision_result.get('error')}\")\n\n        # Check if cancelled after vision\n        if progress and progress.is_cancelled():\n            return \"cancelled\", 0, 0\n\n    # Chunk content\n    if progress:\n        progress.update_step(\"chunk\")\n        if total_pages > 0:\n            progress.add_log(f\"Erstelle Chunks aus {total_pages} Seiten...\")\n        else:\n            progress.add_log(\"Erstelle Chunks...\")\n\n    chunks = chunk_by_structure(extraction)\n    db.log(\"INFO\", f\"Created {len(chunks)} chunks\")\n\n    # Store chunks\n    for i, chunk in enumerate(chunks):\n        chunk_id = db.insert_chunk(\n            doc_id=doc_id,\n            chunk_index=i,\n            content=chunk[\"content\"],\n            heading_path=json.dumps(chunk.get(\"heading_path\", [])),\n            position_start=chunk.get(\"position_start\", 0),\n            position_end=chunk.get(\"position_end\", 0),\n            metadata=json.dumps(chunk.get(\"metadata\", {})),\n        )\n        chunk[\"db_id\"] = chunk_id\n\n    if progress:\n        progress.add_log(\n            f\"{len(chunks)} Chunks erstellt (aus {total_pages} Seiten)\"\n            if total_pages > 0\n            else f\"{len(chunks)} Chunks erstellt\"\n        )\n\n    # Check if cancelled after 
chunking\n    if progress and progress.is_cancelled():\n        return \"cancelled\", len(chunks), 0\n\n    # Enrich chunks with vision context (for PDFs)\n    if extraction[\"file_type\"] == \".pdf\":\n        if progress:\n            progress.update_step(\"enrich\")\n\n        db.log(\"INFO\", f\"Running vision enrichment for document {doc_id}\")\n        enrich_result = run_enrichment_step(doc_id)\n        if enrich_result[\"success\"]:\n            db.log(\"INFO\", f\"Enrichment: {enrich_result['enriched']}\/{enrich_result['total_chunks']} chunks enriched\")\n        else:\n            db.log(\"WARNING\", f\"Enrichment failed: {enrich_result.get('error')}\")\n\n        # Check if cancelled after enrichment\n        if progress and progress.is_cancelled():\n            return \"cancelled\", len(chunks), 0\n\n    # Generate embeddings\n    if progress:\n        progress.update_step(\"embed\")\n        progress.add_log(f\"Erstelle Embeddings für {len(chunks)} Chunks...\")\n\n    embedded = embed_chunks(chunks, doc_id, file_name, file_path, progress=progress)\n    db.log(\"INFO\", f\"Embedded {embedded}\/{len(chunks)} chunks\")\n\n    if progress:\n        progress.add_log(f\"{embedded} Embeddings erstellt\")\n\n    # Check if cancelled after embedding\n    if progress and progress.is_cancelled():\n        return \"cancelled\", len(chunks), embedded\n\n    # Semantic analysis\n    if progress:\n        progress.update_step(\"analyze\")\n\n    full_text = get_full_text(extraction)\n    analysis = analyze_document(doc_id, full_text, progress=progress)\n    db.log(\"INFO\", f\"Analysis complete: {analysis}\")\n\n    # Update status\n    db.update_document_status(doc_id, \"done\")\n\n    if progress:\n        progress.add_log(f\"Fertig: {file_name}\")\n\n    return True, len(chunks), embedded",
        "newString": "def process_file(file_path, progress=None):\n    \"\"\"Process a single file through the pipeline.\"\"\"\n    file_name = Path(file_path).name\n\n    if progress:\n        progress.update_document(file_name)\n\n    # Initialize pipeline steps\n    extract_step = ExtractionStep(db, progress)\n    load_step = LoadStep(db, progress)\n    transform_step = TransformationStep(db, progress)\n    embed_step = EmbeddingStep(db, progress)\n\n    # Check if cancelled before starting\n    if progress and progress.is_cancelled():\n        return \"cancelled\", 0, 0\n\n    # Step 1: Extract\n    extract_result = extract_step.execute(file_path)\n    if not extract_result[\"success\"]:\n        if extract_result.get(\"error\") == \"cancelled\":\n            return \"cancelled\", 0, 0\n        return False, 0, 0\n\n    extraction = extract_result[\"extraction\"]\n    file_info = extract_result[\"file_info\"]\n    total_pages = extract_result.get(\"total_pages\", 0)\n\n    # Check if cancelled after extraction\n    if progress and progress.is_cancelled():\n        return \"cancelled\", 0, 0\n\n    # Step 2: Load document\n    doc_id = load_step.create_document(file_info)\n\n    # Step 3: Vision analysis (PDFs only)\n    if file_info[\"type\"] == \".pdf\":\n        transform_step.execute_vision(doc_id, file_path, file_info[\"type\"])\n\n        # Check if cancelled after vision\n        if progress and progress.is_cancelled():\n            return \"cancelled\", 0, 0\n\n    # Step 4: Chunking\n    chunks = transform_step.execute_chunking(extraction, total_pages)\n\n    # Step 5: Store chunks\n    chunks = load_step.store_chunks(doc_id, chunks)\n\n    # Check if cancelled after chunking\n    if progress and progress.is_cancelled():\n        return \"cancelled\", len(chunks), 0\n\n    # Step 6: Enrichment (PDFs only)\n    if file_info[\"type\"] == \".pdf\":\n        transform_step.execute_enrichment(doc_id, file_info[\"type\"])\n\n        # Check if cancelled after 
enrichment\n        if progress and progress.is_cancelled():\n            return \"cancelled\", len(chunks), 0\n\n    # Step 7: Embeddings\n    embedded = embed_step.execute(chunks, doc_id, file_name, file_path)\n\n    # Check if cancelled after embedding\n    if progress and progress.is_cancelled():\n        return \"cancelled\", len(chunks), embedded\n\n    # Step 8: Semantic analysis\n    full_text = extract_step.get_full_text_from_extraction(extraction)\n    transform_step.execute_analysis(doc_id, full_text)\n\n    # Step 9: Update status\n    load_step.update_document_status(doc_id, \"done\")\n\n    if progress:\n        progress.add_log(f\"Fertig: {file_name}\")\n\n    return True, len(chunks), embedded",
        "originalFile": "#!\/usr\/bin\/env python3\n\"\"\"\nKI-System Document Pipeline\nMain orchestration script for document processing.\n\nUsage:\n    python pipeline.py scan      # Scan for new documents\n    python pipeline.py process   # Process queued documents\n    python pipeline.py embed     # Embed pending chunks\n    python pipeline.py all       # Full pipeline run\n    python pipeline.py all --pipeline-id=1 --run-id=5  # With tracking\n    python pipeline.py file <path>  # Process single file\n\"\"\"\n\nimport argparse\nimport os\nimport time\nfrom pathlib import Path\n\nfrom config import MAX_RETRIES, RETRY_BACKOFF_BASE\nfrom db import PipelineProgress, db\nfrom detect import queue_files, scan_directory\nfrom step_embed import EmbeddingStep\nfrom step_extract import ExtractionStep\nfrom step_load import LoadStep\nfrom step_transform import TransformationStep\n\n\ndef process_file(file_path, progress=None):\n    \"\"\"Process a single file through the pipeline.\"\"\"\n    db.log(\"INFO\", f\"Processing: {file_path}\")\n    file_name = Path(file_path).name\n\n    if progress:\n        progress.update_document(file_name)\n\n    # Check if cancelled before starting\n    if progress and progress.is_cancelled():\n        return \"cancelled\", 0, 0\n\n    # Extract text\n    if progress:\n        progress.update_step(\"extract\")\n        progress.add_log(f\"Extrahiere Text: {file_name}\")\n\n    extraction = extract(file_path)\n    if not extraction[\"success\"]:\n        db.log(\"ERROR\", f\"Extraction failed: {extraction.get('error')}\")\n        if progress:\n            progress.add_log(\"FEHLER: Extraktion fehlgeschlagen\")\n        return False, 0, 0\n\n    # Check if cancelled after extraction\n    if progress and progress.is_cancelled():\n        return \"cancelled\", 0, 0\n\n    # Get page count for PDFs\n    total_pages = 0\n    if extraction[\"file_type\"] == \".pdf\" and isinstance(extraction.get(\"content\"), list):\n        total_pages = 
len(extraction[\"content\"])\n        if progress:\n            progress.add_log(f\"{total_pages} Seiten extrahiert\")\n\n    # Get document info\n    file_stat = os.stat(file_path)\n\n    import hashlib\n\n    with open(file_path, \"rb\") as f:\n        file_hash = hashlib.sha256(f.read()).hexdigest()\n\n    # Insert document\n    doc_id = db.insert_document(\n        file_path=file_path,\n        title=file_name,\n        file_type=extraction[\"file_type\"],\n        file_size=file_stat.st_size,\n        file_hash=file_hash,\n    )\n    db.log(\"INFO\", f\"Created document: {doc_id}\")\n\n    # Vision analysis for PDFs\n    if extraction[\"file_type\"] == \".pdf\":\n        if progress:\n            progress.update_step(\"vision\")\n            progress.add_log(\"Vision-Analyse gestartet...\")\n\n        db.log(\"INFO\", f\"Running vision analysis for document {doc_id}\")\n        vision_config = {\n            \"model\": \"llama3.2-vision:11b\",\n            \"store_images\": True,\n            \"detect_images\": True,\n            \"detect_charts\": True,\n            \"detect_tables\": True,\n        }\n        vision_result = run_vision_step(doc_id, file_path, vision_config, progress=progress)\n        if vision_result[\"success\"]:\n            db.log(\"INFO\", f\"Vision: {vision_result['pages_analyzed']}\/{vision_result['pages_total']} pages analyzed\")\n            if progress:\n                progress.add_log(f\"Vision: {vision_result['pages_analyzed']} Seiten analysiert\")\n        else:\n            db.log(\"WARNING\", f\"Vision analysis failed: {vision_result.get('error')}\")\n\n        # Check if cancelled after vision\n        if progress and progress.is_cancelled():\n            return \"cancelled\", 0, 0\n\n    # Chunk content\n    if progress:\n        progress.update_step(\"chunk\")\n        if total_pages > 0:\n            progress.add_log(f\"Erstelle Chunks aus {total_pages} Seiten...\")\n        else:\n            progress.add_log(\"Erstelle 
Chunks...\")\n\n    chunks = chunk_by_structure(extraction)\n    db.log(\"INFO\", f\"Created {len(chunks)} chunks\")\n\n    # Store chunks\n    for i, chunk in enumerate(chunks):\n        chunk_id = db.insert_chunk(\n            doc_id=doc_id,\n            chunk_index=i,\n            content=chunk[\"content\"],\n            heading_path=json.dumps(chunk.get(\"heading_path\", [])),\n            position_start=chunk.get(\"position_start\", 0),\n            position_end=chunk.get(\"position_end\", 0),\n            metadata=json.dumps(chunk.get(\"metadata\", {})),\n        )\n        chunk[\"db_id\"] = chunk_id\n\n    if progress:\n        progress.add_log(\n            f\"{len(chunks)} Chunks erstellt (aus {total_pages} Seiten)\"\n            if total_pages > 0\n            else f\"{len(chunks)} Chunks erstellt\"\n        )\n\n    # Check if cancelled after chunking\n    if progress and progress.is_cancelled():\n        return \"cancelled\", len(chunks), 0\n\n    # Enrich chunks with vision context (for PDFs)\n    if extraction[\"file_type\"] == \".pdf\":\n        if progress:\n            progress.update_step(\"enrich\")\n\n        db.log(\"INFO\", f\"Running vision enrichment for document {doc_id}\")\n        enrich_result = run_enrichment_step(doc_id)\n        if enrich_result[\"success\"]:\n            db.log(\"INFO\", f\"Enrichment: {enrich_result['enriched']}\/{enrich_result['total_chunks']} chunks enriched\")\n        else:\n            db.log(\"WARNING\", f\"Enrichment failed: {enrich_result.get('error')}\")\n\n        # Check if cancelled after enrichment\n        if progress and progress.is_cancelled():\n            return \"cancelled\", len(chunks), 0\n\n    # Generate embeddings\n    if progress:\n        progress.update_step(\"embed\")\n        progress.add_log(f\"Erstelle Embeddings für {len(chunks)} Chunks...\")\n\n    embedded = embed_chunks(chunks, doc_id, file_name, file_path, progress=progress)\n    db.log(\"INFO\", f\"Embedded 
{embedded}\/{len(chunks)} chunks\")\n\n    if progress:\n        progress.add_log(f\"{embedded} Embeddings erstellt\")\n\n    # Check if cancelled after embedding\n    if progress and progress.is_cancelled():\n        return \"cancelled\", len(chunks), embedded\n\n    # Semantic analysis\n    if progress:\n        progress.update_step(\"analyze\")\n\n    full_text = get_full_text(extraction)\n    analysis = analyze_document(doc_id, full_text, progress=progress)\n    db.log(\"INFO\", f\"Analysis complete: {analysis}\")\n\n    # Update status\n    db.update_document_status(doc_id, \"done\")\n\n    if progress:\n        progress.add_log(f\"Fertig: {file_name}\")\n\n    return True, len(chunks), embedded\n\n\ndef process_queue():\n    \"\"\"Process items from the queue.\"\"\"\n    items = db.get_pending_queue_items(limit=10)\n    db.log(\"INFO\", f\"Found {len(items)} items in queue\")\n\n    for item in items:\n        queue_id = item[\"id\"]\n        file_path = item[\"file_path\"]\n        retry_count = item[\"retry_count\"]\n\n        if retry_count >= MAX_RETRIES:\n            db.update_queue_status(queue_id, \"failed\", \"Max retries exceeded\")\n            continue\n\n        db.update_queue_status(queue_id, \"processing\")\n\n        try:\n            success = process_file(file_path)\n            if success:\n                db.update_queue_status(queue_id, \"completed\")\n            else:\n                raise Exception(\"Processing returned False\")\n        except Exception as e:\n            error_msg = str(e)\n            db.update_queue_status(queue_id, \"pending\", error_msg)\n\n            # Exponential backoff\n            wait_time = RETRY_BACKOFF_BASE ** (retry_count + 1)\n            db.log(\"INFO\", f\"Retry {retry_count + 1} in {wait_time}s: {file_path}\")\n            time.sleep(wait_time)\n\n\ndef run_scan():\n    \"\"\"Scan for new documents.\"\"\"\n    files = scan_directory()\n    print(f\"Found {len(files)} files\")\n\n    if files:\n    
    queued = queue_files(files)\n        print(f\"Queued {queued} files\")\n\n    return files\n\n\ndef run_full_pipeline(run_id=None, pipeline_id=None):\n    \"\"\"Run complete pipeline: scan → process → embed.\"\"\"\n    progress = PipelineProgress(run_id) if run_id else None\n\n    print(\"=\" * 50)\n    print(\"KI-System Pipeline - Full Run\")\n    if run_id:\n        print(f\"Run ID: {run_id}, Pipeline ID: {pipeline_id}\")\n    print(\"=\" * 50)\n\n    try:\n        # Phase 1: Scan\n        if progress:\n            progress.update_step(\"detect\")\n            progress.add_log(\"Scanne nach Dokumenten...\")\n\n        print(\"\\n[1\/3] Scanning for documents...\")\n        files = scan_directory()\n        print(f\"Found {len(files)} files\")\n\n        if progress:\n            progress.add_log(f\"{len(files)} neue Dokumente gefunden\")\n\n        if files:\n            queued = queue_files(files)\n            print(f\"Queued {queued} files\")\n\n        # Phase 2: Process queue items (includes resume of previous runs)\n        items = db.get_pending_queue_items(limit=100)\n        print(f\"\\n[2\/3] Processing {len(items)} documents...\")\n\n        if items:\n            # Update total with actual queue count (may include items from previous runs)\n            if progress:\n                progress.update_progress(total=len(items))\n                progress.add_log(f\"{len(items)} Dokumente in Queue\")\n\n            total_chunks = 0\n            total_embeddings = 0\n            processed = 0\n            failed = 0\n\n            for item in items:\n                # Check if cancelled\n                if progress and progress.is_cancelled():\n                    progress.add_log(\"Pipeline abgebrochen durch Benutzer\")\n                    progress.complete(\"cancelled\")\n                    print(\"\\nPipeline cancelled by user\")\n                    return\n\n                queue_id = item[\"id\"]\n                file_path = item[\"file_path\"]\n  
              file_name = Path(file_path).name\n\n                # Skip already-done documents (for resume capability)\n                if db.document_is_done(file_path):\n                    db.update_queue_status(queue_id, \"completed\")\n                    processed += 1\n                    if progress:\n                        progress.add_log(f\"Übersprungen (bereits fertig): {file_name}\")\n                        progress.update_progress(processed=processed)\n                    continue\n\n                if progress:\n                    progress.update_document(file_name)\n\n               ... [TRUNCATED-936a7f517f4e4357]",
        "structuredPatch": [
            {
                "oldStart": 28,
                "oldLines": 160,
                "newStart": 28,
                "newLines": 79,
                "lines": [
                    " ",
                    " def process_file(file_path, progress=None):",
                    "     \"\"\"Process a single file through the pipeline.\"\"\"",
                    "-    db.log(\"INFO\", f\"Processing: {file_path}\")",
                    "     file_name = Path(file_path).name",
                    " ",
                    "     if progress:",
                    "         progress.update_document(file_name)",
                    " ",
                    "+    # Initialize pipeline steps",
                    "+    extract_step = ExtractionStep(db, progress)",
                    "+    load_step = LoadStep(db, progress)",
                    "+    transform_step = TransformationStep(db, progress)",
                    "+    embed_step = EmbeddingStep(db, progress)",
                    "+",
                    "     # Check if cancelled before starting",
                    "     if progress and progress.is_cancelled():",
                    "         return \"cancelled\", 0, 0",
                    " ",
                    "-    # Extract text",
                    "-    if progress:",
                    "-        progress.update_step(\"extract\")",
                    "-        progress.add_log(f\"Extrahiere Text: {file_name}\")",
                    "-",
                    "-    extraction = extract(file_path)",
                    "-    if not extraction[\"success\"]:",
                    "-        db.log(\"ERROR\", f\"Extraction failed: {extraction.get('error')}\")",
                    "-        if progress:",
                    "-            progress.add_log(\"FEHLER: Extraktion fehlgeschlagen\")",
                    "+    # Step 1: Extract",
                    "+    extract_result = extract_step.execute(file_path)",
                    "+    if not extract_result[\"success\"]:",
                    "+        if extract_result.get(\"error\") == \"cancelled\":",
                    "+            return \"cancelled\", 0, 0",
                    "         return False, 0, 0",
                    " ",
                    "+    extraction = extract_result[\"extraction\"]",
                    "+    file_info = extract_result[\"file_info\"]",
                    "+    total_pages = extract_result.get(\"total_pages\", 0)",
                    "+",
                    "     # Check if cancelled after extraction",
                    "     if progress and progress.is_cancelled():",
                    "         return \"cancelled\", 0, 0",
                    " ",
                    "-    # Get page count for PDFs",
                    "-    total_pages = 0",
                    "-    if extraction[\"file_type\"] == \".pdf\" and isinstance(extraction.get(\"content\"), list):",
                    "-        total_pages = len(extraction[\"content\"])",
                    "-        if progress:",
                    "-            progress.add_log(f\"{total_pages} Seiten extrahiert\")",
                    "+    # Step 2: Load document",
                    "+    doc_id = load_step.create_document(file_info)",
                    " ",
                    "-    # Get document info",
                    "-    file_stat = os.stat(file_path)",
                    "+    # Step 3: Vision analysis (PDFs only)",
                    "+    if file_info[\"type\"] == \".pdf\":",
                    "+        transform_step.execute_vision(doc_id, file_path, file_info[\"type\"])",
                    " ",
                    "-    import hashlib",
                    "-",
                    "-    with open(file_path, \"rb\") as f:",
                    "-        file_hash = hashlib.sha256(f.read()).hexdigest()",
                    "-",
                    "-    # Insert document",
                    "-    doc_id = db.insert_document(",
                    "-        file_path=file_path,",
                    "-        title=file_name,",
                    "-        file_type=extraction[\"file_type\"],",
                    "-        file_size=file_stat.st_size,",
                    "-        file_hash=file_hash,",
                    "-    )",
                    "-    db.log(\"INFO\", f\"Created document: {doc_id}\")",
                    "-",
                    "-    # Vision analysis for PDFs",
                    "-    if extraction[\"file_type\"] == \".pdf\":",
                    "-        if progress:",
                    "-            progress.update_step(\"vision\")",
                    "-            progress.add_log(\"Vision-Analyse gestartet...\")",
                    "-",
                    "-        db.log(\"INFO\", f\"Running vision analysis for document {doc_id}\")",
                    "-        vision_config = {",
                    "-            \"model\": \"llama3.2-vision:11b\",",
                    "-            \"store_images\": True,",
                    "-            \"detect_images\": True,",
                    "-            \"detect_charts\": True,",
                    "-            \"detect_tables\": True,",
                    "-        }",
                    "-        vision_result = run_vision_step(doc_id, file_path, vision_config, progress=progress)",
                    "-        if vision_result[\"success\"]:",
                    "-            db.log(\"INFO\", f\"Vision: {vision_result['pages_analyzed']}\/{vision_result['pages_total']} pages analyzed\")",
                    "-            if progress:",
                    "-                progress.add_log(f\"Vision: {vision_result['pages_analyzed']} Seiten analysiert\")",
                    "-        else:",
                    "-            db.log(\"WARNING\", f\"Vision analysis failed: {vision_result.get('error')}\")",
                    "-",
                    "         # Check if cancelled after vision",
                    "         if progress and progress.is_cancelled():",
                    "             return \"cancelled\", 0, 0",
                    " ",
                    "-    # Chunk content",
                    "-    if progress:",
                    "-        progress.update_step(\"chunk\")",
                    "-        if total_pages > 0:",
                    "-            progress.add_log(f\"Erstelle Chunks aus {total_pages} Seiten...\")",
                    "-        else:",
                    "-            progress.add_log(\"Erstelle Chunks...\")",
                    "+    # Step 4: Chunking",
                    "+    chunks = transform_step.execute_chunking(extraction, total_pages)",
                    " ",
                    "-    chunks = chunk_by_structure(extraction)",
                    "-    db.log(\"INFO\", f\"Created {len(chunks)} chunks\")",
                    "+    # Step 5: Store chunks",
                    "+    chunks = load_step.store_chunks(doc_id, chunks)",
                    " ",
                    "-    # Store chunks",
                    "-    for i, chunk in enumerate(chunks):",
                    "-        chunk_id = db.insert_chunk(",
                    "-            doc_id=doc_id,",
                    "-            chunk_index=i,",
                    "-            content=chunk[\"content\"],",
                    "-            heading_path=json.dumps(chunk.get(\"heading_path\", [])),",
                    "-            position_start=chunk.get(\"position_start\", 0),",
                    "-            position_end=chunk.get(\"position_end\", 0),",
                    "-            metadata=json.dumps(chunk.get(\"metadata\", {})),",
                    "-        )",
                    "-        chunk[\"db_id\"] = chunk_id",
                    "-",
                    "-    if progress:",
                    "-        progress.add_log(",
                    "-            f\"{len(chunks)} Chunks erstellt (aus {total_pages} Seiten)\"",
                    "-            if total_pages > 0",
                    "-            else f\"{len(chunks)} Chunks erstellt\"",
                    "-        )",
                    "-",
                    "     # Check if cancelled after chunking",
                    "     if progress and progress.is_cancelled():",
                    "         return \"cancelled\", len(chunks), 0",
                    " ",
                    "-    # Enrich chunks with vision context (for PDFs)",
                    "-    if extraction[\"file_type\"] == \".pdf\":",
                    "-        if progress:",
                    "-            progress.update_step(\"enrich\")",
                    "+    # Step 6: Enrichment (PDFs only)",
                    "+    if file_info[\"type\"] == \".pdf\":",
                    "+        transform_step.execute_enrichment(doc_id, file_info[\"type\"])",
                    " ",
                    "-        db.log(\"INFO\", f\"Running vision enrichment for document {doc_id}\")",
                    "-        enrich_result = run_enrichment_step(doc_id)",
                    "-        if enrich_result[\"success\"]:",
                    "-            db.log(\"INFO\", f\"Enrichment: {enrich_result['enriched']}\/{enrich_result['total_chunks']} chunks enriched\")",
                    "-        else:",
                    "-            db.log(\"WARNING\", f\"Enrichment failed: {enrich_result.get('error')}\")",
                    "-",
                    "         # Check if cancelled after enrichment",
                    "         if progress and progress.is_cancelled():",
                    "             return \"cancelled\", len(chunks), 0",
                    " ",
                    "-    # Generate embeddings",
                    "-    if progress:",
                    "-        progress.update_step(\"embed\")",
                    "-        progress.add_log(f\"Erstelle Embeddings für {len(chunks)} Chunks...\")",
                    "+    # Step 7: Embeddings",
                    "+    embedded = embed_step.execute(chunks, doc_id, file_name, file_path)",
                    " ",
                    "-    embedded = embed_chunks(chunks, doc_id, file_name, file_path, progress=progress)",
                    "-    db.log(\"INFO\", f\"Embedded {embedded}\/{len(chunks)} chunks\")",
                    "-",
                    "-    if progress:",
                    "-        progress.add_log(f\"{embedded} Embeddings erstellt\")",
                    "-",
                    "     # Check if cancelled after embedding",
                    "     if progress and progress.is_cancelled():",
                    "         return \"cancelled\", len(chunks), embedded",
                    " ",
                    "-    # Semantic analysis",
                    "-    if progress:",
                    "-        progress.update_step(\"analyze\")",
                    "+    # Step 8: Semantic analysis",
                    "+    full_text = extract_step.get_full_text_from_extraction(extraction)",
                    "+    transform_step.execute_analysis(doc_id, full_text)",
                    " ",
                    "-    full_text = get_full_text(extraction)",
                    "-    analysis = analyze_document(doc_id, full_text, progress=progress)",
                    "-    db.log(\"INFO\", f\"Analysis complete: {analysis}\")",
                    "+    # Step 9: Update status",
                    "+    load_step.update_document_status(doc_id, \"done\")",
                    " ",
                    "-    # Update status",
                    "-    db.update_document_status(doc_id, \"done\")",
                    "-",
                    "     if progress:",
                    "         progress.add_log(f\"Fertig: {file_name}\")",
                    " "
                ]
            }
        ],
        "userModified": false,
        "replaceAll": false
    }
}
← Vorheriger Zur Liste Nächster →