step_semantic.py
- Path: /var/www/scripts/pipeline/step_semantic.py - Namespace: pipeline
- Lines: 228 | Size: 7,498 bytes
- Modified: 2025-12-27 00:08:22 | Scanned: 2025-12-31 10:22:15
Code Hygiene Score: 98
- Dependencies: 100 (25%)
- LOC: 90 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
No issues found.
Dependencies (1)
- use analyzers.document_analyzer.analyze_document
Classes (1)
- SemanticStep (class), line 14
Code
"""
Semantic Analysis Step Module
Handles Layer 4: Entity extraction, relations, taxonomy, ontology.
This step runs AFTER embedding (Layer 3) and is optional/async.
Documents are searchable without semantic analysis.
Part of Progressive Pipeline Architecture.
"""
from analyzers.document_analyzer import analyze_document
class SemanticStep:
"""Step: Semantic analysis of documents (Layer 4)."""
def __init__(self, db, progress=None):
"""
Initialize semantic step.
Args:
db: Database instance
progress: Optional PipelineProgress instance
"""
self.db = db
self.progress = progress
def execute(self, doc_id, text, use_anthropic=True):
"""
Execute full semantic analysis on a document.
Args:
doc_id: Document database ID
text: Full document text
use_anthropic: Use Anthropic API (True) or Ollama (False)
Returns:
dict: Analysis results with entity/relation counts
"""
if self.progress:
self.progress.update_step("semantic")
self.progress.add_log("Starte semantische Analyse...")
# Update document semantic status
self._update_semantic_status(doc_id, "processing")
try:
# Run full analysis (entities, relations, taxonomy, ontology)
result = analyze_document(
document_id=doc_id,
text=text,
use_anthropic=use_anthropic,
progress=self.progress,
)
# Update status based on result
if result.get("entities", 0) > 0 or result.get("categories", []):
self._update_semantic_status(doc_id, "complete")
else:
self._update_semantic_status(doc_id, "partial")
self.db.log("INFO", f"Semantic analysis complete for doc {doc_id}: {result}")
if self.progress:
self.progress.add_log(
f"Semantik: {result.get('entities', 0)} Entitäten, {result.get('relations', 0)} Relationen"
)
return result
except Exception as e:
self._update_semantic_status(doc_id, "error")
self.db.log("ERROR", f"Semantic analysis failed for doc {doc_id}: {e}")
if self.progress:
self.progress.add_log(f"Semantik-Fehler: {str(e)[:50]}")
raise
def queue(self, doc_id, priority=5):
"""
Queue document for async semantic analysis.
Args:
doc_id: Document database ID
priority: Queue priority (1=highest, 10=lowest)
Returns:
int: Queue entry ID
"""
cursor = self.db.execute(
"""INSERT INTO semantic_queue (document_id, priority, status, created_at)
VALUES (%s, %s, 'pending', NOW())
ON DUPLICATE KEY UPDATE
priority = VALUES(priority),
status = 'pending',
retry_count = 0,
error_message = NULL""",
(doc_id, priority),
)
self.db.commit()
queue_id = cursor.lastrowid
cursor.close()
self.db.log("INFO", f"Queued semantic analysis for doc {doc_id}, priority {priority}")
return queue_id
def process_queue(self, limit=5, use_anthropic=True):
"""
Process pending items from semantic queue.
Args:
limit: Max items to process
use_anthropic: Use Anthropic API
Returns:
dict: Processing summary
"""
# Get pending items ordered by priority
cursor = self.db.execute(
"""SELECT sq.id, sq.document_id, d.source_path
FROM semantic_queue sq
JOIN documents d ON sq.document_id = d.id
WHERE sq.status = 'pending'
ORDER BY sq.priority ASC, sq.created_at ASC
LIMIT %s""",
(limit,),
)
items = cursor.fetchall()
cursor.close()
processed = 0
failed = 0
for item in items:
queue_id = item["id"]
doc_id = item["document_id"]
# Mark as processing
self._update_queue_status(queue_id, "processing")
try:
# Get document text
text = self._get_document_text(doc_id)
if not text:
raise ValueError("No text found for document")
# Run analysis
self.execute(doc_id, text, use_anthropic)
# Mark as completed
self._update_queue_status(queue_id, "completed")
processed += 1
except Exception as e:
# Mark as failed, increment retry
self._update_queue_status(queue_id, "failed", str(e))
failed += 1
return {"processed": processed, "failed": failed, "remaining": self._get_queue_count()}
def _update_semantic_status(self, doc_id, status):
"""Update document semantic_status."""
cursor = self.db.execute(
"UPDATE documents SET semantic_status = %s WHERE id = %s",
(status, doc_id),
)
self.db.commit()
cursor.close()
def _update_queue_status(self, queue_id, status, error=None):
"""Update queue item status."""
if status == "processing":
cursor = self.db.execute(
"UPDATE semantic_queue SET status = %s, started_at = NOW() WHERE id = %s",
(status, queue_id),
)
elif status == "completed":
cursor = self.db.execute(
"UPDATE semantic_queue SET status = %s, completed_at = NOW() WHERE id = %s",
(status, queue_id),
)
elif status == "failed":
cursor = self.db.execute(
"""UPDATE semantic_queue
SET status = %s, error_message = %s, retry_count = retry_count + 1
WHERE id = %s""",
(status, error, queue_id),
)
else:
cursor = self.db.execute(
"UPDATE semantic_queue SET status = %s WHERE id = %s",
(status, queue_id),
)
self.db.commit()
cursor.close()
def _get_document_text(self, doc_id):
"""Get full document text from pages or chunks."""
# Try pages first
cursor = self.db.execute(
"SELECT text_content FROM document_pages WHERE document_id = %s ORDER BY page_number",
(doc_id,),
)
pages = cursor.fetchall()
cursor.close()
if pages:
return "\n\n".join(p["text_content"] for p in pages if p["text_content"])
# Fallback to chunks
cursor = self.db.execute(
"SELECT content FROM chunks WHERE document_id = %s ORDER BY chunk_index",
(doc_id,),
)
chunks = cursor.fetchall()
cursor.close()
if chunks:
return "\n\n".join(c["content"] for c in chunks if c["content"])
return None
def _get_queue_count(self):
"""Get count of pending queue items."""
cursor = self.db.execute("SELECT COUNT(*) as cnt FROM semantic_queue WHERE status = 'pending'")
result = cursor.fetchone()
cursor.close()
return result["cnt"] if result else 0