From a729f47491538e5de720a515f17de37d3cfeebf5 Mon Sep 17 00:00:00 2001
From: Judah Rand <17158624+judahrand@users.noreply.github.com>
Date: Fri, 22 Sep 2023 14:06:06 +0100
Subject: [PATCH] Add `column_offsets` to row format

This allows a user to more easily extract the encoded data for a single
value from the row format.
---
 arrow-row/src/lib.rs | 86 ++++++++++++++++++++++++++++++++------------
 1 file changed, 63 insertions(+), 23 deletions(-)

diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs
index 58dc42a4cacb..0b2f32f9eb97 100644
--- a/arrow-row/src/lib.rs
+++ b/arrow-row/src/lib.rs
@@ -399,6 +399,7 @@ impl Codec {
 
                 let owned = OwnedRow {
                     data: nulls.buffer.into(),
+                    offsets: nulls.column_offsets.first().unwrap()[..].into(),
                     config: nulls.config,
                 };
                 Ok(Self::Dictionary(converter, owned))
@@ -436,6 +437,7 @@ impl Codec {
                 let nulls = converter.convert_columns(&nulls)?;
                 let owned = OwnedRow {
                     data: nulls.buffer.into(),
+                    offsets: nulls.column_offsets.first().unwrap()[..].into(),
                     config: nulls.config,
                 };
 
@@ -641,7 +643,19 @@ impl RowConverter {
             .collect::<Result<Vec<_>, _>>()?;
 
         let write_offset = rows.num_rows();
-        let lengths = row_lengths(columns, &encoders);
+        let lengths = column_lengths(columns, &encoders);
+
+        // These are offsets within the row's data buffer.
+        for (row_idx, col_lengths) in lengths.iter().enumerate() {
+            rows.column_offsets.reserve(columns.len() + 1);
+            rows.column_offsets[row_idx].push(0);
+
+            let mut cur_offset = 0_usize;
+            for l in col_lengths {
+                cur_offset = cur_offset.checked_add(*l).expect("overflow");
+                rows.column_offsets[row_idx].push(cur_offset);
+            }
+        }
 
         // We initialize the offsets shifted down by one row index.
         //
@@ -662,7 +676,9 @@ impl RowConverter {
         let mut cur_offset = rows.offsets[write_offset];
         for l in lengths {
             rows.offsets.push(cur_offset);
-            cur_offset = cur_offset.checked_add(l).expect("overflow");
+            cur_offset = cur_offset
+                .checked_add(l.iter().sum::<usize>())
+                .expect("overflow");
         }
 
         // Note this will not zero out any trailing data in `rows.buffer`,
@@ -755,6 +771,7 @@ impl RowConverter {
 
         Rows {
             offsets,
+            column_offsets: vec![vec![]; row_capacity],
             buffer: Vec::with_capacity(data_capacity),
             config: RowConfig {
                 fields: self.fields.clone(),
@@ -819,6 +836,7 @@ impl RowParser {
     pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> {
         Row {
             data: bytes,
+            offsets: &[0],
             config: &self.config,
         }
     }
@@ -842,6 +860,8 @@ pub struct Rows {
     buffer: Vec<u8>,
     /// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]`
    offsets: Vec<usize>,
+    /// Column `j` of row `i` has data `&buffer[offsets[i]..offsets[i+1]][column_offsets[i][j]..column_offsets[i][j+1]]`
+    column_offsets: Vec<Vec<usize>>,
     /// The config for these rows
     config: RowConfig,
 }
@@ -864,6 +884,7 @@ impl Rows {
         let start = self.offsets[row];
         Row {
             data: &self.buffer[start..end],
+            offsets: &self.column_offsets[row],
             config: &self.config,
         }
     }
@@ -962,6 +983,7 @@ impl<'a> DoubleEndedIterator for RowsIter<'a> {
 #[derive(Debug, Copy, Clone)]
 pub struct Row<'a> {
     data: &'a [u8],
+    offsets: &'a [usize],
     config: &'a RowConfig,
 }
 
@@ -970,6 +992,7 @@ impl<'a> Row<'a> {
     pub fn owned(&self) -> OwnedRow {
         OwnedRow {
             data: self.data.into(),
+            offsets: self.offsets.into(),
             config: self.config.clone(),
         }
     }
@@ -1020,6 +1043,7 @@ impl<'a> AsRef<[u8]> for Row<'a> {
 #[derive(Debug, Clone)]
 pub struct OwnedRow {
     data: Box<[u8]>,
+    offsets: Box<[usize]>,
     config: RowConfig,
 }
 
@@ -1030,6 +1054,7 @@ impl OwnedRow {
     pub fn row(&self) -> Row<'_> {
         Row {
             data: &self.data,
+            offsets: &self.offsets,
             config: &self.config,
         }
     }
@@ -1084,42 +1109,42 @@ fn null_sentinel(options: SortOptions) -> u8 {
 }
 
 /// Computes the length of each encoded [`Rows`] and returns an empty [`Rows`]
-fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> Vec<usize> {
+fn column_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> Vec<Vec<usize>> {
     use fixed::FixedLengthEncoding;
 
+    let num_columns = cols.len();
     let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
-    let mut lengths = vec![0; num_rows];
-
+    let mut lengths = vec![Vec::with_capacity(num_columns); num_rows];
     for (array, encoder) in cols.iter().zip(encoders) {
         match encoder {
             Encoder::Stateless => {
                 downcast_primitive_array! {
-                    array => lengths.iter_mut().for_each(|x| *x += fixed::encoded_len(array)),
-                    DataType::Null => {},
-                    DataType::Boolean => lengths.iter_mut().for_each(|x| *x += bool::ENCODED_LEN),
+                    array => lengths.iter_mut().for_each(|x| x.push(fixed::encoded_len(array))),
+                    DataType::Null => {lengths.iter_mut().for_each(|x| x.push(0))},
+                    DataType::Boolean => lengths.iter_mut().for_each(|x| x.push(bool::ENCODED_LEN)),
                     DataType::Binary => as_generic_binary_array::<i32>(array)
                         .iter()
                         .zip(lengths.iter_mut())
-                        .for_each(|(slice, length)| *length += variable::encoded_len(slice)),
+                        .for_each(|(slice, length)| length.push(variable::encoded_len(slice))),
                     DataType::LargeBinary => as_generic_binary_array::<i64>(array)
                         .iter()
                         .zip(lengths.iter_mut())
-                        .for_each(|(slice, length)| *length += variable::encoded_len(slice)),
+                        .for_each(|(slice, length)| length.push(variable::encoded_len(slice))),
                     DataType::Utf8 => array.as_string::<i32>()
                         .iter()
                         .zip(lengths.iter_mut())
                         .for_each(|(slice, length)| {
-                            *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
+                            length.push(variable::encoded_len(slice.map(|x| x.as_bytes())))
                         }),
                     DataType::LargeUtf8 => array.as_string::<i64>()
                         .iter()
                         .zip(lengths.iter_mut())
                         .for_each(|(slice, length)| {
-                            *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
+                            length.push(variable::encoded_len(slice.map(|x| x.as_bytes())))
                         }),
                     DataType::FixedSizeBinary(len) => {
                         let len = len.to_usize().unwrap();
-                        lengths.iter_mut().for_each(|x| *x += 1 + len)
+                        lengths.iter_mut().for_each(|x| x.push(1 + len))
                     }
                     _ => unreachable!(),
                 }
@@ -1128,10 +1153,10 @@ fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> Vec<usize> {
                 downcast_dictionary_array! {
                     array => {
                         for (v, length) in array.keys().iter().zip(lengths.iter_mut()) {
-                            *length += match v {
+                            length.push(match v {
                                 Some(k) => values.row(k.as_usize()).data.len(),
                                 None => null.data.len(),
-                            }
+                            })
                         }
                     }
                     _ => unreachable!(),
@@ -1139,19 +1164,34 @@ fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> Vec<usize> {
             }
             Encoder::Struct(rows, null) => {
                 let array = as_struct_array(array);
-                lengths.iter_mut().enumerate().for_each(|(idx, length)| {
-                    match array.is_valid(idx) {
-                        true => *length += 1 + rows.row(idx).as_ref().len(),
-                        false => *length += 1 + null.data.len(),
-                    }
-                });
+                lengths
+                    .iter_mut()
+                    .enumerate()
+                    .for_each(|(row_idx, length)| match array.is_valid(row_idx) {
+                        true => length.push(1 + rows.row(row_idx).as_ref().len()),
+                        false => length.push(1 + null.data.len()),
+                    });
             }
             Encoder::List(rows) => match array.data_type() {
                 DataType::List(_) => {
-                    list::compute_lengths(&mut lengths, rows, as_list_array(array))
+                    let mut x = vec![0; num_rows];
+                    list::compute_lengths(&mut x, rows, as_list_array(array));
+                    lengths
+                        .iter_mut()
+                        .enumerate()
+                        .for_each(|(row_idx, length)| {
+                            length.push(x[row_idx]);
+                        });
                 }
                 DataType::LargeList(_) => {
-                    list::compute_lengths(&mut lengths, rows, as_large_list_array(array))
+                    let mut x = vec![0; num_rows];
+                    list::compute_lengths(&mut x, rows, as_large_list_array(array));
+                    lengths
+                        .iter_mut()
+                        .enumerate()
+                        .for_each(|(row_idx, length)| {
+                            length.push(x[row_idx]);
+                        });
                }
                 _ => unreachable!(),
             },
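
For illustration only: the new doc comment on `Rows::column_offsets` above states that
column `j` of row `i` occupies
`&buffer[offsets[i]..offsets[i+1]][column_offsets[i][j]..column_offsets[i][j+1]]`.
A minimal, self-contained sketch of that indexing follows; the `column_slice` helper
and the literal buffers are hypothetical and are not part of this patch.

// Hypothetical helper mirroring the slicing convention documented on
// `Rows::column_offsets`; none of these names exist in arrow-row itself.
fn column_slice<'a>(
    buffer: &'a [u8],              // concatenated row data
    offsets: &[usize],             // row `i` occupies buffer[offsets[i]..offsets[i + 1]]
    column_offsets: &[Vec<usize>], // per-row offsets of each column within its row
    row: usize,
    col: usize,
) -> &'a [u8] {
    let row_bytes = &buffer[offsets[row]..offsets[row + 1]];
    &row_bytes[column_offsets[row][col]..column_offsets[row][col + 1]]
}

fn main() {
    // Two 5-byte rows, each split into a 2-byte and a 3-byte column.
    let buffer: Vec<u8> = (1..=10).collect();
    let offsets = vec![0, 5, 10];
    let column_offsets = vec![vec![0, 2, 5], vec![0, 2, 5]];

    assert_eq!(column_slice(&buffer, &offsets, &column_offsets, 0, 1), &[3, 4, 5]);
    assert_eq!(column_slice(&buffer, &offsets, &column_offsets, 1, 0), &[6, 7]);
}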