From 7e14b135c5a662e1fd6e5fbdc2b72f8dbf02cc76 Mon Sep 17 00:00:00 2001 From: Oliver Holworthy Date: Thu, 9 Nov 2023 14:23:32 +0000 Subject: [PATCH] Move compile method back and add special case for TopKOutput --- merlin/models/tf/models/base.py | 294 ++++++++++++++++---------------- 1 file changed, 149 insertions(+), 145 deletions(-) diff --git a/merlin/models/tf/models/base.py b/merlin/models/tf/models/base.py index 3d27a0c91a..02a56c2ec0 100644 --- a/merlin/models/tf/models/base.py +++ b/merlin/models/tf/models/base.py @@ -69,6 +69,7 @@ from merlin.models.tf.outputs.base import ModelOutput, ModelOutputType from merlin.models.tf.outputs.classification import CategoricalOutput from merlin.models.tf.outputs.contrastive import ContrastiveOutput +from merlin.models.tf.outputs.topk import TopKOutput from merlin.models.tf.prediction_tasks.base import ParallelPredictionBlock, PredictionTask from merlin.models.tf.transforms.features import PrepareFeatures, expected_input_cols_from_schema from merlin.models.tf.transforms.sequence import SequenceTransform @@ -324,6 +325,154 @@ def __init__(self, **kwargs): initial_value=lambda: True, ) + def compile( + self, + optimizer="rmsprop", + loss: Optional[Union[str, Loss, Dict[str, Union[str, Loss]]]] = None, + metrics: Optional[ + Union[ + MetricType, + Sequence[MetricType], + Sequence[Sequence[MetricType]], + Dict[str, MetricType], + Dict[str, Sequence[MetricType]], + ] + ] = None, + loss_weights=None, + weighted_metrics: Optional[ + Union[ + MetricType, + Sequence[MetricType], + Sequence[Sequence[MetricType]], + Dict[str, MetricType], + Dict[str, Sequence[MetricType]], + ] + ] = None, + run_eagerly=None, + steps_per_execution=None, + jit_compile=None, + **kwargs, + ): + """Configures the model for training. + Example: + ```python + model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3), + loss=tf.keras.losses.BinaryCrossentropy(), + metrics=[tf.keras.metrics.BinaryAccuracy(), + tf.keras.metrics.FalseNegatives()]) + ``` + Args: + optimizer: String (name of optimizer) or optimizer instance. See + `tf.keras.optimizers`. + loss: Optional[Union[str, Loss, Dict[str, Union[str, Loss]]]] = None + losses : Optional[Union[str, Loss, Dict[str, Union[str, Loss]]]], optional + {LOSS_PARAMETERS_DOCSTRINGS} + See `tf.keras.losses`. A loss + function is any callable with the signature `loss = fn(y_true, + y_pred)`, where `y_true` are the ground truth values, and + `y_pred` are the model's predictions. + `y_true` should have shape + `(batch_size, d0, .. dN)` (except in the case of + sparse loss functions such as + sparse categorical crossentropy which expects integer arrays of shape + `(batch_size, d0, .. dN-1)`). + `y_pred` should have shape `(batch_size, d0, .. dN)`. + The loss function should return a float tensor. + If a custom `Loss` instance is + used and reduction is set to `None`, return value has shape + `(batch_size, d0, .. dN-1)` i.e. per-sample or per-timestep loss + values; otherwise, it is a scalar. If the model has multiple outputs, + you can use a different loss on each output by passing a dictionary + or a list of losses. The loss value that will be minimized by the + model will then be the sum of all individual losses, unless + `loss_weights` is specified. + loss_weights: Optional list or dictionary specifying scalar coefficients + (Python floats) to weight the loss contributions of different model + outputs. The loss value that will be minimized by the model will then + be the *weighted sum* of all individual losses, weighted by the + `loss_weights` coefficients. + If a list, it is expected to have a 1:1 mapping to the model's + outputs (Keras sorts tasks by name). + If a dict, it is expected to map output names (strings) + to scalar coefficients. + metrics: Optional[ Union[ MetricType, Sequence[MetricType], + Sequence[Sequence[MetricType]], + Dict[str, MetricType], Dict[str, Sequence[MetricType]], ] ], optional + {METRICS_PARAMETERS_DOCSTRING} + weighted_metrics: Optional[ Union[ MetricType, Sequence[MetricType], + Sequence[Sequence[MetricType]], + Dict[str, MetricType], Dict[str, Sequence[MetricType]], ] ], optional + List of metrics to be evaluated and weighted by + `sample_weight` or `class_weight` during training and testing. + {METRICS_PARAMETERS_DOCSTRING} + run_eagerly: Bool. Defaults to `False`. If `True`, this `Model`'s + logic will not be wrapped in a `tf.function`. Recommended to leave + this as `None` unless your `Model` cannot be run inside a + `tf.function`. `run_eagerly=True` is not supported when using + `tf.distribute.experimental.ParameterServerStrategy`. + steps_per_execution: Int. Defaults to 1. The number of batches to run + during each `tf.function` call. Running multiple batches inside a + single `tf.function` call can greatly improve performance on TPUs or + small models with a large Python overhead. At most, one full epoch + will be run each execution. If a number larger than the size of the + epoch is passed, the execution will be truncated to the size of the + epoch. Note that if `steps_per_execution` is set to `N`, + `Callback.on_batch_begin` and `Callback.on_batch_end` methods will + only be called every `N` batches (i.e. before/after each `tf.function` + execution). + jit_compile: If `True`, compile the model training step with XLA. + [XLA](https://www.tensorflow.org/xla) is an optimizing compiler for + machine learning. + `jit_compile` is not enabled for by default. + This option cannot be enabled with `run_eagerly=True`. + Note that `jit_compile=True` is + may not necessarily work for all models. + For more information on supported operations please refer to the + [XLA documentation](https://www.tensorflow.org/xla). + Also refer to + [known XLA issues](https://www.tensorflow.org/xla/known_issues) for + more details. + **kwargs: Arguments supported for backwards compatibility only. + """ + + num_v1_blocks = len(self.prediction_tasks) + num_v2_blocks = len(self.model_outputs) + + if num_v1_blocks > 1 and num_v2_blocks > 1: + raise ValueError( + "You cannot use both `prediction_tasks` and `prediction_blocks` at the same time.", + "`prediction_tasks` is deprecated and will be removed in a future version.", + ) + + if num_v1_blocks > 0: + self.output_names = [task.task_name for task in self.prediction_tasks] + else: + if num_v2_blocks == 1 and isinstance(self.model_outputs[0], TopKOutput): + pass + else: + self.output_names = [block.full_name for block in self.model_outputs] + + # This flag will make Keras change the metric-names which is not needed in v2 + from_serialized = kwargs.pop("from_serialized", num_v2_blocks > 0) + + if hvd_installed and hvd.size() > 1: + # Horovod: Specify `experimental_run_tf_function=False` to ensure TensorFlow + # uses hvd.DistributedOptimizer() to compute gradients. + kwargs.update({"experimental_run_tf_function": False}) + + super(BaseModel, self).compile( + optimizer=self._create_optimizer(optimizer), + loss=self._create_loss(loss), + metrics=self._create_metrics(metrics, weighted=False), + weighted_metrics=self._create_metrics(weighted_metrics, weighted=True), + run_eagerly=run_eagerly, + loss_weights=loss_weights, + steps_per_execution=steps_per_execution, + jit_compile=jit_compile, + from_serialized=from_serialized, + **kwargs, + ) + def _create_optimizer(self, optimizer): def _create_single_distributed_optimizer(opt): opt_config = opt.get_config() @@ -1616,151 +1765,6 @@ def _maybe_build(self, inputs): child._feature_dtypes = feature_dtypes super()._maybe_build(inputs) - def compile( - self, - optimizer="rmsprop", - loss: Optional[Union[str, Loss, Dict[str, Union[str, Loss]]]] = None, - metrics: Optional[ - Union[ - MetricType, - Sequence[MetricType], - Sequence[Sequence[MetricType]], - Dict[str, MetricType], - Dict[str, Sequence[MetricType]], - ] - ] = None, - loss_weights=None, - weighted_metrics: Optional[ - Union[ - MetricType, - Sequence[MetricType], - Sequence[Sequence[MetricType]], - Dict[str, MetricType], - Dict[str, Sequence[MetricType]], - ] - ] = None, - run_eagerly=None, - steps_per_execution=None, - jit_compile=None, - **kwargs, - ): - """Configures the model for training. - Example: - ```python - model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3), - loss=tf.keras.losses.BinaryCrossentropy(), - metrics=[tf.keras.metrics.BinaryAccuracy(), - tf.keras.metrics.FalseNegatives()]) - ``` - Args: - optimizer: String (name of optimizer) or optimizer instance. See - `tf.keras.optimizers`. - loss: Optional[Union[str, Loss, Dict[str, Union[str, Loss]]]] = None - losses : Optional[Union[str, Loss, Dict[str, Union[str, Loss]]]], optional - {LOSS_PARAMETERS_DOCSTRINGS} - See `tf.keras.losses`. A loss - function is any callable with the signature `loss = fn(y_true, - y_pred)`, where `y_true` are the ground truth values, and - `y_pred` are the model's predictions. - `y_true` should have shape - `(batch_size, d0, .. dN)` (except in the case of - sparse loss functions such as - sparse categorical crossentropy which expects integer arrays of shape - `(batch_size, d0, .. dN-1)`). - `y_pred` should have shape `(batch_size, d0, .. dN)`. - The loss function should return a float tensor. - If a custom `Loss` instance is - used and reduction is set to `None`, return value has shape - `(batch_size, d0, .. dN-1)` i.e. per-sample or per-timestep loss - values; otherwise, it is a scalar. If the model has multiple outputs, - you can use a different loss on each output by passing a dictionary - or a list of losses. The loss value that will be minimized by the - model will then be the sum of all individual losses, unless - `loss_weights` is specified. - loss_weights: Optional list or dictionary specifying scalar coefficients - (Python floats) to weight the loss contributions of different model - outputs. The loss value that will be minimized by the model will then - be the *weighted sum* of all individual losses, weighted by the - `loss_weights` coefficients. - If a list, it is expected to have a 1:1 mapping to the model's - outputs (Keras sorts tasks by name). - If a dict, it is expected to map output names (strings) - to scalar coefficients. - metrics: Optional[ Union[ MetricType, Sequence[MetricType], - Sequence[Sequence[MetricType]], - Dict[str, MetricType], Dict[str, Sequence[MetricType]], ] ], optional - {METRICS_PARAMETERS_DOCSTRING} - weighted_metrics: Optional[ Union[ MetricType, Sequence[MetricType], - Sequence[Sequence[MetricType]], - Dict[str, MetricType], Dict[str, Sequence[MetricType]], ] ], optional - List of metrics to be evaluated and weighted by - `sample_weight` or `class_weight` during training and testing. - {METRICS_PARAMETERS_DOCSTRING} - run_eagerly: Bool. Defaults to `False`. If `True`, this `Model`'s - logic will not be wrapped in a `tf.function`. Recommended to leave - this as `None` unless your `Model` cannot be run inside a - `tf.function`. `run_eagerly=True` is not supported when using - `tf.distribute.experimental.ParameterServerStrategy`. - steps_per_execution: Int. Defaults to 1. The number of batches to run - during each `tf.function` call. Running multiple batches inside a - single `tf.function` call can greatly improve performance on TPUs or - small models with a large Python overhead. At most, one full epoch - will be run each execution. If a number larger than the size of the - epoch is passed, the execution will be truncated to the size of the - epoch. Note that if `steps_per_execution` is set to `N`, - `Callback.on_batch_begin` and `Callback.on_batch_end` methods will - only be called every `N` batches (i.e. before/after each `tf.function` - execution). - jit_compile: If `True`, compile the model training step with XLA. - [XLA](https://www.tensorflow.org/xla) is an optimizing compiler for - machine learning. - `jit_compile` is not enabled for by default. - This option cannot be enabled with `run_eagerly=True`. - Note that `jit_compile=True` is - may not necessarily work for all models. - For more information on supported operations please refer to the - [XLA documentation](https://www.tensorflow.org/xla). - Also refer to - [known XLA issues](https://www.tensorflow.org/xla/known_issues) for - more details. - **kwargs: Arguments supported for backwards compatibility only. - """ - - num_v1_blocks = len(self.prediction_tasks) - num_v2_blocks = len(self.model_outputs) - - if num_v1_blocks > 1 and num_v2_blocks > 1: - raise ValueError( - "You cannot use both `prediction_tasks` and `prediction_blocks` at the same time.", - "`prediction_tasks` is deprecated and will be removed in a future version.", - ) - - if num_v1_blocks > 0: - self.output_names = [task.task_name for task in self.prediction_tasks] - else: - self.output_names = [block.full_name for block in self.model_outputs] - - # This flag will make Keras change the metric-names which is not needed in v2 - from_serialized = kwargs.pop("from_serialized", num_v2_blocks > 0) - - if hvd_installed and hvd.size() > 1: - # Horovod: Specify `experimental_run_tf_function=False` to ensure TensorFlow - # uses hvd.DistributedOptimizer() to compute gradients. - kwargs.update({"experimental_run_tf_function": False}) - - super().compile( - optimizer=self._create_optimizer(optimizer), - loss=self._create_loss(loss), - metrics=self._create_metrics(metrics, weighted=False), - weighted_metrics=self._create_metrics(weighted_metrics, weighted=True), - run_eagerly=run_eagerly, - loss_weights=loss_weights, - steps_per_execution=steps_per_execution, - jit_compile=jit_compile, - from_serialized=from_serialized, - **kwargs, - ) - def build(self, input_shape=None): """Builds the model