content_generator.py

Code Hygiene Score: 61

Issues 2

Zeile Typ Beschreibung
44 magic_number Magic Number gefunden: 100
- coupling Klasse hat 21 Dependencies (max: 15)

Dependencies 21

Funktionen 3

Code

"""
Content Generator - Core content generation with LLM calls.
"""

import json
import sys
import time

sys.path.insert(0, "/var/www/scripts/pipeline")

from config import ANTHROPIC_API_KEY, ANTHROPIC_MODEL, OLLAMA_CHAT_MODEL, OLLAMA_HOST
from constants import MS_PER_SECOND
from db import db

from .config_loader import get_order, get_prompt, parse_author_profile, parse_structure
from .persistence import save_sources, save_version, update_order_status
from .rag_context import get_rag_context, get_semantic_context, get_taxonomy_context


# Limits and markers used while rendering the optional prompt sections.
_MAX_SEMANTIC_ITEMS = 10  # entities / relations listed per semantic section
_MAX_DESCRIPTION_CHARS = 100  # entity descriptions are cut to this many characters
_MAX_TABUS = 5  # legacy profiles: number of "to avoid" entries rendered
_NEW_PROFILE_KEYS = ("haltung", "tonalitaet", "grammatik_und_satzbau")
_NEW_STRUCTURE_KEYS = ("ausgabe", "gesamtaufbau")
_DEFAULT_HTML_TAGS = "h1, h2, h3, h4, p, a, ol, ul, li, strong, table, hr"


def _format_context(context: list[dict]) -> str:
    """Render the retrieved chunks as numbered source blocks (numbering starts at 1)."""
    return "".join(
        f"\n[Quelle {index}: {chunk['source']}]\n{chunk['content']}\n"
        for index, chunk in enumerate(context, 1)
    )


def _format_semantic(semantic: dict | None) -> str:
    """Render the entity and relation lists of the semantic context, if any."""
    if not semantic:
        return ""

    parts: list[str] = []

    entities = semantic.get("entities")
    if entities:
        parts.append("\n## Relevante Konzepte\n")
        for entity in entities[:_MAX_SEMANTIC_ITEMS]:
            # A missing or None description is treated like an empty one.
            description = entity.get("description") or ""
            line = f"- **{entity['name']}** ({entity['type']})"
            if description:
                line += f": {description[:_MAX_DESCRIPTION_CHARS]}"
            parts.append(line + "\n")

    relations = semantic.get("relations")
    if relations:
        parts.append("\n## Konzept-Beziehungen\n")
        parts.extend(
            f"- {relation['source']} → {relation['relation_type']} → {relation['target']}\n"
            for relation in relations[:_MAX_SEMANTIC_ITEMS]
        )

    return "".join(parts)


def _format_taxonomy(taxonomy: list | None) -> str:
    """Render the taxonomy term names as one comma-separated line under a heading."""
    if not taxonomy:
        return ""
    return "\n## Thematische Einordnung\n" + ", ".join(term["name"] for term in taxonomy)


def _format_profile(profile: dict | None) -> str:
    """Render the author-profile section for either the new or the legacy config layout."""
    if not profile:
        return ""

    name = profile.get("name", "Standard")
    config = profile.get("config", {})

    # New layout: recognised by any of the marker keys at the top level of the
    # config; its rendering is delegated to parse_author_profile().
    if any(key in config for key in _NEW_PROFILE_KEYS):
        return f"""
## Autorenprofil: {name}

{parse_author_profile(config)}
"""

    # Legacy layout, kept for backwards compatibility. The settings may be
    # nested under "autorenprofil" or sit directly in the config.
    legacy = config.get("autorenprofil", config)

    voice = legacy.get("stimme", {})
    voice_text = ""
    if voice:
        voice_text = f"""
### Stimme/Tonalität:
- Ton: {voice.get("ton", "neutral")}
- Perspektive: {voice.get("perspektive", "neutral")}
- Komplexität: {voice.get("komplexitaet", "mittel")}"""

    style = legacy.get("stil", {})
    style_text = ""
    if style:
        style_text = f"""
### Stil:
- Fachsprache: {"Ja" if style.get("fachsprache", False) else "Nein"}
- Satzlänge: {style.get("satzlaenge", "mittel")}"""

    taboos = legacy.get("tabus", [])
    taboo_text = ""
    if taboos:
        taboo_text = f"""
### Zu vermeiden:
{", ".join(taboos[:_MAX_TABUS])}"""

    return f"""
## Autorenprofil: {name}
{voice_text}
{style_text}
{taboo_text}
"""


