Protokoll #21961

ID: 21961
Zeitstempel: 2025-12-27 11:40:09.123811
Client: root
IP: 145.224.72.140
Modell: claude-sonnet-4-20250514
Status: completed
Tokens: 2,732 (Input: 28, Output: 2,704)
Dauer: 85 ms
Request-Zeit: 2025-12-27 11:40:09.123811
Response-Zeit: 2025-12-27 11:40:09.208421

Request

{
    "event": "PreToolUse",
    "tool_name": "Read",
    "tool_input": {
        "file_path": "\/var\/www\/scripts\/pipeline\/vision.py"
    }
}

Response

{
    "tool_response": {
        "type": "text",
        "file": {
            "filePath": "\/var\/www\/scripts\/pipeline\/extract.py",
            "content": "\"\"\"\nText extraction for KI-System Pipeline\nExtracts text from PDF, PPTX, DOCX, MD, TXT files.\n\"\"\"\n\nimport re\nfrom pathlib import Path\n\nfrom config import OCR_ENABLED, OCR_LANGUAGE\nfrom db import db\n\n\ndef extract_pdf(file_path):\n    \"\"\"Extract text from PDF using PyMuPDF with optional OCR.\"\"\"\n    import fitz  # PyMuPDF\n\n    doc = fitz.open(file_path)\n    pages = []\n\n    for page_num, page in enumerate(doc):\n        text = page.get_text()\n\n        # If page has little text and OCR is enabled, try OCR\n        if OCR_ENABLED and len(text.strip()) < 50:\n            # Get page as image and OCR it\n            pix = page.get_pixmap(dpi=300)\n            img_data = pix.tobytes(\"png\")\n\n            try:\n                import io\n\n                import pytesseract\n                from PIL import Image\n\n                img = Image.open(io.BytesIO(img_data))\n                ocr_text = pytesseract.image_to_string(img, lang=OCR_LANGUAGE)\n                if len(ocr_text.strip()) > len(text.strip()):\n                    text = ocr_text\n            except Exception as e:\n                db.log(\"WARNING\", f\"OCR failed for page {page_num}: {e}\")\n\n        pages.append({\"page\": page_num + 1, \"text\": text.strip(), \"images\": len(page.get_images())})\n\n    doc.close()\n    return pages\n\n\ndef extract_pptx(file_path):\n    \"\"\"Extract text from PowerPoint including speaker notes.\"\"\"\n    from pptx import Presentation\n\n    prs = Presentation(file_path)\n    slides = []\n\n    for slide_num, slide in enumerate(prs.slides):\n        text_parts = []\n\n        # Extract text from shapes\n        for shape in slide.shapes:\n            if hasattr(shape, \"text\") and shape.text:\n                text_parts.append(shape.text)\n\n        # Extract speaker notes\n        notes = \"\"\n        if slide.has_notes_slide:\n            notes_frame = slide.notes_slide.notes_text_frame\n            if 
notes_frame:\n                notes = notes_frame.text\n\n        slides.append({\"slide\": slide_num + 1, \"text\": \"\\n\".join(text_parts), \"notes\": notes})\n\n    return slides\n\n\ndef extract_docx(file_path):\n    \"\"\"Extract text from Word document.\"\"\"\n    from docx import Document\n\n    doc = Document(file_path)\n    paragraphs = []\n\n    for para in doc.paragraphs:\n        if para.text.strip():\n            style = para.style.name if para.style else \"Normal\"\n            paragraphs.append({\"text\": para.text, \"style\": style, \"is_heading\": style.startswith(\"Heading\")})\n\n    return paragraphs\n\n\ndef extract_markdown(file_path):\n    \"\"\"Extract text from Markdown, preserving structure.\"\"\"\n    import frontmatter\n\n    with open(file_path, encoding=\"utf-8\") as f:\n        post = frontmatter.load(f)\n\n    content = post.content\n    metadata = dict(post.metadata)\n\n    # Parse headings for structure\n    sections = []\n    current_section = {\"heading\": None, \"level\": 0, \"content\": []}\n\n    for line in content.split(\"\\n\"):\n        heading_match = re.match(r\"^(#{1,6})\\s+(.+)$\", line)\n        if heading_match:\n            if current_section[\"content\"] or current_section[\"heading\"]:\n                sections.append(current_section)\n            current_section = {\"heading\": heading_match.group(2), \"level\": len(heading_match.group(1)), \"content\": []}\n        else:\n            current_section[\"content\"].append(line)\n\n    if current_section[\"content\"] or current_section[\"heading\"]:\n        sections.append(current_section)\n\n    return {\"metadata\": metadata, \"sections\": sections}\n\n\ndef extract_txt(file_path):\n    \"\"\"Extract text from plain text file.\"\"\"\n    with open(file_path, encoding=\"utf-8\") as f:\n        content = f.read()\n    return {\"text\": content}\n\n\ndef extract(file_path):\n    \"\"\"\n    Main extraction function.\n    Returns extracted content with structure 
info.\n    \"\"\"\n    ext = Path(file_path).suffix.lower()\n\n    extractors = {\n        \".pdf\": extract_pdf,\n        \".pptx\": extract_pptx,\n        \".docx\": extract_docx,\n        \".md\": extract_markdown,\n        \".txt\": extract_txt,\n    }\n\n    if ext not in extractors:\n        raise ValueError(f\"Unsupported file type: {ext}\")\n\n    db.log(\"INFO\", f\"Extracting: {file_path}\", f\"type={ext}\")\n\n    try:\n        content = extractors[ext](file_path)\n        return {\"file_path\": file_path, \"file_type\": ext, \"content\": content, \"success\": True}\n    except Exception as e:\n        db.log(\"ERROR\", f\"Extraction failed: {file_path}\", str(e))\n        return {\"file_path\": file_path, \"file_type\": ext, \"error\": str(e), \"success\": False}\n\n\ndef get_full_text(extraction_result):\n    \"\"\"Get plain text from extraction result.\"\"\"\n    if not extraction_result[\"success\"]:\n        return \"\"\n\n    content = extraction_result[\"content\"]\n    file_type = extraction_result[\"file_type\"]\n\n    if file_type == \".pdf\":\n        return \"\\n\\n\".join(p[\"text\"] for p in content if p[\"text\"])\n\n    elif file_type == \".pptx\":\n        parts = []\n        for slide in content:\n            if slide[\"text\"]:\n                parts.append(slide[\"text\"])\n            if slide[\"notes\"]:\n                parts.append(f\"[Notes: {slide['notes']}]\")\n        return \"\\n\\n\".join(parts)\n\n    elif file_type == \".docx\":\n        return \"\\n\".join(p[\"text\"] for p in content)\n\n    elif file_type == \".md\":\n        parts = []\n        for section in content[\"sections\"]:\n            if section[\"heading\"]:\n                parts.append(f\"{'#' * section['level']} {section['heading']}\")\n            parts.extend(section[\"content\"])\n        return \"\\n\".join(parts)\n\n    elif file_type == \".txt\":\n        return content[\"text\"]\n\n    return \"\"\n\n\ndef extract_sections(document_id: int, text: 
str) -> list[dict]:\n    \"\"\"\n    Extract document structure as section hierarchy from text.\n\n    Parses Markdown-style headings (# to ######) to build a hierarchical\n    section structure for the document.\n\n    Args:\n        document_id: The document ID in the database\n        text: Full text content of the document\n\n    Returns:\n        List of section dictionaries with:\n        - document_id: int\n        - title: str\n        - level: int (1-6)\n        - parent_id: int or None\n        - start_pos: int\n        - end_pos: int\n        - sort_order: int\n        - path: str (hierarchical path like \"1.2.3\")\n    \"\"\"\n    sections = []\n    parent_stack = []  # Stack of (level, section_index) for parent tracking\n    sort_order = 0\n\n    # Find all headings with their positions\n    heading_pattern = re.compile(r\"^(#{1,6})\\s+(.+)$\", re.MULTILINE)\n\n    matches = list(heading_pattern.finditer(text))\n\n    for i, match in enumerate(matches):\n        level = len(match.group(1))\n        title = match.group(2).strip()\n        start_pos = match.start()\n\n        # End position is either start of next heading or end of text\n        end_pos = matches[i + 1].start() if i + 1 < len(matches) else len(text)\n\n        # Find parent: closest previous section with level < current\n        parent_id = None\n        while parent_stack and parent_stack[-1][0] >= level:\n            parent_stack.pop()\n\n        if parent_stack:\n            parent_id = parent_stack[-1][1]  # Index in sections list\n\n        # Build path\n        path_parts = [str(s[1] + 1) for s in parent_stack]  # 1-indexed\n        path_parts.append(str(sort_order + 1))\n        path = \".\".join(path_parts) if path_parts else str(sort_order + 1)\n\n        section = {\n            \"document_id\": document_id,\n            \"title\": title[:500],  # Truncate to fit VARCHAR(500)\n            \"level\": level,\n            \"parent_index\": parent_id,  # Temporary, will be 
converted to DB ID\n            \"start_pos\": start_pos,\n            \"end_pos\": end_pos,\n            \"sort_order\": sort_order,\n            \"depth\": len(parent_stack),\n            \"path\": path,\n        }\n\n        sections.append(section)\n        parent_stack.append((level, len(sections) - 1))\n        sort_order += 1\n\n    return sections\n\n\ndef save_sections(document_id: int, sections: list[dict]) -> dict[int, int]:\n    \"\"\"\n    Save extracted sections to database.\n\n    Args:\n        document_id: The document ID\n        sections: List of section dictionaries from extract_sections()\n\n    Returns:\n        Mapping of section index to database ID\n    \"\"\"\n    # Clear existing sections for this document\n    cursor = db.execute(\"DELETE FROM document_sections WHERE document_id = %s\", (document_id,))\n    db.commit()\n    cursor.close()\n\n    index_to_id = {}\n\n    for idx, section in enumerate(sections):\n        # Resolve parent_index to actual DB ID\n        parent_db_id = None\n        if section.get(\"parent_index\") is not None:\n            parent_db_id = index_to_id.get(section[\"parent_index\"])\n\n        cursor = db.execute(\n            \"\"\"INSERT INTO document_sections\n               (document_id, parent_section_id, title, heading_level,\n                sort_order, depth, path, created_at)\n               VALUES (%s, %s, %s, %s, %s, %s, %s, NOW())\"\"\",\n            (\n                document_id,\n                parent_db_id,\n                section[\"title\"],\n                section[\"level\"],\n                section[\"sort_order\"],\n                section[\"depth\"],\n                section[\"path\"],\n            ),\n        )\n        db.commit()\n        section_id = cursor.lastrowid\n        cursor.close()\n\n        index_to_id[idx] = section_id\n\n    db.log(\"INFO\", f\"Saved {len(sections)} sections for document {document_id}\")\n    return index_to_id\n\n\ndef assign_chunk_sections(document_id: 
int, sections: list[dict], index_to_id: dict[int, int]) -> int:\n    \"\"\"\n    Assign section IDs to chunks based on their position.\n\n    Args:\n        document_id: The document ID\n        sections: List of section dictionaries with start_pos\/end_pos\n        index_to_id: Mapping from section index to database ID\n\n    Returns:\n        Number of chunks updated\n    \"\"\"\n    if not sections:\n        return 0\n\n    # Get all chunks for this document\n    cursor = db.execute(\n        \"SELECT id, content FROM chunks WHERE document_id = %s ORDER BY chunk_index\",\n        (document_id,),\n    )\n    chunks = cursor.fetchall()\n    cursor.close()\n\n  ... [TRUNCATED-f4a0d419b9c01a8f]",
            "numLines": 371,
            "startLine": 1,
            "totalLines": 371
        }
    }
}
← Vorheriger Zur Liste Nächster →