diff --git a/libs/kotaemon/kotaemon/loaders/mathpix_loader.py b/libs/kotaemon/kotaemon/loaders/mathpix_loader.py
index d07b06941..5c1a97fb3 100644
--- a/libs/kotaemon/kotaemon/loaders/mathpix_loader.py
+++ b/libs/kotaemon/kotaemon/loaders/mathpix_loader.py
@@ -2,7 +2,7 @@
 import re
 import time
 from pathlib import Path
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, Generator, List, Optional, Union
 
 import requests
 from langchain.utils import get_from_dict_or_env
@@ -10,7 +10,7 @@
 
 from kotaemon.base import Document
 
-from .utils.table import parse_markdown_text_to_tables, strip_special_chars_markdown
+from .utils.table import strip_special_chars_markdown
 
 # MathpixPDFLoader implementation taken largely from Daniel Gross's:
@@ -21,7 +21,7 @@ class MathpixPDFReader(BaseReader):
     def __init__(
         self,
         processed_file_format: str = "md",
-        max_wait_time_seconds: int = 500,
+        max_wait_time_seconds: int = 900,
         should_clean_pdf: bool = True,
         **kwargs: Any,
     ) -> None:
@@ -87,22 +87,38 @@ def wait_for_processing(self, pdf_id: str) -> None:
             response = requests.get(url, headers=self._mathpix_headers)
             response_data = response.json()
             status = response_data.get("status", None)
+            print(
+                f"Processing status: {status}, "
+                f"progress: {response_data.get('percent_done', 0)}%"
+            )
             if status == "completed":
                 return
             elif status == "error":
-                raise ValueError("Unable to retrieve PDF from Mathpix")
+                raise ValueError(f"Mathpix processing error: {response_data}")
+            elif status in [
+                "split",
+                "processing",
+            ]:  # intermediate Mathpix states: wait and poll again
+                time.sleep(5)
+                continue
             else:
-                print(response_data)
-                print(url)
+                print(f"Unknown status: {response_data}")
                 time.sleep(5)
-        raise TimeoutError
+
+        raise TimeoutError(
+            f"Processing did not complete within {self.max_wait_time_seconds} seconds"
+        )
 
     def get_processed_pdf(self, pdf_id: str) -> str:
         self.wait_for_processing(pdf_id)
         url = f"{self.url}/{pdf_id}.{self.processed_file_format}"
         response = requests.get(url, headers=self._mathpix_headers)
