diff --git a/esmvalcore/_task.py b/esmvalcore/_task.py
index 04200371cd..d46db0768c 100644
--- a/esmvalcore/_task.py
+++ b/esmvalcore/_task.py
@@ -17,9 +17,11 @@
 from shutil import which
 from typing import Optional
 
+import dask
 import psutil
 import yaml
-from distributed import Client
+from dask.diagnostics import ProgressBar
+from distributed import Client, progress
 
 from ._citation import _write_citation_files
 from ._provenance import TrackedFile, get_task_provenance
@@ -712,6 +714,10 @@ def get_independent(self) -> 'TaskSet':
             independent_tasks.add(task)
         return independent_tasks
 
+    @staticmethod
+    def _is_preprocessing_task(task):
+        return hasattr(task, 'lazy_files')
+
     def run(self, max_parallel_tasks: Optional[int] = None) -> None:
         """Run tasks.
 
@@ -732,6 +738,42 @@ def run(self, max_parallel_tasks: Optional[int] = None) -> None:
                     # Python script.
                     task.settings['scheduler_address'] = address
 
+        # Run preprocessor metadata computations
+        logger.info("Running preprocessor metadata computations")
+        preprocessing_tasks = [
+            t for t in self.flatten() if self._is_preprocessing_task(t)
+        ]
+        work = [(t._run(None), t.products, t.lazy_files)
+                for t in preprocessing_tasks]
+        if client is None:
+            with ProgressBar():
+                result = dask.compute(*work)
+        else:
+            work = dask.persist(*work)
+            progress(work)
+            result = dask.compute(*work)
+        for task, (output_files, products, lazy_files) in zip(
+                preprocessing_tasks,
+                result,
+        ):
+            task.output_files = output_files
+            task.products = products
+            task.lazy_files = lazy_files
+
+        # Fill preprocessor files with data
+        logger.info("Running preprocessor data computations")
+        work = [
+            f.delayed for t in preprocessing_tasks for f in t.lazy_files
+        ]
+        if client is None:
+            with ProgressBar():
+                dask.compute(work)
+        else:
+            work = dask.persist(work)
+            progress(work)
+            dask.compute(work)
+
+        # Then run remaining tasks
         if max_parallel_tasks == 1:
             self._run_sequential()
         else:
@@ -739,7 +781,8 @@ def run(self, max_parallel_tasks: Optional[int] = None) -> None:
 
     def _run_sequential(self) -> None:
         """Run tasks sequentially."""
-        n_tasks = len(self.flatten())
+        n_tasks = len(
+            [t for t in self.flatten() if not self._is_preprocessing_task(t)])
        logger.info("Running %s tasks sequentially", n_tasks)
         tasks = self.get_independent()
 
@@ -748,7 +791,10 @@ def _run_sequential(self) -> None:
 
     def _run_parallel(self, scheduler_address, max_parallel_tasks):
         """Run tasks in parallel."""
-        scheduled = self.flatten()
+        scheduled = {
+            t
+            for t in self.flatten() if not self._is_preprocessing_task(t)
+        }
         running = {}
 
         n_tasks = n_scheduled = len(scheduled)
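
# Review note: a minimal, hypothetical sketch of the two-phase compute pattern
# that TaskSet.run now uses: delayed graphs are built first (cheap), then
# computed in bulk, with dask.diagnostics.ProgressBar on the local scheduler
# and distributed's persist()/progress() when a Client is available. The
# `make_graph` helper below is invented for illustration only.
import dask
from dask.diagnostics import ProgressBar
from distributed import progress


def run_delayed(work, client=None):
    """Compute a list of dask Delayed objects, reporting progress."""
    if client is None:
        # Local scheduler: ProgressBar prints progress to the terminal.
        with ProgressBar():
            return dask.compute(*work)
    # Distributed scheduler: persist first so distributed.progress can
    # track the futures, then gather the already-computed results.
    work = dask.persist(*work)
    progress(work)
    return dask.compute(*work)


make_graph = dask.delayed(sum)  # stand-in for a real graph builder
print(run_delayed([make_graph([1, 2]), make_graph([3, 4])]))  # (3, 7)
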
diff --git a/esmvalcore/preprocessor/__init__.py b/esmvalcore/preprocessor/__init__.py
index 2144e5edce..fce6078225 100644
--- a/esmvalcore/preprocessor/__init__.py
+++ b/esmvalcore/preprocessor/__init__.py
@@ -2,16 +2,21 @@
 from __future__ import annotations
 
 import copy
+import functools
 import inspect
 import logging
+import time
+from importlib import import_module
 from pathlib import Path
 from pprint import pformat
-from typing import Any, Iterable
+from typing import Any, Callable, Iterable
 
+import dask
 from iris.cube import Cube
 
 from .._provenance import TrackedFile
 from .._task import BaseTask
+from .._version import __version__
 from ..cmor.check import cmor_check_data, cmor_check_metadata
 from ..cmor.fix import fix_data, fix_file, fix_metadata
 from ._area import (
@@ -27,8 +32,8 @@
 from ._derive import derive
 from ._detrend import detrend
 from ._io import (
+    LazyFile,
     _get_debug_filename,
-    _sort_products,
     concatenate,
     load,
     save,
@@ -222,6 +227,28 @@
 }
 
 
+def _get_multimodel_products(function_name: str) -> Callable:
+    """Get function that computes multimodel output products."""
+    preproc_fn = globals()[function_name]
+    module_name = preproc_fn.__module__
+    module = import_module(module_name)
+    output_products_fn_name = f"{function_name}_outputs"
+    try:
+        return getattr(module, output_products_fn_name)
+    except AttributeError as exc:
+        raise NotImplementedError(
+            "Please implement the function "
+            f"{module_name}.{output_products_fn_name}") from exc
+
+
+MULTI_MODEL_FUNCTIONS_OUT_PRODUCTS = {
+    function: _get_multimodel_products(function)
+    for function in MULTI_MODEL_FUNCTIONS
+}
+
+_delayed = functools.partial(dask.delayed, pure=False, traverse=False)
+
+
 def _get_itype(step):
     """Get the input type of a preprocessor function."""
     function = globals()[step]
@@ -291,7 +318,10 @@ def _check_multi_model_settings(products):
     )
 
 
-def _get_multi_model_settings(products, step):
+def _get_multi_model_settings(
+    products: Iterable[PreprocessorFile],
+    step: str,
+) -> tuple[dict[str, Any], set[PreprocessorFile]]:
     """Select settings for multi model step."""
     _check_multi_model_settings(products)
     settings = {}
@@ -360,7 +390,8 @@ def preprocess(
 ):
     """Run preprocessor."""
     logger.debug("Running preprocessor step %s", step)
-    function = globals()[step]
+    import esmvalcore.preprocessor
+    function = getattr(esmvalcore.preprocessor, step)
     itype = _get_itype(step)
 
     for item in items:
@@ -378,7 +409,7 @@
 
     items = []
     for item in result:
-        if isinstance(item, (PreprocessorFile, Cube, str, Path)):
+        if isinstance(item, (PreprocessorFile, Cube, str, Path, LazyFile)):
            items.append(item)
         else:
             items.extend(item)
@@ -464,11 +495,15 @@ def apply(self, step: str, debug: bool = False):
             raise ValueError(
                 f"PreprocessorFile {self} has no settings for step {step}"
             )
-        self.cubes = preprocess(self.cubes, step,
-                                input_files=self._input_files,
-                                output_file=self.filename,
-                                debug=debug,
-                                **self.settings[step])
+        self.cubes = preprocess(
+            self.cubes,
+            step,
+            input_files=self._input_files,
+            output_file=self.filename,
+            debug=debug,
+            **self.settings[step],
+        )
+        self._update_attributes()
 
     @property
     def cubes(self):
@@ -483,24 +518,28 @@ def cubes(self, value):
 
     def save(self):
         """Save cubes to disk."""
-        preprocess(self._cubes,
-                   'save',
-                   input_files=self._input_files,
-                   **self.settings['save'])
+        version = f"Created with ESMValCore v{__version__}"
+        for cube in self._cubes:
+            cube.attributes['software'] = version
+        return preprocess(
+            self._cubes,
+            'save',
+            input_files=self._input_files,
+            **self.settings['save'],
+        )[0]
 
     def close(self):
         """Close the file."""
         if self._cubes is not None:
-            self._update_attributes()
-            self.save()
+            lazy_file = self.save()
             self._cubes = None
             self.save_provenance()
+            return lazy_file
+        raise ValueError(f"{self.filename} has already been closed.")
 
     def _update_attributes(self):
         """Update product attributes from cube metadata."""
-        if not self._cubes:
-            return
-        ref_cube = self._cubes[0]
+        ref_cube = self.cubes[0]
 
         # Names
         names = {
@@ -533,6 +572,9 @@ def _initialize_entity(self):
         }
         self.entity.add_attributes(settings)
 
+    def _include_provenance(self):
+        """Do nothing; already done when saving the cube(s) to file."""
+
     def group(self, keys: list) -> str:
         """Generate group keyword.
 
@@ -556,23 +598,62 @@ def group(self, keys: list) -> str:
         return '_'.join(identifier)
 
 
-def _apply_multimodel(products, step, debug):
+def _get_multimodel_n_outs(
+    products: list[PreprocessorFile],
+    steps: list[str],
+    order: list[str],
+) -> dict[str, int]:
+    n_outs = {}
+    for step in order[len(INITIAL_STEPS):-len(FINAL_STEPS)]:
+        if step in steps:
+            if step in MULTI_MODEL_FUNCTIONS:
+                settings, exclude = _get_multi_model_settings(products, step)
+                include = set(products) - exclude
+                multimodel_outputs = MULTI_MODEL_FUNCTIONS_OUT_PRODUCTS[step]
+                result = multimodel_outputs(include, **settings)
+                products = result | exclude
+            n_outs[step] = len(products)
+    return n_outs
+
+
+def _apply_multimodel(
+    products: list[PreprocessorFile],
+    step: str,
+    debug: bool,
+) -> list[PreprocessorFile]:
     """Apply multi model step to products."""
     settings, exclude = _get_multi_model_settings(products, step)
-    logger.debug("Applying %s to\n%s", step,
-                 '\n'.join(str(p) for p in products - exclude))
-    result = preprocess(products - exclude, step, **settings)
-    products = set(result) | exclude
+    include = set(products) - exclude
+    result = preprocess(include, step, **settings)
+    result.extend(exclude)
     if debug:
-        for product in products:
+        for product in result:
             logger.debug("Result %s", product.filename)
             if not product.is_closed:
                 for cube in product.cubes:
                     logger.debug("with cube %s", cube)
-    return products
+    return result
+
+
+def _apply_singlemodel(
+    product: PreprocessorFile,
+    steps: list[str],
+    debug: bool,
+) -> PreprocessorFile:
+    logger.debug("Applying single-model steps to %s", product)
+    for step in steps:
+        if step in product.settings:
+            product.apply(step, debug)
+    return product
+
+
+def _save(product: PreprocessorFile):
+    product.cubes  # pylint: disable=pointless-statement
+    file = product.close()
+    return file
 
 
 class PreprocessingTask(BaseTask):
@@ -589,6 +670,7 @@ def __init__(
         """Initialize."""
         _check_multi_model_settings(products)
         super().__init__(name=name, products=products)
+        self.lazy_files: list[LazyFile] = []
         self.order = list(order)
         self.debug = debug
         self.write_ncl_interface = write_ncl_interface
@@ -636,39 +718,43 @@ def _initialize_products(self, products):
 
     def _run(self, _):
         """Run the preprocessor."""
+        start = time.perf_counter()
         self._initialize_product_provenance()
+        logger.info("Initializing provenance of task %s took %.0f seconds",
+                    self.name, time.perf_counter() - start)
 
+        start = time.perf_counter()
+        products = list(self.products)
-        steps = {
-            step
-            for product in self.products for step in product.settings
-        }
+        steps = {step for product in products for step in product.settings}
         blocks = get_step_blocks(steps, self.order)
-
-        saved = set()
+        n_outs = _get_multimodel_n_outs(products, steps, self.order)
 
         for block in blocks:
             logger.debug("Running block %s", block)
 
             if block[0] in MULTI_MODEL_FUNCTIONS:
                 for step in block:
-                    self.products = _apply_multimodel(self.products, step,
-                                                      self.debug)
+                    products = _delayed(
+                        _apply_multimodel,
+                        nout=n_outs[step],
+                    )(
+                        products,
+                        step,
+                        self.debug,
+                    )
             else:
-                for product in _sort_products(self.products):
-                    logger.debug("Applying single-model steps to %s", product)
-                    for step in block:
-                        if step in product.settings:
-                            product.apply(step, self.debug)
-                    if block == blocks[-1]:
-                        product.cubes  # pylint: disable=pointless-statement
-                        product.close()
-                        saved.add(product.filename)
-
-        for product in self.products:
-            if product.filename not in saved:
-                product.cubes  # pylint: disable=pointless-statement
-                product.close()
-
-        metadata_files = write_metadata(self.products,
-                                        self.write_ncl_interface)
+                products = [
+                    _delayed(_apply_singlemodel)(product, block, self.debug)
+                    for product in products
+                ]
+
+        self.lazy_files = [_delayed(_save)(product) for product in products]
+        self.products = products
+        metadata_files = _delayed(write_metadata)(
+            products,
+            self.write_ncl_interface,
+        )
+        logger.info(
+            "Building metadata task graph for task %s took %.0f seconds",
+            self.name, time.perf_counter() - start)
 
         return metadata_files
 
     def __str__(self):
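
# Review note: a self-contained sketch of how PreprocessingTask._run now wires
# the graph with _delayed (dask.delayed with pure=False, so calls that mutate
# their inputs are never deduplicated; traverse=False mirrors the _delayed
# helper above). For multi-model steps the number of outputs is precomputed
# (cf. _get_multimodel_n_outs) and passed as `nout`, which lets the returned
# Delayed be unpacked without computing it. The `step_a`/`multi_step`
# functions are stand-ins invented for illustration.
import functools

import dask

_delayed = functools.partial(dask.delayed, pure=False, traverse=False)


def step_a(product):  # stand-in for _apply_singlemodel
    return product + 1


def multi_step(products):  # stand-in for _apply_multimodel
    return [p * 2 for p in products]


products = [1, 2, 3]
products = [_delayed(step_a)(p) for p in products]
# nout must match the length of the returned list, just as n_outs[step] does.
products = _delayed(multi_step, nout=3)(products)
print(dask.compute(*products))  # (4, 6, 8)
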
diff --git a/esmvalcore/preprocessor/_bias.py b/esmvalcore/preprocessor/_bias.py
index a8e0f4f2c0..886c1fc7be 100644
--- a/esmvalcore/preprocessor/_bias.py
+++ b/esmvalcore/preprocessor/_bias.py
@@ -19,6 +19,20 @@
 BiasType = Literal['absolute', 'relative']
 
 
+def bias_outputs(
+    products: set[PreprocessorFile] | Iterable[Cube],
+    ref_cube: Optional[Cube] = None,
+    bias_type: BiasType = 'absolute',
+    denominator_mask_threshold: float = 1e-3,
+    keep_reference_dataset: bool = False,
+) -> set[PreprocessorFile]:
+    outputs = set(products)
+    if not keep_reference_dataset:
+        _, ref_product = _get_ref(products, 'reference_for_bias')
+        outputs.remove(ref_product)
+    return outputs
+
+
 def bias(
     products: set[PreprocessorFile] | Iterable[Cube],
     ref_cube: Optional[Cube] = None,
diff --git a/esmvalcore/preprocessor/_io.py b/esmvalcore/preprocessor/_io.py
index 86460328a6..66fe36fc0c 100644
--- a/esmvalcore/preprocessor/_io.py
+++ b/esmvalcore/preprocessor/_io.py
@@ -394,6 +394,16 @@ def concatenate(cubes, check_level=CheckLevels.DEFAULT):
     return result
 
 
+class LazyFile:
+
+    def __init__(self, filename: Path, delayed):
+        self.path = filename
+        self.delayed = delayed
+
+    def __repr__(self):
+        return f"LazyFile('{self.path}', {self.delayed})"
+
+
 def save(cubes,
          filename,
          optimize_access='',
@@ -438,13 +448,15 @@
     if not cubes:
         raise ValueError(f"Cannot save empty cubes '{cubes}'")
 
+    if hasattr(iris.FUTURE, 'save_split_attrs'):
+        iris.FUTURE.save_split_attrs = True
+
     # Rename some arguments
     kwargs['target'] = filename
     kwargs['zlib'] = compress
 
     dirname = os.path.dirname(filename)
-    if not os.path.exists(dirname):
-        os.makedirs(dirname)
+    os.makedirs(dirname, exist_ok=True)
 
     if (os.path.exists(filename)
             and all(cube.has_lazy_data() for cube in cubes)):
@@ -481,9 +493,12 @@
             logger.debug('Changing var_name from %s to %s', cube.var_name,
                          alias)
             cube.var_name = alias
-    iris.save(cubes, **kwargs)
+    kwargs['compute'] = False
 
-    return filename
+    delayed = iris.save(cubes, **kwargs)
+    result = LazyFile(filename, delayed)
+
+    return result
 
 
 def _get_debug_filename(filename, step):
@@ -513,6 +528,7 @@ def write_metadata(products, write_ncl=False):
     output_files = []
     for output_dir, prods in groupby(products,
                                      lambda p: os.path.dirname(p.filename)):
+        os.makedirs(output_dir, exist_ok=True)
         sorted_products = _sort_products(prods)
         metadata = {}
         for product in sorted_products:
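
# Review note: a minimal sketch of the delayed-save mechanism LazyFile wraps,
# assuming iris >= 3.5, where iris.save(..., compute=False) creates the NetCDF
# file but defers writing lazy cube data and returns a dask Delayed. The file
# name 'tas.nc' and the cube are illustrative only.
import dask
import dask.array as da
import iris
from iris.cube import Cube

cube = Cube(da.zeros((2, 2)), var_name='tas')  # cube with lazy data
delayed = iris.save(cube, 'tas.nc', compute=False)
# The variable data has not been written yet; computing the Delayed performs
# the (potentially expensive) data writes. TaskSet.run collects these from
# LazyFile.delayed and computes them all at once.
dask.compute(delayed)
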
diff --git a/esmvalcore/preprocessor/_mask.py b/esmvalcore/preprocessor/_mask.py
index c0e8348d77..7bbd95c7f9 100644
--- a/esmvalcore/preprocessor/_mask.py
+++ b/esmvalcore/preprocessor/_mask.py
@@ -528,6 +528,10 @@ def _multimodel_mask_products(products, shape):
     return products
 
 
+def mask_multimodel_outputs(products) -> set:
+    return set(products)
+
+
 def mask_multimodel(products):
     """Apply common mask to all datasets (using logical OR).
 
@@ -572,6 +576,15 @@ def mask_multimodel(products):
                         f"got {product_types}")
 
 
+def mask_fillvalues_outputs(
+    products,
+    threshold_fraction,
+    min_value=None,
+    time_window=1,
+) -> set:
+    return set(products)
+
+
 def mask_fillvalues(products,
                     threshold_fraction,
                     min_value=None,
diff --git a/esmvalcore/preprocessor/_multimodel.py b/esmvalcore/preprocessor/_multimodel.py
index 5390517a26..141d7537a4 100644
--- a/esmvalcore/preprocessor/_multimodel.py
+++ b/esmvalcore/preprocessor/_multimodel.py
@@ -677,6 +677,23 @@ def _get_stat_identifier(statistic: str | dict) -> str:
     return operator
 
 
+def multi_model_statistics_outputs(
+    products: set[PreprocessorFile],
+    span: str,
+    statistics: list[str],
+    output_products=None,
+    groupby: Optional[tuple] = None,
+    keep_input_datasets: bool = True,
+    ignore_scalar_coords: bool = False,
+) -> set[PreprocessorFile]:
+    outputs = set()
+    for group, _ in _group_products(products, by_key=groupby):
+        outputs |= set(output_products[group].values())
+    if keep_input_datasets:
+        outputs |= products
+    return outputs
+
+
 def multi_model_statistics(
     products: set[PreprocessorFile] | Iterable[Cube],
     span: str,
@@ -824,6 +841,25 @@ def multi_model_statistics(
     )
 
 
+def ensemble_statistics_outputs(
+    products: set[PreprocessorFile],
+    statistics: list[str],
+    output_products,
+    span: str = 'overlap',
+    ignore_scalar_coords: bool = False,
+) -> set[PreprocessorFile]:
+    ensemble_grouping = ('project', 'dataset', 'exp', 'sub_experiment')
+    return multi_model_statistics_outputs(
+        products=products,
+        span=span,
+        statistics=statistics,
+        output_products=output_products,
+        groupby=ensemble_grouping,
+        keep_input_datasets=False,
+        ignore_scalar_coords=ignore_scalar_coords,
+    )
+
+
 def ensemble_statistics(
     products: set[PreprocessorFile] | Iterable[Cube],
     statistics: list[str | dict],
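
# Review note: this diff establishes a convention enforced at import time by
# _get_multimodel_products: every multi-model step <step> needs a sibling
# <step>_outputs in the same module, accepting the same settings, that
# predicts the output products without touching any data (it is what sizes
# `nout` for the delayed multi-model call). A hypothetical new step would
# follow this pattern; `my_statistic` is invented for illustration:

def my_statistic_outputs(products, keep_input_datasets=True):
    """Predict the products my_statistic will return, without computing."""
    outputs = set(products) if keep_input_datasets else set()
    # ...add the product objects that my_statistic will create...
    return outputs


def my_statistic(products, keep_input_datasets=True):
    """The actual step; its result must match my_statistic_outputs."""
    raise NotImplementedError  # data-touching implementation goes here
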