{
"event": "PreToolUse",
"tool_name": "Write",
"tool_input": {
"file_path": "\/var\/www\/mcp-servers\/mcp_tasks\/tools\/quality_tools.py",
"content": "\"\"\"Quality Tools für MCP-Tasks Server.\"\"\"\n\nimport json\nimport subprocess\nimport time\nfrom datetime import datetime\nfrom typing import Optional\n\nfrom config import Config\nfrom domain.contracts import QualityCheckResult\nfrom shared.domain import LogEntry\nfrom shared.infrastructure import get_logger\n\nfrom constants import (\n LOG_QUERY_MAX_LENGTH,\n LLM_TIMEOUT,\n MS_PER_SECOND,\n SEMGREP_TIMEOUT,\n)\n\n\ndef register_quality_tools(mcp):\n \"\"\"Registriert alle Quality-Tools.\"\"\"\n logger = get_logger(\"mcp-tasks\", Config)\n\n # ==================== quality_check ====================\n @mcp.tool()\n def quality_check(\n path: str = \"\/var\/www\/dev.campus.systemische-tools.de\",\n checks: str = \"all\",\n fix: bool = False,\n ) -> dict:\n \"\"\"\n Führt PHP-Quality-Prüfungen aus.\n\n Args:\n path: Zu prüfender Pfad\n checks: Welche Checks (phpstan, cs-fixer, semgrep, all)\n fix: Style-Probleme automatisch korrigieren (nur cs-fixer)\n\n Returns:\n Ergebnis aller Checks mit Details\n \"\"\"\n start = time.time()\n request_str = json.dumps({\"path\": path, \"checks\": checks, \"fix\": fix})\n\n try:\n results = {}\n overall_passed = True\n\n check_list = (\n [\"phpstan\", \"cs-fixer\", \"semgrep\"] if checks == \"all\" else [checks]\n )\n\n for check_name in check_list:\n check_result = QualityCheckResult(check_name=check_name)\n\n if check_name == \"phpstan\":\n try:\n result = subprocess.run(\n [\n Config.PHPSTAN_BIN,\n \"analyse\",\n path,\n \"--level=5\",\n \"--no-progress\",\n \"--error-format=json\",\n ],\n capture_output=True,\n text=True,\n timeout=LLM_TIMEOUT,\n )\n if result.returncode == 0:\n check_result.passed = True\n check_result.issues = 0\n else:\n check_result.passed = False\n try:\n phpstan_output = json.loads(result.stdout)\n check_result.issues = phpstan_output.get(\n \"totals\", {}\n ).get(\"errors\", 0)\n except Exception:\n check_result.issues = 1\n check_result.details = result.stdout[:500]\n except subprocess.TimeoutExpired:\n check_result.passed = False\n check_result.details = \"PHPStan timeout\"\n except Exception as e:\n check_result.passed = False\n check_result.details = str(e)[:LOG_QUERY_MAX_LENGTH]\n\n elif check_name == \"cs-fixer\":\n try:\n cmd = [\n Config.CS_FIXER_BIN,\n \"fix\" if fix else \"check\",\n path,\n \"--dry-run\" if not fix else \"\",\n ]\n cmd = [c for c in cmd if c]\n result = subprocess.run(\n cmd,\n capture_output=True,\n text=True,\n timeout=LLM_TIMEOUT,\n )\n check_result.passed = result.returncode == 0\n check_result.issues = (\n result.stdout.count(\"CHANGED\") if not fix else 0\n )\n check_result.fixed = (\n result.stdout.count(\"CHANGED\") if fix else 0\n )\n except Exception as e:\n check_result.passed = False\n check_result.details = str(e)[:LOG_QUERY_MAX_LENGTH]\n\n elif check_name == \"semgrep\":\n try:\n result = subprocess.run(\n [Config.SEMGREP_BIN, \"--config=auto\", \"--json\", path],\n capture_output=True,\n text=True,\n timeout=SEMGREP_TIMEOUT,\n )\n try:\n semgrep_output = json.loads(result.stdout)\n issues = len(semgrep_output.get(\"results\", []))\n check_result.passed = issues == 0\n check_result.issues = issues\n except Exception:\n check_result.passed = result.returncode == 0\n except subprocess.TimeoutExpired:\n check_result.passed = False\n check_result.details = \"Semgrep timeout\"\n except Exception as e:\n check_result.passed = False\n check_result.details = str(e)[:LOG_QUERY_MAX_LENGTH]\n\n results[check_name] = check_result.to_dict()\n if not check_result.passed:\n overall_passed = False\n\n duration = int((time.time() - start) * MS_PER_SECOND)\n logger.log(\n LogEntry(\n timestamp=datetime.now(),\n client_name=\"mcp-tasks\",\n tool_name=\"quality_check\",\n request=request_str,\n status=\"success\" if overall_passed else \"error\",\n duration_ms=duration,\n )\n )\n\n return {\n \"success\": True,\n \"passed\": overall_passed,\n \"path\": path,\n \"results\": results,\n }\n\n except Exception as e:\n duration = int((time.time() - start) * MS_PER_SECOND)\n logger.log(\n LogEntry(\n timestamp=datetime.now(),\n client_name=\"mcp-tasks\",\n tool_name=\"quality_check\",\n request=request_str,\n status=\"error\",\n duration_ms=duration,\n error_message=str(e)[:LOG_QUERY_MAX_LENGTH],\n )\n )\n return {\"success\": False, \"error\": str(e)}\n\n # ==================== quality_report ====================\n @mcp.tool()\n def quality_report(\n scope: str = \"full\",\n format: str = \"json\",\n ) -> dict:\n \"\"\"\n Erstellt einen vollständigen Qualitätsbericht.\n\n Args:\n scope: full oder changes_only\n format: json oder markdown\n\n Returns:\n Vollständiger Report über alle Prüfungen\n \"\"\"\n start = time.time()\n request_str = json.dumps({\"scope\": scope, \"format\": format})\n\n try:\n report = {\n \"generated_at\": time.strftime(\"%Y-%m-%d %H:%M:%S\"),\n \"scope\": scope,\n \"dev_path\": \"\/var\/www\/dev.campus.systemische-tools.de\",\n }\n\n # Quality-Checks ausführen\n quality_result = quality_check(\n path=\"\/var\/www\/dev.campus.systemische-tools.de\/src\",\n checks=\"all\",\n )\n report[\"quality_checks\"] = quality_result.get(\"results\", {})\n report[\"quality_passed\"] = quality_result.get(\"passed\", False)\n\n # Zusammenfassung\n report[\"all_passed\"] = report[\"quality_passed\"]\n\n duration = int((time.time() - start) * MS_PER_SECOND)\n logger.log(\n LogEntry(\n timestamp=datetime.now(),\n client_name=\"mcp-tasks\",\n tool_name=\"quality_report\",\n request=request_str,\n status=\"success\",\n duration_ms=duration,\n )\n )\n\n # Markdown-Format\n if format == \"markdown\":\n md = \"# Quality Report\\n\\n\"\n md += f\"Generated: {report['generated_at']}\\n\\n\"\n md += (\n f\"## Overall Status: \"\n f\"{'PASSED' if report['all_passed'] else 'FAILED'}\\n\\n\"\n )\n\n md += \"## Quality Checks\\n\\n\"\n for check_name, check_data in report[\"quality_checks\"].items():\n status = \"PASS\" if check_data.get(\"passed\") else \"FAIL\"\n md += f\"- {status} {check_name}: {check_data.get('issues', 0)} issues\\n\"\n\n return {\n \"success\": True,\n \"report\": md,\n \"format\": \"markdown\",\n }\n\n return {\n \"success\": True,\n \"report\": report,\n \"format\": \"json\",\n }\n\n except Exception as e:\n duration = int((time.time() - start) * MS_PER_SECOND)\n logger.log(\n LogEntry(\n timestamp=datetime.now(),\n client_name=\"mcp-tasks\",\n tool_name=\"quality_report\",\n request=request_str,\n status=\"error\",\n duration_ms=duration,\n error_message=str(e)[:LOG_QUERY_MAX_LENGTH],\n )\n )\n return {\"success\": False, \"error\": str(e)}\n"
}
}
{
"tool_response": {
"type": "update",
"filePath": "\/var\/www\/mcp-servers\/mcp_tasks\/tools\/quality_tools.py",
"content": "\"\"\"Quality Tools für MCP-Tasks Server.\"\"\"\n\nimport json\nimport subprocess\nimport time\nfrom datetime import datetime\nfrom typing import Optional\n\nfrom config import Config\nfrom domain.contracts import QualityCheckResult\nfrom shared.domain import LogEntry\nfrom shared.infrastructure import get_logger\n\nfrom constants import (\n LOG_QUERY_MAX_LENGTH,\n LLM_TIMEOUT,\n MS_PER_SECOND,\n SEMGREP_TIMEOUT,\n)\n\n\ndef register_quality_tools(mcp):\n \"\"\"Registriert alle Quality-Tools.\"\"\"\n logger = get_logger(\"mcp-tasks\", Config)\n\n # ==================== quality_check ====================\n @mcp.tool()\n def quality_check(\n path: str = \"\/var\/www\/dev.campus.systemische-tools.de\",\n checks: str = \"all\",\n fix: bool = False,\n ) -> dict:\n \"\"\"\n Führt PHP-Quality-Prüfungen aus.\n\n Args:\n path: Zu prüfender Pfad\n checks: Welche Checks (phpstan, cs-fixer, semgrep, all)\n fix: Style-Probleme automatisch korrigieren (nur cs-fixer)\n\n Returns:\n Ergebnis aller Checks mit Details\n \"\"\"\n start = time.time()\n request_str = json.dumps({\"path\": path, \"checks\": checks, \"fix\": fix})\n\n try:\n results = {}\n overall_passed = True\n\n check_list = (\n [\"phpstan\", \"cs-fixer\", \"semgrep\"] if checks == \"all\" else [checks]\n )\n\n for check_name in check_list:\n check_result = QualityCheckResult(check_name=check_name)\n\n if check_name == \"phpstan\":\n try:\n result = subprocess.run(\n [\n Config.PHPSTAN_BIN,\n \"analyse\",\n path,\n \"--level=5\",\n \"--no-progress\",\n \"--error-format=json\",\n ],\n capture_output=True,\n text=True,\n timeout=LLM_TIMEOUT,\n )\n if result.returncode == 0:\n check_result.passed = True\n check_result.issues = 0\n else:\n check_result.passed = False\n try:\n phpstan_output = json.loads(result.stdout)\n check_result.issues = phpstan_output.get(\n \"totals\", {}\n ).get(\"errors\", 0)\n except Exception:\n check_result.issues = 1\n check_result.details = result.stdout[:500]\n except subprocess.TimeoutExpired:\n check_result.passed = False\n check_result.details = \"PHPStan timeout\"\n except Exception as e:\n check_result.passed = False\n check_result.details = str(e)[:LOG_QUERY_MAX_LENGTH]\n\n elif check_name == \"cs-fixer\":\n try:\n cmd = [\n Config.CS_FIXER_BIN,\n \"fix\" if fix else \"check\",\n path,\n \"--dry-run\" if not fix else \"\",\n ]\n cmd = [c for c in cmd if c]\n result = subprocess.run(\n cmd,\n capture_output=True,\n text=True,\n timeout=LLM_TIMEOUT,\n )\n check_result.passed = result.returncode == 0\n check_result.issues = (\n result.stdout.count(\"CHANGED\") if not fix else 0\n )\n check_result.fixed = (\n result.stdout.count(\"CHANGED\") if fix else 0\n )\n except Exception as e:\n check_result.passed = False\n check_result.details = str(e)[:LOG_QUERY_MAX_LENGTH]\n\n elif check_name == \"semgrep\":\n try:\n result = subprocess.run(\n [Config.SEMGREP_BIN, \"--config=auto\", \"--json\", path],\n capture_output=True,\n text=True,\n timeout=SEMGREP_TIMEOUT,\n )\n try:\n semgrep_output = json.loads(result.stdout)\n issues = len(semgrep_output.get(\"results\", []))\n check_result.passed = issues == 0\n check_result.issues = issues\n except Exception:\n check_result.passed = result.returncode == 0\n except subprocess.TimeoutExpired:\n check_result.passed = False\n check_result.details = \"Semgrep timeout\"\n except Exception as e:\n check_result.passed = False\n check_result.details = str(e)[:LOG_QUERY_MAX_LENGTH]\n\n results[check_name] = check_result.to_dict()\n if not check_result.passed:\n overall_passed = False\n\n duration = int((time.time() - start) * MS_PER_SECOND)\n logger.log(\n LogEntry(\n timestamp=datetime.now(),\n client_name=\"mcp-tasks\",\n tool_name=\"quality_check\",\n request=request_str,\n status=\"success\" if overall_passed else \"error\",\n duration_ms=duration,\n )\n )\n\n return {\n \"success\": True,\n \"passed\": overall_passed,\n \"path\": path,\n \"results\": results,\n }\n\n except Exception as e:\n duration = int((time.time() - start) * MS_PER_SECOND)\n logger.log(\n LogEntry(\n timestamp=datetime.now(),\n client_name=\"mcp-tasks\",\n tool_name=\"quality_check\",\n request=request_str,\n status=\"error\",\n duration_ms=duration,\n error_message=str(e)[:LOG_QUERY_MAX_LENGTH],\n )\n )\n return {\"success\": False, \"error\": str(e)}\n\n # ==================== quality_report ====================\n @mcp.tool()\n def quality_report(\n scope: str = \"full\",\n format: str = \"json\",\n ) -> dict:\n \"\"\"\n Erstellt einen vollständigen Qualitätsbericht.\n\n Args:\n scope: full oder changes_only\n format: json oder markdown\n\n Returns:\n Vollständiger Report über alle Prüfungen\n \"\"\"\n start = time.time()\n request_str = json.dumps({\"scope\": scope, \"format\": format})\n\n try:\n report = {\n \"generated_at\": time.strftime(\"%Y-%m-%d %H:%M:%S\"),\n \"scope\": scope,\n \"dev_path\": \"\/var\/www\/dev.campus.systemische-tools.de\",\n }\n\n # Quality-Checks ausführen\n quality_result = quality_check(\n path=\"\/var\/www\/dev.campus.systemische-tools.de\/src\",\n checks=\"all\",\n )\n report[\"quality_checks\"] = quality_result.get(\"results\", {})\n report[\"quality_passed\"] = quality_result.get(\"passed\", False)\n\n # Zusammenfassung\n report[\"all_passed\"] = report[\"quality_passed\"]\n\n duration = int((time.time() - start) * MS_PER_SECOND)\n logger.log(\n LogEntry(\n timestamp=datetime.now(),\n client_name=\"mcp-tasks\",\n tool_name=\"quality_report\",\n request=request_str,\n status=\"success\",\n duration_ms=duration,\n )\n )\n\n # Markdown-Format\n if format == \"markdown\":\n md = \"# Quality Report\\n\\n\"\n md += f\"Generated: {report['generated_at']}\\n\\n\"\n md += (\n f\"## Overall Status: \"\n f\"{'PASSED' if report['all_passed'] else 'FAILED'}\\n\\n\"\n )\n\n md += \"## Quality Checks\\n\\n\"\n for check_name, check_data in report[\"quality_checks\"].items():\n status = \"PASS\" if check_data.get(\"passed\") else \"FAIL\"\n md += f\"- {status} {check_name}: {check_data.get('issues', 0)} issues\\n\"\n\n return {\n \"success\": True,\n \"report\": md,\n \"format\": \"markdown\",\n }\n\n return {\n \"success\": True,\n \"report\": report,\n \"format\": \"json\",\n }\n\n except Exception as e:\n duration = int((time.time() - start) * MS_PER_SECOND)\n logger.log(\n LogEntry(\n timestamp=datetime.now(),\n client_name=\"mcp-tasks\",\n tool_name=\"quality_report\",\n request=request_str,\n status=\"error\",\n duration_ms=duration,\n error_message=str(e)[:LOG_QUERY_MAX_LENGTH],\n )\n )\n return {\"success\": False, \"error\": str(e)}\n",
"structuredPatch": [
{
"oldStart": 1,
"oldLines": 25,
"newStart": 1,
"newLines": 28,
"lines": [
"-\"\"\"Quality Tools für MCP-Tasks Server\"\"\"",
"-import sys",
"-import time",
"+\"\"\"Quality Tools für MCP-Tasks Server.\"\"\"",
"+",
" import json",
" import subprocess",
"+import time",
"+from datetime import datetime",
" from typing import Optional",
" ",
"-sys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp_tasks\")",
"-sys.path.insert(0, \"\/var\/www\/mcp-servers\/shared\")",
"-",
"-from constants import LOG_ENTRY_MAX_LENGTH, LOG_QUERY_MAX_LENGTH, LLM_TIMEOUT, MS_PER_SECOND, SEMGREP_TIMEOUT",
"-",
" from config import Config",
"-from domain.contracts import LogEntry, QualityCheckResult",
"-from infrastructure.protokoll_logger import get_logger",
"+from domain.contracts import QualityCheckResult",
"+from shared.domain import LogEntry",
"+from shared.infrastructure import get_logger",
" ",
"+from constants import (",
"+ LOG_QUERY_MAX_LENGTH,",
"+ LLM_TIMEOUT,",
"+ MS_PER_SECOND,",
"+ SEMGREP_TIMEOUT,",
"+)",
" ",
"+",
" def register_quality_tools(mcp):",
"- \"\"\"Registriert alle Quality-Tools\"\"\"",
"+ \"\"\"Registriert alle Quality-Tools.\"\"\"",
"+ logger = get_logger(\"mcp-tasks\", Config)",
" ",
"- logger = get_logger()",
"-",
" # ==================== quality_check ====================",
" @mcp.tool()",
" def quality_check("
]
},
{
"oldStart": 45,
"oldLines": 7,
"newStart": 48,
"newLines": 9,
"lines": [
" results = {}",
" overall_passed = True",
" ",
"- check_list = [\"phpstan\", \"cs-fixer\", \"semgrep\"] if checks == \"all\" else [checks]",
"+ check_list = (",
"+ [\"phpstan\", \"cs-fixer\", \"semgrep\"] if checks == \"all\" else [checks]",
"+ )",
" ",
" for check_name in check_list:",
" check_result = QualityCheckResult(check_name=check_name)"
]
},
{
"oldStart": 53,
"oldLines": 7,
"newStart": 58,
"newLines": 14,
"lines": [
" if check_name == \"phpstan\":",
" try:",
" result = subprocess.run(",
"- [Config.PHPSTAN_BIN, \"analyse\", path, \"--level=5\", \"--no-progress\", \"--error-format=json\"],",
"+ [",
"+ Config.PHPSTAN_BIN,",
"+ \"analyse\",",
"+ path,",
"+ \"--level=5\",",
"+ \"--no-progress\",",
"+ \"--error-format=json\",",
"+ ],",
" capture_output=True,",
" text=True,",
" timeout=LLM_TIMEOUT,"
]
},
{
"oldStart": 65,
"oldLines": 7,
"newStart": 77,
"newLines": 9,
"lines": [
" check_result.passed = False",
" try:",
" phpstan_output = json.loads(result.stdout)",
"- check_result.issues = phpstan_output.get(\"totals\", {}).get(\"errors\", 0)",
"+ check_result.issues = phpstan_output.get(",
"+ \"totals\", {}",
"+ ).get(\"errors\", 0)",
" except Exception:",
" check_result.issues = 1",
" check_result.details = result.stdout[:500]"
]
},
{
"oldStart": 78,
"oldLines": 7,
"newStart": 92,
"newLines": 12,
"lines": [
" ",
" elif check_name == \"cs-fixer\":",
" try:",
"- cmd = [Config.CS_FIXER_BIN, \"fix\" if fix else \"check\", path, \"--dry-run\" if not fix else \"\"]",
"+ cmd = [",
"+ Config.CS_FIXER_BIN,",
"+ \"fix\" if fix else \"check\",",
"+ path,",
"+ \"--dry-run\" if not fix else \"\",",
"+ ]",
" cmd = [c for c in cmd if c]",
" result = subprocess.run(",
" cmd,"
]
},
{
"oldStart": 87,
"oldLines": 8,
"newStart": 106,
"newLines": 12,
"lines": [
" timeout=LLM_TIMEOUT,",
" )",
" check_result.passed = result.returncode == 0",
"- check_result.issues = result.stdout.count(\"CHANGED\") if not fix else 0",
"- check_result.fixed = result.stdout.count(\"CHANGED\") if fix else 0",
"+ check_result.issues = (",
"+ result.stdout.count(\"CHANGED\") if not fix else 0",
"+ )",
"+ check_result.fixed = (",
"+ result.stdout.count(\"CHANGED\") if fix else 0",
"+ )",
" except Exception as e:",
" check_result.passed = False",
" check_result.details = str(e)[:LOG_QUERY_MAX_LENGTH]"
]
},
{
"oldStart": 120,
"oldLines": 12,
"newStart": 143,
"newLines": 16,
"lines": [
" overall_passed = False",
" ",
" duration = int((time.time() - start) * MS_PER_SECOND)",
"- logger.log(LogEntry(",
"- tool_name=\"quality_check\",",
"- request=request_str,",
"- status=\"success\" if overall_passed else \"error\",",
"- duration_ms=duration,",
"- ))",
"+ logger.log(",
"+ LogEntry(",
"+ timestamp=datetime.now(),",
"+ client_name=\"mcp-tasks\",",
"+ tool_name=\"quality_check\",",
"+ request=request_str,",
"+ status=\"success\" if overall_passed else \"error\",",
"+ duration_ms=duration,",
"+ )",
"+ )",
" ",
" return {",
" \"success\": True,"
]
},
{
"oldStart": 136,
"oldLines": 13,
"newStart": 163,
"newLines": 17,
"lines": [
" ",
" except Exception as e:",
" duration = int((time.time() - start) * MS_PER_SECOND)",
"- logger.log(LogEntry(",
"- tool_name=\"quality_check\",",
"- request=request_str,",
"- status=\"error\",",
"- duration_ms=duration,",
"- error_message=str(e)[:LOG_QUERY_MAX_LENGTH],",
"- ))",
"+ logger.log(",
"+ LogEntry(",
"+ timestamp=datetime.now(),",
"+ client_name=\"mcp-tasks\",",
"+ tool_name=\"quality_check\",",
"+ request=request_str,",
"+ status=\"error\",",
"+ duration_ms=duration,",
"+ error_message=str(e)[:LOG_QUERY_MAX_LENGTH],",
"+ )",
"+ )",
" return {\"success\": False, \"error\": str(e)}",
" ",
" # ==================== quality_report ===================="
]
},
{
"oldStart": 183,
"oldLines": 18,
"newStart": 214,
"newLines": 25,
"lines": [
" report[\"all_passed\"] = report[\"quality_passed\"]",
" ",
" duration = int((time.time() - start) * MS_PER_SECOND)",
"- logger.log(LogEntry(",
"- tool_name=\"quality_report\",",
"- request=request_str,",
"- status=\"success\",",
"- duration_ms=duration,",
"- ))",
"+ logger.log(",
"+ LogEntry(",
"+ timestamp=datetime.now(),",
"+ client_name=\"mcp-tasks\",",
"+ tool_name=\"quality_report\",",
"+ request=request_str,",
"+ status=\"success\",",
"+ duration_ms=duration,",
"+ )",
"+ )",
" ",
" # Markdown-Format",
" if format == \"markdown\":",
" md = \"# Quality Report\\n\\n\"",
" md += f\"Generated: {report['generated_at']}\\n\\n\"",
"- md += f\"## Overall Status: {'PASSED' if report['all_passed'] else 'FAILED'}\\n\\n\"",
"+ md += (",
"+ f\"## Overall Status: \"",
"+ f\"{'PASSED' if report['all_passed'] else 'FAILED'}\\n\\n\"",
"+ )",
" ",
" md += \"## Quality Checks\\n\\n\"",
" for check_name, check_data in report[\"quality_checks\"].items():"
]
},
{
"oldStart": 215,
"oldLines": 11,
"newStart": 253,
"newLines": 15,
"lines": [
" ",
" except Exception as e:",
" duration = int((time.time() - start) * MS_PER_SECOND)",
"- logger.log(LogEntry(",
"- tool_name=\"quality_report\",",
"- request=request_str,",
"- status=\"error\",",
"- duration_ms=duration,",
"- error_message=str(e)[:LOG_QUERY_MAX_LENGTH],",
"- ))",
"+ logger.log(",
"+ LogEntry(",
"+ timestamp=datetime.now(),",
"+ client_name=\"mcp-tasks\",",
"+ tool_name=\"quality_report\",",
"+ request=request_str,",
"+ status=\"error\",",
"+ duration_ms=duration,",
"+ error_message=str(e)[:LOG_QUERY_MAX_LENGTH],",
"+ )",
"+ )",
" return {\"success\": False, \"error\": str(e)}"
]
}
],
"originalFile": "\"\"\"Quality Tools für MCP-Tasks Server\"\"\"\nimport sys\nimport time\nimport json\nimport subprocess\nfrom typing import Optional\n\nsys.path.insert(0, \"\/var\/www\/mcp-servers\/mcp_tasks\")\nsys.path.insert(0, \"\/var\/www\/mcp-servers\/shared\")\n\nfrom constants import LOG_ENTRY_MAX_LENGTH, LOG_QUERY_MAX_LENGTH, LLM_TIMEOUT, MS_PER_SECOND, SEMGREP_TIMEOUT\n\nfrom config import Config\nfrom domain.contracts import LogEntry, QualityCheckResult\nfrom infrastructure.protokoll_logger import get_logger\n\n\ndef register_quality_tools(mcp):\n \"\"\"Registriert alle Quality-Tools\"\"\"\n\n logger = get_logger()\n\n # ==================== quality_check ====================\n @mcp.tool()\n def quality_check(\n path: str = \"\/var\/www\/dev.campus.systemische-tools.de\",\n checks: str = \"all\",\n fix: bool = False,\n ) -> dict:\n \"\"\"\n Führt PHP-Quality-Prüfungen aus.\n\n Args:\n path: Zu prüfender Pfad\n checks: Welche Checks (phpstan, cs-fixer, semgrep, all)\n fix: Style-Probleme automatisch korrigieren (nur cs-fixer)\n\n Returns:\n Ergebnis aller Checks mit Details\n \"\"\"\n start = time.time()\n request_str = json.dumps({\"path\": path, \"checks\": checks, \"fix\": fix})\n\n try:\n results = {}\n overall_passed = True\n\n check_list = [\"phpstan\", \"cs-fixer\", \"semgrep\"] if checks == \"all\" else [checks]\n\n for check_name in check_list:\n check_result = QualityCheckResult(check_name=check_name)\n\n if check_name == \"phpstan\":\n try:\n result = subprocess.run(\n [Config.PHPSTAN_BIN, \"analyse\", path, \"--level=5\", \"--no-progress\", \"--error-format=json\"],\n capture_output=True,\n text=True,\n timeout=LLM_TIMEOUT,\n )\n if result.returncode == 0:\n check_result.passed = True\n check_result.issues = 0\n else:\n check_result.passed = False\n try:\n phpstan_output = json.loads(result.stdout)\n check_result.issues = phpstan_output.get(\"totals\", {}).get(\"errors\", 0)\n except Exception:\n check_result.issues = 1\n check_result.details = result.stdout[:500]\n except subprocess.TimeoutExpired:\n check_result.passed = False\n check_result.details = \"PHPStan timeout\"\n except Exception as e:\n check_result.passed = False\n check_result.details = str(e)[:LOG_QUERY_MAX_LENGTH]\n\n elif check_name == \"cs-fixer\":\n try:\n cmd = [Config.CS_FIXER_BIN, \"fix\" if fix else \"check\", path, \"--dry-run\" if not fix else \"\"]\n cmd = [c for c in cmd if c]\n result = subprocess.run(\n cmd,\n capture_output=True,\n text=True,\n timeout=LLM_TIMEOUT,\n )\n check_result.passed = result.returncode == 0\n check_result.issues = result.stdout.count(\"CHANGED\") if not fix else 0\n check_result.fixed = result.stdout.count(\"CHANGED\") if fix else 0\n except Exception as e:\n check_result.passed = False\n check_result.details = str(e)[:LOG_QUERY_MAX_LENGTH]\n\n elif check_name == \"semgrep\":\n try:\n result = subprocess.run(\n [Config.SEMGREP_BIN, \"--config=auto\", \"--json\", path],\n capture_output=True,\n text=True,\n timeout=SEMGREP_TIMEOUT,\n )\n try:\n semgrep_output = json.loads(result.stdout)\n issues = len(semgrep_output.get(\"results\", []))\n check_result.passed = issues == 0\n check_result.issues = issues\n except Exception:\n check_result.passed = result.returncode == 0\n except subprocess.TimeoutExpired:\n check_result.passed = False\n check_result.details = \"Semgrep timeout\"\n except Exception as e:\n check_result.passed = False\n check_result.details = str(e)[:LOG_QUERY_MAX_LENGTH]\n\n results[check_name] = check_result.to_dict()\n if not check_result.passed:\n overall_passed = False\n\n duration = int((time.time() - start) * MS_PER_SECOND)\n logger.log(LogEntry(\n tool_name=\"quality_check\",\n request=request_str,\n status=\"success\" if overall_passed else \"error\",\n duration_ms=duration,\n ))\n\n return {\n \"success\": True,\n \"passed\": overall_passed,\n \"path\": path,\n \"results\": results,\n }\n\n except Exception as e:\n duration = int((time.time() - start) * MS_PER_SECOND)\n logger.log(LogEntry(\n tool_name=\"quality_check\",\n request=request_str,\n status=\"error\",\n duration_ms=duration,\n error_message=str(e)[:LOG_QUERY_MAX_LENGTH],\n ))\n return {\"success\": False, \"error\": str(e)}\n\n # ==================== quality_report ====================\n @mcp.tool()\n def quality_report(\n scope: str = \"full\",\n format: str = \"json\",\n ) -> dict:\n \"\"\"\n Erstellt einen vollständigen Qualitätsbericht.\n\n Args:\n scope: full oder changes_only\n format: json oder markdown\n\n Returns:\n Vollständiger Report über alle Prüfungen\n \"\"\"\n start = time.time()\n request_str = json.dumps({\"scope\": scope, \"format\": format})\n\n try:\n report = {\n \"generated_at\": time.strftime(\"%Y-%m-%d %H:%M:%S\"),\n \"scope\": scope,\n \"dev_path\": \"\/var\/www\/dev.campus.systemische-tools.de\",\n }\n\n # Quality-Checks ausführen\n quality_result = quality_check(\n path=\"\/var\/www\/dev.campus.systemische-tools.de\/src\",\n checks=\"all\",\n )\n report[\"quality_checks\"] = quality_result.get(\"results\", {})\n report[\"quality_passed\"] = quality_result.get(\"passed\", False)\n\n # Zusammenfassung\n report[\"all_passed\"] = report[\"quality_passed\"]\n\n duration = int((time.time() - start) * MS_PER_SECOND)\n logger.log(LogEntry(\n tool_name=\"quality_report\",\n request=request_str,\n status=\"success\",\n duration_ms=duration,\n ))\n\n # Markdown-Format\n if format == \"markdown\":\n md = \"# Quality Report\\n\\n\"\n md += f\"Generated: {report['generated_at']}\\n\\n\"\n md += f\"## Overall Status: {'PASSED' if report['all_passed'] else 'FAILED'}\\n\\n\"\n\n md += \"## Quality Checks\\n\\n\"\n for check_name, check_data in report[\"quality_checks\"].items():\n status = \"PASS\" if check_data.get(\"passed\") else \"FAIL\"\n md += f\"- {status} {check_name}: {check_data.get('issues', 0)} issues\\n\"\n\n return {\n \"success\": True,\n \"report\": md,\n \"format\": \"markdown\",\n }\n\n return {\n \"success\": True,\n \"report\": report,\n \"format\": \"json\",\n }\n\n except Exception as e:\n duration = int((time.time() - start) * MS_PER_SECOND)\n logger.log(LogEntry(\n tool_name=\"quality_report\",\n request=request_str,\n status=\"error\",\n duration_ms=duration,\n error_message=str(e)[:LOG_QUERY_MAX_LENGTH],\n ))\n return {\"success\": False, \"error\": str(e)}\n"
}
}