hook_dispatcher.py
- Pfad:
/var/www/tools/ki-protokoll/claude-hook/hook_dispatcher.py - Namespace: claude-hook
- Zeilen: 195 | Größe: 6,079 Bytes
- Geändert: 2025-12-28 09:47:10 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 98
- Dependencies: 90 (25%)
- LOC: 100 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
Keine Issues gefunden.
Dependencies 6
- use json
- use sys
- use importlib.util
- use os
- use pathlib.Path
- use importlib
Funktionen 3
-
should_check()Zeile 47 -
load_module()Zeile 86 -
main()Zeile 97
Code
#!/usr/bin/python3
"""
Hook Dispatcher - Stabiler Einstiegspunkt für Claude Code Hooks.
Änderungen an Regeln nur in quality/*.py
Usage:
hook_dispatcher.py pre # PreToolUse (BLOCK)
hook_dispatcher.py post # PostToolUse (WARN)
Unterstützte Dateitypen:
- .php: Alle PHP-Dateien
- .py: Nur in PYTHON_CHECK_PATHS
"""
import json
import sys
import importlib.util
import os
from pathlib import Path
# Add quality directory to path for relative imports
HOOK_DIR = Path(__file__).parent
sys.path.insert(0, str(HOOK_DIR))
# =============================================================================
# KONFIGURATION: Welche Dateien werden geprüft?
# =============================================================================
# Erlaubte Dateiendungen
ALLOWED_EXTENSIONS = {".php", ".py"}
# Python-Dateien werden NUR in diesen Pfaden geprüft
PYTHON_CHECK_PATHS = [
"/var/www/scripts/pipeline/",
]
# Diese Pfade werden IMMER übersprungen
SKIP_PATHS = [
"/venv/",
"/__pycache__/",
"/tests/",
"/vendor/",
]
def should_check(file_path: str) -> bool:
"""
Prüft ob eine Datei vom Hook-System geprüft werden soll.
Args:
file_path: Absoluter Pfad zur Datei
Returns:
True wenn Datei geprüft werden soll
"""
# Skip-Pfade immer überspringen
if any(skip in file_path for skip in SKIP_PATHS):
return False
# Dateiendung prüfen
ext = Path(file_path).suffix
if ext not in ALLOWED_EXTENSIONS:
return False
# PHP-Dateien: immer prüfen
if ext == ".php":
return True
# Python-Dateien: nur in bestimmten Pfaden
if ext == ".py":
return any(file_path.startswith(p) for p in PYTHON_CHECK_PATHS)
return False
# DB-Password aus .env laden
env_path = Path(__file__).parent / ".env"
if env_path.exists():
with open(env_path) as f:
for line in f:
if "=" in line and not line.startswith("#"):
key, value = line.strip().split("=", 1)
os.environ[key] = value
def load_module(name: str):
"""Lädt ein Python-Modul dynamisch aus quality/."""
module_path = HOOK_DIR / "quality" / f"{name}.py"
if not module_path.exists():
return None
# Use regular import to handle relative imports properly
import importlib
return importlib.import_module(f"quality.{name}")
def main():
if len(sys.argv) < 2:
print(json.dumps({"error": "Usage: hook_dispatcher.py pre|post"}))
sys.exit(1)
mode = sys.argv[1]
# Hook-Input von stdin lesen
try:
hook_input = json.load(sys.stdin)
except json.JSONDecodeError:
print(json.dumps({"allowed": True})) # Bei Fehler durchlassen
sys.exit(0)
# Tool-Daten extrahieren
tool_name = hook_input.get("tool_name", "")
tool_input = hook_input.get("tool_input", {})
# Nur für Write/Edit auf PHP-Dateien
file_path = tool_input.get("file_path", "")
# Für Edit: Simuliere vollständigen Datei-Inhalt nach dem Edit
if tool_name == "Edit":
old_string = tool_input.get("old_string", "")
new_string = tool_input.get("new_string", "")
try:
with open(file_path, "r", encoding="utf-8") as f:
full_content = f.read()
# Simuliere den Edit
content = full_content.replace(old_string, new_string, 1)
except (OSError, IOError):
content = new_string # Fallback bei Lesefehler
else:
content = tool_input.get("content", "")
if not should_check(file_path):
# Datei nicht in Prüfungsscope: durchlassen ohne Output (implizit allow)
sys.exit(0)
# Modul laden und ausführen
if mode == "pre":
module = load_module("pre_rules")
if module and hasattr(module, "check"):
result = module.check(file_path, content)
if not result.get("allowed", True):
# Task für Violation erstellen
try:
task_creator = load_module("task_creator")
if task_creator:
# Parse Rule-ID aus Message
msg = result.get("message", "")
rule_id = "UNKNOWN"
if "[" in msg and "]" in msg:
rule_id = msg.split("[")[1].split("]")[0]
task_creator.create_violation_task(
file_path, rule_id, msg, "block"
)
except Exception:
pass # Task-Erstellung darf Hook nicht crashen
# BLOCK: Korrektes Claude Code Format
output = {
"hookSpecificOutput": {
"hookEventName": "PreToolUse",
"permissionDecision": "deny",
"permissionDecisionReason": result.get("message", "Quality check failed")
}
}
print(json.dumps(output))
# Bei allow: kein Output nötig (implizit allow)
elif mode == "post":
module = load_module("post_rules")
if module and hasattr(module, "check"):
result = module.check(file_path, content)
# Post-Hook gibt Warnungen aus, blockiert nicht
if result.get("warnings"):
# Tasks für Warnungen erstellen
try:
task_creator = load_module("task_creator")
if task_creator:
task_creator.create_tasks_for_warnings(
file_path, result["warnings"]
)
except Exception:
pass # Task-Erstellung darf Hook nicht crashen
for warning in result["warnings"]:
print(f"[QUALITY] {warning}", file=sys.stderr)
# Post-Hook blockiert nie
else:
print(json.dumps({"error": f"Unknown mode: {mode}"}))
sys.exit(1)
if __name__ == "__main__":
main()