block_direct_db.py

Code Hygiene Score: 100

Keine Issues gefunden.

Dependencies 3

Funktionen 2

Code

#!/usr/bin/env python3
"""
Block Direct Database Access Hook

Blockiert direkte mysql/mariadb-Befehle in Bash.
Erzwingt Nutzung von MCP-DB stattdessen.

Trigger: PreToolUse (Bash)
"""

import json
import re
import sys

# Patterns die IMMER blockiert werden
BLOCKED_PATTERNS = [
    r'\bmysql\s+-[up]',           # mysql -u oder mysql -p
    r'\bmysql\s+.*-p\S',          # mysql mit Passwort direkt nach -p
    r'\bmariadb\s+-[up]',         # mariadb -u oder mariadb -p
    r'\bmariadb\s+.*-p\S',        # mariadb mit Passwort
    r'--password=',               # --password=xxx
    r'\bmysqldump\b',             # mysqldump
    r'\bmysqlimport\b',           # mysqlimport
    r'\bmysqladmin\b',            # mysqladmin
    r'(?<![a-z-])-p[a-zA-Z0-9_]{6,}',  # -p gefolgt von Passwort, aber nicht nach Buchstaben/Bindestrich
]

BLOCK_MESSAGE = """BLOCKIERT: Direkter Datenbankzugriff ist verboten!

Verwende stattdessen MCP-DB:
- mcp__mcp-db__db_select  - für SELECT-Abfragen
- mcp__mcp-db__db_schema  - für Tabellenstruktur
- mcp__mcp-db__db_stats   - für Statistiken

NIEMALS Passwörter in Bash-Befehlen verwenden!"""


def check_command(command: str) -> tuple[bool, str]:
    """
    Prüft ob ein Befehl blockiert werden soll.

    Returns:
        (blocked: bool, reason: str)
    """
    for pattern in BLOCKED_PATTERNS:
        if re.search(pattern, command, re.IGNORECASE):
            return True, f"Pattern matched: {pattern}"

    return False, ""


def main():
    try:
        input_data = json.load(sys.stdin)
    except json.JSONDecodeError:
        # Kein JSON = kein Hook-Input, durchlassen
        sys.exit(0)

    # Nur Bash-Tool prüfen
    tool_name = input_data.get("tool_name", "")
    if tool_name != "Bash":
        sys.exit(0)

    # Command extrahieren
    tool_input = input_data.get("tool_input", {})
    command = tool_input.get("command", "")

    if not command:
        sys.exit(0)

    # Prüfen
    blocked, reason = check_command(command)

    if blocked:
        # BLOCKIEREN mit Exit-Code 2 und stderr-Nachricht
        print(BLOCK_MESSAGE, file=sys.stderr)
        sys.exit(2)

    # Durchlassen
    sys.exit(0)


if __name__ == "__main__":
    main()
← Übersicht