statement_analyzer.py

Code Hygiene Score: 79

Issues 1

Zeile Typ Beschreibung
90 magic_number Magic Number gefunden: 1000

Dependencies 11

Funktionen 4

Code

"""
Statement Analyzer - Extract SPO-triplets (Subject-Predicate-Object statements).
"""

import json
import re
import sys
import time

import requests

sys.path.insert(0, "/var/www/scripts/pipeline")

from config import ANTHROPIC_MODEL, OLLAMA_CHAT_MODEL, OLLAMA_HOST
from db import db
from protokoll import protokoll

from .entity_extractor import find_entity_by_name


def extract_statements(chunk_id: int, text: str, client=None) -> dict:
    """
    Extract SPO-triplets (Subject-Predicate-Object statements) from text.

    Args:
        chunk_id: ID of the chunk being analyzed
        text: Text content to extract statements from
        client: Optional Anthropic client (falls back to Ollama if None)

    Returns:
        Dict with statements list, prompt_id, prompt_version, model_used
    """
    prompt_data = db.get_prompt_by_use_case("statement_extraction")
    prompt_template = prompt_data["content"] if prompt_data else None
    prompt_id = prompt_data["id"] if prompt_data else None
    prompt_version = prompt_data["version"] if prompt_data else None

    if not prompt_template:
        db.log("WARNING", "statement_extraction prompt not found in DB, using fallback")
        prompt_template = """Extrahiere alle faktischen Aussagen aus dem Text als SPO-Tripel.

Regeln:
- Subject: Eine benannte Entität (Person, Organisation, Konzept, Methode)
- Predicate: Die Beziehung oder Eigenschaft (z.B. "entwickelte", "basiert auf", "ist Teil von")
- Object: Eine Entität oder ein Literal-Wert

Antworte NUR im JSON-Format:
{"statements": [
  {"subject": "Name der Subject-Entität", "predicate": "Beziehung", "object": "Name oder Wert", "confidence": 0.0-1.0}
]}

Text:
{{TEXT}}"""

    prompt = prompt_template.replace("{{TEXT}}", text[:3000])

    try:
        start_time = time.time()
        tokens_in, tokens_out = 0, 0
        model_name = ""

        if client:
            message = client.messages.create(
                model=ANTHROPIC_MODEL,
                max_tokens=1500,
                messages=[{"role": "user", "content": prompt}],
            )
            response_text = message.content[0].text
            tokens_in = message.usage.input_tokens
            tokens_out = message.usage.output_tokens
            model_name = ANTHROPIC_MODEL
        else:
            response = requests.post(
                f"{OLLAMA_HOST}/api/generate",
                json={
                    "model": OLLAMA_CHAT_MODEL,
                    "prompt": prompt,
                    "stream": False,
                    "format": "json",
                },
                timeout=120,
            )
            response.raise_for_status()
            data = response.json()
            response_text = data.get("response", "{}")
            tokens_in = data.get("prompt_eval_count", 0)
            tokens_out = data.get("eval_count", 0)
            model_name = f"ollama:{OLLAMA_CHAT_MODEL}"

        duration_ms = int((time.time() - start_time) * 1000)

        protokoll.log_llm_call(
            request=f"[statement_extraction] chunk_id={chunk_id}",
            response=response_text[:2000],
            model_name=model_name,
            tokens_input=tokens_in,
            tokens_output=tokens_out,
            duration_ms=duration_ms,
            status="completed",
        )

        json_match = re.search(r"\{[\s\S]*\}", response_text)
        if json_match:
            result = json.loads(json_match.group())
            return {
                "statements": result.get("statements", []),
                "prompt_id": prompt_id,
                "prompt_version": prompt_version,
                "model_used": model_name,
            }
        return {"statements": [], "prompt_id": prompt_id, "prompt_version": prompt_version}

    except Exception as e:
        db.log("ERROR", f"Statement extraction failed for chunk {chunk_id}: {e}")
        protokoll.log_llm_call(
            request=f"[statement_extraction] chunk_id={chunk_id}",
            model_name=ANTHROPIC_MODEL if client else f"ollama:{OLLAMA_CHAT_MODEL}",
            status="error",
            error_message=str(e),
        )
        return {"statements": [], "prompt_id": prompt_id, "prompt_version": prompt_version}


