diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 4c90663d..5231ab53 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -269,3 +269,9 @@ jobs:
           mpirun -np 2 ${DLIO_EXEC} workload=cosmoflow_h100 ++workload.workflow.generate_data=True ++workload.dataset.num_files_train=16
           mpirun -np 2 ${DLIO_EXEC} workload=cosmoflow_h100 ++workload.workflow.generate_data=True ++workload.dataset.num_files_train=16 ++workload.dataset.format=synthetic
           rm -rf data
+      - name: test_computation_time_distribution
+        run: |
+          source ${VENV_PATH}/bin/activate
+          rm -rf output data checkpoints
+          mpirun -np 2 pytest -k test_computation_time_distribution -v
+          rm -rf data
diff --git a/dlio_benchmark/framework/framework.py b/dlio_benchmark/framework/framework.py
index 02731d8f..98191b4c 100644
--- a/dlio_benchmark/framework/framework.py
+++ b/dlio_benchmark/framework/framework.py
@@ -22,11 +22,11 @@
 from dlio_benchmark.storage.storage_factory import StorageFactory
 from dlio_benchmark.utils.utility import utcnow
-from time import sleep
 import os
 import logging
 
 from dlio_benchmark.utils.config import ConfigArguments
+from dlio_benchmark.utils.utility import sleep
 
 class DummyTraceObject(object):
     def __init__(self, string, step, r):
diff --git a/dlio_benchmark/framework/tf_framework.py b/dlio_benchmark/framework/tf_framework.py
index 1b1a049f..5141a830 100644
--- a/dlio_benchmark/framework/tf_framework.py
+++ b/dlio_benchmark/framework/tf_framework.py
@@ -17,12 +17,12 @@
 import os
 import logging
-from time import time, sleep
+from time import time
 
 from dlio_benchmark.common.constants import MODULE_AI_FRAMEWORK
 from dlio_benchmark.data_loader.data_loader_factory import DataLoaderFactory
 from dlio_benchmark.utils.utility import utcnow, DLIOMPI
-from dlio_benchmark.utils.utility import Profile
+from dlio_benchmark.utils.utility import Profile, sleep
 from dlio_benchmark.common.error_code import ErrorCodes
 from dlio_benchmark.framework.framework import Framework
 from dlio_benchmark.reader.reader_factory import ReaderFactory
@@ -87,7 +87,7 @@ def trace_object(self, string, step, r):
 
     @dlp.log
     def compute(self, x, epoch_number, step, computation_time):
-        sleep(computation_time)
+        return sleep(computation_time)
         # tf.function(self.model)(epoch_number, step, computation_time)
 
     @dlp.log
diff --git a/dlio_benchmark/framework/torch_framework.py b/dlio_benchmark/framework/torch_framework.py
index 1d4b85c0..ab862671 100644
--- a/dlio_benchmark/framework/torch_framework.py
+++ b/dlio_benchmark/framework/torch_framework.py
@@ -27,10 +27,11 @@
 from dlio_benchmark.utils.utility import utcnow, DLIOMPI
 from dlio_benchmark.utils.utility import Profile
-from time import sleep, time
+from time import time
 from dlio_benchmark.reader.reader_factory import ReaderFactory
 from dlio_benchmark.storage.storage_factory import StorageFactory
+from dlio_benchmark.utils.utility import sleep
 
 HANDLED_FUNCTIONS = {}
 dlp = Profile(MODULE_AI_FRAMEWORK)
@@ -93,7 +94,7 @@ def trace_object(self, string, step, r):
 
     @dlp.log
     def compute(self, x, epoch_number, step, computation_time):
-        torch_sleep(computation_time)
+        return torch_sleep(computation_time)
 
     @dlp.log
     def get_loader(self, dataset_type=DatasetType.TRAIN):
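Note on the framework changes above: compute() now returns the value of the
sleep helper instead of discarding it, so the training and evaluation loops
can record the duration that was actually sampled and slept rather than the
configured mean. A minimal self-contained sketch of that contract, where
sleep_from() is a hypothetical stand-in for dlio_benchmark.utils.utility.sleep:

    # Illustrative sketch only; sleep_from() stands in for the real helper.
    import random
    from time import sleep as base_sleep

    def sleep_from(config):
        # Sample a duration, sleep for it, and report what was actually slept.
        t = abs(random.normalvariate(config["mean"], config.get("stdev", 0.0)))
        base_sleep(t)
        return t

    def compute(batch, epoch, step, computation_time):
        return sleep_from(computation_time)  # propagate the sampled duration

    elapsed = compute(None, 1, 1, {"mean": 0.01, "stdev": 0.005})
    print(f"stats should record {elapsed:.4f}s, not the configured 0.01s mean")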
diff --git a/dlio_benchmark/main.py b/dlio_benchmark/main.py
index 1a312ce1..bf0b4b42 100644
--- a/dlio_benchmark/main.py
+++ b/dlio_benchmark/main.py
@@ -17,7 +17,7 @@
 import os
 import math
 import logging
-from time import time, sleep
+from time import time
 import json
 import numpy as np
@@ -114,7 +114,6 @@ def __init__(self, cfg):
         self.epochs = self.args.epochs
         self.batch_size = self.args.batch_size
         self.computation_time = self.args.computation_time
-        self.computation_time_stdev = self.args.computation_time_stdev
 
         if self.do_profiling:
             self.profiler = ProfilerFactory().get_profiler(self.args.profiler)
@@ -133,7 +132,6 @@ def __init__(self, cfg):
         self.batch_size_eval = self.args.batch_size_eval
         self.eval_time = self.args.eval_time
-        self.eval_time_stdev = self.args.eval_time_stdev
         self.eval_after_epoch = self.args.eval_after_epoch
         self.epochs_between_evals = self.args.epochs_between_evals
         self.stats = StatsCounter()
@@ -226,13 +224,8 @@ def _eval(self, epoch):
         t0 = time()
         for batch in loader.next():
             self.stats.eval_batch_loaded(epoch, step, t0)
-            eval_time = 0.0
-            if self.eval_time > 0:
-                if self.eval_time_stdev > 0:
-                    eval_time = abs(random.normal(self.eval_time, self.eval_time_stdev))
-                else:
-                    eval_time = self.eval_time
-            self.framework.compute(batch, epoch, step, eval_time)
+            eval_time = self.eval_time
+            eval_time = self.framework.compute(batch, epoch, step, eval_time)
             self.stats.eval_batch_processed(epoch, step, t0, eval_time)
             step += 1
@@ -268,13 +261,9 @@ def _train(self, epoch):
             if block_step == 1 and block != 1:
                 self.stats.start_block(epoch, block)
             computation_time = self.computation_time
-            if self.computation_time > 0:
+            if (isinstance(computation_time, dict) and len(computation_time) > 0) or (isinstance(computation_time, (int, float)) and computation_time > 0):
                 self.framework.trace_object("Train", overall_step, 1)
-                if self.computation_time_stdev > 0:
-                    computation_time = abs(random.normal(self.computation_time, self.computation_time_stdev))
-                else:
-                    computation_time = self.computation_time
-                self.framework.compute(batch, epoch, block_step, computation_time)
+                computation_time = self.framework.compute(batch, epoch, block_step, computation_time)
             self.stats.batch_processed(epoch, overall_step, block, t0, computation_time)
             self.comm.barrier()
             if self.do_checkpoint and (
diff --git a/dlio_benchmark/reader/dali_image_reader.py b/dlio_benchmark/reader/dali_image_reader.py
index 6876610b..14c9c302 100644
--- a/dlio_benchmark/reader/dali_image_reader.py
+++ b/dlio_benchmark/reader/dali_image_reader.py
@@ -16,7 +16,7 @@
 """
 import math
 import logging
-from time import time, sleep
+from time import time
 
 import numpy as np
 import nvidia.dali.fn as fn
diff --git a/dlio_benchmark/reader/dali_npy_reader.py b/dlio_benchmark/reader/dali_npy_reader.py
index 6091f360..c2c4c3cb 100644
--- a/dlio_benchmark/reader/dali_npy_reader.py
+++ b/dlio_benchmark/reader/dali_npy_reader.py
@@ -16,7 +16,7 @@
 """
 import math
 import logging
-from time import time, sleep
+from time import time
 
 import numpy as np
 import nvidia.dali.fn as fn
diff --git a/dlio_benchmark/reader/dali_tfrecord_reader.py b/dlio_benchmark/reader/dali_tfrecord_reader.py
index 99132188..a3852cf2 100644
--- a/dlio_benchmark/reader/dali_tfrecord_reader.py
+++ b/dlio_benchmark/reader/dali_tfrecord_reader.py
@@ -18,7 +18,7 @@
 
 import math
 import logging
-from time import time, sleep
+from time import time
 
 import numpy as np
 import nvidia
diff --git a/dlio_benchmark/reader/reader_handler.py b/dlio_benchmark/reader/reader_handler.py
index 720e4378..f91b4de4 100644
--- a/dlio_benchmark/reader/reader_handler.py
+++ b/dlio_benchmark/reader/reader_handler.py
@@ -27,10 +27,9 @@
 import os
 import math
 import logging
-from time import sleep
 import glob
 
 from dlio_benchmark.common.constants import MODULE_DATA_READER
-
+from dlio_benchmark.utils.utility import sleep
 dlp = Profile(MODULE_DATA_READER)
@@ -60,10 +59,9 @@ def __init__(self, dataset_type, thread_index):
 
     @dlp.log
     def preprocess(self, a=None):
-        if self._args.preprocess_time > 0. or self._args.preprocess_time_stdev > 0.:
-            t = np.random.normal(self._args.preprocess_time, self._args.preprocess_time_stdev)
-            sleep(abs(t))
+        sleep(self._args.preprocess_time)
         return a
+
     @abstractmethod
     def open(self, filename):
         return
diff --git a/dlio_benchmark/utils/config.py b/dlio_benchmark/utils/config.py
index 39ed54d9..c2fa6abf 100644
--- a/dlio_benchmark/utils/config.py
+++ b/dlio_benchmark/utils/config.py
@@ -22,18 +22,19 @@
 from time import time
-from typing import List, ClassVar
+from typing import Any, Dict, List, ClassVar
 
 from dlio_benchmark.common.constants import MODULE_CONFIG
 from dlio_benchmark.common.enumerations import StorageType, FormatType, Shuffle, ReadType, FileAccess, Compression, \
     FrameworkType, \
     DataLoaderType, Profiler, DatasetType, DataLoaderSampler, CheckpointLocationType, CheckpointMechanismType
 from dlio_benchmark.utils.utility import DLIOMPI, get_trace_name, utcnow
+from dlio_benchmark.utils.utility import Profile, PerfTrace, DFTRACER_ENABLE
 from dataclasses import dataclass
+from omegaconf import OmegaConf, DictConfig
 import math
 import os
 import numpy as np
-from dlio_benchmark.utils.utility import Profile, PerfTrace, DFTRACER_ENABLE
 
 dlp = Profile(MODULE_CONFIG)
 
 @dataclass
@@ -82,10 +83,8 @@ class ConfigArguments:
     read_threads: int = 1
     dont_use_mmap: bool = False
     computation_threads: int = 1
-    computation_time: float = 0.
-    computation_time_stdev: float = 0.
-    preprocess_time: float = 0.
-    preprocess_time_stdev: float = 0.
+    computation_time: ClassVar[Dict[str, Any]] = {}
+    preprocess_time: ClassVar[Dict[str, Any]] = {}
     prefetch_size: int = 2
     enable_chunking: bool = False
     chunk_size: int = 0
@@ -97,8 +96,7 @@ class ConfigArguments:
     batch_size_eval: int = 1
     num_files_eval: int = 0
     generation_buffer_size: int = 2 * 1073741824  # 2 GB
-    eval_time: float = 0.0
-    eval_time_stdev: float = 0.0
+    eval_time: ClassVar[Dict[str, Any]] = {}
     eval_after_epoch: int = 1
     epochs_between_evals: int = 1
     checkpoint_type: CheckpointLocationType = CheckpointLocationType.RANK_ZERO
@@ -518,10 +516,21 @@ def LoadConfig(args, config):
             args.read_type = reader['read_type']
         if 'transfer_size' in reader:
             args.transfer_size = reader['transfer_size']
+
+        args.preprocess_time = {}
         if 'preprocess_time' in reader:
-            args.preprocess_time = reader['preprocess_time']
+            preprocess_time = {}
+            if isinstance(reader['preprocess_time'], dict):
+                preprocess_time = reader['preprocess_time']
+            elif isinstance(reader['preprocess_time'], (int, float)):
+                preprocess_time["mean"] = reader['preprocess_time']
+            elif isinstance(reader['preprocess_time'], DictConfig):
+                preprocess_time = OmegaConf.to_container(reader['preprocess_time'])
+            else:
+                preprocess_time = reader['preprocess_time']
+            args.preprocess_time = preprocess_time if preprocess_time is not None else {}
         if 'preprocess_time_stdev' in reader:
-            args.preprocess_time_stdev = reader['preprocess_time_stdev']
+            args.preprocess_time["stdev"] = reader['preprocess_time_stdev']
         if 'pin_memory' in reader:
             args.pin_memory = reader['pin_memory']
@@ -533,18 +542,39 @@ def LoadConfig(args, config):
             args.total_training_steps = config['train']['total_training_steps']
         if 'seed_change_epoch' in config['train']:
             args.seed_change_epoch = config['train']['seed_change_epoch']
+        args.computation_time = {}
         if 'computation_time' in config['train']:
-            args.computation_time = config['train']['computation_time']
+            computation_time = {}
+            if isinstance(config['train']['computation_time'], dict):
+                computation_time = config['train']['computation_time']
+            elif isinstance(config['train']['computation_time'], (int, float)):
+                computation_time["mean"] = config['train']['computation_time']
+            elif isinstance(config['train']['computation_time'], DictConfig):
+                computation_time = OmegaConf.to_container(config['train']['computation_time'])
+            else:
+                computation_time = config['train']['computation_time']
+            args.computation_time = computation_time if computation_time is not None else {}
         if 'computation_time_stdev' in config['train']:
-            args.computation_time_stdev = config['train']['computation_time_stdev']
+            args.computation_time["stdev"] = config['train']['computation_time_stdev']
         if 'seed' in config['train']:
             args.seed = config['train']['seed']
 
     if 'evaluation' in config:
+        args.eval_time = {}
         if 'eval_time' in config['evaluation']:
-            args.eval_time = config['evaluation']['eval_time']
+            eval_time = {}
+            if isinstance(config['evaluation']['eval_time'], dict):
+                eval_time = config['evaluation']['eval_time']
+            elif isinstance(config['evaluation']['eval_time'], (int, float)):
+                eval_time["mean"] = config['evaluation']['eval_time']
+            elif isinstance(config['evaluation']['eval_time'], DictConfig):
+                eval_time = OmegaConf.to_container(config['evaluation']['eval_time'])
+            else:
+                eval_time = config['evaluation']['eval_time']
+            args.eval_time = eval_time if eval_time is not None else {}
+
         if 'eval_time_stdev' in config['evaluation']:
-            args.eval_time_stdev = config['evaluation']['eval_time_stdev']
+            args.eval_time["stdev"] = config['evaluation']['eval_time_stdev']
         if 'eval_after_epoch' in config['evaluation']:
            args.eval_after_epoch = config['evaluation']['eval_after_epoch']
         if 'epochs_between_evals' in config['evaluation']:
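The same normalize-to-dict logic is repeated above for preprocess_time,
computation_time, and eval_time. A condensed, runnable sketch of what each
block computes (the helper name is mine, and the OmegaConf DictConfig branch
is omitted since it only converts the config node to a plain dict first):

    from typing import Any, Dict, Union

    def normalize_time(value: Union[int, float, Dict[str, Any], None]) -> Dict[str, Any]:
        # Scalars become {"mean": value}; dicts pass through; None becomes {}.
        if isinstance(value, (int, float)):
            return {"mean": float(value)}
        return dict(value) if value else {}

    assert normalize_time(1.0) == {"mean": 1.0}
    assert normalize_time(None) == {}
    assert normalize_time({"type": "uniform", "min": 0.5, "max": 1.5}) == \
           {"type": "uniform", "min": 0.5, "max": 1.5}

    # The legacy *_stdev options are then folded into the same dict:
    t = normalize_time(1.0)
    t["stdev"] = 0.1  # e.g. from computation_time_stdev
    assert t == {"mean": 1.0, "stdev": 0.1}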
diff --git a/dlio_benchmark/utils/utility.py b/dlio_benchmark/utils/utility.py
index 2e342a32..52a3eb00 100644
--- a/dlio_benchmark/utils/utility.py
+++ b/dlio_benchmark/utils/utility.py
@@ -18,7 +18,7 @@
 import os
 from datetime import datetime
 import logging
-from time import time
+from time import time, sleep as base_sleep
 from functools import wraps
 import threading
 import json
@@ -309,3 +309,31 @@ def get_trace_name(output_folder, use_pid=False):
     if use_pid:
         val = f"-{os.getpid()}"
     return f"{output_folder}/trace-{DLIOMPI.get_instance().rank()}-of-{DLIOMPI.get_instance().size()}{val}.pfw"
+
+def sleep(config):
+    sleep_time = 0.0
+    if isinstance(config, dict) and len(config) > 0:
+        if "type" in config:
+            if config["type"] == "normal":
+                sleep_time = np.random.normal(config["mean"], config["stdev"])
+            elif config["type"] == "uniform":
+                sleep_time = np.random.uniform(config["min"], config["max"])
+            elif config["type"] == "gamma":
+                sleep_time = np.random.gamma(config["shape"], config["scale"])
+            elif config["type"] == "exponential":
+                sleep_time = np.random.exponential(config["scale"])
+            elif config["type"] == "poisson":
+                sleep_time = np.random.poisson(config["lam"])
+        else:
+            if "mean" in config:
+                if "stdev" in config:
+                    sleep_time = np.random.normal(config["mean"], config["stdev"])
+                else:
+                    sleep_time = config["mean"]
+    elif isinstance(config, (int, float)):
+        sleep_time = config
+    sleep_time = abs(sleep_time)
+    if sleep_time > 0.0:
+        base_sleep(sleep_time)
+    return sleep_time
+
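For reference, the accepted input shapes of the new sleep() helper (this
assumes the patched dlio_benchmark is importable; the keys must match the
chosen type, since a missing key raises KeyError):

    from dlio_benchmark.utils.utility import sleep

    sleep(0.01)                              # scalar: fixed duration
    sleep({"mean": 0.01})                    # mean only: fixed duration
    sleep({"mean": 0.01, "stdev": 0.002})    # mean + stdev: normal sample
    sleep({"type": "uniform", "min": 0.005, "max": 0.01})
    sleep({"type": "gamma", "shape": 2.0, "scale": 0.005})
    sleep({"type": "exponential", "scale": 0.01})

    # Returns the absolute sampled duration; an empty dict means no sleep.
    assert sleep({}) == 0.0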
diff --git a/docs/source/config.rst b/docs/source/config.rst
index 4073509b..047d7ecd 100644
--- a/docs/source/config.rst
+++ b/docs/source/config.rst
@@ -221,7 +221,8 @@ reader
      - transfer size in byte for tensorflow data loader.
    * - preprocess_time
      - 0.0
-     - The amount of emulated preprocess time (sleep) in second.
+     - | The amount of emulated preprocess time (sleep) in seconds.
+       | Can also be specified as a distribution; see :ref:`Time Configuration` for more details.
    * - preprocess_time_stdev
      - 0.0
      - The standard deviation of the amount of emulated preprocess time (sleep) in second.
@@ -255,7 +256,8 @@ train
      - number of epochs to simulate
    * - computation_time
      - 0.0
-     - emulated computation time per step in second
+     - | emulated computation time per step in seconds
+       | Can also be specified as a distribution; see :ref:`Time Configuration` for more details.
    * - computation_time_stdev
      - 0.0
      - standard deviation of the emulated computation time per step in second
@@ -287,7 +289,8 @@ evaluation
      - Description
    * - eval_time
      - 0
-     - emulated computation time (sleep) for each evaluation step.
+     - | emulated computation time (sleep) in seconds for each evaluation step.
+       | Can also be specified as a distribution; see :ref:`Time Configuration` for more details.
    * - eval_time_stdev
      - 0
      - standard deviation of the emulated computation time (sleep) for each evaluation step.
@@ -384,6 +387,63 @@ profiling
 
 * ``dftracer``: https://github.com/hariharan-devarajan/dftracer. DFTRACER_ENABLE=1 has to be set to enable profiler.
 
 Please refer to :ref:`profiling` on how to enable these profiling tools.
 
+Time Configuration
+============================================
+
+The time settings (``computation_time``, ``preprocess_time``, and ``eval_time``) control the emulated compute phases of the benchmark. Each of them can be given either as a constant number of seconds or as a random distribution.
+
+For example, to draw the computation time from a distribution, specify it as a ``dictionary`` in one of the following formats:
+
+* Normal Distribution
+
+.. code-block:: yaml
+
+    computation_time:
+        mean: 1.0
+        stdev: 0.1
+        type: normal
+
+    # or, with the mean alone (a constant sleep of ``mean`` seconds):
+
+    computation_time:
+        mean: 1.0
+
+    # or, mean and stdev (``type`` defaults to normal):
+
+    computation_time:
+        mean: 1.0
+        stdev: 0.1
+
+* Uniform Distribution
+
+.. code-block:: yaml
+
+    computation_time:
+        min: 0.5
+        max: 1.5
+        type: uniform
+
+* Gamma Distribution
+
+.. code-block:: yaml
+
+    computation_time:
+        shape: 1.0
+        scale: 1.0
+        type: gamma
+
+* Exponential Distribution
+
+.. code-block:: yaml
+
+    computation_time:
+        scale: 1.0
+        type: exponential
+
+* Poisson Distribution
+
+.. code-block:: yaml
+
+    computation_time:
+        lam: 1.0
+        type: poisson
+
 How to create a DLIO configuration YAML file
 =============================================
 
 Creating a YAML file for a workload is straightforward. Most of the options are essentially the same as in the actual workload, such as ``framework``, ``reader``, and many options in ``train`` and ``evaluation``, such as ``epochs``. The main work involved is to find out the dataset information and the computation time. For the former, one can check the original dataset to find out the number of files for training, the number of samples per file, the sample size, the data format, etc. For the latter, one has to run the actual workload to measure the computation time per training step, which might require adding timestamps before and after the training step.
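The last paragraph suggests timing the real workload to obtain
computation_time. A hedged sketch of how one might do that, where
train_step() is a stand-in for the real forward/backward pass and the loop
stands in for the real data loader:

    from time import perf_counter, sleep as real_sleep
    import statistics

    def train_step(batch):
        real_sleep(0.01)  # stand-in for the real computation

    step_times = []
    for batch in range(20):  # stand-in for the real data loader
        t0 = perf_counter()
        train_step(batch)
        step_times.append(perf_counter() - t0)

    # These two numbers map directly onto the mean/stdev fields above.
    print(f"mean:  {statistics.mean(step_times):.4f}")
    print(f"stdev: {statistics.stdev(step_times):.4f}")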
diff --git a/tests/dlio_benchmark_test.py b/tests/dlio_benchmark_test.py
index 99e2faa4..902ce4e9 100644
--- a/tests/dlio_benchmark_test.py
+++ b/tests/dlio_benchmark_test.py
@@ -486,5 +486,43 @@ def test_custom_storage_root_train(fmt, framework) -> None:
         clean(storage_root)
     finalize()
 
+compute_time_distributions = {
+    "uniform": {"type": "uniform", "min": 1.0, "max": 2.0},
+    "normal": {"type": "normal", "mean": 1.0, "stdev": 1.0},
+    "gamma": {"type": "gamma", "shape": 1.0, "scale": 1.0},
+    "exp": {"type": "exponential", "scale": 1.0},
+    "poisson": {"type": "poisson", "lam": 1.0},
+    "normal_v2": {"mean": 1.0},  # mean only: constant sleep
+    "normal_v3": {"mean": 1.0, "stdev": 1.0},  # mean + stdev: normal sample
+    "normal_v4": 2.0,  # scalar: constant sleep
+}
+
+@pytest.mark.timeout(60, method="thread")
+@pytest.mark.parametrize("dist", list(compute_time_distributions.keys()))
+def test_computation_time_distribution(dist) -> None:
+    init()
+    clean()
+    compute_time_overrides = []
+    dist_val = compute_time_distributions[dist]
+    if isinstance(dist_val, dict):
+        for key, value in dist_val.items():
+            compute_time_overrides.append(f"++workload.train.computation_time.{key}={value}")
+    else:
+        compute_time_overrides.append(f"++workload.train.computation_time={dist_val}")
+
+    if (comm.rank == 0):
+        logging.info("")
+        logging.info("=" * 80)
+        logging.info(" DLIO test for computation time distribution")
+        logging.info("=" * 80)
+    with initialize_config_dir(version_base=None, config_dir=config_dir):
+        cfg = compose(config_name='config',
+                      overrides=['++workload.workflow.train=True', \
+                                 '++workload.workflow.generate_data=True', \
+                                 '++workload.train.epochs=4'] + compute_time_overrides)
+        benchmark = run_benchmark(cfg)
+    clean()
+    finalize()
+
 if __name__ == '__main__':
     unittest.main()
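The parametrized test above only verifies that each configuration shape runs
end to end. A standalone sketch of the kind of statistical assertion that
could back it up (plain numpy, not part of this patch; tolerances are loose):

    import numpy as np

    rng = np.random.default_rng(seed=42)

    # Normal: sleep() takes abs() of the sample, so check the folded mean.
    normal = np.abs(rng.normal(1.0, 0.1, size=10_000))
    assert abs(normal.mean() - 1.0) < 0.05

    # Uniform: samples must stay inside [min, max].
    uniform = rng.uniform(0.5, 1.5, size=10_000)
    assert 0.5 <= uniform.min() and uniform.max() <= 1.5

    # Exponential: mean should approach the configured scale.
    exponential = rng.exponential(1.0, size=10_000)
    assert abs(exponential.mean() - 1.0) < 0.1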