def _format_contract(contract: dict | None) -> str:
    """Render the contract name and its word-count range (defaults: 500 - 5000)."""
    if not contract:
        return ""

    requirements = contract.get("config", {}).get("requirements", {})
    return f"""
Contract: {contract.get("name", "Standard")}
- Wortanzahl: {requirements.get("min_words", 500)} - {requirements.get("max_words", 5000)} Wörter
"""


def _format_structure(structure: dict | None) -> tuple[str, str, list]:
    """Render the structure section; return (text, output_format, allowed_tags)."""
    if not structure:
        return "", "markdown", []

    name = structure.get("name", "")
    config = structure.get("config", {})

    # New layout: parse_structure() supplies the text plus output format and tag list.
    if any(key in config for key in _NEW_STRUCTURE_KEYS):
        parsed_text, output_format, allowed_tags = parse_structure(config)
        text = f"""
## Struktur: {name}

{parsed_text}
"""
        return text, output_format, allowed_tags

    # Legacy layout: only a list of sections, always rendered as markdown.
    sections = json.dumps(config.get("sections", []), ensure_ascii=False)
    text = f"""
Struktur-Template: {name}
- Abschnitte: {sections}
"""
    return text, "markdown", []


def _format_output_instruction(output_format: str, allowed_tags: list) -> str:
    """Return the extra HTML-only instruction for "body-html" output, else an empty string."""
    if output_format != "body-html":
        return ""

    tags_str = ", ".join(allowed_tags) if allowed_tags else _DEFAULT_HTML_TAGS
    return f"""7. **KRITISCH - Ausgabe als sauberes HTML:**
   - NUR diese Tags: {tags_str}
   - KEIN Markdown (keine ##, keine **, keine -)
   - KEIN div, span, br, img, script, style
   - Jeder Absatz in <p>-Tags
   - Überschriften als <h2>, <h3>, <h4>
   - Listen als <ul>/<ol> mit <li>"""


def build_generation_prompt(
    briefing: str,
    context: list[dict],
    profile: dict | None,
    contract: dict | None,
    structure: dict | None = None,
    semantic: dict | None = None,
    taxonomy: list | None = None,
) -> str:
    """Build the content generation prompt.

    Each optional input is rendered into its own text section; the sections
    are then inserted into the "content-generate" template returned by
    get_prompt(), or into a built-in fallback prompt when no template is found.

    Args:
        briefing: The task description for the text to be written.
        context: Retrieved chunks; each needs the keys "source" and "content".
        profile: Author profile with "name" and "config", or None.
        contract: Contract with "name" and "config" (word limits are read
            from config["requirements"]), or None.
        structure: Structure template with "name" and "config", or None.
        semantic: Dict with optional "entities" and "relations" lists, or None.
        taxonomy: List of dicts with a "name" key, or None.

    Returns:
        The complete prompt string.

    Raises:
        KeyError: If a context chunk, entity, relation or taxonomy term lacks
            a required key. str.format() on the stored template can also
            raise (KeyError / IndexError / ValueError) when the template
            contains placeholders or braces other than the supplied fields.
    """
    context_text = _format_context(context)
    semantic_text = _format_semantic(semantic)
    taxonomy_text = _format_taxonomy(taxonomy)
    profile_text = _format_profile(profile)
    contract_text = _format_contract(contract)
    structure_text, output_format, erlaubte_tags = _format_structure(structure)
    format_instruction = _format_output_instruction(output_format, erlaubte_tags)

    # Load generate prompt template from database
    prompt_template = get_prompt("content-generate")

    if prompt_template:
        return prompt_template.format(
            profile_text=profile_text,
            contract_text=contract_text,
            structure_text=structure_text,
            context=context_text,
            briefing=briefing,
            format_instruction=format_instruction,
            semantic_text=semantic_text,
            taxonomy_text=taxonomy_text,
        )

    # Fallback if prompt not in DB.
    # NOTE(review): the HTML format instruction is itself numbered "7.", so in
    # this fallback two list items carry the number 7 — confirm whether the
    # numbering should be adjusted here or in the stored template.
    return f"""Du bist ein professioneller Content-Autor. Erstelle basierend auf dem Briefing und dem bereitgestellten Kontext einen hochwertigen Text.

{profile_text}
{contract_text}
{structure_text}
{semantic_text}
{taxonomy_text}

## Kontext aus der Wissensbasis:
{context_text}

## Briefing:
{briefing}

## Anweisungen:
1. Nutze die Informationen aus dem Kontext als Grundlage
2. Halte dich an das Autorenprofil und den Schreibstil
3. Beachte die Vorgaben aus dem Contract
4. Strukturiere den Text gemäß dem Template (falls angegeben)
5. Schreibe auf Deutsch
6. Kennzeichne verwendete Quellen
7. Berücksichtige die relevanten Konzepte und deren Beziehungen
{format_instruction}

Erstelle nun den Content:"""


