comprehensive_test.py

Code Hygiene Score: 58

Issues 5

Zeile Typ Beschreibung
276 magic_number Magic Number gefunden: 100
338 magic_number Magic Number gefunden: 100
348 magic_number Magic Number gefunden: 100
378 magic_number Magic Number gefunden: 60
- coupling Klasse hat 16 Dependencies (max: 15)

Dependencies 16

Klassen 1

Funktionen 1

Code

#!/usr/bin/env python3
"""
MCP-DB Comprehensive Test Suite
Vollständiger Funktionstest aller MCP-DB Features
"""

import json
import sys
from datetime import datetime
from pathlib import Path
from typing import List, Tuple

sys.path.insert(0, '/var/www/mcp-servers/shared')
from constants import DEFAULT_LIMIT, MAX_ROWS, PERCENT_FULL, PERCENT_HALF

# Load environment
from dotenv import load_dotenv
load_dotenv(Path('/var/www/mcp-servers/mcp_db/.env'))

# Reset connection pool
from infrastructure.db_connection import DatabaseConnection
DatabaseConnection._pool = None

# Import tools
from tools.select_tool import register_select_tool
from tools.schema_tool import register_schema_tool
from tools.stats_tool import register_stats_tool
from config import Config

# Mock MCP
class MockMCP:
    def __init__(self):
        self.tools = {}
    def tool(self):
        def decorator(func):
            self.tools[func.__name__] = func
            return func
        return decorator

mcp = MockMCP()
register_select_tool(mcp)
register_schema_tool(mcp)
register_stats_tool(mcp)

# Test Results
results: List[Tuple[str, str, str, str, bool]] = []

def test(category: str, name: str, expected: str, func, *args, **kwargs):
    """Run a test and record result"""
    try:
        result = func(*args, **kwargs)
        status = result.get('status', 'no_status')
        error = result.get('error', '')
        
        # Handle different response formats
        if expected == 'success':
            if status == 'success':
                passed = True
            elif status == 'no_status' and 'error' not in result:
                passed = True  # db_schema/db_stats without error
            elif status == 'no_status' and result.get('error') is None:
                passed = True
            else:
                passed = False
        elif expected == 'denied':
            if status == 'denied':
                passed = True
            elif status == 'no_status' and error:
                passed = True  # db_schema returns {error: ..., tables: []}
            else:
                passed = False
        elif expected == 'error':
            passed = status == 'error'
        elif expected.startswith('denied:'):
            expected_msg = expected.split(':', 1)[1]
            if status == 'denied':
                passed = expected_msg.lower() in (error or '').lower()
            elif status == 'no_status' and error:
                passed = expected_msg.lower() in error.lower()
            else:
                passed = False
        elif expected.startswith('has:'):
            # Check if result has specific key
            key = expected.split(':', 1)[1]
            passed = key in result and result[key]
        else:
            passed = False
        
        actual_str = f"{status}: {error[:50] if error else 'OK'}"
        results.append((category, name, expected, actual_str, passed))
    except Exception as e:
        results.append((category, name, expected, f"EXCEPTION: {str(e)[:50]}", False))

# ============================================================
# TEST CATEGORY: db_select - Basic Functionality
# ============================================================

test("db_select/basic", "Simple SELECT", "success",
     mcp.tools['db_select'], 
     query="SELECT 1 as test",
     database="ki_protokoll")

test("db_select/basic", "SELECT with columns", "success",
     mcp.tools['db_select'],
     query="SELECT id, status FROM mcp_log LIMIT 1",
     database="ki_protokoll")

test("db_select/basic", "SELECT with WHERE", "success",
     mcp.tools['db_select'],
     query="SELECT * FROM mcp_log WHERE status = 'success' LIMIT 1",
     database="ki_protokoll")

test("db_select/basic", "SELECT with ORDER BY", "success",
     mcp.tools['db_select'],
     query="SELECT * FROM mcp_log ORDER BY id DESC LIMIT 1",
     database="ki_protokoll")

test("db_select/basic", "SELECT with COUNT", "success",
     mcp.tools['db_select'],
     query="SELECT COUNT(*) as total FROM mcp_log",
     database="ki_protokoll")

test("db_select/basic", "SELECT with GROUP BY", "success",
     mcp.tools['db_select'],
     query="SELECT status, COUNT(*) FROM mcp_log GROUP BY status",
     database="ki_protokoll")

# ============================================================
# TEST CATEGORY: db_select - Prepared Statements
# ============================================================

test("db_select/params", "Single param", "success",
     mcp.tools['db_select'],
     query="SELECT * FROM mcp_log WHERE status = %s LIMIT 1",
     database="ki_protokoll",
     params=["success"])

test("db_select/params", "Multiple params", "success",
     mcp.tools['db_select'],
     query="SELECT * FROM mcp_log WHERE status = %s AND id > %s LIMIT 1",
     database="ki_protokoll",
     params=["success", 0])

