Make max_text_bytes_per_part configurable #314

Closed · wants to merge 2 commits
4 changes: 4 additions & 0 deletions nemo_curator/modules/config.py
@@ -16,6 +16,7 @@
from dataclasses import dataclass, field
from typing import List, Optional

import numpy as np
import yaml


@@ -80,6 +81,9 @@ class FuzzyDuplicatesConfig(BaseConfig):
bucket_mapping_blocksize: int = 256
parts_per_worker: int = 1
bucket_parts_per_worker: int = 8
# String bytes limit for cuDF
# https://github.com/rapidsai/cudf/issues/13733
max_text_bytes_per_part: int = int(np.iinfo(np.int32).max * 3)
Collaborator

I want us to not use this config at all.

@ayushdg, @praateekmahajan, @sarahyurick let's huddle before we merge this in


def __post_init__(self):
self.num_hashes = self.num_buckets * self.hashes_per_bucket
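For reference, a minimal sketch of overriding the new field when constructing the config in Python; the `cache_dir`, `id_field`, and `text_field` values below are placeholders, not taken from this PR:

import numpy as np

from nemo_curator.modules.config import FuzzyDuplicatesConfig

# Hypothetical override: cap each output partition at 1x INT32_MAX string bytes
# instead of the default 3x INT32_MAX.
config = FuzzyDuplicatesConfig(
    cache_dir="./fuzzy_dedup_cache",  # placeholder path
    id_field="id",
    text_field="text",
    max_text_bytes_per_part=int(np.iinfo(np.int32).max),
)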
14 changes: 8 additions & 6 deletions nemo_curator/modules/fuzzy_dedup.py
@@ -436,6 +436,7 @@ def __init__(
text_field=self.config.text_field,
logger=self._logger,
num_anchors=self.config.num_anchors,
max_text_bytes_per_part=self.config.max_text_bytes_per_part,
)
self.jaccard_shuffle = _Shuffle(
id_fields=[self.config.id_field],
@@ -727,6 +728,7 @@ def __init__(
text_field: str = "text",
bucket_field: str = "_bucket_id",
num_anchors: int = 2,
max_text_bytes_per_part: int = int(np.iinfo(np.int32).max * 3),
logger: Union[logging.LoggerAdapter, str] = "./",
):
"""
@@ -741,6 +743,7 @@
self.text_field = text_field
self.num_anchors = num_anchors
self.bucket_field = bucket_field
self.max_text_bytes_per_part = max_text_bytes_per_part
if isinstance(logger, str):
self._logger = create_logger(
rank=0,
@@ -780,11 +783,7 @@ def _get_output_map_from_text_bytes_per_bucket(
bytes_column,
output_partition_column="_output_partition_id",
):
# String bytes limit for cuDF
# https://github.com/rapidsai/cudf/issues/13733
max_text_bytes_per_part = int(np.iinfo(np.int32).max * 3)

self._logger.info(f"max_text_bytes_per_part = {max_text_bytes_per_part}")
self._logger.info(f"max_text_bytes_per_part = {self.max_text_bytes_per_part}")
# Increasing in an attempt to prevent hitting
# ulimits
output_map_df_meta = cudf.DataFrame(
@@ -796,7 +795,7 @@

output_map_df = ddf_bk_text_bytes.map_partitions(
_MapBuckets._get_output_part_ids_with_approx_equal_sum,
max_text_bytes_per_part=max_text_bytes_per_part,
max_text_bytes_per_part=self.max_text_bytes_per_part,
buckets_column=self.bucket_field,
bytes_column=bytes_column,
output_partition_column=output_partition_column,
@@ -969,6 +968,7 @@ def __init__(
logger: Union[logging.LoggerAdapter, str] = "./",
profile_dir: str = None,
int_to_str_id: str = None,
max_text_bytes_per_part: int = int(np.iinfo(np.int32).max * 3),
):
if isinstance(logger, str):
self._logger = create_logger(
@@ -983,6 +983,7 @@
self.text_field = text_field
self.profile_dir = profile_dir
self.int_to_str_id = int_to_str_id
self.max_text_bytes_per_part = max_text_bytes_per_part

def shuffle_docs_on_buckets(
self,
@@ -1153,6 +1154,7 @@ def _batched_merge_and_write(
partition_on=partition_on,
text_column=self.text_field,
num_workers=num_workers,
max_text_bytes_per_part=self.max_text_bytes_per_part,
)
except OverflowError as err:
# We encountered an overflow error!
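To illustrate what this byte budget controls, here is a small standalone sketch of packing buckets into output partitions so that each partition stays near a byte cap; it mirrors the idea behind `_get_output_part_ids_with_approx_equal_sum`, not its actual implementation:

from typing import Dict

def assign_output_partitions(
    bucket_bytes: Dict[int, int], max_text_bytes_per_part: int
) -> Dict[int, int]:
    """Greedily pack buckets into output partitions, starting a new partition
    whenever adding the next bucket would exceed the byte cap."""
    assignments: Dict[int, int] = {}
    part_id, running_bytes = 0, 0
    # Visiting larger buckets first tends to balance the resulting partitions.
    for bucket_id, nbytes in sorted(bucket_bytes.items(), key=lambda kv: -kv[1]):
        if running_bytes and running_bytes + nbytes > max_text_bytes_per_part:
            part_id += 1
            running_bytes = 0
        assignments[bucket_id] = part_id
        running_bytes += nbytes
    return assignments

# With a 100-byte cap, the 70- and 60-byte buckets land in different partitions.
print(assign_output_partitions({0: 70, 1: 60, 2: 30}, max_text_bytes_per_part=100))
# -> {0: 0, 1: 1, 2: 1}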
13 changes: 11 additions & 2 deletions nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py
@@ -105,9 +105,9 @@ def get_shuffle_part_ids_df(
output_col,
size_col,
num_workers=0,
max_text_bytes_per_part=int(np.iinfo(np.int32).max * 3),
):
sizes = agg_df[size_col].values
max_text_bytes_per_part = int(np.iinfo(np.int32).max * 3)

# Adjust max_text_bytes_per_part if the number of output
# partitions is small compared to the number of workers.
@@ -133,6 +133,7 @@ def get_shuffle_partition_info(
text_column,
bytes_column="_text_bytes",
num_workers=None,
max_text_bytes_per_part=int(np.iinfo(np.int32).max * 3),
):
df[bytes_column] = df[text_column].map_partitions(lambda s: s.str.byte_count())
agg_df, _ = get_agg_text_bytes_df(
@@ -147,11 +148,18 @@
size_col=bytes_column,
num_workers=num_workers,
output_col=output_column,
max_text_bytes_per_part=max_text_bytes_per_part,
).persist()
return shuffle_part_ids


def text_bytes_aware_shuffle(df, partition_on, text_column, num_workers=None):
def text_bytes_aware_shuffle(
df,
partition_on,
text_column,
num_workers=None,
max_text_bytes_per_part=int(np.iinfo(np.int32).max * 3),
):
"""
This shuffle takes into account the text bytes of each partition
and tries to make sure that the output partitions do not exceed
@@ -175,6 +183,7 @@ def text_bytes_aware_shuffle(df, partition_on, text_column, num_workers=None):
num_workers=num_workers,
output_column=output_col,
text_column=text_column,
max_text_bytes_per_part=max_text_bytes_per_part,
)
n_output_partitions = shuffle_part_ids[output_col].max().compute() + 1
n_output_partitions = int(n_output_partitions)
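With the new keyword plumbed through, callers of the shuffle utility can pass the budget explicitly. A hedged usage sketch, assuming `ddf` is an existing dask-cuDF DataFrame and that the column names below are placeholders:

import numpy as np

from nemo_curator.utils.fuzzy_dedup_utils.shuffle_utils import text_bytes_aware_shuffle

# Assumes `ddf` already exists and has a "text" column plus a column to
# partition on; both column names here are placeholders.
shuffled = text_bytes_aware_shuffle(
    df=ddf,
    partition_on="_output_partition_id",
    text_column="text",
    num_workers=4,
    max_text_bytes_per_part=int(np.iinfo(np.int32).max),  # tighter than the 3x default
)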
6 changes: 3 additions & 3 deletions tutorials/single_node_tutorial/single_gpu_tutorial.ipynb
@@ -1335,10 +1335,10 @@
"In this section, we will be using `_MapBucket()` and `_Shuffle()`.\n",
"\n",
"For `_MapBucket()`, it is designed to take input text data in jsonl format and bucket information which is output of LSH, map the documents to their respective buckets, and write the resulting DataFrame containing the anchor documents and their associated bucket information to a parquet file. Arguments include:\n",
"- `id_field`: Key in input .jsonl file for identifying document ID\n",
"- `id_field`: Key in input .jsonl file for identifying document ID.\n",
"- `text_field`: Key in input .jsonl file which contains document text.\n",
"- `bucket_field`: Key in input _buckets.parquet which contains `bucket_id`.\n",
"- `num_anchors`: Number of anchors (document in the same buckets) to be output\n",
"- `num_anchors`: Number of anchors (document in the same buckets) to be output.\n",
"\n",
"\n",
"For `_Shuffle()`, it perform a shuffling operation on the documents based on their bucket assignments, output in .parquet format. This shuffling operation is a crucial step in the deduplication process, as it helps distribute similar documents across different partitions or workers, enabling efficient parallel processing and deduplication in subsequent steps. Arguments include:\n",
@@ -1360,7 +1360,7 @@
" get_bucket_ddf_from_parquet_path,\n",
" get_text_ddf_from_json_path_with_blocksize,\n",
")\n",
"from nemo_curator.modules.fuzzy_dedup import _MapBuckets,_Shuffle"
"from nemo_curator.modules.fuzzy_dedup import _MapBuckets, _Shuffle"
]
},
{
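Finally, a hedged sketch of constructing the tutorial's two classes with the new knob; the keyword names follow the constructor signatures shown in this diff, while `id_fields` and the field values are assumptions for illustration:

import numpy as np

from nemo_curator.modules.fuzzy_dedup import _MapBuckets, _Shuffle

byte_budget = int(np.iinfo(np.int32).max)  # example override of the 3x-INT32_MAX default

map_buckets = _MapBuckets(
    id_fields=["id"],  # assumed id column name
    text_field="text",
    bucket_field="_bucket_id",
    num_anchors=2,
    max_text_bytes_per_part=byte_budget,
)
shuffle = _Shuffle(
    id_fields=["id"],
    text_field="text",
    max_text_bytes_per_part=byte_budget,
)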