extract.py
- Path: /var/www/scripts/pipeline/extract.py
- Namespace: pipeline
- Lines: 380 | Size: 11,643 bytes
- Modified: 2025-12-27 11:41:22 | Scanned: 2025-12-31 10:22:15
Code Hygiene Score: 63
- Dependencies: 0 (25%)
- LOC: 40 (20%)
- Methods: 100 (20%)
- Secrets: 100 (15%)
- Classes: 100 (10%)
- Magic Numbers: 100 (10%)
No issues found.
Dependencies 15
- use re
- use pathlib.Path
- use config.OCR_ENABLED
- use config.OCR_LANGUAGE
- use db.db
- use fitz
- use orientation.correct_page_orientation
- use orientation.get_page_rotation
- use pptx.Presentation
- use docx.Document
- use frontmatter
- use sys
- use io
- use pytesseract
- use PIL.Image
Functions 10 (a usage sketch follows the list)
- extract_pdf() (line 13)
- extract_pptx() (line 57)
- extract_docx() (line 84)
- extract_markdown() (line 99)
- extract_txt() (line 128)
- extract() (line 135)
- get_full_text() (line 163)
- extract_sections() (line 200)
- save_sections() (line 271)
- assign_chunk_sections() (line 320)
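As orientation before the listing, a minimal sketch of the programmatic entry path. It assumes the module is importable as extract and that the database connection behind db is configured; the file path is a placeholder.

from extract import extract, get_full_text

result = extract("/tmp/sample.pdf")      # placeholder path; .pdf/.pptx/.docx/.md/.txt are supported
if result["success"]:
    print(get_full_text(result)[:200])   # flat text across pages, slides, or paragraphs
else:
    print(f"Error: {result['error']}")   # extraction errors are returned, not raised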
Code
"""
Text extraction for KI-System Pipeline
Extracts text from PDF, PPTX, DOCX, MD, TXT files.
"""
import re
from pathlib import Path
from config import OCR_ENABLED, OCR_LANGUAGE
from db import db
def extract_pdf(file_path):
    """Extract text from PDF using PyMuPDF with optional OCR and rotation detection."""
    import fitz  # PyMuPDF
    from orientation import correct_page_orientation, get_page_rotation

    doc = fitz.open(file_path)
    pages = []
    for page_num, page in enumerate(doc):
        # Detect and handle page rotation
        rotation = get_page_rotation(page)
        # Apply rotation to page before text extraction if needed
        if rotation != 0:
            page.set_rotation(rotation)
            db.log("INFO", f"Page {page_num + 1}: Applied rotation {rotation}°")
        text = page.get_text()
        # If page has little text and OCR is enabled, try OCR
        if OCR_ENABLED and len(text.strip()) < 50:
            # Get correctly oriented image for OCR
            img_data, _ = correct_page_orientation(page, target_dpi=300)
            try:
                import io
                import pytesseract
                from PIL import Image

                img = Image.open(io.BytesIO(img_data))
                ocr_text = pytesseract.image_to_string(img, lang=OCR_LANGUAGE)
                if len(ocr_text.strip()) > len(text.strip()):
                    text = ocr_text
            except Exception as e:
                db.log("WARNING", f"OCR failed for page {page_num + 1}: {e}")
        pages.append({"page": page_num + 1, "text": text.strip(), "images": len(page.get_images())})
    doc.close()
    return pages
def extract_pptx(file_path):
    """Extract text from PowerPoint including speaker notes."""
    from pptx import Presentation

    prs = Presentation(file_path)
    slides = []
    for slide_num, slide in enumerate(prs.slides):
        text_parts = []
        # Extract text from shapes
        for shape in slide.shapes:
            if hasattr(shape, "text") and shape.text:
                text_parts.append(shape.text)
        # Extract speaker notes
        notes = ""
        if slide.has_notes_slide:
            notes_frame = slide.notes_slide.notes_text_frame
            if notes_frame:
                notes = notes_frame.text
        slides.append({"slide": slide_num + 1, "text": "\n".join(text_parts), "notes": notes})
    return slides
def extract_docx(file_path):
    """Extract text from Word document."""
    from docx import Document

    doc = Document(file_path)
    paragraphs = []
    for para in doc.paragraphs:
        if para.text.strip():
            style = para.style.name if para.style else "Normal"
            paragraphs.append({"text": para.text, "style": style, "is_heading": style.startswith("Heading")})
    return paragraphs
def extract_markdown(file_path):
    """Extract text from Markdown, preserving structure."""
    import frontmatter

    with open(file_path, encoding="utf-8") as f:
        post = frontmatter.load(f)
    content = post.content
    metadata = dict(post.metadata)
    # Parse headings for structure
    sections = []
    current_section = {"heading": None, "level": 0, "content": []}
    for line in content.split("\n"):
        heading_match = re.match(r"^(#{1,6})\s+(.+)$", line)
        if heading_match:
            if current_section["content"] or current_section["heading"]:
                sections.append(current_section)
            current_section = {"heading": heading_match.group(2), "level": len(heading_match.group(1)), "content": []}
        else:
            current_section["content"].append(line)
    if current_section["content"] or current_section["heading"]:
        sections.append(current_section)
    return {"metadata": metadata, "sections": sections}
def extract_txt(file_path):
    """Extract text from plain text file."""
    with open(file_path, encoding="utf-8") as f:
        content = f.read()
    return {"text": content}
def extract(file_path):
    """
    Main extraction function.
    Returns extracted content with structure info.
    """
    ext = Path(file_path).suffix.lower()
    extractors = {
        ".pdf": extract_pdf,
        ".pptx": extract_pptx,
        ".docx": extract_docx,
        ".md": extract_markdown,
        ".txt": extract_txt,
    }
    if ext not in extractors:
        raise ValueError(f"Unsupported file type: {ext}")
    db.log("INFO", f"Extracting: {file_path}", f"type={ext}")
    try:
        content = extractors[ext](file_path)
        return {"file_path": file_path, "file_type": ext, "content": content, "success": True}
    except Exception as e:
        db.log("ERROR", f"Extraction failed: {file_path}", str(e))
        return {"file_path": file_path, "file_type": ext, "error": str(e), "success": False}
def get_full_text(extraction_result):
    """Get plain text from extraction result."""
    if not extraction_result["success"]:
        return ""
    content = extraction_result["content"]
    file_type = extraction_result["file_type"]
    if file_type == ".pdf":
        return "\n\n".join(p["text"] for p in content if p["text"])
    elif file_type == ".pptx":
        parts = []
        for slide in content:
            if slide["text"]:
                parts.append(slide["text"])
            if slide["notes"]:
                parts.append(f"[Notes: {slide['notes']}]")
        return "\n\n".join(parts)
    elif file_type == ".docx":
        return "\n".join(p["text"] for p in content)
    elif file_type == ".md":
        parts = []
        for section in content["sections"]:
            if section["heading"]:
                parts.append(f"{'#' * section['level']} {section['heading']}")
            parts.extend(section["content"])
        return "\n".join(parts)
    elif file_type == ".txt":
        return content["text"]
    return ""
def extract_sections(document_id: int, text: str) -> list[dict]:
    """
    Extract document structure as section hierarchy from text.

    Parses Markdown-style headings (# to ######) to build a hierarchical
    section structure for the document.

    Args:
        document_id: The document ID in the database
        text: Full text content of the document

    Returns:
        List of section dictionaries with:
        - document_id: int
        - title: str
        - level: int (1-6)
        - parent_index: int or None (index into this list, resolved to a DB ID later)
        - start_pos: int
        - end_pos: int
        - sort_order: int
        - depth: int (nesting depth, 0 = top level)
        - path: str (hierarchical path like "1.2.3")
    """
    sections = []
    parent_stack = []  # Stack of (level, section_index) for parent tracking
    sort_order = 0
    # Find all headings with their positions
    heading_pattern = re.compile(r"^(#{1,6})\s+(.+)$", re.MULTILINE)
    matches = list(heading_pattern.finditer(text))
    for i, match in enumerate(matches):
        level = len(match.group(1))
        title = match.group(2).strip()
        start_pos = match.start()
        # End position is either start of next heading or end of text
        end_pos = matches[i + 1].start() if i + 1 < len(matches) else len(text)
        # Find parent: closest previous section with level < current
        parent_index = None
        while parent_stack and parent_stack[-1][0] >= level:
            parent_stack.pop()
        if parent_stack:
            parent_index = parent_stack[-1][1]  # Index in sections list
        # Build path
        path_parts = [str(s[1] + 1) for s in parent_stack]  # 1-indexed
        path_parts.append(str(sort_order + 1))
        path = ".".join(path_parts)
        section = {
            "document_id": document_id,
            "title": title[:500],  # Truncate to fit VARCHAR(500)
            "level": level,
            "parent_index": parent_index,  # Temporary, converted to a DB ID in save_sections()
            "start_pos": start_pos,
            "end_pos": end_pos,
            "sort_order": sort_order,
            "depth": len(parent_stack),
            "path": path,
        }
        sections.append(section)
        parent_stack.append((level, len(sections) - 1))
        sort_order += 1
    return sections
def save_sections(document_id: int, sections: list[dict]) -> dict[int, int]:
    """
    Save extracted sections to database.

    Args:
        document_id: The document ID
        sections: List of section dictionaries from extract_sections()

    Returns:
        Mapping of section index to database ID
    """
    # Clear existing sections for this document
    cursor = db.execute("DELETE FROM document_sections WHERE document_id = %s", (document_id,))
    db.commit()
    cursor.close()
    index_to_id = {}
    for idx, section in enumerate(sections):
        # Resolve parent_index to actual DB ID
        parent_db_id = None
        if section.get("parent_index") is not None:
            parent_db_id = index_to_id.get(section["parent_index"])
        cursor = db.execute(
            """INSERT INTO document_sections
               (document_id, parent_section_id, title, heading_level,
                sort_order, depth, path, created_at)
               VALUES (%s, %s, %s, %s, %s, %s, %s, NOW())""",
            (
                document_id,
                parent_db_id,
                section["title"],
                section["level"],
                section["sort_order"],
                section["depth"],
                section["path"],
            ),
        )
        db.commit()
        section_id = cursor.lastrowid
        cursor.close()
        index_to_id[idx] = section_id
    db.log("INFO", f"Saved {len(sections)} sections for document {document_id}")
    return index_to_id
def assign_chunk_sections(document_id: int, sections: list[dict], index_to_id: dict[int, int]) -> int:
    """
    Assign section IDs to chunks based on their position.

    Args:
        document_id: The document ID
        sections: List of section dictionaries with start_pos/end_pos
        index_to_id: Mapping from section index to database ID

    Returns:
        Number of chunks updated
    """
    if not sections:
        return 0
    # Get all chunks for this document
    cursor = db.execute(
        "SELECT id, content FROM chunks WHERE document_id = %s ORDER BY chunk_index",
        (document_id,),
    )
    chunks = cursor.fetchall()
    cursor.close()
    updated = 0
    # Walk chunks in order and keep a running character offset into the full text;
    # each chunk is assigned the last section that starts at or before that offset.
    # This is approximate if chunks overlap or separators were inserted between them.
    offset = 0
    for chunk in chunks:
        chunk_id = chunk["id"]
        # Find the best matching section (last section starting at or before the chunk's offset)
        best_section_idx = None
        for idx, section in enumerate(sections):
            if section["start_pos"] <= offset:
                best_section_idx = idx
        offset += len(chunk["content"])
        if best_section_idx is not None and best_section_idx in index_to_id:
            section_db_id = index_to_id[best_section_idx]
            cursor = db.execute(
                "UPDATE chunks SET section_id = %s WHERE id = %s",
                (section_db_id, chunk_id),
            )
            db.commit()
            cursor.close()
            updated += 1
    db.log("INFO", f"Assigned sections to {updated} chunks for document {document_id}")
    return updated
if __name__ == "__main__":
    import sys

    if len(sys.argv) > 1:
        result = extract(sys.argv[1])
        if result["success"]:
            print(get_full_text(result))
        else:
            print(f"Error: {result['error']}")