{
"event": "PreToolUse",
"tool_name": "Write",
"tool_input": {
"file_path": "\/var\/www\/dev.campus.systemische-tools.de\/crawler\/src\/domain\/value_objects\/normalized_url.py",
"content": "from __future__ import annotations\n\nimport hashlib\nfrom dataclasses import dataclass, field\nfrom urllib.parse import SplitResult, parse_qsl, urlencode, urlsplit\n\nDEFAULT_PORTS = {\"http\": 80, \"https\": 443}\n\n\n@dataclass(frozen=True, slots=True)\nclass NormalizedUrl:\n raw: str\n scheme: str\n host: str\n port: int | None\n path: str\n query: str\n url_hash: str = field(compare=False)\n\n @property\n def url(self) -> str:\n authority = self.host if self.port is None else f\"{self.host}:{self.port}\"\n tail = f\"?{self.query}\" if self.query else \"\"\n return f\"{self.scheme}:\/\/{authority}{self.path}{tail}\"\n\n\ndef normalize(raw_url: str, *, strip_tracking: bool = False) -> NormalizedUrl:\n parts = urlsplit(raw_url.strip())\n scheme = parts.scheme.lower()\n host = parts.hostname.lower() if parts.hostname else \"\"\n port = _effective_port(scheme, parts.port)\n path = parts.path or \"\/\"\n query = _normalize_query(parts.query, strip_tracking=strip_tracking)\n canonical = _canonical_string(scheme, host, port, path, query)\n return NormalizedUrl(\n raw=raw_url,\n scheme=scheme,\n host=host,\n port=port,\n path=path,\n query=query,\n url_hash=hashlib.sha256(canonical.encode(\"utf-8\")).hexdigest(),\n )\n\n\ndef _effective_port(scheme: str, port: int | None) -> int | None:\n if port is None:\n return None\n if DEFAULT_PORTS.get(scheme) == port:\n return None\n return port\n\n\ndef _normalize_query(raw_query: str, *, strip_tracking: bool) -> str:\n if not raw_query:\n return \"\"\n pairs = parse_qsl(raw_query, keep_blank_values=True)\n if strip_tracking:\n pairs = [(k, v) for k, v in pairs if not _is_tracking_key(k)]\n pairs.sort()\n return urlencode(pairs, doseq=True)\n\n\ndef _is_tracking_key(key: str) -> bool:\n lowered = key.lower()\n return lowered.startswith(\"utm_\") or lowered in {\"gclid\", \"fbclid\", \"mc_cid\", \"mc_eid\"}\n\n\ndef _canonical_string(scheme: str, host: str, port: int | None, path: str, query: str) -> str:\n authority = host if port is None else f\"{host}:{port}\"\n tail = f\"?{query}\" if query else \"\"\n return f\"{scheme}:\/\/{authority}{path}{tail}\"\n\n\n__all__ = [\"NormalizedUrl\", \"SplitResult\", \"normalize\"]\n"
}
}
{
"tool_response": {
"type": "create",
"filePath": "\/var\/www\/dev.campus.systemische-tools.de\/crawler\/src\/domain\/value_objects\/normalized_url.py",
"content": "from __future__ import annotations\n\nimport hashlib\nfrom dataclasses import dataclass, field\nfrom urllib.parse import SplitResult, parse_qsl, urlencode, urlsplit\n\nDEFAULT_PORTS = {\"http\": 80, \"https\": 443}\n\n\n@dataclass(frozen=True, slots=True)\nclass NormalizedUrl:\n raw: str\n scheme: str\n host: str\n port: int | None\n path: str\n query: str\n url_hash: str = field(compare=False)\n\n @property\n def url(self) -> str:\n authority = self.host if self.port is None else f\"{self.host}:{self.port}\"\n tail = f\"?{self.query}\" if self.query else \"\"\n return f\"{self.scheme}:\/\/{authority}{self.path}{tail}\"\n\n\ndef normalize(raw_url: str, *, strip_tracking: bool = False) -> NormalizedUrl:\n parts = urlsplit(raw_url.strip())\n scheme = parts.scheme.lower()\n host = parts.hostname.lower() if parts.hostname else \"\"\n port = _effective_port(scheme, parts.port)\n path = parts.path or \"\/\"\n query = _normalize_query(parts.query, strip_tracking=strip_tracking)\n canonical = _canonical_string(scheme, host, port, path, query)\n return NormalizedUrl(\n raw=raw_url,\n scheme=scheme,\n host=host,\n port=port,\n path=path,\n query=query,\n url_hash=hashlib.sha256(canonical.encode(\"utf-8\")).hexdigest(),\n )\n\n\ndef _effective_port(scheme: str, port: int | None) -> int | None:\n if port is None:\n return None\n if DEFAULT_PORTS.get(scheme) == port:\n return None\n return port\n\n\ndef _normalize_query(raw_query: str, *, strip_tracking: bool) -> str:\n if not raw_query:\n return \"\"\n pairs = parse_qsl(raw_query, keep_blank_values=True)\n if strip_tracking:\n pairs = [(k, v) for k, v in pairs if not _is_tracking_key(k)]\n pairs.sort()\n return urlencode(pairs, doseq=True)\n\n\ndef _is_tracking_key(key: str) -> bool:\n lowered = key.lower()\n return lowered.startswith(\"utm_\") or lowered in {\"gclid\", \"fbclid\", \"mc_cid\", \"mc_eid\"}\n\n\ndef _canonical_string(scheme: str, host: str, port: int | None, path: str, query: str) -> str:\n authority = host if port is None else f\"{host}:{port}\"\n tail = f\"?{query}\" if query else \"\"\n return f\"{scheme}:\/\/{authority}{path}{tail}\"\n\n\n__all__ = [\"NormalizedUrl\", \"SplitResult\", \"normalize\"]\n",
"structuredPatch": [],
"originalFile": null,
"userModified": false
}
}