Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Save all files in a task at the same time to avoid recomputing intermediate results #2522

Merged
merged 33 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
4599bf6
Various improvements to lazy preprocessor functions
bouweandela Sep 11, 2024
692ef21
Add import
bouweandela Sep 11, 2024
773e6e2
Propagate compute argument to save for multi model functions
bouweandela Sep 12, 2024
5b964c2
Add log message and fix test
bouweandela Sep 12, 2024
8ccdc0e
Use ruff formatting
bouweandela Sep 26, 2024
a657567
Merge remote-tracking branch 'origin/main' into delayed-save
bouweandela Sep 26, 2024
aabf7b3
Fix style issue
bouweandela Sep 26, 2024
76b43d0
Merge branch 'main' of github.com:ESMValGroup/ESMValCore into delayed…
bouweandela Oct 14, 2024
4f57e68
Add tests
bouweandela Oct 14, 2024
97a894e
Copy iris docstring for more visibility
bouweandela Oct 14, 2024
c8104e4
Pass compute arg to iris save function
bouweandela Oct 14, 2024
d4567a6
Merge branch 'main' of github.com:ESMValGroup/ESMValCore into delayed…
bouweandela Oct 17, 2024
709dd05
Merge branch 'main' into delayed-save
valeriupredoi Oct 21, 2024
85725a9
Merge branch 'main' into delayed-save
bouweandela Oct 23, 2024
c4f8f8d
Add progress bars
bouweandela Nov 27, 2024
bc15877
Improve configuration
bouweandela Nov 27, 2024
6a82e04
Add rich as dependency
bouweandela Nov 27, 2024
3f3f533
Merge branch 'main' of github.com:ESMValGroup/ESMValCore into delayed…
bouweandela Nov 27, 2024
fdde677
Add default logging configuration
bouweandela Nov 27, 2024
808b56e
Merge branch 'main' of github.com:ESMValGroup/ESMValCore into delayed…
bouweandela Nov 27, 2024
a5f8f8f
Fix case where save is called with compute=True for all files
bouweandela Nov 27, 2024
ef87f10
Improve type hints
bouweandela Nov 27, 2024
2162dc5
Longer test
bouweandela Nov 27, 2024
2e01812
Improve test coverage
bouweandela Nov 28, 2024
acfc3ab
Fix code style issues
bouweandela Nov 28, 2024
9f3d784
Make Codacy happy
bouweandela Nov 29, 2024
bd32302
Merge branch 'main' of github.com:ESMValGroup/ESMValCore into delayed…
bouweandela Nov 29, 2024
c70dfe2
Improve docs
bouweandela Nov 29, 2024
10d253f
Improve bars and default interval
bouweandela Nov 29, 2024
98ad5e4
Merge branch 'main' of github.com:ESMValGroup/ESMValCore into delayed…
bouweandela Nov 29, 2024
ec3318c
Add option to disable progress info and improve docs
bouweandela Nov 29, 2024
866c216
Improve test coverage
bouweandela Nov 29, 2024
e60520a
Simplify distributed task bar
bouweandela Nov 30, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions doc/quickstart/configure.rst
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,38 @@ For example, Python's ``None`` is YAML's ``null``, Python's ``True`` is YAML's
will be downloaded; otherwise, local data will be used.


.. _config-logging:

Logging configuration
schlunma marked this conversation as resolved.
Show resolved Hide resolved
=====================

These options can be specified under the ``"logging"`` section in the
schlunma marked this conversation as resolved.
Show resolved Hide resolved
configuration file.

Example:

.. code:: yaml

logging:
log_progress_interval: 10s
schlunma marked this conversation as resolved.
Show resolved Hide resolved

will log progress with Dask computations every 10 seconds instead of showing a
progress bar. If ``max_parallel_tasks`` is not equal to 1, progress is always
logged.
schlunma marked this conversation as resolved.
Show resolved Hide resolved

Available options:

+-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+
| Option | Description | Type | Default value |
+===============================+========================================+=============================+========================================+
| ``log_progress_interval`` | When running computations with Dask, | :obj:`str` or :obj:`float` | 0 |
| | log progress every | | |
| | ``log_progress_interval`` instead of | | |
| | showing a progress bar. The value can | | |
| | be specified in the format accepted by | | |
| | :func:`dask.utils.parse_timedelta`. | | |
+-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+

