web_generate.py

Code Hygiene Score: 81

Issues 1

Zeile Typ Beschreibung
194 magic_number Magic Number gefunden: 100

Dependencies 9

Funktionen 5

Code

#!/usr/bin/env python3
"""
Web Content Generation Interface for KI-System
Called from PHP with command and parameters.

Usage:
    python web_generate.py generate <order_id> [model] [collection] [limit]
    python web_generate.py critique <version_id> [model]
    python web_generate.py revise <version_id> [model]
"""

import json
import sys
from datetime import datetime

import mysql.connector

# Change to pipeline directory and add to path
PIPELINE_PATH = "/var/www/scripts/pipeline"
sys.path.insert(0, PIPELINE_PATH)

from config import DB_CONFIG
from generate import generate_content, revise_content, run_critique_round

# Max log lines to keep
MAX_LOG_LINES = 50


def log_step(order_id: int, message: str, step: str | None = None) -> None:
    """Append a log line and optionally update current step."""
    timestamp = datetime.now().strftime("%H:%M:%S")
    log_line = f"[{timestamp}] {message}"

    try:
        conn = mysql.connector.connect(**DB_CONFIG)
        cursor = conn.cursor()
        # Get current log
        cursor.execute("SELECT generation_log FROM content_orders WHERE id = %s", (order_id,))
        row = cursor.fetchone()
        current_log = row[0] if row and row[0] else ""

        # Append new line, keep last N lines
        lines = current_log.split("\n") if current_log else []
        lines.append(log_line)
        if len(lines) > MAX_LOG_LINES:
            lines = lines[-MAX_LOG_LINES:]
        new_log = "\n".join(lines)

        # Update DB
        if step:
            cursor.execute(
                "UPDATE content_orders SET generation_log = %s, generation_step = %s, updated_at = NOW() WHERE id = %s",
                (new_log, step, order_id),
            )
        else:
            cursor.execute(
                "UPDATE content_orders SET generation_log = %s, updated_at = NOW() WHERE id = %s",
                (new_log, order_id),
            )
        conn.commit()
        cursor.close()
        conn.close()
    except Exception as e:
        print(f"Log update failed: {e}", file=sys.stderr)


def update_generation_status(order_id: int, status: str, error: str | None = None) -> None:
    """Update generation_status in content_orders table."""
    try:
        conn = mysql.connector.connect(**DB_CONFIG)
        cursor = conn.cursor()
        if status == "generating":
            cursor.execute(
                "UPDATE content_orders SET generation_status = %s, generation_started_at = NOW(), generation_log = NULL, updated_at = NOW() WHERE id = %s",
                (status, order_id),
            )
        elif error:
            cursor.execute(
                "UPDATE content_orders SET generation_status = %s, generation_error = %s, updated_at = NOW() WHERE id = %s",
                (status, error, order_id),
            )
        else:
            cursor.execute(
                "UPDATE content_orders SET generation_status = %s, generation_error = NULL, updated_at = NOW() WHERE id = %s",
                (status, order_id),
            )
        conn.commit()
        cursor.close()
        conn.close()
    except Exception as e:
        print(f"DB update failed: {e}", file=sys.stderr)


def log_critique_step(order_id: int, message: str, step: str | None = None) -> None:
    """Append a log line for critique and optionally update current step."""
    timestamp = datetime.now().strftime("%H:%M:%S")
    log_line = f"[{timestamp}] {message}"

    try:
        conn = mysql.connector.connect(**DB_CONFIG)
        cursor = conn.cursor()
        cursor.execute("SELECT critique_log FROM content_orders WHERE id = %s", (order_id,))
        row = cursor.fetchone()
        current_log = row[0] if row and row[0] else ""

        lines = current_log.split("\n") if current_log else []
        lines.append(log_line)
        if len(lines) > MAX_LOG_LINES:
            lines = lines[-MAX_LOG_LINES:]
        new_log = "\n".join(lines)

        if step:
            cursor.execute(
                "UPDATE content_orders SET critique_log = %s, critique_step = %s, updated_at = NOW() WHERE id = %s",
                (new_log, step, order_id),
            )
        else:
            cursor.execute(
                "UPDATE content_orders SET critique_log = %s, updated_at = NOW() WHERE id = %s",
                (new_log, order_id),
            )
        conn.commit()
        cursor.close()
        conn.close()
    except Exception as e:
        print(f"Critique log update failed: {e}", file=sys.stderr)


