reset_pipeline.py

Code Hygiene Score: 95

Keine Issues gefunden.

Dependencies 7

Funktionen 3

Code

#!/usr/bin/env python3
"""
Pipeline Reset Script
Clears all pipeline data from MariaDB and Qdrant.

Usage:
    python reset_pipeline.py          # Interactive confirmation
    python reset_pipeline.py --force  # No confirmation
"""

import argparse
import sys

import mysql.connector
import requests

from config import DB_CONFIG, QDRANT_HOST, QDRANT_PORT

# Tables that reference chunks/documents/entities; these are truncated first.
_CHILD_TABLES = [
    "chunk_entities",
    "chunk_semantics",
    "chunk_taxonomy",
    "entity_relations",
    "entity_ontology",
    "entity_synonyms",
    "entity_semantics",
    "entity_taxonomy_mapping",
    "entity_classifications",
    "generated_questions",
    "document_knowledge",
    "section_knowledge",
    "page_knowledge",
    "document_pages",
    "document_sections",
    "document_taxonomy",
    "content_sources",
]

# Tables the child tables point at, plus pipeline bookkeeping; truncated last.
_PARENT_TABLES = [
    "chunks",
    "documents",
    "entities",
    "pipeline_queue",
    "pipeline_runs",
]

# Full truncation order: children before parents, so FK dependencies are respected.
# NOTE(review): "document_entities" appears in FK_TO_DROP / FK_TO_CREATE but is
# not listed here, so it is never truncated -- confirm whether that is intended.
TABLES_TO_CLEAR = [*_CHILD_TABLES, *_PARENT_TABLES]

# Foreign keys to recreate, one tuple per constraint:
#   (table, constraint_name, column, referenced_table, referenced_column)
# NOTE(review): neither FK_TO_CREATE nor FK_TO_DROP is referenced by the
# functions visible in this file (reset_mariadb toggles FOREIGN_KEY_CHECKS
# instead) -- confirm whether other code still uses them.
FK_TO_CREATE = [
    # chunks -> documents
    ("chunks", "chunks_ibfk_1", "document_id", "documents", "id"),
    # per-chunk tables -> chunks
    ("chunk_entities", "chunk_entities_ibfk_1", "chunk_id", "chunks", "id"),
    ("chunk_semantics", "chunk_semantics_ibfk_1", "chunk_id", "chunks", "id"),
    ("chunk_taxonomy", "chunk_taxonomy_ibfk_1", "chunk_id", "chunks", "id"),
    # entity relations -> entities / chunks
    ("entity_relations", "entity_relations_ibfk_1", "source_entity_id", "entities", "id"),
    ("entity_relations", "entity_relations_ibfk_2", "target_entity_id", "entities", "id"),
    ("entity_relations", "entity_relations_ibfk_3", "chunk_id", "chunks", "id"),
    # document <-> entity link table
    ("document_entities", "document_entities_ibfk_1", "document_id", "documents", "id"),
    ("document_entities", "document_entities_ibfk_2", "entity_id", "entities", "id"),
    # generated questions -> documents / pages / chunks
    ("generated_questions", "generated_questions_ibfk_1", "document_id", "documents", "id"),
    ("generated_questions", "generated_questions_ibfk_2", "page_id", "document_pages", "id"),
    ("generated_questions", "generated_questions_ibfk_3", "chunk_id", "chunks", "id"),
    # per-document tables -> documents
    ("document_knowledge", "document_knowledge_ibfk_1", "document_id", "documents", "id"),
    ("document_pages", "document_pages_ibfk_1", "document_id", "documents", "id"),
    ("document_sections", "document_sections_ibfk_1", "document_id", "documents", "id"),
    ("document_taxonomy", "document_taxonomy_ibfk_1", "document_id", "documents", "id"),
    # section / page knowledge -> their owning rows
    ("section_knowledge", "section_knowledge_ibfk_1", "section_id", "document_sections", "id"),
    ("page_knowledge", "page_knowledge_ibfk_1", "page_id", "document_pages", "id"),
    # content sources -> chunks
    ("content_sources", "content_sources_ibfk_2", "chunk_id", "chunks", "id"),
]

# Foreign keys to temporarily drop, as (table, constraint_name) pairs.
# Derived from FK_TO_CREATE so the two lists always name the same constraints
# in the same order.
FK_TO_DROP = [(fk[0], fk[1]) for fk in FK_TO_CREATE]


