contract_repository.py

Code Hygiene Score: 67

Keine Issues gefunden.

Dependencies 13

Klassen 1

Code

"""Contract Repository - Datenbankzugriff für Contracts."""

import json
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional

from config import Config
from domain.contracts import Contract, ContractHistory, ContractStatus, ContractValidation
from shared.infrastructure import SimpleDbConnection


class ContractRepository:
    """Repository für Contract-CRUD-Operationen"""

    # ==================== CONTRACT CRUD ====================

    def find_by_id(self, contract_id: int) -> Optional[Contract]:
        """Look up a single contract by its primary key.

        Args:
            contract_id: Value matched against the ``id`` column.

        Returns:
            The mapped ``Contract``, or ``None`` when the query yields no row.
        """
        query = "SELECT * FROM contracts WHERE id = %s"
        with SimpleDbConnection.get_connection(Config) as db, db.cursor() as cur:
            cur.execute(query, (contract_id,))
            record = cur.fetchone()
            if not record:
                return None
            return self._row_to_contract(record)

    def find_by_name(self, name: str, version: Optional[str] = None) -> Optional[Contract]:
        """Look up a contract by name, optionally pinned to one version.

        Args:
            name: Contract name, matched exactly.
            version: When truthy, only the row with exactly this version is
                considered. When omitted or empty, the row with
                ``status = 'active'`` that comes first under
                ``ORDER BY version DESC`` is used.

        Returns:
            The mapped ``Contract``, or ``None`` when nothing matches.
        """
        # NOTE(review): the fallback relies on the database ordering of the
        # ``version`` column; if that column is textual, "1.10" would sort
        # before "1.9" — confirm against the schema.
        if version:
            query = "SELECT * FROM contracts WHERE name = %s AND version = %s"
            args = (name, version)
        else:
            # No version requested: newest active version.
            query = """SELECT * FROM contracts
                           WHERE name = %s AND status = 'active'
                           ORDER BY version DESC LIMIT 1"""
            args = (name,)

        with SimpleDbConnection.get_connection(Config) as db, db.cursor() as cur:
            cur.execute(query, args)
            record = cur.fetchone()
            if not record:
                return None
            return self._row_to_contract(record)

    def find_all(
        self,
        status: Optional[str] = None,
        search: Optional[str] = None,
        limit: int = 50,
        offset: int = 0,
    ) -> List[Contract]:
        """Return one page of contracts, optionally filtered.

        Args:
            status: When truthy, keep only rows whose ``status`` equals it.
            search: When truthy, keep only rows whose ``name`` or
                ``scope_description`` matches ``LIKE %search%``. The term is
                passed through as-is, so ``%`` and ``_`` inside it act as
                SQL wildcards.
            limit: Value bound to the ``LIMIT`` clause.
            offset: Value bound to the ``OFFSET`` clause.

        Returns:
            Mapped contracts, ordered by name ascending and then version
            descending.
        """
        fragments = ["SELECT * FROM contracts WHERE 1=1"]
        args = []

        if status:
            fragments.append(" AND status = %s")
            args.append(status)

        if search:
            pattern = f"%{search}%"
            fragments.append(" AND (name LIKE %s OR scope_description LIKE %s)")
            args += [pattern, pattern]

        fragments.append(" ORDER BY name ASC, version DESC LIMIT %s OFFSET %s")
        args += [limit, offset]
        query = "".join(fragments)

        with SimpleDbConnection.get_connection(Config) as db, db.cursor() as cur:
            cur.execute(query, args)
            return list(map(self._row_to_contract, cur.fetchall()))

    def count(self, status: Optional[str] = None, search: Optional[str] = None) -> int:
        """Count contracts, applying the same kind of filters as ``find_all``.

        Args:
            status: When truthy, count only rows whose ``status`` equals it.
            search: When truthy, count only rows whose ``name`` or
                ``scope_description`` matches ``LIKE %search%``.

        Returns:
            The ``cnt`` value of the result row, or ``0`` when the query
            yields no row.
        """
        fragments = ["SELECT COUNT(*) as cnt FROM contracts WHERE 1=1"]
        args = []

        if status:
            fragments.append(" AND status = %s")
            args.append(status)

        if search:
            pattern = f"%{search}%"
            fragments.append(" AND (name LIKE %s OR scope_description LIKE %s)")
            args += [pattern, pattern]

        query = "".join(fragments)

        with SimpleDbConnection.get_connection(Config) as db, db.cursor() as cur:
            cur.execute(query, args)
            result = cur.fetchone()
            if not result:
                return 0
            return result["cnt"]

    def create(self, contract: Contract) -> int:
        """Insert a new contract row and return its generated ID.

        The passed-in ``contract`` is mutated: it receives a fresh UUID4
        string, and ``created_at`` / ``updated_at`` are both set to the same
        timestamp.

        Args:
            contract: Contract whose ``name``, ``version``, ``status``,
                ``yaml_content``, ``scope_description`` and ``created_by``
                are written. ``status`` may be a ``ContractStatus`` member
                (its ``.value`` is stored) or any other value, which is
                stored as given.

        Returns:
            The cursor's ``lastrowid`` after the insert.
        """
        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                # Read the clock once so a freshly created row carries
                # identical created_at and updated_at values.
                now = datetime.now()
                contract.uuid = str(uuid.uuid4())
                contract.created_at = now
                contract.updated_at = now

                sql = """
                    INSERT INTO contracts
                    (uuid, name, version, status, yaml_content, scope_description,
                     created_at, created_by, updated_at)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                """
                # Enum members are stored by value; other values pass through.
                if isinstance(contract.status, ContractStatus):
                    status_value = contract.status.value
                else:
                    status_value = contract.status

                cursor.execute(sql, (
                    contract.uuid,
                    contract.name,
                    contract.version,
                    status_value,
                    contract.yaml_content,
                    contract.scope_description,
                    contract.created_at,
                    contract.created_by,
                    contract.updated_at,
                ))
                conn.commit()
                return cursor.lastrowid

    def update(self, contract_id: int, updates: Dict[str, Any]) -> bool:
        """Aktualisiert Contract-Felder"""
        if not updates:
            return False

        with SimpleDbConnection.get_connection(Config) as conn:
            with conn.cursor() as cursor:
                set_parts = []
                params = []

                for key, value in updates.items():
                    if key in ["name", "version", "status", "yaml_content", "scope_description"]:
                        set_parts.append(f"{key} = %s")
                        if key == "status" and isinstance(value, ContractStatus):
                            params.append(value.value)
                        else:
                            params.append(value)

                if not set_parts:
                    return False

                set_parts.append("updated_at = %s")
                params.append(datetime.now())
                params.append(contract_id)

                sql = f"UPDATE contracts SET {', '.join(set_parts)} WHERE id = %s"
                cursor.execute(sql, params)
                conn.commit()
                return cursor.rowcount > 0

    def delete(self, contract_id: int) -> bool:
        """Delete one contract row by ID.

        NOTE(review): the original note says history and validation rows are
        removed by a cascade; that is a schema property not visible here —
        confirm against the table definitions.

        Args:
            contract_id: Value matched against the ``id`` column.

        Returns:
            ``True`` when the cursor reports at least one deleted row.
        """
        statement = "DELETE FROM contracts WHERE id = %s"
        with SimpleDbConnection.get_connection(Config) as db, db.cursor() as cur:
            cur.execute(statement, (contract_id,))
            db.commit()
            removed = cur.rowcount
            return removed > 0

    def deprecate(self, contract_id: int) -> bool:
        """Set the contract's status to ``"deprecated"``.

        Args:
            contract_id: ID of the contract to mark.

        Returns:
            Whatever ``update`` reports for the status change.
        """
        status_change = {"status": "deprecated"}
        return self.update(contract_id, status_change)

    # ==================== VERSIONING ====================

    def create_new_version(
        self,
        contract_id: int,
        new_yaml: str,
        new_version: str,
        change_description: str,
        changed_by: str = "mcp-contracts"
    ) -> int:
        """Archive the current state of a contract and write a new version.

        The current ``yaml_content`` and ``version`` are copied into
        ``contract_history``; then the contract row itself is overwritten
        with the new YAML and version. Both statements are committed
        together.

        NOTE(review): the current state is read via ``find_by_id`` on a
        separate connection before the write begins — confirm whether
        concurrent writers are a concern.

        Args:
            contract_id: ID of the contract to version.
            new_yaml: YAML content stored as the new current content.
            new_version: Version string stored as the new current version.
            change_description: Text recorded with the history entry.
            changed_by: Author recorded with the history entry.

        Returns:
            The unchanged ``contract_id``.

        Raises:
            ValueError: If ``find_by_id`` returns nothing for ``contract_id``.
        """
        previous = self.find_by_id(contract_id)
        if not previous:
            raise ValueError(f"Contract {contract_id} not found")

        history_args = (
            contract_id,
            previous.yaml_content,
            previous.version,
            change_description,
            changed_by,
        )

        with SimpleDbConnection.get_connection(Config) as db:
            with db.cursor() as cur:
                # Step 1: keep the outgoing state in the history table.
                cur.execute("""
                    INSERT INTO contract_history
                    (contract_id, previous_yaml, previous_version, change_description, changed_by)
                    VALUES (%s, %s, %s, %s, %s)
                """, history_args)

                # Step 2: overwrite the live row with the new content.
                cur.execute("""
                    UPDATE contracts
                    SET yaml_content = %s, version = %s, updated_at = %s
                    WHERE id = %s
                """, (new_yaml, new_version, datetime.now(), contract_id))

                db.commit()
                return contract_id

    def get_versions(self, contract_id: int) -> List[Dict[str, Any]]:
        """List the current and all archived versions of a contract.

        Args:
            contract_id: ID of the contract.

        Returns:
            A list of dicts. The first entry (present only when the contract
            row exists) has ``type == "current"`` and the keys ``version``,
            ``updated_at`` and ``type``. The remaining entries have
            ``type == "history"``, are ordered by ``changed_at`` descending,
            and additionally carry ``change_description`` and ``changed_by``.
            ``updated_at`` is the timestamp's ``isoformat()`` output, or
            ``None`` when the timestamp is falsy.
        """
        def _iso(stamp):
            # Falsy timestamps are reported as None.
            return stamp.isoformat() if stamp else None

        with SimpleDbConnection.get_connection(Config) as db:
            with db.cursor() as cur:
                # Live row
                cur.execute(
                    "SELECT version, updated_at, 'current' as type FROM contracts WHERE id = %s",
                    (contract_id,)
                )
                live = cur.fetchone()

                # Archived rows
                cur.execute("""
                    SELECT previous_version as version, changed_at as updated_at,
                           'history' as type, change_description, changed_by
                    FROM contract_history
                    WHERE contract_id = %s
                    ORDER BY changed_at DESC
                """, (contract_id,))
                archived = cur.fetchall()

                result = []
                if live:
                    result.append({
                        "version": live["version"],
                        "updated_at": _iso(live["updated_at"]),
                        "type": "current",
                    })
                result.extend(
                    {
                        "version": entry["version"],
                        "updated_at": _iso(entry["updated_at"]),
                        "type": "history",
                        "change_description": entry["change_description"],
                        "changed_by": entry["changed_by"],
                    }
                    for entry in archived
                )
                return result

    # ==================== HISTORY ====================

    def get_history(self, contract_id: int) -> List[ContractHistory]:
        """Fetch the change history of a contract.

        Args:
            contract_id: ID of the contract.

        Returns:
            Mapped ``ContractHistory`` entries, ordered by ``changed_at``
            descending.
        """
        with SimpleDbConnection.get_connection(Config) as db:
            with db.cursor() as cur:
                cur.execute("""
                    SELECT * FROM contract_history
                    WHERE contract_id = %s
                    ORDER BY changed_at DESC
                """, (contract_id,))
                entries = []
                for record in cur.fetchall():
                    entries.append(self._row_to_history(record))
                return entries

    # ==================== VALIDATIONS ====================

    def save_validation(self, validation: ContractValidation) -> int:
        """Persist one validation result.

        ``violations`` is stored as a JSON string, or as ``None`` when it is
        falsy. A falsy ``validated_at`` is replaced with the current time.

        Args:
            validation: Result whose fields are written to
                ``contract_validations``.

        Returns:
            The cursor's ``lastrowid`` after the insert.
        """
        with SimpleDbConnection.get_connection(Config) as db:
            with db.cursor() as cur:
                if validation.violations:
                    serialized = json.dumps(validation.violations)
                else:
                    serialized = None

                values = (
                    validation.contract_id,
                    validation.validated_at or datetime.now(),
                    validation.result,
                    validation.critical_count,
                    validation.major_count,
                    validation.minor_count,
                    serialized,
                    validation.triggered_by,
                    validation.target_path,
                    validation.duration_ms,
                )
                cur.execute("""
                    INSERT INTO contract_validations
                    (contract_id, validated_at, result, critical_count, major_count,
                     minor_count, violations, triggered_by, target_path, duration_ms)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """, values)
                db.commit()
                return cur.lastrowid

    def get_validations(
        self,
        contract_id: int,
        limit: int = 10,
    ) -> List[ContractValidation]:
        """Fetch the most recent validation results of a contract.

        Args:
            contract_id: ID of the contract.
            limit: Value bound to the ``LIMIT`` clause.

        Returns:
            Mapped ``ContractValidation`` objects, ordered by
            ``validated_at`` descending.
        """
        with SimpleDbConnection.get_connection(Config) as db:
            with db.cursor() as cur:
                cur.execute("""
                    SELECT * FROM contract_validations
                    WHERE contract_id = %s
                    ORDER BY validated_at DESC
                    LIMIT %s
                """, (contract_id, limit))
                results = []
                for record in cur.fetchall():
                    results.append(self._row_to_validation(record))
                return results

    def get_last_validation(self, contract_id: int) -> Optional[ContractValidation]:
        """Fetch the single most recent validation of a contract.

        Args:
            contract_id: ID of the contract.

        Returns:
            The first entry of ``get_validations(contract_id, limit=1)``, or
            ``None`` when that list is empty.
        """
        latest = self.get_validations(contract_id, limit=1)
        if not latest:
            return None
        return latest[0]

    # ==================== STATISTICS ====================

    def get_statistics(self) -> Dict[str, Any]:
        """Collect aggregate numbers about contracts and validations.

        Returns:
            A dict with four keys: ``total_contracts`` (row count of
            ``contracts``), ``by_status`` (status -> count),
            ``total_validations`` (row count of ``contract_validations``) and
            ``validations_last_7_days`` (result -> count for validations
            whose ``validated_at`` lies within the last 7 days).
        """
        with SimpleDbConnection.get_connection(Config) as db:
            with db.cursor() as cur:
                # Contract counts per status
                cur.execute("""
                    SELECT status, COUNT(*) as count
                    FROM contracts
                    GROUP BY status
                """)
                status_counts = {}
                for record in cur.fetchall():
                    status_counts[record["status"]] = record["count"]

                # Validation counts per result, last 7 days
                cur.execute("""
                    SELECT result, COUNT(*) as count
                    FROM contract_validations
                    WHERE validated_at >= DATE_SUB(NOW(), INTERVAL 7 DAY)
                    GROUP BY result
                """)
                recent_results = {}
                for record in cur.fetchall():
                    recent_results[record["result"]] = record["count"]

                # Overall totals
                cur.execute("SELECT COUNT(*) as total FROM contracts")
                contract_total = cur.fetchone()["total"]

                cur.execute("SELECT COUNT(*) as total FROM contract_validations")
                validation_total = cur.fetchone()["total"]

                return {
                    "total_contracts": contract_total,
                    "by_status": status_counts,
                    "total_validations": validation_total,
                    "validations_last_7_days": recent_results,
                }

    # ==================== HELPERS ====================

    def _row_to_contract(self, row: Dict[str, Any]) -> Contract:
        """Map a ``contracts`` row to a ``Contract``.

        ``id``, ``uuid``, ``name``, ``version``, ``status``, ``yaml_content``,
        ``created_at`` and ``updated_at`` must be present in ``row``.
        ``scope_description`` and ``created_by`` are optional keys; a missing
        ``created_by`` becomes ``"system"``. A falsy ``status`` maps to
        ``ContractStatus.ACTIVE``.

        Args:
            row: Dict-like database row.

        Returns:
            The populated ``Contract``.
        """
        fields = {
            "id": row["id"],
            "uuid": row["uuid"],
            "name": row["name"],
            "version": row["version"],
            "status": ContractStatus(row["status"]) if row["status"] else ContractStatus.ACTIVE,
            "yaml_content": row["yaml_content"],
            "scope_description": row.get("scope_description"),
            "created_at": row["created_at"],
            "created_by": row.get("created_by", "system"),
            "updated_at": row["updated_at"],
        }
        return Contract(**fields)

    def _row_to_history(self, row: Dict[str, Any]) -> ContractHistory:
        """Map a ``contract_history`` row to a ``ContractHistory``.

        ``id``, ``contract_id`` and ``changed_at`` must be present in ``row``;
        the remaining keys are optional. A missing ``changed_by`` becomes
        ``"system"``.

        Args:
            row: Dict-like database row.

        Returns:
            The populated ``ContractHistory``.
        """
        fields = {
            "id": row["id"],
            "contract_id": row["contract_id"],
            "previous_yaml": row.get("previous_yaml"),
            "previous_version": row.get("previous_version"),
            "change_description": row.get("change_description"),
            "changed_at": row["changed_at"],
            "changed_by": row.get("changed_by", "system"),
        }
        return ContractHistory(**fields)

    def _row_to_validation(self, row: Dict[str, Any]) -> ContractValidation:
        """Map a ``contract_validations`` row to a ``ContractValidation``.

        The ``violations`` column is decoded from JSON when it arrives as
        text (``str``, ``bytes`` or ``bytearray``); any other truthy value is
        used as-is. A missing, falsy or undecodable value yields an empty
        list, so one corrupt row does not break reading validation history.

        ``id``, ``contract_id``, ``validated_at`` and ``result`` must be
        present in ``row``. Missing counters and ``duration_ms`` default to
        ``0``; a missing ``triggered_by`` defaults to ``"mcp"``.

        Args:
            row: Dict-like database row.

        Returns:
            The populated ``ContractValidation``.
        """
        raw = row.get("violations")
        violations = []
        if raw:
            if isinstance(raw, (str, bytes, bytearray)):
                try:
                    violations = json.loads(raw)
                except ValueError:
                    # json.JSONDecodeError and UnicodeDecodeError are both
                    # ValueError subclasses; treat bad payloads as "none".
                    violations = []
            else:
                # Already decoded before it reached this method.
                violations = raw

        return ContractValidation(
            id=row["id"],
            contract_id=row["contract_id"],
            validated_at=row["validated_at"],
            result=row["result"],
            critical_count=row.get("critical_count", 0),
            major_count=row.get("major_count", 0),
            minor_count=row.get("minor_count", 0),
            violations=violations,
            triggered_by=row.get("triggered_by", "mcp"),
            target_path=row.get("target_path"),
            duration_ms=row.get("duration_ms", 0),
        )
← Übersicht Graph