test_all_functions.py

Code Hygiene Score: 53

Issues 4

Zeile Typ Beschreibung
194 magic_number Magic Number gefunden: 100
348 magic_number Magic Number gefunden: 100
- complexity Datei hat 744 Zeilen (max: 500)
- coupling Klasse hat 17 Dependencies (max: 15)

Dependencies 17

Klassen 1

Funktionen 7

Code

#!/usr/bin/env python3
"""Umfassende Tests für MCP-DB Server - Alle Funktionen"""

import sys
from pathlib import Path

sys.path.insert(0, "/var/www/mcp-servers/mcp_db")
sys.path.insert(0, "/var/www/mcp-servers/shared")
from constants import DEFAULT_LIMIT, MAX_ROWS, PERCENT_HALF

# .env laden BEVOR Config importiert wird
from dotenv import load_dotenv
load_dotenv(Path("/var/www/mcp-servers/mcp_db/.env"))

from config import Config
from validators.query_validator import QueryValidator
from domain.query_contract import QueryRequest, QueryResponse, QueryStatus
from tools.select_tool import SelectExecutor
from infrastructure.db_connection import DatabaseConnection
from infrastructure.protokoll_logger import ProtokollLogger


class TestReport:
    """Collects individual test outcomes and renders them as a text report."""

    def __init__(self):
        # Chronological list of result dicts plus running pass/fail tallies.
        self.tests = []
        self.passed = 0
        self.failed = 0

    def add_test(self, category: str, name: str, success: bool, details: str = ""):
        """Record one test outcome and bump the matching counter."""
        entry = {
            "category": category,
            "name": name,
            "status": "PASS" if success else "FAIL",
            "success": success,
            "details": details,
        }
        self.tests.append(entry)

        if not success:
            self.failed += 1
            return
        self.passed += 1

    def print_report(self):
        """Print every recorded result under its category heading, then a summary."""
        heavy_rule = "=" * 80
        light_rule = "-" * 80

        print("\n" + heavy_rule)
        print("MCP-DB SERVER TEST REPORT")
        print(heavy_rule + "\n")

        # A heading is emitted whenever the category differs from the previous
        # entry's category, so results are grouped in insertion order.
        last_heading = None
        for entry in self.tests:
            heading = entry["category"]
            if heading != last_heading:
                last_heading = heading
                print(f"\n{heading}")
                print(light_rule)

            marker = "✓" if entry["success"] else "✗"
            print(f"{marker} [{entry['status']}] {entry['name']}")

            detail_text = entry["details"]
            if detail_text:
                print(f"  → {detail_text}")

        print("\n" + heavy_rule)
        print(f"SUMMARY: {self.passed} passed, {self.failed} failed")
        print(heavy_rule + "\n")


