{
"event": "PreToolUse",
"tool_name": "Write",
"tool_input": {
"file_path": "\/var\/www\/dev.campus.systemische-tools.de\/crawler\/src\/infrastructure\/persistence\/crawl_repository.py",
"content": "from __future__ import annotations\n\nfrom dataclasses import replace\n\nfrom src.domain.entities.crawl import Crawl\nfrom src.domain.ports.crawl_repository_port import CrawlRepositoryPort\nfrom src.infrastructure.persistence.connection import MariaDbConnectionFactory\nfrom src.infrastructure.persistence.mappers.crawl_mapper import crawl_to_row, row_to_crawl\n\n_INSERT = (\n \"INSERT INTO crawls (base_url, mode, started_at, finished_at, status, config_json,\"\n \" total_urls, total_errors, trigger_source)\"\n \" VALUES (%(base_url)s, %(mode)s, %(started_at)s, %(finished_at)s, %(status)s,\"\n \" %(config_json)s, %(total_urls)s, %(total_errors)s, %(trigger_source)s)\"\n)\n_UPDATE = (\n \"UPDATE crawls SET finished_at=%(finished_at)s, status=%(status)s,\"\n \" total_urls=%(total_urls)s, total_errors=%(total_errors)s WHERE id=%(id)s\"\n)\n_SELECT = \"SELECT * FROM crawls WHERE id=%s\"\n_SELECT_RECENT = \"SELECT * FROM crawls ORDER BY started_at DESC LIMIT %s\"\n_UPDATE_COUNTERS = (\n \"UPDATE crawls SET total_urls=%s, total_errors=%s WHERE id=%s\"\n)\n\n\nclass CrawlRepository(CrawlRepositoryPort):\n def __init__(self, connections: MariaDbConnectionFactory) -> None:\n self._connections = connections\n\n def save(self, crawl: Crawl) -> Crawl:\n with self._connections.session() as conn:\n cur = conn.cursor(dictionary=True)\n if crawl.id is None:\n cur.execute(_INSERT, crawl_to_row(crawl))\n return replace(crawl, id=int(cur.lastrowid))\n payload = crawl_to_row(crawl) | {\"id\": crawl.id}\n cur.execute(_UPDATE, payload)\n return crawl\n\n def get(self, crawl_id: int) -> Crawl | None:\n with self._connections.session() as conn:\n cur = conn.cursor(dictionary=True)\n cur.execute(_SELECT, (crawl_id,))\n row = cur.fetchone()\n return row_to_crawl(row) if row else None\n\n def list_recent(self, limit: int) -> list[Crawl]:\n with self._connections.session() as conn:\n cur = conn.cursor(dictionary=True)\n cur.execute(_SELECT_RECENT, (limit,))\n return [row_to_crawl(r) for r in cur.fetchall()]\n\n def update_counters(self, crawl_id: int, total_urls: int, total_errors: int) -> None:\n with self._connections.session() as conn:\n conn.cursor().execute(_UPDATE_COUNTERS, (total_urls, total_errors, crawl_id))\n"
}
}
{
"tool_response": {
"type": "create",
"filePath": "\/var\/www\/dev.campus.systemische-tools.de\/crawler\/src\/infrastructure\/persistence\/crawl_repository.py",
"content": "from __future__ import annotations\n\nfrom dataclasses import replace\n\nfrom src.domain.entities.crawl import Crawl\nfrom src.domain.ports.crawl_repository_port import CrawlRepositoryPort\nfrom src.infrastructure.persistence.connection import MariaDbConnectionFactory\nfrom src.infrastructure.persistence.mappers.crawl_mapper import crawl_to_row, row_to_crawl\n\n_INSERT = (\n \"INSERT INTO crawls (base_url, mode, started_at, finished_at, status, config_json,\"\n \" total_urls, total_errors, trigger_source)\"\n \" VALUES (%(base_url)s, %(mode)s, %(started_at)s, %(finished_at)s, %(status)s,\"\n \" %(config_json)s, %(total_urls)s, %(total_errors)s, %(trigger_source)s)\"\n)\n_UPDATE = (\n \"UPDATE crawls SET finished_at=%(finished_at)s, status=%(status)s,\"\n \" total_urls=%(total_urls)s, total_errors=%(total_errors)s WHERE id=%(id)s\"\n)\n_SELECT = \"SELECT * FROM crawls WHERE id=%s\"\n_SELECT_RECENT = \"SELECT * FROM crawls ORDER BY started_at DESC LIMIT %s\"\n_UPDATE_COUNTERS = (\n \"UPDATE crawls SET total_urls=%s, total_errors=%s WHERE id=%s\"\n)\n\n\nclass CrawlRepository(CrawlRepositoryPort):\n def __init__(self, connections: MariaDbConnectionFactory) -> None:\n self._connections = connections\n\n def save(self, crawl: Crawl) -> Crawl:\n with self._connections.session() as conn:\n cur = conn.cursor(dictionary=True)\n if crawl.id is None:\n cur.execute(_INSERT, crawl_to_row(crawl))\n return replace(crawl, id=int(cur.lastrowid))\n payload = crawl_to_row(crawl) | {\"id\": crawl.id}\n cur.execute(_UPDATE, payload)\n return crawl\n\n def get(self, crawl_id: int) -> Crawl | None:\n with self._connections.session() as conn:\n cur = conn.cursor(dictionary=True)\n cur.execute(_SELECT, (crawl_id,))\n row = cur.fetchone()\n return row_to_crawl(row) if row else None\n\n def list_recent(self, limit: int) -> list[Crawl]:\n with self._connections.session() as conn:\n cur = conn.cursor(dictionary=True)\n cur.execute(_SELECT_RECENT, (limit,))\n return [row_to_crawl(r) for r in cur.fetchall()]\n\n def update_counters(self, crawl_id: int, total_urls: int, total_errors: int) -> None:\n with self._connections.session() as conn:\n conn.cursor().execute(_UPDATE_COUNTERS, (total_urls, total_errors, crawl_id))\n",
"structuredPatch": [],
"originalFile": null,
"userModified": false
}
}