Skip to content

Commit

Permalink
For sample indexing we fix the uneven sampling
Browse files Browse the repository at this point in the history
1. Fix uneven sampling done for index based and iterative
2. Add a validation step to ensure we can validate that global indices are correctly shuffled and no indices are lost.
3. Make sure we do file and sample shuffling in reconfigure step.
4. Remove sample shuffling from dataloader Sampler code.
5. Added test case to support uneven file distributions #225
  • Loading branch information
hariharan-devarajan committed Aug 30, 2024
1 parent d6924f3 commit 9160722
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 107 deletions.
88 changes: 59 additions & 29 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,35 +103,65 @@ jobs:
- name: test_train
run: |
source ${VENV_PATH}/bin/activate
mpirun -np 2 pytest -k test_train[png-tensorflow-tensorflow] -v
mpirun -np 2 pytest -k test_train[npz-tensorflow-tensorflow] -v
mpirun -np 2 pytest -k test_train[jpeg-tensorflow-tensorflow] -v
mpirun -np 2 pytest -k test_train[tfrecord-tensorflow-tensorflow] -v
mpirun -np 2 pytest -k test_train[hdf5-tensorflow-tensorflow] -v
mpirun -np 2 pytest -k test_train[csv-tensorflow-tensorflow] -v
mpirun -np 2 pytest -k test_train[png-pytorch-pytorch] -v
mpirun -np 2 pytest -k test_train[npz-pytorch-pytorch] -v
mpirun -np 2 pytest -k test_train[jpeg-pytorch-pytorch] -v
mpirun -np 2 pytest -k test_train[hdf5-pytorch-pytorch] -v
mpirun -np 2 pytest -k test_train[csv-pytorch-pytorch] -v
mpirun -np 2 pytest -k test_train[png-tensorflow-dali] -v
mpirun -np 2 pytest -k test_train[npz-tensorflow-dali] -v
mpirun -np 2 pytest -k test_train[jpeg-tensorflow-dali] -v
mpirun -np 2 pytest -k test_train[hdf5-tensorflow-dali] -v
mpirun -np 2 pytest -k test_train[csv-tensorflow-dali] -v
mpirun -np 2 pytest -k test_train[png-pytorch-dali] -v
mpirun -np 2 pytest -k test_train[npz-pytorch-dali] -v
mpirun -np 2 pytest -k test_train[jpeg-pytorch-dali] -v
mpirun -np 2 pytest -k test_train[hdf5-pytorch-dali] -v
mpirun -np 2 pytest -k test_train[csv-pytorch-dali] -v
mpirun -np 2 pytest -k test_train[indexed_binary-tensorflow-tensorflow] -v
mpirun -np 2 pytest -k test_train[indexed_binary-pytorch-pytorch] -v
mpirun -np 2 pytest -k test_train[indexed_binary-tensorflow-dali] -v
mpirun -np 2 pytest -k test_train[indexed_binary-pytorch-dali] -v
mpirun -np 2 pytest -k test_train[mmap_indexed_binary-tensorflow-tensorflow] -v
mpirun -np 2 pytest -k test_train[mmap_indexed_binary-pytorch-pytorch] -v
mpirun -np 2 pytest -k test_train[mmap_indexed_binary-tensorflow-dali] -v
mpirun -np 2 pytest -k test_train[mmap_indexed_binary-pytorch-dali] -v
mpirun -np 2 pytest -k test_train[png-tensorflow-tensorflow-True] -v
mpirun -np 2 pytest -k test_train[npz-tensorflow-tensorflow-True] -v
mpirun -np 2 pytest -k test_train[jpeg-tensorflow-tensorflow-True] -v
mpirun -np 2 pytest -k test_train[tfrecord-tensorflow-tensorflow-True] -v
mpirun -np 2 pytest -k test_train[hdf5-tensorflow-tensorflow-True] -v
mpirun -np 2 pytest -k test_train[csv-tensorflow-tensorflow-True] -v
mpirun -np 2 pytest -k test_train[png-pytorch-pytorch-True] -v
mpirun -np 2 pytest -k test_train[npz-pytorch-pytorch-True] -v
mpirun -np 2 pytest -k test_train[jpeg-pytorch-pytorch-True] -v
mpirun -np 2 pytest -k test_train[hdf5-pytorch-pytorch-True] -v
mpirun -np 2 pytest -k test_train[csv-pytorch-pytorch-True] -v
mpirun -np 2 pytest -k test_train[png-tensorflow-dali-True] -v
mpirun -np 2 pytest -k test_train[npz-tensorflow-dali-True] -v
mpirun -np 2 pytest -k test_train[jpeg-tensorflow-dali-True] -v
mpirun -np 2 pytest -k test_train[hdf5-tensorflow-dali-True] -v
mpirun -np 2 pytest -k test_train[csv-tensorflow-dali-True] -v
mpirun -np 2 pytest -k test_train[png-pytorch-dali-True] -v
mpirun -np 2 pytest -k test_train[npz-pytorch-dali-True] -v
mpirun -np 2 pytest -k test_train[jpeg-pytorch-dali-True] -v
mpirun -np 2 pytest -k test_train[hdf5-pytorch-dali-True] -v
mpirun -np 2 pytest -k test_train[csv-pytorch-dali-True] -v
mpirun -np 2 pytest -k test_train[indexed_binary-tensorflow-tensorflow-True] -v
mpirun -np 2 pytest -k test_train[indexed_binary-pytorch-pytorch-True] -v
mpirun -np 2 pytest -k test_train[indexed_binary-tensorflow-dali-True] -v
mpirun -np 2 pytest -k test_train[indexed_binary-pytorch-dali-True] -v
mpirun -np 2 pytest -k test_train[mmap_indexed_binary-tensorflow-tensorflow-True] -v
mpirun -np 2 pytest -k test_train[mmap_indexed_binary-pytorch-pytorch-True] -v
mpirun -np 2 pytest -k test_train[mmap_indexed_binary-tensorflow-dali-True] -v
mpirun -np 2 pytest -k test_train[mmap_indexed_binary-pytorch-dali-True] -v
mpirun -np 2 pytest -k test_train[png-tensorflow-tensorflow-False] -v
mpirun -np 2 pytest -k test_train[npz-tensorflow-tensorflow-False] -v
mpirun -np 2 pytest -k test_train[jpeg-tensorflow-tensorflow-False] -v
mpirun -np 2 pytest -k test_train[tfrecord-tensorflow-tensorflow-False] -v
mpirun -np 2 pytest -k test_train[hdf5-tensorflow-tensorflow-False] -v
mpirun -np 2 pytest -k test_train[csv-tensorflow-tensorflow-False] -v
mpirun -np 2 pytest -k test_train[png-pytorch-pytorch-False] -v
mpirun -np 2 pytest -k test_train[npz-pytorch-pytorch-False] -v
mpirun -np 2 pytest -k test_train[jpeg-pytorch-pytorch-False] -v
mpirun -np 2 pytest -k test_train[hdf5-pytorch-pytorch-False] -v
mpirun -np 2 pytest -k test_train[csv-pytorch-pytorch-False] -v
mpirun -np 2 pytest -k test_train[png-tensorflow-dali-False] -v
mpirun -np 2 pytest -k test_train[npz-tensorflow-dali-False] -v
mpirun -np 2 pytest -k test_train[jpeg-tensorflow-dali-False] -v
mpirun -np 2 pytest -k test_train[hdf5-tensorflow-dali-False] -v
mpirun -np 2 pytest -k test_train[csv-tensorflow-dali-False] -v
mpirun -np 2 pytest -k test_train[png-pytorch-dali-False] -v
mpirun -np 2 pytest -k test_train[npz-pytorch-dali-False] -v
mpirun -np 2 pytest -k test_train[jpeg-pytorch-dali-False] -v
mpirun -np 2 pytest -k test_train[hdf5-pytorch-dali-False] -v
mpirun -np 2 pytest -k test_train[csv-pytorch-dali-False] -v
mpirun -np 2 pytest -k test_train[indexed_binary-tensorflow-tensorflow-False] -v
mpirun -np 2 pytest -k test_train[indexed_binary-pytorch-pytorch-False] -v
mpirun -np 2 pytest -k test_train[indexed_binary-tensorflow-dali-False] -v
mpirun -np 2 pytest -k test_train[indexed_binary-pytorch-dali-False] -v
mpirun -np 2 pytest -k test_train[mmap_indexed_binary-tensorflow-tensorflow-False] -v
mpirun -np 2 pytest -k test_train[mmap_indexed_binary-pytorch-pytorch-False] -v
mpirun -np 2 pytest -k test_train[mmap_indexed_binary-tensorflow-dali-False] -v
mpirun -np 2 pytest -k test_train[mmap_indexed_binary-pytorch-dali-False] -v
rm -rf data
- name: test_custom_storage_root_train
run: |
Expand Down
14 changes: 3 additions & 11 deletions dlio_benchmark/data_loader/dali_data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
class DaliIndexDataset(object):

