From 48894ec9ca66f1b2ab87a7387abe893b794fccca Mon Sep 17 00:00:00 2001
From: Luca Soldaini
Date: Tue, 24 Oct 2023 16:40:53 -0700
Subject: [PATCH] Documentation on BaseParallelProcessor (#62)

* added new doc, upping version

* new docs for how 2 use parallel processors

* made script

* changed debug flag

* removed temp text

* better pbar

---
 docs/parallel-processor.md   | 169 +++++++++++++++++++++++++++++++++++
 pyproject.toml               |   2 +-
 scripts/remove_empty_docs.py | 113 +++++++++++++++++++++++
 3 files changed, 283 insertions(+), 1 deletion(-)
 create mode 100644 docs/parallel-processor.md
 create mode 100644 scripts/remove_empty_docs.py

diff --git a/docs/parallel-processor.md b/docs/parallel-processor.md
new file mode 100644
index 00000000..fa15e200
--- /dev/null
+++ b/docs/parallel-processor.md
@@ -0,0 +1,169 @@
# Writing Your Own Parallel Processor

Many functions in the Dolma toolkit are built on top of `dolma.core.parallel.BaseParallelProcessor`. This class provides a simple interface for parallelizing a function over a list of inputs, as well as keeping track of status using one or more progress bars. In this tutorial, we will walk through the process of writing a parallel processor that removes empty documents from a dataset.

At its core, a parallel processor requires implementing two class methods, `process_single` and `increment_progressbar`:

```python
from queue import Queue
from typing import Any

from dolma.core.parallel import BaseParallelProcessor


class CustomParallelProcessor(BaseParallelProcessor):
    @classmethod
    def increment_progressbar(
        cls,
        queue: Queue,
        /,
        files: int = 0,
        documents: int = 0,
        ...
    ):
        """
        This method is called from process_single
        to increment the progress bars.
        You can create as many progress bars as there
        are keyword arguments after the '/' separator.
        In this example, we create two progress bars,
        one for files and one for documents.
        This method should call the super method
        with the same arguments.
        """
        super().increment_progressbar(
            queue,
            files=files,
            documents=documents,
            ...
        )

    @classmethod
    def process_single(
        cls,
        source_path: str,
        destination_path: str,
        queue: Queue,
        **kwargs: Any,
    ):
        """
        This method processes a single input file.
        Broadly, it opens the file at source_path,
        processes it, and writes the output to
        destination_path. Every so often, it should
        call increment_progressbar to update the
        progress bars.
        """
        ...
```

Let's dive a bit deeper into how one might implement the `process_single` method in the case of removing empty documents.
We assume `source_path` points to either a local or a remote gzipped JSONL file, and use `smart_open` to handle both cases.
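For concreteness, each line of the input is a JSON object with (at least) a `text` field. Here is a made-up record and the emptiness check we will apply to it:

```python
import json

# a single line from a gzipped JSONL file (values are illustrative)
line = '{"id": "doc-1", "text": "The quick brown fox."}\n'

document = json.loads(line)
print(document["text"].strip() == "")  # False: this document is kept
```

With that in mind, here is a full implementation: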
```python
from contextlib import ExitStack
from queue import Queue
from typing import Any
import json

import smart_open

from dolma.core.parallel import BaseParallelProcessor


class RemoveEmptyDocumentsProcessor(BaseParallelProcessor):
    @classmethod
    def increment_progressbar(
        cls,
        queue: Queue,
        /,
        files: int = 0,
        read_docs: int = 0,
        written_docs: int = 0
    ):
        """
        This method updates the progress bars. We keep
        track of three things:
        - files: the number of files processed
        - read_docs: the number of documents read in
        - written_docs: the number of documents written out
          (i.e., the number of documents that are not empty)
        """
        super().increment_progressbar(
            queue,
            files=files,
            read_docs=read_docs,
            written_docs=written_docs
        )

    @classmethod
    def process_single(
        cls,
        source_path: str,
        destination_path: str,
        queue: Queue,
        **kwargs: Any,
    ):
        """
        This method is called for each file. It reads the file
        line by line, and writes to the destination file only
        if the document is not empty.
        """

        update_every_n_lines = 10_000
        read_docs = written_docs = 0

        with ExitStack() as stack:
            # open source and destination files
            source_file = stack.enter_context(
                smart_open.open(source_path, "rt")
            )
            destination_file = stack.enter_context(
                smart_open.open(destination_path, "wt")
            )
            for ln in source_file:
                # we first load the json document
                document = json.loads(ln)
                read_docs += 1

                # we check if the document is
                # empty, and if it is, we skip it
                if document['text'].strip() == '':
                    continue

                # if the document is not empty,
                # we write it to output
                destination_file.write(ln)
                written_docs += 1

                # we update the progress bars every
                # update_every_n_lines documents; the
                # counts we pass are increments, so we
                # reset the local counters after reporting
                if read_docs >= update_every_n_lines:
                    cls.increment_progressbar(
                        queue,
                        read_docs=read_docs,
                        written_docs=written_docs,
                    )
                    read_docs = written_docs = 0

            # we update the progress bars one last time,
            # flushing any remaining counts and marking
            # the file as done
            cls.increment_progressbar(
                queue,
                files=1,
                read_docs=read_docs,
                written_docs=written_docs,
            )
```

To use this processor, we invoke it as follows:

```python
from tempfile import TemporaryDirectory

with TemporaryDirectory() as tmpdir:
    # create the processor
    processor = RemoveEmptyDocumentsProcessor(
        source_prefix="path/to/source/files/*.gz",
        destination_prefix="path/to/destination/files",
        metadata_prefix=tmpdir
    )

    # run the processor
    processor()
```
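As the script later in this patch shows, the constructor also accepts `num_processes` and `debug` arguments, which the script wires to command-line flags. For example, to process several files in parallel (a sketch; the prefixes are illustrative):

```python
from tempfile import TemporaryDirectory

with TemporaryDirectory() as tmpdir:
    processor = RemoveEmptyDocumentsProcessor(
        source_prefix="path/to/source/files/*.gz",
        destination_prefix="path/to/destination/files",
        metadata_prefix=tmpdir,
        num_processes=4,  # process up to four files at once
    )
    processor()
```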
diff --git a/pyproject.toml b/pyproject.toml
index a00c2bdd..1995d749 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "dolma"
-version = "0.9.0"
+version = "0.9.1"
 description = "Data filters"
 license = {text = "Apache-2.0"}
 readme = "README.md"

diff --git a/scripts/remove_empty_docs.py b/scripts/remove_empty_docs.py
new file mode 100644
index 00000000..3a0c4ece
--- /dev/null
+++ b/scripts/remove_empty_docs.py
@@ -0,0 +1,113 @@
import json
import multiprocessing
from argparse import ArgumentParser
from contextlib import ExitStack
from queue import Queue
from tempfile import TemporaryDirectory
from typing import Any, Tuple, Union

import smart_open

from dolma.core.parallel import BaseParallelProcessor


class RemoveEmptyDocumentsProcessor(BaseParallelProcessor):
    @classmethod
    def increment_progressbar(
        cls,
        queue: "Queue[Union[Tuple[int, ...], None]]",
        /,
        files: int = 0,
        read_docs: int = 0,
        written_docs: int = 0,
    ):
        return super().increment_progressbar(queue, files=files, read_docs=read_docs, written_docs=written_docs)

    @classmethod
    def process_single(
        cls,
        source_path: str,
        destination_path: str,
        queue: Queue,
        **kwargs: Any,
    ):
        """
        This method is called for each file. It reads the file
        line by line, and writes to the destination file only
        if the document is not empty.
        """

        update_every_n_lines = 1
        read_docs = written_docs = 0

        with ExitStack() as stack:
            # open source and destination files
            source_file = stack.enter_context(smart_open.open(source_path, "rt"))
            destination_file = stack.enter_context(smart_open.open(destination_path, "wt"))
            for ln in source_file:
                # we first load the json document
                document = json.loads(ln)
                read_docs += 1

                # we check if the document is
                # empty, and if it is, we skip it
                if document["text"].strip() == "":
                    continue

                # if the document is not empty,
                # we write it to output
                destination_file.write(ln)
                written_docs += 1

                # we update the progress bar every
                # update_every_n_lines
                if read_docs >= update_every_n_lines:
                    cls.increment_progressbar(
                        queue,
                        read_docs=read_docs,
                        written_docs=written_docs,
                    )
                    read_docs = written_docs = 0

                    if queue.qsize() >= multiprocessing.cpu_count():
                        # double the update interval if the queue
                        # is backing up, so workers spend less
                        # time reporting progress
                        update_every_n_lines *= 2

            # we update the progress bar one last time
            cls.increment_progressbar(
                queue,
                files=1,
                read_docs=read_docs,
                written_docs=written_docs,
            )


def parse_args():
    ag = ArgumentParser()
    ag.add_argument("-s", "--source-prefix", type=str, required=True)
    ag.add_argument("-d", "--destination-prefix", type=str, required=True)
    ag.add_argument("-n", "--num-processes", type=int, default=1)
    ag.add_argument("-u", "--debug", action="store_true")
    ag.add_argument("-t", "--temp-dir", type=str, default=None)
    return ag.parse_args()


def main():
    args = parse_args()

    with TemporaryDirectory(dir=args.temp_dir) as tmpdir:
        # create the processor
        processor = RemoveEmptyDocumentsProcessor(
            source_prefix=args.source_prefix,
            destination_prefix=args.destination_prefix,
            metadata_prefix=tmpdir,
            num_processes=args.num_processes,
            debug=args.debug,
        )

        # run the processor
        processor()


if __name__ == "__main__":
    main()
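To try the script end to end, one might first generate a small gzipped JSONL input. A minimal sketch (paths and values are illustrative; `smart_open` picks gzip compression from the `.gz` extension):

```python
import json
import os

import smart_open

os.makedirs("/tmp/docs", exist_ok=True)

# three documents; only the first has non-empty text
docs = [
    {"id": "0", "text": "a short document"},
    {"id": "1", "text": ""},
    {"id": "2", "text": "   "},
]

with smart_open.open("/tmp/docs/part-000.jsonl.gz", "wt") as f:
    for doc in docs:
        f.write(json.dumps(doc) + "\n")
```

Running `python scripts/remove_empty_docs.py -s '/tmp/docs/*.gz' -d /tmp/cleaned` should then report three documents read and one written.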