From bdfe824ae3d92db7c9631a80022fdeacd03c47ee Mon Sep 17 00:00:00 2001 From: wypb Date: Thu, 12 Dec 2024 10:26:46 +0800 Subject: [PATCH] refactor(parquet): Move arrow RleEncodingInternal to common --- .../common/tests/BitPackDecoderBenchmark.cpp | 4 +- .../parquet/common/BitStreamUtilsInternal.h | 548 +++++++++++++ .../util => common}/RleEncodingInternal.h | 739 +++++++++--------- velox/dwio/parquet/reader/PageReader.cpp | 6 +- velox/dwio/parquet/reader/PageReader.h | 8 +- .../parquet/tests/reader/RleBpDecoderTest.cpp | 4 +- .../parquet/writer/arrow/ColumnWriter.cpp | 6 - .../dwio/parquet/writer/arrow/ColumnWriter.h | 17 +- velox/dwio/parquet/writer/arrow/Encoding.cpp | 54 +- .../writer/arrow/tests/ColumnReader.cpp | 12 +- .../parquet/writer/arrow/tests/ColumnReader.h | 7 +- .../writer/arrow/tests/EncodingTest.cpp | 65 +- .../arrow/util/BitStreamUtilsInternal.h | 575 -------------- 13 files changed, 996 insertions(+), 1049 deletions(-) create mode 100644 velox/dwio/parquet/common/BitStreamUtilsInternal.h rename velox/dwio/parquet/{writer/arrow/util => common}/RleEncodingInternal.h (50%) delete mode 100644 velox/dwio/parquet/writer/arrow/util/BitStreamUtilsInternal.h diff --git a/velox/dwio/common/tests/BitPackDecoderBenchmark.cpp b/velox/dwio/common/tests/BitPackDecoderBenchmark.cpp index 6f42b42e9959..5a8f505e37b8 100644 --- a/velox/dwio/common/tests/BitPackDecoderBenchmark.cpp +++ b/velox/dwio/common/tests/BitPackDecoderBenchmark.cpp @@ -17,7 +17,7 @@ #include "velox/common/base/BitUtil.h" #include "velox/common/base/Exceptions.h" #include "velox/dwio/common/BitPackDecoder.h" -#include "velox/dwio/parquet/writer/arrow/util/BitStreamUtilsInternal.h" +#include "velox/dwio/parquet/common/BitStreamUtilsInternal.h" #ifdef __BMI2__ #include "velox/dwio/common/tests/Lemire/bmipacking32.h" @@ -307,7 +307,7 @@ void lemirebmi2(uint8_t bitWidth, uint32_t* result) { template void arrowBitUnpack(uint8_t bitWidth, T* result) { - facebook::velox::parquet::arrow::bit_util::BitReader bitReader( + facebook::velox::parquet::BitReader bitReader( reinterpret_cast(bitPackedData[bitWidth].data()), BYTES(kNumValues, bitWidth)); bitReader.GetBatch(bitWidth, result, kNumValues); diff --git a/velox/dwio/parquet/common/BitStreamUtilsInternal.h b/velox/dwio/parquet/common/BitStreamUtilsInternal.h new file mode 100644 index 000000000000..a49cea663696 --- /dev/null +++ b/velox/dwio/parquet/common/BitStreamUtilsInternal.h @@ -0,0 +1,548 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// From Apache Impala (incubating) as of 2016-01-29 + +// Adapted from Apache Arrow. + +#pragma once + +#include +#include +#include + +#include "velox/common/base/Exceptions.h" + +#include "arrow/util/bit_util.h" +#include "arrow/util/bpacking.h" + +namespace facebook::velox::parquet { + +/// Utility class to write bit/byte streams. This class can write data to +/// either be bit packed or byte aligned (and a single stream that has a mix of +/// both). This class does not allocate memory. +class BitWriter { + public: + /// buffer: buffer to write bits to. Buffer should be preallocated with + /// 'bufferLen' bytes. + BitWriter(uint8_t* buffer, int bufferLen) + : buffer_(buffer), maxBytes_(bufferLen) { + Clear(); + } + + void Clear() { + bufferedValues_ = 0; + byteOffset_ = 0; + bitOffset_ = 0; + } + + /// The number of current bytes written, including the current byte (i.e. may + /// include a fraction of a byte). Includes buffered values. + int bytesWritten() const { + return byteOffset_ + + static_cast(::arrow::bit_util::BytesForBits(bitOffset_)); + } + uint8_t* buffer() const { + return buffer_; + } + int bufferLen() const { + return maxBytes_; + } + + /// Writes a value to bufferedValues_, flushing to buffer_ if necessary. This + /// is bit packed. Returns false if there was not enough space. numBits must + /// be <= 32. + bool PutValue(uint64_t v, int numBits); + + /// Writes v to the next aligned byte using numBytes. If T is larger than + /// numBytes, the extra high-order bytes will be ignored. Returns false if + /// there was not enough space. + /// Assume the v is stored in buffer_ as a little-endian format + template + bool PutAligned(T v, int numBytes); + + /// Write a Vlq encoded int to the buffer. Returns false if there was not + /// enough room. The value is written byte aligned. For more details on vlq: + /// en.wikipedia.org/wiki/Variable-length_quantity + bool PutVlqInt(uint32_t v); + + // Writes an int zigzag encoded. + bool PutZigZagVlqInt(int32_t v); + + /// Write a Vlq encoded int64 to the buffer. Returns false if there was not + /// enough room. The value is written byte aligned. For more details on vlq: + /// en.wikipedia.org/wiki/Variable-length_quantity + bool PutVlqInt(uint64_t v); + + // Writes an int64 zigzag encoded. + bool PutZigZagVlqInt(int64_t v); + + /// Get a pointer to the next aligned byte and advance the underlying buffer + /// by numBytes. + /// Returns NULL if there was not enough space. + uint8_t* GetNextBytePtr(int numBytes = 1); + + /// Flushes all buffered values to the buffer. Call this when done writing to + /// the buffer. If 'align' is true, bufferedValues_ is reset and any future + /// writes will be written to the next byte boundary. + void Flush(bool align = false); + + private: + uint8_t* buffer_; + int maxBytes_; + + /// Bit-packed values are initially written to this variable before being + /// memcpy'd to buffer_. This is faster than writing values byte by byte + /// directly to buffer_. + uint64_t bufferedValues_; + + int byteOffset_; // Offset in buffer_ + int bitOffset_; // Offset in bufferedValues_ +}; + +namespace detail { + +inline uint64_t ReadLittleEndianWord( + const uint8_t* buffer, + int bytesRemaining) { + uint64_t leValue = 0; + if (FOLLY_LIKELY(bytesRemaining >= 8)) { + memcpy(&leValue, buffer, 8); + } else { + memcpy(&leValue, buffer, bytesRemaining); + } + return ::arrow::bit_util::FromLittleEndian(leValue); +} + +} // namespace detail + +/// Utility class to read bit/byte stream. This class can read bits or bytes +/// that are either byte aligned or not. It also has utilities to read multiple +/// bytes in one read (e.g. encoded int). +class BitReader { + public: + BitReader() = default; + + /// 'buffer' is the buffer to read from. The buffer's length is 'bufferLen'. + BitReader(const uint8_t* buffer, int bufferLen) : BitReader() { + Reset(buffer, bufferLen); + } + + void Reset(const uint8_t* buffer, int bufferLen) { + buffer_ = buffer; + maxBytes_ = bufferLen; + byteOffset_ = 0; + bitOffset_ = 0; + bufferedValues_ = detail::ReadLittleEndianWord( + buffer_ + byteOffset_, maxBytes_ - byteOffset_); + } + + /// Gets the next value from the buffer. Returns true if 'v' could be read or + /// false if there are not enough bytes left. + template + bool GetValue(int numBits, T* v); + + /// Get a number of values from the buffer. Return the number of values + /// actually read. + template + int GetBatch(int numBits, T* v, int batchSize); + + /// Reads a 'numBytes'-sized value from the buffer and stores it in 'v'. T + /// needs to be a little-endian native type and big enough to store + /// 'numBytes'. The value is assumed to be byte-aligned so the stream will + /// be advanced to the start of the next byte before 'v' is read. Returns + /// false if there are not enough bytes left. + /// Assume the v was stored in buffer_ as a little-endian format + template + bool GetAligned(int numBytes, T* v); + + /// Advances the stream by a number of bits. Returns true if succeed or false + /// if there are not enough bits left. + bool Advance(int64_t numBits); + + /// Reads a vlq encoded int from the stream. The encoded int must start at + /// the beginning of a byte. Return false if there were not enough bytes in + /// the buffer. + bool GetVlqInt(uint32_t* v); + + // Reads a zigzag encoded int `into` v. + bool GetZigZagVlqInt(int32_t* v); + + /// Reads a vlq encoded int64 from the stream. The encoded int must start at + /// the beginning of a byte. Return false if there were not enough bytes in + /// the buffer. + bool GetVlqInt(uint64_t* v); + + // Reads a zigzag encoded int64 `into` v. + bool GetZigZagVlqInt(int64_t* v); + + /// Returns the number of bytes left in the stream, not including the current + /// byte (i.e., there may be an additional fraction of a byte). + int bytesLeft() const { + return maxBytes_ - + (byteOffset_ + + static_cast(::arrow::bit_util::BytesForBits(bitOffset_))); + } + + /// Maximum byte length of a vlq encoded int + static constexpr int kMaxVlqByteLength = 5; + + /// Maximum byte length of a vlq encoded int64 + static constexpr int kMaxVlqByteLengthForInt64 = 10; + + private: + const uint8_t* buffer_; + int maxBytes_; + + /// Bytes are memcpy'd from buffer_ and values are read from this variable. + /// This is faster than reading values byte by byte directly from buffer_. + uint64_t bufferedValues_; + + int byteOffset_; // Offset in buffer_ + int bitOffset_; // Offset in bufferedValues_ +}; + +inline bool BitWriter::PutValue(uint64_t v, int numBits) { + VELOX_DCHECK_LE(numBits, 64); + if (numBits < 64) { + VELOX_DCHECK_EQ(v >> numBits, 0, "v = {}, numBits = {}", v, numBits); + } + + if (FOLLY_UNLIKELY(byteOffset_ * 8 + bitOffset_ + numBits > maxBytes_ * 8)) + return false; + + bufferedValues_ |= v << bitOffset_; + bitOffset_ += numBits; + + if (FOLLY_UNLIKELY(bitOffset_ >= 64)) { + // Flush bufferedValues_ and write out bits of v that did not fit + bufferedValues_ = folly::Endian::little(bufferedValues_); + memcpy(buffer_ + byteOffset_, &bufferedValues_, 8); + bufferedValues_ = 0; + byteOffset_ += 8; + bitOffset_ -= 64; + bufferedValues_ = + (numBits - bitOffset_ == 64) ? 0 : (v >> (numBits - bitOffset_)); + } + VELOX_DCHECK_LT(bitOffset_, 64); + return true; +} + +inline void BitWriter::Flush(bool align) { + int numBytes = static_cast(::arrow::bit_util::BytesForBits(bitOffset_)); + VELOX_DCHECK_LE(byteOffset_ + numBytes, maxBytes_); + auto bufferedValues = folly::Endian::little(bufferedValues_); + memcpy(buffer_ + byteOffset_, &bufferedValues, numBytes); + + if (align) { + bufferedValues_ = 0; + byteOffset_ += numBytes; + bitOffset_ = 0; + } +} + +inline uint8_t* BitWriter::GetNextBytePtr(int numBytes) { + Flush(/* align */ true); + VELOX_DCHECK_LE(byteOffset_, maxBytes_); + if (byteOffset_ + numBytes > maxBytes_) + return NULL; + uint8_t* ptr = buffer_ + byteOffset_; + byteOffset_ += numBytes; + return ptr; +} + +template +inline bool BitWriter::PutAligned(T val, int numBytes) { + uint8_t* ptr = GetNextBytePtr(numBytes); + if (ptr == NULL) + return false; + val = folly::Endian::little(val); + memcpy(ptr, &val, numBytes); + return true; +} + +namespace detail { + +template +inline void GetValue_( + int numBits, + T* v, + int maxBytes, + const uint8_t* buffer, + int* bitOffset, + int* byteOffset, + uint64_t* bufferedValues) { + *v = static_cast( + ::arrow::bit_util::TrailingBits(*bufferedValues, *bitOffset + numBits) >> + *bitOffset); + *bitOffset += numBits; + if (*bitOffset >= 64) { + *byteOffset += 8; + *bitOffset -= 64; + + *bufferedValues = detail::ReadLittleEndianWord( + buffer + *byteOffset, maxBytes - *byteOffset); + // Read bits of v that crossed into new bufferedValues_ + if (FOLLY_LIKELY(numBits - *bitOffset < static_cast(8 * sizeof(T)))) { + // if shift exponent(numBits - *bitOffset) is not less than sizeof(T), + // *v will not change and the following code may cause a runtime error + // that the shift exponent is too large + *v = *v | + static_cast( + ::arrow::bit_util::TrailingBits(*bufferedValues, *bitOffset) + << (numBits - *bitOffset)); + } + VELOX_DCHECK_LE(*bitOffset, 64); + } +} + +} // namespace detail + +template +inline bool BitReader::GetValue(int numBits, T* v) { + return GetBatch(numBits, v, 1) == 1; +} + +template +inline int BitReader::GetBatch(int numBits, T* v, int batchSize) { + VELOX_DCHECK(buffer_ != NULL); + VELOX_DCHECK_LE( + numBits, static_cast(sizeof(T) * 8), "numBits: {}", numBits); + + int bitOffset = bitOffset_; + int byteOffset = byteOffset_; + uint64_t bufferedValues = bufferedValues_; + int maxBytes = maxBytes_; + const uint8_t* buffer = buffer_; + + const int64_t neededBits = numBits * static_cast(batchSize); + constexpr uint64_t kBitsPerByte = 8; + const int64_t remainingBits = + static_cast(maxBytes - byteOffset) * kBitsPerByte - bitOffset; + if (remainingBits < neededBits) { + batchSize = static_cast(remainingBits / numBits); + } + + int i = 0; + if (FOLLY_UNLIKELY(bitOffset != 0)) { + for (; i < batchSize && bitOffset != 0; ++i) { + detail::GetValue_( + numBits, + &v[i], + maxBytes, + buffer, + &bitOffset, + &byteOffset, + &bufferedValues); + } + } + + if (sizeof(T) == 4) { + int numUnpacked = ::arrow::internal::unpack32( + reinterpret_cast(buffer + byteOffset), + reinterpret_cast(v + i), + batchSize - i, + numBits); + i += numUnpacked; + byteOffset += numUnpacked * numBits / 8; + } else if (sizeof(T) == 8 && numBits > 32) { + // Use unpack64 only if numBits is larger than 32 + // TODO (ARROW-13677): improve the performance of internal::unpack64 + // and remove the restriction of numBits + int numUnpacked = ::arrow::internal::unpack64( + buffer + byteOffset, + reinterpret_cast(v + i), + batchSize - i, + numBits); + i += numUnpacked; + byteOffset += numUnpacked * numBits / 8; + } else { + // TODO: revisit this limit if necessary + VELOX_DCHECK_LE(numBits, 32); + const int bufferSize = 1024; + uint32_t unpackBuffer[bufferSize]; + while (i < batchSize) { + int unpack_size = std::min(bufferSize, batchSize - i); + int numUnpacked = ::arrow::internal::unpack32( + reinterpret_cast(buffer + byteOffset), + unpackBuffer, + unpack_size, + numBits); + if (numUnpacked == 0) { + break; + } + for (int k = 0; k < numUnpacked; ++k) { + v[i + k] = static_cast(unpackBuffer[k]); + } + i += numUnpacked; + byteOffset += numUnpacked * numBits / 8; + } + } + + bufferedValues = + detail::ReadLittleEndianWord(buffer + byteOffset, maxBytes - byteOffset); + + for (; i < batchSize; ++i) { + detail::GetValue_( + numBits, + &v[i], + maxBytes, + buffer, + &bitOffset, + &byteOffset, + &bufferedValues); + } + + bitOffset_ = bitOffset; + byteOffset_ = byteOffset; + bufferedValues_ = bufferedValues; + + return batchSize; +} + +template +inline bool BitReader::GetAligned(int numBytes, T* v) { + if (FOLLY_UNLIKELY(numBytes > static_cast(sizeof(T)))) { + return false; + } + + int bytesRead = static_cast(::arrow::bit_util::BytesForBits(bitOffset_)); + if (FOLLY_UNLIKELY(byteOffset_ + bytesRead + numBytes > maxBytes_)) { + return false; + } + + // Advance byteOffset to next unread byte and read numBytes + byteOffset_ += bytesRead; + if constexpr (std::is_same_v) { + // ARROW-18031: if we're trying to get an aligned bool, just check + // the LSB of the next byte and move on. If we memcpy + FromLittleEndian + // as usual, we have potential undefined behavior for bools if the value + // isn't 0 or 1 + *v = *(buffer_ + byteOffset_) & 1; + } else { + memcpy(v, buffer_ + byteOffset_, numBytes); + *v = ::arrow::bit_util::FromLittleEndian(*v); + } + byteOffset_ += numBytes; + + bitOffset_ = 0; + bufferedValues_ = detail::ReadLittleEndianWord( + buffer_ + byteOffset_, maxBytes_ - byteOffset_); + return true; +} + +inline bool BitReader::Advance(int64_t numBits) { + int64_t bitsRequired = bitOffset_ + numBits; + int64_t bytesRequired = ::arrow::bit_util::BytesForBits(bitsRequired); + if (FOLLY_UNLIKELY(bytesRequired > maxBytes_ - byteOffset_)) { + return false; + } + byteOffset_ += static_cast(bitsRequired >> 3); + bitOffset_ = static_cast(bitsRequired & 7); + bufferedValues_ = detail::ReadLittleEndianWord( + buffer_ + byteOffset_, maxBytes_ - byteOffset_); + return true; +} + +inline bool BitWriter::PutVlqInt(uint32_t v) { + bool result = true; + while ((v & 0xFFFFFF80UL) != 0UL) { + result &= PutAligned(static_cast((v & 0x7F) | 0x80), 1); + v >>= 7; + } + result &= PutAligned(static_cast(v & 0x7F), 1); + return result; +} + +inline bool BitReader::GetVlqInt(uint32_t* v) { + uint32_t tmp = 0; + + for (int i = 0; i < kMaxVlqByteLength; i++) { + uint8_t byte = 0; + if (FOLLY_UNLIKELY(!GetAligned(1, &byte))) { + return false; + } + tmp |= static_cast(byte & 0x7F) << (7 * i); + + if ((byte & 0x80) == 0) { + *v = tmp; + return true; + } + } + + return false; +} + +inline bool BitWriter::PutZigZagVlqInt(int32_t v) { + uint32_t copyValue = ::arrow::util::SafeCopy(v); + copyValue = (copyValue << 1) ^ static_cast(v >> 31); + return PutVlqInt(copyValue); +} + +inline bool BitReader::GetZigZagVlqInt(int32_t* v) { + uint32_t u; + if (!GetVlqInt(&u)) + return false; + u = (u >> 1) ^ (~(u & 1) + 1); + *v = ::arrow::util::SafeCopy(u); + return true; +} + +inline bool BitWriter::PutVlqInt(uint64_t v) { + bool result = true; + while ((v & 0xFFFFFFFFFFFFFF80ULL) != 0ULL) { + result &= PutAligned(static_cast((v & 0x7F) | 0x80), 1); + v >>= 7; + } + result &= PutAligned(static_cast(v & 0x7F), 1); + return result; +} + +inline bool BitReader::GetVlqInt(uint64_t* v) { + uint64_t tmp = 0; + + for (int i = 0; i < kMaxVlqByteLengthForInt64; i++) { + uint8_t byte = 0; + if (FOLLY_UNLIKELY(!GetAligned(1, &byte))) { + return false; + } + tmp |= static_cast(byte & 0x7F) << (7 * i); + + if ((byte & 0x80) == 0) { + *v = tmp; + return true; + } + } + + return false; +} + +inline bool BitWriter::PutZigZagVlqInt(int64_t v) { + uint64_t copyValue = ::arrow::util::SafeCopy(v); + copyValue = (copyValue << 1) ^ static_cast(v >> 63); + return PutVlqInt(copyValue); +} + +inline bool BitReader::GetZigZagVlqInt(int64_t* v) { + uint64_t u; + if (!GetVlqInt(&u)) + return false; + u = (u >> 1) ^ (~(u & 1) + 1); + *v = ::arrow::util::SafeCopy(u); + return true; +} + +} // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/writer/arrow/util/RleEncodingInternal.h b/velox/dwio/parquet/common/RleEncodingInternal.h similarity index 50% rename from velox/dwio/parquet/writer/arrow/util/RleEncodingInternal.h rename to velox/dwio/parquet/common/RleEncodingInternal.h index d3b7087cdb8d..650a82998688 100644 --- a/velox/dwio/parquet/writer/arrow/util/RleEncodingInternal.h +++ b/velox/dwio/parquet/common/RleEncodingInternal.h @@ -28,12 +28,10 @@ #include "arrow/util/bit_block_counter.h" #include "arrow/util/bit_run_reader.h" -#include "arrow/util/bit_util.h" -#include "arrow/util/macros.h" -#include "velox/dwio/parquet/writer/arrow/util/BitStreamUtilsInternal.h" -namespace facebook::velox::parquet::arrow::util { +#include "velox/dwio/parquet/common/BitStreamUtilsInternal.h" +namespace facebook::velox::parquet { /// Utility classes to do run length encoding (RLE) for fixed bit width values. /// If runs are sufficiently long, RLE is used, otherwise, the values are just /// bit-packed (literal encoding). For both types of runs, there is a @@ -87,28 +85,28 @@ namespace facebook::velox::parquet::arrow::util { /// Decoder class for RLE encoded data. class RleDecoder { public: - /// Create a decoder object. buffer/buffer_len is the decoded data. - /// bit_width is the width of each value (before encoding). - RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width) - : bit_reader_(buffer, buffer_len), - bit_width_(bit_width), - current_value_(0), - repeat_count_(0), - literal_count_(0) { - DCHECK_GE(bit_width_, 0); - DCHECK_LE(bit_width_, 64); - } - - RleDecoder() : bit_width_(-1) {} - - void Reset(const uint8_t* buffer, int buffer_len, int bit_width) { - DCHECK_GE(bit_width, 0); - DCHECK_LE(bit_width, 64); - bit_reader_.Reset(buffer, buffer_len); - bit_width_ = bit_width; - current_value_ = 0; - repeat_count_ = 0; - literal_count_ = 0; + /// Create a decoder object. buffer/bufferLen is the decoded data. + /// bitWidth is the width of each value (before encoding). + RleDecoder(const uint8_t* buffer, int bufferLen, int bitWidth) + : bitReader_(buffer, bufferLen), + bitWidth_(bitWidth), + currentValue_(0), + repeatCount_(0), + literalCount_(0) { + VELOX_DCHECK_GE(bitWidth_, 0); + VELOX_DCHECK_LE(bitWidth_, 64); + } + + RleDecoder() : bitWidth_(-1) {} + + void Reset(const uint8_t* buffer, int bufferLen, int bitWidth) { + VELOX_DCHECK_GE(bitWidth, 0); + VELOX_DCHECK_LE(bitWidth, 64); + bitReader_.Reset(buffer, bufferLen); + bitWidth_ = bitWidth; + currentValue_ = 0; + repeatCount_ = 0; + literalCount_ = 0; } /// Gets the next value. Returns false if there are no more. @@ -117,15 +115,15 @@ class RleDecoder { /// Gets a batch of values. Returns the number of decoded elements. template - int GetBatch(T* values, int batch_size); + int GetBatch(T* values, int batchSize); /// Like GetBatch but add spacing for null entries template int GetBatchSpaced( - int batch_size, - int null_count, - const uint8_t* valid_bits, - int64_t valid_bits_offset, + int batchSize, + int nullCount, + const uint8_t* validBits, + int64_t validBitsOffset, T* out); /// Like GetBatch but the values are then decoded using the provided @@ -133,9 +131,9 @@ class RleDecoder { template int GetBatchWithDict( const T* dictionary, - int32_t dictionary_length, + int32_t dictionaryLength, T* values, - int batch_size); + int batchSize); /// Like GetBatchWithDict but add spacing for null entries /// @@ -144,23 +142,23 @@ class RleDecoder { template int GetBatchWithDictSpaced( const T* dictionary, - int32_t dictionary_length, + int32_t dictionaryLength, T* values, - int batch_size, - int null_count, - const uint8_t* valid_bits, - int64_t valid_bits_offset); + int batchSize, + int nullCount, + const uint8_t* validBits, + int64_t validBitsOffset); protected: - ::facebook::velox::parquet::arrow::bit_util::BitReader bit_reader_; + BitReader bitReader_; /// Number of bits needed to encode the value. Must be between 0 and 64. - int bit_width_; - uint64_t current_value_; - int32_t repeat_count_; - int32_t literal_count_; + int bitWidth_; + uint64_t currentValue_; + int32_t repeatCount_; + int32_t literalCount_; private: - /// Fills literal_count_ and repeat_count_ with next values. Returns false if + /// Fills literalCount_ and repeatCount_ with next values. Returns false if /// there are no more. template bool NextCounts(); @@ -169,10 +167,10 @@ class RleDecoder { template int GetSpaced( Converter converter, - int batch_size, - int null_count, - const uint8_t* valid_bits, - int64_t valid_bits_offset, + int batchSize, + int nullCount, + const uint8_t* validBits, + int64_t validBitsOffset, T* out); }; @@ -185,56 +183,56 @@ class RleDecoder { /// previous run is flushed out. class RleEncoder { public: - /// buffer/buffer_len: preallocated output buffer. - /// bit_width: max number of bits for value. + /// buffer/bufferLen: preallocated output buffer. + /// bitWidth: max number of bits for value. /// TODO: consider adding a min_repeated_run_length so the caller can control /// when values should be encoded as repeated runs. Currently this is derived - /// based on the bit_width, which can determine a storage optimal choice. - /// TODO: allow 0 bit_width (and have dict encoder use it) - RleEncoder(uint8_t* buffer, int buffer_len, int bit_width) - : bit_width_(bit_width), bit_writer_(buffer, buffer_len) { - DCHECK_GE(bit_width_, 0); - DCHECK_LE(bit_width_, 64); - max_run_byte_size_ = MinBufferSize(bit_width); - DCHECK_GE(buffer_len, max_run_byte_size_) << "Input buffer not big enough."; + /// based on the bitWidth, which can determine a storage optimal choice. + /// TODO: allow 0 bitWidth (and have dict encoder use it) + RleEncoder(uint8_t* buffer, int bufferLen, int bitWidth) + : bitWidth_(bitWidth), bitWriter_(buffer, bufferLen) { + VELOX_DCHECK_GE(bitWidth_, 0); + VELOX_DCHECK_LE(bitWidth_, 64); + maxRunByteSize_ = MinBufferSize(bitWidth); + VELOX_DCHECK_GE(bufferLen, maxRunByteSize_, "Input buffer not big enough."); Clear(); } - /// Returns the minimum buffer size needed to use the encoder for 'bit_width' - /// This is the maximum length of a single run for 'bit_width'. + /// Returns the minimum buffer size needed to use the encoder for 'bitWidth' + /// This is the maximum length of a single run for 'bitWidth'. /// It is not valid to pass a buffer less than this length. - static int MinBufferSize(int bit_width) { - /// 1 indicator byte and MAX_VALUES_PER_LITERAL_RUN 'bit_width' values. - int max_literal_run_size = 1 + + static int MinBufferSize(int bitWidth) { + /// 1 indicator byte and MAX_VALUES_PER_LITERAL_RUN 'bitWidth' values. + int maxLiteralRunSize = 1 + static_cast(::arrow::bit_util::BytesForBits( - MAX_VALUES_PER_LITERAL_RUN * bit_width)); - /// Up to kMaxVlqByteLength indicator and a single 'bit_width' value. - int max_repeated_run_size = arrow::bit_util::BitReader::kMaxVlqByteLength + - static_cast(::arrow::bit_util::BytesForBits(bit_width)); - return std::max(max_literal_run_size, max_repeated_run_size); + MAX_VALUES_PER_LITERAL_RUN * bitWidth)); + /// Up to kMaxVlqByteLength indicator and a single 'bitWidth' value. + int maxRepeatedRunSize = BitReader::kMaxVlqByteLength + + static_cast(::arrow::bit_util::BytesForBits(bitWidth)); + return std::max(maxLiteralRunSize, maxRepeatedRunSize); } - /// Returns the maximum byte size it could take to encode 'num_values'. - static int MaxBufferSize(int bit_width, int num_values) { - // For a bit_width > 1, the worst case is the repetition of "literal run of + /// Returns the maximum byte size it could take to encode 'numValues'. + static int MaxBufferSize(int bitWidth, int numValues) { + // For a bitWidth > 1, the worst case is the repetition of "literal run of // length 8 and then a repeated run of length 8". 8 values per smallest run, // 8 bits per byte - int bytes_per_run = bit_width; - int num_runs = static_cast(::arrow::bit_util::CeilDiv(num_values, 8)); - int literal_max_size = num_runs + num_runs * bytes_per_run; + int bytesPerRun = bitWidth; + int numRuns = static_cast(::arrow::bit_util::CeilDiv(numValues, 8)); + int literalMaxSize = numRuns + numRuns * bytesPerRun; // In the very worst case scenario, the data is a concatenation of repeated // runs of 8 values. Repeated run has a 1 byte varint followed by the // bit-packed repeated value - int min_repeated_run_size = - 1 + static_cast(::arrow::bit_util::BytesForBits(bit_width)); - int repeated_max_size = num_runs * min_repeated_run_size; + int minRepeatedRunSize = + 1 + static_cast(::arrow::bit_util::BytesForBits(bitWidth)); + int repeatedMaxSize = numRuns * minRepeatedRunSize; - return std::max(literal_max_size, repeated_max_size); + return std::max(literalMaxSize, repeatedMaxSize); } /// Encode value. Returns true if the value fits in buffer, false otherwise. - /// This value must be representable with bit_width_ bits. + /// This value must be representable with bitWidth_ bits. bool Put(uint64_t value); /// Flushes any pending values to the underlying buffer. @@ -246,10 +244,10 @@ class RleEncoder { /// Returns pointer to underlying buffer uint8_t* buffer() { - return bit_writer_.buffer(); + return bitWriter_.buffer(); } int32_t len() { - return bit_writer_.bytes_written(); + return bitWriter_.bytesWritten(); } private: @@ -263,14 +261,14 @@ class RleEncoder { void FlushBufferedValues(bool done); /// Flushes literal values to the underlying buffer. If - /// update_indicator_byte, then the current literal run is complete and the + /// updateIndicatorByte, then the current literal run is complete and the /// indicator byte is updated. - void FlushLiteralRun(bool update_indicator_byte); + void FlushLiteralRun(bool updateIndicatorByte); /// Flushes a repeated run to the underlying buffer. void FlushRepeatedRun(); - /// Checks and sets buffer_full_. This must be called after flushing a run to + /// Checks and sets bufferFull_. This must be called after flushing a run to /// make sure there are enough bytes remaining to encode the next run. void CheckBufferFull(); @@ -279,42 +277,42 @@ class RleEncoder { static const int MAX_VALUES_PER_LITERAL_RUN = (1 << 6) * 8; /// Number of bits needed to encode the value. Must be between 0 and 64. - const int bit_width_; + const int bitWidth_; /// Underlying buffer. - arrow::bit_util::BitWriter bit_writer_; + BitWriter bitWriter_; /// If true, the buffer is full and subsequent Put()'s will fail. - bool buffer_full_; + bool bufferFull_; /// The maximum byte size a single run can take. - int max_run_byte_size_; + int maxRunByteSize_; /// We need to buffer at most 8 values for literals. This happens when the - /// bit_width is 1 (so 8 values fit in one byte). + /// bitWidth is 1 (so 8 values fit in one byte). /// TODO: generalize this to other bit widths - int64_t buffered_values_[8]; + int64_t bufferedValues_[8]; - /// Number of values in buffered_values_ - int num_buffered_values_; + /// Number of values in bufferedValues_ + int numBufferedValues_; /// The current (also last) value that was written and the count of how /// many times in a row that value has been seen. This is maintained even - /// if we are in a literal run. If the repeat_count_ get high enough, we + /// if we are in a literal run. If the repeatCount_ get high enough, we /// switch to encoding repeated runs. - uint64_t current_value_; - int repeat_count_; + uint64_t currentValue_; + int repeatCount_; /// Number of literals in the current run. This does not include the literals - /// that might be in buffered_values_. Only after we've got a group big - /// enough can we decide if they should part of the literal_count_ or - /// repeat_count_ - int literal_count_; + /// that might be in bufferedValues_. Only after we've got a group big + /// enough can we decide if they should part of the literalCount_ or + /// repeatCount_ + int literalCount_; /// Pointer to a byte in the underlying buffer that stores the indicator byte. /// This is reserved as soon as we need a literal run but the value is written /// when the literal run is complete. - uint8_t* literal_indicator_byte_; + uint8_t* literalIndicatorByte_; }; template @@ -323,162 +321,160 @@ inline bool RleDecoder::Get(T* val) { } template -inline int RleDecoder::GetBatch(T* values, int batch_size) { - DCHECK_GE(bit_width_, 0); - int values_read = 0; +inline int RleDecoder::GetBatch(T* values, int batchSize) { + VELOX_DCHECK_GE(bitWidth_, 0); + int valuesRead = 0; auto* out = values; - while (values_read < batch_size) { - int remaining = batch_size - values_read; - - if (repeat_count_ > 0) { // Repeated value case. - int repeat_batch = std::min(remaining, repeat_count_); - std::fill(out, out + repeat_batch, static_cast(current_value_)); - - repeat_count_ -= repeat_batch; - values_read += repeat_batch; - out += repeat_batch; - } else if (literal_count_ > 0) { - int literal_batch = std::min(remaining, literal_count_); - int actual_read = bit_reader_.GetBatch(bit_width_, out, literal_batch); - if (actual_read != literal_batch) { - return values_read; + while (valuesRead < batchSize) { + int remaining = batchSize - valuesRead; + + if (repeatCount_ > 0) { // Repeated value case. + int repeatBatch = std::min(remaining, repeatCount_); + std::fill(out, out + repeatBatch, static_cast(currentValue_)); + + repeatCount_ -= repeatBatch; + valuesRead += repeatBatch; + out += repeatBatch; + } else if (literalCount_ > 0) { + int literalBatch = std::min(remaining, literalCount_); + int actualRead = bitReader_.GetBatch(bitWidth_, out, literalBatch); + if (actualRead != literalBatch) { + return valuesRead; } - literal_count_ -= literal_batch; - values_read += literal_batch; - out += literal_batch; + literalCount_ -= literalBatch; + valuesRead += literalBatch; + out += literalBatch; } else { if (!NextCounts()) - return values_read; + return valuesRead; } } - return values_read; + return valuesRead; } template inline int RleDecoder::GetSpaced( Converter converter, - int batch_size, - int null_count, - const uint8_t* valid_bits, - int64_t valid_bits_offset, + int batchSize, + int nullCount, + const uint8_t* validBits, + int64_t validBitsOffset, T* out) { - if (ARROW_PREDICT_FALSE(null_count == batch_size)) { - converter.FillZero(out, out + batch_size); - return batch_size; + if (FOLLY_UNLIKELY(nullCount == batchSize)) { + converter.FillZero(out, out + batchSize); + return batchSize; } - DCHECK_GE(bit_width_, 0); - int values_read = 0; - int values_remaining = batch_size - null_count; + VELOX_DCHECK_GE(bitWidth_, 0); + int valuesRead = 0; + int valuesRemaining = batchSize - nullCount; // Assume no bits to start. - ::arrow::internal::BitRunReader bit_reader( - valid_bits, - valid_bits_offset, - /*length=*/batch_size); - ::arrow::internal::BitRun valid_run = bit_reader.NextRun(); - while (values_read < batch_size) { - if (ARROW_PREDICT_FALSE(valid_run.length == 0)) { - valid_run = bit_reader.NextRun(); + ::arrow::internal::BitRunReader bitReader( + validBits, + validBitsOffset, + /*length=*/batchSize); + ::arrow::internal::BitRun validRun = bitReader.NextRun(); + while (valuesRead < batchSize) { + if (FOLLY_UNLIKELY(validRun.length == 0)) { + validRun = bitReader.NextRun(); } - DCHECK_GT(batch_size, 0); - DCHECK_GT(valid_run.length, 0); + VELOX_DCHECK_GT(batchSize, 0); + VELOX_DCHECK_GT(validRun.length, 0); - if (valid_run.set) { - if ((repeat_count_ == 0) && (literal_count_ == 0)) { + if (validRun.set) { + if ((repeatCount_ == 0) && (literalCount_ == 0)) { if (!NextCounts()) - return values_read; - DCHECK((repeat_count_ > 0) ^ (literal_count_ > 0)); + return valuesRead; + VELOX_DCHECK((repeatCount_ > 0) ^ (literalCount_ > 0)); } - if (repeat_count_ > 0) { - int repeat_batch = 0; - // Consume the entire repeat counts incrementing repeat_batch to + if (repeatCount_ > 0) { + int repeatBatch = 0; + // Consume the entire repeat counts incrementing repeatBatch to // be the total of nulls + values consumed, we only need to // get the total count because we can fill in the same value for // nulls and non-nulls. This proves to be a big efficiency win. - while (repeat_count_ > 0 && (values_read + repeat_batch) < batch_size) { - DCHECK_GT(valid_run.length, 0); - if (valid_run.set) { - int update_size = - std::min(static_cast(valid_run.length), repeat_count_); - repeat_count_ -= update_size; - repeat_batch += update_size; - valid_run.length -= update_size; - values_remaining -= update_size; + while (repeatCount_ > 0 && (valuesRead + repeatBatch) < batchSize) { + VELOX_DCHECK_GT(validRun.length, 0); + if (validRun.set) { + int updateSize = + std::min(static_cast(validRun.length), repeatCount_); + repeatCount_ -= updateSize; + repeatBatch += updateSize; + validRun.length -= updateSize; + valuesRemaining -= updateSize; } else { // We can consume all nulls here because we would do so on // the next loop anyways. - repeat_batch += static_cast(valid_run.length); - valid_run.length = 0; + repeatBatch += static_cast(validRun.length); + validRun.length = 0; } - if (valid_run.length == 0) { - valid_run = bit_reader.NextRun(); + if (validRun.length == 0) { + validRun = bitReader.NextRun(); } } - RunType current_value = static_cast(current_value_); - if (ARROW_PREDICT_FALSE(!converter.IsValid(current_value))) { - return values_read; + RunType currentValue = static_cast(currentValue_); + if (FOLLY_UNLIKELY(!converter.IsValid(currentValue))) { + return valuesRead; } - converter.Fill(out, out + repeat_batch, current_value); - out += repeat_batch; - values_read += repeat_batch; - } else if (literal_count_ > 0) { - int literal_batch = std::min(values_remaining, literal_count_); - DCHECK_GT(literal_batch, 0); + converter.Fill(out, out + repeatBatch, currentValue); + out += repeatBatch; + valuesRead += repeatBatch; + } else if (literalCount_ > 0) { + int literalBatch = std::min(valuesRemaining, literalCount_); + VELOX_DCHECK_GT(literalBatch, 0); // Decode the literals constexpr int kBufferSize = 1024; RunType indices[kBufferSize]; - literal_batch = std::min(literal_batch, kBufferSize); - int actual_read = - bit_reader_.GetBatch(bit_width_, indices, literal_batch); - if (ARROW_PREDICT_FALSE(actual_read != literal_batch)) { - return values_read; + literalBatch = std::min(literalBatch, kBufferSize); + int actualRead = bitReader_.GetBatch(bitWidth_, indices, literalBatch); + if (FOLLY_UNLIKELY(actualRead != literalBatch)) { + return valuesRead; } - if (!converter.IsValid(indices, /*length=*/actual_read)) { - return values_read; + if (!converter.IsValid(indices, /*length=*/actualRead)) { + return valuesRead; } int skipped = 0; - int literals_read = 0; - while (literals_read < literal_batch) { - if (valid_run.set) { - int update_size = std::min( - literal_batch - literals_read, - static_cast(valid_run.length)); - converter.Copy(out, indices + literals_read, update_size); - literals_read += update_size; - out += update_size; - valid_run.length -= update_size; + int literalsRead = 0; + while (literalsRead < literalBatch) { + if (validRun.set) { + int updateSize = std::min( + literalBatch - literalsRead, static_cast(validRun.length)); + converter.Copy(out, indices + literalsRead, updateSize); + literalsRead += updateSize; + out += updateSize; + validRun.length -= updateSize; } else { - converter.FillZero(out, out + valid_run.length); - out += valid_run.length; - skipped += static_cast(valid_run.length); - valid_run.length = 0; + converter.FillZero(out, out + validRun.length); + out += validRun.length; + skipped += static_cast(validRun.length); + validRun.length = 0; } - if (valid_run.length == 0) { - valid_run = bit_reader.NextRun(); + if (validRun.length == 0) { + validRun = bitReader.NextRun(); } } - literal_count_ -= literal_batch; - values_remaining -= literal_batch; - values_read += literal_batch + skipped; + literalCount_ -= literalBatch; + valuesRemaining -= literalBatch; + valuesRead += literalBatch + skipped; } } else { - converter.FillZero(out, out + valid_run.length); - out += valid_run.length; - values_read += static_cast(valid_run.length); - valid_run.length = 0; + converter.FillZero(out, out + validRun.length); + out += validRun.length; + valuesRead += static_cast(validRun.length); + validRun.length = 0; } } - DCHECK_EQ(valid_run.length, 0); - DCHECK_EQ(values_remaining, 0); - return values_read; + VELOX_DCHECK_EQ(validRun.length, 0); + VELOX_DCHECK_EQ(valuesRemaining, 0); + return valuesRead; } // Converter for GetSpaced that handles runs that get returned @@ -492,8 +488,8 @@ struct PlainRleConverter { inline bool IsValid(const T* values, int32_t length) const { return true; } - inline void Fill(T* begin, T* end, const T& run_value) const { - std::fill(begin, end, run_value); + inline void Fill(T* begin, T* end, const T& runValue) const { + std::fill(begin, end, runValue); } inline void FillZero(T* begin, T* end) { std::fill(begin, end, kZero); @@ -505,25 +501,25 @@ struct PlainRleConverter { template inline int RleDecoder::GetBatchSpaced( - int batch_size, - int null_count, - const uint8_t* valid_bits, - int64_t valid_bits_offset, + int batchSize, + int nullCount, + const uint8_t* validBits, + int64_t validBitsOffset, T* out) { - if (null_count == 0) { - return GetBatch(out, batch_size); + if (nullCount == 0) { + return GetBatch(out, batchSize); } PlainRleConverter converter; - ::arrow::internal::BitBlockCounter block_counter( - valid_bits, valid_bits_offset, batch_size); + ::arrow::internal::BitBlockCounter blockCounter( + validBits, validBitsOffset, batchSize); - int total_processed = 0; + int totalProcessed = 0; int processed = 0; ::arrow::internal::BitBlockCount block; do { - block = block_counter.NextFourWords(); + block = blockCounter.NextFourWords(); if (block.length == 0) { break; } @@ -537,19 +533,19 @@ inline int RleDecoder::GetBatchSpaced( converter, block.length, block.length - block.popcount, - valid_bits, - valid_bits_offset, + validBits, + validBitsOffset, out); } - total_processed += processed; + totalProcessed += processed; out += block.length; - valid_bits_offset += block.length; + validBitsOffset += block.length; } while (processed == block.length); - return total_processed; + return totalProcessed; } -static inline bool IndexInRange(int32_t idx, int32_t dictionary_length) { - return idx >= 0 && idx < dictionary_length; +static inline bool IndexInRange(int32_t idx, int32_t dictionaryLength) { + return idx >= 0 && idx < dictionaryLength; } // Converter for GetSpaced that handles runs of returned dictionary @@ -558,26 +554,26 @@ template struct DictionaryConverter { T kZero = {}; const T* dictionary; - int32_t dictionary_length; + int32_t dictionaryLength; inline bool IsValid(int32_t value) { - return IndexInRange(value, dictionary_length); + return IndexInRange(value, dictionaryLength); } inline bool IsValid(const int32_t* values, int32_t length) const { using IndexType = int32_t; - IndexType min_index = std::numeric_limits::max(); - IndexType max_index = std::numeric_limits::min(); + IndexType minIndex = std::numeric_limits::max(); + IndexType maxIndex = std::numeric_limits::min(); for (int x = 0; x < length; x++) { - min_index = std::min(values[x], min_index); - max_index = std::max(values[x], max_index); + minIndex = std::min(values[x], minIndex); + maxIndex = std::max(values[x], maxIndex); } - return IndexInRange(min_index, dictionary_length) && - IndexInRange(max_index, dictionary_length); + return IndexInRange(minIndex, dictionaryLength) && + IndexInRange(maxIndex, dictionaryLength); } - inline void Fill(T* begin, T* end, const int32_t& run_value) const { - std::fill(begin, end, dictionary[run_value]); + inline void Fill(T* begin, T* end, const int32_t& runValue) const { + std::fill(begin, end, dictionary[runValue]); } inline void FillZero(T* begin, T* end) { std::fill(begin, end, kZero); @@ -593,99 +589,98 @@ struct DictionaryConverter { template inline int RleDecoder::GetBatchWithDict( const T* dictionary, - int32_t dictionary_length, + int32_t dictionaryLength, T* values, - int batch_size) { + int batchSize) { // Per https://github.com/apache/parquet-format/blob/master/Encodings.md, // the maximum dictionary index width in Parquet is 32 bits. using IndexType = int32_t; DictionaryConverter converter; converter.dictionary = dictionary; - converter.dictionary_length = dictionary_length; + converter.dictionaryLength = dictionaryLength; - DCHECK_GE(bit_width_, 0); - int values_read = 0; + VELOX_DCHECK_GE(bitWidth_, 0); + int valuesRead = 0; auto* out = values; - while (values_read < batch_size) { - int remaining = batch_size - values_read; + while (valuesRead < batchSize) { + int remaining = batchSize - valuesRead; - if (repeat_count_ > 0) { - auto idx = static_cast(current_value_); - if (ARROW_PREDICT_FALSE(!IndexInRange(idx, dictionary_length))) { - return values_read; + if (repeatCount_ > 0) { + auto idx = static_cast(currentValue_); + if (FOLLY_UNLIKELY(!IndexInRange(idx, dictionaryLength))) { + return valuesRead; } T val = dictionary[idx]; - int repeat_batch = std::min(remaining, repeat_count_); - std::fill(out, out + repeat_batch, val); + int repeatBatch = std::min(remaining, repeatCount_); + std::fill(out, out + repeatBatch, val); /* Upkeep counters */ - repeat_count_ -= repeat_batch; - values_read += repeat_batch; - out += repeat_batch; - } else if (literal_count_ > 0) { + repeatCount_ -= repeatBatch; + valuesRead += repeatBatch; + out += repeatBatch; + } else if (literalCount_ > 0) { constexpr int kBufferSize = 1024; IndexType indices[kBufferSize]; - int literal_batch = std::min(remaining, literal_count_); - literal_batch = std::min(literal_batch, kBufferSize); + int literalBatch = std::min(remaining, literalCount_); + literalBatch = std::min(literalBatch, kBufferSize); - int actual_read = - bit_reader_.GetBatch(bit_width_, indices, literal_batch); - if (ARROW_PREDICT_FALSE(actual_read != literal_batch)) { - return values_read; + int actualRead = bitReader_.GetBatch(bitWidth_, indices, literalBatch); + if (FOLLY_UNLIKELY(actualRead != literalBatch)) { + return valuesRead; } - if (ARROW_PREDICT_FALSE( - !converter.IsValid(indices, /*length=*/literal_batch))) { - return values_read; + if (FOLLY_UNLIKELY( + !converter.IsValid(indices, /*length=*/literalBatch))) { + return valuesRead; } - converter.Copy(out, indices, literal_batch); + converter.Copy(out, indices, literalBatch); /* Upkeep counters */ - literal_count_ -= literal_batch; - values_read += literal_batch; - out += literal_batch; + literalCount_ -= literalBatch; + valuesRead += literalBatch; + out += literalBatch; } else { if (!NextCounts()) - return values_read; + return valuesRead; } } - return values_read; + return valuesRead; } template inline int RleDecoder::GetBatchWithDictSpaced( const T* dictionary, - int32_t dictionary_length, + int32_t dictionaryLength, T* out, - int batch_size, - int null_count, - const uint8_t* valid_bits, - int64_t valid_bits_offset) { - if (null_count == 0) { - return GetBatchWithDict(dictionary, dictionary_length, out, batch_size); - } - ::arrow::internal::BitBlockCounter block_counter( - valid_bits, valid_bits_offset, batch_size); + int batchSize, + int nullCount, + const uint8_t* validBits, + int64_t validBitsOffset) { + if (nullCount == 0) { + return GetBatchWithDict(dictionary, dictionaryLength, out, batchSize); + } + ::arrow::internal::BitBlockCounter blockCounter( + validBits, validBitsOffset, batchSize); using IndexType = int32_t; DictionaryConverter converter; converter.dictionary = dictionary; - converter.dictionary_length = dictionary_length; + converter.dictionaryLength = dictionaryLength; - int total_processed = 0; + int totalProcessed = 0; int processed = 0; ::arrow::internal::BitBlockCount block; do { - block = block_counter.NextFourWords(); + block = blockCounter.NextFourWords(); if (block.length == 0) { break; } if (block.AllSet()) { processed = - GetBatchWithDict(dictionary, dictionary_length, out, block.length); + GetBatchWithDict(dictionary, dictionaryLength, out, block.length); } else if (block.NoneSet()) { converter.FillZero(out, out + block.length); processed = block.length; @@ -694,47 +689,47 @@ inline int RleDecoder::GetBatchWithDictSpaced( converter, block.length, block.length - block.popcount, - valid_bits, - valid_bits_offset, + validBits, + validBitsOffset, out); } - total_processed += processed; + totalProcessed += processed; out += block.length; - valid_bits_offset += block.length; + validBitsOffset += block.length; } while (processed == block.length); - return total_processed; + return totalProcessed; } template bool RleDecoder::NextCounts() { // Read the next run's indicator int, it could be a literal or repeated run. // The int is encoded as a vlq-encoded value. - uint32_t indicator_value = 0; - if (!bit_reader_.GetVlqInt(&indicator_value)) + uint32_t indicatorValue = 0; + if (!bitReader_.GetVlqInt(&indicatorValue)) return false; // lsb indicates if it is a literal run or repeated run - bool is_literal = indicator_value & 1; - uint32_t count = indicator_value >> 1; - if (is_literal) { - if (ARROW_PREDICT_FALSE( + bool isLiteral = indicatorValue & 1; + uint32_t count = indicatorValue >> 1; + if (isLiteral) { + if (FOLLY_UNLIKELY( count == 0 || count > static_cast(INT32_MAX) / 8)) { return false; } - literal_count_ = count * 8; + literalCount_ = count * 8; } else { - if (ARROW_PREDICT_FALSE( + if (FOLLY_UNLIKELY( count == 0 || count > static_cast(INT32_MAX))) { return false; } - repeat_count_ = count; + repeatCount_ = count; T value = {}; - if (!bit_reader_.GetAligned( - static_cast(::arrow::bit_util::CeilDiv(bit_width_, 8)), + if (!bitReader_.GetAligned( + static_cast(::arrow::bit_util::CeilDiv(bitWidth_, 8)), &value)) { return false; } - current_value_ = static_cast(value); + currentValue_ = static_cast(value); } return true; } @@ -742,156 +737,156 @@ bool RleDecoder::NextCounts() { /// This function buffers input values 8 at a time. After seeing all 8 values, /// it decides whether they should be encoded as a literal or repeated run. inline bool RleEncoder::Put(uint64_t value) { - DCHECK(bit_width_ == 64 || value < (1ULL << bit_width_)); - if (ARROW_PREDICT_FALSE(buffer_full_)) + VELOX_DCHECK(bitWidth_ == 64 || value < (1ULL << bitWidth_)); + if (FOLLY_UNLIKELY(bufferFull_)) return false; - if (ARROW_PREDICT_TRUE(current_value_ == value)) { - ++repeat_count_; - if (repeat_count_ > 8) { + if (FOLLY_LIKELY(currentValue_ == value)) { + ++repeatCount_; + if (repeatCount_ > 8) { // This is just a continuation of the current run, no need to buffer the // values. // Note that this is the fast path for long repeated runs. return true; } } else { - if (repeat_count_ >= 8) { + if (repeatCount_ >= 8) { // We had a run that was long enough but it has ended. Flush the // current repeated run. - DCHECK_EQ(literal_count_, 0); + VELOX_DCHECK_EQ(literalCount_, 0); FlushRepeatedRun(); } - repeat_count_ = 1; - current_value_ = value; + repeatCount_ = 1; + currentValue_ = value; } - buffered_values_[num_buffered_values_] = value; - if (++num_buffered_values_ == 8) { - DCHECK_EQ(literal_count_ % 8, 0); + bufferedValues_[numBufferedValues_] = value; + if (++numBufferedValues_ == 8) { + VELOX_DCHECK_EQ(literalCount_ % 8, 0); FlushBufferedValues(false); } return true; } -inline void RleEncoder::FlushLiteralRun(bool update_indicator_byte) { - if (literal_indicator_byte_ == NULL) { +inline void RleEncoder::FlushLiteralRun(bool updateIndicatorByte) { + if (literalIndicatorByte_ == NULL) { // The literal indicator byte has not been reserved yet, get one now. - literal_indicator_byte_ = bit_writer_.GetNextBytePtr(); - DCHECK(literal_indicator_byte_ != NULL); + literalIndicatorByte_ = bitWriter_.GetNextBytePtr(); + VELOX_DCHECK(literalIndicatorByte_ != NULL); } // Write all the buffered values as bit packed literals - for (int i = 0; i < num_buffered_values_; ++i) { - bool success = bit_writer_.PutValue(buffered_values_[i], bit_width_); - DCHECK(success) << "There is a bug in using CheckBufferFull()"; + for (int i = 0; i < numBufferedValues_; ++i) { + bool success = bitWriter_.PutValue(bufferedValues_[i], bitWidth_); + VELOX_DCHECK(success, "There is a bug in using CheckBufferFull()"); } - num_buffered_values_ = 0; + numBufferedValues_ = 0; - if (update_indicator_byte) { + if (updateIndicatorByte) { // At this point we need to write the indicator byte for the literal run. // We only reserve one byte, to allow for streaming writes of literal // values. The logic makes sure we flush literal runs often enough to not // overrun the 1 byte. - DCHECK_EQ(literal_count_ % 8, 0); - int num_groups = literal_count_ / 8; - int32_t indicator_value = (num_groups << 1) | 1; - DCHECK_EQ(indicator_value & 0xFFFFFF00, 0); - *literal_indicator_byte_ = static_cast(indicator_value); - literal_indicator_byte_ = NULL; - literal_count_ = 0; + VELOX_DCHECK_EQ(literalCount_ % 8, 0); + int numGroups = literalCount_ / 8; + int32_t indicatorValue = (numGroups << 1) | 1; + VELOX_DCHECK_EQ(indicatorValue & 0xFFFFFF00, 0); + *literalIndicatorByte_ = static_cast(indicatorValue); + literalIndicatorByte_ = NULL; + literalCount_ = 0; CheckBufferFull(); } } inline void RleEncoder::FlushRepeatedRun() { - DCHECK_GT(repeat_count_, 0); + VELOX_DCHECK_GT(repeatCount_, 0); bool result = true; // The lsb of 0 indicates this is a repeated run - int32_t indicator_value = repeat_count_ << 1 | 0; - result &= bit_writer_.PutVlqInt(static_cast(indicator_value)); - result &= bit_writer_.PutAligned( - current_value_, - static_cast(::arrow::bit_util::CeilDiv(bit_width_, 8))); - DCHECK(result); - num_buffered_values_ = 0; - repeat_count_ = 0; + int32_t indicatorValue = repeatCount_ << 1 | 0; + result &= bitWriter_.PutVlqInt(static_cast(indicatorValue)); + result &= bitWriter_.PutAligned( + currentValue_, + static_cast(::arrow::bit_util::CeilDiv(bitWidth_, 8))); + VELOX_DCHECK(result); + numBufferedValues_ = 0; + repeatCount_ = 0; CheckBufferFull(); } /// Flush the values that have been buffered. At this point we decide whether /// we need to switch between the run types or continue the current one. inline void RleEncoder::FlushBufferedValues(bool done) { - if (repeat_count_ >= 8) { + if (repeatCount_ >= 8) { // Clear the buffered values. They are part of the repeated run now and we // don't want to flush them out as literals. - num_buffered_values_ = 0; - if (literal_count_ != 0) { + numBufferedValues_ = 0; + if (literalCount_ != 0) { // There was a current literal run. All the values in it have been // flushed but we still need to update the indicator byte. - DCHECK_EQ(literal_count_ % 8, 0); - DCHECK_EQ(repeat_count_, 8); + VELOX_DCHECK_EQ(literalCount_ % 8, 0); + VELOX_DCHECK_EQ(repeatCount_, 8); FlushLiteralRun(true); } - DCHECK_EQ(literal_count_, 0); + VELOX_DCHECK_EQ(literalCount_, 0); return; } - literal_count_ += num_buffered_values_; - DCHECK_EQ(literal_count_ % 8, 0); - int num_groups = literal_count_ / 8; - if (num_groups + 1 >= (1 << 6)) { + literalCount_ += numBufferedValues_; + VELOX_DCHECK_EQ(literalCount_ % 8, 0); + int numGroups = literalCount_ / 8; + if (numGroups + 1 >= (1 << 6)) { // We need to start a new literal run because the indicator byte we've // reserved cannot store more values. - DCHECK(literal_indicator_byte_ != NULL); + VELOX_DCHECK(literalIndicatorByte_ != NULL); FlushLiteralRun(true); } else { FlushLiteralRun(done); } - repeat_count_ = 0; + repeatCount_ = 0; } inline int RleEncoder::Flush() { - if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) { - bool all_repeat = literal_count_ == 0 && - (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0); + if (literalCount_ > 0 || repeatCount_ > 0 || numBufferedValues_ > 0) { + bool allRepeat = literalCount_ == 0 && + (repeatCount_ == numBufferedValues_ || numBufferedValues_ == 0); // There is something pending, figure out if it's a repeated or literal run - if (repeat_count_ > 0 && all_repeat) { + if (repeatCount_ > 0 && allRepeat) { FlushRepeatedRun(); } else { - DCHECK_EQ(literal_count_ % 8, 0); + VELOX_DCHECK_EQ(literalCount_ % 8, 0); // Buffer the last group of literals to 8 by padding with 0s. - for (; num_buffered_values_ != 0 && num_buffered_values_ < 8; - ++num_buffered_values_) { - buffered_values_[num_buffered_values_] = 0; + for (; numBufferedValues_ != 0 && numBufferedValues_ < 8; + ++numBufferedValues_) { + bufferedValues_[numBufferedValues_] = 0; } - literal_count_ += num_buffered_values_; + literalCount_ += numBufferedValues_; FlushLiteralRun(true); - repeat_count_ = 0; + repeatCount_ = 0; } } - bit_writer_.Flush(); - DCHECK_EQ(num_buffered_values_, 0); - DCHECK_EQ(literal_count_, 0); - DCHECK_EQ(repeat_count_, 0); + bitWriter_.Flush(); + VELOX_DCHECK_EQ(numBufferedValues_, 0); + VELOX_DCHECK_EQ(literalCount_, 0); + VELOX_DCHECK_EQ(repeatCount_, 0); - return bit_writer_.bytes_written(); + return bitWriter_.bytesWritten(); } inline void RleEncoder::CheckBufferFull() { - int bytes_written = bit_writer_.bytes_written(); - if (bytes_written + max_run_byte_size_ > bit_writer_.buffer_len()) { - buffer_full_ = true; + int bytesWritten = bitWriter_.bytesWritten(); + if (bytesWritten + maxRunByteSize_ > bitWriter_.bufferLen()) { + bufferFull_ = true; } } inline void RleEncoder::Clear() { - buffer_full_ = false; - current_value_ = 0; - repeat_count_ = 0; - num_buffered_values_ = 0; - literal_count_ = 0; - literal_indicator_byte_ = NULL; - bit_writer_.Clear(); + bufferFull_ = false; + currentValue_ = 0; + repeatCount_ = 0; + numBufferedValues_ = 0; + literalCount_ = 0; + literalIndicatorByte_ = NULL; + bitWriter_.Clear(); } -} // namespace facebook::velox::parquet::arrow::util +} // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp index 81f429c59a5a..e57201f9eac4 100644 --- a/velox/dwio/parquet/reader/PageReader.cpp +++ b/velox/dwio/parquet/reader/PageReader.cpp @@ -224,7 +224,7 @@ void PageReader::prepareDataPageV1(const PageHeader& pageHeader, int64_t row) { auto pageEnd = pageData_ + pageHeader.uncompressed_page_size; if (maxRepeat_ > 0) { uint32_t repeatLength = readField(pageData_); - repeatDecoder_ = std::make_unique( + repeatDecoder_ = std::make_unique( reinterpret_cast(pageData_), repeatLength, ::arrow::bit_util::NumRequiredBits(maxRepeat_)); @@ -240,7 +240,7 @@ void PageReader::prepareDataPageV1(const PageHeader& pageHeader, int64_t row) { pageData_ + defineLength, ::arrow::bit_util::NumRequiredBits(maxDefine_)); } - wideDefineDecoder_ = std::make_unique( + wideDefineDecoder_ = std::make_unique( reinterpret_cast(pageData_), defineLength, ::arrow::bit_util::NumRequiredBits(maxDefine_)); @@ -281,7 +281,7 @@ void PageReader::prepareDataPageV2(const PageHeader& pageHeader, int64_t row) { pageData_ = readBytes(bytes, pageBuffer_); if (repeatLength) { - repeatDecoder_ = std::make_unique( + repeatDecoder_ = std::make_unique( reinterpret_cast(pageData_), repeatLength, ::arrow::bit_util::NumRequiredBits(maxRepeat_)); diff --git a/velox/dwio/parquet/reader/PageReader.h b/velox/dwio/parquet/reader/PageReader.h index 879f21d22691..dafd6cd49793 100644 --- a/velox/dwio/parquet/reader/PageReader.h +++ b/velox/dwio/parquet/reader/PageReader.h @@ -21,13 +21,13 @@ #include "velox/dwio/common/DirectDecoder.h" #include "velox/dwio/common/SelectiveColumnReader.h" #include "velox/dwio/common/compression/Compression.h" +#include "velox/dwio/parquet/common/RleEncodingInternal.h" #include "velox/dwio/parquet/reader/BooleanDecoder.h" #include "velox/dwio/parquet/reader/DeltaBpDecoder.h" #include "velox/dwio/parquet/reader/DeltaByteArrayDecoder.h" #include "velox/dwio/parquet/reader/ParquetTypeWithId.h" #include "velox/dwio/parquet/reader/RleBpDataDecoder.h" #include "velox/dwio/parquet/reader/StringDecoder.h" -#include "velox/dwio/parquet/writer/arrow/util/RleEncodingInternal.h" namespace facebook::velox::parquet { @@ -375,10 +375,8 @@ class PageReader { // Decoder for single bit definition levels. the arrow decoders are used for // multibit levels pending fixing RleBpDecoder for the case. std::unique_ptr defineDecoder_; - std::unique_ptr<::facebook::velox::parquet::arrow::util::RleDecoder> - repeatDecoder_; - std::unique_ptr<::facebook::velox::parquet::arrow::util::RleDecoder> - wideDefineDecoder_; + std::unique_ptr repeatDecoder_; + std::unique_ptr wideDefineDecoder_; // True for a leaf column for which repdefs are loaded for the whole column // chunk. This is typically the leaftmost leaf of a list. Other leaves under diff --git a/velox/dwio/parquet/tests/reader/RleBpDecoderTest.cpp b/velox/dwio/parquet/tests/reader/RleBpDecoderTest.cpp index 043ceed97638..c45e10fe82b7 100644 --- a/velox/dwio/parquet/tests/reader/RleBpDecoderTest.cpp +++ b/velox/dwio/parquet/tests/reader/RleBpDecoderTest.cpp @@ -15,14 +15,14 @@ */ #include "velox/dwio/common/BitPackDecoder.h" -#include "velox/dwio/parquet/writer/arrow/util/RleEncodingInternal.h" +#include "velox/dwio/parquet/common/RleEncodingInternal.h" #include using namespace facebook::velox; using namespace facebook::velox::dwio::common; -using facebook::velox::parquet::arrow::util::RleEncoder; +using facebook::velox::parquet::RleEncoder; template class RleBpDecoderTest { diff --git a/velox/dwio/parquet/writer/arrow/ColumnWriter.cpp b/velox/dwio/parquet/writer/arrow/ColumnWriter.cpp index 6c6b703c8346..517feadcdad5 100644 --- a/velox/dwio/parquet/writer/arrow/ColumnWriter.cpp +++ b/velox/dwio/parquet/writer/arrow/ColumnWriter.cpp @@ -57,7 +57,6 @@ #include "velox/dwio/parquet/writer/arrow/Types.h" #include "velox/dwio/parquet/writer/arrow/util/Compression.h" #include "velox/dwio/parquet/writer/arrow/util/Crc32.h" -#include "velox/dwio/parquet/writer/arrow/util/RleEncodingInternal.h" #include "velox/dwio/parquet/writer/arrow/util/VisitArrayInline.h" using arrow::Array; @@ -69,12 +68,7 @@ using arrow::Status; using arrow::internal::checked_cast; using arrow::internal::checked_pointer_cast; -namespace bit_util = arrow::bit_util; - namespace facebook::velox::parquet::arrow { -using bit_util::BitWriter; -using util::RleEncoder; - using util::CodecOptions; namespace { diff --git a/velox/dwio/parquet/writer/arrow/ColumnWriter.h b/velox/dwio/parquet/writer/arrow/ColumnWriter.h index 522862b3e36a..90a9b96ab650 100644 --- a/velox/dwio/parquet/writer/arrow/ColumnWriter.h +++ b/velox/dwio/parquet/writer/arrow/ColumnWriter.h @@ -22,26 +22,25 @@ #include #include +#include "velox/dwio/parquet/common/RleEncodingInternal.h" #include "velox/dwio/parquet/writer/arrow/Exception.h" #include "velox/dwio/parquet/writer/arrow/Platform.h" #include "velox/dwio/parquet/writer/arrow/Types.h" #include "velox/dwio/parquet/writer/arrow/util/Compression.h" namespace arrow { - class Array; - } // namespace arrow -namespace facebook::velox::parquet::arrow { - -namespace bit_util { +namespace facebook::velox::parquet { class BitWriter; -} // namespace bit_util +class RleEncoder; +} // namespace facebook::velox::parquet + +namespace facebook::velox::parquet::arrow { namespace util { class CodecOptions; -class RleEncoder; } // namespace util struct ArrowWriteContext; @@ -89,8 +88,8 @@ class PARQUET_EXPORT LevelEncoder { int bit_width_; int rle_length_; Encoding::type encoding_; - std::unique_ptr rle_encoder_; - std::unique_ptr bit_packed_encoder_; + std::unique_ptr rle_encoder_; + std::unique_ptr bit_packed_encoder_; }; class PARQUET_EXPORT PageWriter { diff --git a/velox/dwio/parquet/writer/arrow/Encoding.cpp b/velox/dwio/parquet/writer/arrow/Encoding.cpp index 3079c967cc4d..53a9ac4046e5 100644 --- a/velox/dwio/parquet/writer/arrow/Encoding.cpp +++ b/velox/dwio/parquet/writer/arrow/Encoding.cpp @@ -18,6 +18,7 @@ #include "velox/dwio/parquet/writer/arrow/Encoding.h" +#include #include #include #include @@ -42,17 +43,14 @@ #include "arrow/util/logging.h" #include "arrow/util/ubsan.h" #include "arrow/visit_data_inline.h" +#include "velox/dwio/parquet/common/RleEncodingInternal.h" #include "velox/dwio/parquet/writer/arrow/Exception.h" #include "velox/dwio/parquet/writer/arrow/Platform.h" #include "velox/dwio/parquet/writer/arrow/Schema.h" #include "velox/dwio/parquet/writer/arrow/Types.h" -#include "velox/dwio/parquet/writer/arrow/util/BitStreamUtilsInternal.h" #include "velox/dwio/parquet/writer/arrow/util/ByteStreamSplitInternal.h" #include "velox/dwio/parquet/writer/arrow/util/Hashing.h" #include "velox/dwio/parquet/writer/arrow/util/OverflowUtilInternal.h" -#include "velox/dwio/parquet/writer/arrow/util/RleEncodingInternal.h" - -namespace bit_util = arrow::bit_util; using ::arrow::Buffer; using ::arrow::MemoryPool; @@ -477,8 +475,8 @@ int RlePreserveBufferSize(int num_values, int bit_width) { // is called, we have to reserve an extra "RleEncoder::MinBufferSize" // bytes. These extra bytes won't be used but not reserving them // would cause the encoder to fail. - return arrow::util::RleEncoder::MaxBufferSize(bit_width, num_values) + - arrow::util::RleEncoder::MinBufferSize(bit_width); + return RleEncoder::MaxBufferSize(bit_width, num_values) + + RleEncoder::MinBufferSize(bit_width); } /// See the dictionary encoding section of @@ -517,7 +515,7 @@ class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder { ++buffer; --buffer_len; - arrow::util::RleEncoder encoder(buffer, buffer_len, bit_width()); + RleEncoder encoder(buffer, buffer_len, bit_width()); for (int32_t index : buffered_indices_) { if (ARROW_PREDICT_FALSE(!encoder.Put(index))) @@ -1282,7 +1280,7 @@ class PlainBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder { typename EncodingTraits::DictAccumulator* out) override; private: - std::unique_ptr bit_reader_; + std::unique_ptr bit_reader_; }; PlainBooleanDecoder::PlainBooleanDecoder(const ColumnDescriptor* descr) @@ -1293,7 +1291,7 @@ void PlainBooleanDecoder::SetData( const uint8_t* data, int len) { num_values_ = num_values; - bit_reader_ = std::make_unique(data, len); + bit_reader_ = std::make_unique(data, len); } int PlainBooleanDecoder::DecodeArrow( @@ -1738,7 +1736,7 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder { num_values_ = num_values; if (len == 0) { // Initialize dummy decoder to avoid crashes later on - idx_decoder_ = arrow::util::RleDecoder(data, len, /*bit_width=*/1); + idx_decoder_ = RleDecoder(data, len, /*bit_width=*/1); return; } uint8_t bit_width = *data; @@ -1747,7 +1745,7 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder { "Invalid or corrupted bit_width " + std::to_string(bit_width) + ". Maximum allowed is 32."); } - idx_decoder_ = arrow::util::RleDecoder(++data, --len, bit_width); + idx_decoder_ = RleDecoder(++data, --len, bit_width); } int Decode(T* buffer, int num_values) override { @@ -1920,7 +1918,7 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder { // BinaryDictionary32Builder std::shared_ptr indices_scratch_space_; - arrow::util::RleDecoder idx_decoder_; + RleDecoder idx_decoder_; }; template @@ -2555,7 +2553,7 @@ class DeltaBitPackEncoder : public EncoderImpl, ArrowPoolVector deltas_; std::shared_ptr bits_buffer_; ::arrow::BufferBuilder sink_; - arrow::bit_util::BitWriter bit_writer_; + BitWriter bit_writer_; }; template @@ -2646,7 +2644,7 @@ void DeltaBitPackEncoder::FlushBlock() { bit_writer_.Flush(); PARQUET_THROW_NOT_OK( - sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written())); + sink_.Append(bit_writer_.buffer(), bit_writer_.bytesWritten())); bit_writer_.Clear(); } @@ -2658,7 +2656,7 @@ std::shared_ptr<::arrow::Buffer> DeltaBitPackEncoder::FlushValues() { PARQUET_ASSIGN_OR_THROW(auto buffer, sink_.Finish(/*shrink_to_fit=*/true)); uint8_t header_buffer_[kMaxPageHeaderWriterSize] = {}; - bit_util::BitWriter header_writer(header_buffer_, sizeof(header_buffer_)); + BitWriter header_writer(header_buffer_, sizeof(header_buffer_)); if (!header_writer.PutVlqInt(values_per_block_) || !header_writer.PutVlqInt(mini_blocks_per_block_) || !header_writer.PutVlqInt(total_value_count_) || @@ -2671,11 +2669,11 @@ std::shared_ptr<::arrow::Buffer> DeltaBitPackEncoder::FlushValues() { // possible header and data was written immediately after. We now write the // header data immediately before the end of reserved space. const size_t offset_bytes = - kMaxPageHeaderWriterSize - header_writer.bytes_written(); + kMaxPageHeaderWriterSize - header_writer.bytesWritten(); std::memcpy( buffer->mutable_data() + offset_bytes, header_buffer_, - header_writer.bytes_written()); + header_writer.bytesWritten()); // Reset counter of cached values total_value_count_ = 0; @@ -2775,15 +2773,13 @@ class DeltaBitPackDecoder : public DecoderImpl, // num_values is equal to page's num_values, including null values in this // page this->num_values_ = num_values; - decoder_ = std::make_shared(data, len); + decoder_ = std::make_shared(data, len); InitHeader(); } // Set BitReader which is already initialized by DeltaLengthByteArrayDecoder // or DeltaByteArrayDecoder - void SetDecoder( - int num_values, - std::shared_ptr decoder) { + void SetDecoder(int num_values, std::shared_ptr decoder) { this->num_values_ = num_values; decoder_ = std::move(decoder); InitHeader(); @@ -2981,7 +2977,7 @@ class DeltaBitPackDecoder : public DecoderImpl, } MemoryPool* pool_; - std::shared_ptr decoder_; + std::shared_ptr decoder_; uint32_t values_per_block_; uint32_t mini_blocks_per_block_; uint32_t values_per_mini_block_; @@ -3160,7 +3156,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, void SetData(int num_values, const uint8_t* data, int len) override { DecoderImpl::SetData(num_values, data, len); - decoder_ = std::make_shared(data, len); + decoder_ = std::make_shared(data, len); DecodeLengths(); } @@ -3177,7 +3173,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, const int32_t* length_ptr = reinterpret_cast(buffered_length_->data()) + length_idx_; - int bytes_offset = len_ - decoder_->bytes_left(); + int bytes_offset = len_ - decoder_->bytesLeft(); for (int i = 0; i < max_values; ++i) { int32_t len = length_ptr[i]; if (ARROW_PREDICT_FALSE(len < 0)) { @@ -3293,7 +3289,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, return Status::OK(); } - std::shared_ptr decoder_; + std::shared_ptr decoder_; DeltaBitPackDecoder len_decoder_; int num_valid_values_{0}; uint32_t length_idx_{0}; @@ -3401,7 +3397,7 @@ std::shared_ptr RleBooleanEncoder::FlushValues() { int rle_buffer_size_max = MaxRleBufferSize(); std::shared_ptr buffer = AllocateBuffer(this->pool_, rle_buffer_size_max + kRleLengthInBytes); - arrow::util::RleEncoder encoder( + RleEncoder encoder( buffer->mutable_data() + kRleLengthInBytes, rle_buffer_size_max, /*bit_width*/ kBitWidth); @@ -3772,7 +3768,7 @@ class DeltaByteArrayDecoderImpl : public DecoderImpl, void SetData(int num_values, const uint8_t* data, int len) override { num_values_ = num_values; - decoder_ = std::make_shared(data, len); + decoder_ = std::make_shared(data, len); prefix_len_decoder_.SetDecoder(num_values, decoder_); // get the number of encoded prefix lengths @@ -3788,7 +3784,7 @@ class DeltaByteArrayDecoderImpl : public DecoderImpl, prefix_len_offset_ = 0; num_valid_values_ = num_prefix; - int bytes_left = decoder_->bytes_left(); + int bytes_left = decoder_->bytesLeft(); // If len < bytes_left, prefix_len_decoder.Decode will throw exception. DCHECK_GE(len, bytes_left); int suffix_begins = len - bytes_left; @@ -3926,7 +3922,7 @@ class DeltaByteArrayDecoderImpl : public DecoderImpl, MemoryPool* pool_; private: - std::shared_ptr decoder_; + std::shared_ptr decoder_; DeltaBitPackDecoder prefix_len_decoder_; DeltaLengthByteArrayDecoder suffix_decoder_; std::string last_value_; diff --git a/velox/dwio/parquet/writer/arrow/tests/ColumnReader.cpp b/velox/dwio/parquet/writer/arrow/tests/ColumnReader.cpp index c2c898321bbc..1e038e68446b 100644 --- a/velox/dwio/parquet/writer/arrow/tests/ColumnReader.cpp +++ b/velox/dwio/parquet/writer/arrow/tests/ColumnReader.cpp @@ -51,8 +51,6 @@ #include "velox/dwio/parquet/writer/arrow/Properties.h" #include "velox/dwio/parquet/writer/arrow/Statistics.h" #include "velox/dwio/parquet/writer/arrow/ThriftInternal.h" -#include "velox/dwio/parquet/writer/arrow/util/BitStreamUtilsInternal.h" -#include "velox/dwio/parquet/writer/arrow/util/RleEncodingInternal.h" using arrow::MemoryPool; using arrow::internal::AddWithOverflow; @@ -127,8 +125,8 @@ int LevelDecoder::SetData( } const uint8_t* decoder_data = data + 4; if (!rle_decoder_) { - rle_decoder_ = std::make_unique( - decoder_data, num_bytes, bit_width_); + rle_decoder_ = + std::make_unique(decoder_data, num_bytes, bit_width_); } else { rle_decoder_->Reset(decoder_data, num_bytes, bit_width_); } @@ -147,8 +145,7 @@ int LevelDecoder::SetData( "Received invalid number of bytes (corrupt data page?)"); } if (!bit_packed_decoder_) { - bit_packed_decoder_ = - std::make_unique(data, num_bytes); + bit_packed_decoder_ = std::make_unique(data, num_bytes); } else { bit_packed_decoder_->Reset(data, num_bytes); } @@ -176,8 +173,7 @@ void LevelDecoder::SetDataV2( bit_width_ = ::arrow::bit_util::Log2(max_level + 1); if (!rle_decoder_) { - rle_decoder_ = - std::make_unique(data, num_bytes, bit_width_); + rle_decoder_ = std::make_unique(data, num_bytes, bit_width_); } else { rle_decoder_->Reset(data, num_bytes, bit_width_); } diff --git a/velox/dwio/parquet/writer/arrow/tests/ColumnReader.h b/velox/dwio/parquet/writer/arrow/tests/ColumnReader.h index 6eba405e20ec..6936aa618e50 100644 --- a/velox/dwio/parquet/writer/arrow/tests/ColumnReader.h +++ b/velox/dwio/parquet/writer/arrow/tests/ColumnReader.h @@ -24,13 +24,12 @@ #include #include "velox/dwio/parquet/common/LevelConversion.h" +#include "velox/dwio/parquet/common/RleEncodingInternal.h" #include "velox/dwio/parquet/writer/arrow/Exception.h" #include "velox/dwio/parquet/writer/arrow/Metadata.h" #include "velox/dwio/parquet/writer/arrow/Properties.h" #include "velox/dwio/parquet/writer/arrow/Schema.h" #include "velox/dwio/parquet/writer/arrow/Types.h" -#include "velox/dwio/parquet/writer/arrow/util/BitStreamUtilsInternal.h" -#include "velox/dwio/parquet/writer/arrow/util/RleEncodingInternal.h" namespace arrow { @@ -108,8 +107,8 @@ class PARQUET_EXPORT LevelDecoder { int bit_width_; int num_values_remaining_; Encoding::type encoding_; - std::unique_ptr rle_decoder_; - std::unique_ptr bit_packed_decoder_; + std::unique_ptr rle_decoder_; + std::unique_ptr bit_packed_decoder_; int16_t max_level_; }; diff --git a/velox/dwio/parquet/writer/arrow/tests/EncodingTest.cpp b/velox/dwio/parquet/writer/arrow/tests/EncodingTest.cpp index 48119ec403b8..7ba9847dab84 100644 --- a/velox/dwio/parquet/writer/arrow/tests/EncodingTest.cpp +++ b/velox/dwio/parquet/writer/arrow/tests/EncodingTest.cpp @@ -44,13 +44,12 @@ #include "arrow/util/ubsan.h" #include "arrow/visit_data_inline.h" +#include "velox/dwio/parquet/common/RleEncodingInternal.h" #include "velox/dwio/parquet/writer/arrow/Exception.h" #include "velox/dwio/parquet/writer/arrow/Platform.h" #include "velox/dwio/parquet/writer/arrow/Schema.h" #include "velox/dwio/parquet/writer/arrow/Types.h" -#include "velox/dwio/parquet/writer/arrow/util/BitStreamUtilsInternal.h" #include "velox/dwio/parquet/writer/arrow/util/ByteStreamSplitInternal.h" -#include "velox/dwio/parquet/writer/arrow/util/RleEncodingInternal.h" namespace bit_util = arrow::bit_util; @@ -412,7 +411,7 @@ class PlainEncoder : public EncoderImpl, int bits_available_; std::shared_ptr bits_buffer_; ::arrow::BufferBuilder sink_; - arrow::bit_util::BitWriter bit_writer_; + BitWriter bit_writer_; template void PutImpl(const SequenceType& src, int num_values); @@ -434,7 +433,7 @@ void PlainEncoder::PutImpl( if (bits_available_ == 0) { bit_writer_.Flush(); PARQUET_THROW_NOT_OK( - sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written())); + sink_.Append(bit_writer_.buffer(), bit_writer_.bytesWritten())); bit_writer_.Clear(); } } @@ -454,7 +453,7 @@ void PlainEncoder::PutImpl( if (bits_available_ == 0) { bit_writer_.Flush(); PARQUET_THROW_NOT_OK( - sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written())); + sink_.Append(bit_writer_.buffer(), bit_writer_.bytesWritten())); bit_writer_.Clear(); } } @@ -462,14 +461,14 @@ void PlainEncoder::PutImpl( int64_t PlainEncoder::EstimatedDataEncodedSize() { int64_t position = sink_.length(); - return position + bit_writer_.bytes_written(); + return position + bit_writer_.bytesWritten(); } std::shared_ptr PlainEncoder::FlushValues() { if (bits_available_ > 0) { bit_writer_.Flush(); PARQUET_THROW_NOT_OK( - sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written())); + sink_.Append(bit_writer_.buffer(), bit_writer_.bytesWritten())); bit_writer_.Clear(); bits_available_ = static_cast(bits_buffer_->size()) * 8; } @@ -518,8 +517,8 @@ int RlePreserveBufferSize(int num_values, int bit_width) { // is called, we have to reserve an extra "RleEncoder::MinBufferSize" // bytes. These extra bytes won't be used but not reserving them // would cause the encoder to fail. - return arrow::util::RleEncoder::MaxBufferSize(bit_width, num_values) + - arrow::util::RleEncoder::MinBufferSize(bit_width); + return RleEncoder::MaxBufferSize(bit_width, num_values) + + RleEncoder::MinBufferSize(bit_width); } /// See the dictionary encoding section of @@ -560,7 +559,7 @@ class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder { ++buffer; --buffer_len; - arrow::util::RleEncoder encoder(buffer, buffer_len, bit_width()); + RleEncoder encoder(buffer, buffer_len, bit_width()); for (int32_t index : buffered_indices_) { if (ARROW_PREDICT_FALSE(!encoder.Put(index))) @@ -1325,7 +1324,7 @@ class PlainBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder { typename EncodingTraits::DictAccumulator* out) override; private: - std::unique_ptr bit_reader_; + std::unique_ptr bit_reader_; }; PlainBooleanDecoder::PlainBooleanDecoder(const ColumnDescriptor* descr) @@ -1336,7 +1335,7 @@ void PlainBooleanDecoder::SetData( const uint8_t* data, int len) { num_values_ = num_values; - bit_reader_ = std::make_unique(data, len); + bit_reader_ = std::make_unique(data, len); } int PlainBooleanDecoder::DecodeArrow( @@ -1707,7 +1706,7 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder { num_values_ = num_values; if (len == 0) { // Initialize dummy decoder to avoid crashes later on - idx_decoder_ = arrow::util::RleDecoder(data, len, /*bit_width=*/1); + idx_decoder_ = RleDecoder(data, len, /*bit_width=*/1); return; } uint8_t bit_width = *data; @@ -1716,7 +1715,7 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder { "Invalid or corrupted bit_width " + std::to_string(bit_width) + ". Maximum allowed is 32."); } - idx_decoder_ = arrow::util::RleDecoder(++data, --len, bit_width); + idx_decoder_ = RleDecoder(++data, --len, bit_width); } int Decode(T* buffer, int num_values) override { @@ -1889,7 +1888,7 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder { // BinaryDictionary32Builder std::shared_ptr indices_scratch_space_; - arrow::util::RleDecoder idx_decoder_; + RleDecoder idx_decoder_; }; template @@ -2525,7 +2524,7 @@ class DeltaBitPackEncoder : public EncoderImpl, ArrowPoolVector deltas_; std::shared_ptr bits_buffer_; ::arrow::BufferBuilder sink_; - arrow::bit_util::BitWriter bit_writer_; + BitWriter bit_writer_; }; template @@ -2616,7 +2615,7 @@ void DeltaBitPackEncoder::FlushBlock() { bit_writer_.Flush(); PARQUET_THROW_NOT_OK( - sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written())); + sink_.Append(bit_writer_.buffer(), bit_writer_.bytesWritten())); bit_writer_.Clear(); } @@ -2628,7 +2627,7 @@ std::shared_ptr DeltaBitPackEncoder::FlushValues() { PARQUET_ASSIGN_OR_THROW(auto buffer, sink_.Finish(/*shrink_to_fit=*/true)); uint8_t header_buffer_[kMaxPageHeaderWriterSize] = {}; - bit_util::BitWriter header_writer(header_buffer_, sizeof(header_buffer_)); + BitWriter header_writer(header_buffer_, sizeof(header_buffer_)); if (!header_writer.PutVlqInt(values_per_block_) || !header_writer.PutVlqInt(mini_blocks_per_block_) || !header_writer.PutVlqInt(total_value_count_) || @@ -2641,11 +2640,11 @@ std::shared_ptr DeltaBitPackEncoder::FlushValues() { // possible header and data was written immediately after. We now write the // header data immediately before the end of reserved space. const size_t offset_bytes = - kMaxPageHeaderWriterSize - header_writer.bytes_written(); + kMaxPageHeaderWriterSize - header_writer.bytesWritten(); std::memcpy( buffer->mutable_data() + offset_bytes, header_buffer_, - header_writer.bytes_written()); + header_writer.bytesWritten()); // Reset counter of cached values total_value_count_ = 0; @@ -2745,15 +2744,13 @@ class DeltaBitPackDecoder : public DecoderImpl, // num_values is equal to page's num_values, including null values in this // page this->num_values_ = num_values; - decoder_ = std::make_shared(data, len); + decoder_ = std::make_shared(data, len); InitHeader(); } // Set BitReader which is already initialized by DeltaLengthByteArrayDecoder // or DeltaByteArrayDecoder - void SetDecoder( - int num_values, - std::shared_ptr decoder) { + void SetDecoder(int num_values, std::shared_ptr decoder) { this->num_values_ = num_values; decoder_ = std::move(decoder); InitHeader(); @@ -2946,7 +2943,7 @@ class DeltaBitPackDecoder : public DecoderImpl, } MemoryPool* pool_; - std::shared_ptr decoder_; + std::shared_ptr decoder_; uint32_t values_per_block_; uint32_t mini_blocks_per_block_; uint32_t values_per_mini_block_; @@ -3124,7 +3121,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, void SetData(int num_values, const uint8_t* data, int len) override { DecoderImpl::SetData(num_values, data, len); - decoder_ = std::make_shared(data, len); + decoder_ = std::make_shared(data, len); DecodeLengths(); } @@ -3141,7 +3138,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, const int32_t* length_ptr = reinterpret_cast(buffered_length_->data()) + length_idx_; - int bytes_offset = len_ - decoder_->bytes_left(); + int bytes_offset = len_ - decoder_->bytesLeft(); for (int i = 0; i < max_values; ++i) { int32_t len = length_ptr[i]; if (ARROW_PREDICT_FALSE(len < 0)) { @@ -3258,7 +3255,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, return Status::OK(); } - std::shared_ptr decoder_; + std::shared_ptr decoder_; DeltaBitPackDecoder len_decoder_; int num_valid_values_; uint32_t length_idx_; @@ -3366,7 +3363,7 @@ std::shared_ptr RleBooleanEncoder::FlushValues() { int rle_buffer_size_max = MaxRleBufferSize(); std::shared_ptr buffer = AllocateBuffer(this->pool_, rle_buffer_size_max + kRleLengthInBytes); - arrow::util::RleEncoder encoder( + RleEncoder encoder( buffer->mutable_data() + kRleLengthInBytes, rle_buffer_size_max, /*bit_width*/ kBitWidth); @@ -3409,7 +3406,7 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder { auto decoder_data = data + 4; if (decoder_ == nullptr) { - decoder_ = std::make_shared( + decoder_ = std::make_shared( decoder_data, num_bytes, /*bit_width=*/1); @@ -3471,7 +3468,7 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder { } private: - std::shared_ptr decoder_; + std::shared_ptr decoder_; }; // ---------------------------------------------------------------------- @@ -3492,7 +3489,7 @@ class DeltaByteArrayDecoder : public DecoderImpl, void SetData(int num_values, const uint8_t* data, int len) override { num_values_ = num_values; - decoder_ = std::make_shared(data, len); + decoder_ = std::make_shared(data, len); prefix_len_decoder_.SetDecoder(num_values, decoder_); // get the number of encoded prefix lengths @@ -3508,7 +3505,7 @@ class DeltaByteArrayDecoder : public DecoderImpl, prefix_len_offset_ = 0; num_valid_values_ = num_prefix; - int bytes_left = decoder_->bytes_left(); + int bytes_left = decoder_->bytesLeft(); // If len < bytes_left, prefix_len_decoder.Decode will throw exception. DCHECK_GE(len, bytes_left); int suffix_begins = len - bytes_left; @@ -3650,7 +3647,7 @@ class DeltaByteArrayDecoder : public DecoderImpl, return Status::OK(); } - std::shared_ptr decoder_; + std::shared_ptr decoder_; DeltaBitPackDecoder prefix_len_decoder_; DeltaLengthByteArrayDecoder suffix_decoder_; std::string last_value_; diff --git a/velox/dwio/parquet/writer/arrow/util/BitStreamUtilsInternal.h b/velox/dwio/parquet/writer/arrow/util/BitStreamUtilsInternal.h deleted file mode 100644 index 9ccb171bc084..000000000000 --- a/velox/dwio/parquet/writer/arrow/util/BitStreamUtilsInternal.h +++ /dev/null @@ -1,575 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// From Apache Impala (incubating) as of 2016-01-29 - -// Adapted from Apache Arrow. - -#pragma once - -#include -#include -#include - -#include "arrow/util/bit_util.h" -#include "arrow/util/bpacking.h" -#include "arrow/util/logging.h" -#include "arrow/util/macros.h" -#include "arrow/util/ubsan.h" - -namespace facebook::velox::parquet::arrow::bit_util { - -/// Utility class to write bit/byte streams. This class can write data to -/// either be bit packed or byte aligned (and a single stream that has a mix of -/// both). This class does not allocate memory. -class BitWriter { - public: - /// buffer: buffer to write bits to. Buffer should be preallocated with - /// 'buffer_len' bytes. - BitWriter(uint8_t* buffer, int buffer_len) - : buffer_(buffer), max_bytes_(buffer_len) { - Clear(); - } - - void Clear() { - buffered_values_ = 0; - byte_offset_ = 0; - bit_offset_ = 0; - } - - /// The number of current bytes written, including the current byte (i.e. may - /// include a fraction of a byte). Includes buffered values. - int bytes_written() const { - return byte_offset_ + - static_cast(::arrow::bit_util::BytesForBits(bit_offset_)); - } - uint8_t* buffer() const { - return buffer_; - } - int buffer_len() const { - return max_bytes_; - } - - /// Writes a value to buffered_values_, flushing to buffer_ if necessary. This - /// is bit packed. Returns false if there was not enough space. num_bits must - /// be <= 32. - bool PutValue(uint64_t v, int num_bits); - - /// Writes v to the next aligned byte using num_bytes. If T is larger than - /// num_bytes, the extra high-order bytes will be ignored. Returns false if - /// there was not enough space. - /// Assume the v is stored in buffer_ as a little-endian format - template - bool PutAligned(T v, int num_bytes); - - /// Write a Vlq encoded int to the buffer. Returns false if there was not - /// enough room. The value is written byte aligned. For more details on vlq: - /// en.wikipedia.org/wiki/Variable-length_quantity - bool PutVlqInt(uint32_t v); - - // Writes an int zigzag encoded. - bool PutZigZagVlqInt(int32_t v); - - /// Write a Vlq encoded int64 to the buffer. Returns false if there was not - /// enough room. The value is written byte aligned. For more details on vlq: - /// en.wikipedia.org/wiki/Variable-length_quantity - bool PutVlqInt(uint64_t v); - - // Writes an int64 zigzag encoded. - bool PutZigZagVlqInt(int64_t v); - - /// Get a pointer to the next aligned byte and advance the underlying buffer - /// by num_bytes. - /// Returns NULL if there was not enough space. - uint8_t* GetNextBytePtr(int num_bytes = 1); - - /// Flushes all buffered values to the buffer. Call this when done writing to - /// the buffer. If 'align' is true, buffered_values_ is reset and any future - /// writes will be written to the next byte boundary. - void Flush(bool align = false); - - private: - uint8_t* buffer_; - int max_bytes_; - - /// Bit-packed values are initially written to this variable before being - /// memcpy'd to buffer_. This is faster than writing values byte by byte - /// directly to buffer_. - uint64_t buffered_values_; - - int byte_offset_; // Offset in buffer_ - int bit_offset_; // Offset in buffered_values_ -}; - -namespace detail { - -inline uint64_t ReadLittleEndianWord( - const uint8_t* buffer, - int bytes_remaining) { - uint64_t le_value = 0; - if (ARROW_PREDICT_TRUE(bytes_remaining >= 8)) { - memcpy(&le_value, buffer, 8); - } else { - memcpy(&le_value, buffer, bytes_remaining); - } - return ::arrow::bit_util::FromLittleEndian(le_value); -} - -} // namespace detail - -/// Utility class to read bit/byte stream. This class can read bits or bytes -/// that are either byte aligned or not. It also has utilities to read multiple -/// bytes in one read (e.g. encoded int). -class BitReader { - public: - BitReader() = default; - - /// 'buffer' is the buffer to read from. The buffer's length is 'buffer_len'. - BitReader(const uint8_t* buffer, int buffer_len) : BitReader() { - Reset(buffer, buffer_len); - } - - void Reset(const uint8_t* buffer, int buffer_len) { - buffer_ = buffer; - max_bytes_ = buffer_len; - byte_offset_ = 0; - bit_offset_ = 0; - buffered_values_ = detail::ReadLittleEndianWord( - buffer_ + byte_offset_, max_bytes_ - byte_offset_); - } - - /// Gets the next value from the buffer. Returns true if 'v' could be read or - /// false if there are not enough bytes left. - template - bool GetValue(int num_bits, T* v); - - /// Get a number of values from the buffer. Return the number of values - /// actually read. - template - int GetBatch(int num_bits, T* v, int batch_size); - - /// Reads a 'num_bytes'-sized value from the buffer and stores it in 'v'. T - /// needs to be a little-endian native type and big enough to store - /// 'num_bytes'. The value is assumed to be byte-aligned so the stream will - /// be advanced to the start of the next byte before 'v' is read. Returns - /// false if there are not enough bytes left. - /// Assume the v was stored in buffer_ as a little-endian format - template - bool GetAligned(int num_bytes, T* v); - - /// Advances the stream by a number of bits. Returns true if succeed or false - /// if there are not enough bits left. - bool Advance(int64_t num_bits); - - /// Reads a vlq encoded int from the stream. The encoded int must start at - /// the beginning of a byte. Return false if there were not enough bytes in - /// the buffer. - bool GetVlqInt(uint32_t* v); - - // Reads a zigzag encoded int `into` v. - bool GetZigZagVlqInt(int32_t* v); - - /// Reads a vlq encoded int64 from the stream. The encoded int must start at - /// the beginning of a byte. Return false if there were not enough bytes in - /// the buffer. - bool GetVlqInt(uint64_t* v); - - // Reads a zigzag encoded int64 `into` v. - bool GetZigZagVlqInt(int64_t* v); - - /// Returns the number of bytes left in the stream, not including the current - /// byte (i.e., there may be an additional fraction of a byte). - int bytes_left() const { - return max_bytes_ - - (byte_offset_ + - static_cast(::arrow::bit_util::BytesForBits(bit_offset_))); - } - - /// Maximum byte length of a vlq encoded int - static constexpr int kMaxVlqByteLength = 5; - - /// Maximum byte length of a vlq encoded int64 - static constexpr int kMaxVlqByteLengthForInt64 = 10; - - private: - const uint8_t* buffer_; - int max_bytes_; - - /// Bytes are memcpy'd from buffer_ and values are read from this variable. - /// This is faster than reading values byte by byte directly from buffer_. - uint64_t buffered_values_; - - int byte_offset_; // Offset in buffer_ - int bit_offset_; // Offset in buffered_values_ -}; - -inline bool BitWriter::PutValue(uint64_t v, int num_bits) { - DCHECK_LE(num_bits, 64); - if (num_bits < 64) { - DCHECK_EQ(v >> num_bits, 0) << "v = " << v << ", num_bits = " << num_bits; - } - - if (ARROW_PREDICT_FALSE( - byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8)) - return false; - - buffered_values_ |= v << bit_offset_; - bit_offset_ += num_bits; - - if (ARROW_PREDICT_FALSE(bit_offset_ >= 64)) { - // Flush buffered_values_ and write out bits of v that did not fit - buffered_values_ = ::arrow::bit_util::ToLittleEndian(buffered_values_); - memcpy(buffer_ + byte_offset_, &buffered_values_, 8); - buffered_values_ = 0; - byte_offset_ += 8; - bit_offset_ -= 64; - buffered_values_ = - (num_bits - bit_offset_ == 64) ? 0 : (v >> (num_bits - bit_offset_)); - } - DCHECK_LT(bit_offset_, 64); - return true; -} - -inline void BitWriter::Flush(bool align) { - int num_bytes = - static_cast(::arrow::bit_util::BytesForBits(bit_offset_)); - DCHECK_LE(byte_offset_ + num_bytes, max_bytes_); - auto buffered_values = ::arrow::bit_util::ToLittleEndian(buffered_values_); - memcpy(buffer_ + byte_offset_, &buffered_values, num_bytes); - - if (align) { - buffered_values_ = 0; - byte_offset_ += num_bytes; - bit_offset_ = 0; - } -} - -inline uint8_t* BitWriter::GetNextBytePtr(int num_bytes) { - Flush(/* align */ true); - DCHECK_LE(byte_offset_, max_bytes_); - if (byte_offset_ + num_bytes > max_bytes_) - return NULL; - uint8_t* ptr = buffer_ + byte_offset_; - byte_offset_ += num_bytes; - return ptr; -} - -template -inline bool BitWriter::PutAligned(T val, int num_bytes) { - uint8_t* ptr = GetNextBytePtr(num_bytes); - if (ptr == NULL) - return false; - val = ::arrow::bit_util::ToLittleEndian(val); - memcpy(ptr, &val, num_bytes); - return true; -} - -namespace detail { - -template -inline void GetValue_( - int num_bits, - T* v, - int max_bytes, - const uint8_t* buffer, - int* bit_offset, - int* byte_offset, - uint64_t* buffered_values) { -#ifdef _MSC_VER -#pragma warning(push) -#pragma warning(disable : 4800) -#endif - *v = static_cast( - ::arrow::bit_util::TrailingBits( - *buffered_values, *bit_offset + num_bits) >> - *bit_offset); -#ifdef _MSC_VER -#pragma warning(pop) -#endif - *bit_offset += num_bits; - if (*bit_offset >= 64) { - *byte_offset += 8; - *bit_offset -= 64; - - *buffered_values = detail::ReadLittleEndianWord( - buffer + *byte_offset, max_bytes - *byte_offset); -#ifdef _MSC_VER -#pragma warning(push) -#pragma warning(disable : 4800 4805) -#endif - // Read bits of v that crossed into new buffered_values_ - if (ARROW_PREDICT_TRUE( - num_bits - *bit_offset < static_cast(8 * sizeof(T)))) { - // if shift exponent(num_bits - *bit_offset) is not less than sizeof(T), - // *v will not change and the following code may cause a runtime error - // that the shift exponent is too large - *v = *v | - static_cast( - ::arrow::bit_util::TrailingBits(*buffered_values, *bit_offset) - << (num_bits - *bit_offset)); - } -#ifdef _MSC_VER -#pragma warning(pop) -#endif - DCHECK_LE(*bit_offset, 64); - } -} - -} // namespace detail - -template -inline bool BitReader::GetValue(int num_bits, T* v) { - return GetBatch(num_bits, v, 1) == 1; -} - -template -inline int BitReader::GetBatch(int num_bits, T* v, int batch_size) { - DCHECK(buffer_ != NULL); - DCHECK_LE(num_bits, static_cast(sizeof(T) * 8)) - << "num_bits: " << num_bits; - - int bit_offset = bit_offset_; - int byte_offset = byte_offset_; - uint64_t buffered_values = buffered_values_; - int max_bytes = max_bytes_; - const uint8_t* buffer = buffer_; - - const int64_t needed_bits = num_bits * static_cast(batch_size); - constexpr uint64_t kBitsPerByte = 8; - const int64_t remaining_bits = - static_cast(max_bytes - byte_offset) * kBitsPerByte - bit_offset; - if (remaining_bits < needed_bits) { - batch_size = static_cast(remaining_bits / num_bits); - } - - int i = 0; - if (ARROW_PREDICT_FALSE(bit_offset != 0)) { - for (; i < batch_size && bit_offset != 0; ++i) { - detail::GetValue_( - num_bits, - &v[i], - max_bytes, - buffer, - &bit_offset, - &byte_offset, - &buffered_values); - } - } - - if (sizeof(T) == 4) { - int num_unpacked = ::arrow::internal::unpack32( - reinterpret_cast(buffer + byte_offset), - reinterpret_cast(v + i), - batch_size - i, - num_bits); - i += num_unpacked; - byte_offset += num_unpacked * num_bits / 8; - } else if (sizeof(T) == 8 && num_bits > 32) { - // Use unpack64 only if num_bits is larger than 32 - // TODO (ARROW-13677): improve the performance of internal::unpack64 - // and remove the restriction of num_bits - int num_unpacked = ::arrow::internal::unpack64( - buffer + byte_offset, - reinterpret_cast(v + i), - batch_size - i, - num_bits); - i += num_unpacked; - byte_offset += num_unpacked * num_bits / 8; - } else { - // TODO: revisit this limit if necessary - DCHECK_LE(num_bits, 32); - const int buffer_size = 1024; - uint32_t unpack_buffer[buffer_size]; - while (i < batch_size) { - int unpack_size = std::min(buffer_size, batch_size - i); - int num_unpacked = ::arrow::internal::unpack32( - reinterpret_cast(buffer + byte_offset), - unpack_buffer, - unpack_size, - num_bits); - if (num_unpacked == 0) { - break; - } - for (int k = 0; k < num_unpacked; ++k) { -#ifdef _MSC_VER -#pragma warning(push) -#pragma warning(disable : 4800) -#endif - v[i + k] = static_cast(unpack_buffer[k]); -#ifdef _MSC_VER -#pragma warning(pop) -#endif - } - i += num_unpacked; - byte_offset += num_unpacked * num_bits / 8; - } - } - - buffered_values = detail::ReadLittleEndianWord( - buffer + byte_offset, max_bytes - byte_offset); - - for (; i < batch_size; ++i) { - detail::GetValue_( - num_bits, - &v[i], - max_bytes, - buffer, - &bit_offset, - &byte_offset, - &buffered_values); - } - - bit_offset_ = bit_offset; - byte_offset_ = byte_offset; - buffered_values_ = buffered_values; - - return batch_size; -} - -template -inline bool BitReader::GetAligned(int num_bytes, T* v) { - if (ARROW_PREDICT_FALSE(num_bytes > static_cast(sizeof(T)))) { - return false; - } - - int bytes_read = - static_cast(::arrow::bit_util::BytesForBits(bit_offset_)); - if (ARROW_PREDICT_FALSE(byte_offset_ + bytes_read + num_bytes > max_bytes_)) { - return false; - } - - // Advance byte_offset to next unread byte and read num_bytes - byte_offset_ += bytes_read; - if constexpr (std::is_same_v) { - // ARROW-18031: if we're trying to get an aligned bool, just check - // the LSB of the next byte and move on. If we memcpy + FromLittleEndian - // as usual, we have potential undefined behavior for bools if the value - // isn't 0 or 1 - *v = *(buffer_ + byte_offset_) & 1; - } else { - memcpy(v, buffer_ + byte_offset_, num_bytes); - *v = ::arrow::bit_util::FromLittleEndian(*v); - } - byte_offset_ += num_bytes; - - bit_offset_ = 0; - buffered_values_ = detail::ReadLittleEndianWord( - buffer_ + byte_offset_, max_bytes_ - byte_offset_); - return true; -} - -inline bool BitReader::Advance(int64_t num_bits) { - int64_t bits_required = bit_offset_ + num_bits; - int64_t bytes_required = ::arrow::bit_util::BytesForBits(bits_required); - if (ARROW_PREDICT_FALSE(bytes_required > max_bytes_ - byte_offset_)) { - return false; - } - byte_offset_ += static_cast(bits_required >> 3); - bit_offset_ = static_cast(bits_required & 7); - buffered_values_ = detail::ReadLittleEndianWord( - buffer_ + byte_offset_, max_bytes_ - byte_offset_); - return true; -} - -inline bool BitWriter::PutVlqInt(uint32_t v) { - bool result = true; - while ((v & 0xFFFFFF80UL) != 0UL) { - result &= PutAligned(static_cast((v & 0x7F) | 0x80), 1); - v >>= 7; - } - result &= PutAligned(static_cast(v & 0x7F), 1); - return result; -} - -inline bool BitReader::GetVlqInt(uint32_t* v) { - uint32_t tmp = 0; - - for (int i = 0; i < kMaxVlqByteLength; i++) { - uint8_t byte = 0; - if (ARROW_PREDICT_FALSE(!GetAligned(1, &byte))) { - return false; - } - tmp |= static_cast(byte & 0x7F) << (7 * i); - - if ((byte & 0x80) == 0) { - *v = tmp; - return true; - } - } - - return false; -} - -inline bool BitWriter::PutZigZagVlqInt(int32_t v) { - uint32_t u_v = ::arrow::util::SafeCopy(v); - u_v = (u_v << 1) ^ static_cast(v >> 31); - return PutVlqInt(u_v); -} - -inline bool BitReader::GetZigZagVlqInt(int32_t* v) { - uint32_t u; - if (!GetVlqInt(&u)) - return false; - u = (u >> 1) ^ (~(u & 1) + 1); - *v = ::arrow::util::SafeCopy(u); - return true; -} - -inline bool BitWriter::PutVlqInt(uint64_t v) { - bool result = true; - while ((v & 0xFFFFFFFFFFFFFF80ULL) != 0ULL) { - result &= PutAligned(static_cast((v & 0x7F) | 0x80), 1); - v >>= 7; - } - result &= PutAligned(static_cast(v & 0x7F), 1); - return result; -} - -inline bool BitReader::GetVlqInt(uint64_t* v) { - uint64_t tmp = 0; - - for (int i = 0; i < kMaxVlqByteLengthForInt64; i++) { - uint8_t byte = 0; - if (ARROW_PREDICT_FALSE(!GetAligned(1, &byte))) { - return false; - } - tmp |= static_cast(byte & 0x7F) << (7 * i); - - if ((byte & 0x80) == 0) { - *v = tmp; - return true; - } - } - - return false; -} - -inline bool BitWriter::PutZigZagVlqInt(int64_t v) { - uint64_t u_v = ::arrow::util::SafeCopy(v); - u_v = (u_v << 1) ^ static_cast(v >> 63); - return PutVlqInt(u_v); -} - -inline bool BitReader::GetZigZagVlqInt(int64_t* v) { - uint64_t u; - if (!GetVlqInt(&u)) - return false; - u = (u >> 1) ^ (~(u & 1) + 1); - *v = ::arrow::util::SafeCopy(u); - return true; -} - -} // namespace facebook::velox::parquet::arrow::bit_util