From e2b5fe814e06eb77a9f7213d7a1cce28f2d64f0c Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 25 Oct 2024 17:37:44 +0800 Subject: [PATCH] feat: implement memory pool for mutable buffer Signed-off-by: Ruihang Xia --- arrow-buffer/src/buffer/mutable.rs | 90 ++++++++++++++++++++++++++++-- arrow-buffer/src/bytes.rs | 19 +++++++ arrow-buffer/src/pool.rs | 2 +- 3 files changed, 106 insertions(+), 5 deletions(-) diff --git a/arrow-buffer/src/buffer/mutable.rs b/arrow-buffer/src/buffer/mutable.rs index 7fcbd89dd262..f486c0cba424 100644 --- a/arrow-buffer/src/buffer/mutable.rs +++ b/arrow-buffer/src/buffer/mutable.rs @@ -24,6 +24,7 @@ use crate::{ bytes::Bytes, native::{ArrowNativeType, ToByteSlice}, util::bit_util, + MemoryPool, }; use super::Buffer; @@ -57,6 +58,9 @@ pub struct MutableBuffer { // invariant: len <= capacity len: usize, layout: Layout, + + #[cfg(feature = "pool")] + reservation: std::sync::Mutex>>, } impl MutableBuffer { @@ -91,6 +95,8 @@ impl MutableBuffer { data, len: 0, layout, + #[cfg(feature = "pool")] + reservation: std::sync::Mutex::new(None), } } @@ -115,7 +121,13 @@ impl MutableBuffer { NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout)) } }; - Self { data, len, layout } + Self { + data, + len, + layout, + #[cfg(feature = "pool")] + reservation: std::sync::Mutex::new(None), + } } /// Create a [`MutableBuffer`] from the provided [`Vec`] without copying @@ -136,7 +148,13 @@ impl MutableBuffer { let data = bytes.ptr(); mem::forget(bytes); - Ok(Self { data, len, layout }) + Ok(Self { + data, + len, + layout, + #[cfg(feature = "pool")] + reservation: std::sync::Mutex::new(None), + }) } /// creates a new [MutableBuffer] with capacity and length capable of holding `len` bits. 
@@ -211,7 +229,11 @@ impl MutableBuffer { if self.layout.size() != 0 { // Safety: data was allocated with layout unsafe { std::alloc::dealloc(self.as_mut_ptr(), self.layout) }; - self.layout = new_layout + self.layout = new_layout; + #[cfg(feature = "pool")] + if let Some(reservation) = self.reservation.lock().unwrap().as_mut() { + reservation.resize(0); + } } return; } @@ -224,6 +246,10 @@ impl MutableBuffer { }; self.data = NonNull::new(data).unwrap_or_else(|| handle_alloc_error(new_layout)); self.layout = new_layout; + #[cfg(feature = "pool")] + if let Some(reservation) = self.reservation.lock().unwrap().as_mut() { + reservation.resize(capacity); + } } /// Truncates this buffer to `len` bytes @@ -340,6 +366,7 @@ impl MutableBuffer { self.into_buffer() } + #[cfg(not(feature = "pool"))] #[inline] pub(super) fn into_buffer(self) -> Buffer { let bytes = unsafe { Bytes::new(self.data, self.len, Deallocation::Standard(self.layout)) }; @@ -347,6 +374,22 @@ impl MutableBuffer { Buffer::from_bytes(bytes) } + #[cfg(feature = "pool")] + #[inline] + pub(super) fn into_buffer(self) -> Buffer { + let reservation = std::mem::take(&mut *self.reservation.lock().unwrap()); + let bytes = unsafe { + Bytes::new_with_reservation( + self.data, + self.len, + Deallocation::Standard(self.layout), + reservation, + ) + }; + std::mem::forget(self); + Buffer::from_bytes(bytes) + } + /// View this buffer as a mutable slice of a specific type. 
/// /// # Panics @@ -481,6 +524,15 @@ impl MutableBuffer { buffer.truncate(bit_util::ceil(len, 8)); buffer } + + #[cfg(feature = "pool")] + /// Register this [`MutableBuffer`] with the provided [`MemoryPool`], replacing any prior assignment + pub fn claim(&self, pool: &dyn MemoryPool) { + self.reservation + .lock() + .unwrap() + .replace(pool.register(self.capacity())); + } } #[inline] @@ -518,7 +570,13 @@ impl From> for MutableBuffer { // This is based on `RawVec::current_memory` let layout = unsafe { Layout::array::(value.capacity()).unwrap_unchecked() }; mem::forget(value); - Self { data, len, layout } + Self { + data, + len, + layout, + #[cfg(feature = "pool")] + reservation: std::sync::Mutex::new(None), + } } } @@ -1025,4 +1083,28 @@ mod tests { let max_capacity = isize::MAX as usize - (isize::MAX as usize % ALIGNMENT); let _ = MutableBuffer::with_capacity(max_capacity + 1); } + + #[cfg(feature = "pool")] + #[test] + fn test_memory_pool() { + use crate::TrackingMemoryPool; + + let pool = TrackingMemoryPool::default(); + let mut buffer = MutableBuffer::from_iter(std::iter::repeat(1u64).take(64)); + buffer.claim(&pool); + assert_eq!(pool.allocated(), buffer.capacity()); + + buffer.reserve(128); + assert_eq!(pool.allocated(), buffer.capacity()); + + buffer.shrink_to_fit(); + assert_eq!(pool.allocated(), buffer.capacity()); + + buffer.claim(&pool); + buffer.claim(&pool); + assert_eq!(pool.allocated(), buffer.capacity()); + + let immutable = buffer.into_buffer(); + assert_eq!(pool.allocated(), immutable.capacity()); + } } diff --git a/arrow-buffer/src/bytes.rs b/arrow-buffer/src/bytes.rs index acc955851114..1b297626a715 100644 --- a/arrow-buffer/src/bytes.rs +++ b/arrow-buffer/src/bytes.rs @@ -73,6 +73,25 @@ impl Bytes { } } + /// Takes ownership of an allocated memory region with a reservation. + /// + /// Similar to [`Bytes::new`], but also takes a reservation that is used to track the memory usage. 
+ #[cfg(feature = "pool")] + #[inline] + pub(crate) unsafe fn new_with_reservation( + ptr: NonNull, + len: usize, + deallocation: Deallocation, + reservation: Option>, + ) -> Bytes { + Bytes { + ptr, + len, + deallocation, + reservation: std::sync::Mutex::new(reservation), + } + } + fn as_slice(&self) -> &[u8] { self } diff --git a/arrow-buffer/src/pool.rs b/arrow-buffer/src/pool.rs index ea73b4faa431..1aeb0bce1a92 100644 --- a/arrow-buffer/src/pool.rs +++ b/arrow-buffer/src/pool.rs @@ -8,7 +8,7 @@ pub trait MemoryPool { } /// A memory reservation within a [`MemoryPool`] that is freed on drop -pub trait MemoryReservation { +pub trait MemoryReservation: std::fmt::Debug { /// Resize this reservation to `new` bytes fn resize(&mut self, new: usize); }