comprehensive_test.py
- Pfad:
/var/www/mcp-servers/mcp-db/comprehensive_test.py
- Namespace: -
- Zeilen: 397 | Größe: 13,894 Bytes
- Geändert: 2025-12-28 12:21:49 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 58
- Dependencies: 0 (25%)
- LOC: 34 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 60 (10%)
Issues 5
| Zeile |
Typ |
Beschreibung |
| 276 |
magic_number |
Magic Number gefunden: 100 |
| 338 |
magic_number |
Magic Number gefunden: 100 |
| 348 |
magic_number |
Magic Number gefunden: 100 |
| 378 |
magic_number |
Magic Number gefunden: 60 |
| - |
coupling |
Klasse hat 16 Dependencies (max: 15) |
Dependencies 16
- use json
- use sys
- use datetime.datetime
- use pathlib.Path
- use typing.List
- use typing.Tuple
- use constants.DEFAULT_LIMIT
- use constants.MAX_ROWS
- use constants.PERCENT_FULL
- use constants.PERCENT_HALF
- use dotenv.load_dotenv
- use infrastructure.db_connection.DatabaseConnection
- use tools.select_tool.register_select_tool
- use tools.schema_tool.register_schema_tool
- use tools.stats_tool.register_stats_tool
- use config.Config
Klassen 1
Funktionen 1
Code
#!/usr/bin/env python3
"""
MCP-DB Comprehensive Test Suite
Vollständiger Funktionstest aller MCP-DB Features
"""
import json
import sys
from datetime import datetime
from pathlib import Path
from typing import List, Tuple
sys.path.insert(0, '/var/www/mcp-servers/shared')
from constants import DEFAULT_LIMIT, MAX_ROWS, PERCENT_FULL, PERCENT_HALF
# Load environment
from dotenv import load_dotenv
load_dotenv(Path('/var/www/mcp-servers/mcp_db/.env'))
# Reset connection pool
from infrastructure.db_connection import DatabaseConnection
DatabaseConnection._pool = None
# Import tools
from tools.select_tool import register_select_tool
from tools.schema_tool import register_schema_tool
from tools.stats_tool import register_stats_tool
from config import Config
# Mock MCP
class MockMCP:
def __init__(self):
self.tools = {}
def tool(self):
def decorator(func):
self.tools[func.__name__] = func
return func
return decorator
mcp = MockMCP()
register_select_tool(mcp)
register_schema_tool(mcp)
register_stats_tool(mcp)
# Test Results
results: List[Tuple[str, str, str, str, bool]] = []
def test(category: str, name: str, expected: str, func, *args, **kwargs):
"""Run a test and record result"""
try:
result = func(*args, **kwargs)
status = result.get('status', 'no_status')
error = result.get('error', '')
# Handle different response formats
if expected == 'success':
if status == 'success':
passed = True
elif status == 'no_status' and 'error' not in result:
passed = True # db_schema/db_stats without error
elif status == 'no_status' and result.get('error') is None:
passed = True
else:
passed = False
elif expected == 'denied':
if status == 'denied':
passed = True
elif status == 'no_status' and error:
passed = True # db_schema returns {error: ..., tables: []}
else:
passed = False
elif expected == 'error':
passed = status == 'error'
elif expected.startswith('denied:'):
expected_msg = expected.split(':', 1)[1]
if status == 'denied':
passed = expected_msg.lower() in (error or '').lower()
elif status == 'no_status' and error:
passed = expected_msg.lower() in error.lower()
else:
passed = False
elif expected.startswith('has:'):
# Check if result has specific key
key = expected.split(':', 1)[1]
passed = key in result and result[key]
else:
passed = False
actual_str = f"{status}: {error[:50] if error else 'OK'}"
results.append((category, name, expected, actual_str, passed))
except Exception as e:
results.append((category, name, expected, f"EXCEPTION: {str(e)[:50]}", False))
# ============================================================
# TEST CATEGORY: db_select - Basic Functionality
# ============================================================
test("db_select/basic", "Simple SELECT", "success",
mcp.tools['db_select'],
query="SELECT 1 as test",
database="ki_protokoll")
test("db_select/basic", "SELECT with columns", "success",
mcp.tools['db_select'],
query="SELECT id, status FROM mcp_log LIMIT 1",
database="ki_protokoll")
test("db_select/basic", "SELECT with WHERE", "success",
mcp.tools['db_select'],
query="SELECT * FROM mcp_log WHERE status = 'success' LIMIT 1",
database="ki_protokoll")
test("db_select/basic", "SELECT with ORDER BY", "success",
mcp.tools['db_select'],
query="SELECT * FROM mcp_log ORDER BY id DESC LIMIT 1",
database="ki_protokoll")
test("db_select/basic", "SELECT with COUNT", "success",
mcp.tools['db_select'],
query="SELECT COUNT(*) as total FROM mcp_log",
database="ki_protokoll")
test("db_select/basic", "SELECT with GROUP BY", "success",
mcp.tools['db_select'],
query="SELECT status, COUNT(*) FROM mcp_log GROUP BY status",
database="ki_protokoll")
# ============================================================
# TEST CATEGORY: db_select - Prepared Statements
# ============================================================
test("db_select/params", "Single param", "success",
mcp.tools['db_select'],
query="SELECT * FROM mcp_log WHERE status = %s LIMIT 1",
database="ki_protokoll",
params=["success"])
test("db_select/params", "Multiple params", "success",
mcp.tools['db_select'],
query="SELECT * FROM mcp_log WHERE status = %s AND id > %s LIMIT 1",
database="ki_protokoll",
params=["success", 0])
test("db_select/params", "LIKE param", "success",
mcp.tools['db_select'],
query="SELECT * FROM mcp_log WHERE request LIKE %s LIMIT 1",
database="ki_protokoll",
params=["%SELECT%"])
# ============================================================
# TEST CATEGORY: db_select - Database Allowlist
# ============================================================
test("db_select/db_allow", "ki_protokoll allowed", "success",
mcp.tools['db_select'],
query="SELECT 1",
database="ki_protokoll")
test("db_select/db_allow", "ki_system allowed", "success",
mcp.tools['db_select'],
query="SELECT 1",
database="ki_system")
test("db_select/db_allow", "mysql denied", "denied:not allowed",
mcp.tools['db_select'],
query="SELECT 1",
database="mysql")
test("db_select/db_allow", "information_schema denied", "denied:not allowed",
mcp.tools['db_select'],
query="SELECT 1",
database="information_schema")
# ============================================================
# TEST CATEGORY: db_select - Table Allowlist (existierende Tabellen)
# ============================================================
# ki_protokoll tables
test("db_select/table_allow", "Table mcp_log (ki_protokoll)", "success",
mcp.tools['db_select'],
query="SELECT 1 FROM mcp_log LIMIT 1",
database="ki_protokoll")
test("db_select/table_allow", "Table protokoll (ki_protokoll)", "success",
mcp.tools['db_select'],
query="SELECT 1 FROM protokoll LIMIT 1",
database="ki_protokoll")
# ki_system tables - sample
test("db_select/table_allow", "Table documents (ki_system)", "success",
mcp.tools['db_select'],
query="SELECT 1 FROM documents LIMIT 1",
database="ki_system")
test("db_select/table_allow", "Table chunks (ki_system)", "success",
mcp.tools['db_select'],
query="SELECT 1 FROM chunks LIMIT 1",
database="ki_system")
test("db_select/table_allow", "Table content_orders (ki_system)", "success",
mcp.tools['db_select'],
query="SELECT 1 FROM content_orders LIMIT 1",
database="ki_system")
# Denied tables
test("db_select/table_allow", "Table users denied", "denied:not allowed",
mcp.tools['db_select'],
query="SELECT * FROM users",
database="ki_protokoll")
test("db_select/table_allow", "Table secrets denied", "denied:not allowed",
mcp.tools['db_select'],
query="SELECT * FROM secrets",
database="ki_protokoll")
test("db_select/table_allow", "Table mysql.user denied", "denied:not allowed",
mcp.tools['db_select'],
query="SELECT * FROM mysql.user",
database="ki_protokoll")
# ============================================================
# TEST CATEGORY: db_select - Keyword Blocklist (alle 15)
# ============================================================
blocked_tests = [
("DROP", "SELECT * FROM mcp_log; DROP TABLE x"),
("DELETE", "SELECT * FROM mcp_log; DELETE FROM x"),
("INSERT", "SELECT * FROM mcp_log; INSERT INTO x VALUES(1)"),
("UPDATE", "SELECT * FROM mcp_log; UPDATE x SET y=1"),
("TRUNCATE", "SELECT * FROM mcp_log; TRUNCATE TABLE x"),
("ALTER", "SELECT * FROM mcp_log; ALTER TABLE x ADD y INT"),
("CREATE", "SELECT * FROM mcp_log; CREATE TABLE x(id INT)"),
("RENAME", "SELECT * FROM mcp_log; RENAME TABLE x TO y"),
("GRANT", "SELECT * FROM mcp_log; GRANT ALL ON x TO y"),
("REVOKE", "SELECT * FROM mcp_log; REVOKE ALL ON x FROM y"),
("LOAD_FILE", "SELECT LOAD_FILE('/etc/passwd')"),
("INTO OUTFILE", "SELECT * FROM mcp_log INTO OUTFILE '/tmp/x'"),
("INTO DUMPFILE", "SELECT * FROM mcp_log INTO DUMPFILE '/tmp/x'"),
("BENCHMARK", "SELECT BENCHMARK(1000000, MD5('test'))"),
("SLEEP", "SELECT SLEEP(10)"),
]
for keyword, query in blocked_tests:
test("db_select/blocklist", f"Keyword {keyword} blocked", f"denied:{keyword}",
mcp.tools['db_select'],
query=query,
database="ki_protokoll")
# Word boundary tests
test("db_select/blocklist", "Word boundary: dropdown != DROP", "success",
mcp.tools['db_select'],
query="SELECT 'dropdown' as word FROM mcp_log LIMIT 1",
database="ki_protokoll")
test("db_select/blocklist", "Word boundary: updated_at != UPDATE", "success",
mcp.tools['db_select'],
query="SELECT 'updated_at' as col FROM mcp_log LIMIT 1",
database="ki_protokoll")
# ============================================================
# TEST CATEGORY: db_select - Limits
# ============================================================
test("db_select/limits", "max_rows=1", "success",
mcp.tools['db_select'],
query="SELECT * FROM mcp_log",
database="ki_protokoll",
max_rows=1)
test("db_select/limits", "max_rows=50", "success",
mcp.tools['db_select'],
query="SELECT * FROM mcp_log",
database="ki_protokoll",
max_rows=PERCENT_HALF)
test("db_select/limits", "max_rows=100 (max)", "success",
mcp.tools['db_select'],
query="SELECT * FROM mcp_log",
database="ki_protokoll",
max_rows=MAX_ROWS)
test("db_select/limits", "max_rows=0 denied", "denied:max_rows",
mcp.tools['db_select'],
query="SELECT * FROM mcp_log",
database="ki_protokoll",
max_rows=0)
test("db_select/limits", "max_rows=101 denied", "denied:max_rows",
mcp.tools['db_select'],
query="SELECT * FROM mcp_log",
database="ki_protokoll",
max_rows=MAX_ROWS + 1)
test("db_select/limits", "Empty query denied", "denied:empty",
mcp.tools['db_select'],
query="",
database="ki_protokoll")
test("db_select/limits", "Non-SELECT denied", "denied:SELECT",
mcp.tools['db_select'],
query="SHOW TABLES",
database="ki_protokoll")
# Query length test
long_query = "SELECT * FROM mcp_log WHERE " + " OR ".join([f"id = {i}" for i in range(500)])
test("db_select/limits", f"Query > 2000 chars denied", "denied:2000",
mcp.tools['db_select'],
query=long_query,
database="ki_protokoll")
# ============================================================
# TEST CATEGORY: db_schema
# ============================================================
test("db_schema", "ki_protokoll schema", "has:tables",
mcp.tools['db_schema'],
database="ki_protokoll")
test("db_schema", "ki_system schema", "has:tables",
mcp.tools['db_schema'],
database="ki_system")
test("db_schema", "mysql denied", "denied:not allowed",
mcp.tools['db_schema'],
database="mysql")
# ============================================================
# TEST CATEGORY: db_stats
# ============================================================
test("db_stats", "Default limit (50)", "has:logs",
mcp.tools['db_stats'])
test("db_stats", "Custom limit (5)", "has:logs",
mcp.tools['db_stats'],
limit=5)
test("db_stats", "Max limit (100)", "has:logs",
mcp.tools['db_stats'],
limit=DEFAULT_LIMIT)
# db_stats clamps invalid values
result = mcp.tools['db_stats'](limit=0)
results.append(("db_stats", "limit=0 clamped to 1", "success",
f"count={result.get('count', 0)}", 'logs' in result and result.get('count', 0) >= 1))
result = mcp.tools['db_stats'](limit=200)
results.append(("db_stats", "limit=200 clamped to 100", "success",
f"count={result.get('count', 0)}", 'logs' in result and result.get('count', 0) <= DEFAULT_LIMIT))
# ============================================================
# GENERATE REPORT
# ============================================================
print("=" * 80)
print("MCP-DB COMPREHENSIVE TEST REPORT")
print(f"Datum: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Python: {sys.version.split()[0]}")
print(f"Config: {len(Config.ALLOWED_TABLES)} erlaubte Tabellen, {len(Config.BLOCKED_KEYWORDS)} blockierte Keywords")
print("=" * 80)
# Group by category
categories = {}
for cat, name, expected, actual, passed in results:
if cat not in categories:
categories[cat] = []
categories[cat].append((name, expected, actual, passed))
total_passed = sum(1 for r in results if r[4])
total_failed = sum(1 for r in results if not r[4])
for cat in sorted(categories.keys()):
tests = categories[cat]
cat_passed = sum(1 for t in tests if t[3])
cat_failed = sum(1 for t in tests if not t[3])
status_icon = "✓" if cat_failed == 0 else "✗"
print(f"\n{status_icon} {cat} ({cat_passed}/{len(tests)})")
print("-" * 60)
for name, expected, actual, passed in tests:
status = "✓" if passed else "✗"
print(f" {status} {name}")
if not passed:
print(f" Expected: {expected}")
print(f" Actual: {actual}")
print("\n" + "=" * 80)
print(f"GESAMT: {total_passed}/{len(results)} Tests bestanden ({PERCENT_FULL*total_passed//len(results)}%)")
if total_failed > 0:
print(f"FEHLER: {total_failed} Tests fehlgeschlagen")
print("STATUS: TESTS FEHLGESCHLAGEN ✗")
else:
print("STATUS: ALLE TESTS BESTANDEN ✓")
print("=" * 80)
# Exit with error code if tests failed
sys.exit(0 if total_failed == 0 else 1)