From 8b111c94736247475a1f3c9909f775207da16f24 Mon Sep 17 00:00:00 2001
From: edknv
Date: Fri, 3 Feb 2023 16:59:46 -0800
Subject: [PATCH 01/20] rough draft

---
 merlin/models/tf/__init__.py               |  3 +
 merlin/models/tf/distributed/backend.py    |  8 +++
 merlin/models/tf/distributed/embedding.py  | 81 +++++++++++++++++++++++
 requirements/horovod.txt                   |  1 +
 tests/unit/tf/horovod/test_embedding.py    | 46 +++++++++++++
 5 files changed, 139 insertions(+)
 create mode 100644 merlin/models/tf/distributed/embedding.py
 create mode 100644 tests/unit/tf/horovod/test_embedding.py

diff --git a/merlin/models/tf/__init__.py b/merlin/models/tf/__init__.py
index a3ec112f36..ebc4b303c0 100644
--- a/merlin/models/tf/__init__.py
+++ b/merlin/models/tf/__init__.py
@@ -128,6 +128,8 @@
 from merlin.models.tf.prediction_tasks.regression import RegressionTask
 from merlin.models.tf.prediction_tasks.retrieval import ItemRetrievalTask
 from merlin.models.utils.dependencies import is_transformers_available
+from merlin.models.tf.distributed.embedding import DistributedEmbeddingTable
+
 
 if is_transformers_available():
     from merlin.models.tf.transformers.block import (
@@ -211,6 +213,7 @@
     "SequenceEmbeddingFeatures",
     "EmbeddingOptions",
     "EmbeddingTable",
+    "DistributedEmbeddingTable",
     "AverageEmbeddingsByWeightFeature",
     "Embeddings",
     "FeatureConfig",
diff --git a/merlin/models/tf/distributed/backend.py b/merlin/models/tf/distributed/backend.py
index b696713d1e..9f139e221b 100644
--- a/merlin/models/tf/distributed/backend.py
+++ b/merlin/models/tf/distributed/backend.py
@@ -1,6 +1,9 @@
 hvd = None
 hvd_installed = False
 
+dmp = None
+dmp_installed = False
+
 try:
     import horovod.tensorflow.keras as hvd  # noqa: F401
 
@@ -8,6 +11,11 @@
 except ImportError:
     pass
 
+try:
+    from distributed_embeddings.python.layers import dist_model_parallel as dmp
+    dmp_installed = True
+except ImportError:
+    pass
 
 if hvd_installed:
     hvd.init()
diff --git a/merlin/models/tf/distributed/embedding.py b/merlin/models/tf/distributed/embedding.py
new file mode 100644
index 0000000000..ecc57ae765
--- /dev/null
+++ b/merlin/models/tf/distributed/embedding.py
@@ -0,0 +1,81 @@
+from typing import List, Union
+
+import tensorflow as tf
+
+from merlin.models.tf.inputs.embedding import EmbeddingTableBase
+from merlin.models.tf.distributed.backend import dmp
+from merlin.schema import ColumnSchema
+from merlin.models.tf.typing import TabularData
+
+
+@tf.keras.utils.register_keras_serializable(package="merlin.models")
+class DistributedEmbeddingTable(EmbeddingTableBase):
+    """Large embedding table that automatically distributes embedding tables to multiple GPUs."""
+
+    def __init__(
+        self,
+        dim: int,
+        *col_schemas: ColumnSchema,
+        embeddings_initializer="uniform",
+        # sequence_combiner: Optional[CombinerType] = None,
+        trainable: bool = True,
+        name=None,
+        dtype=None,
+        dynamic=False,
+        **kwargs,
+    ):
+        super(DistributedEmbeddingTable, self).__init__(
+            dim,
+            *col_schemas,
+            trainable=trainable,
+            name=name,
+            dtype=dtype,
+            dynamic=dynamic,
+            **kwargs,
+        )
+
+        self.embedding_layers = []
+        self.embeddings_initializer = embeddings_initializer
+
+        self._create_embeddings()
+
+    def _create_embeddings(self):
+        for table_size in self.table_sizes:
+            # if model_flags.test_combiner:
+            #     self.embedding_layers.append(
+            #         embedding.Embedding(input_dim=table_size,
+            #                             output_dim=self.embedding_dim,
+            #                             embeddings_initializer=DLRMInitializer(),
+            #                             combiner='sum'))
+            # else:
+            self.embedding_layers.append(
+                tf.keras.layers.Embedding(
+                    input_dim=self.col_schema.int_domain.max,
+                    output_dim=self.dim,
+                    embeddings_initializer=self.embeddings_initializer,
+                )
+            )
+        self.embedding_layers = dmp.DistributedEmbedding(self.embedding_layers)
+
+    def call(self, inputs: Union[tf.Tensor, TabularData]) -> Union[tf.Tensor, TabularData]:
+        """
+        Parameters
+        ----------
+        inputs : Union[tf.Tensor, tf.RaggedTensor, tf.SparseTensor]
+            Tensors or dictionary of tensors representing the input batch.
+        Returns
+        -------
+        A tensor or dict of tensors corresponding to the embeddings for inputs
+        """
+        if isinstance(inputs, dict):
+            outputs = {}
+            for feature_name in self.schema.column_names:
+                if feature_name in inputs:
+                    embedding_outputs = self.embedding_layers(inputs[feature_name])
+                    #outputs[feature_name] = tf.concat(embedding_outputs, 1)
+        else:
+            embedding_outputs = self.embedding_layers(inputs)
+            #outputs = tf.concat(embedding_outputs, 1)
+
+        #return outputs
+        return embedding_outputs
diff --git a/requirements/horovod.txt b/requirements/horovod.txt
index 8229a149aa..109ef215fe 100644
--- a/requirements/horovod.txt
+++ b/requirements/horovod.txt
@@ -1 +1,2 @@
 horovod
+git+https://github.com/NVIDIA-Merlin/distributed-embeddings
diff --git a/tests/unit/tf/horovod/test_embedding.py b/tests/unit/tf/horovod/test_embedding.py
new file mode 100644
index 0000000000..35f8103dd4
--- /dev/null
+++ b/tests/unit/tf/horovod/test_embedding.py
@@ -0,0 +1,46 @@
+import numpy as np
+import tensorflow as tf
+
+import merlin.models.tf as mm
+from merlin.models.tf.distributed.backend import hvd, hvd_installed, dmp_installed
+from merlin.schema import ColumnSchema, Tags
+
+
+def generate_inputs(table_sizes, domain_max, global_batch_size):
+    global_inputs = [
+        tf.random.uniform(shape=[global_batch_size], minval=0, maxval=domain_max, dtype=tf.int64)
+        for size in table_sizes
+    ]
+    for t in global_inputs:
+        hvd.broadcast(t, root_rank=0)
+    local_batch_size = global_batch_size // hvd.size()
+    size = hvd.size()
+    rank = hvd.rank()
+    inputs = [
+        t[rank * local_batch_size : (rank + 1) * local_batch_size] for t in global_inputs
+    ]
+
+    return inputs
+
+
+def test_distributed_embedding_basic():
+    assert hvd_installed is True
+    assert dmp_installed is True
+
+    dim = 2
+    domain_max = 10
+    column_schema = ColumnSchema(
+        "item_id",
+        dtype=np.int32,
+        properties={"domain": {"min": 0, "max": domain_max, "name": "item_id"}},
+        tags=[Tags.CATEGORICAL],
+    )
+    table_sizes = [3, 4]
+    global_batch_size = 8
+
+    inputs = generate_inputs(table_sizes, domain_max, global_batch_size)
+    table = mm.DistributedEmbeddingTable(dim, table_sizes, column_schema)
+    outputs = table(inputs)
+
+    assert outputs[0].shape == (4, 2)
+    assert outputs[1].shape == (4, 2)

From 39cc9816ccda424ca06604708061631b8063e773 Mon Sep 17 00:00:00 2001
From: edknv
Date: Tue, 7 Feb 2023 10:55:42 -0800
Subject: [PATCH 02/20] Introduce distributed embeddings

---
 merlin/models/tf/__init__.py               |   5 +-
 merlin/models/tf/distributed/backend.py    |   3 +-
 merlin/models/tf/distributed/embedding.py  | 133 +++++++++++++---------
 requirements/horovod.txt                   |   2 +-
 tests/unit/tf/horovod/test_embedding.py    |  81 +++++++++----
 5 files changed, 142 insertions(+), 82 deletions(-)

diff --git a/merlin/models/tf/__init__.py b/merlin/models/tf/__init__.py
index ebc4b303c0..eafbfb1a94 100644
--- a/merlin/models/tf/__init__.py
+++ b/merlin/models/tf/__init__.py
@@ -76,6 +76,7 @@
 )
 from merlin.models.tf.core.encoder import EmbeddingEncoder, Encoder, TopKEncoder
 from merlin.models.tf.core.prediction import Prediction
