-
Notifications
You must be signed in to change notification settings - Fork 38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for configuring Dask distributed #2049
Conversation
Codecov Report
@@ Coverage Diff @@
## main #2049 +/- ##
==========================================
+ Coverage 92.87% 92.90% +0.03%
==========================================
Files 234 235 +1
Lines 12447 12506 +59
==========================================
+ Hits 11560 11619 +59
Misses 887 887
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
small bits and bobs before I go into the Python scripts, bud 🍺
'`scheduler <https://docs.dask.org/en/stable/scheduling.html>`_'. | ||
The default scheduler in Dask is rather basic, so it can only run on a single | ||
computer and it may not always find the optimal task scheduling solution, | ||
resulting in excessive memory use when using e.g. the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
resulting in excessive memory use when using e.g. the | |
resulting in excessive memory use when running an already memory-intensive task like e.g. the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem isn't so much that it's memory-intensive, but that the task graph becomes too complicated for the built-in scheduler.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes - for regular Joe the Modeller: moar memory! Let's scare them before they even think of touching anything 😁
Co-authored-by: Valeriu Predoi <[email protected]>
Co-authored-by: Valeriu Predoi <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
couple more comments, bud. test_dask.py to follow, then we done here 😁
if CONFIG_FILE.exists(): | ||
config = yaml.safe_load(CONFIG_FILE.read_text(encoding='utf-8')) | ||
if config is not None: | ||
dask_args = config |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a warning would be nice, telling the user to have the config available and configured if they want to use dasky stuff
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added in 25dc5ce
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
very nice work @bouweandela 🎖️ Couple comments I left earlier that have not been resolved, up to you to address, no biggie. It looks very nice and hopefully it'll work well in practice too ie iris will play well
btw I've just created a label called "Dask" - let's use that for Dask-related improvements, and maybe even collect Dask items in the changelog, at least for this upcoming release, that'll be Dask-heavy. Wanted to call it Bouwe instead of Dask 😁 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot @bouweandela! Just taking a look at the well written Docs to get a better idea of these new developments.
Co-authored-by: Rémi Kazeroni <[email protected]>
I'd like to test this with a couple of recipes, please do not merge just yet 😁 |
good call, Manu! 🍺 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @bouweandela, I just tested this thoroughly - this is really amazing stuff and works super well! 🚀 For the first time ever I was able to run ESMValTool on multiple nodes! In addition, monitoring an ESMValTool run with a dask dashboard is also super convenient. See here a nice visualization of our example recipe:
I have some general comments/questions:
- How does this interact with our current
multiprocessing
-based parallelization? I tested this with different--max-parallel-tasks
options and all seemed to work well, but I do not quite understand how. Does every process use it's own cluster? Or do they share it? Is this safe? - As far as I understand the only way to use multiple nodes in a
SLURMCluster
is to use thecluster.scale(jobs=n_jobs)
method, see here. Would it make sense to add ascale
option to the configuration file where one could add keyword arguments forscale
? - When running this, I always get
UserWarning: Sending large graph of size 56.83 MiB. This may cause some slowdown. Consider scattering data ahead of time and using futures.
, even for the example recipe. Not sure if this is/will be a problem. Not something we need to tackle here though; just wanted to document it.
Thanks!! 👍
Create a Dask distributed cluster on the | ||
`Levante <https://docs.dkrz.de/doc/levante/running-jobs/index.html>`_ | ||
supercomputer using the `Dask-Jobqueue <https://jobqueue.dask.org/en/latest/>`_ | ||
package: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice to mention that this needs to be installed by the user (e.g., mamba install dask-jobqueue
) because it's not part of our environment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like @valeriupredoi's suggestion of just adding it to the dependencies. It doesn't have any dependencies that we do not already have and it's a very small Python package.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good, that's even better!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done in 25dc5ce
That's wonderful to see, Manu, awesome you tested, and of course, awesome work by Bouwe! I'll let Bouwe answer your q's (although I know the answer for the first q from code review hehe), but about the jobqueue dep - we should probably just add it in our env, I'd reckon whatever's Dask-related stuff should be tweaked at a minimum by the user. About data scattering - I've done it before, and have seen marginal performance improvement, but I used fairly small data sizes, so it could be better for busloads of data, as we have 😁 |
Thanks for testing @schlunma!
When no cluster is configured, every process will use it's own. This is what is causing memory errors at the moment. With the pull request, this is fixed (provided that a user sets up a cluster), by passing the scheduler address to the process running the task and making it connect to the cluster. You can see that here in the code: ESMValCore/esmvalcore/_task.py Lines 774 to 775 in 98edcb1
ESMValCore/esmvalcore/_task.py Lines 808 to 818 in 98edcb1
The multiprocessing configuration is used to parallelize the metadata crunching (i.e. all the stuff with coordinates etc). In a future pull request, I intend to make it possible to also run that on a Dask cluster, see #2041.
This can also be configured with the parameter
I suspect this warning occurs because some of the preprocessor functions are not lazy, e.g. |
Could you elaborate on that? What do you mean by "at the moment"? The current
True, could have guessed that from the code 😁 I suppose the cluster is smart enough to handle input from different processes? Are there any recommendations for the ratio
Cool!
I tried that with
Not yet, but I can try to run a heavy recipe with this 👍 |
Alright, I did a lot of tests now. Note: Varying Test with existing heavy recipe using
|
n_workers |
memory_limit |
run time | memory usage | comment |
---|---|---|---|---|
4 | 128 GiB | 3:59 min | 3.6GB | |
16 | 32 GiB | 2:07 min | 17.9GB | default settings when using distributed.LocalCluster() without any arguments |
8 | 32 GiB | 1:56 min | 8.7GB | |
- | - | 9:31 min | 15.7GB | no cluster (i.e., current main ) |
As you can see, I could easily get a speed boost of ~5!
Test with simple heavy recipe that uses a non-lazy preprocessor using distributed.LocalCluster
on a 512 GiB compute node
preprocessors:
test:
area_statistics:
operator: mean
diagnostics:
test:
variables:
ta:
preprocessor: test
mip: Amon
project: CMIP6
exp: hist-1950
start_year: 1995
end_year: 2014
additional_datasets:
- {dataset: CMCC-CM2-VHR4, ensemble: r1i1p1f1, grid: gn}
- {dataset: CNRM-CM6-1-HR, ensemble: r1i1p1f2, grid: gr}
- {dataset: ECMWF-IFS-HR, ensemble: r1i1p1f1, grid: gr}
- {dataset: HadGEM3-GC31-HM, ensemble: r1i1p1f1, grid: gn}
- {dataset: MPI-ESM1-2-XR, ensemble: r1i1p1f1, grid: gn}
scripts:
null
However, when using a non-lazy preprocessor (area_statistics
), I always got the same concurrent.futures._base.CancelledError
as for the other heavy recipe, regardless of the settings (main_log_debug.txt).
I also tried combinations where the cluster only had a small fraction of the memory (e.g., n_workers: 4, memory_limit: 64 GiB
, i.e., 256 GiB remain!), which also did not work.
I'm not familiar enough with dask to tell why this is not working, but it looks like this is related to using a non-lazy function. I thought one could maybe address by allocated enough memory for the "non-dask" part, but that also didn't work.
Tests with dask_jobqueue.SLURMCluster
I got basically the same behavior as above. With the following settings the lazy recipe ran in 8:27 min (memory usage of 0.8 GB; not quite sure if that is correct or wrong because the heavy stuff is now done on a different node):
cluster:
type: dask_jobqueue.SLURMCluster
...
queue: compute
cores: 128
memory: 32 GiB
n_workers: 8
There's probably a lot to optimize here, but tbh I don't full understand the all the options for SLURMCluster
yet 😁
To sum this all up: for lazy preprocessors this works exceptionally well! 🚀
Yes, in the current |
hellow cool stuff! Very cool indeed @schlunma 💃 Here's some prelim opinions:
Very cool, still 🍺 |
Provided that all preprocessor functions are lazy, I would recommend setting |
that's the ideal case, right? Minimum data transfer between workers - that's why Manu's jobs are failing now for non-lazy pp's - too much ephemeral data exchange between workers, I thnk |
this post says the Cancelled error comes from some node/worker being idle for too long - interesting - so it's either too short a computation that deletes the key on the worker, but the Client still needs it/recreates it, or the thing's too long - it'd be worth examining the "timeout" options see https://docs.dask.org/en/stable/configuration.html - beats me which one though, used to be a |
I now also tested the (fully lazy) recipe of #1193 now (which used to not run back then): # ESMValTool
---
documentation:
description: Test ERA5.
authors:
- schlund_manuel
references:
- acknow_project
preprocessors:
mean:
zonal_statistics:
operator: mean
diagnostics:
ta_obs:
variables:
ta:
preprocessor: mean
mip: Amon
additional_datasets:
- {dataset: ERA5, project: native6, type: reanaly, version: v1, tier: 3, start_year: 1990, end_year: 2014}
scripts:
null The good new is: it is now running without any dask configuration, and I could also get a significant speed up (5:41 min -> 1:56 min) by using a Replacing the lazy |
esmvalcore/config/_dask.py
Outdated
logger.warning( | ||
"Using the Dask basic scheduler. This may lead to slow " | ||
"computations and out-of-memory errors. See https://docs." | ||
"esmvaltool.org/projects/ESMValCore/en/latest/quickstart/" | ||
"configure.html#dask-distributed-configuration for information " | ||
"on how to configure the Dask distributed scheduler." | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not too happy with this being a warning at the current stage: as shown by my tests, using a distributed scheduler can actually lead to recipes not running anymore. Thus, I would be very careful recommending this to users at the moment.
We should either phrase this more carefully (maybe add that this is an "experimental feature") or convert it to an info message. Once we are more confident with this we can change it back.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also: this does not raise a warning if the dask config file exists but is empty. Should we also consider this case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can change it back to INFO if @valeriupredoi agrees because he asked for a WARNING in #2049 (comment). Note that not configuring the scheduler can also lead to recipes not running, so it rather depends on what you're trying to do what the best setting is, as also noted in the documentation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah that was me before Manu's actual testing. But alas, Bouwe is right too - I'd keep it as a warning but add what Manu suggests - experimental feature with twitchy settings that depend on the actual run
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also: this does not raise a warning if the dask config file exists but is empty. Should we also consider this case?
Won't that default to a LocalCluster etc?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd keep it as a warning but add what Manu suggests - experimental feature with twitchy settings that depend on the actual run
Fine for me!
Won't that default to a LocalCluster etc?
No, this also results in the basic scheduler: https://github.com/ESMValGroup/ESMValCore/pull/2049/files#diff-b046a48e3366bf6517887e3c39fe7ba6f46c0833ac02fbbb9062fb23654b06bdR64-R69
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
aw. Nicht gut. That needs be communicated to the user methinks - ah it is, didn't read the very first sentence 😆 Fine for me then
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done in 80f5c62
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for all the changes @bouweandela! I love the changes to the documentation, with this I could get a recipe to run in 30s which previously took 6 mins!
Just ouf of curiosity: where did you get that n_jobs = n_workers / processes
from? I searched for that yesterday but couldn't find anything.
Please make sure to adapt the warning as discussed in the corresponding comment. Will approve now since I will be traveling next week.
Thanks again, this is really cool! 🚀
I think that's just Bouwe being sensible, as he usually is 😁 ie you'd ideally want one process per worker |
awesome testing @schlunma - cheers for that! I'll dig deeper into those errors you've been seeing once I start testing myself, sometime next week 👍 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @bouweandela ! I managed to create some SLURM clusters just fine with these changes. Regarding the configuration, is it possible to have multiple configurations in dask.yml
? Not every recipe will require the same type of resources.
However, I am running in a machine that has quite modest technical specifications, to put it nicely, and even running in a distributed way does not seem to help much in the performance and memory usage.
Some issues that were found for instance:
- The concatenation of multiple files seems to slow down
- The heaviest jobs all seem to die during the saving of the files.
- Even for jobs that finish successfully, I get errors in the middle of the debug log about workers dying, not being responsive and communications dying. Not sure where these are coming from.
It could very well be that I am not configuring the jobs properly though.
It is true though that everything that is already lazy performs even faster. The lazy regridder went from taking 4 minutes to run in a high resolution experiment, to taking less than 2 minutes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very nice @bouweandela, looks great to me! Nice that you have both the cluster and the client config sections, so that the following also works as a 'shortcut' to create a local cluster:
client:
n_workers: 2
threads_per_worker: 4
@schlunma From the documentation of the
i.e. |
Good idea @sloosvel. @ESMValGroup/technical-lead-development-team Shall I add a warning that the current configuration file format is experimental and may change in the next release? We can then discuss how to best add this to the configuration at the SMHI workshop ESMValGroup/Community#98. |
@schlunma and @sloosvel Thank you for running the tests! Really good to learn from your experiences. Regarding the dying workers: I suspect they are using more than the configured amount of memory and then killed, so giving them more memory may solve the problem, though in case the preprocessor functions you're using are not lazy that may not help. Regarding the |
@sloosvel If this is a problem for your work, could you please open an issue and add an example recipe so we can look into it? |
…re-dask-distributed
@ESMValGroup/technical-lead-development-team I think this is ready to be merged. Could someone please do a final check and merge? |
Thanks a lot @bouweandela for your work on this great extension! It should definitely one of the highlights of the next Core release! And thanks to everyone who reviewed this PR and tested the new Dask capabilities! Merging now 👍 |
Description
Add support for configuring the Dask scheduler/cluster that is used to run the tool. See the linked documentation for information on how to use the feature.
Use this together with ESMValGroup/ESMValTool#3151 to let the Python diagnostics also make use of the Dask cluster.
Closes #2040
Link to documentation: https://esmvaltool--2049.org.readthedocs.build/projects/ESMValCore/en/2049/quickstart/configure.html#dask-distributed-configuration
Before you get started
Checklist
It is the responsibility of the author to make sure the pull request is ready to review. The icons indicate whether the item will be subject to the 🛠 Technical or 🧪 Scientific review.
To help with the number pull requests: