query_validator.py
- Pfad: /var/www/mcp-servers/mcp-db/validators/query_validator.py
- Namespace: -
- Zeilen: 79 | Größe: 2,660 Bytes
- Geändert: 2025-12-28 13:14:44 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 100
- Dependencies: 100 (25%)
- LOC: 100 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
Keine Issues gefunden.
Dependencies 3
- use re
- use typing.Tuple
- use config.Config
Klassen 1
- QueryValidator (class), Zeile 9
Code
"""Query Validator - SRP: Separate Validierungslogik"""
import re
from typing import Tuple
from config import Config
class QueryValidator:
    """Validate SQL queries against the configured security rules.

    The class only validates; it never executes anything. Table detection is
    regex-based and best-effort -- it is not a SQL parser, so it should be
    treated as one defensive layer, not as the only access control.
    """

    # One identifier part: backtick-quoted, double-quoted, or bare.
    _IDENT = r'(?:`[^`]+`|"[^"]+"|[A-Za-z0-9_]+)'
    # Optionally schema-qualified table reference. Whitespace around the dot
    # is tolerated so "db . tbl" is captured as a whole instead of being
    # truncated to "db".
    _TABLE_REF = _IDENT + r"(?:\s*\.\s*" + _IDENT + r")*"
    _IDENT_RE = re.compile(_IDENT)
    # Keywords that introduce a table reference. FROM patterns come first so
    # the result order stays "FROM tables, then JOIN tables". \s* (not \s+)
    # because a quoted identifier may follow the keyword without a space.
    _TABLE_START_RES = (
        re.compile(r"\bFROM\b\s*(" + _TABLE_REF + r")"),
        # \bJOIN alone cannot match inside STRAIGHT_JOIN ("_" is a word char).
        re.compile(r"\b(?:STRAIGHT_)?JOIN\b\s*(" + _TABLE_REF + r")"),
    )
    # Continuation of a comma-separated table list: optional alias, a comma,
    # then the next table reference.
    _NEXT_TABLE_RE = re.compile(
        r"(?:\s+(?:AS\s+)?" + _IDENT + r")?\s*,\s*(" + _TABLE_REF + r")"
    )

    @staticmethod
    def validate_query(query: str, database: str, max_rows: int) -> Tuple[bool, str]:
        """Check a query against all security rules.

        Rules are applied in this order: non-empty, maximum length,
        SELECT-only, blocked keywords, database allowlist, ``max_rows``
        range, and finally the table allowlist.

        Args:
            query: Raw SQL text to validate.
            database: Name of the target database.
            max_rows: Requested row limit.

        Returns:
            ``(is_valid, error_message)``; ``error_message`` is ``""`` when
            the query is valid.
        """
        # Basic validation
        if not query:
            return False, "Query must not be empty"
        if len(query) > Config.MAX_QUERY_LENGTH:
            return False, f"Query must be max {Config.MAX_QUERY_LENGTH} chars"

        # Only SELECT statements are allowed
        query_upper = query.strip().upper()
        if not query_upper.startswith("SELECT"):
            return False, "Only SELECT queries allowed"

        # Dangerous keyword blocklist, matched on word boundaries. The
        # keyword is upper-cased because it is compared with the upper-cased
        # query; a lower-case config entry would otherwise never match.
        for keyword in Config.BLOCKED_KEYWORDS:
            pattern = r"\b" + re.escape(keyword.upper()) + r"\b"
            if re.search(pattern, query_upper):
                return False, f"Blocked keyword detected: {keyword}"

        # Database allowlist
        if database not in Config.ALLOWED_DATABASES:
            return (
                False,
                f"Database '{database}' not allowed. Allowed: {', '.join(Config.ALLOWED_DATABASES)}",
            )

        # Row limit must be within 1..Config.MAX_ROWS
        if max_rows < 1 or max_rows > Config.MAX_ROWS:
            return False, f"max_rows must be 1-{Config.MAX_ROWS}"

        # Table allowlist. Both lookups are built once, outside the loop.
        # Assumes the Config allowlists are iterables of str -- confirm
        # against config.py.
        allowed_tables = {name.upper() for name in Config.ALLOWED_TABLES}
        allowed_schemas = {name.upper() for name in Config.ALLOWED_DATABASES}
        for table in QueryValidator._extract_table_names(query_upper):
            schema, _, table_clean = table.rpartition(".")
            # information_schema.* stays readable for the schema tool. Only
            # an actual schema qualifier counts, not any name that merely
            # contains the text "INFORMATION_SCHEMA".
            if schema == "INFORMATION_SCHEMA":
                continue
            # A schema qualifier must not reach outside the allowed
            # databases (e.g. "other_db.users" while "users" is allowed).
            if schema and schema not in allowed_schemas:
                return (
                    False,
                    f"Database '{schema}' not allowed. Allowed: {', '.join(Config.ALLOWED_DATABASES)}",
                )
            if table_clean not in allowed_tables:
                return (
                    False,
                    f"Table '{table}' not allowed. Allowed: {', '.join(Config.ALLOWED_TABLES)}",
                )

        return True, ""

    @staticmethod
    def _extract_table_names(query_upper: str) -> list:
        """Extract table names from FROM and JOIN clauses.

        Handles bare and quoted identifiers, schema-qualified names,
        ``STRAIGHT_JOIN`` and comma-separated table lists with optional
        aliases (``FROM a x, b AS y``). Quotes and whitespace around dots
        are removed, so ``` `db` . `t` ``` is returned as ``db.t``.

        Best-effort only: a comma list that continues after an ``ON`` /
        ``USING`` clause or an index hint is not followed.

        Args:
            query_upper: The query text, already upper-cased (the keyword
                patterns are case-sensitive).

        Returns:
            List of table names: all FROM tables first, then all JOIN
            tables. May contain duplicates.
        """
        tables = []
        for start_re in QueryValidator._TABLE_START_RES:
            for first in start_re.finditer(query_upper):
                current = first
                # Walk the comma-separated list that may follow the first
                # table so later entries cannot slip past the allowlist.
                while current is not None:
                    parts = QueryValidator._IDENT_RE.findall(current.group(1))
                    tables.append(".".join(part.strip('`"') for part in parts))
                    current = QueryValidator._NEXT_TABLE_RE.match(
                        query_upper, current.end()
                    )
        return tables