detect.py
- Pfad:
/var/www/scripts/pipeline/detect.py - Namespace: pipeline
- Zeilen: 125 | Größe: 3,573 Bytes
- Geändert: 2025-12-23 22:57:42 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 98
- Dependencies: 90 (25%)
- LOC: 100 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
Keine Issues gefunden.
Dependencies 6
- use hashlib
- use os
- use pathlib.Path
- use config.NEXTCLOUD_PATH
- use config.SUPPORTED_EXTENSIONS
- use db.db
Funktionen 4
- calculate_file_hash() — Zeile 14
- scan_directory() — Zeile 23
- queue_files() — Zeile 89
- run() — Zeile 99
Code
"""
File detection for KI-System Pipeline
Monitors Nextcloud folder for new/changed documents.
"""
import hashlib
import os
from pathlib import Path
from config import NEXTCLOUD_PATH, SUPPORTED_EXTENSIONS
from db import db
def calculate_file_hash(file_path):
    """Return the SHA-256 hex digest of the file at *file_path*.

    Reads the file in 8 KiB chunks so arbitrarily large documents can be
    hashed with constant memory.
    """
    digest = hashlib.sha256()
    with open(file_path, "rb") as stream:
        chunk = stream.read(8192)
        while chunk:
            digest.update(chunk)
            chunk = stream.read(8192)
    return digest.hexdigest()
def scan_directory(path=None):
    """
    Recursively scan *path* (default: NEXTCLOUD_PATH) for supported documents.

    Hidden files and directories (names starting with ".") are skipped, as
    are files whose extension is not in SUPPORTED_EXTENSIONS. Each remaining
    file is hashed and compared against the database:
    - unknown path            -> reported with action "new"
    - known path, hash changed -> reported with action "update" (+ existing_id)
    - known path, hash equal   -> skipped

    Returns:
        list[dict]: one entry per file to process, with keys
        path, name, ext, size, hash, action (and existing_id for updates).
    """
    path = path or NEXTCLOUD_PATH
    files_to_process = []
    if not os.path.exists(path):
        db.log("ERROR", f"Source path does not exist: {path}")
        return files_to_process
    for root, dirs, files in os.walk(path):
        # Prune hidden directories in place so os.walk does not descend into them.
        dirs[:] = [d for d in dirs if not d.startswith(".")]
        for filename in files:
            if filename.startswith("."):
                continue  # skip hidden files
            ext = Path(filename).suffix.lower()
            if ext not in SUPPORTED_EXTENSIONS:
                continue
            file_path = os.path.join(root, filename)
            file_stat = os.stat(file_path)
            file_hash = calculate_file_hash(file_path)
            # Build the common record once; the action branches below only
            # tag it, instead of duplicating the whole dict literal.
            file_info = {
                "path": file_path,
                "name": filename,
                "ext": ext,
                "size": file_stat.st_size,
                "hash": file_hash,
            }
            existing_id = db.document_exists(file_path)
            if existing_id:
                # Known file: re-queue only if its content hash changed.
                cursor = db.execute(
                    "SELECT file_hash FROM documents WHERE id = %s", (existing_id,)
                )
                result = cursor.fetchone()
                cursor.close()
                if result and result["file_hash"] != file_hash:
                    file_info["action"] = "update"
                    file_info["existing_id"] = existing_id
                    files_to_process.append(file_info)
            else:
                file_info["action"] = "new"
                files_to_process.append(file_info)
    return files_to_process
def queue_files(files):
    """Add every detected file to the processing queue.

    Each entry is handed to ``db.add_to_queue`` with its path and action,
    and an INFO line is logged per file. Returns the number of files queued.
    """
    count = 0
    for entry in files:
        qid = db.add_to_queue(entry["path"], entry["action"])
        db.log("INFO", f"Queued: {entry['name']}", f"queue_id={qid}")
        count += 1
    return count
def run():
    """Run one detection pass: connect, scan, queue, and disconnect.

    Returns the list of detected files (possibly empty). Any exception is
    logged as ERROR and re-raised; the DB connection is always closed.
    """
    db.connect()
    db.log("INFO", "Starting file detection scan")
    try:
        detected = scan_directory()
        db.log("INFO", f"Found {len(detected)} files to process")
        if detected:
            total = queue_files(detected)
            db.log("INFO", f"Queued {total} files")
        return detected
    except Exception as exc:
        db.log("ERROR", f"Detection error: {str(exc)}")
        raise
    finally:
        # Always release the connection, even on failure.
        db.disconnect()
if __name__ == "__main__":
    # Run one detection pass and print a summary line per detected file.
    for entry in run():
        print(f"[{entry['action']}] {entry['name']} ({entry['size']} bytes)")