Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parquet: Implement support for Encoding::BYTE_STREAM_SPLIT #4183

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1313,6 +1313,9 @@ mod tests {
| DataType::UInt32
| DataType::UInt16
| DataType::UInt8 => vec![Encoding::PLAIN, Encoding::DELTA_BINARY_PACKED],
DataType::Float32 | DataType::Float64 => {
vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
}
_ => vec![Encoding::PLAIN],
};

Expand Down
58 changes: 56 additions & 2 deletions parquet/src/encodings/decoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use crate::util::{
memory::ByteBufferPtr,
};

mod byte_stream_split_decoder;

pub(crate) mod private {
use super::*;

Expand Down Expand Up @@ -105,8 +107,32 @@ pub(crate) mod private {
}
}

impl GetDecoder for f32 {}
impl GetDecoder for f64 {}
impl GetDecoder for f32 {
    /// Returns a [`ByteStreamSplitDecoder`] when BYTE_STREAM_SPLIT is
    /// requested; every other encoding is resolved by the shared default
    /// decoder lookup.
    fn get_decoder<T: DataType<T = Self>>(
        descr: ColumnDescPtr,
        encoding: Encoding,
    ) -> Result<Box<dyn Decoder<T>>> {
        if let Encoding::BYTE_STREAM_SPLIT = encoding {
            let decoder = byte_stream_split_decoder::ByteStreamSplitDecoder::new();
            Ok(Box::new(decoder))
        } else {
            get_decoder_default(descr, encoding)
        }
    }
}
impl GetDecoder for f64 {
    /// Returns a [`ByteStreamSplitDecoder`] when BYTE_STREAM_SPLIT is
    /// requested; every other encoding is resolved by the shared default
    /// decoder lookup.
    fn get_decoder<T: DataType<T = Self>>(
        descr: ColumnDescPtr,
        encoding: Encoding,
    ) -> Result<Box<dyn Decoder<T>>> {
        if let Encoding::BYTE_STREAM_SPLIT = encoding {
            let decoder = byte_stream_split_decoder::ByteStreamSplitDecoder::new();
            Ok(Box::new(decoder))
        } else {
            get_decoder_default(descr, encoding)
        }
    }
}

impl GetDecoder for ByteArray {
fn get_decoder<T: DataType<T = Self>>(
Expand Down Expand Up @@ -1854,6 +1880,29 @@ mod tests {
test_delta_byte_array_decode(data);
}

#[test]
fn test_byte_stream_split_multiple() {
    // Round-trip two separate batches (a two-value batch followed by a
    // single-value batch), using the bit patterns from the parquet-format
    // BYTE_STREAM_SPLIT example.
    let first_batch = vec![
        f32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
        f32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
    ];
    let second_batch = vec![f32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6])];
    test_byte_stream_split_decode::<FloatType>(vec![first_batch, second_batch]);
}

#[test]
fn test_skip_byte_stream_split() {
    let values: Vec<f32> = vec![0.3, 0.4, 0.1, 4.10];
    // Skip fewer values than the page holds...
    test_skip::<FloatType>(values.clone(), Encoding::BYTE_STREAM_SPLIT, 2);
    // ...and then request a skip larger than the page.
    let doubles: Vec<f64> = values.into_iter().map(f64::from).collect();
    test_skip::<DoubleType>(doubles, Encoding::BYTE_STREAM_SPLIT, 100);
}

fn test_rle_value_decode<T: DataType>(data: Vec<Vec<T::T>>) {
test_encode_decode::<T>(data, Encoding::RLE);
}
Expand All @@ -1862,6 +1911,10 @@ mod tests {
test_encode_decode::<T>(data, Encoding::DELTA_BINARY_PACKED);
}

// Round-trips each batch in `data` through the shared `test_encode_decode`
// helper with `Encoding::BYTE_STREAM_SPLIT`.
fn test_byte_stream_split_decode<T: DataType>(data: Vec<Vec<T::T>>) {
    test_encode_decode::<T>(data, Encoding::BYTE_STREAM_SPLIT);
}

fn test_delta_byte_array_decode(data: Vec<Vec<ByteArray>>) {
test_encode_decode::<ByteArrayType>(data, Encoding::DELTA_BYTE_ARRAY);
}
Expand All @@ -1882,6 +1935,7 @@ mod tests {
encoder.put(&v[..]).expect("ok to encode");
}
let bytes = encoder.flush_buffer().expect("ok to flush buffer");
println!("{:x?}", bytes.data());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?


// Flatten expected data as contiguous array of values
let expected: Vec<T::T> = data.iter().flat_map(|s| s.clone()).collect();
Expand Down
79 changes: 79 additions & 0 deletions parquet/src/encodings/decoding/byte_stream_split_decoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use crate::data_type::{DataType, SliceAsBytes};
use crate::{basic::Encoding, errors::Result};

