Skip to content

Commit

Permalink
Revisit List Row Encoding (#5807)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed May 28, 2024
1 parent 95791f1 commit 2420f49
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 96 deletions.
48 changes: 26 additions & 22 deletions arrow-row/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,16 +295,9 @@ mod variable;
///
/// Lists are encoded by first encoding all child elements to the row format.
///
/// A "canonical byte array" is then constructed by concatenating the row
/// encodings of all their elements into a single binary array, followed
/// by the lengths of each encoded row, and the number of elements, encoded
/// as big endian `u32`.
///
/// This canonical byte array is then encoded using the variable length byte
/// encoding described above.
///
/// _The lengths are not strictly necessary but greatly simplify decode, they
/// may be removed in a future iteration_.
/// A list value is then encoded as the concatenation of each of the child elements,
/// separately encoded using the variable length encoding described above, followed
/// by the variable length encoding of an empty byte array.
///
/// For example given:
///
Expand All @@ -323,24 +316,23 @@ mod variable;
/// └──┴──┘ └──┴──┘ └──┴──┘ └──┴──┘ └──┴──┘
///```
///
/// Which would be grouped into the following canonical byte arrays:
/// Which would be encoded as
///
/// ```text
/// ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
/// [1_u8, 2_u8, 3_u8] │01│01│01│02│01│03│00│00│00│02│00│00│00│02│00│00│00│02│00│00│00│03│
/// └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
/// └──── rows ────┘ └───────── row lengths ─────────┘ └─ count ─┘
///
/// ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
/// [1_u8, null] │01│01│00│00│00│00│00│02│00│00│00│02│00│00│00│02│
/// └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
/// ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
/// [1_u8, 2_u8, 3_u8] │02│01│01│00│00│02│02│01│02│00│00│02│02│01│03│00│00│02│01│
/// └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
/// └──── 1_u8 ────┘ └──── 2_u8 ────┘ └──── 3_u8 ────┘
///
/// ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
/// [1_u8, null] │02│01│01│00│00│02│02│00│00│00│00│02│01│
/// └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
/// └──── 1_u8 ────┘ └──── null ────┘
///
///```
///
/// With `[]` represented by an empty byte array, and `null` a null byte array.
///
/// These byte arrays will then be encoded using the variable length byte encoding
/// described above.
///
/// # Ordering
///
/// ## Float Ordering
Expand Down Expand Up @@ -2271,4 +2263,16 @@ mod tests {

dictionary_eq(&back[0], &array);
}

#[test]
fn test_list_prefix() {
let mut a = ListBuilder::new(Int8Builder::new());
a.append_value([None]);
a.append_value([None, None]);
let a = a.finish();

let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
assert_eq!(rows.row(0).cmp(&rows.row(1)), Ordering::Less);
}
}
138 changes: 76 additions & 62 deletions arrow-row/src/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use crate::{RowConverter, Rows, SortField};
use arrow_array::builder::BufferBuilder;
use crate::{null_sentinel, RowConverter, Rows, SortField};
use arrow_array::{Array, GenericListArray, OffsetSizeTrait};
use arrow_buffer::{Buffer, MutableBuffer};
use arrow_data::ArrayDataBuilder;
use arrow_schema::{ArrowError, SortOptions};
use std::ops::Range;
Expand All @@ -43,12 +43,10 @@ pub fn compute_lengths<O: OffsetSizeTrait>(
fn encoded_len(rows: &Rows, range: Option<Range<usize>>) -> usize {
match range {
None => 1,
Some(range) if range.start == range.end => 1,
Some(range) => {
let element_count = range.end - range.start;
let row_bytes = range.map(|i| rows.row(i).as_ref().len()).sum::<usize>();
let total = (1 + element_count) * std::mem::size_of::<u32>() + row_bytes;
super::variable::padded_length(Some(total))
1 + range
.map(|i| super::variable::padded_length(Some(rows.row(i).as_ref().len())))
.sum::<usize>()
}
}
}
Expand All @@ -63,7 +61,6 @@ pub fn encode<O: OffsetSizeTrait>(
opts: SortOptions,
array: &GenericListArray<O>,
) {
let mut temporary = vec![];
offsets
.iter_mut()
.skip(1)
Expand All @@ -74,42 +71,28 @@ pub fn encode<O: OffsetSizeTrait>(
let end = offsets[1].as_usize();
let range = array.is_valid(idx).then_some(start..end);
let out = &mut data[*offset..];
*offset += encode_one(out, &mut temporary, rows, range, opts)
*offset += encode_one(out, rows, range, opts)
});
}

