quality_tools.py

Code Hygiene Score: 75

Keine Issues gefunden.

Dependencies 13

Funktionen 1

Code

"""Quality Tools für MCP-Tasks Server."""

import json
import subprocess
import time
from datetime import datetime
from typing import Optional

from config import Config
from domain.contracts import QualityCheckResult
from shared.domain import LogEntry
from shared.infrastructure import get_logger

from constants import (
    LOG_QUERY_MAX_LENGTH,
    LLM_TIMEOUT,
    MS_PER_SECOND,
    SEMGREP_TIMEOUT,
)


# Project root that the quality tools inspect by default.
_DEV_PATH = "/var/www/dev.campus.systemische-tools.de"

# Checks executed, in this order, when ``checks == "all"``.
_ALL_CHECKS = ("phpstan", "cs-fixer", "semgrep")


def _run_phpstan(path: str) -> "QualityCheckResult":
    """Run PHPStan (level 5, JSON error format) on *path* and return the result."""
    outcome = QualityCheckResult(check_name="phpstan")
    try:
        proc = subprocess.run(
            [
                Config.PHPSTAN_BIN,
                "analyse",
                path,
                "--level=5",
                "--no-progress",
                "--error-format=json",
            ],
            capture_output=True,
            text=True,
            timeout=LLM_TIMEOUT,
        )
        if proc.returncode == 0:
            outcome.passed = True
            outcome.issues = 0
        else:
            outcome.passed = False
            try:
                totals = json.loads(proc.stdout).get("totals", {})
                outcome.issues = totals.get("errors", 0)
            except (ValueError, AttributeError, TypeError):
                # Output was not the expected JSON object; report at least
                # one issue because the exit code signalled a failure.
                outcome.issues = 1
            # Fall back to stderr so a crash that produced no stdout still
            # leaves a diagnostic in the result.
            outcome.details = (proc.stdout or proc.stderr)[:500]
    except subprocess.TimeoutExpired:
        outcome.passed = False
        outcome.details = "PHPStan timeout"
    except Exception as exc:
        # Deliberately broad: a missing binary or OS error must show up as a
        # failed check instead of aborting the whole tool call.
        outcome.passed = False
        outcome.details = str(exc)[:LOG_QUERY_MAX_LENGTH]
    return outcome


def _run_cs_fixer(path: str, fix: bool) -> "QualityCheckResult":
    """Run PHP-CS-Fixer on *path*; apply fixes when *fix* is true, else dry-run."""
    outcome = QualityCheckResult(check_name="cs-fixer")
    try:
        # Always use the ``fix`` command and add ``--dry-run`` for read-only
        # runs. The previous ``check --dry-run`` combination is rejected by
        # PHP-CS-Fixer (``check`` is itself the dry-run alias and older
        # releases have no ``check`` command at all).
        candidate_args = [
            Config.CS_FIXER_BIN,
            "fix",
            path,
            "" if fix else "--dry-run",
        ]
        cmd = [arg for arg in candidate_args if arg]
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=LLM_TIMEOUT,
        )
        # NOTE(review): this counts the literal "CHANGED" in stdout; confirm
        # that it matches the output format of the installed PHP-CS-Fixer.
        changed = proc.stdout.count("CHANGED")
        outcome.passed = proc.returncode == 0
        outcome.issues = 0 if fix else changed
        outcome.fixed = changed if fix else 0
    except subprocess.TimeoutExpired:
        # Same reporting style as the PHPStan and Semgrep checks.
        outcome.passed = False
        outcome.details = "PHP-CS-Fixer timeout"
    except Exception as exc:
        outcome.passed = False
        outcome.details = str(exc)[:LOG_QUERY_MAX_LENGTH]
    return outcome


def _run_semgrep(path: str) -> "QualityCheckResult":
    """Run Semgrep (``--config=auto``, JSON output) on *path* and return the result."""
    outcome = QualityCheckResult(check_name="semgrep")
    try:
        proc = subprocess.run(
            [Config.SEMGREP_BIN, "--config=auto", "--json", path],
            capture_output=True,
            text=True,
            timeout=SEMGREP_TIMEOUT,
        )
        try:
            findings = len(json.loads(proc.stdout).get("results", []))
        except (ValueError, AttributeError, TypeError):
            # Unparseable output: fall back to the process exit code.
            outcome.passed = proc.returncode == 0
        else:
            outcome.passed = findings == 0
            outcome.issues = findings
    except subprocess.TimeoutExpired:
        outcome.passed = False
        outcome.details = "Semgrep timeout"
    except Exception as exc:
        outcome.passed = False
        outcome.details = str(exc)[:LOG_QUERY_MAX_LENGTH]
    return outcome


def _run_single_check(check_name: str, path: str, fix: bool) -> "QualityCheckResult":
    """Dispatch *check_name* to its runner; unknown names yield a failed result."""
    if check_name == "phpstan":
        return _run_phpstan(path)
    if check_name == "cs-fixer":
        return _run_cs_fixer(path, fix)
    if check_name == "semgrep":
        return _run_semgrep(path)

    # An unsupported name used to produce an untouched default result with no
    # explanation; report it explicitly as a failure instead.
    outcome = QualityCheckResult(check_name=check_name)
    outcome.passed = False
    outcome.details = f"Unknown check: {check_name}"[:LOG_QUERY_MAX_LENGTH]
    return outcome