test("db_select/params", "LIKE param", "success",
     mcp.tools['db_select'],
     query="SELECT * FROM mcp_log WHERE request LIKE %s LIMIT 1",
     database="ki_protokoll",
     params=["%SELECT%"])

# ============================================================
# TEST CATEGORY: db_select - Database Allowlist
# ============================================================

test("db_select/db_allow", "ki_protokoll allowed", "success",
     mcp.tools['db_select'],
     query="SELECT 1",
     database="ki_protokoll")

test("db_select/db_allow", "ki_system allowed", "success",
     mcp.tools['db_select'],
     query="SELECT 1",
     database="ki_system")

test("db_select/db_allow", "mysql denied", "denied:not allowed",
     mcp.tools['db_select'],
     query="SELECT 1",
     database="mysql")

test("db_select/db_allow", "information_schema denied", "denied:not allowed",
     mcp.tools['db_select'],
     query="SELECT 1",
     database="information_schema")

# ============================================================
# TEST CATEGORY: db_select - Table Allowlist (existierende Tabellen)
# ============================================================

# ki_protokoll tables
test("db_select/table_allow", "Table mcp_log (ki_protokoll)", "success",
     mcp.tools['db_select'],
     query="SELECT 1 FROM mcp_log LIMIT 1",
     database="ki_protokoll")

test("db_select/table_allow", "Table protokoll (ki_protokoll)", "success",
     mcp.tools['db_select'],
     query="SELECT 1 FROM protokoll LIMIT 1",
     database="ki_protokoll")

# ki_system tables - sample
test("db_select/table_allow", "Table documents (ki_system)", "success",
     mcp.tools['db_select'],
     query="SELECT 1 FROM documents LIMIT 1",
     database="ki_system")

test("db_select/table_allow", "Table chunks (ki_system)", "success",
     mcp.tools['db_select'],
     query="SELECT 1 FROM chunks LIMIT 1",
     database="ki_system")

test("db_select/table_allow", "Table content_orders (ki_system)", "success",
     mcp.tools['db_select'],
     query="SELECT 1 FROM content_orders LIMIT 1",
     database="ki_system")

# Denied tables
test("db_select/table_allow", "Table users denied", "denied:not allowed",
     mcp.tools['db_select'],
     query="SELECT * FROM users",
     database="ki_protokoll")

test("db_select/table_allow", "Table secrets denied", "denied:not allowed",
     mcp.tools['db_select'],
     query="SELECT * FROM secrets",
     database="ki_protokoll")

test("db_select/table_allow", "Table mysql.user denied", "denied:not allowed",
     mcp.tools['db_select'],
     query="SELECT * FROM mysql.user",
     database="ki_protokoll")

# ============================================================
# TEST CATEGORY: db_select - Keyword Blocklist (alle 15)
# ============================================================

blocked_tests = [
    ("DROP", "SELECT * FROM mcp_log; DROP TABLE x"),
    ("DELETE", "SELECT * FROM mcp_log; DELETE FROM x"),
    ("INSERT", "SELECT * FROM mcp_log; INSERT INTO x VALUES(1)"),
    ("UPDATE", "SELECT * FROM mcp_log; UPDATE x SET y=1"),
    ("TRUNCATE", "SELECT * FROM mcp_log; TRUNCATE TABLE x"),
    ("ALTER", "SELECT * FROM mcp_log; ALTER TABLE x ADD y INT"),
    ("CREATE", "SELECT * FROM mcp_log; CREATE TABLE x(id INT)"),
    ("RENAME", "SELECT * FROM mcp_log; RENAME TABLE x TO y"),
    ("GRANT", "SELECT * FROM mcp_log; GRANT ALL ON x TO y"),
    ("REVOKE", "SELECT * FROM mcp_log; REVOKE ALL ON x FROM y"),
    ("LOAD_FILE", "SELECT LOAD_FILE('/etc/passwd')"),
    ("INTO OUTFILE", "SELECT * FROM mcp_log INTO OUTFILE '/tmp/x'"),
    ("INTO DUMPFILE", "SELECT * FROM mcp_log INTO DUMPFILE '/tmp/x'"),
    ("BENCHMARK", "SELECT BENCHMARK(1000000, MD5('test'))"),
    ("SLEEP", "SELECT SLEEP(10)"),
]

for keyword, query in blocked_tests:
    test("db_select/blocklist", f"Keyword {keyword} blocked", f"denied:{keyword}",
         mcp.tools['db_select'],
         query=query,
         database="ki_protokoll")

# Word boundary tests
test("db_select/blocklist", "Word boundary: dropdown != DROP", "success",
     mcp.tools['db_select'],
     query="SELECT 'dropdown' as word FROM mcp_log LIMIT 1",
     database="ki_protokoll")

test("db_select/blocklist", "Word boundary: updated_at != UPDATE", "success",
     mcp.tools['db_select'],
     query="SELECT 'updated_at' as col FROM mcp_log LIMIT 1",
     database="ki_protokoll")

