test_all_functions.py
- Pfad: /var/www/mcp-servers/mcp-db/test_all_functions.py
- Namespace: -
- Zeilen: 744 | Größe: 22,200 Bytes
- Geändert: 2025-12-28 12:21:49 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 53
- Dependencies: 0 (25%)
- LOC: 0 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 80 (10%)
Issues 4
| Zeile | Typ | Beschreibung |
|-------|-----|--------------|
| 194 | magic_number | Magic Number gefunden: 100 |
| 348 | magic_number | Magic Number gefunden: 100 |
| - | complexity | Datei hat 744 Zeilen (max: 500) |
| - | coupling | Klasse hat 17 Dependencies (max: 15) |
Dependencies 17
- use sys
- use pathlib.Path
- use constants.DEFAULT_LIMIT
- use constants.MAX_ROWS
- use constants.PERCENT_HALF
- use dotenv.load_dotenv
- use config.Config
- use validators.query_validator.QueryValidator
- use domain.query_contract.QueryRequest
- use domain.query_contract.QueryResponse
- use domain.query_contract.QueryStatus
- use tools.select_tool.SelectExecutor
- use infrastructure.db_connection.DatabaseConnection
- use infrastructure.protokoll_logger.ProtokollLogger
- use mysql.connector
- use domain.log_contract.LogEntry
- use traceback
Klassen 1
- TestReport (class), Zeile 23
Funktionen 7
- test_validator(), Zeile 71
- test_db_select(), Zeile 211
- test_db_schema(), Zeile 365
- test_db_stats(), Zeile 451
- test_logging(), Zeile 526
- test_config(), Zeile 655
- main(), Zeile 715
Code
#!/usr/bin/env python3
"""Umfassende Tests für MCP-DB Server - Alle Funktionen"""
import sys
from pathlib import Path
sys.path.insert(0, "/var/www/mcp-servers/mcp_db")
sys.path.insert(0, "/var/www/mcp-servers/shared")
from constants import DEFAULT_LIMIT, MAX_ROWS, PERCENT_HALF
# .env laden BEVOR Config importiert wird
from dotenv import load_dotenv
load_dotenv(Path("/var/www/mcp-servers/mcp_db/.env"))
from config import Config
from validators.query_validator import QueryValidator
from domain.query_contract import QueryRequest, QueryResponse, QueryStatus
from tools.select_tool import SelectExecutor
from infrastructure.db_connection import DatabaseConnection
from infrastructure.protokoll_logger import ProtokollLogger
class TestReport:
    """Collects individual test outcomes and prints them grouped by category."""

    def __init__(self):
        # Each entry is a dict with the keys
        # category / name / status / success / details.
        self.tests = []
        self.passed = 0
        self.failed = 0

    def add_test(self, category: str, name: str, success: bool, details: str = ""):
        """Record one outcome and bump the matching pass/fail counter."""
        if success:
            label = "PASS"
            self.passed += 1
        else:
            label = "FAIL"
            self.failed += 1
        entry = dict(
            category=category,
            name=name,
            status=label,
            success=success,
            details=details,
        )
        self.tests.append(entry)

    def print_report(self):
        """Print all recorded outcomes followed by a one-line summary."""
        rule = "=" * 80
        print("\n" + rule)
        print("MCP-DB SERVER TEST REPORT")
        print(rule + "\n")

        previous = None
        for entry in self.tests:
            category = entry["category"]
            # A category header is printed whenever the category changes
            # between consecutive entries.
            if category != previous:
                previous = category
                print(f"\n{category}")
                print("-" * 80)
            marker = "✓" if entry["success"] else "✗"
            print(f"{marker} [{entry['status']}] {entry['name']}")
            details = entry["details"]
            if details:
                print(f" → {details}")

        print("\n" + rule)
        print(f"SUMMARY: {self.passed} passed, {self.failed} failed")
        print(rule + "\n")