def test_validator(report: TestReport):
    """Exercise QueryValidator.validate_query with accepted and rejected inputs.

    Each case calls ``QueryValidator.validate_query(query, database, max_rows)``
    and records one result in the "VALIDATOR" category of *report*. A case
    that raises is recorded as a FAIL instead of aborting the remaining cases.

    Args:
        report: Collector that receives one ``add_test`` call per case.
    """
    print("\n[1/6] Testing QueryValidator...")

    # Row limit shared by every case that is not about max_rows itself.
    # The original passed a bare 50 for the UPDATE case and PERCENT_HALF
    # everywhere else; the constant is now used consistently.
    row_limit = PERCENT_HALF

    # Long enough to exceed the 2000-character limit named in the test title.
    long_query = "SELECT * FROM mcp_log WHERE " + "id=1 OR " * 500

    # (name, query, database, max_rows, expect_valid, error_fragments, details)
    # For rejected queries, every fragment must occur in the returned error.
    cases = [
        (
            "Einfache gültige Query",
            "SELECT * FROM mcp_log",
            "ki_protokoll",
            row_limit,
            True,
            (),
            "valid={valid}, error={error}",
        ),
        (
            "DROP Keyword blockiert",
            "SELECT * FROM mcp_log; DROP TABLE mcp_log",
            "ki_protokoll",
            row_limit,
            False,
            ("DROP",),
            "error={error}",
        ),
        (
            "SLEEP Keyword blockiert",
            "SELECT SLEEP(10) FROM mcp_log",
            "ki_protokoll",
            row_limit,
            False,
            ("SLEEP",),
            "error={error}",
        ),
        (
            "INSERT blockiert (kein SELECT)",
            "INSERT INTO mcp_log VALUES (1,2,3)",
            "ki_protokoll",
            row_limit,
            False,
            ("Only SELECT",),
            "error={error}",
        ),
        (
            "UPDATE Keyword blockiert",
            "SELECT * FROM mcp_log WHERE status='denied' OR 1=1; "
            "UPDATE mcp_log SET status='success'",
            "ki_protokoll",
            row_limit,
            False,
            ("UPDATE",),
            "error={error}",
        ),
        (
            "DELETE Keyword blockiert",
            "SELECT * FROM mcp_log; DELETE FROM mcp_log",
            "ki_protokoll",
            row_limit,
            False,
            ("DELETE",),
            "error={error}",
        ),
        (
            "LOAD_FILE blockiert",
            "SELECT LOAD_FILE('/etc/passwd')",
            "ki_protokoll",
            row_limit,
            False,
            ("LOAD_FILE",),
            "error={error}",
        ),
        (
            "mysql Datenbank blockiert",
            "SELECT * FROM users",
            "mysql",
            row_limit,
            False,
            ("not allowed",),
            "error={error}",
        ),
        (
            "users Tabelle blockiert",
            "SELECT * FROM users",
            "ki_protokoll",
            row_limit,
            False,
            ("not allowed",),
            "error={error}",
        ),
        (
            "Query > 2000 Zeichen blockiert",
            long_query,
            "ki_protokoll",
            row_limit,
            False,
            ("max", "chars"),
            "query_length={query_length}, error={error}",
        ),
        (
            "max_rows > 100 blockiert",
            "SELECT * FROM mcp_log",
            "ki_protokoll",
            999,  # deliberately above the limit named in the test title
            False,
            ("max_rows",),
            "error={error}",
        ),
        (
            "ki_system Datenbank erlaubt",
            "SELECT * FROM chunks",
            "ki_system",
            row_limit,
            True,
            (),
            "valid={valid}",
        ),
    ]

    for name, query, database, max_rows, expect_valid, fragments, template in cases:
        try:
            valid, error = QueryValidator.validate_query(query, database, max_rows)
            if expect_valid:
                success = bool(valid) and error == ""
            else:
                success = not valid and all(part in error for part in fragments)
            details = template.format(
                valid=valid, error=error, query_length=len(query)
            )
        except Exception as exc:
            # Same convention as the other sections: record the failure and
            # keep going so one crashing case cannot hide the rest.
            success = False
            details = f"Exception: {exc}"
        report.add_test("VALIDATOR", name, success, details)


def _open_select_connection(database: str):
    """Open a mysql.connector connection to *database* with the Config credentials."""
    # Imported lazily so a missing driver is raised inside the caller's
    # try-block and shows up as a recorded FAIL.
    import mysql.connector

    return mysql.connector.connect(
        host=Config.DB_HOST,
        database=database,
        user=Config.DB_USER,
        password=Config.DB_PASSWORD,
        charset="utf8mb4",
    )


def _select_rows(database: str, sql: str, params=None):
    """Execute *sql* on *database* and return ``cursor.fetchall()``.

    Cursor and connection are closed even when the query raises.
    """
    conn = _open_select_connection(database)
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(sql, params)
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        conn.close()