def store_statements(
    chunk_id: int,
    statements: list[dict],
    prompt_version: str = None,
    model_used: str = None,
) -> int:
    """
    Store extracted statements in the database with entity linking.

    Args:
        chunk_id: ID of the source chunk
        statements: List of statement dicts with subject, predicate, object, confidence
        prompt_version: Version of the prompt used for extraction
        model_used: Model used for extraction

    Returns:
        Number of successfully stored statements
    """
    stored = 0

    for stmt in statements:
        try:
            subject_name = stmt.get("subject", "").strip()
            predicate = stmt.get("predicate", "").strip()
            object_value = stmt.get("object", "").strip()
            confidence = float(stmt.get("confidence", 0.8))

            if not subject_name or not predicate:
                continue

            subject_entity = find_entity_by_name(subject_name)
            if not subject_entity:
                db.log("DEBUG", f"Subject entity not found: {subject_name}")
                continue

            subject_entity_id = subject_entity["id"]

            object_entity_id = None
            object_literal = None

            if object_value:
                object_entity = find_entity_by_name(object_value)
                if object_entity:
                    object_entity_id = object_entity["id"]
                else:
                    object_literal = object_value

            cursor = db.execute(
                """INSERT INTO statements
                   (subject_entity_id, predicate, object_entity_id, object_literal,
                    chunk_id, confidence, status, created_at)
                   VALUES (%s, %s, %s, %s, %s, %s, 'extracted', NOW())""",
                (
                    subject_entity_id,
                    predicate[:255],
                    object_entity_id,
                    object_literal,
                    chunk_id,
                    confidence,
                ),
            )
            db.commit()
            statement_id = cursor.lastrowid
            cursor.close()

            db.log_provenance(
                artifact_type="statement",
                artifact_id=statement_id,
                source_type="extraction",
                source_id=chunk_id,
                pipeline_step="statement_extract",
                model_used=model_used,
                prompt_version=prompt_version,
            )

            stored += 1

        except Exception as e:
            db.log("WARNING", f"Failed to store statement: {e}")

    return stored


def analyze_chunk_statements(chunk_id: int, content: str, client=None) -> int:
    """
    Extract and store statements for a single chunk.

    Args:
        chunk_id: ID of the chunk
        content: Text content of the chunk
        client: Optional Anthropic client

    Returns:
        Number of statements stored
    """
    result = extract_statements(chunk_id, content, client)
    statements = result.get("statements", [])
    if statements:
        stored = store_statements(
            chunk_id,
            statements,
            prompt_version=result.get("prompt_version"),
            model_used=result.get("model_used"),
        )
        db.log("INFO", f"Chunk {chunk_id}: {stored}/{len(statements)} statements stored")
        return stored
    return 0


def analyze_document_statements(document_id: int, client=None, progress=None) -> int:
    """
    Extract statements from all chunks of a document.

    Args:
        document_id: ID of the document
        client: Optional Anthropic client
        progress: Optional PipelineProgress for logging

    Returns:
        Total number of statements stored
    """
    cursor = db.execute(
        "SELECT id, content FROM chunks WHERE document_id = %s ORDER BY chunk_index",
        (document_id,),
    )
    chunks = cursor.fetchall()
    cursor.close()

    total = len(chunks)
    db.log("INFO", f"Extracting statements from {total} chunks")
    if progress:
        progress.add_log(f"Statements: Extrahiere aus {total} Chunks...")

    total_stored = 0
    for i, chunk in enumerate(chunks, 1):
        if progress:
            progress.add_log(f"Statements: Chunk {i}/{total}...")

        stored = analyze_chunk_statements(chunk["id"], chunk["content"], client)
        total_stored += stored

    db.log("INFO", f"Total statements stored: {total_stored}")
    if progress:
        progress.add_log(f"Statements: {total_stored} Aussagen extrahiert")

    return total_stored
← Übersicht