Protokoll #4038
| ID | 4038 |
|---|---|
| Zeitstempel | 2025-12-22 09:04:29.473396 |
| Client | root |
| IP | - |
| Modell | claude-sonnet-4-20250514 |
| Status | completed |
| Tokens | 1,608 (Input: 0, Output: 0) |
| Dauer | 96 ms |
| Request-Zeit | - |
| Response-Zeit | - |
Warning: Undefined array key "error_message" in /var/www/dev.campus.systemische-tools.de/src/View/protokoll/show.php on line 22
Request
{
"event": "PreToolUse",
"tool_name": "Read",
"tool_input": {
"file_path": "\/opt\/scripts\/pipeline\/chunk.py"
}
}
Response
{
"tool_response": {
"type": "text",
"file": {
"filePath": "\/opt\/scripts\/pipeline\/extract.py",
"content": "\"\"\"\nText extraction for KI-System Pipeline\nExtracts text from PDF, PPTX, DOCX, MD, TXT files.\n\"\"\"\n\nimport re\nfrom pathlib import Path\n\nfrom config import OCR_ENABLED, OCR_LANGUAGE\nfrom db import db\n\n\ndef extract_pdf(file_path):\n \"\"\"Extract text from PDF using PyMuPDF with optional OCR.\"\"\"\n import fitz # PyMuPDF\n\n doc = fitz.open(file_path)\n pages = []\n\n for page_num, page in enumerate(doc):\n text = page.get_text()\n\n # If page has little text and OCR is enabled, try OCR\n if OCR_ENABLED and len(text.strip()) < 50:\n # Get page as image and OCR it\n pix = page.get_pixmap(dpi=300)\n img_data = pix.tobytes(\"png\")\n\n try:\n import io\n\n import pytesseract\n from PIL import Image\n\n img = Image.open(io.BytesIO(img_data))\n ocr_text = pytesseract.image_to_string(img, lang=OCR_LANGUAGE)\n if len(ocr_text.strip()) > len(text.strip()):\n text = ocr_text\n except Exception as e:\n db.log(\"WARNING\", f\"OCR failed for page {page_num}: {e}\")\n\n pages.append({\"page\": page_num + 1, \"text\": text.strip(), \"images\": len(page.get_images())})\n\n doc.close()\n return pages\n\n\ndef extract_pptx(file_path):\n \"\"\"Extract text from PowerPoint including speaker notes.\"\"\"\n from pptx import Presentation\n\n prs = Presentation(file_path)\n slides = []\n\n for slide_num, slide in enumerate(prs.slides):\n text_parts = []\n\n # Extract text from shapes\n for shape in slide.shapes:\n if hasattr(shape, \"text\") and shape.text:\n text_parts.append(shape.text)\n\n # Extract speaker notes\n notes = \"\"\n if slide.has_notes_slide:\n notes_frame = slide.notes_slide.notes_text_frame\n if notes_frame:\n notes = notes_frame.text\n\n slides.append({\"slide\": slide_num + 1, \"text\": \"\\n\".join(text_parts), \"notes\": notes})\n\n return slides\n\n\ndef extract_docx(file_path):\n \"\"\"Extract text from Word document.\"\"\"\n from docx import Document\n\n doc = Document(file_path)\n paragraphs = []\n\n for para in doc.paragraphs:\n if 
para.text.strip():\n style = para.style.name if para.style else \"Normal\"\n paragraphs.append({\"text\": para.text, \"style\": style, \"is_heading\": style.startswith(\"Heading\")})\n\n return paragraphs\n\n\ndef extract_markdown(file_path):\n \"\"\"Extract text from Markdown, preserving structure.\"\"\"\n import frontmatter\n\n with open(file_path, encoding=\"utf-8\") as f:\n post = frontmatter.load(f)\n\n content = post.content\n metadata = dict(post.metadata)\n\n # Parse headings for structure\n sections = []\n current_section = {\"heading\": None, \"level\": 0, \"content\": []}\n\n for line in content.split(\"\\n\"):\n heading_match = re.match(r\"^(#{1,6})\\s+(.+)$\", line)\n if heading_match:\n if current_section[\"content\"] or current_section[\"heading\"]:\n sections.append(current_section)\n current_section = {\"heading\": heading_match.group(2), \"level\": len(heading_match.group(1)), \"content\": []}\n else:\n current_section[\"content\"].append(line)\n\n if current_section[\"content\"] or current_section[\"heading\"]:\n sections.append(current_section)\n\n return {\"metadata\": metadata, \"sections\": sections}\n\n\ndef extract_txt(file_path):\n \"\"\"Extract text from plain text file.\"\"\"\n with open(file_path, encoding=\"utf-8\") as f:\n content = f.read()\n return {\"text\": content}\n\n\ndef extract(file_path):\n \"\"\"\n Main extraction function.\n Returns extracted content with structure info.\n \"\"\"\n ext = Path(file_path).suffix.lower()\n\n extractors = {\n \".pdf\": extract_pdf,\n \".pptx\": extract_pptx,\n \".docx\": extract_docx,\n \".md\": extract_markdown,\n \".txt\": extract_txt,\n }\n\n if ext not in extractors:\n raise ValueError(f\"Unsupported file type: {ext}\")\n\n db.log(\"INFO\", f\"Extracting: {file_path}\", f\"type={ext}\")\n\n try:\n content = extractors[ext](file_path)\n return {\"file_path\": file_path, \"file_type\": ext, \"content\": content, \"success\": True}\n except Exception as e:\n db.log(\"ERROR\", f\"Extraction 
failed: {file_path}\", str(e))\n return {\"file_path\": file_path, \"file_type\": ext, \"error\": str(e), \"success\": False}\n\n\ndef get_full_text(extraction_result):\n \"\"\"Get plain text from extraction result.\"\"\"\n if not extraction_result[\"success\"]:\n return \"\"\n\n content = extraction_result[\"content\"]\n file_type = extraction_result[\"file_type\"]\n\n if file_type == \".pdf\":\n return \"\\n\\n\".join(p[\"text\"] for p in content if p[\"text\"])\n\n elif file_type == \".pptx\":\n parts = []\n for slide in content:\n if slide[\"text\"]:\n parts.append(slide[\"text\"])\n if slide[\"notes\"]:\n parts.append(f\"[Notes: {slide['notes']}]\")\n return \"\\n\\n\".join(parts)\n\n elif file_type == \".docx\":\n return \"\\n\".join(p[\"text\"] for p in content)\n\n elif file_type == \".md\":\n parts = []\n for section in content[\"sections\"]:\n if section[\"heading\"]:\n parts.append(f\"{'#' * section['level']} {section['heading']}\")\n parts.extend(section[\"content\"])\n return \"\\n\".join(parts)\n\n elif file_type == \".txt\":\n return content[\"text\"]\n\n return \"\"\n\n\nif __name__ == \"__main__\":\n import sys\n\n if len(sys.argv) > 1:\n result = extract(sys.argv[1])\n if result[\"success\"]:\n print(get_full_text(result))\n else:\n print(f\"Error: {result['error']}\")\n",
"numLines": 200,
"startLine": 1,
"totalLines": 200
}
}
}