def test_db_select(report: TestReport):
    """Run SELECT statements against the databases and record the outcomes.

    The queries are issued directly through mysql.connector; results go into
    the "DB_SELECT" category of *report*.

    Args:
        report: Collector that receives one ``add_test`` call per check.
    """
    print("\n[2/6] Testing db_select Tool...")

    # Tests 2.1 - 2.4: queries that are expected to succeed.
    # (name, database, sql, params, row_cap, details_template)
    # row_cap is the maximum number of rows allowed, or None for "any".
    cases = [
        (
            "Einfache SELECT Query",
            "ki_protokoll",
            "SELECT * FROM mcp_log ORDER BY timestamp DESC LIMIT 10",
            None,
            None,
            "rows={count}",
        ),
        (
            "SELECT mit Prepared Statement",
            "ki_protokoll",
            "SELECT * FROM mcp_log WHERE status = %s LIMIT 5",
            ("success",),
            None,
            "rows={count}",
        ),
        (
            "SELECT mit max_rows Limit",
            "ki_protokoll",
            "SELECT * FROM mcp_log LIMIT 3",
            None,
            3,
            "max_rows=3, actual_rows={count}",
        ),
        (
            "SELECT auf ki_system Datenbank",
            "ki_system",
            "SELECT * FROM chunks LIMIT 5",
            None,
            None,
            "rows={count}",
        ),
    ]

    for name, database, sql, params, row_cap, template in cases:
        try:
            rows = _select_rows(database, sql, params)
            # The original 2.1 check was ``len(rows) >= 0``, which can never
            # be false; a type check is the weakest meaningful assertion.
            success = isinstance(rows, list) and (
                row_cap is None or len(rows) <= row_cap
            )
            details = template.format(count=len(rows))
        except Exception as exc:
            success = False
            details = f"Exception: {exc}"
        report.add_test("DB_SELECT", name, success, details)

    # Test 2.5: an invalid query must raise a database error.
    name = "Ungültige Query gibt ERROR zurück"
    error_preview_chars = 100  # keep the report line short
    try:
        import mysql.connector

        conn = _open_select_connection("ki_protokoll")
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute("SELECT * FROM nonexistent_table")
                cursor.fetchall()
                success = False
                error_msg = "Query should have failed"
            except mysql.connector.Error as query_error:
                # Only a driver/database error counts as the expected
                # outcome; anything else falls through to the outer handler
                # and is recorded as a failure.
                success = True
                error_msg = str(query_error)[:error_preview_chars]
            finally:
                cursor.close()
        finally:
            conn.close()

        report.add_test("DB_SELECT", name, success, f"error={error_msg}")
    except Exception as e:
        report.add_test("DB_SELECT", name, False, f"Exception: {e}")


def _list_base_tables(database: str):
    """Return the information_schema.TABLES rows for the base tables of *database*.

    Raises:
        RuntimeError: if *database* is not listed in Config.ALLOWED_DATABASES.
    """
    if database not in Config.ALLOWED_DATABASES:
        raise RuntimeError(f"{database} not in ALLOWED_DATABASES")

    with DatabaseConnection.get_connection(database) as conn:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(
                """SELECT TABLE_NAME, TABLE_ROWS, CREATE_TIME
                   FROM information_schema.TABLES
                   WHERE TABLE_SCHEMA = %s
                   AND TABLE_TYPE = 'BASE TABLE'
                   ORDER BY TABLE_NAME""",
                (database,),
            )
            return cursor.fetchall()
        finally:
            # Close the cursor even when the query fails.
            cursor.close()


def test_db_schema(report: TestReport):
    """Check that table listings can be read for the allowed databases.

    Results are recorded in the "DB_SCHEMA" category of *report*.

    Args:
        report: Collector that receives one ``add_test`` call per check.
    """
    print("\n[3/6] Testing db_schema Tool...")

    # Test 3.1: schema of ki_protokoll must contain the mcp_log table.
    try:
        tables = _list_base_tables("ki_protokoll")
        has_mcp_log = any(t["TABLE_NAME"] == "mcp_log" for t in tables)
        report.add_test(
            "DB_SCHEMA",
            "Schema von ki_protokoll abrufen",
            has_mcp_log,
            f"tables_found={len(tables)}, contains_mcp_log={has_mcp_log}",
        )
    except Exception as e:
        report.add_test(
            "DB_SCHEMA", "Schema von ki_protokoll abrufen", False, f"Exception: {e}"
        )

    # Test 3.2: schema of ki_system must list at least one table.
    try:
        tables = _list_base_tables("ki_system")
        report.add_test(
            "DB_SCHEMA",
            "Schema von ki_system abrufen",
            len(tables) > 0,
            f"tables_found={len(tables)}",
        )
    except Exception as e:
        report.add_test(
            "DB_SCHEMA", "Schema von ki_system abrufen", False, f"Exception: {e}"
        )

    # Test 3.3: the mysql system database must not be whitelisted.
    # This only inspects Config; no connection is attempted.
    try:
        blocked = "mysql" not in Config.ALLOWED_DATABASES
        details = (
            "mysql correctly blocked by Config"
            if blocked
            else "mysql should not be in ALLOWED_DATABASES"
        )
        report.add_test(
            "DB_SCHEMA", "Nicht-erlaubte Datenbank (mysql) blockiert", blocked, details
        )
    except Exception as e:
        report.add_test(
            "DB_SCHEMA",
            "Nicht-erlaubte Datenbank (mysql) blockiert",
            False,
            f"Exception: {e}",
        )