# ============================================================
# TEST CATEGORY: db_select - Limits
# ============================================================

test("db_select/limits", "max_rows=1", "success",
     mcp.tools['db_select'],
     query="SELECT * FROM mcp_log",
     database="ki_protokoll",
     max_rows=1)

test("db_select/limits", "max_rows=50", "success",
     mcp.tools['db_select'],
     query="SELECT * FROM mcp_log",
     database="ki_protokoll",
     max_rows=PERCENT_HALF)

test("db_select/limits", "max_rows=100 (max)", "success",
     mcp.tools['db_select'],
     query="SELECT * FROM mcp_log",
     database="ki_protokoll",
     max_rows=MAX_ROWS)

test("db_select/limits", "max_rows=0 denied", "denied:max_rows",
     mcp.tools['db_select'],
     query="SELECT * FROM mcp_log",
     database="ki_protokoll",
     max_rows=0)

test("db_select/limits", "max_rows=101 denied", "denied:max_rows",
     mcp.tools['db_select'],
     query="SELECT * FROM mcp_log",
     database="ki_protokoll",
     max_rows=MAX_ROWS + 1)

test("db_select/limits", "Empty query denied", "denied:empty",
     mcp.tools['db_select'],
     query="",
     database="ki_protokoll")

test("db_select/limits", "Non-SELECT denied", "denied:SELECT",
     mcp.tools['db_select'],
     query="SHOW TABLES",
     database="ki_protokoll")

# Query length test
long_query = "SELECT * FROM mcp_log WHERE " + " OR ".join([f"id = {i}" for i in range(500)])
test("db_select/limits", f"Query > 2000 chars denied", "denied:2000",
     mcp.tools['db_select'],
     query=long_query,
     database="ki_protokoll")

# ============================================================
# TEST CATEGORY: db_schema
# ============================================================

test("db_schema", "ki_protokoll schema", "has:tables",
     mcp.tools['db_schema'],
     database="ki_protokoll")

test("db_schema", "ki_system schema", "has:tables",
     mcp.tools['db_schema'],
     database="ki_system")

test("db_schema", "mysql denied", "denied:not allowed",
     mcp.tools['db_schema'],
     database="mysql")

# ============================================================
# TEST CATEGORY: db_stats
# ============================================================

test("db_stats", "Default limit (50)", "has:logs",
     mcp.tools['db_stats'])

test("db_stats", "Custom limit (5)", "has:logs",
     mcp.tools['db_stats'],
     limit=5)

test("db_stats", "Max limit (100)", "has:logs",
     mcp.tools['db_stats'],
     limit=DEFAULT_LIMIT)

# db_stats clamps invalid values
result = mcp.tools['db_stats'](limit=0)
results.append(("db_stats", "limit=0 clamped to 1", "success", 
                f"count={result.get('count', 0)}", 'logs' in result and result.get('count', 0) >= 1))

result = mcp.tools['db_stats'](limit=200)
results.append(("db_stats", "limit=200 clamped to 100", "success",
                f"count={result.get('count', 0)}", 'logs' in result and result.get('count', 0) <= DEFAULT_LIMIT))

# ============================================================
# GENERATE REPORT
# ============================================================

print("=" * 80)
print("MCP-DB COMPREHENSIVE TEST REPORT")
print(f"Datum: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Python: {sys.version.split()[0]}")
print(f"Config: {len(Config.ALLOWED_TABLES)} erlaubte Tabellen, {len(Config.BLOCKED_KEYWORDS)} blockierte Keywords")
print("=" * 80)

# Group by category
categories = {}
for cat, name, expected, actual, passed in results:
    if cat not in categories:
        categories[cat] = []
    categories[cat].append((name, expected, actual, passed))

total_passed = sum(1 for r in results if r[4])
total_failed = sum(1 for r in results if not r[4])

for cat in sorted(categories.keys()):
    tests = categories[cat]
    cat_passed = sum(1 for t in tests if t[3])
    cat_failed = sum(1 for t in tests if not t[3])
    status_icon = "✓" if cat_failed == 0 else "✗"
    print(f"\n{status_icon} {cat} ({cat_passed}/{len(tests)})")
    print("-" * 60)
    for name, expected, actual, passed in tests:
        status = "✓" if passed else "✗"
        print(f"  {status} {name}")
        if not passed:
            print(f"      Expected: {expected}")
            print(f"      Actual:   {actual}")

print("\n" + "=" * 80)
print(f"GESAMT: {total_passed}/{len(results)} Tests bestanden ({PERCENT_FULL*total_passed//len(results)}%)")
if total_failed > 0:
    print(f"FEHLER: {total_failed} Tests fehlgeschlagen")
    print("STATUS: TESTS FEHLGESCHLAGEN ✗")
else:
    print("STATUS: ALLE TESTS BESTANDEN ✓")
print("=" * 80)

# Exit with error code if tests failed
sys.exit(0 if total_failed == 0 else 1)
← Übersicht Graph