Tf dataloader fixes #194

Open · wants to merge 9 commits into `main`
2 changes: 1 addition & 1 deletion env-files/tensorflow/generic_tf.sh
Original file line number Diff line number Diff line change
@@ -84,7 +84,7 @@ fi
# Since TF 2.16, keras updated to 3.3,
# which leads to an error when more than 1 node is used
# https://keras.io/getting_started/
pip3 install tf_keras
pip3 install tf_keras==2.16.*

# itwinai
pip3 install -e .[dev]
34 changes: 31 additions & 3 deletions tutorials/distributed-ml/tf-tutorial-0-basics/README.md
@@ -3,10 +3,38 @@
In this tutorial we show how to use Tensorflow `MultiWorkerMirroredStrategy`.
Note that the environment is tested on the HDFML system at JSC.
For other systems, the module versions might need to be changed accordingly.
Other strategies will be updated here.

First, from the root of this repository, build the environment containing
Tensorflow. You can *try* with:
Importantly, it should be kept in mind that with distributed training in Tensorflow,
the batch size should be scaled with the number of workers (replicas) in this way:

```python
# scale batch size with number of workers (or replicas)
batch_size = args.batch_size * num_replicas
```
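
Here `num_replicas` is the number of model replicas kept in sync by the strategy. A
minimal sketch of how it can be obtained with plain TensorFlow is shown below; this is
an illustration, not the exact tutorial code:

```python
import tensorflow as tf

# The cluster layout is read from the TF_CONFIG environment variable.
strategy = tf.distribute.MultiWorkerMirroredStrategy()

# Total number of replicas across all workers.
num_replicas = strategy.num_replicas_in_sync

# Model creation and compilation must happen inside the strategy scope.
with strategy.scope():
    model = tf.keras.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(10)
    ])
    model.compile(
        optimizer='adam',
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy']
    )
```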

The other point to consider is the specification of epochs in Tensorflow.
An example code snippet for training an ML model in Tensorflow is shown below.

```python
# trains the model
model.fit(dist_train,
          epochs=args.epochs,
          steps_per_epoch=len(x_train)//batch_size)
```

Here `steps_per_epoch` specifies how many batches of `train_dataset` are consumed
in each of the specified `epochs`, where `train_dataset`
is the TF dataset used for training the model.
If `train_dataset` is too small, it may not provide enough batches
for the specified number of `epochs`. This can be mitigated with the `repeat()`
option, which repeats the dataset so that there is always sufficient data for training
(this is also why `steps_per_epoch` must be set explicitly).

```python
train_dataset = train_dataset.batch(batch_size).repeat()
```
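
The `dist_train` dataset passed to `model.fit` above is the distributed view of the
batched dataset; in the tutorial script it is obtained with
`experimental_distribute_dataset`, roughly as follows:

```python
# distribute the batched and repeated dataset among the mirrored replicas
dist_train = strategy.experimental_distribute_dataset(train_dataset)
```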

After consideration of these aspects, from the root of this repository,
build the environment containing Tensorflow. You can *try* with:

```bash
# Creates a Python venv called envAItf_hdfml
58 changes: 34 additions & 24 deletions tutorials/distributed-ml/tf-tutorial-0-basics/train.py
@@ -23,6 +23,14 @@ def parse_args() -> argparse.Namespace:
"--batch_size", "-bs", type=int,
default=64
)
parser.add_argument(
"--data_dir", type=str,
default='/p/scratch/intertwin/datasets/.keras/datasets/mnist.npz'
)
parser.add_argument(
"--epochs", type=int,
default=3
)
parser.add_argument(
"--shuffle_dataloader",
action=argparse.BooleanOptionalAction
@@ -32,29 +40,29 @@ def parse_args() -> argparse.Namespace:
    return args


def tf_rnd_dataset(args):
    """Dummy TF dataset."""
def trainer_entrypoint_fn(
        foo: Any,
        args: argparse.Namespace,
        strategy,
        num_replicas
) -> int:
    """Training function, similar to custom code developed
    by some use case.
    """
    # dataset to be trained
    (x_train, y_train), (x_test, y_test) = \
        tf.keras.datasets.mnist.load_data(
            path='p/scratch/intertwin/datasets/.keras/datasets/mnist.npz')
            path=args.data_dir)

    train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
    train_dataset = train_dataset.batch(args.batch_size)

    test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
    test_dataset = test_dataset.batch(args.batch_size)

    return train_dataset, test_dataset
    # scale batch size with number of workers
    batch_size = args.batch_size * num_replicas


def trainer_entrypoint_fn(
        foo: Any, args: argparse.Namespace, strategy
) -> int:
    """Dummy training function, similar to custom code developed
    by some use case.
    """
    # dataset to be trained
    train_dataset, test_dataset = tf_rnd_dataset(args)
    # batching dataset and repeat
    train_dataset = train_dataset.batch(batch_size).repeat()
    test_dataset = test_dataset.batch(batch_size).repeat()

    # distribute datasets among mirrored replicas
    dist_train = strategy.experimental_distribute_dataset(
@@ -79,16 +87,18 @@ def trainer_entrypoint_fn(
        metrics=['accuracy']
    )

    model.fit(dist_train,
              epochs=5,
              steps_per_epoch=2000)
    model.fit(dist_train,
              epochs=args.epochs,
              steps_per_epoch=len(x_train)//batch_size)

    test_scores = model.evaluate(dist_test, verbose=0, steps=500)
    test_scores = model.evaluate(dist_test,
                                 verbose=1,
                                 steps=len(x_test)//batch_size)

    print('Test loss:', test_scores[0])
    print('Test accuracy:', test_scores[1])
    print('Test loss:', test_scores[0])
    print('Test accuracy:', test_scores[1])

    return 123
    return 123


if __name__ == "__main__":
@@ -105,4 +115,4 @@ def trainer_entrypoint_fn(
            f"Strategy {args.strategy} is not recognized/implemented.")

    # Launch distributed training
    trainer_entrypoint_fn("foobar", args, strategy)
    trainer_entrypoint_fn("foobar", args, strategy, num_replicas)
37 changes: 34 additions & 3 deletions tutorials/distributed-ml/tf-tutorial-1-imagenet/README.md
@@ -3,10 +3,41 @@
In this tutorial we show how to use Tensorflow `MultiWorkerMirroredStrategy`.
Note that the environment is tested on the HDFML system at JSC.
For other systems, the module versions might need to be changed accordingly.
Other strategies will be updated here.
Before starting training, the following aspects should be considered in the trainer script.

First, from the root of this repository, build the environment containing
Tensorflow. You can *try* with:
Importantly, it should be kept in mind that with distributed training in Tensorflow,
the batch size should be scaled with the number of workers in this way:

```python
# scale batch size with number of workers, returned by get_strategy()[1]
batch_size = args.batch_size * get_strategy()[1]
```
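
Here `get_strategy()` is a helper used by the tutorial script. A rough sketch of what
such a helper could look like, assuming it returns the strategy together with its
replica count (the actual implementation in the repository may differ):

```python
import tensorflow as tf

def get_strategy():
    """Return the distribution strategy and its number of replicas."""
    strategy = tf.distribute.MultiWorkerMirroredStrategy()
    return strategy, strategy.num_replicas_in_sync
```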

The other point to consider is the specification of epochs in Tensorflow.
An example code snippet for training an ML model in Tensorflow is shown below.

```python
# trains the model
model.fit(dist_train,
          epochs=args.epochs,
          steps_per_epoch=train_size//batch_size,
          verbose=10)
```

Here `steps_per_epoch` specifies how many batches of `train_dataset` are consumed
in each of the specified `epochs`, where `train_dataset`
is the TF dataset used for training the model.
If `train_dataset` is too small, it may not provide enough batches
for the specified number of `epochs`. This can be mitigated with the `repeat()`
option, which repeats the dataset so that there is always sufficient data for training
(this is also why `steps_per_epoch` must be set explicitly).

```python
train_dataset = train_dataset.batch(
    batch_size).prefetch(tf.data.experimental.AUTOTUNE).repeat()
```
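
As in the basics tutorial, the `dist_train` dataset passed to `model.fit` above is the
distributed version of `train_dataset` (and likewise for the test set), as done in the
tutorial script:

```python
# distribute the batched/prefetched/repeated datasets among the replicas
dist_train = strategy.experimental_distribute_dataset(train_dataset)
dist_test = strategy.experimental_distribute_dataset(test_dataset)
```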

After consideration of these aspects, from the root of this repository,
build the environment containing Tensorflow. You can *try* with:

```bash
# Creates a Python venv called envAItf_hdfml
78 changes: 42 additions & 36 deletions tutorials/distributed-ml/tf-tutorial-1-imagenet/train.py
@@ -83,7 +83,7 @@ def tf_records_loader(files_path, shuffle=False):
    datasets = datasets.flat_map(tf.data.TFRecordDataset)
    datasets = datasets.map(
        deserialization_fn, num_parallel_calls=tf.data.AUTOTUNE)
    return datasets
    return datasets, len(tf.data.Dataset.from_tensor_slices(files_path))


def main():
@@ -117,51 +117,57 @@ def main():
        metrics=['accuracy']
    )

    # scale batch size with number of workers
    batch_size = args.batch_size * get_strategy()[1]
    # scale batch size with number of workers
    batch_size = args.batch_size * get_strategy()[1]

    dir_imagenet = args.data_dir+'imagenet-1K-tfrecords'
    train_shard_suffix = 'train-*-of-01024'
    test_shard_suffix = 'validation-*-of-00128'
    dir_imagenet = args.data_dir+'imagenet-1K-tfrecords'
    train_shard_suffix = 'train-*-of-01024'
    test_shard_suffix = 'validation-*-of-00128'

    train_set_path = sorted(
        tf.io.gfile.glob(dir_imagenet + f'/{train_shard_suffix}')
    )
    test_set_path = sorted(
        tf.io.gfile.glob(dir_imagenet + f'/{test_shard_suffix}')
    )
    train_set_path = sorted(
        tf.io.gfile.glob(dir_imagenet + f'/{train_shard_suffix}')
    )
    test_set_path = sorted(
        tf.io.gfile.glob(dir_imagenet + f'/{test_shard_suffix}')
    )

    train_dataset = tf_records_loader(train_set_path, shuffle=True)
    test_dataset = tf_records_loader(test_set_path)
    train_dataset, train_size = tf_records_loader(
        train_set_path, shuffle=True)
    test_dataset, test_size = tf_records_loader(test_set_path)

    train_dataset = train_dataset.batch(
        batch_size).prefetch(tf.data.experimental.AUTOTUNE)
    test_dataset = test_dataset.batch(
        batch_size).prefetch(tf.data.experimental.AUTOTUNE)
    train_dataset = train_dataset.batch(
        batch_size).prefetch(tf.data.experimental.AUTOTUNE).repeat()
    test_dataset = test_dataset.batch(
        batch_size).prefetch(tf.data.experimental.AUTOTUNE).repeat()

    # distribute datasets among mirrored replicas
    dist_train = strategy.experimental_distribute_dataset(
        train_dataset
    )
    dist_test = strategy.experimental_distribute_dataset(
        test_dataset
    )
    # distribute datasets among mirrored replicas
    dist_train = strategy.experimental_distribute_dataset(
        train_dataset
    )
    dist_test = strategy.experimental_distribute_dataset(
        test_dataset
    )

    # TODO: add callbacks to evaluate per epoch time
    et = timer()
    # TODO: add callbacks to evaluate per epoch time
    et = timer()

    # trains the model
    model.fit(dist_train, epochs=args.epochs, steps_per_epoch=500, verbose=10)
    # trains the model
    model.fit(dist_train,
              epochs=args.epochs,
              steps_per_epoch=train_size//batch_size,
              verbose=10)

    print('TIMER: total epoch time:',
          timer() - et, ' s')
    print('TIMER: average epoch time:',
          (timer() - et) / (args.epochs), ' s')
    print('TIMER: total epoch time:',
          timer() - et, ' s')
    print('TIMER: average epoch time:',
          (timer() - et) / (args.epochs), ' s')

    test_scores = model.evaluate(dist_test, steps=100, verbose=5)
    test_scores = model.evaluate(dist_test,
                                 steps=test_size//batch_size,
                                 verbose=5)

    print('Test loss:', test_scores[0])
    print('Test accuracy:', test_scores[1])
    print('Test loss:', test_scores[0])
    print('Test accuracy:', test_scores[1])


if __name__ == "__main__":