Skip to content

Commit

Permalink
added in rle decoder for boolean
Browse files Browse the repository at this point in the history
Co-authored-by: Minhan Cao <[email protected]>
  • Loading branch information
jkhaliqi and minhancao committed Oct 30, 2024
1 parent df5cff5 commit 163bdb3
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 3 deletions.
12 changes: 12 additions & 0 deletions velox/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,16 @@ void PageReader::makeDecoder() {
break;
}
FMT_FALLTHROUGH;
case Encoding::RLE:
switch (parquetType) {
case thrift::Type::BOOLEAN:
rleBooleanDecoder_ = std::make_unique<RleBooleanDecoder>(
pageData_, pageData_ + encodedDataSize_, encodedDataSize_);
break;
default:
VELOX_UNSUPPORTED("RLE decoder only supports boolean");
}
break;
default:
VELOX_UNSUPPORTED("Encoding not supported yet: {}", encoding_);
}
Expand Down Expand Up @@ -748,6 +758,8 @@ void PageReader::skip(int64_t numRows) {
deltaBpDecoder_->skip(toSkip);
} else if (deltaByteArrDecoder_) {
deltaByteArrDecoder_->skip(toSkip);
} else if (rleBooleanDecoder_) {
rleBooleanDecoder_->skip(toSkip);
} else {
VELOX_FAIL("No decoder to skip");
}
Expand Down
18 changes: 16 additions & 2 deletions velox/dwio/parquet/reader/PageReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "velox/dwio/parquet/reader/DeltaBpDecoder.h"
#include "velox/dwio/parquet/reader/DeltaByteArrayDecoder.h"
#include "velox/dwio/parquet/reader/ParquetTypeWithId.h"
#include "velox/dwio/parquet/reader/RleBooleanDecoder.h"
#include "velox/dwio/parquet/reader/RleBpDataDecoder.h"
#include "velox/dwio/parquet/reader/StringDecoder.h"

Expand Down Expand Up @@ -339,9 +340,21 @@ class PageReader {
VELOX_CHECK(!isDictionary(), "BOOLEAN types are never dictionary-encoded");
if (nulls) {
nullsFromFastPath = false;
booleanDecoder_->readWithVisitor<true>(nulls, visitor);
switch (encoding_) {
case thrift::Encoding::RLE:
rleBooleanDecoder_->readWithVisitor<true>(nulls, visitor);
break;
default:
booleanDecoder_->readWithVisitor<true>(nulls, visitor);
}
} else {
booleanDecoder_->readWithVisitor<false>(nulls, visitor);
switch (encoding_) {
case thrift::Encoding::RLE:
rleBooleanDecoder_->readWithVisitor<false>(nulls, visitor);
break;
default:
booleanDecoder_->readWithVisitor<false>(nulls, visitor);
}
}
}

Expand Down Expand Up @@ -500,6 +513,7 @@ class PageReader {
std::unique_ptr<BooleanDecoder> booleanDecoder_;
std::unique_ptr<DeltaBpDecoder> deltaBpDecoder_;
std::unique_ptr<DeltaByteArrayDecoder> deltaByteArrDecoder_;
std::unique_ptr<RleBooleanDecoder> rleBooleanDecoder_;
// Add decoders for other encodings here.
};

Expand Down
124 changes: 124 additions & 0 deletions velox/dwio/parquet/reader/RleBooleanDecoder.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/dwio/parquet/reader/RleBpDecoder.h"

namespace facebook::velox::parquet {

class RleBooleanDecoder : public facebook::velox::parquet::RleBpDecoder {
public:
using super = facebook::velox::parquet::RleBpDecoder;
RleBooleanDecoder(const char* start, const char* end, int32_t& len)
: super::RleBpDecoder{start + 4, end, 1} {
if (len < 4) {
VELOX_FAIL(
"Received invalid length : " + std::to_string(len) +
" (corrupt data page?)");
}
// num_bytes will be the first 4 bytes that tell us the length of encoded
// data
num_bytes =
::arrow::bit_util::FromLittleEndian(::arrow::util::SafeLoadAs<uint32_t>(
reinterpret_cast<const uint8_t*>(start)));
if (num_bytes < 0 || num_bytes > static_cast<uint32_t>(len - 4)) {
VELOX_FAIL(
"Received invalid number of bytes : " + std::to_string(num_bytes) +
" (corrupt data page?)");
}
}

void skip(uint64_t numValues) {
skip<false>(numValues, 0, nullptr);
}

template <bool hasNulls>
inline void skip(int32_t numValues, int32_t current, const uint64_t* nulls) {
if (hasNulls) {
numValues = bits::countNonNulls(nulls, current, current + numValues);
}

super::skip(numValues);
}

template <bool hasNulls, typename Visitor>
void readWithVisitor(const uint64_t* nulls, Visitor visitor) {
int32_t current = visitor.start();

skip<hasNulls>(current, 0, nulls);
int32_t toSkip;
bool atEnd = false;
const bool allowNulls = hasNulls && visitor.allowNulls();
std::vector<uint64_t> outputBuffer(20);
bool* b = nullptr;
for (;;) {
if (hasNulls && allowNulls && bits::isBitNull(nulls, current)) {
toSkip = visitor.processNull(atEnd);
} else {
if (hasNulls && !allowNulls) {
toSkip = visitor.checkAndSkipNulls(nulls, current, atEnd);
if (!Visitor::dense) {
skip<false>(toSkip, current, nullptr);
}
if (atEnd) {
return;
}
}

// We are at a non-null value on a row to visit.
std::vector<uint8_t> outputValues_;
outputValues_.resize(visitor.numRows(), 0);
uint8_t* output = outputValues_.data();
if (!remainingValues_) {
readHeader();
}
if (repeating_) {
toSkip = visitor.process(value_, atEnd);
} else {
value_ = readBitField();
toSkip = visitor.process(value_, atEnd);
}
--remainingValues_;
}
++current;
if (toSkip) {
skip<hasNulls>(toSkip, current, nulls);
current += toSkip;
}
if (atEnd) {
return;
}
}
}

private:
int64_t readBitField() {
auto value =
dwio::common::safeLoadBits(
super::bufferStart_, bitOffset_, bitWidth_, lastSafeWord_) &
bitMask_;
bitOffset_ += bitWidth_;
super::bufferStart_ += bitOffset_ >> 3;
bitOffset_ &= 7;
return value;
}

const char* bufferStart_;
uint32_t num_bytes = 0;
};

} // namespace facebook::velox::parquet
Binary file added velox/dwio/parquet/tests/examples/rleboolean.parquet
Binary file not shown.
12 changes: 12 additions & 0 deletions velox/dwio/parquet/tests/reader/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,16 @@ if(${VELOX_ENABLE_ARROW})
velox_link_libs
${TEST_LINK_LIBS})

