Skip to content

Commit

Permalink
added in rle decoder for boolean
Browse files Browse the repository at this point in the history
Co-authored-by: Minhan Cao <[email protected]>
  • Loading branch information
jkhaliqi and minhancao committed Nov 23, 2024
1 parent 059337f commit 27ad3f1
Show file tree
Hide file tree
Showing 8 changed files with 307 additions and 3 deletions.
12 changes: 12 additions & 0 deletions velox/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,16 @@ void PageReader::makeDecoder() {
break;
}
FMT_FALLTHROUGH;
case Encoding::RLE:
switch (parquetType) {
case thrift::Type::BOOLEAN:
rleBooleanDecoder_ = std::make_unique<RleBooleanDecoder>(
pageData_, pageData_ + encodedDataSize_, encodedDataSize_);
break;
default:
VELOX_UNSUPPORTED("RLE decoder only supports boolean");
}
break;
default:
VELOX_UNSUPPORTED("Encoding not supported yet: {}", encoding_);
}
Expand Down Expand Up @@ -740,6 +750,8 @@ void PageReader::skip(int64_t numRows) {
deltaBpDecoder_->skip(toSkip);
} else if (deltaByteArrDecoder_) {
deltaByteArrDecoder_->skip(toSkip);
} else if (rleBooleanDecoder_) {
rleBooleanDecoder_->skip(toSkip);
} else {
VELOX_FAIL("No decoder to skip");
}
Expand Down
18 changes: 16 additions & 2 deletions velox/dwio/parquet/reader/PageReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "velox/dwio/parquet/reader/DeltaBpDecoder.h"
#include "velox/dwio/parquet/reader/DeltaByteArrayDecoder.h"
#include "velox/dwio/parquet/reader/ParquetTypeWithId.h"
#include "velox/dwio/parquet/reader/RleBooleanDecoder.h"
#include "velox/dwio/parquet/reader/RleBpDataDecoder.h"
#include "velox/dwio/parquet/reader/StringDecoder.h"
#include "velox/dwio/parquet/writer/arrow/util/RleEncodingInternal.h"
Expand Down Expand Up @@ -338,9 +339,21 @@ class PageReader {
VELOX_CHECK(!isDictionary(), "BOOLEAN types are never dictionary-encoded");
if (nulls) {
nullsFromFastPath = false;
booleanDecoder_->readWithVisitor<true>(nulls, visitor);
switch (encoding_) {
case thrift::Encoding::RLE:
rleBooleanDecoder_->readWithVisitor<true>(nulls, visitor);
break;
default:
booleanDecoder_->readWithVisitor<true>(nulls, visitor);
}
} else {
booleanDecoder_->readWithVisitor<false>(nulls, visitor);
switch (encoding_) {
case thrift::Encoding::RLE:
rleBooleanDecoder_->readWithVisitor<false>(nulls, visitor);
break;
default:
booleanDecoder_->readWithVisitor<false>(nulls, visitor);
}
}
}

Expand Down Expand Up @@ -501,6 +514,7 @@ class PageReader {
std::unique_ptr<BooleanDecoder> booleanDecoder_;
std::unique_ptr<DeltaBpDecoder> deltaBpDecoder_;
std::unique_ptr<DeltaByteArrayDecoder> deltaByteArrDecoder_;
std::unique_ptr<RleBooleanDecoder> rleBooleanDecoder_;
// Add decoders for other encodings here.
};

Expand Down
116 changes: 116 additions & 0 deletions velox/dwio/parquet/reader/RleBooleanDecoder.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/dwio/parquet/reader/RleBpDecoder.h"

namespace facebook::velox::parquet {

class RleBooleanDecoder : public RleBpDecoder {
public:
static constexpr int32_t kLengthOffset = 4;
RleBooleanDecoder(const char* start, const char* end, int32_t len)
: RleBpDecoder{start + kLengthOffset, end, 1} {
if (len < kLengthOffset) {
VELOX_FAIL("Received invalid length : {} (corrupt data page?)", len);
}
numBytes_ =
::arrow::bit_util::FromLittleEndian(::arrow::util::SafeLoadAs<uint32_t>(
reinterpret_cast<const uint8_t*>(start)));
if (numBytes_ > static_cast<uint32_t>(len - kLengthOffset)) {
VELOX_FAIL(
"Received invalid number of bytes : " + std::to_string(numBytes_) +
" (corrupt data page?)");
}
}

void skip(uint64_t numValues) {
skip<false>(numValues, 0, nullptr);
}

template <bool hasNulls>
inline void skip(int32_t numValues, int32_t current, const uint64_t* nulls) {
if constexpr (hasNulls) {
numValues = bits::countNonNulls(nulls, current, current + numValues);
}

RleBpDecoder::skip(numValues);
}

template <bool hasNulls, typename Visitor>
void readWithVisitor(const uint64_t* nulls, Visitor visitor) {
int32_t current = visitor.start();
skip<hasNulls>(current, 0, nulls);
int32_t toSkip = 0;
bool atEnd = false;
const bool allowNulls = hasNulls && visitor.allowNulls();
bool* b = nullptr;
for (;;) {
if (hasNulls && allowNulls && bits::isBitNull(nulls, current)) {
toSkip = visitor.processNull(atEnd);
} else {
if (hasNulls && !allowNulls) {
toSkip = visitor.checkAndSkipNulls(nulls, current, atEnd);
if (!Visitor::dense) {
skip<false>(toSkip, current, nullptr);
}
if (atEnd) {
return;
}
}

// We are at a non-null value on a row to visit.
if (!remainingValues_) {
readHeader();
}
if (repeating_) {
toSkip = visitor.process(value_, atEnd);
} else {
value_ = readBitField();
toSkip = visitor.process(value_, atEnd);
}
--remainingValues_;
}
// Will increment the current by one and if value of toSkip > 0
// We will count the number of non nulls for the bpdecoding and skip
// that value count as well
++current;
if (toSkip) {
skip<hasNulls>(toSkip, current, nulls);
current += toSkip;
}
if (atEnd) {
return;
}
}
}

private:
int64_t readBitField() {
auto value = dwio::common::safeLoadBits(
bufferStart_, bitOffset_, bitWidth_, lastSafeWord_) &
bitMask_;
bitOffset_ += bitWidth_;
bufferStart_ += bitOffset_ >> 3;
bitOffset_ &= 7;
return value;
}

uint32_t numBytes_ = 0;
};

} // namespace facebook::velox::parquet
Binary file added velox/dwio/parquet/tests/examples/rleboolean.parquet
Binary file not shown.
14 changes: 14 additions & 0 deletions velox/dwio/parquet/tests/reader/E2EFilterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,20 @@ TEST_F(E2EFilterTest, configurableWriteSchema) {
test(type, newType);
}

