diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp index 7529bef7c2a51..eaf2a6ff80bc5 100644 --- a/velox/dwio/parquet/reader/PageReader.cpp +++ b/velox/dwio/parquet/reader/PageReader.cpp @@ -308,6 +308,7 @@ void PageReader::prepareDataPageV2(const PageHeader& pageHeader, int64_t row) { } encodedDataSize_ = pageHeader.uncompressed_page_size - levelsSize; + numNulls_ = pageHeader.data_page_header_v2.num_nulls; encoding_ = pageHeader.data_page_header_v2.encoding; if (numRowsInPage_ == kRowsUnknown) { readPageDefLevels(); @@ -706,6 +707,15 @@ void PageReader::makeDecoder() { break; } FMT_FALLTHROUGH; + case Encoding::RLE: + switch (parquetType) { + case thrift::Type::BOOLEAN: + rleBooleanDecoder_ = std::make_unique(pageData_, pageData_ + encodedDataSize_, decompressedData_, repetitionLevels_.data(), encodedDataSize_, numNulls_); + break; + default: + VELOX_UNSUPPORTED("RLE decoder only supports boolean"); + } + break; default: VELOX_UNSUPPORTED("Encoding not supported yet: {}", encoding_); } @@ -748,6 +758,8 @@ void PageReader::skip(int64_t numRows) { deltaBpDecoder_->skip(toSkip); } else if (deltaByteArrDecoder_) { deltaByteArrDecoder_->skip(toSkip); + } else if (rleBooleanDecoder_) { + rleBooleanDecoder_->skip(toSkip); } else { VELOX_FAIL("No decoder to skip"); } diff --git a/velox/dwio/parquet/reader/PageReader.h b/velox/dwio/parquet/reader/PageReader.h index 94d107109b5f3..005ea222cf28f 100644 --- a/velox/dwio/parquet/reader/PageReader.h +++ b/velox/dwio/parquet/reader/PageReader.h @@ -25,6 +25,7 @@ #include "velox/dwio/parquet/reader/DeltaBpDecoder.h" #include "velox/dwio/parquet/reader/DeltaByteArrayDecoder.h" #include "velox/dwio/parquet/reader/ParquetTypeWithId.h" +#include "velox/dwio/parquet/reader/RleBooleanDecoder.h" #include "velox/dwio/parquet/reader/RleBpDataDecoder.h" #include "velox/dwio/parquet/reader/StringDecoder.h" @@ -339,9 +340,23 @@ class PageReader { VELOX_CHECK(!isDictionary(), "BOOLEAN types are never dictionary-encoded"); if (nulls) { nullsFromFastPath = false; - booleanDecoder_->readWithVisitor(nulls, visitor); + switch (encoding_) + { + case thrift::Encoding::RLE: + rleBooleanDecoder_->readWithVisitor(nulls, visitor); + break; + default: + booleanDecoder_->readWithVisitor(nulls, visitor); + } } else { - booleanDecoder_->readWithVisitor(nulls, visitor); + switch (encoding_) + { + case thrift::Encoding::RLE: + rleBooleanDecoder_->readWithVisitor(nulls, visitor); + break; + default: + booleanDecoder_->readWithVisitor(nulls, visitor); + } } } @@ -449,6 +464,7 @@ class PageReader { // Number of bytes starting at pageData_ for current encoded data. int32_t encodedDataSize_{0}; + int32_t numNulls_{0}; // Below members Keep state between calls to readWithVisitor(). // Original rows in Visitor. @@ -500,6 +516,7 @@ class PageReader { std::unique_ptr booleanDecoder_; std::unique_ptr deltaBpDecoder_; std::unique_ptr deltaByteArrDecoder_; + std::unique_ptr rleBooleanDecoder_; // Add decoders for other encodings here. }; diff --git a/velox/dwio/parquet/reader/RleBooleanDecoder.h b/velox/dwio/parquet/reader/RleBooleanDecoder.h new file mode 100644 index 0000000000000..af3faefd835c4 --- /dev/null +++ b/velox/dwio/parquet/reader/RleBooleanDecoder.h @@ -0,0 +1,321 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/dwio/parquet/reader/RleBpDecoder.h" +#include "arrow/util/rle_encoding.h" // need for arrow decoder stuff + +namespace facebook::velox::parquet { + +class RleBooleanDecoder : public facebook::velox::parquet::RleBpDecoder { + public: + using super = facebook::velox::parquet::RleBpDecoder; + RleBooleanDecoder(const char* start, const char* end, BufferPtr decompressedData_, const short* repetitionLevels_, int32_t& len, int32_t& numNulls) + : super::RleBpDecoder{start+4, end, 1} { + // const uint8_t* data = decompressedData_->asMutable(); + // int64_t size = decompressedData_->size(); + + // Print each byte in the buffer + // for (int64_t i = 0; i < size; ++i) { + // std::cout << "Byte " << i << ": " << static_cast(data[i]) << std::endl; + + // // iterate through bits + // for (int bit = 0; bit < 8; ++bit) { + // bool value = (data[i] & (1 << bit)) != 0; + // std::cout << "Bit " << (i * 8 + bit) << ": " << (value ? "true" : "false") << " "; + // } + // std::cout << std::endl; + // } + // const uint8_t* rleData = reinterpret_cast(bufferStart_); + // You would need the number of bytes in the RLE encoded data. + //size_t numBytes = end - start; // Adjust this based on actual data. + // You might need to calculate the bit width as needed for booleans. + int bitWidth = 1; // For boolean values. + + // ARROW IMPLEMNTATION/SETUP BELOW SET DATA + // length is the minimum length of the whole data including the first 4 so we know it should be 4 to be valid + if (len < 4) { + VELOX_FAIL("Received invalid length : " + std::to_string(len) + + " (corrupt data page?)"); + } + // num_bytes will be the first 4 bytes that tell us the length of encoded data + num_bytes = ::arrow::bit_util::FromLittleEndian(::arrow::util::SafeLoadAs(reinterpret_cast(start))); + if (num_bytes < 0 || num_bytes > static_cast(len - 4)) { + VELOX_FAIL("Received invalid number of bytes : " + + std::to_string(num_bytes) + " (corrupt data page?)"); + } + + nullCount = numNulls; + + start += 4; + + decoder_ = std::make_shared<::arrow::util::RleDecoder>(reinterpret_cast(start), num_bytes, 1); + // std::vector outputValues_; + // outputValues_.resize(numValues_, 0); + // uint8_t* output = outputValues_.data(); + // decoder_->GetBatch(output, numValues_); + + // ARROW IMPLEMENTATION COMPLETE USING ARROW METHODS FOR DECODING... + + // start should be the value at byte 4... + bool rleOrPacked = (start[0] & 1) == 0 ? true : false; + if (rleOrPacked) { + // rle case + currentCount_ = static_cast(start[0]) >> 1; + //currentValue = BytesUtils.readIntLittleEndianPaddedOnBitWidth(in, bitWidth); + currentValue_ = start[1] & 1; + } + else { + // bit packed case + uint8_t numGroups = static_cast(start[0]) >> 1; + currentCount_ = numGroups * 8; + //LOG.debug("reading {} values BIT PACKED", currentCount); + //currentBuffer = new int[currentCount]; // TODO: reuse a buffer + std::vector currentBuffer(currentCount_); + //byte[] bytes = new byte[numGroups * bitWidth]; + std::vector bytes(numGroups * bitWidth); + // At the end of the file RLE data though, there might not be that many bytes left. + // bytesToRead = Math.min(bytesToRead, in.available()); + int bytesToRead = static_cast(std::ceil(currentCount_ * bitWidth / 8.0)); + // new DataInputStream(in).readFully(bytes, 0, bytesToRead); + //readFully(bytes, 0, bytesToRead, repetitionLevels_); + for (int valueIndex = 0, byteIndex = 0; valueIndex < currentCount_; valueIndex += 8, byteIndex += bitWidth) { + // packer.unpack8Values(bytes, byteIndex, currentBuffer, valueIndex); + unpack8Values(bytes, byteIndex, currentBuffer, valueIndex); + } + } + } + + void skip(uint64_t numValues) { + skip(numValues, 0, nullptr); + } + + template + inline void skip(int32_t numValues, int32_t current, const uint64_t* nulls) { + if (hasNulls) { + numValues = bits::countNonNulls(nulls, current, current + numValues); + } + + super::skip(numValues); + } + + template + void readWithVisitor(const uint64_t* nulls, Visitor visitor) { + int32_t current = visitor.start(); + + skip(current, 0, nulls); + int32_t toSkip; + bool atEnd = false; + const bool allowNulls = hasNulls && visitor.allowNulls(); + std::vector outputBuffer(20); + bool* b = nullptr; + for (;;) { + // comment this out and the ending else if we want to try out + // the arrow decoder approach with the null count placing as null + if (hasNulls && allowNulls && bits::isBitNull(nulls, current)) { + toSkip = visitor.processNull(atEnd); + } else { + if (hasNulls && !allowNulls) { + toSkip = visitor.checkAndSkipNulls(nulls, current, atEnd); + if (!Visitor::dense) { + skip(toSkip, current, nullptr); + } + if (atEnd) { + return; + } + } + + // We are at a non-null value on a row to visit. + // if (nullCount == visitor.numRows()) { + // for (int i = 0; i < nullCount; i++) { + // visitor.processNull(atEnd); + // } + // return; + // } + + std::vector outputValues_; + outputValues_.resize(visitor.numRows(), 0); + std::cout<<"numRows: " << visitor.numRows()<GetBatch(output, visitor.numRows()); + // std::cout<<"cout decoded:" << count << "num nulls " < 0 && output[i] == 0) { + // visitor.processNull(atEnd); + // nullCount--; + // } + // else { + // toSkip = visitor.process(output[i], atEnd); + // } + // //toSkip = visitor.process(output[i], atEnd); + // if (toSkip) { + // skip(toSkip, i, nulls); + // i += toSkip; + // } + // } + // if (atEnd) { + // return; + // } + if (!remainingValues_) { + readHeader(); + } + if (repeating_) { + toSkip = visitor.process(value_, atEnd); + } else { + value_ = readBitField(); + toSkip = visitor.process(value_, atEnd); + } + if (toSkip) { + skip(toSkip, current, nulls); + current += toSkip; + } + if(remainingValues_ == 0){ + return; + } + --remainingValues_; + } + ++current; + if (toSkip) { + skip(toSkip, current, nulls); + current += toSkip; + } + if (atEnd) { + return; + } + } + } + + private: + + int64_t readBitField() { + auto value = + dwio::common::safeLoadBits( + super::bufferStart_, bitOffset_, bitWidth_, lastSafeWord_) & + bitMask_; + bitOffset_ += bitWidth_; + super::bufferStart_ += bitOffset_ >> 3; + bitOffset_ &= 7; + return value; + } + bool readBoolean() { + + std::vector outputValues_; + outputValues_.resize(numValues_, 0); + uint8_t* output = outputValues_.data(); + decoder_->GetBatch(output, numValues_); + return true; + + // if (remainingBits_ == 0) { + // remainingBits_ = 7; + // reversedLastByte_ = *reinterpret_cast(bufferStart_); + // bufferStart_++; + // return reversedLastByte_ & 0x1; + // } else { + // return reversedLastByte_ & (1 << (8 - (remainingBits_--))); + // } + } + + // void decode() { + // if (bufferStart_ >= bufferEnd_) { + // remainingCount_ = 0; // No more runs + // return; + // } + + // // Read the run length (first byte) + // uint8_t runLength = *bufferStart_++; + // std::cout << "Read runLength: " << static_cast(runLength) << std::endl; + + + // // Read the boolean value (next byte) + // if (bufferStart_ >= bufferEnd_) { + // remainingCount_ = 0; // Not enough data for value + // return; + // } + + // currentValue_ = (*bufferStart_++ != 0); // 0 for false, non-zero for true + // std::cout << "Read currentValue: " << currentValue_ << std::endl; + // remainingCount_ = runLength; // Set the remaining count + // } + + void readFully(const std::vector& b, int off, int len, const uint8_t* repetitionLevels_) { + if (len < 0) { + VELOX_FAIL("Index Out of bounds exception"); + } + int n = 0; + while (n < len) { + // int count = in.read(b, off + n, len - n); + int count = repetitionLevels_[0]; + if (count < 0) { + VELOX_FAIL("End of file Exception"); + } + n += count; + } + } + + void unpack8Values(const std::vector& in, int inPos, std::vector& out, int outPos) { + out[ 0 + outPos] = + // [_______0] + // [0] + (static_cast(in[0 + inPos]) & 1); + out[ 1 + outPos] = + // [______1_] + // [0] + (static_cast(in[0 + inPos] >> 1) & 1); + out[ 2 + outPos] = + // [_____2__] + // [0] + (static_cast(in[0 + inPos] >> 2) & 1); + out[ 3 + outPos] = + // [____3___] + // [0] + (static_cast(in[0 + inPos] >> 3) & 1); + out[ 4 + outPos] = + // [___4____] + // [0] + (static_cast(in[0 + inPos] >> 4) & 1); + out[ 5 + outPos] = + // [__5_____] + // [0] + (static_cast(in[0 + inPos] >> 5) & 1); + out[ 6 + outPos] = + // [_6______] + // [0] + (static_cast(in[0 + inPos] >> 6) & 1); + out[ 7 + outPos] = + // [7_______] + // [0] + (static_cast(in[0 + inPos] >> 7) & 1); + } + + size_t remainingCount_{0}; + bool currentValue_{true}; + const char* bufferStart_; + const char* bufferEnd_; + int currentCount_ = 0; + uint32_t num_bytes = 0; + uint32_t numValues_ = 1024; + int32_t nullCount = 0; + // int32_t num_values_; + std::shared_ptr<::arrow::util::RleDecoder> decoder_; +}; + +} // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/tests/examples/bool.parquet b/velox/dwio/parquet/tests/examples/bool.parquet new file mode 100644 index 0000000000000..e1b5b6ec11c1d Binary files /dev/null and b/velox/dwio/parquet/tests/examples/bool.parquet differ diff --git a/velox/dwio/parquet/tests/examples/false.parquet b/velox/dwio/parquet/tests/examples/false.parquet new file mode 100644 index 0000000000000..a6f7cd4b4bce2 Binary files /dev/null and b/velox/dwio/parquet/tests/examples/false.parquet differ diff --git a/velox/dwio/parquet/tests/examples/moreBool.parquet b/velox/dwio/parquet/tests/examples/moreBool.parquet new file mode 100644 index 0000000000000..62936bff16eb1 Binary files /dev/null and b/velox/dwio/parquet/tests/examples/moreBool.parquet differ diff --git a/velox/dwio/parquet/tests/examples/plainBool.parquet b/velox/dwio/parquet/tests/examples/plainBool.parquet new file mode 100644 index 0000000000000..d0f7c3bb23619 Binary files /dev/null and b/velox/dwio/parquet/tests/examples/plainBool.parquet differ diff --git a/velox/dwio/parquet/tests/examples/sixtrue.parquet b/velox/dwio/parquet/tests/examples/sixtrue.parquet new file mode 100644 index 0000000000000..10729729ad401 Binary files /dev/null and b/velox/dwio/parquet/tests/examples/sixtrue.parquet differ diff --git a/velox/dwio/parquet/tests/examples/thirteenfalse.parquet b/velox/dwio/parquet/tests/examples/thirteenfalse.parquet new file mode 100644 index 0000000000000..3747ae6164f2b Binary files /dev/null and b/velox/dwio/parquet/tests/examples/thirteenfalse.parquet differ diff --git a/velox/dwio/parquet/tests/examples/true.parquet b/velox/dwio/parquet/tests/examples/true.parquet new file mode 100644 index 0000000000000..348f74b949835 Binary files /dev/null and b/velox/dwio/parquet/tests/examples/true.parquet differ diff --git a/velox/dwio/parquet/tests/examples/twelvetruesout.parquet b/velox/dwio/parquet/tests/examples/twelvetruesout.parquet new file mode 100644 index 0000000000000..fc7478cb87814 Binary files /dev/null and b/velox/dwio/parquet/tests/examples/twelvetruesout.parquet differ diff --git a/velox/dwio/parquet/tests/reader/BooleanDecoderTest.cpp b/velox/dwio/parquet/tests/reader/BooleanDecoderTest.cpp new file mode 100644 index 0000000000000..1badaa2dd11d0 --- /dev/null +++ b/velox/dwio/parquet/tests/reader/BooleanDecoderTest.cpp @@ -0,0 +1,125 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/dwio/common/BitPackDecoder.h" + +#include // @manual +#include + +#include +#include +#include +#include + +using namespace facebook::velox; +using namespace facebook::velox::dwio::common; + + +template +class BooleanDecoderTest { + public: + BooleanDecoderTest() { + inputValues_.resize(numValues_, 0); + outputValues_.resize(numValues_, 0); + encodedValues_.resize(numValues_ * 4, 0); + } + + BooleanDecoderTest(uint32_t numValues) : numValues_(numValues) { + VELOX_CHECK(numValues % 8 == 0); + + inputValues_.resize(numValues_, 0); + outputValues_.resize(numValues_, 0); + encodedValues_.resize(numValues_ * 4, 0); + } + + void testDecodeSuppliedData(std::vector inputValues, uint8_t bitWidth) { + numValues_ = inputValues.size(); + inputValues_ = inputValues; + bitWidth_ = bitWidth; + + encodeInputValues(); + testDecode(); + } + + private: + void testDecode() { + T* output = outputValues_.data(); + arrow::util::RleDecoder arrowDecoder( + reinterpret_cast(encodedValues_.data()), + bytes(bitWidth_), + bitWidth_); + int numOfDecodedElements = arrowDecoder.GetBatch(output, numValues_); + T* expectedOutput = inputValues_.data(); + for (int i = 0; i < numValues_; i++) { + ASSERT_EQ(output[i], expectedOutput[i]); + } + } + + void encodeInputValues() { + arrow::util::RleEncoder arrowEncoder( + reinterpret_cast(encodedValues_.data()), + bytes(bitWidth_), + bitWidth_); + for (auto i = 0; i < numValues_; i++) { + arrowEncoder.Put(inputValues_[i]); + } + arrowEncoder.Flush(); + } + + uint32_t bytes(uint8_t bitWidth) { + return (numValues_/(64/bitWidth_))*8 + bitWidth_ + arrow::util::RleEncoder::MinBufferSize(bitWidth_); + } + + // multiple of 8 + uint32_t numValues_ = 1024; + std::vector inputValues_; + std::vector outputValues_; + std::vector encodedValues_; + uint8_t bitWidth_; +}; + +TEST(BooleanDecoderTest, allOnes) { + std::vector allOnesVector(1024, 1); + BooleanDecoderTest test; + test.testDecodeSuppliedData(allOnesVector, 1); +} + +TEST(BooleanDecoderTest, allZeros) { + std::vector allZerosVector(1024, 0); + BooleanDecoderTest test; + test.testDecodeSuppliedData(allZerosVector, 1); +} + +TEST(BooleanDecoderTest, withNulls) { + // 0, 1, and 2 represent false, true, and null respectively + std::vector zeroOneNullsVector(520); + for(int i = 0; i < zeroOneNullsVector.size(); i++) { + zeroOneNullsVector[i] = std::rand() % 2; + } + zeroOneNullsVector[0] = 2; + zeroOneNullsVector[20] = 2; + BooleanDecoderTest test; + test.testDecodeSuppliedData(zeroOneNullsVector, 2); +} + +TEST(BooleanDecoderTest, zeroAndOnes) { + std::vector zeroOneVector(520); + for(int i = 0; i < zeroOneVector.size(); i++) { + zeroOneVector[i] = std::rand() % 2; + } + BooleanDecoderTest test; + test.testDecodeSuppliedData(zeroOneVector, 1); +} diff --git a/velox/dwio/parquet/tests/reader/CMakeLists.txt b/velox/dwio/parquet/tests/reader/CMakeLists.txt index b58429d73e933..393f9583ea7fa 100644 --- a/velox/dwio/parquet/tests/reader/CMakeLists.txt +++ b/velox/dwio/parquet/tests/reader/CMakeLists.txt @@ -107,4 +107,16 @@ if(${VELOX_ENABLE_ARROW}) velox_link_libs ${TEST_LINK_LIBS}) + add_executable(velox_dwio_parquet_boolean_decoder_test BooleanDecoderTest.cpp) + add_test( + NAME velox_dwio_parquet_boolean_decoder_test + COMMAND velox_dwio_parquet_boolean_decoder_test + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) + target_link_libraries( + velox_dwio_parquet_boolean_decoder_test + velox_dwio_native_parquet_reader + arrow + velox_link_libs + ${TEST_LINK_LIBS}) + endif() diff --git a/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp b/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp index 85853ae350c0c..be812f22dc238 100644 --- a/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp +++ b/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp @@ -730,6 +730,44 @@ TEST_F(E2EFilterTest, configurableWriteSchema) { test(type, newType); } +TEST_F(E2EFilterTest, booleanRle) { + options_.enableDictionary = false; + options_.encoding = + facebook::velox::parquet::arrow::Encoding::RLE; + options_.parquetDataPageVersion = "V2"; + + + // testWithTypes( + // "boolean_null:boolean", + // [&]() { makeAllNulls("boolean_null"); }, + // false, + // {}, + // 20); + + testWithTypes( + "boolean_val:boolean", + [&]() {}, + false, + {}, + 20); + + // testWithTypes( + // "boolean_val:boolean," + // "boolean_null:boolean", + // [&]() { makeAllNulls("boolean_null"); }, + // false, + // {"boolean_val"}, + // 20); + + // testWithTypes( + // "boolean_val:boolean," + // "boolean_null:boolean", + // [&]() { makeAllNulls("boolean_null"); }, + // true, + // {"boolean_val"}, + // 20); +} + // Define main so that gflags get processed. int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); diff --git a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp index 58f7bfdb80400..1055ccac4e36b 100644 --- a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp +++ b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp @@ -1147,6 +1147,158 @@ TEST_F(ParquetTableScanTest, deltaByteArray) { assertSelect({"a"}, "SELECT a from expected"); } +TEST_F(ParquetTableScanTest, rleBooleanParquetV2) { + loadData( + getExampleFilePath("bool.parquet"), + ROW({"c0", "c1", "c2"}, {BOOLEAN(), BOOLEAN(), BOOLEAN()}), + makeRowVector( + {"c0", "c1", "c2"}, + { + makeFlatVector(std::vector{true}), + makeFlatVector(std::vector{false}), + makeNullableFlatVector({std::nullopt}), + })); + std::shared_ptr c0 = makeColumnHandle( + "c0", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular); + std::shared_ptr c1 = makeColumnHandle( + "c1", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular); + std::shared_ptr c2 = makeColumnHandle( + "c2", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular); + + assertSelect({"c0"}, "SELECT c0 FROM tmp"); + assertSelect({"c1"}, "SELECT c1 FROM tmp"); + assertSelect({"c2"}, "SELECT c2 FROM tmp"); +} + +TEST_F(ParquetTableScanTest, rleBooleanTrue) { + loadData( + getExampleFilePath("true.parquet"), + ROW({"c0"}, {BOOLEAN()}), + makeRowVector( + {"c0"}, + { + makeFlatVector(std::vector{true}), + })); + std::shared_ptr c0 = makeColumnHandle( + "c0", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular); + + assertSelect({"c0"}, "SELECT c0 FROM tmp"); +} + + +TEST_F(ParquetTableScanTest, rleBooleanFalse) { + loadData( + getExampleFilePath("false.parquet"), + ROW({"c0"}, {BOOLEAN()}), + makeRowVector( + {"c0"}, + { + makeFlatVector(std::vector{false}) + })); + std::shared_ptr c0 = makeColumnHandle( + "c0", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular); + + assertSelect({"c0"}, "SELECT c0 FROM tmp"); +} + +TEST_F(ParquetTableScanTest, sixTrue) { + loadData( + getExampleFilePath("sixtrue.parquet"), + ROW({"c0"}, {BOOLEAN()}), + makeRowVector( + {"c0"}, + { + makeFlatVector(std::vector{true, true, true, true, true, true}), + })); + std::shared_ptr c0 = makeColumnHandle( + "c0", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular); + + assertSelect({"c0"}, "SELECT c0 FROM tmp"); +} + +TEST_F(ParquetTableScanTest, thirteenFalse) { + loadData( + getExampleFilePath("thirteenfalse.parquet"), + ROW({"c0"}, {BOOLEAN()}), + makeRowVector( + {"c0"}, + { + makeFlatVector(std::vector{false, false, false, false, false, false, false, false, false, false, false, false, false}), + })); + std::shared_ptr c0 = makeColumnHandle( + "c0", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular); + + assertSelect({"c0"}, "SELECT c0 FROM tmp"); +} + +TEST_F(ParquetTableScanTest, moreBool) { + loadData( + getExampleFilePath("morebool.parquet"), + ROW({"c0"}, {BOOLEAN()}), + makeRowVector( + {"c0"}, + { + makeFlatVector(std::vector{false, true, false, true, + true, false, false, true, + false, false, true, true, + false, true, true, true, + false, true, false, false, + true, true}), + })); + std::shared_ptr c0 = makeColumnHandle( + "c0", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular); + + assertSelect({"c0"}, "SELECT c0 FROM tmp"); +} + +TEST_F(ParquetTableScanTest, twelveTruesuncompressed) { + loadData( + getExampleFilePath("twelvetruesout.parquet"), + ROW({"c0"}, {BOOLEAN()}), + makeRowVector( + {"c0"}, + { + makeFlatVector(std::vector{true, true, true, true, + true, true, true, true, true, true, true, true}), + })); + std::shared_ptr c0 = makeColumnHandle( + "c0", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular); + + assertSelect({"c0"}, "SELECT c0 FROM tmp"); +} + +TEST_F(ParquetTableScanTest, sixNulls) { + loadData( + getExampleFilePath("sixnulls.parquet"), + ROW({"c0"}, {BOOLEAN()}), + makeRowVector( + {"c0"}, + { + makeNullableFlatVector({std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt}), + })); + std::shared_ptr c0 = makeColumnHandle( + "c0", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular); + + assertSelect({"c0"}, "SELECT c0 FROM tmp"); +} + +TEST_F(ParquetTableScanTest, trueFalseNull) { + loadData( + getExampleFilePath("tfn.parquet"), + ROW({"c0"}, {BOOLEAN()}), + makeRowVector( + {"c0"}, + { + makeNullableFlatVector({std::nullopt, true, std::nullopt, std::nullopt, true, true, + std::nullopt, true, std::nullopt, std::nullopt, false, false, std::nullopt, true, std::nullopt, true, false, false, true, true, + false, true, std::nullopt, true, true, false, true, false}), + })); + std::shared_ptr c0 = makeColumnHandle( + "c0", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular); + + assertSelect({"c0"}, "SELECT c0 FROM tmp"); +} + int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); folly::Init init{&argc, &argv, false}; diff --git a/velox/dwio/parquet/writer/Writer.cpp b/velox/dwio/parquet/writer/Writer.cpp index bce6919a7cc63..ec868975ac766 100644 --- a/velox/dwio/parquet/writer/Writer.cpp +++ b/velox/dwio/parquet/writer/Writer.cpp @@ -147,6 +147,10 @@ std::shared_ptr getArrowParquetWriterOptions( static_cast(flushPolicy->rowsInRowGroup())); properties = properties->codec_options(options.codecOptions); properties = properties->enable_store_decimal_as_integer(); + if (options.parquetDataPageVersion == "V2") { + properties = properties->data_page_version( + facebook::velox::parquet::arrow::ParquetDataPageVersion::V2); + } return properties->build(); } diff --git a/velox/dwio/parquet/writer/Writer.h b/velox/dwio/parquet/writer/Writer.h index 26969a59d52fb..7adc3ed8dd7ca 100644 --- a/velox/dwio/parquet/writer/Writer.h +++ b/velox/dwio/parquet/writer/Writer.h @@ -108,7 +108,7 @@ struct WriterOptions : public dwio::common::WriterOptions { /// Timestamp time zone for Parquet write through Arrow bridge. std::optional parquetWriteTimestampTimeZone; bool writeInt96AsTimestamp = false; - + std::optional parquetDataPageVersion = std::nullopt; // Parsing session and hive configs. // This isn't a typo; session and hive connector config names are different