run_demo.py
- Path: /var/www/scripts/pipeline/run_demo.py
- Namespace: pipeline
- Lines: 168 | Size: 5,292 bytes
- Modified: 2025-12-31 03:01:09 | Scanned: 2025-12-31 10:22:15
Code Hygiene Score: 85
- Dependencies: 40 (weight 25%)
- LOC: 100 (weight 20%)
- Methods: 100 (weight 20%)
- Secrets: 100 (weight 15%)
- Classes: 100 (weight 10%)
- Magic Numbers: 100 (weight 10%)
No issues found.
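The overall score is consistent with a weighted average of the per-category sub-scores listed above. A minimal sketch of that calculation (category names, sub-scores, and weights come from the breakdown above; the helper name is hypothetical):

def hygiene_score(subscores, weights):
    """Weighted average of per-category sub-scores, each on a 0-100 scale."""
    return round(sum(subscores[name] * weights[name] for name in weights))

subscores = {"Dependencies": 40, "LOC": 100, "Methods": 100, "Secrets": 100, "Classes": 100, "Magic Numbers": 100}
weights = {"Dependencies": 0.25, "LOC": 0.20, "Methods": 0.20, "Secrets": 0.15, "Classes": 0.10, "Magic Numbers": 0.10}
print(hygiene_score(subscores, weights))  # 85, matching the score reported above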
Dependencies (11)
- use json
- use os
- use sys
- use time
- use requests
- use analyzers.entity_extractor._build_prompt_from_yaml
- use analyzers.entity_extractor.extract_entities_ollama
- use analyzers.entity_extractor.store_entities
- use db.db
- use extract.extract_pdf
- use chunk.chunk_pdf
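The three analyzers.entity_extractor helpers drive steps 8-10 of the code below. For quick reference, a sketch of their call shapes as inferred from that usage; parameter names and exact return types are assumptions, not taken from the module itself:

from typing import Any

def _build_prompt_from_yaml(yaml_content: str, text: str) -> str: ...        # renders the stored YAML prompt; used for a preview in step 8
def extract_entities_ollama(chunk_text: str) -> list[dict[str, Any]]: ...    # each item carries at least "name" and "type" (step 9)
def store_entities(doc_id: Any, entities: list[dict[str, Any]]) -> int: ...  # persists entities for a document, returns the stored count (step 10)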
Functions (2)
- log() - Line 21
- main() - Line 26
Code
#!/usr/bin/env python3
"""
Verbose Pipeline Runner for demo.pdf
Shows every step with timing.
Uses existing db.py methods for correct schema handling.
"""
import json
import os
import sys
import time
import requests
sys.path.insert(0, "/var/www/scripts/pipeline")
from analyzers.entity_extractor import _build_prompt_from_yaml, extract_entities_ollama, store_entities
from db import db
def log(msg):
    """Print timestamped log message."""
    print(f"[{time.strftime('%H:%M:%S')}] {msg}", flush=True)


def main():
    """Run entity extraction demo pipeline."""
    log("=== START ===")

    # 1. DB connect
    log("1. Connecting to DB...")
    start = time.time()
    db.connect()
    log(f" OK ({time.time() - start:.1f}s)")

    # 2. DB reset (truncate tables)
    log("2. Resetting DB (truncating tables)...")
    start = time.time()
    # Child and link tables are cleared before their parents so that
    # foreign-key constraints do not block the deletes.
    tables_order = [
        "entity_relations",
        "chunk_entities",
        "document_entities",
        "chunk_semantics",
        "chunk_taxonomy",
        "document_taxonomy",
        "document_pages",
        "entities",
        "chunks",
        "documents",
    ]
    for table in tables_order:
        try:
            db.execute(f"DELETE FROM {table}")
            db.commit()
            log(f" {table}: OK")
        except Exception as e:
            log(f" {table}: skip ({e})")
    log(f" DB reset done ({time.time() - start:.1f}s)")

    # 3. Qdrant reset
    log("3. Resetting Qdrant...")
    start = time.time()
    try:
        # An empty "must" filter matches every point, so this clears the whole collection.
        resp = requests.post(
            "http://localhost:6333/collections/documents/points/delete", json={"filter": {"must": []}}, timeout=10
        )
        log(f" Qdrant: {resp.status_code} ({time.time() - start:.1f}s)")
    except Exception as e:
        log(f" Qdrant: {e}")
    # 4. Load PDF
    log("4. Loading PDF...")
    start = time.time()
    from extract import extract_pdf

    file_path = "/var/www/nextcloud/data/root/files/Documents/demo.pdf"
    pages = extract_pdf(file_path)
    text = "\n\n".join(p["text"] for p in pages)
    log(f" OK: {len(text)} chars, {len(pages)} pages ({time.time() - start:.1f}s)")

    # 5. Create document (uses db.insert_document)
    log("5. Creating document in DB...")
    start = time.time()
    doc_id = db.insert_document(
        file_path=file_path,
        title=os.path.basename(file_path),
        file_type="application/pdf",
        file_size=os.path.getsize(file_path),
        file_hash="demo_test",
    )
    log(f" OK: doc_id={doc_id} ({time.time() - start:.1f}s)")
    # 6. Chunking
    log("6. Chunking text...")
    start = time.time()
    from chunk import chunk_pdf

    chunks = chunk_pdf(pages)
    log(f" OK: {len(chunks)} chunks ({time.time() - start:.1f}s)")

    # 7. Store chunks (uses db.insert_chunk)
    log("7. Storing chunks in DB...")
    start = time.time()
    chunk_ids = []
    chunk_texts = []
    for i, chunk in enumerate(chunks):
        content = chunk["content"]
        heading_path = json.dumps(chunk.get("heading_path", []))
        metadata = json.dumps(chunk.get("metadata", {}))
        chunk_id = db.insert_chunk(
            doc_id=doc_id, chunk_index=i, content=content, heading_path=heading_path, metadata=metadata
        )
        chunk_ids.append(chunk_id)
        chunk_texts.append(content)
        log(f" Chunk {i + 1}: {len(content)} chars -> id={chunk_id}")
    log(f" OK: {len(chunk_ids)} chunks stored ({time.time() - start:.1f}s)")
    # 8. Load YAML prompt
    log("8. Loading YAML prompt from DB...")
    start = time.time()
    prompt_data = db.get_prompt_by_use_case("entity_extraction")
    if prompt_data:
        log(f" OK: prompt loaded ({time.time() - start:.1f}s)")
        sample_prompt = _build_prompt_from_yaml(prompt_data["content"], "SAMPLE")
        log(f" Prompt preview:\n{sample_prompt[:400]}...")
    else:
        log(" WARNING: no prompt found!")

    # 9. Entity extraction per chunk
    log("9. Entity extraction (Ollama)...")
    all_entities = []
    for i, (_chunk_id, chunk_text) in enumerate(zip(chunk_ids, chunk_texts)):
        log(f" Chunk {i + 1}/{len(chunk_texts)}: {len(chunk_text)} chars...")
        start = time.time()
        entities = extract_entities_ollama(chunk_text)
        log(f" -> {len(entities)} entities ({time.time() - start:.1f}s)")
        for e in entities[:5]:
            log(f" - {e['name']} ({e['type']})")
        if len(entities) > 5:
            log(f" ... and {len(entities) - 5} more")
        all_entities.extend(entities)
    # 10. Store entities (uses store_entities)
    log("10. Storing entities in DB...")
    start = time.time()
    stored = store_entities(doc_id, all_entities)
    log(f" OK: {stored} entities stored ({time.time() - start:.1f}s)")

    # 11. Summary
    log("=== RESULT ===")
    cursor = db.execute("SELECT type, COUNT(*) as cnt FROM entities GROUP BY type ORDER BY cnt DESC")
    for row in cursor.fetchall():
        log(f" {row['type']}: {row['cnt']}")
    cursor.close()
    cursor = db.execute("SELECT COUNT(*) as cnt FROM entities")
    total = cursor.fetchone()["cnt"]
    cursor.close()
    log(f" TOTAL: {total} entities")

    db.disconnect()
    log("=== DONE ===")


if __name__ == "__main__":
    main()
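The script is intended to be executed directly (see the shebang and the __main__ guard). It can also be driven from another Python process; a minimal sketch, reusing the same sys.path entry the script adds for its own imports:

import sys

sys.path.insert(0, "/var/www/scripts/pipeline")

import run_demo

run_demo.main()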