use super::Decoder;

use crate::util::memory::ByteBufferPtr;

use std::marker::PhantomData;

/// Decoder for the BYTE_STREAM_SPLIT encoding: byte `k` of every encoded
/// value lives in the `k`-th contiguous "byte stream" of the page buffer.
pub struct ByteStreamSplitDecoder<T: DataType> {
    _phantom: PhantomData<T>,
    // Page bytes as produced by the encoder (streams concatenated in order).
    encoded_bytes: ByteBufferPtr,
    // Number of values the current page contains.
    total_num_values: usize,
    // How many values have already been handed out via `get`/`skip`.
    values_decoded: usize,
}

impl<T: DataType> ByteStreamSplitDecoder<T> {
    /// Creates an empty decoder; `set_data` must be called before decoding.
    pub(crate) fn new() -> Self {
        ByteStreamSplitDecoder {
            encoded_bytes: ByteBufferPtr::new(Vec::new()),
            total_num_values: 0,
            values_decoded: 0,
            _phantom: PhantomData,
        }
    }
}

impl<T: DataType> Decoder<T> for ByteStreamSplitDecoder<T> {
    /// Stores the encoded page bytes and resets the decode cursor.
    fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> {
        self.encoded_bytes = data;
        self.total_num_values = num_values;
        self.values_decoded = 0;

        Ok(())
    }

    /// Decodes up to `buffer.len()` values, reassembling each one by
    /// gathering byte `k` from the `k`-th byte stream. Returns the number of
    /// values actually decoded.
    fn get(&mut self, buffer: &mut [<T as DataType>::T]) -> Result<usize> {
        let total_remaining_values = self.values_left();
        let num_values = buffer.len().min(total_remaining_values);

        // SAFETY: f32 and f64 have no invalid bit patterns, so writing
        // arbitrary bytes into their backing storage cannot produce an
        // invalid value.
        let raw_out_bytes = unsafe { <T as DataType>::T::slice_as_bytes_mut(buffer) };

        let num_streams = T::get_type_size();
        let byte_stream_length = self.encoded_bytes.len() / num_streams;
        let values_decoded = self.values_decoded;

        // go through each value to decode
        for out_value_idx in 0..num_values {
            // go through each byte stream of that value
            for byte_stream_idx in 0..num_streams {
                // Position of this value's byte within stream `byte_stream_idx`.
                let idx_in_encoded_data = (byte_stream_idx * byte_stream_length)
                    + (values_decoded + out_value_idx);

                raw_out_bytes[(out_value_idx * num_streams) + byte_stream_idx] =
                    self.encoded_bytes[idx_in_encoded_data];
            }
        }

        self.values_decoded += num_values;

        Ok(num_values)
    }

    fn values_left(&self) -> usize {
        self.total_num_values - self.values_decoded
    }

    fn encoding(&self) -> Encoding {
        Encoding::BYTE_STREAM_SPLIT
    }

    /// Skips over `num_values` values (capped at what remains) by advancing
    /// the cursor; no bytes need to be touched for this encoding.
    fn skip(&mut self, num_values: usize) -> Result<usize> {
        let to_skip = usize::min(self.values_left(), num_values);
        self.values_decoded += to_skip;
        Ok(to_skip)
    }
}
60 changes: 60 additions & 0 deletions parquet/src/encodings/encoding/byte_stream_split_encoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use crate::basic::{Encoding, Type};
use crate::data_type::DataType;
use crate::data_type::SliceAsBytes;
use crate::util::memory::ByteBufferPtr;

use crate::errors::Result;

use super::Encoder;

use std::marker::PhantomData;

/// Encoder for BYTE_STREAM_SPLIT: buffers the raw bytes of every value and
/// transposes them into per-byte streams when the buffer is flushed.
pub struct ByteStreamSplitEncoder<T> {
    // Raw value bytes in `put` order; transposed only at flush time.
    buffer: Vec<u8>,
    _p: PhantomData<T>,
}

impl<T: DataType> ByteStreamSplitEncoder<T> {
    /// Creates an encoder with an empty scratch buffer.
    pub(crate) fn new() -> Self {
        ByteStreamSplitEncoder {
            _p: PhantomData,
            buffer: Vec::new(),
        }
    }
}

impl<T: DataType> Encoder<T> for ByteStreamSplitEncoder<T> {
    /// Appends the raw bytes of `values` to the internal buffer.
    ///
    /// The byte-stream transposition is deferred to `flush_buffer`, which
    /// needs the total value count.
    fn put(&mut self, values: &[T::T]) -> Result<()> {
        // Reject unsupported physical types BEFORE mutating the buffer, so a
        // failed call leaves the encoder state untouched.
        ensure_phys_ty!(
            Type::FLOAT | Type::DOUBLE,
            "ByteStreamSplitEncoder only supports FloatType or DoubleType"
        );
        self.buffer
            .extend(<T as DataType>::T::slice_as_bytes(values));

        Ok(())
    }

