extract.py

Code Hygiene Score: 63

Keine Issues gefunden.

Dependencies 15

Funktionen 10

Code

"""
Text extraction for KI-System Pipeline
Extracts text from PDF, PPTX, DOCX, MD, TXT files.
"""

import re
from pathlib import Path

from config import OCR_ENABLED, OCR_LANGUAGE
from db import db


def extract_pdf(file_path):
    """Extract text from PDF using PyMuPDF with optional OCR and rotation detection.

    Args:
        file_path: Path of the PDF file, passed to ``fitz.open``.

    Returns:
        A list with one dict per page, in document order:
        ``page`` (1-based page number), ``text`` (stripped page text, or the
        OCR result when OCR ran and produced more text) and ``images``
        (number of entries returned by ``page.get_images()``).
    """
    import fitz  # PyMuPDF

    from orientation import correct_page_orientation, get_page_rotation

    pages = []
    doc = fitz.open(file_path)

    # try/finally so the document handle is released even when a page fails.
    try:
        # Pages are numbered from 1 so that both log messages and the
        # returned "page" field refer to the same number.
        for page_num, page in enumerate(doc, start=1):
            # Detect and handle page rotation
            rotation = get_page_rotation(page)

            # Apply rotation to page before text extraction if needed
            if rotation != 0:
                page.set_rotation(rotation)
                db.log("INFO", f"Page {page_num}: Applied rotation {rotation}°")

            text = page.get_text()

            # If page has little text and OCR is enabled, try OCR
            if OCR_ENABLED and len(text.strip()) < 50:
                # Get correctly oriented image for OCR
                img_data, _ = correct_page_orientation(page, target_dpi=300)

                try:
                    # Imported inside the try so a missing OCR dependency is
                    # logged as a warning instead of aborting the extraction.
                    import io

                    import pytesseract
                    from PIL import Image

                    img = Image.open(io.BytesIO(img_data))
                    ocr_text = pytesseract.image_to_string(img, lang=OCR_LANGUAGE)
                    # Keep the OCR result only when it yields more text.
                    if len(ocr_text.strip()) > len(text.strip()):
                        text = ocr_text
                except Exception as e:
                    db.log("WARNING", f"OCR failed for page {page_num}: {e}")

            pages.append({"page": page_num, "text": text.strip(), "images": len(page.get_images())})
    finally:
        doc.close()

    return pages


def extract_pptx(file_path):
    """Collect shape text and speaker notes for every slide of a PowerPoint file.

    Args:
        file_path: Path of the presentation, passed to ``Presentation``.

    Returns:
        A list with one dict per slide: ``slide`` (1-based number), ``text``
        (non-empty shape texts joined by newlines) and ``notes`` (speaker
        notes text, or an empty string when there are none).
    """
    from pptx import Presentation

    deck = Presentation(file_path)
    collected = []

    for number, slide in enumerate(deck.slides, start=1):
        # Only shapes that expose a non-empty ``text`` attribute contribute.
        shape_texts = [shape.text for shape in slide.shapes if hasattr(shape, "text") and shape.text]

        # Speaker notes are optional; default to an empty string.
        speaker_notes = ""
        if slide.has_notes_slide:
            frame = slide.notes_slide.notes_text_frame
            if frame:
                speaker_notes = frame.text

        collected.append({"slide": number, "text": "\n".join(shape_texts), "notes": speaker_notes})

    return collected


def extract_docx(file_path):
    """Return the non-blank paragraphs of a Word document with their style names.

    Args:
        file_path: Path of the document, passed to ``Document``.

    Returns:
        A list of dicts with ``text`` (paragraph text), ``style`` (style name,
        ``"Normal"`` when the paragraph has no style) and ``is_heading``
        (True when the style name starts with ``"Heading"``).
    """
    from docx import Document

    def describe(paragraph):
        # Paragraphs without a style object are reported as "Normal".
        style_name = paragraph.style.name if paragraph.style else "Normal"
        return {"text": paragraph.text, "style": style_name, "is_heading": style_name.startswith("Heading")}

    # Whitespace-only paragraphs are skipped.
    return [describe(paragraph) for paragraph in Document(file_path).paragraphs if paragraph.text.strip()]


def extract_markdown(file_path):
    """Split a Markdown file into front-matter metadata and heading-delimited sections.

    Args:
        file_path: Path of the Markdown file, read as UTF-8.

    Returns:
        A dict with ``metadata`` (front matter as a plain dict) and
        ``sections``, a list of dicts with ``heading`` (title or None for text
        before the first heading), ``level`` (number of ``#`` characters, 0
        for the leading section) and ``content`` (list of body lines).
    """
    import frontmatter

    with open(file_path, encoding="utf-8") as handle:
        document = frontmatter.load(handle)

    heading_re = re.compile(r"^(#{1,6})\s+(.+)$")
    sections = []

    def flush(section):
        # Drop a section that has neither a heading nor any body lines.
        if section["heading"] or section["content"]:
            sections.append(section)

    # Text before the first heading is gathered in a heading-less section.
    active = {"heading": None, "level": 0, "content": []}

    for raw_line in document.content.split("\n"):
        found = heading_re.match(raw_line)
        if found is None:
            active["content"].append(raw_line)
            continue

        # A heading closes the running section and opens a new one.
        flush(active)
        hashes, title = found.groups()
        active = {"heading": title, "level": len(hashes), "content": []}

    flush(active)

    return {"metadata": dict(document.metadata), "sections": sections}


