content_generator.py
- Path: /var/www/scripts/pipeline/generators/content_generator.py
- Namespace: pipeline
- Lines: 401 | Size: 13,025 bytes
- Modified: 2025-12-27 16:07:23 | Scanned: 2025-12-31 10:22:15
Code Hygiene Score: 61
- Dependencies: 0 (25%)
- LOC: 33 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 90 (10%)
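Taking the parenthesized percentages as category weights, the overall score is the weighted sum of the sub-scores: 0 × 0.25 + 33 × 0.20 + 100 × 0.20 + 100 × 0.15 + 100 × 0.10 + 90 × 0.10 = 60.6 ≈ 61. The failed Dependencies check (0 at 25% weight) accounts for most of the lost points.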
Issues 2

| Line | Type | Description |
|------|------|-------------|
| 44 | magic_number | Magic number found: 100 |
| - | coupling | Class has 21 dependencies (max: 15) |
Dependencies 21
- use json
- use sys
- use time
- use config.ANTHROPIC_API_KEY
- use config.ANTHROPIC_MODEL
- use config.OLLAMA_CHAT_MODEL
- use config.OLLAMA_HOST
- use constants.MS_PER_SECOND
- use db.db
- use config_loader.get_order
- use config_loader.get_prompt
- use config_loader.parse_author_profile
- use config_loader.parse_structure
- use persistence.save_sources
- use persistence.save_version
- use persistence.update_order_status
- use rag_context.get_rag_context
- use rag_context.get_semantic_context
- use rag_context.get_taxonomy_context
- use anthropic
- use requests
Functions 3
- build_generation_prompt() | Line 20
- call_llm() | Line 198
- generate_content() | Line 291
Code
"""
Content Generator - Core content generation with LLM calls.
"""
import json
import sys
import time
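# Make the top-level pipeline modules (config, constants, db) importable
# when this file is executed outside the package context.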
sys.path.insert(0, "/var/www/scripts/pipeline")
from config import ANTHROPIC_API_KEY, ANTHROPIC_MODEL, OLLAMA_CHAT_MODEL, OLLAMA_HOST
from constants import MS_PER_SECOND
from db import db
from .config_loader import get_order, get_prompt, parse_author_profile, parse_structure
from .persistence import save_sources, save_version, update_order_status
from .rag_context import get_rag_context, get_semantic_context, get_taxonomy_context
def build_generation_prompt(
    briefing: str,
    context: list[dict],
    profile: dict | None,
    contract: dict | None,
    structure: dict | None = None,
    semantic: dict | None = None,
    taxonomy: list | None = None,
) -> str:
    """Build the content generation prompt."""
    # Format context
    context_text = ""
    for i, ctx in enumerate(context, 1):
        context_text += f"\n[Quelle {i}: {ctx['source']}]\n{ctx['content']}\n"

    # Build semantic context (entities and relations)
    semantic_text = ""
    if semantic:
        if semantic.get("entities"):
            semantic_text += "\n## Relevante Konzepte\n"
            for e in semantic["entities"][:10]:
                desc = e.get("description") or ""
                if desc:
                    semantic_text += f"- **{e['name']}** ({e['type']}): {desc[:100]}\n"
                else:
                    semantic_text += f"- **{e['name']}** ({e['type']})\n"
        if semantic.get("relations"):
            semantic_text += "\n## Konzept-Beziehungen\n"
            for r in semantic["relations"][:10]:
                semantic_text += f"- {r['source']} → {r['relation_type']} → {r['target']}\n"

    # Build taxonomy context
    taxonomy_text = ""
    if taxonomy:
        taxonomy_text = "\n## Thematische Einordnung\n"
        taxonomy_text += ", ".join([t["name"] for t in taxonomy])

    # Build profile instructions - detect new vs old format
    profile_text = ""
    if profile:
        config = profile.get("config", {})
        # Detect new format (has "haltung" or "tonalitaet" at top level)
        if "haltung" in config or "tonalitaet" in config or "grammatik_und_satzbau" in config:
            # New Cary-style profile
            profile_text = f"""
## Autorenprofil: {profile.get("name", "Standard")}
{parse_author_profile(config)}
"""
        else:
            # Old format - keep backwards compatibility
            autorenprofil = config.get("autorenprofil", config)
            stimme = autorenprofil.get("stimme", {})
            stimme_text = ""
            if stimme:
                stimme_text = f"""
### Stimme/Tonalität:
- Ton: {stimme.get("ton", "neutral")}
- Perspektive: {stimme.get("perspektive", "neutral")}
- Komplexität: {stimme.get("komplexitaet", "mittel")}"""
            stil = autorenprofil.get("stil", {})
            stil_text = ""
            if stil:
                stil_text = f"""
### Stil:
- Fachsprache: {"Ja" if stil.get("fachsprache", False) else "Nein"}
- Satzlänge: {stil.get("satzlaenge", "mittel")}"""
            tabus = autorenprofil.get("tabus", [])
            tabus_text = ""
            if tabus:
                tabus_text = f"""
### Zu vermeiden:
{", ".join(tabus[:5])}"""
            profile_text = f"""
## Autorenprofil: {profile.get("name", "Standard")}
{stimme_text}
{stil_text}
{tabus_text}
"""

    # Build contract requirements
    contract_text = ""
    if contract:
        config = contract.get("config", {})
        req = config.get("requirements", {})
        contract_text = f"""
Contract: {contract.get("name", "Standard")}
- Wortanzahl: {req.get("min_words", 500)} - {req.get("max_words", 5000)} Wörter
"""

    # Build structure instructions - detect new vs old format
    structure_text = ""
    output_format = "markdown"
    erlaubte_tags = []
    if structure:
        config = structure.get("config", {})
        # Detect new format (has "ausgabe" at top level)
        if "ausgabe" in config or "gesamtaufbau" in config:
            # New Blog-Struktur format
            parsed_text, output_format, erlaubte_tags = parse_structure(config)
            structure_text = f"""
## Struktur: {structure.get("name", "")}
{parsed_text}
"""
        else:
            # Old format
            structure_text = f"""
Struktur-Template: {structure.get("name", "")}
- Abschnitte: {json.dumps(config.get("sections", []), ensure_ascii=False)}
"""

    # Build format instruction based on structure's ausgabe
    format_instruction = ""
    if output_format == "body-html":
        tags_str = ", ".join(erlaubte_tags) if erlaubte_tags else "h1, h2, h3, h4, p, a, ol, ul, li, strong, table, hr"
        format_instruction = f"""7. **KRITISCH - Ausgabe als sauberes HTML:**
- NUR diese Tags: {tags_str}
- KEIN Markdown (keine ##, keine **, keine -)
- KEIN div, span, br, img, script, style
- Jeder Absatz in <p>-Tags
- Überschriften als <h2>, <h3>, <h4>
- Listen als <ul>/<ol> mit <li>"""

    # Load generate prompt template from database
    prompt_template = get_prompt("content-generate")
    if prompt_template:
        prompt = prompt_template.format(
            profile_text=profile_text,
            contract_text=contract_text,
            structure_text=structure_text,
            context=context_text,
            briefing=briefing,
            format_instruction=format_instruction,
            semantic_text=semantic_text,
            taxonomy_text=taxonomy_text,
        )
    else:
        # Fallback if prompt not in DB
        prompt = f"""Du bist ein professioneller Content-Autor. Erstelle basierend auf dem Briefing und dem bereitgestellten Kontext einen hochwertigen Text.
{profile_text}
{contract_text}
{structure_text}
{semantic_text}
{taxonomy_text}
## Kontext aus der Wissensbasis:
{context_text}
## Briefing:
{briefing}
## Anweisungen:
1. Nutze die Informationen aus dem Kontext als Grundlage
2. Halte dich an das Autorenprofil und den Schreibstil
3. Beachte die Vorgaben aus dem Contract
4. Strukturiere den Text gemäß dem Template (falls angegeben)
5. Schreibe auf Deutsch
6. Kennzeichne verwendete Quellen
7. Berücksichtige die relevanten Konzepte und deren Beziehungen
{format_instruction}
Erstelle nun den Content:"""

    return prompt
def call_llm(prompt: str, model: str = "anthropic", client_name: str = "content-studio") -> str:
    """
    Call LLM to generate content with protokoll logging.

    Args:
        prompt: The prompt to send
        model: 'anthropic' or 'ollama'
        client_name: Identifier for protokoll logging

    Returns:
        Generated text content
    """
    start_time = time.time()
    response_text = ""
    tokens_input = 0
    tokens_output = 0
    model_name = ""
    error_message = None
    status = "completed"

    try:
        if model == "anthropic" and ANTHROPIC_API_KEY:
            import anthropic

            client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)
            model_name = ANTHROPIC_MODEL
            message = client.messages.create(
                model=ANTHROPIC_MODEL, max_tokens=4000, messages=[{"role": "user", "content": prompt}]
            )
            response_text = message.content[0].text
            # Extract token usage from Anthropic response
            if hasattr(message, "usage"):
                tokens_input = getattr(message.usage, "input_tokens", 0)
                tokens_output = getattr(message.usage, "output_tokens", 0)
        else:
            # Fallback to Ollama
            import requests

            model_name = OLLAMA_CHAT_MODEL
            response = requests.post(
                f"{OLLAMA_HOST}/api/generate",
                json={"model": OLLAMA_CHAT_MODEL, "prompt": prompt, "stream": False},
                timeout=900,  # 15 min for large models
            )
            response.raise_for_status()
            result = response.json()
            response_text = result.get("response", "")
            # Extract token counts from Ollama response
            tokens_input = result.get("prompt_eval_count", 0)
            tokens_output = result.get("eval_count", 0)
            # Clean up model artifacts (Gemma, Llama, etc.)
            artifacts = [
                "<start_of_turn>",
                "</start_of_turn>",
                "<end_of_turn>",
                "</end_of_turn>",
                "</s>",
                "<|eot_id|>",
                "<|im_end|>",
            ]
            for artifact in artifacts:
                response_text = response_text.replace(artifact, "").strip()
    except Exception as e:
        status = "error"
        error_message = str(e)
        raise
    finally:
        # Calculate duration
        duration_ms = int((time.time() - start_time) * MS_PER_SECOND)
        # Log to protokoll
        db.log_to_protokoll(
            client_name=client_name,
            request=prompt,
            response=response_text if status == "completed" else None,
            model_name=model_name,
            tokens_input=tokens_input,
            tokens_output=tokens_output,
            duration_ms=duration_ms,
            status=status,
            error_message=error_message,
        )

    return response_text
def generate_content(
    order_id: int, model: str = "anthropic", collection: str = "documents", context_limit: int = 5
) -> dict:
    """
    Main content generation function.

    Args:
        order_id: Content order ID
        model: 'anthropic' or 'ollama'
        collection: Qdrant collection to search
        context_limit: Number of context chunks

    Returns:
        dict with version_id, content, sources
    """
    db.connect()
    try:
        # Load order
        order = get_order(order_id)
        if not order:
            return {"error": f"Order {order_id} not found"}

        # Update status
        update_order_status(order_id, "generating")

        # Get RAG context
        context = get_rag_context(order["briefing"], collection, context_limit)

        # Extract chunk_ids and document_ids for semantic context
        chunk_ids = [c.get("chunk_id") for c in context if c.get("chunk_id")]
        doc_ids = list({c.get("document_id") for c in context if c.get("document_id")})

        # Load semantic context (entities and relations)
        semantic = get_semantic_context(chunk_ids) if chunk_ids else None

        # Load taxonomy context
        taxonomy = get_taxonomy_context(doc_ids) if doc_ids else None

        # Build profile/contract/structure
        profile = None
        if order.get("profile_config"):
            config = (
                json.loads(order["profile_config"])
                if isinstance(order["profile_config"], str)
                else order["profile_config"]
            )
            profile = {"name": order["profile_name"], "config": config}

        contract = None
        if order.get("contract_config"):
            config = (
                json.loads(order["contract_config"])
                if isinstance(order["contract_config"], str)
                else order["contract_config"]
            )
            contract = {"name": order["contract_name"], "config": config}

        structure = None
        output_format = "markdown"  # Default
        if order.get("structure_config"):
            config = (
                json.loads(order["structure_config"])
                if isinstance(order["structure_config"], str)
                else order["structure_config"]
            )
            structure = {"name": order["structure_name"], "config": config}
            # Determine output format from structure
            ausgabe = config.get("ausgabe", {})
            output_format = ausgabe.get("format", "markdown")

        # Build prompt
        prompt = build_generation_prompt(
            order["briefing"], context, profile, contract, structure, semantic=semantic, taxonomy=taxonomy
        )

        # Generate content
        content = call_llm(prompt, model, client_name="content-studio-generate")

        # Get current version number
        cursor = db.execute(
            "SELECT MAX(version_number) as max_v FROM content_versions WHERE order_id = %s", (order_id,)
        )
        result = cursor.fetchone()
        cursor.close()
        version_number = (result["max_v"] or 0) + 1

        # Save version with correct format
        version_id = save_version(order_id, content, version_number, output_format)

        # Save sources
        save_sources(order_id, context)

        # Update status
        update_order_status(order_id, "critique")

        return {
            "success": True,
            "order_id": order_id,
            "version_id": version_id,
            "version_number": version_number,
            "content": content,
            "sources": [{"source": c["source"], "score": c["score"]} for c in context],
        }
    except Exception as e:
        update_order_status(order_id, "draft")
        return {"error": str(e)}
    finally:
        db.disconnect()
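Usage
A minimal invocation sketch, not part of the scanned file: it assumes the generators package resolves via the sys.path entry the module inserts itself, and the order ID is hypothetical.

from generators.content_generator import generate_content

# Generate a draft for a (hypothetical) order 42 via the Ollama fallback;
# pass model="anthropic" when ANTHROPIC_API_KEY is configured.
result = generate_content(order_id=42, model="ollama", collection="documents", context_limit=5)

if result.get("success"):
    print(f"Version {result['version_number']} saved as id {result['version_id']}")
    for src in result["sources"]:
        print(f"  {src['source']}: {src['score']}")
else:
    print(f"Generation failed: {result.get('error')}")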