{
"event": "PreToolUse",
"tool_name": "Write",
"tool_input": {
"file_path": "\/var\/www\/dev.campus.systemische-tools.de\/src\/Domain\/Entity\/TaskResultBuilder.php",
"content": "<?php\n\ndeclare(strict_types=1);\n\nnamespace Domain\\Entity;\n\n\/\/ @responsibility: Builder für TaskResult-Entity (optionale Felder, Validierung)\n\nuse Domain\\ValueObject\\AssigneeType;\nuse Domain\\ValueObject\\ExecutionDuration;\nuse Domain\\ValueObject\\ExecutionMetrics;\nuse Domain\\ValueObject\\ResultStatus;\nuse Domain\\ValueObject\\TokenCount;\n\nfinal class TaskResultBuilder\n{\n private int $taskId;\n private ?int $assignmentId = null;\n private string $executor;\n private AssigneeType $executorType;\n private ?string $modelName = null;\n private ?string $request = null;\n private ?string $response = null;\n private ExecutionDuration $duration;\n private ExecutionMetrics $metrics;\n private ResultStatus $status = ResultStatus::SUCCESS;\n private ?string $errorMessage = null;\n\n public function __construct(int $taskId, string $executor, AssigneeType $executorType)\n {\n $this->taskId = $taskId;\n $this->executor = $executor;\n $this->executorType = $executorType;\n $this->duration = ExecutionDuration::now();\n $this->metrics = ExecutionMetrics::zero();\n }\n\n public static function for(int $taskId, string $executor, AssigneeType $executorType): self\n {\n return new self($taskId, $executor, $executorType);\n }\n\n public function withAssignment(int $assignmentId): self\n {\n $this->assignmentId = $assignmentId;\n\n return $this;\n }\n\n public function withModel(string $modelName): self\n {\n $this->modelName = $modelName;\n\n return $this;\n }\n\n public function withRequest(string $request): self\n {\n $this->request = $request;\n\n return $this;\n }\n\n public function withResponse(string $response): self\n {\n $this->response = $response;\n $this->duration = $this->duration->completeNow();\n\n return $this;\n }\n\n public function withDuration(ExecutionDuration $duration): self\n {\n $this->duration = $duration;\n\n return $this;\n }\n\n public function withMetrics(ExecutionMetrics $metrics): self\n {\n $this->metrics = $metrics;\n\n return $this;\n }\n\n public function withTokens(int $input, int $output): self\n {\n $this->metrics = $this->metrics->withTokens(TokenCount::create($input, $output));\n\n return $this;\n }\n\n public function withCost(float $costUsd): self\n {\n $this->metrics = $this->metrics->withCost($costUsd);\n\n return $this;\n }\n\n public function withStatus(ResultStatus $status): self\n {\n $this->status = $status;\n\n return $this;\n }\n\n public function withError(string $errorMessage): self\n {\n $this->errorMessage = $errorMessage;\n $this->status = ResultStatus::ERROR;\n\n return $this;\n }\n\n public function estimateTokens(): self\n {\n if ($this->request !== null && $this->response !== null) {\n $this->metrics = ExecutionMetrics::fromEstimation($this->request, $this->response);\n }\n\n return $this;\n }\n\n public function build(): TaskResult\n {\n return new TaskResult(\n $this->taskId,\n $this->assignmentId,\n $this->executor,\n $this->executorType,\n $this->modelName,\n $this->request,\n $this->response,\n $this->duration,\n $this->metrics,\n $this->status,\n $this->errorMessage\n );\n }\n}\n"
}
}
{
"tool_response": {
"type": "create",
"filePath": "\/var\/www\/mcp-servers\/mcp-db\/tests\/test_db_operations.py",
"content": "#!\/usr\/bin\/env python3\n\"\"\"Tests für DB Operations (SELECT, SCHEMA, STATS)\"\"\"\n\nimport sys\nfrom pathlib import Path\n\nimport pytest\n\nsys.path.insert(0, \"\/opt\/mcp-servers\/mcp-db\")\n\nfrom dotenv import load_dotenv\n\nload_dotenv(Path(\"\/opt\/mcp-servers\/mcp-db\/.env\"))\n\nfrom config import Config\nfrom infrastructure.db_connection import DatabaseConnection\n\n\nclass TestDbSelect:\n \"\"\"Test db_select Tool\"\"\"\n\n def test_simple_select_query(self, ki_protokoll_connection):\n \"\"\"Test: Einfache SELECT Query\"\"\"\n cursor = ki_protokoll_connection.cursor(dictionary=True)\n cursor.execute(\"SELECT * FROM mcp_log ORDER BY timestamp DESC LIMIT 10\")\n rows = cursor.fetchall()\n cursor.close()\n\n assert len(rows) >= 0\n\n def test_select_with_prepared_statement(self, ki_protokoll_connection):\n \"\"\"Test: SELECT mit Prepared Statement\"\"\"\n cursor = ki_protokoll_connection.cursor(dictionary=True)\n cursor.execute(\n \"SELECT * FROM mcp_log WHERE status = %s LIMIT 5\", (\"success\",)\n )\n rows = cursor.fetchall()\n cursor.close()\n\n assert isinstance(rows, list)\n\n def test_select_with_max_rows_limit(self, ki_protokoll_connection):\n \"\"\"Test: SELECT mit max_rows Limit\"\"\"\n cursor = ki_protokoll_connection.cursor(dictionary=True)\n cursor.execute(\"SELECT * FROM mcp_log LIMIT 3\")\n rows = cursor.fetchall()\n cursor.close()\n\n assert len(rows) <= 3\n\n def test_select_on_ki_system_database(self, ki_system_connection):\n \"\"\"Test: SELECT auf ki_system Datenbank\"\"\"\n cursor = ki_system_connection.cursor(dictionary=True)\n cursor.execute(\"SELECT * FROM chunks LIMIT 5\")\n rows = cursor.fetchall()\n cursor.close()\n\n assert isinstance(rows, list)\n\n def test_invalid_query_returns_error(self, ki_protokoll_connection):\n \"\"\"Test: Ungültige Query gibt ERROR zurück\"\"\"\n cursor = ki_protokoll_connection.cursor(dictionary=True)\n with pytest.raises(Exception):\n cursor.execute(\"SELECT * FROM nonexistent_table\")\n cursor.fetchall()\n cursor.close()\n\n\nclass TestDbSchema:\n \"\"\"Test db_schema Tool\"\"\"\n\n def test_schema_ki_protokoll(self):\n \"\"\"Test: Schema von ki_protokoll abrufen\"\"\"\n assert \"ki_protokoll\" in Config.ALLOWED_DATABASES\n\n with DatabaseConnection.get_connection(\"ki_protokoll\") as conn:\n cursor = conn.cursor(dictionary=True)\n cursor.execute(\n \"\"\"SELECT TABLE_NAME, TABLE_ROWS, CREATE_TIME\n FROM information_schema.TABLES\n WHERE TABLE_SCHEMA = %s\n AND TABLE_TYPE = 'BASE TABLE'\n ORDER BY TABLE_NAME\"\"\",\n (\"ki_protokoll\",),\n )\n tables = cursor.fetchall()\n cursor.close()\n\n assert len(tables) > 0\n assert any(t[\"TABLE_NAME\"] == \"mcp_log\" for t in tables)\n\n def test_schema_ki_system(self):\n \"\"\"Test: Schema von ki_system abrufen\"\"\"\n assert \"ki_system\" in Config.ALLOWED_DATABASES\n\n with DatabaseConnection.get_connection(\"ki_system\") as conn:\n cursor = conn.cursor(dictionary=True)\n cursor.execute(\n \"\"\"SELECT TABLE_NAME, TABLE_ROWS, CREATE_TIME\n FROM information_schema.TABLES\n WHERE TABLE_SCHEMA = %s\n AND TABLE_TYPE = 'BASE TABLE'\n ORDER BY TABLE_NAME\"\"\",\n (\"ki_system\",),\n )\n tables = cursor.fetchall()\n cursor.close()\n\n assert len(tables) > 0\n\n def test_mysql_database_blocked(self):\n \"\"\"Test: Nicht-erlaubte Datenbank (mysql) blockiert\"\"\"\n assert \"mysql\" not in Config.ALLOWED_DATABASES\n\n\nclass TestDbStats:\n \"\"\"Test db_stats Tool\"\"\"\n\n def test_get_last_logs(self, ki_protokoll_connection):\n \"\"\"Test: Letzte Logs abrufen (Limit=10)\"\"\"\n cursor = ki_protokoll_connection.cursor(dictionary=True)\n cursor.execute(\n \"\"\"SELECT id, timestamp, client_name, request, status,\n duration_ms, error_message\n FROM mcp_log\n ORDER BY timestamp DESC\n LIMIT 10\"\"\"\n )\n logs = cursor.fetchall()\n cursor.close()\n\n assert isinstance(logs, list)\n\n def test_limit_parameter_works(self, ki_protokoll_connection):\n \"\"\"Test: Limit Parameter funktioniert\"\"\"\n limit = 3\n cursor = ki_protokoll_connection.cursor(dictionary=True)\n cursor.execute(\n \"\"\"SELECT id, timestamp, client_name, request, status,\n duration_ms, error_message\n FROM mcp_log\n ORDER BY timestamp DESC\n LIMIT %s\"\"\",\n (limit,),\n )\n logs = cursor.fetchall()\n cursor.close()\n\n assert len(logs) <= limit\n\n\nif __name__ == \"__main__\":\n pytest.main([__file__, \"-v\"])\n",
"structuredPatch": [],
"originalFile": null
}
}