-        return response.content.decode("utf-8")
+        if response.status_code != 200:
+            raise ValueError(f"Failed to get processed PDF: {response.text}")
+        content = response.content.decode("utf-8")
+        print(f"Retrieved content length: {len(content)}")  # Debug print
+        return content
 
     def clean_pdf(self, contents: str) -> str:
         """Clean the PDF file.
@@ -139,26 +155,79 @@ def clean_pdf(self, contents: str) -> str:
         contents = re.sub(markup_regex, "", contents)
         return contents
 
+    def parse_markdown_text_to_tables(
+        self, content: str
+    ) -> tuple[list[tuple[int, str]], list[tuple[int, str]]]:
+        """Parse markdown text to get tables and texts separately.
+
+        Returns:
+            Tuple of (tables, texts) where each is a list of (page_num, content) tuples
+        """
+        print("Starting markdown parsing...")
+        print(f"Content length: {len(content)}")
+
+        # Split by page markers if present
+        pages = re.split(r"(?m)^# Page \d+\n", content)
+
+        tables: list[tuple[int, str]] = []
+        texts: list[tuple[int, str]] = []
+
+        for page_num, page_content in enumerate(pages, 1):
+            if not page_content.strip():
+                continue
+
+            # Extract tables from the page
+            table_matches = re.findall(r"(\|[^\n]+\|(?:\n\|[^\n]+\|)*)", page_content)
+            if table_matches:
+                for table in table_matches:
+                    tables.append(
+                        (page_num, table.strip())
+                    )  # store as (page number, table) tuple
+                # Remove tables from page content
+                page_content = re.sub(
+                    r"(\|[^\n]+\|(?:\n\|[^\n]+\|)*)", "", page_content
+                )
+
+            # Split remaining content into meaningful chunks
+            chunks = re.split(r"\n\s*\n", page_content)
+            for chunk in chunks:
+                if chunk.strip():
+                    texts.append(
+                        (page_num, chunk.strip())
+                    )  # store as (page number, text) tuple
+
+        print(f"Found {len(tables)} tables and {len(texts)} text sections")
+        return tables, texts
+
     def load_data(
-        self, file_path: Path, extra_info: Optional[dict] = None, **kwargs
+        self,
+        file: Union[str, List[str], Path],
+        extra_info: Optional[Dict] = None,
+        **load_kwargs: Any,
     ) -> List[Document]:
-        if "response_content" in kwargs:
-            # overriding response content if specified
-            content = kwargs["response_content"]
+        """Load data from file path."""
+        file_path = Path(file) if isinstance(file, str) else file
+
+        if "response_content" in load_kwargs:
+            content = load_kwargs["response_content"]
         else:
-            # call original API
             pdf_id = self.send_pdf(file_path)
             content = self.get_processed_pdf(pdf_id)
 
         if self.should_clean_pdf:
             content = self.clean_pdf(content)
-        tables, texts = parse_markdown_text_to_tables(content)
+
+        tables, texts = self.parse_markdown_text_to_tables(content)
         documents = []
-        for table in tables:
-            text = strip_special_chars_markdown(table)
+
+        # Handle tables
+        for page_num, table_content in tables:
+            text = strip_special_chars_markdown(table_content)
             metadata = {
-                "table_origin": table,
+                "table_origin": table_content,
                 "type": "table",
+                "page_label": page_num,
+                "page_number": page_num,
             }
             if extra_info:
                 metadata.update(extra_info)
@@ -171,8 +240,99 @@
                 )
             )
 
-        for text in texts:
-            metadata = {"source": file_path.name, "type": "text"}
-            documents.append(Document(text=text, metadata=metadata))
+        # Handle text sections
+        for page_num, text_content in texts:
+            if not text_content.strip():
+                continue
+            metadata = {
+                "source": str(file_path),
+                "type": "text",
+                "page_label": page_num,
+                "page_number": page_num,
+            }
+            if extra_info:
+                metadata.update(extra_info)
+            documents.append(Document(text=text_content, metadata=metadata))
+
+        # Fallback if no content was parsed
+        if not documents and content.strip():
+            metadata = {
+                "source": str(file_path),
+                "type": "text",
+                "page_label": 1,
+                "page_number": 1,
+            }
+            if extra_info:
+                metadata.update(extra_info)
+            documents.append(Document(text=content.strip(), metadata=metadata))
 
         return documents
+
+    def lazy_load_data(
+        self,
+        file: Union[str, List[str], Path],
+        extra_info: Optional[Dict] = None,
+        **load_kwargs: Any,
+    ) -> Generator[Document, None, None]:
+        """Lazy load data from file path."""
+        file_path = Path(file) if isinstance(file, str) else file
+
+        if "response_content" in load_kwargs:
+            content = load_kwargs["response_content"]
+        else:
+            pdf_id = self.send_pdf(file_path)
+            print(f"PDF ID: {pdf_id}")
{pdf_id}") + content = self.get_processed_pdf(pdf_id) + + if self.should_clean_pdf: + content = self.clean_pdf(content) + + tables, texts = self.parse_markdown_text_to_tables(content) + + # Handle tables + for page_num, table_content in tables: # Changed variable name for clarity + text = strip_special_chars_markdown(table_content) # Pass just the content + metadata = { + "table_origin": table_content, # Use table_content here too + "type": "table", + "page_label": page_num, + "page_number": page_num, + } + if extra_info: + metadata.update(extra_info) + yield Document( + text=text, + metadata=metadata, + metadata_template="", + metadata_seperator="", + ) + + # Handle text sections + for page_num, text_content in texts: # Changed variable name for clarity + if not text_content.strip(): + continue + metadata = { + "source": str(file_path), + "type": "text", + "page_label": page_num, + "page_number": page_num, + } + if extra_info: + metadata.update(extra_info) + yield Document( + text=text_content, metadata=metadata + ) # Use text_content directly + + # Fallback if no content was parsed + if not (tables or texts) and content.strip(): + metadata = { + "source": str(file_path), + "type": "text", + "page_label": 1, + "page_number": 1, + } + if extra_info: + metadata.update(extra_info) + yield Document(text=content.strip(), metadata=metadata) + + print(f"Completed processing PDF: {file_path}")