TEST_F(E2EFilterTest, booleanRle) {
options_.enableDictionary = false;
options_.encoding = facebook::velox::parquet::arrow::Encoding::RLE;
options_.parquetDataPageVersion = "V2";

testWithTypes(
"boolean_val:boolean,"
"boolean_null:boolean",
[&]() { makeAllNulls("boolean_null"); },
false,
{"boolean_val"},
20);
}

// Define main so that gflags get processed.
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
Expand Down
144 changes: 144 additions & 0 deletions velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1123,6 +1123,150 @@ TEST_F(ParquetTableScanTest, deltaByteArray) {
assertSelect({"a"}, "SELECT a from expected");
}

TEST_F(ParquetTableScanTest, rleBoolean) {
loadData(
getExampleFilePath("rleboolean.parquet"),
ROW({"c0", "c1", "c2", "c3", "c4", "c5"},
{BOOLEAN(), BOOLEAN(), BOOLEAN(), BOOLEAN(), BOOLEAN(), BOOLEAN()}),
makeRowVector(
{"c0", "c1", "c2", "c3", "c4", "c5"},
{
makeNullableFlatVector<bool>(
{true,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt}),
makeNullableFlatVector<bool>(
{false,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt}),
makeNullableFlatVector<bool>(
{std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt}),
makeFlatVector<bool>(std::vector<bool>{
true,
true,
true,
true,
true,
true,
true,
true,
true,
true,
true,
true,
true,
true,
true,
true,
true,
true}),
makeFlatVector<bool>(std::vector<bool>{
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false}),
makeNullableFlatVector<bool>(
{false,
false,
false,
std::nullopt,
std::nullopt,
true,
true,
true,
true,
true,
false,
true,
std::nullopt,
false,
true,
std::nullopt,
std::nullopt,
false}),
}));
std::shared_ptr<connector::ColumnHandle> c0 = makeColumnHandle(
"c0", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular);
std::shared_ptr<connector::ColumnHandle> c1 = makeColumnHandle(
"c1", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular);
std::shared_ptr<connector::ColumnHandle> c2 = makeColumnHandle(
"c2", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular);
std::shared_ptr<connector::ColumnHandle> c3 = makeColumnHandle(
"c2", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular);
std::shared_ptr<connector::ColumnHandle> c4 = makeColumnHandle(
"c3", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular);
std::shared_ptr<connector::ColumnHandle> c5 = makeColumnHandle(
"c4", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular);

assertSelect({"c0"}, "SELECT c0 FROM tmp");
assertSelect({"c1"}, "SELECT c1 FROM tmp");
assertSelect({"c2"}, "SELECT c2 FROM tmp");
assertSelect({"c3"}, "SELECT c3 FROM tmp");
assertSelect({"c4"}, "SELECT c4 FROM tmp");
assertSelect({"c5"}, "SELECT c5 FROM tmp");
}

int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
folly::Init init{&argc, &argv, false};
Expand Down
4 changes: 4 additions & 0 deletions velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ std::shared_ptr<WriterProperties> getArrowParquetWriterOptions(
static_cast<int64_t>(flushPolicy->rowsInRowGroup()));
properties = properties->codec_options(options.codecOptions);
properties = properties->enable_store_decimal_as_integer();
if (options.parquetDataPageVersion == "V2") {
properties = properties->data_page_version(
facebook::velox::parquet::arrow::ParquetDataPageVersion::V2);
}
return properties->build();
}

Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/parquet/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ struct WriterOptions : public dwio::common::WriterOptions {
/// Timestamp time zone for Parquet write through Arrow bridge.
std::optional<std::string> parquetWriteTimestampTimeZone;
bool writeInt96AsTimestamp = false;

std::optional<std::string> parquetDataPageVersion = std::nullopt;
// Parsing session and hive configs.

// This isn't a typo; session and hive connector config names are different
Expand Down

0 comments on commit 27ad3f1

Please sign in to comment.