Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add user config to specify type of distribution of time configuration #241

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -269,3 +269,9 @@ jobs:
mpirun -np 2 ${DLIO_EXEC} workload=cosmoflow_h100 ++workload.workflow.generate_data=True ++workload.dataset.num_files_train=16
mpirun -np 2 ${DLIO_EXEC} workload=cosmoflow_h100 ++workload.workflow.generate_data=True ++workload.dataset.num_files_train=16 ++workload.dataset.format=synthetic
rm -rf data
# CI step: exercise the new distribution-based computation-time feature by
# running its pytest under MPI with 2 ranks; stale benchmark artifacts are
# removed before and after so runs stay isolated.
- name: test_computation_time_distribution
run: |
source ${VENV_PATH}/bin/activate
rm -rf output data checkpoints
mpirun -np 2 pytest -k test_computation_time_distribution -v
rm -rf data
2 changes: 1 addition & 1 deletion dlio_benchmark/framework/framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
from dlio_benchmark.storage.storage_factory import StorageFactory
from dlio_benchmark.utils.utility import utcnow

from time import sleep
import os
import logging

from dlio_benchmark.utils.config import ConfigArguments
from dlio_benchmark.utils.utility import sleep

class DummyTraceObject(object):
def __init__(self, string, step, r):
Expand Down
6 changes: 3 additions & 3 deletions dlio_benchmark/framework/tf_framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

import os
import logging
from time import time, sleep
from time import time

from dlio_benchmark.common.constants import MODULE_AI_FRAMEWORK
from dlio_benchmark.data_loader.data_loader_factory import DataLoaderFactory
from dlio_benchmark.utils.utility import utcnow, DLIOMPI
from dlio_benchmark.utils.utility import Profile
from dlio_benchmark.utils.utility import Profile, sleep
from dlio_benchmark.common.error_code import ErrorCodes
from dlio_benchmark.framework.framework import Framework
from dlio_benchmark.reader.reader_factory import ReaderFactory
Expand Down Expand Up @@ -87,7 +87,7 @@ def trace_object(self, string, step, r):

@dlp.log
def compute(self, x, epoch_number, step, computation_time):
sleep(computation_time)
return sleep(computation_time)
# tf.function(self.model)(epoch_number, step, computation_time)

@dlp.log
Expand Down
5 changes: 3 additions & 2 deletions dlio_benchmark/framework/torch_framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@
from dlio_benchmark.utils.utility import utcnow, DLIOMPI
from dlio_benchmark.utils.utility import Profile

from time import sleep, time
from time import time

from dlio_benchmark.reader.reader_factory import ReaderFactory
from dlio_benchmark.storage.storage_factory import StorageFactory
from dlio_benchmark.utils.utility import sleep

HANDLED_FUNCTIONS = {}
dlp = Profile(MODULE_AI_FRAMEWORK)
Expand Down Expand Up @@ -93,7 +94,7 @@ def trace_object(self, string, step, r):

@dlp.log
def compute(self, x, epoch_number, step, computation_time):
torch_sleep(computation_time)
return torch_sleep(computation_time)

@dlp.log
def get_loader(self, dataset_type=DatasetType.TRAIN):
Expand Down
21 changes: 5 additions & 16 deletions dlio_benchmark/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import os
import math
import logging
from time import time, sleep
from time import time
import json
import numpy as np

Expand Down Expand Up @@ -114,7 +114,6 @@ def __init__(self, cfg):
self.epochs = self.args.epochs
self.batch_size = self.args.batch_size
self.computation_time = self.args.computation_time
self.computation_time_stdev = self.args.computation_time_stdev

if self.do_profiling:
self.profiler = ProfilerFactory().get_profiler(self.args.profiler)
Expand All @@ -133,7 +132,6 @@ def __init__(self, cfg):

self.batch_size_eval = self.args.batch_size_eval
self.eval_time = self.args.eval_time
self.eval_time_stdev = self.args.eval_time_stdev
self.eval_after_epoch = self.args.eval_after_epoch
self.epochs_between_evals = self.args.epochs_between_evals
self.stats = StatsCounter()
Expand Down Expand Up @@ -226,13 +224,8 @@ def _eval(self, epoch):
t0 = time()
for batch in loader.next():
self.stats.eval_batch_loaded(epoch, step, t0)
eval_time = 0.0
if self.eval_time > 0:
if self.eval_time_stdev > 0:
eval_time = abs(random.normal(self.eval_time, self.eval_time_stdev))
else:
eval_time = self.eval_time
self.framework.compute(batch, epoch, step, eval_time)
eval_time = self.eval_time
eval_time = self.framework.compute(batch, epoch, step, eval_time)
self.stats.eval_batch_processed(epoch, step, t0, eval_time)

