Changing default allocation size in channel/mod.rs #346

Open
ryzhyk opened this issue Dec 11, 2020 · 6 comments

ryzhyk (Contributor) commented Dec 11, 2020

I am creating a large dataflow graph with ~10000 nodes. It works great, except that I noticed that the static memory footprint of the program (before I feed any data to it) keeps growing as the graph gets bigger. I think it is currently on the order of 100MB. This does not sound too bad, but my application creates thousands of instances of the dataflow, at which point this overhead becomes significant, and in fact dominates the memory footprint of the program. It appears that the main contributors are the buffers of size Message::default() (currently equal to 1024), pre-allocated for all channels. Is there a way to change this default without forking the repo?

frankmcsherry (Member):

There is not, but we could certainly look into something smarter. Do you have a repro that is easy to share, or another way on hand to assess what effect a change would have? The "natural" thing to do would be to start small and scale up by 2x to some limit, which I've coded up before but never had the examples to test the trade-offs.
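
A minimal sketch of that doubling schedule, just to pin down the idea (the starting size and function name are illustrative, not anything in timely today):

```rust
// Illustrative only: start each channel buffer small and double its target length
// every time it fills, capping at the current fixed default of 1024.
fn next_buffer_len(current: usize) -> usize {
    const INITIAL_LEN: usize = 16;   // illustrative starting point
    const MAXIMUM_LEN: usize = 1024; // today's hard-coded default
    if current == 0 {
        INITIAL_LEN
    } else {
        (current * 2).min(MAXIMUM_LEN)
    }
}
```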

ryzhyk (Contributor, author) commented Dec 11, 2020

I don't have a simple repro. I'm just looking at the heap profile generated by massif. I am going to create a fork of timely and differential and play with different values of this constant to evaluate the memory/performance tradeoff. I will share the findings here.

frankmcsherry (Member) commented Dec 11, 2020

Some more thoughts (trying out a new default is a good idea, btw):

  1. The memory costs should be proportional to (the number of edges) and (the number of workers squared). In Naiad, we saw this as a problem because we were aimed at very high worker counts (because back in the day, that was the exciting property to have). In the limit of increasing workers, having a per-(worker x worker) queue is not a great design; a better one would be to have one outbound queue per worker and then e.g. sort by destination worker.

    Do you have increasing numbers of workers? Or pretty boring small numbers of workers?

  2. I'm not sure what you mean by "thousands of instances of the dataflow", but my immediate reaction is that you might want to multiplex the thousand instances over a single dataflow, where the data are differentiated by a field/column indicating which of the thousand instances that record derives from. Joins and such should include this identifier column to avoid logical interference between instances. This would allow you to amortize the fixed costs of the dataflows across the thousand logical instances (see the sketch after this list).

    Does it seem likely that your thousands of logical instances could use the same physical dataflow, or was that wishful thinking / intentional mis-interpretation of your comment? :D
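
To make the multiplexing concrete, here is a hedged sketch using differential dataflow's public input/join operators: the tenant id is just the first component of every key, so joins never mix records from different logical instances. All names, types, and values are placeholders rather than anything from your program.

```rust
use differential_dataflow::input::Input;
use differential_dataflow::operators::Join;

type TenantId = u64;

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        let (mut left_in, mut right_in) = worker.dataflow::<u32, _, _>(|scope| {
            // Both collections are keyed by (tenant, key); one physical dataflow
            // (and one set of channel buffers) serves every logical instance.
            let (left_in, left) = scope.new_collection::<((TenantId, u32), String), isize>();
            let (right_in, right) = scope.new_collection::<((TenantId, u32), String), isize>();

            left.join(&right)
                .inspect(|(((tenant, key), (l, r)), _time, _diff)| {
                    println!("tenant {}: key {} -> ({}, {})", tenant, key, l, r);
                });

            (left_in, right_in)
        });

        // Records carry their tenant id as part of the key.
        left_in.insert(((7, 1), "left".to_string()));
        right_in.insert(((7, 1), "right".to_string()));
    })
    .unwrap();
}
```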

ryzhyk (Contributor, author) commented Dec 11, 2020

Thanks, @frankmcsherry!

  1. Only one worker, but multiplied by thousands of instances.

  2. Your interpretation is correct: we could definitely organize the whole thing as one dataflow with an extra field in each collection to identify the tenant the record belongs to. The main reason we're not doing that is scalability. Our app currently scales near-linearly with the number of cores, as dataflow instances run in a share-nothing fashion. Note that partitioning dataflows into per-core groups wouldn't necessarily achieve the same, due to unpredictable and bursty load patterns.

    Security and simplicity are secondary reasons: as long as each tenant has their own dataflow, we don't have to worry about accidentally sending data across tenants, e.g., by forgetting to specify the tenant id field in a rule. In fact, we are even considering running each dataflow instance in a separate process for added security.


I've so far only confirmed that reducing the parameter to 128 does reduce our memory footprint significantly (at least according to valgrind --tool=massif). I will do more experiments today. What sort of performance impact would you expect this to have?

Also, regarding the plan to grow the buffer on-demand, I wonder if it should also shrink when unused to avoid gradually running out of memory over time.

frankmcsherry (Member):

The Naiad take on things was that when an operator was not in use it would de-allocate its send buffers. This is worse from a steady state operation point of view, but cuts the standing memory costs of inert dataflows. So, plausibly the buffers grow geometrically each time they fill, and then as part of flushing the channel are wholly de-allocated (starting the geometric growth from scratch the next time the operator is scheduled).
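
As a rough sketch of that policy (hypothetical names, not timely's actual channel internals):

```rust
// Hypothetical sketch of the policy described above: the buffer's target length
// grows geometrically while the operator is busy, and the allocation is released
// entirely when the channel is flushed.
struct SendBuffer<T> {
    buffer: Vec<T>,
    target: usize,     // length at which the current batch is shipped
    max_target: usize, // ceiling, e.g. today's default of 1024
}

impl<T> SendBuffer<T> {
    fn new(max_target: usize) -> Self {
        SendBuffer { buffer: Vec::new(), target: 1, max_target }
    }

    /// Buffer an element; returns a batch to send when the target length is reached.
    fn push(&mut self, item: T) -> Option<Vec<T>> {
        if self.buffer.capacity() == 0 {
            self.buffer.reserve(self.target);
        }
        self.buffer.push(item);
        if self.buffer.len() >= self.target {
            // Double the target for the next batch, up to the ceiling.
            self.target = (self.target * 2).min(self.max_target);
            Some(std::mem::take(&mut self.buffer)) // leaves an unallocated Vec behind
        } else {
            None
        }
    }

    /// Flush at the end of scheduling: ship what is pending and drop the allocation,
    /// so an inert dataflow holds no standing send buffers.
    fn flush(&mut self) -> Option<Vec<T>> {
        self.target = 1;
        if self.buffer.is_empty() {
            self.buffer = Vec::new(); // release any reserved capacity
            None
        } else {
            Some(std::mem::take(&mut self.buffer))
        }
    }
}
```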

But, I need to look into things and see which of these signals are clearest to send.

ryzhyk (Contributor, author) commented Dec 11, 2020

I did some more measurements. I see no performance degradation with a default buffer size of 128. This is probably very specific to my use case, which tends to deal with small batches of large objects. While an adaptive buffer size would be awesome, it looks like the ability to set a fixed but user-defined buffer size through the API would do the trick for me. For the time being, I am using a private fork of the repo where I changed the constant to 128.
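
For reference, a purely hypothetical sketch of what such a knob could look like; nothing like this exists in timely today, which is the point of this issue:

```rust
// Hypothetical only: a configuration struct that channel construction could read
// instead of a hard-coded constant. The names are invented for illustration.
pub struct ChannelConfig {
    /// Number of records a channel buffer holds before it is flushed.
    pub default_buffer_len: usize,
}

impl Default for ChannelConfig {
    fn default() -> Self {
        // Today's behavior: a fixed 1024-element buffer per channel.
        ChannelConfig { default_buffer_len: 1024 }
    }
}

// A channel would size its send buffer from the config rather than the constant.
fn new_send_buffer<T>(config: &ChannelConfig) -> Vec<T> {
    Vec::with_capacity(config.default_buffer_len)
}
```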
