From 890b0db95de24601f1be7dfb6df896d77e8cf04b Mon Sep 17 00:00:00 2001 From: Ray Andrew Date: Tue, 12 Nov 2024 02:33:07 +0000 Subject: [PATCH 1/6] change sleep API to accept distribution type --- dlio_benchmark/framework/framework.py | 2 +- dlio_benchmark/framework/tf_framework.py | 4 +- dlio_benchmark/framework/torch_framework.py | 3 +- dlio_benchmark/main.py | 19 ++----- dlio_benchmark/reader/dali_image_reader.py | 2 +- dlio_benchmark/reader/dali_npy_reader.py | 2 +- dlio_benchmark/reader/dali_tfrecord_reader.py | 2 +- dlio_benchmark/reader/reader_handler.py | 8 +-- dlio_benchmark/utils/config.py | 56 ++++++++++++++----- dlio_benchmark/utils/utility.py | 29 +++++++++- 10 files changed, 86 insertions(+), 41 deletions(-) diff --git a/dlio_benchmark/framework/framework.py b/dlio_benchmark/framework/framework.py index 02731d8f..98191b4c 100644 --- a/dlio_benchmark/framework/framework.py +++ b/dlio_benchmark/framework/framework.py @@ -22,11 +22,11 @@ from dlio_benchmark.storage.storage_factory import StorageFactory from dlio_benchmark.utils.utility import utcnow -from time import sleep import os import logging from dlio_benchmark.utils.config import ConfigArguments +from dlio_benchmark.utils.utility import sleep class DummyTraceObject(object): def __init__(self, string, step, r): diff --git a/dlio_benchmark/framework/tf_framework.py b/dlio_benchmark/framework/tf_framework.py index 1b1a049f..be995449 100644 --- a/dlio_benchmark/framework/tf_framework.py +++ b/dlio_benchmark/framework/tf_framework.py @@ -17,12 +17,12 @@ import os import logging -from time import time, sleep +from time import time from dlio_benchmark.common.constants import MODULE_AI_FRAMEWORK from dlio_benchmark.data_loader.data_loader_factory import DataLoaderFactory from dlio_benchmark.utils.utility import utcnow, DLIOMPI -from dlio_benchmark.utils.utility import Profile +from dlio_benchmark.utils.utility import Profile, sleep from dlio_benchmark.common.error_code import ErrorCodes from dlio_benchmark.framework.framework import Framework from dlio_benchmark.reader.reader_factory import ReaderFactory diff --git a/dlio_benchmark/framework/torch_framework.py b/dlio_benchmark/framework/torch_framework.py index 1d4b85c0..a86e2938 100644 --- a/dlio_benchmark/framework/torch_framework.py +++ b/dlio_benchmark/framework/torch_framework.py @@ -27,10 +27,11 @@ from dlio_benchmark.utils.utility import utcnow, DLIOMPI from dlio_benchmark.utils.utility import Profile -from time import sleep, time +from time import time from dlio_benchmark.reader.reader_factory import ReaderFactory from dlio_benchmark.storage.storage_factory import StorageFactory +from dlio_benchmark.utils.utility import sleep HANDLED_FUNCTIONS = {} dlp = Profile(MODULE_AI_FRAMEWORK) diff --git a/dlio_benchmark/main.py b/dlio_benchmark/main.py index 1a312ce1..b367debd 100644 --- a/dlio_benchmark/main.py +++ b/dlio_benchmark/main.py @@ -17,7 +17,7 @@ import os import math import logging -from time import time, sleep +from time import time import json import numpy as np @@ -114,7 +114,6 @@ def __init__(self, cfg): self.epochs = self.args.epochs self.batch_size = self.args.batch_size self.computation_time = self.args.computation_time - self.computation_time_stdev = self.args.computation_time_stdev if self.do_profiling: self.profiler = ProfilerFactory().get_profiler(self.args.profiler) @@ -133,7 +132,6 @@ def __init__(self, cfg): self.batch_size_eval = self.args.batch_size_eval self.eval_time = self.args.eval_time - self.eval_time_stdev = self.args.eval_time_stdev self.eval_after_epoch = self.args.eval_after_epoch self.epochs_between_evals = self.args.epochs_between_evals self.stats = StatsCounter() @@ -226,13 +224,8 @@ def _eval(self, epoch): t0 = time() for batch in loader.next(): self.stats.eval_batch_loaded(epoch, step, t0) - eval_time = 0.0 - if self.eval_time > 0: - if self.eval_time_stdev > 0: - eval_time = abs(random.normal(self.eval_time, self.eval_time_stdev)) - else: - eval_time = self.eval_time - self.framework.compute(batch, epoch, step, eval_time) + eval_time = self.eval_time + self.framework.compute(batch, epoch, step, eval_time) self.stats.eval_batch_processed(epoch, step, t0, eval_time) step += 1 @@ -268,12 +261,8 @@ def _train(self, epoch): if block_step == 1 and block != 1: self.stats.start_block(epoch, block) computation_time = self.computation_time - if self.computation_time > 0: + if (isinstance(computation_time, dict) and len(computation_time) > 0) or (isinstance(computation_time, float) and computation_time > 0): self.framework.trace_object("Train", overall_step, 1) - if self.computation_time_stdev > 0: - computation_time = abs(random.normal(self.computation_time, self.computation_time_stdev)) - else: - computation_time = self.computation_time self.framework.compute(batch, epoch, block_step, computation_time) self.stats.batch_processed(epoch, overall_step, block, t0, computation_time) self.comm.barrier() diff --git a/dlio_benchmark/reader/dali_image_reader.py b/dlio_benchmark/reader/dali_image_reader.py index 6876610b..14c9c302 100644 --- a/dlio_benchmark/reader/dali_image_reader.py +++ b/dlio_benchmark/reader/dali_image_reader.py @@ -16,7 +16,7 @@ """ import math import logging -from time import time, sleep +from time import time import numpy as np import nvidia.dali.fn as fn diff --git a/dlio_benchmark/reader/dali_npy_reader.py b/dlio_benchmark/reader/dali_npy_reader.py index 6091f360..c2c4c3cb 100644 --- a/dlio_benchmark/reader/dali_npy_reader.py +++ b/dlio_benchmark/reader/dali_npy_reader.py @@ -16,7 +16,7 @@ """ import math import logging -from time import time, sleep +from time import time import numpy as np import nvidia.dali.fn as fn diff --git a/dlio_benchmark/reader/dali_tfrecord_reader.py b/dlio_benchmark/reader/dali_tfrecord_reader.py index 99132188..a3852cf2 100644 --- a/dlio_benchmark/reader/dali_tfrecord_reader.py +++ b/dlio_benchmark/reader/dali_tfrecord_reader.py @@ -18,7 +18,7 @@ import math import logging -from time import time, sleep +from time import time import numpy as np import nvidia diff --git a/dlio_benchmark/reader/reader_handler.py b/dlio_benchmark/reader/reader_handler.py index 720e4378..f91b4de4 100644 --- a/dlio_benchmark/reader/reader_handler.py +++ b/dlio_benchmark/reader/reader_handler.py @@ -27,10 +27,9 @@ import os import math import logging -from time import sleep import glob from dlio_benchmark.common.constants import MODULE_DATA_READER - +from dlio_benchmark.utils.utility import sleep dlp = Profile(MODULE_DATA_READER) @@ -60,10 +59,9 @@ def __init__(self, dataset_type, thread_index): @dlp.log def preprocess(self, a=None): - if self._args.preprocess_time > 0. or self._args.preprocess_time_stdev > 0.: - t = np.random.normal(self._args.preprocess_time, self._args.preprocess_time_stdev) - sleep(abs(t)) + sleep(self._args.preprocess_time) return a + @abstractmethod def open(self, filename): return diff --git a/dlio_benchmark/utils/config.py b/dlio_benchmark/utils/config.py index 39ed54d9..4fa4ee97 100644 --- a/dlio_benchmark/utils/config.py +++ b/dlio_benchmark/utils/config.py @@ -29,11 +29,12 @@ FrameworkType, \ DataLoaderType, Profiler, DatasetType, DataLoaderSampler, CheckpointLocationType, CheckpointMechanismType from dlio_benchmark.utils.utility import DLIOMPI, get_trace_name, utcnow +from dlio_benchmark.utils.utility import Profile, PerfTrace, DFTRACER_ENABLE from dataclasses import dataclass +from omegaconf import DictConfig import math import os import numpy as np -from dlio_benchmark.utils.utility import Profile, PerfTrace, DFTRACER_ENABLE dlp = Profile(MODULE_CONFIG) @dataclass @@ -82,10 +83,8 @@ class ConfigArguments: read_threads: int = 1 dont_use_mmap: bool = False computation_threads: int = 1 - computation_time: float = 0. - computation_time_stdev: float = 0. - preprocess_time: float = 0. - preprocess_time_stdev: float = 0. + computation_time: ClassVar[Dict[str, Any]] = {} + preprocess_time: ClassVar[Dict[str, Any]] = {} prefetch_size: int = 2 enable_chunking: bool = False chunk_size: int = 0 @@ -97,8 +96,7 @@ class ConfigArguments: batch_size_eval: int = 1 num_files_eval: int = 0 generation_buffer_size: int = 2 * 1073741824 # 2 GB - eval_time: float = 0.0 - eval_time_stdev: float = 0.0 + eval_time: ClassVar[Dict[str, Any]] = {} eval_after_epoch: int = 1 epochs_between_evals: int = 1 checkpoint_type: CheckpointLocationType = CheckpointLocationType.RANK_ZERO @@ -518,10 +516,21 @@ def LoadConfig(args, config): args.read_type = reader['read_type'] if 'transfer_size' in reader: args.transfer_size = reader['transfer_size'] + + args.preprocess_time = {} if 'preprocess_time' in reader: - args.preprocess_time = reader['preprocess_time'] + preprocess_time = {} + if isinstance(reader['preprocess_time'], dict): + preprocess_time = reader['preprocess_time'] + elif isinstance(reader['preprocess_time'], (int, float)): + preprocess_time["mean"] = reader['preprocess_time'] + else isinstance(reader['preprocess_time'], DictConfig): + preprocess_time = OmegaConf.to_container(reader['preprocess_time']) + else: + args.preprocess_time = reader['preprocess_time'] + args.preprocess_time = preprocess_time if preprocess_time is not None else {} if 'preprocess_time_stdev' in reader: - args.preprocess_time_stdev = reader['preprocess_time_stdev'] + args.preprocess_time["stdev"] = reader['preprocess_time_stdev'] if 'pin_memory' in reader: args.pin_memory = reader['pin_memory'] @@ -533,18 +542,39 @@ def LoadConfig(args, config): args.total_training_steps = config['train']['total_training_steps'] if 'seed_change_epoch' in config['train']: args.seed_change_epoch = config['train']['seed_change_epoch'] + args.computation_time = {} if 'computation_time' in config['train']: - args.computation_time = config['train']['computation_time'] + computation_time = {} + if isinstance(config['train']['computation_time'], dict): + computation_time = config['train']['computation_time'] + elif isinstance(config['train']['computation_time'], (int, float)): + computation_time["mean"] = config['train']['computation_time'] + else isinstance(config['train']['computation_time'], DictConfig): + computation_time = OmegaConf.to_container(config['train']['computation_time']) + else: + args.computation_time = config['train']['computation_time'] + args.computation_time = computation_time if computation_time is not None else {} if 'computation_time_stdev' in config['train']: - args.computation_time_stdev = config['train']['computation_time_stdev'] + args.computation_time["stdev"] = config['train']['computation_time_stdev'] if 'seed' in config['train']: args.seed = config['train']['seed'] if 'evaluation' in config: + args.eval_time = {} if 'eval_time' in config['evaluation']: - args.eval_time = config['evaluation']['eval_time'] + eval_time = {} + if isinstance(config['evaluation']['eval_time'], dict): + eval_time = config['evaluation']['eval_time'] + elif isinstance(config['evaluation']['eval_time'], (int, float)): + eval_time["mean"] = config['evaluation']['eval_time'] + else isinstance(config['evaluation']['eval_time'], DictConfig): + eval_time = OmegaConf.to_container(config['evaluation']['eval_time']) + else: + args.eval_time = config['evaluation']['eval_time'] + args.eval_time = eval_time if eval_time is not None else {} + if 'eval_time_stdev' in config['evaluation']: - args.eval_time_stdev = config['evaluation']['eval_time_stdev'] + args.eval_time["stdev"] = config['evaluation']['eval_time_stdev'] if 'eval_after_epoch' in config['evaluation']: args.eval_after_epoch = config['evaluation']['eval_after_epoch'] if 'epochs_between_evals' in config['evaluation']: diff --git a/dlio_benchmark/utils/utility.py b/dlio_benchmark/utils/utility.py index 2e342a32..0080f076 100644 --- a/dlio_benchmark/utils/utility.py +++ b/dlio_benchmark/utils/utility.py @@ -18,7 +18,7 @@ import os from datetime import datetime import logging -from time import time +from time import time, sleep as base_sleep from functools import wraps import threading import json @@ -309,3 +309,30 @@ def get_trace_name(output_folder, use_pid=False): if use_pid: val = f"-{os.getpid()}" return f"{output_folder}/trace-{DLIOMPI.get_instance().rank()}-of-{DLIOMPI.get_instance().size()}{val}.pfw" + +def sleep(config): + sleep_time = 0.0 + if isinstance(config, dict) and len(config) > 0: + if "type" in config: + if config["type"] == "normal": + sleep_time = np.random.normal(config["mean"], config["stdev"]) + elif config["type"] == "uniform": + sleep_time = np.random.uniform(config["min"], config["max"]) + elif config["type"] == "gamma": + sleep_time = np.random.gamma(config["shape"], config["scale"]) + elif config["type"] == "exponential": + sleep_time = np.random.exponential(config["scale"]) + elif config["type"] == "poisson": + sleep_time = np.random.poisson(config["lam"]) + else: + if "mean" in config: + if "stdev" in config: + sleep_time = np.random.normal(config["mean"], config["stdev"]) + else: + sleep_time = config["mean"] + elif isinstance(config, (int, float)): + sleep_time = config + sleep_time = abs(sleep_time) + if sleep_time > 0.0: + base_sleep(sleep_time) + From ced249d7d36aa0f5f553ecdb78e211bc8767331e Mon Sep 17 00:00:00 2001 From: Ray Andrew Date: Tue, 12 Nov 2024 02:47:04 +0000 Subject: [PATCH 2/6] change documentation --- docs/source/config.rst | 66 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 63 insertions(+), 3 deletions(-) diff --git a/docs/source/config.rst b/docs/source/config.rst index 4073509b..047d7ecd 100644 --- a/docs/source/config.rst +++ b/docs/source/config.rst @@ -221,7 +221,8 @@ reader - transfer size in byte for tensorflow data loader. * - preprocess_time - 0.0 - - The amount of emulated preprocess time (sleep) in second. + - | The amount of emulated preprocess time (sleep) in second. + | Can be specified as a distribution, see :ref:`Time Configuration` for more details. * - preprocess_time_stdev - 0.0 - The standard deviation of the amount of emulated preprocess time (sleep) in second. @@ -255,7 +256,8 @@ train - number of epochs to simulate * - computation_time - 0.0 - - emulated computation time per step in second + - | emulated computation time per step in second + | Can be specified as a distribution, see :ref:`Time Configuration` for more details. * - computation_time_stdev - 0.0 - standard deviation of the emulated computation time per step in second @@ -287,7 +289,8 @@ evaluation - Description * - eval_time - 0 - - emulated computation time (sleep) for each evaluation step. + - | emulated computation time (sleep) for each evaluation step. + | Can be specified as a distribution, see :ref:`Time Configuration` for more details. * - eval_time_stdev - 0 - standard deviation of the emulated computation time (sleep) for each evaluation step. @@ -384,6 +387,63 @@ profiling * ``dftracer``: https://github.com/hariharan-devarajan/dftracer. DFTRACER_ENABLE=1 has to be set to enable profiler. Please refer to :ref:`profiling` on how to enable these profiling tools. +Time Configuration +============================================ + +The time configuration is crucial for the emulation. Here, we are able to specify distribution of the time configuration. + +For example, to specify distribution of the computation time, one can specify the configuration as ``dictionary`` with the following format: + + +* Normal Distribution + +.. code-block:: yaml + computation_time: + mean: 1.0 + stdev: 0.1 + type: normal + + # or + + computation_time: + mean: 1.0 + + # or + + computation_time: + mean: 1.0 + stdev: 0.1 + +* Uniform Distribution + +.. code-block:: yaml + computation_time: + min: 0.5 + max: 1.5 + type: uniform + +* Gamma Distribution + +.. code-block:: yaml + computation_time: + shape: 1.0 + scale: 1.0 + type: gamma + +* Exponential Distribution + +.. code-block:: yaml + computation_time: + scale: 1.0 + type: exponential + +* Poisson Distribution + +.. code-block:: yaml + computation_time: + lam: 1.0 + type: poisson + How to create a DLIO configuration YAML file ============================================= Creating a YAML file for a workload is very straight forward. Most of the options are essentially the same with the actual workload, such as ``framework``, ``reader``, and many options in ``train``, ``evaluation``, such as ``epochs``. The main work involved is to find out the dataset information and the computation time. For the former, one can to check the original dataset to find out the number of files for training, how many samples per file, and the sample size, data format, etc. For the latter, one has to run the actual workload to find out the comptuation time per training step. One might have to add timing stamp before and after the training step. From 54385b2e12b36f9d81a11dc3872ce052ae0f9fb6 Mon Sep 17 00:00:00 2001 From: Ray Andrew Date: Tue, 12 Nov 2024 02:56:58 +0000 Subject: [PATCH 3/6] fix error branching --- dlio_benchmark/utils/config.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dlio_benchmark/utils/config.py b/dlio_benchmark/utils/config.py index 4fa4ee97..b94386d4 100644 --- a/dlio_benchmark/utils/config.py +++ b/dlio_benchmark/utils/config.py @@ -524,7 +524,7 @@ def LoadConfig(args, config): preprocess_time = reader['preprocess_time'] elif isinstance(reader['preprocess_time'], (int, float)): preprocess_time["mean"] = reader['preprocess_time'] - else isinstance(reader['preprocess_time'], DictConfig): + elif isinstance(reader['preprocess_time'], DictConfig): preprocess_time = OmegaConf.to_container(reader['preprocess_time']) else: args.preprocess_time = reader['preprocess_time'] @@ -549,7 +549,7 @@ def LoadConfig(args, config): computation_time = config['train']['computation_time'] elif isinstance(config['train']['computation_time'], (int, float)): computation_time["mean"] = config['train']['computation_time'] - else isinstance(config['train']['computation_time'], DictConfig): + elif isinstance(config['train']['computation_time'], DictConfig): computation_time = OmegaConf.to_container(config['train']['computation_time']) else: args.computation_time = config['train']['computation_time'] @@ -567,7 +567,7 @@ def LoadConfig(args, config): eval_time = config['evaluation']['eval_time'] elif isinstance(config['evaluation']['eval_time'], (int, float)): eval_time["mean"] = config['evaluation']['eval_time'] - else isinstance(config['evaluation']['eval_time'], DictConfig): + elif isinstance(config['evaluation']['eval_time'], DictConfig): eval_time = OmegaConf.to_container(config['evaluation']['eval_time']) else: args.eval_time = config['evaluation']['eval_time'] From 024f0ab2e7b18a602a7962a66fe59a9b2eb2545f Mon Sep 17 00:00:00 2001 From: Ray Andrew Date: Tue, 12 Nov 2024 03:25:54 +0000 Subject: [PATCH 4/6] fix missing import --- dlio_benchmark/utils/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dlio_benchmark/utils/config.py b/dlio_benchmark/utils/config.py index b94386d4..db71d94c 100644 --- a/dlio_benchmark/utils/config.py +++ b/dlio_benchmark/utils/config.py @@ -22,7 +22,7 @@ from time import time -from typing import List, ClassVar +from typing import Any, Dict, List, ClassVar from dlio_benchmark.common.constants import MODULE_CONFIG from dlio_benchmark.common.enumerations import StorageType, FormatType, Shuffle, ReadType, FileAccess, Compression, \ From 6893c02b89caa8868ca19de39885a0eb577cf599 Mon Sep 17 00:00:00 2001 From: Ray Andrew Date: Tue, 12 Nov 2024 03:36:57 +0000 Subject: [PATCH 5/6] return sleep time for statscounter --- dlio_benchmark/framework/tf_framework.py | 2 +- dlio_benchmark/framework/torch_framework.py | 2 +- dlio_benchmark/main.py | 4 ++-- dlio_benchmark/utils/utility.py | 1 + 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/dlio_benchmark/framework/tf_framework.py b/dlio_benchmark/framework/tf_framework.py index be995449..5141a830 100644 --- a/dlio_benchmark/framework/tf_framework.py +++ b/dlio_benchmark/framework/tf_framework.py @@ -87,7 +87,7 @@ def trace_object(self, string, step, r): @dlp.log def compute(self, x, epoch_number, step, computation_time): - sleep(computation_time) + return sleep(computation_time) # tf.function(self.model)(epoch_number, step, computation_time) @dlp.log diff --git a/dlio_benchmark/framework/torch_framework.py b/dlio_benchmark/framework/torch_framework.py index a86e2938..ab862671 100644 --- a/dlio_benchmark/framework/torch_framework.py +++ b/dlio_benchmark/framework/torch_framework.py @@ -94,7 +94,7 @@ def trace_object(self, string, step, r): @dlp.log def compute(self, x, epoch_number, step, computation_time): - torch_sleep(computation_time) + return torch_sleep(computation_time) @dlp.log def get_loader(self, dataset_type=DatasetType.TRAIN): diff --git a/dlio_benchmark/main.py b/dlio_benchmark/main.py index b367debd..bf0b4b42 100644 --- a/dlio_benchmark/main.py +++ b/dlio_benchmark/main.py @@ -225,7 +225,7 @@ def _eval(self, epoch): for batch in loader.next(): self.stats.eval_batch_loaded(epoch, step, t0) eval_time = self.eval_time - self.framework.compute(batch, epoch, step, eval_time) + eval_time = self.framework.compute(batch, epoch, step, eval_time) self.stats.eval_batch_processed(epoch, step, t0, eval_time) step += 1 @@ -263,7 +263,7 @@ def _train(self, epoch): computation_time = self.computation_time if (isinstance(computation_time, dict) and len(computation_time) > 0) or (isinstance(computation_time, float) and computation_time > 0): self.framework.trace_object("Train", overall_step, 1) - self.framework.compute(batch, epoch, block_step, computation_time) + computation_time = self.framework.compute(batch, epoch, block_step, computation_time) self.stats.batch_processed(epoch, overall_step, block, t0, computation_time) self.comm.barrier() if self.do_checkpoint and ( diff --git a/dlio_benchmark/utils/utility.py b/dlio_benchmark/utils/utility.py index 0080f076..52a3eb00 100644 --- a/dlio_benchmark/utils/utility.py +++ b/dlio_benchmark/utils/utility.py @@ -335,4 +335,5 @@ def sleep(config): sleep_time = abs(sleep_time) if sleep_time > 0.0: base_sleep(sleep_time) + return sleep_time From c4ac742ff17023d04a85a0cc27abc3a9fbc30ecb Mon Sep 17 00:00:00 2001 From: Ray Andrew Date: Tue, 12 Nov 2024 04:24:53 +0000 Subject: [PATCH 6/6] add test case for time distribution config --- .github/workflows/ci.yml | 6 ++++++ dlio_benchmark/utils/config.py | 2 +- tests/dlio_benchmark_test.py | 38 ++++++++++++++++++++++++++++++++++ 3 files changed, 45 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4c90663d..5231ab53 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -269,3 +269,9 @@ jobs: mpirun -np 2 ${DLIO_EXEC} workload=cosmoflow_h100 ++workload.workflow.generate_data=True ++workload.dataset.num_files_train=16 mpirun -np 2 ${DLIO_EXEC} workload=cosmoflow_h100 ++workload.workflow.generate_data=True ++workload.dataset.num_files_train=16 ++workload.dataset.format=synthetic rm -rf data + - name: test_computation_time_distribution + run: | + source ${VENV_PATH}/bin/activate + rm -rf output data checkpoints + mpirun -np 2 pytest -k test_computation_time_distribution -v + rm -rf data diff --git a/dlio_benchmark/utils/config.py b/dlio_benchmark/utils/config.py index db71d94c..c2fa6abf 100644 --- a/dlio_benchmark/utils/config.py +++ b/dlio_benchmark/utils/config.py @@ -31,7 +31,7 @@ from dlio_benchmark.utils.utility import DLIOMPI, get_trace_name, utcnow from dlio_benchmark.utils.utility import Profile, PerfTrace, DFTRACER_ENABLE from dataclasses import dataclass -from omegaconf import DictConfig +from omegaconf import OmegaConf, DictConfig import math import os import numpy as np diff --git a/tests/dlio_benchmark_test.py b/tests/dlio_benchmark_test.py index 99e2faa4..902ce4e9 100644 --- a/tests/dlio_benchmark_test.py +++ b/tests/dlio_benchmark_test.py @@ -486,5 +486,43 @@ def test_custom_storage_root_train(fmt, framework) -> None: clean(storage_root) finalize() +compute_time_distributions = { + "uniform": {"type": "uniform", "min": 1.0, "max": 2.0}, + "normal": {"type": "normal", "mean": 1.0, "stdev": 1.0}, + "gamma": {"type": "gamma", "shape": 1.0, "scale": 1.0}, + "exp": {"type": "exponential", "scale": 1.0}, + "poisson": {"type": "poisson", "lam": 1.0}, + "normal_v2": {"mean": 1.0}, # mean, dist: normal + "normal_v3": {"mean": 1.0, "stdev": 1.0}, # mean, stdev, dist: normal + "normal_v4": 2.0, # mean, dist: normal +} + +@pytest.mark.timeout(60, method="thread") +@pytest.mark.parametrize("dist", list(compute_time_distributions.keys())) +def test_computation_time_distribution(dist) -> None: + init() + clean() + compute_time_overrides = [] + dist_val = compute_time_distributions[dist] + if isinstance(dist_val, dict): + for key, value in dist_val.items(): + compute_time_overrides.append(f"++workload.train.computation_time.{key}={value}") + else: + compute_time_overrides.append(f"++workload.train.computation_time={dist_val}") + + if (comm.rank == 0): + logging.info("") + logging.info("=" * 80) + logging.info(f" DLIO test for computation time distribution") + logging.info("=" * 80) + with initialize_config_dir(version_base=None, config_dir=config_dir): + cfg = compose(config_name='config', + overrides=['++workload.workflow.train=True', \ + '++workload.workflow.generate_data=True', \ + '++workload.train.epochs=4'] + compute_time_overrides) + benchmark = run_benchmark(cfg) + clean() + finalize() + if __name__ == '__main__': unittest.main()