step += 1
Expand Down Expand Up @@ -268,13 +261,9 @@ def _train(self, epoch):
if block_step == 1 and block != 1:
self.stats.start_block(epoch, block)
computation_time = self.computation_time
if self.computation_time > 0:
if (isinstance(computation_time, dict) and len(computation_time) > 0) or (isinstance(computation_time, float) and computation_time > 0):
self.framework.trace_object("Train", overall_step, 1)
if self.computation_time_stdev > 0:
computation_time = abs(random.normal(self.computation_time, self.computation_time_stdev))
else:
computation_time = self.computation_time
self.framework.compute(batch, epoch, block_step, computation_time)
computation_time = self.framework.compute(batch, epoch, block_step, computation_time)
self.stats.batch_processed(epoch, overall_step, block, t0, computation_time)
self.comm.barrier()
if self.do_checkpoint and (
Expand Down
2 changes: 1 addition & 1 deletion dlio_benchmark/reader/dali_image_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"""
import math
import logging
from time import time, sleep
from time import time
import numpy as np

import nvidia.dali.fn as fn
Expand Down
2 changes: 1 addition & 1 deletion dlio_benchmark/reader/dali_npy_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"""
import math
import logging
from time import time, sleep
from time import time
import numpy as np

import nvidia.dali.fn as fn
Expand Down
2 changes: 1 addition & 1 deletion dlio_benchmark/reader/dali_tfrecord_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import math
import logging
from time import time, sleep
from time import time
import numpy as np

import nvidia
Expand Down
8 changes: 3 additions & 5 deletions dlio_benchmark/reader/reader_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@
import os
import math
import logging
from time import sleep
import glob
from dlio_benchmark.common.constants import MODULE_DATA_READER

from dlio_benchmark.utils.utility import sleep

dlp = Profile(MODULE_DATA_READER)

Expand Down Expand Up @@ -60,10 +59,9 @@ def __init__(self, dataset_type, thread_index):

@dlp.log
def preprocess(self, a=None):
if self._args.preprocess_time > 0. or self._args.preprocess_time_stdev > 0.:
t = np.random.normal(self._args.preprocess_time, self._args.preprocess_time_stdev)
sleep(abs(t))
sleep(self._args.preprocess_time)
return a

@abstractmethod
def open(self, filename):
return
Expand Down
58 changes: 44 additions & 14 deletions dlio_benchmark/utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,19 @@
from time import time


from typing import List, ClassVar
from typing import Any, Dict, List, ClassVar

from dlio_benchmark.common.constants import MODULE_CONFIG
from dlio_benchmark.common.enumerations import StorageType, FormatType, Shuffle, ReadType, FileAccess, Compression, \
FrameworkType, \
DataLoaderType, Profiler, DatasetType, DataLoaderSampler, CheckpointLocationType, CheckpointMechanismType
from dlio_benchmark.utils.utility import DLIOMPI, get_trace_name, utcnow
from dlio_benchmark.utils.utility import Profile, PerfTrace, DFTRACER_ENABLE
from dataclasses import dataclass
from omegaconf import OmegaConf, DictConfig
import math
import os
import numpy as np
from dlio_benchmark.utils.utility import Profile, PerfTrace, DFTRACER_ENABLE

dlp = Profile(MODULE_CONFIG)
@dataclass
Expand Down Expand Up @@ -82,10 +83,8 @@ class ConfigArguments:
read_threads: int = 1
dont_use_mmap: bool = False
computation_threads: int = 1
computation_time: float = 0.
computation_time_stdev: float = 0.
preprocess_time: float = 0.
preprocess_time_stdev: float = 0.
computation_time: ClassVar[Dict[str, Any]] = {}
preprocess_time: ClassVar[Dict[str, Any]] = {}
prefetch_size: int = 2
enable_chunking: bool = False
chunk_size: int = 0
Expand All @@ -97,8 +96,7 @@ class ConfigArguments:
batch_size_eval: int = 1
num_files_eval: int = 0
generation_buffer_size: int = 2 * 1073741824 # 2 GB
eval_time: float = 0.0
eval_time_stdev: float = 0.0
eval_time: ClassVar[Dict[str, Any]] = {}
eval_after_epoch: int = 1
epochs_between_evals: int = 1
checkpoint_type: CheckpointLocationType = CheckpointLocationType.RANK_ZERO
Expand Down Expand Up @@ -518,10 +516,21 @@ def LoadConfig(args, config):
args.read_type = reader['read_type']
if 'transfer_size' in reader:
args.transfer_size = reader['transfer_size']

args.preprocess_time = {}
if 'preprocess_time' in reader:
args.preprocess_time = reader['preprocess_time']
preprocess_time = {}
if isinstance(reader['preprocess_time'], dict):
preprocess_time = reader['preprocess_time']
elif isinstance(reader['preprocess_time'], (int, float)):
preprocess_time["mean"] = reader['preprocess_time']
elif isinstance(reader['preprocess_time'], DictConfig):
preprocess_time = OmegaConf.to_container(reader['preprocess_time'])
else:
args.preprocess_time = reader['preprocess_time']
args.preprocess_time = preprocess_time if preprocess_time is not None else {}
if 'preprocess_time_stdev' in reader:
args.preprocess_time_stdev = reader['preprocess_time_stdev']
args.preprocess_time["stdev"] = reader['preprocess_time_stdev']
if 'pin_memory' in reader:
args.pin_memory = reader['pin_memory']

Expand All @@ -533,18 +542,39 @@ def LoadConfig(args, config):
args.total_training_steps = config['train']['total_training_steps']
if 'seed_change_epoch' in config['train']:
args.seed_change_epoch = config['train']['seed_change_epoch']
args.computation_time = {}
if 'computation_time' in config['train']:
args.computation_time = config['train']['computation_time']
computation_time = {}
if isinstance(config['train']['computation_time'], dict):
computation_time = config['train']['computation_time']
elif isinstance(config['train']['computation_time'], (int, float)):
computation_time["mean"] = config['train']['computation_time']
elif isinstance(config['train']['computation_time'], DictConfig):
computation_time = OmegaConf.to_container(config['train']['computation_time'])
else:
args.computation_time = config['train']['computation_time']
args.computation_time = computation_time if computation_time is not None else {}
if 'computation_time_stdev' in config['train']:
args.computation_time_stdev = config['train']['computation_time_stdev']
args.computation_time["stdev"] = config['train']['computation_time_stdev']
if 'seed' in config['train']:
args.seed = config['train']['seed']

if 'evaluation' in config:
args.eval_time = {}
if 'eval_time' in config['evaluation']:
args.eval_time = config['evaluation']['eval_time']
eval_time = {}
if isinstance(config['evaluation']['eval_time'], dict):
eval_time = config['evaluation']['eval_time']
elif isinstance(config['evaluation']['eval_time'], (int, float)):
eval_time["mean"] = config['evaluation']['eval_time']
elif isinstance(config['evaluation']['eval_time'], DictConfig):
eval_time = OmegaConf.to_container(config['evaluation']['eval_time'])
else:
args.eval_time = config['evaluation']['eval_time']
args.eval_time = eval_time if eval_time is not None else {}

if 'eval_time_stdev' in config['evaluation']:
args.eval_time_stdev = config['evaluation']['eval_time_stdev']
args.eval_time["stdev"] = config['evaluation']['eval_time_stdev']
if 'eval_after_epoch' in config['evaluation']:
args.eval_after_epoch = config['evaluation']['eval_after_epoch']
if 'epochs_between_evals' in config['evaluation']:
Expand Down
30 changes: 29 additions & 1 deletion dlio_benchmark/utils/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import os
from datetime import datetime
import logging
from time import time
from time import time, sleep as base_sleep
from functools import wraps
import threading
import json
Expand Down Expand Up @@ -309,3 +309,31 @@ def get_trace_name(output_folder, use_pid=False):
if use_pid:
val = f"-{os.getpid()}"
return f"{output_folder}/trace-{DLIOMPI.get_instance().rank()}-of-{DLIOMPI.get_instance().size()}{val}.pfw"

def sleep(config):
    """Sleep for a fixed or randomly drawn duration and return it.

    ``config`` may be:
      * a dict carrying a ``"type"`` key naming a numpy distribution, plus
        that distribution's parameters — ``"normal"`` (mean/stdev),
        ``"uniform"`` (min/max), ``"gamma"`` (shape/scale),
        ``"exponential"`` (scale) or ``"poisson"`` (lam); an unrecognized
        type yields a zero duration;
      * a dict without ``"type"``, where ``"mean"`` alone gives a fixed
        duration and ``"mean"`` + ``"stdev"`` draws from a normal;
      * a plain number, used directly.
    Any other value sleeps for 0. The duration is clamped to be
    non-negative via abs(), and the actual sleep call is skipped when it
    is zero. Returns the duration slept (in seconds).
    """
    # Dispatch table: distribution name -> sampler over the config dict.
    samplers = {
        "normal": lambda c: np.random.normal(c["mean"], c["stdev"]),
        "uniform": lambda c: np.random.uniform(c["min"], c["max"]),
        "gamma": lambda c: np.random.gamma(c["shape"], c["scale"]),
        "exponential": lambda c: np.random.exponential(c["scale"]),
        "poisson": lambda c: np.random.poisson(c["lam"]),
    }
    duration = 0.0
    if isinstance(config, (int, float)):
        duration = config
    elif isinstance(config, dict) and len(config) > 0:
        if "type" in config:
            sampler = samplers.get(config["type"])
            if sampler is not None:
                duration = sampler(config)
        elif "mean" in config and "stdev" in config:
            duration = np.random.normal(config["mean"], config["stdev"])
        elif "mean" in config:
            duration = config["mean"]
    duration = abs(duration)
    if duration > 0.0:
        base_sleep(duration)
    return duration

Loading
Loading