web_generate.py
- Pfad: /var/www/scripts/pipeline/web_generate.py
- Namespace: pipeline
- Zeilen: 320 | Größe: 13,391 Bytes
- Geändert: 2025-12-30 22:47:15 | Gescannt: 2025-12-31 10:22:15
Code Hygiene Score: 81
- Dependencies: 60 (25%)
- LOC: 60 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 90 (10%)
Issues 1
| Zeile | Typ | Beschreibung |
|---|---|---|
| 194 | magic_number | Magic Number gefunden: 100 |
Dependencies 9
- use json
- use sys
- use datetime.datetime
- use mysql.connector
- use config.DB_CONFIG
- use generate.generate_content
- use generate.revise_content
- use generate.run_critique_round
- use db.db
Funktionen 5
- log_step() – Zeile 29
- update_generation_status() – Zeile 67
- log_critique_step() – Zeile 94
- update_critique_status() – Zeile 129
- main() – Zeile 156
Code
#!/usr/bin/env python3
"""
Web Content Generation Interface for KI-System
Called from PHP with command and parameters.
Usage:
python web_generate.py generate <order_id> [model] [collection] [limit]
python web_generate.py critique <version_id> [model] [order_id]
python web_generate.py revise <version_id> [model]
"""
import json
import sys
from datetime import datetime
import mysql.connector
# Add the pipeline directory to the import path so the local modules below resolve
PIPELINE_PATH = "/var/www/scripts/pipeline"
sys.path.insert(0, PIPELINE_PATH)
from config import DB_CONFIG
from generate import generate_content, revise_content, run_critique_round
# Max log lines to keep
MAX_LOG_LINES = 50
def log_step(order_id: int, message: str, step: str | None = None) -> None:
    """Append a timestamped line to an order's generation log.

    Reads ``content_orders.generation_log`` for the row whose ``id`` equals
    ``order_id``, appends ``[HH:MM:SS] message``, keeps only the last
    ``MAX_LOG_LINES`` lines and writes the result back. When ``step`` is
    truthy, ``generation_step`` is updated in the same statement.

    Logging is best-effort: any exception is reported on stderr and
    swallowed, so a logging failure never aborts the caller.

    Args:
        order_id: Value matched against ``content_orders.id``.
        message: Text of the log line (the timestamp is added here).
        step: Optional step identifier to store in ``generation_step``.
    """
    timestamp = datetime.now().strftime("%H:%M:%S")
    log_line = f"[{timestamp}] {message}"
    try:
        conn = mysql.connector.connect(**DB_CONFIG)
        try:
            cursor = conn.cursor()
            try:
                # Get current log
                cursor.execute("SELECT generation_log FROM content_orders WHERE id = %s", (order_id,))
                row = cursor.fetchone()
                current_log = row[0] if row and row[0] else ""

                # Append new line, keep last N lines
                lines = current_log.split("\n") if current_log else []
                lines.append(log_line)
                if len(lines) > MAX_LOG_LINES:
                    lines = lines[-MAX_LOG_LINES:]
                new_log = "\n".join(lines)

                # Update DB
                if step:
                    cursor.execute(
                        "UPDATE content_orders SET generation_log = %s, generation_step = %s, updated_at = NOW() WHERE id = %s",
                        (new_log, step, order_id),
                    )
                else:
                    cursor.execute(
                        "UPDATE content_orders SET generation_log = %s, updated_at = NOW() WHERE id = %s",
                        (new_log, order_id),
                    )
                conn.commit()
            finally:
                # Release the cursor even when a statement above raised.
                cursor.close()
        finally:
            # Always return the connection; previously it leaked on errors.
            conn.close()
    except Exception as e:
        print(f"Log update failed: {e}", file=sys.stderr)
