diff --git a/esmvalcore/_recipe/recipe.py b/esmvalcore/_recipe/recipe.py index 1f2eb63488..ea20f5353a 100644 --- a/esmvalcore/_recipe/recipe.py +++ b/esmvalcore/_recipe/recipe.py @@ -213,7 +213,10 @@ def _get_default_settings(dataset): settings['remove_supplementary_variables'] = {} # Configure saving cubes to file - settings['save'] = {'compress': session['compress_netcdf']} + settings['save'] = { + 'compress': session['compress_netcdf'], + 'compute': False, + } if facets['short_name'] != facets['original_short_name']: settings['save']['alias'] = facets['short_name'] diff --git a/esmvalcore/preprocessor/__init__.py b/esmvalcore/preprocessor/__init__.py index 3800fb1413..e45e9cbb01 100644 --- a/esmvalcore/preprocessor/__init__.py +++ b/esmvalcore/preprocessor/__init__.py @@ -8,6 +8,9 @@ from pprint import pformat from typing import Any, Iterable +import dask +import distributed +from dask.delayed import Delayed from iris.cube import Cube from .._provenance import TrackedFile @@ -401,6 +404,9 @@ def preprocess( result.append(_run_preproc_function(function, item, settings, input_files=input_files)) + if step == 'save': + return result + items = [] for item in result: if isinstance(item, (PreprocessorFile, Cube, str, Path)): @@ -506,20 +512,24 @@ def cubes(self): def cubes(self, value): self._cubes = value - def save(self): + def save(self) -> list[Delayed] | None: """Save cubes to disk.""" - preprocess(self._cubes, - 'save', - input_files=self._input_files, - **self.settings['save']) + return preprocess( + self._cubes, + 'save', + input_files=self._input_files, + **self.settings['save'], + ) - def close(self): + def close(self) -> list[Delayed] | None: """Close the file.""" + result = None if self._cubes is not None: self._update_attributes() - self.save() + result = self.save() self._cubes = None self.save_provenance() + return result def _update_attributes(self): """Update product attributes from cube metadata.""" @@ -600,6 +610,23 @@ def _apply_multimodel(products, step, debug): return products +def _compute_with_progress(delayeds: Iterable[Delayed]) -> None: + """Compute delayeds while displaying a progress bar.""" + try: + distributed.get_client() + except ValueError: + use_distributed = False + else: + use_distributed = True + + if use_distributed: + futures = dask.persist(delayeds) + distributed.progress(futures, notebook=False, multi=False, complete=False) + dask.compute(futures) + else: + with dask.diagnostics.ProgressBar(): + dask.compute(delayeds) + class PreprocessingTask(BaseTask): """Task for running the preprocessor.""" @@ -670,6 +697,7 @@ def _run(self, _): blocks = get_step_blocks(steps, self.order) saved = set() + delayeds = [] for block in blocks: logger.debug("Running block %s", block) if block[0] in MULTI_MODEL_FUNCTIONS: @@ -684,14 +712,19 @@ def _run(self, _): product.apply(step, self.debug) if block == blocks[-1]: product.cubes # pylint: disable=pointless-statement - product.close() + delayed = product.close() + delayeds.append(delayed) + print(f'saving {product.filename} using {delayed}') saved.add(product.filename) for product in self.products: if product.filename not in saved: product.cubes # pylint: disable=pointless-statement - product.close() + delayed = product.close() + print(f'saving {product.filename} using {delayed}') + delayeds.append(delayed) + _compute_with_progress(delayeds) metadata_files = write_metadata(self.products, self.write_ncl_interface) return metadata_files diff --git a/esmvalcore/preprocessor/_io.py b/esmvalcore/preprocessor/_io.py index 8f5202188b..89fcc06b70 100644 --- a/esmvalcore/preprocessor/_io.py +++ b/esmvalcore/preprocessor/_io.py @@ -4,6 +4,7 @@ import copy import logging import os +from collections.abc import Sequence from itertools import groupby from pathlib import Path from typing import NamedTuple, Optional @@ -16,6 +17,7 @@ import numpy as np import yaml from cf_units import suppress_errors +from dask.delayed import Delayed from iris.cube import CubeList from esmvalcore.cmor.check import CheckLevels @@ -347,23 +349,25 @@ def concatenate(cubes, check_level=CheckLevels.DEFAULT): return result -def save(cubes, - filename, - optimize_access='', - compress=False, - alias='', - **kwargs): +def save( + cubes: Sequence[iris.cube.Cube], + filename: Path | str, + optimize_access: str = '', + compress: bool = False, + alias: str = '', + **kwargs, +) -> Delayed | None: """Save iris cubes to file. Parameters ---------- - cubes: iterable of iris.cube.Cube + cubes: Data cubes to be saved - filename: str + filename: Name of target file - optimize_access: str + optimize_access: Set internal NetCDF chunking to favour a reading scheme Values can be map or timeseries, which improve performance when @@ -372,16 +376,20 @@ def save(cubes, case the better performance will be avhieved by loading all the values in that coordinate at a time - compress: bool, optional + compress: Use NetCDF internal compression. - alias: str, optional + alias: Var name to use when saving instead of the one in the cube. + **kwargs: + See :meth:`iris.fileformats.netcdf.saver.save` for additional + keyword arguments. + Returns ------- - str - filename + :class:`dask.delayed.Delayed` or :obj:`None` + A delayed object that can be used to save the data in the cube. Raises ------ @@ -404,7 +412,7 @@ def save(cubes, logger.debug( "Not saving cubes %s to %s to avoid data loss. " "The cube is probably unchanged.", cubes, filename) - return filename + return None for cube in cubes: logger.debug("Saving cube:\n%s\nwith %s data to %s", cube, @@ -417,11 +425,11 @@ def save(cubes, elif optimize_access == 'timeseries': dims = set(cube.coord_dims('time')) else: - dims = tuple() - for coord_dims in (cube.coord_dims(dimension) - for dimension in optimize_access.split(' ')): - dims += coord_dims - dims = set(dims) + dims = { + dim + for coord_name in optimize_access.split(' ') + for dim in cube.coord_dims(coord_name) + } kwargs['chunksizes'] = tuple( length if index in dims else 1 @@ -446,9 +454,9 @@ def save(cubes, category=UserWarning, module='iris', ) - iris.save(cubes, **kwargs) + result = iris.save(cubes, **kwargs) - return filename + return result def _get_debug_filename(filename, step): diff --git a/tests/integration/preprocessor/_io/test_save.py b/tests/integration/preprocessor/_io/test_save.py index 3d8b127703..2c0b56e42e 100644 --- a/tests/integration/preprocessor/_io/test_save.py +++ b/tests/integration/preprocessor/_io/test_save.py @@ -3,6 +3,7 @@ import netCDF4 import numpy as np import pytest +from dask.delayed import Delayed from iris.coords import DimCoord from iris.cube import Cube, CubeList @@ -57,32 +58,42 @@ def _check_chunks(path, expected_chunks): def test_save(cube, filename): """Test save.""" - path = save([cube], filename) - loaded_cube = iris.load_cube(path) + delayed = save([cube], filename) + assert delayed is None + loaded_cube = iris.load_cube(filename) + _compare_cubes(cube, loaded_cube) + + +def test_delayed_save(cube, filename): + """Test save.""" + delayed = save([cube], filename, compute=False) + assert isinstance(delayed, Delayed) + delayed.compute() + loaded_cube = iris.load_cube(filename) _compare_cubes(cube, loaded_cube) def test_save_create_parent_dir(cube, tmp_path): filename = tmp_path / 'preproc' / 'something' / 'test.nc' - path = save([cube], filename) - loaded_cube = iris.load_cube(path) + save([cube], filename) + loaded_cube = iris.load_cube(filename) _compare_cubes(cube, loaded_cube) def test_save_alias(cube, filename): """Test save.""" - path = save([cube], filename, alias='alias') - loaded_cube = iris.load_cube(path) + save([cube], filename, alias='alias') + loaded_cube = iris.load_cube(filename) _compare_cubes(cube, loaded_cube) assert loaded_cube.var_name == 'alias' def test_save_zlib(cube, filename): """Test save.""" - path = save([cube], filename, compress=True) - loaded_cube = iris.load_cube(path) + save([cube], filename, compress=True) + loaded_cube = iris.load_cube(filename) _compare_cubes(cube, loaded_cube) - with netCDF4.Dataset(path, 'r') as handler: + with netCDF4.Dataset(filename, 'r') as handler: sample_filters = handler.variables['sample'].filters() assert sample_filters['zlib'] is True assert sample_filters['shuffle'] is True @@ -104,32 +115,32 @@ def test_fail_without_filename(cube): def test_save_optimized_map(cube, filename): """Test save.""" - path = save([cube], filename, optimize_access='map') - loaded_cube = iris.load_cube(path) + save([cube], filename, optimize_access='map') + loaded_cube = iris.load_cube(filename) _compare_cubes(cube, loaded_cube) - _check_chunks(path, [2, 2, 1]) + _check_chunks(filename, [2, 2, 1]) def test_save_optimized_timeseries(cube, filename): """Test save.""" - path = save([cube], filename, optimize_access='timeseries') - loaded_cube = iris.load_cube(path) + save([cube], filename, optimize_access='timeseries') + loaded_cube = iris.load_cube(filename) _compare_cubes(cube, loaded_cube) - _check_chunks(path, [1, 1, 2]) + _check_chunks(filename, [1, 1, 2]) def test_save_optimized_lat(cube, filename): """Test save.""" - path = save([cube], filename, optimize_access='latitude') - loaded_cube = iris.load_cube(path) + save([cube], filename, optimize_access='latitude') + loaded_cube = iris.load_cube(filename) _compare_cubes(cube, loaded_cube) expected_chunks = [2, 1, 1] - _check_chunks(path, expected_chunks) + _check_chunks(filename, expected_chunks) def test_save_optimized_lon_time(cube, filename): """Test save.""" - path = save([cube], filename, optimize_access='longitude time') - loaded_cube = iris.load_cube(path) + save([cube], filename, optimize_access='longitude time') + loaded_cube = iris.load_cube(filename) _compare_cubes(cube, loaded_cube) - _check_chunks(path, [1, 2, 2]) + _check_chunks(filename, [1, 2, 2]) diff --git a/tests/integration/recipe/test_recipe.py b/tests/integration/recipe/test_recipe.py index d8133fc2b7..139a2cc72c 100644 --- a/tests/integration/recipe/test_recipe.py +++ b/tests/integration/recipe/test_recipe.py @@ -110,6 +110,7 @@ def _get_default_settings_for_chl(save_filename): 'save': { 'compress': False, 'filename': save_filename, + 'compute': False, } } return defaults @@ -682,6 +683,7 @@ def test_default_fx_preprocessor(tmp_path, patched_datafinder, session): 'save': { 'compress': False, 'filename': product.filename, + 'compute': False, } } assert product.settings == defaults diff --git a/tests/unit/recipe/test_recipe.py b/tests/unit/recipe/test_recipe.py index 5e73b2fb92..c272617a52 100644 --- a/tests/unit/recipe/test_recipe.py +++ b/tests/unit/recipe/test_recipe.py @@ -703,7 +703,11 @@ def test_get_default_settings(mocker): settings = _recipe._get_default_settings(dataset) assert settings == { 'remove_supplementary_variables': {}, - 'save': {'compress': False, 'alias': 'sic'}, + 'save': { + 'compress': False, + 'alias': 'sic', + 'compute': False, + }, }