add_executable(velox_dwio_parquet_boolean_decoder_test BooleanDecoderTest.cpp)
add_test(
NAME velox_dwio_parquet_boolean_decoder_test
COMMAND velox_dwio_parquet_boolean_decoder_test
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
target_link_libraries(
velox_dwio_parquet_boolean_decoder_test
velox_dwio_native_parquet_reader
arrow
velox_link_libs
${TEST_LINK_LIBS})

endif()
14 changes: 14 additions & 0 deletions velox/dwio/parquet/tests/reader/E2EFilterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,20 @@ TEST_F(E2EFilterTest, configurableWriteSchema) {
test(type, newType);
}

TEST_F(E2EFilterTest, booleanRle) {
options_.enableDictionary = false;
options_.encoding = facebook::velox::parquet::arrow::Encoding::RLE;
options_.parquetDataPageVersion = "V2";

testWithTypes(
"boolean_val:boolean,"
"boolean_null:boolean",
[&]() { makeAllNulls("boolean_null"); },
false,
{"boolean_val"},
20);
}

// Define main so that gflags get processed.
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
Expand Down
35 changes: 35 additions & 0 deletions velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1147,6 +1147,41 @@ TEST_F(ParquetTableScanTest, deltaByteArray) {
assertSelect({"a"}, "SELECT a from expected");
}

TEST_F(ParquetTableScanTest, rleBoolean) {
loadData(
getExampleFilePath("rleboolean.parquet"),
ROW({"c0", "c1", "c2", "c3", "c4", "c5"}, {BOOLEAN(), BOOLEAN(), BOOLEAN(), BOOLEAN(), BOOLEAN(), BOOLEAN()}),
makeRowVector(
{"c0", "c1", "c2", "c3", "c4", "c5"},
{
makeNullableFlatVector<bool>({true, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt}),
makeNullableFlatVector<bool>({false, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt}),
makeNullableFlatVector<bool>({std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt}),
makeFlatVector<bool>(std::vector<bool>{true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true}),
makeFlatVector<bool>(std::vector<bool>{false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false}),
makeNullableFlatVector<bool>({false, false, false, std::nullopt, std::nullopt, true, true, true, true, true, false, true, std::nullopt, false, true, std::nullopt, std::nullopt, false}),
}));
std::shared_ptr<connector::ColumnHandle> c0 = makeColumnHandle(
"c0", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular);
std::shared_ptr<connector::ColumnHandle> c1 = makeColumnHandle(
"c1", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular);
std::shared_ptr<connector::ColumnHandle> c2 = makeColumnHandle(
"c2", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular);
std::shared_ptr<connector::ColumnHandle> c3 = makeColumnHandle(
"c2", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular);
std::shared_ptr<connector::ColumnHandle> c4 = makeColumnHandle(
"c3", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular);
std::shared_ptr<connector::ColumnHandle> c5 = makeColumnHandle(
"c4", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular);

assertSelect({"c0"}, "SELECT c0 FROM tmp");
assertSelect({"c1"}, "SELECT c1 FROM tmp");
assertSelect({"c2"}, "SELECT c2 FROM tmp");
assertSelect({"c3"}, "SELECT c3 FROM tmp");
assertSelect({"c4"}, "SELECT c4 FROM tmp");
assertSelect({"c5"}, "SELECT c5 FROM tmp");
}

int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
folly::Init init{&argc, &argv, false};
Expand Down
4 changes: 4 additions & 0 deletions velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ std::shared_ptr<WriterProperties> getArrowParquetWriterOptions(
static_cast<int64_t>(flushPolicy->rowsInRowGroup()));
properties = properties->codec_options(options.codecOptions);
properties = properties->enable_store_decimal_as_integer();
if (options.parquetDataPageVersion == "V2") {
properties = properties->data_page_version(
facebook::velox::parquet::arrow::ParquetDataPageVersion::V2);
}
return properties->build();
}

Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/parquet/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ struct WriterOptions : public dwio::common::WriterOptions {
/// Timestamp time zone for Parquet write through Arrow bridge.
std::optional<std::string> parquetWriteTimestampTimeZone;
bool writeInt96AsTimestamp = false;

std::optional<std::string> parquetDataPageVersion = std::nullopt;
// Parsing session and hive configs.

// This isn't a typo; session and hive connector config names are different
Expand Down

0 comments on commit 163bdb3

Please sign in to comment.