Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for configuring Dask distributed #2049

Merged
merged 15 commits into from
Jun 1, 2023
2 changes: 2 additions & 0 deletions doc/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,8 @@
(f'https://docs.esmvaltool.org/projects/ESMValCore/en/{rtd_version}/',
None),
'esmvaltool': (f'https://docs.esmvaltool.org/en/{rtd_version}/', None),
'dask': ('https://docs.dask.org/en/stable/', None),
'distributed': ('https://distributed.dask.org/en/stable/', None),
'iris': ('https://scitools-iris.readthedocs.io/en/latest/', None),
'iris-esmf-regrid': ('https://iris-esmf-regrid.readthedocs.io/en/latest',
None),
Expand Down
100 changes: 100 additions & 0 deletions doc/quickstart/configure.rst
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,106 @@ the user.
debugging, etc. You can even provide any config user value as a run flag
``--argument_name argument_value``

.. _config-dask:

Dask distributed configuration
==============================

The :ref:`preprocessor functions <preprocessor_functions>` and many of the
:ref:`Python diagnostics in ESMValTool <esmvaltool:recipes>` make use of the
:ref:`Iris <iris:iris_docs>` library to work with the data.
In Iris, data can be either :ref:`real or lazy <iris:real_and_lazy_data>`.
Lazy data is represented by `dask arrays <https://docs.dask.org/en/stable/array.html>`_.
Dask arrays consist of many small
`numpy arrays <https://numpy.org/doc/stable/user/absolute_beginners.html#what-is-an-array>`_
(called chunks) and if possible, computations are run on those small arrays in
parallel.
In order to figure out what needs to be computed when, Dask makes use of a
'`scheduler <https://docs.dask.org/en/stable/scheduling.html>`_'.
The default scheduler in Dask is rather basic, so it can only run on a single
computer and it may not always find the optimal task scheduling solution,
resulting in excessive memory use when using e.g. the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
resulting in excessive memory use when using e.g. the
resulting in excessive memory use when running an already memory-intensive task like e.g. the

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem isn't so much that it's memory-intensive, but that the task graph becomes too complicated for the built-in scheduler.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes - for regular Joe the Modeller: moar memory! Let's scare them before they even think of touching anything 😁

:func:`esmvalcore.preprocessor.multi_model_statistics` preprocessor function.
Therefore it is recommended that you take a moment to configure the
`Dask distributed <https://distributed.dask.org>`_ scheduler.
A Dask scheduler and the 'workers' running the actual computations, are
collectively called a 'Dask cluster'.

In ESMValCore, the Dask cluster can configured by creating a file called
``~/.esmvaltool/dask.yml``, where ``~`` is short for your home directory.
In this file, under the ``client`` keyword, the arguments to
:obj:`distributed.Client` can be provided.
Under the ``cluster`` keyword, the type of cluster (e.g.
:obj:`distributed.LocalCluster`), as well as any arguments required to start
the cluster can be provided.

Below are some example configurations:

Create a Dask distruted cluster on the computer running ESMValCore using
bouweandela marked this conversation as resolved.
Show resolved Hide resolved
all available resources:

.. code:: yaml

cluster:
type: distributed.LocalCluster

this should work well for most personal computers.
bouweandela marked this conversation as resolved.
Show resolved Hide resolved

.. note::

Note that, if running this configuration on a shared node of an HPC cluster,
Dask will try and use as many resources it can find available, and this may
lead to overcrowding the node by a single user (you)!


Create a Dask distruted cluster on the computer running ESMValCore, with
bouweandela marked this conversation as resolved.
Show resolved Hide resolved
4 workers with two 2 GiB of memory each (8 GiB in total):

.. code:: yaml

cluster:
type: distributed.LocalCluster
n_workers: 4
memory_limit: 2 GiB

