Various improvements to lazy preprocessor functions
bouweandela committed Sep 11, 2024
1 parent 21ce35a commit 4599bf6
Showing 6 changed files with 113 additions and 53 deletions.
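In short: save() now accepts a compute keyword (the recipe settings add 'compute': False). With compute=False it returns a dask.delayed.Delayed instead of writing the file immediately, PreprocessorFile.save() and close() pass that delayed object up, and PreprocessingTask._run collects the delayeds for all products and executes them in one go with a progress bar. A minimal sketch of the new calling pattern (illustrative only; the cubes and output paths here are made up):

import dask
import dask.array as da
from iris.cube import Cube

from esmvalcore.preprocessor import save

# Two cubes with lazy data, standing in for real preprocessor output.
cubes = [Cube(da.zeros((4, 4)), var_name=f'var{i}') for i in range(2)]

# With compute=False each call returns a dask.delayed.Delayed instead
# of writing the NetCDF file right away.
delayed_saves = [
    save([cube], f'/tmp/out_{i}.nc', compute=False)
    for i, cube in enumerate(cubes)
]

# A single compute executes all delayed writes at once.
dask.compute(delayed_saves)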
5 changes: 4 additions & 1 deletion esmvalcore/_recipe/recipe.py
@@ -213,7 +213,10 @@ def _get_default_settings(dataset):
settings['remove_supplementary_variables'] = {}

# Configure saving cubes to file
settings['save'] = {'compress': session['compress_netcdf']}
settings['save'] = {
'compress': session['compress_netcdf'],
'compute': False,
}
if facets['short_name'] != facets['original_short_name']:
settings['save']['alias'] = facets['short_name']

50 changes: 41 additions & 9 deletions esmvalcore/preprocessor/__init__.py
@@ -8,6 +8,9 @@
from pprint import pformat
from typing import Any, Iterable

import dask
import distributed
from dask.delayed import Delayed
from iris.cube import Cube

from .._provenance import TrackedFile
@@ -401,6 +404,9 @@ def preprocess(
result.append(_run_preproc_function(function, item, settings,
input_files=input_files))

if step == 'save':
return result
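    # Explanatory note (not part of the commit): the 'save' step now returns
    # dask Delayed objects rather than cubes, so preprocess() returns early
    # here to bypass the cube-unpacking loop below.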

items = []
for item in result:
if isinstance(item, (PreprocessorFile, Cube, str, Path)):
@@ -506,20 +512,24 @@ def cubes(self):
def cubes(self, value):
self._cubes = value

def save(self):
def save(self) -> list[Delayed] | None:
"""Save cubes to disk."""
preprocess(self._cubes,
'save',
input_files=self._input_files,
**self.settings['save'])
return preprocess(
self._cubes,
'save',
input_files=self._input_files,
**self.settings['save'],
)

def close(self):
def close(self) -> list[Delayed] | None:
"""Close the file."""
result = None
if self._cubes is not None:
self._update_attributes()
self.save()
result = self.save()
self._cubes = None
self.save_provenance()
return result

def _update_attributes(self):
"""Update product attributes from cube metadata."""
@@ -600,6 +610,24 @@ def _apply_multimodel(products, step, debug):
return products


def _compute_with_progress(delayeds: Iterable[Delayed]) -> None:
"""Compute delayeds while displaying a progress bar."""
try:
distributed.get_client()
except ValueError:
use_distributed = False
else:
use_distributed = True

if use_distributed:
futures = dask.persist(delayeds)
distributed.progress(futures, notebook=False)
dask.compute(futures)
else:
with dask.diagnostics.ProgressBar():
dask.compute(delayeds)
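
_compute_with_progress picks progress reporting to match the active scheduler: distributed.get_client() raises ValueError when no Dask distributed client exists, and the function then falls back to the local text progress bar. A self-contained sketch of the same detection pattern, with trivial delayed tasks standing in for the delayed file writes:

import dask
import distributed
from dask.diagnostics import ProgressBar

# Stand-ins for the delayed NetCDF writes produced by save().
delayeds = [dask.delayed(sum)(range(n)) for n in (10, 100, 1000)]

try:
    distributed.get_client()  # raises ValueError if no client is active
except ValueError:
    # Local scheduler: text-mode progress bar.
    with ProgressBar():
        dask.compute(delayeds)
else:
    # Distributed scheduler: persist first so progress() can track futures.
    futures = dask.persist(delayeds)
    distributed.progress(futures, notebook=False)
    dask.compute(futures)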


class PreprocessingTask(BaseTask):
"""Task for running the preprocessor."""

@@ -670,6 +698,7 @@ def _run(self, _):
blocks = get_step_blocks(steps, self.order)

saved = set()
delayeds = []
for block in blocks:
logger.debug("Running block %s", block)
if block[0] in MULTI_MODEL_FUNCTIONS:
@@ -684,14 +713,17 @@
product.apply(step, self.debug)
if block == blocks[-1]:
product.cubes # pylint: disable=pointless-statement
product.close()
delayed = product.close()
delayeds.append(delayed)
saved.add(product.filename)

for product in self.products:
if product.filename not in saved:
product.cubes # pylint: disable=pointless-statement
product.close()
delayed = product.close()
delayeds.append(delayed)

_compute_with_progress(delayeds)
metadata_files = write_metadata(self.products,
self.write_ncl_interface)
return metadata_files
50 changes: 29 additions & 21 deletions esmvalcore/preprocessor/_io.py
@@ -4,6 +4,7 @@
import copy
import logging
import os
from collections.abc import Sequence
from itertools import groupby
from pathlib import Path
from typing import NamedTuple, Optional
@@ -16,6 +17,7 @@
import numpy as np
import yaml
from cf_units import suppress_errors
from dask.delayed import Delayed
from iris.cube import CubeList

from esmvalcore.cmor.check import CheckLevels
@@ -345,23 +347,25 @@ def concatenate(cubes, check_level=CheckLevels.DEFAULT):
return result


def save(cubes,
filename,
optimize_access='',
compress=False,
alias='',
**kwargs):
def save(
cubes: Sequence[iris.cube.Cube],
filename: Path | str,
optimize_access: str = '',
compress: bool = False,
alias: str = '',
**kwargs,
) -> Delayed | None:
"""Save iris cubes to file.
Parameters
----------
cubes: iterable of iris.cube.Cube
cubes:
Data cubes to be saved
filename: str
filename:
Name of target file
optimize_access: str
optimize_access:
Set internal NetCDF chunking to favour a reading scheme
Values can be map or timeseries, which improve performance when
@@ -370,16 +374,20 @@ def save(cubes,
case the better performance will be achieved by loading all the values
in that coordinate at a time
compress: bool, optional
compress:
Use NetCDF internal compression.
alias: str, optional
alias:
Var name to use when saving instead of the one in the cube.
**kwargs:
See :func:`iris.fileformats.netcdf.saver.save` for additional
keyword arguments.
Returns
-------
str
filename
:class:`dask.delayed.Delayed` or :obj:`None`
A delayed object that can be computed later to save the data in the
cubes to disk, or None if the data was written immediately.
Raises
------
@@ -402,7 +410,7 @@ def save(cubes,
logger.debug(
"Not saving cubes %s to %s to avoid data loss. "
"The cube is probably unchanged.", cubes, filename)
return filename
return None

for cube in cubes:
logger.debug("Saving cube:\n%s\nwith %s data to %s", cube,
@@ -415,11 +423,11 @@ def save(cubes,
elif optimize_access == 'timeseries':
dims = set(cube.coord_dims('time'))
else:
dims = tuple()
for coord_dims in (cube.coord_dims(dimension)
for dimension in optimize_access.split(' ')):
dims += coord_dims
dims = set(dims)
dims = {
dim
for coord_name in optimize_access.split(' ')
for dim in cube.coord_dims(coord_name)
}
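# Worked example (comment only, not part of the commit): for a cube
# of shape (time=12, lat=90, lon=180), optimize_access='longitude time'
# yields dims == {0, 2}, so the chunksizes computed below become
# (12, 1, 180): the full length along accessed dimensions, 1 elsewhere.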

kwargs['chunksizes'] = tuple(
length if index in dims else 1
@@ -444,9 +452,9 @@ def save(cubes,
category=UserWarning,
module='iris',
)
iris.save(cubes, **kwargs)
result = iris.save(cubes, **kwargs)

return filename
return result


def _get_debug_filename(filename, step):
53 changes: 32 additions & 21 deletions tests/integration/preprocessor/_io/test_save.py
@@ -3,6 +3,7 @@
import netCDF4
import numpy as np
import pytest
from dask.delayed import Delayed
from iris.coords import DimCoord
from iris.cube import Cube, CubeList

@@ -57,32 +58,42 @@ def _check_chunks(path, expected_chunks):

def test_save(cube, filename):
"""Test save."""
path = save([cube], filename)
loaded_cube = iris.load_cube(path)
delayed = save([cube], filename)
assert delayed is None
loaded_cube = iris.load_cube(filename)
_compare_cubes(cube, loaded_cube)


def test_delayed_save(cube, filename):
"""Test save."""
delayed = save([cube], filename, compute=False)
assert isinstance(delayed, Delayed)
delayed.compute()
loaded_cube = iris.load_cube(filename)
_compare_cubes(cube, loaded_cube)


def test_save_create_parent_dir(cube, tmp_path):
filename = tmp_path / 'preproc' / 'something' / 'test.nc'
path = save([cube], filename)
loaded_cube = iris.load_cube(path)
save([cube], filename)
loaded_cube = iris.load_cube(filename)
_compare_cubes(cube, loaded_cube)


def test_save_alias(cube, filename):
"""Test save."""
path = save([cube], filename, alias='alias')
loaded_cube = iris.load_cube(path)
save([cube], filename, alias='alias')
loaded_cube = iris.load_cube(filename)
_compare_cubes(cube, loaded_cube)
assert loaded_cube.var_name == 'alias'


def test_save_zlib(cube, filename):
"""Test save."""
path = save([cube], filename, compress=True)
loaded_cube = iris.load_cube(path)
save([cube], filename, compress=True)
loaded_cube = iris.load_cube(filename)
_compare_cubes(cube, loaded_cube)
with netCDF4.Dataset(path, 'r') as handler:
with netCDF4.Dataset(filename, 'r') as handler:
sample_filters = handler.variables['sample'].filters()
assert sample_filters['zlib'] is True
assert sample_filters['shuffle'] is True
@@ -104,32 +115,32 @@ def test_fail_without_filename(cube):

def test_save_optimized_map(cube, filename):
"""Test save."""
path = save([cube], filename, optimize_access='map')
loaded_cube = iris.load_cube(path)
save([cube], filename, optimize_access='map')
loaded_cube = iris.load_cube(filename)
_compare_cubes(cube, loaded_cube)
_check_chunks(path, [2, 2, 1])
_check_chunks(filename, [2, 2, 1])


def test_save_optimized_timeseries(cube, filename):
"""Test save."""
path = save([cube], filename, optimize_access='timeseries')
loaded_cube = iris.load_cube(path)
save([cube], filename, optimize_access='timeseries')
loaded_cube = iris.load_cube(filename)
_compare_cubes(cube, loaded_cube)
_check_chunks(path, [1, 1, 2])
_check_chunks(filename, [1, 1, 2])


def test_save_optimized_lat(cube, filename):
"""Test save."""
path = save([cube], filename, optimize_access='latitude')
loaded_cube = iris.load_cube(path)
save([cube], filename, optimize_access='latitude')
loaded_cube = iris.load_cube(filename)
_compare_cubes(cube, loaded_cube)
expected_chunks = [2, 1, 1]
_check_chunks(path, expected_chunks)
_check_chunks(filename, expected_chunks)


def test_save_optimized_lon_time(cube, filename):
"""Test save."""
path = save([cube], filename, optimize_access='longitude time')
loaded_cube = iris.load_cube(path)
save([cube], filename, optimize_access='longitude time')
loaded_cube = iris.load_cube(filename)
_compare_cubes(cube, loaded_cube)
_check_chunks(path, [1, 2, 2])
_check_chunks(filename, [1, 2, 2])
2 changes: 2 additions & 0 deletions tests/integration/recipe/test_recipe.py
@@ -110,6 +110,7 @@ def _get_default_settings_for_chl(save_filename):
'save': {
'compress': False,
'filename': save_filename,
'compute': False,
}
}
return defaults
@@ -682,6 +683,7 @@ def test_default_fx_preprocessor(tmp_path, patched_datafinder, session):
'save': {
'compress': False,
'filename': product.filename,
'compute': False,
}
}
assert product.settings == defaults
6 changes: 5 additions & 1 deletion tests/unit/recipe/test_recipe.py
@@ -703,7 +703,11 @@ def test_get_default_settings(mocker):
settings = _recipe._get_default_settings(dataset)
assert settings == {
'remove_supplementary_variables': {},
'save': {'compress': False, 'alias': 'sic'},
'save': {
'compress': False,
'alias': 'sic',
'compute': False,
},
}


