From 15e25de68274f0af690528c53e9e3ab03d457f6e Mon Sep 17 00:00:00 2001 From: wypb Date: Mon, 9 Dec 2024 17:45:38 +0800 Subject: [PATCH] refactor(parquet): Move arrow RleEncodingInternal to common --- .../common/tests/BitPackDecoderBenchmark.cpp | 2 +- .../util => common}/BitStreamUtilsInternal.h | 4 +- .../util => common}/RleEncodingInternal.h | 12 ++--- velox/dwio/parquet/reader/PageReader.cpp | 6 +-- velox/dwio/parquet/reader/PageReader.h | 8 ++-- .../parquet/tests/reader/RleBpDecoderTest.cpp | 4 +- .../parquet/writer/arrow/ColumnWriter.cpp | 4 -- .../dwio/parquet/writer/arrow/ColumnWriter.h | 10 ++-- velox/dwio/parquet/writer/arrow/Encoding.cpp | 42 ++++++++-------- .../writer/arrow/tests/ColumnReader.cpp | 14 +++--- .../parquet/writer/arrow/tests/ColumnReader.h | 8 ++-- .../writer/arrow/tests/EncodingTest.cpp | 48 +++++++++---------- 12 files changed, 73 insertions(+), 89 deletions(-) rename velox/dwio/parquet/{writer/arrow/util => common}/BitStreamUtilsInternal.h (99%) rename velox/dwio/parquet/{writer/arrow/util => common}/RleEncodingInternal.h (98%) diff --git a/velox/dwio/common/tests/BitPackDecoderBenchmark.cpp b/velox/dwio/common/tests/BitPackDecoderBenchmark.cpp index 63b87d86cebe9..bee120b960e21 100644 --- a/velox/dwio/common/tests/BitPackDecoderBenchmark.cpp +++ b/velox/dwio/common/tests/BitPackDecoderBenchmark.cpp @@ -17,7 +17,7 @@ #include "velox/common/base/BitUtil.h" #include "velox/common/base/Exceptions.h" #include "velox/dwio/common/BitPackDecoder.h" -#include "velox/dwio/parquet/writer/arrow/util/BitStreamUtilsInternal.h" +#include "velox/dwio/parquet/common/BitStreamUtilsInternal.h" #ifdef __BMI2__ #include "velox/dwio/common/tests/Lemire/bmipacking32.h" diff --git a/velox/dwio/parquet/writer/arrow/util/BitStreamUtilsInternal.h b/velox/dwio/parquet/common/BitStreamUtilsInternal.h similarity index 99% rename from velox/dwio/parquet/writer/arrow/util/BitStreamUtilsInternal.h rename to velox/dwio/parquet/common/BitStreamUtilsInternal.h index 9ccb171bc0847..2bcd8a3c4fe77 100644 --- a/velox/dwio/parquet/writer/arrow/util/BitStreamUtilsInternal.h +++ b/velox/dwio/parquet/common/BitStreamUtilsInternal.h @@ -30,7 +30,7 @@ #include "arrow/util/macros.h" #include "arrow/util/ubsan.h" -namespace facebook::velox::parquet::arrow::bit_util { +namespace facebook::velox::parquet { /// Utility class to write bit/byte streams. This class can write data to /// either be bit packed or byte aligned (and a single stream that has a mix of @@ -572,4 +572,4 @@ inline bool BitReader::GetZigZagVlqInt(int64_t* v) { return true; } -} // namespace facebook::velox::parquet::arrow::bit_util +} // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/writer/arrow/util/RleEncodingInternal.h b/velox/dwio/parquet/common/RleEncodingInternal.h similarity index 98% rename from velox/dwio/parquet/writer/arrow/util/RleEncodingInternal.h rename to velox/dwio/parquet/common/RleEncodingInternal.h index d3b7087cdb8d3..8b4c8664c005b 100644 --- a/velox/dwio/parquet/writer/arrow/util/RleEncodingInternal.h +++ b/velox/dwio/parquet/common/RleEncodingInternal.h @@ -30,9 +30,9 @@ #include "arrow/util/bit_run_reader.h" #include "arrow/util/bit_util.h" #include "arrow/util/macros.h" -#include "velox/dwio/parquet/writer/arrow/util/BitStreamUtilsInternal.h" +#include "velox/dwio/parquet/common/BitStreamUtilsInternal.h" -namespace facebook::velox::parquet::arrow::util { +namespace facebook::velox::parquet { /// Utility classes to do run length encoding (RLE) for fixed bit width values. /// If runs are sufficiently long, RLE is used, otherwise, the values are just @@ -152,7 +152,7 @@ class RleDecoder { int64_t valid_bits_offset); protected: - ::facebook::velox::parquet::arrow::bit_util::BitReader bit_reader_; + BitReader bit_reader_; /// Number of bits needed to encode the value. Must be between 0 and 64. int bit_width_; uint64_t current_value_; @@ -209,7 +209,7 @@ class RleEncoder { static_cast(::arrow::bit_util::BytesForBits( MAX_VALUES_PER_LITERAL_RUN * bit_width)); /// Up to kMaxVlqByteLength indicator and a single 'bit_width' value. - int max_repeated_run_size = arrow::bit_util::BitReader::kMaxVlqByteLength + + int max_repeated_run_size = BitReader::kMaxVlqByteLength + static_cast(::arrow::bit_util::BytesForBits(bit_width)); return std::max(max_literal_run_size, max_repeated_run_size); } @@ -282,7 +282,7 @@ class RleEncoder { const int bit_width_; /// Underlying buffer. - arrow::bit_util::BitWriter bit_writer_; + BitWriter bit_writer_; /// If true, the buffer is full and subsequent Put()'s will fail. bool buffer_full_; @@ -894,4 +894,4 @@ inline void RleEncoder::Clear() { bit_writer_.Clear(); } -} // namespace facebook::velox::parquet::arrow::util +} // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp index 81f429c59a5a0..e57201f9eac4b 100644 --- a/velox/dwio/parquet/reader/PageReader.cpp +++ b/velox/dwio/parquet/reader/PageReader.cpp @@ -224,7 +224,7 @@ void PageReader::prepareDataPageV1(const PageHeader& pageHeader, int64_t row) { auto pageEnd = pageData_ + pageHeader.uncompressed_page_size; if (maxRepeat_ > 0) { uint32_t repeatLength = readField(pageData_); - repeatDecoder_ = std::make_unique( + repeatDecoder_ = std::make_unique( reinterpret_cast(pageData_), repeatLength, ::arrow::bit_util::NumRequiredBits(maxRepeat_)); @@ -240,7 +240,7 @@ void PageReader::prepareDataPageV1(const PageHeader& pageHeader, int64_t row) { pageData_ + defineLength, ::arrow::bit_util::NumRequiredBits(maxDefine_)); } - wideDefineDecoder_ = std::make_unique( + wideDefineDecoder_ = std::make_unique( reinterpret_cast(pageData_), defineLength, ::arrow::bit_util::NumRequiredBits(maxDefine_)); @@ -281,7 +281,7 @@ void PageReader::prepareDataPageV2(const PageHeader& pageHeader, int64_t row) { pageData_ = readBytes(bytes, pageBuffer_); if (repeatLength) { - repeatDecoder_ = std::make_unique( + repeatDecoder_ = std::make_unique( reinterpret_cast(pageData_), repeatLength, ::arrow::bit_util::NumRequiredBits(maxRepeat_)); diff --git a/velox/dwio/parquet/reader/PageReader.h b/velox/dwio/parquet/reader/PageReader.h index 879f21d226910..dafd6cd497931 100644 --- a/velox/dwio/parquet/reader/PageReader.h +++ b/velox/dwio/parquet/reader/PageReader.h @@ -21,13 +21,13 @@ #include "velox/dwio/common/DirectDecoder.h" #include "velox/dwio/common/SelectiveColumnReader.h" #include "velox/dwio/common/compression/Compression.h" +#include "velox/dwio/parquet/common/RleEncodingInternal.h" #include "velox/dwio/parquet/reader/BooleanDecoder.h" #include "velox/dwio/parquet/reader/DeltaBpDecoder.h" #include "velox/dwio/parquet/reader/DeltaByteArrayDecoder.h" #include "velox/dwio/parquet/reader/ParquetTypeWithId.h" #include "velox/dwio/parquet/reader/RleBpDataDecoder.h" #include "velox/dwio/parquet/reader/StringDecoder.h" -#include "velox/dwio/parquet/writer/arrow/util/RleEncodingInternal.h" namespace facebook::velox::parquet { @@ -375,10 +375,8 @@ class PageReader { // Decoder for single bit definition levels. the arrow decoders are used for // multibit levels pending fixing RleBpDecoder for the case. std::unique_ptr defineDecoder_; - std::unique_ptr<::facebook::velox::parquet::arrow::util::RleDecoder> - repeatDecoder_; - std::unique_ptr<::facebook::velox::parquet::arrow::util::RleDecoder> - wideDefineDecoder_; + std::unique_ptr repeatDecoder_; + std::unique_ptr wideDefineDecoder_; // True for a leaf column for which repdefs are loaded for the whole column // chunk. This is typically the leaftmost leaf of a list. Other leaves under diff --git a/velox/dwio/parquet/tests/reader/RleBpDecoderTest.cpp b/velox/dwio/parquet/tests/reader/RleBpDecoderTest.cpp index 043ceed976387..c45e10fe82b7d 100644 --- a/velox/dwio/parquet/tests/reader/RleBpDecoderTest.cpp +++ b/velox/dwio/parquet/tests/reader/RleBpDecoderTest.cpp @@ -15,14 +15,14 @@ */ #include "velox/dwio/common/BitPackDecoder.h" -#include "velox/dwio/parquet/writer/arrow/util/RleEncodingInternal.h" +#include "velox/dwio/parquet/common/RleEncodingInternal.h" #include using namespace facebook::velox; using namespace facebook::velox::dwio::common; -using facebook::velox::parquet::arrow::util::RleEncoder; +using facebook::velox::parquet::RleEncoder; template class RleBpDecoderTest { diff --git a/velox/dwio/parquet/writer/arrow/ColumnWriter.cpp b/velox/dwio/parquet/writer/arrow/ColumnWriter.cpp index 6c6b703c83469..a97fbc4e17bec 100644 --- a/velox/dwio/parquet/writer/arrow/ColumnWriter.cpp +++ b/velox/dwio/parquet/writer/arrow/ColumnWriter.cpp @@ -57,7 +57,6 @@ #include "velox/dwio/parquet/writer/arrow/Types.h" #include "velox/dwio/parquet/writer/arrow/util/Compression.h" #include "velox/dwio/parquet/writer/arrow/util/Crc32.h" -#include "velox/dwio/parquet/writer/arrow/util/RleEncodingInternal.h" #include "velox/dwio/parquet/writer/arrow/util/VisitArrayInline.h" using arrow::Array; @@ -72,9 +71,6 @@ using arrow::internal::checked_pointer_cast; namespace bit_util = arrow::bit_util; namespace facebook::velox::parquet::arrow { -using bit_util::BitWriter; -using util::RleEncoder; - using util::CodecOptions; namespace { diff --git a/velox/dwio/parquet/writer/arrow/ColumnWriter.h b/velox/dwio/parquet/writer/arrow/ColumnWriter.h index 522862b3e36ae..0bca4e33f2c96 100644 --- a/velox/dwio/parquet/writer/arrow/ColumnWriter.h +++ b/velox/dwio/parquet/writer/arrow/ColumnWriter.h @@ -22,6 +22,7 @@ #include #include +#include "velox/dwio/parquet/common/RleEncodingInternal.h" #include "velox/dwio/parquet/writer/arrow/Exception.h" #include "velox/dwio/parquet/writer/arrow/Platform.h" #include "velox/dwio/parquet/writer/arrow/Types.h" @@ -35,13 +36,8 @@ class Array; namespace facebook::velox::parquet::arrow { -namespace bit_util { -class BitWriter; -} // namespace bit_util - namespace util { class CodecOptions; -class RleEncoder; } // namespace util struct ArrowWriteContext; @@ -89,8 +85,8 @@ class PARQUET_EXPORT LevelEncoder { int bit_width_; int rle_length_; Encoding::type encoding_; - std::unique_ptr rle_encoder_; - std::unique_ptr bit_packed_encoder_; + std::unique_ptr rle_encoder_; + std::unique_ptr bit_packed_encoder_; }; class PARQUET_EXPORT PageWriter { diff --git a/velox/dwio/parquet/writer/arrow/Encoding.cpp b/velox/dwio/parquet/writer/arrow/Encoding.cpp index 3079c967cc4dd..5188fa7f8207f 100644 --- a/velox/dwio/parquet/writer/arrow/Encoding.cpp +++ b/velox/dwio/parquet/writer/arrow/Encoding.cpp @@ -42,15 +42,15 @@ #include "arrow/util/logging.h" #include "arrow/util/ubsan.h" #include "arrow/visit_data_inline.h" +#include "velox/dwio/parquet/common/BitStreamUtilsInternal.h" +#include "velox/dwio/parquet/common/RleEncodingInternal.h" #include "velox/dwio/parquet/writer/arrow/Exception.h" #include "velox/dwio/parquet/writer/arrow/Platform.h" #include "velox/dwio/parquet/writer/arrow/Schema.h" #include "velox/dwio/parquet/writer/arrow/Types.h" -#include "velox/dwio/parquet/writer/arrow/util/BitStreamUtilsInternal.h" #include "velox/dwio/parquet/writer/arrow/util/ByteStreamSplitInternal.h" #include "velox/dwio/parquet/writer/arrow/util/Hashing.h" #include "velox/dwio/parquet/writer/arrow/util/OverflowUtilInternal.h" -#include "velox/dwio/parquet/writer/arrow/util/RleEncodingInternal.h" namespace bit_util = arrow::bit_util; @@ -477,8 +477,8 @@ int RlePreserveBufferSize(int num_values, int bit_width) { // is called, we have to reserve an extra "RleEncoder::MinBufferSize" // bytes. These extra bytes won't be used but not reserving them // would cause the encoder to fail. - return arrow::util::RleEncoder::MaxBufferSize(bit_width, num_values) + - arrow::util::RleEncoder::MinBufferSize(bit_width); + return RleEncoder::MaxBufferSize(bit_width, num_values) + + RleEncoder::MinBufferSize(bit_width); } /// See the dictionary encoding section of @@ -517,7 +517,7 @@ class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder { ++buffer; --buffer_len; - arrow::util::RleEncoder encoder(buffer, buffer_len, bit_width()); + RleEncoder encoder(buffer, buffer_len, bit_width()); for (int32_t index : buffered_indices_) { if (ARROW_PREDICT_FALSE(!encoder.Put(index))) @@ -1282,7 +1282,7 @@ class PlainBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder { typename EncodingTraits::DictAccumulator* out) override; private: - std::unique_ptr bit_reader_; + std::unique_ptr bit_reader_; }; PlainBooleanDecoder::PlainBooleanDecoder(const ColumnDescriptor* descr) @@ -1293,7 +1293,7 @@ void PlainBooleanDecoder::SetData( const uint8_t* data, int len) { num_values_ = num_values; - bit_reader_ = std::make_unique(data, len); + bit_reader_ = std::make_unique(data, len); } int PlainBooleanDecoder::DecodeArrow( @@ -1738,7 +1738,7 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder { num_values_ = num_values; if (len == 0) { // Initialize dummy decoder to avoid crashes later on - idx_decoder_ = arrow::util::RleDecoder(data, len, /*bit_width=*/1); + idx_decoder_ = RleDecoder(data, len, /*bit_width=*/1); return; } uint8_t bit_width = *data; @@ -1747,7 +1747,7 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder { "Invalid or corrupted bit_width " + std::to_string(bit_width) + ". Maximum allowed is 32."); } - idx_decoder_ = arrow::util::RleDecoder(++data, --len, bit_width); + idx_decoder_ = RleDecoder(++data, --len, bit_width); } int Decode(T* buffer, int num_values) override { @@ -1920,7 +1920,7 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder { // BinaryDictionary32Builder std::shared_ptr indices_scratch_space_; - arrow::util::RleDecoder idx_decoder_; + RleDecoder idx_decoder_; }; template @@ -2555,7 +2555,7 @@ class DeltaBitPackEncoder : public EncoderImpl, ArrowPoolVector deltas_; std::shared_ptr bits_buffer_; ::arrow::BufferBuilder sink_; - arrow::bit_util::BitWriter bit_writer_; + BitWriter bit_writer_; }; template @@ -2658,7 +2658,7 @@ std::shared_ptr<::arrow::Buffer> DeltaBitPackEncoder::FlushValues() { PARQUET_ASSIGN_OR_THROW(auto buffer, sink_.Finish(/*shrink_to_fit=*/true)); uint8_t header_buffer_[kMaxPageHeaderWriterSize] = {}; - bit_util::BitWriter header_writer(header_buffer_, sizeof(header_buffer_)); + BitWriter header_writer(header_buffer_, sizeof(header_buffer_)); if (!header_writer.PutVlqInt(values_per_block_) || !header_writer.PutVlqInt(mini_blocks_per_block_) || !header_writer.PutVlqInt(total_value_count_) || @@ -2775,15 +2775,13 @@ class DeltaBitPackDecoder : public DecoderImpl, // num_values is equal to page's num_values, including null values in this // page this->num_values_ = num_values; - decoder_ = std::make_shared(data, len); + decoder_ = std::make_shared(data, len); InitHeader(); } // Set BitReader which is already initialized by DeltaLengthByteArrayDecoder // or DeltaByteArrayDecoder - void SetDecoder( - int num_values, - std::shared_ptr decoder) { + void SetDecoder(int num_values, std::shared_ptr decoder) { this->num_values_ = num_values; decoder_ = std::move(decoder); InitHeader(); @@ -2981,7 +2979,7 @@ class DeltaBitPackDecoder : public DecoderImpl, } MemoryPool* pool_; - std::shared_ptr decoder_; + std::shared_ptr decoder_; uint32_t values_per_block_; uint32_t mini_blocks_per_block_; uint32_t values_per_mini_block_; @@ -3160,7 +3158,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, void SetData(int num_values, const uint8_t* data, int len) override { DecoderImpl::SetData(num_values, data, len); - decoder_ = std::make_shared(data, len); + decoder_ = std::make_shared(data, len); DecodeLengths(); } @@ -3293,7 +3291,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, return Status::OK(); } - std::shared_ptr decoder_; + std::shared_ptr decoder_; DeltaBitPackDecoder len_decoder_; int num_valid_values_{0}; uint32_t length_idx_{0}; @@ -3401,7 +3399,7 @@ std::shared_ptr RleBooleanEncoder::FlushValues() { int rle_buffer_size_max = MaxRleBufferSize(); std::shared_ptr buffer = AllocateBuffer(this->pool_, rle_buffer_size_max + kRleLengthInBytes); - arrow::util::RleEncoder encoder( + RleEncoder encoder( buffer->mutable_data() + kRleLengthInBytes, rle_buffer_size_max, /*bit_width*/ kBitWidth); @@ -3772,7 +3770,7 @@ class DeltaByteArrayDecoderImpl : public DecoderImpl, void SetData(int num_values, const uint8_t* data, int len) override { num_values_ = num_values; - decoder_ = std::make_shared(data, len); + decoder_ = std::make_shared(data, len); prefix_len_decoder_.SetDecoder(num_values, decoder_); // get the number of encoded prefix lengths @@ -3926,7 +3924,7 @@ class DeltaByteArrayDecoderImpl : public DecoderImpl, MemoryPool* pool_; private: - std::shared_ptr decoder_; + std::shared_ptr decoder_; DeltaBitPackDecoder prefix_len_decoder_; DeltaLengthByteArrayDecoder suffix_decoder_; std::string last_value_; diff --git a/velox/dwio/parquet/writer/arrow/tests/ColumnReader.cpp b/velox/dwio/parquet/writer/arrow/tests/ColumnReader.cpp index c2c898321bbc7..d1b9bf0f62ca4 100644 --- a/velox/dwio/parquet/writer/arrow/tests/ColumnReader.cpp +++ b/velox/dwio/parquet/writer/arrow/tests/ColumnReader.cpp @@ -42,8 +42,10 @@ #include "arrow/util/crc32.h" #include "arrow/util/int_util_overflow.h" #include "arrow/util/logging.h" +#include "velox/dwio/parquet/common/BitStreamUtilsInternal.h" #include "velox/dwio/parquet/common/LevelComparison.h" #include "velox/dwio/parquet/common/LevelConversion.h" +#include "velox/dwio/parquet/common/RleEncodingInternal.h" #include "velox/dwio/parquet/writer/arrow/ColumnPage.h" #include "velox/dwio/parquet/writer/arrow/Encoding.h" #include "velox/dwio/parquet/writer/arrow/EncryptionInternal.h" @@ -51,8 +53,6 @@ #include "velox/dwio/parquet/writer/arrow/Properties.h" #include "velox/dwio/parquet/writer/arrow/Statistics.h" #include "velox/dwio/parquet/writer/arrow/ThriftInternal.h" -#include "velox/dwio/parquet/writer/arrow/util/BitStreamUtilsInternal.h" -#include "velox/dwio/parquet/writer/arrow/util/RleEncodingInternal.h" using arrow::MemoryPool; using arrow::internal::AddWithOverflow; @@ -127,8 +127,8 @@ int LevelDecoder::SetData( } const uint8_t* decoder_data = data + 4; if (!rle_decoder_) { - rle_decoder_ = std::make_unique( - decoder_data, num_bytes, bit_width_); + rle_decoder_ = + std::make_unique(decoder_data, num_bytes, bit_width_); } else { rle_decoder_->Reset(decoder_data, num_bytes, bit_width_); } @@ -147,8 +147,7 @@ int LevelDecoder::SetData( "Received invalid number of bytes (corrupt data page?)"); } if (!bit_packed_decoder_) { - bit_packed_decoder_ = - std::make_unique(data, num_bytes); + bit_packed_decoder_ = std::make_unique(data, num_bytes); } else { bit_packed_decoder_->Reset(data, num_bytes); } @@ -176,8 +175,7 @@ void LevelDecoder::SetDataV2( bit_width_ = ::arrow::bit_util::Log2(max_level + 1); if (!rle_decoder_) { - rle_decoder_ = - std::make_unique(data, num_bytes, bit_width_); + rle_decoder_ = std::make_unique(data, num_bytes, bit_width_); } else { rle_decoder_->Reset(data, num_bytes, bit_width_); } diff --git a/velox/dwio/parquet/writer/arrow/tests/ColumnReader.h b/velox/dwio/parquet/writer/arrow/tests/ColumnReader.h index 6eba405e20ec2..b412354fc1839 100644 --- a/velox/dwio/parquet/writer/arrow/tests/ColumnReader.h +++ b/velox/dwio/parquet/writer/arrow/tests/ColumnReader.h @@ -23,14 +23,14 @@ #include #include +#include "velox/dwio/parquet/common/BitStreamUtilsInternal.h" #include "velox/dwio/parquet/common/LevelConversion.h" +#include "velox/dwio/parquet/common/RleEncodingInternal.h" #include "velox/dwio/parquet/writer/arrow/Exception.h" #include "velox/dwio/parquet/writer/arrow/Metadata.h" #include "velox/dwio/parquet/writer/arrow/Properties.h" #include "velox/dwio/parquet/writer/arrow/Schema.h" #include "velox/dwio/parquet/writer/arrow/Types.h" -#include "velox/dwio/parquet/writer/arrow/util/BitStreamUtilsInternal.h" -#include "velox/dwio/parquet/writer/arrow/util/RleEncodingInternal.h" namespace arrow { @@ -108,8 +108,8 @@ class PARQUET_EXPORT LevelDecoder { int bit_width_; int num_values_remaining_; Encoding::type encoding_; - std::unique_ptr rle_decoder_; - std::unique_ptr bit_packed_decoder_; + std::unique_ptr rle_decoder_; + std::unique_ptr bit_packed_decoder_; int16_t max_level_; }; diff --git a/velox/dwio/parquet/writer/arrow/tests/EncodingTest.cpp b/velox/dwio/parquet/writer/arrow/tests/EncodingTest.cpp index 48119ec403b83..488ae6a75bb85 100644 --- a/velox/dwio/parquet/writer/arrow/tests/EncodingTest.cpp +++ b/velox/dwio/parquet/writer/arrow/tests/EncodingTest.cpp @@ -44,13 +44,13 @@ #include "arrow/util/ubsan.h" #include "arrow/visit_data_inline.h" +#include "velox/dwio/parquet/common/BitStreamUtilsInternal.h" +#include "velox/dwio/parquet/common/RleEncodingInternal.h" #include "velox/dwio/parquet/writer/arrow/Exception.h" #include "velox/dwio/parquet/writer/arrow/Platform.h" #include "velox/dwio/parquet/writer/arrow/Schema.h" #include "velox/dwio/parquet/writer/arrow/Types.h" -#include "velox/dwio/parquet/writer/arrow/util/BitStreamUtilsInternal.h" #include "velox/dwio/parquet/writer/arrow/util/ByteStreamSplitInternal.h" -#include "velox/dwio/parquet/writer/arrow/util/RleEncodingInternal.h" namespace bit_util = arrow::bit_util; @@ -412,7 +412,7 @@ class PlainEncoder : public EncoderImpl, int bits_available_; std::shared_ptr bits_buffer_; ::arrow::BufferBuilder sink_; - arrow::bit_util::BitWriter bit_writer_; + BitWriter bit_writer_; template void PutImpl(const SequenceType& src, int num_values); @@ -518,8 +518,8 @@ int RlePreserveBufferSize(int num_values, int bit_width) { // is called, we have to reserve an extra "RleEncoder::MinBufferSize" // bytes. These extra bytes won't be used but not reserving them // would cause the encoder to fail. - return arrow::util::RleEncoder::MaxBufferSize(bit_width, num_values) + - arrow::util::RleEncoder::MinBufferSize(bit_width); + return RleEncoder::MaxBufferSize(bit_width, num_values) + + RleEncoder::MinBufferSize(bit_width); } /// See the dictionary encoding section of @@ -560,7 +560,7 @@ class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder { ++buffer; --buffer_len; - arrow::util::RleEncoder encoder(buffer, buffer_len, bit_width()); + RleEncoder encoder(buffer, buffer_len, bit_width()); for (int32_t index : buffered_indices_) { if (ARROW_PREDICT_FALSE(!encoder.Put(index))) @@ -1325,7 +1325,7 @@ class PlainBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder { typename EncodingTraits::DictAccumulator* out) override; private: - std::unique_ptr bit_reader_; + std::unique_ptr bit_reader_; }; PlainBooleanDecoder::PlainBooleanDecoder(const ColumnDescriptor* descr) @@ -1336,7 +1336,7 @@ void PlainBooleanDecoder::SetData( const uint8_t* data, int len) { num_values_ = num_values; - bit_reader_ = std::make_unique(data, len); + bit_reader_ = std::make_unique(data, len); } int PlainBooleanDecoder::DecodeArrow( @@ -1707,7 +1707,7 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder { num_values_ = num_values; if (len == 0) { // Initialize dummy decoder to avoid crashes later on - idx_decoder_ = arrow::util::RleDecoder(data, len, /*bit_width=*/1); + idx_decoder_ = RleDecoder(data, len, /*bit_width=*/1); return; } uint8_t bit_width = *data; @@ -1716,7 +1716,7 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder { "Invalid or corrupted bit_width " + std::to_string(bit_width) + ". Maximum allowed is 32."); } - idx_decoder_ = arrow::util::RleDecoder(++data, --len, bit_width); + idx_decoder_ = RleDecoder(++data, --len, bit_width); } int Decode(T* buffer, int num_values) override { @@ -1889,7 +1889,7 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder { // BinaryDictionary32Builder std::shared_ptr indices_scratch_space_; - arrow::util::RleDecoder idx_decoder_; + RleDecoder idx_decoder_; }; template @@ -2525,7 +2525,7 @@ class DeltaBitPackEncoder : public EncoderImpl, ArrowPoolVector deltas_; std::shared_ptr bits_buffer_; ::arrow::BufferBuilder sink_; - arrow::bit_util::BitWriter bit_writer_; + BitWriter bit_writer_; }; template @@ -2628,7 +2628,7 @@ std::shared_ptr DeltaBitPackEncoder::FlushValues() { PARQUET_ASSIGN_OR_THROW(auto buffer, sink_.Finish(/*shrink_to_fit=*/true)); uint8_t header_buffer_[kMaxPageHeaderWriterSize] = {}; - bit_util::BitWriter header_writer(header_buffer_, sizeof(header_buffer_)); + BitWriter header_writer(header_buffer_, sizeof(header_buffer_)); if (!header_writer.PutVlqInt(values_per_block_) || !header_writer.PutVlqInt(mini_blocks_per_block_) || !header_writer.PutVlqInt(total_value_count_) || @@ -2745,15 +2745,13 @@ class DeltaBitPackDecoder : public DecoderImpl, // num_values is equal to page's num_values, including null values in this // page this->num_values_ = num_values; - decoder_ = std::make_shared(data, len); + decoder_ = std::make_shared(data, len); InitHeader(); } // Set BitReader which is already initialized by DeltaLengthByteArrayDecoder // or DeltaByteArrayDecoder - void SetDecoder( - int num_values, - std::shared_ptr decoder) { + void SetDecoder(int num_values, std::shared_ptr decoder) { this->num_values_ = num_values; decoder_ = std::move(decoder); InitHeader(); @@ -2946,7 +2944,7 @@ class DeltaBitPackDecoder : public DecoderImpl, } MemoryPool* pool_; - std::shared_ptr decoder_; + std::shared_ptr decoder_; uint32_t values_per_block_; uint32_t mini_blocks_per_block_; uint32_t values_per_mini_block_; @@ -3124,7 +3122,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, void SetData(int num_values, const uint8_t* data, int len) override { DecoderImpl::SetData(num_values, data, len); - decoder_ = std::make_shared(data, len); + decoder_ = std::make_shared(data, len); DecodeLengths(); } @@ -3258,7 +3256,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, return Status::OK(); } - std::shared_ptr decoder_; + std::shared_ptr decoder_; DeltaBitPackDecoder len_decoder_; int num_valid_values_; uint32_t length_idx_; @@ -3366,7 +3364,7 @@ std::shared_ptr RleBooleanEncoder::FlushValues() { int rle_buffer_size_max = MaxRleBufferSize(); std::shared_ptr buffer = AllocateBuffer(this->pool_, rle_buffer_size_max + kRleLengthInBytes); - arrow::util::RleEncoder encoder( + RleEncoder encoder( buffer->mutable_data() + kRleLengthInBytes, rle_buffer_size_max, /*bit_width*/ kBitWidth); @@ -3409,7 +3407,7 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder { auto decoder_data = data + 4; if (decoder_ == nullptr) { - decoder_ = std::make_shared( + decoder_ = std::make_shared( decoder_data, num_bytes, /*bit_width=*/1); @@ -3471,7 +3469,7 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder { } private: - std::shared_ptr decoder_; + std::shared_ptr decoder_; }; // ---------------------------------------------------------------------- @@ -3492,7 +3490,7 @@ class DeltaByteArrayDecoder : public DecoderImpl, void SetData(int num_values, const uint8_t* data, int len) override { num_values_ = num_values; - decoder_ = std::make_shared(data, len); + decoder_ = std::make_shared(data, len); prefix_len_decoder_.SetDecoder(num_values, decoder_); // get the number of encoded prefix lengths @@ -3650,7 +3648,7 @@ class DeltaByteArrayDecoder : public DecoderImpl, return Status::OK(); } - std::shared_ptr decoder_; + std::shared_ptr decoder_; DeltaBitPackDecoder prefix_len_decoder_; DeltaLengthByteArrayDecoder suffix_decoder_; std::string last_value_;