reset_pipeline.py
- Pfad:
/var/www/scripts/pipeline/reset_pipeline.py - Namespace: pipeline
- Zeilen: 203 | Größe: 7,054 Bytes
- Geändert: 2025-12-25 09:31:24 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 95
- Dependencies: 80 (25%)
- LOC: 99 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
Keine Issues gefunden.
Dependencies 7
- use argparse
- use sys
- use mysql.connector
- use requests
- use config.DB_CONFIG
- use config.QDRANT_HOST
- use config.QDRANT_PORT
Funktionen 3
-
reset_mariadb()Zeile 94 -
reset_qdrant()Zeile 134 -
main()Zeile 171
Code
#!/usr/bin/env python3
"""
Pipeline Reset Script
Clears all pipeline data from MariaDB and Qdrant.
Usage:
python reset_pipeline.py # Interactive confirmation
python reset_pipeline.py --force # No confirmation
"""
import argparse
import sys
import mysql.connector
import requests
from config import DB_CONFIG, QDRANT_HOST, QDRANT_PORT
# Tables to truncate in order (respects FK dependencies)
TABLES_TO_CLEAR = [
# Child tables first (depend on chunks/documents/entities)
"chunk_entities",
"chunk_semantics",
"chunk_taxonomy",
"entity_relations",
"entity_ontology",
"entity_synonyms",
"entity_semantics",
"entity_taxonomy_mapping",
"entity_classifications",
"generated_questions",
"document_knowledge",
"section_knowledge",
"page_knowledge",
"document_pages",
"document_sections",
"document_taxonomy",
"content_sources",
# Parent tables last
"chunks",
"documents",
"entities",
"pipeline_queue",
"pipeline_runs",
]
# Foreign keys to temporarily drop
FK_TO_DROP = [
("chunks", "chunks_ibfk_1"),
("chunk_entities", "chunk_entities_ibfk_1"),
("chunk_semantics", "chunk_semantics_ibfk_1"),
("chunk_taxonomy", "chunk_taxonomy_ibfk_1"),
("entity_relations", "entity_relations_ibfk_1"),
("entity_relations", "entity_relations_ibfk_2"),
("entity_relations", "entity_relations_ibfk_3"),
("document_entities", "document_entities_ibfk_1"),
("document_entities", "document_entities_ibfk_2"),
("generated_questions", "generated_questions_ibfk_1"),
("generated_questions", "generated_questions_ibfk_2"),
("generated_questions", "generated_questions_ibfk_3"),
("document_knowledge", "document_knowledge_ibfk_1"),
("document_pages", "document_pages_ibfk_1"),
("document_sections", "document_sections_ibfk_1"),
("document_taxonomy", "document_taxonomy_ibfk_1"),
("section_knowledge", "section_knowledge_ibfk_1"),
("page_knowledge", "page_knowledge_ibfk_1"),
("content_sources", "content_sources_ibfk_2"),
]
# Foreign keys to recreate
FK_TO_CREATE = [
("chunks", "chunks_ibfk_1", "document_id", "documents", "id"),
("chunk_entities", "chunk_entities_ibfk_1", "chunk_id", "chunks", "id"),
("chunk_semantics", "chunk_semantics_ibfk_1", "chunk_id", "chunks", "id"),
("chunk_taxonomy", "chunk_taxonomy_ibfk_1", "chunk_id", "chunks", "id"),
("entity_relations", "entity_relations_ibfk_1", "source_entity_id", "entities", "id"),
("entity_relations", "entity_relations_ibfk_2", "target_entity_id", "entities", "id"),
("entity_relations", "entity_relations_ibfk_3", "chunk_id", "chunks", "id"),
("document_entities", "document_entities_ibfk_1", "document_id", "documents", "id"),
("document_entities", "document_entities_ibfk_2", "entity_id", "entities", "id"),
("generated_questions", "generated_questions_ibfk_1", "document_id", "documents", "id"),
("generated_questions", "generated_questions_ibfk_2", "page_id", "document_pages", "id"),
("generated_questions", "generated_questions_ibfk_3", "chunk_id", "chunks", "id"),
("document_knowledge", "document_knowledge_ibfk_1", "document_id", "documents", "id"),
("document_pages", "document_pages_ibfk_1", "document_id", "documents", "id"),
("document_sections", "document_sections_ibfk_1", "document_id", "documents", "id"),
("document_taxonomy", "document_taxonomy_ibfk_1", "document_id", "documents", "id"),
("section_knowledge", "section_knowledge_ibfk_1", "section_id", "document_sections", "id"),
("page_knowledge", "page_knowledge_ibfk_1", "page_id", "document_pages", "id"),
("content_sources", "content_sources_ibfk_2", "chunk_id", "chunks", "id"),
]
def reset_mariadb():
"""Reset all pipeline tables in MariaDB."""
print("\n[1/2] Resetting MariaDB...")
conn = mysql.connector.connect(**DB_CONFIG)
cursor = conn.cursor()
# Disable foreign key checks for clean truncation
cursor.execute("SET FOREIGN_KEY_CHECKS = 0")
conn.commit()
# Truncate tables
print(" Truncating tables...")
for table in TABLES_TO_CLEAR:
try:
cursor.execute(f"TRUNCATE TABLE {table}")
print(f" - {table} cleared")
except mysql.connector.Error as e:
print(f" - {table} skipped: {e.msg}")
conn.commit()
# Re-enable foreign key checks
cursor.execute("SET FOREIGN_KEY_CHECKS = 1")
conn.commit()
# Verify
cursor.execute("SELECT COUNT(*) FROM documents")
doc_count = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM chunks")
chunk_count = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM entities")
entity_count = cursor.fetchone()[0]
cursor.close()
conn.close()
print(f" MariaDB reset complete: {doc_count} documents, {chunk_count} chunks, {entity_count} entities")
return doc_count == 0 and chunk_count == 0 and entity_count == 0
def reset_qdrant():
"""Reset Qdrant documents collection."""
print("\n[2/2] Resetting Qdrant...")
base_url = f"http://{QDRANT_HOST}:{QDRANT_PORT}"
# Delete collection
try:
resp = requests.delete(f"{base_url}/collections/documents", timeout=10)
if resp.status_code in (200, 404):
print(" Collection deleted")
except requests.RequestException as e:
print(f" Delete failed: {e}")
return False
# Recreate collection
try:
resp = requests.put(
f"{base_url}/collections/documents", json={"vectors": {"size": 1024, "distance": "Cosine"}}, timeout=10
)
if resp.status_code == 200:
print(" Collection recreated")
except requests.RequestException as e:
print(f" Recreate failed: {e}")
return False
# Verify
try:
resp = requests.get(f"{base_url}/collections/documents", timeout=10)
data = resp.json()
points = data.get("result", {}).get("points_count", 0)
print(f" Qdrant reset complete: {points} points")
return points == 0
except requests.RequestException:
return False
def main():
parser = argparse.ArgumentParser(description="Reset pipeline data")
parser.add_argument("--force", action="store_true", help="Skip confirmation")
args = parser.parse_args()
print("=" * 50)
print("Pipeline Reset Script")
print("=" * 50)
print("\nThis will DELETE all:")
print(" - Documents and chunks from MariaDB")
print(" - Embeddings from Qdrant")
print(" - Pipeline runs and queue")
if not args.force:
confirm = input("\nAre you sure? Type 'yes' to confirm: ")
if confirm.lower() != "yes":
print("Aborted.")
sys.exit(0)
db_ok = reset_mariadb()
qdrant_ok = reset_qdrant()
print("\n" + "=" * 50)
if db_ok and qdrant_ok:
print("Reset complete!")
else:
print("Reset completed with warnings.")
print("=" * 50)
if __name__ == "__main__":
main()