{
"event": "PreToolUse",
"tool_name": "Write",
"tool_input": {
"file_path": "\/var\/www\/dev.campus.systemische-tools.de\/crawler\/src\/interfaces\/cli\/worker.py",
"content": "from __future__ import annotations\n\nimport asyncio\nfrom dataclasses import dataclass\n\nfrom src.application.use_cases.process_url import ProcessUrlUseCase\nfrom src.domain.entities.crawl_error import CrawlError\nfrom src.domain.ports.clock_port import ClockPort\nfrom src.domain.ports.crawl_error_repository_port import CrawlErrorRepositoryPort\nfrom src.domain.ports.url_queue_port import UrlQueuePort\n\n\n@dataclass(frozen=True, slots=True)\nclass WorkerConfig:\n crawl_id: int\n concurrency: int\n max_urls: int | None = None\n\n\nclass QueueWorker:\n def __init__(\n self,\n *,\n queue: UrlQueuePort,\n process_url: ProcessUrlUseCase,\n errors: CrawlErrorRepositoryPort,\n clock: ClockPort,\n ) -> None:\n self._queue = queue\n self._process = process_url\n self._errors = errors\n self._clock = clock\n\n async def run(self, config: WorkerConfig) -> int:\n semaphore = asyncio.Semaphore(config.concurrency)\n processed = 0\n tasks: list[asyncio.Task[None]] = []\n while processed < (config.max_urls or float(\"inf\")):\n url = self._queue.dequeue(config.crawl_id)\n if url is None:\n if not tasks:\n break\n await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)\n tasks = [t for t in tasks if not t.done()]\n continue\n processed += 1\n tasks.append(asyncio.create_task(self._wrap(semaphore, config.crawl_id, url)))\n tasks = [t for t in tasks if not t.done()]\n if tasks:\n await asyncio.gather(*tasks, return_exceptions=True)\n return processed\n\n async def _wrap(\n self, semaphore: asyncio.Semaphore, crawl_id: int, url: object,\n ) -> None:\n async with semaphore:\n try:\n await self._process.execute(crawl_id=crawl_id, url=url) # type: ignore[arg-type]\n except Exception as exc: # noqa: BLE001 — interface-layer boundary\n self._errors.save(CrawlError(\n id=None, crawl_id=crawl_id, url=getattr(url, \"url\", \"?\"),\n error_type=type(exc).__name__, error_msg=str(exc),\n occurred_at=self._clock.now(),\n ))\n"
}
}
{
"tool_response": {
"type": "create",
"filePath": "\/var\/www\/dev.campus.systemische-tools.de\/crawler\/src\/interfaces\/cli\/worker.py",
"content": "from __future__ import annotations\n\nimport asyncio\nfrom dataclasses import dataclass\n\nfrom src.application.use_cases.process_url import ProcessUrlUseCase\nfrom src.domain.entities.crawl_error import CrawlError\nfrom src.domain.ports.clock_port import ClockPort\nfrom src.domain.ports.crawl_error_repository_port import CrawlErrorRepositoryPort\nfrom src.domain.ports.url_queue_port import UrlQueuePort\n\n\n@dataclass(frozen=True, slots=True)\nclass WorkerConfig:\n crawl_id: int\n concurrency: int\n max_urls: int | None = None\n\n\nclass QueueWorker:\n def __init__(\n self,\n *,\n queue: UrlQueuePort,\n process_url: ProcessUrlUseCase,\n errors: CrawlErrorRepositoryPort,\n clock: ClockPort,\n ) -> None:\n self._queue = queue\n self._process = process_url\n self._errors = errors\n self._clock = clock\n\n async def run(self, config: WorkerConfig) -> int:\n semaphore = asyncio.Semaphore(config.concurrency)\n processed = 0\n tasks: list[asyncio.Task[None]] = []\n while processed < (config.max_urls or float(\"inf\")):\n url = self._queue.dequeue(config.crawl_id)\n if url is None:\n if not tasks:\n break\n await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)\n tasks = [t for t in tasks if not t.done()]\n continue\n processed += 1\n tasks.append(asyncio.create_task(self._wrap(semaphore, config.crawl_id, url)))\n tasks = [t for t in tasks if not t.done()]\n if tasks:\n await asyncio.gather(*tasks, return_exceptions=True)\n return processed\n\n async def _wrap(\n self, semaphore: asyncio.Semaphore, crawl_id: int, url: object,\n ) -> None:\n async with semaphore:\n try:\n await self._process.execute(crawl_id=crawl_id, url=url) # type: ignore[arg-type]\n except Exception as exc: # noqa: BLE001 — interface-layer boundary\n self._errors.save(CrawlError(\n id=None, crawl_id=crawl_id, url=getattr(url, \"url\", \"?\"),\n error_type=type(exc).__name__, error_msg=str(exc),\n occurred_at=self._clock.now(),\n ))\n",
"structuredPatch": [],
"originalFile": null,
"userModified": false
}
}