{
"tool_response": {
"type": "text",
"file": {
"filePath": "\/var\/www\/scripts\/pipeline\/analyzers\/document_analyzer.py",
"content": "\"\"\"\nDocument Analyzer - Full semantic analysis of documents.\n\"\"\"\n\nimport re\nimport sys\n\nsys.path.insert(0, \"\/var\/www\/scripts\/pipeline\")\n\nfrom db import db\n\nfrom .client import get_anthropic_client\nfrom .entity_extractor import extract_entities_anthropic, extract_entities_ollama, find_entity_by_name, store_entities\nfrom .ontology_classifier import classify_entities\nfrom .relation_extractor import extract_relations\nfrom .semantic_analyzer import analyze_chunks_semantics\nfrom .taxonomy_classifier import classify_taxonomy\n\n\ndef analyze_document(document_id: int, text: str, use_anthropic: bool = True, progress=None) -> dict:\n \"\"\"\n Full semantic analysis of a document.\n Extracts entities, relations, and taxonomy classification.\n \"\"\"\n db.log(\"INFO\", f\"Starting semantic analysis for document {document_id}\")\n\n if progress:\n progress.add_log(\"Analyse: Starte Entity-Extraktion...\")\n\n client = get_anthropic_client() if use_anthropic else None\n\n # Extract entities\n if client:\n entities = extract_entities_anthropic(text, client)\n else:\n entities = extract_entities_ollama(text)\n\n db.log(\"INFO\", f\"Extracted {len(entities)} entities\")\n if progress:\n progress.add_log(f\"Analyse: {len(entities)} Entitäten extrahiert\")\n\n relations = []\n\n # Store entities\n if entities:\n stored = store_entities(document_id, entities)\n db.log(\"INFO\", f\"Stored {stored} entities\")\n if progress:\n progress.add_log(f\"Analyse: {stored} Entitäten gespeichert\")\n\n # Extract relations\n if progress:\n progress.add_log(\"Analyse: Extrahiere Relationen...\")\n relations = extract_relations(text, entities, client)\n db.log(\"INFO\", f\"Extracted {len(relations)} relations\")\n if progress:\n progress.add_log(f\"Analyse: {len(relations)} Relationen extrahiert\")\n\n # Store relations\n for rel in relations:\n try:\n # Use fuzzy matching via canonical_name\n source = find_entity_by_name(rel[\"source\"])\n target = find_entity_by_name(rel[\"target\"])\n\n if source and target:\n cursor = db.execute(\n \"\"\"INSERT IGNORE INTO entity_relations\n (source_entity_id, target_entity_id, relation_type, created_at)\n VALUES (%s, %s, %s, NOW())\"\"\",\n (source[\"id\"], target[\"id\"], rel[\"relation\"]),\n )\n db.commit()\n cursor.close()\n\n cursor = db.execute(\n \"\"\"INSERT IGNORE INTO entity_ontology\n (source_entity_id, target_entity_id, relation_type, direction,\n strength, source_type, source_id, created_at)\n VALUES (%s, %s, %s, 'unidirectional', 1.0, 'document', %s, NOW())\"\"\",\n (source[\"id\"], target[\"id\"], rel[\"relation\"], document_id),\n )\n db.commit()\n cursor.close()\n except Exception as e:\n db.log(\"WARNING\", f\"Failed to store relation: {e}\")\n\n # Taxonomy classification\n if progress:\n progress.add_log(\"Analyse: Klassifiziere Taxonomie...\")\n taxonomy = classify_taxonomy(text, client)\n db.log(\"INFO\", f\"Classified into {len(taxonomy.get('categories', []))} categories\")\n if progress:\n progress.add_log(f\"Analyse: {len(taxonomy.get('categories', []))} Kategorien zugewiesen\")\n\n # Store taxonomy assignments\n for category in taxonomy.get(\"categories\", []):\n try:\n clean_category = re.sub(r\"^\\d+\\.\\s*\", \"\", category).strip()\n\n cursor = db.execute(\"SELECT id FROM taxonomy_terms WHERE name = %s LIMIT 1\", (clean_category,))\n term = cursor.fetchone()\n cursor.close()\n\n if term:\n cursor = db.execute(\n \"\"\"INSERT IGNORE INTO document_taxonomy\n (document_id, taxonomy_term_id, confidence, created_at)\n VALUES (%s, %s, %s, NOW())\"\"\",\n (document_id, term[\"id\"], taxonomy.get(\"confidence\", 0.5)),\n )\n db.commit()\n cursor.close()\n except Exception as e:\n db.log(\"WARNING\", f\"Failed to store taxonomy: {e}\")\n\n # Link entities to chunks\n chunk_entity_links = 0\n if entities:\n chunk_entity_links = link_chunk_entities(document_id)\n db.log(\"INFO\", f\"Created {chunk_entity_links} chunk-entity links\")\n\n # Classify entities to ontology classes\n ontology_classifications = 0\n if entities:\n if progress:\n progress.add_log(\"Analyse: Klassifiziere Entitäten zu Ontologie-Klassen...\")\n ontology_classifications = classify_entities(document_id)\n db.log(\"INFO\", f\"Created {ontology_classifications} entity-ontology classifications\")\n if progress:\n progress.add_log(f\"Analyse: {ontology_classifications} Ontologie-Zuordnungen\")\n\n # Propagate taxonomy to chunks\n chunk_taxonomy_links = 0\n if taxonomy.get(\"categories\"):\n chunk_taxonomy_links = propagate_taxonomy_to_chunks(document_id, taxonomy)\n db.log(\"INFO\", f\"Created {chunk_taxonomy_links} chunk-taxonomy links\")\n if progress:\n progress.add_log(f\"Analyse: {chunk_taxonomy_links} Chunk-Taxonomie-Zuweisungen\")\n\n # Assign taxonomy to entities\n entity_taxonomy_links = 0\n if entities and taxonomy.get(\"categories\"):\n entity_taxonomy_links = assign_entity_taxonomy(document_id, entities, taxonomy)\n db.log(\"INFO\", f\"Created {entity_taxonomy_links} entity-taxonomy links\")\n if progress:\n progress.add_log(f\"Analyse: {entity_taxonomy_links} Entity-Taxonomie-Zuweisungen\")\n\n # Analyze chunk semantics\n chunks_analyzed = analyze_chunks_semantics(document_id, client, progress)\n db.log(\"INFO\", f\"Chunk semantics: {chunks_analyzed} chunks analyzed\")\n\n return {\n \"entities\": len(entities),\n \"relations\": len(relations),\n \"categories\": taxonomy.get(\"categories\", []),\n \"chunk_entity_links\": chunk_entity_links,\n \"ontology_classifications\": ontology_classifications,\n \"chunk_taxonomy_links\": chunk_taxonomy_links,\n \"entity_taxonomy_links\": entity_taxonomy_links,\n \"chunks_semantics\": chunks_analyzed,\n }\n\n\ndef link_chunk_entities(document_id: int) -> int:\n \"\"\"\n Link entities to their source chunks.\n Scans each chunk for entity mentions and populates chunk_entities.\n \"\"\"\n cursor = db.execute(\"SELECT id, name, canonical_name FROM entities\")\n entities = cursor.fetchall()\n cursor.close()\n\n if not entities:\n db.log(\"INFO\", f\"No entities to link for document {document_id}\")\n return 0\n\n cursor = db.execute(\"SELECT id, content FROM chunks WHERE document_id = %s\", (document_id,))\n chunks = cursor.fetchall()\n cursor.close()\n\n linked = 0\n for chunk in chunks:\n chunk_text = chunk[\"content\"].lower()\n\n for entity in entities:\n name_lower = entity[\"name\"].lower()\n canonical = (entity[\"canonical_name\"] or \"\").lower()\n\n mention_count = chunk_text.count(name_lower)\n if canonical and canonical != name_lower:\n mention_count += chunk_text.count(canonical)\n\n if mention_count > 0:\n relevance = min(1.0, mention_count * 0.2)\n\n try:\n cursor = db.execute(\n \"\"\"INSERT INTO chunk_entities\n (chunk_id, entity_id, relevance_score, mention_count)\n VALUES (%s, %s, %s, %s)\n ON DUPLICATE KEY UPDATE\n relevance_score = VALUES(relevance_score),\n mention_count = VALUES(mention_count)\"\"\",\n (chunk[\"id\"], entity[\"id\"], relevance, mention_count),\n )\n db.commit()\n cursor.close()\n linked += 1\n except Exception as e:\n db.log(\"WARNING\", f\"Failed to link chunk {chunk['id']} to entity {entity['id']}: {e}\")\n\n db.log(\"INFO\", f\"Linked {linked} chunk-entity pairs for document {document_id}\")\n return linked\n\n\ndef propagate_taxonomy_to_chunks(document_id: int, taxonomy: dict) -> int:\n \"\"\"\n Propagate document taxonomy to all its chunks.\n Uses the new db.add_chunk_taxonomy() function.\n \"\"\"\n cursor = db.execute(\"SELECT id FROM chunks WHERE document_id = %s\", (document_id,))\n chunks = cursor.fetchall()\n cursor.close()\n\n if not chunks:\n return 0\n\n confidence = taxonomy.get(\"confidence\", 0.7)\n linked = 0\n\n for category in taxonomy.get(\"categories\", []):\n clean_category = re.sub(r\"^\\d+\\.\\s*\", \"\", category).strip()\n\n cursor = db.execute(\"SELECT id FROM taxonomy_terms WHERE name = %s LIMIT 1\", (clean_category,))\n term = cursor.fetchone()\n cursor.close()\n\n if term:\n for chunk in chunks:\n result = db.add_chunk_taxonomy(\n chunk_id=chunk[\"id\"],\n term_id=term[\"id\"],\n confidence=confidence,\n source=\"auto\",\n )\n if result:\n linked += 1\n\n return linked\n\n\ndef assign_entity_taxonomy(document_id: int, entities: list, taxonomy: dict) -> int:\n \"\"\"\n Assign taxonomy terms to extracted entities based on document context.\n Uses the new db.add_entity_taxonomy() function.\n \"\"\"\n cursor = db.execute(\n \"SELECT id, name FROM entities WHERE id IN \"\n \"(SELECT entity_id FROM document_entities WHERE document_id = %s)\",\n (document_id,),\n )\n doc_entities = cursor.fetchall()\n cursor.close()\n\n ... [TRUNCATED-d9f5b391bbe9eafb]",
"numLines": 291,
"startLine": 1,
"totalLines": 291
}
}
}