diff --git a/env-files/tensorflow/generic_tf.sh b/env-files/tensorflow/generic_tf.sh
index 4a37c9ca..51eede4b 100644
--- a/env-files/tensorflow/generic_tf.sh
+++ b/env-files/tensorflow/generic_tf.sh
@@ -84,7 +84,7 @@ fi
 # Since TF 2.16, keras updated to 3.3,
 # which leads to an error when more than 1 node is used
 # https://keras.io/getting_started/
-pip3 install tf_keras
+pip3 install tf_keras==2.16.*
 
 # itwinai
 pip3 install -e .[dev]
diff --git a/tutorials/distributed-ml/tf-tutorial-0-basics/README.md b/tutorials/distributed-ml/tf-tutorial-0-basics/README.md
index c2c49595..12e4d7f6 100644
--- a/tutorials/distributed-ml/tf-tutorial-0-basics/README.md
+++ b/tutorials/distributed-ml/tf-tutorial-0-basics/README.md
@@ -3,10 +3,38 @@
 In this tutorial we show how to use Tensorflow `MultiWorkerMirroredStrategy`.
 Note that the environment is tested on the HDFML system at JSC. For other systems,
 the module versions might need change accordingly.
-Other strategies will be updated here.
 
-First, from the root of this repository, build the environment containing
-Tensorflow. You can *try* with:
+Importantly, it should be kept in mind that with distributed training in Tensorflow,
+the batch size should be scaled with the number of workers in this way:
+
+```python
+# scale batch size with number of workers (or replicas)
+batch_size = args.batch_size * num_replicas
+```
+
+The other point to consider is how the number of epochs is specified in Tensorflow.
+An example code snippet for training an ML model in Tensorflow is shown below.
+
+```python
+# trains the model
+model.fit(dist_train,
+          epochs=args.epochs,
+          steps_per_epoch=len(x_train)//batch_size)
+```
+
+Here `steps_per_epoch` specifies how many batches are drawn from `train_dataset`
+in each epoch, where `train_dataset`
+is the TF dataset that is used for training the model.
+If `train_dataset` is too small, it may not provide enough data
+for the specified number of `epochs`. This can be mitigated with the `repeat()`
+option, which ensures that there is sufficient data for training.
+
+```python
+train_dataset = train_dataset.batch(batch_size).repeat()
+```
+
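+To see how these pieces fit together, the following is a minimal, self-contained
+sketch (not part of the tutorial script) that combines the scaled batch size,
+`repeat()` and `steps_per_epoch` on a toy MNIST model; hard-coded values stand in
+for the command-line arguments used in `train.py`:
+
+```python
+import tensorflow as tf
+
+# one strategy instance per worker; replicas = workers x GPUs per worker
+strategy = tf.distribute.MultiWorkerMirroredStrategy()
+num_replicas = strategy.num_replicas_in_sync
+
+# global batch size scaled with the number of replicas
+batch_size = 64 * num_replicas
+epochs = 3
+
+(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
+train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
+# batch and repeat so that every epoch can draw enough data
+train_dataset = train_dataset.batch(batch_size).repeat()
+dist_train = strategy.experimental_distribute_dataset(train_dataset)
+
+# model creation and compilation must happen inside the strategy scope
+with strategy.scope():
+    model = tf.keras.models.Sequential([
+        tf.keras.layers.Flatten(input_shape=(28, 28)),
+        tf.keras.layers.Dense(10)
+    ])
+    model.compile(
+        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
+        optimizer=tf.keras.optimizers.Adam(),
+        metrics=['accuracy'])
+
+model.fit(dist_train,
+          epochs=epochs,
+          steps_per_epoch=len(x_train) // batch_size)
+```
+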
+After consideration of these aspects, from the root of this repository,
+build the environment containing Tensorflow. You can *try* with:
 
 ```bash
 # Creates a Python venv called envAItf_hdfml
diff --git a/tutorials/distributed-ml/tf-tutorial-0-basics/train.py b/tutorials/distributed-ml/tf-tutorial-0-basics/train.py
index da84f9a7..3fd8b849 100644
--- a/tutorials/distributed-ml/tf-tutorial-0-basics/train.py
+++ b/tutorials/distributed-ml/tf-tutorial-0-basics/train.py
@@ -23,6 +23,14 @@ def parse_args() -> argparse.Namespace:
         "--batch_size", "-bs", type=int, default=64
     )
+    parser.add_argument(
+        "--data_dir", type=str,
+        default='/p/scratch/intertwin/datasets/.keras/datasets/mnist.npz'
+    )
+    parser.add_argument(
+        "--epochs", type=int,
+        default=3
+    )
     parser.add_argument(
         "--shuffle_dataloader",
         action=argparse.BooleanOptionalAction
@@ -32,29 +40,29 @@ def parse_args() -> argparse.Namespace:
     return args
 
 
-def tf_rnd_dataset(args):
-    """Dummy TF dataset."""
+def trainer_entrypoint_fn(
+    foo: Any,
+    args: argparse.Namespace,
+    strategy,
+    num_replicas
+) -> int:
+    """Training function, similar to custom code developed
+    by some use case.
+    """
+    # dataset to be trained
     (x_train, y_train), (x_test, y_test) = \
         tf.keras.datasets.mnist.load_data(
-            path='p/scratch/intertwin/datasets/.keras/datasets/mnist.npz')
+            path=args.data_dir)
 
     train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
-    train_dataset = train_dataset.batch(args.batch_size)
-
     test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
-    test_dataset = test_dataset.batch(args.batch_size)
-    return train_dataset, test_dataset
 
+    # scale batch size with number of workers
+    batch_size = args.batch_size * num_replicas
 
-
-def trainer_entrypoint_fn(
-    foo: Any, args: argparse.Namespace, strategy
-) -> int:
-    """Dummy training function, similar to custom code developed
-    by some use case.
-    """
-    # dataset to be trained
-    train_dataset, test_dataset = tf_rnd_dataset(args)
+    # batching dataset and repeat
+    train_dataset = train_dataset.batch(batch_size).repeat()
+    test_dataset = test_dataset.batch(batch_size).repeat()
 
     # distribute datasets among mirrored replicas
     dist_train = strategy.experimental_distribute_dataset(
@@ -79,16 +87,18 @@ def trainer_entrypoint_fn(
         metrics=['accuracy']
     )
 
-    model.fit(dist_train,
-              epochs=5,
-              steps_per_epoch=2000)
+    model.fit(dist_train,
+              epochs=args.epochs,
+              steps_per_epoch=len(x_train)//batch_size)
 
-    test_scores = model.evaluate(dist_test, verbose=0, steps=500)
+    test_scores = model.evaluate(dist_test,
+                                 verbose=1,
+                                 steps=len(x_test)//batch_size)
 
-    print('Test loss:', test_scores[0])
-    print('Test accuracy:', test_scores[1])
+    print('Test loss:', test_scores[0])
+    print('Test accuracy:', test_scores[1])
 
-    return 123
+    return 123
 
 
 if __name__ == "__main__":
@@ -105,4 +115,4 @@ def trainer_entrypoint_fn(
             f"Strategy {args.strategy} is not recognized/implemented.")
 
     # Launch distributed training
-    trainer_entrypoint_fn("foobar", args, strategy)
+    trainer_entrypoint_fn("foobar", args, strategy, num_replicas)
diff --git a/tutorials/distributed-ml/tf-tutorial-1-imagenet/README.md b/tutorials/distributed-ml/tf-tutorial-1-imagenet/README.md
index c2c49595..a0afb602 100644
--- a/tutorials/distributed-ml/tf-tutorial-1-imagenet/README.md
+++ b/tutorials/distributed-ml/tf-tutorial-1-imagenet/README.md
@@ -3,10 +3,41 @@
 In this tutorial we show how to use Tensorflow `MultiWorkerMirroredStrategy`.
 Note that the environment is tested on the HDFML system at JSC. For other systems,
 the module versions might need change accordingly.
-Other strategies will be updated here.
+Before starting training, the following aspects should be considered in the trainer script.
 
-First, from the root of this repository, build the environment containing
-Tensorflow. You can *try* with:
+Importantly, it should be kept in mind that with distributed training in Tensorflow,
+the batch size should be scaled with the number of workers in this way:
+
+```python
+# scale batch size with number of workers, returned by get_strategy()[1]
+batch_size = args.batch_size * get_strategy()[1]
+```
+
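+Here `get_strategy()` is assumed to be a small helper that returns the distribution
+strategy together with the number of replicas taking part in training; a possible
+sketch of such a helper (not necessarily identical to the one used in `train.py`) is:
+
+```python
+import tensorflow as tf
+
+def get_strategy():
+    """Return the distribution strategy and the number of replicas in sync."""
+    strategy = tf.distribute.MultiWorkerMirroredStrategy()
+    return strategy, strategy.num_replicas_in_sync
+```
+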
+The other point to consider is how the number of epochs is specified in Tensorflow.
+An example code snippet for training an ML model in Tensorflow is shown below.
+
+```python
+# trains the model
+model.fit(dist_train,
+          epochs=args.epochs,
+          steps_per_epoch=train_size//batch_size,
+          verbose=10)
+```
+
+Here `steps_per_epoch` specifies how many batches are drawn from `train_dataset`
+in each epoch, where `train_dataset`
+is the TF dataset that is used for training the model.
+If `train_dataset` is too small, it may not provide enough data
+for the specified number of `epochs`. This can be mitigated with the `repeat()`
+option, which ensures that there is sufficient data for training.
+
+```python
+train_dataset = train_dataset.batch(
+    batch_size).prefetch(tf.data.experimental.AUTOTUNE).repeat()
+```
+
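+Note that once `repeat()` is applied the dataset is effectively infinite, so an
+explicit number of steps is also needed at evaluation time. A short sketch, assuming
+`dist_test` is the distributed test dataset and `test_size` is the dataset size
+reported by the data loader:
+
+```python
+test_scores = model.evaluate(dist_test,
+                             steps=test_size//batch_size,
+                             verbose=1)
+print('Test loss:', test_scores[0])
+print('Test accuracy:', test_scores[1])
+```
+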
+After consideration of these aspects, from the root of this repository,
+build the environment containing Tensorflow. You can *try* with:
 
 ```bash
 # Creates a Python venv called envAItf_hdfml
diff --git a/tutorials/distributed-ml/tf-tutorial-1-imagenet/train.py b/tutorials/distributed-ml/tf-tutorial-1-imagenet/train.py
index d820fd9b..f11cad74 100644
--- a/tutorials/distributed-ml/tf-tutorial-1-imagenet/train.py
+++ b/tutorials/distributed-ml/tf-tutorial-1-imagenet/train.py
@@ -83,7 +83,7 @@ def tf_records_loader(files_path, shuffle=False):
     datasets = datasets.flat_map(tf.data.TFRecordDataset)
     datasets = datasets.map(
         deserialization_fn, num_parallel_calls=tf.data.AUTOTUNE)
-    return datasets
+    return datasets, len(tf.data.Dataset.from_tensor_slices(files_path))
 
 
 def main():
@@ -117,51 +117,57 @@ def main():
         metrics=['accuracy']
     )
 
-    # scale batch size with number of workers
-    batch_size = args.batch_size * get_strategy()[1]
+    # scale batch size with number of workers
+    batch_size = args.batch_size * get_strategy()[1]
 
-    dir_imagenet = args.data_dir+'imagenet-1K-tfrecords'
-    train_shard_suffix = 'train-*-of-01024'
-    test_shard_suffix = 'validation-*-of-00128'
+    dir_imagenet = args.data_dir+'imagenet-1K-tfrecords'
+    train_shard_suffix = 'train-*-of-01024'
+    test_shard_suffix = 'validation-*-of-00128'
 
-    train_set_path = sorted(
-        tf.io.gfile.glob(dir_imagenet + f'/{train_shard_suffix}')
-    )
-    test_set_path = sorted(
-        tf.io.gfile.glob(dir_imagenet + f'/{test_shard_suffix}')
-    )
+    train_set_path = sorted(
+        tf.io.gfile.glob(dir_imagenet + f'/{train_shard_suffix}')
+    )
+    test_set_path = sorted(
+        tf.io.gfile.glob(dir_imagenet + f'/{test_shard_suffix}')
+    )
 
-    train_dataset = tf_records_loader(train_set_path, shuffle=True)
-    test_dataset = tf_records_loader(test_set_path)
+    train_dataset, train_size = tf_records_loader(
+        train_set_path, shuffle=True)
+    test_dataset, test_size = tf_records_loader(test_set_path)
 
-    train_dataset = train_dataset.batch(
-        batch_size).prefetch(tf.data.experimental.AUTOTUNE)
-    test_dataset = test_dataset.batch(
-        batch_size).prefetch(tf.data.experimental.AUTOTUNE)
+    train_dataset = train_dataset.batch(
+        batch_size).prefetch(tf.data.experimental.AUTOTUNE).repeat()
+    test_dataset = test_dataset.batch(
+        batch_size).prefetch(tf.data.experimental.AUTOTUNE).repeat()
 
-    # distribute datasets among mirrored replicas
-    dist_train = strategy.experimental_distribute_dataset(
-        train_dataset
-    )
-    dist_test = strategy.experimental_distribute_dataset(
-        test_dataset
-    )
+    # distribute datasets among mirrored replicas
+    dist_train = strategy.experimental_distribute_dataset(
+        train_dataset
+    )
+    dist_test = strategy.experimental_distribute_dataset(
+        test_dataset
+    )
 
-    # TODO: add callbacks to evaluate per epoch time
-    et = timer()
+    # TODO: add callbacks to evaluate per epoch time
+    et = timer()
 
-    # trains the model
-    model.fit(dist_train, epochs=args.epochs, steps_per_epoch=500, verbose=10)
+    # trains the model
+    model.fit(dist_train,
+              epochs=args.epochs,
+              steps_per_epoch=train_size//batch_size,
+              verbose=10)
 
-    print('TIMER: total epoch time:',
-          timer() - et, ' s')
-    print('TIMER: average epoch time:',
-          (timer() - et) / (args.epochs), ' s')
+    print('TIMER: total epoch time:',
+          timer() - et, ' s')
+    print('TIMER: average epoch time:',
+          (timer() - et) / (args.epochs), ' s')
 
-    test_scores = model.evaluate(dist_test, steps=100, verbose=5)
+    test_scores = model.evaluate(dist_test,
+                                 steps=test_size//batch_size,
+                                 verbose=5)
 
-    print('Test loss:', test_scores[0])
-    print('Test accuracy:', test_scores[1])
+    print('Test loss:', test_scores[0])
+    print('Test accuracy:', test_scores[1])
 
 
 if __name__ == "__main__":