Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
XiangpengHao committed Jul 13, 2024
1 parent 6451fe0 commit e2656c9
Showing 1 changed file with 41 additions and 39 deletions.
80 changes: 41 additions & 39 deletions arrow-row/src/variable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use arrow_array::builder::BufferBuilder;
use arrow_array::*;
use arrow_buffer::bit_util::ceil;
use arrow_buffer::MutableBuffer;
use arrow_data::{ArrayDataBuilder, ByteView};
use arrow_data::ArrayDataBuilder;
use arrow_schema::{DataType, SortOptions};
use builder::make_view;

Expand Down Expand Up @@ -259,62 +259,64 @@ fn decode_binary_view_inner(
valid
});

let values_capacity: usize = rows.iter().map(|row| decoded_len(row, options)).sum();
let mut values = MutableBuffer::new(values_capacity);
// we create two buffer, the inline buffer is used for quick utf8 validation.
let mut output_buffer_cap = 0;
let mut inline_buffer_cap = 0;
for r in rows.iter() {
let len = decoded_len(r, options);
if len > 12 {
output_buffer_cap += len;
} else {
inline_buffer_cap += len;
}
}

let mut output_buffer = MutableBuffer::new(output_buffer_cap);
let mut inline_buffer = MutableBuffer::new(inline_buffer_cap);
let mut views = BufferBuilder::<u128>::new(len);

for row in rows {
let start_offset = values.len();
let offset = decode_blocks(row, options, |b| values.extend_from_slice(b));
let start_offset = output_buffer.len();
let offset = decode_blocks(row, options, |b| {
let val = if b.len() <= 12 {
let old_len = inline_buffer.len();
inline_buffer.extend_from_slice(b);
// Safety: we just extended the buffer with the length of `b`
unsafe { inline_buffer.get_unchecked_mut(old_len..) }
} else {
output_buffer.extend_from_slice(b);
debug_assert_eq!(b, &output_buffer[start_offset..]);
// Safety: we just extended the buffer with the length of `b`
unsafe { output_buffer.get_unchecked_mut(start_offset..) }
};
if options.descending {
val.iter_mut().for_each(|o| *o = !*o);
}

let view = make_view(val, 0, start_offset as u32);
views.append(view);
});
if row[0] == null_sentinel(options) {
debug_assert_eq!(offset, 1);
debug_assert_eq!(start_offset, values.len());
debug_assert_eq!(start_offset, output_buffer.len());
views.append(0);
} else {
let view = make_view(
unsafe { values.get_unchecked(start_offset..) },
0,
start_offset as u32,
);
views.append(view);
}
*row = &row[offset..];
}

if options.descending {
values.as_slice_mut().iter_mut().for_each(|o| *o = !*o);
for view in views.as_slice_mut() {
let len = *view as u32;
if len <= 12 {
let mut bytes = view.to_le_bytes();
bytes
.iter_mut()
.skip(4)
.take(len as usize)
.for_each(|o| *o = !*o);
*view = u128::from_le_bytes(bytes);
} else {
let mut byte_view = ByteView::from(*view);
let mut prefix = byte_view.prefix.to_le_bytes();
prefix.iter_mut().for_each(|o| *o = !*o);
byte_view.prefix = u32::from_le_bytes(prefix);
*view = byte_view.into();
}
}
}

if check_utf8 {
// the values contains all data, no matter if it is short or long
// we can validate utf8 in one go.
std::str::from_utf8(values.as_slice()).unwrap();
// We validate the utf8 of the output buffer and the inline buffer
// This is much faster than validating each string individually
std::str::from_utf8(output_buffer.as_slice()).unwrap();
std::str::from_utf8(inline_buffer.as_slice()).unwrap();
}

let builder = ArrayDataBuilder::new(DataType::BinaryView)
.len(len)
.null_count(null_count)
.null_bit_buffer(Some(nulls.into()))
.add_buffer(views.finish())
.add_buffer(values.into());
.add_buffer(output_buffer.into());

// SAFETY:
// Valid by construction above
Expand Down

0 comments on commit e2656c9

Please sign in to comment.