def update_generation_status(order_id: int, status: str, error: str | None = None) -> None:
    """Update ``generation_status`` for a row in ``content_orders``.

    Three cases, checked in this order:

    * ``status == "generating"``: also sets ``generation_started_at`` to
      ``NOW()`` and resets ``generation_log`` to ``NULL``.
    * ``error`` is truthy: stores it in ``generation_error``.
    * otherwise: clears ``generation_error`` to ``NULL``.

    The update is best-effort: any exception is reported on stderr and
    swallowed.

    Args:
        order_id: Value matched against ``content_orders.id``.
        status: New value for ``generation_status``.
        error: Optional error text; ignored when ``status`` is
            ``"generating"``.
    """
    try:
        conn = mysql.connector.connect(**DB_CONFIG)
        try:
            cursor = conn.cursor()
            try:
                if status == "generating":
                    cursor.execute(
                        "UPDATE content_orders SET generation_status = %s, generation_started_at = NOW(), generation_log = NULL, updated_at = NOW() WHERE id = %s",
                        (status, order_id),
                    )
                elif error:
                    cursor.execute(
                        "UPDATE content_orders SET generation_status = %s, generation_error = %s, updated_at = NOW() WHERE id = %s",
                        (status, error, order_id),
                    )
                else:
                    cursor.execute(
                        "UPDATE content_orders SET generation_status = %s, generation_error = NULL, updated_at = NOW() WHERE id = %s",
                        (status, order_id),
                    )
                conn.commit()
            finally:
                # Release the cursor even when the UPDATE raised.
                cursor.close()
        finally:
            # Always return the connection; previously it leaked on errors.
            conn.close()
    except Exception as e:
        print(f"DB update failed: {e}", file=sys.stderr)
def log_critique_step(order_id: int, message: str, step: str | None = None) -> None:
    """Append a timestamped line to an order's critique log.

    Reads ``content_orders.critique_log`` for the row whose ``id`` equals
    ``order_id``, appends ``[HH:MM:SS] message``, keeps only the last
    ``MAX_LOG_LINES`` lines and writes the result back. When ``step`` is
    truthy, ``critique_step`` is updated in the same statement.

    Logging is best-effort: any exception is reported on stderr and
    swallowed, so a logging failure never aborts the caller.

    Args:
        order_id: Value matched against ``content_orders.id``.
        message: Text of the log line (the timestamp is added here).
        step: Optional step identifier to store in ``critique_step``.
    """
    timestamp = datetime.now().strftime("%H:%M:%S")
    log_line = f"[{timestamp}] {message}"
    try:
        conn = mysql.connector.connect(**DB_CONFIG)
        try:
            cursor = conn.cursor()
            try:
                cursor.execute("SELECT critique_log FROM content_orders WHERE id = %s", (order_id,))
                row = cursor.fetchone()
                current_log = row[0] if row and row[0] else ""

                # Append the new line and trim to the most recent entries.
                lines = current_log.split("\n") if current_log else []
                lines.append(log_line)
                if len(lines) > MAX_LOG_LINES:
                    lines = lines[-MAX_LOG_LINES:]
                new_log = "\n".join(lines)

                if step:
                    cursor.execute(
                        "UPDATE content_orders SET critique_log = %s, critique_step = %s, updated_at = NOW() WHERE id = %s",
                        (new_log, step, order_id),
                    )
                else:
                    cursor.execute(
                        "UPDATE content_orders SET critique_log = %s, updated_at = NOW() WHERE id = %s",
                        (new_log, order_id),
                    )
                conn.commit()
            finally:
                # Release the cursor even when a statement above raised.
                cursor.close()
        finally:
            # Always return the connection; previously it leaked on errors.
            conn.close()
    except Exception as e:
        print(f"Critique log update failed: {e}", file=sys.stderr)
def update_critique_status(order_id: int, status: str, error: str | None = None) -> None:
    """Update ``critique_status`` for a row in ``content_orders``.

    Three cases, checked in this order:

    * ``status == "critiquing"``: also sets ``critique_started_at`` to
      ``NOW()`` and resets ``critique_log`` to ``NULL``.
    * ``error`` is truthy: stores it in ``critique_error``.
    * otherwise: clears ``critique_error`` to ``NULL``.

    The update is best-effort: any exception is reported on stderr and
    swallowed.

    Args:
        order_id: Value matched against ``content_orders.id``.
        status: New value for ``critique_status``.
        error: Optional error text; ignored when ``status`` is
            ``"critiquing"``.
    """
    try:
        conn = mysql.connector.connect(**DB_CONFIG)
        try:
            cursor = conn.cursor()
            try:
                if status == "critiquing":
                    cursor.execute(
                        "UPDATE content_orders SET critique_status = %s, critique_started_at = NOW(), critique_log = NULL, updated_at = NOW() WHERE id = %s",
                        (status, order_id),
                    )
                elif error:
                    cursor.execute(
                        "UPDATE content_orders SET critique_status = %s, critique_error = %s, updated_at = NOW() WHERE id = %s",
                        (status, error, order_id),
                    )
                else:
                    cursor.execute(
                        "UPDATE content_orders SET critique_status = %s, critique_error = NULL, updated_at = NOW() WHERE id = %s",
                        (status, order_id),
                    )
                conn.commit()
            finally:
                # Release the cursor even when the UPDATE raised.
                cursor.close()
        finally:
            # Always return the connection; previously it leaked on errors.
            conn.close()
    except Exception as e:
        print(f"Critique DB update failed: {e}", file=sys.stderr)