def _log_tool_call(
    logger,
    tool_name: str,
    request: str,
    status: str,
    started: float,
    error: Optional[Exception] = None,
) -> None:
    """Write one LogEntry for a tool call; *started* is a ``time.monotonic()`` value."""
    entry_fields = {
        "timestamp": datetime.now(),
        "client_name": "mcp-tasks",
        "tool_name": tool_name,
        "request": request,
        "status": status,
        "duration_ms": int((time.monotonic() - started) * MS_PER_SECOND),
    }
    # Only pass error_message when there is an error, so successful entries
    # keep whatever default LogEntry defines for that field.
    if error is not None:
        entry_fields["error_message"] = str(error)[:LOG_QUERY_MAX_LENGTH]
    logger.log(LogEntry(**entry_fields))


def register_quality_tools(mcp):
    """Register the ``quality_check`` and ``quality_report`` tools on *mcp*.

    Args:
        mcp: Server object exposing a ``tool()`` decorator factory.
    """
    logger = get_logger("mcp-tasks", Config)

    def _execute_quality_check(path: str, checks: str, fix: bool) -> dict:
        """Run the requested checks, log the call and build the response dict.

        Kept as a plain closure so that ``quality_report`` can reuse it
        without depending on what ``mcp.tool()`` returns for the decorated
        ``quality_check`` function.
        """
        # Monotonic clock: the duration is unaffected by wall-clock changes.
        started = time.monotonic()
        request_str = json.dumps({"path": path, "checks": checks, "fix": fix})

        try:
            check_list = list(_ALL_CHECKS) if checks == "all" else [checks]

            results = {}
            overall_passed = True
            for check_name in check_list:
                outcome = _run_single_check(check_name, path, fix)
                results[check_name] = outcome.to_dict()
                if not outcome.passed:
                    overall_passed = False

            _log_tool_call(
                logger,
                "quality_check",
                request_str,
                "success" if overall_passed else "error",
                started,
            )
            return {
                "success": True,
                "passed": overall_passed,
                "path": path,
                "results": results,
            }

        except Exception as exc:
            # Tool boundary: report the failure to the caller instead of raising.
            _log_tool_call(
                logger, "quality_check", request_str, "error", started, error=exc
            )
            return {"success": False, "error": str(exc)}

    # ==================== quality_check ====================
    @mcp.tool()
    def quality_check(
        path: str = _DEV_PATH,
        checks: str = "all",
        fix: bool = False,
    ) -> dict:
        """
        Run PHP quality checks.

        Args:
            path: Path to check
            checks: Which check to run (phpstan, cs-fixer, semgrep, all)
            fix: Automatically fix style problems (cs-fixer only)

        Returns:
            Result of all checks with details
        """
        return _execute_quality_check(path, checks, fix)

    # ==================== quality_report ====================
    @mcp.tool()
    def quality_report(
        scope: str = "full",
        format: str = "json",
    ) -> dict:
        """
        Build a complete quality report.

        Args:
            scope: full or changes_only
            format: json or markdown

        Returns:
            Complete report covering all checks
        """
        started = time.monotonic()
        request_str = json.dumps({"scope": scope, "format": format})

        try:
            # NOTE(review): ``scope`` is only echoed into the report; all
            # checks always run on the full ``src`` tree.
            report = {
                "generated_at": time.strftime("%Y-%m-%d %H:%M:%S"),
                "scope": scope,
                "dev_path": _DEV_PATH,
            }

            # Run the quality checks through the undecorated implementation.
            quality_result = _execute_quality_check(
                path=f"{_DEV_PATH}/src",
                checks="all",
                fix=False,
            )
            report["quality_checks"] = quality_result.get("results", {})
            report["quality_passed"] = quality_result.get("passed", False)
            if not quality_result.get("success", False):
                # Surface why the run produced no results instead of
                # returning an empty, unexplained report.
                report["quality_error"] = quality_result.get(
                    "error", "quality_check failed"
                )

            # Summary
            report["all_passed"] = report["quality_passed"]

            _log_tool_call(logger, "quality_report", request_str, "success", started)

            # Markdown format
            if format == "markdown":
                md_parts = [
                    "# Quality Report\n\n",
                    f"Generated: {report['generated_at']}\n\n",
                    f"## Overall Status: "
                    f"{'PASSED' if report['all_passed'] else 'FAILED'}\n\n",
                    "## Quality Checks\n\n",
                ]
                for check_name, check_data in report["quality_checks"].items():
                    status = "PASS" if check_data.get("passed") else "FAIL"
                    md_parts.append(
                        f"- {status} {check_name}: {check_data.get('issues', 0)} issues\n"
                    )
                if "quality_error" in report:
                    md_parts.append(f"\nError: {report['quality_error']}\n")

                return {
                    "success": True,
                    "report": "".join(md_parts),
                    "format": "markdown",
                }

            return {
                "success": True,
                "report": report,
                "format": "json",
            }

        except Exception as exc:
            # Tool boundary: report the failure to the caller instead of raising.
            _log_tool_call(
                logger, "quality_report", request_str, "error", started, error=exc
            )
            return {"success": False, "error": str(exc)}
← Übersicht