def call_llm(prompt: str, model: str = "anthropic", client_name: str = "content-studio") -> str:
    """
    Call LLM to generate content with protokoll logging.

    The Anthropic API is used when ``model`` is "anthropic" and an API key is
    configured; in every other case the request goes to the Ollama host.
    Every call, successful or not, is written to the protokoll with the
    prompt, model name, token counts, duration and status.

    Args:
        prompt: The prompt to send
        model: 'anthropic' or 'ollama'
        client_name: Identifier for protokoll logging

    Returns:
        Generated text content with known chat-template markers removed and
        surrounding whitespace stripped

    Raises:
        Exception: Any error raised while calling the backend is logged with
            status "error" and then re-raised unchanged.
    """
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # (or turn negative) by system clock adjustments the way time.time() can.
    start_time = time.perf_counter()
    response_text = ""
    tokens_input = 0
    tokens_output = 0
    model_name = ""
    error_message = None
    status = "completed"

    try:
        if model == "anthropic" and ANTHROPIC_API_KEY:
            # Imported lazily so the package is only needed when this backend is used.
            import anthropic

            client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)
            model_name = ANTHROPIC_MODEL

            # NOTE(review): output is capped at 4000 tokens — confirm this is
            # enough for the longest texts a contract may request.
            message = client.messages.create(
                model=ANTHROPIC_MODEL, max_tokens=4000, messages=[{"role": "user", "content": prompt}]
            )
            response_text = message.content[0].text

            # Extract token usage from Anthropic response
            if hasattr(message, "usage"):
                tokens_input = getattr(message.usage, "input_tokens", 0)
                tokens_output = getattr(message.usage, "output_tokens", 0)
        else:
            # Fallback to Ollama (also taken when no Anthropic API key is configured)
            import requests

            model_name = OLLAMA_CHAT_MODEL

            response = requests.post(
                f"{OLLAMA_HOST}/api/generate",
                json={"model": OLLAMA_CHAT_MODEL, "prompt": prompt, "stream": False},
                timeout=900,  # 15 min for large models
            )
            response.raise_for_status()
            result = response.json()
            response_text = result.get("response", "")

            # Extract token counts from Ollama response
            tokens_input = result.get("prompt_eval_count", 0)
            tokens_output = result.get("eval_count", 0)

        # Clean up model artifacts (Gemma, Llama, etc.)
        artifacts = (
            "<start_of_turn>",
            "</start_of_turn>",
            "<end_of_turn>",
            "</end_of_turn>",
            "</s>",
            "<|eot_id|>",
            "<|im_end|>",
        )
        for artifact in artifacts:
            response_text = response_text.replace(artifact, "")
        # Stripping once after all removals gives the same result as stripping
        # after each one, because strip() only touches the ends of the string.
        response_text = response_text.strip()

    except Exception as e:
        status = "error"
        error_message = str(e)
        raise

    finally:
        # Calculate duration
        duration_ms = int((time.perf_counter() - start_time) * MS_PER_SECOND)

        # Log to protokoll; the response is only stored for successful calls.
        db.log_to_protokoll(
            client_name=client_name,
            request=prompt,
            response=response_text if status == "completed" else None,
            model_name=model_name,
            tokens_input=tokens_input,
            tokens_output=tokens_output,
            duration_ms=duration_ms,
            status=status,
            error_message=error_message,
        )

    return response_text