def _fetch_recent_logs(limit: int):
    """Return up to *limit* newest rows of mcp_log from the log database.

    Connects to Config.LOG_DB_NAME with the Config credentials; cursor and
    connection are closed even when the query raises.
    """
    # Imported lazily so a missing driver is raised inside the caller's
    # try-block and shows up as a recorded FAIL.
    import mysql.connector

    # NOTE(review): the original comment said db_stats should use the
    # mcp_readonly account; this connects as Config.DB_USER — confirm that
    # this is that account.
    conn = mysql.connector.connect(
        host=Config.DB_HOST,
        database=Config.LOG_DB_NAME,
        user=Config.DB_USER,
        password=Config.DB_PASSWORD,
        charset="utf8mb4",
    )
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(
                """SELECT id, timestamp, client_name, request, status,
                          duration_ms, error_message
                   FROM mcp_log
                   ORDER BY timestamp DESC
                   LIMIT %s""",
                (limit,),
            )
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        conn.close()


def test_db_stats(report: TestReport):
    """Check that recent log rows can be read and that the row limit holds.

    Results are recorded in the "DB_STATS" category of *report*.

    Args:
        report: Collector that receives one ``add_test`` call per check.
    """
    print("\n[4/6] Testing db_stats Tool...")

    # Test 4.1: fetch the latest log rows.
    try:
        logs = _fetch_recent_logs(10)
        report.add_test(
            "DB_STATS",
            "Letzte Logs abrufen (Limit=10)",
            isinstance(logs, list),
            f"logs_found={len(logs)}",
        )
    except Exception as e:
        report.add_test(
            "DB_STATS", "Letzte Logs abrufen (Limit=10)", False, f"Exception: {e}"
        )

    # Test 4.2: the limit parameter caps the number of returned rows.
    try:
        limit = 3
        logs = _fetch_recent_logs(limit)
        report.add_test(
            "DB_STATS",
            "Limit Parameter funktioniert",
            len(logs) <= limit,
            f"limit={limit}, actual={len(logs)}",
        )
    except Exception as e:
        report.add_test(
            "DB_STATS", "Limit Parameter funktioniert", False, f"Exception: {e}"
        )


def _fetch_latest_log(request_pattern: str):
    """Return the newest mcp_log row whose request matches the LIKE pattern.

    Returns ``cursor.fetchone()``, i.e. ``None`` when nothing matches. Cursor
    and connection are closed even when the query raises.
    """
    # Imported lazily so a missing driver is raised inside the caller's
    # try-block and shows up as a recorded FAIL.
    import mysql.connector

    # NOTE(review): the original comments said the read-back uses the
    # mcp_readonly account; this connects as Config.DB_USER — confirm.
    conn = mysql.connector.connect(
        host=Config.DB_HOST,
        database=Config.LOG_DB_NAME,
        user=Config.DB_USER,
        password=Config.DB_PASSWORD,
        charset="utf8mb4",
    )
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(
                "SELECT * FROM mcp_log WHERE request LIKE %s "
                "ORDER BY timestamp DESC LIMIT 1",
                (request_pattern,),
            )
            return cursor.fetchone()
        finally:
            cursor.close()
    finally:
        conn.close()


