Chunk #544
Aus: Validators (Index: 0)
Taxonomie
| Kategorie | Frameworks |
|---|---|
| Pfad | Frameworks > Validators > Query Validation |
| Heading-Pfad | Validators > MCP-DB Validators > QueryValidator |
Entities
| Name | Typ |
|---|---|
| QueryValidator | CLASS |
| SQL Query | TECHNOLOGY |
| Regex | TECHNOLOGY |
| Config | CONFIG |
| Allowed Databases | CONFIG |
| Blocked Keywords | CONFIG |
| Allowed Tables | CONFIG |
Keywords
Inhalt
```
`"""SRP: Separate Validierungslogik"""
from typing import Tuple
import re
from config import Config
class QueryValidator:
"""Validiert SQL Queries - SRP: Nur Validierung"""
@staticmethod
def validate_query(query: str, database: str, max_rows: int) -> Tuple[bool, str]:
    """
    Validate a query against the configured safety rules.

    The checks run in this order and the first failure is returned:
    non-empty and length bound, SELECT-only prefix, blocked-keyword scan,
    database allowlist, max_rows range, and a table allowlist applied to
    the names returned by ``QueryValidator._extract_table_names``.

    Args:
        query: Raw SQL text as supplied by the caller.
        database: Target database name; tested for membership in
            ``Config.ALLOWED_DATABASES`` without any normalization.
        max_rows: Requested row limit; must lie in ``1..Config.MAX_ROWS``.

    Returns:
        (is_valid, error_message) where error_message names the rule
        that failed.

    NOTE(review): the body visible in this excerpt ends after the
    table-allowlist loop with no success return; presumably the method
    continues (e.g. ``return True, ""``) outside the excerpt — confirm
    against the full file.
    """
    # Basic validation: reject empty input before any other rule runs.
    if not query or len(query) < 1:
        return False, "Query must not be empty"
    if len(query) > Config.MAX_QUERY_LENGTH:
        return False, f"Query must be max {Config.MAX_QUERY_LENGTH} chars"
    # Only SELECT is allowed; the prefix test runs on the stripped,
    # upper-cased text, so the original casing does not matter here.
    query_upper = query.strip().upper()
    if not query_upper.startswith("SELECT"):
        return False, "Only SELECT queries allowed"
    # Blocked-keyword scan: each keyword is matched as a whole word
    # (\b ... \b) against the upper-cased query text.
    # NOTE(review): the keyword itself is not upper-cased before matching,
    # so this assumes Config.BLOCKED_KEYWORDS entries are already upper-case
    # — confirm in config.
    # NOTE(review): this is a textual scan, not a SQL parse; comments and
    # string literals are not stripped first, so treat it as defence in
    # depth behind the database-side permissions rather than a sole control.
    for keyword in Config.BLOCKED_KEYWORDS:
        pattern = r'\b' + re.escape(keyword) + r'\b'
        if re.search(pattern, query_upper):
            return False, f"Blocked keyword detected: {keyword}"
    # Database allowlist: plain membership test on the value as given.
    if database not in Config.ALLOWED_DATABASES:
        return False, f"Database '{database}' not allowed"
    # Row-limit range check.
    if max_rows < 1 or max_rows > Config.MAX_ROWS:
        return False, f"max_rows must be 1-{Config.MAX_ROWS}"
    # Table allowlist, with an information_schema exception for the schema
    # tool. _extract_table_names is defined elsewhere in the class and is
    # not part of this excerpt.
    from_tables = QueryValidator._extract_table_names(query_upper)
    for table in from_tables:
        # Allow information_schema.TABLES for the schema tool.
        # NOTE(review): this is a substring test, so any extracted name that
        # contains INFORMATION_SCHEMA skips the allowlist entirely; a prefix
        # test such as table.startswith("INFORMATION_SCHEMA.") would be
        # tighter — confirm the intended scope of the exception.
        if "INFORMATION_SCHEMA" in table:
            continue
        # Compare only the unqualified part (text after the last '.') against
        # the upper-cased allowlist. The allowlist is re-uppercased on every
        # iteration; hoisting it out of the loop would be cheaper.
        table_clean = table.split('.')[-1]
        if table_clean not in [t.upper() for t in Config.ALLOWED_TABLES]:
            return False, f"Table '{table}' not allowed"
