OOMs on seemingly simple shuffle job: mem usage greatly exceeds --memory-limit #2456
First, thank you for the detailed writeup. As for the core cause, my first guess is that this is due to inaccurate accounting between estimating the size of the pandas dataframes and determining their true cost in memory. This often happens when we have custom objects that are large but don't implement the `__sizeof__` protocol accurately, so their in-memory size gets underestimated.
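As a rough illustration of that sizeof point (a sketch, not code from this thread; the `Blob` class and the wrapper-overhead constant are hypothetical), Dask's size estimation can be taught about a custom type by registering an estimator:

```python
from dask.sizeof import sizeof  # Dask's size-estimation dispatcher

class Blob:
    """Hypothetical wrapper around a large bytes payload."""
    def __init__(self, payload: bytes):
        self.payload = payload

@sizeof.register(Blob)
def sizeof_blob(obj):
    # Report the payload's real size so worker memory accounting
    # (and spill-to-disk decisions) see a realistic number.
    return sizeof(obj.payload) + 64  # assumed rough wrapper overhead
```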
This suggests a different possible answer. The second half of the job tends to be communication heavy, so maybe most of our memory use isn't in storing data, but rather in bytes that are in flight. Dask doesn't account for bytes in active use by user code or the network layer; these bytes are much harder for us to control.

Questions
Answer 3
Hmm, no good reason—thanks for the heads up! I've switched to:

DASK_COMPRESSION=lz4 dask-worker --nprocs 1 --nthreads 1 --memory-limit=4e9 --no-nanny <scheduler-url>

I also went ahead and upgraded dask/distributed/fastparquet to the latest masters, at the risk of changing my repros in subtle ways. Still seeing OOMs, but fwiw the job seems to be failing faster than before (based just on the 2 trials below).

$ python --version
Python 3.6.0
$ cat requirements.txt | egrep 'dask|distributed|fastparquet'
git+https://github.com/dask/dask.git@d0aa50a
git+https://github.com/dask/distributed.git@312a41a
git+https://github.com/dask/[email protected]

Answer 1
All text, numbers, and timestamps, and no custom python objects. Here's the distribution of types per column (206 columns total), in terms of the count of columns comprised of a given set of types (there are only 8 such sets):

>>> df['types'].value_counts()
NoneType, str 100
float 67
NoneType, list[], list[str] 16
str 9
NaTType, Timestamp 9
Timestamp 3
list[], list[str] 2
int 1
Name: types, dtype: int64

And here's a more detailed unpacking of each column with its mean size per value:

>>> df.sort_values('mean_size', ascending=False)
mean_size types
col_130 500.707 NoneType, str
col_129 161.355 NoneType, str
col_45 149.221 NoneType, str
col_27 104.002 NaTType, Timestamp
col_102 104.002 Timestamp
col_122 104.002 NaTType, Timestamp
col_54 104.002 NaTType, Timestamp
col_120 104.002 NaTType, Timestamp
col_53 104.002 NaTType, Timestamp
col_127 104.002 NaTType, Timestamp
col_88 104.002 Timestamp
col_173 104.002 NaTType, Timestamp
col_110 104.002 NaTType, Timestamp
col_109 104.002 Timestamp
col_32 104.002 NaTType, Timestamp
col_82 98.263 NoneType, list[], list[str]
col_111 93.002 str
col_2 93.002 str
col_157 82.367 NoneType, str
col_144 78.460 list[], list[str]
...
col_66 32.002 float
col_163 32.002 float
col_160 32.002 float
col_158 32.002 float
col_63 32.002 float
col_75 32.002 float
col_65 32.002 float
col_103 32.002 float
col_152 32.002 float
col_150 32.002 float
col_68 32.002 float
col_69 32.002 float
col_159 30.155 NoneType, str
col_162 29.534 NoneType, str
col_200 28.546 NoneType, str
col_161 28.368 NoneType, str
col_164 27.727 NoneType, str
col_180 27.037 NoneType, str
col_183 24.971 NoneType, str
col_175 24.002 NoneType, str

Answer 2
Hmm, I ran two more trials and collected some (partial) data from some of the workers …
Questions
FYI I am out this week with limited connectivity. I may not respond to this for a week. My apologies for the lack of response. (These are great figures by the way and I'm sure they will be of great use shortly.)

On Tue, Jun 20, 2017 at 3:23 AM, Dan Brown wrote:
Another datapoint, which also answers my 3rd question:

- Got no OOMs by doubling mem and ~halving workers (meant to get 8 but only got 7)

params: 238 parts in, 128 parts out, 4g mem limit, 16g mem, 7 workers
outcome: 0 OOMs, 28m, success
task counts: [tasks 4g 128 x] https://user-images.githubusercontent.com/627486/27321167-069a6234-554e-11e7-9e6e-f9540fea6cc5.png
task aftermath: [dask 4g 128 x] https://user-images.githubusercontent.com/627486/27321221-3e1ce1e6-554e-11e7-9f2e-c16879deb331.png
sys metrics: [datadog 4g 128 x] https://user-images.githubusercontent.com/627486/27321168-069a87e6-554e-11e7-8fe8-39e4b915b434.png
Another unsuccessful approach at solving the "4g mem limit, 8g mem, 16 workers" case: …
Hey, sorry for the delay in getting to this. It dropped off of my radar in the confusion of vacation two weeks ago.

The fact that the worker-memory plot (upper left) is red says that Dask knows that its workers are out of memory. The workers are actively sending things to disk; they're just not sending things fast enough to avoid OOM-ing. We should probably have a separate coroutine running that tracks used memory using psutil.

Something that would be interesting to try is a shuffle on purely numeric data:

import dask.array as da
import dask.dataframe as dd
x = da.random.randint(0, 100000, size=(N, 10), chunks=(100000, 10))
df = dd.from_dask_array(x)
df2 = df.set_index(0).persist()

I would set …
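Returning to the memory-tracking idea mentioned above: a minimal sketch of such a coroutine, assuming plain asyncio plus psutil rather than actual distributed internals:

```python
import asyncio
import psutil

async def watch_memory(limit_bytes: int, interval: float = 0.2):
    """Periodically compare this process's RSS against a limit.

    Sketch only; in distributed itself this logic would live in the worker,
    which could pause work or spill more aggressively when over the limit.
    """
    proc = psutil.Process()
    while True:
        rss = proc.memory_info().rss
        if rss > limit_bytes:
            print(f"RSS {rss / 1e9:.2f} GB exceeds limit {limit_bytes / 1e9:.2f} GB")
        await asyncio.sleep(interval)
```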
@jdanbrown Indeed, if you run workers as replicas in Kubernetes, each of the workers by default gets the host stats. In my case I run 10 workers, and I get my HOST RAM * 10, which is wrong. My current understanding is that dask should support docker metrics (https://docs.docker.com/engine/admin/runmetrics/), i.e. use the Docker API to pull the resource consumption of the container, not the host. But I haven't had time to implement this yet.
psutil uses standard system tools, which get the wrong answers within a container.
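As a sketch of the container-aware alternative being discussed (the cgroup v1 path is an assumption about a typical Docker/Kubernetes setup, not something dask does here out of the box):

```python
import psutil

def container_memory_limit() -> int:
    """Return the memory limit visible to this container, in bytes.

    Prefers the cgroup v1 limit file that Docker/Kubernetes usually mount,
    falling back to psutil's host-wide total when no limit is set.
    """
    try:
        with open("/sys/fs/cgroup/memory/memory.limit_in_bytes") as f:
            limit = int(f.read())
        if limit < 1 << 60:  # an enormous value means "no limit configured"
            return limit
    except OSError:
        pass
    return psutil.virtual_memory().total
```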
(note that dask-kubernetes always passes …)
@mrocklin No worries, and sorry for the delay on my end as well—been busy juggling multiple projects.
# Pick params as per table above
cols = ...
part_rows = ...
nparts = ...
import dask.dataframe as dd
import distributed
import numpy as np   # used below for np.random.randint
import pandas as pd  # used below for pd.DataFrame
from dask import delayed
client = distributed.Client(...)
# Force workers to spill to disk for every key write
# - Code ref: distributed/worker.py:119 -> zict/buffer.py
def worker_disable_mem_buffer(worker):
if hasattr(worker.data, 'slow'):
worker._data_orig = worker.data
worker.data = worker.data.slow
return str(worker.data)
print(client.run(lambda: [
worker_disable_mem_buffer(worker()) for worker in distributed.worker._global_workers
]))
# Synthesize a ddf of random ints
max_int = 1024**2
ddf = dd.from_delayed(meta={col: 'int64' for col in range(cols)}, dfs=[
delayed(lambda: pd.DataFrame({
col: np.random.randint(0, max_int, part_rows)
for col in range(cols)
}))()
for _ in range(nparts)
])
# Compute a shuffle on a column of random ints
# - Specify divisions so (a) we only compute once and (b) we ensure balanced tasks
divisions = {
n: list(range(0, max_int, max_int // n)) + [max_int]
for n in [2**i for i in range(13)]
}
ddf2 = ddf.set_index(0, divisions=divisions[nparts])
print(ddf2.count().compute())
@martindurant Yep, in my k8s docker container …
@mrocklin Any thoughts on my most recent comment above (#2456 (comment))? It seems to indicate this is a task-shuffle issue and not a sizeof issue.

More generally, my team is looking for ways to safely shuffle datasets that are bigger than total worker ram. Our main pipeline is an iterative model fit on training data that easily partitions across workers' ram and works great with dask, but to get that training data we have to go through a series of more traditional ETL munging steps that require shuffles (…).

Any pointers or suggestions? Are we too far off the happy path of well-trodden use cases? We'd love to be able to make dask a go-to tool for our broader data team.
Sorry for the lack of response on this. I'll try to take a look at this either Tuesday or Wednesday this week.
Looking at this again, it looks like you may be running into issues similar to @bluenote10. You might find the following issues and PRs of interest:
Those issues point to a possible narrative that Pandas and the Python garbage collector sometimes leave data lying around for longer than they should. Periodically calling gc.collect() can help in those cases.

Another possibility is that the data that is in flight is large enough to push things over the edge. Dask isn't able to account for this data. I wouldn't expect it to be that large, though.
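One cheap way to test the garbage-collection theory (a sketch, assuming an already-connected `client`) is to trigger a collection on every worker and see whether reported memory drops:

```python
import gc

# client.run returns a dict keyed by worker address; each value is the number
# of unreachable objects gc.collect() found on that worker.
freed = client.run(gc.collect)
print(freed)
```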
Hmm, I think this issue is separate from all 4 of the issues you linked, since I was able to remove …

Here's a summary of the thread above, since it's a long one:
Thanks @mrocklin!
I don't think this has to do with psutil reporting wrong metrics. I think both of the explanations above remain possible causes. The discussion on zict shows that pandas can leave some data lying around. My early attempt to run your example above behaved fine (stayed well below memory limits), but I'll try other parameters. This may not happen today though; I'm finishing up some vacation time and flying home today.
You might try an equivalent computation with dask.array. A rechunk operation has the same communication pattern as a dask.dataframe shuffle. However, rather than using pandas it will use numpy, which tends to behave a little nicer. This would help to isolate the problem between dask bookkeeping and pandas+python bookkeeping:
import dask.array as da

# Same communication pattern as a dataframe shuffle, but numpy-backed.
# N not defined here; total size scales as N**2 * 8 bytes (float64).
x = da.random.random((N, N), chunks=(N, 1))
y = x.rechunk((1, N)).persist()
I'm starting to think that there may be multiple effects involved. Note that the GC issues are mainly related to …

However, the GC issue alone also does not fully explain the behavior. I'm running with a patched version that performs explicit GC, which has lowered the memory usage on the workers significantly. However, it is still not enough to fully solve the problem. The following plots show RSS over time for 75 workers using a 2 GB memory_limit.

The majority of the workers are at around ~2 * memory_limit for most of the time. However, some individual workers show very sudden jumps in their memory usage, which eventually makes them go OOM.

The algorithm performs the same logic in a loop (~15 minutes), so nothing really changes over time. Chunk sizes are below 50 MB, no rebalancing, no workers added/removed -- so it's hard to explain the sudden jumps/ramps in the memory usage. For comparison, many workers do behave as expected, showing a memory profile like this: …
@bluenote10 are you able to correlate the jumps with any particular event occurring in the worker logs? Do you have any suspicions about what causes these jumps?
@bluenote10 I'm also curious to know if anything changes if you disable work stealing. I've identified a small leak there. I wouldn't expect it to result in sharp spikes like what you're seeing here, but it would be interesting to see if there is any difference. See dask/distributed#1325 for how to disable work stealing (either by using the config option or just deleting the right line of code).
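For reference, in current distributed releases the work-stealing switch looks roughly like this; the config key is from today's versions and may not match the 2017-era config file discussed in dask/distributed#1325:

```python
import dask

# Must be set before the scheduler starts for it to take effect.
dask.config.set({"distributed.scheduler.work-stealing": False})
```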
@mrocklin I haven't had time to test your dask.array hypothesis yet, but I'm very curious to. In the meantime, what's the setup you used to try my dask.dataframe repro? Above you said:

> My early attempt to run your example above behaved fine (stayed well below memory limits) but I'll try other parameters.

and I'm curious to see if I can repro your good behavior on my end.
@mrocklin Check it out—I now have an OOM repro for array, bag, and dataframe:

- All three OOM when total data volume roughly approaches or exceeds total worker ram, like my various ddf repros above
- I dockerized everything so it's hopefully easy to repro on your end
- I packaged it up in a repo: https://github.com/jdanbrown/dask-oom

Eager to hear your thoughts! Here's the summary (copied from the repo's README):

Experimental setup

- Latest stable dask+distributed versions (https://github.com/jdanbrown/dask-oom/blob/master/requirements.txt):
  - dask==0.15.2
  - distributed==1.18.3
- Use local docker containers to make a reproducible and portable distributed environment
- Worker setup (https://github.com/jdanbrown/dask-oom/blob/master/docker-compose.yml):
  - 4 workers @ 1g mem + no swap (4g total worker ram)
  - Default --memory-limit (each worker reports 0.584g)
  - Limited to 1 concurrent task per worker, to minimize mem contention and oom risk
- For each of ddf, dask array, and dask bag:
  - Run a simple shuffle operation and test whether the operation succeeds or OOMs
  - Test against increasing data volumes, from much smaller than total worker ram to larger than total worker ram
  - Try to keep partition sizes below ~10-15m, to control for oom risk from large partitions

Results

oom_ddf.py (https://github.com/jdanbrown/dask-oom/blob/master/oom_ddf.py)

| params | ddf_bytes | part_bytes | runtime | success/OOM? |
| --- | --- | --- | --- | --- |
| cols=10 part_rows=157500 nparts=64 | .75g | 12m | 00:08 | success |
| cols=10 part_rows=157500 nparts=128 | 1.5g | 12m | ~00:20 | usually OOM, sometimes success |
| cols=10 part_rows=157500 nparts=256 | 3g | 12m | ~00:20 | OOM |
| cols=10 part_rows=157500 nparts=512 | 6g | 12m | ~00:30 | OOM |

oom_array.py (https://github.com/jdanbrown/dask-oom/blob/master/oom_array.py)

| params | da_bytes | chunk_bytes | chunk_n | chunk_shape | runtime | success/OOM? |
| --- | --- | --- | --- | --- | --- | --- |
| sqrt_n=64 | 128m | 2m | 64 | (4096, 64) | 00:01 | success |
| sqrt_n=96 | 648m | 6.8m | 96 | (9216, 96) | 00:03 | success |
| sqrt_n=112 | 1.2g | 11m | 112 | (12544, 112) | 00:05 | success |
| sqrt_n=120 | 1.5g | 13m | 120 | (14400, 120) | ~00:15 | usually OOM, rare success |
| sqrt_n=128 | 2g | 16m | 128 | (16384, 128) | ~00:10 | OOM |

oom_bag.py (https://github.com/jdanbrown/dask-oom/blob/master/oom_bag.py)

- Much slower than ddf and array, since bag operations are bottlenecked by more python execution

| params | bag_bytes | part_bytes | runtime | success/OOM? |
| --- | --- | --- | --- | --- |
| nparts=2 | 25m | 12m | 00:00:08 | success |
| nparts=4 | 49m | 12m | 00:00:14 | success |
| nparts=8 | 98m | 12m | 00:00:24 | success |
| nparts=32 | 394m | 12m | 00:01:53 | success |
| nparts=64 | 787m | 12m | 00:07:46 | success |
| nparts=128 | 1.5g | 12m | 00:17:40 | success |
| nparts=256 | 3.1g | 12m | 00:37:09 | success |
| nparts=512 | 6.2g | 12m | 01:16:30 | OOM (first OOM ~57:00, then ~4 more OOMs before client saw KilledWorker) |
Cool. I really appreciate the work here. I'm looking forward to taking a look at this. As a disclaimer I'm decently slammed this week but hope to have some time next week.
Note that there has been some good work recently in avoiding memory leaks when splitting many small pandas dataframes. You might want to try out the master branch.
I downloaded and reran …
If you have time to investigate this @Spacerat, that would be most welcome.
Although I would like to, unfortunately I do not have enough time right now; for us it was quicker to just port our problematic pipeline to use Go & some command-line tools. (Dask was probably not the right fit anyway.)
Having the same issues on dask version 2.1.0. Any plans to fix this issue?
I suspect that the comments above summarize the current state of things. Is this something that you wanted to volunteer for, @denisvlah?
For those still encountering this issue, an interesting exercise may be to try Dask + Distributed 2.21.0 on Python 3.8 (or an earlier version with …).

It's worth noting this may not fix the problem completely. There may still be other issues here. However, it would be interesting to see what issues remain, what those issues look like, and what impact they have.
This issue seems pretty old, but I think I've experienced the same behaviour (OOM) with the latest version of dask (2021.02.1) while trying to perform a set_index on a column from an input dataset. I've used the MemorySampler to see what the memory usage looks like (in blue: only one worker; no errors). I'm not sure why, but it seems that I do not have issues with dask 2.22.0.
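For context, MemorySampler usage in recent distributed releases looks roughly like the sketch below; the scheduler address, input path, and column name are placeholders rather than values from this report:

```python
from distributed import Client, wait
from distributed.diagnostics import MemorySampler
import dask.dataframe as dd

client = Client("<scheduler-url>")       # placeholder address
ddf = dd.read_parquet("<input-path>")    # placeholder input dataset

ms = MemorySampler()
with ms.sample("set_index"):             # record cluster-wide memory during the shuffle
    df2 = ddf.set_index("<column>").persist()
    wait(df2)                            # keep sampling until the shuffle finishes
ms.plot()                                # plot the sampled memory timeline
```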
@odovad Can you please provide a minimal reproducer for your example? Thank you!
Summary

- Mem usage stays near --memory-limit for the first ~1/2 of the job and then shows large variance for the second ~1/2 of the job, during which time OOMs start happening (plots below).
- I tried halving --memory-limit (4gb/8gb -> 2gb/8gb) and I did observe many fewer OOMs, but still 1 OOM, and moreover 2gb/8gb impedes our ability to persist data later in this pipeline for an iterative ML algo, so this isn't a feasible solution.
- Is there anything else I can do to make mem usage respect --memory-limit? Or maybe I'm just making a dumb user error somewhere that's easy to fix?

Setup
- Workers: DASK_COMPRESSION=zlib dask-worker --nprocs 1 --nthreads 1 --memory-limit=4e9 --no-nanny <scheduler-url>
- 238 parts in
- 128 parts out
Trials

- num_parts_out=128 with --memory-limit=4e9, which fails a lot of the time but actually succeeded twice with many OOMs and long runtimes
- Tried num_parts_out=512, but saw a similar frequency of OOMs and killed the job
- Tried --memory-limit=2e9 but still saw 1 OOM (and thus some amount of repeated work)

| params | outcome |
| --- | --- |
| 128 parts out, 4g mem limit | 111m, success |
| 128 parts out, 4g mem limit | 47m, success |
| 512 parts out, 4g mem limit | gave up early |
| 128 parts out, 2g mem limit | 56m, success |
Versions