{
"tool_response": {
"filePath": "\/var\/www\/scripts\/pipeline\/extract.py",
"oldString": "def extract_pdf(file_path):\n \"\"\"Extract text from PDF using PyMuPDF with optional OCR.\"\"\"\n import fitz # PyMuPDF\n\n doc = fitz.open(file_path)\n pages = []\n\n for page_num, page in enumerate(doc):\n text = page.get_text()\n\n # If page has little text and OCR is enabled, try OCR\n if OCR_ENABLED and len(text.strip()) < 50:\n # Get page as image and OCR it\n pix = page.get_pixmap(dpi=300)\n img_data = pix.tobytes(\"png\")\n\n try:\n import io\n\n import pytesseract\n from PIL import Image\n\n img = Image.open(io.BytesIO(img_data))\n ocr_text = pytesseract.image_to_string(img, lang=OCR_LANGUAGE)\n if len(ocr_text.strip()) > len(text.strip()):\n text = ocr_text\n except Exception as e:\n db.log(\"WARNING\", f\"OCR failed for page {page_num}: {e}\")\n\n pages.append({\"page\": page_num + 1, \"text\": text.strip(), \"images\": len(page.get_images())})",
"newString": "def extract_pdf(file_path):\n \"\"\"Extract text from PDF using PyMuPDF with optional OCR and rotation detection.\"\"\"\n import fitz # PyMuPDF\n\n from orientation import correct_page_orientation, get_page_rotation\n\n doc = fitz.open(file_path)\n pages = []\n\n for page_num, page in enumerate(doc):\n # Detect and handle page rotation\n rotation = get_page_rotation(page)\n\n # Apply rotation to page before text extraction if needed\n if rotation != 0:\n page.set_rotation(rotation)\n db.log(\"INFO\", f\"Page {page_num + 1}: Applied rotation {rotation}°\")\n\n text = page.get_text()\n\n # If page has little text and OCR is enabled, try OCR\n if OCR_ENABLED and len(text.strip()) < 50:\n # Get correctly oriented image for OCR\n img_data, _ = correct_page_orientation(page, target_dpi=300)\n\n try:\n import io\n\n import pytesseract\n from PIL import Image\n\n img = Image.open(io.BytesIO(img_data))\n ocr_text = pytesseract.image_to_string(img, lang=OCR_LANGUAGE)\n if len(ocr_text.strip()) > len(text.strip()):\n text = ocr_text\n except Exception as e:\n db.log(\"WARNING\", f\"OCR failed for page {page_num}: {e}\")\n\n pages.append({\"page\": page_num + 1, \"text\": text.strip(), \"images\": len(page.get_images())})",
"originalFile": "\"\"\"\nText extraction for KI-System Pipeline\nExtracts text from PDF, PPTX, DOCX, MD, TXT files.\n\"\"\"\n\nimport re\nfrom pathlib import Path\n\nfrom config import OCR_ENABLED, OCR_LANGUAGE\nfrom db import db\n\n\ndef extract_pdf(file_path):\n \"\"\"Extract text from PDF using PyMuPDF with optional OCR.\"\"\"\n import fitz # PyMuPDF\n\n doc = fitz.open(file_path)\n pages = []\n\n for page_num, page in enumerate(doc):\n text = page.get_text()\n\n # If page has little text and OCR is enabled, try OCR\n if OCR_ENABLED and len(text.strip()) < 50:\n # Get page as image and OCR it\n pix = page.get_pixmap(dpi=300)\n img_data = pix.tobytes(\"png\")\n\n try:\n import io\n\n import pytesseract\n from PIL import Image\n\n img = Image.open(io.BytesIO(img_data))\n ocr_text = pytesseract.image_to_string(img, lang=OCR_LANGUAGE)\n if len(ocr_text.strip()) > len(text.strip()):\n text = ocr_text\n except Exception as e:\n db.log(\"WARNING\", f\"OCR failed for page {page_num}: {e}\")\n\n pages.append({\"page\": page_num + 1, \"text\": text.strip(), \"images\": len(page.get_images())})\n\n doc.close()\n return pages\n\n\ndef extract_pptx(file_path):\n \"\"\"Extract text from PowerPoint including speaker notes.\"\"\"\n from pptx import Presentation\n\n prs = Presentation(file_path)\n slides = []\n\n for slide_num, slide in enumerate(prs.slides):\n text_parts = []\n\n # Extract text from shapes\n for shape in slide.shapes:\n if hasattr(shape, \"text\") and shape.text:\n text_parts.append(shape.text)\n\n # Extract speaker notes\n notes = \"\"\n if slide.has_notes_slide:\n notes_frame = slide.notes_slide.notes_text_frame\n if notes_frame:\n notes = notes_frame.text\n\n slides.append({\"slide\": slide_num + 1, \"text\": \"\\n\".join(text_parts), \"notes\": notes})\n\n return slides\n\n\ndef extract_docx(file_path):\n \"\"\"Extract text from Word document.\"\"\"\n from docx import Document\n\n doc = Document(file_path)\n paragraphs = []\n\n for para in doc.paragraphs:\n if para.text.strip():\n style = para.style.name if para.style else \"Normal\"\n paragraphs.append({\"text\": para.text, \"style\": style, \"is_heading\": style.startswith(\"Heading\")})\n\n return paragraphs\n\n\ndef extract_markdown(file_path):\n \"\"\"Extract text from Markdown, preserving structure.\"\"\"\n import frontmatter\n\n with open(file_path, encoding=\"utf-8\") as f:\n post = frontmatter.load(f)\n\n content = post.content\n metadata = dict(post.metadata)\n\n # Parse headings for structure\n sections = []\n current_section = {\"heading\": None, \"level\": 0, \"content\": []}\n\n for line in content.split(\"\\n\"):\n heading_match = re.match(r\"^(#{1,6})\\s+(.+)$\", line)\n if heading_match:\n if current_section[\"content\"] or current_section[\"heading\"]:\n sections.append(current_section)\n current_section = {\"heading\": heading_match.group(2), \"level\": len(heading_match.group(1)), \"content\": []}\n else:\n current_section[\"content\"].append(line)\n\n if current_section[\"content\"] or current_section[\"heading\"]:\n sections.append(current_section)\n\n return {\"metadata\": metadata, \"sections\": sections}\n\n\ndef extract_txt(file_path):\n \"\"\"Extract text from plain text file.\"\"\"\n with open(file_path, encoding=\"utf-8\") as f:\n content = f.read()\n return {\"text\": content}\n\n\ndef extract(file_path):\n \"\"\"\n Main extraction function.\n Returns extracted content with structure info.\n \"\"\"\n ext = Path(file_path).suffix.lower()\n\n extractors = {\n \".pdf\": extract_pdf,\n \".pptx\": extract_pptx,\n \".docx\": extract_docx,\n \".md\": extract_markdown,\n \".txt\": extract_txt,\n }\n\n if ext not in extractors:\n raise ValueError(f\"Unsupported file type: {ext}\")\n\n db.log(\"INFO\", f\"Extracting: {file_path}\", f\"type={ext}\")\n\n try:\n content = extractors[ext](file_path)\n return {\"file_path\": file_path, \"file_type\": ext, \"content\": content, \"success\": True}\n except Exception as e:\n db.log(\"ERROR\", f\"Extraction failed: {file_path}\", str(e))\n return {\"file_path\": file_path, \"file_type\": ext, \"error\": str(e), \"success\": False}\n\n\ndef get_full_text(extraction_result):\n \"\"\"Get plain text from extraction result.\"\"\"\n if not extraction_result[\"success\"]:\n return \"\"\n\n content = extraction_result[\"content\"]\n file_type = extraction_result[\"file_type\"]\n\n if file_type == \".pdf\":\n return \"\\n\\n\".join(p[\"text\"] for p in content if p[\"text\"])\n\n elif file_type == \".pptx\":\n parts = []\n for slide in content:\n if slide[\"text\"]:\n parts.append(slide[\"text\"])\n if slide[\"notes\"]:\n parts.append(f\"[Notes: {slide['notes']}]\")\n return \"\\n\\n\".join(parts)\n\n elif file_type == \".docx\":\n return \"\\n\".join(p[\"text\"] for p in content)\n\n elif file_type == \".md\":\n parts = []\n for section in content[\"sections\"]:\n if section[\"heading\"]:\n parts.append(f\"{'#' * section['level']} {section['heading']}\")\n parts.extend(section[\"content\"])\n return \"\\n\".join(parts)\n\n elif file_type == \".txt\":\n return content[\"text\"]\n\n return \"\"\n\n\ndef extract_sections(document_id: int, text: str) -> list[dict]:\n \"\"\"\n Extract document structure as section hierarchy from text.\n\n Parses Markdown-style headings (# to ######) to build a hierarchical\n section structure for the document.\n\n Args:\n document_id: The document ID in the database\n text: Full text content of the document\n\n Returns:\n List of section dictionaries with:\n - document_id: int\n - title: str\n - level: int (1-6)\n - parent_id: int or None\n - start_pos: int\n - end_pos: int\n - sort_order: int\n - path: str (hierarchical path like \"1.2.3\")\n \"\"\"\n sections = []\n parent_stack = [] # Stack of (level, section_index) for parent tracking\n sort_order = 0\n\n # Find all headings with their positions\n heading_pattern = re.compile(r\"^(#{1,6})\\s+(.+)$\", re.MULTILINE)\n\n matches = list(heading_pattern.finditer(text))\n\n for i, match in enumerate(matches):\n level = len(match.group(1))\n title = match.group(2).strip()\n start_pos = match.start()\n\n # End position is either start of next heading or end of text\n end_pos = matches[i + 1].start() if i + 1 < len(matches) else len(text)\n\n # Find parent: closest previous section with level < current\n parent_id = None\n while parent_stack and parent_stack[-1][0] >= level:\n parent_stack.pop()\n\n if parent_stack:\n parent_id = parent_stack[-1][1] # Index in sections list\n\n # Build path\n path_parts = [str(s[1] + 1) for s in parent_stack] # 1-indexed\n path_parts.append(str(sort_order + 1))\n path = \".\".join(path_parts) if path_parts else str(sort_order + 1)\n\n section = {\n \"document_id\": document_id,\n \"title\": title[:500], # Truncate to fit VARCHAR(500)\n \"level\": level,\n \"parent_index\": parent_id, # Temporary, will be converted to DB ID\n \"start_pos\": start_pos,\n \"end_pos\": end_pos,\n \"sort_order\": sort_order,\n \"depth\": len(parent_stack),\n \"path\": path,\n }\n\n sections.append(section)\n parent_stack.append((level, len(sections) - 1))\n sort_order += 1\n\n return sections\n\n\ndef save_sections(document_id: int, sections: list[dict]) -> dict[int, int]:\n \"\"\"\n Save extracted sections to database.\n\n Args:\n document_id: The document ID\n sections: List of section dictionaries from extract_sections()\n\n Returns:\n Mapping of section index to database ID\n \"\"\"\n # Clear existing sections for this document\n cursor = db.execute(\"DELETE FROM document_sections WHERE document_id = %s\", (document_id,))\n db.commit()\n cursor.close()\n\n index_to_id = {}\n\n for idx, section in enumerate(sections):\n # Resolve parent_index to actual DB ID\n parent_db_id = None\n if section.get(\"parent_index\") is not None:\n parent_db_id = index_to_id.get(section[\"parent_index\"])\n\n cursor = db.execute(\n \"\"\"INSERT INTO document_sections\n (document_id, parent_section_id, title, heading_level,\n sort_order, depth, path, created_at)\n VALUES (%s, %s, %s, %s, %s, %s, %s, NOW())\"\"\",\n (\n document_id,\n parent_db_id,\n section[\"title\"],\n section[\"level\"],\n section[\"sort_order\"],\n section[\"depth\"],\n section[\"path\"],\n ),\n )\n db.commit()\n section_id = cursor.lastrowid\n cursor.close()\n\n index_to_id[idx] = section_id\n\n db.log(\"INFO\", f\"Saved {len(sections)} sections for document {document_id}\")\n return index_to_id\n\n\ndef assign_chunk_sections(document_id: int, sections: list[dict], index_to_id: dict[int, int]) -> int:\n \"\"\"\n Assign section IDs to chunks based on their position.\n\n Args:\n document_id: The document ID\n sections: List of section dictionaries with start_pos\/end_pos\n index_to_id: Mapping from section index to database ID\n\n Returns:\n Number of chunks updated\n \"\"\"\n if not sections:\n return 0\n\n # Get all chunks for this document\n cursor = db.execute(\n \"SELECT id, content FROM chunks WHERE document_id = %s ORDER BY chunk_index\",\n (document_id,),\n )\n chunks = cursor.fetchall()\n cursor.close()\n\n ... [TRUNCATED-f4a0d419b9c01a8f]",
"structuredPatch": [
{
"oldStart": 11,
"oldLines": 20,
"newStart": 11,
"newLines": 29,
"lines": [
" ",
" ",
" def extract_pdf(file_path):",
"- \"\"\"Extract text from PDF using PyMuPDF with optional OCR.\"\"\"",
"+ \"\"\"Extract text from PDF using PyMuPDF with optional OCR and rotation detection.\"\"\"",
" import fitz # PyMuPDF",
" ",
"+ from orientation import correct_page_orientation, get_page_rotation",
"+",
" doc = fitz.open(file_path)",
" pages = []",
" ",
" for page_num, page in enumerate(doc):",
"+ # Detect and handle page rotation",
"+ rotation = get_page_rotation(page)",
"+",
"+ # Apply rotation to page before text extraction if needed",
"+ if rotation != 0:",
"+ page.set_rotation(rotation)",
"+ db.log(\"INFO\", f\"Page {page_num + 1}: Applied rotation {rotation}°\")",
"+",
" text = page.get_text()",
" ",
" # If page has little text and OCR is enabled, try OCR",
" if OCR_ENABLED and len(text.strip()) < 50:",
"- # Get page as image and OCR it",
"- pix = page.get_pixmap(dpi=300)",
"- img_data = pix.tobytes(\"png\")",
"+ # Get correctly oriented image for OCR",
"+ img_data, _ = correct_page_orientation(page, target_dpi=300)",
" ",
" try:",
" import io"
]
}
],
"userModified": false,
"replaceAll": false
}
}