```
Clean Content
```
`"""SRP: Separate Validierungslogik"""
from typing import Tuple
import re
from config import Config
class QueryValidator:
"""Validiert SQL Queries - SRP: Nur Validierung"""
@staticmethod
def validate_query(query: str, database: str, max_rows: int) -> Tuple[bool, str]:
    """
    Validate a query against the configured safety rules.

    The checks run in this order and the first failure is returned:
    non-empty and length bound, SELECT-only prefix, blocked-keyword scan,
    database allowlist, max_rows range, and a table allowlist applied to
    the names returned by ``QueryValidator._extract_table_names``.

    Args:
        query: Raw SQL text as supplied by the caller.
        database: Target database name; tested for membership in
            ``Config.ALLOWED_DATABASES`` without any normalization.
        max_rows: Requested row limit; must lie in ``1..Config.MAX_ROWS``.

    Returns:
        (is_valid, error_message) where error_message names the rule
        that failed.

    NOTE(review): the body visible in this excerpt ends after the
    table-allowlist loop with no success return; presumably the method
    continues (e.g. ``return True, ""``) outside the excerpt — confirm
    against the full file.
    """
    # Basic validation: reject empty input before any other rule runs.
    if not query or len(query) < 1:
        return False, "Query must not be empty"
    if len(query) > Config.MAX_QUERY_LENGTH:
        return False, f"Query must be max {Config.MAX_QUERY_LENGTH} chars"
    # Only SELECT is allowed; the prefix test runs on the stripped,
    # upper-cased text, so the original casing does not matter here.
    query_upper = query.strip().upper()
    if not query_upper.startswith("SELECT"):
        return False, "Only SELECT queries allowed"
    # Blocked-keyword scan: each keyword is matched as a whole word
    # (\b ... \b) against the upper-cased query text.
    # NOTE(review): the keyword itself is not upper-cased before matching,
    # so this assumes Config.BLOCKED_KEYWORDS entries are already upper-case
    # — confirm in config.
    # NOTE(review): this is a textual scan, not a SQL parse; comments and
    # string literals are not stripped first, so treat it as defence in
    # depth behind the database-side permissions rather than a sole control.
    for keyword in Config.BLOCKED_KEYWORDS:
        pattern = r'\b' + re.escape(keyword) + r'\b'
        if re.search(pattern, query_upper):
            return False, f"Blocked keyword detected: {keyword}"
    # Database allowlist: plain membership test on the value as given.
    if database not in Config.ALLOWED_DATABASES:
        return False, f"Database '{database}' not allowed"
    # Row-limit range check.
    if max_rows < 1 or max_rows > Config.MAX_ROWS:
        return False, f"max_rows must be 1-{Config.MAX_ROWS}"
    # Table allowlist, with an information_schema exception for the schema
    # tool. _extract_table_names is defined elsewhere in the class and is
    # not part of this excerpt.
    from_tables = QueryValidator._extract_table_names(query_upper)
    for table in from_tables:
        # Allow information_schema.TABLES for the schema tool.
        # NOTE(review): this is a substring test, so any extracted name that
        # contains INFORMATION_SCHEMA skips the allowlist entirely; a prefix
        # test such as table.startswith("INFORMATION_SCHEMA.") would be
        # tighter — confirm the intended scope of the exception.
        if "INFORMATION_SCHEMA" in table:
            continue
        # Compare only the unqualified part (text after the last '.') against
        # the upper-cased allowlist. The allowlist is re-uppercased on every
        # iteration; hoisting it out of the loop would be cheaper.
        table_clean = table.split('.')[-1]
        if table_clean not in [t.upper() for t in Config.ALLOWED_TABLES]:
            return False, f"Table '{table}' not allowed"
```
Metadaten
| Analysiert am | 20.12.2025 12:43:45 |
|---|---|
| Analyse-Modell | gemma3:4b-it-qat |
| Embedding-Modell | mxbai-embed-large |
| Qdrant-ID | b8acea9c-46ff-4854-9edb-99b16d875353 |
| Erstellt | 20.12.2025 12:34:42 |
| Aktualisiert | 20.12.2025 12:57:43 |