.. _config-dask:

Dask configuration
Expand Down
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ dependencies:
- python-stratify >=0.3
- pyyaml
- requests
- rich
- scipy >=1.6
- shapely >=2.0.0
- yamale
Expand Down
7 changes: 6 additions & 1 deletion esmvalcore/_recipe/recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,10 @@ def _get_default_settings(dataset):
settings["remove_supplementary_variables"] = {}

# Configure saving cubes to file
settings["save"] = {"compress": session["compress_netcdf"]}
settings["save"] = {
"compress": session["compress_netcdf"],
"compute": False,
}
if facets["short_name"] != facets["original_short_name"]:
settings["save"]["alias"] = facets["short_name"]

Expand Down Expand Up @@ -381,6 +384,8 @@ def _get_downstream_settings(step, order, products):
if key in remaining_steps:
if all(p.settings.get(key, object()) == value for p in products):
settings[key] = value
# Set the compute argument to the save step.
settings["save"] = {"compute": some_product.settings["save"]["compute"]}
return settings


Expand Down
1 change: 1 addition & 0 deletions esmvalcore/config/_config_validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ def validate_extra_facets_dir(value):
"exit_on_warning": validate_bool,
"extra_facets_dir": validate_extra_facets_dir,
"log_level": validate_string,
"logging": validate_dict,
"max_datasets": validate_int_positive_or_none,
"max_parallel_tasks": validate_int_or_none,
"max_years": validate_int_positive_or_none,
Expand Down
2 changes: 2 additions & 0 deletions esmvalcore/config/configurations/defaults/logging.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
logging:
log_progress_interval: 0.
29 changes: 21 additions & 8 deletions esmvalcore/preprocessor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from pprint import pformat
from typing import Any, Iterable

from dask.delayed import Delayed
from iris.cube import Cube

from .._provenance import TrackedFile
Expand All @@ -25,6 +26,7 @@
)
from ._compare_with_refs import bias, distance_metric
from ._cycles import amplitude
from ._dask_progress import _compute_with_progress
from ._derive import derive
from ._detrend import detrend
from ._io import (
Expand Down Expand Up @@ -428,6 +430,9 @@ def preprocess(
)
)

if step == "save":
return result

items = []
for item in result:
if isinstance(item, (PreprocessorFile, Cube, str, Path)):
Expand Down Expand Up @@ -536,22 +541,24 @@ def cubes(self):
def cubes(self, value):
self._cubes = value

def save(self):
def save(self) -> Delayed | None:
"""Save cubes to disk."""
preprocess(
return preprocess(
self._cubes,
"save",
input_files=self._input_files,
**self.settings["save"],
)
)[0]

def close(self):
def close(self) -> Delayed | None:
"""Close the file."""
result = None
if self._cubes is not None:
self._update_attributes()
self.save()
result = self.save()
self._cubes = None
self.save_provenance()
return result

def _update_attributes(self):
"""Update product attributes from cube metadata."""
Expand Down Expand Up @@ -693,7 +700,7 @@ def _initialize_products(self, products):
for product in products:
product.initialize_provenance(self.activity)

def _run(self, _):
def _run(self, _) -> list[str]:
"""Run the preprocessor."""
self._initialize_product_provenance()

Expand All @@ -703,6 +710,7 @@ def _run(self, _):
blocks = get_step_blocks(steps, self.order)

saved = set()
delayeds = []
for block in blocks:
logger.debug("Running block %s", block)
if block[0] in MULTI_MODEL_FUNCTIONS:
Expand All @@ -718,14 +726,19 @@ def _run(self, _):
product.apply(step, self.debug)
if block == blocks[-1]:
product.cubes # noqa: B018 pylint: disable=pointless-statement
product.close()
delayed = product.close()
delayeds.append(delayed)
saved.add(product.filename)

for product in self.products:
if product.filename not in saved:
product.cubes # noqa: B018 pylint: disable=pointless-statement
product.close()
delayed = product.close()
delayeds.append(delayed)

logger.info("Computing and saving data for task %s", self.name)
delayeds = [d for d in delayeds if d is not None]
_compute_with_progress(delayeds, description=self.name)
metadata_files = write_metadata(
self.products, self.write_ncl_interface
)
Expand Down
Loading