semantic_analyzer.py

Code Hygiene Score: 80

Keine Issues gefunden.

Dependencies 13

Funktionen 2

Code

"""
Semantic Analyzer - Analyze chunks for summary, keywords, sentiment, topics.
"""

import json
import re
import sys
import time

import requests

sys.path.insert(0, "/var/www/scripts/pipeline")

from config import ANTHROPIC_MODEL, OLLAMA_CHAT_MODEL, OLLAMA_HOST
from constants import BATCH_LIMIT, MS_PER_SECOND, OLLAMA_TIMEOUT
from db import db
from protokoll import protokoll


def analyze_chunk_semantics(chunk_id: int, content: str, client=None) -> dict | None:
    """
    Analyze a single chunk for semantics (summary, keywords, sentiment, topics).
    Stores result in chunk_semantics table.
    """
    prompt_template = db.get_prompt("chunk_semantics")

    if not prompt_template:
        prompt_template = """Analysiere diesen Textabschnitt und extrahiere:

1. **summary**: Eine kurze Zusammenfassung (1-2 Sätze)
2. **keywords**: 3-5 wichtige Schlüsselwörter
3. **sentiment**: Stimmung (positive, negative, neutral, mixed)
4. **topics**: 2-3 Hauptthemen

Antworte NUR im JSON-Format:
{"summary": "...", "keywords": ["...", "..."], "sentiment": "neutral", "topics": ["...", "..."]}

Text:
{{TEXT}}"""

    # Support both {text} and {{TEXT}} placeholders
    prompt = prompt_template.replace("{{TEXT}}", content[:2000]).replace("{text}", content[:2000])

    try:
        start_time = time.time()
        tokens_in, tokens_out = 0, 0
        model_name = ""

        if client:
            message = client.messages.create(
                model=ANTHROPIC_MODEL, max_tokens=500, messages=[{"role": "user", "content": prompt}]
            )
            response_text = message.content[0].text
            tokens_in = message.usage.input_tokens
            tokens_out = message.usage.output_tokens
            model_name = ANTHROPIC_MODEL
        else:
            response = requests.post(
                f"{OLLAMA_HOST}/api/generate",
                json={"model": OLLAMA_CHAT_MODEL, "prompt": prompt, "stream": False, "format": "json"},
                timeout=OLLAMA_TIMEOUT,
            )
            response.raise_for_status()
            data = response.json()
            response_text = data.get("response", "{}")
            tokens_in = data.get("prompt_eval_count", 0)
            tokens_out = data.get("eval_count", 0)
            model_name = f"ollama:{OLLAMA_CHAT_MODEL}"

        duration_ms = int((time.time() - start_time) * MS_PER_SECOND)

        protokoll.log_llm_call(
            request=f"[chunk_semantics] chunk_id={chunk_id}",
            response=response_text[:BATCH_LIMIT],
            model_name=model_name,
            tokens_input=tokens_in,
            tokens_output=tokens_out,
            duration_ms=duration_ms,
            status="completed",
        )

        # Extract first valid JSON object (handle multiple JSON blocks from Ollama)
        json_match = re.search(r"\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}", response_text)
        if json_match:
            try:
                result = json.loads(json_match.group())
            except json.JSONDecodeError:
                # Fallback: try to find simpler JSON structure
                simple_match = re.search(r'\{"summary":\s*"[^"]*"[^}]*\}', response_text)
                if simple_match:
                    result = json.loads(simple_match.group())
                else:
                    return None

            cursor = db.execute(
                """INSERT INTO chunk_semantics
                   (chunk_id, summary, keywords, sentiment, topics, language, analyzed_at, analysis_model)
                   VALUES (%s, %s, %s, %s, %s, 'de', NOW(), %s)
                   ON DUPLICATE KEY UPDATE
                   summary = VALUES(summary), keywords = VALUES(keywords),
                   sentiment = VALUES(sentiment), topics = VALUES(topics),
                   analyzed_at = NOW(), analysis_model = VALUES(analysis_model)""",
                (
                    chunk_id,
                    result.get("summary", ""),
                    json.dumps(result.get("keywords", []), ensure_ascii=False),
                    result.get("sentiment", "neutral"),
                    json.dumps(result.get("topics", []), ensure_ascii=False),
                    model_name,
                ),
            )
            db.commit()
            cursor.close()
            return result

        return None

    except Exception as e:
        db.log("ERROR", f"Chunk semantics analysis failed: {e}")
        return None


def analyze_chunks_semantics(document_id: int, client=None, progress=None) -> int:
    """Analyze all chunks of a document for semantics."""
    cursor = db.execute("SELECT id, content FROM chunks WHERE document_id = %s ORDER BY chunk_index", (document_id,))
    chunks = cursor.fetchall()
    cursor.close()

    total = len(chunks)
    db.log("INFO", f"Analyzing semantics for {total} chunks")
    if progress:
        progress.add_log(f"Semantik: Analysiere {total} Chunks...")

    analyzed = 0
    for i, chunk in enumerate(chunks, 1):
        if progress:
            progress.add_log(f"Semantik: Chunk {i}/{total}...")

        result = analyze_chunk_semantics(chunk["id"], chunk["content"], client)
        if result:
            analyzed += 1

    db.log("INFO", f"Analyzed {analyzed}/{total} chunks for semantics")
    if progress:
        progress.add_log(f"Semantik: {analyzed}/{total} Chunks analysiert")
    return analyzed
← Übersicht