Skip to content

Commit

Permalink
refactor(parquet): Move arrow RleEncodingInternal to common
Browse files Browse the repository at this point in the history
  • Loading branch information
wypb committed Dec 9, 2024
1 parent 929affe commit 8a43928
Show file tree
Hide file tree
Showing 12 changed files with 73 additions and 89 deletions.
2 changes: 1 addition & 1 deletion velox/dwio/common/tests/BitPackDecoderBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include "velox/common/base/BitUtil.h"
#include "velox/common/base/Exceptions.h"
#include "velox/dwio/common/BitPackDecoder.h"
#include "velox/dwio/parquet/writer/arrow/util/BitStreamUtilsInternal.h"
#include "velox/dwio/parquet/common/BitStreamUtilsInternal.h"

#ifdef __BMI2__
#include "velox/dwio/common/tests/Lemire/bmipacking32.h"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
#include "arrow/util/macros.h"
#include "arrow/util/ubsan.h"

namespace facebook::velox::parquet::arrow::bit_util {
namespace facebook::velox::parquet {

/// Utility class to write bit/byte streams. This class can write data to
/// either be bit packed or byte aligned (and a single stream that has a mix of
Expand Down Expand Up @@ -572,4 +572,4 @@ inline bool BitReader::GetZigZagVlqInt(int64_t* v) {
return true;
}

} // namespace facebook::velox::parquet::arrow::bit_util
} // namespace facebook::velox::parquet
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
#include "arrow/util/bit_run_reader.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/macros.h"
#include "velox/dwio/parquet/writer/arrow/util/BitStreamUtilsInternal.h"
#include "velox/dwio/parquet/common/BitStreamUtilsInternal.h"

