fix: Incorrect memory accounting in array_agg function #16519

Status: Open. Wants to merge 1 commit into main.
42 changes: 39 additions & 3 deletions datafusion/functions-aggregate/src/array_agg.rs
@@ -341,12 +341,20 @@ impl Accumulator for ArrayAggAccumulator {
Some(values) => {
// Make sure we don't insert empty lists
if !values.is_empty() {
self.values.push(values);
// The ArrayRef might be holding a reference to its original input buffer, so
// storing a copied/compacted version here avoids over-accounting memory
// that is not actually used by this accumulator.
self.values
.push(make_array(copy_array_data(&values.to_data())));
}
Contributor comment on lines +344 to 349:

🤔 I'm not sure if this will solve the issue. Keep in mind that the merge_batch method argument receives the states of other accumulators, which already hold "compacted" data, so I'd expect this compaction here to be unnecessary.
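The over-accounting this PR targets comes from Arrow's zero-copy slicing: a sliced `ArrayRef` keeps the entire original buffer alive, so size accounting based on the backing buffer counts far more memory than the slice logically holds. A minimal pure-std sketch of the same effect, using `Arc<Vec<u64>>` as a stand-in for an Arrow buffer (`SlicedView` and its methods are illustrative, not the arrow-rs API):

```rust
use std::sync::Arc;

// Illustrative stand-in for a zero-copy Arrow slice; not the arrow-rs API.
// Holding a sliced view keeps the whole shared buffer alive.
struct SlicedView {
    buffer: Arc<Vec<u64>>, // entire original allocation stays pinned
    len: usize,            // number of rows the view exposes
}

impl SlicedView {
    // Memory actually retained by holding this view: the full buffer.
    fn retained_bytes(&self) -> usize {
        self.buffer.len() * std::mem::size_of::<u64>()
    }

    // Memory a compacted copy would retain: only the visible rows.
    fn compacted_bytes(&self) -> usize {
        self.len * std::mem::size_of::<u64>()
    }
}

fn main() {
    let big: Arc<Vec<u64>> = Arc::new((0..10_000).collect());
    // A one-row "slice" of the 10_000-row buffer.
    let view = SlicedView { buffer: big, len: 1 };
    // Holding the view pins all 80_000 bytes, not 8.
    assert_eq!(view.retained_bytes(), 80_000);
    assert_eq!(view.compacted_bytes(), 8);
    println!(
        "retained = {} bytes, compacted = {} bytes",
        view.retained_bytes(),
        view.compacted_bytes()
    );
}
```

This is also why the comment above matters: if `merge_batch` only ever receives already-compacted state from peer accumulators, the view and its backing buffer coincide and the extra copy buys nothing.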

Contributor comment:

I wonder if we should add a special case to copy_array_data to avoid copying the data when it already is only a single row / has no offset 🤔

Right now it seems to copy the data unconditionally which is a non trivial overhead on each row 🤔

pub fn copy_array_data(src_data: &ArrayData) -> ArrayData {
    let mut copy = MutableArrayData::new(vec![&src_data], true, src_data.len());
    copy.extend(0, 0, src_data.len());
    copy.freeze()
}
Perhaps we can do that as a follow-up PR.
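The special case suggested above could be sketched as follows. This is a minimal pure-Rust illustration of the guard, assuming it would compare the view's offset and length against the backing buffer; `DataView` and its fields are hypothetical stand-ins, not the actual arrow-rs `ArrayData` type:

```rust
// Stand-in for the fields such a guard would inspect; the real check
// would live on arrow-rs ArrayData (hypothetical, for illustration).
struct DataView {
    offset: usize,      // start of the view within the buffer
    len: usize,         // rows visible through the view
    buffer_rows: usize, // rows in the backing buffer
}

// The proposed fast path: only copy when the view is a proper slice,
// i.e. it starts past the buffer's beginning or exposes fewer rows.
fn needs_compaction(d: &DataView) -> bool {
    d.offset != 0 || d.len < d.buffer_rows
}

fn main() {
    // A 1-row slice of a 4-row buffer should be compacted...
    assert!(needs_compaction(&DataView { offset: 1, len: 1, buffer_rows: 4 }));
    // ...but a view covering the whole buffer can be stored as-is,
    // skipping the unconditional per-row copy.
    assert!(!needs_compaction(&DataView { offset: 0, len: 4, buffer_rows: 4 }));
    println!("guard behaves as expected");
}
```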

}
None => {
for arr in list_arr.iter().flatten() {
self.values.push(arr);
// The ArrayRef might be holding a reference to its original input buffer, so
// storing a copied/compacted version here avoids over-accounting memory
// that is not actually used by this accumulator.
self.values
.push(make_array(copy_array_data(&arr.to_data())));
}
}
}
@@ -728,7 +736,7 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
mod tests {
use super::*;
use arrow::array::{ListBuilder, StringBuilder};
use arrow::datatypes::{FieldRef, Schema};
use arrow::datatypes::{FieldRef, Schema, UInt64Type};
use datafusion_common::cast::as_generic_string_array;
use datafusion_common::internal_err;
use datafusion_physical_expr::expressions::Column;
@@ -994,6 +1002,34 @@ mod tests {
Ok(())
}

#[test]
fn does_not_over_account_memory_for_merge() -> Result<()> {
let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string().build_two()?;

let a1 = ListArray::from_iter_primitive::<UInt64Type, _, _>(vec![
Some(vec![Some(0), Some(1), Some(2)]),
Some(vec![Some(3)]),
None,
Some(vec![Some(4)]),
]);
let a2 = ListArray::from_iter_primitive::<UInt64Type, _, _>(vec![
Some(vec![Some(0), Some(1), Some(2)]),
Some(vec![Some(3)]),
None,
Some(vec![Some(4)]),
]);

acc1.merge_batch(&[Arc::new(a1.slice(0, 1))])?;
acc2.merge_batch(&[Arc::new(a2.slice(0, 1))])?;

acc1 = merge(acc1, acc2)?;
Contributor comment on lines +1022 to +1025 (@gabotechs, Jun 25, 2025):

The merge_batch function does not receive arbitrary data; it receives the results of calling state() on other accumulators. A fairer test would be to do something like:

Suggested change:
- acc1.merge_batch(&[Arc::new(a1.slice(0, 1))])?;
- acc2.merge_batch(&[Arc::new(a2.slice(0, 1))])?;
- acc1 = merge(acc1, acc2)?;
+ acc1.update_batch(&[Arc::new(a1.slice(0, 1))])?;
+ acc2.update_batch(&[Arc::new(a2.slice(0, 1))])?;
+ acc1 = merge(acc1, acc2)?;

With this, you would notice that the test result is the same regardless of the changes in this PR
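The contract invoked above can be shown in miniature: update_batch folds in raw input rows, state() serializes the partial result, and merge_batch only ever consumes what a peer's state() produced. A simplified pure-Rust stand-in (a sum accumulator; real DataFusion signatures use ArrayRef and ScalarValue, not u64):

```rust
// Simplified stand-in for DataFusion's Accumulator contract;
// the real trait operates on ArrayRef/ScalarValue, not u64.
trait Accumulator {
    // Fold raw input rows into this accumulator.
    fn update_batch(&mut self, values: &[u64]);
    // Serialize this accumulator's partial state for exchange.
    fn state(&self) -> Vec<u64>;
    // Fold a peer's state() output -- not arbitrary input -- into self.
    fn merge_batch(&mut self, states: &[u64]);
}

struct SumAcc {
    total: u64,
}

impl Accumulator for SumAcc {
    fn update_batch(&mut self, values: &[u64]) {
        self.total += values.iter().sum::<u64>();
    }
    fn state(&self) -> Vec<u64> {
        vec![self.total]
    }
    fn merge_batch(&mut self, states: &[u64]) {
        self.total += states.iter().sum::<u64>();
    }
}

fn main() {
    let mut acc1 = SumAcc { total: 0 };
    let mut acc2 = SumAcc { total: 0 };
    acc1.update_batch(&[1, 2, 3]); // raw input
    acc2.update_batch(&[10]);      // raw input
    // merge_batch only ever sees another accumulator's state():
    let peer_state = acc2.state();
    acc1.merge_batch(&peer_state);
    assert_eq!(acc1.state(), vec![16]);
    println!("merged total = {}", acc1.total);
}
```

Because merge_batch inputs always pass through state() first, a test that feeds hand-built slices straight into merge_batch exercises a path production queries never take.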


// without compaction, the size is 16812.
assert_eq!(acc1.size(), 556);

Ok(())
}

#[test]
fn does_not_over_account_memory() -> Result<()> {
let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string().build_two()?;