pipeline_config.py

Code Hygiene Score: 100

Keine Issues gefunden.

Dependencies 4

Klassen 1

Funktionen 2

Code

#!/usr/bin/env python3
"""
Zentrale Pipeline-Konfiguration aus DB.

Single Source of Truth: ki_content.pipeline_steps.config

Erstellt: 2025-12-28 (Task #516)
Supervision: Genehmigt mit Auflagen
"""

import os

from db import db

# Default Pipeline-ID als Konstante (nicht inline hardcoded)
DEFAULT_PIPELINE_ID = 5


class PipelineConfigError(Exception):
    """
    Fehler wenn Config fehlt - KEIN stilles Fallback!

    Diese Exception soll Pipeline-Läufe stoppen wenn
    die Konfiguration unvollständig ist.
    """

    pass


def get_step_model(step_type: str, pipeline_id: int = None) -> str:
    """
    Liest Model aus pipeline_steps.config.

    Args:
        step_type: Step-Typ (z.B. 'text_semantic_analyze')
        pipeline_id: Pipeline-ID (Default aus ENV PIPELINE_ID oder DEFAULT_PIPELINE_ID)

    Returns:
        Model-Name als String (z.B. 'mistral', 'gemma3:27b-it-qat')

    Raises:
        PipelineConfigError: Bei fehlendem/ungültigem Config

    WICHTIG:
    - Wirft Exception wenn nicht konfiguriert!
    - Kein stilles Fallback auf hardcodierte Werte.
    - Mehrfachzeilen sind ein Datenfehler.

    Beispiel:
        model = get_step_model("text_semantic_analyze")
    """
    # Pipeline-ID aus Parameter, ENV oder Default-Konstante
    if pipeline_id is None:
        env_id = os.environ.get("PIPELINE_ID")
        pipeline_id = int(env_id) if env_id else DEFAULT_PIPELINE_ID

    cursor = db.execute(
        """SELECT JSON_UNQUOTE(JSON_EXTRACT(config, '$.model')) as model
           FROM pipeline_steps
           WHERE pipeline_id = %s AND step_type = %s AND enabled = 1""",
        (pipeline_id, step_type),
    )
    rows = cursor.fetchall()
    cursor.close()

    # Mehrfachzeilen = Datenfehler (UNIQUE Constraint sollte das verhindern)
    if len(rows) > 1:
        raise PipelineConfigError(
            f"DATENFEHLER: {len(rows)} Einträge für "
            f"pipeline_id={pipeline_id}, step_type='{step_type}'. "
            f"Erwarte genau 1. UNIQUE Constraint prüfen!"
        )

    # Keine Zeile = Config fehlt
    if len(rows) == 0:
        raise PipelineConfigError(
            f"Kein Model konfiguriert für step_type='{step_type}' "
            f"in pipeline_id={pipeline_id}. "
            f"Bitte in pipeline_steps.config setzen!"
        )

    model = rows[0].get("model")

    # Leerer String oder NULL = ungültig
    if not model or model.strip() == "":
        raise PipelineConfigError(
            f"Model ist leer für step_type='{step_type}' "
            f"in pipeline_id={pipeline_id}. "
            f"Bitte in pipeline_steps.config setzen!"
        )

    return model.strip()


def get_step_config(step_type: str, pipeline_id: int = None) -> dict:
    """
    Liest komplette Step-Config aus pipeline_steps.

    Args:
        step_type: Step-Typ (z.B. 'text_semantic_analyze')
        pipeline_id: Pipeline-ID (Default aus ENV PIPELINE_ID oder DEFAULT_PIPELINE_ID)

    Returns:
        Config als dict

    Raises:
        PipelineConfigError: Bei fehlendem Config
    """
    import json

    if pipeline_id is None:
        env_id = os.environ.get("PIPELINE_ID")
        pipeline_id = int(env_id) if env_id else DEFAULT_PIPELINE_ID

    cursor = db.execute(
        """SELECT config FROM pipeline_steps
           WHERE pipeline_id = %s AND step_type = %s AND enabled = 1""",
        (pipeline_id, step_type),
    )
    rows = cursor.fetchall()
    cursor.close()

    if len(rows) != 1:
        raise PipelineConfigError(f"Config nicht gefunden für step_type='{step_type}' in pipeline_id={pipeline_id}.")

    config_str = rows[0].get("config")
    if not config_str:
        return {}

    try:
        return json.loads(config_str)
    except json.JSONDecodeError as e:
        raise PipelineConfigError(f"Ungültiges JSON in config für step_type='{step_type}': {e}") from e
← Übersicht Graph