chunk.py

Code Hygiene Score: 85

Issues 1

Line  Type  Description
407  magic_number  Magic number found: 100

Dependencies 5

Functions 8

Code

"""
Semantic chunking for KI-System Pipeline
Splits documents into meaningful chunks preserving hierarchy.
"""

import re

from config import CHUNK_OVERLAP_PERCENT, MAX_CHUNK_SIZE, MIN_CHUNK_SIZE
from constants import PERCENT_FULL


def split_into_sentences(text):
    """Split text into sentences.

    A sentence boundary is a run of whitespace that follows '.', '!' or '?'
    and precedes an uppercase letter (including German umlauts).
    Empty fragments are discarded and the rest are stripped.
    """
    boundary = re.compile(r"(?<=[.!?])\s+(?=[A-ZÄÖÜ])")
    return [part.strip() for part in boundary.split(text) if part.strip()]


def calculate_overlap(chunk_size):
    """Return the number of characters to carry over between adjacent chunks.

    Scales the chunk size by CHUNK_OVERLAP_PERCENT (out of PERCENT_FULL)
    and truncates to an integer.
    """
    scaled = chunk_size * CHUNK_OVERLAP_PERCENT
    return int(scaled / PERCENT_FULL)


def chunk_by_structure(extraction_result):
    """
    Chunk document based on its structure.
    Preserves heading hierarchy in metadata.

    Dispatches on the extractor's "file_type" field; unknown types
    yield an empty list.
    """
    file_type = extraction_result["file_type"]
    content = extraction_result["content"]

    # Plain text carries its payload under the "text" key.
    if file_type == ".txt":
        return chunk_text(content["text"])

    dispatch = {
        ".pdf": chunk_pdf,
        ".pptx": chunk_pptx,
        ".docx": chunk_docx,
        ".md": chunk_markdown,
    }
    handler = dispatch.get(file_type)
    return handler(content) if handler else []