namespace facebook::velox::parquet::arrow::util {
namespace facebook::velox::parquet {

/// Utility classes to do run length encoding (RLE) for fixed bit width values.
/// If runs are sufficiently long, RLE is used, otherwise, the values are just
Expand Down Expand Up @@ -152,7 +152,7 @@ class RleDecoder {
int64_t valid_bits_offset);

protected:
::facebook::velox::parquet::arrow::bit_util::BitReader bit_reader_;
BitReader bit_reader_;
/// Number of bits needed to encode the value. Must be between 0 and 64.
int bit_width_;
uint64_t current_value_;
Expand Down Expand Up @@ -209,7 +209,7 @@ class RleEncoder {
static_cast<int>(::arrow::bit_util::BytesForBits(
MAX_VALUES_PER_LITERAL_RUN * bit_width));
/// Up to kMaxVlqByteLength indicator and a single 'bit_width' value.
int max_repeated_run_size = arrow::bit_util::BitReader::kMaxVlqByteLength +
int max_repeated_run_size = BitReader::kMaxVlqByteLength +
static_cast<int>(::arrow::bit_util::BytesForBits(bit_width));
return std::max(max_literal_run_size, max_repeated_run_size);
}
Expand Down Expand Up @@ -282,7 +282,7 @@ class RleEncoder {
const int bit_width_;

/// Underlying buffer.
arrow::bit_util::BitWriter bit_writer_;
BitWriter bit_writer_;

/// If true, the buffer is full and subsequent Put()'s will fail.
bool buffer_full_;
Expand Down Expand Up @@ -894,4 +894,4 @@ inline void RleEncoder::Clear() {
bit_writer_.Clear();
}

} // namespace facebook::velox::parquet::arrow::util
} // namespace facebook::velox::parquet
6 changes: 3 additions & 3 deletions velox/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ void PageReader::prepareDataPageV1(const PageHeader& pageHeader, int64_t row) {
auto pageEnd = pageData_ + pageHeader.uncompressed_page_size;
if (maxRepeat_ > 0) {
uint32_t repeatLength = readField<int32_t>(pageData_);
repeatDecoder_ = std::make_unique<arrow::util::RleDecoder>(
repeatDecoder_ = std::make_unique<RleDecoder>(
reinterpret_cast<const uint8_t*>(pageData_),
repeatLength,
::arrow::bit_util::NumRequiredBits(maxRepeat_));
Expand All @@ -240,7 +240,7 @@ void PageReader::prepareDataPageV1(const PageHeader& pageHeader, int64_t row) {
pageData_ + defineLength,
::arrow::bit_util::NumRequiredBits(maxDefine_));
}
wideDefineDecoder_ = std::make_unique<arrow::util::RleDecoder>(
wideDefineDecoder_ = std::make_unique<RleDecoder>(
reinterpret_cast<const uint8_t*>(pageData_),
defineLength,
::arrow::bit_util::NumRequiredBits(maxDefine_));
Expand Down Expand Up @@ -281,7 +281,7 @@ void PageReader::prepareDataPageV2(const PageHeader& pageHeader, int64_t row) {
pageData_ = readBytes(bytes, pageBuffer_);

if (repeatLength) {
repeatDecoder_ = std::make_unique<arrow::util::RleDecoder>(
repeatDecoder_ = std::make_unique<RleDecoder>(
reinterpret_cast<const uint8_t*>(pageData_),
repeatLength,
::arrow::bit_util::NumRequiredBits(maxRepeat_));
Expand Down
8 changes: 3 additions & 5 deletions velox/dwio/parquet/reader/PageReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
#include "velox/dwio/common/DirectDecoder.h"
#include "velox/dwio/common/SelectiveColumnReader.h"
#include "velox/dwio/common/compression/Compression.h"
#include "velox/dwio/parquet/common/RleEncodingInternal.h"
#include "velox/dwio/parquet/reader/BooleanDecoder.h"
#include "velox/dwio/parquet/reader/DeltaBpDecoder.h"
#include "velox/dwio/parquet/reader/DeltaByteArrayDecoder.h"
#include "velox/dwio/parquet/reader/ParquetTypeWithId.h"
#include "velox/dwio/parquet/reader/RleBpDataDecoder.h"
#include "velox/dwio/parquet/reader/StringDecoder.h"
#include "velox/dwio/parquet/writer/arrow/util/RleEncodingInternal.h"

namespace facebook::velox::parquet {

Expand Down Expand Up @@ -375,10 +375,8 @@ class PageReader {
// Decoder for single bit definition levels. the arrow decoders are used for
// multibit levels pending fixing RleBpDecoder for the case.
std::unique_ptr<RleBpDecoder> defineDecoder_;
std::unique_ptr<::facebook::velox::parquet::arrow::util::RleDecoder>
repeatDecoder_;
std::unique_ptr<::facebook::velox::parquet::arrow::util::RleDecoder>
wideDefineDecoder_;
std::unique_ptr<RleDecoder> repeatDecoder_;
std::unique_ptr<RleDecoder> wideDefineDecoder_;

// True for a leaf column for which repdefs are loaded for the whole column
// chunk. This is typically the leaftmost leaf of a list. Other leaves under
Expand Down
4 changes: 2 additions & 2 deletions velox/dwio/parquet/tests/reader/RleBpDecoderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
*/

#include "velox/dwio/common/BitPackDecoder.h"
#include "velox/dwio/parquet/writer/arrow/util/RleEncodingInternal.h"
#include "velox/dwio/parquet/common/RleEncodingInternal.h"

#include <gtest/gtest.h>

using namespace facebook::velox;
using namespace facebook::velox::dwio::common;

using facebook::velox::parquet::arrow::util::RleEncoder;
using facebook::velox::parquet::RleEncoder;

template <typename T>
class RleBpDecoderTest {
Expand Down
4 changes: 0 additions & 4 deletions velox/dwio/parquet/writer/arrow/ColumnWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
#include "velox/dwio/parquet/writer/arrow/Types.h"
#include "velox/dwio/parquet/writer/arrow/util/Compression.h"
#include "velox/dwio/parquet/writer/arrow/util/Crc32.h"
#include "velox/dwio/parquet/writer/arrow/util/RleEncodingInternal.h"
#include "velox/dwio/parquet/writer/arrow/util/VisitArrayInline.h"

using arrow::Array;
Expand All @@ -72,9 +71,6 @@ using arrow::internal::checked_pointer_cast;
namespace bit_util = arrow::bit_util;

namespace facebook::velox::parquet::arrow {
using bit_util::BitWriter;
using util::RleEncoder;

using util::CodecOptions;

namespace {
Expand Down
10 changes: 3 additions & 7 deletions velox/dwio/parquet/writer/arrow/ColumnWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <cstring>
#include <memory>

#include "velox/dwio/parquet/common/RleEncodingInternal.h"
#include "velox/dwio/parquet/writer/arrow/Exception.h"
#include "velox/dwio/parquet/writer/arrow/Platform.h"
#include "velox/dwio/parquet/writer/arrow/Types.h"
Expand All @@ -35,13 +36,8 @@ class Array;

namespace facebook::velox::parquet::arrow {

namespace bit_util {
class BitWriter;
} // namespace bit_util

namespace util {
class CodecOptions;
class RleEncoder;
} // namespace util

struct ArrowWriteContext;
Expand Down Expand Up @@ -89,8 +85,8 @@ class PARQUET_EXPORT LevelEncoder {
int bit_width_;
int rle_length_;
Encoding::type encoding_;
std::unique_ptr<util::RleEncoder> rle_encoder_;
std::unique_ptr<bit_util::BitWriter> bit_packed_encoder_;
std::unique_ptr<RleEncoder> rle_encoder_;
std::unique_ptr<BitWriter> bit_packed_encoder_;
};

class PARQUET_EXPORT PageWriter {
Expand Down
42 changes: 20 additions & 22 deletions velox/dwio/parquet/writer/arrow/Encoding.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@
#include "arrow/util/logging.h"
#include "arrow/util/ubsan.h"
#include "arrow/visit_data_inline.h"
#include "velox/dwio/parquet/common/BitStreamUtilsInternal.h"
#include "velox/dwio/parquet/common/RleEncodingInternal.h"
#include "velox/dwio/parquet/writer/arrow/Exception.h"
#include "velox/dwio/parquet/writer/arrow/Platform.h"
#include "velox/dwio/parquet/writer/arrow/Schema.h"
#include "velox/dwio/parquet/writer/arrow/Types.h"
#include "velox/dwio/parquet/writer/arrow/util/BitStreamUtilsInternal.h"
#include "velox/dwio/parquet/writer/arrow/util/ByteStreamSplitInternal.h"
#include "velox/dwio/parquet/writer/arrow/util/Hashing.h"
#include "velox/dwio/parquet/writer/arrow/util/OverflowUtilInternal.h"
#include "velox/dwio/parquet/writer/arrow/util/RleEncodingInternal.h"

namespace bit_util = arrow::bit_util;

Expand Down Expand Up @@ -477,8 +477,8 @@ int RlePreserveBufferSize(int num_values, int bit_width) {
// is called, we have to reserve an extra "RleEncoder::MinBufferSize"
// bytes. These extra bytes won't be used but not reserving them
// would cause the encoder to fail.
return arrow::util::RleEncoder::MaxBufferSize(bit_width, num_values) +
arrow::util::RleEncoder::MinBufferSize(bit_width);
return RleEncoder::MaxBufferSize(bit_width, num_values) +
RleEncoder::MinBufferSize(bit_width);
}

/// See the dictionary encoding section of
Expand Down Expand Up @@ -517,7 +517,7 @@ class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder<DType> {
++buffer;
--buffer_len;

arrow::util::RleEncoder encoder(buffer, buffer_len, bit_width());
RleEncoder encoder(buffer, buffer_len, bit_width());

for (int32_t index : buffered_indices_) {
if (ARROW_PREDICT_FALSE(!encoder.Put(index)))
Expand Down Expand Up @@ -1282,7 +1282,7 @@ class PlainBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
typename EncodingTraits<BooleanType>::DictAccumulator* out) override;

private:
std::unique_ptr<arrow::bit_util::BitReader> bit_reader_;
std::unique_ptr<BitReader> bit_reader_;
};

PlainBooleanDecoder::PlainBooleanDecoder(const ColumnDescriptor* descr)
Expand All @@ -1293,7 +1293,7 @@ void PlainBooleanDecoder::SetData(
const uint8_t* data,
int len) {
num_values_ = num_values;
bit_reader_ = std::make_unique<bit_util::BitReader>(data, len);
bit_reader_ = std::make_unique<BitReader>(data, len);
}

int PlainBooleanDecoder::DecodeArrow(
Expand Down Expand Up @@ -1738,7 +1738,7 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder<Type> {
num_values_ = num_values;
if (len == 0) {
// Initialize dummy decoder to avoid crashes later on
idx_decoder_ = arrow::util::RleDecoder(data, len, /*bit_width=*/1);
idx_decoder_ = RleDecoder(data, len, /*bit_width=*/1);
return;
}
uint8_t bit_width = *data;
Expand All @@ -1747,7 +1747,7 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder<Type> {
"Invalid or corrupted bit_width " + std::to_string(bit_width) +
". Maximum allowed is 32.");
}
idx_decoder_ = arrow::util::RleDecoder(++data, --len, bit_width);
idx_decoder_ = RleDecoder(++data, --len, bit_width);
}

int Decode(T* buffer, int num_values) override {
Expand Down Expand Up @@ -1920,7 +1920,7 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder<Type> {
// BinaryDictionary32Builder
std::shared_ptr<ResizableBuffer> indices_scratch_space_;

arrow::util::RleDecoder idx_decoder_;
RleDecoder idx_decoder_;
};

template <typename Type>
Expand Down Expand Up @@ -2555,7 +2555,7 @@ class DeltaBitPackEncoder : public EncoderImpl,
ArrowPoolVector<UT> deltas_;
std::shared_ptr<ResizableBuffer> bits_buffer_;
::arrow::BufferBuilder sink_;
arrow::bit_util::BitWriter bit_writer_;
BitWriter bit_writer_;
};

template <typename DType>
Expand Down Expand Up @@ -2658,7 +2658,7 @@ std::shared_ptr<::arrow::Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
PARQUET_ASSIGN_OR_THROW(auto buffer, sink_.Finish(/*shrink_to_fit=*/true));

uint8_t header_buffer_[kMaxPageHeaderWriterSize] = {};
bit_util::BitWriter header_writer(header_buffer_, sizeof(header_buffer_));
BitWriter header_writer(header_buffer_, sizeof(header_buffer_));
if (!header_writer.PutVlqInt(values_per_block_) ||
!header_writer.PutVlqInt(mini_blocks_per_block_) ||
!header_writer.PutVlqInt(total_value_count_) ||
Expand Down Expand Up @@ -2775,15 +2775,13 @@ class DeltaBitPackDecoder : public DecoderImpl,
// num_values is equal to page's num_values, including null values in this
// page
this->num_values_ = num_values;
decoder_ = std::make_shared<arrow::bit_util::BitReader>(data, len);
decoder_ = std::make_shared<BitReader>(data, len);
InitHeader();
}

// Set BitReader which is already initialized by DeltaLengthByteArrayDecoder
// or DeltaByteArrayDecoder
void SetDecoder(
int num_values,
std::shared_ptr<arrow::bit_util::BitReader> decoder) {
void SetDecoder(int num_values, std::shared_ptr<BitReader> decoder) {
this->num_values_ = num_values;
decoder_ = std::move(decoder);
InitHeader();
Expand Down Expand Up @@ -2981,7 +2979,7 @@ class DeltaBitPackDecoder : public DecoderImpl,
}

MemoryPool* pool_;
std::shared_ptr<arrow::bit_util::BitReader> decoder_;
std::shared_ptr<BitReader> decoder_;
uint32_t values_per_block_;
uint32_t mini_blocks_per_block_;
uint32_t values_per_mini_block_;
Expand Down Expand Up @@ -3160,7 +3158,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,

void SetData(int num_values, const uint8_t* data, int len) override {
DecoderImpl::SetData(num_values, data, len);
decoder_ = std::make_shared<arrow::bit_util::BitReader>(data, len);
decoder_ = std::make_shared<BitReader>(data, len);
DecodeLengths();
}

Expand Down Expand Up @@ -3293,7 +3291,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
return Status::OK();
}

std::shared_ptr<arrow::bit_util::BitReader> decoder_;
std::shared_ptr<BitReader> decoder_;
DeltaBitPackDecoder<Int32Type> len_decoder_;
int num_valid_values_{0};
uint32_t length_idx_{0};
Expand Down Expand Up @@ -3401,7 +3399,7 @@ std::shared_ptr<Buffer> RleBooleanEncoder::FlushValues() {
int rle_buffer_size_max = MaxRleBufferSize();
std::shared_ptr<ResizableBuffer> buffer =
AllocateBuffer(this->pool_, rle_buffer_size_max + kRleLengthInBytes);
arrow::util::RleEncoder encoder(
RleEncoder encoder(
buffer->mutable_data() + kRleLengthInBytes,
rle_buffer_size_max,
/*bit_width*/ kBitWidth);
Expand Down Expand Up @@ -3772,7 +3770,7 @@ class DeltaByteArrayDecoderImpl : public DecoderImpl,

void SetData(int num_values, const uint8_t* data, int len) override {
num_values_ = num_values;
decoder_ = std::make_shared<arrow::bit_util::BitReader>(data, len);
decoder_ = std::make_shared<BitReader>(data, len);
prefix_len_decoder_.SetDecoder(num_values, decoder_);

// get the number of encoded prefix lengths
Expand Down Expand Up @@ -3926,7 +3924,7 @@ class DeltaByteArrayDecoderImpl : public DecoderImpl,
MemoryPool* pool_;

private:
std::shared_ptr<arrow::bit_util::BitReader> decoder_;
std::shared_ptr<BitReader> decoder_;
DeltaBitPackDecoder<Int32Type> prefix_len_decoder_;
DeltaLengthByteArrayDecoder suffix_decoder_;
std::string last_value_;
Expand Down
Loading

0 comments on commit 8a43928

Please sign in to comment.