document_analyzer.py

"""
Document Analyzer - Full semantic analysis of documents.
"""

import re
import sys

# Make the shared pipeline modules (the db singleton below) importable.
sys.path.insert(0, "/var/www/scripts/pipeline")

from db import db

from .client import get_anthropic_client
from .entity_extractor import extract_entities_anthropic, extract_entities_ollama, find_entity_by_name, store_entities
from .ontology_classifier import classify_entities
from .relation_extractor import extract_relations
from .semantic_analyzer import analyze_chunks_semantics
from .taxonomy_classifier import classify_taxonomy


def analyze_document(document_id: int, text: str, use_anthropic: bool = True, progress=None) -> dict:
    """
    Full semantic analysis of a document.
    Extracts entities, relations, and taxonomy classification.
    """
    db.log("INFO", f"Starting semantic analysis for document {document_id}")

    if progress:
        progress.add_log("Analyse: Starte Entity-Extraktion...")

    client = get_anthropic_client() if use_anthropic else None

    # Extract entities
    if client:
        entities = extract_entities_anthropic(text, client)
    else:
        entities = extract_entities_ollama(text)

    db.log("INFO", f"Extracted {len(entities)} entities")
    if progress:
        progress.add_log(f"Analyse: {len(entities)} Entitäten extrahiert")

    relations = []

    # Store entities
    if entities:
        stored = store_entities(document_id, entities)
        db.log("INFO", f"Stored {stored} entities")
        if progress:
            progress.add_log(f"Analyse: {stored} Entitäten gespeichert")

        # Extract relations
        if progress:
            progress.add_log("Analyse: Extrahiere Relationen...")
        relations = extract_relations(text, entities, client)
        db.log("INFO", f"Extracted {len(relations)} relations")
        if progress:
            progress.add_log(f"Analyse: {len(relations)} Relationen extrahiert")

        # Store relations
        for rel in relations:
            try:
                # Use fuzzy matching via canonical_name
                source = find_entity_by_name(rel["source"])
                target = find_entity_by_name(rel["target"])

                if source and target:
                    cursor = db.execute(
                        """INSERT IGNORE INTO entity_relations
                           (source_entity_id, target_entity_id, relation_type, created_at)
                           VALUES (%s, %s, %s, NOW())""",
                        (source["id"], target["id"], rel["relation"]),
                    )
                    db.commit()
                    cursor.close()

                    cursor = db.execute(
                        """INSERT IGNORE INTO entity_ontology
                           (source_entity_id, target_entity_id, relation_type, direction,
                            strength, source_type, source_id, created_at)
                           VALUES (%s, %s, %s, 'unidirectional', 1.0, 'document', %s, NOW())""",
                        (source["id"], target["id"], rel["relation"], document_id),
                    )
                    db.commit()
                    cursor.close()
            except Exception as e:
                db.log("WARNING", f"Failed to store relation: {e}")

    # Taxonomy classification
    if progress:
        progress.add_log("Analyse: Klassifiziere Taxonomie...")
    taxonomy = classify_taxonomy(text, client)
    db.log("INFO", f"Classified into {len(taxonomy.get('categories', []))} categories")
    if progress:
        progress.add_log(f"Analyse: {len(taxonomy.get('categories', []))} Kategorien zugewiesen")

    # Store taxonomy assignments
    for category in taxonomy.get("categories", []):
        try:
            # Strip any leading enumeration prefix, e.g. "3. Finance" -> "Finance".
            clean_category = re.sub(r"^\d+\.\s*", "", category).strip()

            cursor = db.execute("SELECT id FROM taxonomy_terms WHERE name = %s LIMIT 1", (clean_category,))
            term = cursor.fetchone()
            cursor.close()

            if term:
                cursor = db.execute(
                    """INSERT IGNORE INTO document_taxonomy
                       (document_id, taxonomy_term_id, confidence, created_at)
                       VALUES (%s, %s, %s, NOW())""",
                    (document_id, term["id"], taxonomy.get("confidence", 0.5)),
                )
                db.commit()
                cursor.close()
        except Exception as e:
            db.log("WARNING", f"Failed to store taxonomy: {e}")

    # Link entities to chunks
    chunk_entity_links = 0
    if entities:
        chunk_entity_links = link_chunk_entities(document_id)
        db.log("INFO", f"Created {chunk_entity_links} chunk-entity links")

    # Classify entities to ontology classes
    ontology_classifications = 0
    if entities:
        if progress:
            progress.add_log("Analyse: Klassifiziere Entitäten zu Ontologie-Klassen...")
        ontology_classifications = classify_entities(document_id)
        db.log("INFO", f"Created {ontology_classifications} entity-ontology classifications")
        if progress:
            progress.add_log(f"Analyse: {ontology_classifications} Ontologie-Zuordnungen")

    # Propagate taxonomy to chunks
    chunk_taxonomy_links = 0
    if taxonomy.get("categories"):
        chunk_taxonomy_links = propagate_taxonomy_to_chunks(document_id, taxonomy)
        db.log("INFO", f"Created {chunk_taxonomy_links} chunk-taxonomy links")
        if progress:
            progress.add_log(f"Analyse: {chunk_taxonomy_links} Chunk-Taxonomie-Zuweisungen")

    # Assign taxonomy to entities
    entity_taxonomy_links = 0
    if entities and taxonomy.get("categories"):
        entity_taxonomy_links = assign_entity_taxonomy(document_id, entities, taxonomy)
        db.log("INFO", f"Created {entity_taxonomy_links} entity-taxonomy links")
        if progress:
            progress.add_log(f"Analyse: {entity_taxonomy_links} Entity-Taxonomie-Zuweisungen")

    # Analyze chunk semantics
    chunks_analyzed = analyze_chunks_semantics(document_id, client, progress)
    db.log("INFO", f"Chunk semantics: {chunks_analyzed} chunks analyzed")

    return {
        "entities": len(entities),
        "relations": len(relations),
        "categories": taxonomy.get("categories", []),
        "chunk_entity_links": chunk_entity_links,
        "ontology_classifications": ontology_classifications,
        "chunk_taxonomy_links": chunk_taxonomy_links,
        "entity_taxonomy_links": entity_taxonomy_links,
        "chunks_semantics": chunks_analyzed,
    }
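
# Usage sketch (illustrative only): how a pipeline caller might invoke
# analyze_document(). The document id and text below are hypothetical, and the
# document row plus its chunks are assumed to already exist in the database.
#
#     summary = analyze_document(
#         document_id=42,
#         text="ACME Corp acquired Example GmbH in 2023...",
#         use_anthropic=False,  # fall back to the Ollama extractor
#     )
#     print(summary["entities"], summary["relations"], summary["categories"])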


def link_chunk_entities(document_id: int) -> int:
    """
    Link entities to their source chunks.
    Scans each chunk for entity mentions and populates chunk_entities.
    """
    cursor = db.execute("SELECT id, name, canonical_name FROM entities")
    entities = cursor.fetchall()
    cursor.close()

    if not entities:
        db.log("INFO", f"No entities to link for document {document_id}")
        return 0

    cursor = db.execute("SELECT id, content FROM chunks WHERE document_id = %s", (document_id,))
    chunks = cursor.fetchall()
    cursor.close()

    linked = 0
    for chunk in chunks:
        chunk_text = chunk["content"].lower()

        for entity in entities:
            name_lower = entity["name"].lower()
            canonical = (entity["canonical_name"] or "").lower()

            # Plain substring counting: fast, but it also matches inside longer
            # words (a word-boundary variant is sketched after this function).
            mention_count = chunk_text.count(name_lower)
            if canonical and canonical != name_lower:
                mention_count += chunk_text.count(canonical)

            if mention_count > 0:
                # Linear relevance: one mention -> 0.2, capped at 1.0 from five.
                relevance = min(1.0, mention_count * 0.2)

                try:
                    cursor = db.execute(
                        """INSERT INTO chunk_entities
                           (chunk_id, entity_id, relevance_score, mention_count)
                           VALUES (%s, %s, %s, %s)
                           ON DUPLICATE KEY UPDATE
                           relevance_score = VALUES(relevance_score),
                           mention_count = VALUES(mention_count)""",
                        (chunk["id"], entity["id"], relevance, mention_count),
                    )
                    db.commit()
                    cursor.close()
                    linked += 1
                except Exception as e:
                    db.log("WARNING", f"Failed to link chunk {chunk['id']} to entity {entity['id']}: {e}")

    db.log("INFO", f"Linked {linked} chunk-entity pairs for document {document_id}")
    return linked
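
# Word-boundary matching: an illustrative alternative, not what runs above.
# chunk_text.count() in link_chunk_entities() also counts occurrences inside
# longer words ("art" in "smart"). A stricter regex-based counter could look
# like this (hypothetical helper, not used by the pipeline; re is already
# imported at module level):
#
#     def count_mentions(chunk_text: str, name: str) -> int:
#         return len(re.findall(r"\b" + re.escape(name) + r"\b", chunk_text))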


def propagate_taxonomy_to_chunks(document_id: int, taxonomy: dict) -> int:
    """
    Propagate document taxonomy to all its chunks.
    Uses db.add_chunk_taxonomy() to write the links.
    """
    cursor = db.execute("SELECT id FROM chunks WHERE document_id = %s", (document_id,))
    chunks = cursor.fetchall()
    cursor.close()

    if not chunks:
        return 0

    confidence = taxonomy.get("confidence", 0.7)
    linked = 0

    for category in taxonomy.get("categories", []):
        clean_category = re.sub(r"^\d+\.\s*", "", category).strip()

        cursor = db.execute("SELECT id FROM taxonomy_terms WHERE name = %s LIMIT 1", (clean_category,))
        term = cursor.fetchone()
        cursor.close()

        if term:
            for chunk in chunks:
                result = db.add_chunk_taxonomy(
                    chunk_id=chunk["id"],
                    term_id=term["id"],
                    confidence=confidence,
                    source="auto",
                )
                if result:
                    linked += 1

    return linked


def assign_entity_taxonomy(document_id: int, entities: list, taxonomy: dict) -> int:
    """
    Assign taxonomy terms to extracted entities based on document context.
    Uses db.add_entity_taxonomy() to write the links.
    """
    cursor = db.execute(
        "SELECT id, name FROM entities WHERE id IN (SELECT entity_id FROM document_entities WHERE document_id = %s)",
        (document_id,),
    )
    doc_entities = cursor.fetchall()
    cursor.close()

    if not doc_entities:
        return 0

    relevance = taxonomy.get("confidence", 0.7)
    linked = 0

    for category in taxonomy.get("categories", []):
        clean_category = re.sub(r"^\d+\.\s*", "", category).strip()

        cursor = db.execute("SELECT id FROM taxonomy_terms WHERE name = %s LIMIT 1", (clean_category,))
        term = cursor.fetchone()
        cursor.close()

        if term:
            for entity in doc_entities:
                result = db.add_entity_taxonomy(
                    entity_id=entity["id"],
                    term_id=term["id"],
                    relevance=relevance,
                    validated=False,
                )
                if result:
                    linked += 1

    return linked
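

if __name__ == "__main__":
    # Standalone-run sketch (illustrative; this CLI shape is an assumption, not
    # an established interface). Rebuilds the document text from its stored
    # chunks -- the document row and chunks must already exist -- then runs the
    # full analysis. Assumes chunk ids follow document order.
    import argparse

    parser = argparse.ArgumentParser(description="Run semantic analysis for one document")
    parser.add_argument("document_id", type=int)
    parser.add_argument("--no-anthropic", action="store_true", help="use the Ollama extractor instead")
    args = parser.parse_args()

    cursor = db.execute("SELECT content FROM chunks WHERE document_id = %s ORDER BY id", (args.document_id,))
    full_text = "\n".join(row["content"] for row in cursor.fetchall())
    cursor.close()

    print(analyze_document(args.document_id, full_text, use_anthropic=not args.no_anthropic))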