diff --git a/velox/dwio/parquet/reader/DeltaBpDecoder.h b/velox/dwio/parquet/reader/DeltaBpDecoder.h index f2833bbf80a6..59ec359ce433 100644 --- a/velox/dwio/parquet/reader/DeltaBpDecoder.h +++ b/velox/dwio/parquet/reader/DeltaBpDecoder.h @@ -86,6 +86,14 @@ class DeltaBpDecoder { return static_cast(totalValuesRemaining_); } + template + void readValues(T* values, int32_t numValues) { + VELOX_DCHECK_LE(numValues, totalValuesRemaining_); + for (auto i = 0; i < numValues; i++) { + values[i] = T(readLong()); + } + } + private: bool getVlqInt(uint64_t& v) { uint64_t tmp = 0; diff --git a/velox/dwio/parquet/reader/DeltaByteArrayDecoder.h b/velox/dwio/parquet/reader/DeltaByteArrayDecoder.h new file mode 100644 index 000000000000..de440d82e81b --- /dev/null +++ b/velox/dwio/parquet/reader/DeltaByteArrayDecoder.h @@ -0,0 +1,183 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/common/base/BitUtil.h" +#include "velox/dwio/parquet/reader/DeltaBpDecoder.h" + +namespace facebook::velox::parquet { + +// DeltaByteArrayDecoder is adapted from Apache Arrow: +// https://github.com/apache/arrow/blob/apache-arrow-15.0.0/cpp/src/parquet/encoding.cc#L2758-L2889 +class DeltaLengthByteArrayDecoder { + public: + explicit DeltaLengthByteArrayDecoder(const char* start) { + lengthDecoder_ = std::make_unique(start); + decodeLengths(); + bufferStart_ = lengthDecoder_->bufferStart(); + } + + std::string_view readString() { + int32_t dataSize = 0; + const int64_t length = bufferedLength_[lengthIdx_++]; + VELOX_CHECK_GE(length, 0, "negative string delta length"); + bufferStart_ += length; + return std::string_view(bufferStart_ - length, length); + } + + private: + void decodeLengths() { + int64_t numLength = lengthDecoder_->validValuesCount(); + bufferedLength_.resize(numLength); + lengthDecoder_->readValues(bufferedLength_.data(), numLength); + + lengthIdx_ = 0; + numValidValues_ = numLength; + } + + const char* bufferStart_; + std::unique_ptr lengthDecoder_; + int32_t numValidValues_{0}; + uint32_t lengthIdx_{0}; + std::vector bufferedLength_; +}; + +// DeltaByteArrayDecoder is adapted from Apache Arrow: +// https://github.com/apache/arrow/blob/apache-arrow-15.0.0/cpp/src/parquet/encoding.cc#L3301-L3545 +class DeltaByteArrayDecoder { + public: + explicit DeltaByteArrayDecoder(const char* start) { + prefixLenDecoder_ = std::make_unique(start); + int64_t numPrefix = prefixLenDecoder_->validValuesCount(); + bufferedPrefixLength_.resize(numPrefix); + prefixLenDecoder_->readValues( + bufferedPrefixLength_.data(), numPrefix); + prefixLenOffset_ = 0; + numValidValues_ = numPrefix; + + suffixDecoder_ = std::make_unique( + prefixLenDecoder_->bufferStart()); + } + + void skip(uint64_t numValues) { + skip(numValues, 0, nullptr); + } + + template + inline void skip(int32_t numValues, int32_t current, const uint64_t* nulls) { + if (hasNulls) { + numValues = bits::countNonNulls(nulls, current, current + numValues); + } + for (int32_t i = 0; i < numValues; ++i) { + readString(); + } + } + + template + void readWithVisitor(const uint64_t* nulls, Visitor visitor) { + int32_t current = visitor.start(); + skip(current, 0, nulls); + int32_t toSkip; + bool atEnd = false; + const bool allowNulls = hasNulls && visitor.allowNulls(); + for (;;) { + if (hasNulls && allowNulls && bits::isBitNull(nulls, current)) { + toSkip = visitor.processNull(atEnd); + } else { + if (hasNulls && !allowNulls) { + toSkip = visitor.checkAndSkipNulls(nulls, current, atEnd); + if (!Visitor::dense) { + skip(toSkip, current, nullptr); + } + if (atEnd) { + return; + } + } + + // We are at a non-null value on a row to visit. + toSkip = visitor.process(readString(), atEnd); + } + ++current; + if (toSkip) { + skip(toSkip, current, nulls); + current += toSkip; + } + if (atEnd) { + return; + } + } + } + + std::string_view readString() { + auto suffix = suffixDecoder_->readString(); + bool isFirstRun = (prefixLenOffset_ == 0); + const int64_t prefixLength = bufferedPrefixLength_[prefixLenOffset_++]; + + VELOX_CHECK_GE( + prefixLength, 0, "negative prefix length in DELTA_BYTE_ARRAY"); + + buildReadValue(isFirstRun, prefixLength, suffix); + + numValidValues_--; + return {lastValue_}; + } + + private: + void buildReadValue( + bool isFirstRun, + const int64_t prefixLength, + std::string_view suffix) { + VELOX_CHECK_LE( + prefixLength, + lastValue_.size(), + "prefix length too large in DELTA_BYTE_ARRAY"); + + if (prefixLength == 0) { + // prefix is empty. + lastValue_ = std::string{suffix}; + return; + } + + if (!isFirstRun) { + if (suffix.empty()) { + // suffix is empty: read value can simply point to the prefix + // of the lastValue_. This is not possible for the first run since + // the prefix would point to the mutable `lastValue_`. + lastValue_ = lastValue_.substr(0, prefixLength); + return; + } + } + + lastValue_.resize(prefixLength + suffix.size()); + + // Both prefix and suffix are non-empty, so we need to decode the string + // into read value. + // Just keep the prefix in lastValue_, and copy the suffix. + memcpy(lastValue_.data() + prefixLength, suffix.data(), suffix.size()); + } + + std::unique_ptr prefixLenDecoder_; + std::unique_ptr suffixLenDecoder_; + std::unique_ptr suffixDecoder_; + + std::string lastValue_; + int32_t numValidValues_{0}; + uint32_t prefixLenOffset_{0}; + std::vector bufferedPrefixLength_; +}; + +} // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp index c185358470e6..7529bef7c2a5 100644 --- a/velox/dwio/parquet/reader/PageReader.cpp +++ b/velox/dwio/parquet/reader/PageReader.cpp @@ -699,6 +699,13 @@ void PageReader::makeDecoder() { "DELTA_BINARY_PACKED decoder only supports INT32 and INT64"); } break; + case Encoding::DELTA_BYTE_ARRAY: + if (parquetType == thrift::Type::BYTE_ARRAY) { + deltaByteArrDecoder_ = + std::make_unique(pageData_); + break; + } + FMT_FALLTHROUGH; default: VELOX_UNSUPPORTED("Encoding not supported yet: {}", encoding_); } @@ -739,6 +746,8 @@ void PageReader::skip(int64_t numRows) { booleanDecoder_->skip(toSkip); } else if (deltaBpDecoder_) { deltaBpDecoder_->skip(toSkip); + } else if (deltaByteArrDecoder_) { + deltaByteArrDecoder_->skip(toSkip); } else { VELOX_FAIL("No decoder to skip"); } diff --git a/velox/dwio/parquet/reader/PageReader.h b/velox/dwio/parquet/reader/PageReader.h index 4813f84d01d4..94d107109b5f 100644 --- a/velox/dwio/parquet/reader/PageReader.h +++ b/velox/dwio/parquet/reader/PageReader.h @@ -23,6 +23,7 @@ #include "velox/dwio/common/compression/Compression.h" #include "velox/dwio/parquet/reader/BooleanDecoder.h" #include "velox/dwio/parquet/reader/DeltaBpDecoder.h" +#include "velox/dwio/parquet/reader/DeltaByteArrayDecoder.h" #include "velox/dwio/parquet/reader/ParquetTypeWithId.h" #include "velox/dwio/parquet/reader/RleBpDataDecoder.h" #include "velox/dwio/parquet/reader/StringDecoder.h" @@ -131,6 +132,10 @@ class PageReader { return encoding_ == thrift::Encoding::DELTA_BINARY_PACKED; } + bool isDeltaByteArray() const { + return encoding_ == thrift::Encoding::DELTA_BYTE_ARRAY; + } + /// Returns the range of repdefs for the top level rows covered by the last /// decoderepDefs(). std::pair repDefRange() const { @@ -305,6 +310,9 @@ class PageReader { nullsFromFastPath = dwio::common::useFastPath(visitor); auto dictVisitor = visitor.toStringDictionaryColumnVisitor(); dictionaryIdDecoder_->readWithVisitor(nulls, dictVisitor); + } else if (encoding_ == thrift::Encoding::DELTA_BYTE_ARRAY) { + nullsFromFastPath = false; + deltaByteArrDecoder_->readWithVisitor(nulls, visitor); } else { nullsFromFastPath = false; stringDecoder_->readWithVisitor(nulls, visitor); @@ -313,6 +321,8 @@ class PageReader { if (isDictionary()) { auto dictVisitor = visitor.toStringDictionaryColumnVisitor(); dictionaryIdDecoder_->readWithVisitor(nullptr, dictVisitor); + } else if (encoding_ == thrift::Encoding::DELTA_BYTE_ARRAY) { + deltaByteArrDecoder_->readWithVisitor(nulls, visitor); } else { stringDecoder_->readWithVisitor(nulls, visitor); } @@ -489,6 +499,7 @@ class PageReader { std::unique_ptr stringDecoder_; std::unique_ptr booleanDecoder_; std::unique_ptr deltaBpDecoder_; + std::unique_ptr deltaByteArrDecoder_; // Add decoders for other encodings here. }; diff --git a/velox/dwio/parquet/reader/ParquetData.h b/velox/dwio/parquet/reader/ParquetData.h index 739e62afc86a..fe8020f57c65 100644 --- a/velox/dwio/parquet/reader/ParquetData.h +++ b/velox/dwio/parquet/reader/ParquetData.h @@ -192,6 +192,10 @@ class ParquetData : public dwio::common::FormatData { return reader_->isDeltaBinaryPacked(); } + bool isDeltaByteArray() const { + return reader_->isDeltaByteArray(); + } + bool parentNullsInLeaves() const override { return true; } diff --git a/velox/dwio/parquet/reader/StringColumnReader.h b/velox/dwio/parquet/reader/StringColumnReader.h index 4215d217afef..4f60220efa21 100644 --- a/velox/dwio/parquet/reader/StringColumnReader.h +++ b/velox/dwio/parquet/reader/StringColumnReader.h @@ -31,7 +31,8 @@ class StringColumnReader : public dwio::common::SelectiveColumnReader { bool hasBulkPath() const override { // Non-dictionary encodings do not have fast path. - return scanState_.dictionary.values != nullptr; + return !formatData_->as().isDeltaByteArray() && + scanState_.dictionary.values != nullptr; } void seekToRowGroup(int64_t index) override { diff --git a/velox/dwio/parquet/tests/examples/delta_byte_array.parquet b/velox/dwio/parquet/tests/examples/delta_byte_array.parquet new file mode 100644 index 000000000000..c81d0094fed1 Binary files /dev/null and b/velox/dwio/parquet/tests/examples/delta_byte_array.parquet differ diff --git a/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp b/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp index 0b27395f9a00..85853ae350c0 100644 --- a/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp +++ b/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp @@ -477,6 +477,23 @@ TEST_F(E2EFilterTest, stringDictionary) { 20); } +TEST_F(E2EFilterTest, stringDeltaByteArray) { + options_.enableDictionary = false; + options_.encoding = + facebook::velox::parquet::arrow::Encoding::DELTA_BYTE_ARRAY; + + testWithTypes( + "string_val:string," + "string_val_2:string", + [&]() { + makeStringUnique("string_val"); + makeStringUnique("string_val_2"); + }, + true, + {"string_val", "string_val_2"}, + 20); +} + TEST_F(E2EFilterTest, dedictionarize) { rowsInRowGroup_ = 10'000; options_.dictionaryPageSizeLimit = 20'000; diff --git a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp index 649f1ad10d8f..58f7bfdb8040 100644 --- a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp +++ b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp @@ -1134,6 +1134,19 @@ TEST_F(ParquetTableScanTest, readParquetColumnsByIndex) { VeloxRuntimeError); } +TEST_F(ParquetTableScanTest, deltaByteArray) { + auto a = makeFlatVector({"axis", "axle", "babble", "babyhood"}); + auto expected = makeRowVector({"a"}, {a}); + createDuckDbTable("expected", {expected}); + + auto vector = makeFlatVector({{}}); + loadData( + getExampleFilePath("delta_byte_array.parquet"), + ROW({"a"}, {VARCHAR()}), + makeRowVector({"a"}, {vector})); + assertSelect({"a"}, "SELECT a from expected"); +} + int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); folly::Init init{&argc, &argv, false};