# step_semantic_extended.py

"""
Extended Semantic Steps for Scientific Pipeline v1.

Implements 6 new step_types for Pipeline #5:
1. DuplicateCheckStep - Hash-based duplicate detection
2. TextSemanticAnalyzeStep - Analyzes HOW text is structured (Textsemantik)
3. TextSemanticStoreStep - Stores text semantics to chunk_semantics (unified)
4. KnowledgeSemanticAnalyzeStep - Analyzes WHAT entities MEAN (Wissenssemantik)
5. KnowledgeSemanticStoreStep - Stores knowledge semantics to entity_semantics (unified)
6. OntologyStoreStep - Stores ontology classifications

Part of Pipeline-Refactoring based on scientific specification.
Updated 2025-12-28: Unified schema - merged chunk_text_semantics into chunk_semantics,
                    entity_knowledge_semantics into entity_semantics.
"""

import json

import ollama


class DuplicateCheckStep:
    """Step: Check for duplicate documents via content hash."""

    def __init__(self, db, progress=None):
        self.db = db
        self.progress = progress

    def execute(self, doc_id: int, content_hash: str) -> dict:
        """
        Check if a document with the same content hash already exists.

        Args:
            doc_id: Current document ID
            content_hash: SHA-256 hash of document content

        Returns:
            dict: {status: 'ok'|'skip'|'abort', reason: str,
                   duplicate_id: int, duplicate_path: str}
                  'skip' (reason 'no_hash') when no hash was supplied,
                  'abort' (reason 'duplicate', with duplicate_id and
                  duplicate_path) when a completed duplicate exists,
                  'ok' otherwise.
        """
        if self.progress:
            self.progress.update_step("duplicate_check")
            self.progress.add_log("Prüfe auf Duplikate...")

        # Without a hash there is nothing to compare against.
        if not content_hash:
            return {"status": "skip", "reason": "no_hash"}

        # Only fully processed documents (status 'done') count as duplicates;
        # the current document itself is excluded.
        cursor = self.db.execute(
            """SELECT id, source_path FROM documents
               WHERE file_hash = %s AND id != %s AND status = 'done'
               LIMIT 1""",
            (content_hash, doc_id),
        )
        existing = cursor.fetchone()
        cursor.close()

        if existing:
            self.db.log(
                "INFO",
                f"Duplicate found: doc {doc_id} matches {existing['id']} ({existing['source_path']})",
            )
            if self.progress:
                self.progress.add_log(f"Duplikat gefunden: ID {existing['id']}")

            return {
                "status": "abort",
                "reason": "duplicate",
                "duplicate_id": existing["id"],
                "duplicate_path": existing["source_path"],
            }

        if self.progress:
            self.progress.add_log("Kein Duplikat gefunden")

        return {"status": "ok"}


