diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp index 7529bef7c2a51..fb43f70db6f63 100644 --- a/velox/dwio/parquet/reader/PageReader.cpp +++ b/velox/dwio/parquet/reader/PageReader.cpp @@ -706,6 +706,16 @@ void PageReader::makeDecoder() { break; } FMT_FALLTHROUGH; + case Encoding::RLE: + switch (parquetType) { + case thrift::Type::BOOLEAN: + rleBooleanDecoder_ = std::make_unique( + pageData_, pageData_ + encodedDataSize_, encodedDataSize_); + break; + default: + VELOX_UNSUPPORTED("RLE decoder only supports boolean"); + } + break; default: VELOX_UNSUPPORTED("Encoding not supported yet: {}", encoding_); } @@ -748,6 +758,8 @@ void PageReader::skip(int64_t numRows) { deltaBpDecoder_->skip(toSkip); } else if (deltaByteArrDecoder_) { deltaByteArrDecoder_->skip(toSkip); + } else if (rleBooleanDecoder_) { + rleBooleanDecoder_->skip(toSkip); } else { VELOX_FAIL("No decoder to skip"); } diff --git a/velox/dwio/parquet/reader/PageReader.h b/velox/dwio/parquet/reader/PageReader.h index 94d107109b5f3..8485f21737fb6 100644 --- a/velox/dwio/parquet/reader/PageReader.h +++ b/velox/dwio/parquet/reader/PageReader.h @@ -25,6 +25,7 @@ #include "velox/dwio/parquet/reader/DeltaBpDecoder.h" #include "velox/dwio/parquet/reader/DeltaByteArrayDecoder.h" #include "velox/dwio/parquet/reader/ParquetTypeWithId.h" +#include "velox/dwio/parquet/reader/RleBooleanDecoder.h" #include "velox/dwio/parquet/reader/RleBpDataDecoder.h" #include "velox/dwio/parquet/reader/StringDecoder.h" @@ -339,9 +340,21 @@ class PageReader { VELOX_CHECK(!isDictionary(), "BOOLEAN types are never dictionary-encoded"); if (nulls) { nullsFromFastPath = false; - booleanDecoder_->readWithVisitor(nulls, visitor); + switch (encoding_) { + case thrift::Encoding::RLE: + rleBooleanDecoder_->readWithVisitor(nulls, visitor); + break; + default: + booleanDecoder_->readWithVisitor(nulls, visitor); + } } else { - booleanDecoder_->readWithVisitor(nulls, visitor); + switch (encoding_) { + case thrift::Encoding::RLE: + rleBooleanDecoder_->readWithVisitor(nulls, visitor); + break; + default: + booleanDecoder_->readWithVisitor(nulls, visitor); + } } } @@ -500,6 +513,7 @@ class PageReader { std::unique_ptr booleanDecoder_; std::unique_ptr deltaBpDecoder_; std::unique_ptr deltaByteArrDecoder_; + std::unique_ptr rleBooleanDecoder_; // Add decoders for other encodings here. }; diff --git a/velox/dwio/parquet/reader/RleBooleanDecoder.h b/velox/dwio/parquet/reader/RleBooleanDecoder.h new file mode 100644 index 0000000000000..d7a14010dc5b4 --- /dev/null +++ b/velox/dwio/parquet/reader/RleBooleanDecoder.h @@ -0,0 +1,124 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/dwio/parquet/reader/RleBpDecoder.h" + +namespace facebook::velox::parquet { + +class RleBooleanDecoder : public facebook::velox::parquet::RleBpDecoder { + public: + using super = facebook::velox::parquet::RleBpDecoder; + RleBooleanDecoder(const char* start, const char* end, int32_t& len) + : super::RleBpDecoder{start + 4, end, 1} { + if (len < 4) { + VELOX_FAIL( + "Received invalid length : " + std::to_string(len) + + " (corrupt data page?)"); + } + // num_bytes will be the first 4 bytes that tell us the length of encoded + // data + num_bytes = + ::arrow::bit_util::FromLittleEndian(::arrow::util::SafeLoadAs( + reinterpret_cast(start))); + if (num_bytes < 0 || num_bytes > static_cast(len - 4)) { + VELOX_FAIL( + "Received invalid number of bytes : " + std::to_string(num_bytes) + + " (corrupt data page?)"); + } + } + + void skip(uint64_t numValues) { + skip(numValues, 0, nullptr); + } + + template + inline void skip(int32_t numValues, int32_t current, const uint64_t* nulls) { + if (hasNulls) { + numValues = bits::countNonNulls(nulls, current, current + numValues); + } + + super::skip(numValues); + } + + template + void readWithVisitor(const uint64_t* nulls, Visitor visitor) { + int32_t current = visitor.start(); + + skip(current, 0, nulls); + int32_t toSkip; + bool atEnd = false; + const bool allowNulls = hasNulls && visitor.allowNulls(); + std::vector outputBuffer(20); + bool* b = nullptr; + for (;;) { + if (hasNulls && allowNulls && bits::isBitNull(nulls, current)) { + toSkip = visitor.processNull(atEnd); + } else { + if (hasNulls && !allowNulls) { + toSkip = visitor.checkAndSkipNulls(nulls, current, atEnd); + if (!Visitor::dense) { + skip(toSkip, current, nullptr); + } + if (atEnd) { + return; + } + } + + // We are at a non-null value on a row to visit. + std::vector outputValues_; + outputValues_.resize(visitor.numRows(), 0); + uint8_t* output = outputValues_.data(); + if (!remainingValues_) { + readHeader(); + } + if (repeating_) { + toSkip = visitor.process(value_, atEnd); + } else { + value_ = readBitField(); + toSkip = visitor.process(value_, atEnd); + } + --remainingValues_; + } + ++current; + if (toSkip) { + skip(toSkip, current, nulls); + current += toSkip; + } + if (atEnd) { + return; + } + } + } + + private: + int64_t readBitField() { + auto value = + dwio::common::safeLoadBits( + super::bufferStart_, bitOffset_, bitWidth_, lastSafeWord_) & + bitMask_; + bitOffset_ += bitWidth_; + super::bufferStart_ += bitOffset_ >> 3; + bitOffset_ &= 7; + return value; + } + + const char* bufferStart_; + uint32_t num_bytes = 0; +}; + +} // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/tests/examples/rleboolean.parquet b/velox/dwio/parquet/tests/examples/rleboolean.parquet new file mode 100644 index 0000000000000..3f2feef35cd4e Binary files /dev/null and b/velox/dwio/parquet/tests/examples/rleboolean.parquet differ diff --git a/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp b/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp index 85853ae350c0c..284f2e91887aa 100644 --- a/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp +++ b/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp @@ -730,6 +730,20 @@ TEST_F(E2EFilterTest, configurableWriteSchema) { test(type, newType); } +TEST_F(E2EFilterTest, booleanRle) { + options_.enableDictionary = false; + options_.encoding = facebook::velox::parquet::arrow::Encoding::RLE; + options_.parquetDataPageVersion = "V2"; + + testWithTypes( + "boolean_val:boolean," + "boolean_null:boolean", + [&]() { makeAllNulls("boolean_null"); }, + false, + {"boolean_val"}, + 20); +} + // Define main so that gflags get processed. int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); diff --git a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp index 58f7bfdb80400..bd1de187171da 100644 --- a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp +++ b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp @@ -1147,6 +1147,150 @@ TEST_F(ParquetTableScanTest, deltaByteArray) { assertSelect({"a"}, "SELECT a from expected"); } +TEST_F(ParquetTableScanTest, rleBoolean) { + loadData( + getExampleFilePath("rleboolean.parquet"), + ROW({"c0", "c1", "c2", "c3", "c4", "c5"}, + {BOOLEAN(), BOOLEAN(), BOOLEAN(), BOOLEAN(), BOOLEAN(), BOOLEAN()}), + makeRowVector( + {"c0", "c1", "c2", "c3", "c4", "c5"}, + { + makeNullableFlatVector( + {true, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt}), + makeNullableFlatVector( + {false, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt}), + makeNullableFlatVector( + {std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt, + std::nullopt}), + makeFlatVector(std::vector{ + true, + true, + true, + true, + true, + true, + true, + true, + true, + true, + true, + true, + true, + true, + true, + true, + true, + true}), + makeFlatVector(std::vector{ + false, + false, + false, + false, + false, + false, + false, + false, + false, + false, + false, + false, + false, + false, + false, + false, + false, + false}), + makeNullableFlatVector( + {false, + false, + false, + std::nullopt, + std::nullopt, + true, + true, + true, + true, + true, + false, + true, + std::nullopt, + false, + true, + std::nullopt, + std::nullopt, + false}), + })); + std::shared_ptr c0 = makeColumnHandle( + "c0", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular); + std::shared_ptr c1 = makeColumnHandle( + "c1", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular); + std::shared_ptr c2 = makeColumnHandle( + "c2", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular); + std::shared_ptr c3 = makeColumnHandle( + "c2", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular); + std::shared_ptr c4 = makeColumnHandle( + "c3", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular); + std::shared_ptr c5 = makeColumnHandle( + "c4", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular); + + assertSelect({"c0"}, "SELECT c0 FROM tmp"); + assertSelect({"c1"}, "SELECT c1 FROM tmp"); + assertSelect({"c2"}, "SELECT c2 FROM tmp"); + assertSelect({"c3"}, "SELECT c3 FROM tmp"); + assertSelect({"c4"}, "SELECT c4 FROM tmp"); + assertSelect({"c5"}, "SELECT c5 FROM tmp"); +} + int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); folly::Init init{&argc, &argv, false}; diff --git a/velox/dwio/parquet/writer/Writer.cpp b/velox/dwio/parquet/writer/Writer.cpp index bce6919a7cc63..ec868975ac766 100644 --- a/velox/dwio/parquet/writer/Writer.cpp +++ b/velox/dwio/parquet/writer/Writer.cpp @@ -147,6 +147,10 @@ std::shared_ptr getArrowParquetWriterOptions( static_cast(flushPolicy->rowsInRowGroup())); properties = properties->codec_options(options.codecOptions); properties = properties->enable_store_decimal_as_integer(); + if (options.parquetDataPageVersion == "V2") { + properties = properties->data_page_version( + facebook::velox::parquet::arrow::ParquetDataPageVersion::V2); + } return properties->build(); } diff --git a/velox/dwio/parquet/writer/Writer.h b/velox/dwio/parquet/writer/Writer.h index 26969a59d52fb..7adc3ed8dd7ca 100644 --- a/velox/dwio/parquet/writer/Writer.h +++ b/velox/dwio/parquet/writer/Writer.h @@ -108,7 +108,7 @@ struct WriterOptions : public dwio::common::WriterOptions { /// Timestamp time zone for Parquet write through Arrow bridge. std::optional parquetWriteTimestampTimeZone; bool writeInt96AsTimestamp = false; - + std::optional parquetDataPageVersion = std::nullopt; // Parsing session and hive configs. // This isn't a typo; session and hive connector config names are different