step_semantic_extended.py
- Path: /var/www/scripts/pipeline/step_semantic_extended.py
- Namespace: pipeline
- Lines: 464 | Size: 16,766 bytes
- Modified: 2025-12-28 13:59:29 | Scanned: 2025-12-31 10:22:15
Code Hygiene Score: 82
- Dependencies: 100 (25%)
- LOC: 12 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
No issues found.
Dependencies (2)
- use json
- use ollama
Classes (6)
- DuplicateCheckStep (class), line 22
- TextSemanticAnalyzeStep (class), line 77
- TextSemanticStoreStep (class), line 170
- KnowledgeSemanticAnalyzeStep (class), line 236
- KnowledgeSemanticStoreStep (class), line 330
- OntologyStoreStep (class), line 402
Code
"""
Extended Semantic Steps for Scientific Pipeline v1.
Implements 6 new step_types for Pipeline #5:
1. DuplicateCheckStep - Hash-based duplicate detection
2. TextSemanticAnalyzeStep - Analyzes HOW text is structured (Textsemantik)
3. TextSemanticStoreStep - Stores text semantics to chunk_semantics (unified)
4. KnowledgeSemanticAnalyzeStep - Analyzes WHAT entities MEAN (Wissenssemantik)
5. KnowledgeSemanticStoreStep - Stores knowledge semantics to entity_semantics (unified)
6. OntologyStoreStep - Stores ontology classifications
Part of Pipeline-Refactoring based on scientific specification.
Updated 2025-12-28: Unified schema - merged chunk_text_semantics into chunk_semantics,
entity_knowledge_semantics into entity_semantics.
"""
import json
import ollama
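# All six step classes share one constructor contract: `db` is a database
# wrapper assumed to expose execute(), commit(), and log() (e.g. a thin
# MySQL adapter), and `progress` is an optional reporter assumed to expose
# update_step() and add_log(). A minimal sketch of that assumed interface,
# for orientation only (not part of this module):
#
#     class Progress:
#         def update_step(self, step_name: str) -> None: ...
#         def add_log(self, message: str) -> None: ...
#
#     class Db:
#         def execute(self, sql: str, params: tuple = ()) -> "Cursor": ...
#         def commit(self) -> None: ...
#         def log(self, level: str, message: str) -> None: ...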
class DuplicateCheckStep:
"""Step: Check for duplicate documents via content hash."""
def __init__(self, db, progress=None):
self.db = db
self.progress = progress
def execute(self, doc_id: int, content_hash: str) -> dict:
"""
Check if document with same hash already exists.
Args:
doc_id: Current document ID
content_hash: SHA-256 hash of document content
Returns:
            dict: {status: 'ok'|'abort'|'skip', reason: str, duplicate_id: int|None}
"""
if self.progress:
self.progress.update_step("duplicate_check")
self.progress.add_log("Prüfe auf Duplikate...")
if not content_hash:
return {"status": "skip", "reason": "no_hash"}
cursor = self.db.execute(
"""SELECT id, source_path FROM documents
WHERE file_hash = %s AND id != %s AND status = 'done'
LIMIT 1""",
(content_hash, doc_id),
)
existing = cursor.fetchone()
cursor.close()
if existing:
self.db.log(
"INFO",
f"Duplicate found: doc {doc_id} matches {existing['id']} ({existing['source_path']})",
)
if self.progress:
self.progress.add_log(f"Duplikat gefunden: ID {existing['id']}")
return {
"status": "abort",
"reason": "duplicate",
"duplicate_id": existing["id"],
"duplicate_path": existing["source_path"],
}
if self.progress:
self.progress.add_log("Kein Duplikat gefunden")
return {"status": "ok"}
class TextSemanticAnalyzeStep:
"""Step: Analyze HOW text is structured (Textsemantik).
Analyzes each chunk for:
- statement_form: assertion, question, command, conditional
- intent: explain, argue, define, compare, exemplify, warn, instruct
- frame: theoretical, practical, historical, methodological, critical
- is_negated: whether the statement is negated
- discourse_role: thesis, evidence, example, counter, summary, definition
"""
PROMPT_TEMPLATE = """Analysiere den folgenden Text semantisch.
Bestimme:
1. statement_form: Ist es eine Aussage (assertion), Frage (question), Aufforderung (command) oder Bedingung (conditional)?
2. intent: Was ist die Absicht? explain, argue, define, compare, exemplify, warn, instruct
3. frame: Welcher Rahmen? theoretical, practical, historical, methodological, critical
4. is_negated: Wird etwas verneint? true/false
5. discourse_role: Welche Rolle im Diskurs? thesis, evidence, example, counter, summary, definition
Antworte NUR mit gültigem JSON:
{{
"statement_form": "assertion|question|command|conditional",
"intent": "explain|argue|define|compare|exemplify|warn|instruct",
"frame": "theoretical|practical|historical|methodological|critical",
"is_negated": false,
"discourse_role": "thesis|evidence|example|counter|summary|definition"
}}
Text:
{content}"""
def __init__(self, db, progress=None):
self.db = db
self.progress = progress
def execute(self, chunks: list, config: dict) -> list:
"""
Analyze text semantics for each chunk.
Args:
chunks: List of chunk dicts with 'id' and 'content'
            config: Step config; 'model' is required (a ValueError is raised if missing)
Returns:
list: Chunks with added 'text_semantics' field
"""
if self.progress:
self.progress.update_step("text_semantic_analyze")
self.progress.add_log(f"Textsemantik-Analyse für {len(chunks)} Chunks...")
model = config.get("model")
if not model:
raise ValueError("Model muss in config übergeben werden! Step: TextSemanticAnalyzeStep")
analyzed = 0
errors = 0
for chunk in chunks:
try:
prompt = self.PROMPT_TEMPLATE.format(content=chunk["content"][:2000])
response = ollama.generate(
model=model,
prompt=prompt,
options={"num_predict": 200},
)
# Parse JSON response
response_text = response["response"].strip()
# Find JSON in response
start = response_text.find("{")
end = response_text.rfind("}") + 1
if start >= 0 and end > start:
json_str = response_text[start:end]
chunk["text_semantics"] = json.loads(json_str)
chunk["text_semantics"]["model_used"] = model
analyzed += 1
else:
chunk["text_semantics"] = None
errors += 1
except Exception as e:
self.db.log("WARNING", f"Text semantic analysis failed for chunk {chunk['id']}: {e}")
chunk["text_semantics"] = None
errors += 1
if self.progress:
self.progress.add_log(f"Textsemantik: {analyzed} analysiert, {errors} Fehler")
self.db.log("INFO", f"Text semantic analysis: {analyzed} chunks, {errors} errors")
return chunks
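# Illustrative shape of the field this step attaches to each chunk (values
# are examples only; they come from the model's JSON reply plus the injected
# model name):
#
#     chunk["text_semantics"] = {
#         "statement_form": "assertion",
#         "intent": "explain",
#         "frame": "theoretical",
#         "is_negated": False,
#         "discourse_role": "thesis",
#         "model_used": "mistral",
#     }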
class TextSemanticStoreStep:
"""Step: Store text semantics to unified chunk_semantics table."""
def __init__(self, db, progress=None):
self.db = db
self.progress = progress
def execute(self, chunks: list, config: dict) -> dict:
"""
Store text semantics to chunk_semantics table (unified schema).
Args:
chunks: List of chunks with 'text_semantics' field
config: Step config
Returns:
dict: {stored: int, skipped: int}
"""
if self.progress:
self.progress.update_step("text_semantic_store")
self.progress.add_log("Speichere Textsemantik...")
stored = 0
skipped = 0
for chunk in chunks:
if not chunk.get("text_semantics"):
skipped += 1
continue
sem = chunk["text_semantics"]
try:
# Update existing chunk_semantics record with text semantic fields
cursor = self.db.execute(
"""UPDATE chunk_semantics
SET statement_form = %s,
intent = %s,
frame = %s,
is_negated = %s,
discourse_role = %s
WHERE chunk_id = %s""",
(
sem.get("statement_form"),
sem.get("intent"),
sem.get("frame"),
sem.get("is_negated", False),
sem.get("discourse_role"),
chunk["id"],
),
)
self.db.commit()
cursor.close()
stored += 1
except Exception as e:
self.db.log("ERROR", f"Failed to store text semantics for chunk {chunk['id']}: {e}")
skipped += 1
if self.progress:
self.progress.add_log(f"Textsemantik gespeichert: {stored}")
self.db.log("INFO", f"Text semantics stored: {stored}, skipped: {skipped}")
return {"stored": stored, "skipped": skipped}
class KnowledgeSemanticAnalyzeStep:
"""Step: Analyze WHAT entities MEAN in context (Wissenssemantik).
Analyzes each entity for:
- semantic_role: agent, patient, instrument, location, cause, effect
- properties: JSON object with entity properties
- functional_category: method, tool, concept, actor, outcome, process
- context_meaning: Brief explanation of entity's meaning in context
"""
PROMPT_TEMPLATE = """Analysiere die Bedeutung dieser Entität im Kontext.
Entität: {name}
Typ: {entity_type}
Kontext: {context}
Bestimme:
1. semantic_role: Welche Rolle spielt die Entität? agent, patient, instrument, location, cause, effect
2. properties: Welche Eigenschaften hat sie? (als JSON-Objekt)
3. functional_category: Welche Funktion? method, tool, concept, actor, outcome, process
4. context_meaning: Was bedeutet die Entität in diesem Kontext? (1 Satz)
Antworte NUR mit gültigem JSON:
{{
"semantic_role": "agent|patient|instrument|location|cause|effect",
"properties": {{"key": "value"}},
"functional_category": "method|tool|concept|actor|outcome|process",
"context_meaning": "Kurze Erklärung"
}}"""
def __init__(self, db, progress=None):
self.db = db
self.progress = progress
def execute(self, entities: list, config: dict) -> list:
"""
Analyze knowledge semantics for each entity.
Args:
entities: List of entity dicts with 'id', 'name', 'type', 'context'
            config: Step config; 'model' is required (a ValueError is raised if missing)
Returns:
list: Entities with added 'knowledge_semantics' field
"""
if self.progress:
self.progress.update_step("knowledge_semantic_analyze")
self.progress.add_log(f"Wissenssemantik-Analyse für {len(entities)} Entitäten...")
model = config.get("model")
if not model:
raise ValueError("Model muss in config übergeben werden! Step: KnowledgeSemanticAnalyzeStep")
analyzed = 0
errors = 0
for entity in entities:
try:
prompt = self.PROMPT_TEMPLATE.format(
name=entity.get("name", ""),
entity_type=entity.get("type", "unknown"),
context=entity.get("context", "")[:1500],
)
response = ollama.generate(
model=model,
prompt=prompt,
options={"num_predict": 300},
)
# Parse JSON response
response_text = response["response"].strip()
start = response_text.find("{")
end = response_text.rfind("}") + 1
if start >= 0 and end > start:
json_str = response_text[start:end]
entity["knowledge_semantics"] = json.loads(json_str)
entity["knowledge_semantics"]["model_used"] = model
analyzed += 1
else:
entity["knowledge_semantics"] = None
errors += 1
except Exception as e:
self.db.log("WARNING", f"Knowledge semantic analysis failed for entity {entity.get('id')}: {e}")
entity["knowledge_semantics"] = None
errors += 1
if self.progress:
self.progress.add_log(f"Wissenssemantik: {analyzed} analysiert, {errors} Fehler")
self.db.log("INFO", f"Knowledge semantic analysis: {analyzed} entities, {errors} errors")
return entities
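# Usage sketch (hypothetical entity data; a real run would receive entities
# from an extraction step and a config dict carrying the model name):
#
#     entities = [{"id": 7, "name": "Grounded Theory", "type": "concept",
#                  "context": "Die Studie verwendet Grounded Theory ..."}]
#     entities = KnowledgeSemanticAnalyzeStep(db).execute(entities, {"model": "mistral"})
#     entities[0]["knowledge_semantics"]["semantic_role"]  # e.g. "instrument"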
class KnowledgeSemanticStoreStep:
"""Step: Store knowledge semantics to unified entity_semantics table."""
def __init__(self, db, progress=None):
self.db = db
self.progress = progress
def execute(self, entities: list, config: dict) -> dict:
"""
Store knowledge semantics to entity_semantics table (unified schema).
Args:
entities: List of entities with 'knowledge_semantics' field
config: Step config
Returns:
dict: {stored: int, skipped: int}
"""
if self.progress:
self.progress.update_step("knowledge_semantic_store")
self.progress.add_log("Speichere Wissenssemantik...")
stored = 0
skipped = 0
for entity in entities:
if not entity.get("knowledge_semantics"):
skipped += 1
continue
sem = entity["knowledge_semantics"]
try:
cursor = self.db.execute(
"""INSERT INTO entity_semantics
(entity_id, chunk_id, context, semantic_role, properties,
functional_category, confidence, model_used)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
chunk_id = VALUES(chunk_id),
context = VALUES(context),
semantic_role = VALUES(semantic_role),
properties = VALUES(properties),
functional_category = VALUES(functional_category),
model_used = VALUES(model_used),
updated_at = NOW()""",
(
entity["id"],
entity.get("chunk_id"),
sem.get("context_meaning"),
sem.get("semantic_role"),
json.dumps(sem.get("properties", {})),
sem.get("functional_category"),
0.8, # Default confidence
sem.get("model_used"),
),
)
self.db.commit()
cursor.close()
stored += 1
except Exception as e:
self.db.log("ERROR", f"Failed to store knowledge semantics for entity {entity.get('id')}: {e}")
skipped += 1
if self.progress:
self.progress.add_log(f"Wissenssemantik gespeichert: {stored}")
self.db.log("INFO", f"Knowledge semantics stored: {stored}, skipped: {skipped}")
return {"stored": stored, "skipped": skipped}
class OntologyStoreStep:
"""Step: Store ontology classifications to database."""
def __init__(self, db, progress=None):
self.db = db
self.progress = progress
def execute(self, classifications: list, config: dict) -> dict:
"""
Store ontology classifications to ontology_classes table.
Args:
classifications: List of classification dicts with:
- entity_id: Entity ID
- class_name: Ontology class name
- parent_class: Optional parent class
- confidence: Classification confidence (0-1)
config: Step config
Returns:
dict: {stored: int, skipped: int}
"""
if self.progress:
self.progress.update_step("ontology_store")
self.progress.add_log(f"Speichere {len(classifications)} Ontologie-Klassifikationen...")
stored = 0
skipped = 0
for cls in classifications:
if not cls.get("entity_id") or not cls.get("class_name"):
skipped += 1
continue
try:
cursor = self.db.execute(
"""INSERT INTO ontology_classes
(entity_id, class_name, parent_class, confidence)
VALUES (%s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
parent_class = VALUES(parent_class),
confidence = VALUES(confidence)""",
(
cls["entity_id"],
cls["class_name"],
cls.get("parent_class"),
cls.get("confidence", 1.0),
),
)
self.db.commit()
cursor.close()
stored += 1
except Exception as e:
self.db.log("ERROR", f"Failed to store ontology class for entity {cls.get('entity_id')}: {e}")
skipped += 1
if self.progress:
self.progress.add_log(f"Ontologie gespeichert: {stored}")
self.db.log("INFO", f"Ontology classes stored: {stored}, skipped: {skipped}")
return {"stored": stored, "skipped": skipped}