semantic_analyzer.py
- Path: /var/www/scripts/pipeline/analyzers/semantic_analyzer.py - Namespace: pipeline
- Lines: 147 | Size: 5,365 bytes
- Modified: 2025-12-27 15:35:43 | Scanned: 2025-12-31 10:22:15
Code Hygiene Score: 80
- Dependencies: 20 (25%)
- LOC: 100 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
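(The overall score matches a weighted sum of the category scores, reading the percentages as weights: 0.25 × 20 + 0.20 × 100 + 0.20 × 100 + 0.15 × 100 + 0.10 × 100 + 0.10 × 100 = 80.)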
No issues found.
Dependencies 13
- use json
- use re
- use sys
- use time
- use requests
- use config.ANTHROPIC_MODEL
- use config.OLLAMA_CHAT_MODEL
- use config.OLLAMA_HOST
- use constants.BATCH_LIMIT
- use constants.MS_PER_SECOND
- use constants.OLLAMA_TIMEOUT
- use db.db
- use protokoll.protokoll
Functions 2
- analyze_chunk_semantics() - line 20
- analyze_chunks_semantics() - line 123
Code
"""
Semantic Analyzer - Analyze chunks for summary, keywords, sentiment, topics.
"""
import json
import re
import sys
import time
import requests
sys.path.insert(0, "/var/www/scripts/pipeline")
from config import ANTHROPIC_MODEL, OLLAMA_CHAT_MODEL, OLLAMA_HOST
from constants import BATCH_LIMIT, MS_PER_SECOND, OLLAMA_TIMEOUT
from db import db
from protokoll import protokoll

def analyze_chunk_semantics(chunk_id: int, content: str, client=None) -> dict | None:
    """
    Analyze a single chunk for semantics (summary, keywords, sentiment, topics).
    Stores result in chunk_semantics table.
    """
    prompt_template = db.get_prompt("chunk_semantics")
    if not prompt_template:
        prompt_template = """Analysiere diesen Textabschnitt und extrahiere:
1. **summary**: Eine kurze Zusammenfassung (1-2 Sätze)
2. **keywords**: 3-5 wichtige Schlüsselwörter
3. **sentiment**: Stimmung (positive, negative, neutral, mixed)
4. **topics**: 2-3 Hauptthemen
Antworte NUR im JSON-Format:
{"summary": "...", "keywords": ["...", "..."], "sentiment": "neutral", "topics": ["...", "..."]}
Text:
{{TEXT}}"""
    # Support both {text} and {{TEXT}} placeholders
    prompt = prompt_template.replace("{{TEXT}}", content[:2000]).replace("{text}", content[:2000])
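    # Example with a hypothetical stored template: "Analysiere: {text}"
    # becomes "Analysiere: <first 2000 characters of the chunk>".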
    try:
        start_time = time.time()
        tokens_in, tokens_out = 0, 0
        model_name = ""
        if client:
            message = client.messages.create(
                model=ANTHROPIC_MODEL, max_tokens=500, messages=[{"role": "user", "content": prompt}]
            )
            response_text = message.content[0].text
            tokens_in = message.usage.input_tokens
            tokens_out = message.usage.output_tokens
            model_name = ANTHROPIC_MODEL
        else:
            response = requests.post(
                f"{OLLAMA_HOST}/api/generate",
                json={"model": OLLAMA_CHAT_MODEL, "prompt": prompt, "stream": False, "format": "json"},
                timeout=OLLAMA_TIMEOUT,
            )
            response.raise_for_status()
            data = response.json()
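            # Abridged example of a non-streaming /api/generate reply (values illustrative):
            # {"model": "...", "response": "{...}", "done": true,
            #  "prompt_eval_count": 26, "eval_count": 142}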
            response_text = data.get("response", "{}")
            tokens_in = data.get("prompt_eval_count", 0)
            tokens_out = data.get("eval_count", 0)
            model_name = f"ollama:{OLLAMA_CHAT_MODEL}"
        duration_ms = int((time.time() - start_time) * MS_PER_SECOND)
        protokoll.log_llm_call(
            request=f"[chunk_semantics] chunk_id={chunk_id}",
            response=response_text[:BATCH_LIMIT],
            model_name=model_name,
            tokens_input=tokens_in,
            tokens_output=tokens_out,
            duration_ms=duration_ms,
            status="completed",
        )
        # Extract first valid JSON object (handle multiple JSON blocks from Ollama)
        json_match = re.search(r"\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}", response_text)
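        # The pattern tolerates one level of nested braces, so a reply like
        # '{"summary": "...", "extra": {"lang": "de"}}' is captured as one object.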
        if json_match:
            try:
                result = json.loads(json_match.group())
            except json.JSONDecodeError:
                # Fallback: try to find simpler JSON structure
                simple_match = re.search(r'\{"summary":\s*"[^"]*"[^}]*\}', response_text)
                if simple_match:
                    result = json.loads(simple_match.group())
                else:
                    return None
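            # MySQL upsert; assumes a UNIQUE key on chunk_semantics.chunk_id so a
            # re-run overwrites the earlier analysis instead of inserting a duplicate.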
            cursor = db.execute(
                """INSERT INTO chunk_semantics
                       (chunk_id, summary, keywords, sentiment, topics, language, analyzed_at, analysis_model)
                   VALUES (%s, %s, %s, %s, %s, 'de', NOW(), %s)
                   ON DUPLICATE KEY UPDATE
                       summary = VALUES(summary), keywords = VALUES(keywords),
                       sentiment = VALUES(sentiment), topics = VALUES(topics),
                       analyzed_at = NOW(), analysis_model = VALUES(analysis_model)""",
                (
                    chunk_id,
                    result.get("summary", ""),
                    json.dumps(result.get("keywords", []), ensure_ascii=False),
                    result.get("sentiment", "neutral"),
                    json.dumps(result.get("topics", []), ensure_ascii=False),
                    model_name,
                ),
            )
            db.commit()
            cursor.close()
            return result
        return None
    except Exception as e:
        db.log("ERROR", f"Chunk semantics analysis failed: {e}")
        return None

def analyze_chunks_semantics(document_id: int, client=None, progress=None) -> int:
    """Analyze all chunks of a document for semantics."""
    cursor = db.execute(
        "SELECT id, content FROM chunks WHERE document_id = %s ORDER BY chunk_index",
        (document_id,),
    )
    chunks = cursor.fetchall()
    cursor.close()
    total = len(chunks)
    db.log("INFO", f"Analyzing semantics for {total} chunks")
    if progress:
        progress.add_log(f"Semantik: Analysiere {total} Chunks...")
    analyzed = 0
    for i, chunk in enumerate(chunks, 1):
        if progress:
            progress.add_log(f"Semantik: Chunk {i}/{total}...")
        result = analyze_chunk_semantics(chunk["id"], chunk["content"], client)
        if result:
            analyzed += 1
    db.log("INFO", f"Analyzed {analyzed}/{total} chunks for semantics")
    if progress:
        progress.add_log(f"Semantik: {analyzed}/{total} Chunks analysiert")
    return analyzed
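Usage sketch (not part of the scanned file): the document id is made up, and the import path assumes /var/www/scripts/pipeline is on sys.path, as the module itself arranges.

import anthropic
from analyzers.semantic_analyzer import analyze_chunks_semantics

# With an Anthropic client, chunks are sent to ANTHROPIC_MODEL;
# passing client=None (the default) falls back to the local Ollama endpoint.
client = anthropic.Anthropic()
count = analyze_chunks_semantics(document_id=42, client=client)
print(f"{count} chunks analyzed")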