def test_validator(report: TestReport):
    """Run the QueryValidator cases and record each outcome in *report*.

    Every case calls ``QueryValidator.validate_query(query, database, max_rows)``
    and unpacks the result as ``(valid, error)``. Accepting cases expect
    ``valid`` with an empty error string; rejecting cases expect ``not valid``
    and certain fragments inside the error text.
    """
    print("\n[1/6] Testing QueryValidator...")

    def accepted(valid, error):
        return valid and error == ""

    def rejected_with(*fragments):
        # Builds a predicate: the query must be refused and the error text
        # must contain every given fragment.
        def check(valid, error):
            return not valid and all(part in error for part in fragments)

        return check

    def error_only(valid, error):
        return f"error={error}"

    # Test 1.10 input: a query far longer than the limit named in its title.
    long_query = "SELECT * FROM mcp_log WHERE " + "id=1 OR " * 500

    # (name, query, database, max_rows, success predicate, details formatter)
    cases = [
        # Test 1.1: simple valid query
        (
            "Einfache gültige Query",
            "SELECT * FROM mcp_log",
            "ki_protokoll",
            PERCENT_HALF,
            accepted,
            lambda valid, error: f"valid={valid}, error={error}",
        ),
        # Test 1.2: DROP is blocked
        (
            "DROP Keyword blockiert",
            "SELECT * FROM mcp_log; DROP TABLE mcp_log",
            "ki_protokoll",
            PERCENT_HALF,
            rejected_with("DROP"),
            error_only,
        ),
        # Test 1.3: SLEEP is blocked
        (
            "SLEEP Keyword blockiert",
            "SELECT SLEEP(10) FROM mcp_log",
            "ki_protokoll",
            PERCENT_HALF,
            rejected_with("SLEEP"),
            error_only,
        ),
        # Test 1.4: INSERT is blocked
        (
            "INSERT blockiert (kein SELECT)",
            "INSERT INTO mcp_log VALUES (1,2,3)",
            "ki_protokoll",
            PERCENT_HALF,
            rejected_with("Only SELECT"),
            error_only,
        ),
        # Test 1.5: UPDATE is blocked
        # NOTE(review): this case passes the literal 50 while the others pass
        # PERCENT_HALF - confirm whether the same value is intended.
        (
            "UPDATE Keyword blockiert",
            "SELECT * FROM mcp_log WHERE status='denied' OR 1=1; UPDATE mcp_log SET status='success'",
            "ki_protokoll",
            50,
            rejected_with("UPDATE"),
            error_only,
        ),
        # Test 1.6: DELETE is blocked
        (
            "DELETE Keyword blockiert",
            "SELECT * FROM mcp_log; DELETE FROM mcp_log",
            "ki_protokoll",
            PERCENT_HALF,
            rejected_with("DELETE"),
            error_only,
        ),
        # Test 1.7: LOAD_FILE is blocked
        (
            "LOAD_FILE blockiert",
            "SELECT LOAD_FILE('/etc/passwd')",
            "ki_protokoll",
            PERCENT_HALF,
            rejected_with("LOAD_FILE"),
            error_only,
        ),
        # Test 1.8: database that is not allowed
        (
            "mysql Datenbank blockiert",
            "SELECT * FROM users",
            "mysql",
            PERCENT_HALF,
            rejected_with("not allowed"),
            error_only,
        ),
        # Test 1.9: table that is not allowed
        (
            "users Tabelle blockiert",
            "SELECT * FROM users",
            "ki_protokoll",
            PERCENT_HALF,
            rejected_with("not allowed"),
            error_only,
        ),
        # Test 1.10: query too long
        (
            "Query > 2000 Zeichen blockiert",
            long_query,
            "ki_protokoll",
            PERCENT_HALF,
            rejected_with("max", "chars"),
            lambda valid, error: f"query_length={len(long_query)}, error={error}",
        ),
        # Test 1.11: max_rows too high
        (
            "max_rows > 100 blockiert",
            "SELECT * FROM mcp_log",
            "ki_protokoll",
            999,
            rejected_with("max_rows"),
            error_only,
        ),
        # Test 1.12: allowed database ki_system
        (
            "ki_system Datenbank erlaubt",
            "SELECT * FROM chunks",
            "ki_system",
            PERCENT_HALF,
            accepted,
            lambda valid, error: f"valid={valid}",
        ),
    ]

    for name, query, database, max_rows, check, describe in cases:
        valid, error = QueryValidator.validate_query(query, database, max_rows)
        report.add_test(
            "VALIDATOR", name, check(valid, error), describe(valid, error)
        )
