diff --git a/esmvalcore/_recipe/recipe.py b/esmvalcore/_recipe/recipe.py
index 55e789d6f4..8d4809ffa0 100644
--- a/esmvalcore/_recipe/recipe.py
+++ b/esmvalcore/_recipe/recipe.py
@@ -220,7 +220,10 @@ def _get_default_settings(dataset):
     settings["remove_supplementary_variables"] = {}
 
     # Configure saving cubes to file
-    settings["save"] = {"compress": session["compress_netcdf"]}
+    settings["save"] = {
+        "compress": session["compress_netcdf"],
+        "compute": False,
+    }
     if facets["short_name"] != facets["original_short_name"]:
         settings["save"]["alias"] = facets["short_name"]
 
@@ -381,6 +384,8 @@
         if key in remaining_steps:
             if all(p.settings.get(key, object()) == value for p in products):
                 settings[key] = value
+    # Pass the compute argument on to the save step.
+    settings["save"] = {"compute": some_product.settings["save"]["compute"]}
 
     return settings
 
diff --git a/esmvalcore/preprocessor/__init__.py b/esmvalcore/preprocessor/__init__.py
index 851aae49f0..aaa2f2fdc0 100644
--- a/esmvalcore/preprocessor/__init__.py
+++ b/esmvalcore/preprocessor/__init__.py
@@ -9,6 +9,10 @@
 from pprint import pformat
 from typing import Any, Iterable
 
+import dask
+import dask.diagnostics
+import distributed
+from dask.delayed import Delayed
 from iris.cube import Cube
 
 from .._provenance import TrackedFile
@@ -428,6 +432,9 @@ def preprocess(
             )
         )
 
+    if step == "save":
+        return result
+
     items = []
     for item in result:
         if isinstance(item, (PreprocessorFile, Cube, str, Path)):
@@ -536,22 +543,24 @@ def cubes(self):
     def cubes(self, value):
         self._cubes = value
 
-    def save(self):
+    def save(self) -> list[Delayed] | None:
         """Save cubes to disk."""
-        preprocess(
+        return preprocess(
             self._cubes,
             "save",
             input_files=self._input_files,
             **self.settings["save"],
         )
 
-    def close(self):
+    def close(self) -> list[Delayed] | None:
         """Close the file."""
+        result = None
         if self._cubes is not None:
             self._update_attributes()
-            self.save()
+            result = self.save()
             self._cubes = None
             self.save_provenance()
+        return result
 
     def _update_attributes(self):
         """Update product attributes from cube metadata."""
@@ -634,6 +643,24 @@ def _apply_multimodel(products, step, debug):
     return products
 
 
+def _compute_with_progress(delayeds: Iterable[Delayed]) -> None:
+    """Compute delayeds while displaying a progress bar."""
+    try:
+        distributed.get_client()
+    except ValueError:
+        use_distributed = False
+    else:
+        use_distributed = True
+
+    if use_distributed:
+        futures = dask.persist(delayeds)
+        distributed.progress(futures, notebook=False)
+        dask.compute(futures)
+    else:
+        with dask.diagnostics.ProgressBar():
+            dask.compute(delayeds)
+
+
 class PreprocessingTask(BaseTask):
     """Task for running the preprocessor."""
 
@@ -703,6 +730,7 @@ def _run(self, _):
         blocks = get_step_blocks(steps, self.order)
 
         saved = set()
+        delayeds = []
         for block in blocks:
             logger.debug("Running block %s", block)
             if block[0] in MULTI_MODEL_FUNCTIONS:
@@ -718,14 +746,18 @@
                     product.apply(step, self.debug)
                     if block == blocks[-1]:
                         product.cubes  # noqa: B018 pylint: disable=pointless-statement
-                        product.close()
+                        delayed = product.close()
+                        delayeds.append(delayed)
                         saved.add(product.filename)
 
         for product in self.products:
             if product.filename not in saved:
                 product.cubes  # noqa: B018 pylint: disable=pointless-statement
-                product.close()
+                delayed = product.close()
+                delayeds.append(delayed)
 
+        logger.info("Computing and saving data for task %s", self.name)
+        _compute_with_progress(delayeds)
         metadata_files = write_metadata(
             self.products, self.write_ncl_interface
         )
diff --git a/esmvalcore/preprocessor/_io.py b/esmvalcore/preprocessor/_io.py
index 5f83b1946c..4edb688684 100644
--- a/esmvalcore/preprocessor/_io.py
+++ b/esmvalcore/preprocessor/_io.py
@@ -5,6 +5,7 @@
 import copy
 import logging
 import os
+from collections.abc import Sequence
 from itertools import groupby
 from pathlib import Path
 from typing import NamedTuple, Optional
@@ -17,6 +18,7 @@
 import numpy as np
 import yaml
 from cf_units import suppress_errors
+from dask.delayed import Delayed
 from iris.cube import CubeList
 
 from esmvalcore.cmor.check import CheckLevels
@@ -403,19 +405,25 @@ def concatenate(cubes, check_level=CheckLevels.DEFAULT):
 
 
 def save(
-    cubes, filename, optimize_access="", compress=False, alias="", **kwargs
-):
+    cubes: Sequence[iris.cube.Cube],
+    filename: Path | str,
+    optimize_access: str = "",
+    compress: bool = False,
+    alias: str = "",
+    compute: bool = True,
+    **kwargs,
+) -> Delayed | None:
     """Save iris cubes to file.
 
     Parameters
     ----------
-    cubes: iterable of iris.cube.Cube
+    cubes:
         Data cubes to be saved
 
-    filename: str
+    filename:
         Name of target file
 
-    optimize_access: str
+    optimize_access:
         Set internal NetCDF chunking to favour a reading scheme
 
         Values can be map or timeseries, which improve performance when
@@ -424,16 +432,30 @@ def save(
         case the better performance will be achieved by loading all the
         values in that coordinate at a time
 
-    compress: bool, optional
+    compress:
         Use NetCDF internal compression.
 
-    alias: str, optional
+    alias:
         Var name to use when saving instead of the one in the cube.
 
+    compute : bool, default=True
+        Default is ``True``, meaning the file is completed immediately and
+        ``None`` is returned.
+
+        When ``False``, create the output file but don't write any lazy array
+        content to its variables, such as lazy cube data or aux-coord points
+        and bounds. Instead return a :class:`dask.delayed.Delayed` which,
+        when computed, will stream all the lazy content via
+        :func:`dask.array.store` to complete the file. Several such data
+        saves can be performed in parallel, by passing a list of them into a
+        :func:`dask.compute` call.
+
+    **kwargs:
+        See :func:`iris.fileformats.netcdf.saver.save` for additional
+        keyword arguments.
+
     Returns
     -------
-    str
-        filename
+    :class:`dask.delayed.Delayed` or :obj:`None`
+        A delayed object that can be computed to save the data to file, or
+        ``None`` if ``compute`` is ``True``.

     Raises
     ------
@@ -443,6 +465,9 @@
     if not cubes:
         raise ValueError(f"Cannot save empty cubes '{cubes}'")
 
+    if Path(filename).suffix.lower() == ".nc":
+        kwargs["compute"] = compute
+
     # Rename some arguments
     kwargs["target"] = filename
     kwargs["zlib"] = compress
@@ -460,7 +485,7 @@
             cubes,
             filename,
         )
-        return filename
+        return None
 
     for cube in cubes:
         logger.debug(
@@ -478,13 +503,11 @@
         elif optimize_access == "timeseries":
             dims = set(cube.coord_dims("time"))
         else:
-            dims = tuple()
-            for coord_dims in (
-                cube.coord_dims(dimension)
-                for dimension in optimize_access.split(" ")
-            ):
-                dims += coord_dims
-            dims = set(dims)
+            dims = {
+                dim
+                for coord_name in optimize_access.split(" ")
+                for dim in cube.coord_dims(coord_name)
+            }
 
         kwargs["chunksizes"] = tuple(
             length if index in dims else 1
@@ -510,9 +533,9 @@
             category=UserWarning,
             module="iris",
         )
-        iris.save(cubes, **kwargs)
+        result = iris.save(cubes, **kwargs)
 
-    return filename
+    return result
 
 
 def _get_debug_filename(filename, step):
diff --git a/tests/integration/preprocessor/_io/test_save.py b/tests/integration/preprocessor/_io/test_save.py
index 0e4f6b4366..20278fb155 100644
--- a/tests/integration/preprocessor/_io/test_save.py
+++ b/tests/integration/preprocessor/_io/test_save.py
@@ -1,9 +1,13 @@
 """Integration tests for :func:`esmvalcore.preprocessor.save`."""
 
+import logging
+import re
+
 import iris
 import netCDF4
 import numpy as np
 import pytest
+from dask.delayed import Delayed
 from iris.coords import DimCoord
 from iris.cube import Cube, CubeList
@@ -59,32 +63,51 @@ def _check_chunks(path, expected_chunks):
 
 def test_save(cube, filename):
     """Test save."""
-    path = save([cube], filename)
-    loaded_cube = iris.load_cube(path)
+    delayed = save([cube], filename)
+    assert delayed is None
+    loaded_cube = iris.load_cube(filename)
+    _compare_cubes(cube, loaded_cube)
+
+
+def test_delayed_save(cube, filename):
+    """Test delayed save."""
+    delayed = save([cube], filename, compute=False)
+    assert isinstance(delayed, Delayed)
+    delayed.compute()
+    loaded_cube = iris.load_cube(filename)
     _compare_cubes(cube, loaded_cube)
 
 
+def test_save_noop(cube, filename, caplog):
+    """Test that repeating a save is skipped to avoid data loss."""
+    cube.data = cube.lazy_data()
+    save([cube], filename)
+    with caplog.at_level(logging.DEBUG):
+        save([cube], filename)
+    assert re.findall("Not saving cubes .* to avoid data loss.", caplog.text)
+
+
 def test_save_create_parent_dir(cube, tmp_path):
     filename = tmp_path / "preproc" / "something" / "test.nc"
-    path = save([cube], filename)
-    loaded_cube = iris.load_cube(path)
+    save([cube], filename)
+    loaded_cube = iris.load_cube(filename)
     _compare_cubes(cube, loaded_cube)
 
 
 def test_save_alias(cube, filename):
     """Test save."""
-    path = save([cube], filename, alias="alias")
-    loaded_cube = iris.load_cube(path)
+    save([cube], filename, alias="alias")
+    loaded_cube = iris.load_cube(filename)
     _compare_cubes(cube, loaded_cube)
     assert loaded_cube.var_name == "alias"
 
 
 def test_save_zlib(cube, filename):
     """Test save."""
-    path = save([cube], filename, compress=True)
-    loaded_cube = iris.load_cube(path)
+    save([cube], filename, compress=True)
+    loaded_cube = iris.load_cube(filename)
     _compare_cubes(cube, loaded_cube)
-    with netCDF4.Dataset(path, "r") as handler:
+    with netCDF4.Dataset(filename, "r") as handler:
         sample_filters = handler.variables["sample"].filters()
         assert sample_filters["zlib"] is True
         assert sample_filters["shuffle"] is True
@@ -106,32 +129,32 @@ def test_fail_without_filename(cube):
 
 def test_save_optimized_map(cube, filename):
     """Test save."""
-    path = save([cube], filename, optimize_access="map")
-    loaded_cube = iris.load_cube(path)
+    save([cube], filename, optimize_access="map")
+    loaded_cube = iris.load_cube(filename)
     _compare_cubes(cube, loaded_cube)
-    _check_chunks(path, [2, 2, 1])
+    _check_chunks(filename, [2, 2, 1])
 
 
 def test_save_optimized_timeseries(cube, filename):
     """Test save."""
-    path = save([cube], filename, optimize_access="timeseries")
-    loaded_cube = iris.load_cube(path)
+    save([cube], filename, optimize_access="timeseries")
+    loaded_cube = iris.load_cube(filename)
     _compare_cubes(cube, loaded_cube)
-    _check_chunks(path, [1, 1, 2])
+    _check_chunks(filename, [1, 1, 2])
 
 
 def test_save_optimized_lat(cube, filename):
     """Test save."""
-    path = save([cube], filename, optimize_access="latitude")
-    loaded_cube = iris.load_cube(path)
+    save([cube], filename, optimize_access="latitude")
+    loaded_cube = iris.load_cube(filename)
     _compare_cubes(cube, loaded_cube)
     expected_chunks = [2, 1, 1]
-    _check_chunks(path, expected_chunks)
+    _check_chunks(filename, expected_chunks)
 
 
 def test_save_optimized_lon_time(cube, filename):
     """Test save."""
-    path = save([cube], filename, optimize_access="longitude time")
-    loaded_cube = iris.load_cube(path)
+    save([cube], filename, optimize_access="longitude time")
+    loaded_cube = iris.load_cube(filename)
     _compare_cubes(cube, loaded_cube)
-    _check_chunks(path, [1, 2, 2])
+    _check_chunks(filename, [1, 2, 2])
diff --git a/tests/integration/recipe/test_recipe.py b/tests/integration/recipe/test_recipe.py
index f486db1657..90b4985a6e 100644
--- a/tests/integration/recipe/test_recipe.py
+++ b/tests/integration/recipe/test_recipe.py
@@ -110,6 +110,7 @@ def _get_default_settings_for_chl(save_filename):
         "save": {
             "compress": False,
             "filename": save_filename,
+            "compute": False,
         },
     }
     return defaults
@@ -693,6 +694,7 @@ def test_default_fx_preprocessor(tmp_path, patched_datafinder, session):
         "save": {
             "compress": False,
             "filename": product.filename,
+            "compute": False,
         },
     }
     assert product.settings == defaults
diff --git a/tests/unit/preprocessor/test_runner.py b/tests/unit/preprocessor/test_runner.py
index 2c9640e866..aa5d4b6633 100644
--- a/tests/unit/preprocessor/test_runner.py
+++ b/tests/unit/preprocessor/test_runner.py
@@ -1,5 +1,8 @@
+import operator
 from pathlib import Path
 
+import dask
+import distributed
 import iris.cube
 import pytest
 
@@ -68,3 +71,15 @@ def test_preprocess_debug(mocker, debug):
         )
     else:
         esmvalcore.preprocessor.save.assert_not_called()
+
+
+@pytest.mark.parametrize("use_distributed", [False, True])
+def test_compute_with_progress(capsys, use_distributed):
+    if use_distributed:
+        distributed.Client(n_workers=1, threads_per_worker=1)
+
+    delayeds = [dask.delayed(operator.add)(1, 1)]
+    esmvalcore.preprocessor._compute_with_progress(delayeds)
+    # Assert that some progress bar has been written to stdout.
+ progressbar = capsys.readouterr().out + assert progressbar diff --git a/tests/unit/recipe/test_recipe.py b/tests/unit/recipe/test_recipe.py index 9934f02d3b..5acc625c8d 100644 --- a/tests/unit/recipe/test_recipe.py +++ b/tests/unit/recipe/test_recipe.py @@ -243,7 +243,10 @@ def test_multi_model_filename_full(): def test_update_multiproduct_multi_model_statistics(): """Test ``_update_multiproduct``.""" - settings = {"multi_model_statistics": {"statistics": ["mean", "std_dev"]}} + settings = { + "multi_model_statistics": {"statistics": ["mean", "std_dev"]}, + "save": {"compute": False}, + } common_attributes = { "project": "CMIP6", "diagnostic": "d", @@ -358,6 +361,7 @@ def test_update_multiproduct_multi_model_statistics_percentile(): {"operator": "percentile", "percent": 95.0}, ] }, + "save": {"compute": False}, } common_attributes = { "project": "CMIP6", @@ -468,7 +472,8 @@ def test_update_multiproduct_multi_model_statistics_percentile(): def test_update_multiproduct_ensemble_statistics(): """Test ``_update_multiproduct``.""" settings = { - "ensemble_statistics": {"statistics": ["median"], "span": "full"} + "ensemble_statistics": {"statistics": ["median"], "span": "full"}, + "save": {"compute": False}, } common_attributes = { "dataset": "CanESM2", @@ -539,6 +544,7 @@ def test_update_multiproduct_ensemble_statistics_percentile(): ], "span": "full", }, + "save": {"compute": False}, } common_attributes = { @@ -773,7 +779,11 @@ def test_get_default_settings(mocker): settings = _recipe._get_default_settings(dataset) assert settings == { "remove_supplementary_variables": {}, - "save": {"compress": False, "alias": "sic"}, + "save": { + "compress": False, + "alias": "sic", + "compute": False, + }, }
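
For reviewers, a minimal usage sketch of the delayed-save flow this diff introduces: call `save()` with `compute=False` to get a `Delayed` per file, then stream all files in one `dask.compute` call, mirroring the non-distributed branch of `_compute_with_progress`. The `make_cube` helper and the file names are illustrative only, not part of the change.

```python
# Illustrative sketch only: exercises save(compute=False) from this diff.
import dask
import dask.array as da
import dask.diagnostics
from iris.coords import DimCoord
from iris.cube import Cube

from esmvalcore.preprocessor import save


def make_cube(name: str) -> Cube:
    """Build a tiny cube with lazy data so there is something to stream."""
    time = DimCoord(
        [0.0, 1.0], standard_name="time", units="days since 2000-01-01"
    )
    return Cube(
        da.zeros(2, chunks=1),  # lazy data, written only on compute
        var_name=name,
        dim_coords_and_dims=[(time, 0)],
    )


# Each call creates the NetCDF file but defers writing the lazy content,
# returning a dask.delayed.Delayed instead of None.
delayeds = [
    save([make_cube(f"var{i}")], f"var{i}.nc", compute=False)
    for i in range(3)
]

# Stream all files in a single compute, with a progress bar; this mirrors
# the threaded-scheduler branch of _compute_with_progress above.
with dask.diagnostics.ProgressBar():
    dask.compute(delayeds)
```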