+from merlin.models.tf.distributed.embedding import DistributedEmbeddings
 from merlin.models.tf.inputs.base import InputBlock, InputBlockV2
 from merlin.models.tf.inputs.continuous import Continuous, ContinuousFeatures, ContinuousProjection
 from merlin.models.tf.inputs.embedding import (
@@ -128,8 +129,6 @@
 from merlin.models.tf.prediction_tasks.regression import RegressionTask
 from merlin.models.tf.prediction_tasks.retrieval import ItemRetrievalTask
 from merlin.models.utils.dependencies import is_transformers_available
-from merlin.models.tf.distributed.embedding import DistributedEmbeddingTable
-
 
 if is_transformers_available():
     from merlin.models.tf.transformers.block import (
@@ -213,7 +212,7 @@
     "SequenceEmbeddingFeatures",
     "EmbeddingOptions",
     "EmbeddingTable",
-    "DistributedEmbeddingTable",
+    "DistributedEmbeddings",
     "AverageEmbeddingsByWeightFeature",
     "Embeddings",
     "FeatureConfig",
diff --git a/merlin/models/tf/distributed/backend.py b/merlin/models/tf/distributed/backend.py
index 9f139e221b..d9500da23f 100644
--- a/merlin/models/tf/distributed/backend.py
+++ b/merlin/models/tf/distributed/backend.py
@@ -12,7 +12,8 @@
     pass
 
 try:
-    from distributed_embeddings.python.layers import dist_model_parallel as dmp
+    from distributed_embeddings.python.layers import dist_model_parallel as dmp  # noqa: F401
+
     dmp_installed = True
 except ImportError:
     pass
diff --git a/merlin/models/tf/distributed/embedding.py b/merlin/models/tf/distributed/embedding.py
index ecc57ae765..4b2528df00 100644
--- a/merlin/models/tf/distributed/embedding.py
+++ b/merlin/models/tf/distributed/embedding.py
@@ -1,81 +1,110 @@
-from typing import List, Union
+from typing import Dict, List, Optional, Union
 
 import tensorflow as tf
 
-from merlin.models.tf.inputs.embedding import EmbeddingTableBase
-from merlin.models.tf.distributed.backend import dmp
-from merlin.schema import ColumnSchema
-from merlin.models.tf.typing import TabularData
+from merlin.models.tf.core.tabular import TabularBlock
+from merlin.models.tf.distributed.backend import dmp, dmp_installed, hvd_installed
+from merlin.models.utils.schema_utils import infer_embedding_dim
+from merlin.schema import Schema
 
 
 @tf.keras.utils.register_keras_serializable(package="merlin.models")
-class DistributedEmbeddingTable(EmbeddingTableBase):
-    """Large embedding table that automatically distributes embedding tables to multiple GPUs."""
+class DistributedEmbeddings(TabularBlock):
+    """Large embedding table that automatically distributes embedding tables
+    to multiple GPUs.
+
+    Parameters
+    ----------
+    schema: Schema
+        Schema containing the columns used in embedding tables.
+    dim: Optional[Union[Dict[str, int], int]], optional
+        If int, the embedding size to use for all features, or a
+        dictionary-like {"feature_name": embedding size, ...}.
+        By default, None.
+    strategy:
+    column_slice_threshold:
+    dp_input:
+    input_table_map:
+    """
 
     def __init__(
         self,
-        dim: int,
-        *col_schemas: ColumnSchema,
-        embeddings_initializer="uniform",
-        # sequence_combiner: Optional[CombinerType] = None,
-        trainable: bool = True,
-        name=None,
-        dtype=None,
-        dynamic=False,
+        schema: Schema,
+        dim: Optional[Union[Dict[str, int], int]] = None,
+        strategy: str = "basic",
+        column_slice_threshold: Optional[int] = None,
+        dp_input=True,
+        input_table_map=None,
         **kwargs,
     ):
-        super(DistributedEmbeddingTable, self).__init__(
-            dim,
-            *col_schemas,
-            trainable=trainable,
-            name=name,
-            dtype=dtype,
-            dynamic=dynamic,
-            **kwargs,
-        )
+        if not hvd_installed or not dmp_installed:
+            raise ImportError(
+                "'horovod' and 'distributed-embeddings' are required to use "
+                f"{self.__class__.__name__}."
+            )
+
+        super(DistributedEmbeddings, self).__init__(schema=schema, **kwargs)
+        self.dim = dim
 
+        self.table_names = []
         self.embedding_layers = []
-        self.embeddings_initializer = embeddings_initializer
-
-        self._create_embeddings()
-
-    def _create_embeddings(self):
-        for table_size in self.table_sizes:
-            # if model_flags.test_combiner:
-            #     self.embedding_layers.append(
-            #         embedding.Embedding(input_dim=table_size,
-            #                             output_dim=self.embedding_dim,
-            #                             embeddings_initializer=DLRMInitializer(),
-            #                             combiner='sum'))
-            # else:
+        for col in self.schema:
+            table_name = col.int_domain.name or col.name
+            self.table_names.append(table_name)
             self.embedding_layers.append(
                 tf.keras.layers.Embedding(
-                    input_dim=self.col_schema.int_domain.max,
-                    output_dim=self.dim,
-                    embeddings_initializer=self.embeddings_initializer,
+                    input_dim=self._infer_input_dim(col),
+                    output_dim=self._infer_output_dim(col, dim),
+                    name=table_name,
                 )
             )
-        self.embedding_layers = dmp.DistributedEmbedding(self.embedding_layers)
+
+        self.embedding_layers = dmp.DistributedEmbedding(
+            self.embedding_layers,
+            strategy=strategy,
+            column_slice_threshold=column_slice_threshold,
+            dp_input=dp_input,
+            input_table_map=input_table_map,
+        )
+
+    def _infer_input_dim(self, col_schema):
+        return col_schema.int_domain.max + 1
+
+    def _infer_output_dim(self, col_schema, embedding_dims):
+        if isinstance(embedding_dims, dict):
+            dim = embedding_dims.get(col_schema.name)
+        elif isinstance(embedding_dims, int):
+            dim = embedding_dims
+        else:
+            dim = None
+
+        if dim is None:
+            dim = infer_embedding_dim(col_schema)
+
+        return dim
 
-    def call(self, inputs: Union[tf.Tensor, TabularData]) -> Union[tf.Tensor, TabularData]:
+    def call(
+        self, inputs: Union[Dict[str, tf.Tensor], List[tf.Tensor]]
+    ) -> Union[Dict[str, tf.Tensor], List[tf.Tensor]]:
         """
         Parameters
         ----------
-        inputs : Union[tf.Tensor, tf.RaggedTensor, tf.SparseTensor]
+        inputs : Union[Dict[str, tf.Tensor], List[tf.Tensor]]
             Tensors or dictionary of tensors representing the input batch.
+
         Returns
         -------
         A tensor or dict of tensors corresponding to the embeddings for inputs
         """
         if isinstance(inputs, dict):
+            ordered_inputs = []
+            for feature_name in self.table_names:
+                ordered_inputs.append(inputs[feature_name])
+
+            ordered_outputs = self.embedding_layers(ordered_inputs)
             outputs = {}
-            for feature_name in self.schema.column_names:
-                if feature_name in inputs:
-                    embedding_outputs = self.embedding_layers(inputs[feature_name])
-                    #outputs[feature_name] = tf.concat(embedding_outputs, 1)
+            for feature_name, output in zip(self.schema.column_names, ordered_outputs):
+                outputs[feature_name] = output
         else:
-            embedding_outputs = self.embedding_layers(inputs)
-            #outputs = tf.concat(embedding_outputs, 1)
-
-        #return outputs
-        return embedding_outputs
+            outputs = self.embedding_layers(inputs)
+        return outputs
diff --git a/requirements/horovod.txt b/requirements/horovod.txt
index 109ef215fe..27e00a6834 100644
--- a/requirements/horovod.txt
+++ b/requirements/horovod.txt
@@ -1,2 +1,2 @@
 horovod
-git+https://github.com/NVIDIA-Merlin/distributed-embeddings
+distributed-embeddings@git+https://github.com/NVIDIA-Merlin/distributed-embeddings
diff --git a/tests/unit/tf/horovod/test_embedding.py b/tests/unit/tf/horovod/test_embedding.py
index 35f8103dd4..d9c0de59c5 100644
--- a/tests/unit/tf/horovod/test_embedding.py
+++ b/tests/unit/tf/horovod/test_embedding.py
@@ -1,46 +1,77 @@
 import numpy as np
+import pytest
 import tensorflow as tf
 
 import merlin.models.tf as mm
-from merlin.models.tf.distributed.backend import hvd, hvd_installed, dmp_installed
-from merlin.schema import ColumnSchema, Tags
+from merlin.io.dataset import Dataset
+from merlin.models.tf.utils import testing_utils
+from merlin.schema import ColumnSchema, Schema, Tags
 
+hvd = pytest.importorskip("horovod.tensorflow.keras")
+dmp = pytest.importorskip("distributed_embeddings.python.layers.dist_model_parallel")
 
-def generate_inputs(table_sizes, domain_max, global_batch_size):
+
+def generate_inputs(input_dims, global_batch_size):
     global_inputs = [
-        tf.random.uniform(shape=[global_batch_size], minval=0, maxval=domain_max, dtype=tf.int64)
-        for size in table_sizes
+        tf.random.uniform(shape=[global_batch_size], minval=0, maxval=dim, dtype=tf.int64)
+        for dim in input_dims
     ]
     for t in global_inputs:
         hvd.broadcast(t, root_rank=0)
     local_batch_size = global_batch_size // hvd.size()
-    size = hvd.size()
     rank = hvd.rank()
-    inputs = [
-        t[rank * local_batch_size : (rank + 1) * local_batch_size] for t in global_inputs
-    ]
-
+    inputs = [t[rank * local_batch_size : (rank + 1) * local_batch_size] for t in global_inputs]
     return inputs
 
 
-def test_distributed_embedding_basic():
-    assert hvd_installed is True
-    assert dmp_installed is True
-
-    dim = 2
-    domain_max = 10
-    column_schema = ColumnSchema(
-        "item_id",
+def test_distributed_embeddings_basic(embedding_dim=4, global_batch_size=8):
+    column_schema_0 = ColumnSchema(
+        "col0",
         dtype=np.int32,
-        properties={"domain": {"min": 0, "max": domain_max, "name": "item_id"}},
+        properties={"domain": {"min": 0, "max": 10, "name": "col0"}},
         tags=[Tags.CATEGORICAL],
     )
-    table_sizes = [3, 4]
-    global_batch_size = 8
+    column_schema_1 = ColumnSchema(
+        "col1",
+        dtype=np.int32,
+        properties={"domain": {"min": 0, "max": 20, "name": "col1"}},
+        tags=[Tags.CATEGORICAL],
+    )
+    schema = Schema([column_schema_0, column_schema_1])
 
-    inputs = generate_inputs(table_sizes, domain_max, global_batch_size)
-    table = mm.DistributedEmbeddingTable(dim, table_sizes, column_schema)
+    inputs = generate_inputs([10, 20], global_batch_size)
+    table = mm.DistributedEmbeddings(schema, embedding_dim)
     outputs = table(inputs)
 
-    assert outputs[0].shape == (4, 2)
-    assert outputs[1].shape == (4, 2)
+    assert len(outputs) == 2
+    assert outputs[0].shape == (global_batch_size // hvd.size(), embedding_dim)
+    assert outputs[1].shape == (global_batch_size // hvd.size(), embedding_dim)
+
+
+def test_dlrm_model_with_embeddings(music_streaming_data, batch_size=8, embedding_dim=4):
+    music_streaming_data.schema = music_streaming_data.schema.select_by_name(
+        ["item_id", "user_id", "user_age", "click"]
+    )
+    schema = music_streaming_data.schema
+
+    ddf = music_streaming_data.to_ddf().repartition(npartitions=hvd.size())
+    train = Dataset(ddf, schema=schema)
+
+    train_loader = mm.Loader(
+        train,
+        schema=train.schema,
+        batch_size=batch_size,
+        shuffle=True,
+        drop_last=True,
+    )
+
+    model = mm.DLRMModel(
+        schema,
+        embeddings=mm.DistributedEmbeddings(
+            schema.select_by_tag(Tags.CATEGORICAL), dim=embedding_dim
+        ),
+        bottom_block=mm.MLPBlock([embedding_dim]),
+        prediction_tasks=mm.BinaryOutput("click"),
+    )
+
+    testing_utils.model_test(model, train_loader, run_eagerly=True)

From dd3df0c3e2e2b95118df74e4ca5e7d8f8a677b48 Mon Sep 17 00:00:00 2001
From: edknv
Date: Wed, 8 Feb 2023 09:45:35 -0800
Subject: [PATCH 03/20] install distributed-embeddings package in tox

---
 tox.ini | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tox.ini b/tox.ini
index 8514e99b2c..b475cc5fe3 100644
--- a/tox.ini
+++ b/tox.ini
@@ -33,6 +33,7 @@ commands =
     conda update --yes --name base --channel defaults conda
    conda env create --prefix {envdir}/env --file requirements/horovod-cpu-environment.yml --force
     {envdir}/env/bin/python -m pip install horovod --no-cache-dir
     {envdir}/env/bin/horovodrun --check-build
+    {envdir}/env/bin/python -m pip install git+https://github.com/NVIDIA-Merlin/distributed-embeddings.git
     {envdir}/env/bin/python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/core.git
     {envdir}/env/bin/python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/dataloader.git
     {envdir}/env/bin/python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/nvtabular.git

From 847c05ba074830a4470ce4c29d01a3349137fcc9 Mon Sep 17 00:00:00 2001
From: edknv
Date: Wed, 8 Feb 2023 10:36:40 -0800
Subject: [PATCH 04/20] check if distributed-embeddings is installed before
 loading the class globally

---
 merlin/models/tf/__init__.py        |  9 +++++++--
 merlin/models/utils/dependencies.py | 10 ++++++++++
 2 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/merlin/models/tf/__init__.py b/merlin/models/tf/__init__.py
index eafbfb1a94..2e14c2d68e 100644
--- a/merlin/models/tf/__init__.py
+++ b/merlin/models/tf/__init__.py
@@ -76,7 +76,6 @@
 )
 from merlin.models.tf.core.encoder import EmbeddingEncoder, Encoder, TopKEncoder
 from merlin.models.tf.core.prediction import Prediction
-from merlin.models.tf.distributed.embedding import DistributedEmbeddings
 from merlin.models.tf.inputs.base import InputBlock, InputBlockV2
 from merlin.models.tf.inputs.continuous import Continuous, ContinuousFeatures, ContinuousProjection
 from merlin.models.tf.inputs.embedding import (
@@ -128,7 +129,10 @@
 from merlin.models.tf.prediction_tasks.multi import PredictionTasks
 from merlin.models.tf.prediction_tasks.regression import RegressionTask
 from merlin.models.tf.prediction_tasks.retrieval import ItemRetrievalTask
-from merlin.models.utils.dependencies import is_transformers_available
+from merlin.models.utils.dependencies import (
+    is_distributed_embeddings_available,
+    is_transformers_available,
+)
 
 if is_transformers_available():
     from merlin.models.tf.transformers.block import (
@@ -146,6 +148,9 @@
     LastHiddenStateAndAttention,
 )
 
+if is_distributed_embeddings_available():
+    from merlin.models.tf.distributed.embedding import DistributedEmbeddings
+
 from merlin.models.tf.transforms.features import (
     BroadcastToSequence,
     CategoryEncoding,
diff --git a/merlin/models/utils/dependencies.py b/merlin/models/utils/dependencies.py
index 4d37e1ed98..ecf9412706 100644
--- a/merlin/models/utils/dependencies.py
+++ b/merlin/models/utils/dependencies.py
@@ -47,3 +47,13 @@ def is_transformers_available() -> bool:
     except ImportError:
         transformers = None
     return transformers is not None
+
+
+def is_distributed_embeddings_available() -> bool:
+    try:
+        import horovod
+        import distributed_embeddings
+    except ImportError:
+        horovod = None
+        distributed_embeddings = None
+    return horovod is not None and distributed_embeddings is not None

From ded191e5d7b8c7a1baa7feacdca13fabd60fc4e4 Mon Sep 17 00:00:00 2001
From: edknv
Date: Wed, 8 Feb 2023 10:47:20 -0800
Subject: [PATCH 05/20] lint

---
 merlin/models/tf/__init__.py        | 1 -
 merlin/models/utils/dependencies.py | 2 +-
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/merlin/models/tf/__init__.py b/merlin/models/tf/__init__.py
index 2e14c2d68e..261a0ceefd 100644
--- a/merlin/models/tf/__init__.py
+++ b/merlin/models/tf/__init__.py
@@ -217,7 +217,6 @@
     "SequenceEmbeddingFeatures",
     "EmbeddingOptions",
     "EmbeddingTable",
-    "DistributedEmbeddings",
     "AverageEmbeddingsByWeightFeature",
     "Embeddings",
     "FeatureConfig",
diff --git a/merlin/models/utils/dependencies.py b/merlin/models/utils/dependencies.py
index ecf9412706..da670a78a9 100644
--- a/merlin/models/utils/dependencies.py
+++ b/merlin/models/utils/dependencies.py
@@ -51,7 +51,7 @@
 def is_distributed_embeddings_available() -> bool:
     try:
-        import horovod
+        import horovod  # isort: skip
         import distributed_embeddings
     except ImportError:
         horovod = None

From 99a9b2ded34956e2d6b7c6c3430083373e7d394b Mon Sep 17 00:00:00 2001
From: edknv
Date: Wed, 8 Feb 2023 12:11:45 -0800
Subject: [PATCH 06/20] install distributed-embeddings from github repo

---
 tox.ini | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/tox.ini b/tox.ini
index b475cc5fe3..90abc5a3ef 100644
--- a/tox.ini
+++ b/tox.ini
@@ -23,6 +23,8 @@ commands =
     python -m pytest --cov-report term --cov merlin -rxs tests/unit/
 
 [testenv:py38-horovod-cpu]
+allowlist_externals =
+    git
 setenv =
     HOROVOD_WITH_MPI=1
     HOROVOD_WITH_TENSORFLOW=1
@@ -31,12 +33,19 @@ commands =
     conda update --yes --name base --channel defaults conda
     conda env create --prefix {envdir}/env --file requirements/horovod-cpu-environment.yml --force
+    # Install horovod and check build
     {envdir}/env/bin/python -m pip install horovod --no-cache-dir
     {envdir}/env/bin/horovodrun --check-build
-    {envdir}/env/bin/python -m pip install git+https://github.com/NVIDIA-Merlin/distributed-embeddings.git
+    # Install distributed embeddings and check build
+    git clone https://github.com/NVIDIA-Merlin/distributed-embeddings.git
+    git submodule update --init --recursive
+    make pip_pkg && pip install artifacts/*.whl
+    python -c "import distributed_embeddings"
+    # Install Merlin packages
     {envdir}/env/bin/python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/core.git
     {envdir}/env/bin/python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/dataloader.git
     {envdir}/env/bin/python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/nvtabular.git
+    # Run multi-gpu tests marked with `horovod` marker
     {envdir}/env/bin/horovodrun -np 2 sh examples/usecases/multi-gpu/hvd_wrapper.sh pytest -m horovod -rxs tests/unit

From 674d53f7ed34a2a715ae84738aa5c9be0fcc44dd Mon Sep 17 00:00:00 2001
From: edknv
Date: Wed, 15 Feb 2023 09:53:01 -0800
Subject: [PATCH 07/20] graph mode support

---
 merlin/models/tf/distributed/embedding.py | 65 +++++++++++++++++++++--
 merlin/models/tf/models/base.py           |  4 +-
 tests/unit/tf/horovod/test_embedding.py   | 25 +++++----
 3 files changed, 77 insertions(+), 17 deletions(-)

diff --git a/merlin/models/tf/distributed/embedding.py b/merlin/models/tf/distributed/embedding.py
index 4b2528df00..ef096ca023 100644
--- a/merlin/models/tf/distributed/embedding.py
+++ b/merlin/models/tf/distributed/embedding.py
@@ -48,6 +48,7 @@ def __init__(
         self.dim = dim
         self.table_names = []
         self.embedding_layers = []
+
         for col in self.schema:
             table_name = col.int_domain.name or col.name
             self.table_names.append(table_name)
@@ -83,6 +84,25 @@ def _infer_output_dim(self, col_schema, embedding_dims):
 
         return dim
 
+    @tf.function
+    def build(self, input_shapes):
+        super().build(input_shapes)
+
+        if self.embedding_layers.built is True:
+            return
+
+        if isinstance(input_shapes, dict):
+            ordered_input_shapes = []
+            for feature_name in self.table_names:
+                ordered_input_shapes.append(input_shapes[feature_name])
+        elif isinstance(input_shapes, list):
+            ordered_input_shapes = input_shapes
+        else:
+            raise ValueError(f"Unexpected input type encountered: {input_shapes}")
+        self.embedding_layers.build(ordered_input_shapes)
+
+
+    @tf.function
     def call(
         self, inputs: Union[Dict[str, tf.Tensor], List[tf.Tensor]]
     ) -> Union[Dict[str, tf.Tensor], List[tf.Tensor]]:
@@ -96,15 +116,52 @@ def call(
         -------
         A tensor or dict of tensors corresponding to the embeddings for inputs
         """
+
+        def _validate_inputs(tensor):
+            depth = 100
+            if isinstance(tensor, tf.SparseTensor):
+                max_value = tf.reduce_max(tensor.values)
+                min_value = tf.reduce_min(tensor.values)
+            else:
+                max_value = tf.reduce_max(tensor)
+                min_value = tf.reduce_min(tensor)
+            print('*'*80)
+            print(min_value, max_value)
+            print('*'*80)
+            condition = tf.logical_and(
+                tf.greater(tf.cast(depth, max_value.dtype), max_value),
+                tf.greater_equal(min_value, tf.cast(0, min_value.dtype)),
+            )
+            return condition
+
         if isinstance(inputs, dict):
             ordered_inputs = []
+            outputs = {}
             for feature_name in self.table_names:
-                ordered_inputs.append(inputs[feature_name])
-
+                with tf.control_dependencies([_validate_inputs(inputs[feature_name])]):
+                    ordered_inputs.append(inputs[feature_name])
             ordered_outputs = self.embedding_layers(ordered_inputs)
-            outputs = {}
             for feature_name, output in zip(self.schema.column_names, ordered_outputs):
                 outputs[feature_name] = output
+        elif isinstance(inputs, list):
+            with tf.control_dependencies([_validate_inputs(inputs)]):
+                outputs = self.embedding_layers(inputs)
         else:
-            outputs = self.embedding_layers(inputs)
+            raise ValueError(f"Unexpected input type encountered: {inputs}")
+
         return outputs
+
+    @tf.function
+    def compute_call_output_shape(self, input_shapes):
+        def _get_output_shape(input_shape):
+            batch_size = input_shape[0]
+            output_shape = tf.TensorShape([batch_size, self.dim])
+            return output_shape
+
+        if isinstance(input_shapes, dict):
+            output_shapes = {k: _get_output_shape(v) for k, v in input_shapes.items()}
+        elif isinstance(input_shapes, list):
+            output_shapes = [_get_output_shape(x) for x in input_shapes]
+        else:
+            raise ValueError(f"Unexpected input type encountered: {input_shapes}")
+
+        return output_shapes
diff --git a/merlin/models/tf/models/base.py b/merlin/models/tf/models/base.py
index 8a153cb100..354c8849bc 100644
--- a/merlin/models/tf/models/base.py
+++ b/merlin/models/tf/models/base.py
@@ -141,7 +141,7 @@ def get_output_schema(export_path: str) -> Schema:
     output_schema = Schema()
     for output_name, output_spec in signature.structured_outputs.items():
         col_schema = ColumnSchema(output_name, dtype=output_spec.dtype.as_numpy_dtype)
-        shape = output_spec.shape
+        shape = tf.shape(output_spec)
         if shape.rank > 1 and (shape[1] is None or shape[1] > 1):
             is_ragged = shape[1] is None
             properties = {}
@@ -1434,7 +1434,7 @@ def _maybe_build(self, inputs):
         )
 
         _ragged_inputs = self.process_list(inputs)
-        feature_shapes = {k: v.shape for k, v in _ragged_inputs.items()}
+        feature_shapes = {k: tf.shape(v) for k, v in _ragged_inputs.items()}
         feature_dtypes = {k: v.dtype for k, v in _ragged_inputs.items()}
 
         for block in self.blocks:
diff --git a/tests/unit/tf/horovod/test_embedding.py b/tests/unit/tf/horovod/test_embedding.py
index d9c0de59c5..b8826a4184 100644
--- a/tests/unit/tf/horovod/test_embedding.py
+++ b/tests/unit/tf/horovod/test_embedding.py
@@ -48,15 +48,11 @@ def test_distributed_embeddings_basic(embedding_dim=4, global_batch_size=8):
     assert outputs[1].shape == (global_batch_size // hvd.size(), embedding_dim)
 
 
-def test_dlrm_model_with_embeddings(music_streaming_data, batch_size=8, embedding_dim=4):
+def test_dlrm_model_with_embeddings(music_streaming_data, batch_size=8, embedding_dim=16, learning_rate=0.03):
     music_streaming_data.schema = music_streaming_data.schema.select_by_name(
         ["item_id", "user_id", "user_age", "click"]
     )
-    schema = music_streaming_data.schema
-
-    ddf = music_streaming_data.to_ddf().repartition(npartitions=hvd.size())
-    train = Dataset(ddf, schema=schema)
-
+    train = music_streaming_data.repartition(npartitions=hvd.size())
     train_loader = mm.Loader(
         train,
         schema=train.schema,
         batch_size=batch_size,
         shuffle=True,
         drop_last=True,
     )
 
+    target_column = train.schema.select_by_tag(Tags.TARGET).column_names[0]
+
     model = mm.DLRMModel(
-        schema,
+        train.schema,
         embeddings=mm.DistributedEmbeddings(
-            schema.select_by_tag(Tags.CATEGORICAL), dim=embedding_dim
+            train.schema.select_by_tag(Tags.CATEGORICAL), dim=embedding_dim
         ),
-        bottom_block=mm.MLPBlock([embedding_dim]),
-        prediction_tasks=mm.BinaryOutput("click"),
+        bottom_block=mm.MLPBlock([32, embedding_dim]),
+        top_block=mm.MLPBlock([32, embedding_dim]),
+        prediction_tasks=mm.BinaryClassificationTask(target_column),
     )
 
-    testing_utils.model_test(model, train_loader, run_eagerly=True)
+    opt = tf.keras.optimizers.Adagrad(learning_rate=learning_rate)
+    model.compile(optimizer=opt, run_eagerly=False, metrics=[tf.keras.metrics.AUC()])
+
+    losses = model.fit(train_loader, epochs=2)
+    assert all(measure >= 0 for metric in losses.history for measure in losses.history[metric])

From 4a8bd97455a78821579b09a094b808bc08dcb2eb Mon Sep 17 00:00:00 2001
From: edknv
Date: Wed, 15 Feb 2023 09:53:34 -0800
Subject: [PATCH 08/20] add distributed-embeddings to ci

---
 .../install_distributed_embeddings.sh       | 25 +++++++++++++++++++
 tox.ini                                     |  7 ++----
 2 files changed, 27 insertions(+), 5 deletions(-)
 create mode 100755 examples/usecases/multi-gpu/install_distributed_embeddings.sh

diff --git a/examples/usecases/multi-gpu/install_distributed_embeddings.sh b/examples/usecases/multi-gpu/install_distributed_embeddings.sh
new file mode 100755
index 0000000000..49abd60541
--- /dev/null
+++ b/examples/usecases/multi-gpu/install_distributed_embeddings.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+
+set -e
+
+ROOT_DIR="/tmp"
+
+pushd $ROOT_DIR
+
+git clone https://github.com/edknv/distributed-embeddings.git
+
+git config --global --add safe.directory $ROOT_DIR/distributed-embeddings
+git config --global --add safe.directory $ROOT_DIR/distributed-embeddings/third_party/thrust
+
+pushd $ROOT_DIR/distributed-embeddings
+
+git checkout fix_shape_graph_mode
+
+git submodule update --init --recursive
+make pip_pkg
+python -m pip install --force-reinstall artifacts/*.whl
+python setup.py install
+
+popd +2
+
+python -c "import distributed_embeddings"
diff --git a/tox.ini b/tox.ini
index 4fda15f613..3fd4db0921 100644
--- a/tox.ini
+++ b/tox.ini
@@ -26,7 +26,7 @@
 [testenv:py38-horovod-cpu]
 allowlist_externals =
-    git
+    examples/usecases/multi-gpu/install_distributed_embeddings.sh
 setenv =
     HOROVOD_WITH_MPI=1
     HOROVOD_WITH_TENSORFLOW=1
@@ -39,10 +39,7 @@ commands =
     {envdir}/env/bin/python -m pip install horovod --no-cache-dir
     {envdir}/env/bin/horovodrun --check-build
     # Install distributed embeddings and check build
-    git clone https://github.com/NVIDIA-Merlin/distributed-embeddings.git
-    git submodule update --init --recursive
-    make pip_pkg && pip install artifacts/*.whl
-    python -c "import distributed_embeddings"
+    sh examples/usecases/multi-gpu/install_distributed_embeddings.sh
     # Install Merlin packages
     {envdir}/env/bin/python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/core.git
     {envdir}/env/bin/python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/dataloader.git

From 1f759848c7038f77b0836a64e27c9120255e5cbe Mon Sep 17 00:00:00 2001
From: edknv
Date: Mon, 6 Mar 2023 16:52:05 -0800
Subject: [PATCH 09/20] Add multi-gpu ci tests

---
 .../install_distributed_embeddings.sh       | 11 ++-
 examples/usecases/multi-gpu/install_sok.sh  |  9 ++
 examples/usecases/tf_trainer.py             | 84 +++++++++++++++
 merlin/models/tf/distributed/embedding.py   |  9 +-
 tests/unit/tf/horovod/test_embedding.py     |  5 +-
 tox.ini                                     | 23 ++++-
 6 files changed, 128 insertions(+), 13 deletions(-)
 create mode 100755 examples/usecases/multi-gpu/install_sok.sh
 create mode 100644 examples/usecases/tf_trainer.py

diff --git a/examples/usecases/multi-gpu/install_distributed_embeddings.sh b/examples/usecases/multi-gpu/install_distributed_embeddings.sh
index 49abd60541..62dbebcb78 100755
--- a/examples/usecases/multi-gpu/install_distributed_embeddings.sh
+++ b/examples/usecases/multi-gpu/install_distributed_embeddings.sh
@@ -2,24 +2,23 @@
 
 set -e
 
+WORK_DIR=$(pwd)
 ROOT_DIR="/tmp"
 
-pushd $ROOT_DIR
+cd $ROOT_DIR
 
-git clone https://github.com/edknv/distributed-embeddings.git
+git clone https://github.com/NVIDIA-Merlin/distributed-embeddings.git
 
 git config --global --add safe.directory $ROOT_DIR/distributed-embeddings
 git config --global --add safe.directory $ROOT_DIR/distributed-embeddings/third_party/thrust
 
-pushd $ROOT_DIR/distributed-embeddings
-
-git checkout fix_shape_graph_mode
+cd $ROOT_DIR/distributed-embeddings
 
 git submodule update --init --recursive
 make pip_pkg
 python -m pip install --force-reinstall artifacts/*.whl
 python setup.py install
 
-popd +2
+cd $WORK_DIR
 
 python -c "import distributed_embeddings"
diff --git a/examples/usecases/multi-gpu/install_sok.sh b/examples/usecases/multi-gpu/install_sok.sh
new file mode 100755
index 0000000000..c921507c39
--- /dev/null
+++ b/examples/usecases/multi-gpu/install_sok.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+cd /tmp
+git clone --depth 1 https://github.com/NVIDIA-Merlin/HugeCTR hugectr
+cd hugectr
+git submodule update --init --recursive
+cd sparse_operation_kit
+python -m pip install scikit-build
+python setup.py install
diff --git a/examples/usecases/tf_trainer.py b/examples/usecases/tf_trainer.py
new file mode 100644
index 0000000000..b3ebe6c682
--- /dev/null
+++ b/examples/usecases/tf_trainer.py
@@ -0,0 +1,84 @@
+
+import os
+
+MPI_SIZE = int(os.getenv("OMPI_COMM_WORLD_SIZE"))
+MPI_RANK = int(os.getenv("OMPI_COMM_WORLD_RANK"))
+
+os.environ["CUDA_VISIBLE_DEVICES"] = str(MPI_RANK)
+
+import nvtabular as nvt
+from nvtabular.ops import *
+
+from merlin.models.utils.example_utils import workflow_fit_transform
+from merlin.schema.tags import Tags
+
+import merlin.models.tf as mm
+from merlin.io.dataset import Dataset
+import tensorflow as tf
+
+import argparse
+
+parser = argparse.ArgumentParser(
+    description='Hyperparameters for model training'
+)
+parser.add_argument(
+    '--batch-size',
+    type=str,
+    help='Batch-Size per GPU worker'
+)
+parser.add_argument(
+    '--path',
+    type=str,
+    help='Directory with training and validation data'
+)
+args = parser.parse_args()
+
+# define train and valid dataset objects
+#train = Dataset(os.path.join(args.path, "train", "part_" + str(MPI_RANK) + ".parquet"))
+#valid = Dataset(os.path.join(args.path, "valid", "part_" + str(MPI_RANK) + ".parquet"))
+
+train = Dataset(os.path.join(args.path, "train", "*.parquet"))
+valid = Dataset(os.path.join(args.path, "valid", "*.parquet"))
+
+ddf = train.to_ddf().repartition(npartitions=2)
+train = Dataset(ddf, schema=train.schema)
+
+ddf = valid.to_ddf().repartition(npartitions=2)
+valid = Dataset(ddf, schema=valid.schema)
+
+# define schema object
+target_column = train.schema.select_by_tag(Tags.TARGET).column_names[0]
+
+train_loader = mm.Loader(
+    train,
+    schema=train.schema,
+    batch_size=int(args.batch_size),
+    shuffle=True,
+    drop_last=True,
+)
+
+valid_loader = mm.Loader(
+    valid,
+    schema=valid.schema,
+    batch_size=int(args.batch_size),
+    shuffle=False,
+    drop_last=True,
+)
+
+print("Number batches: " + str(len(train_loader)))
+
+model = mm.DLRMModel(
+    train.schema,
+    embedding_dim=16,
+    bottom_block=mm.MLPBlock([32, 16]),
+    top_block=mm.MLPBlock([32, 16]),
+    prediction_tasks=mm.BinaryOutput(target_column),
+)
+
+opt = tf.keras.optimizers.Adagrad(learning_rate=0.01)
+model.compile(optimizer=opt, run_eagerly=False, metrics=[tf.keras.metrics.AUC()])
+losses = model.fit(
+    train_loader
+)
+
+print(model.evaluate(valid, batch_size=int(args.batch_size), return_dict=True))
diff --git a/merlin/models/tf/distributed/embedding.py b/merlin/models/tf/distributed/embedding.py
index ef096ca023..8fcbc0ef89 100644
--- a/merlin/models/tf/distributed/embedding.py
+++ b/merlin/models/tf/distributed/embedding.py
@@ -43,6 +43,11 @@ def __init__(
                 f"{self.__class__.__name__}."
             )
 
+        if not tf.executing_eagerly:
+            raise RuntimeError(
+                f"Graph mode is not supported yet. Please use eager mode."
+            )
+
         super(DistributedEmbeddings, self).__init__(schema=schema, **kwargs)
 
         self.dim = dim
@@ -89,7 +94,6 @@ def _infer_output_dim(self, col_schema, embedding_dims):
 
         return dim
 
-    @tf.function
     def build(self, input_shapes):
         super().build(input_shapes)
 
@@ -124,9 +128,6 @@ def _validate_inputs(tensor):
             else:
                 max_value = tf.reduce_max(tensor)
                 min_value = tf.reduce_min(tensor)
-            print('*'*80)
-            print(min_value, max_value)
-            print('*'*80)
             condition = tf.logical_and(
                 tf.greater(tf.cast(depth, max_value.dtype), max_value),
                 tf.greater_equal(min_value, tf.cast(0, min_value.dtype)),
diff --git a/tests/unit/tf/horovod/test_embedding.py b/tests/unit/tf/horovod/test_embedding.py
index b8826a4184..1101fe8abc 100644
--- a/tests/unit/tf/horovod/test_embedding.py
+++ b/tests/unit/tf/horovod/test_embedding.py
@@ -48,7 +48,8 @@ def test_distributed_embeddings_basic(embedding_dim=4, global_batch_size=8):
     assert outputs[1].shape == (global_batch_size // hvd.size(), embedding_dim)
 
 
-def test_dlrm_model_with_embeddings(music_streaming_data, batch_size=8, embedding_dim=16, learning_rate=0.03):
+@pytest.mark.parametrize("run_eagerly", [True, False])
+def test_dlrm_model_with_embeddings(music_streaming_data, run_eagerly, batch_size=8, embedding_dim=16, learning_rate=0.03):
     music_streaming_data.schema = music_streaming_data.schema.select_by_name(
         ["item_id", "user_id", "user_age", "click"]
     )
     opt = tf.keras.optimizers.Adagrad(learning_rate=learning_rate)
-    model.compile(optimizer=opt, run_eagerly=False, metrics=[tf.keras.metrics.AUC()])
+    model.compile(optimizer=opt, run_eagerly=run_eagerly, metrics=[tf.keras.metrics.AUC()])
 
     losses = model.fit(train_loader, epochs=2)
     assert all(measure >= 0 for metric in losses.history for measure in losses.history[metric])
diff --git a/tox.ini b/tox.ini
index 3fd4db0921..afe42b713e 100644
--- a/tox.ini
+++ b/tox.ini
@@ -24,9 +24,30 @@ commands =
     python -m pytest --cov-report term --cov merlin -rxs tests/unit/
 
-[testenv:py38-horovod-cpu]
+[testenv:py38-multi-gpu]
+; Runs in: Github Actions
+; Runs GPU-based tests.
 allowlist_externals =
+    horovodrun
     examples/usecases/multi-gpu/install_distributed_embeddings.sh
+deps =
+    -rrequirements/test.txt
+passenv =
+    OPAL_PREFIX
+setenv =
+    TF_GPU_ALLOCATOR=cuda_malloc_async
+sitepackages=true
+commands =
+    # Install Merlin packages
+    python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/core.git@{posargs:main}
+    python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/dataloader.git@{posargs:main}
+    python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/nvtabular.git@{posargs:main}
+    # Install distributed embeddings and check build
+    sh examples/usecases/multi-gpu/install_distributed_embeddings.sh
+    # Run multi-gpu tests marked with `horovod` marker
+    horovodrun -np 2 sh examples/usecases/multi-gpu/hvd_wrapper.sh pytest -m horovod -rxs tests/unit
+
+[testenv:py38-horovod-cpu]
 setenv =
     HOROVOD_WITH_MPI=1
     HOROVOD_WITH_TENSORFLOW=1

From a0c58d7dcdd227953abc915a6a86cc4ed4d4f18e Mon Sep 17 00:00:00 2001
From: edknv
Date: Mon, 6 Mar 2023 16:55:39 -0800
Subject: [PATCH 10/20] remove graph mode error

---
 merlin/models/tf/distributed/embedding.py | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/merlin/models/tf/distributed/embedding.py b/merlin/models/tf/distributed/embedding.py
index 8fcbc0ef89..a06b459aa6 100644
--- a/merlin/models/tf/distributed/embedding.py
+++ b/merlin/models/tf/distributed/embedding.py
@@ -43,11 +43,6 @@ def __init__(
                 f"{self.__class__.__name__}."
             )
 
-        if not tf.executing_eagerly:
-            raise RuntimeError(
-                f"Graph mode is not supported yet. Please use eager mode."
-            )
-
         super(DistributedEmbeddings, self).__init__(schema=schema, **kwargs)
 
         self.dim = dim

From 9892be8796d4c14abd244432bcba1cb494792aef Mon Sep 17 00:00:00 2001
From: edknv
Date: Mon, 6 Mar 2023 17:14:36 -0800
Subject: [PATCH 11/20] lint and minor rearrangement

---
 .github/workflows/cpu-horovod.yml           |  6 ++
 .github/workflows/multi-gpu-ci.yml          | 30 +++++++
 .../install_distributed_embeddings.sh       |  9 +-
 examples/usecases/multi-gpu/install_sok.sh  |  9 --
 examples/usecases/tf_trainer.py             | 84 -------
 merlin/models/tf/distributed/embedding.py   |  2 +-
 requirements/horovod.txt                    |  1 -
 tests/unit/tf/horovod/test_embedding.py     |  4 +-
 8 files changed, 44 insertions(+), 101 deletions(-)
 create mode 100644 .github/workflows/multi-gpu-ci.yml
 delete mode 100755 examples/usecases/multi-gpu/install_sok.sh
 delete mode 100644 examples/usecases/tf_trainer.py

diff --git a/.github/workflows/cpu-horovod.yml b/.github/workflows/cpu-horovod.yml
index 481ee11163..b9fd597834 100644
--- a/.github/workflows/cpu-horovod.yml
+++ b/.github/workflows/cpu-horovod.yml
@@ -37,6 +37,12 @@ jobs:
       - name: Install tox-conda
         run: |
           python -m pip install tox-conda
+      - name: Prepare distributing-embeddings installation
+        uses: actions/checkout@v3
+        with:
+          repository: NVIDIA-Merlin/distributed-embeddings
+          ref: main
+          token: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
       - name: Run tests
         run: |
           ref_type=${{ github.ref_type }}
diff --git a/.github/workflows/multi-gpu-ci.yml b/.github/workflows/multi-gpu-ci.yml
new file mode 100644
index 0000000000..1e8cab380c
--- /dev/null
+++ b/.github/workflows/multi-gpu-ci.yml
@@ -0,0 +1,30 @@
+name: horovod (2GPU)
+
+on:
+  workflow_dispatch:
+  push:
+    branches: [ main ]
+    tags:
+      - v*
+  pull_request:
+    branches: [ main ]
+    types: [opened, synchronize, reopened, closed]
+
+jobs:
+  gpu-ci:
+    runs-on: 2GPU
+
+    steps:
+      - uses: actions/checkout@v3
+        with:
+          fetch-depth: 0
+      - name: Run tests
+        run: |
+          ref_type=${{ github.ref_type }}
+          branch=main
+          if [[ $ref_type == "tag"* ]]
+          then
+            git -c protocol.version=2 fetch --no-tags --prune --progress --no-recurse-submodules --depth=1 origin +refs/heads/release*:refs/remotes/origin/release*
+            branch=$(git branch -r --contains ${{ github.ref_name }} --list '*release*' --format "%(refname:short)" | sed -e 's/^origin\///')
+          fi
+          cd ${{ github.workspace }}; tox -e py38-multi-gpu -- $branch
diff --git a/examples/usecases/multi-gpu/install_distributed_embeddings.sh b/examples/usecases/multi-gpu/install_distributed_embeddings.sh
index 62dbebcb78..562db164b4 100755
--- a/examples/usecases/multi-gpu/install_distributed_embeddings.sh
+++ b/examples/usecases/multi-gpu/install_distributed_embeddings.sh
@@ -7,12 +7,11 @@ ROOT_DIR="/tmp"
 
 cd $ROOT_DIR
 
-git clone https://github.com/NVIDIA-Merlin/distributed-embeddings.git
+if [ ! -d "distributed-embeddings" ]; then
+    git clone https://github.com/NVIDIA-Merlin/distributed-embeddings.git
+fi
 
-git config --global --add safe.directory $ROOT_DIR/distributed-embeddings
-git config --global --add safe.directory $ROOT_DIR/distributed-embeddings/third_party/thrust
-
-cd $ROOT_DIR/distributed-embeddings
+cd distributed-embeddings
 
 git submodule update --init --recursive
 make pip_pkg
diff --git a/examples/usecases/multi-gpu/install_sok.sh b/examples/usecases/multi-gpu/install_sok.sh
deleted file mode 100755
index c921507c39..0000000000
--- a/examples/usecases/multi-gpu/install_sok.sh
+++ /dev/null
@@ -1,9 +0,0 @@
-#!/bin/bash
-
-cd /tmp
-git clone --depth 1 https://github.com/NVIDIA-Merlin/HugeCTR hugectr
-cd hugectr
-git submodule update --init --recursive
-cd sparse_operation_kit
-python -m pip install scikit-build
-python setup.py install
diff --git a/examples/usecases/tf_trainer.py b/examples/usecases/tf_trainer.py
deleted file mode 100644
index b3ebe6c682..0000000000
--- a/examples/usecases/tf_trainer.py
+++ /dev/null
@@ -1,84 +0,0 @@
-
-import os
-
-MPI_SIZE = int(os.getenv("OMPI_COMM_WORLD_SIZE"))
-MPI_RANK = int(os.getenv("OMPI_COMM_WORLD_RANK"))
-
-os.environ["CUDA_VISIBLE_DEVICES"] = str(MPI_RANK)
-
-import nvtabular as nvt
-from nvtabular.ops import *
-
-from merlin.models.utils.example_utils import workflow_fit_transform
-from merlin.schema.tags import Tags
-
-import merlin.models.tf as mm
-from merlin.io.dataset import Dataset
-import tensorflow as tf
-
-import argparse
-
-parser = argparse.ArgumentParser(
-    description='Hyperparameters for model training'
-)
-parser.add_argument(
-    '--batch-size',
-    type=str,
-    help='Batch-Size per GPU worker'
-)
-parser.add_argument(
-    '--path',
-    type=str,
-    help='Directory with training and validation data'
-)
-args = parser.parse_args()
-
-# define train and valid dataset objects
-#train = Dataset(os.path.join(args.path, "train", "part_" + str(MPI_RANK) + ".parquet"))
-#valid = Dataset(os.path.join(args.path, "valid", "part_" + str(MPI_RANK) + ".parquet"))
-
-train = Dataset(os.path.join(args.path, "train", "*.parquet"))
-valid = Dataset(os.path.join(args.path, "valid", "*.parquet"))
-
-ddf = train.to_ddf().repartition(npartitions=2)
-train = Dataset(ddf, schema=train.schema)
-
-ddf = valid.to_ddf().repartition(npartitions=2)
-valid = Dataset(ddf, schema=valid.schema)
-
-# define schema object
-target_column = train.schema.select_by_tag(Tags.TARGET).column_names[0]
-
-train_loader = mm.Loader(
-    train,
-    schema=train.schema,
-    batch_size=int(args.batch_size),
-    shuffle=True,
-    drop_last=True,
-)
-
-valid_loader = mm.Loader(
-    valid,
-    schema=valid.schema,
-    batch_size=int(args.batch_size),
-    shuffle=False,
-    drop_last=True,
-)
-
-print("Number batches: " + str(len(train_loader)))
-
-model = mm.DLRMModel(
-    train.schema,
-    embedding_dim=16,
-    bottom_block=mm.MLPBlock([32, 16]),
-    top_block=mm.MLPBlock([32, 16]),
-    prediction_tasks=mm.BinaryOutput(target_column),
-)
-
-opt = tf.keras.optimizers.Adagrad(learning_rate=0.01)
-model.compile(optimizer=opt, run_eagerly=False, metrics=[tf.keras.metrics.AUC()])
-losses = model.fit(
-    train_loader
-)
-
-print(model.evaluate(valid, batch_size=int(args.batch_size), return_dict=True))
diff --git a/merlin/models/tf/distributed/embedding.py b/merlin/models/tf/distributed/embedding.py
index a06b459aa6..c85b95738c 100644
--- a/merlin/models/tf/distributed/embedding.py
+++ b/merlin/models/tf/distributed/embedding.py
@@ -100,7 +100,6 @@ def build(self, input_shapes):
             raise ValueError(f"Unexpected input type encountered: {input_shapes}")
         self.embedding_layers.build(ordered_input_shapes)
 
-
     @tf.function
     def call(
         self, inputs: Union[Dict[str, tf.Tensor], List[tf.Tensor]]
diff --git a/requirements/horovod.txt b/requirements/horovod.txt
index 27e00a6834..8229a149aa 100644
--- a/requirements/horovod.txt
+++ b/requirements/horovod.txt
@@ -1,2 +1 @@
 horovod
-distributed-embeddings@git+https://github.com/NVIDIA-Merlin/distributed-embeddings
diff --git a/tests/unit/tf/horovod/test_embedding.py b/tests/unit/tf/horovod/test_embedding.py
index 1101fe8abc..d3873305e7 100644
--- a/tests/unit/tf/horovod/test_embedding.py
+++ b/tests/unit/tf/horovod/test_embedding.py
@@ -49,7 +49,9 @@
 @pytest.mark.parametrize("run_eagerly", [True, False])
-def test_dlrm_model_with_embeddings(music_streaming_data, run_eagerly, batch_size=8, embedding_dim=16, learning_rate=0.03):
+def test_dlrm_model_with_embeddings(
+    music_streaming_data, run_eagerly, batch_size=8, embedding_dim=16, learning_rate=0.03
+):
     music_streaming_data.schema = music_streaming_data.schema.select_by_name(
         ["item_id", "user_id", "user_age", "click"]
     )

From c76b528f3b47a7d4a7a87d21dacd5047ff0a24c4 Mon Sep 17 00:00:00 2001
From: edknv
Date: Mon, 6 Mar 2023 17:18:00 -0800
Subject: [PATCH 12/20] lint

---
 tests/unit/tf/horovod/test_embedding.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/tests/unit/tf/horovod/test_embedding.py b/tests/unit/tf/horovod/test_embedding.py
index d3873305e7..0a8f5abd87 100644
--- a/tests/unit/tf/horovod/test_embedding.py
+++ b/tests/unit/tf/horovod/test_embedding.py
@@ -3,8 +3,6 @@
 import tensorflow as tf
 
 import merlin.models.tf as mm
-from merlin.io.dataset import Dataset
-from merlin.models.tf.utils import testing_utils
 from merlin.schema import ColumnSchema, Schema, Tags
 
 hvd = pytest.importorskip("horovod.tensorflow.keras")

From 4fe38630f8cda726d294937626d5390cb094aad8 Mon Sep 17 00:00:00 2001
From: edknv
Date: Mon, 6 Mar 2023 19:23:53 -0800
Subject: [PATCH 13/20] revert to using tensor.shape

---
 merlin/models/tf/models/base.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/merlin/models/tf/models/base.py b/merlin/models/tf/models/base.py
index af619384ff..730b0890d6 100644
--- a/merlin/models/tf/models/base.py
+++ b/merlin/models/tf/models/base.py
@@ -141,7 +141,7 @@ def get_output_schema(export_path: str) -> Schema:
     output_schema = Schema()
     for output_name, output_spec in signature.structured_outputs.items():
         col_schema = ColumnSchema(output_name, dtype=output_spec.dtype.as_numpy_dtype)
-        shape = tf.shape(output_spec)
+        shape = output_spec.shape
         if shape.rank > 1 and (shape[1] is None or shape[1] > 1):
             is_ragged = shape[1] is None
             properties = {}

From 9ac60c7deb5854ad9f6a022b12e5b1989f18df5a Mon Sep 17 00:00:00 2001
From: edknv
Date: Mon, 6 Mar 2023 20:19:00 -0800
Subject: [PATCH 14/20] whitelist sh in tox

---
 tox.ini | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tox.ini b/tox.ini
index 1d0643856c..d097faba1a 100644
--- a/tox.ini
+++ b/tox.ini
@@ -28,6 +28,7 @@
 ; Runs in: Github Actions
 ; Runs GPU-based tests.
 allowlist_externals =
+    sh
     horovodrun
     examples/usecases/multi-gpu/install_distributed_embeddings.sh

From 3c4b5297041d284cecc6b0712e2aee19cda1a8b0 Mon Sep 17 00:00:00 2001
From: edknv
Date: Mon, 6 Mar 2023 20:54:22 -0800
Subject: [PATCH 15/20] specify path in gha

---
 .github/workflows/cpu-horovod.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/cpu-horovod.yml b/.github/workflows/cpu-horovod.yml
index b9fd597834..38320e4fb4 100644
--- a/.github/workflows/cpu-horovod.yml
+++ b/.github/workflows/cpu-horovod.yml
@@ -41,8 +41,8 @@
         uses: actions/checkout@v3
         with:
           repository: NVIDIA-Merlin/distributed-embeddings
-          ref: main
           token: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+          path: /tmp/distributed-embeddings
       - name: Run tests
         run: |
           ref_type=${{ github.ref_type }}

From 8c2e0e6068946ff2538481ee87a29b5a72996513 Mon Sep 17 00:00:00 2001
From: edknv
Date: Tue, 7 Mar 2023 08:46:03 -0800
Subject: [PATCH 16/20] fix horovod cpu gha workflow

---
 .github/workflows/cpu-horovod.yml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/.github/workflows/cpu-horovod.yml b/.github/workflows/cpu-horovod.yml
index 38320e4fb4..142432cfda 100644
--- a/.github/workflows/cpu-horovod.yml
+++ b/.github/workflows/cpu-horovod.yml
@@ -41,7 +41,6 @@
         uses: actions/checkout@v3
         with:
           repository: NVIDIA-Merlin/distributed-embeddings
-          token: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
           path: /tmp/distributed-embeddings
       - name: Run tests
         run: |

From 5c7fd5a1c00bf67e16a532011657f530214ecf07 Mon Sep 17 00:00:00 2001
From: edknv
Date: Tue, 7 Mar 2023 10:31:19 -0800
Subject: [PATCH 17/20] move horovod installation to multi-gpu

---
 .github/workflows/cpu-horovod.yml           |  5 -----
 .../install_distributed_embeddings.sh       |  5 +++--
 tox.ini                                     | 10 +++++-----
 3 files changed, 8 insertions(+), 12 deletions(-)

diff --git a/.github/workflows/cpu-horovod.yml b/.github/workflows/cpu-horovod.yml
index 142432cfda..481ee11163 100644
--- a/.github/workflows/cpu-horovod.yml
+++ b/.github/workflows/cpu-horovod.yml
@@ -37,11 +37,6 @@ jobs:
       - name: Install tox-conda
         run: |
           python -m pip install tox-conda
-      - name: Prepare distributing-embeddings installation
-        uses: actions/checkout@v3
-        with:
-          repository: NVIDIA-Merlin/distributed-embeddings
-          path: /tmp/distributed-embeddings
       - name: Run tests
         run: |
           ref_type=${{ github.ref_type }}
diff --git a/examples/usecases/multi-gpu/install_distributed_embeddings.sh b/examples/usecases/multi-gpu/install_distributed_embeddings.sh
index 562db164b4..adb91362cf 100755
--- a/examples/usecases/multi-gpu/install_distributed_embeddings.sh
+++ b/examples/usecases/multi-gpu/install_distributed_embeddings.sh
@@ -2,10 +2,11 @@
 
 set -e
 
+INSTALL_DIR=$1
+
 WORK_DIR=$(pwd)
-ROOT_DIR="/tmp"
 
-cd $ROOT_DIR
+cd $INSTALL_DIR
 
 if [ ! -d "distributed-embeddings" ]; then
     git clone https://github.com/NVIDIA-Merlin/distributed-embeddings.git
diff --git a/tox.ini b/tox.ini
index d097faba1a..d77ed5ca21 100644
--- a/tox.ini
+++ b/tox.ini
@@ -28,9 +28,8 @@
 ; Runs in: Github Actions
 ; Runs GPU-based tests.
 allowlist_externals =
-    sh
+    bash
     horovodrun
-    examples/usecases/multi-gpu/install_distributed_embeddings.sh
 deps =
     -rrequirements/test.txt
 passenv =
@@ -43,8 +42,11 @@ commands =
     python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/core.git@{posargs:main}
     python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/dataloader.git@{posargs:main}
     python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/nvtabular.git@{posargs:main}
+    # Reinstall Horovod using `pip install --no-cache-dir` to build with the new version.
+    python -m pip install horovod --no-cache-dir
+    horovodrun --check-build
     # Install distributed embeddings and check build
-    sh examples/usecases/multi-gpu/install_distributed_embeddings.sh
+    bash examples/usecases/multi-gpu/install_distributed_embeddings.sh {envtmpdir}
     # Run multi-gpu tests marked with `horovod` marker
     horovodrun -np 2 sh examples/usecases/multi-gpu/hvd_wrapper.sh pytest -m horovod -rxs tests/unit
@@ -60,8 +62,6 @@ commands =
     # Install horovod and check build
     {envdir}/env/bin/python -m pip install horovod --no-cache-dir
     {envdir}/env/bin/horovodrun --check-build
-    # Install distributed embeddings and check build
-    sh examples/usecases/multi-gpu/install_distributed_embeddings.sh
     # Install Merlin packages
     {envdir}/env/bin/python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/core.git
     {envdir}/env/bin/python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/dataloader.git

From 36bb6064b01821342990dbea93cd4be8c4183ca8 Mon Sep 17 00:00:00 2001
From: edknv
Date: Tue, 7 Mar 2023 11:47:27 -0800
Subject: [PATCH 18/20] use python -m in tox

---
 tox.ini | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tox.ini b/tox.ini
index d77ed5ca21..5a2040909b 100644
--- a/tox.ini
+++ b/tox.ini
@@ -48,7 +48,7 @@ commands =
     # Install distributed embeddings and check build
     bash examples/usecases/multi-gpu/install_distributed_embeddings.sh {envtmpdir}
     # Run multi-gpu tests marked with `horovod` marker
-    horovodrun -np 2 sh examples/usecases/multi-gpu/hvd_wrapper.sh pytest -m horovod -rxs tests/unit
+    horovodrun -np 2 sh examples/usecases/multi-gpu/hvd_wrapper.sh python -m pytest -m horovod -rxs tests/unit
@@ -67,7 +67,7 @@ commands =
     {envdir}/env/bin/python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/dataloader.git
     {envdir}/env/bin/python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/nvtabular.git
     # Run multi-gpu tests marked with `horovod` marker
-    {envdir}/env/bin/horovodrun -np 2 sh examples/usecases/multi-gpu/hvd_wrapper.sh pytest -m horovod -rxs tests/unit
+    {envdir}/env/bin/horovodrun -np 2 sh examples/usecases/multi-gpu/hvd_wrapper.sh {envdir}/env/bin/python -m pytest -m horovod -rxs tests/unit

From b69078f28b701654e5002c40c06559dc3229f8cc Mon Sep 17 00:00:00 2001
From: edknv
Date: Tue, 7 Mar 2023 13:12:07 -0800
Subject: [PATCH 19/20] Remove horovod installation

---
 tox.ini | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/tox.ini b/tox.ini
index 5a2040909b..232e301aef 100644
--- a/tox.ini
+++ b/tox.ini
@@ -30,8 +30,6 @@
 allowlist_externals =
     bash
     horovodrun
-deps =
-    -rrequirements/test.txt
 passenv =
     OPAL_PREFIX
 setenv =
@@ -42,10 +40,8 @@ commands =
     python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/core.git@{posargs:main}
     python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/dataloader.git@{posargs:main}
    python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/nvtabular.git@{posargs:main}
-    # Reinstall Horovod using `pip install --no-cache-dir` to build with the new version.
-    python -m pip install horovod --no-cache-dir
-    horovodrun --check-build
     # Install distributed embeddings and check build
+    # TODO: Move distributed-embeddings installation to CI runner.
     bash examples/usecases/multi-gpu/install_distributed_embeddings.sh {envtmpdir}
     # Run multi-gpu tests marked with `horovod` marker
     horovodrun -np 2 sh examples/usecases/multi-gpu/hvd_wrapper.sh python -m pytest -m horovod -rxs tests/unit

From 2763cd85aa9c7a292680d9dc0ef078ef3c111dca Mon Sep 17 00:00:00 2001
From: edknv
Date: Tue, 7 Mar 2023 16:14:38 -0800
Subject: [PATCH 20/20] clean up and add documentation

---
 merlin/models/tf/distributed/embedding.py | 40 +++++++++--------------
 1 file changed, 16 insertions(+), 24 deletions(-)

diff --git a/merlin/models/tf/distributed/embedding.py b/merlin/models/tf/distributed/embedding.py
index c85b95738c..e4847bf326 100644
--- a/merlin/models/tf/distributed/embedding.py
+++ b/merlin/models/tf/distributed/embedding.py
@@ -21,10 +21,18 @@ class DistributedEmbeddings(TabularBlock):
         If int, the embedding size to use for all features, or a
         dictionary-like {"feature_name": embedding size, ...}.
         By default, None.
-    strategy:
-    column_slice_threshold:
-    dp_input:
-    input_table_map:
+    strategy: str
+        Indicates how embedding tables are distributed.
+        One of ["basic", "memory_balanced"]. Default: "basic".
+    column_slice_threshold: Optional[int]
+        Desired upper bound of element count in each slice.
+    dp_input: bool
+        If True, takes data-parallel input in shape [local_batch_size x global_num_embeddings].
+        Otherwise takes model-parallel input in shape [global_batch_size x local_num_embeddings].
+        Default: true.
+    input_table_map: Optional[List[int]]
+        A list with same length as inputs. Maps `input[i]` to `table[input_table_map[i]]`.
+        If None, `input[i]` maps to `table[i]`. Default: None.
""" def __init__( @@ -33,8 +41,8 @@ def __init__( dim: Optional[Union[Dict[str, int], int]] = None, strategy: str = "basic", column_slice_threshold: Optional[int] = None, - dp_input=True, - input_table_map=None, + dp_input: bool = True, + input_table_map: Optional[List[int]] = None, **kwargs, ): if not hvd_installed or not dmp_installed: @@ -115,32 +123,16 @@ def call( A tensor or dict of tensors corresponding to the embeddings for inputs """ - def _validate_inputs(tensor): - depth = 100 - if isinstance(tensor, tf.SparseTensor): - max_value = tf.reduce_max(tensor.values) - min_value = tf.reduce_min(tensor.values) - else: - max_value = tf.reduce_max(tensor) - min_value = tf.reduce_min(tensor) - condition = tf.logical_and( - tf.greater(tf.cast(depth, max_value.dtype), max_value), - tf.greater_equal(min_value, tf.cast(0, min_value.dtype)), - ) - return condition - if isinstance(inputs, dict): ordered_inputs = [] outputs = {} for feature_name in self.table_names: - with tf.control_dependencies([_validate_inputs(inputs[feature_name])]): - ordered_inputs.append(inputs[feature_name]) + ordered_inputs.append(inputs[feature_name]) ordered_outputs = self.embedding_layers(ordered_inputs) for feature_name, output in zip(self.schema.column_names, ordered_outputs): outputs[feature_name] = output elif isinstance(inputs, list): - with tf.control_dependencies([_validate_inputs(inputs)]): - outputs = self.embedding_layers(inputs) + outputs = self.embedding_layers(inputs) else: raise ValueError(f"Unexpected input type encountered: {inputs}")