
Optimizing reduction #331

Closed
Tracked by #339
tomwhite opened this issue Nov 27, 2023 · 4 comments
Labels: help wanted, optimization

Comments

@tomwhite
Member

The way that reduction currently works is illustrated here: https://tom-e-white.com/cubed/operations.html#reduction-and-arg-reduction. To summarize, each chunk in the input has a reduce function applied to it (with one task per chunk), and the results are written to a new Zarr file, which then has merge_chunks called on it to combine them into new chunks. This procedure is repeated until there is a single chunk in the reduction axis.
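
A minimal Python sketch of that round structure (hypothetical helper names, not Cubed's actual API; `merge_width` stands in for "as many partial results as fit in memory at once"):

```python
import numpy as np

def reduce_rounds(chunks, reduce_func, merge_width=4):
    """Round-based reduction: reduce each chunk, merge partials, repeat."""
    # Repeat until there is a single chunk in the reduction axis.
    while len(chunks) > 1:
        # One task per chunk: apply the reduce function to each chunk on its own
        # (in Cubed the partial results would be written to a new Zarr array).
        partials = [reduce_func(c) for c in chunks]
        # merge_chunks step: combine neighbouring partial results into larger
        # chunks, limited by how many fit in memory at once.
        chunks = [np.stack(partials[i:i + merge_width])
                  for i in range(0, len(partials), merge_width)]
    return reduce_func(chunks[0])

# e.g. reduce_rounds([np.arange(10)] * 16, np.sum) == 16 * 45 == 720
```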

As suggested in #284, we could do much better by having one task process more than one chunk at once. This actually happens during subsequent rounds after the merge_chunks operation, but it is inefficient there: since all the chunks are loaded into memory at once, the number of chunks that can be merged is limited by the available memory. If instead a task read chunks sequentially, calling the reduce function on each before reading the next, then the number of chunks processed by that task would not be limited by memory. This would also make the number of chunks loaded by each task the same, which would make rounds of the reduction more uniform, and hence more predictable. In practice it would allow more chunks to be loaded per task, which would mean fewer rounds and less data transferred. (It would be interesting to quantify this.)
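
A rough sketch of what such a task might do (names like `read_chunk` are hypothetical, not Cubed's API):

```python
import numpy as np

def streaming_partial_reduce(chunk_keys, read_chunk, reduce_func, combine_func):
    """Reduce many chunks in one task, holding only one chunk plus the accumulator."""
    acc = None
    for key in chunk_keys:               # can be far more chunks than fit in memory
        chunk = read_chunk(key)          # load a single chunk
        partial = reduce_func(chunk)     # reduce it before reading the next one
        acc = partial if acc is None else combine_func(acc, partial)
    return acc                           # one partial result per task, written out once

# Example: sum 1000 chunks in a single task with bounded memory.
# total = streaming_partial_reduce(range(1000), lambda k: np.arange(10), np.sum, np.add)
```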

This change would make the reduction algorithm more similar to tree reduce in Dask.

It would be worth comparing the performance of this change against the existing reduction algorithm in Cubed for matrix multiplication and Quadratic Means.

@TomNicholas
Collaborator

This sounds interesting :)

> As suggested in #284, we could do much better by having one task process more than one chunk at once.

> If instead a task read chunks sequentially and called the reduce function before reading the next one, then the number of chunks processed by that task would not be limited by memory.

Would we be sacrificing some parallelism to do this?

@TomNicholas
Collaborator

TomNicholas commented Dec 11, 2023

Some notes from meeting with @tomwhite (and @saulshanabrook) today, in which I clarify my understanding:

  1. This suggestion is basically a tree_reduce for reductions

We're suggesting that reductions be implemented using two distinct types of reduce step. Type (a) loads as many chunks as will fit in RAM, aggregates them, writes the results to disk, then re-opens more than one of the new, smaller chunks in a new container and repeats. The other type, (b), opens more chunks than will fit in RAM, loads one at a time, and sequentially aggregates each one as it is loaded. (Both of these have the bounded-memory property.)

A reduction which uses two types of aggregation in alternating sequence like this is a "tree-reduce" (implemented in dask as dask.array._tree_reduce). Cubed doesn't currently have a tree-reduce primitive, but adding one might also simplify integration with flox (see xarray-contrib/flox#224) (cc @dcherian).
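
To make the two step types concrete, here is an illustrative sketch (not how dask or Cubed implement it) of a tree reduce over already-materialised partial results: within each task, up to `fan_in` inputs are aggregated sequentially (type b); each round of tasks runs in parallel, with its outputs persisted between rounds (type a):

```python
import numpy as np

def tree_reduce(partials, combine_func, fan_in=8):
    """Repeated rounds of fan-in aggregation until a single result remains."""
    while len(partials) > 1:
        next_round = []
        for i in range(0, len(partials), fan_in):   # each slice is one task's work
            group = partials[i:i + fan_in]
            acc = group[0]
            for p in group[1:]:                     # type (b): sequential, bounded memory
                acc = combine_func(acc, p)
            next_round.append(acc)                  # type (a): persisted between rounds
        partials = next_round
    return partials[0]

# e.g. tree_reduce([np.float64(x) for x in range(100)], np.add, fan_in=10) == 4950.0
```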

  2. There is a trade-off to optimize

Each time the type (a) reduction above is used it is totally embarrassingly parallel, but it incurs writing to and reading from persistent storage once. Each time the type (b) reduction is used its runtime is linear in the number of chunks, but it does not require persistent storage (so, to answer my question above, this operation does sacrifice parallelism).

Performing the entire reduction with only (b) would not be parallel at all. Performing it only with (a) misses opportunities to fuse (see #284) and therefore remove rounds of writing to persistent storage. Another way of saying this is that there is some number of chunks N where sequentially opening and aggregating the N chunks in serial is faster than the IO cost of another round of writing to and reading all N chunks from persistent storage in parallel.

Therefore there is some optimal "elbow" in the plot of time taken vs how much of the overall reduction is performed using (a) vs (b).
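
As a back-of-envelope illustration of that elbow (all timings below are made-up assumptions): streaming N chunks in serial inside one task costs roughly N * (t_read + t_compute), while splitting them across an extra parallel round adds a roughly fixed overhead of writing and re-reading the partial results and spinning up new workers:

```python
t_read = 0.5      # seconds to read one chunk (assumption)
t_compute = 0.1   # seconds to reduce one chunk (assumption)
t_round = 20.0    # seconds of fixed overhead for an extra round: write + re-read + new workers (assumption)

# Streaming wins while N * (t_read + t_compute) < t_round, i.e. below the break-even fan-in.
break_even = t_round / (t_read + t_compute)
print(f"streaming wins below ~{break_even:.0f} chunks per task")  # ~33 with these numbers
```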

  3. The Primula paper discusses this exact trade-off

For a given type of aggregation, the optimal number of chunks N to be loaded sequentially by each task (the "optimal fan-in parameter") should only be a function of the CPU processing speed, the time taken to complete a round of IO (i.e. the IO throughput speed and the time taken to obtain a new serverless worker), and possibly the chunk size. As long as these are all known before the step of the computation starts, we can just set it to the optimal value.

In the Primula paper (see #326) they do an equivalent analysis in the case of a serverless full shuffle (with the aggregation being a sort operation). They have a simple conceptual model for estimating what the optimal value of N should be, which we might be able to re-use.

In their case they find that optimizing N reduces overall running time by about a factor of 2 (assuming I'm reading figure 2 correctly). Interestingly, figure 2 seems to imply that their predictive model of N actually underestimated the speedup that they observed empirically.

@tomwhite
Member Author

@TomNicholas thanks for writing up the discussion!

@tomwhite
Member Author

This was implemented in #368 and made the default in #412. I've opened #418 for improving the default parameters (the trade-off discussed above).
