From 08e33eaad77f1f8081e8637e546fb243de019b0b Mon Sep 17 00:00:00 2001
From: wiedld
Date: Tue, 22 Aug 2023 15:30:50 -0700
Subject: [PATCH 1/3] feat(datafusion-7181): enable slicing of rows

---
 arrow-row/src/lib.rs | 40 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 40 insertions(+)

diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs
index 18b5890d4a3a..c4f308b42a30 100644
--- a/arrow-row/src/lib.rs
+++ b/arrow-row/src/lib.rs
@@ -1002,6 +1002,46 @@ impl Rows {
             + self.buffer.len()
             + self.offsets.len() * std::mem::size_of::<usize>()
     }
+
+    /// Return a new Rows sliced according to `offset` and `length`
+    /// This copies memory from the original Rows
+    ///
+    /// # Panics
+    ///
+    /// Panics if `offset` with `length` is greater than row length.
+    pub fn slice(&self, offset: usize, length: usize) -> Self {
+        assert!(
+            offset + length <= self.num_rows(),
+            "offset + length is greater than row length"
+        );
+
+        let Rows {
+            buffer: current_buffer,
+            offsets: current_offsets,
+            config,
+        } = self;
+
+        let start = current_offsets[offset];
+        let end = current_offsets[offset + length];
+        let mut buffer: Vec<u8> = vec![0_u8; end - start];
+        buffer.copy_from_slice(&current_buffer[start..end]);
+
+        let mut offsets: Vec<usize> = vec![0_usize; length + 1];
+        offsets.copy_from_slice(&current_offsets[offset..offset + length + 1]);
+        // mutate offsets to match new buffer length
+        offsets = offsets.iter_mut().map(|x| *x - start).collect();
+
+        if length > 0 {
+            assert_eq!(offsets.last().unwrap(), &buffer.len());
+            assert_eq!(offsets.first().unwrap(), &0_usize);
+        }
+
+        Self {
+            buffer,
+            offsets,
+            config: config.clone(),
+        }
+    }
 }
 
 impl<'a> IntoIterator for &'a Rows {

From eccaaf242c3dc4d48f432a712c408aef6a0d3842 Mon Sep 17 00:00:00 2001
From: wiedld
Date: Thu, 14 Sep 2023 10:41:02 -0700
Subject: [PATCH 2/3] test(datafusion-7181): test to confirm row slicing

---
 arrow-row/src/lib.rs | 95 ++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 95 insertions(+)

diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs
index c4f308b42a30..12b0a2f80211 100644
--- a/arrow-row/src/lib.rs
+++ b/arrow-row/src/lib.rs
@@ -2264,6 +2264,101 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_row_slice() {
+        let mut converter = RowConverter::new(vec![
+            SortField::new(DataType::Int16),
+            SortField::new(DataType::Float32),
+        ])
+        .unwrap();
+
+        let cols = [
+            Arc::new(Int16Array::from_iter([Some(1), Some(2), Some(3), Some(4)]))
+                as ArrayRef,
+            Arc::new(Float32Array::from_iter([
+                Some(1.1),
+                Some(1.2),
+                Some(1.3),
+                Some(1.4),
+            ])) as ArrayRef,
+        ];
+        let rows = converter.convert_columns(&cols).unwrap();
+
+        let rows_first_half = rows.slice(0, 2);
+        let rows_last_one = rows.slice(3, 1);
+
+        assert_eq!(rows_first_half.num_rows(), 2);
+        assert_eq!(rows_last_one.num_rows(), 1);
+
+        let expected = converter
+            .convert_columns(&[
+                Arc::new(Int16Array::from_iter([Some(1), Some(2)])) as ArrayRef,
+                Arc::new(Float32Array::from_iter([Some(1.1), Some(1.2)])) as ArrayRef,
+            ])
+            .unwrap();
+        assert_eq!(
+            rows_first_half
+                .into_iter()
+                .map(|row| row.as_ref().to_owned())
+                .collect::<Vec<_>>(),
+            expected
+                .into_iter()
+                .map(|row| row.as_ref().to_owned())
+                .collect::<Vec<_>>(),
+        );
+
+        let expected = converter
+            .convert_columns(&[
+                Arc::new(Int16Array::from_iter([Some(4)])) as ArrayRef,
+                Arc::new(Float32Array::from_iter([Some(1.4)])) as ArrayRef,
+            ])
+            .unwrap();
+        assert_eq!(
+            rows_last_one
+                .into_iter()
+                .map(|row| row.as_ref().to_owned())
+                .collect::<Vec<_>>(),
+            expected
+                .into_iter()
+                .map(|row| row.as_ref().to_owned())
+                .collect::<Vec<_>>(),
+        );
+
+        assert_eq!(
+            rows.num_rows(),
+            4,
+            "expected the original rows to remain intact"
+        );
+        let expected = converter.convert_columns(&cols).unwrap();
+        assert_eq!(
+            rows.into_iter()
+                .map(|row| row.as_ref().to_owned())
+                .collect::<Vec<_>>(),
+            expected
+                .into_iter()
+                .map(|row| row.as_ref().to_owned())
+                .collect::<Vec<_>>(),
+        );
+    }
+
+    #[test]
+    #[should_panic(expected = "offset + length is greater than row length")]
+    fn test_row_slice_invalid_slice() {
+        let mut converter = RowConverter::new(vec![
+            SortField::new(DataType::Int16),
+            SortField::new(DataType::Float32),
+        ])
+        .unwrap();
+
+        let cols = [
+            Arc::new(Int16Array::from_iter([Some(1), Some(2)])) as ArrayRef,
+            Arc::new(Float32Array::from_iter([Some(1.1), Some(1.2)])) as ArrayRef,
+        ];
+        let rows = converter.convert_columns(&cols).unwrap();
+
+        rows.slice(1, 2);
+    }
+
     fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
     where
         K: ArrowPrimitiveType,

From b83baeb124736ae63bd326bb9b1f5a5d5a67ff60 Mon Sep 17 00:00:00 2001
From: wiedld
Date: Thu, 14 Sep 2023 11:33:16 -0700
Subject: [PATCH 3/3] fix(datafusion-7181): avoid unnecessary memory zeroing, and prevent overflow errors

---
 arrow-row/src/lib.rs | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs
index 12b0a2f80211..d62014d8e3b8 100644
--- a/arrow-row/src/lib.rs
+++ b/arrow-row/src/lib.rs
@@ -1011,7 +1011,7 @@ impl Rows {
     /// Panics if `offset` with `length` is greater than row length.
     pub fn slice(&self, offset: usize, length: usize) -> Self {
         assert!(
-            offset + length <= self.num_rows(),
+            offset.saturating_add(length) <= self.num_rows(),
             "offset + length is greater than row length"
         );
 
@@ -1023,11 +1023,11 @@ impl Rows {
 
         let start = current_offsets[offset];
         let end = current_offsets[offset + length];
-        let mut buffer: Vec<u8> = vec![0_u8; end - start];
-        buffer.copy_from_slice(&current_buffer[start..end]);
+        let mut buffer: Vec<u8> = Vec::with_capacity(end - start);
+        buffer.extend_from_slice(&current_buffer[start..end]);
 
-        let mut offsets: Vec<usize> = vec![0_usize; length + 1];
-        offsets.copy_from_slice(&current_offsets[offset..offset + length + 1]);
+        let mut offsets: Vec<usize> = Vec::with_capacity(length + 1);
+        offsets.extend_from_slice(&current_offsets[offset..offset + length + 1]);
         // mutate offsets to match new buffer length
         offsets = offsets.iter_mut().map(|x| *x - start).collect();
 