From 163bdb3d79db7e138a4493a31f9c92a2f6f3b56a Mon Sep 17 00:00:00 2001 From: Jacob Khaliqi Date: Mon, 9 Sep 2024 15:23:08 -0700 Subject: [PATCH] added in rle decoder for boolean Co-authored-by: Minhan Cao --- velox/dwio/parquet/reader/PageReader.cpp | 12 ++ velox/dwio/parquet/reader/PageReader.h | 18 ++- velox/dwio/parquet/reader/RleBooleanDecoder.h | 124 ++++++++++++++++++ .../parquet/tests/examples/rleboolean.parquet | Bin 0 -> 646 bytes .../dwio/parquet/tests/reader/CMakeLists.txt | 12 ++ .../parquet/tests/reader/E2EFilterTest.cpp | 14 ++ .../tests/reader/ParquetTableScanTest.cpp | 35 +++++ velox/dwio/parquet/writer/Writer.cpp | 4 + velox/dwio/parquet/writer/Writer.h | 2 +- 9 files changed, 218 insertions(+), 3 deletions(-) create mode 100644 velox/dwio/parquet/reader/RleBooleanDecoder.h create mode 100644 velox/dwio/parquet/tests/examples/rleboolean.parquet diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp index 7529bef7c2a51..fb43f70db6f63 100644 --- a/velox/dwio/parquet/reader/PageReader.cpp +++ b/velox/dwio/parquet/reader/PageReader.cpp @@ -706,6 +706,16 @@ void PageReader::makeDecoder() { break; } FMT_FALLTHROUGH; + case Encoding::RLE: + switch (parquetType) { + case thrift::Type::BOOLEAN: + rleBooleanDecoder_ = std::make_unique( + pageData_, pageData_ + encodedDataSize_, encodedDataSize_); + break; + default: + VELOX_UNSUPPORTED("RLE decoder only supports boolean"); + } + break; default: VELOX_UNSUPPORTED("Encoding not supported yet: {}", encoding_); } @@ -748,6 +758,8 @@ void PageReader::skip(int64_t numRows) { deltaBpDecoder_->skip(toSkip); } else if (deltaByteArrDecoder_) { deltaByteArrDecoder_->skip(toSkip); + } else if (rleBooleanDecoder_) { + rleBooleanDecoder_->skip(toSkip); } else { VELOX_FAIL("No decoder to skip"); } diff --git a/velox/dwio/parquet/reader/PageReader.h b/velox/dwio/parquet/reader/PageReader.h index 94d107109b5f3..8485f21737fb6 100644 --- a/velox/dwio/parquet/reader/PageReader.h +++ b/velox/dwio/parquet/reader/PageReader.h @@ -25,6 +25,7 @@ #include "velox/dwio/parquet/reader/DeltaBpDecoder.h" #include "velox/dwio/parquet/reader/DeltaByteArrayDecoder.h" #include "velox/dwio/parquet/reader/ParquetTypeWithId.h" +#include "velox/dwio/parquet/reader/RleBooleanDecoder.h" #include "velox/dwio/parquet/reader/RleBpDataDecoder.h" #include "velox/dwio/parquet/reader/StringDecoder.h" @@ -339,9 +340,21 @@ class PageReader { VELOX_CHECK(!isDictionary(), "BOOLEAN types are never dictionary-encoded"); if (nulls) { nullsFromFastPath = false; - booleanDecoder_->readWithVisitor(nulls, visitor); + switch (encoding_) { + case thrift::Encoding::RLE: + rleBooleanDecoder_->readWithVisitor(nulls, visitor); + break; + default: + booleanDecoder_->readWithVisitor(nulls, visitor); + } } else { - booleanDecoder_->readWithVisitor(nulls, visitor); + switch (encoding_) { + case thrift::Encoding::RLE: + rleBooleanDecoder_->readWithVisitor(nulls, visitor); + break; + default: + booleanDecoder_->readWithVisitor(nulls, visitor); + } } } @@ -500,6 +513,7 @@ class PageReader { std::unique_ptr booleanDecoder_; std::unique_ptr deltaBpDecoder_; std::unique_ptr deltaByteArrDecoder_; + std::unique_ptr rleBooleanDecoder_; // Add decoders for other encodings here. }; diff --git a/velox/dwio/parquet/reader/RleBooleanDecoder.h b/velox/dwio/parquet/reader/RleBooleanDecoder.h new file mode 100644 index 0000000000000..d7a14010dc5b4 --- /dev/null +++ b/velox/dwio/parquet/reader/RleBooleanDecoder.h @@ -0,0 +1,124 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/dwio/parquet/reader/RleBpDecoder.h" + +namespace facebook::velox::parquet { + +class RleBooleanDecoder : public facebook::velox::parquet::RleBpDecoder { + public: + using super = facebook::velox::parquet::RleBpDecoder; + RleBooleanDecoder(const char* start, const char* end, int32_t& len) + : super::RleBpDecoder{start + 4, end, 1} { + if (len < 4) { + VELOX_FAIL( + "Received invalid length : " + std::to_string(len) + + " (corrupt data page?)"); + } + // num_bytes will be the first 4 bytes that tell us the length of encoded + // data + num_bytes = + ::arrow::bit_util::FromLittleEndian(::arrow::util::SafeLoadAs( + reinterpret_cast(start))); + if (num_bytes < 0 || num_bytes > static_cast(len - 4)) { + VELOX_FAIL( + "Received invalid number of bytes : " + std::to_string(num_bytes) + + " (corrupt data page?)"); + } + } + + void skip(uint64_t numValues) { + skip(numValues, 0, nullptr); + } + + template + inline void skip(int32_t numValues, int32_t current, const uint64_t* nulls) { + if (hasNulls) { + numValues = bits::countNonNulls(nulls, current, current + numValues); + } + + super::skip(numValues); + } + + template + void readWithVisitor(const uint64_t* nulls, Visitor visitor) { + int32_t current = visitor.start(); + + skip(current, 0, nulls); + int32_t toSkip; + bool atEnd = false; + const bool allowNulls = hasNulls && visitor.allowNulls(); + std::vector outputBuffer(20); + bool* b = nullptr; + for (;;) { + if (hasNulls && allowNulls && bits::isBitNull(nulls, current)) { + toSkip = visitor.processNull(atEnd); + } else { + if (hasNulls && !allowNulls) { + toSkip = visitor.checkAndSkipNulls(nulls, current, atEnd); + if (!Visitor::dense) { + skip(toSkip, current, nullptr); + } + if (atEnd) { + return; + } + } + + // We are at a non-null value on a row to visit. + std::vector outputValues_; + outputValues_.resize(visitor.numRows(), 0); + uint8_t* output = outputValues_.data(); + if (!remainingValues_) { + readHeader(); + } + if (repeating_) { + toSkip = visitor.process(value_, atEnd); + } else { + value_ = readBitField(); + toSkip = visitor.process(value_, atEnd); + } + --remainingValues_; + } + ++current; + if (toSkip) { + skip(toSkip, current, nulls); + current += toSkip; + } + if (atEnd) { + return; + } + } + } + + private: + int64_t readBitField() { + auto value = + dwio::common::safeLoadBits( + super::bufferStart_, bitOffset_, bitWidth_, lastSafeWord_) & + bitMask_; + bitOffset_ += bitWidth_; + super::bufferStart_ += bitOffset_ >> 3; + bitOffset_ &= 7; + return value; + } + + const char* bufferStart_; + uint32_t num_bytes = 0; +}; + +} // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/tests/examples/rleboolean.parquet b/velox/dwio/parquet/tests/examples/rleboolean.parquet new file mode 100644 index 0000000000000000000000000000000000000000..3f2feef35cd4e33a143be142c9d74aadecd5cb1d GIT binary patch literal 646 zcmWG=3^EjD6O|Bkh!Is0RRUr*Q7%y?1_l)Y23AG}`ECvd2uMy!NJvOyVfdS@uz(FH zM1m#;hU{SV>rhR6qGlkQK#)z81#FWFvNb?k7#L*kyC||iED;j50;y*}R?mp)tbjxo zh8Y*1RzYnbN(Tc2-@`||s2<@0+ra^L6Z`YC%t*GR0iD%5ft8_mkM?eMki(cHYdm-h zic*V9^5ct>Gg5OCMR^!R8C01hn34@(3_}>h2*xmmF-%|#QwA{=Ng2r;H3k_`21!vi zNss}eEMh8RPGVVV95xb+j6fu&qyeUZRwFAkgexopDr5j6kU|iRrqBqi&_XOh?GB@j znF^Y$F<90{EJbY(6U;&etQMNURW4wL*$7mL#YR)O(^^;{W-^Fzfz1TDP3#%ahYSqv KK!*kZQy>5-EL5%l literal 0 HcmV?d00001 diff --git a/velox/dwio/parquet/tests/reader/CMakeLists.txt b/velox/dwio/parquet/tests/reader/CMakeLists.txt index b58429d73e933..393f9583ea7fa 100644 --- a/velox/dwio/parquet/tests/reader/CMakeLists.txt +++ b/velox/dwio/parquet/tests/reader/CMakeLists.txt @@ -107,4 +107,16 @@ if(${VELOX_ENABLE_ARROW}) velox_link_libs ${TEST_LINK_LIBS}) + add_executable(velox_dwio_parquet_boolean_decoder_test BooleanDecoderTest.cpp) + add_test( + NAME velox_dwio_parquet_boolean_decoder_test + COMMAND velox_dwio_parquet_boolean_decoder_test + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) + target_link_libraries( + velox_dwio_parquet_boolean_decoder_test + velox_dwio_native_parquet_reader + arrow + velox_link_libs + ${TEST_LINK_LIBS}) + endif() diff --git a/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp b/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp index 85853ae350c0c..284f2e91887aa 100644 --- a/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp +++ b/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp @@ -730,6 +730,20 @@ TEST_F(E2EFilterTest, configurableWriteSchema) { test(type, newType); } +TEST_F(E2EFilterTest, booleanRle) { + options_.enableDictionary = false; + options_.encoding = facebook::velox::parquet::arrow::Encoding::RLE; + options_.parquetDataPageVersion = "V2"; + + testWithTypes( + "boolean_val:boolean," + "boolean_null:boolean", + [&]() { makeAllNulls("boolean_null"); }, + false, + {"boolean_val"}, + 20); +} + // Define main so that gflags get processed. int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); diff --git a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp index 58f7bfdb80400..42ab4fed4fdbc 100644 --- a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp +++ b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp @@ -1147,6 +1147,41 @@ TEST_F(ParquetTableScanTest, deltaByteArray) { assertSelect({"a"}, "SELECT a from expected"); } +TEST_F(ParquetTableScanTest, rleBoolean) { + loadData( + getExampleFilePath("rleboolean.parquet"), + ROW({"c0", "c1", "c2", "c3", "c4", "c5"}, {BOOLEAN(), BOOLEAN(), BOOLEAN(), BOOLEAN(), BOOLEAN(), BOOLEAN()}), + makeRowVector( + {"c0", "c1", "c2", "c3", "c4", "c5"}, + { + makeNullableFlatVector({true, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt}), + makeNullableFlatVector({false, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt}), + makeNullableFlatVector({std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt}), + makeFlatVector(std::vector{true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true, true}), + makeFlatVector(std::vector{false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false}), + makeNullableFlatVector({false, false, false, std::nullopt, std::nullopt, true, true, true, true, true, false, true, std::nullopt, false, true, std::nullopt, std::nullopt, false}), + })); + std::shared_ptr c0 = makeColumnHandle( + "c0", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular); + std::shared_ptr c1 = makeColumnHandle( + "c1", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular); + std::shared_ptr c2 = makeColumnHandle( + "c2", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular); + std::shared_ptr c3 = makeColumnHandle( + "c2", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular); + std::shared_ptr c4 = makeColumnHandle( + "c3", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular); + std::shared_ptr c5 = makeColumnHandle( + "c4", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular); + + assertSelect({"c0"}, "SELECT c0 FROM tmp"); + assertSelect({"c1"}, "SELECT c1 FROM tmp"); + assertSelect({"c2"}, "SELECT c2 FROM tmp"); + assertSelect({"c3"}, "SELECT c3 FROM tmp"); + assertSelect({"c4"}, "SELECT c4 FROM tmp"); + assertSelect({"c5"}, "SELECT c5 FROM tmp"); +} + int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); folly::Init init{&argc, &argv, false}; diff --git a/velox/dwio/parquet/writer/Writer.cpp b/velox/dwio/parquet/writer/Writer.cpp index bce6919a7cc63..ec868975ac766 100644 --- a/velox/dwio/parquet/writer/Writer.cpp +++ b/velox/dwio/parquet/writer/Writer.cpp @@ -147,6 +147,10 @@ std::shared_ptr getArrowParquetWriterOptions( static_cast(flushPolicy->rowsInRowGroup())); properties = properties->codec_options(options.codecOptions); properties = properties->enable_store_decimal_as_integer(); + if (options.parquetDataPageVersion == "V2") { + properties = properties->data_page_version( + facebook::velox::parquet::arrow::ParquetDataPageVersion::V2); + } return properties->build(); } diff --git a/velox/dwio/parquet/writer/Writer.h b/velox/dwio/parquet/writer/Writer.h index 26969a59d52fb..7adc3ed8dd7ca 100644 --- a/velox/dwio/parquet/writer/Writer.h +++ b/velox/dwio/parquet/writer/Writer.h @@ -108,7 +108,7 @@ struct WriterOptions : public dwio::common::WriterOptions { /// Timestamp time zone for Parquet write through Arrow bridge. std::optional parquetWriteTimestampTimeZone; bool writeInt96AsTimestamp = false; - + std::optional parquetDataPageVersion = std::nullopt; // Parsing session and hive configs. // This isn't a typo; session and hive connector config names are different