{
"event": "PreToolUse",
"tool_name": "Write",
"tool_input": {
"file_path": "\/var\/www\/scripts\/pipeline\/analyzers\/statement_analyzer.py",
"content": "\"\"\"\nStatement Analyzer - Extract SPO-triplets (Subject-Predicate-Object statements).\n\"\"\"\n\nimport json\nimport re\nimport sys\nimport time\n\nimport requests\n\nsys.path.insert(0, \"\/var\/www\/scripts\/pipeline\")\n\nfrom config import ANTHROPIC_MODEL, OLLAMA_CHAT_MODEL, OLLAMA_HOST\nfrom db import db\nfrom protokoll import protokoll\n\nfrom .entity_extractor import find_entity_by_name\n\n\ndef extract_statements(chunk_id: int, text: str, client=None) -> dict:\n \"\"\"\n Extract SPO-triplets (Subject-Predicate-Object statements) from text.\n\n Args:\n chunk_id: ID of the chunk being analyzed\n text: Text content to extract statements from\n client: Optional Anthropic client (falls back to Ollama if None)\n\n Returns:\n Dict with statements list, prompt_id, prompt_version, model_used\n \"\"\"\n prompt_data = db.get_prompt_by_use_case(\"statement_extraction\")\n prompt_template = prompt_data[\"content\"] if prompt_data else None\n prompt_id = prompt_data[\"id\"] if prompt_data else None\n prompt_version = prompt_data[\"version\"] if prompt_data else None\n\n if not prompt_template:\n db.log(\"WARNING\", \"statement_extraction prompt not found in DB, using fallback\")\n prompt_template = \"\"\"Extrahiere alle faktischen Aussagen aus dem Text als SPO-Tripel.\n\nRegeln:\n- Subject: Eine benannte Entität (Person, Organisation, Konzept, Methode)\n- Predicate: Die Beziehung oder Eigenschaft (z.B. \"entwickelte\", \"basiert auf\", \"ist Teil von\")\n- Object: Eine Entität oder ein Literal-Wert\n\nAntworte NUR im JSON-Format:\n{\"statements\": [\n {\"subject\": \"Name der Subject-Entität\", \"predicate\": \"Beziehung\", \"object\": \"Name oder Wert\", \"confidence\": 0.0-1.0}\n]}\n\nText:\n{{TEXT}}\"\"\"\n\n prompt = prompt_template.replace(\"{{TEXT}}\", text[:3000])\n\n try:\n start_time = time.time()\n tokens_in, tokens_out = 0, 0\n model_name = \"\"\n\n if client:\n message = client.messages.create(\n model=ANTHROPIC_MODEL,\n max_tokens=1500,\n messages=[{\"role\": \"user\", \"content\": prompt}],\n )\n response_text = message.content[0].text\n tokens_in = message.usage.input_tokens\n tokens_out = message.usage.output_tokens\n model_name = ANTHROPIC_MODEL\n else:\n response = requests.post(\n f\"{OLLAMA_HOST}\/api\/generate\",\n json={\n \"model\": OLLAMA_CHAT_MODEL,\n \"prompt\": prompt,\n \"stream\": False,\n \"format\": \"json\",\n },\n timeout=120,\n )\n response.raise_for_status()\n data = response.json()\n response_text = data.get(\"response\", \"{}\")\n tokens_in = data.get(\"prompt_eval_count\", 0)\n tokens_out = data.get(\"eval_count\", 0)\n model_name = f\"ollama:{OLLAMA_CHAT_MODEL}\"\n\n duration_ms = int((time.time() - start_time) * 1000)\n\n protokoll.log_llm_call(\n request=f\"[statement_extraction] chunk_id={chunk_id}\",\n response=response_text[:2000],\n model_name=model_name,\n tokens_input=tokens_in,\n tokens_output=tokens_out,\n duration_ms=duration_ms,\n status=\"completed\",\n )\n\n json_match = re.search(r\"\\{[\\s\\S]*\\}\", response_text)\n if json_match:\n result = json.loads(json_match.group())\n return {\n \"statements\": result.get(\"statements\", []),\n \"prompt_id\": prompt_id,\n \"prompt_version\": prompt_version,\n \"model_used\": model_name,\n }\n return {\"statements\": [], \"prompt_id\": prompt_id, \"prompt_version\": prompt_version}\n\n except Exception as e:\n db.log(\"ERROR\", f\"Statement extraction failed for chunk {chunk_id}: {e}\")\n protokoll.log_llm_call(\n request=f\"[statement_extraction] chunk_id={chunk_id}\",\n model_name=ANTHROPIC_MODEL if client else f\"ollama:{OLLAMA_CHAT_MODEL}\",\n status=\"error\",\n error_message=str(e),\n )\n return {\"statements\": [], \"prompt_id\": prompt_id, \"prompt_version\": prompt_version}\n\n\ndef store_statements(\n chunk_id: int,\n statements: list[dict],\n prompt_version: str = None,\n model_used: str = None,\n) -> int:\n \"\"\"\n Store extracted statements in the database with entity linking.\n\n Args:\n chunk_id: ID of the source chunk\n statements: List of statement dicts with subject, predicate, object, confidence\n prompt_version: Version of the prompt used for extraction\n model_used: Model used for extraction\n\n Returns:\n Number of successfully stored statements\n \"\"\"\n stored = 0\n\n for stmt in statements:\n try:\n subject_name = stmt.get(\"subject\", \"\").strip()\n predicate = stmt.get(\"predicate\", \"\").strip()\n object_value = stmt.get(\"object\", \"\").strip()\n confidence = float(stmt.get(\"confidence\", 0.8))\n\n if not subject_name or not predicate:\n continue\n\n subject_entity = find_entity_by_name(subject_name)\n if not subject_entity:\n db.log(\"DEBUG\", f\"Subject entity not found: {subject_name}\")\n continue\n\n subject_entity_id = subject_entity[\"id\"]\n\n object_entity_id = None\n object_literal = None\n\n if object_value:\n object_entity = find_entity_by_name(object_value)\n if object_entity:\n object_entity_id = object_entity[\"id\"]\n else:\n object_literal = object_value\n\n cursor = db.execute(\n \"\"\"INSERT INTO statements\n (subject_entity_id, predicate, object_entity_id, object_literal,\n chunk_id, confidence, status, created_at)\n VALUES (%s, %s, %s, %s, %s, %s, 'extracted', NOW())\"\"\",\n (\n subject_entity_id,\n predicate[:255],\n object_entity_id,\n object_literal,\n chunk_id,\n confidence,\n ),\n )\n db.commit()\n statement_id = cursor.lastrowid\n cursor.close()\n\n db.log_provenance(\n artifact_type=\"statement\",\n artifact_id=statement_id,\n source_type=\"extraction\",\n source_id=chunk_id,\n pipeline_step=\"statement_extract\",\n model_used=model_used,\n prompt_version=prompt_version,\n )\n\n stored += 1\n\n except Exception as e:\n db.log(\"WARNING\", f\"Failed to store statement: {e}\")\n\n return stored\n\n\ndef analyze_chunk_statements(chunk_id: int, content: str, client=None) -> int:\n \"\"\"\n Extract and store statements for a single chunk.\n\n Args:\n chunk_id: ID of the chunk\n content: Text content of the chunk\n client: Optional Anthropic client\n\n Returns:\n Number of statements stored\n \"\"\"\n result = extract_statements(chunk_id, content, client)\n statements = result.get(\"statements\", [])\n if statements:\n stored = store_statements(\n chunk_id,\n statements,\n prompt_version=result.get(\"prompt_version\"),\n model_used=result.get(\"model_used\"),\n )\n db.log(\"INFO\", f\"Chunk {chunk_id}: {stored}\/{len(statements)} statements stored\")\n return stored\n return 0\n\n\ndef analyze_document_statements(document_id: int, client=None, progress=None) -> int:\n \"\"\"\n Extract statements from all chunks of a document.\n\n Args:\n document_id: ID of the document\n client: Optional Anthropic client\n progress: Optional PipelineProgress for logging\n\n Returns:\n Total number of statements stored\n \"\"\"\n cursor = db.execute(\n \"SELECT id, content FROM chunks WHERE document_id = %s ORDER BY chunk_index\",\n (document_id,),\n )\n chunks = cursor.fetchall()\n cursor.close()\n\n total = len(chunks)\n db.log(\"INFO\", f\"Extracting statements from {total} chunks\")\n if progress:\n progress.add_log(f\"Statements: Extrahiere aus {total} Chunks...\")\n\n total_stored = 0\n for i, chunk in enumerate(chunks, 1):\n if progress:\n progress.add_log(f\"Statements: Chunk {i}\/{total}...\")\n\n stored = analyze_chunk_statements(chunk[\"id\"], chunk[\"content\"], client)\n total_stored += stored\n\n db.log(\"INFO\", f\"Total statements stored: {total_stored}\")\n if progress:\n progress.add_log(f\"Statements: {total_stored} Aussagen extrahiert\")\n\n return total_stored\n"
}
}
{
"tool_response": {
"type": "create",
"filePath": "\/var\/www\/scripts\/pipeline\/analyzers\/statement_analyzer.py",
"content": "\"\"\"\nStatement Analyzer - Extract SPO-triplets (Subject-Predicate-Object statements).\n\"\"\"\n\nimport json\nimport re\nimport sys\nimport time\n\nimport requests\n\nsys.path.insert(0, \"\/var\/www\/scripts\/pipeline\")\n\nfrom config import ANTHROPIC_MODEL, OLLAMA_CHAT_MODEL, OLLAMA_HOST\nfrom db import db\nfrom protokoll import protokoll\n\nfrom .entity_extractor import find_entity_by_name\n\n\ndef extract_statements(chunk_id: int, text: str, client=None) -> dict:\n \"\"\"\n Extract SPO-triplets (Subject-Predicate-Object statements) from text.\n\n Args:\n chunk_id: ID of the chunk being analyzed\n text: Text content to extract statements from\n client: Optional Anthropic client (falls back to Ollama if None)\n\n Returns:\n Dict with statements list, prompt_id, prompt_version, model_used\n \"\"\"\n prompt_data = db.get_prompt_by_use_case(\"statement_extraction\")\n prompt_template = prompt_data[\"content\"] if prompt_data else None\n prompt_id = prompt_data[\"id\"] if prompt_data else None\n prompt_version = prompt_data[\"version\"] if prompt_data else None\n\n if not prompt_template:\n db.log(\"WARNING\", \"statement_extraction prompt not found in DB, using fallback\")\n prompt_template = \"\"\"Extrahiere alle faktischen Aussagen aus dem Text als SPO-Tripel.\n\nRegeln:\n- Subject: Eine benannte Entität (Person, Organisation, Konzept, Methode)\n- Predicate: Die Beziehung oder Eigenschaft (z.B. \"entwickelte\", \"basiert auf\", \"ist Teil von\")\n- Object: Eine Entität oder ein Literal-Wert\n\nAntworte NUR im JSON-Format:\n{\"statements\": [\n {\"subject\": \"Name der Subject-Entität\", \"predicate\": \"Beziehung\", \"object\": \"Name oder Wert\", \"confidence\": 0.0-1.0}\n]}\n\nText:\n{{TEXT}}\"\"\"\n\n prompt = prompt_template.replace(\"{{TEXT}}\", text[:3000])\n\n try:\n start_time = time.time()\n tokens_in, tokens_out = 0, 0\n model_name = \"\"\n\n if client:\n message = client.messages.create(\n model=ANTHROPIC_MODEL,\n max_tokens=1500,\n messages=[{\"role\": \"user\", \"content\": prompt}],\n )\n response_text = message.content[0].text\n tokens_in = message.usage.input_tokens\n tokens_out = message.usage.output_tokens\n model_name = ANTHROPIC_MODEL\n else:\n response = requests.post(\n f\"{OLLAMA_HOST}\/api\/generate\",\n json={\n \"model\": OLLAMA_CHAT_MODEL,\n \"prompt\": prompt,\n \"stream\": False,\n \"format\": \"json\",\n },\n timeout=120,\n )\n response.raise_for_status()\n data = response.json()\n response_text = data.get(\"response\", \"{}\")\n tokens_in = data.get(\"prompt_eval_count\", 0)\n tokens_out = data.get(\"eval_count\", 0)\n model_name = f\"ollama:{OLLAMA_CHAT_MODEL}\"\n\n duration_ms = int((time.time() - start_time) * 1000)\n\n protokoll.log_llm_call(\n request=f\"[statement_extraction] chunk_id={chunk_id}\",\n response=response_text[:2000],\n model_name=model_name,\n tokens_input=tokens_in,\n tokens_output=tokens_out,\n duration_ms=duration_ms,\n status=\"completed\",\n )\n\n json_match = re.search(r\"\\{[\\s\\S]*\\}\", response_text)\n if json_match:\n result = json.loads(json_match.group())\n return {\n \"statements\": result.get(\"statements\", []),\n \"prompt_id\": prompt_id,\n \"prompt_version\": prompt_version,\n \"model_used\": model_name,\n }\n return {\"statements\": [], \"prompt_id\": prompt_id, \"prompt_version\": prompt_version}\n\n except Exception as e:\n db.log(\"ERROR\", f\"Statement extraction failed for chunk {chunk_id}: {e}\")\n protokoll.log_llm_call(\n request=f\"[statement_extraction] chunk_id={chunk_id}\",\n model_name=ANTHROPIC_MODEL if client else f\"ollama:{OLLAMA_CHAT_MODEL}\",\n status=\"error\",\n error_message=str(e),\n )\n return {\"statements\": [], \"prompt_id\": prompt_id, \"prompt_version\": prompt_version}\n\n\ndef store_statements(\n chunk_id: int,\n statements: list[dict],\n prompt_version: str = None,\n model_used: str = None,\n) -> int:\n \"\"\"\n Store extracted statements in the database with entity linking.\n\n Args:\n chunk_id: ID of the source chunk\n statements: List of statement dicts with subject, predicate, object, confidence\n prompt_version: Version of the prompt used for extraction\n model_used: Model used for extraction\n\n Returns:\n Number of successfully stored statements\n \"\"\"\n stored = 0\n\n for stmt in statements:\n try:\n subject_name = stmt.get(\"subject\", \"\").strip()\n predicate = stmt.get(\"predicate\", \"\").strip()\n object_value = stmt.get(\"object\", \"\").strip()\n confidence = float(stmt.get(\"confidence\", 0.8))\n\n if not subject_name or not predicate:\n continue\n\n subject_entity = find_entity_by_name(subject_name)\n if not subject_entity:\n db.log(\"DEBUG\", f\"Subject entity not found: {subject_name}\")\n continue\n\n subject_entity_id = subject_entity[\"id\"]\n\n object_entity_id = None\n object_literal = None\n\n if object_value:\n object_entity = find_entity_by_name(object_value)\n if object_entity:\n object_entity_id = object_entity[\"id\"]\n else:\n object_literal = object_value\n\n cursor = db.execute(\n \"\"\"INSERT INTO statements\n (subject_entity_id, predicate, object_entity_id, object_literal,\n chunk_id, confidence, status, created_at)\n VALUES (%s, %s, %s, %s, %s, %s, 'extracted', NOW())\"\"\",\n (\n subject_entity_id,\n predicate[:255],\n object_entity_id,\n object_literal,\n chunk_id,\n confidence,\n ),\n )\n db.commit()\n statement_id = cursor.lastrowid\n cursor.close()\n\n db.log_provenance(\n artifact_type=\"statement\",\n artifact_id=statement_id,\n source_type=\"extraction\",\n source_id=chunk_id,\n pipeline_step=\"statement_extract\",\n model_used=model_used,\n prompt_version=prompt_version,\n )\n\n stored += 1\n\n except Exception as e:\n db.log(\"WARNING\", f\"Failed to store statement: {e}\")\n\n return stored\n\n\ndef analyze_chunk_statements(chunk_id: int, content: str, client=None) -> int:\n \"\"\"\n Extract and store statements for a single chunk.\n\n Args:\n chunk_id: ID of the chunk\n content: Text content of the chunk\n client: Optional Anthropic client\n\n Returns:\n Number of statements stored\n \"\"\"\n result = extract_statements(chunk_id, content, client)\n statements = result.get(\"statements\", [])\n if statements:\n stored = store_statements(\n chunk_id,\n statements,\n prompt_version=result.get(\"prompt_version\"),\n model_used=result.get(\"model_used\"),\n )\n db.log(\"INFO\", f\"Chunk {chunk_id}: {stored}\/{len(statements)} statements stored\")\n return stored\n return 0\n\n\ndef analyze_document_statements(document_id: int, client=None, progress=None) -> int:\n \"\"\"\n Extract statements from all chunks of a document.\n\n Args:\n document_id: ID of the document\n client: Optional Anthropic client\n progress: Optional PipelineProgress for logging\n\n Returns:\n Total number of statements stored\n \"\"\"\n cursor = db.execute(\n \"SELECT id, content FROM chunks WHERE document_id = %s ORDER BY chunk_index\",\n (document_id,),\n )\n chunks = cursor.fetchall()\n cursor.close()\n\n total = len(chunks)\n db.log(\"INFO\", f\"Extracting statements from {total} chunks\")\n if progress:\n progress.add_log(f\"Statements: Extrahiere aus {total} Chunks...\")\n\n total_stored = 0\n for i, chunk in enumerate(chunks, 1):\n if progress:\n progress.add_log(f\"Statements: Chunk {i}\/{total}...\")\n\n stored = analyze_chunk_statements(chunk[\"id\"], chunk[\"content\"], client)\n total_stored += stored\n\n db.log(\"INFO\", f\"Total statements stored: {total_stored}\")\n if progress:\n progress.add_log(f\"Statements: {total_stored} Aussagen extrahiert\")\n\n return total_stored\n",
"structuredPatch": [],
"originalFile": null
}
}