diff --git a/merlin/config/__init__.py b/merlin/config/__init__.py
new file mode 100644
index 000000000..a1f37a010
--- /dev/null
+++ b/merlin/config/__init__.py
@@ -0,0 +1,67 @@
+#
+# Copyright (c) 2024, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+_DASK_QUERY_PLANNING_ENABLED = False
+try:
+    # Disable query-planning and string conversion
+    import dask
+
+    dask.config.set(
+        {
+            "dataframe.query-planning": False,
+            "dataframe.convert-string": False,
+        }
+    )
+except ImportError:
+    dask = None
+else:
+    import sys
+
+    import dask.dataframe as dd
+    from packaging.version import parse
+
+    if parse(dask.__version__) > parse("2024.6.0"):
+        # For newer versions of dask, we can just check
+        # the official DASK_EXPR_ENABLED constant
+        _DASK_QUERY_PLANNING_ENABLED = dd.DASK_EXPR_ENABLED
+    else:
+        # For older versions of dask, we must assume query
+        # planning is enabled if dask_expr was imported
+        # (because we can't know for sure)
+        _DASK_QUERY_PLANNING_ENABLED = "dask_expr" in sys.modules
+
+
+def validate_dask_configs():
+    """Central check for problematic config options in Dask"""
+    if _DASK_QUERY_PLANNING_ENABLED:
+        raise NotImplementedError(
+            "Merlin does not support the query-planning API in "
+            "Dask Dataframe yet. Please make sure query-planning is "
+            "disabled before dask.dataframe is imported.\n\ne.g."
+            "dask.config.set({'dataframe.query-planning': False})"
+            "\n\nOr set the environment variable: "
+            "export DASK_DATAFRAME__QUERY_PLANNING=False"
+        )
+
+    if dask is not None and dask.config.get("dataframe.convert-string"):
+        raise NotImplementedError(
+            "Merlin does not support automatic string conversion in "
+            "Dask Dataframe yet. Please make sure this option is "
+            "disabled.\n\ne.g."
+            "dask.config.set({'dataframe.convert-string': False})"
+            "\n\nOr set the environment variable: "
+            "export DASK_DATAFRAME__CONVERT_STRING=False"
+        )
diff --git a/merlin/core/__init__.py b/merlin/core/__init__.py
index f35898e5d..0dda4f9d5 100644
--- a/merlin/core/__init__.py
+++ b/merlin/core/__init__.py
@@ -1,5 +1,5 @@
 #
-# Copyright (c) 2022, NVIDIA CORPORATION.
+# Copyright (c) 2022-2024, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -14,6 +14,8 @@
 # limitations under the License.
 #
 
+from merlin.config import validate_dask_configs
 from merlin.core import _version
 
 __version__ = _version.get_versions()["version"]
+validate_dask_configs()
diff --git a/merlin/dag/__init__.py b/merlin/dag/__init__.py
index dca0c76dd..c668e8945 100644
--- a/merlin/dag/__init__.py
+++ b/merlin/dag/__init__.py
@@ -1,5 +1,5 @@
 #
-# Copyright (c) 2022, NVIDIA CORPORATION.
+# Copyright (c) 2022-2024, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -15,6 +15,10 @@
 #
 # flake8: noqa
 
+from merlin.config import validate_dask_configs
+
+validate_dask_configs()
+
 from merlin.dag.graph import Graph
 from merlin.dag.node import Node, iter_nodes, postorder_iter_nodes, preorder_iter_nodes
 from merlin.dag.operator import DataFormats, Operator, Supports
diff --git a/merlin/io/__init__.py b/merlin/io/__init__.py
index ff4058c5a..851f5a558 100644
--- a/merlin/io/__init__.py
+++ b/merlin/io/__init__.py
@@ -1,5 +1,5 @@
 #
-# Copyright (c) 2022, NVIDIA CORPORATION.
+# Copyright (c) 2022-2024, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,8 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
 # flake8: noqa
+
+from merlin.config import validate_dask_configs
+
+validate_dask_configs()
+
 from merlin.io import dataframe_iter, dataset, shuffle
 from merlin.io.dataframe_iter import DataFrameIter
 from merlin.io.dataset import MERLIN_METADATA_DIR_NAME, Dataset
diff --git a/merlin/io/dataset.py b/merlin/io/dataset.py
index cf7644eb1..d1414c61f 100644
--- a/merlin/io/dataset.py
+++ b/merlin/io/dataset.py
@@ -1130,103 +1130,10 @@ def npartitions(self):
         return self.to_ddf().npartitions
 
     def validate_dataset(self, **kwargs):
-        """Validate for efficient processing.
+        raise NotImplementedError(""" validate_dataset is not supported for merlin >23.08 """)
 
-        The purpose of this method is to validate that the Dataset object
-        meets the minimal requirements for efficient NVTabular processing.
-        For now, this criteria requires the data to be in parquet format.
-
-        Example Usage::
-
-            dataset = Dataset("/path/to/data_pq", engine="parquet")
-            assert validate_dataset(dataset)
-
-        Parameters
-        -----------
-        **kwargs :
-            Key-word arguments to pass down to the engine's validate_dataset
-            method. For the recommended parquet format, these arguments
-            include `add_metadata_file`, `row_group_max_size`, `file_min_size`,
-            and `require_metadata_file`. For more information, see
-            `ParquetDatasetEngine.validate_dataset`.
-
-        Returns
-        -------
-        valid : bool
-            `True` if the input dataset is valid for efficient NVTabular
-            processing.
-        """
-
-        # Check that the dataset format is Parquet
-        if not isinstance(self.engine, ParquetDatasetEngine):
-            msg = (
-                "NVTabular is optimized for the parquet format. Please use "
-                "the to_parquet method to convert your dataset."
-            )
-            warnings.warn(msg)
-            return False  # Early return
-
-        return self.engine.validate_dataset(**kwargs)
-
-    def regenerate_dataset(
-        self,
-        output_path,
-        columns=None,
-        output_format="parquet",
-        compute=True,
-        **kwargs,
-    ):
-        """EXPERIMENTAL:
-        Regenerate an NVTabular Dataset for efficient processing by writing
-        out new Parquet files. In contrast to default ``to_parquet`` behavior,
-        this method preserves the original ordering.
-
-        Example Usage::
-
-            dataset = Dataset("/path/to/data_pq", engine="parquet")
-            dataset.regenerate_dataset(
-                out_path, part_size="1MiB", file_size="10MiB"
-            )
-
-        Parameters
-        -----------
-        output_path : string
-            Root directory path to use for the new (regenerated) dataset.
-        columns : list(string), optional
-            Subset of columns to include in the regenerated dataset.
-        output_format : string, optional
-            Format to use for regenerated dataset. Only "parquet" (default)
-            is currently supported.
-        compute : bool, optional
-            Whether to compute the task graph or to return a Delayed object.
-            By default, the graph will be executed.
-        **kwargs :
-            Key-word arguments to pass down to the engine's regenerate_dataset
-            method. See `ParquetDatasetEngine.regenerate_dataset` for more
-            information.
-
-        Returns
-        -------
-        result : int or Delayed
-            If `compute=True` (default), the return value will be an integer
-            corresponding to the number of generated data files. If `False`,
-            the returned value will be a `Delayed` object.
-        """
-
-        # Check that the desired output format is Parquet
-        if output_format not in ["parquet"]:
-            msg = (
-                f"NVTabular is optimized for the parquet format. "
-                f"{output_format} is not yet a supported output format for "
-                f"regenerate_dataset."
-            )
-            raise ValueError(msg)
-
-        result = ParquetDatasetEngine.regenerate_dataset(self, output_path, columns=None, **kwargs)
-        if compute:
-            return result.compute()
-        else:
-            return result
+    def regenerate_dataset(self, *args, **kwargs):
+        raise NotImplementedError(""" regenerate_dataset is not supported for merlin >23.08 """)
 
     def infer_schema(self, n=1):
         """Create a schema containing the column names and inferred dtypes of the Dataset
diff --git a/merlin/io/dataset_engine.py b/merlin/io/dataset_engine.py
index 37ea82acd..5a128a7e8 100644
--- a/merlin/io/dataset_engine.py
+++ b/merlin/io/dataset_engine.py
@@ -51,13 +51,6 @@ def _path_partition_map(self):
     def num_rows(self):
         raise NotImplementedError(""" Returns the number of rows in the dataset """)
 
-    def validate_dataset(self, **kwargs):
-        raise NotImplementedError(""" Returns True if the raw data is efficient for NVTabular """)
-
-    @classmethod
-    def regenerate_dataset(cls, dataset, output_path, columns=None, **kwargs):
-        raise NotImplementedError(""" Regenerate a dataset with optimal properties """)
-
     def sample_data(self, n=1):
         """Return a sample of real data from the dataset
diff --git a/merlin/io/parquet.py b/merlin/io/parquet.py
index 710b6ea57..fff5c60f6 100644
--- a/merlin/io/parquet.py
+++ b/merlin/io/parquet.py
@@ -18,11 +18,9 @@
 import itertools
 import logging
 import math
-import operator
 import os
 import threading
 import warnings
-from collections import defaultdict
 from uuid import uuid4
 
 from packaging.version import Version
@@ -30,7 +28,6 @@
 from merlin.core.compat import cudf
 
 if cudf:
-    import dask_cudf
     from cudf.io.parquet import ParquetWriter as pwriter_cudf
     from dask_cudf.io.parquet import CudfEngine
 
@@ -41,15 +38,9 @@
 import pyarrow as pa
 import pyarrow.dataset as pa_ds
 import toolz as tlz
-from dask.base import tokenize
-from dask.dataframe.core import _concat, new_dd_object
 from dask.dataframe.io.parquet.arrow import ArrowDatasetEngine
 from dask.dataframe.io.parquet.core import apply_filters
-from dask.dataframe.io.parquet.utils import _analyze_paths
-from dask.delayed import Delayed
-from dask.highlevelgraph import HighLevelGraph
-from dask.utils import natural_sort_key, parse_bytes
-from fsspec.core import get_fs_token_paths
+from dask.utils import natural_sort_key
 from pyarrow import parquet as pq
 from pyarrow.parquet import ParquetWriter as pwriter_pyarrow
 
@@ -359,24 +350,6 @@ def __init__(
     def _path0(self):
         return next(self._dataset.get_fragments()).path
 
-    @property  # type: ignore
-    @functools.lru_cache(1)
-    def _legacy_dataset(self):
-        # TODO: Remove this after finding a way to avoid
-        # the use of `ParquetDataset` in `validate_dataset`
-        paths = self.paths
-        fs = self.fs
-        if len(paths) > 1:
-            # This is a list of files
-            dataset = pq.ParquetDataset(paths, filesystem=fs, validate_schema=False)
-        elif fs.isdir(paths[0]):
-            # This is a directory
-            dataset = pq.ParquetDataset(paths[0], filesystem=fs, validate_schema=False)
-        else:
-            # This is a single file
-            dataset = pq.ParquetDataset(paths[0], filesystem=fs)
-        return dataset
-
     @property  # type: ignore
     @functools.lru_cache(1)
     def _dataset(self):
@@ -503,434 +476,6 @@ def sample_data(self, n=1):
             **self.read_parquet_kwargs,
         ).take(list(range(n)))
 
-    def validate_dataset(
-        self,
-        add_metadata_file=False,
-        require_metadata_file=True,
-        row_group_max_size=None,
-        file_min_size=None,
-    ):
-        """Validate ParquetDatasetEngine object for efficient processing.
-
-        The purpose of this method is to validate that the raw dataset
-        meets the minimal requirements for efficient NVTabular processing.
-        Warnings are raised if any of the following conditions are not met:
-
-            - The raw dataset directory should contain a global "_metadata"
-              file. If this file is missing, ``add_metadata_file=True`` can
-              be passed to generate a new one.
-            - If there is no _metadata file, the parquet schema must be
-              consistent for all row-groups/files in the raw dataset.
-              Otherwise, a new _metadata file must be generated to avoid
-              errors at IO time.
-            - The row-groups should be no larger than the maximum size limit
-              (``row_group_max_size``).
-            - For multi-file datasets, the files should be no smaller than
-              the minimum size limit (``file_min_size``).
-
-        Parameters
-        -----------
-        add_metadata_file : bool, default False
-            Whether to add a global _metadata file to the dataset if one
-            is missing.
-        require_metadata_file : bool, default True
-            Whether to require the existence of a _metadata file to pass
-            the dataset validation.
-        row_group_max_size : int or str, default None
-            Maximum size (in bytes) of each parquet row-group in the
-            dataset. If None, the minimum of ``self.part_size`` and 500MB
-            will be used.
-        file_min_size : int or str, default None
-            Minimum size (in bytes) of each parquet file in the dataset. This
-            limit is only applied if there are >1 file in the dataset. If None,
-            ``self.part_size`` will be used.
-
-        Returns
-        -------
-        valid : bool
-            `True` if the input dataset is valid for efficient NVTabular
-            processing.
-        """
-
-        meta_valid = True  # Parquet format and _metadata exists
-        size_valid = False  # Row-group sizes are appropriate
-
-        # Check for user-specified row-group size limit.
-        # Otherwise we use the smaller of the dataset partition
-        # size and 500MB.
-        if row_group_max_size is None:
-            row_group_max_size = min(self.part_size, 500_000_000)
-        else:
-            row_group_max_size = parse_bytes(row_group_max_size)
-
-        # Check for user-specified file size limit.
-        # Otherwise we use the smaller of the dataset partition
-        # size and 500MB.
-        if file_min_size is None:
-            file_min_size = self.part_size
-        else:
-            file_min_size = parse_bytes(file_min_size)
-
-        # Get dataset and path list
-        pa_dataset = self._legacy_dataset
-        paths = [p.path for p in pa_dataset.pieces]
-        root_dir, fns = _analyze_paths(paths, self.fs)
-
-        # Collect dataset metadata
-        metadata_file_exists = bool(pa_dataset.metadata)
-        schema_errors = defaultdict(set)
-        if metadata_file_exists:
-            # We have a metadata file
-            metadata = pa_dataset.metadata
-        else:
-            # No metadata file - Collect manually
-            metadata = None
-            for piece, fn in zip(pa_dataset.pieces, fns):
-                md = piece.get_metadata()
-                md.set_file_path(fn)
-                if metadata:
-                    _append_row_groups(metadata, md, schema_errors, piece.path)
-                else:
-                    metadata = md
-
-        # Check for inconsistent schemas.
-        # This is not a problem if a _metadata file exists
-        for field in schema_errors:
-            msg = f"Schema mismatch detected in column: '{field}'."
-            warnings.warn(msg)
-            for item in schema_errors[field]:
-                msg = f"[{item[0]}] Expected {item[1]}, got {item[2]}."
-                warnings.warn(msg)
-
-        # If there is schema mismatch, urge the user to add a _metadata file
-        if len(schema_errors):
-            meta_valid = False  # There are schema-mismatch errors
-
-            # Check that the Dask version supports `create_metadata_file`
-            if Version(dask.__version__) <= Version("2.30.0"):
-                msg = (
-                    "\nThe installed version of Dask is too old to handle "
-                    "schema mismatch. Try installing the latest version."
-                )
-                warnings.warn(msg)
-                return meta_valid and size_valid  # Early return
-
-            # Collect the metadata with dask_cudf and then convert to pyarrow
-            metadata_bytes = dask_cudf.io.parquet.create_metadata_file(
-                paths,
-                out_dir=False,
-            )
-            with py_io.BytesIO() as myio:
-                myio.write(memoryview(metadata_bytes))
-                myio.seek(0)
-                metadata = pq.ParquetFile(myio).metadata
-
-            if not add_metadata_file:
-                msg = (
-                    "\nPlease pass add_metadata_file=True to add a global "
-                    "_metadata file, or use the regenerate_dataset utility to "
-                    "rewrite your dataset. Without a _metadata file, the schema "
-                    "mismatch may cause errors at read time."
-                )
-                warnings.warn(msg)
-
-        # Record the total byte size of all row groups and files
-        max_rg_size = 0
-        max_rg_size_path = None
-        file_sizes = defaultdict(int)
-        for rg in range(metadata.num_row_groups):
-            row_group = metadata.row_group(rg)
-            path = row_group.column(0).file_path
-            total_byte_size = row_group.total_byte_size
-            if total_byte_size > max_rg_size:
-                max_rg_size = total_byte_size
-                max_rg_size_path = path
-            file_sizes[path] += total_byte_size
-
-        # Check if any row groups are prohibitively large.
-        # Also check if any row groups are larger than recommended.
-        if max_rg_size > row_group_max_size:
-            # One or more row-groups are above the "required" limit
-            msg = (
-                f"Excessive row_group size ({max_rg_size}) detected in file "
-                f"{max_rg_size_path}. Please use the regenerate_dataset utility "
-                f"to rewrite your dataset."
-            )
-            warnings.warn(msg)
-        else:
-            # The only way size_valid==True is if we get here
-            size_valid = True
-
-        # Check if any files are smaller than the desired size.
-        # We only warn if there are >1 files in the dataset.
-        for path, size in file_sizes.items():
-            if size < file_min_size and len(pa_dataset.pieces) > 1:
-                msg = (
-                    f"File {path} is smaller than the desired dataset "
-                    f"partition size ({self.part_size}). Consider using the "
-                    f"regenerate_dataset utility to rewrite your dataset with a smaller "
-                    f"number of (larger) files."
-                )
-                warnings.warn(msg)
-                size_valid = False
-
-        # If the _metadata file is missing, we need to write
-        # it (or inform the user that it is missing)
-        if not metadata_file_exists:
-            if add_metadata_file:
-                # Write missing _metadata file
-                fs = self.fs
-                metadata_path = fs.sep.join([root_dir, "_metadata"])
-                with fs.open(metadata_path, "wb") as fil:
-                    metadata.write_metadata_file(fil)
-                meta_valid = True
-            else:
-                # Inform user that the _metadata file is missing
-                msg = (
-                    "For best performance with NVTabular, there should be a "
-                    "global _metadata file located in the root directory of the "
-                    "dataset. Please pass add_metadata_file=True to add the "
-                    "missing file."
-                )
-                warnings.warn(msg)
-                if require_metadata_file:
-                    meta_valid = False
-
-        # Return True if we have a parquet dataset with a _metadata file (meta_valid)
-        # and the row-groups and file are appropriate sizes (size_valid)
-        return meta_valid and size_valid
-
-    @classmethod
-    def regenerate_dataset(
-        cls,
-        dataset,
-        output_path,
-        columns=None,
-        file_size=None,
-        part_size=None,
-        cats=None,
-        conts=None,
-        labels=None,
-        storage_options=None,
-    ):
-        """Regenerate an NVTabular Dataset for efficient processing.
-
-        Example Usage::
-
-            dataset = Dataset("/path/to/data_pq", engine="parquet")
-            dataset.regenerate_dataset(
-                out_path, part_size="1MiB", file_size="10MiB"
-            )
-
-        Parameters
-        -----------
-        dataset : Dataset
-            Input `Dataset` object (to be regenerated).
-        output_path : string
-            Root directory path to use for the new (regenerated) dataset.
-        columns : list[string], optional
-            Subset of columns to include in the regenerated dataset.
-        file_size : int or string, optional
-            Desired size of each output file.
-        part_size : int or string, optional
-            Desired partition size to use within regeneration algorithm.
-            Note that this is effectively the size of each contiguous write
-            operation in cudf.
-        cats : list[string], optional
-            Categorical column list.
-        conts : list[string], optional
-            Continuous column list.
-        labels : list[string], optional
-            Label column list.
-        storage_options : dict, optional
-            Storage-option kwargs to pass through to the `fsspec` file-system
-            interface.
-
-        Returns
-        -------
-        result : int or Delayed
-            If `compute=True` (default), the return value will be an integer
-            corresponding to the number of generated data files. If `False`,
-            the returned value will be a `Delayed` object.
-        """
-
-        # Specify ideal file size and partition size
-        row_group_size = 128_000_000
-        file_size = parse_bytes(file_size) or row_group_size * 100
-        part_size = parse_bytes(part_size) or row_group_size * 10
-        part_size = min(part_size, file_size)
-
-        fs, _, _ = get_fs_token_paths(output_path, mode="wb", storage_options=storage_options)
-
-        # Start by converting the original dataset to a Dask-Dataframe
-        # object in CPU memory. We avoid GPU memory in case the original
-        # dataset is prone to OOM errors.
-        _ddf = dataset.engine.to_ddf(columns=columns, cpu=True)
-
-        # Prepare general metadata (gmd)
-        gmd = {}
-        cats = cats or []
-        conts = conts or []
-        labels = labels or []
-        if not len(cats + conts + labels):
-            warnings.warn(
-                "General-metadata information not detected! "
-                "Please pass lists for `cats`, `conts`, and `labels` as"
-                "arguments to `regenerate_dataset` to ensure a complete "
-                "and correct _metadata.json file."
-            )
-        col_idx = {str(name): i for i, name in enumerate(_ddf.columns)}
-        gmd["cats"] = [{"col_name": c, "index": col_idx[c]} for c in cats]
-        gmd["conts"] = [{"col_name": c, "index": col_idx[c]} for c in conts]
-        gmd["labels"] = [{"col_name": c, "index": col_idx[c]} for c in labels]
-
-        # Get list of partition lengths
-        token = tokenize(
-            dataset,
-            output_path,
-            columns,
-            part_size,
-            file_size,
-            cats,
-            conts,
-            labels,
-            storage_options,
-        )
-        getlen_name = "getlen-" + token
-        name = "all-" + getlen_name
-        dsk = {(getlen_name, i): (len, (_ddf._name, i)) for i in range(_ddf.npartitions)}
-        dsk[name] = [(getlen_name, i) for i in range(_ddf.npartitions)]
-        graph = HighLevelGraph.from_collections(name, dsk, dependencies=[_ddf])
-        size_list = Delayed(name, graph).compute()
-
-        # Get memory usage per row using first partition
-        p0_mem_size = _ddf.partitions[0].memory_usage(deep=True, index=True).sum().compute()
-        mem_per_row = int(float(p0_mem_size) / float(size_list[0]))
-
-        # Determine the number of rows to assign to each output partition
-        # and the number of output partitions to assign to each output file
-        rows_per_part = int(part_size / mem_per_row)
-        parts_per_file = int(file_size / part_size)
-
-        # Construct re-partition graph
-        dsk2 = {}
-        repartition_name = "repartition-" + token
-        split_name = "split-" + repartition_name
-        getitem_name = "getitem-" + repartition_name
-
-        gets = defaultdict(list)
-        out_parts = 0
-        remaining_out_part_rows = rows_per_part
-        for i, in_part_size in enumerate(size_list):
-            # The `split` dictionary will be passed to this input
-            # partition to dictate how that partition will be split
-            # into different output partitions/files. The "key" of
-            # this dict is the output partition, and the value is a
-            # tuple specifying the (start, end) row range.
-            split = {}
-            last = 0
-            while in_part_size >= remaining_out_part_rows:
-                gets[out_parts].append(i)
-                split[out_parts] = (last, last + remaining_out_part_rows)
-                last += remaining_out_part_rows
-                in_part_size = in_part_size - remaining_out_part_rows
-
-                remaining_out_part_rows = rows_per_part
-                out_parts += 1
-
-            if in_part_size:
-                gets[out_parts].append(i)
-                split[out_parts] = (last, last + in_part_size)
-                remaining_out_part_rows -= in_part_size
-
-                if remaining_out_part_rows == 0:
-                    remaining_out_part_rows = rows_per_part
-                    out_parts += 1
-
-            dsk2[(split_name, i)] = (_split_part, (_ddf._name, i), split)
-        npartitions = max(gets) + 1
-
-        for k, v_list in gets.items():
-            last = None
-            _concat_list = []
-            for v in v_list:
-                key = (getitem_name, v, k)
-                _concat_list.append(key)
-                dsk2[key] = (operator.getitem, (split_name, v), k)
-
-            ignore_index = True
-            dsk2[(repartition_name, k)] = (_concat, _concat_list, ignore_index)
-
-        graph2 = HighLevelGraph.from_collections(repartition_name, dsk2, dependencies=[_ddf])
-        divisions = [None] * (npartitions + 1)
-        _ddf2 = new_dd_object(graph2, repartition_name, _ddf._meta, divisions)
-
-        # Make sure the root directory exists
-        fs.mkdirs(output_path, exist_ok=True)
-
-        # Construct rewrite graph
-        dsk3 = {}
-        rewrite_name = "rewrite-" + token
-        write_data_name = "write-data-" + rewrite_name
-        write_metadata_name = "write-metadata-" + rewrite_name
-        inputs = []
-        final_inputs = []
-        for i in range(_ddf2.npartitions):
-            index = i // parts_per_file
-            nex_index = (i + 1) // parts_per_file
-            package_task = (index != nex_index) or (i == (_ddf2.npartitions - 1))
-            fn = f"part.{index}.parquet"
-            inputs.append((repartition_name, i))
-            if package_task:
-                final_inputs.append((write_data_name, i))
-                dsk3[(write_data_name, i)] = (
-                    _write_data,
-                    inputs,
-                    output_path,
-                    fs,
-                    fn,
-                )
-                inputs = []
-
-        # Final task collects and writes all metadata
-        dsk3[write_metadata_name] = (
-            _write_metadata_file,
-            final_inputs,
-            fs,
-            output_path,
-            gmd,
-        )
-        graph3 = HighLevelGraph.from_collections(write_metadata_name, dsk3, dependencies=[_ddf2])
-
-        return Delayed(write_metadata_name, graph3)
-
-
-def _write_metadata_file(md_list, fs, output_path, gmd_base):
-    # Prepare both "general" and parquet metadata
-    gmd = gmd_base.copy()
-    pmd = {}
-    data_paths = []
-    file_stats = []
-    for m in md_list:
-        for path in m.keys():
-            md = m[path]["md"]
-            rows = m[path]["rows"]
-            pmd[path] = md
-            data_paths.append(path)
-            fn = path.split(fs.sep)[-1]
-            file_stats.append({"file_name": fn, "num_rows": rows})
-    gmd["data_paths"] = data_paths
-    gmd["file_stats"] = file_stats
-
-    # Write general metadata file
-    GPUParquetWriter.write_general_metadata(gmd, fs, output_path)
-
-    # Write specialized parquet metadata file
-    GPUParquetWriter.write_special_metadata(pmd, fs, output_path)
-
-    # Return total file count (sanity check)
-    return len(data_paths)
-
-
 def _write_data(data_list, output_path, fs, fn):
     # Initialize chunked writer
diff --git a/tests/unit/core/test_protocols.py b/tests/unit/core/test_protocols.py
index 08f2929a8..9b0b26efd 100644
--- a/tests/unit/core/test_protocols.py
+++ b/tests/unit/core/test_protocols.py
@@ -42,6 +42,6 @@ def test_dataframes_match_protocols(protocol, device):
 
 @pytest.mark.parametrize("device", _DEVICES)
 def test_series_are_serieslike(device):
-    obj = make_series([], device=device)
+    obj = make_series([0], device=device)
 
     assert isinstance(obj, SeriesLike)
diff --git a/tests/unit/dag/ops/test_udf.py b/tests/unit/dag/ops/test_udf.py
index 6a52b554f..038575025 100644
--- a/tests/unit/dag/ops/test_udf.py
+++ b/tests/unit/dag/ops/test_udf.py
@@ -229,7 +229,7 @@ def test_udf_dtype_multi_op_propagation(cpu):
         {
            "a": np.arange(size),
            "b": np.random.choice(["apple", "banana", "orange"], size),
-           "c": np.random.choice([0, 1], size).astype(np.float16),
+           "c": np.random.choice([0, 1], size),
         }
     )
     ddf0 = dd.from_pandas(df0, npartitions=4)
diff --git a/tests/unit/io/test_io.py b/tests/unit/io/test_io.py
index c64ecf61a..a7ee8d15d 100644
--- a/tests/unit/io/test_io.py
+++ b/tests/unit/io/test_io.py
@@ -15,9 +15,7 @@
 #
 
 import glob
-import math
 import os
-import warnings
 
 import dask
 import dask.dataframe as dd
@@ -52,36 +50,6 @@ def _check_partition_lens(ds):
     assert ds.engine._partition_lens == _lens
 
 
-def test_validate_dataset_bad_schema(tmpdir):
-    if Version(dask.__version__) <= Version("2.30.0"):
-        # Older versions of Dask will not handle schema mismatch
-        pytest.skip("Test requires newer version of Dask.")
-
-    path = str(tmpdir)
-    for fn, df in [
-        ("part.0.parquet", pd.DataFrame({"a": range(10), "b": range(10)})),
-        ("part.1.parquet", pd.DataFrame({"a": [None] * 10, "b": range(10)})),
-    ]:
-        df.to_parquet(os.path.join(path, fn))
-
-    # Initial dataset has mismatched schema and is missing a _metadata file.
-    dataset = merlin.io.Dataset(path, engine="parquet")
-    with warnings.catch_warnings():
-        warnings.simplefilter("ignore")
-        # Schema issue should cause validation failure, even if _metadata is ignored
-        assert not dataset.validate_dataset(require_metadata_file=False)
-        # File size should cause validation error, even if _metadata is generated
-        assert not dataset.validate_dataset(add_metadata_file=True)
-        # Make sure the last call added a `_metadata` file
-        assert len(glob.glob(os.path.join(path, "_metadata")))
-
-        # New dataset has a _metadata file, but the file size is still too small
-        dataset = merlin.io.Dataset(path, engine="parquet")
-        assert not dataset.validate_dataset()
-        # Ignore file size to get validation success
-        assert dataset.validate_dataset(file_min_size=1, row_group_max_size="1GB")
-
-
 def test_incorrect_schema_dataset():
     with pytest.raises(TypeError) as err:
         merlin.io.Dataset("", schema={})
@@ -190,7 +158,7 @@ def test_io_partitions_push(tmpdir):
     # Generate random csv files
     files = [os.path.join(tmpdir, f"csv/day_{i}") for i in range(23)]
     for file in files:
-        with open(file, "w") as f:
+        with open(file, "w", encoding="utf-8") as f:
             f.write("0,1,2,3,a,b,c\n" * 1000)
 
     # Load csv files
@@ -493,67 +461,6 @@ def test_to_parquet_row_group_size(tmpdir, cpu, row_group_size):
     assert all(len(part) <= row_group_size for part in result.partitions)
 
 
-@pytest.mark.parametrize("engine", ["csv", "parquet"])
-def test_validate_dataset(datasets, engine):
-    with warnings.catch_warnings():
-        warnings.simplefilter("ignore")
-        paths = glob.glob(str(datasets[engine]) + "/*." + engine.split("-")[0])
-        if engine == "parquet":
-            dataset = merlin.io.Dataset(str(datasets[engine]), engine=engine)
-
-            # Default file_min_size should result in failed validation
-            assert not dataset.validate_dataset()
-            assert dataset.validate_dataset(file_min_size=1, require_metadata_file=False)
-        else:
-            dataset = merlin.io.Dataset(paths, header=False, names=allcols_csv)
-
-            # CSV format should always fail validation
-            assert not dataset.validate_dataset()
-
-
-def test_validate_and_regenerate_dataset(tmpdir):
-    # Initial timeseries dataset (in cpu memory)
-    ddf = dask.datasets.timeseries(
-        start="2000-01-01",
-        end="2000-01-05",
-        freq="60s",
-        partition_freq="1d",
-        seed=42,
-    )
-    ds = merlin.io.Dataset(ddf)
-
-    # Regenerate dataset on disk
-    path = str(tmpdir)
-    ds.regenerate_dataset(path, part_size="50KiB", file_size="150KiB")
-
-    # Check that the regenerated dataset makes sense.
-    # Dataset is ~544KiB - Expect 4 data files
-    N = math.ceil(ddf.compute().memory_usage(deep=True).sum() / 150000)
-    file_list = glob.glob(os.path.join(path, "*"))
-    assert os.path.join(path, "_metadata") in file_list
-    assert os.path.join(path, "_file_list.txt") in file_list
-    assert os.path.join(path, "_metadata.json") in file_list
-    assert len(file_list) == N + 3  # N data files + 3 metadata files
-
-    # Check new dataset validation
-    ds2 = merlin.io.Dataset(path, engine="parquet", part_size="64KiB")
-    ds2.validate_dataset(file_min_size=1)
-
-    # Check that dataset content is correct
-    assert_eq(
-        ddf.reset_index(drop=False),
-        ds2.to_ddf().compute(),
-        check_index=False,
-    )
-
-    # Check cpu version of `to_ddf`
-    assert_eq(
-        ddf.reset_index(drop=False),
-        ds2.engine.to_ddf(cpu=True).compute(),
-        check_index=False,
-    )
-
-
 @pytest.mark.parametrize("preserve_files", [True, False])
 @pytest.mark.parametrize("cpu", [True, False])
 def test_dataset_conversion(tmpdir, cpu, preserve_files):
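A minimal usage sketch (not part of the diff): it shows the Dask configuration that the new validate_dask_configs() guard expects from downstream code, using only the options and environment variables named in the error messages above. The merlin.io import is just an illustrative way to trigger the check on import.

    # Configure Dask before anything imports dask.dataframe; importing
    # merlin.io (or merlin.core / merlin.dag) then runs validate_dask_configs().
    import dask

    dask.config.set(
        {
            "dataframe.query-planning": False,  # query-planning API is not supported yet
            "dataframe.convert-string": False,  # automatic string conversion is not supported yet
        }
    )

    import merlin.io  # noqa: E402

    # Equivalent environment-variable form (set before launching Python):
    #   export DASK_DATAFRAME__QUERY_PLANNING=False
    #   export DASK_DATAFRAME__CONVERT_STRING=False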