Dask is a flexible library for parallel computing in Python, well suited to data science tasks. Although multiple parallel and distributed computing libraries already exist in Python, Dask remains Pythonic while being very efficient (see Diagnosing Performance).
Dask is composed of two parts:
- Dynamic task scheduling: optimized for computational workloads (see distributed dask)
- Big Data collections: parallel and distributed collections extending NumPy arrays and Pandas DataFrames

Dask collections extend familiar interfaces such as NumPy, Pandas, and Python iterators to larger-than-memory or distributed environments. Dask aims to provide the following qualities:
- Familiar: Provides parallelized NumPy array and Pandas DataFrame objects
- Flexible: Provides a task scheduling interface for more custom workloads and integration with other projects
- Native: Enables distributed computing in pure Python with access to the PyData stack
- Fast: Operates with low overhead, low latency, and minimal serialization necessary for fast numerical algorithms
- Scales up: Runs resiliently on clusters with 1000s of cores
- Scales down: Trivial to set up and run on a laptop in a single process
- Responsive: Designed with interactive computing in mind, it provides rapid feedback and diagnostics to aid humans
Dask relies solely on a graph representation to encode algorithms. The main advantage of this structure is a clear and efficient approach to task scheduling. For Dask users, task graphs and operations are fully transparent unless you decide to develop a new module.
The following example shows a small piece of Python code and the equivalent Dask task graph, encoded as a dictionary:

```python
def inc(i):
    return i + 1

def add(a, b):
    return a + b

x = 1
y = inc(x)
z = add(y, 10)

# Equivalent Dask task graph
d = {'x': 1,
     'y': (inc, 'x'),
     'z': (add, 'y', 10)}
```
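To make the dictionary encoding concrete, here is a toy evaluator (the helpers `evaluate` and `_execute` are hypothetical and not part of Dask) that walks such a graph the way a scheduler conceptually does: a tuple whose first element is callable is a task, a string naming another key is a dependency, and anything else is a literal. It is a simplified, sequential sketch; Dask's real schedulers also handle tuple keys, nested containers, and parallel execution.

```python
def inc(i):
    return i + 1


def add(a, b):
    return a + b


# Same graph as above: keys map to literals or to (function, *args) tasks
d = {'x': 1, 'y': (inc, 'x'), 'z': (add, 'y', 10)}


def evaluate(dsk, key, cache=None):
    """Toy, sequential evaluation of `key` in a Dask-style graph dict."""
    if cache is None:
        cache = {}
    if key not in cache:
        # Compute each key once and memoize it, as a scheduler would
        cache[key] = _execute(dsk, dsk[key], cache)
    return cache[key]


def _execute(dsk, task, cache):
    # A tuple starting with a callable is a task: evaluate its arguments first
    if isinstance(task, tuple) and task and callable(task[0]):
        func, *args = task
        return func(*(_execute(dsk, arg, cache) for arg in args))
    # A string that is a key of the graph refers to another task's result
    if isinstance(task, str) and task in dsk:
        return evaluate(dsk, task, cache)
    # Anything else is a literal value
    return task


print(evaluate(d, 'z'))  # 12
```

With Dask installed, the same dictionary can be handed to a real scheduler instead, for example `dask.threaded.get(d, 'z')`.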
Please note that Dask is already included by default in the Anaconda distribution. To install Dask with all its optional dependencies (e.g., NumPy, Pandas, ...), use the following command:

```bash
python -m pip install "dask[complete]"
# OR simply
pip install "dask[complete]"
```

Similarly to conda, the Dask core alone can be installed with `pip install dask`, and the requirements of specific sub-modules can be installed separately:

```bash
pip install "dask[array]"        # Install requirements for dask array
pip install "dask[dataframe]"    # Install requirements for dask dataframe
pip install "dask[diagnostics]"  # Install requirements for dask diagnostics
pip install "dask[distributed]"  # Install requirements for distributed dask
```

For those wishing to install Dask from source (e.g., to adapt it to dedicated hardware), Dask can be installed as follows:

```bash
git clone https://github.com/dask/dask.git
cd dask
python -m pip install .
# OR
pip install ".[complete]"
```

We strongly recommend installing Dask inside a virtual environment, using the Python versions included in the software set:

```bash
# See https://hpc-docs.uni.lu/connect/ssh/
ssh [aion,iris]-cluster # assuming proper configuration
# Once on the clusters, ask for an interactive job
si --time=01:00:00 # OR si-gpu --time=01:00:00 if a GPU is needed
module load lang/Python # Load default python
python -m venv dask_env_${ULHPC_CLUSTER}
source dask_env_${ULHPC_CLUSTER}/bin/activate
pip install --upgrade pip
pip install "dask[complete]"
```

Dask can be used on different hardware, from your laptop to a multi-node cluster. For this purpose, Dask provides two families of task schedulers: single-machine schedulers and the distributed scheduler. By default, if no client is instantiated, Dask uses the local (single-machine) scheduler:

```python
import dask.dataframe as dd
df = dd.read_csv(...)
df.x.sum().compute()  # This uses the single-machine scheduler by default
```

If you need more resources, `dask.distributed` is required to set up and connect to a distributed cluster:

```python
from dask.distributed import Client
client = Client(...)  # Connect to distributed cluster and override default
df.x.sum().compute()  # This now runs on the distributed system
```

In the remainder of this document, we only consider distributed Dask clusters. Nevertheless, you can also use a local cluster on your laptop to test your workflow at small scale. More details can be found in the dask.distributed documentation. On the ULHPC platform, you have two strategies to create a Dask cluster:

- submitting each worker as a separate Slurm job with `dask_jobqueue`;
- starting the `dask-scheduler` and the workers manually as steps inside a single Slurm job.
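Before choosing either strategy, a workflow can be tested at small scale on a single machine, as suggested above. The following minimal sketch (the function name `square` is illustrative) builds a lazy graph with `dask.delayed` and runs it with Dask's single-machine schedulers, selected explicitly through the `scheduler=` keyword of `compute()`. No `Client` is created, so nothing is distributed.

```python
import dask


@dask.delayed
def square(x):
    # Executed only when the graph is computed
    return x * x


# Building the graph is lazy: no square() call runs here
parts = [square(i) for i in range(10)]
total = dask.delayed(sum)(parts)

# Without a Client, compute() runs on a single-machine scheduler.
# The scheduler can also be selected explicitly for each call:
threaded = total.compute(scheduler="threads")        # thread pool
sequential = total.compute(scheduler="synchronous")  # single thread, easy to debug
print(threaded, sequential)  # 285 285
```

The `"synchronous"` scheduler runs every task in the calling thread, which keeps tracebacks and debuggers simple. Once a `dask.distributed.Client` is created, it becomes the default for `compute()` calls that do not pass `scheduler=`.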
First, we are going to set up a Python virtual environment in order to install all required Python libraries. Be sure to start with a bare environment. Apply the following commands to set up your environment:

```bash
# Clone tutorial repository
git clone https://github.com/ULHPC/tutorials.git
# cd into the scripts folder
cd tutorials/python/advanced/dask-ml/scripts
# Ask for an interactive job
si --time=01:00:00
# Load the Python module (Python 3 by default)
module load lang/Python
python -m venv dask_env_${ULHPC_CLUSTER}
source dask_env_${ULHPC_CLUSTER}/bin/activate
pip install --upgrade pip
pip install -r requirements.txt
```

We first create a generic launcher:

```bash
#!/bin/bash -l
#SBATCH -p batch
#SBATCH -J DASK_jobs_workers
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=1
#SBATCH -t 00:30:00
# Load the python version used to install Dask
module load lang/Python
# Make sure that you have a virtualenv dask_env installed
export DASK_VENV="$1"
shift
if [ ! -d "${DASK_VENV}" ] || [ ! -f "${DASK_VENV}/bin/activate" ]; then
    echo "Error with virtualenv" && exit 1
fi
# Source the python env
source "${DASK_VENV}/bin/activate"
# Run the Python script with the remaining arguments
python -u "$@"
```

Below is a small example that starts the workers as Slurm jobs:

```python
from dask_jobqueue import SLURMCluster
from dask.distributed import Client
import dask
# Library to generate plots
import matplotlib as mpl
# Define Agg as Backend for matplotlib when no X server is running
mpl.use('Agg')
import matplotlib.pyplot as plt
import socket
import os

# Submit workers as slurm jobs
# Below we define the slurm parameters of a single worker
cluster = SLURMCluster(cores=int(os.environ.get("SLURM_CPUS_PER_TASK", 1)),
                       processes=1,
                       memory="4GB",
                       walltime="01:00:00",
                       queue="batch",
                       interface="ib0")
# Number of workers (i.e., worker jobs) to request
numworkers = int(os.environ.get("SLURM_NTASKS", 1))
cluster.scale(numworkers)
# Connect to distributed cluster and override default
client = Client(cluster)
# Block until all requested workers are connected
client.wait_for_workers(numworkers)

# Decorator
@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def double(x):
    return x * 2

@dask.delayed
def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]
output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

# Second approach: wrap an existing function with dask.delayed
total = dask.delayed(sum)(output)
total.visualize(filename='task_graph.svg')
# parallel execution on the workers
results = total.compute()
print(results)

# Very important: release the worker jobs
client.close()
cluster.close()
```

The `dask.delayed` decorator makes your functions operate lazily. Rather than executing your function immediately, it defers execution, placing the function and its arguments into a task graph. You can execute the previous example by submitting it with the generic launcher (virtualenv path first, then the script). The output of a worker job looks as follows:

```
distributed.nanny - INFO - Start Nanny at: 'tcp://172.19.6.19:44324'
distributed.worker - INFO - Start worker at: tcp://172.19.6.19:37538
distributed.worker - INFO - Listening to: tcp://172.19.6.19:37538
distributed.worker - INFO - dashboard at: 172.19.6.19:39535
distributed.worker - INFO - Waiting to connect to: tcp://172.19.6.19:39227
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Memory: 3.73 GiB
distributed.worker - INFO - Local Directory: /mnt/irisgpfs/users/ekieffer/Dask/dask-worker-space/worker-p1ij_9ar
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://172.19.6.19:39227
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Stopping worker at tcp://172.19.6.19:37538
distributed.nanny - INFO - Worker closed
distributed.nanny - INFO - Closing Nanny at 'tcp://172.19.6.19:44324'
distributed.dask_worker - INFO - End worker
```

In this part, we show how to manually start the `dask-scheduler` and spawn the workers. Here, workers are not created as new jobs but as new steps inside a main job. We advise using this workflow to avoid filling the scheduler queue. First, we create a new Slurm launcher:

```bash
#!/bin/bash -l
#SBATCH -p batch
#SBATCH -J DASK_steps_workers
#SBATCH -N 2
#SBATCH -n 10
#SBATCH -c 1
#SBATCH -t 00:30:00
# Load the python version used to install Dask
module load lang/Python
# Make sure that you have a virtualenv dask_env installed
export DASK_VENV="$1"
shift
if [ ! -d "${DASK_VENV}" ] || [ ! -f "${DASK_VENV}/bin/activate" ]; then
    echo "Error with virtualenv" && exit 1
fi
# Source the python env
source "${DASK_VENV}/bin/activate"
# Dask configuration to store the scheduler file
DASK_CONFIG="${HOME}/.dask"
DASK_JOB_CONFIG="${DASK_CONFIG}/job_${SLURM_JOB_ID}"
mkdir -p ${DASK_JOB_CONFIG}
export SCHEDULER_FILE="${DASK_JOB_CONFIG}/scheduler.json"
# Number of tasks - 1 scheduler task - 1 python task
export NB_WORKERS=$((${SLURM_NTASKS}-2))
LOG_DIR="$(pwd)/logs/job_${SLURM_JOBID}"
mkdir -p ${LOG_DIR}
# Start scheduler on this first task
srun -w $(hostname) --output=${LOG_DIR}/scheduler-%j-workers.out --exclusive -N 1 -n 1 -c ${SLURM_CPUS_PER_TASK} \
     dask-scheduler --scheduler-file "${SCHEDULER_FILE}" --interface "ib0" &
sleep 10
# Start one dask-worker on each remaining task (srun --label prefixes output lines with the task id)
srun --output=${LOG_DIR}/ipengine-%j-workers.out \
     --exclusive -n ${NB_WORKERS} -c ${SLURM_CPUS_PER_TASK} \
     --cpu-bind=cores --label \
     dask-worker \
     --interface "ib0" \
     --scheduler-file "${SCHEDULER_FILE}" &
sleep 25
# Run the Python client script on the last task
srun --output=${LOG_DIR}/code-%j-execution.out --exclusive -N 1 -n 1 -c ${SLURM_CPUS_PER_TASK} python -u "$@"
```
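The worker count in this launcher follows from `NB_WORKERS = SLURM_NTASKS - 2`: one task is reserved for `dask-scheduler` and one for the Python client, so `-n 10` yields 8 workers. The small helper below (`dask_layout` is a hypothetical name, for illustration only) reproduces that arithmetic when whole nodes are filled, assuming 28 cores per iris node and 128 cores per aion node. Whether a worker actually uses several CPUs as threads depends on `dask-worker`'s `--nthreads` setting, which this launcher leaves at its default.

```python
def dask_layout(nodes, cores_per_node, cpus_per_task):
    """Return (ntasks, nb_workers) for a full-node, step-based Dask job.

    Mirrors the launcher: NB_WORKERS = SLURM_NTASKS - 2, since one task
    runs dask-scheduler and one runs the Python client script.
    """
    tasks_per_node = cores_per_node // cpus_per_task
    ntasks = nodes * tasks_per_node
    nb_workers = ntasks - 2
    if nb_workers < 1:
        raise ValueError("allocation too small: need at least 3 tasks")
    return ntasks, nb_workers


# Two full iris nodes (assumed 28 cores each), 1 CPU per task
print(dask_layout(2, 28, 1))   # (56, 54)
# Two full aion nodes (assumed 128 cores each), 4 CPUs per task
print(dask_layout(2, 128, 4))  # (64, 62)
```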
**Remark**: The launcher above requests 10 tasks on 2 nodes with 1 CPU per task. This is NOT an efficient use of the hardware and is only meant for educational purposes. Please always try to maximize node usage, i.e., at most 28 tasks per node on iris and 128 on aion, or, if possible, decrease the number of tasks and increase multithreading.

To illustrate this manual setting, we are now going to scale XGBoost using Dask. XGBoost is an optimized gradient boosting library designed to be highly efficient, flexible and portable. Gradient boosted trees can be distributed by making Dask and XGBoost work together. XGBoost provides a powerful prediction framework and works well in practice. It wins Kaggle contests and is popular in industry because it has good performance and can be easily interpreted (i.e., it is easy to find the important features of an XGBoost model).
Suppose we have access to a Dask cluster with a set of workers. The first task is to install the xgboost library:

```bash
pip install xgboost
```

Then create the following script:

```python
from dask.distributed import Client
# Library to generate plots
import matplotlib as mpl
# Define Agg as Backend for matplotlib when no X server is running
mpl.use('Agg')
import matplotlib.pyplot as plt
import dask
import xgboost as xgb
import xgboost.dask  # make sure the xgb.dask submodule is loaded
import dask.array as da
import json
import os

# Using the distributed shared file system, we can access the Dask cluster
# configuration.
# We read the scheduler address and port from the scheduler file
with open(os.environ["SCHEDULER_FILE"]) as f:
    data = json.load(f)
scheduler_address = data['address']

# Connect to the cluster
client = Client(scheduler_address)
# Wait until all the workers started by the launcher are connected
client.wait_for_workers(int(os.environ.get("NB_WORKERS", 1)))

# X and y must be Dask dataframes or arrays
num_obs = 100_000  # must be an integer to be used as an array dimension
num_features = 20
X = da.random.random(size=(num_obs, num_features), chunks=(1000, num_features))
y = da.random.random(size=(num_obs, 1), chunks=(1000, 1))

# Training
dtrain = xgb.dask.DaskDMatrix(client, X, y)
output = xgb.dask.train(
    client,
    {"verbosity": 2, "tree_method": "hist", "objective": "reg:squarederror"},
    dtrain,
    num_boost_round=10,
    evals=[(dtrain, "train")],
)
booster = output['booster']  # booster is the trained model
history = output['history']  # A dictionary containing evaluation results

ax = xgb.plot_importance(booster, height=0.8, max_num_features=9)
ax.grid(False, axis="y")
ax.set_title('Estimated feature importance')
plt.savefig("importance.png")

# Stop Dask cluster
client.shutdown()
```

You can try this manual setup example by submitting the script with the step-based launcher above (virtualenv path first, then the script).

Each worker sends computations to a thread in a `concurrent.futures.ThreadPoolExecutor`. These computations occur in the same process as the worker communication server so that they can access and share data efficiently. For the purposes of data locality, all threads within a worker are considered the same worker.

If your computations are mostly numeric in nature (for example NumPy and Pandas computations) and release the GIL entirely, then it is advisable to run `dask-worker` processes with many threads and one process. This reduces communication costs and generally simplifies deployment.

If your computations are mostly Python code and don't release the GIL, then it is advisable to run `dask-worker` processes with many processes and one thread per process:

```bash
$ dask-worker scheduler:8786 --nworkers 8 --nthreads 1
```

This will launch 8 worker processes, each of which has its own ThreadPoolExecutor of size 1 (older versions of `distributed` call the `--nworkers` option `--nprocs`).

If your computations are external to Python, long-running, and don't release the GIL, beware that while the computation is running the worker process will not be able to communicate with other workers or with the scheduler. This situation should be avoided. If you don't link in your own custom C/Fortran code, then this topic probably doesn't apply.

Dask can be used in combination with Jupyter to perform parallel interactive computations. The following launcher starts a Jupyter notebook together with a Dask scheduler and its workers:

```bash
#!/bin/bash -l
#SBATCH -p batch
#SBATCH -J DASK_JUPYTER
#SBATCH -N 2
#SBATCH -n 10
#SBATCH -c 1
#SBATCH -t 00:30:00
# Load the python version used to install Dask
module load lang/Python
# Export Environment variables
# Set an environment which depends on the cluster on which you wish to start the notebook
export VENV="$HOME/.envs/jupyter_dask_${ULHPC_CLUSTER}"
# Replace default jupyter and environment variables by custom ones
# We add the job id to the path for debugging purposes
export JUPYTER_CONFIG_DIR="$HOME/jupyter/$SLURM_JOBID/"
export JUPYTER_PATH="$VENV/share/jupyter":"$HOME/jupyter_sing/$SLURM_JOBID/jupyter_path"
export JUPYTER_DATA_DIR="$HOME/jupyter/$SLURM_JOBID/jupyter_data"
export JUPYTER_RUNTIME_DIR="$HOME/jupyter/$SLURM_JOBID/jupyter_runtime"
# We create the empty directory
mkdir -p $JUPYTER_CONFIG_DIR
# The Jupyter notebook will run on the first node of the slurm allocation (the one running this script)
# We retrieve its address
export IP_ADDRESS=$(hostname -I | awk '{print $1}')
# Dask configuration to store the scheduler file
export DASK_CONFIG="${HOME}/.dask"
export DASK_JOB_CONFIG="${DASK_CONFIG}/job_${SLURM_JOB_ID}"
mkdir -p ${DASK_JOB_CONFIG}
export SCHEDULER_FILE="${DASK_JOB_CONFIG}/scheduler.json"
# Minimal virtualenv setup
# We create a minimal virtualenv with the necessary packages to start
if [ ! -d "$VENV" ];then
    echo "Building the virtual environment"
    # Create the virtualenv
    python3 -m venv $VENV
    # Load the virtualenv
    source "$VENV/bin/activate"
    # Upgrade pip
    python3 -m pip install pip --upgrade
    # Install minimum requirement
    python3 -m pip install "dask[complete]" matplotlib \
        dask-jobqueue \
        graphviz \
        xgboost \
        jupyter \
        jupyter-server-proxy
    # Setup ipykernel
    # "--sys-prefix" installs ipykernel where python is installed
    # here next to the python symlink inside the virtualenv
    python3 -m ipykernel install --sys-prefix --name custom_kernel --display-name custom_kernel
fi
export XDG_RUNTIME_DIR=""
# Source the python env
source "${VENV}/bin/activate"
# Print the SSH tunnel command to run on your laptop
echo "On your laptop: ssh -p 8022 -NL 8889:${IP_ADDRESS}:8889 ${USER}@access-${ULHPC_CLUSTER}.uni.lu "
# Start jupyter on a single core
srun --exclusive -N 1 -n 1 -c 1 -w $(hostname) jupyter notebook --ip ${IP_ADDRESS} --no-browser --port 8889 &
sleep 5s
# No real need to use srun here ....
# We should only be careful to call the jupyter executable where the
# notebook instance has been started
srun --exclusive -N 1 -n 1 -c 1 -w $(hostname) jupyter notebook list
srun --exclusive -N 1 -n 1 -c 1 -w $(hostname) jupyter --paths
srun --exclusive -N 1 -n 1 -c 1 -w $(hostname) jupyter kernelspec list
# Start scheduler on this first task
srun -w $(hostname) --exclusive -N 1 -n 1 -c 1 \
     dask-scheduler --scheduler-file "${SCHEDULER_FILE}" --interface "ib0" &
sleep 10
# Number of tasks - 1 scheduler task - 1 jupyter task
export NB_WORKERS=$((${SLURM_NTASKS}-2))
# Start one dask-worker on each remaining task (srun --label prefixes output lines with the task id)
srun --exclusive -n ${NB_WORKERS} -c 1 \
     --cpu-bind=cores --label \
     dask-worker \
     --interface "ib0" \
     --scheduler-file "${SCHEDULER_FILE}" &
wait
```
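Because `SCHEDULER_FILE` is exported before `jupyter notebook` is started, notebook kernels launched by this job should inherit it. A minimal sketch for a first notebook cell, under that assumption: the hypothetical helper `read_scheduler_address` reads the `address` field of the scheduler file (the same field the XGBoost script above uses), and the hypothetical `connect` builds a `Client` from it. `Client(scheduler_file=...)` from `dask.distributed` is an equivalent shortcut.

```python
import json
import os


def read_scheduler_address(path=None):
    """Return the scheduler address stored in a Dask scheduler file."""
    path = path or os.environ["SCHEDULER_FILE"]
    with open(path) as f:
        return json.load(f)["address"]


def connect(path=None):
    """Connect the notebook to the Dask cluster started by the launcher."""
    # Imported here so that the helper above works without distributed
    from dask.distributed import Client
    return Client(read_scheduler_address(path))


# In a notebook __name__ is "__main__"; only connect inside the Slurm job
if __name__ == "__main__" and "SCHEDULER_FILE" in os.environ:
    client = connect()
    print(client.dashboard_link)
```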
You can start Dask and Jupyter by submitting this launcher with `sbatch`. The job output looks as follows:

```
In your laptop: ssh -p 8022 -NL 8889:172.17.6.155:8889 ekieffer@access-iris.uni.lu
[I 16:08:17.627 NotebookApp] Writing notebook server cookie secret to /home/users/ekieffer/jupyter/2540915/jupyter_runtime/notebook_cookie_secret
[I 16:08:18.485 NotebookApp] Serving notebooks from local directory: /mnt/irisgpfs/users/ekieffer/HPC_SCHOOL_2021/python/advanced/dask-ml/scripts
[I 16:08:18.485 NotebookApp] Jupyter Notebook 6.4.5 is running at:
[I 16:08:18.485 NotebookApp] http://172.17.6.155:8889/?token=a32b995ddd86e73eac5a4e9d20cbc9907ac52a5afb92c1d7
[I 16:08:18.485 NotebookApp] or http://127.0.0.1:8889/?token=a32b995ddd86e73eac5a4e9d20cbc9907ac52a5afb92c1d7
[I 16:08:18.485 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 16:08:18.490 NotebookApp]
To access the notebook, open this file in a browser:
file:///home/users/ekieffer/jupyter/2540915/jupyter_runtime/nbserver-117695-open.html
Or copy and paste one of these URLs:
http://172.17.6.155:8889/?token=a32b995ddd86e73eac5a4e9d20cbc9907ac52a5afb92c1d7
or http://127.0.0.1:8889/?token=a32b995ddd86e73eac5a4e9d20cbc9907ac52a5afb92c1d7
Currently running servers:
http://172.17.6.155:8889/?token=a32b995ddd86e73eac5a4e9d20cbc9907ac52a5afb92c1d7 :: /mnt/irisgpfs/users/ekieffer/HPC_SCHOOL_2021/python/advanced/dask-ml/scripts
config:
/home/users/ekieffer/jupyter/2540915/
/home/users/ekieffer/.envs/jupyter_dask_iris/etc/jupyter
/usr/local/etc/jupyter
/etc/jupyter
data:
/home/users/ekieffer/.envs/jupyter_dask_iris/share/jupyter
/home/users/ekieffer/jupyter_sing/2540915/jupyter_path
/home/users/ekieffer/jupyter/2540915/jupyter_data
/home/users/ekieffer/.envs/jupyter_dask_iris/share/jupyter
/usr/local/share/jupyter
/usr/share/jupyter
runtime:
/home/users/ekieffer/jupyter/2540915/jupyter_runtime
Available kernels:
custom_kernel /home/users/ekieffer/.envs/jupyter_dask_iris/share/jupyter/kernels/custom_kernel
python3 /home/users/ekieffer/.envs/jupyter_dask_iris/share/jupyter/kernels/python3
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://172.19.6.155:8786
distributed.scheduler - INFO - dashboard at: 172.19.6.155:8787
distributed.nanny - INFO - Start Nanny at: 'tcp://172.19.6.155:46160'
distributed.nanny - INFO - Start Nanny at: 'tcp://172.19.6.155:33160'
distributed.nanny - INFO - Start Nanny at: 'tcp://172.19.6.155:46866'
distributed.worker - INFO - Start worker at: tcp://172.19.6.155:37158
distributed.worker - INFO - Start worker at: tcp://172.19.6.155:41399
distributed.worker - INFO - Listening to: tcp://172.19.6.155:37158
distributed.worker - INFO - Start worker at: tcp://172.19.6.155:41713
distributed.worker - INFO - dashboard at: 172.19.6.155:37929
distributed.worker - INFO - Listening to: tcp://172.19.6.155:41713
distributed.worker - INFO - Waiting to connect to: tcp://172.19.6.155:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - dashboard at: 172.19.6.155:40746
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Waiting to connect to: tcp://172.19.6.155:8786
```

On your laptop, use the SSH command printed at the top of the job output to forward port 8889. You should now have access to the