test_validator.py

Code Hygiene Score: 97

Issues 1

Zeile Typ Beschreibung
104 magic_number Magic Number gefunden: 100

Dependencies 6

Klassen 1

Code

#!/usr/bin/env python3
"""Tests für QueryValidator"""

import sys
from pathlib import Path

import pytest

sys.path.insert(0, "/var/www/mcp-servers/mcp_db")
sys.path.insert(0, "/var/www/mcp-servers/shared")
from constants import PERCENT_HALF

from dotenv import load_dotenv

load_dotenv(Path("/var/www/mcp-servers/mcp_db/.env"))

from validators.query_validator import QueryValidator


class TestQueryValidator:
    """Test QueryValidator Funktionalität"""

    def test_simple_valid_query(self):
        """Test: Einfache gültige Query"""
        valid, error = QueryValidator.validate_query(
            "SELECT * FROM mcp_log", "ki_protokoll", PERCENT_HALF
        )
        assert valid is True
        assert error == ""

    def test_drop_keyword_blocked(self):
        """Test: DROP Keyword blockiert"""
        valid, error = QueryValidator.validate_query(
            "SELECT * FROM mcp_log; DROP TABLE mcp_log", "ki_protokoll", PERCENT_HALF
        )
        assert valid is False
        assert "DROP" in error

    def test_sleep_keyword_blocked(self):
        """Test: SLEEP Keyword blockiert"""
        valid, error = QueryValidator.validate_query(
            "SELECT SLEEP(10) FROM mcp_log", "ki_protokoll", PERCENT_HALF
        )
        assert valid is False
        assert "SLEEP" in error

    def test_insert_blocked(self):
        """Test: INSERT blockiert (kein SELECT)"""
        valid, error = QueryValidator.validate_query(
            "INSERT INTO mcp_log VALUES (1,2,3)", "ki_protokoll", PERCENT_HALF
        )
        assert valid is False
        assert "Only SELECT" in error

    def test_update_keyword_blocked(self):
        """Test: UPDATE Keyword blockiert"""
        valid, error = QueryValidator.validate_query(
            "SELECT * FROM mcp_log WHERE status='denied' OR 1=1; UPDATE mcp_log SET status='success'",
            "ki_protokoll",
            50,
        )
        assert valid is False
        assert "UPDATE" in error

    def test_delete_keyword_blocked(self):
        """Test: DELETE Keyword blockiert"""
        valid, error = QueryValidator.validate_query(
            "SELECT * FROM mcp_log; DELETE FROM mcp_log", "ki_protokoll", PERCENT_HALF
        )
        assert valid is False
        assert "DELETE" in error

    def test_load_file_blocked(self):
        """Test: LOAD_FILE blockiert"""
        valid, error = QueryValidator.validate_query(
            "SELECT LOAD_FILE('/etc/passwd')", "ki_protokoll", PERCENT_HALF
        )
        assert valid is False
        assert "LOAD_FILE" in error

    def test_mysql_database_blocked(self):
        """Test: mysql Datenbank blockiert"""
        valid, error = QueryValidator.validate_query("SELECT * FROM users", "mysql", PERCENT_HALF)
        assert valid is False
        assert "not allowed" in error

    def test_users_table_blocked(self):
        """Test: users Tabelle blockiert"""
        valid, error = QueryValidator.validate_query(
            "SELECT * FROM users", "ki_protokoll", PERCENT_HALF
        )
        assert valid is False
        assert "not allowed" in error

    def test_query_too_long(self):
        """Test: Query > 2000 Zeichen blockiert"""
        long_query = "SELECT * FROM mcp_log WHERE " + "id=1 OR " * 500
        valid, error = QueryValidator.validate_query(long_query, "ki_protokoll", PERCENT_HALF)
        assert valid is False
        assert "max" in error
        assert "chars" in error

    def test_max_rows_too_high(self):
        """Test: max_rows > 100 blockiert"""
        valid, error = QueryValidator.validate_query(
            "SELECT * FROM mcp_log", "ki_protokoll", 999
        )
        assert valid is False
        assert "max_rows" in error

    def test_ki_system_database_allowed(self):
        """Test: ki_system Datenbank erlaubt"""
        valid, error = QueryValidator.validate_query(
            "SELECT * FROM chunks", "ki_system", PERCENT_HALF
        )
        assert valid is True
        assert error == ""


if __name__ == "__main__":
    pytest.main([__file__, "-v"])
← Übersicht Graph