def chunk_pdf(pages):
    """Chunk PDF content page by page.

    Each page's text is split on blank lines into paragraphs, which are
    packed greedily into chunks of at most MAX_CHUNK_SIZE characters.
    A paragraph that is itself larger than MAX_CHUNK_SIZE is split on
    sentence boundaries with a small sentence overlap between chunks.
    A trailing page fragment shorter than MIN_CHUNK_SIZE is dropped.

    Args:
        pages: iterable of dicts with "text" and "page" keys
               (as produced by the PDF extractor).

    Returns:
        List of chunk dicts: content, heading_path ("Seite N"),
        character positions, and page metadata.
    """
    chunks = []
    position = 0

    def emit(page, body):
        # Append one chunk for the given page and advance the running position.
        nonlocal position
        chunks.append(
            {
                "content": body,
                "heading_path": [f"Seite {page['page']}"],
                "position_start": position,
                "position_end": position + len(body),
                "metadata": {"page": page["page"]},
            }
        )
        position += len(body)

    for page in pages:
        if not page["text"]:
            continue

        paragraphs = page["text"].split("\n\n")

        current_chunk = []
        current_size = 0

        for para in paragraphs:
            para = para.strip()
            if not para:
                continue

            para_size = len(para)

            if para_size > MAX_CHUNK_SIZE:
                # Flush buffered paragraphs before splitting the big one.
                if current_chunk:
                    emit(page, "\n\n".join(current_chunk))
                    current_chunk = []
                    current_size = 0

                sentences = split_into_sentences(para)
                sentence_chunk = []
                sentence_size = 0

                for sentence in sentences:
                    # BUGFIX: also require a non-empty sentence_chunk (as in
                    # chunk_text) so a single oversized first sentence cannot
                    # emit a chunk with empty content.
                    if sentence_size + len(sentence) > MAX_CHUNK_SIZE and sentence_chunk:
                        emit(page, " ".join(sentence_chunk))
                        # Keep roughly 10% of the sentences as overlap for context.
                        overlap_count = max(1, len(sentence_chunk) // 10)
                        sentence_chunk = sentence_chunk[-overlap_count:]
                        sentence_size = sum(len(s) for s in sentence_chunk)

                    sentence_chunk.append(sentence)
                    sentence_size += len(sentence)

                if sentence_chunk:
                    current_chunk = [" ".join(sentence_chunk)]
                    current_size = sentence_size

            elif current_size + para_size > MAX_CHUNK_SIZE:
                body = "\n\n".join(current_chunk)
                emit(page, body)

                # Start the next chunk with a character-level overlap tail
                # taken from the last buffered paragraph.
                overlap = calculate_overlap(len(body))
                if overlap > 0 and current_chunk:
                    overlap_text = current_chunk[-1][-overlap:]
                    current_chunk = [overlap_text, para]
                    current_size = len(overlap_text) + para_size
                else:
                    current_chunk = [para]
                    current_size = para_size
            else:
                current_chunk.append(para)
                current_size += para_size

        # Flush the remainder of the page; fragments below MIN_CHUNK_SIZE
        # are discarded (position is not advanced for dropped fragments).
        if current_chunk:
            body = "\n\n".join(current_chunk)
            if len(body) >= MIN_CHUNK_SIZE:
                emit(page, body)

    return chunks


def chunk_pptx(slides):
    """Chunk PowerPoint content, one chunk per non-empty slide.

    Slide text and speaker notes are joined with a newline; the slide
    number forms the heading path and is stored in the metadata.
    Slides with neither text nor notes produce no chunk.
    """
    chunks = []
    position = 0

    for slide in slides:
        parts = []
        if slide["text"]:
            parts.append(slide["text"])
        if slide["notes"]:
            parts.append(f"\n[Notizen: {slide['notes']}]")

        if not parts:
            continue

        body = "\n".join(parts)
        end = position + len(body)
        chunks.append(
            {
                "content": body,
                "heading_path": [f"Folie {slide['slide']}"],
                "position_start": position,
                "position_end": end,
                "metadata": {"slide": slide["slide"]},
            }
        )
        position = end

    return chunks


def chunk_docx(paragraphs):
    """Chunk a Word document into sections delimited by heading paragraphs.

    Body paragraphs are buffered until a heading or the size limit forces
    a flush.  The current heading hierarchy is attached to each chunk.
    Heading-triggered and final flushes drop chunks below MIN_CHUNK_SIZE;
    size-triggered flushes always emit.
    """
    chunks = []
    position = 0
    heading_path = []
    buffer = []
    buffer_len = 0

    def flush(enforce_minimum):
        # Emit the buffered paragraphs as one chunk, then reset the buffer.
        nonlocal position, buffer, buffer_len
        if buffer:
            body = "\n\n".join(buffer)
            if not enforce_minimum or len(body) >= MIN_CHUNK_SIZE:
                chunks.append(
                    {
                        "content": body,
                        "heading_path": heading_path.copy(),
                        "position_start": position,
                        "position_end": position + len(body),
                        "metadata": {},
                    }
                )
                position += len(body)
        buffer = []
        buffer_len = 0

    for para in paragraphs:
        if para["is_heading"]:
            flush(enforce_minimum=True)

            # Derive the heading level from the style name ("Heading N"),
            # defaulting to level 1, then rewind the path to the parent level.
            style = para["style"]
            level = int(style.replace("Heading ", "")) if "Heading " in style else 1
            while len(heading_path) >= level:
                heading_path.pop()
            heading_path.append(para["text"])
        else:
            text = para["text"]
            if buffer and buffer_len + len(text) > MAX_CHUNK_SIZE:
                flush(enforce_minimum=False)
            buffer.append(text)
            buffer_len += len(text)

    flush(enforce_minimum=True)
    return chunks


def chunk_markdown(content):
    """Chunk Markdown content section by section.

    Tracks the heading hierarchy across sections.  Sections shorter than
    MIN_CHUNK_SIZE are skipped; sections larger than MAX_CHUNK_SIZE are
    delegated to the plain-text chunker and re-labelled with the current
    heading path and running positions.
    """
    chunks = []
    position = 0
    heading_path = []

    for section in content["sections"]:
        heading = section["heading"]
        if heading:
            # Rewind the path to the parent level, then descend.
            while len(heading_path) >= section["level"]:
                heading_path.pop()
            heading_path.append(heading)

        body = "\n".join(section["content"]).strip()
        if not body or len(body) < MIN_CHUNK_SIZE:
            continue

        if body and len(body) > MAX_CHUNK_SIZE:
            for sub in chunk_text(body):
                sub["heading_path"] = heading_path.copy()
                sub["position_start"] = position
                sub["position_end"] = position + len(sub["content"])
                position += len(sub["content"])
                chunks.append(sub)
        else:
            chunks.append(
                {
                    "content": body,
                    "heading_path": heading_path.copy(),
                    "position_start": position,
                    "position_end": position + len(body),
                    "metadata": content.get("metadata", {}),
                }
            )
            position += len(body)

    return chunks


def chunk_text(text):
    """Chunk plain text into pieces of at most MAX_CHUNK_SIZE characters.

    Paragraphs (separated by blank lines) are packed greedily; a paragraph
    larger than MAX_CHUNK_SIZE is split on sentence boundaries with a small
    sentence overlap between the resulting chunks.  A trailing fragment
    shorter than MIN_CHUNK_SIZE is dropped.

    Args:
        text: the raw document text.

    Returns:
        List of chunk dicts: content, empty heading_path, character
        positions, and empty metadata.

    Note: the original implementation named a local variable ``chunk_text``,
    shadowing this function itself; locals are renamed to avoid that.
    """
    chunks = []
    position = 0

    def emit(body):
        # Append one chunk and advance the running character position.
        nonlocal position
        chunks.append(
            {
                "content": body,
                "heading_path": [],
                "position_start": position,
                "position_end": position + len(body),
                "metadata": {},
            }
        )
        position += len(body)

    current_chunk = []
    current_size = 0

    for para in text.split("\n\n"):
        para = para.strip()
        if not para:
            continue

        para_size = len(para)

        if para_size > MAX_CHUNK_SIZE:
            # Flush the buffer, then split the oversized paragraph by sentences.
            if current_chunk:
                emit("\n\n".join(current_chunk))
                current_chunk = []
                current_size = 0

            sentences = split_into_sentences(para)
            sentence_chunk = []
            sentence_size = 0

            for sentence in sentences:
                if sentence_size + len(sentence) > MAX_CHUNK_SIZE and sentence_chunk:
                    emit(" ".join(sentence_chunk))
                    # Carry over roughly 10% of the sentences as overlap
                    # so consecutive chunks share some context.
                    overlap_count = max(1, len(sentence_chunk) // 10)
                    sentence_chunk = sentence_chunk[-overlap_count:]
                    sentence_size = sum(len(s) for s in sentence_chunk)

                sentence_chunk.append(sentence)
                sentence_size += len(sentence)

            if sentence_chunk:
                current_chunk = [" ".join(sentence_chunk)]
                current_size = sentence_size

        elif current_size + para_size > MAX_CHUNK_SIZE:
            emit("\n\n".join(current_chunk))
            current_chunk = [para]
            current_size = para_size
        else:
            current_chunk.append(para)
            current_size += para_size

    # Final flush; fragments below MIN_CHUNK_SIZE are discarded.
    if current_chunk:
        body = "\n\n".join(current_chunk)
        if len(body) >= MIN_CHUNK_SIZE:
            emit(body)

    return chunks


if __name__ == "__main__":
    # Smoke test: chunk a small German sample and print a summary of
    # each resulting chunk (length plus the first 100 characters).
    test_text = """Dies ist ein Testabsatz. Er enthält mehrere Sätze. Diese werden für das Chunking verwendet.

    Dies ist der zweite Absatz. Er ist etwas länger und enthält mehr Informationen über das System.

    Und hier kommt der dritte Absatz mit weiteren Details."""

    for i, chunk in enumerate(chunk_text(test_text)):
        print(f"Chunk {i}: {len(chunk['content'])} chars")
        print(chunk["content"][:100] + "...")
        print()
← Overview