#[inline]
fn encode_one(
out: &mut [u8],
temporary: &mut Vec<u8>,
rows: &Rows,
range: Option<Range<usize>>,
opts: SortOptions,
) -> usize {
temporary.clear();

match range {
None => super::variable::encode_one(out, None, opts),
Some(range) if range.start == range.end => {
super::variable::encode_one(out, Some(&[]), opts)
}
None => super::variable::encode_null(out, opts),
Some(range) if range.start == range.end => super::variable::encode_empty(out, opts),
Some(range) => {
for row in range.clone().map(|i| rows.row(i)) {
temporary.extend_from_slice(row.as_ref());
}
for row in range.clone().map(|i| rows.row(i)) {
let len: u32 = row
.as_ref()
.len()
.try_into()
.expect("ListArray or LargeListArray containing a list of more than u32::MAX items is not supported");
temporary.extend_from_slice(&len.to_be_bytes());
let mut offset = 0;
for i in range {
let row = rows.row(i);
offset += super::variable::encode_one(&mut out[offset..], Some(row.data), opts);
}
let row_count: u32 = (range.end - range.start)
.try_into()
.expect("lists containing more than u32::MAX elements not supported");
temporary.extend_from_slice(&row_count.to_be_bytes());
super::variable::encode_one(out, Some(temporary), opts)
offset += super::variable::encode_empty(&mut out[offset..], opts);
offset
}
}
}
Expand All @@ -125,50 +108,81 @@ pub unsafe fn decode<O: OffsetSizeTrait>(
field: &SortField,
validate_utf8: bool,
) -> Result<GenericListArray<O>, ArrowError> {
let canonical = super::variable::decode_binary::<i64>(rows, field.options);

let mut offsets = BufferBuilder::<O>::new(rows.len() + 1);
offsets.append(O::from_usize(0).unwrap());
let mut current_offset = 0;

let mut child_rows = Vec::with_capacity(rows.len());
canonical.value_offsets().windows(2).for_each(|w| {
let start = w[0] as usize;
let end = w[1] as usize;
if start == end {
// Null or empty list
offsets.append(O::from_usize(current_offset).unwrap());
return;
}
let opts = field.options;

let mut values_bytes = 0;

let row = &canonical.value_data()[start..end];
let element_count_start = row.len() - 4;
let element_count =
u32::from_be_bytes((&row[element_count_start..]).try_into().unwrap()) as usize;
let mut offset = 0;
let mut offsets = Vec::with_capacity(rows.len() + 1);
offsets.push(O::usize_as(0));

let lengths_start = element_count_start - (element_count * 4);
for row in rows.iter_mut() {
let mut row_offset = 0;
row[lengths_start..element_count_start]
.chunks_exact(4)
.for_each(|chunk| {
let len = u32::from_be_bytes(chunk.try_into().unwrap());
let next_row_offset = row_offset + len as usize;
child_rows.push(&row[row_offset..next_row_offset]);
row_offset = next_row_offset;
loop {
let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
values_bytes += x.len();
});
if decoded <= 1 {
offsets.push(O::usize_as(offset));
break;
}
row_offset += decoded;
offset += 1;
}
}
O::from_usize(offset).expect("overflow");

current_offset += element_count;
offsets.append(O::from_usize(current_offset).unwrap());
let mut null_count = 0;
let nulls = MutableBuffer::collect_bool(rows.len(), |x| {
let valid = rows[x][0] != null_sentinel(opts);
null_count += !valid as usize;
valid
});

let mut values_offsets = Vec::with_capacity(offset);
let mut values_bytes = Vec::with_capacity(values_bytes);
for row in rows.iter_mut() {
let mut row_offset = 0;
loop {
let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
values_bytes.extend_from_slice(x)
});
if decoded <= 1 {
break;
}
row_offset += decoded;
values_offsets.push(values_bytes.len());
}
*row = &row[row_offset..];
}

if opts.descending {
values_bytes.iter_mut().for_each(|o| *o = !*o);
}

let mut last_value_offset = 0;
let mut child_rows: Vec<_> = values_offsets
.into_iter()
.map(|offset| {
let v = &values_bytes[last_value_offset..offset];
last_value_offset = offset;
v
})
.collect();

println!("Decoded Child rows: {child_rows:?}");

let child = converter.convert_raw(&mut child_rows, validate_utf8)?;
assert_eq!(child.len(), 1);
println!("{child:?}");

let child_data = child[0].to_data();

let builder = ArrayDataBuilder::new(field.data_type.clone())
.len(rows.len())
.nulls(canonical.nulls().cloned())
.add_buffer(offsets.finish())
.null_count(null_count)
.null_bit_buffer(Some(nulls.into()))
.add_buffer(Buffer::from_vec(offsets))
.add_child_data(child_data);

Ok(GenericListArray::from(unsafe { builder.build_unchecked() }))
Expand Down
28 changes: 16 additions & 12 deletions arrow-row/src/variable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,23 @@ pub fn encode<'a, I: Iterator<Item = Option<&'a [u8]>>>(
}
}

pub fn encode_null(out: &mut [u8], opts: SortOptions) -> usize {
out[0] = null_sentinel(opts);
1
}

pub fn encode_empty(out: &mut [u8], opts: SortOptions) -> usize {
out[0] = match opts.descending {
true => !EMPTY_SENTINEL,
false => EMPTY_SENTINEL,
};
1
}

pub fn encode_one(out: &mut [u8], val: Option<&[u8]>, opts: SortOptions) -> usize {
match val {
Some([]) => {
out[0] = match opts.descending {
true => !EMPTY_SENTINEL,
false => EMPTY_SENTINEL,
};
1
}
None => encode_null(out, opts),
Some([]) => encode_empty(out, opts),
Some(val) => {
// Write `2_u8` to demarcate as non-empty, non-null string
out[0] = NON_EMPTY_SENTINEL;
Expand All @@ -111,10 +119,6 @@ pub fn encode_one(out: &mut [u8], val: Option<&[u8]>, opts: SortOptions) -> usiz
}
len
}
None => {
out[0] = null_sentinel(opts);
1
}
}
}

Expand Down Expand Up @@ -148,7 +152,7 @@ fn encode_blocks<const SIZE: usize>(out: &mut [u8], val: &[u8]) -> usize {
end_offset
}

fn decode_blocks(row: &[u8], options: SortOptions, mut f: impl FnMut(&[u8])) -> usize {
pub fn decode_blocks(row: &[u8], options: SortOptions, mut f: impl FnMut(&[u8])) -> usize {
let (non_empty_sentinel, continuation) = match options.descending {
true => (!NON_EMPTY_SENTINEL, !BLOCK_CONTINUATION),
false => (NON_EMPTY_SENTINEL, BLOCK_CONTINUATION),
Expand Down

0 comments on commit 2420f49

Please sign in to comment.