test_validator.py
- Pfad:
/var/www/mcp-servers/mcp-db/tests/test_validator.py
- Namespace: -
- Zeilen: 122 | Größe: 4,055 Bytes
- Geändert: 2025-12-28 12:21:49 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 97
- Dependencies: 90 (25%)
- LOC: 100 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 90 (10%)
Issues 1
| Zeile | Typ | Beschreibung |
|-------|-----|--------------|
| 104 | magic_number | Magic Number gefunden: 100 |
Dependencies 6
- use sys
- use pathlib.Path
- use pytest
- use constants.PERCENT_HALF
- use dotenv.load_dotenv
- use validators.query_validator.QueryValidator
Klassen 1
- TestQueryValidator (class, Zeile 20)
Code
#!/usr/bin/env python3
"""Tests for QueryValidator."""
import sys
from pathlib import Path
import pytest
# Put the project directories at the front of the import path. These inserts
# must run before the project-local imports below.
# NOTE(review): this file's own path is reported with a hyphen ("mcp-db")
# while the paths here use an underscore ("mcp_db") - confirm the directory.
sys.path.insert(0, "/var/www/mcp-servers/mcp_db")
sys.path.insert(0, "/var/www/mcp-servers/shared")
from constants import PERCENT_HALF
from dotenv import load_dotenv
# Load the .env file before the validator module is imported.
# NOTE(review): presumably the validator reads its configuration from the
# environment at import time - confirm before reordering these lines.
load_dotenv(Path("/var/www/mcp-servers/mcp_db/.env"))
from validators.query_validator import QueryValidator
class TestQueryValidator:
    """Exercise QueryValidator.validate_query with accepted and rejected input.

    Every test calls ``QueryValidator.validate_query(query, database,
    max_rows)`` and checks the returned ``(valid, error)`` pair: ``valid`` is
    a bool and ``error`` is a string that is empty on success.
    """

    def test_simple_valid_query(self) -> None:
        """A plain SELECT on an allowed table is accepted with no error."""
        valid, error = QueryValidator.validate_query(
            "SELECT * FROM mcp_log", "ki_protokoll", PERCENT_HALF
        )
        assert valid is True
        assert error == ""

    def test_drop_keyword_blocked(self) -> None:
        """A stacked DROP statement is rejected and named in the error."""
        valid, error = QueryValidator.validate_query(
            "SELECT * FROM mcp_log; DROP TABLE mcp_log", "ki_protokoll", PERCENT_HALF
        )
        assert valid is False
        assert "DROP" in error

    def test_sleep_keyword_blocked(self) -> None:
        """A SLEEP() call is rejected and named in the error."""
        valid, error = QueryValidator.validate_query(
            "SELECT SLEEP(10) FROM mcp_log", "ki_protokoll", PERCENT_HALF
        )
        assert valid is False
        assert "SLEEP" in error

    def test_insert_blocked(self) -> None:
        """An INSERT statement is rejected because it is not a SELECT."""
        valid, error = QueryValidator.validate_query(
            "INSERT INTO mcp_log VALUES (1,2,3)", "ki_protokoll", PERCENT_HALF
        )
        assert valid is False
        assert "Only SELECT" in error

    def test_update_keyword_blocked(self) -> None:
        """A stacked UPDATE statement is rejected and named in the error."""
        # Uses PERCENT_HALF like every sibling test instead of a bare literal,
        # so the row limit passed here is one the validator is shown to accept
        # (see test_simple_valid_query) and the rejection is due to UPDATE.
        valid, error = QueryValidator.validate_query(
            "SELECT * FROM mcp_log WHERE status='denied' OR 1=1; UPDATE mcp_log SET status='success'",
            "ki_protokoll",
            PERCENT_HALF,
        )
        assert valid is False
        assert "UPDATE" in error

    def test_delete_keyword_blocked(self) -> None:
        """A stacked DELETE statement is rejected and named in the error."""
        valid, error = QueryValidator.validate_query(
            "SELECT * FROM mcp_log; DELETE FROM mcp_log", "ki_protokoll", PERCENT_HALF
        )
        assert valid is False
        assert "DELETE" in error

    def test_load_file_blocked(self) -> None:
        """A LOAD_FILE() call is rejected and named in the error."""
        valid, error = QueryValidator.validate_query(
            "SELECT LOAD_FILE('/etc/passwd')", "ki_protokoll", PERCENT_HALF
        )
        assert valid is False
        assert "LOAD_FILE" in error

    def test_mysql_database_blocked(self) -> None:
        """A query against the ``mysql`` database is rejected."""
        # NOTE(review): the query targets ``users``, which
        # test_users_table_blocked shows is rejected with "not allowed" on its
        # own. This test therefore cannot tell a database rejection from a
        # table rejection - consider querying an allowed table here once the
        # database-rejection message is confirmed.
        valid, error = QueryValidator.validate_query("SELECT * FROM users", "mysql", PERCENT_HALF)
        assert valid is False
        assert "not allowed" in error

    def test_users_table_blocked(self) -> None:
        """A query against the ``users`` table is rejected."""
        valid, error = QueryValidator.validate_query(
            "SELECT * FROM users", "ki_protokoll", PERCENT_HALF
        )
        assert valid is False
        assert "not allowed" in error

    def test_query_too_long(self) -> None:
        """An over-long query is rejected (limit is 2000 chars per the original note)."""
        # 28-character prefix plus 500 repetitions of an 8-character clause.
        long_query = "SELECT * FROM mcp_log WHERE " + "id=1 OR " * 500
        valid, error = QueryValidator.validate_query(long_query, "ki_protokoll", PERCENT_HALF)
        assert valid is False
        assert "max" in error
        assert "chars" in error

    def test_max_rows_too_high(self) -> None:
        """A too-large max_rows is rejected (limit is 100 per the original note)."""
        valid, error = QueryValidator.validate_query(
            "SELECT * FROM mcp_log", "ki_protokoll", 999
        )
        assert valid is False
        assert "max_rows" in error

    def test_ki_system_database_allowed(self) -> None:
        """A SELECT against the ``ki_system`` database is accepted."""
        valid, error = QueryValidator.validate_query(
            "SELECT * FROM chunks", "ki_system", PERCENT_HALF
        )
        assert valid is True
        assert error == ""
if __name__ == "__main__":
    # Running this file directly hands it to pytest in verbose mode.
    pytest_args = [__file__, "-v"]
    pytest.main(pytest_args)