def extract_txt(file_path):
    """Read a plain text file as UTF-8.

    Args:
        file_path: Path of the text file.

    Returns:
        A dict with the single key ``text`` holding the whole file content.
    """
    with open(file_path, mode="r", encoding="utf-8") as source:
        return {"text": source.read()}


def extract(file_path):
    """
    Dispatch a file to the extractor matching its (lower-cased) suffix.

    Args:
        file_path: Path of the file to extract.

    Returns:
        A dict with ``file_path``, ``file_type`` and ``success``; on success
        it also carries ``content`` (the extractor's result), on failure
        ``error`` (the exception message).

    Raises:
        ValueError: If the suffix is not one of the supported types.
    """
    suffix = Path(file_path).suffix.lower()

    handlers = {
        ".pdf": extract_pdf,
        ".pptx": extract_pptx,
        ".docx": extract_docx,
        ".md": extract_markdown,
        ".txt": extract_txt,
    }

    handler = handlers.get(suffix)
    if handler is None:
        raise ValueError(f"Unsupported file type: {suffix}")

    db.log("INFO", f"Extracting: {file_path}", f"type={suffix}")

    outcome = {"file_path": file_path, "file_type": suffix}
    try:
        outcome["content"] = handler(file_path)
    except Exception as exc:
        # Extractor failures are logged and reported in the result, not raised.
        db.log("ERROR", f"Extraction failed: {file_path}", str(exc))
        outcome["error"] = str(exc)
        outcome["success"] = False
    else:
        outcome["success"] = True
    return outcome


def get_full_text(extraction_result):
    """Flatten an ``extract()`` result into a single plain-text string.

    Args:
        extraction_result: Result dict with ``success``, ``content`` and
            ``file_type`` keys.

    Returns:
        The joined text for the known file types, or an empty string when
        the extraction failed or the file type is not handled here.
    """
    if not extraction_result["success"]:
        return ""

    payload = extraction_result["content"]
    kind = extraction_result["file_type"]

    if kind == ".pdf":
        # Pages without text are left out; pages are separated by a blank line.
        page_texts = [page["text"] for page in payload if page["text"]]
        return "\n\n".join(page_texts)

    if kind == ".pptx":
        pieces = []
        for entry in payload:
            body, notes = entry["text"], entry["notes"]
            if body:
                pieces.append(body)
            if notes:
                pieces.append(f"[Notes: {notes}]")
        return "\n\n".join(pieces)

    if kind == ".docx":
        return "\n".join([paragraph["text"] for paragraph in payload])

    if kind == ".md":
        lines = []
        for part in payload["sections"]:
            # Re-create the heading line before the section body.
            if part["heading"]:
                marker = "#" * part["level"]
                lines.append(f"{marker} {part['heading']}")
            lines += part["content"]
        return "\n".join(lines)

    if kind == ".txt":
        return payload["text"]

    return ""


def extract_sections(document_id: int, text: str) -> list[dict]:
    """
    Extract document structure as section hierarchy from text.

    Parses Markdown-style headings (# to ######) to build a hierarchical
    section structure for the document. A heading must have its marker and
    its title on the same line.

    Args:
        document_id: The document ID in the database
        text: Full text content of the document

    Returns:
        List of section dictionaries (in order of appearance) with:
        - document_id: int
        - title: str (stripped, truncated to 500 characters)
        - level: int (1-6)
        - parent_index: int or None (index of the parent in the returned list)
        - start_pos: int (offset of the heading line in text)
        - end_pos: int (offset of the next heading, or len(text))
        - sort_order: int (0-based position in the returned list)
        - depth: int (number of ancestors)
        - path: str (1-based list positions of the ancestors and the section
          itself, joined by dots, e.g. "1.3")
    """
    # [^\S\n]+ is "whitespace except line feed": with re.MULTILINE a plain \s+
    # would run across the line break and turn a bare "#" line plus the
    # following line into a heading.
    heading_pattern = re.compile(r"^(#{1,6})[^\S\n]+(.+)$", re.MULTILINE)
    matches = list(heading_pattern.finditer(text))

    sections = []
    parent_stack = []  # Stack of (level, section_index) for parent tracking

    for sort_order, match in enumerate(matches):
        level = len(match.group(1))
        title = match.group(2).strip()
        start_pos = match.start()

        # End position is either start of next heading or end of text
        next_index = sort_order + 1
        end_pos = matches[next_index].start() if next_index < len(matches) else len(text)

        # Find parent: closest previous section with level < current
        while parent_stack and parent_stack[-1][0] >= level:
            parent_stack.pop()
        parent_index = parent_stack[-1][1] if parent_stack else None

        # Build path from the ancestors still on the stack plus this section (1-indexed)
        path_parts = [str(index + 1) for _, index in parent_stack]
        path_parts.append(str(sort_order + 1))

        sections.append(
            {
                "document_id": document_id,
                "title": title[:500],  # Truncate to fit VARCHAR(500)
                "level": level,
                "parent_index": parent_index,  # Temporary, will be converted to DB ID
                "start_pos": start_pos,
                "end_pos": end_pos,
                "sort_order": sort_order,
                "depth": len(parent_stack),
                "path": ".".join(path_parts),
            }
        )
        parent_stack.append((level, sort_order))

    return sections


