fix: Incorrect memory accounting in `array_agg` function #16519
Conversation
Force-pushed from d4a69ce to 1da3e04
```rust
    // The ArrayRef might be holding a reference to its original input buffer, so
    // storing a copied/compacted version here avoids over-accounting memory that
    // is not actually used.
    self.values
        .push(make_array(copy_array_data(&values.to_data())));
}
```
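The buffer-sharing behavior this change works around can be sketched with plain `Arc`s. This is a minimal illustration, not Arrow/DataFusion code: `SliceView`, `retained_bytes`, and `compact` are hypothetical names standing in for a zero-copy array slice, the memory it pins, and `copy_array_data` respectively.

```rust
use std::sync::Arc;

// A toy stand-in for Arrow's zero-copy slicing: a "slice" holds an Arc to the
// whole parent buffer plus an offset/length, so the parent allocation stays
// alive for as long as any slice does.
struct SliceView {
    buffer: Arc<Vec<u8>>,
    offset: usize,
    len: usize,
}

impl SliceView {
    // Memory actually retained by holding this view: the whole shared buffer.
    fn retained_bytes(&self) -> usize {
        self.buffer.len()
    }

    // Compacting copies just the visible bytes into a fresh allocation,
    // analogous to what copy_array_data does for ArrayData.
    fn compact(&self) -> SliceView {
        let data = self.buffer[self.offset..self.offset + self.len].to_vec();
        SliceView { buffer: Arc::new(data), offset: 0, len: self.len }
    }
}

fn main() {
    let parent = Arc::new(vec![0u8; 1_000_000]); // a large input batch
    let one_row = SliceView { buffer: Arc::clone(&parent), offset: 0, len: 8 };
    // The un-compacted slice pins the full parent allocation...
    assert_eq!(one_row.retained_bytes(), 1_000_000);
    // ...while the compacted copy retains only what it uses.
    assert_eq!(one_row.compact().retained_bytes(), 8);
    println!("ok");
}
```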
🤔 I'm not sure this will solve the issue. Keep in mind that the `merge_batch` method receives the states of other accumulators, which already hold "compacted" data, so I'd expect this compaction here to be unnecessary.
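The accumulator flow this comment describes can be modeled in a few lines. Everything here is illustrative (`ToyAcc` is not DataFusion's `Accumulator` trait); it only shows why, under the assumption that `merge_batch` only ever sees `state()` outputs, re-compacting in `merge_batch` would be redundant.

```rust
// Toy model of the accumulator protocol: update_batch sees raw (possibly
// sliced) input and compacts it, state() hands out already-compacted values,
// and merge_batch receives those states from other accumulators.
struct ToyAcc {
    values: Vec<Vec<u8>>,
}

impl ToyAcc {
    fn new() -> Self {
        ToyAcc { values: Vec::new() }
    }

    // Raw input may be a view into a large batch, so copy (compact) it here.
    fn update_batch(&mut self, raw_slice: &[u8]) {
        self.values.push(raw_slice.to_vec());
    }

    // States are emitted from already-compacted storage.
    fn state(&self) -> Vec<Vec<u8>> {
        self.values.clone()
    }

    // If merge_batch only ever saw state() outputs, compacting again here
    // would copy data that is already minimal.
    fn merge_batch(&mut self, other_state: Vec<Vec<u8>>) {
        self.values.extend(other_state);
    }
}

fn main() {
    let big_batch = vec![0u8; 1024];
    let mut acc1 = ToyAcc::new();
    let mut acc2 = ToyAcc::new();
    acc1.update_batch(&big_batch[..8]); // compacted on entry
    acc2.update_batch(&big_batch[8..16]);
    acc1.merge_batch(acc2.state()); // receives compacted state
    assert_eq!(acc1.values.len(), 2);
    assert!(acc1.values.iter().all(|v| v.len() == 8));
    println!("ok");
}
```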
I wonder if we should add a special case to `copy_array_data` to avoid copying the data when it is already only a single row / has no offset 🤔
Right now it seems to copy the data unconditionally, which is a non-trivial overhead on each row 🤔
datafusion/datafusion/common/src/scalar/mod.rs, lines 3564 to 3567 in a87d6f2:

```rust
pub fn copy_array_data(src_data: &ArrayData) -> ArrayData {
    let mut copy = MutableArrayData::new(vec![&src_data], true, src_data.len());
    copy.extend(0, 0, src_data.len());
    copy.freeze()
}
```
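The special case suggested above could look something like the sketch below. It uses the same toy `Arc`-backed view rather than the real `ArrayData` (`View`, `is_compact`, and `copy_if_needed` are hypothetical names), so it only demonstrates the shape of the fast path: skip the copy when the view already owns exactly the bytes it exposes.

```rust
use std::sync::Arc;

struct View {
    buffer: Arc<Vec<u8>>,
    offset: usize,
    len: usize,
}

impl View {
    // Already compact: no offset and the view covers the whole buffer.
    fn is_compact(&self) -> bool {
        self.offset == 0 && self.len == self.buffer.len()
    }

    // Avoids the unconditional per-row copy flagged in the review: a compact
    // view is returned via a cheap Arc clone, only slices are really copied.
    fn copy_if_needed(&self) -> View {
        if self.is_compact() {
            View { buffer: Arc::clone(&self.buffer), offset: 0, len: self.len }
        } else {
            let data = self.buffer[self.offset..self.offset + self.len].to_vec();
            View { len: data.len(), buffer: Arc::new(data), offset: 0 }
        }
    }
}

fn main() {
    let buf = Arc::new(vec![1u8, 2, 3, 4]);
    let compact = View { buffer: Arc::clone(&buf), offset: 0, len: 4 };
    let sliced = View { buffer: Arc::clone(&buf), offset: 1, len: 2 };
    // Fast path keeps the same allocation...
    assert!(Arc::ptr_eq(&compact.copy_if_needed().buffer, &buf));
    // ...while the slow path allocates a fresh, smaller buffer.
    let copied = sliced.copy_if_needed();
    assert!(!Arc::ptr_eq(&copied.buffer, &buf));
    assert_eq!(copied.buffer.len(), 2);
    println!("ok");
}
```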
Perhaps we can do that as a follow-up PR.
```rust
acc1.merge_batch(&[Arc::new(a1.slice(0, 1))])?;
acc2.merge_batch(&[Arc::new(a2.slice(0, 1))])?;

acc1 = merge(acc1, acc2)?;
```
The `merge_batch` function does not receive arbitrary data; it receives the results of calling `state()` on other accumulators. A fairer test would be to do something like:
```diff
-acc1.merge_batch(&[Arc::new(a1.slice(0, 1))])?;
-acc2.merge_batch(&[Arc::new(a2.slice(0, 1))])?;
-acc1 = merge(acc1, acc2)?;
+acc1.update_batch(&[Arc::new(a1.slice(0, 1))])?;
+acc2.update_batch(&[Arc::new(a2.slice(0, 1))])?;
+acc1 = merge(acc1, acc2)?;
```
With this, you would notice that the test result is the same regardless of the changes in this PR.
EDIT: I'm seeing that there are cases (datafusion/datafusion/physical-plan/src/aggregates/no_grouping.rs, lines 232 to 239 in 9278233) which mean that we probably do want compaction to happen in `merge_batch` as well.
This is good for a review, cc @alamb.

Thanks for the code and review @gabotechs and @sfluor
There appears to be an `array_agg` benchmark -- I will run that on this PR to see what it shows.
🤖: Benchmark completed
🤔 The benchmarks show a significant regression in performance (10x in some cases). I think we need to resolve that prior to merging this in.

We have some documentation on how to profile here: https://datafusion.apache.org/library-user-guide/profiling.html

The benchmarks can be run locally with `cargo bench --bench array_agg`.

I think it might be worth exploring: #16519 (comment)
Which issue does this PR close?

See this issue: Incorrect memory accounting in `array_agg` function #16517.

What changes are included in this PR?

Fixes the over-accounting in the `array_agg` function.

Are these changes tested?

Added a test that shows the problem.

Are there any user-facing changes?

Cloning the data might lead to slightly higher "real" memory usage.