Save all files in a task at the same time to avoid recomputing intermediate results #2522

Open · wants to merge 14 commits into main
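In short: `save` gains a `compute` argument, and the recipe machinery now calls it with `compute=False`, so each output file yields a `dask.delayed.Delayed` instead of being written immediately. The task then computes all deferred writes in one `dask.compute` call, letting Dask share intermediate results between output files rather than recomputing them per file. A minimal sketch of the pattern this enables (`cubes_a`, `cubes_b`, and the file names are hypothetical):

import dask
from esmvalcore.preprocessor import save

# Hypothetical lazy cube lists that share expensive upstream preprocessing.
delayeds = []
for cubes, filename in [(cubes_a, "a.nc"), (cubes_b, "b.nc")]:
    # compute=False creates the file but defers writing the lazy data,
    # returning a dask.delayed.Delayed instead of None.
    delayeds.append(save(cubes, filename, compute=False))

# One compute call writes every file; shared graph nodes run only once.
dask.compute(delayeds)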
7 changes: 6 additions & 1 deletion esmvalcore/_recipe/recipe.py
@@ -220,7 +220,10 @@ def _get_default_settings(dataset):
settings["remove_supplementary_variables"] = {}

# Configure saving cubes to file
settings["save"] = {"compress": session["compress_netcdf"]}
settings["save"] = {
"compress": session["compress_netcdf"],
"compute": False,
}
if facets["short_name"] != facets["original_short_name"]:
settings["save"]["alias"] = facets["short_name"]

@@ -381,6 +384,8 @@ def _get_downstream_settings(step, order, products):
if key in remaining_steps:
if all(p.settings.get(key, object()) == value for p in products):
settings[key] = value
# Pass the compute argument on to the save step.
settings["save"] = {"compute": some_product.settings["save"]["compute"]}
return settings


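For reference, the default save settings a product now carries look like this (shape taken from the updated recipe tests below; `save_filename` stands in for the product's output path):

settings["save"] = {
    "compress": False,          # session["compress_netcdf"]
    "filename": save_filename,
    "compute": False,           # defer writing until the whole task computes
}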
44 changes: 38 additions & 6 deletions esmvalcore/preprocessor/__init__.py
@@ -9,6 +9,10 @@
from pprint import pformat
from typing import Any, Iterable

import dask
import dask.diagnostics
import distributed
from dask.delayed import Delayed
from iris.cube import Cube

from .._provenance import TrackedFile
@@ -428,6 +432,9 @@ def preprocess(
)
)

if step == "save":
return result

items = []
for item in result:
if isinstance(item, (PreprocessorFile, Cube, str, Path)):
@@ -536,22 +543,24 @@ def cubes(self):
def cubes(self, value):
self._cubes = value

def save(self):
def save(self) -> list[Delayed] | None:
"""Save cubes to disk."""
preprocess(
return preprocess(
self._cubes,
"save",
input_files=self._input_files,
**self.settings["save"],
)

def close(self):
def close(self) -> list[Delayed] | None:
"""Close the file."""
result = None
if self._cubes is not None:
self._update_attributes()
self.save()
result = self.save()
self._cubes = None
self.save_provenance()
return result

def _update_attributes(self):
"""Update product attributes from cube metadata."""
@@ -634,6 +643,24 @@ def _apply_multimodel(products, step, debug):
return products


def _compute_with_progress(delayeds: Iterable[Delayed]) -> None:
"""Compute delayeds while displaying a progress bar."""
try:
distributed.get_client()
except ValueError:
use_distributed = False
else:
use_distributed = True

if use_distributed:
futures = dask.persist(delayeds)
distributed.progress(futures, notebook=False)
dask.compute(futures)
else:
with dask.diagnostics.ProgressBar():
dask.compute(delayeds)


class PreprocessingTask(BaseTask):
"""Task for running the preprocessor."""

@@ -703,6 +730,7 @@ def _run(self, _):
blocks = get_step_blocks(steps, self.order)

saved = set()
delayeds = []
for block in blocks:
logger.debug("Running block %s", block)
if block[0] in MULTI_MODEL_FUNCTIONS:
@@ -718,14 +746,18 @@
product.apply(step, self.debug)
if block == blocks[-1]:
product.cubes # noqa: B018 pylint: disable=pointless-statement
product.close()
delayed = product.close()
delayeds.append(delayed)
saved.add(product.filename)

for product in self.products:
if product.filename not in saved:
product.cubes # noqa: B018 pylint: disable=pointless-statement
product.close()
delayed = product.close()
delayeds.append(delayed)

logger.info("Computing and saving data for task %s", self.name)
_compute_with_progress(delayeds)
metadata_files = write_metadata(
self.products, self.write_ncl_interface
)
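A hedged usage sketch for `_compute_with_progress` above, with toy delayed objects standing in for the deferred save operations a task collects. With a `distributed.Client` active, the function persists the graph and reports progress via `distributed.progress`; otherwise it wraps a plain `dask.compute` in the synchronous `dask.diagnostics.ProgressBar`:

from dask import delayed

# Toy stand-ins for the deferred writes collected by a PreprocessingTask.
writes = [delayed(len)("abc"), delayed(sum)([1, 2, 3])]

# Shows a progress bar on either scheduler, then blocks until done.
_compute_with_progress(writes)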
61 changes: 42 additions & 19 deletions esmvalcore/preprocessor/_io.py
@@ -5,6 +5,7 @@
import copy
import logging
import os
from collections.abc import Sequence
from itertools import groupby
from pathlib import Path
from typing import NamedTuple, Optional
@@ -17,6 +18,7 @@
import numpy as np
import yaml
from cf_units import suppress_errors
from dask.delayed import Delayed
from iris.cube import CubeList

from esmvalcore.cmor.check import CheckLevels
@@ -403,19 +405,25 @@ def concatenate(cubes, check_level=CheckLevels.DEFAULT):


def save(
cubes, filename, optimize_access="", compress=False, alias="", **kwargs
):
cubes: Sequence[iris.cube.Cube],
filename: Path | str,
optimize_access: str = "",
compress: bool = False,
alias: str = "",
compute: bool = True,
**kwargs,
) -> Delayed | None:
"""Save iris cubes to file.

Parameters
----------
cubes: iterable of iris.cube.Cube
cubes:
Data cubes to be saved

filename: str
filename:
Name of target file

optimize_access: str
optimize_access:
Set internal NetCDF chunking to favour a reading scheme

Values can be map or timeseries, which improve performance when
@@ -424,16 +432,30 @@
case the better performance will be achieved by loading all the values
in that coordinate at a time

compress: bool, optional
compress:
Use NetCDF internal compression.

alias: str, optional
alias:
Var name to use when saving instead of the one in the cube.

compute : bool, default=True
If ``True`` (the default), write the file contents immediately and return
``None``.

When ``False``, create the output file but do not write any lazy array content
to its variables, such as lazy cube data or aux-coord points and bounds.
Instead, return a :class:`dask.delayed.Delayed` which, when computed, will
stream all the lazy content via :func:`dask.array.store` to complete the file.
Several such data saves can be performed in parallel by passing a list of them
into a single :func:`dask.compute` call.

**kwargs:
See :func:`iris.fileformats.netcdf.saver.save` for additional
keyword arguments.

Returns
-------
str
filename
:class:`dask.delayed.Delayed` or :obj:`None`
A delayed object that, when computed, writes the lazy cube data to the file, or ``None`` if ``compute`` is ``True``.

Raises
------
@@ -443,6 +465,9 @@
if not cubes:
raise ValueError(f"Cannot save empty cubes '{cubes}'")

if Path(filename).suffix.lower() == ".nc":
kwargs["compute"] = compute

# Rename some arguments
kwargs["target"] = filename
kwargs["zlib"] = compress
@@ -460,7 +485,7 @@
cubes,
filename,
)
return filename
return None

for cube in cubes:
logger.debug(
@@ -478,13 +503,11 @@
elif optimize_access == "timeseries":
dims = set(cube.coord_dims("time"))
else:
dims = tuple()
for coord_dims in (
cube.coord_dims(dimension)
for dimension in optimize_access.split(" ")
):
dims += coord_dims
dims = set(dims)
dims = {
dim
for coord_name in optimize_access.split(" ")
for dim in cube.coord_dims(coord_name)
}

kwargs["chunksizes"] = tuple(
length if index in dims else 1
@@ -510,9 +533,9 @@
category=UserWarning,
module="iris",
)
iris.save(cubes, **kwargs)
result = iris.save(cubes, **kwargs)

return filename
return result


def _get_debug_filename(filename, step):
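To make the `optimize_access` behaviour documented above concrete, a hedged sketch for a hypothetical cube with dimensions `(time=12, lat=180, lon=360)`: dimensions named (or implied) by `optimize_access` keep their full length as the NetCDF chunk size, while every other dimension gets chunk size 1:

from esmvalcore.preprocessor import save

# optimize_access="map"        -> chunksizes (1, 180, 360): whole-map reads
# optimize_access="timeseries" -> chunksizes (12, 1, 1):    time-series reads
# optimize_access="latitude"   -> chunksizes (1, 180, 1):   per-latitude reads
save(cubes, "out.nc", optimize_access="map")  # `cubes` is hypothetical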
65 changes: 44 additions & 21 deletions tests/integration/preprocessor/_io/test_save.py
@@ -1,9 +1,13 @@
"""Integration tests for :func:`esmvalcore.preprocessor.save`."""

import logging
import re

import iris
import netCDF4
import numpy as np
import pytest
from dask.delayed import Delayed
from iris.coords import DimCoord
from iris.cube import Cube, CubeList

@@ -59,32 +63,51 @@ def _check_chunks(path, expected_chunks):

def test_save(cube, filename):
"""Test save."""
path = save([cube], filename)
loaded_cube = iris.load_cube(path)
delayed = save([cube], filename)
assert delayed is None
loaded_cube = iris.load_cube(filename)
_compare_cubes(cube, loaded_cube)


def test_delayed_save(cube, filename):
"""Test save."""
delayed = save([cube], filename, compute=False)
assert isinstance(delayed, Delayed)
delayed.compute()
loaded_cube = iris.load_cube(filename)
_compare_cubes(cube, loaded_cube)


def test_save_noop(cube, filename, caplog):
"""Test save."""
cube.data = cube.lazy_data()
save([cube], filename)
with caplog.at_level(logging.DEBUG):
save([cube], filename)
assert re.findall("Not saving cubes .* to avoid data loss.", caplog.text)


def test_save_create_parent_dir(cube, tmp_path):
filename = tmp_path / "preproc" / "something" / "test.nc"
path = save([cube], filename)
loaded_cube = iris.load_cube(path)
save([cube], filename)
loaded_cube = iris.load_cube(filename)
_compare_cubes(cube, loaded_cube)


def test_save_alias(cube, filename):
"""Test save."""
path = save([cube], filename, alias="alias")
loaded_cube = iris.load_cube(path)
save([cube], filename, alias="alias")
loaded_cube = iris.load_cube(filename)
_compare_cubes(cube, loaded_cube)
assert loaded_cube.var_name == "alias"


def test_save_zlib(cube, filename):
"""Test save."""
path = save([cube], filename, compress=True)
loaded_cube = iris.load_cube(path)
save([cube], filename, compress=True)
loaded_cube = iris.load_cube(filename)
_compare_cubes(cube, loaded_cube)
with netCDF4.Dataset(path, "r") as handler:
with netCDF4.Dataset(filename, "r") as handler:
sample_filters = handler.variables["sample"].filters()
assert sample_filters["zlib"] is True
assert sample_filters["shuffle"] is True
@@ -106,32 +129,32 @@ def test_fail_without_filename(cube):

def test_save_optimized_map(cube, filename):
"""Test save."""
path = save([cube], filename, optimize_access="map")
loaded_cube = iris.load_cube(path)
save([cube], filename, optimize_access="map")
loaded_cube = iris.load_cube(filename)
_compare_cubes(cube, loaded_cube)
_check_chunks(path, [2, 2, 1])
_check_chunks(filename, [2, 2, 1])


def test_save_optimized_timeseries(cube, filename):
"""Test save."""
path = save([cube], filename, optimize_access="timeseries")
loaded_cube = iris.load_cube(path)
save([cube], filename, optimize_access="timeseries")
loaded_cube = iris.load_cube(filename)
_compare_cubes(cube, loaded_cube)
_check_chunks(path, [1, 1, 2])
_check_chunks(filename, [1, 1, 2])


def test_save_optimized_lat(cube, filename):
"""Test save."""
path = save([cube], filename, optimize_access="latitude")
loaded_cube = iris.load_cube(path)
save([cube], filename, optimize_access="latitude")
loaded_cube = iris.load_cube(filename)
_compare_cubes(cube, loaded_cube)
expected_chunks = [2, 1, 1]
_check_chunks(path, expected_chunks)
_check_chunks(filename, expected_chunks)


def test_save_optimized_lon_time(cube, filename):
"""Test save."""
path = save([cube], filename, optimize_access="longitude time")
loaded_cube = iris.load_cube(path)
save([cube], filename, optimize_access="longitude time")
loaded_cube = iris.load_cube(filename)
_compare_cubes(cube, loaded_cube)
_check_chunks(path, [1, 2, 2])
_check_chunks(filename, [1, 2, 2])
2 changes: 2 additions & 0 deletions tests/integration/recipe/test_recipe.py
@@ -110,6 +110,7 @@ def _get_default_settings_for_chl(save_filename):
"save": {
"compress": False,
"filename": save_filename,
"compute": False,
},
}
return defaults
@@ -693,6 +694,7 @@ def test_default_fx_preprocessor(tmp_path, patched_datafinder, session):
"save": {
"compress": False,
"filename": product.filename,
"compute": False,
},
}
assert product.settings == defaults