run_demo.py

Code Hygiene Score: 85

Keine Issues gefunden.

Dependencies 11

Funktionen 2

Code

#!/usr/bin/env python3
"""
Verbose Pipeline Runner for demo.pdf
Shows every step with timing.
Uses existing db.py methods for correct schema handling.
"""

import json
import os
import sys
import time

import requests

sys.path.insert(0, "/var/www/scripts/pipeline")

from analyzers.entity_extractor import _build_prompt_from_yaml, extract_entities_ollama, store_entities
from db import db


def log(msg):
    """Print timestamped log message."""
    print(f"[{time.strftime('%H:%M:%S')}] {msg}", flush=True)


def main():
    """Run entity extraction demo pipeline."""
    log("=== START ===")

    # 1. DB Connect
    log("1. DB verbinden...")
    start = time.time()
    db.connect()
    log(f"   OK ({time.time() - start:.1f}s)")

    # 2. DB Reset
    log("2. DB Reset (Tabellen leeren)...")
    start = time.time()
    tables_order = [
        "entity_relations",
        "chunk_entities",
        "document_entities",
        "chunk_semantics",
        "chunk_taxonomy",
        "document_taxonomy",
        "document_pages",
        "entities",
        "chunks",
        "documents",
    ]
    for table in tables_order:
        try:
            db.execute(f"DELETE FROM {table}")
            db.commit()
            log(f"   {table}: OK")
        except Exception as e:
            log(f"   {table}: skip ({e})")
    log(f"   DB Reset done ({time.time() - start:.1f}s)")

    # 3. Qdrant Reset
    log("3. Qdrant Reset...")
    start = time.time()
    try:
        resp = requests.post(
            "http://localhost:6333/collections/documents/points/delete", json={"filter": {"must": []}}, timeout=10
        )
        log(f"   Qdrant: {resp.status_code} ({time.time() - start:.1f}s)")
    except Exception as e:
        log(f"   Qdrant: {e}")

    # 4. PDF laden
    log("4. PDF laden...")
    start = time.time()
    from extract import extract_pdf

    file_path = "/var/www/nextcloud/data/root/files/Documents/demo.pdf"
    pages = extract_pdf(file_path)
    text = "\n\n".join(p["text"] for p in pages)
    log(f"   OK: {len(text)} chars, {len(pages)} pages ({time.time() - start:.1f}s)")

    # 5. Document erstellen (nutze db.insert_document)
    log("5. Document in DB erstellen...")
    start = time.time()
    doc_id = db.insert_document(
        file_path=file_path,
        title=os.path.basename(file_path),
        file_type="application/pdf",
        file_size=os.path.getsize(file_path),
        file_hash="demo_test",
    )
    log(f"   OK: doc_id={doc_id} ({time.time() - start:.1f}s)")

    # 6. Chunking
    log("6. Text chunken...")
    start = time.time()
    from chunk import chunk_pdf

    chunks = chunk_pdf(pages)
    log(f"   OK: {len(chunks)} chunks ({time.time() - start:.1f}s)")

    # 7. Chunks speichern (nutze db.insert_chunk)
    log("7. Chunks in DB speichern...")
    start = time.time()
    chunk_ids = []
    chunk_texts = []
    for i, chunk in enumerate(chunks):
        content = chunk["content"]
        heading_path = json.dumps(chunk.get("heading_path", []))
        metadata = json.dumps(chunk.get("metadata", {}))

        chunk_id = db.insert_chunk(
            doc_id=doc_id, chunk_index=i, content=content, heading_path=heading_path, metadata=metadata
        )
        chunk_ids.append(chunk_id)
        chunk_texts.append(content)
        log(f"   Chunk {i + 1}: {len(content)} chars -> id={chunk_id}")
    log(f"   OK: {len(chunk_ids)} chunks gespeichert ({time.time() - start:.1f}s)")

    # 8. YAML Prompt laden
    log("8. YAML Prompt aus DB laden...")
    start = time.time()
    prompt_data = db.get_prompt_by_use_case("entity_extraction")
    if prompt_data:
        log(f"   OK: Prompt geladen ({time.time() - start:.1f}s)")
        sample_prompt = _build_prompt_from_yaml(prompt_data["content"], "SAMPLE")
        log(f"   Prompt-Preview:\n{sample_prompt[:400]}...")
    else:
        log("   WARNUNG: Kein Prompt gefunden!")

    # 9. Entity Extraction pro Chunk
    log("9. Entity Extraction (Ollama)...")
    all_entities = []
    for i, (_chunk_id, chunk_text) in enumerate(zip(chunk_ids, chunk_texts)):
        log(f"   Chunk {i + 1}/{len(chunk_texts)}: {len(chunk_text)} chars...")
        start = time.time()
        entities = extract_entities_ollama(chunk_text)
        log(f"      -> {len(entities)} entities ({time.time() - start:.1f}s)")
        for e in entities[:5]:
            log(f"         - {e['name']} ({e['type']})")
        if len(entities) > 5:
            log(f"         ... und {len(entities) - 5} weitere")
        all_entities.extend(entities)

    # 10. Entities speichern (nutze store_entities)
    log("10. Entities in DB speichern...")
    start = time.time()
    stored = store_entities(doc_id, all_entities)
    log(f"   OK: {stored} entities gespeichert ({time.time() - start:.1f}s)")

    # 11. Zusammenfassung
    log("=== ERGEBNIS ===")
    cursor = db.execute("SELECT type, COUNT(*) as cnt FROM entities GROUP BY type ORDER BY cnt DESC")
    for row in cursor.fetchall():
        log(f"   {row['type']}: {row['cnt']}")
    cursor.close()

    cursor = db.execute("SELECT COUNT(*) as cnt FROM entities")
    total = cursor.fetchone()["cnt"]
    cursor.close()
    log(f"   TOTAL: {total} entities")

    db.disconnect()
    log("=== DONE ===")


if __name__ == "__main__":
    main()
← Übersicht