statement_analyzer.py
- Path: /var/www/scripts/pipeline/analyzers/statement_analyzer.py
- Namespace: pipeline
- Lines: 270 | Size: 8,820 bytes
- Modified: 2025-12-25 14:02:28 | Scanned: 2025-12-31 10:22:15
Code Hygiene Score: 79
- Dependencies: 40 (25%)
- LOC: 76 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 90 (10%)
Issues 1
| Line | Type | Description |
|------|------|-------------|
| 90 | magic_number | Magic number found: 1000 |
Dependencies 11
- use json
- use re
- use sys
- use time
- use requests
- use config.ANTHROPIC_MODEL
- use config.OLLAMA_CHAT_MODEL
- use config.OLLAMA_HOST
- use db.db
- use protokoll.protokoll
- use entity_extractor.find_entity_by_name
Functions 4
- extract_statements() (line 21)
- store_statements() (line 124)
- analyze_chunk_statements() (line 207)
- analyze_document_statements() (line 233)
Code
"""
Statement Analyzer - Extract SPO-triplets (Subject-Predicate-Object statements).
"""
import json
import re
import sys
import time
import requests
sys.path.insert(0, "/var/www/scripts/pipeline")
from config import ANTHROPIC_MODEL, OLLAMA_CHAT_MODEL, OLLAMA_HOST
from db import db
from protokoll import protokoll
from .entity_extractor import find_entity_by_name


def extract_statements(chunk_id: int, text: str, client=None) -> dict:
"""
Extract SPO-triplets (Subject-Predicate-Object statements) from text.
Args:
chunk_id: ID of the chunk being analyzed
text: Text content to extract statements from
client: Optional Anthropic client (falls back to Ollama if None)
Returns:
Dict with statements list, prompt_id, prompt_version, model_used
"""
prompt_data = db.get_prompt_by_use_case("statement_extraction")
prompt_template = prompt_data["content"] if prompt_data else None
prompt_id = prompt_data["id"] if prompt_data else None
prompt_version = prompt_data["version"] if prompt_data else None
if not prompt_template:
db.log("WARNING", "statement_extraction prompt not found in DB, using fallback")
prompt_template = """Extrahiere alle faktischen Aussagen aus dem Text als SPO-Tripel.
Regeln:
- Subject: Eine benannte Entität (Person, Organisation, Konzept, Methode)
- Predicate: Die Beziehung oder Eigenschaft (z.B. "entwickelte", "basiert auf", "ist Teil von")
- Object: Eine Entität oder ein Literal-Wert
Antworte NUR im JSON-Format:
{"statements": [
{"subject": "Name der Subject-Entität", "predicate": "Beziehung", "object": "Name oder Wert", "confidence": 0.0-1.0}
]}
Text:
{{TEXT}}"""
prompt = prompt_template.replace("{{TEXT}}", text[:3000])
try:
start_time = time.time()
tokens_in, tokens_out = 0, 0
model_name = ""
if client:
message = client.messages.create(
model=ANTHROPIC_MODEL,
max_tokens=1500,
messages=[{"role": "user", "content": prompt}],
)
response_text = message.content[0].text
tokens_in = message.usage.input_tokens
tokens_out = message.usage.output_tokens
model_name = ANTHROPIC_MODEL
else:
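            # No Anthropic client: fall back to the local Ollama endpoint and force JSON output.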
response = requests.post(
f"{OLLAMA_HOST}/api/generate",
json={
"model": OLLAMA_CHAT_MODEL,
"prompt": prompt,
"stream": False,
"format": "json",
},
timeout=120,
)
response.raise_for_status()
data = response.json()
response_text = data.get("response", "{}")
tokens_in = data.get("prompt_eval_count", 0)
tokens_out = data.get("eval_count", 0)
model_name = f"ollama:{OLLAMA_CHAT_MODEL}"
duration_ms = int((time.time() - start_time) * 1000)
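        # Log the LLM call in the protocol; the stored response is truncated to 2000 characters.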
protokoll.log_llm_call(
request=f"[statement_extraction] chunk_id={chunk_id}",
response=response_text[:2000],
model_name=model_name,
tokens_input=tokens_in,
tokens_output=tokens_out,
duration_ms=duration_ms,
status="completed",
)
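        # Pull the outermost JSON object out of the response, in case the model wrapped it in prose.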
json_match = re.search(r"\{[\s\S]*\}", response_text)
if json_match:
result = json.loads(json_match.group())
return {
"statements": result.get("statements", []),
"prompt_id": prompt_id,
"prompt_version": prompt_version,
"model_used": model_name,
}
return {"statements": [], "prompt_id": prompt_id, "prompt_version": prompt_version}
except Exception as e:
db.log("ERROR", f"Statement extraction failed for chunk {chunk_id}: {e}")
protokoll.log_llm_call(
request=f"[statement_extraction] chunk_id={chunk_id}",
model_name=ANTHROPIC_MODEL if client else f"ollama:{OLLAMA_CHAT_MODEL}",
status="error",
error_message=str(e),
)
return {"statements": [], "prompt_id": prompt_id, "prompt_version": prompt_version}


def store_statements(
    chunk_id: int,
    statements: list[dict],
    prompt_version: str | None = None,
    model_used: str | None = None,
) -> int:
"""
Store extracted statements in the database with entity linking.
Args:
chunk_id: ID of the source chunk
statements: List of statement dicts with subject, predicate, object, confidence
prompt_version: Version of the prompt used for extraction
model_used: Model used for extraction
Returns:
Number of successfully stored statements
"""
stored = 0
for stmt in statements:
try:
subject_name = stmt.get("subject", "").strip()
predicate = stmt.get("predicate", "").strip()
object_value = stmt.get("object", "").strip()
confidence = float(stmt.get("confidence", 0.8))
if not subject_name or not predicate:
continue
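            # Only store statements whose subject resolves to a known entity.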
subject_entity = find_entity_by_name(subject_name)
if not subject_entity:
db.log("DEBUG", f"Subject entity not found: {subject_name}")
continue
subject_entity_id = subject_entity["id"]
object_entity_id = None
object_literal = None
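            # Link the object to a known entity when possible; otherwise keep it as a literal value.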
if object_value:
object_entity = find_entity_by_name(object_value)
if object_entity:
object_entity_id = object_entity["id"]
else:
object_literal = object_value
cursor = db.execute(
"""INSERT INTO statements
(subject_entity_id, predicate, object_entity_id, object_literal,
chunk_id, confidence, status, created_at)
VALUES (%s, %s, %s, %s, %s, %s, 'extracted', NOW())""",
(
subject_entity_id,
predicate[:255],
object_entity_id,
object_literal,
chunk_id,
confidence,
),
)
db.commit()
statement_id = cursor.lastrowid
cursor.close()
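            # Record provenance so the statement can be traced to its chunk, model, and prompt version.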
db.log_provenance(
artifact_type="statement",
artifact_id=statement_id,
source_type="extraction",
source_id=chunk_id,
pipeline_step="statement_extract",
model_used=model_used,
prompt_version=prompt_version,
)
stored += 1
except Exception as e:
db.log("WARNING", f"Failed to store statement: {e}")
return stored


def analyze_chunk_statements(chunk_id: int, content: str, client=None) -> int:
"""
Extract and store statements for a single chunk.
Args:
chunk_id: ID of the chunk
content: Text content of the chunk
client: Optional Anthropic client
Returns:
Number of statements stored
"""
result = extract_statements(chunk_id, content, client)
statements = result.get("statements", [])
if statements:
stored = store_statements(
chunk_id,
statements,
prompt_version=result.get("prompt_version"),
model_used=result.get("model_used"),
)
db.log("INFO", f"Chunk {chunk_id}: {stored}/{len(statements)} statements stored")
return stored
return 0


def analyze_document_statements(document_id: int, client=None, progress=None) -> int:
"""
Extract statements from all chunks of a document.
Args:
document_id: ID of the document
client: Optional Anthropic client
progress: Optional PipelineProgress for logging
Returns:
Total number of statements stored
"""
cursor = db.execute(
"SELECT id, content FROM chunks WHERE document_id = %s ORDER BY chunk_index",
(document_id,),
)
chunks = cursor.fetchall()
cursor.close()
total = len(chunks)
db.log("INFO", f"Extracting statements from {total} chunks")
if progress:
progress.add_log(f"Statements: Extrahiere aus {total} Chunks...")
total_stored = 0
for i, chunk in enumerate(chunks, 1):
if progress:
progress.add_log(f"Statements: Chunk {i}/{total}...")
stored = analyze_chunk_statements(chunk["id"], chunk["content"], client)
total_stored += stored
db.log("INFO", f"Total statements stored: {total_stored}")
if progress:
progress.add_log(f"Statements: {total_stored} Aussagen extrahiert")
return total_stored
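
Usage

A minimal driver sketch, assuming the analyzers package is importable from /var/www/scripts/pipeline (as the module's own sys.path insert suggests) and that a document with chunked content already exists in the database. The document ID 42 is a placeholder:

import sys
sys.path.insert(0, "/var/www/scripts/pipeline")

from analyzers.statement_analyzer import analyze_document_statements

# Without an Anthropic client, extraction falls back to the local Ollama model.
stored = analyze_document_statements(document_id=42, client=None)
print(f"Stored {stored} statements")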