base.py
- Pfad: /var/www/mcp-servers/mcp-tasks/tools/task_tools/base.py
- Namespace: -
- Zeilen: 66 | Größe: 1,819 Bytes
- Geändert: 2025-12-28 12:57:38 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 95
- Dependencies: 80 (25%)
- LOC: 100 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
Keine Issues gefunden.
Dependencies 7
- use json
- use datetime.datetime
- use typing.Any
- use config.Config
- use infrastructure.task_repository.TaskRepository
- use shared.domain.LogEntry
- use shared.infrastructure.get_logger
Funktionen 6
- get_repo() Zeile 13
- get_task_logger() Zeile 18
- validate_status() Zeile 23
- validate_type() Zeile 30
- validate_executor_type() Zeile 37
- log_tool_call() Zeile 44
Code
"""Base utilities for task tools."""
import json
from datetime import datetime
from typing import Any
from config import Config
from infrastructure.task_repository import TaskRepository
from shared.domain import LogEntry
from shared.infrastructure import get_logger
def get_repo():
    """Build and hand back the task repository.

    Returns:
        The object produced by calling ``TaskRepository()`` with no
        arguments. Whether repeated calls share a single instance is
        decided by ``TaskRepository`` itself, not by this helper.
    """
    repository = TaskRepository()
    return repository
def get_task_logger():
    """Fetch the protokoll logger used by the task tools.

    Returns:
        Whatever ``get_logger`` yields for the client name ``"mcp-tasks"``
        and the ``Config`` class. Any instance sharing is handled inside
        ``get_logger``, not here.
    """
    client_name = "mcp-tasks"
    return get_logger(client_name, Config)
def validate_status(status: str) -> tuple[bool, str | None]:
"""Validate status value."""
if status and status not in Config.VALID_STATUSES:
return False, f"Invalid status: {status}"
return True, None
def validate_type(task_type: str) -> tuple[bool, str | None]:
"""Validate task type value."""
if task_type and task_type not in Config.VALID_TYPES:
return False, f"Invalid type: {task_type}"
return True, None
def validate_executor_type(executor_type: str) -> tuple[bool, str | None]:
    """Check an executor type against ``Config.VALID_EXECUTOR_TYPES``.

    Args:
        executor_type: Candidate executor type. Empty or ``None`` values
            get no exemption; they are looked up like any other value.

    Returns:
        ``(True, None)`` when the value is listed in
        ``Config.VALID_EXECUTOR_TYPES``; otherwise ``(False, message)``
        where ``message`` names the rejected value.
    """
    is_known = executor_type in Config.VALID_EXECUTOR_TYPES
    if is_known:
        return True, None
    return False, f"Invalid executor_type: {executor_type}"
def log_tool_call(
    logger,
    tool_name: str,
    request_str: str,
    status: str,
    duration_ms: int = 0,
    task_id: int | None = None,
    error_message: str | None = None,
):
    """Record one tool invocation in the protokoll log.

    Builds a ``LogEntry`` stamped with the current local time and the fixed
    client name ``"mcp-tasks"``, then passes it to ``logger.log``.

    Args:
        logger: Object exposing a ``log(entry)`` method.
        tool_name: Name of the tool that was called.
        request_str: Request payload, already rendered as a string.
        status: Outcome label stored with the entry.
        duration_ms: Duration of the call in milliseconds (default 0).
        task_id: Optional task identifier, stored as the entry's
            ``context_id``.
        error_message: Optional error text. Only the first 200 characters
            are kept; an empty or missing message is stored as ``None``.
    """
    created_at = datetime.now()
    # Cap the stored error text; falsy messages collapse to None.
    trimmed_error = error_message[:200] if error_message else None
    entry = LogEntry(
        timestamp=created_at,
        client_name="mcp-tasks",
        tool_name=tool_name,
        request=request_str,
        status=status,
        duration_ms=duration_ms,
        context_id=task_id,  # the shared LogEntry calls this field context_id
        error_message=trimmed_error,
    )
    logger.log(entry)