def reset_mariadb():
    """Reset all pipeline tables in MariaDB.

    Truncates every table in ``TABLES_TO_CLEAR`` with foreign key checks
    disabled for the session, then counts the rows left in ``documents``,
    ``chunks`` and ``entities`` to verify the result. A table whose
    ``TRUNCATE`` fails is reported as skipped and does not abort the run.

    Returns:
        bool: True if ``documents``, ``chunks`` and ``entities`` are all
        empty afterwards, False otherwise.

    Raises:
        mysql.connector.Error: If connecting, toggling foreign key checks or
            a verification query fails. The cursor and connection are still
            closed in that case.
    """
    print("\n[1/2] Resetting MariaDB...")

    conn = mysql.connector.connect(**DB_CONFIG)
    try:
        cursor = conn.cursor()
        try:
            # Disable foreign key checks for clean truncation
            cursor.execute("SET FOREIGN_KEY_CHECKS = 0")
            conn.commit()

            # Truncate tables; a failure on one table must not stop the rest.
            print("  Truncating tables...")
            for table in TABLES_TO_CLEAR:
                try:
                    cursor.execute(f"TRUNCATE TABLE {table}")
                    print(f"    - {table} cleared")
                except mysql.connector.Error as e:
                    print(f"    - {table} skipped: {e.msg}")
            conn.commit()

            # Re-enable foreign key checks
            cursor.execute("SET FOREIGN_KEY_CHECKS = 1")
            conn.commit()

            # Verify: count what is left in the three core tables.
            remaining = {}
            for table in ("documents", "chunks", "entities"):
                cursor.execute(f"SELECT COUNT(*) FROM {table}")
                remaining[table] = cursor.fetchone()[0]
        finally:
            # Always release the cursor, even if a statement above raised.
            cursor.close()
    finally:
        # Always release the connection so a failed reset does not leak it.
        conn.close()

    doc_count = remaining["documents"]
    chunk_count = remaining["chunks"]
    entity_count = remaining["entities"]

    print(f"  MariaDB reset complete: {doc_count} documents, {chunk_count} chunks, {entity_count} entities")
    return doc_count == 0 and chunk_count == 0 and entity_count == 0


def reset_qdrant():
    """Reset the Qdrant ``documents`` collection.

    Deletes the collection (a 404 "not found" counts as already deleted),
    recreates it with 1024-dimensional cosine vectors, then reads the
    collection info back to confirm it holds no points.

    Returns:
        bool: True if every step succeeded and the recreated collection
        reports zero points. False if any request fails, returns an
        unexpected HTTP status, or the verification response cannot be
        interpreted.
    """
    print("\n[2/2] Resetting Qdrant...")

    collection_url = f"http://{QDRANT_HOST}:{QDRANT_PORT}/collections/documents"

    # Delete collection
    try:
        resp = requests.delete(collection_url, timeout=10)
    except requests.RequestException as e:
        print(f"  Delete failed: {e}")
        return False
    if resp.status_code not in (200, 404):
        # Previously ignored silently; an undeleted collection is a failed reset.
        print(f"  Delete failed: HTTP {resp.status_code}")
        return False
    print("  Collection deleted")

    # Recreate collection
    try:
        resp = requests.put(
            collection_url,
            json={"vectors": {"size": 1024, "distance": "Cosine"}},
            timeout=10,
        )
    except requests.RequestException as e:
        print(f"  Recreate failed: {e}")
        return False
    if resp.status_code != 200:
        # Without this check a failed recreate could still be reported as success.
        print(f"  Recreate failed: HTTP {resp.status_code}")
        return False
    print("  Collection recreated")

    # Verify
    try:
        resp = requests.get(collection_url, timeout=10)
        data = resp.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose
        # JSON decode error is not a RequestException.
        print(f"  Verify failed: {e}")
        return False

    result = data.get("result") if isinstance(data, dict) else None
    if resp.status_code != 200 or not isinstance(result, dict):
        # An error payload has no usable "result"; never read it as 0 points.
        print(f"  Verify failed: HTTP {resp.status_code}")
        return False

    points = result.get("points_count", 0)
    print(f"  Qdrant reset complete: {points} points")
    return points == 0


def main():
    """Command-line entry point: confirm, then reset MariaDB and Qdrant.

    Parses ``--force``, prints what will be deleted, and asks for an
    interactive "yes" unless ``--force`` was given. Anything other than
    "yes" (case-insensitive, surrounding whitespace ignored) aborts with
    exit status 0. A closed stdin is treated as "no" rather than crashing.
    Afterwards it runs ``reset_mariadb()`` followed by ``reset_qdrant()``
    and prints whether both reported success.
    """
    parser = argparse.ArgumentParser(description="Reset pipeline data")
    parser.add_argument("--force", action="store_true", help="Skip confirmation")
    args = parser.parse_args()

    print("=" * 50)
    print("Pipeline Reset Script")
    print("=" * 50)
    print("\nThis will DELETE all:")
    print("  - Documents and chunks from MariaDB")
    print("  - Embeddings from Qdrant")
    print("  - Pipeline runs and queue")

    if not args.force:
        try:
            confirm = input("\nAre you sure? Type 'yes' to confirm: ")
        except EOFError:
            # stdin is closed (e.g. piped or cron run without --force):
            # no answer must never be taken as consent to delete data.
            print()
            confirm = ""
        if confirm.strip().lower() != "yes":
            print("Aborted.")
            sys.exit(0)

    db_ok = reset_mariadb()
    qdrant_ok = reset_qdrant()

    print("\n" + "=" * 50)
    if db_ok and qdrant_ok:
        print("Reset complete!")
    else:
        print("Reset completed with warnings.")
    print("=" * 50)


# Run the reset only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
← Übersicht