chunk.py
- Path: /var/www/scripts/pipeline/chunk.py
- Namespace: pipeline
- Lines: 409 | Size: 14,052 bytes
- Modified: 2025-12-28 01:12:19 | Scanned: 2025-12-31 10:22:15
Code Hygiene Score: 85
- Dependencies: 100 (25%)
- LOC: 30 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 90 (10%)
Issues 1
| Line | Type | Description |
|------|------|-------------|
| 407 | magic_number | Magic number found: 100 |
Dependencies 5
- use re
- use config.CHUNK_OVERLAP_PERCENT
- use config.MAX_CHUNK_SIZE
- use config.MIN_CHUNK_SIZE
- use constants.PERCENT_FULL
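The four imported values come from `config` and `constants`, which are outside this scan. A minimal sketch of what they plausibly contain; the numbers below are placeholders for illustration, not the real configuration:

```python
# config.py (hypothetical placeholder values; the real module was not scanned)
MAX_CHUNK_SIZE = 1500        # upper bound on chunk length, in characters
MIN_CHUNK_SIZE = 100         # chunks shorter than this are discarded
CHUNK_OVERLAP_PERCENT = 10   # share of a chunk carried over into the next one

# constants.py (hypothetical)
PERCENT_FULL = 100           # denominator for percentage arithmetic
```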
Functions 8
- split_into_sentences() (line 12)
- calculate_overlap() (line 20)
- chunk_by_structure() (line 25)
- chunk_pdf() (line 48)
- chunk_pptx() (line 163)
- chunk_docx() (line 191)
- chunk_markdown() (line 262)
- chunk_text() (line 301)
Code
"""
Semantic chunking for KI-System Pipeline
Splits documents into meaningful chunks preserving hierarchy.
"""
import re
from config import CHUNK_OVERLAP_PERCENT, MAX_CHUNK_SIZE, MIN_CHUNK_SIZE
from constants import PERCENT_FULL
def split_into_sentences(text):
"""Split text into sentences."""
# German-aware sentence splitting
pattern = r"(?<=[.!?])\s+(?=[A-ZÄÖÜ])"
sentences = re.split(pattern, text)
return [s.strip() for s in sentences if s.strip()]
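
# Example (illustrative): split_into_sentences("Das ist Satz eins. Und Satz zwei.")
# returns ["Das ist Satz eins.", "Und Satz zwei."]; the lookbehind keeps each
# terminator with its sentence, and the uppercase lookahead avoids false splits
# when the following word is lowercase, e.g. after abbreviations like "bzw." or "ca.".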
def calculate_overlap(chunk_size):
"""Calculate overlap size based on chunk size."""
return int(chunk_size * CHUNK_OVERLAP_PERCENT / PERCENT_FULL)
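
# Example with assumed values: if CHUNK_OVERLAP_PERCENT = 10 and PERCENT_FULL = 100,
# calculate_overlap(1200) == 120, so the last 120 characters of a flushed chunk
# are repeated at the start of the following one (see chunk_pdf below).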
def chunk_by_structure(extraction_result):
"""
Chunk document based on its structure.
Preserves heading hierarchy in metadata.
"""
chunks = []
file_type = extraction_result["file_type"]
content = extraction_result["content"]
if file_type == ".pdf":
chunks = chunk_pdf(content)
elif file_type == ".pptx":
chunks = chunk_pptx(content)
elif file_type == ".docx":
chunks = chunk_docx(content)
elif file_type == ".md":
chunks = chunk_markdown(content)
elif file_type == ".txt":
chunks = chunk_text(content["text"])
return chunks
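
# Shape of extraction_result assumed from the dispatch above and the accesses
# below, e.g. for a PDF:
# {"file_type": ".pdf", "content": [{"page": 1, "text": "..."}, ...]}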
def chunk_pdf(pages):
"""Chunk PDF by pages and paragraphs."""
chunks = []
position = 0
for page in pages:
if not page["text"]:
continue
# Split page into paragraphs
paragraphs = page["text"].split("\n\n")
current_chunk = []
current_size = 0
for para in paragraphs:
para = para.strip()
if not para:
continue
para_size = len(para)
# If paragraph alone exceeds max, split it
if para_size > MAX_CHUNK_SIZE:
# Flush current chunk
if current_chunk:
chunk_text = "\n\n".join(current_chunk)
chunks.append(
{
"content": chunk_text,
"heading_path": [f"Seite {page['page']}"],
"position_start": position,
"position_end": position + len(chunk_text),
"metadata": {"page": page["page"]},
}
)
position += len(chunk_text)
current_chunk = []
current_size = 0
# Split large paragraph by sentences
sentences = split_into_sentences(para)
sentence_chunk = []
sentence_size = 0
for sentence in sentences:
                    if sentence_size + len(sentence) > MAX_CHUNK_SIZE and sentence_chunk:
chunk_text = " ".join(sentence_chunk)
chunks.append(
{
"content": chunk_text,
"heading_path": [f"Seite {page['page']}"],
"position_start": position,
"position_end": position + len(chunk_text),
"metadata": {"page": page["page"]},
}
)
position += len(chunk_text)
                        # Keep roughly the last tenth of the sentences (at least one) as overlap
overlap_count = max(1, len(sentence_chunk) // 10)
sentence_chunk = sentence_chunk[-overlap_count:]
sentence_size = sum(len(s) for s in sentence_chunk)
sentence_chunk.append(sentence)
sentence_size += len(sentence)
if sentence_chunk:
current_chunk = [" ".join(sentence_chunk)]
current_size = sentence_size
elif current_size + para_size > MAX_CHUNK_SIZE:
# Flush current chunk
chunk_text = "\n\n".join(current_chunk)
chunks.append(
{
"content": chunk_text,
"heading_path": [f"Seite {page['page']}"],
"position_start": position,
"position_end": position + len(chunk_text),
"metadata": {"page": page["page"]},
}
)
position += len(chunk_text)
# Start new chunk with overlap
overlap = calculate_overlap(len(chunk_text))
if overlap > 0 and current_chunk:
overlap_text = current_chunk[-1][-overlap:]
current_chunk = [overlap_text, para]
current_size = len(overlap_text) + para_size
else:
current_chunk = [para]
current_size = para_size
else:
current_chunk.append(para)
current_size += para_size
# Flush remaining
if current_chunk:
chunk_text = "\n\n".join(current_chunk)
if len(chunk_text) >= MIN_CHUNK_SIZE:
chunks.append(
{
"content": chunk_text,
"heading_path": [f"Seite {page['page']}"],
"position_start": position,
"position_end": position + len(chunk_text),
"metadata": {"page": page["page"]},
}
)
position += len(chunk_text)
return chunks
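
# Slide shape assumed from the accesses below:
# {"slide": 1, "text": "...", "notes": "..."}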
def chunk_pptx(slides):
"""Chunk PowerPoint by slides."""
chunks = []
position = 0
for slide in slides:
content_parts = []
if slide["text"]:
content_parts.append(slide["text"])
if slide["notes"]:
content_parts.append(f"\n[Notizen: {slide['notes']}]")
if content_parts:
chunk_text = "\n".join(content_parts)
chunks.append(
{
"content": chunk_text,
"heading_path": [f"Folie {slide['slide']}"],
"position_start": position,
"position_end": position + len(chunk_text),
"metadata": {"slide": slide["slide"]},
}
)
position += len(chunk_text)
return chunks
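
# Paragraph shape assumed from the accesses below:
# {"text": "...", "style": "Heading 2", "is_heading": False}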
def chunk_docx(paragraphs):
"""Chunk Word document by headings."""
chunks = []
position = 0
current_headings = []
current_chunk = []
current_size = 0
for para in paragraphs:
if para["is_heading"]:
# Flush current chunk
if current_chunk:
chunk_text = "\n\n".join(current_chunk)
if len(chunk_text) >= MIN_CHUNK_SIZE:
chunks.append(
{
"content": chunk_text,
"heading_path": current_headings.copy(),
"position_start": position,
"position_end": position + len(chunk_text),
"metadata": {},
}
)
position += len(chunk_text)
current_chunk = []
current_size = 0
# Update heading path
level = int(para["style"].replace("Heading ", "")) if "Heading " in para["style"] else 1
while len(current_headings) >= level:
current_headings.pop()
current_headings.append(para["text"])
else:
para_size = len(para["text"])
if current_size + para_size > MAX_CHUNK_SIZE and current_chunk:
# Flush
chunk_text = "\n\n".join(current_chunk)
chunks.append(
{
"content": chunk_text,
"heading_path": current_headings.copy(),
"position_start": position,
"position_end": position + len(chunk_text),
"metadata": {},
}
)
position += len(chunk_text)
current_chunk = []
current_size = 0
current_chunk.append(para["text"])
current_size += para_size
# Flush remaining
if current_chunk:
chunk_text = "\n\n".join(current_chunk)
if len(chunk_text) >= MIN_CHUNK_SIZE:
chunks.append(
{
"content": chunk_text,
"heading_path": current_headings.copy(),
"position_start": position,
"position_end": position + len(chunk_text),
"metadata": {},
}
)
return chunks
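
# Content shape assumed from the accesses below:
# {"sections": [{"heading": "...", "level": 1, "content": ["...", "..."]}], "metadata": {}}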
def chunk_markdown(content):
"""Chunk Markdown by sections."""
chunks = []
position = 0
heading_stack = []
for section in content["sections"]:
if section["heading"]:
level = section["level"]
while len(heading_stack) >= level:
heading_stack.pop()
heading_stack.append(section["heading"])
section_text = "\n".join(section["content"]).strip()
if section_text and len(section_text) >= MIN_CHUNK_SIZE:
# Large sections need splitting
if len(section_text) > MAX_CHUNK_SIZE:
sub_chunks = chunk_text(section_text)
for sub in sub_chunks:
sub["heading_path"] = heading_stack.copy()
sub["position_start"] = position
sub["position_end"] = position + len(sub["content"])
position += len(sub["content"])
chunks.append(sub)
else:
chunks.append(
{
"content": section_text,
"heading_path": heading_stack.copy(),
"position_start": position,
"position_end": position + len(section_text),
"metadata": content.get("metadata", {}),
}
)
position += len(section_text)
return chunks
def chunk_text(text):
"""Chunk plain text by paragraphs/sentences."""
chunks = []
position = 0
paragraphs = text.split("\n\n")
current_chunk = []
current_size = 0
for para in paragraphs:
para = para.strip()
if not para:
continue
para_size = len(para)
if para_size > MAX_CHUNK_SIZE:
# Flush and split large paragraph
if current_chunk:
chunk_text = "\n\n".join(current_chunk)
chunks.append(
{
"content": chunk_text,
"heading_path": [],
"position_start": position,
"position_end": position + len(chunk_text),
"metadata": {},
}
)
position += len(chunk_text)
current_chunk = []
current_size = 0
sentences = split_into_sentences(para)
sentence_chunk = []
sentence_size = 0
for sentence in sentences:
if sentence_size + len(sentence) > MAX_CHUNK_SIZE and sentence_chunk:
chunk_text = " ".join(sentence_chunk)
chunks.append(
{
"content": chunk_text,
"heading_path": [],
"position_start": position,
"position_end": position + len(chunk_text),
"metadata": {},
}
)
position += len(chunk_text)
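                    # Keep roughly the last tenth of the sentences (at least one) as overlap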
overlap_count = max(1, len(sentence_chunk) // 10)
sentence_chunk = sentence_chunk[-overlap_count:]
sentence_size = sum(len(s) for s in sentence_chunk)
sentence_chunk.append(sentence)
sentence_size += len(sentence)
if sentence_chunk:
current_chunk = [" ".join(sentence_chunk)]
current_size = sentence_size
elif current_size + para_size > MAX_CHUNK_SIZE:
chunk_text = "\n\n".join(current_chunk)
chunks.append(
{
"content": chunk_text,
"heading_path": [],
"position_start": position,
"position_end": position + len(chunk_text),
"metadata": {},
}
)
position += len(chunk_text)
current_chunk = [para]
current_size = para_size
else:
current_chunk.append(para)
current_size += para_size
if current_chunk:
chunk_text = "\n\n".join(current_chunk)
if len(chunk_text) >= MIN_CHUNK_SIZE:
chunks.append(
{
"content": chunk_text,
"heading_path": [],
"position_start": position,
"position_end": position + len(chunk_text),
"metadata": {},
}
)
return chunks
if __name__ == "__main__":
# Test chunking
test_text = """Dies ist ein Testabsatz. Er enthält mehrere Sätze. Diese werden für das Chunking verwendet.
Dies ist der zweite Absatz. Er ist etwas länger und enthält mehr Informationen über das System.
Und hier kommt der dritte Absatz mit weiteren Details."""
chunks = chunk_text(test_text)
for i, chunk in enumerate(chunks):
print(f"Chunk {i}: {len(chunk['content'])} chars")
print(chunk["content"][:100] + "...")
print()
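For end-to-end use, `chunk_by_structure()` is the entry point; the `__main__` block above only exercises `chunk_text()`. A minimal sketch of driving the dispatcher directly, with a hand-built extraction result (the real extractor lives elsewhere in the pipeline and is not part of this file):

```python
# Hypothetical driver; the extraction step is not part of chunk.py.
from chunk import chunk_by_structure

extraction_result = {
    "file_type": ".pptx",
    "content": [
        {"slide": 1, "text": "Agenda", "notes": "Begrüßung und Überblick"},
        {"slide": 2, "text": "Architektur des KI-Systems", "notes": ""},
    ],
}

for c in chunk_by_structure(extraction_result):
    print(c["heading_path"], c["position_start"], c["position_end"])
    print(c["content"])
```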