def _model_display(model: str) -> str:
    """Return the human-readable model label used in progress log lines."""
    if model.startswith("ollama:"):
        return model.replace("ollama:", "Ollama: ")
    return f"Claude ({model})"


def _load_active_critics(order_id: int):
    """Fetch the active critics for an order, or ``None`` if checks are off.

    Returns ``None`` when the order row is missing or its ``quality_check``
    value is falsy; otherwise the rows (each with a ``name`` key) of the
    active critics, restricted to ``selected_critics`` when that is set.
    """
    from db import db

    db.connect()
    try:
        # Get selected_critics from order
        cursor = db.execute(
            "SELECT selected_critics, quality_check FROM content_orders WHERE id = %s", (order_id,)
        )
        order_row = cursor.fetchone()
        cursor.close()
        if not order_row or not order_row.get("quality_check", False):
            return None

        # selected_critics may arrive as a JSON string or already decoded.
        selected_raw = order_row.get("selected_critics")
        if selected_raw:
            selected_ids = json.loads(selected_raw) if isinstance(selected_raw, str) else selected_raw
        else:
            selected_ids = []

        # Get critics - filter by selected_critics if specified
        if selected_ids:
            # Only "%s" placeholders are interpolated; ids are bound as parameters.
            placeholders = ", ".join(["%s"] * len(selected_ids))
            sql = (
                "SELECT name FROM content_config "
                f"WHERE type = 'critic' AND status = 'active' AND id IN ({placeholders}) "
                "ORDER BY sort_order"
            )
            cursor = db.execute(sql, tuple(selected_ids))
        else:
            sql = (
                "SELECT name FROM content_config "
                "WHERE type = 'critic' AND status = 'active' ORDER BY sort_order"
            )
            cursor = db.execute(sql)
        critics = cursor.fetchall()
        cursor.close()
        return critics
    finally:
        # Disconnect on every path; previously skipped when a query raised.
        db.disconnect()


def _run_generate(order_id: int, options: list[str]) -> dict:
    """Run the ``generate`` command with progress logging; return its result."""
    model = options[0] if options else "anthropic"
    collection = options[1] if len(options) > 1 else "documents"
    limit = int(options[2]) if len(options) > 2 else 5

    # Update status to generating
    update_generation_status(order_id, "generating")

    # Log start
    log_step(order_id, "Starte Content-Generierung...", "init")
    log_step(order_id, f"Modell: {_model_display(model)}")
    log_step(order_id, f"Collection: {collection}, Kontext-Limit: {limit}")

    # Log RAG search
    log_step(order_id, "Suche relevante Dokumente (RAG)...", "rag")

    result = generate_content(order_id=order_id, model=model, collection=collection, context_limit=limit)

    # Log result
    if result.get("error"):
        log_step(order_id, f"Fehler: {result['error']}", "error")
        update_generation_status(order_id, "failed", result["error"])
        return result

    sources = result.get("sources", [])
    log_step(order_id, f"{len(sources)} Quellen gefunden", "sources")
    for src in sources[:3]:
        # Score is multiplied by 100 to display it as a whole-number percentage.
        log_step(order_id, f"  - {src.get('source', 'Unbekannt')} ({int(src.get('score', 0) * 100)}%)")
    log_step(order_id, "Generiere Content mit KI...", "llm")
    log_step(order_id, "Content erfolgreich generiert!", "done")
    log_step(order_id, f"Version {result.get('version_number', '?')} erstellt")
    update_generation_status(order_id, "completed")
    return result


