pipeline_config.py
- Pfad:
/var/www/scripts/pipeline/pipeline_config.py - Namespace: pipeline
- Zeilen: 134 | Größe: 3,854 Bytes
- Geändert: 2025-12-31 03:01:27 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 100
- Dependencies: 100 (25%)
- LOC: 100 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
Keine Issues gefunden.
Dependencies 4
- extends Exception
- use os
- use db.db
- use json
Klassen 1
-
PipelineConfigErrorclass Zeile 19
Funktionen 2
-
get_step_model()Zeile 30 -
get_step_config()Zeile 95
Code
#!/usr/bin/env python3
"""
Zentrale Pipeline-Konfiguration aus DB.
Single Source of Truth: ki_content.pipeline_steps.config
Erstellt: 2025-12-28 (Task #516)
Supervision: Genehmigt mit Auflagen
"""
import os
from db import db
# Default Pipeline-ID als Konstante (nicht inline hardcoded)
DEFAULT_PIPELINE_ID = 5
class PipelineConfigError(Exception):
"""
Fehler wenn Config fehlt - KEIN stilles Fallback!
Diese Exception soll Pipeline-Läufe stoppen wenn
die Konfiguration unvollständig ist.
"""
pass
def get_step_model(step_type: str, pipeline_id: int = None) -> str:
"""
Liest Model aus pipeline_steps.config.
Args:
step_type: Step-Typ (z.B. 'text_semantic_analyze')
pipeline_id: Pipeline-ID (Default aus ENV PIPELINE_ID oder DEFAULT_PIPELINE_ID)
Returns:
Model-Name als String (z.B. 'mistral', 'gemma3:27b-it-qat')
Raises:
PipelineConfigError: Bei fehlendem/ungültigem Config
WICHTIG:
- Wirft Exception wenn nicht konfiguriert!
- Kein stilles Fallback auf hardcodierte Werte.
- Mehrfachzeilen sind ein Datenfehler.
Beispiel:
model = get_step_model("text_semantic_analyze")
"""
# Pipeline-ID aus Parameter, ENV oder Default-Konstante
if pipeline_id is None:
env_id = os.environ.get("PIPELINE_ID")
pipeline_id = int(env_id) if env_id else DEFAULT_PIPELINE_ID
cursor = db.execute(
"""SELECT JSON_UNQUOTE(JSON_EXTRACT(config, '$.model')) as model
FROM pipeline_steps
WHERE pipeline_id = %s AND step_type = %s AND enabled = 1""",
(pipeline_id, step_type),
)
rows = cursor.fetchall()
cursor.close()
# Mehrfachzeilen = Datenfehler (UNIQUE Constraint sollte das verhindern)
if len(rows) > 1:
raise PipelineConfigError(
f"DATENFEHLER: {len(rows)} Einträge für "
f"pipeline_id={pipeline_id}, step_type='{step_type}'. "
f"Erwarte genau 1. UNIQUE Constraint prüfen!"
)
# Keine Zeile = Config fehlt
if len(rows) == 0:
raise PipelineConfigError(
f"Kein Model konfiguriert für step_type='{step_type}' "
f"in pipeline_id={pipeline_id}. "
f"Bitte in pipeline_steps.config setzen!"
)
model = rows[0].get("model")
# Leerer String oder NULL = ungültig
if not model or model.strip() == "":
raise PipelineConfigError(
f"Model ist leer für step_type='{step_type}' "
f"in pipeline_id={pipeline_id}. "
f"Bitte in pipeline_steps.config setzen!"
)
return model.strip()
def get_step_config(step_type: str, pipeline_id: int = None) -> dict:
"""
Liest komplette Step-Config aus pipeline_steps.
Args:
step_type: Step-Typ (z.B. 'text_semantic_analyze')
pipeline_id: Pipeline-ID (Default aus ENV PIPELINE_ID oder DEFAULT_PIPELINE_ID)
Returns:
Config als dict
Raises:
PipelineConfigError: Bei fehlendem Config
"""
import json
if pipeline_id is None:
env_id = os.environ.get("PIPELINE_ID")
pipeline_id = int(env_id) if env_id else DEFAULT_PIPELINE_ID
cursor = db.execute(
"""SELECT config FROM pipeline_steps
WHERE pipeline_id = %s AND step_type = %s AND enabled = 1""",
(pipeline_id, step_type),
)
rows = cursor.fetchall()
cursor.close()
if len(rows) != 1:
raise PipelineConfigError(f"Config nicht gefunden für step_type='{step_type}' in pipeline_id={pipeline_id}.")
config_str = rows[0].get("config")
if not config_str:
return {}
try:
return json.loads(config_str)
except json.JSONDecodeError as e:
raise PipelineConfigError(f"Ungültiges JSON in config für step_type='{step_type}': {e}") from e