detect.py

Code Hygiene Score: 98

Keine Issues gefunden.

Dependencies 6

Funktionen 4

Code

"""
File detection for KI-System Pipeline
Monitors Nextcloud folder for new/changed documents.
"""

import hashlib
import os
from pathlib import Path

from config import NEXTCLOUD_PATH, SUPPORTED_EXTENSIONS
from db import db


def calculate_file_hash(file_path):
    """Return the hex-encoded SHA-256 digest of the file at ``file_path``."""
    digest = hashlib.sha256()
    with open(file_path, "rb") as stream:
        # Feed the hash in 8 KiB pieces so the whole file is never loaded
        # into memory at once.
        while True:
            block = stream.read(8192)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()


def scan_directory(path=None):
    """
    Scan directory for supported documents.

    Walks ``path`` recursively (falling back to NEXTCLOUD_PATH when ``path``
    is falsy). Hidden files and directories (names starting with ".") are
    skipped, as are files whose lowercased suffix is not in
    SUPPORTED_EXTENSIONS.

    Each remaining file is hashed and compared with the database:
    - path unknown to ``db.document_exists`` -> queued with action "new"
    - path known and stored hash differs      -> queued with action "update"
      (the dict additionally carries "existing_id")
    - path known and hash unchanged, or the stored row cannot be fetched
      -> not returned

    Files that cannot be stat'ed or read (e.g. removed while the scan is
    running, dangling symlinks, permission problems) are logged and skipped
    so that one bad file does not abort the whole scan.

    Returns list of dicts with the keys "path", "name", "ext", "size",
    "hash", "action" (and "existing_id" for updates). The list is empty when
    the source path does not exist.
    """
    path = path or NEXTCLOUD_PATH
    files_to_process = []

    if not os.path.exists(path):
        db.log("ERROR", f"Source path does not exist: {path}")
        return files_to_process

    for root, dirs, files in os.walk(path):
        # Prune in place so os.walk does not descend into hidden directories.
        dirs[:] = [d for d in dirs if not d.startswith(".")]

        for filename in files:
            # Skip hidden files
            if filename.startswith("."):
                continue

            ext = Path(filename).suffix.lower()
            if ext not in SUPPORTED_EXTENSIONS:
                continue

            file_path = os.path.join(root, filename)

            # The directory may change underneath us while we walk it; a
            # single unreadable entry must not take down the whole scan.
            try:
                file_stat = os.stat(file_path)
                file_hash = calculate_file_hash(file_path)
            except OSError as e:
                db.log("ERROR", f"Cannot read file, skipping: {file_path} ({e})")
                continue

            file_info = {
                "path": file_path,
                "name": filename,
                "ext": ext,
                "size": file_stat.st_size,
                "hash": file_hash,
            }

            # A truthy return value is treated as the id of the stored document.
            existing_id = db.document_exists(file_path)

            if existing_id:
                # Check if file changed (by hash)
                cursor = db.execute("SELECT file_hash FROM documents WHERE id = %s", (existing_id,))
                try:
                    result = cursor.fetchone()
                finally:
                    # Release the cursor even if fetching the row fails.
                    cursor.close()

                if not result or result["file_hash"] == file_hash:
                    continue

                file_info["action"] = "update"
                file_info["existing_id"] = existing_id
            else:
                file_info["action"] = "new"

            files_to_process.append(file_info)

    return files_to_process


def queue_files(files):
    """
    Put every detected file on the processing queue.

    For each entry, its "path" and "action" are handed to
    ``db.add_to_queue`` and an INFO line with the file name and the
    returned queue id is logged. Returns the number of entries queued.
    """
    total = 0  # stays 0 when ``files`` is empty
    for total, entry in enumerate(files, start=1):
        assigned_id = db.add_to_queue(entry["path"], entry["action"])
        db.log("INFO", f"Queued: {entry['name']}", f"queue_id={assigned_id}")
    return total


def run():
    """
    Run one detection pass.

    Connects to the database, scans the default source directory, queues
    whatever the scan returned and hands that list back to the caller.
    Any exception is logged and re-raised; the database connection is
    closed in every case.
    """
    db.connect()
    db.log("INFO", "Starting file detection scan")

    try:
        detected = scan_directory()
        db.log("INFO", f"Found {len(detected)} files to process")

        # Nothing to queue (and nothing to log about queuing) on an empty scan.
        if detected:
            queued_count = queue_files(detected)
            db.log("INFO", f"Queued {queued_count} files")
    except Exception as exc:
        db.log("ERROR", f"Detection error: {str(exc)}")
        raise
    else:
        return detected
    finally:
        db.disconnect()


if __name__ == "__main__":
    # Script entry point: run one scan and print a summary line per file.
    for entry in run():
        action, name, size = entry["action"], entry["name"], entry["size"]
        print(f"[{action}] {name} ({size} bytes)")
← Übersicht