    fn encoding(&self) -> Encoding {
        Encoding::BYTE_STREAM_SPLIT
    }

    fn estimated_data_encoded_size(&self) -> usize {
        // The encoding only permutes bytes, so output size == input size.
        self.buffer.len()
    }

    /// Transposes the buffered value bytes into `num_streams` contiguous
    /// byte streams and returns them as the encoded page body.
    fn flush_buffer(&mut self) -> Result<ByteBufferPtr> {
        let mut encoded = vec![0; self.buffer.len()];
        // Have to do all work in flush buffer, as we need the whole byte stream
        let num_streams = T::get_type_size();
        let num_values = self.buffer.len() / num_streams;
        for i in 0..num_values {
            for j in 0..num_streams {
                // Byte j of value i lands at position i of stream j.
                let byte_in_value = self.buffer[i * num_streams + j];
                encoded[j * num_values + i] = byte_in_value;
            }
        }
        self.buffer.clear();
        Ok(encoded.into())
    }
}
37 changes: 37 additions & 0 deletions parquet/src/encodings/encoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::util::{

pub use dict_encoder::DictEncoder;

mod byte_stream_split_encoder;
mod dict_encoder;

// ----------------------------------------------------------------------
Expand Down Expand Up @@ -87,6 +88,9 @@ pub fn get_encoder<T: DataType>(encoding: Encoding) -> Result<Box<dyn Encoder<T>
Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackEncoder::new()),
Encoding::DELTA_LENGTH_BYTE_ARRAY => Box::new(DeltaLengthByteArrayEncoder::new()),
Encoding::DELTA_BYTE_ARRAY => Box::new(DeltaByteArrayEncoder::new()),
Encoding::BYTE_STREAM_SPLIT => {
Box::new(byte_stream_split_encoder::ByteStreamSplitEncoder::new())
}
e => return Err(nyi_err!("Encoding {} is not supported", e)),
};
Ok(encoder)
Expand Down Expand Up @@ -794,12 +798,14 @@ mod tests {
fn test_float() {
    // Exercise every encoding supported for the FLOAT physical type,
    // including the newly added BYTE_STREAM_SPLIT.
    FloatType::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
    FloatType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
    FloatType::test(Encoding::BYTE_STREAM_SPLIT, TEST_SET_SIZE, -1);
}

#[test]
fn test_double() {
    // Exercise every encoding supported for the DOUBLE physical type,
    // including the newly added BYTE_STREAM_SPLIT.
    DoubleType::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
    DoubleType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
    DoubleType::test(Encoding::BYTE_STREAM_SPLIT, TEST_SET_SIZE, -1);
}

#[test]
Expand Down Expand Up @@ -918,6 +924,37 @@ mod tests {
3, // only suffix bytes, length encoder is not flushed yet
0,
);

// BYTE_STREAM_SPLIT
run_test::<FloatType>(Encoding::BYTE_STREAM_SPLIT, -1, &[0.1, 0.2], 0, 8, 0);
}

#[test]
fn test_byte_stream_split_example_f32() {
    // Test data from https://github.com/apache/parquet-format/blob/2a481fe1aad64ff770e21734533bb7ef5a057dac/Encodings.md#byte-stream-split-byte_stream_split--9
    let mut encoder = create_test_encoder::<FloatType>(Encoding::BYTE_STREAM_SPLIT);
    let mut decoder =
        create_test_decoder::<FloatType>(0, Encoding::BYTE_STREAM_SPLIT);

    // Build the input floats from their little-endian byte patterns.
    let raw_values = [
        [0xAA, 0xBB, 0xCC, 0xDD],
        [0x00, 0x11, 0x22, 0x33],
        [0xA3, 0xB4, 0xC5, 0xD6],
    ];
    let input: Vec<f32> = raw_values.iter().map(|b| f32::from_le_bytes(*b)).collect();

    encoder.put(&input).unwrap();
    let encoded = encoder.flush_buffer().unwrap();

    // Each stream holds byte k of every value; streams are concatenated.
    assert_eq!(
        encoded.data(),
        [0xAA, 0x00, 0xA3, 0xBB, 0x11, 0xB4, 0xCC, 0x22, 0xC5, 0xDD, 0x33, 0xD6]
    );

    // Decoding must reproduce the original values exactly.
    let mut decoded = vec![0.0_f32; input.len()];
    decoder.set_data(encoded, input.len()).unwrap();
    decoder.get(&mut decoded).unwrap();

    assert_eq!(decoded, input);
}

// See: https://github.com/sunchao/parquet-rs/issues/47
Expand Down