def _run_critique(version_id: int, options: list[str]) -> dict:
    """Run the ``critique`` command; log progress when an order id is given."""
    model = options[0] if options else "anthropic"
    # order_id is the optional argument after the model, used for logging
    order_id = int(options[1]) if len(options) > 1 else None

    if not order_id:
        # Sync mode (backward compatibility)
        return run_critique_round(version_id=version_id, model=model)

    # Async mode with live logging
    update_critique_status(order_id, "critiquing")
    log_critique_step(order_id, "Starte Kritik-Runde...", "init")
    log_critique_step(order_id, f"Modell: {_model_display(model)}")

    # Get critics list (filtered by selected_critics from order)
    log_critique_step(order_id, "Lade Kritiker...", "critics")
    try:
        critics = _load_active_critics(order_id)
        if critics is None:
            # NOTE(review): critique_status stays "critiquing" on this path;
            # confirm the caller keys off critique_step == "skipped".
            log_critique_step(order_id, "Qualitätsprüfung deaktiviert", "skipped")
            return {"success": True, "skipped": True, "message": "Qualitätsprüfung deaktiviert"}

        critic_names = [c["name"] for c in critics]
        if critic_names:
            log_critique_step(order_id, f"Kritiker: {', '.join(critic_names)}")
        log_critique_step(order_id, f"{len(critics)} Kritiker werden Content prüfen")

        # Progress lines are written for every critic up front; the round
        # itself is a single call below.
        for i, critic in enumerate(critics, 1):
            log_critique_step(
                order_id, f"[{i}/{len(critics)}] {critic['name']} analysiert...", f"critic_{i}"
            )

        result = run_critique_round(version_id=version_id, model=model)
        if result.get("error"):
            log_critique_step(order_id, f"Fehler: {result['error']}", "error")
            update_critique_status(order_id, "failed", result["error"])
        else:
            critiques = result.get("critiques", [])
            passed_count = sum(1 for c in critiques if c.get("passed", False))
            total_count = len(critiques)
            log_critique_step(order_id, f"Kritik-Runde {result.get('round', '?')} abgeschlossen", "done")
            log_critique_step(order_id, f"{passed_count} von {total_count} Kritikern zufrieden")
            if result.get("all_passed"):
                log_critique_step(order_id, "Alle Prüfungen bestanden!")
            else:
                log_critique_step(order_id, "Revision empfohlen")
            update_critique_status(order_id, "completed")
        return result
    except Exception as e:
        log_critique_step(order_id, f"Fehler: {e}", "error")
        update_critique_status(order_id, "failed", str(e))
        return {"error": str(e)}


def main():
    """Route CLI commands to content generation functions.

    Reads ``sys.argv`` as ``<command> <id> [options]`` where ``command`` is
    ``generate``, ``critique`` or ``revise``. Always prints exactly one JSON
    document to stdout: the command's result, or ``{"error": ...}`` on bad
    usage, a non-integer id, an unknown command or an unexpected exception.
    """
    if len(sys.argv) < 3:
        print(json.dumps({"error": "Usage: web_generate.py <command> <id> [options]"}))
        return

    command = sys.argv[1]
    try:
        entity_id = int(sys.argv[2])
    except ValueError:
        # Report a bad id as JSON instead of crashing with a traceback.
        print(json.dumps({"error": f"Invalid id: {sys.argv[2]!r}"}))
        return
    options = sys.argv[3:]

    try:
        if command == "generate":
            result = _run_generate(entity_id, options)
        elif command == "critique":
            result = _run_critique(entity_id, options)
        elif command == "revise":
            model = options[0] if options else "anthropic"
            result = revise_content(version_id=entity_id, model=model)
        else:
            result = {"error": f"Unknown command: {command}"}
        print(json.dumps(result, ensure_ascii=False))
    except Exception as e:
        # On exception, mark as failed
        if command == "generate":
            log_step(entity_id, f"Fehler: {e}", "error")
            update_generation_status(entity_id, "failed", str(e))
        print(json.dumps({"error": str(e)}))
# Run the CLI dispatcher only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()