Create a Dask distributed cluster on the
`Levante <https://docs.dkrz.de/doc/levante/running-jobs/index.html>`_
supercomputer using the `Dask-Jobqueue <https://jobqueue.dask.org/en/latest/>`_
package:
Comment on lines +286 to +289
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to mention that this needs to be installed by the user (e.g., mamba install dask-jobqueue) because it's not part of our environment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like @valeriupredoi's suggestion of just adding it to the dependencies. It doesn't have any dependencies that we do not already have and it's a very small Python package.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, that's even better!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 25dc5ce


.. code:: yaml

cluster:
type: dask_jobqueue.SLURMCluster
queue: interactive
account: bk1088
cores: 8
memory: 16GiB
sloosvel marked this conversation as resolved.
Show resolved Hide resolved
local_directory: "/work/bd0854/b381141/dask-tmp"
remi-kazeroni marked this conversation as resolved.
Show resolved Hide resolved
n_workers: 2

Use an externally managed cluster, e.g. a cluster that you started using the
`Dask Jupyterlab extension <https://github.com/dask/dask-labextension#dask-jupyterlab-extension>`_:

.. code:: yaml

client:
address: '127.0.0.1:8786'

See `here <https://jobqueue.dask.org/en/latest/interactive.html>`_
for an example of how to configure this on a remote system.

For debugging purposes, it can be useful to start the cluster outside of
ESMValCore because then
`Dask dashboard <https://docs.dask.org/en/stable/dashboard.html>`_ remains
available after ESMValCore has finished running.

.. note::

If not all preprocessor functions support lazy data, computational
performance may be best with the default scheduler.
See `issue #674 <https://github.com/ESMValGroup/ESMValCore/issues/674>`_ for
progress on making all preprocessor functions lazy.

.. _config-esgf:

Expand Down
3 changes: 2 additions & 1 deletion environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ dependencies:
- cftime
- compilers
- dask
- distributed
- esgf-pyclient>=0.3.1
- esmpy!=8.1.0
- filelock
Expand All @@ -18,7 +19,7 @@ dependencies:
- geopy
- humanfriendly
- importlib_resources
- iris>=3.4.0
- iris>=3.6.0
valeriupredoi marked this conversation as resolved.
Show resolved Hide resolved
- iris-esmf-regrid >=0.6.0 # to work with latest esmpy
- isodate
- jinja2
Expand Down
38 changes: 30 additions & 8 deletions esmvalcore/_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

import psutil
import yaml
from distributed import Client

from ._citation import _write_citation_files
from ._provenance import TrackedFile, get_task_provenance
from .config._dask import get_distributed_client
from .config._diagnostics import DIAGNOSTICS, TAGS


Expand Down Expand Up @@ -718,10 +720,22 @@ def run(self, max_parallel_tasks: Optional[int] = None) -> None:
max_parallel_tasks : int
Number of processes to run. If `1`, run the tasks sequentially.
"""
if max_parallel_tasks == 1:
self._run_sequential()
else:
self._run_parallel(max_parallel_tasks)
with get_distributed_client() as client:
if client is None:
address = None
else:
address = client.scheduler.address
for task in self.flatten():
if (isinstance(task, DiagnosticTask)
and Path(task.script).suffix.lower() == '.py'):
# Only insert the scheduler address if running a
# Python script.
task.settings['scheduler_address'] = address

if max_parallel_tasks == 1:
self._run_sequential()
else:
self._run_parallel(address, max_parallel_tasks)

def _run_sequential(self) -> None:
"""Run tasks sequentially."""
Expand All @@ -732,7 +746,7 @@ def _run_sequential(self) -> None:
for task in sorted(tasks, key=lambda t: t.priority):
task.run()

def _run_parallel(self, max_parallel_tasks=None):
def _run_parallel(self, scheduler_address, max_parallel_tasks):
"""Run tasks in parallel."""
scheduled = self.flatten()
running = {}
Expand All @@ -757,7 +771,8 @@ def done(task):
if len(running) >= max_parallel_tasks:
break
if all(done(t) for t in task.ancestors):
future = pool.apply_async(_run_task, [task])
future = pool.apply_async(_run_task,
[task, scheduler_address])
running[task] = future
scheduled.remove(task)

Expand Down Expand Up @@ -790,7 +805,14 @@ def _copy_results(task, future):
task.output_files, task.products = future.get()


def _run_task(task):
def _run_task(task, scheduler_address):
"""Run task and return the result."""
output_files = task.run()
if scheduler_address is None:
client = contextlib.nullcontext()
else:
client = Client(scheduler_address)

with client:
output_files = task.run()

return output_files, task.products
65 changes: 65 additions & 0 deletions esmvalcore/config/_dask.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""Configuration for Dask distributed."""
import contextlib
import importlib
import logging
from pathlib import Path

