{
"event": "PreToolUse",
"tool_name": "Write",
"tool_input": {
"file_path": "\/var\/www\/scripts\/pipeline\/db_prompts.py",
"content": "\"\"\"\nDatabase Prompts Mixin\n\nSingle Responsibility: Prompt retrieval and management.\n\"\"\"\n\n\nclass PromptsMixin:\n \"\"\"Mixin for prompt operations.\n\n Provides:\n - get_prompt(): Get prompt by name\n - get_prompt_by_use_case(): Get prompt by use case with full metadata\n \"\"\"\n\n def get_prompt(self, name: str, version: str = None) -> str | None:\n \"\"\"Get a prompt by name (optionally specific version).\n\n Args:\n name: Prompt name\n version: Optional specific version\n\n Returns:\n Prompt content or None if not found\n \"\"\"\n if version:\n cursor = self.execute(\n \"SELECT content FROM prompts WHERE name = %s AND version = %s\",\n (name, version),\n )\n else:\n cursor = self.execute(\n \"\"\"SELECT content FROM prompts\n WHERE name = %s AND is_active = 1\n ORDER BY version DESC LIMIT 1\"\"\",\n (name,),\n )\n result = cursor.fetchone()\n cursor.close()\n return result[\"content\"] if result else None\n\n def get_prompt_by_use_case(self, use_case: str, version: str = None) -> dict | None:\n \"\"\"Get prompt by use_case with full metadata for provenance tracking.\n\n Args:\n use_case: The use case (entity_extraction, semantic_analysis, etc.)\n version: Optional specific version (otherwise latest active)\n\n Returns:\n Dict with id, name, version, content, use_case or None\n \"\"\"\n if version:\n cursor = self.execute(\n \"\"\"SELECT id, name, version, content, use_case\n FROM prompts\n WHERE use_case = %s AND version = %s\"\"\",\n (use_case, version),\n )\n else:\n cursor = self.execute(\n \"\"\"SELECT id, name, version, content, use_case\n FROM prompts\n WHERE use_case = %s AND is_active = 1\n ORDER BY version DESC LIMIT 1\"\"\",\n (use_case,),\n )\n result = cursor.fetchone()\n cursor.close()\n return result\n"
}
}
{
"tool_response": {
"type": "create",
"filePath": "\/var\/www\/scripts\/pipeline\/db_prompts.py",
"content": "\"\"\"\nDatabase Prompts Mixin\n\nSingle Responsibility: Prompt retrieval and management.\n\"\"\"\n\n\nclass PromptsMixin:\n \"\"\"Mixin for prompt operations.\n\n Provides:\n - get_prompt(): Get prompt by name\n - get_prompt_by_use_case(): Get prompt by use case with full metadata\n \"\"\"\n\n def get_prompt(self, name: str, version: str = None) -> str | None:\n \"\"\"Get a prompt by name (optionally specific version).\n\n Args:\n name: Prompt name\n version: Optional specific version\n\n Returns:\n Prompt content or None if not found\n \"\"\"\n if version:\n cursor = self.execute(\n \"SELECT content FROM prompts WHERE name = %s AND version = %s\",\n (name, version),\n )\n else:\n cursor = self.execute(\n \"\"\"SELECT content FROM prompts\n WHERE name = %s AND is_active = 1\n ORDER BY version DESC LIMIT 1\"\"\",\n (name,),\n )\n result = cursor.fetchone()\n cursor.close()\n return result[\"content\"] if result else None\n\n def get_prompt_by_use_case(self, use_case: str, version: str = None) -> dict | None:\n \"\"\"Get prompt by use_case with full metadata for provenance tracking.\n\n Args:\n use_case: The use case (entity_extraction, semantic_analysis, etc.)\n version: Optional specific version (otherwise latest active)\n\n Returns:\n Dict with id, name, version, content, use_case or None\n \"\"\"\n if version:\n cursor = self.execute(\n \"\"\"SELECT id, name, version, content, use_case\n FROM prompts\n WHERE use_case = %s AND version = %s\"\"\",\n (use_case, version),\n )\n else:\n cursor = self.execute(\n \"\"\"SELECT id, name, version, content, use_case\n FROM prompts\n WHERE use_case = %s AND is_active = 1\n ORDER BY version DESC LIMIT 1\"\"\",\n (use_case,),\n )\n result = cursor.fetchone()\n cursor.close()\n return result\n",
"structuredPatch": [],
"originalFile": null
}
}