def __init__(self, format_type, dataset_type, epoch, worker_index,
total_num_workers, total_num_samples, samples_per_worker, batch_size, shuffle=Shuffle.OFF, seed=1234):
total_num_workers, total_num_samples, samples_per_worker, batch_size):
self.format_type = format_type
self.dataset_type = dataset_type
self.epoch = epoch
Expand All @@ -56,10 +56,6 @@ def __init__(self, format_type, dataset_type, epoch, worker_index,
end_sample = (self.worker_index + 1) * samples_per_worker
if not hasattr(self, 'indices'):
self.indices = list(range(start_sample, end_sample))
if self.shuffle != Shuffle.OFF:
if self.shuffle == Shuffle.SEED:
np.random.seed(self.seed)
np.random.shuffle(self.indices)
def __call__(self, sample_info):
logging.debug(
f"{utcnow()} Reading {sample_info.idx_in_epoch} out of {self.samples_per_worker} by worker {self.worker_index} with {self.indices} indices")
Expand All @@ -74,7 +70,7 @@ def __call__(self, sample_info):

class DaliIteratorDataset(object):
def __init__(self, format_type, dataset_type, epoch, worker_index,
total_num_workers, total_num_samples, samples_per_worker, batch_size, shuffle=Shuffle.OFF, seed=1234):
total_num_workers, total_num_samples, samples_per_worker, batch_size):
self.format_type = format_type
self.dataset_type = dataset_type
self.epoch = epoch
Expand All @@ -95,10 +91,6 @@ def __init__(self, format_type, dataset_type, epoch, worker_index,
end_sample = (self.worker_index + 1) * samples_per_worker
if not hasattr(self, 'indices'):
self.indices = list(range(start_sample, end_sample))
if self.shuffle != Shuffle.OFF:
if self.shuffle == Shuffle.SEED:
np.random.seed(self.seed)
np.random.shuffle(self.indices)
def __iter__(self):
with Profile(MODULE_DATA_LOADER):
for image in self.reader.next():
Expand Down Expand Up @@ -129,7 +121,7 @@ def read(self, init=False):
global_worker_index = self._args.my_rank * num_pipelines + worker_index
# None executes pipeline on CPU and the reader does the batching
self.dataset = DaliIndexDataset(self.format_type, self.dataset_type, self.epoch_number, global_worker_index,
self._args.comm_size * num_pipelines, self.num_samples, samples_per_worker, 1, self._args.sample_shuffle, self._args.seed)
self._args.comm_size * num_pipelines, self.num_samples, samples_per_worker, 1)
pipeline = Pipeline(batch_size=self.batch_size, num_threads=num_threads, device_id=None, py_num_workers=num_threads//num_pipelines,
prefetch_queue_depth=prefetch_size, py_start_method=self._args.multiprocessing_context, exec_async=True)
with pipeline:
Expand Down
11 changes: 2 additions & 9 deletions dlio_benchmark/data_loader/torch_data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,11 @@ def __getitem__(self, image_idx):


