feat(loader): implement markdown parsing in MathpixPDFReader #498

Open · wants to merge 2 commits into main
200 changes: 180 additions & 20 deletions libs/kotaemon/kotaemon/loaders/mathpix_loader.py
@@ -2,15 +2,15 @@
import re
import time
from pathlib import Path
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, Generator, List, Optional, Union

import requests
from langchain.utils import get_from_dict_or_env
from llama_index.core.readers.base import BaseReader

from kotaemon.base import Document

-from .utils.table import parse_markdown_text_to_tables, strip_special_chars_markdown
+from .utils.table import strip_special_chars_markdown


# MathpixPDFLoader implementation taken largely from Daniel Gross's:
@@ -21,7 +21,7 @@ class MathpixPDFReader(BaseReader):
def __init__(
self,
processed_file_format: str = "md",
-        max_wait_time_seconds: int = 500,
+        max_wait_time_seconds: int = 900,
should_clean_pdf: bool = True,
**kwargs: Any,
) -> None:
@@ -87,22 +87,38 @@ def wait_for_processing(self, pdf_id: str) -> None:
response = requests.get(url, headers=self._mathpix_headers)
response_data = response.json()
status = response_data.get("status", None)
            print(
                f"Processing status: {status}, "
                f"Progress: {response_data.get('percent_done', 0)}%"
            )

if status == "completed":
return
elif status == "error":
raise ValueError("Unable to retrieve PDF from Mathpix")
raise ValueError(f"Mathpix processing error: {response_data}")
elif status in [
"split",
"processing",
]: # Add handling for processing states
time.sleep(5)
continue
else:
-                print(response_data)
-                print(url)
+                print(f"Unknown status: {response_data}")
time.sleep(5)
-        raise TimeoutError
+        raise TimeoutError(
+            f"Processing did not complete within {self.max_wait_time_seconds} seconds"
+        )
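
The loop above polls Mathpix until a terminal status, sleeping five seconds between checks; the statuses it distinguishes ("completed", "error", "split", "processing") are the ones visible in the diff. A minimal standalone sketch of the same polling contract, with a stubbed fetch() in place of the real HTTP call:

import time

def poll_until_done(fetch, max_wait_seconds: int = 900, interval: int = 5) -> dict:
    # fetch() stands in for the GET against the Mathpix status endpoint;
    # it must return a dict with at least a "status" key, as above.
    deadline = time.monotonic() + max_wait_seconds
    while time.monotonic() < deadline:
        data = fetch()
        status = data.get("status")
        if status == "completed":
            return data
        if status == "error":
            raise ValueError(f"Mathpix processing error: {data}")
        # "split", "processing", or anything unrecognized: wait and retry
        time.sleep(interval)
    raise TimeoutError(f"Processing did not complete within {max_wait_seconds} seconds")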

def get_processed_pdf(self, pdf_id: str) -> str:
self.wait_for_processing(pdf_id)
url = f"{self.url}/{pdf_id}.{self.processed_file_format}"
response = requests.get(url, headers=self._mathpix_headers)
-        return response.content.decode("utf-8")
+        if response.status_code != 200:
+            raise ValueError(f"Failed to get processed PDF: {response.text}")
+        content = response.content.decode("utf-8")
+        print(f"Retrieved content length: {len(content)}")  # Debug print
+        return content

def clean_pdf(self, contents: str) -> str:
"""Clean the PDF file.
@@ -139,26 +155,79 @@ def clean_pdf(self, contents: str) -> str:
contents = re.sub(markup_regex, "", contents)
return contents

def parse_markdown_text_to_tables(
self, content: str
) -> tuple[list[tuple[int, str]], list[tuple[int, str]]]:
"""Parse markdown text to get tables and texts separately.

Returns:
Tuple of (tables, texts) where each is a list of (page_num, content) tuples
"""
print("Starting markdown parsing...")
print(f"Content length: {len(content)}")

        # Split by page markers if present; drop empty chunks so a leading
        # "# Page 1" marker does not shift the page numbering by one
        pages = [
            p for p in re.split(r"(?m)^# Page \d+\n", content) if p.strip()
        ]

tables: list[tuple[int, str]] = []
texts: list[tuple[int, str]] = []

for page_num, page_content in enumerate(pages, 1):
if not page_content.strip():
continue

# Extract tables from the page
table_matches = re.findall(r"(\|[^\n]+\|(?:\n\|[^\n]+\|)*)", page_content)
if table_matches:
for table in table_matches:
tables.append(
(page_num, table.strip())
) # Store as tuple with page number
# Remove tables from page content
page_content = re.sub(
r"(\|[^\n]+\|(?:\n\|[^\n]+\|)*)", "", page_content
)

# Split remaining content into meaningful chunks
chunks = re.split(r"\n\s*\n", page_content)
for chunk in chunks:
if chunk.strip():
texts.append(
(page_num, chunk.strip())
) # Store as tuple with page number

print(f"Found {len(tables)} tables and {len(texts)} text sections")
return tables, texts
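
To make the parsing behavior concrete, here is a small, self-contained demonstration of the two regexes above on a toy Mathpix-style markdown string (the sample text is invented for illustration):

import re

sample = (
    "# Page 1\n"
    "Intro paragraph.\n\n"
    "| name | value |\n"
    "| ---- | ----- |\n"
    "| a    | 1     |\n\n"
    "# Page 2\n"
    "Closing paragraph.\n"
)

# Same page-marker split as above; empty chunks are dropped so the
# leading "# Page 1" marker does not inflate the page count.
pages = [p for p in re.split(r"(?m)^# Page \d+\n", sample) if p.strip()]

for page_num, page_content in enumerate(pages, 1):
    # Same pipe-table pattern: a run of consecutive |...| lines.
    tables = re.findall(r"(\|[^\n]+\|(?:\n\|[^\n]+\|)*)", page_content)
    print(f"page {page_num}: {len(tables)} table(s)")
# page 1: 1 table(s)
# page 2: 0 table(s)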

def load_data(
-        self, file_path: Path, extra_info: Optional[dict] = None, **kwargs
+        self,
+        file: Union[str, List[str], Path],
+        extra_info: Optional[Dict] = None,
+        **load_kwargs: Any,
) -> List[Document]:
if "response_content" in kwargs:
# overriding response content if specified
content = kwargs["response_content"]
"""Load data from file path."""
file_path = Path(file) if isinstance(file, str) else file

if "response_content" in load_kwargs:
content = load_kwargs["response_content"]
else:
-            # call original API
pdf_id = self.send_pdf(file_path)
content = self.get_processed_pdf(pdf_id)

if self.should_clean_pdf:
content = self.clean_pdf(content)
-        tables, texts = parse_markdown_text_to_tables(content)
+        tables, texts = self.parse_markdown_text_to_tables(content)
documents = []
-        for table in tables:
-            text = strip_special_chars_markdown(table)
+        # Handle tables
+        for page_num, table_content in tables:
+            text = strip_special_chars_markdown(table_content)
metadata = {
"table_origin": table,
"table_origin": table_content,
"type": "table",
"page_label": page_num,
"page_number": page_num,
}
if extra_info:
metadata.update(extra_info)
@@ -171,8 +240,99 @@ def load_data(
)
)

-        for text in texts:
-            metadata = {"source": file_path.name, "type": "text"}
-            documents.append(Document(text=text, metadata=metadata))
# Handle text sections
for page_num, text_content in texts:
if not text_content.strip():
continue
metadata = {
"source": str(file_path),
"type": "text",
"page_label": page_num,
"page_number": page_num,
}
if extra_info:
metadata.update(extra_info)
documents.append(Document(text=text_content, metadata=metadata))

# Fallback if no content was parsed
if not documents and content.strip():
metadata = {
"source": str(file_path),
"type": "text",
"page_label": 1,
"page_number": 1,
}
if extra_info:
metadata.update(extra_info)
documents.append(Document(text=content.strip(), metadata=metadata))

return documents
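
For context, typical usage of the eager loader might look like the following. This is a sketch, assuming MathpixPDFReader is re-exported from kotaemon.loaders and that Mathpix credentials (e.g. MATHPIX_API_KEY) are available in the environment; the input file name is hypothetical:

from pathlib import Path

from kotaemon.loaders import MathpixPDFReader  # import path assumed

reader = MathpixPDFReader(should_clean_pdf=True)
docs = reader.load_data(Path("sample.pdf"))  # hypothetical input file
for doc in docs:
    print(doc.metadata["type"], doc.metadata.get("page_number"), len(doc.text))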

def lazy_load_data(
self,
file: Union[str, List[str], Path],
extra_info: Optional[Dict] = None,
**load_kwargs: Any,
) -> Generator[Document, None, None]:
"""Lazy load data from file path."""
file_path = Path(file) if isinstance(file, str) else file

if "response_content" in load_kwargs:
content = load_kwargs["response_content"]
else:
pdf_id = self.send_pdf(file_path)
print(f"PDF ID: {pdf_id}")
content = self.get_processed_pdf(pdf_id)

if self.should_clean_pdf:
content = self.clean_pdf(content)

tables, texts = self.parse_markdown_text_to_tables(content)

# Handle tables
        for page_num, table_content in tables:
            text = strip_special_chars_markdown(table_content)
metadata = {
"table_origin": table_content, # Use table_content here too
"type": "table",
"page_label": page_num,
"page_number": page_num,
}
if extra_info:
metadata.update(extra_info)
yield Document(
text=text,
metadata=metadata,
metadata_template="",
metadata_seperator="",
)

# Handle text sections
        for page_num, text_content in texts:
if not text_content.strip():
continue
metadata = {
"source": str(file_path),
"type": "text",
"page_label": page_num,
"page_number": page_num,
}
if extra_info:
metadata.update(extra_info)
            yield Document(text=text_content, metadata=metadata)

# Fallback if no content was parsed
if not (tables or texts) and content.strip():
metadata = {
"source": str(file_path),
"type": "text",
"page_label": 1,
"page_number": 1,
}
if extra_info:
metadata.update(extra_info)
yield Document(text=content.strip(), metadata=metadata)

print(f"Completed processing PDF: {file_path}")