Protokoll #32559

ID32559
Zeitstempel2026-02-15 03:07:39.129894
Clientroot
IP145.224.72.253
Modellclaude-sonnet-4-20250514
Statuscompleted
Tokens2,763 (Input: 66, Output: 2,697)
Dauer235 ms
Request-Zeit2026-02-15 03:07:39.129894
Response-Zeit2026-02-15 03:07:39.364863

Request

{
    "event": "PreToolUse",
    "tool_name": "Bash",
    "tool_input": {
        "command": "ssh root@prod.karlkratz.com \"ls \/var\/www\/kiglove\/shared\/sysop\/ && cat \/var\/www\/kiglove\/shared\/sysop\/nftables_client.py\" 2>&1",
        "timeout": 15000,
        "description": "Read nftables client module"
    }
}

Response

{
    "tool_response": {
        "stdout": "__init__.py\nnftables_client.py\n__pycache__\nsystemctl_client.py\n\"\"\"\nNftablesClient - Wrapper fuer nftables Operationen.\n\nArtefakt: kiglove~sysop~firewall~config\n\nUsage:\n    from shared.sysop.nftables_client import NftablesClient\n\n    # Status abfragen\n    status = NftablesClient.status()\n\n    # Regeln auflisten\n    rules = NftablesClient.list_rules(table=\"inet filter\")\n\n    # Set-Elemente hinzufuegen\n    NftablesClient.add_to_set(\"geoblock_v4\", \"1.2.3.0\/24\")\n\"\"\"\n\nimport subprocess\nfrom pathlib import Path\nfrom typing import Any\n\nfrom ..command_logger import CommandLogger\nfrom ..debug_logger import DebugLogger\nfrom ..kiglove_artefact_reader import KIGloveArtefactReader\n\n\nclass NftablesClient:\n    \"\"\"Fuehrt nftables-Befehle aus (artefact-gesteuert).\"\"\"\n\n    NFT_BIN = \"\/usr\/sbin\/nft\"\n    NFT_CONF = \"\/etc\/nftables.conf\"\n\n    @classmethod\n    def _get_config(cls) -> dict[str, Any]:\n        \"\"\"Firewall-Config aus Artefakt laden.\"\"\"\n        return KIGloveArtefactReader.get_yaml(\"kiglove~sysop~firewall~config\") or {}\n\n    @classmethod\n    def _get_allowed_sets(cls) -> list[str]:\n        \"\"\"Erlaubte Sets aus Artefakt laden.\"\"\"\n        config = cls._get_config()\n        return config.get(\"allowed_sets\", [])\n\n    @classmethod\n    def _run_nft(cls, args: list[str], check: bool = True) -> subprocess.CompletedProcess:\n        \"\"\"\n        Fuehrt nft-Befehl aus.\n\n        Args:\n            args: Argumente fuer nft (ohne 'nft' selbst).\n            check: Bei Fehler Exception werfen.\n\n        Returns:\n            CompletedProcess mit stdout\/stderr.\n        \"\"\"\n        cmd = [cls.NFT_BIN] + args\n        try:\n            result = subprocess.run(  # noqa: S603\n                cmd,\n                capture_output=True,\n                text=True,\n                check=check,\n                timeout=30,\n            )\n            return result\n        except subprocess.CalledProcessError as e:\n            DebugLogger.error(f\"nft failed: {e.stderr}\")\n            raise\n\n    @classmethod\n    def status(cls) -> dict[str, Any]:\n        \"\"\"\n        Gibt Firewall-Status zurueck.\n\n        Returns:\n            Dict mit tables, rule_count, active.\n        \"\"\"\n        try:\n            result = cls._run_nft([\"list\", \"tables\"], check=False)\n            if result.returncode != 0:\n                return {\"active\": False, \"error\": result.stderr.strip()}\n\n            tables = [t for t in result.stdout.strip().split(\"\\n\") if t]\n\n            result_rules = cls._run_nft([\"list\", \"ruleset\"], check=False)\n            rule_count = result_rules.stdout.count(\" rule \") if result_rules.stdout else 0\n\n            return {\n                \"active\": True,\n                \"tables\": tables,\n                \"table_count\": len(tables),\n                \"rule_count\": rule_count\n            }\n        except (OSError, subprocess.TimeoutExpired) as e:\n            DebugLogger.error(f\"status failed: {e}\")\n            return {\"active\": False, \"error\": str(e)}\n\n    @classmethod\n    def list_rules(cls, table: str | None = None) -> str:\n        \"\"\"\n        Listet aktive Regeln auf.\n\n        Args:\n            table: Optional - nur bestimmte Tabelle.\n\n        Returns:\n            Ruleset als String.\n        \"\"\"\n        if table:\n            args = [\"list\", \"table\", table]\n        else:\n            args = [\"list\", \"ruleset\"]\n\n        result = cls._run_nft(args, check=False)\n        if result.returncode != 0:\n            raise RuntimeError(f\"list_rules failed: {result.stderr}\")\n\n        return result.stdout\n\n    @classmethod\n    def reload(cls, config_file: str | None = None) -> tuple[bool, str]:\n        \"\"\"\n        Laedt Firewall-Regeln neu.\n\n        Args:\n            config_file: Optional - alternatives Config-File.\n\n        Returns:\n            Tuple (success, message).\n        \"\"\"\n        conf = config_file or cls.NFT_CONF\n\n        if not Path(conf).exists():\n            return False, f\"Config nicht gefunden: {conf}\"\n\n        result = cls._run_nft([\"-f\", conf], check=False)\n\n        if result.returncode != 0:\n            DebugLogger.error(f\"reload failed: {result.stderr}\")\n            return False, result.stderr.strip()\n\n        CommandLogger.log(\"nftables\", \"reload\", {\"config\": conf}, \"success\")\n        return True, f\"Regeln geladen aus {conf}\"\n\n    @classmethod\n    def list_sets(cls) -> list[dict[str, Any]]:\n        \"\"\"\n        Listet alle nftables Sets auf.\n\n        Returns:\n            Liste von Sets mit name, type, elements.\n        \"\"\"\n        result = cls._run_nft([\"list\", \"sets\"], check=False)\n        if result.returncode != 0:\n            return []\n\n        sets: list[dict[str, Any]] = []\n        current_set: dict[str, Any] | None = None\n\n        for line in result.stdout.split(\"\\n\"):\n            line = line.strip()\n            if line.startswith(\"set \"):\n                if current_set:\n                    sets.append(current_set)\n                name = line.split()[1]\n                current_set = {\"name\": name, \"type\": \"\", \"elements\": []}\n            elif current_set and line.startswith(\"type \"):\n                current_set[\"type\"] = line.replace(\"type \", \"\").rstrip(\";\")\n            elif current_set and line.startswith(\"elements = {\"):\n                elem_str = line.replace(\"elements = {\", \"\").rstrip(\" }\")\n                if elem_str:\n                    current_set[\"elements\"] = [e.strip() for e in elem_str.split(\",\")]\n\n        if current_set:\n            sets.append(current_set)\n\n        return sets\n\n    @classmethod\n    def add_to_set(\n        cls, set_name: str, element: str, table: str = \"inet filter\"\n    ) -> tuple[bool, str]:\n        \"\"\"\n        Fuegt Element zu nftables Set hinzu.\n\n        Args:\n            set_name: Name des Sets.\n            element: IP\/CIDR zum Hinzufuegen.\n            table: Tabelle (default: inet filter).\n\n        Returns:\n            Tuple (success, message).\n        \"\"\"\n        allowed = cls._get_allowed_sets()\n        if allowed and set_name not in allowed:\n            return False, f\"Set '{set_name}' nicht in Allowlist\"\n\n        result = cls._run_nft(\n            [\"add\", \"element\", table, set_name, \"{\", element, \"}\"],\n            check=False\n        )\n\n        if result.returncode != 0:\n            DebugLogger.error(f\"add_to_set failed: {result.stderr}\")\n            return False, result.stderr.strip()\n\n        CommandLogger.log(\n            \"nftables\", \"add_to_set\",\n            {\"set\": set_name, \"element\": element, \"table\": table},\n            \"success\"\n        )\n        return True, f\"Element {element} zu {set_name} hinzugefuegt\"\n\n    @classmethod\n    def remove_from_set(\n        cls, set_name: str, element: str, table: str = \"inet filter\"\n    ) -> tuple[bool, str]:\n        \"\"\"\n        Entfernt Element aus nftables Set.\n\n        Args:\n            set_name: Name des Sets.\n            element: IP\/CIDR zum Entfernen.\n            table: Tabelle (default: inet filter).\n\n        Returns:\n            Tuple (success, message).\n        \"\"\"\n        allowed = cls._get_allowed_sets()\n        if allowed and set_name not in allowed:\n            return False, f\"Set '{set_name}' nicht in Allowlist\"\n\n        result = cls._run_nft(\n            [\"delete\", \"element\", table, set_name, \"{\", element, \"}\"],\n            check=False\n        )\n\n        if result.returncode != 0:\n            DebugLogger.error(f\"remove_from_set failed: {result.stderr}\")\n            return False, result.stderr.strip()\n\n        CommandLogger.log(\n            \"nftables\", \"remove_from_set\",\n            {\"set\": set_name, \"element\": element, \"table\": table},\n            \"success\"\n        )\n        return True, f\"Element {element} aus {set_name} entfernt\"\n\n    @classmethod\n    def flush_set(cls, set_name: str, table: str = \"inet filter\") -> tuple[bool, str]:\n        \"\"\"\n        Leert ein nftables Set.\n\n        Args:\n            set_name: Name des Sets.\n            table: Tabelle (default: inet filter).\n\n        Returns:\n            Tuple (success, message).\n        \"\"\"\n        allowed = cls._get_allowed_sets()\n        if allowed and set_name not in allowed:\n            return False, f\"Set '{set_name}' nicht in Allowlist\"\n\n        result = cls._run_nft([\"flush\", \"set\", table, set_name], check=False)\n\n        if result.returncode != 0:\n            return False, result.stderr.strip()\n\n        CommandLogger.log(\n            \"nftables\", \"flush_set\",\n            {\"set\": set_name, \"table\": table},\n            \"success\"\n        )\n        return True, f\"Set {set_name} geleert\"\n\n    @classmethod\n    def create_set(\n        cls,\n        set_name: str,\n        set_type: str = \"ipv4_addr\",\n        table: str = \"inet filter\",\n        flags: str = \"interval\"\n    ) -> tuple[bool, str]:\n        \"\"\"\n        Erstellt ein nftables Set.\n\n        Args:\n            set_name: Name des Sets.\n            set_type: Typ (ipv4_addr, ipv6_addr, etc).\n            table: Tabelle (default: inet filter).\n            flags: Set-Flags (interval fuer CIDR).\n\n        Returns:\n            Tuple (success, message).\n        \"\"\"\n        allowed = cls._get_allowed_sets()\n        if allowed and set_name not in allowed:\n            return False, f\"Set {set_name!r} nicht in Allowlist\"\n\n        # Tabelle erstellen falls nicht vorhanden\n        table_parts = table.split()\n        if len(table_parts) == 2:\n            cls._run_nft([\"add\", \"table\"] + table_parts, check=False)\n\n        # Set erstellen\n        result = cls._run_nft(\n            [\"add\", \"set\", table, set_name, \"{\",\n             \"type\", set_type, \";\", \"flags\", flags, \";\", \"}\"],\n            check=False\n        )\n\n        if result.returncode != 0:\n            if \"File exists\" in result.stderr:\n                return True, f\"Set {set_name} existiert bereits\"\n            DebugLogger.error(f\"create_set failed: {result.stderr}\")\n            return False, result.stderr.strip()\n\n        CommandLogger.log(\n            \"nftables\", \"create_set\",\n            {\"set\": set_name, \"type\": set_type, \"table\": table, \"flags\": flags},\n            \"success\"\n        )\n        return True, f\"Set... [TRUNCATED-14a6039cc2d460c8]",
        "stderr": "",
        "interrupted": false,
        "isImage": false,
        "noOutputExpected": false
    }
}
← Vorheriger Zur Liste Nächster →