class TextSemanticAnalyzeStep:
    """Step: Analyze HOW text is structured (Textsemantik).

    Analyzes each chunk for:
    - statement_form: assertion, question, command, conditional
    - intent: explain, argue, define, compare, exemplify, warn, instruct
    - frame: theoretical, practical, historical, methodological, critical
    - is_negated: whether the statement is negated
    - discourse_role: thesis, evidence, example, counter, summary, definition
    """

    PROMPT_TEMPLATE = """Analysiere den folgenden Text semantisch.

Bestimme:
1. statement_form: Ist es eine Aussage (assertion), Frage (question), Aufforderung (command) oder Bedingung (conditional)?
2. intent: Was ist die Absicht? explain, argue, define, compare, exemplify, warn, instruct
3. frame: Welcher Rahmen? theoretical, practical, historical, methodological, critical
4. is_negated: Wird etwas verneint? true/false
5. discourse_role: Welche Rolle im Diskurs? thesis, evidence, example, counter, summary, definition

Antworte NUR mit gültigem JSON:
{{
  "statement_form": "assertion|question|command|conditional",
  "intent": "explain|argue|define|compare|exemplify|warn|instruct",
  "frame": "theoretical|practical|historical|methodological|critical",
  "is_negated": false,
  "discourse_role": "thesis|evidence|example|counter|summary|definition"
}}

Text:
{content}"""

    def __init__(self, db, progress=None):
        self.db = db
        self.progress = progress

    @staticmethod
    def _parse_json_response(response_text: str):
        """Extract and parse the outermost {...} span of an LLM reply.

        Returns the parsed dict, or None when no JSON object is present.
        Malformed JSON raises json.JSONDecodeError (handled by the caller).
        """
        start = response_text.find("{")
        end = response_text.rfind("}") + 1
        if start >= 0 and end > start:
            return json.loads(response_text[start:end])
        return None

    def execute(self, chunks: list, config: dict) -> list:
        """
        Analyze text semantics for each chunk.

        Args:
            chunks: List of chunk dicts with 'id' and 'content'
            config: Step config; 'model' is required (there is no default)

        Returns:
            list: The same chunks, each with an added 'text_semantics'
                  field (dict including 'model_used', or None on failure)

        Raises:
            ValueError: If config does not specify a 'model'.
        """
        if self.progress:
            self.progress.update_step("text_semantic_analyze")
            self.progress.add_log(f"Textsemantik-Analyse für {len(chunks)} Chunks...")

        model = config.get("model")
        if not model:
            raise ValueError("Model muss in config übergeben werden! Step: TextSemanticAnalyzeStep")
        analyzed = 0
        errors = 0

        for chunk in chunks:
            try:
                # Truncate so the prompt stays within the model's context window.
                prompt = self.PROMPT_TEMPLATE.format(content=chunk["content"][:2000])

                response = ollama.generate(
                    model=model,
                    prompt=prompt,
                    options={"num_predict": 200},
                )

                semantics = self._parse_json_response(response["response"].strip())
                if semantics is not None:
                    semantics["model_used"] = model
                    chunk["text_semantics"] = semantics
                    analyzed += 1
                else:
                    chunk["text_semantics"] = None
                    errors += 1

            except Exception as e:
                # A single failing chunk must not abort the whole batch.
                self.db.log("WARNING", f"Text semantic analysis failed for chunk {chunk['id']}: {e}")
                chunk["text_semantics"] = None
                errors += 1

        if self.progress:
            self.progress.add_log(f"Textsemantik: {analyzed} analysiert, {errors} Fehler")

        self.db.log("INFO", f"Text semantic analysis: {analyzed} chunks, {errors} errors")
        return chunks


class TextSemanticStoreStep:
    """Step: Persist per-chunk text semantics into the unified chunk_semantics table."""

    def __init__(self, db, progress=None):
        self.db = db
        self.progress = progress

    def execute(self, chunks: list, config: dict) -> dict:
        """
        Write each chunk's 'text_semantics' to the chunk_semantics table.

        Chunks without analysis results are counted as skipped, as are
        chunks whose database update raises an error.

        Args:
            chunks: List of chunks with a 'text_semantics' field
            config: Step config (not used by this step)

        Returns:
            dict: {stored: int, skipped: int}
        """
        if self.progress:
            self.progress.update_step("text_semantic_store")
            self.progress.add_log("Speichere Textsemantik...")

        n_stored = 0
        n_skipped = 0

        for chunk in chunks:
            semantics = chunk.get("text_semantics")
            if not semantics:
                n_skipped += 1
                continue

            # Build the parameter tuple up front; the record itself was
            # created earlier in the pipeline, so this is an UPDATE only.
            params = (
                semantics.get("statement_form"),
                semantics.get("intent"),
                semantics.get("frame"),
                semantics.get("is_negated", False),
                semantics.get("discourse_role"),
                chunk["id"],
            )

            try:
                cursor = self.db.execute(
                    """UPDATE chunk_semantics
                       SET statement_form = %s,
                           intent = %s,
                           frame = %s,
                           is_negated = %s,
                           discourse_role = %s
                       WHERE chunk_id = %s""",
                    params,
                )
                self.db.commit()
                cursor.close()
                n_stored += 1

            except Exception as e:
                self.db.log("ERROR", f"Failed to store text semantics for chunk {chunk['id']}: {e}")
                n_skipped += 1

        if self.progress:
            self.progress.add_log(f"Textsemantik gespeichert: {n_stored}")

        self.db.log("INFO", f"Text semantics stored: {n_stored}, skipped: {n_skipped}")
        return {"stored": n_stored, "skipped": n_skipped}