import yaml
from distributed import Client

logger = logging.getLogger(__name__)

CONFIG_FILE = Path.home() / '.esmvaltool' / 'dask.yml'
valeriupredoi marked this conversation as resolved.
Show resolved Hide resolved


@contextlib.contextmanager
def get_distributed_client():
"""Get a Dask distributed client."""
dask_args = {}
if CONFIG_FILE.exists():
config = yaml.safe_load(CONFIG_FILE.read_text(encoding='utf-8'))
if config is not None:
dask_args = config
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a warning would be nice, telling the user to have the config available and configured if they want to use dasky stuff

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in 25dc5ce


client_args = dask_args.get('client') or {}
cluster_args = dask_args.get('cluster') or {}

# Start a cluster, if requested
if 'address' in client_args:
# Use an externally managed cluster.
cluster = None
if cluster_args:
logger.warning(
"Not using Dask 'cluster' settings from %s because a cluster "
"'address' is already provided in 'client'.", CONFIG_FILE)
elif cluster_args:
# Start cluster.
cluster_type = cluster_args.pop(
'type',
'distributed.LocalCluster',
)
cluster_module_name, cluster_cls_name = cluster_type.rsplit('.', 1)
cluster_module = importlib.import_module(cluster_module_name)
valeriupredoi marked this conversation as resolved.
Show resolved Hide resolved
cluster_cls = getattr(cluster_module, cluster_cls_name)
cluster = cluster_cls(**cluster_args)
client_args['address'] = cluster.scheduler_address
else:
# No cluster configured, use Dask basic scheduler, or a LocalCluster
# managed through Client.
cluster = None

# Start a client, if requested
if dask_args:
client = Client(**client_args)
logger.info("Dask dashboard: %s", client.dashboard_link)
else:
logger.info("Using the Dask basic scheduler.")
client = None

try:
yield client
finally:
if client is not None:
client.close()
if cluster is not None:
cluster.close()
7 changes: 3 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@
# Use with pip install . to install from source
'install': [
'cartopy',
# see https://github.com/SciTools/cf-units/issues/218
'cf-units',
'dask[array]',
'dask[array,distributed]',
'esgf-pyclient>=0.3.1',
'esmf-regrid',
'esmpy!=8.1.0',
Expand All @@ -56,8 +55,8 @@
'pyyaml',
'requests',
'scipy>=1.6',
'scitools-iris>=3.4.0',
'shapely[vectorized]',
'scitools-iris>=3.6.0',
'shapely',
'stratify',
'yamale',
],
valeriupredoi marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
20 changes: 19 additions & 1 deletion tests/integration/test_diagnostic_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,26 @@
import pytest
import yaml

import esmvalcore._task
from esmvalcore._main import run
from esmvalcore.config._diagnostics import TAGS


@pytest.fixture(autouse=True)
def get_mock_distributed_client(monkeypatch):
"""Mock `get_distributed_client` to avoid starting a Dask cluster."""

@contextlib.contextmanager
def get_distributed_client():
yield None

monkeypatch.setattr(
esmvalcore._task,
'get_distributed_client',
get_distributed_client,
)


def write_config_user_file(dirname):
config_file = dirname / 'config-user.yml'
cfg = {
Expand Down Expand Up @@ -51,7 +67,9 @@ def check(result_file):
}
missing = required_keys - set(result)
assert not missing
unwanted_keys = ['profile_diagnostic', ]
unwanted_keys = [
'profile_diagnostic',
]
for unwanted_key in unwanted_keys:
assert unwanted_key not in result

Expand Down
Loading