def test_logging(report: TestReport):
    """Write log entries through ProtokollLogger and read them back.

    Every entry carries a tag unique to this run, so the read-back cannot be
    satisfied by rows left over from earlier runs. Results are recorded in the
    "LOGGING" category of *report*.

    Args:
        report: Collector that receives one ``add_test`` call per check.
    """
    import uuid

    print("\n[5/6] Testing Logging...")

    logger = ProtokollLogger()

    # Short random tag placed right after the "TEST:" prefix.
    # NOTE(review): assumes the logger stores the request text unchanged, at
    # least for the prefix that is matched below — the original LIKE
    # patterns relied on the same assumption.
    run_tag = uuid.uuid4().hex[:8]

    # (name, statement, status, duration_ms, error_message, error_check)
    # error_check validates the error_message column of the stored row.
    cases = [
        (
            "Success Log schreiben",
            "SELECT * FROM mcp_log",
            "success",
            42,
            None,
            lambda stored: True,
        ),
        (
            "Denied Log schreiben",
            "DROP TABLE mcp_log",
            "denied",
            0,
            "Blocked keyword: DROP",
            lambda stored: "DROP" in (stored or ""),
        ),
        (
            "Error Log schreiben",
            "SELECT * FROM nonexistent",
            "error",
            15,
            "Table doesn't exist",
            lambda stored: stored is not None,
        ),
    ]

    for name, statement, status, duration_ms, error_message, error_check in cases:
        try:
            # Imported per case so each test is independent: previously the
            # import lived only in the first test's try-block, and a failure
            # there left the name unbound for the later tests.
            from domain.log_contract import LogEntry

            request = f"TEST: [{run_tag}] {statement}"
            fields = {
                "request": request,
                "status": status,
                "duration_ms": duration_ms,
            }
            if error_message is not None:
                fields["error_message"] = error_message
            logger.log(LogEntry(**fields))

            # Read the row back and compare it with what was written.
            log = _fetch_latest_log(request + "%")
            success = (
                log is not None
                and log["status"] == status
                and error_check(log["error_message"])
            )
            details = (
                f"log_written={log is not None}, "
                f"status={log['status'] if log else 'N/A'}"
            )
        except Exception as e:
            success = False
            details = f"Exception: {e}"
        report.add_test("LOGGING", name, success, details)


def test_config(report: TestReport):
    """Verify the security-relevant values exposed by Config.

    Each check is recorded in the "CONFIG" category of *report*.
    """
    print("\n[6/6] Testing Config...")

    section = "CONFIG"

    # 6.1: both application databases are whitelisted, mysql is not.
    databases_ok = not (
        "ki_protokoll" not in Config.ALLOWED_DATABASES
        or "ki_system" not in Config.ALLOWED_DATABASES
        or "mysql" in Config.ALLOWED_DATABASES
    )
    report.add_test(
        section,
        "ALLOWED_DATABASES korrekt",
        databases_ok,
        f"databases={Config.ALLOWED_DATABASES}",
    )

    # 6.2: every critical keyword appears in the block list.
    required_keywords = ["DROP", "DELETE", "INSERT", "UPDATE", "SLEEP", "LOAD_FILE"]
    missing_keywords = [
        keyword
        for keyword in required_keywords
        if keyword not in Config.BLOCKED_KEYWORDS
    ]
    report.add_test(
        section,
        "BLOCKED_KEYWORDS enthält kritische Keywords",
        not missing_keywords,
        f"required={required_keywords}, actual={Config.BLOCKED_KEYWORDS}",
    )

    # 6.3: maximum query length.
    length_ok = Config.MAX_QUERY_LENGTH == 2000
    report.add_test(
        section,
        "MAX_QUERY_LENGTH = 2000",
        length_ok,
        f"value={Config.MAX_QUERY_LENGTH}",
    )

    # 6.4: row cap agrees with the shared MAX_ROWS constant.
    rows_ok = Config.MAX_ROWS == MAX_ROWS
    report.add_test(
        section,
        f"MAX_ROWS = {MAX_ROWS}",
        rows_ok,
        f"value={Config.MAX_ROWS}",
    )

    # 6.5: the important tables are whitelisted.
    required_tables = ["mcp_log", "chunks", "protokoll"]
    missing_tables = [
        table for table in required_tables if table not in Config.ALLOWED_TABLES
    ]
    report.add_test(
        section,
        "ALLOWED_TABLES enthält wichtige Tabellen",
        not missing_tables,
        f"required={required_tables}, count={len(Config.ALLOWED_TABLES)}",
    )


def main():
    """Run all test sections, print the report and exit with a status code.

    The process exits with 0 only when no test was recorded as failed and no
    section aborted with an unexpected exception; otherwise it exits with 1.
    """
    print("\n" + "=" * 80)
    print("MCP-DB SERVER - COMPREHENSIVE TESTING")
    print("=" * 80)

    report = TestReport()
    aborted = False

    try:
        test_validator(report)
        test_db_select(report)
        test_db_schema(report)
        test_db_stats(report)
        test_logging(report)
        test_config(report)
    except Exception as e:
        # Remember the crash: previously the exit code came from
        # report.failed alone, so a run that died before any failure was
        # recorded still exited with 0.
        aborted = True
        print(f"\nFATAL ERROR: {e}")
        import traceback

        traceback.print_exc()

    report.print_report()

    # Exit code
    sys.exit(1 if aborted or report.failed else 0)


# Run the suite only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
← Übersicht Graph