class dlio_sampler(Sampler):
def __init__(self, rank, size, num_samples, shuffle, epochs, seed):
def __init__(self, rank, size, num_samples, epochs):
self.size = size
self.rank = rank
self.num_samples = num_samples
self.shuffle = shuffle
self.epochs = epochs
self.seed = seed
samples_per_proc = int(math.ceil(num_samples/size))
start_sample = self.rank * samples_per_proc
end_sample = (self.rank + 1) * samples_per_proc
Expand All @@ -102,10 +100,6 @@ def __len__(self):
return self.num_samples

def __iter__(self):
if self.shuffle != Shuffle.OFF:
if self.shuffle == Shuffle.SEED:
np.random.seed(self.seed)
np.random.shuffle(self.indices)
for sample in self.indices:
yield sample

Expand All @@ -118,8 +112,7 @@ def __init__(self, format_type, dataset_type, epoch_number):
def read(self):
dataset = TorchDataset(self.format_type, self.dataset_type, self.epoch_number, self.num_samples,
self._args.read_threads, self.batch_size)
sampler = dlio_sampler(self._args.my_rank, self._args.comm_size, self.num_samples, self._args.sample_shuffle,
self._args.epochs, self._args.seed)
sampler = dlio_sampler(self._args.my_rank, self._args.comm_size, self.num_samples, self._args.epochs)
if self._args.read_threads >= 1:
prefetch_factor = math.ceil(self._args.prefetch_size / self._args.read_threads)
else:
Expand Down
Loading

0 comments on commit 9160722

Please sign in to comment.