- Repro for dask/dask#2456
- Investigate why a simple shuffle OOMs when total data exceeds total worker RAM
- Latest stable dask+distributed versions (requirements.txt):
    - dask==0.15.2
    - distributed==1.18.3
- Use local docker containers to make a reproducible and portable distributed environment
- Worker setup (docker-compose.yml):
    - 4 workers @ 1g mem + no swap (4g total worker RAM)
    - Default `--memory-limit` (each worker reports 0.584g)
    - Limited to 1 concurrent task per worker, to minimize memory contention and OOM risk
- For each of ddf, dask array, and dask bag (a hedged sketch of each variant precedes its results table below):
    - Run a simple shuffle operation and test whether the operation succeeds or OOMs
    - Test against increasing data volumes, from much smaller than total worker RAM to larger than total worker RAM
    - Try to keep partition sizes below ~10-15m, to control for OOM risk from individual oversized partitions
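A minimal sketch of what the ddf variant (`oom_ddf.py`) plausibly does; the random-data generation and the use of `set_index` as the shuffle are assumptions, not the script's confirmed contents:

```python
# Hypothetical sketch of oom_ddf.py (the real script may differ).
# Builds `nparts` random partitions of shape (part_rows, cols), then forces a
# full shuffle with set_index, which moves every row to the partition that
# owns its new index value.
import os

import numpy as np
import pandas as pd
import dask
import dask.dataframe as dd
from distributed import Client

def make_part(part_rows, cols, seed):
    rng = np.random.RandomState(seed)
    return pd.DataFrame(rng.rand(part_rows, cols),
                        columns=[str(c) for c in range(cols)])

def main(cols=10, part_rows=157500, nparts=64):
    client = Client(os.environ.get('DASK_SCHEDULER_URL', 'localhost:8786'))
    parts = [dask.delayed(make_part)(part_rows, cols, seed)
             for seed in range(nparts)]
    ddf = dd.from_delayed(parts, meta=make_part(0, cols, 0))
    shuffled = ddf.set_index('0')  # the shuffle under test
    print(shuffled.count().compute())

if __name__ == '__main__':
    main()
```

With cols=10 and part_rows=157500, each float64 partition is ~12m (157500 × 10 × 8 bytes), matching the part_bytes column below.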
| params | ddf_bytes | part_bytes | runtime (mm:ss) | success/OOM? |
|---|---|---|---|---|
| cols=10 part_rows=157500 nparts=64 | 0.75g | 12m | 00:08 | success |
| cols=10 part_rows=157500 nparts=128 | 1.5g | 12m | ~00:20 | usually OOM, sometimes success |
| cols=10 part_rows=157500 nparts=256 | 3g | 12m | ~00:20 | OOM |
| cols=10 part_rows=157500 nparts=512 | 6g | 12m | ~00:30 | OOM |
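The array variant (`oom_array.py`) plausibly looks like the following; treating an all-to-all `rechunk` as the shuffle is an assumption, chosen because it matches the chunk shapes in the table below:

```python
# Hypothetical sketch of oom_array.py (the real script may differ).
# Builds an (n, n) array in tall column slabs of shape (n, sqrt_n), then
# rechunks to wide row slabs of shape (sqrt_n, n): every output chunk needs a
# piece of every input chunk, so the rechunk is an all-to-all exchange.
import os

import dask.array as da
from distributed import Client

def main(sqrt_n=64):
    client = Client(os.environ.get('DASK_SCHEDULER_URL', 'localhost:8786'))
    n = sqrt_n ** 2
    x = da.random.random((n, n), chunks=(n, sqrt_n))  # chunk_shape (n, sqrt_n)
    y = x.rechunk((sqrt_n, n))                        # the "shuffle" under test
    print(y.sum().compute())

if __name__ == '__main__':
    main()
```

With sqrt_n=64 this gives a (4096, 4096) array in (4096, 64) chunks of ~2m each, matching the first row below.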
| params | da_bytes | chunk_bytes | chunk_n | chunk_shape | runtime (mm:ss) | success/OOM? |
|---|---|---|---|---|---|---|
| sqrt_n=64 | 128m | 2m | 64 | (4096, 64) | 00:01 | success |
| sqrt_n=96 | 648m | 6.8m | 96 | (9216, 96) | 00:03 | success |
| sqrt_n=112 | 1.2g | 11m | 112 | (12544, 112) | 00:05 | success |
| sqrt_n=120 | 1.5g | 13m | 120 | (14400, 120) | ~00:15 | usually OOM, rare success |
| sqrt_n=128 | 2g | 16m | 128 | (16384, 128) | ~00:10 | OOM |
- Much slower than ddf and array, since bag operations are bottlenecked by python-level execution
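A hedged sketch of the bag variant (`oom_bag.py`); the element type, partition sizing, and the use of `groupby` as the shuffle are assumptions:

```python
# Hypothetical sketch of oom_bag.py (the real script may differ).
# groupby on a bag is a full shuffle: each element is hashed on its key and
# moved to the partition that owns that key, all in pure python.
import os

import dask.bag as db
from distributed import Client

PART_ELEMS = 100000  # elements per partition; assumed, tuned for ~12m partitions

def main(nparts=2):
    client = Client(os.environ.get('DASK_SCHEDULER_URL', 'localhost:8786'))
    b = db.range(nparts * PART_ELEMS, npartitions=nparts)
    grouped = b.groupby(lambda x: x % nparts)  # the shuffle under test
    print(grouped.count().compute())

if __name__ == '__main__':
    main()
```

Shuffling generic python objects through pickle rather than dataframe/array fast paths is consistent with the much longer runtimes below.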
| params | bag_bytes | part_bytes | runtime (hh:mm:ss) | success/OOM? |
|---|---|---|---|---|
| nparts=2 | 25m | 12m | 00:00:08 | success |
| nparts=4 | 49m | 12m | 00:00:14 | success |
| nparts=8 | 98m | 12m | 00:00:24 | success |
| nparts=32 | 394m | 12m | 00:01:53 | success |
| nparts=64 | 787m | 12m | 00:07:46 | success |
| nparts=128 | 1.5g | 12m | 00:17:40 | success |
| nparts=256 | 3.1g | 12m | 00:37:09 | success |
| nparts=512 | 6.2g | 12m | 01:16:30 | OOM (first OOM ~57:00, then ~4 more OOMs before client saw `KilledWorker`) |
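For reference, this is roughly how the final failure surfaces on the client: each OOM kills a worker container, the scheduler reschedules the lost tasks, and after repeated worker deaths `compute()` raises `KilledWorker`. A sketch, assuming the exception class lives at `distributed.scheduler.KilledWorker`:

```python
# Sketch: catching the terminal OOM failure on the client side.
import dask.bag as db
from distributed import Client
from distributed.scheduler import KilledWorker

client = Client('localhost:8786')
b = db.range(512 * 100000, npartitions=512)
try:
    print(b.groupby(lambda x: x % 512).count().compute())
except KilledWorker as exc:
    # Raised only after the scheduler has seen the task's worker die
    # repeatedly and has given up retrying it elsewhere.
    print('shuffle OOMed:', exc)
```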
```bash
# Set up the conda/pip env
conda create -y --name dask-oom --file conda-requirements.txt
source activate dask-oom
pip install -r requirements.txt

# Build the docker image named dask-oom-local, used by docker-compose.yml
# - Rebuild if you change *requirements.txt
# - No rebuild needed if you only change *.py
docker build . -t dask-oom-local

# Launch the 4 workers + 1 scheduler defined in docker-compose.yml
docker-compose up -t0 -d --scale worker=4 --scale scheduler=1

# Run each repro
DASK_SCHEDULER_URL=localhost:8786 python oom_ddf.py cols=... part_rows=... nparts=...
DASK_SCHEDULER_URL=localhost:8786 python oom_array.py sqrt_n=...
DASK_SCHEDULER_URL=localhost:8786 python oom_bag.py nparts=...
```
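The `key=value` arguments and the `DASK_SCHEDULER_URL` convention above suggest each script reads its inputs roughly like this (a sketch; the actual parsing is an assumption):

```python
# Hypothetical arg handling shared by the oom_*.py scripts (assumed, not the
# scripts' actual code): key=value pairs from argv, scheduler address from env.
import os
import sys

def parse_kwargs(argv):
    # ['cols=10', 'part_rows=157500'] -> {'cols': 10, 'part_rows': 157500}
    return {k: int(v) for k, v in (arg.split('=', 1) for arg in argv)}

if __name__ == '__main__':
    kwargs = parse_kwargs(sys.argv[1:])
    scheduler_url = os.environ.get('DASK_SCHEDULER_URL', 'localhost:8786')
    print(scheduler_url, kwargs)
```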
```bash
# Restart the workers, leaving the scheduler as is
docker-compose up -t0 -d --scale worker=0 --scale scheduler=1 && docker-compose up -t0 -d --scale worker=4 --scale scheduler=1

# Restart workers + scheduler, in case scheduler state gets dirty
docker-compose up -t0 -d --scale worker=0 --scale scheduler=0 && docker-compose up -t0 -d --scale worker=4 --scale scheduler=1

# Tail logs of the containers launched from docker-compose.yml
docker-compose logs -f

# top for containers, to monitor per-container resource usage
ctop

# List/kill running docker containers
docker ps
docker kill ...
```