class KnowledgeSemanticAnalyzeStep:
    """Step: Analyze WHAT entities MEAN in context (Wissenssemantik).

    Analyzes each entity for:
    - semantic_role: agent, patient, instrument, location, cause, effect
    - properties: JSON object with entity properties
    - functional_category: method, tool, concept, actor, outcome, process
    - context_meaning: Brief explanation of entity's meaning in context
    """

    PROMPT_TEMPLATE = """Analysiere die Bedeutung dieser Entität im Kontext.

Entität: {name}
Typ: {entity_type}
Kontext: {context}

Bestimme:
1. semantic_role: Welche Rolle spielt die Entität? agent, patient, instrument, location, cause, effect
2. properties: Welche Eigenschaften hat sie? (als JSON-Objekt)
3. functional_category: Welche Funktion? method, tool, concept, actor, outcome, process
4. context_meaning: Was bedeutet die Entität in diesem Kontext? (1 Satz)

Antworte NUR mit gültigem JSON:
{{
  "semantic_role": "agent|patient|instrument|location|cause|effect",
  "properties": {{"key": "value"}},
  "functional_category": "method|tool|concept|actor|outcome|process",
  "context_meaning": "Kurze Erklärung"
}}"""

    def __init__(self, db, progress=None):
        self.db = db
        self.progress = progress

    @staticmethod
    def _parse_json_response(response_text: str):
        """Extract and parse the outermost {...} span of an LLM reply.

        Returns the parsed dict, or None when no JSON object is present.
        Malformed JSON raises json.JSONDecodeError (handled by the caller).
        """
        start = response_text.find("{")
        end = response_text.rfind("}") + 1
        if start >= 0 and end > start:
            return json.loads(response_text[start:end])
        return None

    def execute(self, entities: list, config: dict) -> list:
        """
        Analyze knowledge semantics for each entity.

        Args:
            entities: List of entity dicts with 'id', 'name', 'type', 'context'
            config: Step config; 'model' is required (there is no default)

        Returns:
            list: The same entities, each with an added 'knowledge_semantics'
                  field (dict including 'model_used', or None on failure)

        Raises:
            ValueError: If config does not specify a 'model'.
        """
        if self.progress:
            self.progress.update_step("knowledge_semantic_analyze")
            self.progress.add_log(f"Wissenssemantik-Analyse für {len(entities)} Entitäten...")

        model = config.get("model")
        if not model:
            raise ValueError("Model muss in config übergeben werden! Step: KnowledgeSemanticAnalyzeStep")
        analyzed = 0
        errors = 0

        for entity in entities:
            try:
                # Truncate context so the prompt stays within the model's window.
                prompt = self.PROMPT_TEMPLATE.format(
                    name=entity.get("name", ""),
                    entity_type=entity.get("type", "unknown"),
                    context=entity.get("context", "")[:1500],
                )

                response = ollama.generate(
                    model=model,
                    prompt=prompt,
                    options={"num_predict": 300},
                )

                semantics = self._parse_json_response(response["response"].strip())
                if semantics is not None:
                    semantics["model_used"] = model
                    entity["knowledge_semantics"] = semantics
                    analyzed += 1
                else:
                    entity["knowledge_semantics"] = None
                    errors += 1

            except Exception as e:
                # A single failing entity must not abort the whole batch.
                self.db.log("WARNING", f"Knowledge semantic analysis failed for entity {entity.get('id')}: {e}")
                entity["knowledge_semantics"] = None
                errors += 1

        if self.progress:
            self.progress.add_log(f"Wissenssemantik: {analyzed} analysiert, {errors} Fehler")

        self.db.log("INFO", f"Knowledge semantic analysis: {analyzed} entities, {errors} errors")
        return entities