def test_db_select(report: TestReport):
    """Run SELECT round-trips against MySQL and record them under ``DB_SELECT``.

    Each check opens its own connection with the ``Config`` credentials, runs
    one statement and records the outcome in *report*. A failure to import the
    driver, to connect or to query is recorded as a failed test, not raised.

    NOTE(review): the banner says "db_select Tool", but the statements are
    sent directly through ``mysql.connector`` - confirm whether the tool
    itself should be driven here.
    """
    from contextlib import contextmanager

    print("\n[2/6] Testing db_select Tool...")

    @contextmanager
    def open_cursor(database):
        """Yield a dictionary cursor on *database*; always close cursor and connection."""
        # Imported lazily so a missing driver is reported per test.
        import mysql.connector

        conn = mysql.connector.connect(
            host=Config.DB_HOST,
            database=database,
            user=Config.DB_USER,
            password=Config.DB_PASSWORD,
            charset="utf8mb4",
        )
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                yield cursor
            finally:
                cursor.close()
        finally:
            # Runs even when the statement fails, so no connection is leaked.
            conn.close()

    # (name, database, sql, params, success predicate, details formatter)
    cases = [
        # Test 2.1: simple SELECT query. The result must be a list that
        # respects the LIMIT (the former `len(rows) >= 0` was always true).
        (
            "Einfache SELECT Query",
            "ki_protokoll",
            "SELECT * FROM mcp_log ORDER BY timestamp DESC LIMIT 10",
            None,
            lambda rows: isinstance(rows, list) and len(rows) <= 10,
            lambda rows: f"rows={len(rows)}",
        ),
        # Test 2.2: SELECT with a bound parameter
        (
            "SELECT mit Prepared Statement",
            "ki_protokoll",
            "SELECT * FROM mcp_log WHERE status = %s LIMIT 5",
            ("success",),
            lambda rows: isinstance(rows, list),
            lambda rows: f"rows={len(rows)}",
        ),
        # Test 2.3: SELECT with a row limit
        (
            "SELECT mit max_rows Limit",
            "ki_protokoll",
            "SELECT * FROM mcp_log LIMIT 3",
            None,
            lambda rows: len(rows) <= 3,
            lambda rows: f"max_rows=3, actual_rows={len(rows)}",
        ),
        # Test 2.4: SELECT on the ki_system database
        (
            "SELECT auf ki_system Datenbank",
            "ki_system",
            "SELECT * FROM chunks LIMIT 5",
            None,
            lambda rows: isinstance(rows, list),
            lambda rows: f"rows={len(rows)}",
        ),
    ]

    for name, database, sql, params, check, describe in cases:
        try:
            with open_cursor(database) as cursor:
                if params is None:
                    cursor.execute(sql)
                else:
                    cursor.execute(sql, params)
                rows = cursor.fetchall()
            report.add_test("DB_SELECT", name, check(rows), describe(rows))
        except Exception as e:
            report.add_test("DB_SELECT", name, False, f"Exception: {e}")

    # Test 2.5: an invalid query must produce a database error
    try:
        import mysql.connector

        with open_cursor("ki_protokoll") as cursor:
            try:
                cursor.execute("SELECT * FROM nonexistent_table")
                cursor.fetchall()
            except mysql.connector.Error as query_error:
                # Only a driver/database error counts as the expected result;
                # anything else falls through to the outer handler as a failure.
                success = True
                error_msg = str(query_error)[:100]
            else:
                success = False
                error_msg = "Query should have failed"
        report.add_test(
            "DB_SELECT",
            "Ungültige Query gibt ERROR zurück",
            success,
            f"error={error_msg}",
        )
    except Exception as e:
        report.add_test(
            "DB_SELECT", "Ungültige Query gibt ERROR zurück", False, f"Exception: {e}"
        )
