Chunk #544

Aus: Validators (Index: 0)

400 Tokens
Synced Status
Nächster (#1) »

Taxonomie

Kategorie Frameworks
Pfad Frameworks > Validators > Query Validation
Heading-Pfad Validators > MCP-DB Validators > QueryValidator

Entities

Name Typ
QueryValidator CLASS
SQL Query TECHNOLOGY
Regex TECHNOLOGY
Config CONFIG
Allowed Databases CONFIG
Blocked Keywords CONFIG
Allowed Tables CONFIG

Keywords

validation SQL query security database

Inhalt

``` `"""SRP: Separate Validierungslogik""" from typing import Tuple import re from config import Config class QueryValidator: """Validiert SQL Queries - SRP: Nur Validierung""" @staticmethod def validate_query(query: str, database: str, max_rows: int) -> Tuple[bool, str]: """ Validiert eine Query gegen alle Sicherheitsregeln. Returns: (is_valid, error_message) """ # Basis-Validierung if not query or len(query) < 1: return False, "Query must not be empty" if len(query) > Config.MAX_QUERY_LENGTH: return False, f"Query must be max {Config.MAX_QUERY_LENGTH} chars" # Nur SELECT erlaubt query_upper = query.strip().upper() if not query_upper.startswith("SELECT"): return False, "Only SELECT queries allowed" # Dangerous Keyword Blocklist for keyword in Config.BLOCKED_KEYWORDS: pattern = r'\b' + re.escape(keyword) + r'\b' if re.search(pattern, query_upper): return False, f"Blocked keyword detected: {keyword}" # Database Allowlist if database not in Config.ALLOWED_DATABASES: return False, f"Database '{database}' not allowed" # Max Rows prüfen if max_rows < 1 or max_rows > Config.MAX_ROWS: return False, f"max_rows must be 1-{Config.MAX_ROWS}" # Table Allowlist (mit information_schema Ausnahme) from_tables = QueryValidator._extract_table_names(query_upper) for table in from_tables: # Erlaube information_schema.TABLES für Schema-Tool if "INFORMATION_SCHEMA" in table: continue # Prüfe gegen Allowlist table_clean = table.split('.')[-1] if table_clean not in [t.upper() for t in Config.ALLOWED_TABLES]: return False, f"Table '{table}' not allowed"

Clean Content

```
`"""SRP: Separate Validierungslogik"""
from typing import Tuple
import re
from config import Config

class QueryValidator:
 """Validiert SQL Queries - SRP: Nur Validierung"""

 @staticmethod
 def validate_query(query: str, database: str, max_rows: int) -> Tuple[bool, str]:
 """
 Validiert eine Query gegen alle Sicherheitsregeln.

 Returns:
 (is_valid, error_message)
 """
 # Basis-Validierung
 if not query or len(query) < 1:
 return False, "Query must not be empty"

 if len(query) > Config.MAX_QUERY_LENGTH:
 return False, f"Query must be max {Config.MAX_QUERY_LENGTH} chars"

 # Nur SELECT erlaubt
 query_upper = query.strip().upper()
 if not query_upper.startswith("SELECT"):
 return False, "Only SELECT queries allowed"

 # Dangerous Keyword Blocklist
 for keyword in Config.BLOCKED_KEYWORDS:
 pattern = r'\b' + re.escape(keyword) + r'\b'
 if re.search(pattern, query_upper):
 return False, f"Blocked keyword detected: {keyword}"

 # Database Allowlist
 if database not in Config.ALLOWED_DATABASES:
 return False, f"Database '{database}' not allowed"

 # Max Rows prüfen
 if max_rows < 1 or max_rows > Config.MAX_ROWS:
 return False, f"max_rows must be 1-{Config.MAX_ROWS}"

 # Table Allowlist (mit information_schema Ausnahme)
 from_tables = QueryValidator._extract_table_names(query_upper)
 for table in from_tables:
 # Erlaube information_schema.TABLES für Schema-Tool
 if "INFORMATION_SCHEMA" in table:
 continue
 # Prüfe gegen Allowlist
 table_clean = table.split('.')[-1]
 if table_clean not in [t.upper() for t in Config.ALLOWED_TABLES]:
 return False, f"Table '{table}' not allowed"

Metadaten

Analysiert am20.12.2025 12:43:45
Analyse-Modellgemma3:4b-it-qat
Embedding-Modellmxbai-embed-large
Qdrant-IDb8acea9c-46ff-4854-9edb-99b16d875353
Erstellt20.12.2025 12:34:42
Aktualisiert20.12.2025 12:57:43