{
"tool_response": {
"stdout": "__init__.py\nnftables_client.py\n__pycache__\nsystemctl_client.py\n\"\"\"\nNftablesClient - Wrapper fuer nftables Operationen.\n\nArtefakt: kiglove~sysop~firewall~config\n\nUsage:\n from shared.sysop.nftables_client import NftablesClient\n\n # Status abfragen\n status = NftablesClient.status()\n\n # Regeln auflisten\n rules = NftablesClient.list_rules(table=\"inet filter\")\n\n # Set-Elemente hinzufuegen\n NftablesClient.add_to_set(\"geoblock_v4\", \"1.2.3.0\/24\")\n\"\"\"\n\nimport subprocess\nfrom pathlib import Path\nfrom typing import Any\n\nfrom ..command_logger import CommandLogger\nfrom ..debug_logger import DebugLogger\nfrom ..kiglove_artefact_reader import KIGloveArtefactReader\n\n\nclass NftablesClient:\n \"\"\"Fuehrt nftables-Befehle aus (artefact-gesteuert).\"\"\"\n\n NFT_BIN = \"\/usr\/sbin\/nft\"\n NFT_CONF = \"\/etc\/nftables.conf\"\n\n @classmethod\n def _get_config(cls) -> dict[str, Any]:\n \"\"\"Firewall-Config aus Artefakt laden.\"\"\"\n return KIGloveArtefactReader.get_yaml(\"kiglove~sysop~firewall~config\") or {}\n\n @classmethod\n def _get_allowed_sets(cls) -> list[str]:\n \"\"\"Erlaubte Sets aus Artefakt laden.\"\"\"\n config = cls._get_config()\n return config.get(\"allowed_sets\", [])\n\n @classmethod\n def _run_nft(cls, args: list[str], check: bool = True) -> subprocess.CompletedProcess:\n \"\"\"\n Fuehrt nft-Befehl aus.\n\n Args:\n args: Argumente fuer nft (ohne 'nft' selbst).\n check: Bei Fehler Exception werfen.\n\n Returns:\n CompletedProcess mit stdout\/stderr.\n \"\"\"\n cmd = [cls.NFT_BIN] + args\n try:\n result = subprocess.run( # noqa: S603\n cmd,\n capture_output=True,\n text=True,\n check=check,\n timeout=30,\n )\n return result\n except subprocess.CalledProcessError as e:\n DebugLogger.error(f\"nft failed: {e.stderr}\")\n raise\n\n @classmethod\n def status(cls) -> dict[str, Any]:\n \"\"\"\n Gibt Firewall-Status zurueck.\n\n Returns:\n Dict mit tables, rule_count, active.\n \"\"\"\n try:\n result = cls._run_nft([\"list\", \"tables\"], check=False)\n if result.returncode != 0:\n return {\"active\": False, \"error\": result.stderr.strip()}\n\n tables = [t for t in result.stdout.strip().split(\"\\n\") if t]\n\n result_rules = cls._run_nft([\"list\", \"ruleset\"], check=False)\n rule_count = result_rules.stdout.count(\" rule \") if result_rules.stdout else 0\n\n return {\n \"active\": True,\n \"tables\": tables,\n \"table_count\": len(tables),\n \"rule_count\": rule_count\n }\n except (OSError, subprocess.TimeoutExpired) as e:\n DebugLogger.error(f\"status failed: {e}\")\n return {\"active\": False, \"error\": str(e)}\n\n @classmethod\n def list_rules(cls, table: str | None = None) -> str:\n \"\"\"\n Listet aktive Regeln auf.\n\n Args:\n table: Optional - nur bestimmte Tabelle.\n\n Returns:\n Ruleset als String.\n \"\"\"\n if table:\n args = [\"list\", \"table\", table]\n else:\n args = [\"list\", \"ruleset\"]\n\n result = cls._run_nft(args, check=False)\n if result.returncode != 0:\n raise RuntimeError(f\"list_rules failed: {result.stderr}\")\n\n return result.stdout\n\n @classmethod\n def reload(cls, config_file: str | None = None) -> tuple[bool, str]:\n \"\"\"\n Laedt Firewall-Regeln neu.\n\n Args:\n config_file: Optional - alternatives Config-File.\n\n Returns:\n Tuple (success, message).\n \"\"\"\n conf = config_file or cls.NFT_CONF\n\n if not Path(conf).exists():\n return False, f\"Config nicht gefunden: {conf}\"\n\n result = cls._run_nft([\"-f\", conf], check=False)\n\n if result.returncode != 0:\n DebugLogger.error(f\"reload failed: {result.stderr}\")\n return False, result.stderr.strip()\n\n CommandLogger.log(\"nftables\", \"reload\", {\"config\": conf}, \"success\")\n return True, f\"Regeln geladen aus {conf}\"\n\n @classmethod\n def list_sets(cls) -> list[dict[str, Any]]:\n \"\"\"\n Listet alle nftables Sets auf.\n\n Returns:\n Liste von Sets mit name, type, elements.\n \"\"\"\n result = cls._run_nft([\"list\", \"sets\"], check=False)\n if result.returncode != 0:\n return []\n\n sets: list[dict[str, Any]] = []\n current_set: dict[str, Any] | None = None\n\n for line in result.stdout.split(\"\\n\"):\n line = line.strip()\n if line.startswith(\"set \"):\n if current_set:\n sets.append(current_set)\n name = line.split()[1]\n current_set = {\"name\": name, \"type\": \"\", \"elements\": []}\n elif current_set and line.startswith(\"type \"):\n current_set[\"type\"] = line.replace(\"type \", \"\").rstrip(\";\")\n elif current_set and line.startswith(\"elements = {\"):\n elem_str = line.replace(\"elements = {\", \"\").rstrip(\" }\")\n if elem_str:\n current_set[\"elements\"] = [e.strip() for e in elem_str.split(\",\")]\n\n if current_set:\n sets.append(current_set)\n\n return sets\n\n @classmethod\n def add_to_set(\n cls, set_name: str, element: str, table: str = \"inet filter\"\n ) -> tuple[bool, str]:\n \"\"\"\n Fuegt Element zu nftables Set hinzu.\n\n Args:\n set_name: Name des Sets.\n element: IP\/CIDR zum Hinzufuegen.\n table: Tabelle (default: inet filter).\n\n Returns:\n Tuple (success, message).\n \"\"\"\n allowed = cls._get_allowed_sets()\n if allowed and set_name not in allowed:\n return False, f\"Set '{set_name}' nicht in Allowlist\"\n\n result = cls._run_nft(\n [\"add\", \"element\", table, set_name, \"{\", element, \"}\"],\n check=False\n )\n\n if result.returncode != 0:\n DebugLogger.error(f\"add_to_set failed: {result.stderr}\")\n return False, result.stderr.strip()\n\n CommandLogger.log(\n \"nftables\", \"add_to_set\",\n {\"set\": set_name, \"element\": element, \"table\": table},\n \"success\"\n )\n return True, f\"Element {element} zu {set_name} hinzugefuegt\"\n\n @classmethod\n def remove_from_set(\n cls, set_name: str, element: str, table: str = \"inet filter\"\n ) -> tuple[bool, str]:\n \"\"\"\n Entfernt Element aus nftables Set.\n\n Args:\n set_name: Name des Sets.\n element: IP\/CIDR zum Entfernen.\n table: Tabelle (default: inet filter).\n\n Returns:\n Tuple (success, message).\n \"\"\"\n allowed = cls._get_allowed_sets()\n if allowed and set_name not in allowed:\n return False, f\"Set '{set_name}' nicht in Allowlist\"\n\n result = cls._run_nft(\n [\"delete\", \"element\", table, set_name, \"{\", element, \"}\"],\n check=False\n )\n\n if result.returncode != 0:\n DebugLogger.error(f\"remove_from_set failed: {result.stderr}\")\n return False, result.stderr.strip()\n\n CommandLogger.log(\n \"nftables\", \"remove_from_set\",\n {\"set\": set_name, \"element\": element, \"table\": table},\n \"success\"\n )\n return True, f\"Element {element} aus {set_name} entfernt\"\n\n @classmethod\n def flush_set(cls, set_name: str, table: str = \"inet filter\") -> tuple[bool, str]:\n \"\"\"\n Leert ein nftables Set.\n\n Args:\n set_name: Name des Sets.\n table: Tabelle (default: inet filter).\n\n Returns:\n Tuple (success, message).\n \"\"\"\n allowed = cls._get_allowed_sets()\n if allowed and set_name not in allowed:\n return False, f\"Set '{set_name}' nicht in Allowlist\"\n\n result = cls._run_nft([\"flush\", \"set\", table, set_name], check=False)\n\n if result.returncode != 0:\n return False, result.stderr.strip()\n\n CommandLogger.log(\n \"nftables\", \"flush_set\",\n {\"set\": set_name, \"table\": table},\n \"success\"\n )\n return True, f\"Set {set_name} geleert\"\n\n @classmethod\n def create_set(\n cls,\n set_name: str,\n set_type: str = \"ipv4_addr\",\n table: str = \"inet filter\",\n flags: str = \"interval\"\n ) -> tuple[bool, str]:\n \"\"\"\n Erstellt ein nftables Set.\n\n Args:\n set_name: Name des Sets.\n set_type: Typ (ipv4_addr, ipv6_addr, etc).\n table: Tabelle (default: inet filter).\n flags: Set-Flags (interval fuer CIDR).\n\n Returns:\n Tuple (success, message).\n \"\"\"\n allowed = cls._get_allowed_sets()\n if allowed and set_name not in allowed:\n return False, f\"Set {set_name!r} nicht in Allowlist\"\n\n # Tabelle erstellen falls nicht vorhanden\n table_parts = table.split()\n if len(table_parts) == 2:\n cls._run_nft([\"add\", \"table\"] + table_parts, check=False)\n\n # Set erstellen\n result = cls._run_nft(\n [\"add\", \"set\", table, set_name, \"{\",\n \"type\", set_type, \";\", \"flags\", flags, \";\", \"}\"],\n check=False\n )\n\n if result.returncode != 0:\n if \"File exists\" in result.stderr:\n return True, f\"Set {set_name} existiert bereits\"\n DebugLogger.error(f\"create_set failed: {result.stderr}\")\n return False, result.stderr.strip()\n\n CommandLogger.log(\n \"nftables\", \"create_set\",\n {\"set\": set_name, \"type\": set_type, \"table\": table, \"flags\": flags},\n \"success\"\n )\n return True, f\"Set... [TRUNCATED-14a6039cc2d460c8]",
"stderr": "",
"interrupted": false,
"isImage": false,
"noOutputExpected": false
}
}