feat: Support exact size config for BatchCoalescer #8112
Conversation
FYI @alamb, we can trigger the benchmark for this PR; I set the benchmark to use exact size = false, which is consistent with the original DataFusion implementation.
And we currently reserve `target_batch_size`, but for the non-exact-size logic I am thinking about how we can improve it: an emitted batch may be much larger than `target_batch_size`, which may affect performance.
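For reference, here is a minimal sketch of the behaviour being discussed, adapted from this PR's doc example (the `arrow_select::coalesce` path matches the file changed in this PR, and `with_exact_size` is the API added here); with a target of 4 rows, the non-exact mode can emit a 5-row batch:

```rust
use arrow_array::record_batch;
use arrow_select::coalesce::BatchCoalescer;

fn main() {
    let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
    let batch2 = record_batch!(("a", Int32, [4, 5])).unwrap();

    // Non-exact mode with a target of 4 rows
    let mut coalescer = BatchCoalescer::new(batch1.schema(), 4).with_exact_size(false);

    coalescer.push_batch(batch1).unwrap();
    // Only 3 rows buffered, nothing emitted yet
    assert!(coalescer.next_completed_batch().is_none());

    coalescer.push_batch(batch2).unwrap();
    // 5 rows buffered >= target 4, so the whole buffered set is emitted,
    // i.e. the output batch is larger than the target
    let out = coalescer.next_completed_batch().unwrap();
    assert_eq!(out.num_rows(), 5);
}
```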
Nice @zhuqi-lucas -- I left a suggestion - let me know what you think
// If we have reached the target batch size, finalize the buffered batch
if self.buffered_rows >= self.target_batch_size {
    self.finish_buffered_batch()?;
// If we've reached or exceeded target, emit the whole buffered set
If we go over the target size, I think it means the underlying storage will reallocate (and thus copy the data).
I think the more performant way to do this is: if adding `num_rows` to the output would go over `target_rows`, emit early (even though some of the allocated space is not yet used).
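Distilled into a standalone sketch (the function and names below are illustrative, not the actual `BatchCoalescer` fields), the suggested check is:

```rust
/// Illustrative only: returns true when appending `num_rows` to the buffered
/// output would exceed the `target` reservation, meaning the buffered batch
/// should be emitted early (even if it is smaller than the target).
fn should_emit_early(buffered_rows: usize, num_rows: usize, target: usize) -> bool {
    buffered_rows > 0 && buffered_rows + num_rows > target
}

fn main() {
    // target = 4: 3 buffered + 2 incoming would overflow, so emit the 3 rows first
    assert!(should_emit_early(3, 2, 4));
    // 2 buffered + 2 incoming fits exactly, keep buffering
    assert!(!should_emit_early(2, 2, 4));
}
```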
Hi @alamb, thank you for the review and the good suggestion!
I tried the following patch to do this on top of the current PR, but performance did not improve; the best benchmark result is still from the exact-size batch emit. 🤔
diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs
index be2bbcafb6..93bdf86a2d 100644
--- a/arrow-select/src/coalesce.rs
+++ b/arrow-select/src/coalesce.rs
@@ -100,16 +100,23 @@ use primitive::InProgressPrimitiveArray;
/// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// let batch2 = record_batch!(("a", Int32, [4, 5])).unwrap();
///
-/// // Non-strict: produce batch once buffered >= target, batch may be larger than target
+/// // Non-strict: optimized for memory efficiency, may emit early to avoid reallocation
/// let mut coalescer = BatchCoalescer::new(batch1.schema(), 4).with_exact_size(false);
/// coalescer.push_batch(batch1).unwrap();
/// // still < 4 rows buffered
/// assert!(coalescer.next_completed_batch().is_none());
/// coalescer.push_batch(batch2).unwrap();
-/// // now buffered >= 4, non-strict mode emits whole buffered set (5 rows)
+/// // buffered=3, new=2, total would be 5 > 4, so emit buffered 3 rows first
/// let finished = coalescer.next_completed_batch().unwrap();
-/// let expected = record_batch!(("a", Int32, [1, 2, 3, 4, 5])).unwrap();
+/// let expected = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// assert_eq!(finished, expected);
+///
+/// // The remaining 2 rows from batch2 are now buffered
+/// assert!(coalescer.next_completed_batch().is_none());
+/// coalescer.finish_buffered_batch().unwrap();
+/// let remaining = coalescer.next_completed_batch().unwrap();
+/// let expected = record_batch!(("a", Int32, [4, 5])).unwrap();
+/// assert_eq!(remaining, expected);
/// ```
///
/// # Background
@@ -145,16 +152,21 @@ use primitive::InProgressPrimitiveArray;
///
/// 1. Output rows are produced in the same order as the input rows
///
-/// 2. The output is a sequence of batches, with all but the last being at exactly
-/// `target_batch_size` rows.
+/// 2. The output batch sizes depend on the `exact_size` setting:
+/// - In strict mode: all but the last batch have exactly `target_batch_size` rows
+/// - In non-strict mode: batch sizes are optimized to avoid memory reallocation
///
/// Notes on `exact_size`:
///
/// - `exact_size == true` (strict): output batches are produced so that all but
/// the final batch have exactly `target_batch_size` rows (default behavior).
-/// - `exact_size == false` (non-strict, default for this crate): output batches
-/// will be produced when the buffered rows are >= `target_batch_size`. The
-/// produced batch may be larger than `target_batch_size` (i.e., size >= target).
+/// - `exact_size == false` (non-strict): output batches are optimized for memory
+/// efficiency. Batches are emitted early to avoid buffer reallocation when adding
+/// new data would exceed the target size. Large input batches are split into
+/// target-sized chunks to prevent excessive memory allocation. This may result in
+/// output batches that are smaller than `target_batch_size`, but the algorithm
+/// ensures batches are as close to the target size as possible while maintaining
+/// memory efficiency. Small batches only occur to avoid costly memory operations.
#[derive(Debug)]
pub struct BatchCoalescer {
/// The input schema
@@ -320,7 +332,29 @@ impl BatchCoalescer {
self.finish_buffered_batch()?;
}
} else {
- // Non-strict: append all remaining rows; if buffered >= target, emit them
+ // Non-strict: emit early if adding num_rows would exceed target to avoid reallocation
+ if self.buffered_rows > 0 && self.buffered_rows + num_rows > self.target_batch_size {
+ // Emit the current buffered data before processing the new batch
+ // This avoids potential reallocation in the underlying storage
+ self.finish_buffered_batch()?;
+ }
+
+ // If num_rows is larger than target_batch_size, split it into target-sized chunks
+ // to avoid allocating overly large buffers
+ while num_rows > self.target_batch_size {
+ let chunk_size = self.target_batch_size;
+ for in_progress in self.in_progress_arrays.iter_mut() {
+ in_progress.copy_rows(offset, chunk_size)?;
+ }
+ self.buffered_rows += chunk_size;
+ offset += chunk_size;
+ num_rows -= chunk_size;
+
+ // Emit this full chunk immediately
+ self.finish_buffered_batch()?;
+ }
+
+ // Now append remaining rows (guaranteed to be <= target_batch_size) to buffer
if num_rows > 0 {
for in_progress in self.in_progress_arrays.iter_mut() {
in_progress.copy_rows(offset, num_rows)?;
@@ -328,7 +362,7 @@ impl BatchCoalescer {
self.buffered_rows += num_rows;
}
- // If we've reached or exceeded target, emit the whole buffered set
+ // If the current buffer has reached or exceeded target, emit it
if self.buffered_rows >= self.target_batch_size {
self.finish_buffered_batch()?;
}
@@ -1381,38 +1415,62 @@ mod tests {
coalescer.push_batch(batch1).unwrap();
assert!(coalescer.next_completed_batch().is_none());
- // push second batch (2 rows) -> buffered becomes 5 >= 4, non-strict emits all 5 rows
+ // push second batch (2 rows) -> buffered=3, new=2, 3+2=5 > 4
+ // NEW BEHAVIOR: emit buffered 3 rows first to avoid reallocation
coalescer.push_batch(batch2).unwrap();
- let out = coalescer
+ let out1 = coalescer
.next_completed_batch()
- .expect("expected a completed batch");
- assert_eq!(out.num_rows(), 5);
-
- // check contents equal to concatenation of 0..5
- let expected = uint32_batch(0..5);
- let actual = normalize_batch(out);
- let expected = normalize_batch(expected);
- assert_eq!(expected, actual);
+ .expect("expected first batch");
+ assert_eq!(out1.num_rows(), 3); // Only the first batch (early emit)
+
+ // The second batch should be buffered now
+ assert!(coalescer.next_completed_batch().is_none());
+
+ // Finish to get the remaining buffered data
+ coalescer.finish_buffered_batch().unwrap();
+ let out2 = coalescer
+ .next_completed_batch()
+ .expect("expected second batch");
+ assert_eq!(out2.num_rows(), 2); // The second batch
+
+ // check contents
+ let expected1 = uint32_batch(0..3);
+ let expected2 = uint32_batch(3..5);
+ assert_eq!(normalize_batch(out1), normalize_batch(expected1));
+ assert_eq!(normalize_batch(out2), normalize_batch(expected2));
}
#[test]
fn test_non_strict_single_large_batch() {
- // one large batch > target: in non-strict mode whole batch should be emitted
+ // one large batch > target: should be split into target-sized chunks
let batch = uint32_batch(0..4096);
let schema = Arc::clone(&batch.schema());
let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), 1000).with_exact_size(false);
coalescer.push_batch(batch).unwrap();
- let out = coalescer
- .next_completed_batch()
- .expect("expected a completed batch");
- assert_eq!(out.num_rows(), 4096);
-
- // compare to expected
- let expected = uint32_batch(0..4096);
- let actual = normalize_batch(out);
- let expected = normalize_batch(expected);
- assert_eq!(expected, actual);
+
+ // NEW BEHAVIOR: large batch should be split into chunks of target_batch_size
+ // 4096 / 1000 = 4 full batches + 96 remainder
+ let mut outputs = vec![];
+ while let Some(b) = coalescer.next_completed_batch() {
+ outputs.push(b);
+ }
+
+ assert_eq!(outputs.len(), 4); // 4 full batches emitted immediately
+
+ // Each should be exactly 1000 rows
+ for (i, out) in outputs.iter().enumerate() {
+ assert_eq!(out.num_rows(), 1000);
+ let expected = uint32_batch((i * 1000) as u32..((i + 1) * 1000) as u32);
+ assert_eq!(normalize_batch(out.clone()), normalize_batch(expected));
+ }
+
+ // Remaining 96 rows should be buffered
+ coalescer.finish_buffered_batch().unwrap();
+ let final_batch = coalescer.next_completed_batch().expect("expected final batch");
+ assert_eq!(final_batch.num_rows(), 96);
+ let expected_final = uint32_batch(4000..4096);
+ assert_eq!(normalize_batch(final_batch), normalize_batch(expected_final));
}
#[test]
@@ -1439,71 +1497,104 @@ mod tests {
#[test]
fn test_non_strict_multiple_emits_over_time() {
- // multiple pushes that each eventually push buffered >= target and emit
+ // multiple pushes with early emit behavior
let b1 = uint32_batch(0..3); // 3
- let b2 = uint32_batch(3..5); // 2 -> 3+2=5 emit (first)
- let b3 = uint32_batch(5..8); // 3
- let b4 = uint32_batch(8..10); // 2 -> 3+2=5 emit (second)
+ let b2 = uint32_batch(3..5); // 2 -> 3+2=5 > 4, emit 3 first
+ let b3 = uint32_batch(5..8); // 3 -> 2+3=5 > 4, emit 2 first
+ let b4 = uint32_batch(8..10); // 2 -> 3+2=5 > 4, emit 3 first
let schema = Arc::clone(&b1.schema());
let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), 4).with_exact_size(false);
+ // Push first batch (3 rows) -> buffered
coalescer.push_batch(b1).unwrap();
assert!(coalescer.next_completed_batch().is_none());
+ // Push second batch (2 rows) -> 3+2=5 > 4, emit buffered 3 rows first
coalescer.push_batch(b2).unwrap();
let out1 = coalescer
.next_completed_batch()
.expect("expected first batch");
- assert_eq!(out1.num_rows(), 5);
- assert_eq!(normalize_batch(out1), normalize_batch(uint32_batch(0..5)));
+ assert_eq!(out1.num_rows(), 3);
+ assert_eq!(normalize_batch(out1), normalize_batch(uint32_batch(0..3)));
+ // Now 2 rows from b2 are buffered, push b3 (3 rows) -> 2+3=5 > 4, emit 2 rows first
coalescer.push_batch(b3).unwrap();
- assert!(coalescer.next_completed_batch().is_none());
-
- coalescer.push_batch(b4).unwrap();
let out2 = coalescer
.next_completed_batch()
.expect("expected second batch");
- assert_eq!(out2.num_rows(), 5);
- assert_eq!(normalize_batch(out2), normalize_batch(uint32_batch(5..10)));
+ assert_eq!(out2.num_rows(), 2);
+ assert_eq!(normalize_batch(out2), normalize_batch(uint32_batch(3..5)));
+
+ // Now 3 rows from b3 are buffered, push b4 (2 rows) -> 3+2=5 > 4, emit 3 rows first
+ coalescer.push_batch(b4).unwrap();
+ let out3 = coalescer
+ .next_completed_batch()
+ .expect("expected third batch");
+ assert_eq!(out3.num_rows(), 3);
+ assert_eq!(normalize_batch(out3), normalize_batch(uint32_batch(5..8)));
+
+ // Finish to get remaining 2 rows from b4
+ coalescer.finish_buffered_batch().unwrap();
+ let out4 = coalescer
+ .next_completed_batch()
+ .expect("expected fourth batch");
+ assert_eq!(out4.num_rows(), 2);
+ assert_eq!(normalize_batch(out4), normalize_batch(uint32_batch(8..10)));
}
#[test]
fn test_non_strict_large_then_more_outputs() {
- // first push a large batch (should produce one big output), then push more small ones to produce another
+ // first push a large batch (should be split), then push more small ones
let big = uint32_batch(0..5000);
let small1 = uint32_batch(5000..5002); // 2
- let small2 = uint32_batch(5002..5005); // 3 -> 2+3=5 >=4 emit
+ let small2 = uint32_batch(5002..5005); // 3 -> 2+3=5 > 4, emit 2 first
let schema = Arc::clone(&big.schema());
- // Use small target (4) so that small1 + small2 will trigger an emit
let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), 4).with_exact_size(false);
- // push big: non-strict mode should emit the whole big batch (5000 rows)
+ // push big: should be split into chunks of 4
+ // 5000 / 4 = 1250 full batches
coalescer.push_batch(big).unwrap();
- let out_big = coalescer
- .next_completed_batch()
- .expect("expected big batch");
- assert_eq!(out_big.num_rows(), 5000);
- assert_eq!(
- normalize_batch(out_big),
- normalize_batch(uint32_batch(0..5000))
- );
- // push small1 (2 rows) -> not enough yet
+ let mut big_outputs = vec![];
+ while let Some(b) = coalescer.next_completed_batch() {
+ big_outputs.push(b);
+ }
+
+ assert_eq!(big_outputs.len(), 1250); // 1250 batches of 4 rows each
+ for (i, out) in big_outputs.iter().enumerate() {
+ assert_eq!(out.num_rows(), 4);
+ let start = i * 4;
+ let end = (i + 1) * 4;
+ let expected = uint32_batch(start as u32..end as u32);
+ assert_eq!(normalize_batch(out.clone()), normalize_batch(expected));
+ }
+
+ // push small1 (2 rows) -> buffered
coalescer.push_batch(small1).unwrap();
assert!(coalescer.next_completed_batch().is_none());
- // push small2 (3 rows) -> now buffered = 2 + 3 = 5 >= 4, non-strict emits all 5 rows
+ // push small2 (3 rows) -> 2+3=5 > 4, emit buffered 2 rows first
coalescer.push_batch(small2).unwrap();
- let out_small = coalescer
+ let out_small1 = coalescer
+ .next_completed_batch()
+ .expect("expected small batch 1");
+ assert_eq!(out_small1.num_rows(), 2);
+ assert_eq!(
+ normalize_batch(out_small1),
+ normalize_batch(uint32_batch(5000..5002))
+ );
+
+ // Finish to get remaining 3 rows from small2
+ coalescer.finish_buffered_batch().unwrap();
+ let out_small2 = coalescer
.next_completed_batch()
- .expect("expected small batch");
- assert_eq!(out_small.num_rows(), 5);
+ .expect("expected small batch 2");
+ assert_eq!(out_small2.num_rows(), 3);
assert_eq!(
- normalize_batch(out_small),
- normalize_batch(uint32_batch(5000..5005))
+ normalize_batch(out_small2),
+ normalize_batch(uint32_batch(5002..5005))
);
}
}
Thank you @alamb, I ran some experiments, but so far the exact-size approach still gives the best benchmark performance; I will investigate further.
Which issue does this PR close?
Related to:
- apache/datafusion#17105
- Draft: Use upstream arrow `coalesce` kernel in DataFusion (datafusion#16249)
- Optimize take/filter/concat from multiple input arrays to a single large output array (#6692)
- Enable parquet filter pushdown (`filter_pushdown`) by default (datafusion#3463)

Rationale for this change
We want to stay consistent with the original `BatchCoalescer` behaviour in DataFusion, so we introduce a new config to support a non-exact-size mode, which means output batches are emitted once the buffered rows reach `target_batch_size` and may be larger than the target.
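As a usage sketch (the `with_exact_size` builder method is the new API added in this PR; the schema and target values below are just for illustration):

```rust
use std::sync::Arc;
use arrow_schema::{DataType, Field, Schema};
use arrow_select::coalesce::BatchCoalescer;

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

    // Default: exact-size output, all but the final batch have exactly 8192 rows
    let _strict = BatchCoalescer::new(Arc::clone(&schema), 8192);

    // Opt-in: non-exact output, emitted once the buffered rows reach the target,
    // matching DataFusion's original coalescing behaviour
    let _non_exact = BatchCoalescer::new(schema, 8192).with_exact_size(false);
}
```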
What changes are included in this PR?
Are these changes tested?
Yes, new tests added.
Are there any user-facing changes?
No, the default behaviour is still `exact_size = true`.
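A small sketch of the unchanged default behaviour (same assumed imports and paths as above): all but the final batch have exactly `target_batch_size` rows.

```rust
use arrow_array::record_batch;
use arrow_select::coalesce::BatchCoalescer;

fn main() {
    let batch = record_batch!(("a", Int32, [1, 2, 3, 4, 5])).unwrap();

    // Default construction: exact_size = true
    let mut coalescer = BatchCoalescer::new(batch.schema(), 4);
    coalescer.push_batch(batch).unwrap();

    // Exactly 4 rows are emitted; the 5th row stays buffered
    assert_eq!(coalescer.next_completed_batch().unwrap().num_rows(), 4);
    coalescer.finish_buffered_batch().unwrap();
    assert_eq!(coalescer.next_completed_batch().unwrap().num_rows(), 1);
}
```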