Write multiple items to output file at once, in distributed data analyzer. (microsoft#5169)

Minor improvements to
[https://github.com/microsoft/DeepSpeed/pull/5129](https://github.com/microsoft/DeepSpeed/pull/5129):
- Writes all buffers to the output file at once, instead of iteratively
(`indexed_dataset.py`, method `add_items()`); see the sketch below.
- Fixes the incorrect initialisation of `num_workers` and `worker_id`, which
were being ignored when provided by the user.
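
As a rough, self-contained illustration of the batched-write idea (a sketch only, not the DeepSpeed code itself; `io.BytesIO` stands in for the output file):

```python
import io

import numpy as np

# Flush several variable-length rows with one writelines() call instead of
# one write() call per row, which is what add_items() now does internally.
rows = [np.arange(n, dtype=np.int64) for n in (3, 5, 2)]

batched = io.BytesIO()
batched.writelines(arr.tobytes(order='C') for arr in rows)

one_by_one = io.BytesIO()
for arr in rows:
    one_by_one.write(arr.tobytes(order='C'))

assert batched.getvalue() == one_by_one.getvalue()  # identical bytes either way
```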

---------

Co-authored-by: Conglong Li <[email protected]>
2 people authored and rraminen committed May 9, 2024
1 parent 0d69cbc commit d1e1016
Showing 2 changed files with 25 additions and 8 deletions.
25 changes: 17 additions & 8 deletions deepspeed/runtime/data_pipeline/data_sampling/data_analyzer.py
@@ -12,7 +12,7 @@
 import torch
 from torch.utils.data import BatchSampler, SequentialSampler, DataLoader, Subset

-from deepspeed.utils import logger, groups
+from deepspeed.utils import logger
 import deepspeed.comm as dist
 from deepspeed.runtime.data_pipeline.data_sampling.indexed_dataset import MMapIndexedDataset, valid_dtypes
 from deepspeed.runtime.data_pipeline.data_sampling.utils import split_dataset, split_index, create_mmap_dataset_builder, close_mmap_dataset_builder, find_fit_int_dtype
@@ -482,17 +482,17 @@ def __init__(
         dist.init_distributed()

         # comm_group and worker_id+num_workers are mutually exclusive
-        if comm_group is not None:
-            self.comm_group = comm_group
-            self.num_workers = self.comm_group.size()
-            self.worker_id = self.comm_group.rank()
+        self.comm_group = comm_group
+        if self.comm_group is None:
+            # self.comm_group = deepspeed.utils.groups._clone_world_group()
+            self.num_workers = num_workers
+            self.worker_id = worker_id
         else:
-            self.comm_group = groups._clone_world_group()
             self.num_workers = self.comm_group.size()
             self.worker_id = self.comm_group.rank()

         if self.worker_id == 0:
-            logger.info(f"Data analyzer initialized with {self.num_workers} workers.")
+            logger.info(f"Distributed data analyzer initialized with {self.num_workers} workers.")

     def run_map_reduce(self):

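For clarity, a minimal standalone reproduction of the selection logic fixed above (assumptions: `FakeGroup` only mimics the `size()`/`rank()` interface of a communication group; the names are hypothetical):

```python
# Mirrors the hunk above: an explicit comm_group wins; otherwise the
# caller-supplied num_workers/worker_id are honored rather than discarded.
class FakeGroup:

    def size(self):
        return 4

    def rank(self):
        return 2


def resolve_workers(comm_group=None, num_workers=1, worker_id=0):
    if comm_group is None:
        return num_workers, worker_id
    return comm_group.size(), comm_group.rank()


assert resolve_workers(num_workers=8, worker_id=3) == (8, 3)  # previously ignored
assert resolve_workers(comm_group=FakeGroup()) == (4, 2)
```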
@@ -635,9 +635,18 @@ def file_write_ordered(self, tensor_list, fname, numpy_dtype):
         # method to deserialize a buffer into rows of different lengths and write them to file
         def write_buffer_to_file(buff, src, builder):
             assert self.worker_id == 0, "only rank 0 can write to file"
+
+            # # write one buffer at a time
+            # for row_len in row_lens[src]:
+            #     builder.add_item(buff[:row_len].cpu())
+            #     buff = buff[row_len:]
+
+            # collect all buffers and write them all at once
+            buffer_list = []
             for row_len in row_lens[src]:
-                builder.add_item(buff[:row_len].cpu())
+                buffer_list.append(buff[:row_len].cpu())
                 buff = buff[row_len:]
+            builder.add_items(buffer_list)

         # 5. rank 0 prepares output folder and file
         if self.worker_id == 0:
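A standalone sketch of what `write_buffer_to_file` does with the flat buffer it receives (the values and row lengths are made up):

```python
import torch

# A flat buffer plus per-row lengths recovers the variable-length rows;
# the hunk above collects these rows and hands them to add_items() in one call.
buff = torch.tensor([1, 2, 3, 4, 5, 6])
row_lens = [3, 1, 2]

buffer_list = []
for row_len in row_lens:
    buffer_list.append(buff[:row_len].cpu())
    buff = buff[row_len:]

assert [r.tolist() for r in buffer_list] == [[1, 2, 3], [4], [5, 6]]
```

The same slicing could also be expressed as `torch.split(buff, row_lens)`.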
8 changes: 8 additions & 0 deletions deepspeed/runtime/data_pipeline/data_sampling/indexed_dataset.py
@@ -581,10 +581,18 @@ def __init__(self, out_file, dtype=np.int64):
         self._doc_idx = [0]

     def add_item(self, tensor):
+        """ write the tensor to the file and update its size in the index"""
         np_array = np.array(tensor.numpy(), dtype=self._dtype)
         self._data_file.write(np_array.tobytes(order='C'))
         self._sizes.append(np_array.size)

+    def add_items(self, tensor_list):
+        """ write a list of tensors to the file and update their sizes in the index"""
+        np_arrays = [np.array(t.numpy(), dtype=self._dtype) for t in tensor_list]
+        self._data_file.writelines([arr.tobytes(order='C') for arr in np_arrays])
+        for arr in np_arrays:
+            self._sizes.append(arr.size)
+
     def add_item_numpy(self, np_array):
         if np_array.dtype != self._dtype:
             np_array = np_array.astype(self._dtype)
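A hedged usage sketch of the new method (the file paths are hypothetical, and the `finalize()` call is assumed from the builder's existing interface rather than shown in this diff):

```python
import numpy as np
import torch

from deepspeed.runtime.data_pipeline.data_sampling.indexed_dataset import MMapIndexedDatasetBuilder

# Two rows written with a single call; expected to be byte-for-byte equivalent
# to two add_item() calls, but issued as one writelines() on the data file.
builder = MMapIndexedDatasetBuilder("/tmp/sample.bin", dtype=np.int64)
builder.add_items([torch.tensor([1, 2, 3]), torch.tensor([4, 5])])
builder.finalize("/tmp/sample.idx")  # assumption: finalize() writes the index file
```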
