From a52a90cf03f742d9618cbb94ccd69b7fb98928f9 Mon Sep 17 00:00:00 2001 From: Adam Lesnikowski Date: Tue, 14 Sep 2021 12:05:30 -0700 Subject: [PATCH 01/28] Update docstrings for issue #1077 Update docstrings for issue #1077. This touches the tensorflow and torch dataloader modules and the list_slice op module. The motivation for this is to improve readability. This commit is towards resolving issue #1077 on implementing left padding for sparse sequential features. --- nvtabular/loader/tensorflow.py | 19 ++++++++++--------- nvtabular/loader/torch.py | 2 +- nvtabular/ops/list_slice.py | 10 +++++----- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/nvtabular/loader/tensorflow.py b/nvtabular/loader/tensorflow.py index 0c6d35ccf77..4cb84b67273 100644 --- a/nvtabular/loader/tensorflow.py +++ b/nvtabular/loader/tensorflow.py @@ -140,20 +140,20 @@ class KerasSequenceLoader(tf.keras.utils.Sequence, DataLoader): Iterator output is of the form `(dict(features), list(labels))`, where each element of the features dict is a - `feature_name: feature_tensor` and each elemtn of the labels + `feature_name: feature_tensor` and each element of the labels list is a tensor, and all tensors are of shape `(batch_size, 1)`. Note that this means vectorized continuous and multi-hot categorical features are not currently supported. The underlying NVTabular `Dataset` object is stored in the `data` attribute, and should be used for updating NVTabular `Workflow` - statistics:: + statistics: workflow = nvt.Workflow(...) dataset = KerasSequenceLoader(...) workflow.update_stats(dataset.data.to_iter(), record_stats=True) Parameters - ------------- + ---------- - paths_or_dataset: str or list(str) Either a string representing a file pattern (see `tf.glob` for pattern rules), a list of filenames to be iterated through, or @@ -205,6 +205,7 @@ class KerasSequenceLoader(tf.keras.utils.Sequence, DataLoader): dictionary of key: column_name + value: integer representing max sequence length for column sparse_dense : bool bool value to activate transforming sparse tensors to dense + """ _use_nnz = True @@ -238,7 +239,7 @@ def __init__( feature_columns, cat_names, cont_names, schema=dataset.schema ) - # sort the ccolumns to avoid getting incorrect output + # Sort the columns to avoid getting incorrect output. # (https://github.com/NVIDIA/NVTabular/issues/412) cat_names = _get_embedding_order(cat_names) cont_names = _get_embedding_order(cont_names) @@ -265,9 +266,7 @@ def __init__( self._map_fns = [] def __len__(self): - """ - recreating since otherwise Keras yells at you - """ + """Recreating since otherwise Keras yells at you.""" # TODO: what's a better way to do this inheritance # of the appropriate methods? A Metaclass? DataLoader.stop(self) @@ -275,9 +274,10 @@ def __len__(self): def __getitem__(self, idx): """ - implemented exclusively for consistency + Implemented exclusively for consistency with Keras model.fit. Does not leverage - passed idx in any way + passed idx in any way. + """ return DataLoader.__next__(self) @@ -286,6 +286,7 @@ def map(self, fn): Applying a function to each batch. This can for instance be used to add `sample_weight` to the model. + """ self._map_fns.append(fn) diff --git a/nvtabular/loader/torch.py b/nvtabular/loader/torch.py index 3aa57468a06..3dd4fac0b1d 100644 --- a/nvtabular/loader/torch.py +++ b/nvtabular/loader/torch.py @@ -42,7 +42,7 @@ class TorchAsyncItr(torch.utils.data.IterableDataset, DataLoader): batches are the specified size until the final batch. Parameters - ----------- + ---------- dataset : NVTabular dataset cats : [str] the list of categorical columns in the dataset diff --git a/nvtabular/ops/list_slice.py b/nvtabular/ops/list_slice.py index a4f3cafab13..f2c86ef5c6e 100644 --- a/nvtabular/ops/list_slice.py +++ b/nvtabular/ops/list_slice.py @@ -61,7 +61,7 @@ def transform(self, col_selector: ColumnSelector, df: DataFrameType) -> DataFram on_cpu = _is_cpu_object(df) ret = type(df)() for col in col_selector.names: - # handle CPU via normal python slicing (not very efficient) + # Handle CPU via normal python slicing (not very efficient). if on_cpu: ret[col] = [row[self.start : self.end] for row in df[col]] else: @@ -99,8 +99,8 @@ def output_tags(self): @numba.cuda.jit def _calculate_row_sizes(start, end, offsets, row_sizes): - """given a slice (start/end) and existing offsets indicating row lengths, this - calculates the size for each new row after slicing""" + """Given a slice (start/end) and existing offsets indicating row lengths, this + calculates the size for each new row after slicing.""" rowid = numba.cuda.grid(1) if rowid < offsets.size - 1: original_row_size = offsets[rowid + 1] - offsets[rowid] @@ -120,9 +120,9 @@ def _calculate_row_sizes(start, end, offsets, row_sizes): @numba.cuda.jit def _slice_rows(start, offsets, elements, new_offsets, new_elements): - """slices rows of a list column. requires the 'new_offsets' to + """Slices rows of a list column. requires the 'new_offsets' to be previously calculated (meaning that we don't need the 'end' slice index - since thats baked into the new_offsets""" + since thats baked into the new_offsets.""" rowid = numba.cuda.grid(1) if rowid < (new_offsets.size - 1): if start >= 0: From ff1e3964a2399bf748fc608c6919faa6a74cad32 Mon Sep 17 00:00:00 2001 From: Adam Lesnikowski Date: Thu, 16 Sep 2021 14:08:21 -0700 Subject: [PATCH 02/28] Implementation of left padding for issue #1077 Implementation of left padding for issue #1077. This is based on a suggestion by @gabrielspmoreira. I am not exactly sure if this change will completely work, and this is untested due to current failing tests on main on this part of the codebase. But the motivation of this commit is to start a commit for comments, suggestions, and revisions on this issue's implementation. --- nvtabular/loader/torch.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/nvtabular/loader/torch.py b/nvtabular/loader/torch.py index 3dd4fac0b1d..244eb3a3882 100644 --- a/nvtabular/loader/torch.py +++ b/nvtabular/loader/torch.py @@ -174,8 +174,12 @@ def _get_sparse_tensor(self, values, indices, num_rows, seq_limit): sparse_tensor = sparse_tensor.to_dense() return sparse_tensor - def _build_sparse_tensor(self, values, offsets, diff_offsets, num_rows, seq_limit): + def _build_sparse_tensor(self, values, offsets, diff_offsets, num_rows, seq_limit, padding=None): indices = self._get_indices(offsets, diff_offsets) + if padding == "left": + indices[:,1] = seq_limit - 1 - indices[:,1] + if padding == "right": + raise NotImplementedError return self._get_sparse_tensor(values, indices, num_rows, seq_limit) From d9e457d0a5cb6316f669e023d506c54ccce53d43 Mon Sep 17 00:00:00 2001 From: Adam Lesnikowski Date: Thu, 16 Sep 2021 14:32:31 -0700 Subject: [PATCH 03/28] Update #1077 implementation Update #1077 implementation with some useful feedback from running pre-commit and linters. The motivation is to better pass the CI checks and code consistency. --- nvtabular/loader/torch.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/nvtabular/loader/torch.py b/nvtabular/loader/torch.py index 244eb3a3882..ac1a44685c6 100644 --- a/nvtabular/loader/torch.py +++ b/nvtabular/loader/torch.py @@ -174,10 +174,12 @@ def _get_sparse_tensor(self, values, indices, num_rows, seq_limit): sparse_tensor = sparse_tensor.to_dense() return sparse_tensor - def _build_sparse_tensor(self, values, offsets, diff_offsets, num_rows, seq_limit, padding=None): + def _build_sparse_tensor( + self, values, offsets, diff_offsets, num_rows, seq_limit, padding=None + ): indices = self._get_indices(offsets, diff_offsets) if padding == "left": - indices[:,1] = seq_limit - 1 - indices[:,1] + indices[:, 1] = seq_limit - 1 - indices[:, 1] if padding == "right": raise NotImplementedError return self._get_sparse_tensor(values, indices, num_rows, seq_limit) From e25c6e8229eb988dfe529d71078ce206e2358f54 Mon Sep 17 00:00:00 2001 From: Adam Lesnikowski Date: Thu, 16 Sep 2021 14:44:25 -0700 Subject: [PATCH 04/28] Implement #1077 update with docstring and type hinting. Implement #1077 update with docstring and type hinting. Note that black adds spaces in the method signature type hinting for the `padding` argument. We add a docstring for _build_spare_tensor(), as this is being modified in this issue's implementation. The motivation for this is improved codebase readability. --- nvtabular/loader/torch.py | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/nvtabular/loader/torch.py b/nvtabular/loader/torch.py index ac1a44685c6..b2cff4697e6 100644 --- a/nvtabular/loader/torch.py +++ b/nvtabular/loader/torch.py @@ -175,8 +175,32 @@ def _get_sparse_tensor(self, values, indices, num_rows, seq_limit): return sparse_tensor def _build_sparse_tensor( - self, values, offsets, diff_offsets, num_rows, seq_limit, padding=None + self, values, offsets, diff_offsets, num_rows, seq_limit, padding: str = "" ): + """Builds sparse tensors in our torch dataloader. + + Parameters + ---------- + values : + offsets : + diff_offsets : + num_rows : + seq_limit : + padding : str, optional + Padding mode, choose among 'left' for left padding, 'right' for right padding, + or '' for no padding, by default '' + + Returns + ------- + torch.sparse + Our built torch sparse tensor. + + Raises + ------ + NotImplementedError + Raises this error when this method is called with a not implemented + padding mode string. + """ indices = self._get_indices(offsets, diff_offsets) if padding == "left": indices[:, 1] = seq_limit - 1 - indices[:, 1] From 299d3564e61bd11f16c92289c7dcfcb9684b828f Mon Sep 17 00:00:00 2001 From: Adam Lesnikowski Date: Thu, 23 Sep 2021 14:14:40 -0700 Subject: [PATCH 05/28] Update tensorflow module docstring for docs syntax Update tensorflow dataloader module docstring for docs syntax by using double colons instead of single colon. --- nvtabular/loader/tensorflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nvtabular/loader/tensorflow.py b/nvtabular/loader/tensorflow.py index 4cb84b67273..c05600f5b75 100644 --- a/nvtabular/loader/tensorflow.py +++ b/nvtabular/loader/tensorflow.py @@ -146,7 +146,7 @@ class KerasSequenceLoader(tf.keras.utils.Sequence, DataLoader): features are not currently supported. The underlying NVTabular `Dataset` object is stored in the `data` attribute, and should be used for updating NVTabular `Workflow` - statistics: + statistics:: workflow = nvt.Workflow(...) dataset = KerasSequenceLoader(...) From 1285783cf6a2b4cc8e56dfbe28c3b5649c0cff0e Mon Sep 17 00:00:00 2001 From: Adam Lesnikowski Date: Fri, 24 Sep 2021 14:54:17 -0700 Subject: [PATCH 06/28] Expose pad_left to user Expose pad_left argument to user argument to user through including this argument in the signatures in the TorchAsyncIter() and KerasSequenceLoader() classes, as well as their mutual parent class DataLoader(). The motivation is to allow user-specification of left padding. --- nvtabular/loader/backend.py | 2 ++ nvtabular/loader/tensorflow.py | 2 ++ nvtabular/loader/torch.py | 13 +++++++++---- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/nvtabular/loader/backend.py b/nvtabular/loader/backend.py index 64bcd58e879..5115b15e7bc 100644 --- a/nvtabular/loader/backend.py +++ b/nvtabular/loader/backend.py @@ -188,6 +188,7 @@ def __init__( sparse_names=None, sparse_max=None, sparse_as_dense=False, + pad_left=False, ): self.data = dataset self.indices = cp.arange(dataset.to_ddf().npartitions) @@ -196,6 +197,7 @@ def __init__( self.sparse_names = sparse_names or [] self.sparse_max = sparse_max or {} self.sparse_as_dense = sparse_as_dense + self.pad_left = pad_left self.global_size = global_size or 1 self.global_rank = global_rank or 0 diff --git a/nvtabular/loader/tensorflow.py b/nvtabular/loader/tensorflow.py index c05600f5b75..e5f38305b17 100644 --- a/nvtabular/loader/tensorflow.py +++ b/nvtabular/loader/tensorflow.py @@ -231,6 +231,7 @@ def __init__( sparse_names=None, sparse_max=None, sparse_as_dense=False, + pad_left=False, ): dataset = _validate_dataset( paths_or_dataset, batch_size, buffer_size, engine, reader_kwargs @@ -262,6 +263,7 @@ def __init__( sparse_names=sparse_names, sparse_max=sparse_max, sparse_as_dense=sparse_as_dense, + pad_left=pad_left, ) self._map_fns = [] diff --git a/nvtabular/loader/torch.py b/nvtabular/loader/torch.py index b2cff4697e6..6a775126961 100644 --- a/nvtabular/loader/torch.py +++ b/nvtabular/loader/torch.py @@ -83,6 +83,7 @@ def __init__( sparse_names=None, sparse_max=None, sparse_as_dense=False, + pad_left=False, ): DataLoader.__init__( self, @@ -101,6 +102,7 @@ def __init__( sparse_names=sparse_names, sparse_max=sparse_max, sparse_as_dense=sparse_as_dense, + pad_left=pad_left, ) def __iter__(self): @@ -175,7 +177,12 @@ def _get_sparse_tensor(self, values, indices, num_rows, seq_limit): return sparse_tensor def _build_sparse_tensor( - self, values, offsets, diff_offsets, num_rows, seq_limit, padding: str = "" + self, + values, + offsets, + diff_offsets, + num_rows, + seq_limit, ): """Builds sparse tensors in our torch dataloader. @@ -202,10 +209,8 @@ def _build_sparse_tensor( padding mode string. """ indices = self._get_indices(offsets, diff_offsets) - if padding == "left": + if self.pad_left: indices[:, 1] = seq_limit - 1 - indices[:, 1] - if padding == "right": - raise NotImplementedError return self._get_sparse_tensor(values, indices, num_rows, seq_limit) From 364bcf1cfb6c95f57f1e10de1ac2cf2a0c2f5102 Mon Sep 17 00:00:00 2001 From: Adam Lesnikowski Date: Fri, 24 Sep 2021 15:26:17 -0700 Subject: [PATCH 07/28] Skip test_distributed_multigpu() Skip test_distributed_multigpu() so that I can see a clean pytest output, since this test is failing locally for some mysterious reason. --- tests/unit/loader/test_torch_dataloader.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/unit/loader/test_torch_dataloader.py b/tests/unit/loader/test_torch_dataloader.py index ceecbdeb4b0..1d5b1aec8ae 100644 --- a/tests/unit/loader/test_torch_dataloader.py +++ b/tests/unit/loader/test_torch_dataloader.py @@ -663,6 +663,7 @@ def test_horovod_multigpu(tmpdir): assert "Training complete" in str(stdout) +@pytest.mark.skip(reason="Currently mysterious failure locally.") def test_distributed_multigpu(tmpdir): json_sample = { "conts": {}, From 071b8bffd4b627e045888eb47e7bd19f6bcecadd Mon Sep 17 00:00:00 2001 From: Adam Lesnikowski Date: Fri, 24 Sep 2021 15:56:52 -0700 Subject: [PATCH 08/28] Add unit test for torch dataloader and padding argument Add unit test for torch dataloader and padding argument. The motivation for this is to add a test for both True and False values of the new pad_left argument for our torch dataloader. --- tests/unit/loader/test_torch_dataloader.py | 29 ++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/tests/unit/loader/test_torch_dataloader.py b/tests/unit/loader/test_torch_dataloader.py index 1d5b1aec8ae..92ff785a159 100644 --- a/tests/unit/loader/test_torch_dataloader.py +++ b/tests/unit/loader/test_torch_dataloader.py @@ -511,6 +511,35 @@ def test_sparse_tensors(sparse_dense): # ensure they are correct structurally +@pytest.mark.parametrize("pad_left", [False, True]) +def test_torch_dataloader_left_padding(pad_left): + """Tests the pad_left functionality of our Torch dataloader + to pad data on the left.""" + df = cudf.DataFrame({"A": [[3, 1, 5, 1], [9, 2], [6]], "B": [[3, 1, 5, 1, 9], [2], [6, 5, 3]]}) + + print("df is:\n{}".format(df)) + + categorical_columns = ["A", "B"] + batch_size = 2 + data_itr = torch_dataloader.TorchAsyncItr( + nvt.Dataset(df), pad_left=pad_left, cats=categorical_columns, batch_size=batch_size + ) + + print("data_itr is:\n{}".format(data_itr)) + + for batch in data_itr: + features, labels = batch + print("batch is:\n{}".format(batch)) + print("features, labels are:\n{}, {}".format(features, labels)) + + expected_feature_length_with_padding = 5 + for categorical_column in categorical_columns: + feature_tensor = features[categorical_column] + print("feature_tensor is:\n{}".format(feature_tensor)) + if pad_left: + assert len(feature_tensor[0]) == expected_feature_length_with_padding + + def test_mh_model_support(tmpdir): df = cudf.DataFrame( { From 3cce162461d9865c24d39f155f9d7f47130f59b6 Mon Sep 17 00:00:00 2001 From: Adam Lesnikowski Date: Fri, 24 Sep 2021 16:37:15 -0700 Subject: [PATCH 09/28] Update torch test for padding argument Update torch test for padding argument. --- tests/unit/loader/test_torch_dataloader.py | 29 +++++++++++++++------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/tests/unit/loader/test_torch_dataloader.py b/tests/unit/loader/test_torch_dataloader.py index 92ff785a159..02756ba7106 100644 --- a/tests/unit/loader/test_torch_dataloader.py +++ b/tests/unit/loader/test_torch_dataloader.py @@ -517,27 +517,38 @@ def test_torch_dataloader_left_padding(pad_left): to pad data on the left.""" df = cudf.DataFrame({"A": [[3, 1, 5, 1], [9, 2], [6]], "B": [[3, 1, 5, 1, 9], [2], [6, 5, 3]]}) - print("df is:\n{}".format(df)) + # print("df is:\n{}".format(df)) categorical_columns = ["A", "B"] - batch_size = 2 + sparse_max = {"A": 5, "B": 8} + batch_size = 4 data_itr = torch_dataloader.TorchAsyncItr( - nvt.Dataset(df), pad_left=pad_left, cats=categorical_columns, batch_size=batch_size + nvt.Dataset(df), + cats=categorical_columns, + conts=[], + labels=[], + batch_size=batch_size, + sparse_names=categorical_columns, + sparse_max=sparse_max, + sparse_as_dense=True, + pad_left=pad_left, ) - - print("data_itr is:\n{}".format(data_itr)) + # print("data_itr is:\n{}".format(data_itr)) for batch in data_itr: features, labels = batch - print("batch is:\n{}".format(batch)) - print("features, labels are:\n{}, {}".format(features, labels)) + # print("batch is:\n{}".format(batch)) + # print("features, labels are:\n{}, {}".format(features, labels)) - expected_feature_length_with_padding = 5 + # expected_feature_length_with_padding = 5 for categorical_column in categorical_columns: feature_tensor = features[categorical_column] print("feature_tensor is:\n{}".format(feature_tensor)) if pad_left: - assert len(feature_tensor[0]) == expected_feature_length_with_padding + pass + # assert len(feature_tensor[0]) == expected_feature_length_with_padding + + # assert False def test_mh_model_support(tmpdir): From cebb715019365dbd0a81f613d0658d46760df9eb Mon Sep 17 00:00:00 2001 From: Adam Lesnikowski Date: Fri, 24 Sep 2021 19:14:49 -0700 Subject: [PATCH 10/28] Update unit test for padding argument Update unit test for padding argument for torch dataloader. We add explicit constructions of our expected output tensors with both padding on the left and the default padding on the right. The motivation is to make more clear that this test is testing these different padding modes for different categorical columns. --- tests/unit/loader/test_torch_dataloader.py | 40 ++++++++++++++++------ 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/tests/unit/loader/test_torch_dataloader.py b/tests/unit/loader/test_torch_dataloader.py index 02756ba7106..fea1e83ed3e 100644 --- a/tests/unit/loader/test_torch_dataloader.py +++ b/tests/unit/loader/test_torch_dataloader.py @@ -516,9 +516,7 @@ def test_torch_dataloader_left_padding(pad_left): """Tests the pad_left functionality of our Torch dataloader to pad data on the left.""" df = cudf.DataFrame({"A": [[3, 1, 5, 1], [9, 2], [6]], "B": [[3, 1, 5, 1, 9], [2], [6, 5, 3]]}) - # print("df is:\n{}".format(df)) - categorical_columns = ["A", "B"] sparse_max = {"A": 5, "B": 8} batch_size = 4 @@ -534,21 +532,43 @@ def test_torch_dataloader_left_padding(pad_left): pad_left=pad_left, ) # print("data_itr is:\n{}".format(data_itr)) - for batch in data_itr: features, labels = batch # print("batch is:\n{}".format(batch)) # print("features, labels are:\n{}, {}".format(features, labels)) - - # expected_feature_length_with_padding = 5 for categorical_column in categorical_columns: feature_tensor = features[categorical_column] - print("feature_tensor is:\n{}".format(feature_tensor)) + # print("feature_tensor is:\n{}".format(feature_tensor)) if pad_left: - pass - # assert len(feature_tensor[0]) == expected_feature_length_with_padding - - # assert False + if categorical_column == "A": + expected_tensor = torch.tensor( + [[0, 3, 1, 5, 1], [0, 0, 0, 9, 2], [0, 0, 0, 0, 6]], dtype=torch.int64 + ).cuda() + if categorical_column == "B": + expected_tensor = torch.tensor( + [ + [0, 0, 0, 3, 1, 5, 1, 9], + [0, 0, 0, 0, 0, 0, 0, 2], + [0, 0, 0, 0, 0, 6, 5, 3], + ], + dtype=torch.int64, + ).cuda() + + elif not pad_left: + if categorical_column == "A": + expected_tensor = torch.tensor( + [[3, 1, 5, 1, 0], [9, 2, 0, 0, 0], [6, 0, 0, 0, 0]], dtype=torch.int64 + ).cuda() + if categorical_column == "B": + expected_tensor = torch.tensor( + [ + [3, 1, 5, 1, 9, 0, 0, 0], + [2, 0, 0, 0, 0, 0, 0, 0], + [6, 5, 3, 0, 0, 0, 0, 0], + ], + dtype=torch.int64, + ).cuda() + assert torch.allclose(feature_tensor, expected_tensor) def test_mh_model_support(tmpdir): From 5acd76acc4e93d3698b1dc7890638b03f921e4c1 Mon Sep 17 00:00:00 2001 From: Adam Lesnikowski Date: Fri, 24 Sep 2021 19:25:03 -0700 Subject: [PATCH 11/28] Update dataloader torch to pass new tests Update dataloader torch to pass new tests for the torch dataloader. We add new code in order to correctly modify the indices of a sparse matrix in order to correctly output a dense matrix with padding on the left. Previously the order of elements in the row were reversed. This new code makes the elements of the dense matrix in correct order by modifying the indices of its sparse matrix representation. We also update pad_left to be a boolean and move this argument to the top-level TorchAsyncItr() class for better module-level consistency with the rest of the similar dataloader arguments as pad_left. --- nvtabular/loader/torch.py | 38 ++++++++++++++++++++++++++++++++++---- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/nvtabular/loader/torch.py b/nvtabular/loader/torch.py index 6a775126961..7a3ed0f2a71 100644 --- a/nvtabular/loader/torch.py +++ b/nvtabular/loader/torch.py @@ -64,6 +64,9 @@ class TorchAsyncItr(torch.utils.data.IterableDataset, DataLoader): dictionary of key: column_name + value: integer representing max sequence length for column sparse_dense : bool bool value to activate transforming sparse tensors to dense + pad_left : bool + Boolean value to indicate whether to pad on the left. Use True to pad on the left, + False to pad on the right. Default: False """ def __init__( @@ -193,9 +196,6 @@ def _build_sparse_tensor( diff_offsets : num_rows : seq_limit : - padding : str, optional - Padding mode, choose among 'left' for left padding, 'right' for right padding, - or '' for no padding, by default '' Returns ------- @@ -209,8 +209,38 @@ def _build_sparse_tensor( padding mode string. """ indices = self._get_indices(offsets, diff_offsets) + # print("indices is:\n{}\n".format(indices)) + # print("indices.T is:\n{}\n".format(indices.T)) if self.pad_left: - indices[:, 1] = seq_limit - 1 - indices[:, 1] + indices[:, 1] = (seq_limit - 1) - indices[:, 1] + + # We make sure that the elements of our sparse matrix indices are in the correct + # non-reversed order. We do this by flipping increasing blocks in the second column + # of indices. + def _process_row(row: torch.Tensor) -> torch.Tensor: + """Process row by blocks for use in left padding.""" + row = row.tolist() + prev, curr = 0, 0 + while curr < len(row): + # print("prev, curr are:\n{}".format((prev, curr))) + if row[curr] >= row[curr - 1]: + # print("flipping block at: {}".format((prev, curr))) + row[prev:curr] = row[prev:curr][::-1] + prev = curr + if curr == (len(row) - 1): + # print("flipping final block at: {}".format((prev, curr))) + row[prev : curr + 1] = row[prev : curr + 1][::-1] + curr += 1 + return torch.Tensor(row) + + out_indices = indices.T + row_to_process = indices.T[1] + # print("row_to_process is:\n{}".format(row_to_process)) + processed_row = _process_row(row_to_process) + # print("processed_row is:\n{}".format(processed_row)) + out_indices[1] = processed_row + indices = out_indices.T + # print("indices is now:\n{}\n".format(indices)) return self._get_sparse_tensor(values, indices, num_rows, seq_limit) From 1684289b1e14709591eb21330123f385f7c70b35 Mon Sep 17 00:00:00 2001 From: Adam Lesnikowski Date: Fri, 24 Sep 2021 19:41:07 -0700 Subject: [PATCH 12/28] Clean up loader/torch module Clean up loader/torch module. We remove print statements used in debugging and condense certain lines of code for improved readability. --- nvtabular/loader/torch.py | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/nvtabular/loader/torch.py b/nvtabular/loader/torch.py index 7a3ed0f2a71..9eb6f0f52a8 100644 --- a/nvtabular/loader/torch.py +++ b/nvtabular/loader/torch.py @@ -209,38 +209,29 @@ def _build_sparse_tensor( padding mode string. """ indices = self._get_indices(offsets, diff_offsets) - # print("indices is:\n{}\n".format(indices)) - # print("indices.T is:\n{}\n".format(indices.T)) if self.pad_left: indices[:, 1] = (seq_limit - 1) - indices[:, 1] # We make sure that the elements of our sparse matrix indices are in the correct # non-reversed order. We do this by flipping increasing blocks in the second column - # of indices. + # of indices. We find it convienient and more efficient to modify the transpose + # of indices and transpose indices back before returning the indices matrix. def _process_row(row: torch.Tensor) -> torch.Tensor: """Process row by blocks for use in left padding.""" row = row.tolist() prev, curr = 0, 0 while curr < len(row): - # print("prev, curr are:\n{}".format((prev, curr))) if row[curr] >= row[curr - 1]: - # print("flipping block at: {}".format((prev, curr))) row[prev:curr] = row[prev:curr][::-1] prev = curr if curr == (len(row) - 1): - # print("flipping final block at: {}".format((prev, curr))) row[prev : curr + 1] = row[prev : curr + 1][::-1] curr += 1 return torch.Tensor(row) - out_indices = indices.T - row_to_process = indices.T[1] - # print("row_to_process is:\n{}".format(row_to_process)) - processed_row = _process_row(row_to_process) - # print("processed_row is:\n{}".format(processed_row)) - out_indices[1] = processed_row - indices = out_indices.T - # print("indices is now:\n{}\n".format(indices)) + indices = indices.T + indices[1] = _process_row(indices[1]) + indices = indices.T return self._get_sparse_tensor(values, indices, num_rows, seq_limit) From a319501ab7c7d6380c975a50d1760e0f4d7a7c95 Mon Sep 17 00:00:00 2001 From: Adam Lesnikowski Date: Fri, 24 Sep 2021 19:43:53 -0700 Subject: [PATCH 13/28] Clean up test_torch_dataloader module Clean up test_torch_dataloader module by removing print statements used previously for debugging and understanding program flow. The motivation for this change is improved readability. --- tests/unit/loader/test_torch_dataloader.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/tests/unit/loader/test_torch_dataloader.py b/tests/unit/loader/test_torch_dataloader.py index fea1e83ed3e..4a00b473a86 100644 --- a/tests/unit/loader/test_torch_dataloader.py +++ b/tests/unit/loader/test_torch_dataloader.py @@ -516,7 +516,6 @@ def test_torch_dataloader_left_padding(pad_left): """Tests the pad_left functionality of our Torch dataloader to pad data on the left.""" df = cudf.DataFrame({"A": [[3, 1, 5, 1], [9, 2], [6]], "B": [[3, 1, 5, 1, 9], [2], [6, 5, 3]]}) - # print("df is:\n{}".format(df)) categorical_columns = ["A", "B"] sparse_max = {"A": 5, "B": 8} batch_size = 4 @@ -531,14 +530,10 @@ def test_torch_dataloader_left_padding(pad_left): sparse_as_dense=True, pad_left=pad_left, ) - # print("data_itr is:\n{}".format(data_itr)) for batch in data_itr: features, labels = batch - # print("batch is:\n{}".format(batch)) - # print("features, labels are:\n{}, {}".format(features, labels)) for categorical_column in categorical_columns: feature_tensor = features[categorical_column] - # print("feature_tensor is:\n{}".format(feature_tensor)) if pad_left: if categorical_column == "A": expected_tensor = torch.tensor( @@ -553,7 +548,6 @@ def test_torch_dataloader_left_padding(pad_left): ], dtype=torch.int64, ).cuda() - elif not pad_left: if categorical_column == "A": expected_tensor = torch.tensor( From 0be389eed146e374bb9b27800fd24b9c01a10f78 Mon Sep 17 00:00:00 2001 From: Adam Lesnikowski Date: Mon, 27 Sep 2021 20:05:16 +0000 Subject: [PATCH 14/28] Update tests Update tests for issue #1077. We update the test name to something more descriptive, and update the test docstring to something more informative. --- tests/unit/loader/test_torch_dataloader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit/loader/test_torch_dataloader.py b/tests/unit/loader/test_torch_dataloader.py index 4a00b473a86..43624f70de6 100644 --- a/tests/unit/loader/test_torch_dataloader.py +++ b/tests/unit/loader/test_torch_dataloader.py @@ -512,9 +512,9 @@ def test_sparse_tensors(sparse_dense): @pytest.mark.parametrize("pad_left", [False, True]) -def test_torch_dataloader_left_padding(pad_left): +def test_sparse_tensor_left_padding(pad_left): """Tests the pad_left functionality of our Torch dataloader - to pad data on the left.""" + to pad data on the left for sparse tensors.""" df = cudf.DataFrame({"A": [[3, 1, 5, 1], [9, 2], [6]], "B": [[3, 1, 5, 1, 9], [2], [6, 5, 3]]}) categorical_columns = ["A", "B"] sparse_max = {"A": 5, "B": 8} From d93f9c58d51f0c8a0eaffecc74fde75b12c9828d Mon Sep 17 00:00:00 2001 From: Adam Lesnikowski Date: Tue, 28 Sep 2021 01:07:55 +0000 Subject: [PATCH 15/28] Add tests for the TensorFlow runtime dataloader Add tests for issue #1077 for the TensorFlow runtime dataloader. The motivation for this update is increased test coverage. --- tests/unit/loader/test_tf_dataloader.py | 62 +++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/tests/unit/loader/test_tf_dataloader.py b/tests/unit/loader/test_tf_dataloader.py index 8922d07a4f9..96264af0a0d 100644 --- a/tests/unit/loader/test_tf_dataloader.py +++ b/tests/unit/loader/test_tf_dataloader.py @@ -518,6 +518,68 @@ def test_sparse_tensors(tmpdir, sparse_dense): assert not isinstance(feature_tensor, tf.sparse.SparseTensor) +@pytest.mark.parametrize("pad_left", [False, True]) +def test_sparse_tensor_left_padding(pad_left): + """Tests the pad_left functionality of our TensorFlow dataloader + to pad data on the left for sparse tensors.""" + df = cudf.DataFrame({"A": [[3, 1, 5, 1], [9, 2], [6]], "B": [[3, 1, 5, 1, 9], [2], [6, 5, 3]]}) + categorical_columns = ["A", "B"] + sparse_max = {"A": 5, "B": 8} + batch_size = 4 + + data_itr = tf_dataloader.KerasSequenceLoader( + nvt.Dataset(df), + cat_names=categorical_columns, + cont_names=[], + label_names=[], + batch_size=batch_size, + sparse_max=sparse_max, + sparse_names=categorical_columns, + sparse_as_dense=True, + pad_left=pad_left, + ) + + for batch in data_itr: + features, labels = batch + for categorical_column in categorical_columns: + feature_tensor = features[categorical_column] + print("feature_tensor is:\n{}".format(feature_tensor)) + print("categorical_column is:\n{}".format(categorical_column)) + if pad_left: + if categorical_column == "A": + expected_tensor = tf.constant( + [[0, 3, 1, 5, 1], [0, 0, 0, 9, 2], [0, 0, 0, 0, 6]], dtype=tf.int64 + ) + print("expected_tensor is:\n{}".format(expected_tensor)) + if categorical_column == "B": + expected_tensor = tf.constant( + [ + [0, 0, 0, 3, 1, 5, 1, 9], + [0, 0, 0, 0, 0, 0, 0, 2], + [0, 0, 0, 0, 0, 6, 5, 3], + ], + dtype=tf.int64, + ) + print("expected_tensor is:\n{}".format(expected_tensor)) + elif not pad_left: + if categorical_column == "A": + expected_tensor = tf.constant( + [[3, 1, 5, 1, 0], [9, 2, 0, 0, 0], [6, 0, 0, 0, 0]], dtype=tf.int64 + ) + print("expected_tensor is:\n{}".format(expected_tensor)) + if categorical_column == "B": + expected_tensor = tf.constant( + [ + [3, 1, 5, 1, 9, 0, 0, 0], + [2, 0, 0, 0, 0, 0, 0, 0], + [6, 5, 3, 0, 0, 0, 0, 0], + ], + dtype=tf.int64, + ) + print("expected_tensor is:\n{}".format(expected_tensor)) + assert tf.experimental.numpy.allclose(feature_tensor, expected_tensor) + + @pytest.mark.skipif( os.environ.get("NR_USER") is not None, reason="not working correctly in ci environment" ) From 0c0ce69ad2442d915976c293d95a6fcf134f1879 Mon Sep 17 00:00:00 2001 From: Adam Lesnikowski Date: Tue, 28 Sep 2021 03:15:43 +0000 Subject: [PATCH 16/28] Implement pad_left in _build_sparse_tensor TF Implement pad_left functionality in _build_sparse_tensor() method in the TensorFlow dataloader. --- nvtabular/loader/tensorflow.py | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/nvtabular/loader/tensorflow.py b/nvtabular/loader/tensorflow.py index e5f38305b17..6c46cd7884d 100644 --- a/nvtabular/loader/tensorflow.py +++ b/nvtabular/loader/tensorflow.py @@ -205,6 +205,9 @@ class KerasSequenceLoader(tf.keras.utils.Sequence, DataLoader): dictionary of key: column_name + value: integer representing max sequence length for column sparse_dense : bool bool value to activate transforming sparse tensors to dense + pad_left : bool + Boolean value to indicate whether to pad on the left. Use True to pad on the left, + False to pad on the right. Default: False """ @@ -419,10 +422,29 @@ def _get_sparse_tensor(self, values, indices, num_rows, seq_limit): return sparse_tensor def _build_sparse_tensor(self, values, offsets, diff_offsets, num_rows, seq_limit): + print(f"values is:\n {values}") + print(f"diff_offsets is:\n {diff_offsets}") + print(f"seq_limit is:\n {seq_limit}") + ragged = tf.RaggedTensor.from_row_lengths(values=values, row_lengths=diff_offsets) - tensor = tf.RaggedTensor.from_tensor(ragged.to_tensor(shape=[None, seq_limit])).to_sparse() + print(f"ragged is:\n{ragged}") + + if self.pad_left: + max_len = max(max(len(row) for row in ragged), seq_limit) + tensor = tf.stack([tf.pad(row, [[max_len - len(row), 0]]) for row in ragged], axis=0) + print(f"tensor is mapped to:\n{tensor}") + else: + tensor = ragged.to_tensor(shape=[None, seq_limit]) + print(f"tensor is:\n{tensor}") + + tensor = tf.RaggedTensor.from_tensor(tensor) + print(f"tensor is mapped now to:\n{tensor}") + tensor = tensor.to_sparse() + print(f"tensor is now the sparse:\n{tensor}") + if self.sparse_as_dense: tensor = tf.sparse.to_dense(tensor) + print(f"tensor is transformed to to the dense:\n{tensor}") return tensor def _handle_tensors(self, cats, conts, labels): From 941d2f3fbdf7a063326b605a60ebf5474be167e3 Mon Sep 17 00:00:00 2001 From: Adam Lesnikowski Date: Tue, 28 Sep 2021 03:21:55 +0000 Subject: [PATCH 17/28] Update torch loader documentation Update torch loader documentation in order to keep documentation in sync with the changes made in our previous commits. --- nvtabular/loader/torch.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/nvtabular/loader/torch.py b/nvtabular/loader/torch.py index 9eb6f0f52a8..f66ea76dfde 100644 --- a/nvtabular/loader/torch.py +++ b/nvtabular/loader/torch.py @@ -67,6 +67,7 @@ class TorchAsyncItr(torch.utils.data.IterableDataset, DataLoader): pad_left : bool Boolean value to indicate whether to pad on the left. Use True to pad on the left, False to pad on the right. Default: False + """ def __init__( @@ -202,11 +203,6 @@ def _build_sparse_tensor( torch.sparse Our built torch sparse tensor. - Raises - ------ - NotImplementedError - Raises this error when this method is called with a not implemented - padding mode string. """ indices = self._get_indices(offsets, diff_offsets) if self.pad_left: From 76c0024e815f4c825245d850b5f9fc39a27e7a92 Mon Sep 17 00:00:00 2001 From: Adam Lesnikowski Date: Tue, 28 Sep 2021 03:33:01 +0000 Subject: [PATCH 18/28] Cleanup _build_sparese_tensor for TF dataloader Cleanup _build_sparese_tensor for TF dataloader. The motivation is improved readability. --- nvtabular/loader/tensorflow.py | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/nvtabular/loader/tensorflow.py b/nvtabular/loader/tensorflow.py index 6c46cd7884d..f289950cd9d 100644 --- a/nvtabular/loader/tensorflow.py +++ b/nvtabular/loader/tensorflow.py @@ -422,29 +422,17 @@ def _get_sparse_tensor(self, values, indices, num_rows, seq_limit): return sparse_tensor def _build_sparse_tensor(self, values, offsets, diff_offsets, num_rows, seq_limit): - print(f"values is:\n {values}") - print(f"diff_offsets is:\n {diff_offsets}") - print(f"seq_limit is:\n {seq_limit}") ragged = tf.RaggedTensor.from_row_lengths(values=values, row_lengths=diff_offsets) - print(f"ragged is:\n{ragged}") - if self.pad_left: max_len = max(max(len(row) for row in ragged), seq_limit) tensor = tf.stack([tf.pad(row, [[max_len - len(row), 0]]) for row in ragged], axis=0) - print(f"tensor is mapped to:\n{tensor}") else: tensor = ragged.to_tensor(shape=[None, seq_limit]) - print(f"tensor is:\n{tensor}") - - tensor = tf.RaggedTensor.from_tensor(tensor) - print(f"tensor is mapped now to:\n{tensor}") - tensor = tensor.to_sparse() - print(f"tensor is now the sparse:\n{tensor}") + tensor = tf.RaggedTensor.from_tensor(tensor).to_sparse() if self.sparse_as_dense: tensor = tf.sparse.to_dense(tensor) - print(f"tensor is transformed to to the dense:\n{tensor}") return tensor def _handle_tensors(self, cats, conts, labels): From 46847cbd199e98feb086352bb7f28b9d01e3cb3e Mon Sep 17 00:00:00 2001 From: Adam Lesnikowski Date: Tue, 28 Sep 2021 03:35:28 +0000 Subject: [PATCH 19/28] Add docstring to _build_sparse_tensor() for tf Add docstring to _build_sparse_tensor() for the TF implementation. --- nvtabular/loader/tensorflow.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/nvtabular/loader/tensorflow.py b/nvtabular/loader/tensorflow.py index f289950cd9d..182bf8584f5 100644 --- a/nvtabular/loader/tensorflow.py +++ b/nvtabular/loader/tensorflow.py @@ -422,7 +422,22 @@ def _get_sparse_tensor(self, values, indices, num_rows, seq_limit): return sparse_tensor def _build_sparse_tensor(self, values, offsets, diff_offsets, num_rows, seq_limit): + """Builds sparse tensors in the TensorFlwo dataloader. + Parameters + ---------- + values : + offsets : + diff_offsets : + num_rows : + seq_limit : + + Returns + ------- + tf.sparse + Our built TensorFlow sparse tensor. + + """ ragged = tf.RaggedTensor.from_row_lengths(values=values, row_lengths=diff_offsets) if self.pad_left: max_len = max(max(len(row) for row in ragged), seq_limit) From c7ae873e94c419d3365d444355c463971d579d62 Mon Sep 17 00:00:00 2001 From: Adam Lesnikowski Date: Tue, 28 Sep 2021 03:36:42 +0000 Subject: [PATCH 20/28] Update docstring Update docstring for a small spelling error. --- nvtabular/loader/tensorflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nvtabular/loader/tensorflow.py b/nvtabular/loader/tensorflow.py index 182bf8584f5..eaf5713fccb 100644 --- a/nvtabular/loader/tensorflow.py +++ b/nvtabular/loader/tensorflow.py @@ -422,7 +422,7 @@ def _get_sparse_tensor(self, values, indices, num_rows, seq_limit): return sparse_tensor def _build_sparse_tensor(self, values, offsets, diff_offsets, num_rows, seq_limit): - """Builds sparse tensors in the TensorFlwo dataloader. + """Builds sparse tensors in the TensorFlow dataloader. Parameters ---------- From d86cec336d5a4ace559b037fabbfa469b1c84ae0 Mon Sep 17 00:00:00 2001 From: Adam Lesnikowski Date: Tue, 28 Sep 2021 03:58:16 +0000 Subject: [PATCH 21/28] Refactor torch dataloader pad_left and _build_spar Refactor torch dataloader pad_left and _build_sparse_tensor() method. The motivation is for improved readability and maintainability. --- nvtabular/loader/torch.py | 34 +++++++++++++++------------------- 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/nvtabular/loader/torch.py b/nvtabular/loader/torch.py index f66ea76dfde..16cb0dae933 100644 --- a/nvtabular/loader/torch.py +++ b/nvtabular/loader/torch.py @@ -180,6 +180,19 @@ def _get_sparse_tensor(self, values, indices, num_rows, seq_limit): sparse_tensor = sparse_tensor.to_dense() return sparse_tensor + def _build_sparse_tensor_helper_process_column(self, col: torch.Tensor) -> torch.Tensor: + """Process column by increasing blocks for use in left padding.""" + col = col.tolist() + prev, curr = 0, 0 + while curr < len(col): + if col[curr] >= col[curr - 1]: + col[prev:curr] = col[prev:curr][::-1] + prev = curr + if curr == (len(col) - 1): + col[prev : curr + 1] = col[prev : curr + 1][::-1] + curr += 1 + return torch.Tensor(col) + def _build_sparse_tensor( self, values, @@ -207,27 +220,10 @@ def _build_sparse_tensor( indices = self._get_indices(offsets, diff_offsets) if self.pad_left: indices[:, 1] = (seq_limit - 1) - indices[:, 1] - # We make sure that the elements of our sparse matrix indices are in the correct # non-reversed order. We do this by flipping increasing blocks in the second column - # of indices. We find it convienient and more efficient to modify the transpose - # of indices and transpose indices back before returning the indices matrix. - def _process_row(row: torch.Tensor) -> torch.Tensor: - """Process row by blocks for use in left padding.""" - row = row.tolist() - prev, curr = 0, 0 - while curr < len(row): - if row[curr] >= row[curr - 1]: - row[prev:curr] = row[prev:curr][::-1] - prev = curr - if curr == (len(row) - 1): - row[prev : curr + 1] = row[prev : curr + 1][::-1] - curr += 1 - return torch.Tensor(row) - - indices = indices.T - indices[1] = _process_row(indices[1]) - indices = indices.T + # of indices. + indices[:, 1] = self._build_sparse_tensor_helper_process_column(indices[:, 1]) return self._get_sparse_tensor(values, indices, num_rows, seq_limit) From d90e1dfe9db39a37e000f5c3f12526aa9a71fde4 Mon Sep 17 00:00:00 2001 From: Adam Lesnikowski Date: Tue, 28 Sep 2021 04:01:58 +0000 Subject: [PATCH 22/28] Update pytest decorator Update pytest decorator to not skip unrealted test. --- tests/unit/loader/test_torch_dataloader.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/unit/loader/test_torch_dataloader.py b/tests/unit/loader/test_torch_dataloader.py index 43624f70de6..4e87315beab 100644 --- a/tests/unit/loader/test_torch_dataloader.py +++ b/tests/unit/loader/test_torch_dataloader.py @@ -717,7 +717,6 @@ def test_horovod_multigpu(tmpdir): assert "Training complete" in str(stdout) -@pytest.mark.skip(reason="Currently mysterious failure locally.") def test_distributed_multigpu(tmpdir): json_sample = { "conts": {}, From b21c57dce7348c43107760bdc0c68b62f7f98709 Mon Sep 17 00:00:00 2001 From: Adam Lesnikowski Date: Tue, 28 Sep 2021 04:10:08 +0000 Subject: [PATCH 23/28] Cleanup torch loader Cleanup torch loader for improved readability. --- nvtabular/loader/torch.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/nvtabular/loader/torch.py b/nvtabular/loader/torch.py index 16cb0dae933..ae31439e4dc 100644 --- a/nvtabular/loader/torch.py +++ b/nvtabular/loader/torch.py @@ -219,11 +219,9 @@ def _build_sparse_tensor( """ indices = self._get_indices(offsets, diff_offsets) if self.pad_left: - indices[:, 1] = (seq_limit - 1) - indices[:, 1] - # We make sure that the elements of our sparse matrix indices are in the correct - # non-reversed order. We do this by flipping increasing blocks in the second column - # of indices. - indices[:, 1] = self._build_sparse_tensor_helper_process_column(indices[:, 1]) + indices[:, 1] = self._build_sparse_tensor_helper_process_column( + (seq_limit - 1) - indices[:, 1] + ) return self._get_sparse_tensor(values, indices, num_rows, seq_limit) From 2150edea1fc8b3e37fa9e71f6c68e11136f7c39d Mon Sep 17 00:00:00 2001 From: Adam Lesnikowski Date: Wed, 29 Sep 2021 18:15:16 +0000 Subject: [PATCH 24/28] Implement pad_left with TF ops Implement pad_left with TF ops, to address code reviewer's concerns with the former Python for loop with TF ops construction. --- nvtabular/loader/tensorflow.py | 33 ++++++++++++++++++++++++++++----- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/nvtabular/loader/tensorflow.py b/nvtabular/loader/tensorflow.py index eaf5713fccb..52019b2d0a3 100644 --- a/nvtabular/loader/tensorflow.py +++ b/nvtabular/loader/tensorflow.py @@ -427,23 +427,46 @@ def _build_sparse_tensor(self, values, offsets, diff_offsets, num_rows, seq_limi Parameters ---------- values : + The values to build our intermediate ragged tensor from. offsets : + Not currently used. diff_offsets : + The row lengths to build our intermediate ragged tensor from. num_rows : - seq_limit : + Not currently used. + seq_limit : int + The desired length of output sequences. Returns ------- tf.sparse - Our built TensorFlow sparse tensor. + The built TensorFlow sparse tensor. """ ragged = tf.RaggedTensor.from_row_lengths(values=values, row_lengths=diff_offsets) + + # Get vector of padding lengths using tf ops like reduce_sum. + non_zero_entries_by_row = tf.math.reduce_sum(ragged / ragged, axis=1) + paddings = seq_limit - non_zero_entries_by_row.numpy() + # print(f"paddings is:\n{paddings}") + + # Make zeros ragged tensor. + total_entries = ragged.shape[0] * seq_limit + non_zero_entries = tf.reduce_sum(ragged / ragged).numpy() + zeros_count = total_entries - non_zero_entries + zeros_values = tf.zeros(shape=(int(zeros_count)), dtype=tf.dtypes.int64) + print(f"zeros_values is:\n{zeros_values}") + print(f"paddings is:\n{paddings}") + zeros = tf.RaggedTensor.from_row_lengths(values=zeros_values, row_lengths=paddings) + print(f"zeros is:\n{zeros}") + + # Concatenate zeros ragged tensor with ragged tensor on either the left or the right, + # depending on either left_pad or not. + # Pad our minor matrix up to the sequence limit. if self.pad_left: - max_len = max(max(len(row) for row in ragged), seq_limit) - tensor = tf.stack([tf.pad(row, [[max_len - len(row), 0]]) for row in ragged], axis=0) + tensor = tf.concat([zeros, ragged], axis=1).to_tensor() else: - tensor = ragged.to_tensor(shape=[None, seq_limit]) + tensor = tf.concat([ragged, zeros], axis=1).to_tensor() tensor = tf.RaggedTensor.from_tensor(tensor).to_sparse() if self.sparse_as_dense: From a51aa441865dbf02c7993562f90a4f8ec6a9ae23 Mon Sep 17 00:00:00 2001 From: Adam Lesnikowski Date: Wed, 29 Sep 2021 18:21:19 +0000 Subject: [PATCH 25/28] Implement pad_left with TF ops cleanup Implement pad_left with TF ops, to address code reviewer's concerns with the former Python for loop with TF ops construction. This commit cleans up this implementation. --- nvtabular/loader/tensorflow.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/nvtabular/loader/tensorflow.py b/nvtabular/loader/tensorflow.py index 52019b2d0a3..b74629883a2 100644 --- a/nvtabular/loader/tensorflow.py +++ b/nvtabular/loader/tensorflow.py @@ -448,21 +448,16 @@ def _build_sparse_tensor(self, values, offsets, diff_offsets, num_rows, seq_limi # Get vector of padding lengths using tf ops like reduce_sum. non_zero_entries_by_row = tf.math.reduce_sum(ragged / ragged, axis=1) paddings = seq_limit - non_zero_entries_by_row.numpy() - # print(f"paddings is:\n{paddings}") - # Make zeros ragged tensor. + # Make zeros ragged tensor to pad our data tensor with. total_entries = ragged.shape[0] * seq_limit non_zero_entries = tf.reduce_sum(ragged / ragged).numpy() zeros_count = total_entries - non_zero_entries zeros_values = tf.zeros(shape=(int(zeros_count)), dtype=tf.dtypes.int64) - print(f"zeros_values is:\n{zeros_values}") - print(f"paddings is:\n{paddings}") zeros = tf.RaggedTensor.from_row_lengths(values=zeros_values, row_lengths=paddings) - print(f"zeros is:\n{zeros}") - # Concatenate zeros ragged tensor with ragged tensor on either the left or the right, + # Concatenate zeros ragged tensor with our data tensor on either the left or the right, # depending on either left_pad or not. - # Pad our minor matrix up to the sequence limit. if self.pad_left: tensor = tf.concat([zeros, ragged], axis=1).to_tensor() else: From b305afa1ab8c123449361916f94a16040636c924 Mon Sep 17 00:00:00 2001 From: Adam Lesnikowski Date: Thu, 30 Sep 2021 19:13:51 +0000 Subject: [PATCH 26/28] Update tensorflow dataloader implementation Update tensorflow dataloader implementation for speed optimization. This implements a suggested revision by @jperez999 for issue #1077. --- nvtabular/loader/tensorflow.py | 21 ++++----------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/nvtabular/loader/tensorflow.py b/nvtabular/loader/tensorflow.py index b74629883a2..d7fd2f296e7 100644 --- a/nvtabular/loader/tensorflow.py +++ b/nvtabular/loader/tensorflow.py @@ -445,23 +445,10 @@ def _build_sparse_tensor(self, values, offsets, diff_offsets, num_rows, seq_limi """ ragged = tf.RaggedTensor.from_row_lengths(values=values, row_lengths=diff_offsets) - # Get vector of padding lengths using tf ops like reduce_sum. - non_zero_entries_by_row = tf.math.reduce_sum(ragged / ragged, axis=1) - paddings = seq_limit - non_zero_entries_by_row.numpy() - - # Make zeros ragged tensor to pad our data tensor with. - total_entries = ragged.shape[0] * seq_limit - non_zero_entries = tf.reduce_sum(ragged / ragged).numpy() - zeros_count = total_entries - non_zero_entries - zeros_values = tf.zeros(shape=(int(zeros_count)), dtype=tf.dtypes.int64) - zeros = tf.RaggedTensor.from_row_lengths(values=zeros_values, row_lengths=paddings) - - # Concatenate zeros ragged tensor with our data tensor on either the left or the right, - # depending on either left_pad or not. - if self.pad_left: - tensor = tf.concat([zeros, ragged], axis=1).to_tensor() - else: - tensor = tf.concat([ragged, zeros], axis=1).to_tensor() + reverse = tf.reverse(ragged, [-1]).to_tensor(0) + tensor = tf.reverse(reverse, [-1]) + paddings = tf.constant([[0, 0], [seq_limit - tensor.shape[1], 0]]) + tensor = tf.pad(tensor, paddings) tensor = tf.RaggedTensor.from_tensor(tensor).to_sparse() if self.sparse_as_dense: From 587ef0c48d21b059449a6e2a857de571f8305f1c Mon Sep 17 00:00:00 2001 From: Adam Lesnikowski Date: Thu, 30 Sep 2021 19:33:09 +0000 Subject: [PATCH 27/28] Update pad_left TF unit tests Update pad_left TF unit tests to make name consistent with other sparse tensor test and to collect print statements. --- tests/unit/loader/test_tf_dataloader.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/tests/unit/loader/test_tf_dataloader.py b/tests/unit/loader/test_tf_dataloader.py index 96264af0a0d..261433f77cd 100644 --- a/tests/unit/loader/test_tf_dataloader.py +++ b/tests/unit/loader/test_tf_dataloader.py @@ -519,7 +519,7 @@ def test_sparse_tensors(tmpdir, sparse_dense): @pytest.mark.parametrize("pad_left", [False, True]) -def test_sparse_tensor_left_padding(pad_left): +def test_sparse_tensors_left_padding(pad_left): """Tests the pad_left functionality of our TensorFlow dataloader to pad data on the left for sparse tensors.""" df = cudf.DataFrame({"A": [[3, 1, 5, 1], [9, 2], [6]], "B": [[3, 1, 5, 1, 9], [2], [6, 5, 3]]}) @@ -543,14 +543,12 @@ def test_sparse_tensor_left_padding(pad_left): features, labels = batch for categorical_column in categorical_columns: feature_tensor = features[categorical_column] - print("feature_tensor is:\n{}".format(feature_tensor)) print("categorical_column is:\n{}".format(categorical_column)) if pad_left: if categorical_column == "A": expected_tensor = tf.constant( [[0, 3, 1, 5, 1], [0, 0, 0, 9, 2], [0, 0, 0, 0, 6]], dtype=tf.int64 ) - print("expected_tensor is:\n{}".format(expected_tensor)) if categorical_column == "B": expected_tensor = tf.constant( [ @@ -560,13 +558,11 @@ def test_sparse_tensor_left_padding(pad_left): ], dtype=tf.int64, ) - print("expected_tensor is:\n{}".format(expected_tensor)) elif not pad_left: if categorical_column == "A": expected_tensor = tf.constant( [[3, 1, 5, 1, 0], [9, 2, 0, 0, 0], [6, 0, 0, 0, 0]], dtype=tf.int64 ) - print("expected_tensor is:\n{}".format(expected_tensor)) if categorical_column == "B": expected_tensor = tf.constant( [ @@ -576,7 +572,8 @@ def test_sparse_tensor_left_padding(pad_left): ], dtype=tf.int64, ) - print("expected_tensor is:\n{}".format(expected_tensor)) + print("expected_tensor is:\n{}".format(expected_tensor)) + print("feature_tensor is:\n{}".format(feature_tensor)) assert tf.experimental.numpy.allclose(feature_tensor, expected_tensor) From dd9927e22a03742a2a1f6f7b45ac5e58fdd82b10 Mon Sep 17 00:00:00 2001 From: Adam Lesnikowski Date: Thu, 30 Sep 2021 19:34:00 +0000 Subject: [PATCH 28/28] Update pad_left code for TF sparse tensors Update pad_left code for TF sparse tensors to properly handle the default pad right case. --- nvtabular/loader/tensorflow.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/nvtabular/loader/tensorflow.py b/nvtabular/loader/tensorflow.py index d7fd2f296e7..d1a17bafc83 100644 --- a/nvtabular/loader/tensorflow.py +++ b/nvtabular/loader/tensorflow.py @@ -443,11 +443,15 @@ def _build_sparse_tensor(self, values, offsets, diff_offsets, num_rows, seq_limi The built TensorFlow sparse tensor. """ - ragged = tf.RaggedTensor.from_row_lengths(values=values, row_lengths=diff_offsets) + tensor = tf.RaggedTensor.from_row_lengths(values=values, row_lengths=diff_offsets) - reverse = tf.reverse(ragged, [-1]).to_tensor(0) - tensor = tf.reverse(reverse, [-1]) - paddings = tf.constant([[0, 0], [seq_limit - tensor.shape[1], 0]]) + if self.pad_left: + reverse = tf.reverse(tensor, [-1]).to_tensor(default_value=0) + tensor = tf.reverse(reverse, [-1]) + paddings = tf.constant([[0, 0], [seq_limit - tensor.shape[1], 0]]) + else: + tensor = tensor.to_tensor(default_value=0) + paddings = tf.constant([[0, 0], [0, seq_limit - tensor.shape[1]]]) tensor = tf.pad(tensor, paddings) tensor = tf.RaggedTensor.from_tensor(tensor).to_sparse()