Proof of concept: running metadata and data computations on Dask #2316

Draft · wants to merge 7 commits into base: main

52 changes: 49 additions & 3 deletions esmvalcore/_task.py
@@ -17,9 +17,11 @@
from shutil import which
from typing import Optional

import dask
import psutil
import yaml
from distributed import Client
from dask.diagnostics import ProgressBar
from distributed import Client, progress

from ._citation import _write_citation_files
from ._provenance import TrackedFile, get_task_provenance
@@ -712,6 +714,10 @@
independent_tasks.add(task)
return independent_tasks

@staticmethod
def _is_preprocessing_task(task):
return hasattr(task, 'lazy_files')

def run(self, max_parallel_tasks: Optional[int] = None) -> None:
"""Run tasks.

@@ -732,14 +738,51 @@
# Python script.
task.settings['scheduler_address'] = address

# Run preprocessor metadata computations
logger.info("Running preprocessor metadata computations")
preprocessing_tasks = [
t for t in self.flatten() if self._is_preprocessing_task(t)
]
work = [(t._run(None), t.products, t.lazy_files)
        for t in preprocessing_tasks]
[Codacy Static Code Analysis, esmvalcore/_task.py#L746: Access to a protected member _run of a client class (protected-access)]
if client is None:
with ProgressBar():
result = dask.compute(*work)
else:
work = dask.persist(*work)
progress(work)
result = dask.compute(*work)
for task, (output_files, products, lazy_files) in zip(
preprocessing_tasks,
result,
):
task.output_files = output_files
task.products = products
task.lazy_files = lazy_files

# Fill preprocessor files with data
logger.info("Running preprocessor data computations")
work = [
f.delayed for t in preprocessing_tasks for f in t.lazy_files
]
if client is None:
with ProgressBar():
dask.compute(work)
else:
work = dask.persist(work)
progress(work)
dask.compute(work)

# Then run remaining tasks
if max_parallel_tasks == 1:
self._run_sequential()
else:
self._run_parallel(address, max_parallel_tasks)

def _run_sequential(self) -> None:
"""Run tasks sequentially."""
n_tasks = len(self.flatten())
n_tasks = len(
[t for t in self.flatten() if not self._is_preprocessing_task(t)])
logger.info("Running %s tasks sequentially", n_tasks)

tasks = self.get_independent()
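
Note: the run() changes above execute preprocessing in two Dask phases: one compute for the metadata graphs returned by each task's _run(), and a second compute for the LazyFile.delayed objects that write the actual data. Progress reporting depends on the scheduler in use. A minimal sketch of the shared pattern (the helper name compute_with_progress is illustrative, not part of this PR):

import dask
from dask.diagnostics import ProgressBar
from distributed import progress

def compute_with_progress(work, client=None):
    """Compute a list of delayed objects, showing a progress bar."""
    if client is None:
        # Local scheduler: ProgressBar wraps the compute call.
        with ProgressBar():
            return dask.compute(*work)
    # Distributed scheduler: persist first so distributed.progress
    # can track the resulting futures, then gather concrete results.
    work = dask.persist(*work)
    progress(work)
    return dask.compute(*work)
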
@@ -748,7 +791,10 @@

def _run_parallel(self, scheduler_address, max_parallel_tasks):
"""Run tasks in parallel."""
scheduled = self.flatten()
scheduled = {
t
for t in self.flatten() if not self._is_preprocessing_task(t)
}
running = {}

n_tasks = n_scheduled = len(scheduled)
188 changes: 137 additions & 51 deletions esmvalcore/preprocessor/__init__.py
@@ -2,16 +2,21 @@
from __future__ import annotations

import copy
import functools
import inspect
import logging
import time
from importlib import import_module
from pathlib import Path
from pprint import pformat
from typing import Any, Iterable
from typing import Any, Callable, Iterable

import dask
from iris.cube import Cube

from .._provenance import TrackedFile
from .._task import BaseTask
from .._version import __version__
from ..cmor.check import cmor_check_data, cmor_check_metadata
from ..cmor.fix import fix_data, fix_file, fix_metadata
from ._area import (
@@ -27,8 +32,8 @@
from ._derive import derive
from ._detrend import detrend
from ._io import (
LazyFile,
_get_debug_filename,
_sort_products,
concatenate,
load,
save,
@@ -222,6 +227,28 @@
}


def _get_multimodel_products(function_name: str) -> Callable:
"""Get function that computes multimodel output products."""
preproc_fn = globals()[function_name]
module_name = preproc_fn.__module__
module = import_module(module_name)
output_products_fn_name = f"{function_name}_outputs"
try:
return getattr(module, output_products_fn_name)
except AttributeError as exc:
raise NotImplementedError(
"Please implement the function "
f"{module_name}.{output_products_fn_name}") from exc


MULTI_MODEL_FUNCTIONS_OUT_PRODUCTS = {
function: _get_multimodel_products(function)
for function in MULTI_MODEL_FUNCTIONS
}

_delayed = functools.partial(dask.delayed, pure=False, traverse=False)
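
Note: _delayed wraps dask.delayed with pure=False, so every wrapped call gets a unique task key and identical-looking calls are never deduplicated or cached, and with traverse=False, so Dask does not descend into list or dict arguments looking for nested delayed objects (product collections are passed through as opaque values). A quick illustration of the pure=False behaviour:

import dask

impure_len = dask.delayed(len, pure=False)
a = impure_len([1, 2, 3])
b = impure_len([1, 2, 3])
# Distinct task keys despite identical arguments: both calls stay
# separate tasks in the graph and both are executed.
assert a.key != b.key
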


def _get_itype(step):
"""Get the input type of a preprocessor function."""
function = globals()[step]
@@ -291,7 +318,10 @@
)


def _get_multi_model_settings(products, step):
def _get_multi_model_settings(
products: Iterable[PreprocessorFile],
step: str,
) -> tuple[dict[str, Any], set[PreprocessorFile]]:
"""Select settings for multi model step."""
_check_multi_model_settings(products)
settings = {}
@@ -360,7 +390,8 @@
):
"""Run preprocessor."""
logger.debug("Running preprocessor step %s", step)
function = globals()[step]
import esmvalcore.preprocessor
[Codacy Static Code Analysis, esmvalcore/preprocessor/__init__.py#L393: Import outside toplevel (esmvalcore.preprocessor) (import-outside-toplevel)]
function = getattr(esmvalcore.preprocessor, step)
itype = _get_itype(step)

for item in items:
@@ -378,7 +409,7 @@

items = []
for item in result:
if isinstance(item, (PreprocessorFile, Cube, str, Path)):
if isinstance(item, (PreprocessorFile, Cube, str, Path, LazyFile)):
items.append(item)
else:
items.extend(item)
@@ -464,11 +495,15 @@
raise ValueError(
f"PreprocessorFile {self} has no settings for step {step}"
)
self.cubes = preprocess(self.cubes, step,
input_files=self._input_files,
output_file=self.filename,
debug=debug,
**self.settings[step])
self.cubes = preprocess(
self.cubes,
step,
input_files=self._input_files,
output_file=self.filename,
debug=debug,
**self.settings[step],
)
self._update_attributes()

@property
def cubes(self):
@@ -483,24 +518,28 @@

def save(self):
"""Save cubes to disk."""
preprocess(self._cubes,
'save',
input_files=self._input_files,
**self.settings['save'])
version = f"Created with ESMValCore v{__version__}"
for cube in self._cubes:
cube.attributes['software'] = version
return preprocess(
self._cubes,
'save',
input_files=self._input_files,
**self.settings['save'],
)[0]

def close(self):
"""Close the file."""
if self._cubes is not None:
self._update_attributes()
self.save()
lazy_file = self.save()
self._cubes = None
self.save_provenance()
return lazy_file
raise ValueError(f"{self.filename} has already been closed.")
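
Note: close() now returns the LazyFile produced by save(), so metadata is written eagerly while the data write stays pending inside a delayed object. LazyFile itself lives in esmvalcore/preprocessor/_io.py and is not shown in this diff; the following is only a hypothetical sketch of its shape, to make the diff readable on its own:

from dataclasses import dataclass
from pathlib import Path

from dask.delayed import Delayed

@dataclass
class LazyFile:
    """Hypothetical sketch: a saved file whose data is still pending."""

    filename: Path
    delayed: Delayed  # computing this writes the file's data to disk
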

def _update_attributes(self):
"""Update product attributes from cube metadata."""
if not self._cubes:
return
ref_cube = self._cubes[0]
ref_cube = self.cubes[0]

# Names
names = {
@@ -533,6 +572,9 @@
}
self.entity.add_attributes(settings)

def _include_provenance(self):
"""Do nothing; already done when saving the cube(s) to file."""

def group(self, keys: list) -> str:
"""Generate group keyword.

@@ -556,23 +598,62 @@
return '_'.join(identifier)


def _apply_multimodel(products, step, debug):
def _get_multimodel_n_outs(
products: list[PreprocessorFile],
steps: list[str],
order: list[str],
) -> dict[str, int]:
n_outs = {}
for step in order[len(INITIAL_STEPS):-len(FINAL_STEPS)]:
if step in steps:
if step in MULTI_MODEL_FUNCTIONS:
settings, exclude = _get_multi_model_settings(products, step)
include = set(products) - exclude
multimodel_outputs = MULTI_MODEL_FUNCTIONS_OUT_PRODUCTS[step]
result = multimodel_outputs(include, **settings)
products = result | exclude
n_outs[step] = len(products)
return n_outs
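
Note: dask.delayed can only unpack a call into several delayed outputs when nout is known while the graph is being built, before anything runs. That is why each multi-model step needs the companion <step>_outputs function resolved by _get_multimodel_products above: it predicts the output products, and hence nout, without touching any data. A toy example of the nout mechanism, unrelated to any real preprocessor step:

import dask

@dask.delayed(nout=2)
def halves(values):
    """Split a list into two halves."""
    mid = len(values) // 2
    return values[:mid], values[mid:]

# nout=2 makes the single delayed call unpackable into two delayed
# objects before any computation happens.
left, right = halves([1, 2, 3, 4])
print(*dask.compute(left, right))  # [1, 2] [3, 4]
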


def _apply_multimodel(
products: list[PreprocessorFile],
step: str,
debug: bool,
) -> list[PreprocessorFile]:
"""Apply multi model step to products."""
settings, exclude = _get_multi_model_settings(products, step)

logger.debug("Applying %s to\n%s", step,
'\n'.join(str(p) for p in products - exclude))
result = preprocess(products - exclude, step, **settings)
products = set(result) | exclude
include = set(products) - exclude
result = preprocess(include, step, **settings)
result.extend(exclude)

if debug:
for product in products:
for product in result:
logger.debug("Result %s", product.filename)
if not product.is_closed:
for cube in product.cubes:
logger.debug("with cube %s", cube)

return products
return result


def _apply_singlemodel(
product: PreprocessorFile,
steps: list[str],
debug: bool,
) -> PreprocessorFile:
logger.debug("Applying single-model steps to %s", product)
for step in steps:
if step in product.settings:
product.apply(step, debug)
return product


def _save(product: PreprocessorFile):
product.cubes # pylint: disable=pointless-statement
file = product.close()
return file


class PreprocessingTask(BaseTask):
@@ -589,6 +670,7 @@
"""Initialize."""
_check_multi_model_settings(products)
super().__init__(name=name, products=products)
self.lazy_files: list[LazyFile] = []
self.order = list(order)
self.debug = debug
self.write_ncl_interface = write_ncl_interface
@@ -636,39 +718,43 @@

def _run(self, _):
"""Run the preprocessor."""
start = time.perf_counter()
self._initialize_product_provenance()
logger.info("Initializing provenance of task %s took %.0f seconds",
self.name, time.perf_counter() - start)
start = time.perf_counter()
products = list(self.products)

steps = {
step
for product in self.products for step in product.settings
}
steps = {step for product in products for step in product.settings}
blocks = get_step_blocks(steps, self.order)

saved = set()
n_outs = _get_multimodel_n_outs(products, steps, self.order)
for block in blocks:
logger.debug("Running block %s", block)
if block[0] in MULTI_MODEL_FUNCTIONS:
for step in block:
self.products = _apply_multimodel(self.products, step,
self.debug)
products = _delayed(
_apply_multimodel,
nout=n_outs[step],
)(
products,
step,
self.debug,
)
else:
for product in _sort_products(self.products):
logger.debug("Applying single-model steps to %s", product)
for step in block:
if step in product.settings:
product.apply(step, self.debug)
if block == blocks[-1]:
product.cubes # pylint: disable=pointless-statement
product.close()
saved.add(product.filename)

for product in self.products:
if product.filename not in saved:
product.cubes # pylint: disable=pointless-statement
product.close()

metadata_files = write_metadata(self.products,
self.write_ncl_interface)
products = [
_delayed(_apply_singlemodel)(product, block, self.debug)
for product in products
]

self.lazy_files = [_delayed(_save)(product) for product in products]
self.products = products
metadata_files = _delayed(write_metadata)(
products,
self.write_ncl_interface,
)
logger.info(
"Building metadata task graph for task %s took %.0f seconds",
self.name, time.perf_counter() - start)
return metadata_files

def __str__(self):