{
"event": "PreToolUse",
"tool_name": "Read",
"tool_input": {
"file_path": "\/var\/www\/dev.campus.systemische-tools.de\/crawler\/src\/infrastructure\/queue\/db_url_queue.py",
"limit": 45
}
}
{
"tool_response": {
"type": "text",
"file": {
"filePath": "\/var\/www\/dev.campus.systemische-tools.de\/crawler\/src\/infrastructure\/queue\/db_url_queue.py",
"content": "from __future__ import annotations\n\nfrom datetime import UTC, datetime\n\nfrom src.domain.ports.url_queue_port import UrlQueuePort\nfrom src.domain.value_objects.normalized_url import NormalizedUrl, normalize\nfrom src.infrastructure.persistence.connection import MariaDbConnectionFactory\n\n_ENQUEUE = (\n \"INSERT INTO url_queue (crawl_id, url_hash, url, enqueued_at, status)\"\n \" VALUES (%s, %s, %s, %s, 'pending')\"\n \" ON DUPLICATE KEY UPDATE id=id\"\n)\n_CLAIM_SELECT = (\n \"SELECT id, url_hash, url FROM url_queue\"\n \" WHERE crawl_id=%s AND status='pending'\"\n \" ORDER BY enqueued_at LIMIT 1 FOR UPDATE\"\n)\n_CLAIM_UPDATE = \"UPDATE url_queue SET status='in_progress' WHERE id=%s\"\n_PENDING_COUNT = \"SELECT COUNT(*) AS c FROM url_queue WHERE crawl_id=%s AND status='pending'\"\n_MARK_DONE = \"UPDATE url_queue SET status='done' WHERE crawl_id=%s AND url_hash=%s\"\n\n\nclass DbUrlQueue(UrlQueuePort):\n def __init__(self, connections: MariaDbConnectionFactory) -> None:\n self._connections = connections\n\n def enqueue(self, crawl_id: int, url: NormalizedUrl) -> bool:\n with self._connections.session() as conn:\n cur = conn.cursor()\n cur.execute(\n _ENQUEUE,\n (crawl_id, url.url_hash, url.url, datetime.now(tz=UTC)),\n )\n return bool(cur.rowcount > 0)\n\n def dequeue(self, crawl_id: int) -> NormalizedUrl | None:\n with self._connections.session() as conn:\n cur = conn.cursor(dictionary=True)\n cur.execute(_CLAIM_SELECT, (crawl_id,))\n row = cur.fetchone()\n if row is None:\n return None\n cur.execute(_CLAIM_UPDATE, (row[\"id\"],))\n return _rebuild_url(row[\"url\"], row[\"url_hash\"])",
"numLines": 45,
"startLine": 1,
"totalLines": 72
}
}
}