def update_critique_status(order_id: int, status: str, error: str | None = None) -> None:
    """Update critique_status in content_orders table."""
    try:
        conn = mysql.connector.connect(**DB_CONFIG)
        cursor = conn.cursor()
        if status == "critiquing":
            cursor.execute(
                "UPDATE content_orders SET critique_status = %s, critique_started_at = NOW(), critique_log = NULL, updated_at = NOW() WHERE id = %s",
                (status, order_id),
            )
        elif error:
            cursor.execute(
                "UPDATE content_orders SET critique_status = %s, critique_error = %s, updated_at = NOW() WHERE id = %s",
                (status, error, order_id),
            )
        else:
            cursor.execute(
                "UPDATE content_orders SET critique_status = %s, critique_error = NULL, updated_at = NOW() WHERE id = %s",
                (status, order_id),
            )
        conn.commit()
        cursor.close()
        conn.close()
    except Exception as e:
        print(f"Critique DB update failed: {e}", file=sys.stderr)


def main():
    """Route CLI commands to content generation functions."""
    if len(sys.argv) < 3:
        print(json.dumps({"error": "Usage: web_generate.py <command> <id> [options]"}))
        return

    command = sys.argv[1]
    entity_id = int(sys.argv[2])

    try:
        if command == "generate":
            model = sys.argv[3] if len(sys.argv) > 3 else "anthropic"
            collection = sys.argv[4] if len(sys.argv) > 4 else "documents"
            limit = int(sys.argv[5]) if len(sys.argv) > 5 else 5

            # Update status to generating
            update_generation_status(entity_id, "generating")

            # Log start
            model_display = model.replace("ollama:", "Ollama: ") if model.startswith("ollama:") else f"Claude ({model})"
            log_step(entity_id, "Starte Content-Generierung...", "init")
            log_step(entity_id, f"Modell: {model_display}")
            log_step(entity_id, f"Collection: {collection}, Kontext-Limit: {limit}")

            # Log RAG search
            log_step(entity_id, "Suche relevante Dokumente (RAG)...", "rag")

            # Run generation with intermediate logging
            result = generate_content(order_id=entity_id, model=model, collection=collection, context_limit=limit)

            # Log result
            if result.get("error"):
                log_step(entity_id, f"Fehler: {result['error']}", "error")
                update_generation_status(entity_id, "failed", result["error"])
            else:
                sources = result.get("sources", [])
                log_step(entity_id, f"{len(sources)} Quellen gefunden", "sources")
                for src in sources[:3]:
                    log_step(entity_id, f"  - {src.get('source', 'Unbekannt')} ({int(src.get('score', 0) * 100)}%)")

                log_step(entity_id, "Generiere Content mit KI...", "llm")
                log_step(entity_id, "Content erfolgreich generiert!", "done")
                log_step(entity_id, f"Version {result.get('version_number', '?')} erstellt")
                update_generation_status(entity_id, "completed")

        elif command == "critique":
            model = sys.argv[3] if len(sys.argv) > 3 else "anthropic"
            # order_id is passed as 5th argument for logging
            order_id = int(sys.argv[4]) if len(sys.argv) > 4 else None

            if order_id:
                # Async mode with live logging
                update_critique_status(order_id, "critiquing")
                log_critique_step(order_id, "Starte Kritik-Runde...", "init")
                model_display = (
                    model.replace("ollama:", "Ollama: ") if model.startswith("ollama:") else f"Claude ({model})"
                )
                log_critique_step(order_id, f"Modell: {model_display}")

                # Get critics list (filtered by selected_critics from order)
                log_critique_step(order_id, "Lade Kritiker...", "critics")

                try:
                    from db import db

                    db.connect()
                    # Get selected_critics from order
                    cursor = db.execute(
                        "SELECT selected_critics, quality_check FROM content_orders WHERE id = %s", (order_id,)
                    )
                    order_row = cursor.fetchone()
                    cursor.close()

                    if not order_row or not order_row.get("quality_check", False):
                        log_critique_step(order_id, "Qualitätsprüfung deaktiviert", "skipped")
                        db.disconnect()
                        result = {"success": True, "skipped": True, "message": "Qualitätsprüfung deaktiviert"}
                        print(json.dumps(result, ensure_ascii=False))
                        return

                    # Parse selected_critics
                    selected_raw = order_row.get("selected_critics")
                    if selected_raw:
                        import json as json_module

                        selected_ids = (
                            json_module.loads(selected_raw) if isinstance(selected_raw, str) else selected_raw
                        )
                    else:
                        selected_ids = []

                    # Get critics - filter by selected_critics if specified
                    if selected_ids:
                        placeholders = ", ".join(["%s"] * len(selected_ids))
                        sql = (
                            "SELECT name FROM content_config "
                            f"WHERE type = 'critic' AND status = 'active' AND id IN ({placeholders}) "
                            "ORDER BY sort_order"
                        )
                        cursor = db.execute(sql, tuple(selected_ids))
                    else:
                        sql = (
                            "SELECT name FROM content_config "
                            "WHERE type = 'critic' AND status = 'active' ORDER BY sort_order"
                        )
                        cursor = db.execute(sql)
                    critics = cursor.fetchall()
                    cursor.close()
                    db.disconnect()

                    critic_names = [c["name"] for c in critics]
                    if critic_names:
                        log_critique_step(order_id, f"Kritiker: {', '.join(critic_names)}")
                    log_critique_step(order_id, f"{len(critics)} Kritiker werden Content prüfen")

                    # Run critique round
                    for i, critic in enumerate(critics, 1):
                        log_critique_step(
                            order_id, f"[{i}/{len(critics)}] {critic['name']} analysiert...", f"critic_{i}"
                        )

                    result = run_critique_round(version_id=entity_id, model=model)

                    if result.get("error"):
                        log_critique_step(order_id, f"Fehler: {result['error']}", "error")
                        update_critique_status(order_id, "failed", result["error"])
                    else:
                        passed_count = sum(1 for c in result.get("critiques", []) if c.get("passed", False))
                        total_count = len(result.get("critiques", []))
                        log_critique_step(order_id, f"Kritik-Runde {result.get('round', '?')} abgeschlossen", "done")
                        log_critique_step(order_id, f"{passed_count} von {total_count} Kritikern zufrieden")
                        if result.get("all_passed"):
                            log_critique_step(order_id, "Alle Prüfungen bestanden!")
                        else:
                            log_critique_step(order_id, "Revision empfohlen")
                        update_critique_status(order_id, "completed")

                except Exception as e:
                    log_critique_step(order_id, f"Fehler: {e}", "error")
                    update_critique_status(order_id, "failed", str(e))
                    result = {"error": str(e)}
            else:
                # Sync mode (backward compatibility)
                result = run_critique_round(version_id=entity_id, model=model)

        elif command == "revise":
            model = sys.argv[3] if len(sys.argv) > 3 else "anthropic"
            result = revise_content(version_id=entity_id, model=model)

        else:
            result = {"error": f"Unknown command: {command}"}

        print(json.dumps(result, ensure_ascii=False))

    except Exception as e:
        # On exception, mark as failed
        if command == "generate":
            log_step(entity_id, f"Fehler: {e}", "error")
            update_generation_status(entity_id, "failed", str(e))
        print(json.dumps({"error": str(e)}))


if __name__ == "__main__":
    main()
← Übersicht