class KnowledgeSemanticStoreStep:
    """Step: Persist per-entity knowledge semantics into the unified entity_semantics table."""

    def __init__(self, db, progress=None):
        self.db = db
        self.progress = progress

    def execute(self, entities: list, config: dict) -> dict:
        """
        Upsert each entity's 'knowledge_semantics' into entity_semantics.

        Entities without analysis results are counted as skipped, as are
        entities whose database write raises an error.

        Args:
            entities: List of entities with a 'knowledge_semantics' field
            config: Step config (not used by this step)

        Returns:
            dict: {stored: int, skipped: int}
        """
        if self.progress:
            self.progress.update_step("knowledge_semantic_store")
            self.progress.add_log("Speichere Wissenssemantik...")

        n_stored = 0
        n_skipped = 0

        for entity in entities:
            semantics = entity.get("knowledge_semantics")
            if not semantics:
                n_skipped += 1
                continue

            # Row values in column order; confidence is fixed at 0.8 here
            # (the analysis step does not produce a confidence score).
            row = (
                entity["id"],
                entity.get("chunk_id"),
                semantics.get("context_meaning"),
                semantics.get("semantic_role"),
                json.dumps(semantics.get("properties", {})),
                semantics.get("functional_category"),
                0.8,
                semantics.get("model_used"),
            )

            try:
                cursor = self.db.execute(
                    """INSERT INTO entity_semantics
                       (entity_id, chunk_id, context, semantic_role, properties,
                        functional_category, confidence, model_used)
                       VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                       ON DUPLICATE KEY UPDATE
                       chunk_id = VALUES(chunk_id),
                       context = VALUES(context),
                       semantic_role = VALUES(semantic_role),
                       properties = VALUES(properties),
                       functional_category = VALUES(functional_category),
                       model_used = VALUES(model_used),
                       updated_at = NOW()""",
                    row,
                )
                self.db.commit()
                cursor.close()
                n_stored += 1

            except Exception as e:
                self.db.log("ERROR", f"Failed to store knowledge semantics for entity {entity.get('id')}: {e}")
                n_skipped += 1

        if self.progress:
            self.progress.add_log(f"Wissenssemantik gespeichert: {n_stored}")

        self.db.log("INFO", f"Knowledge semantics stored: {n_stored}, skipped: {n_skipped}")
        return {"stored": n_stored, "skipped": n_skipped}


class OntologyStoreStep:
    """Step: Persist ontology classifications into the ontology_classes table."""

    def __init__(self, db, progress=None):
        self.db = db
        self.progress = progress

    def execute(self, classifications: list, config: dict) -> dict:
        """
        Upsert ontology classifications into ontology_classes.

        Entries missing entity_id or class_name are skipped, as are
        entries whose database write raises an error.

        Args:
            classifications: List of classification dicts with:
                - entity_id: Entity ID (required)
                - class_name: Ontology class name (required)
                - parent_class: Optional parent class
                - confidence: Classification confidence (0-1, default 1.0)
            config: Step config (not used by this step)

        Returns:
            dict: {stored: int, skipped: int}
        """
        if self.progress:
            self.progress.update_step("ontology_store")
            self.progress.add_log(f"Speichere {len(classifications)} Ontologie-Klassifikationen...")

        n_stored = 0
        n_skipped = 0

        for record in classifications:
            # Both keys are mandatory for a meaningful classification row.
            if not (record.get("entity_id") and record.get("class_name")):
                n_skipped += 1
                continue

            row = (
                record["entity_id"],
                record["class_name"],
                record.get("parent_class"),
                record.get("confidence", 1.0),
            )

            try:
                cursor = self.db.execute(
                    """INSERT INTO ontology_classes
                       (entity_id, class_name, parent_class, confidence)
                       VALUES (%s, %s, %s, %s)
                       ON DUPLICATE KEY UPDATE
                       parent_class = VALUES(parent_class),
                       confidence = VALUES(confidence)""",
                    row,
                )
                self.db.commit()
                cursor.close()
                n_stored += 1

            except Exception as e:
                self.db.log("ERROR", f"Failed to store ontology class for entity {record.get('entity_id')}: {e}")
                n_skipped += 1

        if self.progress:
            self.progress.add_log(f"Ontologie gespeichert: {n_stored}")

        self.db.log("INFO", f"Ontology classes stored: {n_stored}, skipped: {n_skipped}")
        return {"stored": n_stored, "skipped": n_skipped}