def _decode_config(raw):
    """Return *raw* parsed from JSON when it is a string, otherwise unchanged."""
    return json.loads(raw) if isinstance(raw, str) else raw


def generate_content(
    order_id: int, model: str = "anthropic", collection: str = "documents", context_limit: int = 5
) -> dict:
    """
    Main content generation function.

    Loads the order, gathers RAG / semantic / taxonomy context, builds the
    prompt, calls the LLM and stores the result as the next version of the
    order. The order status is set to "generating" while running, to
    "critique" on success and back to "draft" on failure.

    Args:
        order_id: Content order ID
        model: 'anthropic' or 'ollama'
        collection: Qdrant collection to search
        context_limit: Number of context chunks

    Returns:
        On success a dict with the keys success, order_id, version_id,
        version_number, content and sources. On failure (including an
        unknown order) a dict with the single key "error".
    """
    db.connect()

    try:
        # Load order
        order = get_order(order_id)
        if not order:
            return {"error": f"Order {order_id} not found"}

        # Update status
        update_order_status(order_id, "generating")

        # Get RAG context
        context = get_rag_context(order["briefing"], collection, context_limit)

        # Extract chunk_ids and document_ids for semantic context
        # (document ids are de-duplicated via a set)
        chunk_ids = [c.get("chunk_id") for c in context if c.get("chunk_id")]
        doc_ids = list({c.get("document_id") for c in context if c.get("document_id")})

        # Load semantic context (entities and relations)
        semantic = get_semantic_context(chunk_ids) if chunk_ids else None

        # Load taxonomy context
        taxonomy = get_taxonomy_context(doc_ids) if doc_ids else None

        # Build profile/contract/structure; each config may arrive either as
        # a JSON string or as an already decoded object.
        profile = None
        if order.get("profile_config"):
            profile_config = _decode_config(order["profile_config"])
            profile = {"name": order["profile_name"], "config": profile_config}

        contract = None
        if order.get("contract_config"):
            contract_config = _decode_config(order["contract_config"])
            contract = {"name": order["contract_name"], "config": contract_config}

        structure = None
        output_format = "markdown"  # Default
        if order.get("structure_config"):
            structure_config = _decode_config(order["structure_config"])
            structure = {"name": order["structure_name"], "config": structure_config}
            # Determine output format from structure
            ausgabe = structure_config.get("ausgabe", {})
            output_format = ausgabe.get("format", "markdown")

        # Build prompt
        prompt = build_generation_prompt(
            order["briefing"], context, profile, contract, structure, semantic=semantic, taxonomy=taxonomy
        )

        # Generate content
        content = call_llm(prompt, model, client_name="content-studio-generate")

        # Get current version number
        cursor = db.execute(
            "SELECT MAX(version_number) as max_v FROM content_versions WHERE order_id = %s", (order_id,)
        )
        try:
            result = cursor.fetchone()
        finally:
            # Close the cursor even when fetching fails.
            cursor.close()
        # MAX() yields NULL for an order without versions, so numbering starts at 1.
        version_number = (result["max_v"] or 0) + 1

        # Save version with correct format
        version_id = save_version(order_id, content, version_number, output_format)

        # Save sources
        save_sources(order_id, context)

        # Update status
        update_order_status(order_id, "critique")

        return {
            "success": True,
            "order_id": order_id,
            "version_id": version_id,
            "version_number": version_number,
            "content": content,
            "sources": [{"source": c["source"], "score": c["score"]} for c in context],
        }

    except Exception as e:
        error = str(e)
        # Resetting the status can fail too (e.g. the database is the reason
        # we got here). Guard it so the original error is still reported to
        # the caller instead of being replaced by a secondary exception.
        try:
            update_order_status(order_id, "draft")
        except Exception as reset_error:
            error = f"{error} (status reset to 'draft' failed: {reset_error})"
        return {"error": error}
    finally:
        db.disconnect()
← Übersicht