diff --git a/docs/examples/parallel-computing-with-dask.ipynb b/docs/examples/parallel-computing-with-dask.ipynb index 57baf8ce..08dd79ed 100644 --- a/docs/examples/parallel-computing-with-dask.ipynb +++ b/docs/examples/parallel-computing-with-dask.ipynb @@ -7,7 +7,9 @@ "source": [ "# General Guide for Parallelizing xCDAT Operations with Dask\n", "\n", - "Author: [Tom Vo](https://github.com/tomvothecoder)\n" + "Author: [Tom Vo](https://github.com/tomvothecoder)\n", + "\n", + "Last Updated: 04/16/24 (v0.7.0)\n" ] }, { @@ -22,11 +24,7 @@ "\n", "- Basics of Dask Arrays\n", "- General Dask Best Practice\n", - "- How to use Dask with Xarray\n", - "- How to use Dask with xCDAT, including real-world examples and performance metrics\n", - "- Dask Schedulers and using a local distributed scheduler for more resource-intensive needs\n", - "\n", - "_The data used in the code examples can be found through the [Earth System Grid Federation (ESGF) search portal](https://aims2.llnl.gov/search)._\n", + "- How to use Dask with xCDAT, including real-world examples\n", "\n", "### More Resources\n", "\n", @@ -35,7 +33,9 @@ "- [Official Xarray Parallel Computing with Dask Guide](https://docs.xarray.dev/en/stable/user-guide/dask.html)\n", "- [Official Xarray Parallel Computing with Dask Jupyter Notebook Tutorial](https://tutorial.xarray.dev/intermediate/xarray_and_dask.html)\n", "- [Official Dask guide for Xarray with Dask Arrays](https://examples.dask.org/xarray.html)\n", - "- [Project Pythia: Dask Arrays with Xarray](https://foundations.projectpythia.org/core/xarray/dask-arrays-xarray.html)\n" + "- [Project Pythia: Dask Arrays with Xarray](https://foundations.projectpythia.org/core/xarray/dask-arrays-xarray.html)\n", + "\n", + "_The data used in the code examples can be found through the [Earth System Grid Federation (ESGF) search portal](https://aims2.llnl.gov/search)._\n" ] }, { @@ -270,7 +270,32 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Code Example - Setup\n" + "## Using a Dask Cluster for Scalable Computations\n", + "\n", + "> All of the large-scale Dask collections like Dask Array, Dask DataFrame, and Dask Bag and the fine-grained APIs like delayed and futures generate task graphs where each node in the graph is a normal Python function and edges between nodes are normal Python objects that are created by one task as outputs and used as inputs in another task. After Dask generates these task graphs, it needs to execute them on parallel hardware. This is the job of a task scheduler. Different task schedulers exist, and each will consume a task graph and compute the same result, but with different performance characteristics.\n", + "> Dask has two families of task schedulers:\n", + ">\n", + "> 1. **Single-machine scheduler**: This scheduler provides basic features on a local process or thread pool. This scheduler was made first and is the default. It is simple and cheap to use, although it can only be used on a single machine and does not scale\n", + "> 2. **Distributed scheduler**: This scheduler is more sophisticated, offers more features, but also requires a bit more effort to set up. It can run locally or distributed across a cluster\n", + ">\n", + "> — https://docs.dask.org/en/stable/scheduling.html\n", + "\n", + "
\n", + " \"Dask\n", + "
\n", + "\n", + "Xarray is setup to use 1. **Single-machine scheduler**. However, Dask advises users to use the Dask distributed scheduler for more advanced functionality and more resource-intensive needs.\n", + "\n", + "— https://docs.dask.org/en/stable/scheduling.html#dask-distributed-local\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Code Example - Setup\n", + "\n", + "# TODO: Use a larger dataset\n" ] }, { @@ -299,31 +324,425 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Code Example - Parallelism with xCDAT + Dask\n", + "### 1. Setup the Dask Cluster on your local machine\n", "\n", - "The code example below demonstrates chunking a dataset in Xarray and grouping the data\n", - "in parallel across chunks.\n", + "We will quickly setup a local cluster using the Dask `Client` and `LocalCluster`\n", + "Python modules.\n", "\n", - "**By default, dask uses its multi-threaded scheduler**, which distributes work across multiple cores and allows for processing some datasets that do not fit into memory.\n", + "You can configure the Dask Client (e.g., memory limit) to your needs. In this case,\n", + "we are deploying a cluster with 4 workers, automatic memory limitation, and processors\n", + "(instead of threads). For info on cluster configurations, visit these links:\n", "\n", - "If you are interested in using a distributed scheduler (local or cluster) for more resource-intensive computational operations, there is more information below in this notebook.\n" + "- https://distributed.dask.org/en/latest/api.html#client\n", + "- https://distributed.dask.org/en/latest/api.html#distributed.LocalCluster\n" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2024-04-16 15:14:38,008 [INFO]: scheduler.py(__init__:1709) >> State start\n", + "2024-04-16 15:14:38,008 [INFO]: scheduler.py(__init__:1709) >> State start\n", + "2024-04-16 15:14:38,014 [INFO]: diskutils.py(_check_lock_or_purge:252) >> Found stale lock file and directory '/tmp/dask-scratch-space/scheduler-0fbhq831', purging\n", + "2024-04-16 15:14:38,014 [INFO]: diskutils.py(_check_lock_or_purge:252) >> Found stale lock file and directory '/tmp/dask-scratch-space/scheduler-0fbhq831', purging\n", + "2024-04-16 15:14:38,017 [INFO]: diskutils.py(_check_lock_or_purge:252) >> Found stale lock file and directory '/tmp/dask-scratch-space/scheduler-cwzvww8f', purging\n", + "2024-04-16 15:14:38,017 [INFO]: diskutils.py(_check_lock_or_purge:252) >> Found stale lock file and directory '/tmp/dask-scratch-space/scheduler-cwzvww8f', purging\n", + "2024-04-16 15:14:38,025 [INFO]: scheduler.py(start_unsafe:4060) >> Scheduler at: tcp://127.0.0.1:37919\n", + "2024-04-16 15:14:38,025 [INFO]: scheduler.py(start_unsafe:4060) >> Scheduler at: tcp://127.0.0.1:37919\n", + "2024-04-16 15:14:38,027 [INFO]: scheduler.py(start_unsafe:4075) >> dashboard at: http://127.0.0.1:8787/status\n", + "2024-04-16 15:14:38,027 [INFO]: scheduler.py(start_unsafe:4075) >> dashboard at: http://127.0.0.1:8787/status\n", + "2024-04-16 15:14:38,029 [INFO]: scheduler.py(register_worker_plugin:7676) >> Registering Worker plugin shuffle\n", + "2024-04-16 15:14:38,029 [INFO]: scheduler.py(register_worker_plugin:7676) >> Registering Worker plugin shuffle\n", + "2024-04-16 15:14:38,051 [INFO]: nanny.py(start_unsafe:368) >> Start Nanny at: 'tcp://127.0.0.1:46038'\n", + "2024-04-16 15:14:38,051 [INFO]: nanny.py(start_unsafe:368) >> Start Nanny at: 
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Code Example - Setup\n"
   ]
  },
  {
@@ -299,31 +324,425 @@
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Code Example - Parallelism with xCDAT + Dask\n",
    "\n",
    "The code example below demonstrates chunking a dataset in Xarray and grouping the data\n",
    "in parallel across chunks.\n",
    "\n",
    "**By default, dask uses its multi-threaded scheduler**, which distributes work across multiple cores and allows for processing some datasets that do not fit into memory.\n",
    "\n",
    "If you are interested in using a distributed scheduler (local or cluster) for more resource-intensive computational operations, there is more information below in this notebook.\n"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We're letting Dask auto-scale all dimensions to get a good chunk size using `chunks=\"auto\"`, which references the `.chunks` attribute.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "ds = xr.open_dataset(filepath, chunks=\"auto\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 1. Set up the Dask Cluster on your local machine\n",
    "\n",
    "We will quickly set up a local cluster using the Dask `Client` and `LocalCluster`\n",
    "classes.\n",
    "\n",
    "You can configure the Dask `Client` (e.g., memory limit) to your needs. In this case,\n",
    "we are deploying a cluster with 4 workers, an automatic memory limit, and processes\n",
    "(instead of threads). For info on cluster configurations, visit these links:\n",
    "\n",
    "- https://distributed.dask.org/en/latest/api.html#client\n",
    "- https://distributed.dask.org/en/latest/api.html#distributed.LocalCluster\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2024-04-16 15:14:38,008 [INFO]: scheduler.py(__init__:1709) >> State start\n",
      "2024-04-16 15:14:38,008 [INFO]: scheduler.py(__init__:1709) >> State start\n",
      "2024-04-16 15:14:38,014 [INFO]: diskutils.py(_check_lock_or_purge:252) >> Found stale lock file and directory '/tmp/dask-scratch-space/scheduler-0fbhq831', purging\n",
      "2024-04-16 15:14:38,014 [INFO]: diskutils.py(_check_lock_or_purge:252) >> Found stale lock file and directory '/tmp/dask-scratch-space/scheduler-0fbhq831', purging\n",
      "2024-04-16 15:14:38,017 [INFO]: diskutils.py(_check_lock_or_purge:252) >> Found stale lock file and directory '/tmp/dask-scratch-space/scheduler-cwzvww8f', purging\n",
      "2024-04-16 15:14:38,017 [INFO]: diskutils.py(_check_lock_or_purge:252) >> Found stale lock file and directory '/tmp/dask-scratch-space/scheduler-cwzvww8f', purging\n",
      "2024-04-16 15:14:38,025 [INFO]: scheduler.py(start_unsafe:4060) >> Scheduler at: tcp://127.0.0.1:37919\n",
      "2024-04-16 15:14:38,025 [INFO]: scheduler.py(start_unsafe:4060) >> Scheduler at: tcp://127.0.0.1:37919\n",
      "2024-04-16 15:14:38,027 [INFO]: scheduler.py(start_unsafe:4075) >> dashboard at: http://127.0.0.1:8787/status\n",
      "2024-04-16 15:14:38,027 [INFO]: scheduler.py(start_unsafe:4075) >> dashboard at: http://127.0.0.1:8787/status\n",
      "2024-04-16 15:14:38,029 [INFO]: scheduler.py(register_worker_plugin:7676) >> Registering Worker plugin shuffle\n",
      "2024-04-16 15:14:38,029 [INFO]: scheduler.py(register_worker_plugin:7676) >> Registering Worker plugin shuffle\n",
      "2024-04-16 15:14:38,051 [INFO]: nanny.py(start_unsafe:368) >> Start Nanny at: 'tcp://127.0.0.1:46038'\n",
      "2024-04-16 15:14:38,051 [INFO]: nanny.py(start_unsafe:368) >> Start Nanny at: 'tcp://127.0.0.1:46038'\n",
      "2024-04-16 15:14:38,063 [INFO]: nanny.py(start_unsafe:368) >> Start Nanny at: 'tcp://127.0.0.1:37970'\n",
      "2024-04-16 15:14:38,063 [INFO]: nanny.py(start_unsafe:368) >> Start Nanny at: 'tcp://127.0.0.1:37970'\n",
      "2024-04-16 15:14:40,516 [INFO]: scheduler.py(add_worker:4412) >> Register worker \n",
      "2024-04-16 15:14:40,516 [INFO]: scheduler.py(add_worker:4412) >> Register worker \n",
      "2024-04-16 15:14:40,523 [INFO]: scheduler.py(handle_worker:5892) >> Starting worker compute stream, tcp://127.0.0.1:45995\n",
      "2024-04-16 15:14:40,523 [INFO]: scheduler.py(handle_worker:5892) >> Starting worker compute stream, tcp://127.0.0.1:45995\n",
      "2024-04-16 15:14:40,525 [INFO]: core.py(handle_stream:1019) >> Starting established connection to tcp://127.0.0.1:37940\n",
      "2024-04-16 15:14:40,525 [INFO]: core.py(handle_stream:1019) >> Starting established connection to tcp://127.0.0.1:37940\n",
      "2024-04-16 15:14:40,530 [INFO]: scheduler.py(add_worker:4412) >> Register worker \n",
      "2024-04-16 15:14:40,530 [INFO]: scheduler.py(add_worker:4412) >> Register worker \n",
      "2024-04-16 15:14:40,532 [INFO]: scheduler.py(handle_worker:5892) >> Starting worker compute stream, tcp://127.0.0.1:34301\n",
      "2024-04-16 15:14:40,532 [INFO]: scheduler.py(handle_worker:5892) >> Starting worker compute stream, tcp://127.0.0.1:34301\n",
      "2024-04-16 15:14:40,534 [INFO]: core.py(handle_stream:1019) >> Starting established connection to tcp://127.0.0.1:37938\n",
      "2024-04-16 15:14:40,534 [INFO]: core.py(handle_stream:1019) >> Starting established connection to tcp://127.0.0.1:37938\n",
      "2024-04-16 15:14:40,557 [INFO]: scheduler.py(add_client:5650) >> Receive client connection: Client-b84f7600-fc3e-11ee-84a4-f4e9d4af2192\n",
      "2024-04-16 15:14:40,557 [INFO]: scheduler.py(add_client:5650) >> Receive client connection: Client-b84f7600-fc3e-11ee-84a4-f4e9d4af2192\n",
      "2024-04-16 15:14:40,560 [INFO]: core.py(handle_stream:1019) >> Starting established connection to tcp://127.0.0.1:37968\n",
      "2024-04-16 15:14:40,560 [INFO]: core.py(handle_stream:1019) >> Starting established connection to tcp://127.0.0.1:37968\n"
     ]
    }
   ],
   "source": [
    "from dask.distributed import Client, LocalCluster\n",
    "\n",
    "# Instantiating `LocalCluster` automatically starts a local cluster.\n",
    "# https://docs.dask.org/en/stable/deploying.html#local-machine\n",
    "\n",
    "cluster = LocalCluster(\n",
    "    n_workers=4, threads_per_worker=1, memory_limit=\"auto\", processes=True\n",
    ")\n",
    "\n",
    "client = Client(cluster)"
   ]
  },
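  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Once the `Client` is created, it registers itself as the default Dask scheduler, so subsequent `.compute()` calls on Dask collections (including chunked Xarray/xCDAT objects) run on the local cluster. The cell below is a minimal sanity check with a synthetic Dask array (it is not part of the original example data):\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import dask.array as da\n",
    "\n",
    "# Hypothetical array purely for illustration; any Dask-backed computation\n",
    "# triggered while the `Client` above is active executes on the local cluster.\n",
    "x = da.random.random((10_000, 10_000), chunks=\"auto\")  # lazy; nothing is computed yet\n",
    "x.mean().compute()  # runs in parallel on the cluster workers"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "e77c4df4b8bc4a468cf508b648e73a59",
       "version_major": 2,
       "version_minor": 0
      },
      "text/html": [
       "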
\n", + "
\n", + "
\n", + "
\n", + "

LocalCluster

\n", + "

f9a5f35e

\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + "\n", + " \n", + "
\n", + " Dashboard: http://127.0.0.1:8787/status\n", + " \n", + " Workers: 4\n", + "
\n", + " Total threads: 4\n", + " \n", + " Total memory: 20.99 GiB\n", + "
Status: runningUsing processes: True
\n", + "\n", + "
\n", + " \n", + "

Scheduler Info

\n", + "
\n", + "\n", + "
\n", + "
\n", + "
\n", + "
\n", + "

Scheduler

\n", + "

Scheduler-7f13e4ee-f768-4cec-8e1b-7982ce6bb54a

\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
\n", + " Comm: tcp://127.0.0.1:42380\n", + " \n", + " Workers: 4\n", + "
\n", + " Dashboard: http://127.0.0.1:8787/status\n", + " \n", + " Total threads: 4\n", + "
\n", + " Started: Just now\n", + " \n", + " Total memory: 20.99 GiB\n", + "
\n", + "
\n", + "
\n", + "\n", + "
\n", + " \n", + "

Workers

\n", + "
\n", + "\n", + " \n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "

Worker: 0

\n", + "
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + "\n", + " \n", + "\n", + "
\n", + " Comm: tcp://127.0.0.1:44505\n", + " \n", + " Total threads: 1\n", + "
\n", + " Dashboard: http://127.0.0.1:33999/status\n", + " \n", + " Memory: 5.25 GiB\n", + "
\n", + " Nanny: tcp://127.0.0.1:36299\n", + "
\n", + " Local directory: /tmp/dask-scratch-space/worker-pc13f2f2\n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "

Worker: 1

\n", + "
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + "\n", + " \n", + "\n", + "
\n", + " Comm: tcp://127.0.0.1:41416\n", + " \n", + " Total threads: 1\n", + "
\n", + " Dashboard: http://127.0.0.1:41748/status\n", + " \n", + " Memory: 5.25 GiB\n", + "
\n", + " Nanny: tcp://127.0.0.1:33631\n", + "
\n", + " Local directory: /tmp/dask-scratch-space/worker-wdn2y822\n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "

Worker: 2

\n", + "
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + "\n", + " \n", + "\n", + "
\n", + " Comm: tcp://127.0.0.1:34857\n", + " \n", + " Total threads: 1\n", + "
\n", + " Dashboard: http://127.0.0.1:45434/status\n", + " \n", + " Memory: 5.25 GiB\n", + "
\n", + " Nanny: tcp://127.0.0.1:32895\n", + "
\n", + " Local directory: /tmp/dask-scratch-space/worker-t7k7waej\n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "

Worker: 3

\n", + "
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + "\n", + " \n", + "\n", + "
\n", + " Comm: tcp://127.0.0.1:34749\n", + " \n", + " Total threads: 1\n", + "
\n", + " Dashboard: http://127.0.0.1:36100/status\n", + " \n", + " Memory: 5.25 GiB\n", + "
\n", + " Nanny: tcp://127.0.0.1:40486\n", + "
\n", + " Local directory: /tmp/dask-scratch-space/worker-3dksi2wd\n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "\n", + "
\n", + "
\n", + "\n", + "
\n", + "
\n", + "
" + ], + "text/plain": [ + "LocalCluster(f9a5f35e, 'tcp://127.0.0.1:42380', workers=4, threads=4, memory=20.99 GiB)" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "client.cluster" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 2. Open the Dask Dashboard UI\n", + "\n", + "The Dask distributed scheduler provides an interactive dashboard containing many plots\n", + "and tables with live information.\n", + "\n", + "Check this [Dask documentation page](https://docs.dask.org/en/stable/dashboard.html) to learn how to interpret the information.\n", + "\n", + "Here's an example:\n", + "\n", + "
\n", + " \"Dask\n", + "
\n" ] }, { - "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ - "We're letting Dask auto-scale all dimensions to get a good chunk size using `chunks=\"auto\"`, which references the `.chunks` attribute.\n" + "#### Open the link to the dashboard\n" ] }, { "cell_type": "code", - "execution_count": 2, + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'http://127.0.0.1:8787/status'" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.dashboard_link" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", "metadata": {}, - "outputs": [], "source": [ - "ds = xr.open_dataset(filepath, chunks=\"auto\")" + "### 3. Open a dataset with xCDAT and chunk it\n", + "\n", + "We're letting Dask auto-scale all dimensions to get a good chunk size using `chunks=\"auto\"`, which references the `.chunks` attribute.\n", + "\n", + "It tries to reach chunk equal to the config value of `array.chunk-size`, which is set to\n", + "128MiB by default, which you can change in your [configuration](https://docs.dask.org/en/stable/configuration.html).\n", + "\n", + "— https://docs.dask.org/en/stable/array-chunks.html#automatic-chunking\n" ] }, { @@ -723,10 +1142,10 @@ " license: CMIP6 model data produced by CSIRO is li...\n", " cmor_version: 3.4.0\n", " tracking_id: hdl:21.14100/af78ae5e-f3a6-4e99-8cfe-5f2...\n", - " DODS_EXTRA.Unlimited_Dimension: time