document_analyzer.py
- Path: /var/www/scripts/pipeline/analyzers/document_analyzer.py
- Namespace: pipeline
- Lines: 290 | Size: 10,741 bytes
- Modified: 2025-12-31 03:01:09 | Scanned: 2025-12-31 10:22:15
Code Hygiene Score: 77
- Dependencies: 30 (weight 25%)
- LOC: 70 (weight 20%)
- Methods: 100 (weight 20%)
- Secrets: 100 (weight 15%)
- Classes: 100 (weight 10%)
- Magic Numbers: 100 (weight 10%)
No issues found.
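The overall score of 77 is consistent with a weighted average of the six component scores, using the listed percentages as weights. A minimal sketch of that computation (the dict layout and the round-half-up rule are assumptions, not scanner internals):

# Weighted-average sketch; component values and weights as reported above.
components = {
    "dependencies": (30, 0.25),
    "loc": (70, 0.20),
    "methods": (100, 0.20),
    "secrets": (100, 0.15),
    "classes": (100, 0.10),
    "magic_numbers": (100, 0.10),
}
raw = sum(score * weight for score, weight in components.values())  # 76.5
hygiene_score = int(raw + 0.5)  # round half up -> 77 (Python's round() would give 76)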
Dependencies (12)
- use re
- use sys
- use db.db
- use client.get_anthropic_client
- use entity_extractor.extract_entities_anthropic
- use entity_extractor.extract_entities_ollama
- use entity_extractor.find_entity_by_name
- use entity_extractor.store_entities
- use ontology_classifier.classify_entities
- use relation_extractor.extract_relations
- use semantic_analyzer.analyze_chunks_semantics
- use taxonomy_classifier.classify_taxonomy
Functions (4)
- analyze_document() (line 20)
- link_chunk_entities() (line 165)
- propagate_taxonomy_to_chunks() (line 217)
- assign_entity_taxonomy() (line 253)
Code
"""
Document Analyzer - Full semantic analysis of documents.
"""
import re
import sys

# Make the pipeline package root importable when run as a standalone script.
sys.path.insert(0, "/var/www/scripts/pipeline")
from db import db
from .client import get_anthropic_client
from .entity_extractor import extract_entities_anthropic, extract_entities_ollama, find_entity_by_name, store_entities
from .ontology_classifier import classify_entities
from .relation_extractor import extract_relations
from .semantic_analyzer import analyze_chunks_semantics
from .taxonomy_classifier import classify_taxonomy

def analyze_document(document_id: int, text: str, use_anthropic: bool = True, progress=None) -> dict:
"""
Full semantic analysis of a document.
Extracts entities, relations, and taxonomy classification.
"""
db.log("INFO", f"Starting semantic analysis for document {document_id}")
if progress:
progress.add_log("Analyse: Starte Entity-Extraktion...")
client = get_anthropic_client() if use_anthropic else None
# Extract entities
if client:
entities = extract_entities_anthropic(text, client)
else:
entities = extract_entities_ollama(text)
db.log("INFO", f"Extracted {len(entities)} entities")
if progress:
progress.add_log(f"Analyse: {len(entities)} Entitäten extrahiert")
    relations = []  # remains empty when no entities were extracted
# Store entities
if entities:
stored = store_entities(document_id, entities)
db.log("INFO", f"Stored {stored} entities")
if progress:
progress.add_log(f"Analyse: {stored} Entitäten gespeichert")
# Extract relations
if progress:
progress.add_log("Analyse: Extrahiere Relationen...")
relations = extract_relations(text, entities, client)
db.log("INFO", f"Extracted {len(relations)} relations")
if progress:
progress.add_log(f"Analyse: {len(relations)} Relationen extrahiert")
# Store relations
for rel in relations:
try:
# Use fuzzy matching via canonical_name
source = find_entity_by_name(rel["source"])
target = find_entity_by_name(rel["target"])
if source and target:
cursor = db.execute(
"""INSERT IGNORE INTO entity_relations
(source_entity_id, target_entity_id, relation_type, created_at)
VALUES (%s, %s, %s, NOW())""",
(source["id"], target["id"], rel["relation"]),
)
db.commit()
cursor.close()
                # Mirror the relation into the ontology graph with provenance
                cursor = db.execute(
"""INSERT IGNORE INTO entity_ontology
(source_entity_id, target_entity_id, relation_type, direction,
strength, source_type, source_id, created_at)
VALUES (%s, %s, %s, 'unidirectional', 1.0, 'document', %s, NOW())""",
(source["id"], target["id"], rel["relation"], document_id),
)
db.commit()
cursor.close()
except Exception as e:
db.log("WARNING", f"Failed to store relation: {e}")
# Taxonomy classification
if progress:
progress.add_log("Analyse: Klassifiziere Taxonomie...")
taxonomy = classify_taxonomy(text, client)
db.log("INFO", f"Classified into {len(taxonomy.get('categories', []))} categories")
if progress:
progress.add_log(f"Analyse: {len(taxonomy.get('categories', []))} Kategorien zugewiesen")
# Store taxonomy assignments
for category in taxonomy.get("categories", []):
try:
            # Strip a leading enumeration prefix like "3. " from the category name
            clean_category = re.sub(r"^\d+\.\s*", "", category).strip()
cursor = db.execute("SELECT id FROM taxonomy_terms WHERE name = %s LIMIT 1", (clean_category,))
term = cursor.fetchone()
cursor.close()
if term:
cursor = db.execute(
"""INSERT IGNORE INTO document_taxonomy
(document_id, taxonomy_term_id, confidence, created_at)
VALUES (%s, %s, %s, NOW())""",
(document_id, term["id"], taxonomy.get("confidence", 0.5)),
)
db.commit()
cursor.close()
except Exception as e:
db.log("WARNING", f"Failed to store taxonomy: {e}")
# Link entities to chunks
chunk_entity_links = 0
if entities:
chunk_entity_links = link_chunk_entities(document_id)
db.log("INFO", f"Created {chunk_entity_links} chunk-entity links")
# Classify entities to ontology classes
ontology_classifications = 0
if entities:
if progress:
progress.add_log("Analyse: Klassifiziere Entitäten zu Ontologie-Klassen...")
ontology_classifications = classify_entities(document_id)
db.log("INFO", f"Created {ontology_classifications} entity-ontology classifications")
if progress:
progress.add_log(f"Analyse: {ontology_classifications} Ontologie-Zuordnungen")
# Propagate taxonomy to chunks
chunk_taxonomy_links = 0
if taxonomy.get("categories"):
chunk_taxonomy_links = propagate_taxonomy_to_chunks(document_id, taxonomy)
db.log("INFO", f"Created {chunk_taxonomy_links} chunk-taxonomy links")
if progress:
progress.add_log(f"Analyse: {chunk_taxonomy_links} Chunk-Taxonomie-Zuweisungen")
# Assign taxonomy to entities
entity_taxonomy_links = 0
if entities and taxonomy.get("categories"):
entity_taxonomy_links = assign_entity_taxonomy(document_id, entities, taxonomy)
db.log("INFO", f"Created {entity_taxonomy_links} entity-taxonomy links")
if progress:
progress.add_log(f"Analyse: {entity_taxonomy_links} Entity-Taxonomie-Zuweisungen")
# Analyze chunk semantics
chunks_analyzed = analyze_chunks_semantics(document_id, client, progress)
db.log("INFO", f"Chunk semantics: {chunks_analyzed} chunks analyzed")
return {
"entities": len(entities),
"relations": len(relations),
"categories": taxonomy.get("categories", []),
"chunk_entity_links": chunk_entity_links,
"ontology_classifications": ontology_classifications,
"chunk_taxonomy_links": chunk_taxonomy_links,
"entity_taxonomy_links": entity_taxonomy_links,
"chunks_semantics": chunks_analyzed,
}
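
# Example shape of the dict returned by analyze_document() (values illustrative,
# not taken from a real run):
# {"entities": 12, "relations": 7, "categories": ["Projektplanung"],
#  "chunk_entity_links": 40, "ontology_classifications": 12,
#  "chunk_taxonomy_links": 8, "entity_taxonomy_links": 12, "chunks_semantics": 8}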

def link_chunk_entities(document_id: int) -> int:
"""
Link entities to their source chunks.
Scans each chunk for entity mentions and populates chunk_entities.
"""
cursor = db.execute("SELECT id, name, canonical_name FROM entities")
entities = cursor.fetchall()
cursor.close()
if not entities:
db.log("INFO", f"No entities to link for document {document_id}")
return 0
cursor = db.execute("SELECT id, content FROM chunks WHERE document_id = %s", (document_id,))
chunks = cursor.fetchall()
cursor.close()
linked = 0
for chunk in chunks:
chunk_text = chunk["content"].lower()
for entity in entities:
name_lower = entity["name"].lower()
canonical = (entity["canonical_name"] or "").lower()
            # Naive case-insensitive substring count; may over-count short names
            mention_count = chunk_text.count(name_lower)
if canonical and canonical != name_lower:
mention_count += chunk_text.count(canonical)
if mention_count > 0:
                # 0.2 per mention, capped at 1.0 (five or more mentions saturate)
                relevance = min(1.0, mention_count * 0.2)
try:
cursor = db.execute(
"""INSERT INTO chunk_entities
(chunk_id, entity_id, relevance_score, mention_count)
VALUES (%s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
relevance_score = VALUES(relevance_score),
mention_count = VALUES(mention_count)""",
(chunk["id"], entity["id"], relevance, mention_count),
)
db.commit()
cursor.close()
linked += 1
except Exception as e:
db.log("WARNING", f"Failed to link chunk {chunk['id']} to entity {entity['id']}: {e}")
db.log("INFO", f"Linked {linked} chunk-entity pairs for document {document_id}")
return linked

def propagate_taxonomy_to_chunks(document_id: int, taxonomy: dict) -> int:
    """
    Propagate the document-level taxonomy to all of its chunks.
    Delegates storage to db.add_chunk_taxonomy().
    """
cursor = db.execute("SELECT id FROM chunks WHERE document_id = %s", (document_id,))
chunks = cursor.fetchall()
cursor.close()
if not chunks:
return 0
confidence = taxonomy.get("confidence", 0.7)
linked = 0
for category in taxonomy.get("categories", []):
clean_category = re.sub(r"^\d+\.\s*", "", category).strip()
cursor = db.execute("SELECT id FROM taxonomy_terms WHERE name = %s LIMIT 1", (clean_category,))
term = cursor.fetchone()
cursor.close()
if term:
for chunk in chunks:
result = db.add_chunk_taxonomy(
chunk_id=chunk["id"],
term_id=term["id"],
confidence=confidence,
source="auto",
)
if result:
linked += 1
return linked

def assign_entity_taxonomy(document_id: int, entities: list, taxonomy: dict) -> int:
    """
    Assign taxonomy terms to extracted entities based on document context.
    Delegates storage to db.add_entity_taxonomy().
    """
cursor = db.execute(
"SELECT id, name FROM entities WHERE id IN (SELECT entity_id FROM document_entities WHERE document_id = %s)",
(document_id,),
)
doc_entities = cursor.fetchall()
cursor.close()
if not doc_entities:
return 0
relevance = taxonomy.get("confidence", 0.7)
linked = 0
for category in taxonomy.get("categories", []):
clean_category = re.sub(r"^\d+\.\s*", "", category).strip()
cursor = db.execute("SELECT id FROM taxonomy_terms WHERE name = %s LIMIT 1", (clean_category,))
term = cursor.fetchone()
cursor.close()
if term:
for entity in doc_entities:
result = db.add_entity_taxonomy(
entity_id=entity["id"],
term_id=term["id"],
relevance=relevance,
validated=False,
)
if result:
linked += 1
return linked
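
Usage example (sketch)
For orientation, a minimal driver showing how analyze_document() could be invoked from a pipeline step. The documents table, its full_text column, and the SimpleProgress helper are illustrative assumptions, as is importing the module as analyzers.document_analyzer (its relative imports require it to live in a package); the analyzer itself only requires a progress object exposing add_log().

import sys
sys.path.insert(0, "/var/www/scripts/pipeline")
from db import db
from analyzers.document_analyzer import analyze_document

class SimpleProgress:
    """Minimal progress sink; analyze_document() only ever calls add_log()."""
    def add_log(self, message: str) -> None:
        print(f"[progress] {message}")

def run_analysis(document_id: int) -> None:
    # Hypothetical lookup: table and column names are assumptions for this sketch.
    cursor = db.execute("SELECT full_text FROM documents WHERE id = %s", (document_id,))
    row = cursor.fetchone()
    cursor.close()
    if not row:
        db.log("WARNING", f"Document {document_id} not found")
        return
    stats = analyze_document(document_id, row["full_text"], use_anthropic=True, progress=SimpleProgress())
    db.log("INFO", f"Analysis finished: {stats}")

Passing use_anthropic=False routes entity extraction through extract_entities_ollama() instead, as in the module above.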