From 4977c53c85ddbd62476d3cd68c1acc27d056a9ae Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Mon, 25 Nov 2024 12:02:13 +0100 Subject: [PATCH 01/13] Add `Array::shrink_to_fit` --- arrow-array/src/array/boolean_array.rs | 7 +++ arrow-array/src/array/byte_array.rs | 8 ++++ arrow-array/src/array/byte_view_array.rs | 43 +++++++++++-------- arrow-array/src/array/dictionary_array.rs | 5 +++ .../src/array/fixed_size_binary_array.rs | 7 +++ .../src/array/fixed_size_list_array.rs | 7 +++ arrow-array/src/array/list_array.rs | 8 ++++ arrow-array/src/array/list_view_array.rs | 9 ++++ arrow-array/src/array/map_array.rs | 8 ++++ arrow-array/src/array/mod.rs | 13 ++++++ arrow-array/src/array/primitive_array.rs | 7 +++ arrow-array/src/array/run_array.rs | 5 +++ arrow-array/src/array/struct_array.rs | 7 +++ arrow-array/src/array/union_array.rs | 11 +++++ arrow-buffer/src/buffer/boolean.rs | 7 +++ arrow-buffer/src/buffer/immutable.rs | 27 ++++++++++++ arrow-buffer/src/buffer/null.rs | 6 +++ arrow-buffer/src/buffer/offset.rs | 6 +++ arrow-buffer/src/buffer/run.rs | 7 +++ arrow-buffer/src/buffer/scalar.rs | 6 +++ 20 files changed, 187 insertions(+), 17 deletions(-) diff --git a/arrow-array/src/array/boolean_array.rs b/arrow-array/src/array/boolean_array.rs index 0f95adacf10c..9c2d4af8c454 100644 --- a/arrow-array/src/array/boolean_array.rs +++ b/arrow-array/src/array/boolean_array.rs @@ -308,6 +308,13 @@ impl Array for BooleanArray { self.values.is_empty() } + fn shrink_to_fit(&mut self) { + self.values.shrink_to_fit(); + if let Some(nulls) = &mut self.nulls { + nulls.shrink_to_fit(); + } + } + fn offset(&self) -> usize { self.values.offset() } diff --git a/arrow-array/src/array/byte_array.rs b/arrow-array/src/array/byte_array.rs index bec0caab1045..f2b22507081d 100644 --- a/arrow-array/src/array/byte_array.rs +++ b/arrow-array/src/array/byte_array.rs @@ -453,6 +453,14 @@ impl Array for GenericByteArray { self.value_offsets.len() <= 1 } + fn shrink_to_fit(&mut self) { + self.value_offsets.shrink_to_fit(); + self.value_data.shrink_to_fit(); + if let Some(nulls) = &mut self.nulls { + nulls.shrink_to_fit(); + } + } + fn offset(&self) -> usize { 0 } diff --git a/arrow-array/src/array/byte_view_array.rs b/arrow-array/src/array/byte_view_array.rs index 81bb6a38550b..9d2d396a5266 100644 --- a/arrow-array/src/array/byte_view_array.rs +++ b/arrow-array/src/array/byte_view_array.rs @@ -430,31 +430,31 @@ impl GenericByteViewArray { /// /// Before GC: /// ```text - /// ┌──────┐ - /// │......│ - /// │......│ - /// ┌────────────────────┐ ┌ ─ ─ ─ ▶ │Data1 │ Large buffer + /// ┌──────┐ + /// │......│ + /// │......│ + /// ┌────────────────────┐ ┌ ─ ─ ─ ▶ │Data1 │ Large buffer /// │ View 1 │─ ─ ─ ─ │......│ with data that /// ├────────────────────┤ │......│ is not referred /// │ View 2 │─ ─ ─ ─ ─ ─ ─ ─▶ │Data2 │ to by View 1 or - /// └────────────────────┘ │......│ View 2 - /// │......│ - /// 2 views, refer to │......│ - /// small portions of a └──────┘ - /// large buffer + /// └────────────────────┘ │......│ View 2 + /// │......│ + /// 2 views, refer to │......│ + /// small portions of a └──────┘ + /// large buffer /// ``` - /// + /// /// After GC: /// /// ```text /// ┌────────────────────┐ ┌─────┐ After gc, only - /// │ View 1 │─ ─ ─ ─ ─ ─ ─ ─▶ │Data1│ data that is - /// ├────────────────────┤ ┌ ─ ─ ─ ▶ │Data2│ pointed to by - /// │ View 2 │─ ─ ─ ─ └─────┘ the views is - /// └────────────────────┘ left - /// - /// - /// 2 views + /// │ View 1 │─ ─ ─ ─ ─ ─ ─ ─▶ │Data1│ data that is + /// ├────────────────────┤ ┌ ─ ─ ─ ▶ 
│Data2│ pointed to by + /// │ View 2 │─ ─ ─ ─ └─────┘ the views is + /// └────────────────────┘ left + /// + /// + /// 2 views /// ``` /// This method will compact the data buffers by recreating the view array and only include the data /// that is pointed to by the views. @@ -575,6 +575,15 @@ impl Array for GenericByteViewArray { self.views.is_empty() } + fn shrink_to_fit(&mut self) { + self.views.shrink_to_fit(); + self.buffers.iter_mut().for_each(|b| b.shrink_to_fit()); + self.buffers.shrink_to_fit(); + if let Some(nulls) = &mut self.nulls { + nulls.shrink_to_fit(); + } + } + fn offset(&self) -> usize { 0 } diff --git a/arrow-array/src/array/dictionary_array.rs b/arrow-array/src/array/dictionary_array.rs index 55ecd56f987e..e69de783dd54 100644 --- a/arrow-array/src/array/dictionary_array.rs +++ b/arrow-array/src/array/dictionary_array.rs @@ -720,6 +720,11 @@ impl Array for DictionaryArray { self.keys.is_empty() } + fn shrink_to_fit(&mut self) { + self.keys.shrink_to_fit(); + self.values.shrink_to_fit(); + } + fn offset(&self) -> usize { self.keys.offset() } diff --git a/arrow-array/src/array/fixed_size_binary_array.rs b/arrow-array/src/array/fixed_size_binary_array.rs index ee6cc8021bca..576b8012491b 100644 --- a/arrow-array/src/array/fixed_size_binary_array.rs +++ b/arrow-array/src/array/fixed_size_binary_array.rs @@ -603,6 +603,13 @@ impl Array for FixedSizeBinaryArray { self.len == 0 } + fn shrink_to_fit(&mut self) { + self.value_data.shrink_to_fit(); + if let Some(nulls) = &mut self.nulls { + nulls.shrink_to_fit(); + } + } + fn offset(&self) -> usize { 0 } diff --git a/arrow-array/src/array/fixed_size_list_array.rs b/arrow-array/src/array/fixed_size_list_array.rs index 863733484c1c..44be442c9f85 100644 --- a/arrow-array/src/array/fixed_size_list_array.rs +++ b/arrow-array/src/array/fixed_size_list_array.rs @@ -401,6 +401,13 @@ impl Array for FixedSizeListArray { self.len == 0 } + fn shrink_to_fit(&mut self) { + self.values.shrink_to_fit(); + if let Some(nulls) = &mut self.nulls { + nulls.shrink_to_fit(); + } + } + fn offset(&self) -> usize { 0 } diff --git a/arrow-array/src/array/list_array.rs b/arrow-array/src/array/list_array.rs index ebb285e2032b..bed0bdf889b2 100644 --- a/arrow-array/src/array/list_array.rs +++ b/arrow-array/src/array/list_array.rs @@ -485,6 +485,14 @@ impl Array for GenericListArray { self.value_offsets.len() <= 1 } + fn shrink_to_fit(&mut self) { + if let Some(nulls) = &mut self.nulls { + nulls.shrink_to_fit(); + } + self.values.shrink_to_fit(); + self.value_offsets.shrink_to_fit(); + } + fn offset(&self) -> usize { 0 } diff --git a/arrow-array/src/array/list_view_array.rs b/arrow-array/src/array/list_view_array.rs index cd358e031e8f..7e52a6f3e457 100644 --- a/arrow-array/src/array/list_view_array.rs +++ b/arrow-array/src/array/list_view_array.rs @@ -326,6 +326,15 @@ impl Array for GenericListViewArray { self.value_sizes.is_empty() } + fn shrink_to_fit(&mut self) { + if let Some(nulls) = &mut self.nulls { + nulls.shrink_to_fit(); + } + self.values.shrink_to_fit(); + self.value_offsets.shrink_to_fit(); + self.value_sizes.shrink_to_fit(); + } + fn offset(&self) -> usize { 0 } diff --git a/arrow-array/src/array/map_array.rs b/arrow-array/src/array/map_array.rs index 254437630a44..18a7c491aa16 100644 --- a/arrow-array/src/array/map_array.rs +++ b/arrow-array/src/array/map_array.rs @@ -372,6 +372,14 @@ impl Array for MapArray { self.value_offsets.len() <= 1 } + fn shrink_to_fit(&mut self) { + if let Some(nulls) = &mut self.nulls { + nulls.shrink_to_fit(); + } + 
self.entries.shrink_to_fit(); + self.value_offsets.shrink_to_fit(); + } + fn offset(&self) -> usize { 0 } diff --git a/arrow-array/src/array/mod.rs b/arrow-array/src/array/mod.rs index e2ce49422978..be12080d6d89 100644 --- a/arrow-array/src/array/mod.rs +++ b/arrow-array/src/array/mod.rs @@ -167,6 +167,9 @@ pub trait Array: std::fmt::Debug + Send + Sync { /// ``` fn is_empty(&self) -> bool; + /// Frees up unused memory. + fn shrink_to_fit(&mut self) {} + /// Returns the offset into the underlying data used by this array(-slice). /// Note that the underlying data can be shared by many arrays. /// This defaults to `0`. @@ -365,6 +368,16 @@ impl Array for ArrayRef { self.as_ref().is_empty() } + fn shrink_to_fit(&mut self) { + if let Some(slf) = Arc::get_mut(self) { + slf.shrink_to_fit(); + } else { + // TODO(emilk): clone the contents and shrink that. + // This can be accomplished if we add `trait Array { fn clone(&self) -> Box>; }`. + // Or we clone using `let clone = self.slice(0, self.len());` and hope that the returned `ArrayRef` is not shared. + } + } + fn offset(&self) -> usize { self.as_ref().offset() } diff --git a/arrow-array/src/array/primitive_array.rs b/arrow-array/src/array/primitive_array.rs index bad6af959d07..57aa23bf9040 100644 --- a/arrow-array/src/array/primitive_array.rs +++ b/arrow-array/src/array/primitive_array.rs @@ -1152,6 +1152,13 @@ impl Array for PrimitiveArray { self.values.is_empty() } + fn shrink_to_fit(&mut self) { + self.values.shrink_to_fit(); + if let Some(nulls) = &mut self.nulls { + nulls.shrink_to_fit(); + } + } + fn offset(&self) -> usize { 0 } diff --git a/arrow-array/src/array/run_array.rs b/arrow-array/src/array/run_array.rs index dc4e6c96d9da..b340bf9a9065 100644 --- a/arrow-array/src/array/run_array.rs +++ b/arrow-array/src/array/run_array.rs @@ -330,6 +330,11 @@ impl Array for RunArray { self.run_ends.is_empty() } + fn shrink_to_fit(&mut self) { + self.run_ends.shrink_to_fit(); + self.values.shrink_to_fit(); + } + fn offset(&self) -> usize { self.run_ends.offset() } diff --git a/arrow-array/src/array/struct_array.rs b/arrow-array/src/array/struct_array.rs index 692f76e4821d..de6d9c699d22 100644 --- a/arrow-array/src/array/struct_array.rs +++ b/arrow-array/src/array/struct_array.rs @@ -364,6 +364,13 @@ impl Array for StructArray { self.len == 0 } + fn shrink_to_fit(&mut self) { + if let Some(nulls) = &mut self.nulls { + nulls.shrink_to_fit(); + } + self.fields.iter_mut().for_each(|n| n.shrink_to_fit()); + } + fn offset(&self) -> usize { 0 } diff --git a/arrow-array/src/array/union_array.rs b/arrow-array/src/array/union_array.rs index 3c6da5a7b5c0..43019c659f0a 100644 --- a/arrow-array/src/array/union_array.rs +++ b/arrow-array/src/array/union_array.rs @@ -744,6 +744,17 @@ impl Array for UnionArray { self.type_ids.is_empty() } + fn shrink_to_fit(&mut self) { + self.type_ids.shrink_to_fit(); + if let Some(offsets) = &mut self.offsets { + offsets.shrink_to_fit(); + } + for array in self.fields.iter_mut().flatten() { + array.shrink_to_fit(); + } + self.fields.shrink_to_fit(); + } + fn offset(&self) -> usize { 0 } diff --git a/arrow-buffer/src/buffer/boolean.rs b/arrow-buffer/src/buffer/boolean.rs index d9d2ea04e35a..1f9b3f87826e 100644 --- a/arrow-buffer/src/buffer/boolean.rs +++ b/arrow-buffer/src/buffer/boolean.rs @@ -114,6 +114,13 @@ impl BooleanBuffer { self.len == 0 } + /// Free up unused memory. 
+ #[inline] + pub fn shrink_to_fit(&mut self) { + // TODO(emilk): we could shrink even more in the case where we are a small sub-slice of the full buffer + self.buffer.shrink_to_fit(); + } + /// Returns the boolean value at index `i`. /// /// # Panics diff --git a/arrow-buffer/src/buffer/immutable.rs b/arrow-buffer/src/buffer/immutable.rs index 10534ccfe327..2bcac8799226 100644 --- a/arrow-buffer/src/buffer/immutable.rs +++ b/arrow-buffer/src/buffer/immutable.rs @@ -167,6 +167,17 @@ impl Buffer { self.data.capacity() } + /// Shrinks the capacity of the buffer as much as possible, freeing unused memory. + /// + /// The capacity of the returned buffer will be the same as [`Self::len`]. + /// + /// If the capacity is already less than or equal to the desired capacity, this is a no-op. + pub fn shrink_to_fit(&mut self) { + if self.len() < self.capacity() { + *self = Self::from_vec(self.as_slice().to_vec()) + } + } + /// Returns whether the buffer is empty. #[inline] pub fn is_empty(&self) -> bool { @@ -554,6 +565,22 @@ mod tests { assert_eq!(buf2.slice_with_length(2, 1).as_slice(), &[10]); } + #[test] + fn test_shrink_to_fit() { + let original = Buffer::from(&[1, 2, 3, 4, 5, 6, 7]); + assert_eq!(original.len(), 7); + assert_eq!(original.capacity(), 64); + + let slice = original.slice(3); + assert_eq!(slice.len(), 4); + assert_eq!(slice.capacity(), 64); + + let mut shrunk = slice.clone(); + shrunk.shrink_to_fit(); + assert_eq!(shrunk.len(), 4); + assert_eq!(shrunk.capacity(), 4); + } + #[test] #[should_panic(expected = "the offset of the new Buffer cannot exceed the existing length")] fn test_slice_offset_out_of_bound() { diff --git a/arrow-buffer/src/buffer/null.rs b/arrow-buffer/src/buffer/null.rs index e9f383ca2707..a5b37b7f5243 100644 --- a/arrow-buffer/src/buffer/null.rs +++ b/arrow-buffer/src/buffer/null.rs @@ -130,6 +130,12 @@ impl NullBuffer { self.buffer.is_empty() } + /// Free up unused memory. + #[inline] + pub fn shrink_to_fit(&mut self) { + self.buffer.shrink_to_fit(); + } + /// Returns the null count for this [`NullBuffer`] #[inline] pub fn null_count(&self) -> usize { diff --git a/arrow-buffer/src/buffer/offset.rs b/arrow-buffer/src/buffer/offset.rs index e9087d30098c..7acee3c4b98b 100644 --- a/arrow-buffer/src/buffer/offset.rs +++ b/arrow-buffer/src/buffer/offset.rs @@ -133,6 +133,12 @@ impl OffsetBuffer { Self(out.into()) } + /// Free up unused memory. + #[inline] + pub fn shrink_to_fit(&mut self) { + self.0.shrink_to_fit(); + } + /// Returns the inner [`ScalarBuffer`] pub fn inner(&self) -> &ScalarBuffer { &self.0 diff --git a/arrow-buffer/src/buffer/run.rs b/arrow-buffer/src/buffer/run.rs index 3dbbe344a025..107862ba5f5c 100644 --- a/arrow-buffer/src/buffer/run.rs +++ b/arrow-buffer/src/buffer/run.rs @@ -136,6 +136,13 @@ where self.len == 0 } + /// Free up unused memory. + #[inline] + pub fn shrink_to_fit(&mut self) { + // TODO(emilk): we could shrink even more in the case where we are a small sub-slice of the full buffer + self.run_ends.shrink_to_fit(); + } + /// Returns the values of this [`RunEndBuffer`] not including any offset #[inline] pub fn values(&self) -> &[E] { diff --git a/arrow-buffer/src/buffer/scalar.rs b/arrow-buffer/src/buffer/scalar.rs index 343b8549e93d..49a57b32e8a6 100644 --- a/arrow-buffer/src/buffer/scalar.rs +++ b/arrow-buffer/src/buffer/scalar.rs @@ -72,6 +72,12 @@ impl ScalarBuffer { buffer.slice_with_length(byte_offset, byte_len).into() } + /// Free up unused memory. 
+ #[inline] + pub fn shrink_to_fit(&mut self) { + self.buffer.shrink_to_fit(); + } + /// Returns a zero-copy slice of this buffer with length `len` and starting at `offset` pub fn slice(&self, offset: usize, len: usize) -> Self { Self::new(self.buffer.clone(), offset, len) From aeb7fe483edbc8d3d314794a195274a93230dccf Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Tue, 26 Nov 2024 10:23:36 +0100 Subject: [PATCH 02/13] Test that shrink_to_fit actually frees memory --- arrow/tests/shrink_to_fit.rs | 134 +++++++++++++++++++++++++++++++++++ 1 file changed, 134 insertions(+) create mode 100644 arrow/tests/shrink_to_fit.rs diff --git a/arrow/tests/shrink_to_fit.rs b/arrow/tests/shrink_to_fit.rs new file mode 100644 index 000000000000..51901cad8c33 --- /dev/null +++ b/arrow/tests/shrink_to_fit.rs @@ -0,0 +1,134 @@ +use arrow::{ + array::{Array, ArrayRef, ListArray, PrimitiveArray}, + buffer::OffsetBuffer, + datatypes::{Field, UInt8Type}, +}; + +/// Test that `shrink_to_fit` frees memory after concatenating a large number of arrays. +#[test] +fn test_shrink_to_fit_after_concat() { + let array_len = 6_000; + let num_concats = 100; + + let primitive_array: PrimitiveArray = (0..array_len) + .map(|v| (v % 255) as u8) + .collect::>() + .into(); + let primitive_array: ArrayRef = Arc::new(primitive_array); + + let list_array: ArrayRef = Arc::new(ListArray::new( + Field::new_list_field(primitive_array.data_type().clone(), false).into(), + OffsetBuffer::from_lengths([primitive_array.len()]), + primitive_array.clone(), + None, + )); + + // Num bytes allocated globally and by this thread, respectively. + let (concatenated, _bytes_allocated_globally, bytes_allocated_by_this_thread) = + memory_use(|| { + let mut concatenated = concatenate(num_concats, list_array.clone()); + concatenated.shrink_to_fit(); // This is what we're testing! + dbg!(concatenated.data_type()); + concatenated + }); + let expected_len = num_concats * array_len; + assert_eq!(bytes_used(concatenated.clone()), expected_len); + eprintln!("The concatenated array is {expected_len} B long. Amount of memory used by this thread: {bytes_allocated_by_this_thread} B"); + + assert!( + expected_len <= bytes_allocated_by_this_thread, + "We must allocate at least as much space as the concatenated array" + ); + assert!( + bytes_allocated_by_this_thread <= expected_len + expected_len / 100, + "We shouldn't have more than 1% memory overhead. In fact, we are using {bytes_allocated_by_this_thread}B of memory for {expected_len}B of data" + ); +} + +fn concatenate(num_times: usize, array: ArrayRef) -> ArrayRef { + let mut concatenated = array.clone(); + for _ in 0..num_times - 1 { + concatenated = arrow::compute::kernels::concat::concat(&[&*concatenated, &*array]).unwrap(); + } + concatenated +} + +fn bytes_used(array: ArrayRef) -> usize { + let mut array = array; + loop { + match array.data_type() { + arrow::datatypes::DataType::UInt8 => break, + arrow::datatypes::DataType::List(_) => { + let list = array.as_any().downcast_ref::().unwrap(); + array = list.values().clone(); + } + _ => unreachable!(), + } + } + + array.len() +} + +// --- Memory tracking --- + +use std::sync::{ + atomic::{AtomicUsize, Ordering::Relaxed}, + Arc, +}; + +static LIVE_BYTES_GLOBAL: AtomicUsize = AtomicUsize::new(0); + +thread_local! 
{ + static LIVE_BYTES_IN_THREAD: AtomicUsize = const { AtomicUsize::new(0) } ; +} + +pub struct TrackingAllocator { + allocator: std::alloc::System, +} + +#[global_allocator] +pub static GLOBAL_ALLOCATOR: TrackingAllocator = TrackingAllocator { + allocator: std::alloc::System, +}; + +#[allow(unsafe_code)] +// SAFETY: +// We just do book-keeping and then let another allocator do all the actual work. +unsafe impl std::alloc::GlobalAlloc for TrackingAllocator { + #[allow(clippy::let_and_return)] + unsafe fn alloc(&self, layout: std::alloc::Layout) -> *mut u8 { + LIVE_BYTES_IN_THREAD.with(|bytes| bytes.fetch_add(layout.size(), Relaxed)); + LIVE_BYTES_GLOBAL.fetch_add(layout.size(), Relaxed); + + // SAFETY: + // Just deferring + unsafe { self.allocator.alloc(layout) } + } + + unsafe fn dealloc(&self, ptr: *mut u8, layout: std::alloc::Layout) { + LIVE_BYTES_IN_THREAD.with(|bytes| bytes.fetch_sub(layout.size(), Relaxed)); + LIVE_BYTES_GLOBAL.fetch_sub(layout.size(), Relaxed); + + // SAFETY: + // Just deferring + unsafe { self.allocator.dealloc(ptr, layout) }; + } +} + +fn live_bytes_local() -> usize { + LIVE_BYTES_IN_THREAD.with(|bytes| bytes.load(Relaxed)) +} + +fn live_bytes_global() -> usize { + LIVE_BYTES_GLOBAL.load(Relaxed) +} + +/// Returns `(num_bytes_allocated, num_bytes_allocated_by_this_thread)`. +fn memory_use(run: impl Fn() -> R) -> (R, usize, usize) { + let used_bytes_start_local = live_bytes_local(); + let used_bytes_start_global = live_bytes_global(); + let ret = run(); + let bytes_used_local = live_bytes_local() - used_bytes_start_local; + let bytes_used_global = live_bytes_global() - used_bytes_start_global; + (ret, bytes_used_global, bytes_used_local) +} From e010cc9bacedfaf50dc109258645c7f5f0a7720f Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Tue, 26 Nov 2024 10:24:04 +0100 Subject: [PATCH 03/13] Make sure the buffer isn't shared in the test of shrink_to_fit --- arrow-buffer/src/buffer/immutable.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/arrow-buffer/src/buffer/immutable.rs b/arrow-buffer/src/buffer/immutable.rs index 2bcac8799226..40625329a162 100644 --- a/arrow-buffer/src/buffer/immutable.rs +++ b/arrow-buffer/src/buffer/immutable.rs @@ -575,7 +575,9 @@ mod tests { assert_eq!(slice.len(), 4); assert_eq!(slice.capacity(), 64); - let mut shrunk = slice.clone(); + drop(original); + + let mut shrunk = slice; shrunk.shrink_to_fit(); assert_eq!(shrunk.len(), 4); assert_eq!(shrunk.capacity(), 4); From 18acd88bbdefb9b56000296ee0bb21ceeee7d28a Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Tue, 26 Nov 2024 10:32:18 +0100 Subject: [PATCH 04/13] Remove `#[inline]` --- arrow-buffer/src/buffer/boolean.rs | 1 - arrow-buffer/src/buffer/null.rs | 1 - arrow-buffer/src/buffer/offset.rs | 1 - arrow-buffer/src/buffer/run.rs | 1 - arrow-buffer/src/buffer/scalar.rs | 1 - 5 files changed, 5 deletions(-) diff --git a/arrow-buffer/src/buffer/boolean.rs b/arrow-buffer/src/buffer/boolean.rs index 1f9b3f87826e..8770303c2ac7 100644 --- a/arrow-buffer/src/buffer/boolean.rs +++ b/arrow-buffer/src/buffer/boolean.rs @@ -115,7 +115,6 @@ impl BooleanBuffer { } /// Free up unused memory. 
- #[inline] pub fn shrink_to_fit(&mut self) { // TODO(emilk): we could shrink even more in the case where we are a small sub-slice of the full buffer self.buffer.shrink_to_fit(); diff --git a/arrow-buffer/src/buffer/null.rs b/arrow-buffer/src/buffer/null.rs index a5b37b7f5243..ec12b885eb5a 100644 --- a/arrow-buffer/src/buffer/null.rs +++ b/arrow-buffer/src/buffer/null.rs @@ -131,7 +131,6 @@ impl NullBuffer { } /// Free up unused memory. - #[inline] pub fn shrink_to_fit(&mut self) { self.buffer.shrink_to_fit(); } diff --git a/arrow-buffer/src/buffer/offset.rs b/arrow-buffer/src/buffer/offset.rs index 7acee3c4b98b..a6be2b67af84 100644 --- a/arrow-buffer/src/buffer/offset.rs +++ b/arrow-buffer/src/buffer/offset.rs @@ -134,7 +134,6 @@ impl OffsetBuffer { } /// Free up unused memory. - #[inline] pub fn shrink_to_fit(&mut self) { self.0.shrink_to_fit(); } diff --git a/arrow-buffer/src/buffer/run.rs b/arrow-buffer/src/buffer/run.rs index 107862ba5f5c..cc6d19044feb 100644 --- a/arrow-buffer/src/buffer/run.rs +++ b/arrow-buffer/src/buffer/run.rs @@ -137,7 +137,6 @@ where } /// Free up unused memory. - #[inline] pub fn shrink_to_fit(&mut self) { // TODO(emilk): we could shrink even more in the case where we are a small sub-slice of the full buffer self.run_ends.shrink_to_fit(); diff --git a/arrow-buffer/src/buffer/scalar.rs b/arrow-buffer/src/buffer/scalar.rs index 49a57b32e8a6..ab6c87168e5c 100644 --- a/arrow-buffer/src/buffer/scalar.rs +++ b/arrow-buffer/src/buffer/scalar.rs @@ -73,7 +73,6 @@ impl ScalarBuffer { } /// Free up unused memory. - #[inline] pub fn shrink_to_fit(&mut self) { self.buffer.shrink_to_fit(); } From 93d07f74f3f46c632ddb8e5d12cbb5da219f1a31 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Tue, 26 Nov 2024 12:11:33 +0100 Subject: [PATCH 05/13] Use `realloc` to reallocate the bytes --- arrow-buffer/src/buffer/immutable.rs | 10 +++++++++- arrow-buffer/src/bytes.rs | 24 ++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/arrow-buffer/src/buffer/immutable.rs b/arrow-buffer/src/buffer/immutable.rs index 40625329a162..70a1007344ca 100644 --- a/arrow-buffer/src/buffer/immutable.rs +++ b/arrow-buffer/src/buffer/immutable.rs @@ -173,7 +173,15 @@ impl Buffer { /// /// If the capacity is already less than or equal to the desired capacity, this is a no-op. pub fn shrink_to_fit(&mut self) { - if self.len() < self.capacity() { + let desired_capacity = self.len(); + if desired_capacity < self.capacity() { + if let Some(bytes) = Arc::get_mut(&mut self.data) { + if bytes.try_realloc(desired_capacity).is_ok() { + return; + } + } + + // Fallback: *self = Self::from_vec(self.as_slice().to_vec()) } } diff --git a/arrow-buffer/src/bytes.rs b/arrow-buffer/src/bytes.rs index ba61342d8e39..133294b32a66 100644 --- a/arrow-buffer/src/bytes.rs +++ b/arrow-buffer/src/bytes.rs @@ -96,6 +96,30 @@ impl Bytes { } } + /// Try to reallocate the underlying memory region to a new size (smaller or larger). + /// + /// Only works for bytes allocated with the standard allocator. + /// Returns `Err` if the memory was allocated with a custom allocator, + /// or the call to `realloc` failed, for whatever reason. + /// In case of `Err`, the [`Bytes`] will remain as it was (i.e. have the old size). 
+ pub fn try_realloc(&mut self, new_len: usize) -> Result<(), ()> { + if let Deallocation::Standard(old_layout) = self.deallocation { + if let Ok(new_layout) = std::alloc::Layout::from_size_align(new_len, old_layout.align()) + { + let new_ptr = + unsafe { std::alloc::realloc(self.ptr.as_mut(), old_layout, new_len) }; + if let Some(ptr) = NonNull::new(new_ptr) { + self.ptr = ptr; + self.len = new_len; + self.deallocation = Deallocation::Standard(new_layout); + return Ok(()); + } + } + } + + Err(()) + } + #[inline] pub(crate) fn deallocation(&self) -> &Deallocation { &self.deallocation From 3f5c6e75ac9d0300ed449251306c5c147f31a98e Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Tue, 26 Nov 2024 12:11:44 +0100 Subject: [PATCH 06/13] Clean up test --- arrow/tests/shrink_to_fit.rs | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/arrow/tests/shrink_to_fit.rs b/arrow/tests/shrink_to_fit.rs index 51901cad8c33..1ee99fcc29a2 100644 --- a/arrow/tests/shrink_to_fit.rs +++ b/arrow/tests/shrink_to_fit.rs @@ -71,9 +71,12 @@ fn bytes_used(array: ArrayRef) -> usize { // --- Memory tracking --- -use std::sync::{ - atomic::{AtomicUsize, Ordering::Relaxed}, - Arc, +use std::{ + alloc::Layout, + sync::{ + atomic::{AtomicUsize, Ordering::Relaxed}, + Arc, + }, }; static LIVE_BYTES_GLOBAL: AtomicUsize = AtomicUsize::new(0); @@ -96,16 +99,18 @@ pub static GLOBAL_ALLOCATOR: TrackingAllocator = TrackingAllocator { // We just do book-keeping and then let another allocator do all the actual work. unsafe impl std::alloc::GlobalAlloc for TrackingAllocator { #[allow(clippy::let_and_return)] - unsafe fn alloc(&self, layout: std::alloc::Layout) -> *mut u8 { - LIVE_BYTES_IN_THREAD.with(|bytes| bytes.fetch_add(layout.size(), Relaxed)); - LIVE_BYTES_GLOBAL.fetch_add(layout.size(), Relaxed); - + unsafe fn alloc(&self, layout: Layout) -> *mut u8 { // SAFETY: // Just deferring - unsafe { self.allocator.alloc(layout) } + let ptr = unsafe { self.allocator.alloc(layout) }; + if !ptr.is_null() { + LIVE_BYTES_IN_THREAD.with(|bytes| bytes.fetch_add(layout.size(), Relaxed)); + LIVE_BYTES_GLOBAL.fetch_add(layout.size(), Relaxed); + } + ptr } - unsafe fn dealloc(&self, ptr: *mut u8, layout: std::alloc::Layout) { + unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) { LIVE_BYTES_IN_THREAD.with(|bytes| bytes.fetch_sub(layout.size(), Relaxed)); LIVE_BYTES_GLOBAL.fetch_sub(layout.size(), Relaxed); @@ -113,6 +118,9 @@ unsafe impl std::alloc::GlobalAlloc for TrackingAllocator { // Just deferring unsafe { self.allocator.dealloc(ptr, layout) }; } + + // No need to override `alloc_zeroed` or `realloc`, + // since they both by default just defer to `alloc` and `dealloc`. } fn live_bytes_local() -> usize { From 74cc447aab23e5f19ba966c7e6d545ec37ba4268 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Tue, 26 Nov 2024 12:14:46 +0100 Subject: [PATCH 07/13] Improve docstring for `Array::shrink_to_fit` Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> --- arrow-array/src/array/mod.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/arrow-array/src/array/mod.rs b/arrow-array/src/array/mod.rs index be12080d6d89..30ff11f2edc9 100644 --- a/arrow-array/src/array/mod.rs +++ b/arrow-array/src/array/mod.rs @@ -167,7 +167,10 @@ pub trait Array: std::fmt::Debug + Send + Sync { /// ``` fn is_empty(&self) -> bool; - /// Frees up unused memory. 
+    /// Shrinks the capacity of any exclusively owned buffer as much as possible
+    ///
+    /// Shared or externally allocated buffers will be ignored, and
+    /// any buffer offsets will be preserved.
     fn shrink_to_fit(&mut self) {}
 
     /// Returns the offset into the underlying data used by this array(-slice).

From 8c0cfe41332d5a04b9114ab6ea0ec2d83ea444e4 Mon Sep 17 00:00:00 2001
From: Emil Ernerfeldt
Date: Tue, 26 Nov 2024 12:17:24 +0100
Subject: [PATCH 08/13] `Buffer::shrink_to_fit`: ignore shared buffers

---
 arrow-buffer/src/buffer/immutable.rs | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/arrow-buffer/src/buffer/immutable.rs b/arrow-buffer/src/buffer/immutable.rs
index 70a1007344ca..b8181f9bb330 100644
--- a/arrow-buffer/src/buffer/immutable.rs
+++ b/arrow-buffer/src/buffer/immutable.rs
@@ -167,22 +167,23 @@ impl Buffer {
         self.data.capacity()
     }
 
-    /// Shrinks the capacity of the buffer as much as possible, freeing unused memory.
+    /// Tries to shrink the capacity of the buffer as much as possible, freeing unused memory.
     ///
-    /// The capacity of the returned buffer will be the same as [`Self::len`].
+    /// If the buffer is shared, this is a no-op.
+    ///
+    /// If the memory was allocated with a custom allocator, this is a no-op.
     ///
     /// If the capacity is already less than or equal to the desired capacity, this is a no-op.
+    ///
+    /// The memory region will be reallocated using `std::alloc::realloc`.
     pub fn shrink_to_fit(&mut self) {
         let desired_capacity = self.len();
         if desired_capacity < self.capacity() {
             if let Some(bytes) = Arc::get_mut(&mut self.data) {
-                if bytes.try_realloc(desired_capacity).is_ok() {
-                    return;
-                }
+                // We can safely ignore errors.
+                // We are best-effort, and if the realloc fails, we still have the old size.
+                bytes.try_realloc(desired_capacity).ok();
             }
-
-            // Fallback:
-            *self = Self::from_vec(self.as_slice().to_vec())
         }
     }
 

From e7c1ba71cf4f69dfc42113f9d6d944c4c93166e9 Mon Sep 17 00:00:00 2001
From: Emil Ernerfeldt
Date: Tue, 26 Nov 2024 12:18:28 +0100
Subject: [PATCH 09/13] Improve comment in `ArrayRef::shrink_to_fit`

---
 arrow-array/src/array/mod.rs | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/arrow-array/src/array/mod.rs b/arrow-array/src/array/mod.rs
index 30ff11f2edc9..23b3cb628aaf 100644
--- a/arrow-array/src/array/mod.rs
+++ b/arrow-array/src/array/mod.rs
@@ -371,13 +371,12 @@ impl Array for ArrayRef {
         self.as_ref().is_empty()
     }
 
+    /// For shared buffers, this is a no-op.
     fn shrink_to_fit(&mut self) {
         if let Some(slf) = Arc::get_mut(self) {
             slf.shrink_to_fit();
         } else {
-            // TODO(emilk): clone the contents and shrink that.
-            // This can be accomplished if we add `trait Array { fn clone(&self) -> Box>; }`.
-            // Or we clone using `let clone = self.slice(0, self.len());` and hope that the returned `ArrayRef` is not shared.
+            // We ignore shared buffers.
         }
     }
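To make the ownership rule above concrete, here is a small self-contained sketch (not part of the patch series) of the `Arc::get_mut` gate that both `Buffer::shrink_to_fit` and `ArrayRef::shrink_to_fit` rely on, using a plain `Vec<u8>` as a stand-in for the underlying `Bytes` allocation:

```rust
use std::sync::Arc;

fn main() {
    let mut data: Arc<Vec<u8>> = Arc::new(vec![0; 1024]);

    // Sole owner: `Arc::get_mut` hands out `&mut`, so we can shrink in place.
    if let Some(vec) = Arc::get_mut(&mut data) {
        vec.truncate(16);
        vec.shrink_to_fit();
    }
    assert_eq!(data.len(), 16);

    // Shared: a second handle exists, `Arc::get_mut` returns `None`,
    // and shrinking silently becomes a no-op -- exactly the behavior
    // the doc comments above promise.
    let other_handle = Arc::clone(&data);
    assert!(Arc::get_mut(&mut data).is_none());
    drop(other_handle);
}
```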
From b843ae217de4157419538317e859cbcc88d42b28 Mon Sep 17 00:00:00 2001
From: Emil Ernerfeldt
Date: Tue, 26 Nov 2024 13:24:00 +0100
Subject: [PATCH 10/13] Document why `try_realloc` is safe, and actually make
 it safe :)

---
 arrow-buffer/src/bytes.rs | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/arrow-buffer/src/bytes.rs b/arrow-buffer/src/bytes.rs
index 133294b32a66..924b7a720270 100644
--- a/arrow-buffer/src/bytes.rs
+++ b/arrow-buffer/src/bytes.rs
@@ -104,10 +104,19 @@ impl Bytes {
     /// In case of `Err`, the [`Bytes`] will remain as it was (i.e. have the old size).
     pub fn try_realloc(&mut self, new_len: usize) -> Result<(), ()> {
         if let Deallocation::Standard(old_layout) = self.deallocation {
+            let new_len = new_len.max(1); // realloc requires a non-zero size
+            if old_layout.size() == new_len {
+                return Ok(()); // Nothing to do
+            }
             if let Ok(new_layout) = std::alloc::Layout::from_size_align(new_len, old_layout.align())
             {
-                let new_ptr =
-                    unsafe { std::alloc::realloc(self.ptr.as_mut(), old_layout, new_len) };
+                let old_ptr = self.ptr.as_ptr();
+                // SAFETY: the call to `realloc` is safe if all of the following holds (from https://doc.rust-lang.org/stable/std/alloc/trait.GlobalAlloc.html#method.realloc):
+                // * `old_ptr` must be currently allocated via this allocator (guaranteed by the invariant/contract of `Bytes`)
+                // * `old_layout` must be the same layout that was used to allocate that block of memory (same)
+                // * `new_len` must be greater than zero (ensured by the `max` call earlier)
+                // * `new_len`, when rounded up to the nearest multiple of `layout.align()`, must not overflow `isize` (guaranteed by the success of `Layout::from_size_align`)
+                let new_ptr = unsafe { std::alloc::realloc(old_ptr, old_layout, new_len) };
                 if let Some(ptr) = NonNull::new(new_ptr) {
                     self.ptr = ptr;
                     self.len = new_len;
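The four realloc requirements listed in the `SAFETY` comment can be exercised in isolation. The following standalone sketch (illustrative only; it uses the global allocator directly rather than `Bytes`) allocates, shrinks with `realloc`, and frees with the post-realloc layout:

```rust
use std::alloc::{alloc, dealloc, realloc, Layout};
use std::ptr::NonNull;

fn main() {
    // Allocate 64 bytes with alignment 1.
    let old_layout = Layout::from_size_align(64, 1).unwrap();
    // SAFETY: `old_layout` has a non-zero size.
    let ptr = NonNull::new(unsafe { alloc(old_layout) }).expect("allocation failed");

    // Shrink to 16 bytes. `realloc` requires a non-zero size, hence the `max(1)`.
    let new_len = 16usize.max(1);
    // SAFETY: `ptr` came from this allocator with `old_layout`, `new_len` is
    // non-zero, and rounding it up to the alignment cannot overflow `isize`.
    let new_ptr = unsafe { realloc(ptr.as_ptr(), old_layout, new_len) };
    let new_ptr = NonNull::new(new_ptr).expect("realloc failed");

    // After a successful `realloc`, the block must be freed with a layout of
    // the *new* size and the *old* alignment.
    let new_layout = Layout::from_size_align(new_len, old_layout.align()).unwrap();
    // SAFETY: `new_ptr` is currently allocated with `new_layout`.
    unsafe { dealloc(new_ptr.as_ptr(), new_layout) };
}
```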
From bc8c761c2f7b4f542ee2530d34213ce72186cf38 Mon Sep 17 00:00:00 2001
From: Emil Ernerfeldt
Date: Tue, 26 Nov 2024 13:24:38 +0100
Subject: [PATCH 11/13] Improve testing of shrink_to_fit

---
 arrow-buffer/src/buffer/immutable.rs | 14 ++++++++++++--
 arrow/tests/shrink_to_fit.rs         |  2 +-
 2 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/arrow-buffer/src/buffer/immutable.rs b/arrow-buffer/src/buffer/immutable.rs
index b8181f9bb330..d80d4cb21a68 100644
--- a/arrow-buffer/src/buffer/immutable.rs
+++ b/arrow-buffer/src/buffer/immutable.rs
@@ -581,15 +581,25 @@ mod tests {
         assert_eq!(original.capacity(), 64);
 
         let slice = original.slice(3);
+        drop(original); // Make sure the buffer isn't shared (or shrink_to_fit won't work)
         assert_eq!(slice.len(), 4);
         assert_eq!(slice.capacity(), 64);
 
-        drop(original);
-
         let mut shrunk = slice;
         shrunk.shrink_to_fit();
         assert_eq!(shrunk.len(), 4);
         assert_eq!(shrunk.capacity(), 4);
+
+        // Test that we can handle empty slices:
+        let empty_slice = shrunk.slice(4);
+        drop(shrunk); // Make sure the buffer isn't shared (or shrink_to_fit won't work)
+        assert_eq!(empty_slice.len(), 0);
+        assert_eq!(empty_slice.capacity(), 4);
+
+        let mut shrunk_empty = empty_slice;
+        shrunk_empty.shrink_to_fit();
+        assert_eq!(shrunk_empty.len(), 0);
+        assert_eq!(shrunk_empty.capacity(), 1); // `Buffer` and `Bytes` don't support 0-capacity, so we shrink to 1
     }
 
     #[test]
diff --git a/arrow/tests/shrink_to_fit.rs b/arrow/tests/shrink_to_fit.rs
index 1ee99fcc29a2..bc2b97668a11 100644
--- a/arrow/tests/shrink_to_fit.rs
+++ b/arrow/tests/shrink_to_fit.rs
@@ -41,7 +41,7 @@ fn test_shrink_to_fit_after_concat() {
     );
     assert!(
         bytes_allocated_by_this_thread <= expected_len + expected_len / 100,
-        "We shouldn't have more than 1% memory overhead. In fact, we are using {bytes_allocated_by_this_thread}B of memory for {expected_len}B of data"
+        "We shouldn't have more than 1% memory overhead. In fact, we are using {bytes_allocated_by_this_thread} B of memory for {expected_len} B of data"
     );

From 3b7cdebbe306b1e58f9ecfb4ce3b7a1b635a5d07 Mon Sep 17 00:00:00 2001
From: Emil Ernerfeldt
Date: Tue, 26 Nov 2024 16:36:20 +0100
Subject: [PATCH 12/13] Fix a few corner cases, and improve test

---
 arrow-buffer/src/buffer/immutable.rs | 45 ++++++++++++++++++----------
 1 file changed, 30 insertions(+), 15 deletions(-)

diff --git a/arrow-buffer/src/buffer/immutable.rs b/arrow-buffer/src/buffer/immutable.rs
index d80d4cb21a68..be87d726e869 100644
--- a/arrow-buffer/src/buffer/immutable.rs
+++ b/arrow-buffer/src/buffer/immutable.rs
@@ -177,12 +177,27 @@ impl Buffer {
     ///
     /// The memory region will be reallocated using `std::alloc::realloc`.
     pub fn shrink_to_fit(&mut self) {
-        let desired_capacity = self.len();
+        let offset = self.ptr_offset();
+        let is_empty = self.is_empty();
+        let desired_capacity = if is_empty {
+            0
+        } else {
+            // For realloc to work, we cannot free the elements before the offset
+            offset + self.len()
+        };
         if desired_capacity < self.capacity() {
             if let Some(bytes) = Arc::get_mut(&mut self.data) {
-                // We can safely ignore errors.
-                // We are best-effort, and if the realloc fails, we still have the old size.
-                bytes.try_realloc(desired_capacity).ok();
+                if bytes.try_realloc(desired_capacity).is_ok() {
+                    // Realloc complete - update our pointer into `bytes`:
+                    self.ptr = if is_empty {
+                        bytes.as_ptr()
+                    } else {
+                        // SAFETY: we kept all elements leading up to the offset
+                        unsafe { bytes.as_ptr().add(offset) }
+                    }
+                } else {
+                    // Failure to reallocate is fine; we just failed to free up memory.
+                }
             }
         }
     }
@@ -576,30 +591,30 @@ mod tests {
     #[test]
     fn test_shrink_to_fit() {
-        let original = Buffer::from(&[1, 2, 3, 4, 5, 6, 7]);
-        assert_eq!(original.len(), 7);
+        let original = Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7]);
+        assert_eq!(original.as_slice(), &[0, 1, 2, 3, 4, 5, 6, 7]);
         assert_eq!(original.capacity(), 64);
 
-        let slice = original.slice(3);
+        let slice = original.slice_with_length(2, 3);
         drop(original); // Make sure the buffer isn't shared (or shrink_to_fit won't work)
-        assert_eq!(slice.len(), 4);
+        assert_eq!(slice.as_slice(), &[2, 3, 4]);
         assert_eq!(slice.capacity(), 64);
 
         let mut shrunk = slice;
         shrunk.shrink_to_fit();
-        assert_eq!(shrunk.len(), 4);
-        assert_eq!(shrunk.capacity(), 4);
+        assert_eq!(shrunk.as_slice(), &[2, 3, 4]);
+        assert_eq!(shrunk.capacity(), 5); // shrink_to_fit is allowed to keep the elements before the offset
 
         // Test that we can handle empty slices:
-        let empty_slice = shrunk.slice(4);
+        let empty_slice = shrunk.slice_with_length(1, 0);
         drop(shrunk); // Make sure the buffer isn't shared (or shrink_to_fit won't work)
-        assert_eq!(empty_slice.len(), 0);
-        assert_eq!(empty_slice.capacity(), 4);
+        assert_eq!(empty_slice.as_slice(), &[]);
+        assert_eq!(empty_slice.capacity(), 5);
 
         let mut shrunk_empty = empty_slice;
         shrunk_empty.shrink_to_fit();
-        assert_eq!(shrunk_empty.len(), 0);
-        assert_eq!(shrunk_empty.capacity(), 1); // `Buffer` and `Bytes` don't support 0-capacity, so we shrink to 1
+        assert_eq!(shrunk_empty.as_slice(), &[]);
+        assert_eq!(shrunk_empty.capacity(), 1); // NOTE: `Buffer` and `Bytes` don't support 0-capacity
     }
 
     #[test]
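Condensed from the unit test above, this is the user-visible consequence of the offset handling; a sketch that assumes the patched `arrow_buffer::Buffer` API from this series:

```rust
use arrow_buffer::Buffer;

fn main() {
    let original = Buffer::from(&[0u8, 1, 2, 3, 4, 5, 6, 7]);
    let mut slice = original.slice_with_length(2, 3);
    drop(original); // `slice` must be the sole owner, or shrinking is a no-op

    slice.shrink_to_fit();
    assert_eq!(slice.as_slice(), &[2, 3, 4]);
    // `realloc` can only trim the tail, so the two elements before the
    // offset are kept: capacity = offset (2) + len (3) = 5.
    assert_eq!(slice.capacity(), 5);
}
```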
From 4e9278235d91aa6e354ca44d1acb8065e130177c Mon Sep 17 00:00:00 2001
From: Emil Ernerfeldt
Date: Tue, 26 Nov 2024 20:01:52 +0100
Subject: [PATCH 13/13] Add license header to new test file

---
 arrow/tests/shrink_to_fit.rs | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/arrow/tests/shrink_to_fit.rs b/arrow/tests/shrink_to_fit.rs
index bc2b97668a11..5d7c2cf98bc9 100644
--- a/arrow/tests/shrink_to_fit.rs
+++ b/arrow/tests/shrink_to_fit.rs
@@ -1,3 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
 use arrow::{
     array::{Array, ArrayRef, ListArray, PrimitiveArray},
     buffer::OffsetBuffer,
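Taken together, the series enables the pattern the integration test measures: concatenate, then reclaim the concat kernel's over-allocation. A minimal usage sketch, assuming the new trait method and using the `arrow` crate re-exports that the test also uses:

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int32Array};
use arrow::compute::concat;

fn main() {
    let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let b: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));

    // The concat kernel typically reserves more buffer space than the
    // result strictly needs.
    let mut concatenated = concat(&[&*a, &*b]).unwrap();

    // Reclaim the slack. Only exclusively owned buffers are reallocated;
    // anything shared is left untouched, per the trait documentation above.
    concatenated.shrink_to_fit();
    assert_eq!(concatenated.len(), 6);
}
```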