Skip to content

Commit

Permalink
refactor(parquet): Move arrow RleEncodingInternal to common
Browse files Browse the repository at this point in the history
  • Loading branch information
wypb committed Dec 10, 2024
1 parent 06e0896 commit 162171e
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 38 deletions.
30 changes: 3 additions & 27 deletions velox/dwio/parquet/common/BitStreamUtilsInternal.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@

#include "arrow/util/bit_util.h"
#include "arrow/util/bpacking.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "arrow/util/ubsan.h"

namespace facebook::velox::parquet {

Expand Down Expand Up @@ -232,7 +229,7 @@ inline bool BitWriter::PutValue(uint64_t v, int numBits) {

if (FOLLY_UNLIKELY(bitOffset_ >= 64)) {
// Flush bufferedValues_ and write out bits of v that did not fit
bufferedValues_ = ::arrow::bit_util::ToLittleEndian(bufferedValues_);
bufferedValues_ = folly::Endian::little(bufferedValues_);
memcpy(buffer_ + byteOffset_, &bufferedValues_, 8);
bufferedValues_ = 0;
byteOffset_ += 8;
Expand All @@ -247,7 +244,7 @@ inline bool BitWriter::PutValue(uint64_t v, int numBits) {
inline void BitWriter::Flush(bool align) {
int numBytes = static_cast<int>(::arrow::bit_util::BytesForBits(bitOffset_));
VELOX_DCHECK_LE(byteOffset_ + numBytes, maxBytes_);
auto bufferedValues = ::arrow::bit_util::ToLittleEndian(bufferedValues_);
auto bufferedValues = folly::Endian::little(bufferedValues_);
memcpy(buffer_ + byteOffset_, &bufferedValues, numBytes);

if (align) {
Expand All @@ -272,7 +269,7 @@ inline bool BitWriter::PutAligned(T val, int numBytes) {
uint8_t* ptr = GetNextBytePtr(numBytes);
if (ptr == NULL)
return false;
val = ::arrow::bit_util::ToLittleEndian(val);
val = folly::Endian::little(bufferedValues_);
memcpy(ptr, &val, numBytes);
return true;
}
Expand All @@ -288,27 +285,16 @@ inline void GetValue_(
int* bitOffset,
int* byteOffset,
uint64_t* bufferedValues) {
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable : 4800)
#endif
*v = static_cast<T>(
::arrow::bit_util::TrailingBits(*bufferedValues, *bitOffset + numBits) >>
*bitOffset);
#ifdef _MSC_VER
#pragma warning(pop)
#endif
*bitOffset += numBits;
if (*bitOffset >= 64) {
*byteOffset += 8;
*bitOffset -= 64;

*bufferedValues = detail::ReadLittleEndianWord(
buffer + *byteOffset, maxBytes - *byteOffset);
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable : 4800 4805)
#endif
// Read bits of v that crossed into new bufferedValues_
if (FOLLY_LIKELY(numBits - *bitOffset < static_cast<int>(8 * sizeof(T)))) {
// if shift exponent(numBits - *bitOffset) is not less than sizeof(T),
Expand All @@ -319,9 +305,6 @@ inline void GetValue_(
::arrow::bit_util::TrailingBits(*bufferedValues, *bitOffset)
<< (numBits - *bitOffset));
}
#ifdef _MSC_VER
#pragma warning(pop)
#endif
VELOX_DCHECK_LE(*bitOffset, 64);
}
}
Expand Down Expand Up @@ -402,14 +385,7 @@ inline int BitReader::GetBatch(int numBits, T* v, int batchSize) {
break;
}
for (int k = 0; k < numUnpacked; ++k) {
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable : 4800)
#endif
v[i + k] = static_cast<T>(unpackBuffer[k]);
#ifdef _MSC_VER
#pragma warning(pop)
#endif
}
i += numUnpacked;
byteOffset += numUnpacked * numBits / 8;
Expand Down
21 changes: 10 additions & 11 deletions velox/dwio/parquet/common/RleEncodingInternal.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@

#include "arrow/util/bit_block_counter.h"
#include "arrow/util/bit_run_reader.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/macros.h"

#include "velox/dwio/parquet/common/BitStreamUtilsInternal.h"

namespace facebook::velox::parquet {
Expand Down Expand Up @@ -96,14 +95,14 @@ class RleDecoder {
repeatCount_(0),
literalCount_(0) {
VELOX_DCHECK_GE(bitWidth_, 0);
DCHECK_LE(bitWidth_, 64);
VELOX_DCHECK_LE(bitWidth_, 64);
}

RleDecoder() : bitWidth_(-1) {}

void Reset(const uint8_t* buffer, int bufferLen, int bitWidth) {
VELOX_DCHECK_GE(bitWidth, 0);
DCHECK_LE(bitWidth, 64);
VELOX_DCHECK_LE(bitWidth, 64);
bitReader_.Reset(buffer, bufferLen);
bitWidth_ = bitWidth;
currentValue_ = 0;
Expand Down Expand Up @@ -194,7 +193,7 @@ class RleEncoder {
RleEncoder(uint8_t* buffer, int bufferLen, int bitWidth)
: bitWidth_(bitWidth), bitWriter_(buffer, bufferLen) {
VELOX_DCHECK_GE(bitWidth_, 0);
DCHECK_LE(bitWidth_, 64);
VELOX_DCHECK_LE(bitWidth_, 64);
maxRunByteSize_ = MinBufferSize(bitWidth);
VELOX_DCHECK_GE(bufferLen, maxRunByteSize_, "Input buffer not big enough.");
Clear();
Expand All @@ -219,18 +218,18 @@ class RleEncoder {
// For a bitWidth > 1, the worst case is the repetition of "literal run of
// length 8 and then a repeated run of length 8". 8 values per smallest run,
// 8 bits per byte
int bytes_per_run = bitWidth;
int num_runs = static_cast<int>(::arrow::bit_util::CeilDiv(numValues, 8));
int literal_max_size = num_runs + num_runs * bytes_per_run;
int bytesPerRun = bitWidth;
int numRuns = static_cast<int>(::arrow::bit_util::CeilDiv(numValues, 8));
int literalMaxSize = numRuns + numRuns * bytesPerRun;

// In the very worst case scenario, the data is a concatenation of repeated
// runs of 8 values. Repeated run has a 1 byte varint followed by the
// bit-packed repeated value
int min_repeated_run_size =
int minRepeatedRunSize =
1 + static_cast<int>(::arrow::bit_util::BytesForBits(bitWidth));
int repeated_max_size = num_runs * min_repeated_run_size;
int repeatedMaxSize = numRuns * minRepeatedRunSize;

return std::max(literal_max_size, repeated_max_size);
return std::max(literalMaxSize, repeatedMaxSize);
}

/// Encode value. Returns true if the value fits in buffer, false otherwise.
Expand Down

0 comments on commit 162171e

Please sign in to comment.