From 2420f4990a9ed406dedcd455aa5c72ff3a0decad Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 28 May 2024 16:18:44 +0100 Subject: [PATCH] Revisit List Row Encoding (#5807) --- arrow-row/src/lib.rs | 48 +++++++------ arrow-row/src/list.rs | 138 +++++++++++++++++++++----------------- arrow-row/src/variable.rs | 28 ++++---- 3 files changed, 118 insertions(+), 96 deletions(-) diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs index 037ed404adca..4dc7349ca2d5 100644 --- a/arrow-row/src/lib.rs +++ b/arrow-row/src/lib.rs @@ -295,16 +295,9 @@ mod variable; /// /// Lists are encoded by first encoding all child elements to the row format. /// -/// A "canonical byte array" is then constructed by concatenating the row -/// encodings of all their elements into a single binary array, followed -/// by the lengths of each encoded row, and the number of elements, encoded -/// as big endian `u32`. -/// -/// This canonical byte array is then encoded using the variable length byte -/// encoding described above. -/// -/// _The lengths are not strictly necessary but greatly simplify decode, they -/// may be removed in a future iteration_. +/// A list value is then encoded as the concatenation of each of the child elements, +/// separately encoded using the variable length encoding described above, followed +/// by the variable length encoding of an empty byte array. /// /// For example given: /// @@ -323,24 +316,23 @@ mod variable; /// └──┴──┘ └──┴──┘ └──┴──┘ └──┴──┘ └──┴──┘ ///``` /// -/// Which would be grouped into the following canonical byte arrays: +/// Which would be encoded as /// /// ```text -/// ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐ -/// [1_u8, 2_u8, 3_u8] │01│01│01│02│01│03│00│00│00│02│00│00│00│02│00│00│00│02│00│00│00│03│ -/// └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘ -/// └──── rows ────┘ └───────── row lengths ─────────┘ └─ count ─┘ -/// -/// ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐ -/// [1_u8, null] │01│01│00│00│00│00│00│02│00│00│00│02│00│00│00│02│ -/// └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘ +/// ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐ +/// [1_u8, 2_u8, 3_u8] │02│01│01│00│00│02│02│01│02│00│00│02│02│01│03│00│00│02│01│ +/// └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘ +/// └──── 1_u8 ────┘ └──── 2_u8 ────┘ └──── 3_u8 ────┘ +/// +/// ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐ +/// [1_u8, null] │02│01│01│00│00│02│02│00│00│00│00│02│01│ +/// └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘ +/// └──── 1_u8 ────┘ └──── null ────┘ +/// ///``` /// /// With `[]` represented by an empty byte array, and `null` a null byte array. /// -/// These byte arrays will then be encoded using the variable length byte encoding -/// described above. -/// /// # Ordering /// /// ## Float Ordering @@ -2271,4 +2263,16 @@ mod tests { dictionary_eq(&back[0], &array); } + + #[test] + fn test_list_prefix() { + let mut a = ListBuilder::new(Int8Builder::new()); + a.append_value([None]); + a.append_value([None, None]); + let a = a.finish(); + + let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap(); + let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap(); + assert_eq!(rows.row(0).cmp(&rows.row(1)), Ordering::Less); + } } diff --git a/arrow-row/src/list.rs b/arrow-row/src/list.rs index 511fb4ffb282..83d4493692ae 100644 --- a/arrow-row/src/list.rs +++ b/arrow-row/src/list.rs @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. -use crate::{RowConverter, Rows, SortField}; -use arrow_array::builder::BufferBuilder; +use crate::{null_sentinel, RowConverter, Rows, SortField}; use arrow_array::{Array, GenericListArray, OffsetSizeTrait}; +use arrow_buffer::{Buffer, MutableBuffer}; use arrow_data::ArrayDataBuilder; use arrow_schema::{ArrowError, SortOptions}; use std::ops::Range; @@ -43,12 +43,10 @@ pub fn compute_lengths( fn encoded_len(rows: &Rows, range: Option>) -> usize { match range { None => 1, - Some(range) if range.start == range.end => 1, Some(range) => { - let element_count = range.end - range.start; - let row_bytes = range.map(|i| rows.row(i).as_ref().len()).sum::(); - let total = (1 + element_count) * std::mem::size_of::() + row_bytes; - super::variable::padded_length(Some(total)) + 1 + range + .map(|i| super::variable::padded_length(Some(rows.row(i).as_ref().len()))) + .sum::() } } } @@ -63,7 +61,6 @@ pub fn encode( opts: SortOptions, array: &GenericListArray, ) { - let mut temporary = vec![]; offsets .iter_mut() .skip(1) @@ -74,42 +71,28 @@ pub fn encode( let end = offsets[1].as_usize(); let range = array.is_valid(idx).then_some(start..end); let out = &mut data[*offset..]; - *offset += encode_one(out, &mut temporary, rows, range, opts) + *offset += encode_one(out, rows, range, opts) }); } #[inline] fn encode_one( out: &mut [u8], - temporary: &mut Vec, rows: &Rows, range: Option>, opts: SortOptions, ) -> usize { - temporary.clear(); - match range { - None => super::variable::encode_one(out, None, opts), - Some(range) if range.start == range.end => { - super::variable::encode_one(out, Some(&[]), opts) - } + None => super::variable::encode_null(out, opts), + Some(range) if range.start == range.end => super::variable::encode_empty(out, opts), Some(range) => { - for row in range.clone().map(|i| rows.row(i)) { - temporary.extend_from_slice(row.as_ref()); - } - for row in range.clone().map(|i| rows.row(i)) { - let len: u32 = row - .as_ref() - .len() - .try_into() - .expect("ListArray or LargeListArray containing a list of more than u32::MAX items is not supported"); - temporary.extend_from_slice(&len.to_be_bytes()); + let mut offset = 0; + for i in range { + let row = rows.row(i); + offset += super::variable::encode_one(&mut out[offset..], Some(row.data), opts); } - let row_count: u32 = (range.end - range.start) - .try_into() - .expect("lists containing more than u32::MAX elements not supported"); - temporary.extend_from_slice(&row_count.to_be_bytes()); - super::variable::encode_one(out, Some(temporary), opts) + offset += super::variable::encode_empty(&mut out[offset..], opts); + offset } } } @@ -125,50 +108,81 @@ pub unsafe fn decode( field: &SortField, validate_utf8: bool, ) -> Result, ArrowError> { - let canonical = super::variable::decode_binary::(rows, field.options); - - let mut offsets = BufferBuilder::::new(rows.len() + 1); - offsets.append(O::from_usize(0).unwrap()); - let mut current_offset = 0; - - let mut child_rows = Vec::with_capacity(rows.len()); - canonical.value_offsets().windows(2).for_each(|w| { - let start = w[0] as usize; - let end = w[1] as usize; - if start == end { - // Null or empty list - offsets.append(O::from_usize(current_offset).unwrap()); - return; - } + let opts = field.options; + + let mut values_bytes = 0; - let row = &canonical.value_data()[start..end]; - let element_count_start = row.len() - 4; - let element_count = - u32::from_be_bytes((&row[element_count_start..]).try_into().unwrap()) as usize; + let mut offset = 0; + let mut offsets = Vec::with_capacity(rows.len() + 1); + offsets.push(O::usize_as(0)); - let lengths_start = element_count_start - (element_count * 4); + for row in rows.iter_mut() { let mut row_offset = 0; - row[lengths_start..element_count_start] - .chunks_exact(4) - .for_each(|chunk| { - let len = u32::from_be_bytes(chunk.try_into().unwrap()); - let next_row_offset = row_offset + len as usize; - child_rows.push(&row[row_offset..next_row_offset]); - row_offset = next_row_offset; + loop { + let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| { + values_bytes += x.len(); }); + if decoded <= 1 { + offsets.push(O::usize_as(offset)); + break; + } + row_offset += decoded; + offset += 1; + } + } + O::from_usize(offset).expect("overflow"); - current_offset += element_count; - offsets.append(O::from_usize(current_offset).unwrap()); + let mut null_count = 0; + let nulls = MutableBuffer::collect_bool(rows.len(), |x| { + let valid = rows[x][0] != null_sentinel(opts); + null_count += !valid as usize; + valid }); + let mut values_offsets = Vec::with_capacity(offset); + let mut values_bytes = Vec::with_capacity(values_bytes); + for row in rows.iter_mut() { + let mut row_offset = 0; + loop { + let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| { + values_bytes.extend_from_slice(x) + }); + if decoded <= 1 { + break; + } + row_offset += decoded; + values_offsets.push(values_bytes.len()); + } + *row = &row[row_offset..]; + } + + if opts.descending { + values_bytes.iter_mut().for_each(|o| *o = !*o); + } + + let mut last_value_offset = 0; + let mut child_rows: Vec<_> = values_offsets + .into_iter() + .map(|offset| { + let v = &values_bytes[last_value_offset..offset]; + last_value_offset = offset; + v + }) + .collect(); + + println!("Decoded Child rows: {child_rows:?}"); + let child = converter.convert_raw(&mut child_rows, validate_utf8)?; assert_eq!(child.len(), 1); + println!("{child:?}"); + let child_data = child[0].to_data(); let builder = ArrayDataBuilder::new(field.data_type.clone()) .len(rows.len()) - .nulls(canonical.nulls().cloned()) - .add_buffer(offsets.finish()) + .null_count(null_count) + .null_bit_buffer(Some(nulls.into())) + .add_buffer(Buffer::from_vec(offsets)) .add_child_data(child_data); Ok(GenericListArray::from(unsafe { builder.build_unchecked() })) diff --git a/arrow-row/src/variable.rs b/arrow-row/src/variable.rs index 466214e7540e..45068baf2a32 100644 --- a/arrow-row/src/variable.rs +++ b/arrow-row/src/variable.rs @@ -83,15 +83,23 @@ pub fn encode<'a, I: Iterator>>( } } +pub fn encode_null(out: &mut [u8], opts: SortOptions) -> usize { + out[0] = null_sentinel(opts); + 1 +} + +pub fn encode_empty(out: &mut [u8], opts: SortOptions) -> usize { + out[0] = match opts.descending { + true => !EMPTY_SENTINEL, + false => EMPTY_SENTINEL, + }; + 1 +} + pub fn encode_one(out: &mut [u8], val: Option<&[u8]>, opts: SortOptions) -> usize { match val { - Some([]) => { - out[0] = match opts.descending { - true => !EMPTY_SENTINEL, - false => EMPTY_SENTINEL, - }; - 1 - } + None => encode_null(out, opts), + Some([]) => encode_empty(out, opts), Some(val) => { // Write `2_u8` to demarcate as non-empty, non-null string out[0] = NON_EMPTY_SENTINEL; @@ -111,10 +119,6 @@ pub fn encode_one(out: &mut [u8], val: Option<&[u8]>, opts: SortOptions) -> usiz } len } - None => { - out[0] = null_sentinel(opts); - 1 - } } } @@ -148,7 +152,7 @@ fn encode_blocks(out: &mut [u8], val: &[u8]) -> usize { end_offset } -fn decode_blocks(row: &[u8], options: SortOptions, mut f: impl FnMut(&[u8])) -> usize { +pub fn decode_blocks(row: &[u8], options: SortOptions, mut f: impl FnMut(&[u8])) -> usize { let (non_empty_sentinel, continuation) = match options.descending { true => (!NON_EMPTY_SENTINEL, !BLOCK_CONTINUATION), false => (NON_EMPTY_SENTINEL, BLOCK_CONTINUATION),