def test_db_schema(report: TestReport):
    """Check schema listing for the allowed databases; record under ``DB_SCHEMA``.

    For ``ki_protokoll`` and ``ki_system`` the base tables are read from
    ``information_schema.TABLES`` through ``DatabaseConnection.get_connection``.
    A third check verifies that ``mysql`` is not in ``Config.ALLOWED_DATABASES``.
    Exceptions are recorded as failed tests instead of being raised.
    """
    print("\n[3/6] Testing db_schema Tool...")

    def list_tables(database):
        """Return the information_schema rows of all base tables in *database*."""
        if database not in Config.ALLOWED_DATABASES:
            raise RuntimeError(f"{database} not in ALLOWED_DATABASES")
        with DatabaseConnection.get_connection(database) as conn:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute(
                    """SELECT TABLE_NAME, TABLE_ROWS, CREATE_TIME
                    FROM information_schema.TABLES
                    WHERE TABLE_SCHEMA = %s
                    AND TABLE_TYPE = 'BASE TABLE'
                    ORDER BY TABLE_NAME""",
                    (database,),
                )
                return cursor.fetchall()
            finally:
                # Close the cursor even when the query fails.
                cursor.close()

    # Test 3.1: schema of ki_protokoll (must contain the mcp_log table)
    try:
        tables = list_tables("ki_protokoll")
        has_mcp_log = any(t["TABLE_NAME"] == "mcp_log" for t in tables)
        report.add_test(
            "DB_SCHEMA",
            "Schema von ki_protokoll abrufen",
            len(tables) > 0 and has_mcp_log,
            f"tables_found={len(tables)}, contains_mcp_log={has_mcp_log}",
        )
    except Exception as e:
        report.add_test(
            "DB_SCHEMA", "Schema von ki_protokoll abrufen", False, f"Exception: {e}"
        )

    # Test 3.2: schema of ki_system
    try:
        tables = list_tables("ki_system")
        report.add_test(
            "DB_SCHEMA",
            "Schema von ki_system abrufen",
            len(tables) > 0,
            f"tables_found={len(tables)}",
        )
    except Exception as e:
        report.add_test(
            "DB_SCHEMA", "Schema von ki_system abrufen", False, f"Exception: {e}"
        )

    # Test 3.3: a database that is not allowed is blocked by configuration
    try:
        if "mysql" in Config.ALLOWED_DATABASES:
            success = False
            details = "mysql should not be in ALLOWED_DATABASES"
        else:
            success = True
            details = "mysql correctly blocked by Config"
        report.add_test(
            "DB_SCHEMA", "Nicht-erlaubte Datenbank (mysql) blockiert", success, details
        )
    except Exception as e:
        report.add_test(
            "DB_SCHEMA",
            "Nicht-erlaubte Datenbank (mysql) blockiert",
            False,
            f"Exception: {e}",
        )
