From 4c3e9be465f1448b20faca69c0652748bb0f5436 Mon Sep 17 00:00:00 2001 From: Martin Date: Fri, 12 Jan 2024 05:23:28 -0500 Subject: [PATCH] Support Parquet Byte Stream Split Encoding (#5293) * wip byte-stream-split * decoding works * impl split * clean up * whitespace * remove println * get compiling after rebase * integration test, as one might call it * update parquet-testing revision * encoding bench * improve performance * test fix * add apache headers * one more test and readme update --------- Co-authored-by: Simon Vandel Sillesen --- parquet-testing | 2 +- parquet/Cargo.toml | 5 + parquet/README.md | 2 +- parquet/benches/encoding.rs | 83 ++++++++++ parquet/src/arrow/arrow_reader/mod.rs | 42 ++++- parquet/src/arrow/arrow_writer/mod.rs | 3 + parquet/src/encodings/decoding.rs | 146 ++++++++++++------ .../decoding/byte_stream_split_decoder.rs | 121 +++++++++++++++ .../encoding/byte_stream_split_encoder.rs | 93 +++++++++++ parquet/src/encodings/encoding/mod.rs | 94 ++++++----- 10 files changed, 499 insertions(+), 92 deletions(-) create mode 100644 parquet/benches/encoding.rs create mode 100644 parquet/src/encodings/decoding/byte_stream_split_decoder.rs create mode 100644 parquet/src/encodings/encoding/byte_stream_split_encoder.rs diff --git a/parquet-testing b/parquet-testing index 89b685a64c31..4cb3cff24c96 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit 89b685a64c3117b3023d8684af1f41400841db71 +Subproject commit 4cb3cff24c965fb329cdae763eabce47395a68a0 diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 07b0609090a0..60160a8b3fc1 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -175,6 +175,11 @@ name = "compression" required-features = ["experimental", "default"] harness = false +[[bench]] +name = "encoding" +required-features = ["experimental", "default"] +harness = false + [[bench]] name = "metadata" diff --git a/parquet/README.md b/parquet/README.md index 9de7aec4e59a..e5b53050b70c 100644 --- a/parquet/README.md +++ b/parquet/README.md @@ -55,7 +55,7 @@ The `parquet` crate provides the following features which may be enabled in your ## Parquet Feature Status -- [x] All encodings supported, except for BYTE_STREAM_SPLIT ([#4102](https://github.com/apache/arrow-rs/issues/4102)) +- [x] All encodings supported - [x] All compression codecs supported - [x] Read support - [x] Primitive column value readers diff --git a/parquet/benches/encoding.rs b/parquet/benches/encoding.rs new file mode 100644 index 000000000000..bdbca3567a2b --- /dev/null +++ b/parquet/benches/encoding.rs @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use criterion::*; +use parquet::basic::Encoding; +use parquet::data_type::{DataType, DoubleType, FloatType}; +use parquet::decoding::{get_decoder, Decoder}; +use parquet::encoding::get_encoder; +use parquet::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type}; +use rand::prelude::*; +use std::sync::Arc; + +fn bench_typed(c: &mut Criterion, values: &[T::T], encoding: Encoding) { + let name = format!( + "dtype={}, encoding={:?}", + std::any::type_name::(), + encoding + ); + c.bench_function(&format!("encoding: {}", name), |b| { + b.iter(|| { + let mut encoder = get_encoder::(encoding).unwrap(); + encoder.put(values).unwrap(); + encoder.flush_buffer().unwrap(); + }); + }); + + let mut encoder = get_encoder::(encoding).unwrap(); + encoder.put(values).unwrap(); + let encoded = encoder.flush_buffer().unwrap(); + println!("{} encoded as {} bytes", name, encoded.len(),); + + let mut buffer = vec![T::T::default(); values.len()]; + let column_desc_ptr = ColumnDescPtr::new(ColumnDescriptor::new( + Arc::new( + Type::primitive_type_builder("", T::get_physical_type()) + .build() + .unwrap(), + ), + 0, + 0, + ColumnPath::new(vec![]), + )); + c.bench_function(&format!("decoding: {}", name), |b| { + b.iter(|| { + let mut decoder: Box> = + get_decoder(column_desc_ptr.clone(), encoding).unwrap(); + decoder.set_data(encoded.clone(), values.len()).unwrap(); + decoder.get(&mut buffer).unwrap(); + }); + }); +} + +fn criterion_benchmark(c: &mut Criterion) { + let mut rng = StdRng::seed_from_u64(0); + let n = 16 * 1024; + + let mut f32s = Vec::new(); + let mut f64s = Vec::new(); + for _ in 0..n { + f32s.push(rng.gen::()); + f64s.push(rng.gen::()); + } + + bench_typed::(c, &f32s, Encoding::BYTE_STREAM_SPLIT); + bench_typed::(c, &f64s, Encoding::BYTE_STREAM_SPLIT); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 52d7249a290e..6b6146042051 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -737,7 +737,9 @@ mod tests { use arrow_array::builder::*; use arrow_array::cast::AsArray; - use arrow_array::types::{Decimal128Type, Decimal256Type, DecimalType, Float16Type}; + use arrow_array::types::{ + Decimal128Type, Decimal256Type, DecimalType, Float16Type, Float32Type, Float64Type, + }; use arrow_array::*; use arrow_array::{RecordBatch, RecordBatchReader}; use arrow_buffer::{i256, ArrowNativeType, Buffer}; @@ -755,7 +757,7 @@ mod tests { use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE; use crate::data_type::{ BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType, - Int32Type, Int64Type, Int96Type, + FloatType, Int32Type, Int64Type, Int96Type, }; use crate::errors::Result; use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion}; @@ -861,6 +863,13 @@ mod tests { Encoding::DELTA_BINARY_PACKED, ], ); + run_single_column_reader_tests::( + 2, + ConvertedType::NONE, + None, + |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())), + &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT], + ); } #[test] @@ -1390,6 +1399,35 @@ mod tests { assert!(col.value(2).is_nan()); } + #[test] + fn test_read_float32_float64_byte_stream_split() { + let path = format!( + "{}/byte_stream_split.zstd.parquet", + arrow::util::test_util::parquet_test_data(), + ); + let file = File::open(path).unwrap(); + let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap(); + + let mut row_count = 0; + for batch in record_reader { + let batch = batch.unwrap(); + row_count += batch.num_rows(); + let f32_col = batch.column(0).as_primitive::(); + let f64_col = batch.column(1).as_primitive::(); + + // This file contains floats from a standard normal distribution + for &x in f32_col.values() { + assert!(x > -10.0); + assert!(x < 10.0); + } + for &x in f64_col.values() { + assert!(x > -10.0); + assert!(x < 10.0); + } + } + assert_eq!(row_count, 300); + } + /// Parameters for single_column_reader_test #[derive(Clone)] struct TestOptions { diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index e6e95d50996a..3563348791bc 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -1579,6 +1579,9 @@ mod tests { | DataType::UInt32 | DataType::UInt16 | DataType::UInt8 => vec![Encoding::PLAIN, Encoding::DELTA_BINARY_PACKED], + DataType::Float32 | DataType::Float64 => { + vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT] + } _ => vec![Encoding::PLAIN], }; diff --git a/parquet/src/encodings/decoding.rs b/parquet/src/encodings/decoding.rs index 5843acdb6d0f..88bc3920f309 100644 --- a/parquet/src/encodings/decoding.rs +++ b/parquet/src/encodings/decoding.rs @@ -31,6 +31,8 @@ use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; use crate::util::bit_util::{self, BitReader}; +mod byte_stream_split_decoder; + pub(crate) mod private { use super::*; @@ -103,8 +105,32 @@ pub(crate) mod private { } } - impl GetDecoder for f32 {} - impl GetDecoder for f64 {} + impl GetDecoder for f32 { + fn get_decoder>( + descr: ColumnDescPtr, + encoding: Encoding, + ) -> Result>> { + match encoding { + Encoding::BYTE_STREAM_SPLIT => Ok(Box::new( + byte_stream_split_decoder::ByteStreamSplitDecoder::new(), + )), + _ => get_decoder_default(descr, encoding), + } + } + } + impl GetDecoder for f64 { + fn get_decoder>( + descr: ColumnDescPtr, + encoding: Encoding, + ) -> Result>> { + match encoding { + Encoding::BYTE_STREAM_SPLIT => Ok(Box::new( + byte_stream_split_decoder::ByteStreamSplitDecoder::new(), + )), + _ => get_decoder_default(descr, encoding), + } + } + } impl GetDecoder for ByteArray { fn get_decoder>( @@ -550,14 +576,12 @@ where .get_zigzag_vlq_int() .ok_or_else(|| eof_err!("Not enough data to decode 'min_delta'"))?; - self.min_delta = T::T::from_i64(min_delta) - .ok_or_else(|| general_err!("'min_delta' too large"))?; + self.min_delta = + T::T::from_i64(min_delta).ok_or_else(|| general_err!("'min_delta' too large"))?; self.mini_block_bit_widths.clear(); - self.bit_reader.get_aligned_bytes( - &mut self.mini_block_bit_widths, - self.mini_blocks_per_block, - ); + self.bit_reader + .get_aligned_bytes(&mut self.mini_block_bit_widths, self.mini_blocks_per_block); let mut offset = self.bit_reader.get_byte_offset(); let mut remaining = self.values_left; @@ -634,10 +658,8 @@ where .get_zigzag_vlq_int() .ok_or_else(|| eof_err!("Not enough data to decode 'first_value'"))?; - self.first_value = Some( - T::T::from_i64(first_value) - .ok_or_else(|| general_err!("first value too large"))?, - ); + self.first_value = + Some(T::T::from_i64(first_value).ok_or_else(|| general_err!("first value too large"))?); if self.block_size % 128 != 0 { return Err(general_err!( @@ -649,7 +671,8 @@ where if self.block_size % self.mini_blocks_per_block != 0 { return Err(general_err!( "'block_size' must be a multiple of 'mini_blocks_per_block' got {} and {}", - self.block_size, self.mini_blocks_per_block + self.block_size, + self.mini_blocks_per_block )); } @@ -994,11 +1017,9 @@ impl Decoder for DeltaByteArrayDecoder { self.previous_value.clear(); Ok(()) } - _ => { - Err(general_err!( - "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType" - )) - } + _ => Err(general_err!( + "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType" + )), } } @@ -1010,7 +1031,10 @@ impl Decoder for DeltaByteArrayDecoder { for item in buffer.iter_mut().take(num_values) { // Process suffix // TODO: this is awkward - maybe we should add a non-vectorized API? - let suffix_decoder = self.suffix_decoder.as_mut().expect("decoder not initialized"); + let suffix_decoder = self + .suffix_decoder + .as_mut() + .expect("decoder not initialized"); suffix_decoder.get(&mut v[..])?; let suffix = v[0].data(); @@ -1045,11 +1069,9 @@ impl Decoder for DeltaByteArrayDecoder { self.num_values -= num_values; Ok(num_values) } - _ => { - Err(general_err!( - "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType" - )) - } + _ => Err(general_err!( + "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType" + )), } } @@ -1075,9 +1097,7 @@ mod tests { use std::f64::consts::PI as PI_f64; use std::sync::Arc; - use crate::schema::types::{ - ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType, - }; + use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType}; use crate::util::test_common::rand_gen::RandGen; #[test] @@ -1085,10 +1105,7 @@ mod tests { // supported encodings create_and_check_decoder::(Encoding::PLAIN, None); create_and_check_decoder::(Encoding::DELTA_BINARY_PACKED, None); - create_and_check_decoder::( - Encoding::DELTA_LENGTH_BYTE_ARRAY, - None, - ); + create_and_check_decoder::(Encoding::DELTA_LENGTH_BYTE_ARRAY, None); create_and_check_decoder::(Encoding::DELTA_BYTE_ARRAY, None); create_and_check_decoder::(Encoding::RLE, None); @@ -1479,8 +1496,8 @@ mod tests { #[test] fn test_delta_bit_packed_int32_repeat() { let block_data = vec![ - 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, - 3, 4, 5, 6, 7, 8, + 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, + 6, 7, 8, ]; test_delta_bit_packed_decode::(vec![block_data]); } @@ -1488,8 +1505,8 @@ mod tests { #[test] fn test_skip_delta_bit_packed_int32_repeat() { let block_data = vec![ - 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, - 3, 4, 5, 6, 7, 8, + 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, + 6, 7, 8, ]; test_skip::(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 10); test_skip::(block_data, Encoding::DELTA_BINARY_PACKED, 100); @@ -1511,14 +1528,13 @@ mod tests { #[test] fn test_delta_bit_packed_int32_same_values() { let block_data = vec![ - 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, - 127, + 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, ]; test_delta_bit_packed_decode::(vec![block_data]); let block_data = vec![ - -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, - -127, -127, -127, + -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, + -127, -127, ]; test_delta_bit_packed_decode::(vec![block_data]); } @@ -1526,15 +1542,14 @@ mod tests { #[test] fn test_skip_delta_bit_packed_int32_same_values() { let block_data = vec![ - 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, - 127, + 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, ]; test_skip::(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5); test_skip::(block_data, Encoding::DELTA_BINARY_PACKED, 100); let block_data = vec![ - -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, - -127, -127, -127, + -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, + -127, -127, ]; test_skip::(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5); test_skip::(block_data, Encoding::DELTA_BINARY_PACKED, 100); @@ -1634,8 +1649,8 @@ mod tests { #[test] fn test_delta_bit_packed_decoder_sample() { let data_bytes = vec![ - 128, 1, 4, 3, 58, 28, 6, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 128, 1, 4, 3, 58, 28, 6, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, ]; let mut decoder: DeltaBitPackDecoder = DeltaBitPackDecoder::new(); decoder.set_data(data_bytes.into(), 3).unwrap(); @@ -1760,6 +1775,38 @@ mod tests { test_delta_byte_array_decode(data); } + #[test] + fn test_byte_stream_split_multiple_f32() { + let data = vec![ + vec![ + f32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]), + f32::from_le_bytes([0x00, 0x11, 0x22, 0x33]), + ], + vec![f32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6])], + ]; + test_byte_stream_split_decode::(data); + } + + #[test] + fn test_byte_stream_split_f64() { + let data = vec![vec![ + f64::from_le_bytes([0, 1, 2, 3, 4, 5, 6, 7]), + f64::from_le_bytes([8, 9, 10, 11, 12, 13, 14, 15]), + ]]; + test_byte_stream_split_decode::(data); + } + + #[test] + fn test_skip_byte_stream_split() { + let block_data = vec![0.3, 0.4, 0.1, 4.10]; + test_skip::(block_data.clone(), Encoding::BYTE_STREAM_SPLIT, 2); + test_skip::( + block_data.into_iter().map(|x| x as f64).collect(), + Encoding::BYTE_STREAM_SPLIT, + 100, + ); + } + fn test_rle_value_decode(data: Vec>) { test_encode_decode::(data, Encoding::RLE); } @@ -1768,6 +1815,10 @@ mod tests { test_encode_decode::(data, Encoding::DELTA_BINARY_PACKED); } + fn test_byte_stream_split_decode(data: Vec>) { + test_encode_decode::(data, Encoding::BYTE_STREAM_SPLIT); + } + fn test_delta_byte_array_decode(data: Vec>) { test_encode_decode::(data, Encoding::DELTA_BYTE_ARRAY); } @@ -1844,10 +1895,7 @@ mod tests { } } - fn create_and_check_decoder( - encoding: Encoding, - err: Option, - ) { + fn create_and_check_decoder(encoding: Encoding, err: Option) { let descr = create_test_col_desc_ptr(-1, T::get_physical_type()); let decoder = get_decoder::(descr, encoding); match err { diff --git a/parquet/src/encodings/decoding/byte_stream_split_decoder.rs b/parquet/src/encodings/decoding/byte_stream_split_decoder.rs new file mode 100644 index 000000000000..98841d21ec9e --- /dev/null +++ b/parquet/src/encodings/decoding/byte_stream_split_decoder.rs @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::marker::PhantomData; + +use bytes::Bytes; + +use crate::basic::Encoding; +use crate::data_type::{DataType, SliceAsBytes}; +use crate::errors::{ParquetError, Result}; + +use super::Decoder; + +pub struct ByteStreamSplitDecoder { + _phantom: PhantomData, + encoded_bytes: Bytes, + total_num_values: usize, + values_decoded: usize, +} + +impl ByteStreamSplitDecoder { + pub(crate) fn new() -> Self { + Self { + _phantom: PhantomData, + encoded_bytes: Bytes::new(), + total_num_values: 0, + values_decoded: 0, + } + } +} + +// Here we assume src contains the full data (which it must, since we're +// can only know where to split the streams once all data is collected), +// but dst can be just a slice starting from the given index. +// We iterate over the output bytes and fill them in from their strided +// input byte locations. +fn join_streams_const( + src: &[u8], + dst: &mut [u8], + stride: usize, + values_decoded: usize, +) { + let sub_src = &src[values_decoded..]; + for i in 0..dst.len() / TYPE_SIZE { + for j in 0..TYPE_SIZE { + dst[i * TYPE_SIZE + j] = sub_src[i + j * stride]; + } + } +} + +impl Decoder for ByteStreamSplitDecoder { + fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> { + self.encoded_bytes = data; + self.total_num_values = num_values; + self.values_decoded = 0; + + Ok(()) + } + + fn get(&mut self, buffer: &mut [::T]) -> Result { + let total_remaining_values = self.values_left(); + let num_values = buffer.len().min(total_remaining_values); + let buffer = &mut buffer[..num_values]; + + // SAFETY: f32 and f64 has no constraints on their internal representation, so we can modify it as we want + let raw_out_bytes = unsafe { ::T::slice_as_bytes_mut(buffer) }; + let type_size = T::get_type_size(); + let stride = self.encoded_bytes.len() / type_size; + match type_size { + 4 => join_streams_const::<4>( + &self.encoded_bytes, + raw_out_bytes, + stride, + self.values_decoded, + ), + 8 => join_streams_const::<8>( + &self.encoded_bytes, + raw_out_bytes, + stride, + self.values_decoded, + ), + _ => { + return Err(general_err!( + "byte stream split unsupported for data types of size {} bytes", + type_size + )); + } + } + self.values_decoded += num_values; + + Ok(num_values) + } + + fn values_left(&self) -> usize { + self.total_num_values - self.values_decoded + } + + fn encoding(&self) -> Encoding { + Encoding::BYTE_STREAM_SPLIT + } + + fn skip(&mut self, num_values: usize) -> Result { + let to_skip = usize::min(self.values_left(), num_values); + self.values_decoded += to_skip; + Ok(to_skip) + } +} diff --git a/parquet/src/encodings/encoding/byte_stream_split_encoder.rs b/parquet/src/encodings/encoding/byte_stream_split_encoder.rs new file mode 100644 index 000000000000..a95487041cee --- /dev/null +++ b/parquet/src/encodings/encoding/byte_stream_split_encoder.rs @@ -0,0 +1,93 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::basic::{Encoding, Type}; +use crate::data_type::DataType; +use crate::data_type::SliceAsBytes; + +use crate::errors::{ParquetError, Result}; + +use super::Encoder; + +use bytes::Bytes; +use std::marker::PhantomData; + +pub struct ByteStreamSplitEncoder { + buffer: Vec, + _p: PhantomData, +} + +impl ByteStreamSplitEncoder { + pub(crate) fn new() -> Self { + Self { + buffer: Vec::new(), + _p: PhantomData, + } + } +} + +// Here we assume src contains the full data (which it must, since we're +// can only know where to split the streams once all data is collected). +// We iterate over the input bytes and write them to their strided output +// byte locations. +fn split_streams_const(src: &[u8], dst: &mut [u8]) { + let stride = src.len() / TYPE_SIZE; + for i in 0..stride { + for j in 0..TYPE_SIZE { + dst[i + j * stride] = src[i * TYPE_SIZE + j]; + } + } +} + +impl Encoder for ByteStreamSplitEncoder { + fn put(&mut self, values: &[T::T]) -> Result<()> { + self.buffer + .extend(::T::slice_as_bytes(values)); + ensure_phys_ty!( + Type::FLOAT | Type::DOUBLE, + "ByteStreamSplitEncoder only supports FloatType or DoubleType" + ); + + Ok(()) + } + + fn encoding(&self) -> Encoding { + Encoding::BYTE_STREAM_SPLIT + } + + fn estimated_data_encoded_size(&self) -> usize { + self.buffer.len() + } + + fn flush_buffer(&mut self) -> Result { + let mut encoded = vec![0; self.buffer.len()]; + let type_size = T::get_type_size(); + match type_size { + 4 => split_streams_const::<4>(&self.buffer, &mut encoded), + 8 => split_streams_const::<8>(&self.buffer, &mut encoded), + _ => { + return Err(general_err!( + "byte stream split unsupported for data types of size {} bytes", + type_size + )); + } + } + + self.buffer.clear(); + Ok(encoded.into()) + } +} diff --git a/parquet/src/encodings/encoding/mod.rs b/parquet/src/encodings/encoding/mod.rs index 89e61ee226ad..ef49f03e2f4a 100644 --- a/parquet/src/encodings/encoding/mod.rs +++ b/parquet/src/encodings/encoding/mod.rs @@ -29,6 +29,7 @@ use crate::util::bit_util::{self, num_required_bits, BitWriter}; use bytes::Bytes; pub use dict_encoder::DictEncoder; +mod byte_stream_split_encoder; mod dict_encoder; // ---------------------------------------------------------------------- @@ -85,6 +86,9 @@ pub fn get_encoder(encoding: Encoding) -> Result Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackEncoder::new()), Encoding::DELTA_LENGTH_BYTE_ARRAY => Box::new(DeltaLengthByteArrayEncoder::new()), Encoding::DELTA_BYTE_ARRAY => Box::new(DeltaByteArrayEncoder::new()), + Encoding::BYTE_STREAM_SPLIT => { + Box::new(byte_stream_split_encoder::ByteStreamSplitEncoder::new()) + } e => return Err(nyi_err!("Encoding {} is not supported", e)), }; Ok(encoder) @@ -376,19 +380,17 @@ impl DeltaBitPackEncoder { // Compute the max delta in current mini block let mut max_delta = i64::MIN; for j in 0..n { - max_delta = - cmp::max(max_delta, self.deltas[i * self.mini_block_size + j]); + max_delta = cmp::max(max_delta, self.deltas[i * self.mini_block_size + j]); } // Compute bit width to store (max_delta - min_delta) - let bit_width = - num_required_bits(self.subtract_u64(max_delta, min_delta)) as usize; + let bit_width = num_required_bits(self.subtract_u64(max_delta, min_delta)) as usize; self.bit_writer.write_at(offset + i, bit_width as u8); // Encode values in current mini block using min_delta and bit_width for j in 0..n { - let packed_value = self - .subtract_u64(self.deltas[i * self.mini_block_size + j], min_delta); + let packed_value = + self.subtract_u64(self.deltas[i * self.mini_block_size + j], min_delta); self.bit_writer.put_value(packed_value, bit_width); } @@ -572,8 +574,7 @@ impl Encoder for DeltaLengthByteArrayEncoder { .map(|x| x.as_any().downcast_ref::().unwrap()) }; - let lengths: Vec = - val_it().map(|byte_array| byte_array.len() as i32).collect(); + let lengths: Vec = val_it().map(|byte_array| byte_array.len() as i32).collect(); self.len_encoder.put(&lengths)?; for byte_array in val_it() { self.encoded_size += byte_array.len(); @@ -649,14 +650,15 @@ impl Encoder for DeltaByteArrayEncoder { let mut prefix_lengths: Vec = vec![]; let mut suffixes: Vec = vec![]; - let values = values.iter() + let values = values + .iter() .map(|x| x.as_any()) .map(|x| match T::get_physical_type() { Type::BYTE_ARRAY => x.downcast_ref::().unwrap(), Type::FIXED_LEN_BYTE_ARRAY => x.downcast_ref::().unwrap(), _ => panic!( "DeltaByteArrayEncoder only supports ByteArrayType and FixedLenByteArrayType" - ) + ), }); for byte_array in values { @@ -665,8 +667,7 @@ impl Encoder for DeltaByteArrayEncoder { // value let prefix_len = cmp::min(self.previous.len(), current.len()); let mut match_len = 0; - while match_len < prefix_len && self.previous[match_len] == current[match_len] - { + while match_len < prefix_len && self.previous[match_len] == current[match_len] { match_len += 1; } prefix_lengths.push(match_len as i32); @@ -724,9 +725,7 @@ mod tests { use std::sync::Arc; use crate::encodings::decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder}; - use crate::schema::types::{ - ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType, - }; + use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType}; use crate::util::test_common::rand_gen::{random_bytes, RandGen}; const TEST_SET_SIZE: usize = 1024; @@ -792,12 +791,14 @@ mod tests { fn test_float() { FloatType::test(Encoding::PLAIN, TEST_SET_SIZE, -1); FloatType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1); + FloatType::test(Encoding::BYTE_STREAM_SPLIT, TEST_SET_SIZE, -1); } #[test] fn test_double() { DoubleType::test(Encoding::PLAIN, TEST_SET_SIZE, -1); DoubleType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1); + DoubleType::test(Encoding::BYTE_STREAM_SPLIT, TEST_SET_SIZE, -1); } #[test] @@ -817,11 +818,7 @@ mod tests { #[test] fn test_dict_encoded_size() { - fn run_test( - type_length: i32, - values: &[T::T], - expected_size: usize, - ) { + fn run_test(type_length: i32, values: &[T::T], expected_size: usize) { let mut encoder = create_test_dict_encoder::(type_length); assert_eq!(encoder.dict_encoded_size(), 0); encoder.put(values).unwrap(); @@ -843,11 +840,7 @@ mod tests { &[Int96::from(vec![1, 2, 3]), Int96::from(vec![2, 3, 4])], 24, ); - run_test::( - -1, - &[ByteArray::from("abcd"), ByteArray::from("efj")], - 15, - ); + run_test::(-1, &[ByteArray::from("abcd"), ByteArray::from("efj")], 15); run_test::( 2, &[ByteArray::from("ab").into(), ByteArray::from("bc").into()], @@ -916,15 +909,45 @@ mod tests { 3, // only suffix bytes, length encoder is not flushed yet 0, ); + + // BYTE_STREAM_SPLIT + run_test::(Encoding::BYTE_STREAM_SPLIT, -1, &[0.1, 0.2], 0, 8, 0); + } + + #[test] + fn test_byte_stream_split_example_f32() { + // Test data from https://github.com/apache/parquet-format/blob/2a481fe1aad64ff770e21734533bb7ef5a057dac/Encodings.md#byte-stream-split-byte_stream_split--9 + let mut encoder = create_test_encoder::(Encoding::BYTE_STREAM_SPLIT); + let mut decoder = create_test_decoder::(0, Encoding::BYTE_STREAM_SPLIT); + + let input = vec![ + f32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]), + f32::from_le_bytes([0x00, 0x11, 0x22, 0x33]), + f32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6]), + ]; + + encoder.put(&input).unwrap(); + let encoded = encoder.flush_buffer().unwrap(); + + assert_eq!( + encoded, + Bytes::from(vec![ + 0xAA_u8, 0x00, 0xA3, 0xBB, 0x11, 0xB4, 0xCC, 0x22, 0xC5, 0xDD, 0x33, 0xD6 + ]) + ); + + let mut decoded = vec![0.0; input.len()]; + decoder.set_data(encoded, input.len()).unwrap(); + decoder.get(&mut decoded).unwrap(); + + assert_eq!(decoded, input); } // See: https://github.com/sunchao/parquet-rs/issues/47 #[test] fn test_issue_47() { - let mut encoder = - create_test_encoder::(Encoding::DELTA_BYTE_ARRAY); - let mut decoder = - create_test_decoder::(0, Encoding::DELTA_BYTE_ARRAY); + let mut encoder = create_test_encoder::(Encoding::DELTA_BYTE_ARRAY); + let mut decoder = create_test_decoder::(0, Encoding::DELTA_BYTE_ARRAY); let input = vec![ ByteArray::from("aa"), @@ -935,8 +958,7 @@ mod tests { let mut output = vec![ByteArray::default(); input.len()]; - let mut result = - put_and_get(&mut encoder, &mut decoder, &input[..2], &mut output[..2]); + let mut result = put_and_get(&mut encoder, &mut decoder, &input[..2], &mut output[..2]); assert!( result.is_ok(), "first put_and_get() failed with: {}", @@ -1072,10 +1094,7 @@ mod tests { decoder.get(output) } - fn create_and_check_encoder( - encoding: Encoding, - err: Option, - ) { + fn create_and_check_encoder(encoding: Encoding, err: Option) { let encoder = get_encoder::(encoding); match err { Some(parquet_error) => { @@ -1106,10 +1125,7 @@ mod tests { get_encoder(enc).unwrap() } - fn create_test_decoder( - type_len: i32, - enc: Encoding, - ) -> Box> { + fn create_test_decoder(type_len: i32, enc: Encoding) -> Box> { let desc = create_test_col_desc_ptr(type_len, T::get_physical_type()); get_decoder(desc, enc).unwrap() }