Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Revert "use different buffer for different encoding"
Browse files Browse the repository at this point in the history
This reverts commit 71f51aa.
ariesdevil committed Apr 22, 2024
1 parent 5def24a commit a3213dd
Showing 3 changed files with 18 additions and 70 deletions.
11 changes: 6 additions & 5 deletions parquet/src/arrow/array_reader/byte_view_array.rs
Original file line number Diff line number Diff line change
@@ -327,7 +327,6 @@ impl ByteViewArrayDecoderPlain {
let mut read = 0;

let buf = self.buf.as_ref();
output.add_buffer(self.buf.clone());
while self.offset < self.buf.len() && read != to_read {
if self.offset + 4 > buf.len() {
return Err(ParquetError::EOF("eof decoding byte view array".into()));
@@ -341,7 +340,7 @@ impl ByteViewArrayDecoderPlain {
return Err(ParquetError::EOF("eof decoding byte view array".into()));
}

output.try_push_with_offset(start_offset, end_offset)?;
output.try_push(&buf[start_offset..end_offset], self.validate_utf8)?;

self.offset = end_offset;
read += 1;
@@ -417,10 +416,12 @@ impl ByteViewArrayDecoderDeltaLength {
}

let mut start_offset = self.data_offset;
output.add_buffer(self.data.clone());
for length in src_lengths {
let end_offset = start_offset + *length as usize;
output.try_push_with_offset(start_offset, end_offset)?;
output.try_push(
&self.data.as_ref()[start_offset..end_offset],
self.validate_utf8,
)?;
start_offset = end_offset;
}

@@ -498,7 +499,7 @@ impl ByteViewArrayDecoderDictionary {
}

self.decoder.read(len, |keys| {
output.extend_from_dictionary(keys, &dict.views, dict.plain_buffer.as_ref().unwrap())
output.extend_from_dictionary(keys, &dict.views, &dict.buffer)
})
}

2 changes: 1 addition & 1 deletion parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
@@ -2214,7 +2214,7 @@ mod tests {
),
(
invalid_utf8_later_char::<i32>(),
"Parquet argument error: Parquet error: encountered non UTF-8 data: invalid utf-8 sequence of 1 bytes from index 6",
"Parquet argument error: Parquet error: encountered non UTF-8 data: invalid utf-8 sequence of 1 bytes from index 3",
),
];
for (array, expected_error) in cases {
75 changes: 11 additions & 64 deletions parquet/src/arrow/buffer/view_buffer.rs
Original file line number Diff line number Diff line change
@@ -22,22 +22,13 @@ use arrow_array::{make_array, ArrayRef};
use arrow_buffer::{ArrowNativeType, Buffer, ToByteSlice};
use arrow_data::{ArrayDataBuilder, ByteView};
use arrow_schema::DataType as ArrowType;
use bytes::Bytes;

/// A buffer of variable-sized byte arrays that can be converted into
/// a corresponding [`ArrayRef`]
#[derive(Debug, Default)]
pub struct ViewBuffer {
pub views: Vec<u128>,
/// If encoding in (`PLAIN`, `DELTA_LENGTH_BYTE_ARRAY`), we use `plain_buffer`
/// to hold the page data without copy.
pub plain_buffer: Option<Bytes>,
/// If encoding is `DELTA_BYTE_ARRAY`, we use `delta_buffer` to build data buffer
/// since this encoding's page data not hold full data.
///
/// If encoding in (`PLAIN_DICTIONARY`, `RLE_DICTIONARY`), we need these two buffers
/// cause these encoding first build dict then use dict to read data.
pub delta_buffer: Vec<u8>,
pub buffer: Vec<u8>,
}

impl ViewBuffer {
@@ -50,36 +41,6 @@ impl ViewBuffer {
self.len() == 0
}

/// Record the entire page buffer in this [`ViewBuffer`] so views can
/// reference it directly, avoiding a copy of the page data.
///
/// First-write-wins: only the first buffer handed in is kept; later
/// calls leave the stored buffer untouched.
pub fn add_buffer(&mut self, buf: Bytes) {
    // `get_or_insert` stores `buf` only when no page buffer has been
    // recorded yet — identical to the original `is_none()` guard.
    self.plain_buffer.get_or_insert(buf);
}

/// Push data to [`ViewBuffer`], since we already hold full data through [`Self::add_buffer`],
/// we only need to slice the data to build the view.
///
/// `start_offset..end_offset` addresses a byte range inside the page buffer
/// previously stored by [`Self::add_buffer`].
///
/// # Panics
/// Panics if [`Self::add_buffer`] has not been called (`plain_buffer` is
/// `None`) or if the offsets are out of bounds of / inverted within the
/// stored buffer (slice indexing and the `unwrap` below).
pub fn try_push_with_offset(&mut self, start_offset: usize, end_offset: usize) -> Result<()> {
let data = &self.plain_buffer.as_ref().unwrap()[start_offset..end_offset];
// NOTE: truncating cast — assumes a single value never exceeds u32::MAX
// bytes (Parquet page data), so the cast is lossless in practice.
let length: u32 = (end_offset - start_offset) as u32;
if length <= 12 {
// Short string: store the bytes inline in the 16-byte view itself —
// little-endian length in bytes 0..4, the data in bytes 4..4+length.
let mut view_buffer = [0; 16];
view_buffer[0..4].copy_from_slice(&length.to_le_bytes());
view_buffer[4..4 + length as usize].copy_from_slice(data);
self.views.push(u128::from_le_bytes(view_buffer));
return Ok(());
}

// Long string (> 12 bytes): store a reference view carrying the first
// four bytes as a prefix plus (buffer_index, offset) into the page
// buffer. buffer_index is 0 because only the single plain buffer added
// via `add_buffer` backs these views.
let view = ByteView {
length,
prefix: u32::from_le_bytes(data[0..4].try_into().unwrap()),
buffer_index: 0,
offset: start_offset as u32,
};
self.views.push(view.into());
Ok(())
}

/// If `validate_utf8` this verifies that the first character of `data` is
/// the start of a UTF-8 codepoint
///
@@ -107,8 +68,8 @@ impl ViewBuffer {
return Ok(());
}

let offset = self.delta_buffer.len() as u32;
self.delta_buffer.extend_from_slice(data);
let offset = self.buffer.len() as u32;
self.buffer.extend_from_slice(data);

let view = ByteView {
length,
@@ -168,34 +129,20 @@ impl ViewBuffer {
return Ok(());
}
let first_buffer_offset = ((*first_buffer.unwrap()) >> 96) as u32 as usize;
if self.plain_buffer.is_none() {
match std::str::from_utf8(&self.delta_buffer[first_buffer_offset..]) {
Ok(_) => Ok(()),
Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)),
}
} else {
match std::str::from_utf8(&self.plain_buffer.as_ref().unwrap()[first_buffer_offset..]) {
Ok(_) => Ok(()),
Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)),
}
match std::str::from_utf8(&self.buffer[first_buffer_offset..]) {
Ok(_) => Ok(()),
Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)),
}
}

/// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer`
pub fn into_array(self, null_buffer: Option<Buffer>, data_type: ArrowType) -> ArrayRef {
let len = self.len();
let array_data_builder = {
let builder = ArrayDataBuilder::new(data_type)
.len(len)
.add_buffer(Buffer::from_vec(self.views))
.null_bit_buffer(null_buffer);

if self.plain_buffer.is_none() {
builder.add_buffer(Buffer::from_vec(self.delta_buffer))
} else {
builder.add_buffer(self.plain_buffer.unwrap().into())
}
};
let array_data_builder = ArrayDataBuilder::new(data_type)
.len(len)
.add_buffer(Buffer::from_vec(self.views))
.add_buffer(Buffer::from_vec(self.buffer))
.null_bit_buffer(null_buffer);

let data = match cfg!(debug_assertions) {
true => array_data_builder.build().unwrap(),

0 comments on commit a3213dd

Please sign in to comment.