def test_db_stats(report: TestReport):
    """Read recent rows from ``mcp_log`` and record results under ``DB_STATS``.

    Both checks connect to ``Config.LOG_DB_NAME`` with the ``Config``
    credentials and select the newest log rows, once with a literal limit and
    once with a bound limit parameter. Exceptions are recorded as failed tests.
    """
    print("\n[4/6] Testing db_stats Tool...")

    select_newest = (
        "SELECT id, timestamp, client_name, request, status, "
        "duration_ms, error_message "
        "FROM mcp_log "
        "ORDER BY timestamp DESC "
    )

    def fetch_logs(limit_clause, params=None):
        """Run the newest-logs query with *limit_clause* appended; return all rows."""
        # Imported lazily so a missing driver is reported per test.
        import mysql.connector

        # NOTE(review): the original comment says db_stats should use the
        # mcp_readonly account; the connection here uses Config.DB_USER -
        # confirm these are the same.
        conn = mysql.connector.connect(
            host=Config.DB_HOST,
            database=Config.LOG_DB_NAME,
            user=Config.DB_USER,
            password=Config.DB_PASSWORD,
            charset="utf8mb4",
        )
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                if params is None:
                    cursor.execute(select_newest + limit_clause)
                else:
                    cursor.execute(select_newest + limit_clause, params)
                return cursor.fetchall()
            finally:
                cursor.close()
        finally:
            # Runs even when the query fails, so no connection is leaked.
            conn.close()

    # Test 4.1: fetch the latest logs
    try:
        logs = fetch_logs("LIMIT 10")
        report.add_test(
            "DB_STATS",
            "Letzte Logs abrufen (Limit=10)",
            isinstance(logs, list),
            f"logs_found={len(logs)}",
        )
    except Exception as e:
        report.add_test(
            "DB_STATS", "Letzte Logs abrufen (Limit=10)", False, f"Exception: {e}"
        )

    # Test 4.2: the limit parameter is honoured
    try:
        limit = 3
        logs = fetch_logs("LIMIT %s", (limit,))
        report.add_test(
            "DB_STATS",
            "Limit Parameter funktioniert",
            len(logs) <= limit,
            f"limit={limit}, actual={len(logs)}",
        )
    except Exception as e:
        report.add_test(
            "DB_STATS", "Limit Parameter funktioniert", False, f"Exception: {e}"
        )
def test_logging(report: TestReport):
    """Write log entries via ``ProtokollLogger`` and verify them in ``mcp_log``.

    Each check builds a ``LogEntry``, passes it to ``logger.log`` and then
    reads back the newest ``mcp_log`` row whose ``request`` starts with the
    request text just written. Results are recorded under ``LOGGING``;
    exceptions inside a check are recorded as failed tests.
    """
    print("\n[5/6] Testing Logging...")
    logger = ProtokollLogger()

    def write_and_read_back(**fields):
        """Log a ``LogEntry(**fields)`` and return the newest matching row (or None)."""
        # Imported here so every check is independent of the others; before,
        # LogEntry was only imported inside the first check.
        import mysql.connector
        from domain.log_contract import LogEntry

        logger.log(LogEntry(**fields))

        # NOTE(review): the original comment says the read-back uses the
        # mcp_readonly account; the connection here uses Config.DB_USER -
        # confirm these are the same.
        conn = mysql.connector.connect(
            host=Config.DB_HOST,
            database=Config.LOG_DB_NAME,
            user=Config.DB_USER,
            password=Config.DB_PASSWORD,
            charset="utf8mb4",
        )
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                # Prefix match on this check's own request text, so a row
                # written by a different check cannot be picked up.
                cursor.execute(
                    "SELECT * FROM mcp_log WHERE request LIKE %s "
                    "ORDER BY timestamp DESC LIMIT 1",
                    (fields["request"] + "%",),
                )
                return cursor.fetchone()
            finally:
                cursor.close()
        finally:
            # Runs even when the query fails, so no connection is leaked.
            conn.close()

    def summarize(log):
        return f"log_written={log is not None}, status={log['status'] if log else 'N/A'}"

    # Test 5.1: success log
    try:
        log = write_and_read_back(
            request="TEST: SELECT * FROM mcp_log",
            status="success",
            duration_ms=42,
        )
        success = log is not None and log["status"] == "success"
        report.add_test("LOGGING", "Success Log schreiben", success, summarize(log))
    except Exception as e:
        report.add_test("LOGGING", "Success Log schreiben", False, f"Exception: {e}")

    # Test 5.2: denied log
    try:
        log = write_and_read_back(
            request="TEST: DROP TABLE mcp_log",
            status="denied",
            duration_ms=0,
            error_message="Blocked keyword: DROP",
        )
        success = (
            log is not None
            and log["status"] == "denied"
            and "DROP" in (log["error_message"] or "")
        )
        report.add_test("LOGGING", "Denied Log schreiben", success, summarize(log))
    except Exception as e:
        report.add_test("LOGGING", "Denied Log schreiben", False, f"Exception: {e}")

    # Test 5.3: error log
    try:
        log = write_and_read_back(
            request="TEST: SELECT * FROM nonexistent",
            status="error",
            duration_ms=15,
            error_message="Table doesn't exist",
        )
        success = (
            log is not None
            and log["status"] == "error"
            and log["error_message"] is not None
        )
        report.add_test("LOGGING", "Error Log schreiben", success, summarize(log))
    except Exception as e:
        report.add_test("LOGGING", "Error Log schreiben", False, f"Exception: {e}")