def save_sections(document_id: int, sections: list[dict]) -> dict[int, int]:
    """
    Replace the stored sections of a document with the given ones.

    Existing rows for the document are deleted first; every section is then
    inserted and committed individually, in list order.

    Args:
        document_id: The document ID
        sections: List of section dictionaries from extract_sections()

    Returns:
        Mapping of section index to database ID
    """
    # Remove whatever was stored for this document before
    purge_cursor = db.execute("DELETE FROM document_sections WHERE document_id = %s", (document_id,))
    db.commit()
    purge_cursor.close()

    insert_sql = """INSERT INTO document_sections
               (document_id, parent_section_id, title, heading_level,
                sort_order, depth, path, created_at)
               VALUES (%s, %s, %s, %s, %s, %s, %s, NOW())"""

    id_by_index = {}

    for position, entry in enumerate(sections):
        # Translate the list index of the parent into the ID it was stored under
        parent_ref = entry.get("parent_index")
        parent_db_id = id_by_index.get(parent_ref) if parent_ref is not None else None

        row_values = (
            document_id,
            parent_db_id,
            entry["title"],
            entry["level"],
            entry["sort_order"],
            entry["depth"],
            entry["path"],
        )
        insert_cursor = db.execute(insert_sql, row_values)
        db.commit()
        new_id = insert_cursor.lastrowid
        insert_cursor.close()

        id_by_index[position] = new_id

    db.log("INFO", f"Saved {len(sections)} sections for document {document_id}")
    return id_by_index


def assign_chunk_sections(document_id: int, sections: list[dict], index_to_id: dict[int, int]) -> int:
    """
    Assign section IDs to chunks based on their position.

    Each chunk is assigned to the last section in ``sections`` whose
    ``start_pos`` is not after the chunk's estimated start offset. The
    offset is the summed content length of all preceding chunks (in
    ``chunk_index`` order), because only ``id`` and ``content`` are read
    from the ``chunks`` table.

    NOTE(review): the offset estimate assumes chunks are contiguous,
    non-overlapping slices of the text the sections were computed from —
    confirm against the chunker; if chunks overlap or are trimmed, the
    estimate drifts.

    Args:
        document_id: The document ID
        sections: List of section dictionaries with start_pos/end_pos
        index_to_id: Mapping from section index to database ID

    Returns:
        Number of chunks updated
    """
    if not sections:
        return 0

    # Get all chunks for this document
    cursor = db.execute(
        "SELECT id, content FROM chunks WHERE document_id = %s ORDER BY chunk_index",
        (document_id,),
    )
    chunks = cursor.fetchall()
    cursor.close()

    updated = 0
    chunk_start = 0  # Estimated offset of the current chunk within the document text

    for chunk in chunks:
        # Find the best matching section (last section that starts at or
        # before the chunk's start offset). Comparing against the offset
        # rather than the chunk's own length is what makes this positional.
        best_section_idx = None
        for idx, section in enumerate(sections):
            if section["start_pos"] <= chunk_start:
                best_section_idx = idx

        # Advance the offset for the next chunk; tolerate missing content.
        chunk_start += len(chunk["content"] or "")

        # Chunks before the first heading, or sections that were not saved,
        # are left without a section.
        if best_section_idx is None or best_section_idx not in index_to_id:
            continue

        cursor = db.execute(
            "UPDATE chunks SET section_id = %s WHERE id = %s",
            (index_to_id[best_section_idx], chunk["id"]),
        )
        db.commit()
        cursor.close()
        updated += 1

    db.log("INFO", f"Assigned sections to {updated} chunks for document {document_id}")
    return updated


if __name__ == "__main__":
    import sys

    # Ad-hoc command line use: extract the file named by the first argument
    # and print either its plain text or the error message. Without an
    # argument nothing happens.
    cli_args = sys.argv[1:]
    if cli_args:
        outcome = extract(cli_args[0])
        print(get_full_text(outcome) if outcome["success"] else f"Error: {outcome['error']}")
← Übersicht