
Refactor Document Chunker to always use docling #430

Open
wants to merge 8 commits into base: main
236 changes: 65 additions & 171 deletions src/instructlab/sdg/utils/chunkers.py
@@ -1,9 +1,7 @@
# Standard
from abc import ABC, abstractmethod
from collections import defaultdict
from enum import Enum
from pathlib import Path
from typing import DefaultDict, Iterable, List, Optional, Tuple
from typing import Dict, Iterable, List, Optional
import json
import logging
import re
@@ -26,6 +24,7 @@

logger = logging.getLogger(__name__)
_DEFAULT_CHUNK_OVERLAP = 100
SUPPORTED_FILETYPES = [".pdf", ".md"]


def _num_tokens_from_words(num_words) -> int:
@@ -68,186 +67,66 @@ def resolve_ocr_options() -> OcrOptions:
return None


class FileTypes(Enum):
MD = ".md"
PDF = ".pdf"
def split_docs_by_filetype(document_paths: List[Path]) -> Dict[str, List[Path]]:
"""Split document paths into a dict of lists based on their file extension."""
document_dict = defaultdict(list)
for path in document_paths:
filetype = path.suffix
if filetype not in SUPPORTED_FILETYPES:
raise ValueError(f"Provided unsupported filetype {filetype}")

document_dict[filetype].append(path)

class ChunkerBase(ABC):
@abstractmethod
def chunk_documents(self):
pass


class DocumentChunker:
"""A factory chunker class that instantiates the applicable chunker

Currently, only Markdown and PDF are supported. For Markdown, returns
TextSplitChunker, and for PDF, returns ContextAwareChunker"""

def __new__(
cls,
leaf_node,
taxonomy_path,
output_dir: Path,
server_ctx_size=4096,
chunk_word_count=1024,
tokenizer_model_name: Optional[str] = None,
docling_model_path: Optional[str] = None,
):
"""Insantiate the appropriate chunker for the provided document

Args:
leaf_node: a leaf node dict containing "documents",
"filepaths", and "taxonomy_path" keys
output_dir (Path): directory where artifacts should be stored
server_ctx_size (int): Context window size of server
chunk_word_count (int): Maximum number of words to chunk a document
tokenizer_model_name (Optional[str]): name of huggingface model to get
tokenizer from
Returns:
TextSplitChunker | ContextAwareChunker: Object of the appropriate
chunker class for the provided filetype
"""
documents = leaf_node[0]["documents"]

if not isinstance(taxonomy_path, Path):
taxonomy_path = Path(taxonomy_path)

if isinstance(documents, str):
documents = [documents]
logger.info(
"Converted single string into a list of string. Assumed the string passed in is the document. Normally, chunk_document() should take a list as input."
)
elif not isinstance(documents, list):
raise TypeError(
"Expected: documents to be a list, but got {}".format(type(documents))
)

filepaths = leaf_node[0]["filepaths"]

doc_dict = cls._split_docs_by_filetype(documents, filepaths)
if len(doc_dict.keys()) > 1:
raise ValueError("Received multiple document types")
if len(doc_dict.keys()) < 1:
raise ValueError("Received no document types")

if FileTypes.MD in doc_dict:
doc_contents = [d for d, _ in doc_dict[FileTypes.MD]]
return TextSplitChunker(
doc_contents,
server_ctx_size,
chunk_word_count,
output_dir,
)
return dict(document_dict)

if FileTypes.PDF in doc_dict:
doc_paths = [p for _, p in doc_dict[FileTypes.PDF]]
return ContextAwareChunker(
doc_paths,
filepaths,
output_dir,
chunk_word_count,
tokenizer_model_name,
docling_model_path=docling_model_path,
)

@staticmethod
def _split_docs_by_filetype(
documents: List[str], filepaths: List[Path]
) -> DefaultDict[FileTypes, List[Tuple[str, Path]]]:
"""Separate documents into lists based on their filetype.

Currently, only Markdown and PDF are supported.
Args:
documents (List[str]): A list of the document contents as strings
filepaths (List[Path]): Corresponding document filepaths
Returns:
DefaultDict: Dictionary with either ".md" or ".pdf" as a key.
Markdown items contain document contents, PDF items contain
paths to documents.
"""
doc_dict = defaultdict(list)
for doc, path in zip(documents, filepaths):
if path.suffix == ".md":
# append doc contents
doc_dict[FileTypes.MD].append((doc, path))
elif path.suffix == ".pdf":
# append doc paths
doc_dict[FileTypes.PDF].append((doc, path))
else:
raise ValueError(
f"Received document of type .{path.suffix}, which is not a supported filetype"
)
return doc_dict

class DocumentChunker: # pylint: disable=too-many-instance-attributes
# def __new__(
# cls,
# leaf_node,
# taxonomy_path,
# output_dir: Path,
# server_ctx_size=4096,
# chunk_word_count=1024,
# tokenizer_model_name: Optional[str] = None,
# docling_model_path: Optional[str] = None,
# ):
Comment on lines +84 to +93
Contributor
Do we need this comment? It looks like it should be safe to remove to keep things tidy.

Member
Will take this away.
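
For reference, the refactor replaces the old `__new__` factory with a plain constructor. A minimal sketch of the new call pattern, assuming hypothetical paths and a hypothetical tokenizer model name (the keyword arguments mirror the updated `taxonomy.py` call site further down):

```python
from pathlib import Path

from instructlab.sdg.utils.chunkers import DocumentChunker

# Hypothetical paths and model name, used only to illustrate the new constructor.
chunker = DocumentChunker(
    document_paths=[Path("docs/knowledge.pdf")],
    output_dir=Path("/tmp/sdg-chunks"),
    tokenizer_model_name="mistralai/Mixtral-8x7B-Instruct-v0.1",
    server_ctx_size=4096,
    chunk_word_count=1024,
)
chunks = chunker.chunk_documents()
```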


class TextSplitChunker(ChunkerBase):
def __init__(
self,
document_contents: List | str,
server_ctx_size: int,
chunk_word_count: int,
document_paths: List[Path],
output_dir: Path,
tokenizer_model_name: str,
docling_model_path: Optional[Path] = None,
server_ctx_size: int = 4096,
chunk_word_count: int = 1024,
):
self.document_contents = document_contents
self.server_ctx_size = server_ctx_size
self.chunk_word_count = chunk_word_count
self.output_dir = output_dir
if not document_paths:
raise ValueError("Provided empty list of documents")

def chunk_documents(self) -> List:
"""Naively chunk markdown documents based on the word count provided by the user.
Returns:
List[str]: List of chunked documents.
"""
num_tokens_per_doc = _num_tokens_from_words(self.chunk_word_count)
if num_tokens_per_doc > int(self.server_ctx_size - 1024):
raise ValueError(
"Error: {}".format(
str(
f"Given word count ({self.chunk_word_count}) per doc will exceed the server context window size ({self.server_ctx_size})"
)
)
)
if self.document_contents == []:
return []
document_dict = split_docs_by_filetype(document_paths)

chunk_size = _num_chars_from_tokens(num_tokens_per_doc)
return chunk_markdowns(self.document_contents, chunk_size)
if len(document_dict) > 1:
raise ValueError("Provided multiple document types")
Comment on lines +109 to +110
Contributor
Is this a new requirement, that users can only provide a single type of document per leaf node? In other words, I cannot have pdf, markdown, or other documents within a leaf node? I don't think we enforced this before.

Member
I am unsure myself about this. @khaledsulayman can answer this better.
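
To make the behavior under discussion concrete, here is a small sketch of how `split_docs_by_filetype` groups paths and when the constructor's single-filetype check fires; filenames are hypothetical:

```python
from pathlib import Path

# Hypothetical filenames, used only to illustrate the grouping and the check.
split_docs_by_filetype([Path("a.pdf"), Path("b.pdf")])
# -> {'.pdf': [PosixPath('a.pdf'), PosixPath('b.pdf')]}

split_docs_by_filetype([Path("a.pdf"), Path("notes.md")])
# -> {'.pdf': [...], '.md': [...]}; two keys, so the DocumentChunker
#    constructor raises ValueError("Provided multiple document types")
```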


# We know there is only 1 key, value pair, so we take the first
self.document_filetype, self.document_paths = next(iter(document_dict.items()))
self.docling_model_path = docling_model_path
self.converter = self._init_docling_converter()

class ContextAwareChunker(ChunkerBase): # pylint: disable=too-many-instance-attributes
def __init__(
self,
document_paths,
filepaths,
output_dir: Path,
chunk_word_count: int,
tokenizer_model_name: Optional[str],
docling_model_path=None,
):
self.document_paths = document_paths
self.filepaths = filepaths
self.output_dir = self._path_validator(output_dir)
self.server_ctx_size = server_ctx_size
self.chunk_word_count = chunk_word_count
self.tokenizer = self.create_tokenizer(tokenizer_model_name)
self.docling_model_path = docling_model_path

def chunk_documents(self) -> List:
"""Semantically chunk PDF documents.

Returns:
List: a list of chunks from the documents
"""
def _init_docling_converter(self):
"""Initialize docling converter with filetype-specific configurations"""
# triggers torch loading, import lazily
# pylint: disable=import-outside-toplevel
# Third Party
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline

if self.document_paths == []:
return []

if self.docling_model_path is None:
logger.info("Docling models not found on disk, downloading models...")
self.docling_model_path = StandardPdfPipeline.download_models_hf()
@@ -258,17 +137,29 @@ def chunk_documents(self) -> List:
artifacts_path=self.docling_model_path,
do_ocr=False,
)

ocr_options = resolve_ocr_options()
if ocr_options is not None:
pipeline_options.do_ocr = True
pipeline_options.ocr_options = ocr_options

converter = DocumentConverter(
return DocumentConverter(
format_options={
InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
}
)
parsed_documents = converter.convert_all(self.filepaths)

def chunk_documents(self) -> List:
"""Split a list of documents into chunks

Returns:
List: a list of chunks from the documents
"""

if self.document_paths == []:
return []

parsed_documents = self.converter.convert_all(self.document_paths)

docling_artifacts_path = self.export_documents(parsed_documents)

@@ -348,7 +239,7 @@ def fuse_texts(
return fused_texts

@staticmethod
def create_tokenizer(model_name: Optional[str]):
def create_tokenizer(model_name: str):
"""
Create a tokenizer instance from a pre-trained model or a local directory.

@@ -363,10 +254,9 @@ def create_tokenizer(model_name: Optional[str]):
# Third Party
from transformers import AutoTokenizer

if model_name is None:
raise TypeError("No model path provided")

model_path = Path(model_name)
model_path = Path(
model_name
) # TODO expect a path from the DocumentChunker constructor
Contributor
Is this TODO something we want to address in this PR, or is this a comment for some future work we want to track?
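
For context on the TODO, a minimal sketch of loading a tokenizer from a local model directory, roughly what `create_tokenizer` does after converting the name to a `Path`; the directory shown is hypothetical:

```python
from pathlib import Path

from transformers import AutoTokenizer

# Hypothetical local model directory; AutoTokenizer.from_pretrained accepts
# either a hub id or a path to a directory containing tokenizer files.
model_path = Path("~/.cache/instructlab/models/granite-7b-lab").expanduser()
if not model_path.exists():
    raise ValueError(
        f"Model not found at {model_path}. Please run `ilab model download` and try again"
    )
tokenizer = AutoTokenizer.from_pretrained(model_path)
```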

error_info_message = (
"Please run `ilab model download {download_args}` and try again"
)
@@ -486,7 +376,7 @@ def get_table_page_number(self, json_book, idx):
prev_page_num = book_element["prov"][0]["page"]
break
for book_element in json_book["main-text"][idx:]:
if "prov" in book_element:
if "prov" in book_element and book_element["prov"]:
next_page_num = book_element["prov"][0]["page"]
break
if prev_page_num is not None and next_page_num is not None:
@@ -545,8 +435,14 @@ def build_chunks_from_docling_json(
current_book_page_number = self.get_table_page_number(
json_book, idx
)
book_text = self.get_table(json_book, book_element["$ref"])
elif book_element["prov"]:
current_book_page_number = book_element["prov"][0][
"page"
] # TODO export to function to handle empty ["prov"]
Contributor
TODO for this PR? Or future work?

book_text = book_element["text"]
else:
current_book_page_number = book_element["prov"][0]["page"]
current_book_page_number = None
book_text = book_element["text"]

if book_element["type"] == "subtitle-level-1":
@@ -599,8 +495,6 @@ def build_chunks_from_docling_json(

if book_element["type"] == "paragraph":
book_text = self.add_heading_formatting(book_text)
elif book_element["type"] == "table":
book_text = self.get_table(json_book, book_element["$ref"])
if "## References" in book_text or "## Acknowledgements" in book_text:
# For research papers we ignore everything after this sections
break
26 changes: 22 additions & 4 deletions src/instructlab/sdg/utils/taxonomy.py
@@ -109,6 +109,18 @@ def _get_taxonomy(repo="taxonomy"):
return taxonomy_file_paths


def _string_contains_html(s: str) -> bool:
"""Detect HTML tags in a string.

We use this to catch markdown files that may contain html elements since
docling does not support this."""
Comment on lines +115 to +116
Contributor
Is this a temporary fix until Docling supports html within markdown? Or is this something we expect to keep longer-term? We might want to open a feature request to docling for it to better handle this scenario so we can work towards removing this check entirely.

Member
I believe this should be a temporary fix for now. I will open a feature request with docling.

# Define a regex to detect HTML tags
html_tag_pattern = re.compile(r"<\/?[a-zA-Z][\s\S]*?>")

# Check for HTML tags in the content
return bool(html_tag_pattern.search(s))
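
As a quick sanity check of the regex above (illustrative strings, not part of the diff):

```python
# Quick sanity check of the HTML-tag regex; strings are illustrative.
assert _string_contains_html("A table: <table><tr><td>cell</td></tr></table>")
assert _string_contains_html("forced line break<br/>here")
assert not _string_contains_html("plain markdown where a < b and c > d")
```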


def _get_documents(
source: Dict[str, Union[str, List[str]]],
skip_checkout: bool = False,
@@ -160,6 +172,12 @@ def _get_documents(
# Process Markdown files
with open(file_path, "r", encoding="utf-8") as file:
content = file.read()
if _string_contains_html(content):
logging.warning(
f"Provided markdown file {file_path} contains HTML contents, which is currently unsupported as a part of markdown"
"NOTE: Continuing this might affect your data generation quality."
"To get best results please format your markdown documents without the use of HTML or use a different document filetype."
)
file_contents.append(content)
filepaths.append(Path(file_path))
logger.info(
@@ -411,20 +429,20 @@ def map_chunks_to_icls(chunks: List, leaf_node: Dict) -> Dataset:

def _knowledge_leaf_node_to_samples(
leaf_node,
taxonomy_path,
taxonomy_path, # pylint: disable=unused-argument
Contributor
Are we keeping this unused param for some backwards compatibility reason?

server_ctx_size,
chunk_word_count,
document_output_dir,
model_name,
docling_model_path=None,
):
document_paths = leaf_node[0]["filepaths"]
chunker = DocumentChunker(
leaf_node=leaf_node,
taxonomy_path=taxonomy_path,
document_paths=document_paths,
output_dir=document_output_dir,
tokenizer_model_name=model_name,
server_ctx_size=server_ctx_size,
chunk_word_count=chunk_word_count,
tokenizer_model_name=model_name,
docling_model_path=docling_model_path,
)
chunks = chunker.chunk_documents()