def test_config(report: TestReport):
    """Check the loaded ``Config`` values and record results under ``CONFIG``.

    Five checks are recorded: allowed databases, blocked keywords, maximum
    query length, maximum row count and allowed tables.
    """
    print("\n[6/6] Testing Config...")

    def record(name, ok, details):
        report.add_test("CONFIG", name, ok, details)

    # Test 6.1: both project databases allowed, the mysql system database not
    databases_ok = (
        all(db in Config.ALLOWED_DATABASES for db in ("ki_protokoll", "ki_system"))
        and "mysql" not in Config.ALLOWED_DATABASES
    )
    record(
        "ALLOWED_DATABASES korrekt",
        databases_ok,
        f"databases={Config.ALLOWED_DATABASES}",
    )

    # Test 6.2: every critical keyword is on the block list
    critical = ["DROP", "DELETE", "INSERT", "UPDATE", "SLEEP", "LOAD_FILE"]
    missing_keywords = [kw for kw in critical if kw not in Config.BLOCKED_KEYWORDS]
    record(
        "BLOCKED_KEYWORDS enthält kritische Keywords",
        not missing_keywords,
        f"required={critical}, actual={Config.BLOCKED_KEYWORDS}",
    )

    # Test 6.3: maximum query length
    record(
        "MAX_QUERY_LENGTH = 2000",
        Config.MAX_QUERY_LENGTH == 2000,
        f"value={Config.MAX_QUERY_LENGTH}",
    )

    # Test 6.4: maximum row count matches the shared constant
    record(
        f"MAX_ROWS = {MAX_ROWS}",
        Config.MAX_ROWS == MAX_ROWS,
        f"value={Config.MAX_ROWS}",
    )

    # Test 6.5: the important tables are allowed
    important = ["mcp_log", "chunks", "protokoll"]
    missing_tables = [tbl for tbl in important if tbl not in Config.ALLOWED_TABLES]
    record(
        "ALLOWED_TABLES enthält wichtige Tabellen",
        not missing_tables,
        f"required={important}, count={len(Config.ALLOWED_TABLES)}",
    )
def main():
    """Run all test suites, print the report and exit with a status code.

    The process exits with 0 only when no test was recorded as failed AND no
    suite aborted with an unexpected exception; otherwise it exits with 1.
    As before, the first unexpected exception stops the remaining suites.
    """
    print("\n" + "=" * 80)
    print("MCP-DB SERVER - COMPREHENSIVE TESTING")
    print("=" * 80)

    report = TestReport()
    suites = (
        test_validator,
        test_db_select,
        test_db_schema,
        test_db_stats,
        test_logging,
        test_config,
    )

    crashed = False
    try:
        for suite in suites:
            suite(report)
    except Exception as e:
        # A crash means some checks never ran, so the run must not be
        # reported as successful even if report.failed is still 0.
        crashed = True
        print(f"\nFATAL ERROR: {e}")
        import traceback

        traceback.print_exc()

    report.print_report()

    # Exit code
    sys.exit(0 if report.failed == 0 and not crashed else 1)


if __name__ == "__main__":
    main()