Add support for configuring Dask distributed #2049
@@ -199,6 +199,107 @@ the user.
debugging, etc. You can even provide any config user value as a run flag
``--argument_name argument_value``
.. _config-dask:

Dask distributed configuration
==============================

The :ref:`preprocessor functions <preprocessor_functions>` and many of the
:ref:`Python diagnostics in ESMValTool <esmvaltool:recipes>` make use of the
:ref:`Iris <iris:iris_docs>` library to work with the data.
In Iris, data can be either :ref:`real or lazy <iris:real_and_lazy_data>`.
Lazy data is represented by `dask arrays <https://docs.dask.org/en/stable/array.html>`_.
Dask arrays consist of many small
`numpy arrays <https://numpy.org/doc/stable/user/absolute_beginners.html#what-is-an-array>`_
(called chunks) and, if possible, computations are run on those small arrays
in parallel.
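The chunked, lazy evaluation described above can be seen directly in a small standalone example (this is plain Dask, not ESMValCore-specific; it assumes the ``dask`` package is installed):

```python
import dask.array as da

# Build a lazy 1000x1000 array split into 100x100 chunks;
# no memory is allocated for the full array yet.
x = da.ones((1000, 1000), chunks=(100, 100))

# Operations build a task graph instead of computing immediately.
total = x.sum()

# Only .compute() asks the scheduler to run the per-chunk tasks.
print(total.compute())  # 1000000.0
```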
In order to figure out what needs to be computed when, Dask makes use of a
'`scheduler <https://docs.dask.org/en/stable/scheduling.html>`_'.
The default scheduler in Dask is rather basic: it can only run on a single
computer and it may not always find the optimal task scheduling solution,
resulting in excessive memory use when using e.g. the
:func:`esmvalcore.preprocessor.multi_model_statistics` preprocessor function.
Therefore it is recommended that you take a moment to configure the
`Dask distributed <https://distributed.dask.org>`_ scheduler.
A Dask scheduler and the 'workers' running the actual computations are
collectively called a 'Dask cluster'.
In ESMValCore, the Dask cluster can be configured by creating a file called
``~/.esmvaltool/dask.yml``, where ``~`` is short for your home directory.
In this file, under the ``client`` keyword, the arguments to
:obj:`distributed.Client` can be provided.
Under the ``cluster`` keyword, the type of cluster (e.g.
:obj:`distributed.LocalCluster`), as well as any arguments required to start
the cluster, can be provided.
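To illustrate how these two keywords are used, here is a minimal sketch, assuming the file has already been read into a dictionary with a YAML parser (the keys shown mirror the examples below; the variable names are only illustrative):

```python
# Hypothetical result of reading ~/.esmvaltool/dask.yml with a YAML parser.
dask_config = {
    'cluster': {
        'type': 'distributed.LocalCluster',
        'n_workers': 4,
        'memory_limit': '2 GiB',
    },
}

# Missing sections fall back to empty dicts, so either keyword is optional.
client_args = dask_config.get('client') or {}
cluster_args = dask_config.get('cluster') or {}

print(client_args)           # {}
print(cluster_args['type'])  # distributed.LocalCluster
```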
Below are some example configurations:
Create a Dask distributed cluster on the computer running ESMValCore using
all available resources:

.. code:: yaml

   cluster:
     type: distributed.LocalCluster

This should work well for most personal computers.
.. note::

   If you run this configuration on a shared node of an HPC cluster, Dask
   will try to use as many resources as it can find available, and this may
   lead to overcrowding the node by a single user (you)!
Create a Dask distributed cluster on the computer running ESMValCore, with
4 workers with 2 GiB of memory each (8 GiB in total):

.. code:: yaml

   cluster:
     type: distributed.LocalCluster
     n_workers: 4
     memory_limit: 2 GiB
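Note that ``memory_limit`` applies per worker, so the configuration above uses 4 × 2 GiB = 8 GiB in total. Dask parses such human-readable size strings with :func:`dask.utils.parse_bytes`; the arithmetic can be checked directly (assumes the ``dask`` package is installed):

```python
from dask.utils import parse_bytes

# memory_limit applies to each worker individually.
per_worker = parse_bytes('2 GiB')
n_workers = 4

total = n_workers * per_worker
print(per_worker)                    # 2147483648 (2 GiB in bytes)
print(total == parse_bytes('8 GiB'))  # True
```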
Create a Dask distributed cluster on the
`Levante <https://docs.dkrz.de/doc/levante/running-jobs/index.html>`_
supercomputer using the `Dask-Jobqueue <https://jobqueue.dask.org/en/latest/>`_
package:
[Review] It would be nice to mention that Dask-Jobqueue needs to be
installed by the user. — Better yet, just add it to the dependencies: it
has no dependencies that we do not already have and it is a very small
Python package. Done in 25dc5ce.
.. code:: yaml

   cluster:
     type: dask_jobqueue.SLURMCluster
     queue: interactive
     account: bk1088
     cores: 8
     memory: 16GiB
     interface: ib0
     local_directory: "/work/bd0854/b381141/dask-tmp"
     n_workers: 2
Use an externally managed cluster, e.g. a cluster that you started using the
`Dask JupyterLab extension <https://github.com/dask/dask-labextension#dask-jupyterlab-extension>`_:

.. code:: yaml

   client:
     address: '127.0.0.1:8786'

See `here <https://jobqueue.dask.org/en/latest/interactive.html>`_
for an example of how to configure this on a remote system.
For debugging purposes, it can be useful to start the cluster outside of
ESMValCore, because then the
`Dask dashboard <https://docs.dask.org/en/stable/dashboard.html>`_ remains
available after ESMValCore has finished running.
.. note::

   If not all preprocessor functions support lazy data, computational
   performance may be best with the default scheduler.
   See `issue #674 <https://github.com/ESMValGroup/ESMValCore/issues/674>`_
   for progress on making all preprocessor functions lazy.

.. _config-esgf:
@@ -0,0 +1,77 @@
"""Configuration for Dask distributed.""" | ||
import contextlib | ||
import importlib | ||
import logging | ||
from pathlib import Path | ||
|
||
import yaml | ||
from distributed import Client | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
CONFIG_FILE = Path.home() / '.esmvaltool' / 'dask.yml' | ||
valeriupredoi marked this conversation as resolved.
Show resolved
Hide resolved
|
def check_distributed_config():
    """Check the Dask distributed configuration."""
    if not CONFIG_FILE.exists():
        logger.warning(
            "Using the Dask basic scheduler. This may lead to slow "
            "computations and out-of-memory errors. See https://docs."
            "esmvaltool.org/projects/ESMValCore/en/latest/quickstart/"
            "configure.html#dask-distributed-configuration for information "
            "on how to configure the Dask distributed scheduler."
        )
[Review] This warning may be premature: tests show that using a distributed
scheduler can also cause recipes to stop running, so it should either be
phrased more carefully (e.g. as an experimental feature) or demoted to an
info message. Also, no warning is raised when the config file exists but is
empty, even though an empty file likewise results in the basic scheduler
rather than a LocalCluster. — Resolved by keeping the warning but
presenting the distributed scheduler as an experimental feature whose
optimal settings depend on the actual run. Done in 80f5c62.
@contextlib.contextmanager
def get_distributed_client():
    """Get a Dask distributed client."""
    dask_args = {}
    if CONFIG_FILE.exists():
        config = yaml.safe_load(CONFIG_FILE.read_text(encoding='utf-8'))
        if config is not None:
            dask_args = config

[Review] A warning would be nice here, telling the user to have the config
available and configured if they want to use Dask. — Added in 25dc5ce.
    client_args = dask_args.get('client') or {}
    cluster_args = dask_args.get('cluster') or {}

    # Start a cluster, if requested
    if 'address' in client_args:
        # Use an externally managed cluster.
        cluster = None
        if cluster_args:
            logger.warning(
                "Not using Dask 'cluster' settings from %s because a cluster "
                "'address' is already provided in 'client'.", CONFIG_FILE)
    elif cluster_args:
        # Start cluster.
        cluster_type = cluster_args.pop(
            'type',
            'distributed.LocalCluster',
        )
        cluster_module_name, cluster_cls_name = cluster_type.rsplit('.', 1)
        cluster_module = importlib.import_module(cluster_module_name)
        cluster_cls = getattr(cluster_module, cluster_cls_name)
        cluster = cluster_cls(**cluster_args)
        client_args['address'] = cluster.scheduler_address
    else:
        # No cluster configured, use Dask basic scheduler, or a LocalCluster
        # managed through Client.
        cluster = None

    # Start a client, if requested
    if dask_args:
        client = Client(**client_args)
        logger.info("Dask dashboard: %s", client.dashboard_link)
    else:
        logger.info("Using the Dask basic scheduler.")
        client = None

    try:
        yield client
    finally:
        if client is not None:
            client.close()
        if cluster is not None:
            cluster.close()
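The ``cluster_type.rsplit('.', 1)`` / ``importlib.import_module`` pattern in the code above is what turns the dotted ``type`` string from ``dask.yml`` into a class. A standalone sketch of the same pattern (using a stdlib class instead of ``distributed.LocalCluster`` so it runs anywhere; the helper name is only illustrative):

```python
import importlib


def resolve_class(dotted_name):
    """Resolve a dotted 'module.ClassName' string to the class object."""
    module_name, cls_name = dotted_name.rsplit('.', 1)
    module = importlib.import_module(module_name)
    return getattr(module, cls_name)


# Stand-in for 'distributed.LocalCluster'; any importable class works.
cls = resolve_class('collections.OrderedDict')
instance = cls(a=1)
print(type(instance).__name__)  # OrderedDict
```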
[Review] The problem isn't so much that it's memory-intensive, but that the
task graph becomes too complicated for the built-in scheduler. — Yes, but
for regular users "more memory" is the message that will make them
configure this before they think of touching anything. 😄