contract_repository.py
- Pfad: /var/www/mcp-servers/mcp-contracts/infrastructure/contract_repository.py
- Namespace: -
- Zeilen: 394 | Größe: 16,106 Bytes
- Geändert: 2025-12-28 12:52:21 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 67
- Dependencies: 20 (25%)
- LOC: 35 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
Keine Issues gefunden.
Dependencies 13
- use json
- use uuid
- use datetime.datetime
- use typing.Any
- use typing.Dict
- use typing.List
- use typing.Optional
- use config.Config
- use domain.contracts.Contract
- use domain.contracts.ContractHistory
- use domain.contracts.ContractStatus
- use domain.contracts.ContractValidation
- use shared.infrastructure.SimpleDbConnection
Klassen 1
- ContractRepository (class), Zeile 13
Code
"""Contract Repository - Datenbankzugriff für Contracts."""
import json
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional
from config import Config
from domain.contracts import Contract, ContractHistory, ContractStatus, ContractValidation
from shared.infrastructure import SimpleDbConnection
class ContractRepository:
    """Repository for contract CRUD operations.

    Each public method obtains its own connection through
    ``SimpleDbConnection.get_connection(Config)``. Result rows are read by
    column name (``row["id"]``), so the cursor is expected to yield
    dict-like rows.
    """

    # Columns that ``update`` is allowed to modify. Column names are
    # interpolated into the SQL text, so this must remain a fixed whitelist.
    _UPDATABLE_FIELDS = frozenset(
        ("name", "version", "status", "yaml_content", "scope_description")
    )

    # ==================== CONTRACT CRUD ====================

    def find_by_id(self, contract_id: int) -> Optional[Contract]:
        """Return the contract with primary key *contract_id*, or ``None``."""
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                cursor.execute("SELECT * FROM contracts WHERE id = %s", (contract_id,))
                row = cursor.fetchone()
                return self._row_to_contract(row) if row else None

    def find_by_name(self, name: str, version: Optional[str] = None) -> Optional[Contract]:
        """Look a contract up by name.

        Args:
            name: Contract name to match exactly.
            version: If given (and non-empty), the exact version to fetch
                regardless of status. Otherwise the first row with
                ``status = 'active'`` ordered by ``version DESC`` is used.

        Returns:
            The matching contract, or ``None`` when no row matches.
        """
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                if version:
                    cursor.execute(
                        "SELECT * FROM contracts WHERE name = %s AND version = %s",
                        (name, version)
                    )
                else:
                    # Latest active version.
                    # NOTE(review): if `version` is a string column, this
                    # ordering is lexical ("1.10" sorts before "1.9") — confirm
                    # against the schema.
                    cursor.execute(
                        """SELECT * FROM contracts
                        WHERE name = %s AND status = 'active'
                        ORDER BY version DESC LIMIT 1""",
                        (name,)
                    )
                row = cursor.fetchone()
                return self._row_to_contract(row) if row else None

    def find_all(
        self,
        status: Optional[str] = None,
        search: Optional[str] = None,
        limit: int = 50,
        offset: int = 0,
    ) -> List[Contract]:
        """Return one page of contracts, optionally filtered.

        Args:
            status: Exact status to filter on; ignored when falsy.
            search: Substring matched with ``LIKE`` against ``name`` and
                ``scope_description``; ignored when falsy.
            limit: Maximum number of rows to return.
            offset: Number of rows to skip.

        Returns:
            Contracts ordered by name ascending, then version descending.
        """
        params: List[Any] = []
        sql = self._append_filters(
            "SELECT * FROM contracts WHERE 1=1", params, status, search
        )
        sql += " ORDER BY name ASC, version DESC LIMIT %s OFFSET %s"
        params.extend([limit, offset])
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                rows = cursor.fetchall()
                return [self._row_to_contract(row) for row in rows]

    def count(self, status: Optional[str] = None, search: Optional[str] = None) -> int:
        """Count contracts matching the same filters as :meth:`find_all`."""
        params: List[Any] = []
        sql = self._append_filters(
            "SELECT COUNT(*) as cnt FROM contracts WHERE 1=1", params, status, search
        )
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                row = cursor.fetchone()
                return row["cnt"] if row else 0

    def create(self, contract: Contract) -> int:
        """Insert *contract* and return the new row id.

        The passed object is mutated: ``uuid`` is set to a fresh UUID4 string
        and ``created_at`` / ``updated_at`` are both set to the same
        timestamp.
        """
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                # One timestamp for both columns so a new row never shows
                # updated_at != created_at.
                now = datetime.now()
                contract.uuid = str(uuid.uuid4())
                contract.created_at = now
                contract.updated_at = now
                sql = """
                    INSERT INTO contracts
                    (uuid, name, version, status, yaml_content, scope_description,
                    created_at, created_by, updated_at)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                """
                # Accept both the enum and a plain status string.
                status_value = (
                    contract.status.value
                    if isinstance(contract.status, ContractStatus)
                    else contract.status
                )
                cursor.execute(sql, (
                    contract.uuid,
                    contract.name,
                    contract.version,
                    status_value,
                    contract.yaml_content,
                    contract.scope_description,
                    contract.created_at,
                    contract.created_by,
                    contract.updated_at,
                ))
                conn.commit()
                return cursor.lastrowid

    def update(self, contract_id: int, updates: Dict[str, Any]) -> bool:
        """Update selected columns of one contract.

        Only keys listed in ``_UPDATABLE_FIELDS`` are applied; other keys are
        silently ignored. ``updated_at`` is always refreshed.

        Args:
            contract_id: Primary key of the row to update.
            updates: Mapping of column name to new value. A ``status`` value
                may be a ``ContractStatus`` member or a plain string.

        Returns:
            ``False`` if *updates* is empty or contains no updatable key
            (no database access happens in that case); otherwise whether the
            statement reported at least one affected row.
        """
        if not updates:
            return False

        # Build the SET clause before touching the database so that a no-op
        # call never opens a connection.
        assignments: List[str] = []
        params: List[Any] = []
        for column, value in updates.items():
            if column not in self._UPDATABLE_FIELDS:
                continue
            # Safe to interpolate: `column` is one of the whitelisted names.
            assignments.append(f"{column} = %s")
            if column == "status" and isinstance(value, ContractStatus):
                value = value.value
            params.append(value)
        if not assignments:
            return False

        assignments.append("updated_at = %s")
        params.append(datetime.now())
        params.append(contract_id)
        sql = f"UPDATE contracts SET {', '.join(assignments)} WHERE id = %s"

        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                conn.commit()
                return cursor.rowcount > 0

    def delete(self, contract_id: int) -> bool:
        """Delete one contract and report whether a row was removed.

        NOTE(review): the original comment states that history and validation
        rows are removed by a database-level cascade; that is not visible in
        this file — confirm against the schema.
        """
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                cursor.execute("DELETE FROM contracts WHERE id = %s", (contract_id,))
                conn.commit()
                return cursor.rowcount > 0

    def deprecate(self, contract_id: int) -> bool:
        """Set the contract's status to ``"deprecated"`` via :meth:`update`."""
        return self.update(contract_id, {"status": "deprecated"})

    # ==================== VERSIONING ====================

    def create_new_version(
        self,
        contract_id: int,
        new_yaml: str,
        new_version: str,
        change_description: str,
        changed_by: str = "mcp-contracts"
    ) -> int:
        """Replace a contract's YAML/version and archive the previous state.

        The current ``yaml_content`` and ``version`` are written to
        ``contract_history`` and the ``contracts`` row is then updated in
        place; both statements are committed together.

        Args:
            contract_id: Primary key of the contract to bump.
            new_yaml: New YAML content.
            new_version: New version string.
            change_description: Free-text description stored with the
                history entry.
            changed_by: Author stored with the history entry.

        Returns:
            *contract_id* (the row is updated in place, so the id is
            unchanged).

        Raises:
            ValueError: If no contract with *contract_id* exists.
        """
        old_contract = self.find_by_id(contract_id)
        if not old_contract:
            raise ValueError(f"Contract {contract_id} not found")

        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                # Archive the previous state.
                cursor.execute("""
                    INSERT INTO contract_history
                    (contract_id, previous_yaml, previous_version, change_description, changed_by)
                    VALUES (%s, %s, %s, %s, %s)
                """, (
                    contract_id,
                    old_contract.yaml_content,
                    old_contract.version,
                    change_description,
                    changed_by,
                ))
                # Update the contract itself.
                cursor.execute("""
                    UPDATE contracts
                    SET yaml_content = %s, version = %s, updated_at = %s
                    WHERE id = %s
                """, (new_yaml, new_version, datetime.now(), contract_id))
                conn.commit()
                return contract_id

    def get_versions(self, contract_id: int) -> List[Dict[str, Any]]:
        """List the current and all archived versions of a contract.

        Returns:
            A list of dicts. The first entry (if the contract exists) has
            ``type == "current"``; the remaining entries have
            ``type == "history"``, ordered by ``changed_at`` descending, and
            additionally carry ``change_description`` and ``changed_by``.
            ``updated_at`` is an ISO-8601 string or ``None``.
        """
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                # Current version.
                cursor.execute(
                    "SELECT version, updated_at, 'current' as type FROM contracts WHERE id = %s",
                    (contract_id,)
                )
                current = cursor.fetchone()
                # Archived versions.
                cursor.execute("""
                    SELECT previous_version as version, changed_at as updated_at,
                    'history' as type, change_description, changed_by
                    FROM contract_history
                    WHERE contract_id = %s
                    ORDER BY changed_at DESC
                """, (contract_id,))
                history = cursor.fetchall()

                versions: List[Dict[str, Any]] = []
                if current:
                    versions.append({
                        "version": current["version"],
                        "updated_at": self._isoformat_or_none(current["updated_at"]),
                        "type": "current",
                    })
                versions.extend(
                    {
                        "version": entry["version"],
                        "updated_at": self._isoformat_or_none(entry["updated_at"]),
                        "type": "history",
                        "change_description": entry["change_description"],
                        "changed_by": entry["changed_by"],
                    }
                    for entry in history
                )
                return versions

    # ==================== HISTORY ====================

    def get_history(self, contract_id: int) -> List[ContractHistory]:
        """Return the change history of a contract, newest entry first."""
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                cursor.execute("""
                    SELECT * FROM contract_history
                    WHERE contract_id = %s
                    ORDER BY changed_at DESC
                """, (contract_id,))
                rows = cursor.fetchall()
                return [self._row_to_history(row) for row in rows]

    # ==================== VALIDATIONS ====================

    def save_validation(self, validation: ContractValidation) -> int:
        """Persist a validation result and return the new row id.

        ``violations`` is stored as a JSON string, or ``NULL`` when it is
        empty/falsy. ``validated_at`` defaults to the current time when unset.
        """
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                violations_json = json.dumps(validation.violations) if validation.violations else None
                cursor.execute("""
                    INSERT INTO contract_validations
                    (contract_id, validated_at, result, critical_count, major_count,
                    minor_count, violations, triggered_by, target_path, duration_ms)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """, (
                    validation.contract_id,
                    validation.validated_at or datetime.now(),
                    validation.result,
                    validation.critical_count,
                    validation.major_count,
                    validation.minor_count,
                    violations_json,
                    validation.triggered_by,
                    validation.target_path,
                    validation.duration_ms,
                ))
                conn.commit()
                return cursor.lastrowid

    def get_validations(
        self,
        contract_id: int,
        limit: int = 10,
    ) -> List[ContractValidation]:
        """Return up to *limit* validations of a contract, newest first."""
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                cursor.execute("""
                    SELECT * FROM contract_validations
                    WHERE contract_id = %s
                    ORDER BY validated_at DESC
                    LIMIT %s
                """, (contract_id, limit))
                rows = cursor.fetchall()
                return [self._row_to_validation(row) for row in rows]

    def get_last_validation(self, contract_id: int) -> Optional[ContractValidation]:
        """Return the most recent validation of a contract, or ``None``."""
        validations = self.get_validations(contract_id, limit=1)
        return validations[0] if validations else None

    # ==================== STATISTICS ====================

    def get_statistics(self) -> Dict[str, Any]:
        """Collect aggregate counts over contracts and validations.

        Returns:
            A dict with the keys ``total_contracts``, ``by_status``
            (status -> count), ``total_validations`` and
            ``validations_last_7_days`` (result -> count).
        """
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                # Contracts per status.
                cursor.execute("""
                    SELECT status, COUNT(*) as count
                    FROM contracts
                    GROUP BY status
                """)
                by_status = {row["status"]: row["count"] for row in cursor.fetchall()}
                # Validations during the last 7 days, per result.
                cursor.execute("""
                    SELECT result, COUNT(*) as count
                    FROM contract_validations
                    WHERE validated_at >= DATE_SUB(NOW(), INTERVAL 7 DAY)
                    GROUP BY result
                """)
                validations_7d = {row["result"]: row["count"] for row in cursor.fetchall()}
                # Grand totals.
                cursor.execute("SELECT COUNT(*) as total FROM contracts")
                total = cursor.fetchone()["total"]
                cursor.execute("SELECT COUNT(*) as total FROM contract_validations")
                total_validations = cursor.fetchone()["total"]
                return {
                    "total_contracts": total,
                    "by_status": by_status,
                    "total_validations": total_validations,
                    "validations_last_7_days": validations_7d,
                }

    # ==================== HELPERS ====================

    @staticmethod
    def _append_filters(
        sql: str,
        params: List[Any],
        status: Optional[str],
        search: Optional[str],
    ) -> str:
        """Append the optional status/search conditions to *sql*.

        Bound values are appended to *params* in placeholder order and the
        extended SQL string is returned. *search* is wrapped in ``%...%`` as
        is, so ``%`` and ``_`` inside it keep their LIKE wildcard meaning.
        """
        if status:
            sql += " AND status = %s"
            params.append(status)
        if search:
            sql += " AND (name LIKE %s OR scope_description LIKE %s)"
            pattern = f"%{search}%"
            params.extend([pattern, pattern])
        return sql

    @staticmethod
    def _isoformat_or_none(value: Any) -> Optional[str]:
        """Return ``value.isoformat()``, or ``None`` when *value* is falsy."""
        return value.isoformat() if value else None

    def _row_to_contract(self, row: Dict[str, Any]) -> Contract:
        """Convert a ``contracts`` row into a ``Contract``.

        An empty/NULL status falls back to ``ContractStatus.ACTIVE``.
        """
        return Contract(
            id=row["id"],
            uuid=row["uuid"],
            name=row["name"],
            version=row["version"],
            status=ContractStatus(row["status"]) if row["status"] else ContractStatus.ACTIVE,
            yaml_content=row["yaml_content"],
            scope_description=row.get("scope_description"),
            created_at=row["created_at"],
            created_by=row.get("created_by", "system"),
            updated_at=row["updated_at"],
        )

    def _row_to_history(self, row: Dict[str, Any]) -> ContractHistory:
        """Convert a ``contract_history`` row into a ``ContractHistory``."""
        return ContractHistory(
            id=row["id"],
            contract_id=row["contract_id"],
            previous_yaml=row.get("previous_yaml"),
            previous_version=row.get("previous_version"),
            change_description=row.get("change_description"),
            changed_at=row["changed_at"],
            changed_by=row.get("changed_by", "system"),
        )

    def _row_to_validation(self, row: Dict[str, Any]) -> ContractValidation:
        """Convert a ``contract_validations`` row into a ``ContractValidation``.

        ``violations`` is decoded from JSON when it arrives as text or bytes;
        an already-decoded value is passed through. Malformed JSON yields an
        empty list rather than an error.
        """
        raw_violations = row.get("violations")
        violations: Any = []
        if raw_violations:
            if isinstance(raw_violations, (str, bytes, bytearray)):
                try:
                    violations = json.loads(raw_violations)
                except ValueError:
                    # json.JSONDecodeError and UnicodeDecodeError are both
                    # ValueError subclasses; treat unreadable data as
                    # "no violations recorded".
                    violations = []
            else:
                violations = raw_violations
        return ContractValidation(
            id=row["id"],
            contract_id=row["contract_id"],
            validated_at=row["validated_at"],
            result=row["result"],
            critical_count=row.get("critical_count", 0),
            major_count=row.get("major_count", 0),
            minor_count=row.get("minor_count", 0),
            violations=violations,
            triggered_by=row.get("triggered_by", "mcp"),
            target_path=row.get("target_path"),
            duration_ms=row.get("duration_ms", 0),
        )