From da2e34e41af40c5209ee16599632ad9c95fd3530 Mon Sep 17 00:00:00 2001 From: Yijun Zhao Date: Tue, 9 Apr 2024 11:58:22 +0800 Subject: [PATCH] support slice view data on writing ipc --- arrow-ipc/src/writer.rs | 73 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 65 insertions(+), 8 deletions(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 97136bd97c2f..8df0ae823e87 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -580,7 +580,12 @@ fn append_variadic_buffer_counts(counts: &mut Vec, array: &ArrayData) { DataType::BinaryView | DataType::Utf8View => { // The spec documents the counts only includes the variadic buffers, not the view/null buffers. // https://arrow.apache.org/docs/format/Columnar.html#variadic-buffers - counts.push(array.buffers().len() as i64 - 1); + let views = array.buffers()[0].typed_data::(); + if views.iter().any(|view| *view as u32 > 12) { + counts.push(array.buffers().len() as i64 - 1); + } else { + counts.push(0); + } } DataType::Dictionary(_, _) => { // Do nothing @@ -1245,6 +1250,63 @@ fn get_list_array_buffers(data: &ArrayData) -> (Buffer, Arra (offsets, child_data) } +fn update_buffer_index(value: &mut u128, new_buffer_index: u32) { + // keep length、prefix and offset,clear old buffer_index, see [`ByteView`] for detail. + let mask = (0xFFFFFFFFu128 << 96) | (0xFFFFFFFFu128 << 32) | (0xFFFFFFFFu128); + *value &= mask; + + // move new buffer index to right position + let new_buffer_index = (new_buffer_index as u128) << 64; + + // update value with new buffer index + *value |= new_buffer_index; +} + +fn select_data_buffers(mut views_slice: Vec, data: &ArrayData) -> Vec { + let first_buffer = views_slice.iter().find(|view| (**view) as u32 > 12); + // all values shorter than 12 bytes. + if first_buffer.is_none() { + return vec![Buffer::from_vec(views_slice)]; + } + let first_buffer_index = ((*first_buffer.unwrap()) >> 64) as u32 as usize; + + let last_buffer = views_slice + .iter() + .rfind(|view| (**view) as u32 > 12) + .unwrap(); + let last_buffer_index = ((*last_buffer) >> 64) as u32 as usize; + + let data_buffers = &data.buffers()[1..]; + let sliced_data_buffers = &data_buffers[first_buffer_index..last_buffer_index + 1]; + + // if first buffer index not 0, we need re-mapping view's buffer index to sliced data buffers + if first_buffer_index != 0 { + views_slice + .iter_mut() + .filter(|view| (**view) as u32 > 12) + .for_each(|view| { + // new buffer index = original buffer index - offset + let new_buffer_index = ((*view >> 64) as u32) - first_buffer_index as u32; + update_buffer_index(view, new_buffer_index); + }); + } + + let mut buffers = Vec::with_capacity(sliced_data_buffers.len() + 1); + buffers.push(views_slice.iter().copied().collect()); + buffers.extend_from_slice(sliced_data_buffers); + buffers +} + +fn get_byte_view_buffers(data: &ArrayData) -> Vec { + if data.is_empty() { + return Vec::with_capacity(0); + } + + let views_slice = data.buffers()[0].typed_data::(); + let views_slice = &views_slice[data.offset()..data.offset() + data.len()]; + select_data_buffers(views_slice.to_vec(), data) +} + /// Write array data to a vector of bytes #[allow(clippy::too_many_arguments)] fn write_array_data( @@ -1303,13 +1365,8 @@ fn write_array_data( )?; } } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) { - // Slicing the views buffer is safe and easy, - // but pruning unneeded data buffers is much more nuanced since it's complicated to prove that no views reference the pruned buffers - // - // Current implementation just serialize the raw arrays as given and not try to optimize anything. - // If users wants to "compact" the arrays prior to sending them over IPC, - // they should consider the gc API suggested in #5513 - for buffer in array_data.buffers() { + let view_buffers = get_byte_view_buffers(array_data); + for buffer in view_buffers { offset = write_buffer( buffer.as_slice(), buffers,