diff --git a/velox/dwio/parquet/common/CMakeLists.txt b/velox/dwio/parquet/common/CMakeLists.txt index dd162818f703c..98e38965c9344 100644 --- a/velox/dwio/parquet/common/CMakeLists.txt +++ b/velox/dwio/parquet/common/CMakeLists.txt @@ -12,12 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -add_library( - velox_dwio_native_parquet_common BloomFilter.cpp XxHasher.cpp - LevelComparison.cpp LevelConversion.cpp) +add_library(velox_dwio_parquet_common BloomFilter.cpp XxHasher.cpp + LevelComparison.cpp LevelConversion.cpp) target_link_libraries( - velox_dwio_native_parquet_common + velox_dwio_parquet_common velox_dwio_parquet_thrift velox_type velox_dwio_common diff --git a/velox/dwio/parquet/common/LevelComparison.cpp b/velox/dwio/parquet/common/LevelComparison.cpp index 12b0e07204839..a59edfd71bb8d 100644 --- a/velox/dwio/parquet/common/LevelComparison.cpp +++ b/velox/dwio/parquet/common/LevelComparison.cpp @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,6 +17,9 @@ // Adapted from Apache Arrow. #include "velox/dwio/parquet/common/LevelComparison.h" + +#include <limits> + #include "arrow/util/endian.h" namespace facebook::velox::parquet { @@ -32,16 +35,6 @@ LevelsToBitmap(const int16_t* levels, int64_t numLevels, Predicate predicate) { return ::arrow::bit_util::ToLittleEndian(mask); } -inline MinMax FindMinMaxImpl(const int16_t* levels, int64_t numLevels) { - MinMax out{ - std::numeric_limits<int16_t>::max(), std::numeric_limits<int16_t>::min()}; - for (int x = 0; x < numLevels; x++) { - out.min = std::min(levels[x], out.min); - out.max = std::max(levels[x], out.max); - } - return out; -} - } // namespace uint64_t @@ -51,7 +44,13 @@ GreaterThanBitmap(const int16_t* levels, int64_t numLevels, int16_t rhs) { } MinMax FindMinMax(const int16_t* levels, int64_t numLevels) { - return FindMinMaxImpl(levels, numLevels); + MinMax out{ + std::numeric_limits<int16_t>::max(), std::numeric_limits<int16_t>::min()}; + for (int x = 0; x < numLevels; x++) { + out.min = std::min(levels[x], out.min); + out.max = std::max(levels[x], out.max); + } + return out; } } // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/common/LevelConversion.cpp b/velox/dwio/parquet/common/LevelConversion.cpp index 45fc0b10bd155..9c63095b0731b 100644 --- a/velox/dwio/parquet/common/LevelConversion.cpp +++ b/velox/dwio/parquet/common/LevelConversion.cpp @@ -17,51 +17,45 @@ // Adapted from Apache Arrow. #include "velox/dwio/parquet/common/LevelConversion.h" -#include "velox/dwio/parquet/common/LevelConversionUtil.h" -#include <algorithm> #include <limits> #include <optional> #include "arrow/util/bit_run_reader.h" #include "arrow/util/bit_util.h" #include "arrow/util/bitmap_writer.h" -#include "arrow/util/cpu_info.h" -#include "arrow/util/logging.h" + #include "velox/common/base/Exceptions.h" -#include "velox/dwio/parquet/common/LevelComparison.h" +#include "velox/dwio/parquet/common/LevelConversionUtil.h" namespace facebook::velox::parquet { namespace { -using ::arrow::internal::CpuInfo; -using ::std::optional; - template <typename OffsetType> void DefRepLevelsToListInfo( - const int16_t* def_levels, - const int16_t* rep_levels, - int64_t num_def_levels, - LevelInfo level_info, + const int16_t* defLevels, + const int16_t* repLevels, + int64_t num_defLevels, + LevelInfo levelInfo, ValidityBitmapInputOutput* output, OffsetType* offsets) { OffsetType* orig_pos = offsets; - optional<::arrow::internal::FirstTimeBitmapWriter> valid_bits_writer; - if (output->valid_bits) { - valid_bits_writer.emplace( - output->valid_bits, - output->valid_bits_offset, - output->values_read_upper_bound); + std::optional<::arrow::internal::FirstTimeBitmapWriter> validBitsWriter; + if (output->validBits) { + validBitsWriter.emplace( + output->validBits, + output->validBitsOffset, + output->valuesReadUpperBound); } - for (int x = 0; x < num_def_levels; x++) { + for (int x = 0; x < num_defLevels; x++) { // Skip items that belong to empty or null ancestor lists and further nested // lists. - if (def_levels[x] < level_info.repeated_ancestor_def_level || - rep_levels[x] > level_info.rep_level) { + if (defLevels[x] < levelInfo.repeatedAncestorDefLevel || + repLevels[x] > levelInfo.repLevel) { continue; } - if (rep_levels[x] == level_info.rep_level) { + if (repLevels[x] == levelInfo.repLevel) { // A continuation of an existing list. // offsets can be null for structs with repeated children (we don't need // to know offsets until we get to the children). @@ -74,14 +68,15 @@ void DefRepLevelsToListInfo( } } else { if (ARROW_PREDICT_FALSE( - (valid_bits_writer.has_value() && - valid_bits_writer->position() >= - output->values_read_upper_bound) || - (offsets - orig_pos) >= output->values_read_upper_bound)) { - VELOX_FAIL("Definition levels exceeded upper bound: {}", output->values_read_upper_bound); + (validBitsWriter.has_value() && + validBitsWriter->position() >= output->valuesReadUpperBound) || + (offsets - orig_pos) >= output->valuesReadUpperBound)) { + VELOX_FAIL( + "Definition levels exceeded upper bound: {}", + output->valuesReadUpperBound); } - // current_rep < list rep_level i.e. start of a list (ancestor empty lists + // current_rep < list repLevel i.e. start of a list (ancestor empty lists // are filtered out above). offsets can be null for structs with repeated // children (we don't need to know offsets until we get to the children). if (offsets != nullptr) { @@ -90,7 +85,7 @@ void DefRepLevelsToListInfo( // than fixed size lists so it should be cheaper to make these // cumulative and subtract when validating fixed size lists. *offsets = *(offsets - 1); - if (def_levels[x] >= level_info.def_level) { + if (defLevels[x] >= levelInfo.defLevel) { if (ARROW_PREDICT_FALSE( *offsets == std::numeric_limits<OffsetType>::max())) { VELOX_FAIL("List index overflow."); @@ -99,30 +94,30 @@ void DefRepLevelsToListInfo( } } - if (valid_bits_writer.has_value()) { - // the level_info def level for lists reflects element present level. + if (validBitsWriter.has_value()) { + // the levelInfo def level for lists reflects element present level. // the prior level distinguishes between empty lists. - if (def_levels[x] >= level_info.def_level - 1) { - valid_bits_writer->Set(); + if (defLevels[x] >= levelInfo.defLevel - 1) { + validBitsWriter->Set(); } else { - output->null_count++; - valid_bits_writer->Clear(); + output->nullCount++; + validBitsWriter->Clear(); } - valid_bits_writer->Next(); + validBitsWriter->Next(); } } } - if (valid_bits_writer.has_value()) { - valid_bits_writer->Finish(); + if (validBitsWriter.has_value()) { + validBitsWriter->Finish(); } if (offsets != nullptr) { - output->values_read = offsets - orig_pos; - } else if (valid_bits_writer.has_value()) { - output->values_read = valid_bits_writer->position(); + output->valuesRead = offsets - orig_pos; + } else if (validBitsWriter.has_value()) { + output->valuesRead = validBitsWriter->position(); } - if (output->null_count > 0 && level_info.null_slot_usage > 1) { + if (output->nullCount > 0 && levelInfo.nullSlotUsage > 1) { VELOX_FAIL( - "Null values with null_slot_usage > 1 not supported." + "Null values with nullSlotUsage > 1 not supported." "(i.e. FixedSizeLists with null values are not supported)"); } } @@ -130,18 +125,18 @@ void DefRepLevelsToListInfo( } // namespace void DefLevelsToBitmap( - const int16_t* def_levels, - int64_t num_def_levels, - LevelInfo level_info, + const int16_t* defLevels, + int64_t num_defLevels, + LevelInfo levelInfo, ValidityBitmapInputOutput* output) { - // It is simpler to rely on rep_level here until PARQUET-1899 is done and the + // It is simpler to rely on repLevel here until PARQUET-1899 is done and the // code is deleted in a follow-up release. - if (level_info.rep_level > 0) { + if (levelInfo.repLevel > 0) { DefLevelsToBitmapSimd</*has_repeated_parent=*/true>( - def_levels, num_def_levels, level_info, output); + defLevels, num_defLevels, levelInfo, output); } else { DefLevelsToBitmapSimd</*has_repeated_parent=*/false>( - def_levels, num_def_levels, level_info, output); + defLevels, num_defLevels, levelInfo, output); } } @@ -150,44 +145,44 @@ uint64_t TestOnlyExtractBitsSoftware(uint64_t bitmap, uint64_t select_bitmap) { } void DefRepLevelsToList( - const int16_t* def_levels, - const int16_t* rep_levels, - int64_t num_def_levels, - LevelInfo level_info, + const int16_t* defLevels, + const int16_t* repLevels, + int64_t num_defLevels, + LevelInfo levelInfo, ValidityBitmapInputOutput* output, int32_t* offsets) { DefRepLevelsToListInfo<int32_t>( - def_levels, rep_levels, num_def_levels, level_info, output, offsets); + defLevels, repLevels, num_defLevels, levelInfo, output, offsets); } void DefRepLevelsToList( - const int16_t* def_levels, - const int16_t* rep_levels, - int64_t num_def_levels, - LevelInfo level_info, + const int16_t* defLevels, + const int16_t* repLevels, + int64_t num_defLevels, + LevelInfo levelInfo, ValidityBitmapInputOutput* output, int64_t* offsets) { DefRepLevelsToListInfo<int64_t>( - def_levels, rep_levels, num_def_levels, level_info, output, offsets); + defLevels, repLevels, num_defLevels, levelInfo, output, offsets); } void DefRepLevelsToBitmap( - const int16_t* def_levels, - const int16_t* rep_levels, - int64_t num_def_levels, - LevelInfo level_info, + const int16_t* defLevels, + const int16_t* repLevels, + int64_t num_defLevels, + LevelInfo levelInfo, ValidityBitmapInputOutput* output) { // DefRepLevelsToListInfo assumes it for the actual list method and this // method is for parent structs, so we need to bump def and ref level. - level_info.rep_level += 1; - level_info.def_level += 1; + levelInfo.repLevel += 1; + levelInfo.defLevel += 1; DefRepLevelsToListInfo<int32_t>( - def_levels, - rep_levels, - num_def_levels, - level_info, + defLevels, + repLevels, + num_defLevels, + levelInfo, output, /*offsets=*/nullptr); } -} // namespace facebook::velox::parquet::arrow +} // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/common/LevelConversion.h b/velox/dwio/parquet/common/LevelConversion.h index 8706052a374c1..c3c475244ef01 100644 --- a/velox/dwio/parquet/common/LevelConversion.h +++ b/velox/dwio/parquet/common/LevelConversion.h @@ -24,29 +24,29 @@ namespace facebook::velox::parquet { struct LevelInfo { LevelInfo() - : null_slot_usage(1), - def_level(0), - rep_level(0), - repeated_ancestor_def_level(0) {} + : nullSlotUsage(1), + defLevel(0), + repLevel(0), + repeatedAncestorDefLevel(0) {} LevelInfo( int32_t null_slots, - int32_t definition_level, - int32_t repetition_level, - int32_t repeated_ancestor_definition_level) - : null_slot_usage(null_slots), - def_level(static_cast<int16_t>(definition_level)), - rep_level(static_cast<int16_t>(repetition_level)), - repeated_ancestor_def_level( - static_cast<int16_t>(repeated_ancestor_definition_level)) {} + int32_t definitionLevel, + int32_t repetitionLevel, + int32_t repeatedAncestorDefinitionLevel) + : nullSlotUsage(null_slots), + defLevel(static_cast<int16_t>(definitionLevel)), + repLevel(static_cast<int16_t>(repetitionLevel)), + repeatedAncestorDefLevel( + static_cast<int16_t>(repeatedAncestorDefinitionLevel)) {} bool operator==(const LevelInfo& b) const { - return null_slot_usage == b.null_slot_usage && def_level == b.def_level && - rep_level == b.rep_level && - repeated_ancestor_def_level == b.repeated_ancestor_def_level; + return nullSlotUsage == b.nullSlotUsage && defLevel == b.defLevel && + repLevel == b.repLevel && + repeatedAncestorDefLevel == b.repeatedAncestorDefLevel; } bool HasNullableValues() const { - return repeated_ancestor_def_level < def_level; + return repeatedAncestorDefLevel < defLevel; } // How many slots an undefined but present (i.e. null) element in @@ -54,7 +54,7 @@ struct LevelInfo { // "Slot" is used in the same context as the Arrow specification // (i.e. a value holder). // This is only ever >1 for descendents of FixedSizeList. - int32_t null_slot_usage = 1; + int32_t nullSlotUsage = 1; // The definition level at which the value for the field // is considered not null (definition levels greater than @@ -62,18 +62,18 @@ struct LevelInfo { // value for the field). For list fields definition levels // greater than or equal to this field indicate a present, // possibly null, child value. - int16_t def_level = 0; + int16_t defLevel = 0; // The repetition level corresponding to this element // or the closest repeated ancestor. Any repetition // level less than this indicates either a new list OR // an empty list (which is determined in conjunction // with definition levels). - int16_t rep_level = 0; + int16_t repLevel = 0; // The definition level indicating the level at which the closest // repeated ancestor is not empty. This is used to discriminate - // between a value less than |def_level| being null or excluded entirely. + // between a value less than |defLevel| being null or excluded entirely. // For instance if we have an arrow schema like: // list(struct(f0: int)). Then then there are the following // definition levels: @@ -83,105 +83,93 @@ struct LevelInfo { // 3 = a non null struct but null integer. // 4 = a present integer. // When reconstructing, the struct and integer arrays' - // repeated_ancestor_def_level would be 2. Any - // def_level < 2 indicates that there isn't a corresponding + // repeatedAncestorDefLevel would be 2. Any + // defLevel < 2 indicates that there isn't a corresponding // child value in the list. // i.e. [null, [], [null], [{f0: null}], [{f0: 1}]] // has the def levels [0, 1, 2, 3, 4]. The actual // struct array is only of length 3: [not-set, set, set] and // the int array is also of length 3: [N/A, null, 1]. // - int16_t repeated_ancestor_def_level = 0; + int16_t repeatedAncestorDefLevel = 0; /// Increments level for a optional node. void IncrementOptional() { - def_level++; + defLevel++; } /// Increments levels for the repeated node. Returns - /// the previous ancestor_list_def_level. + /// the previous ancestor_list_defLevel. int16_t IncrementRepeated() { - int16_t last_repeated_ancestor = repeated_ancestor_def_level; + int16_t lastRepeatedAncestor = repeatedAncestorDefLevel; // Repeated fields add both a repetition and definition level. This is used // to distinguish between an empty list and a list with an item in it. - ++rep_level; - ++def_level; - // For levels >= repeated_ancenstor_def_level it indicates the list was + ++repLevel; + ++defLevel; + // For levels >= repeated_ancenstor_defLevel it indicates the list was // non-null and had at least one element. This is important // for later decoding because we need to add a slot for these - // values. for levels < current_def_level no slots are added + // values. for levels < current_defLevel no slots are added // to arrays. - repeated_ancestor_def_level = def_level; - return last_repeated_ancestor; + repeatedAncestorDefLevel = defLevel; + return lastRepeatedAncestor; } -/* - friend std::ostream& operator<<(std::ostream& os, const LevelInfo& levels) { - // This print method is to silence valgrind issues. What's printed - // is not important because all asserts happen directly on - // members. - os << "{def=" << levels.def_level << ", rep=" << levels.rep_level - << ", repeated_ancestor_def=" << levels.repeated_ancestor_def_level; - if (levels.null_slot_usage > 1) { - os << ", null_slot_usage=" << levels.null_slot_usage; - } - os << "}"; - return os; - }*/ }; // Input/Output structure for reconstructed validity bitmaps. struct ValidityBitmapInputOutput { // Input only. - // The maximum number of values_read expected (actual + // The maximum number of valuesRead expected (actual // values read must be less than or equal to this value). // If this number is exceeded methods will throw a // ParquetException. Exceeding this limit indicates // either a corrupt or incorrectly written file. - int64_t values_read_upper_bound = 0; + int64_t valuesReadUpperBound = 0; // Output only. The number of values added to the encountered // (this is logically the count of the number of elements // for an Arrow array). - int64_t values_read = 0; + int64_t valuesRead = 0; // Input/Output. The number of nulls encountered. - int64_t null_count = 0; + int64_t nullCount = 0; // Output only. The validity bitmap to populate. Maybe be null only // for DefRepLevelsToListInfo (if all that is needed is list offsets). - uint8_t* valid_bits = nullptr; - // Input only, offset into valid_bits to start at. - int64_t valid_bits_offset = 0; + uint8_t* validBits = nullptr; + // Input only, offset into validBits to start at. + int64_t validBitsOffset = 0; }; -// Converts def_levels to validity bitmaps for non-list arrays and structs that +// Converts defLevels to validity bitmaps for non-list arrays and structs that // have at least one member that is not a list and has no list descendents. For // lists use DefRepLevelsToList and structs where all descendants contain a // list use DefRepLevelsToBitmap. void DefLevelsToBitmap( - const int16_t* def_levels, - int64_t num_def_levels, - LevelInfo level_info, + const int16_t* defLevels, + int64_t numDefLevels, + LevelInfo levelInfo, ValidityBitmapInputOutput* output); // Reconstructs a validity bitmap and list offsets for a list arrays based on // def/rep levels. The first element of offsets will not be modified if -// rep_levels starts with a new list. The first element of offsets will be used +// repLevels starts with a new list. The first element of offsets will be used // when calculating the next offset. See documentation onf DefLevelsToBitmap // for when to use this method vs the other ones in this file for // reconstruction. // -// Offsets must be sized to 1 + values_read_upper_bound. +// Offsets must be sized to 1 + valuesReadUpperBound. void DefRepLevelsToList( - const int16_t* def_levels, - const int16_t* rep_levels, - int64_t num_def_levels, - LevelInfo level_info, + const int16_t* defLevels, + const int16_t* repLevels, + int64_t numDefLevels, + LevelInfo levelInfo, ValidityBitmapInputOutput* output, int32_t* offsets); + void DefRepLevelsToList( - const int16_t* def_levels, - const int16_t* rep_levels, - int64_t num_def_levels, - LevelInfo level_info, + const int16_t* defLevels, + const int16_t* repLevels, + int64_t numDefLevels, + LevelInfo levelInfo, ValidityBitmapInputOutput* output, int64_t* offsets); @@ -189,9 +177,14 @@ void DefRepLevelsToList( // a list descendant. See documentation on DefLevelsToBitmap for when more // details on this method compared to the other ones defined above. void DefRepLevelsToBitmap( - const int16_t* def_levels, - const int16_t* rep_levels, - int64_t num_def_levels, - LevelInfo level_info, + const int16_t* defLevels, + const int16_t* repLevels, + int64_t numDefLevels, + LevelInfo levelInfo, ValidityBitmapInputOutput* output); + +// This is exposed to ensure we can properly test a software simulated pext +// function (i.e. it isn't hidden by runtime dispatch). +uint64_t TestOnlyExtractBitsSoftware(uint64_t bitmap, uint64_t selection); + } // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/common/LevelConversionUtil.h b/velox/dwio/parquet/common/LevelConversionUtil.h index bbad8c014b8b8..50272d2b113b6 100644 --- a/velox/dwio/parquet/common/LevelConversionUtil.h +++ b/velox/dwio/parquet/common/LevelConversionUtil.h @@ -18,8 +18,6 @@ #pragma once -#include "velox/dwio/parquet/common/LevelConversion.h" - #include <algorithm> #include <cstdint> #include <limits> @@ -45,13 +43,13 @@ print('constexpr uint8_t kPextTable[1 << kLookupBits][1 << kLookupBits] = {') print(' ', end = '') for mask in range(1 << kLookupBits): for data in range(1 << kLookupBits): - bit_value = 0 - bit_len = 0 + bitValue = 0 + bitLen = 0 for i in range(kLookupBits): if mask & (1 << i): - bit_value |= (((data >> i) & 1) << bit_len) - bit_len += 1 - out = '0x{:02X},'.format(bit_value) + bitValue |= (((data >> i) & 1) << bitLen) + bitLen += 1 + out = '0x{:02X},'.format(bitValue) count += 1 if count % (1 << kLookupBits) == 1: print(' {') @@ -232,110 +230,101 @@ constexpr uint8_t kPextTable[1 << kLookupBits][1 << kLookupBits] = { }, }; -inline uint64_t ExtractBitsSoftware(uint64_t bitmap, uint64_t select_bitmap) { +inline uint64_t ExtractBitsSoftware(uint64_t bitmap, uint64_t selectBitmap) { // A software emulation of _pext_u64 // These checks should be inline and are likely to be common cases. - if (select_bitmap == ~uint64_t{0}) { + if (selectBitmap == ~uint64_t{0}) { return bitmap; - } else if (select_bitmap == 0) { + } else if (selectBitmap == 0) { return 0; } // Fallback to lookup table method - uint64_t bit_value = 0; - int bit_len = 0; + uint64_t bitValue = 0; + int bitLen = 0; constexpr uint8_t kLookupMask = (1U << kLookupBits) - 1; - while (select_bitmap != 0) { - const auto mask_len = ARROW_POPCOUNT32(select_bitmap & kLookupMask); + while (selectBitmap != 0) { + const auto mask_len = ARROW_POPCOUNT32(selectBitmap & kLookupMask); const uint64_t value = - kPextTable[select_bitmap & kLookupMask][bitmap & kLookupMask]; - bit_value |= (value << bit_len); - bit_len += mask_len; + kPextTable[selectBitmap & kLookupMask][bitmap & kLookupMask]; + bitValue |= (value << bitLen); + bitLen += mask_len; bitmap >>= kLookupBits; - select_bitmap >>= kLookupBits; + selectBitmap >>= kLookupBits; } - return bit_value; + return bitValue; } -// Use 64-bit pext emulation when BMI2 isn't available. -using extract_bitmap_t = uint64_t; -inline extract_bitmap_t ExtractBits( - extract_bitmap_t bitmap, - extract_bitmap_t select_bitmap) { - return ExtractBitsSoftware(bitmap, select_bitmap); +using extractBitmapT = uint64_t; +inline extractBitmapT ExtractBits( + extractBitmapT bitmap, + extractBitmapT selectBitmap) { + return ExtractBitsSoftware(bitmap, selectBitmap); } -static constexpr int64_t kExtractBitsSize = 8 * sizeof(extract_bitmap_t); +static constexpr int64_t kExtractBitsSize = 8 * sizeof(extractBitmapT); -template <bool has_repeated_parent> +template <bool hasRepeatedParent> int64_t DefLevelsBatchToBitmap( - const int16_t* def_levels, - const int64_t batch_size, - int64_t upper_bound_remaining, - LevelInfo level_info, + const int16_t* defLevels, + const int64_t batchSize, + int64_t upperBoundRemaining, + LevelInfo levelInfo, ::arrow::internal::FirstTimeBitmapWriter* writer) { - DCHECK_LE(batch_size, kExtractBitsSize); + DCHECK_LE(batchSize, kExtractBitsSize); - // Greater than level_info.def_level - 1 implies >= the def_level - auto defined_bitmap = - static_cast<extract_bitmap_t>(GreaterThanBitmap( - def_levels, batch_size, level_info.def_level - 1)); + // Greater than levelInfo.defLevel - 1 implies >= the defLevel + auto definedBitmap = static_cast<extractBitmapT>( + GreaterThanBitmap(defLevels, batchSize, levelInfo.defLevel - 1)); - if (has_repeated_parent) { - // Greater than level_info.repeated_ancestor_def_level - 1 implies >= the - // repeated_ancestor_def_level - auto present_bitmap = - static_cast<extract_bitmap_t>(GreaterThanBitmap( - def_levels, - batch_size, - level_info.repeated_ancestor_def_level - 1)); - auto selected_bits = ExtractBits(defined_bitmap, present_bitmap); - int64_t selected_count = ::arrow::bit_util::PopCount(present_bitmap); - if (ARROW_PREDICT_FALSE(selected_count > upper_bound_remaining)) { + if (hasRepeatedParent) { + // Greater than levelInfo.repeatedAncestorDefLevel - 1 implies >= the + // repeatedAncestorDefLevel + auto presentBitmap = static_cast<extractBitmapT>(GreaterThanBitmap( + defLevels, batchSize, levelInfo.repeatedAncestorDefLevel - 1)); + auto selectedBits = ExtractBits(definedBitmap, presentBitmap); + int64_t selectedCount = ::arrow::bit_util::PopCount(presentBitmap); + if (ARROW_PREDICT_FALSE(selectedCount > upperBoundRemaining)) { VELOX_FAIL("Values read exceeded upper bound"); } - writer->AppendWord(selected_bits, selected_count); - return ::arrow::bit_util::PopCount(selected_bits); + writer->AppendWord(selectedBits, selectedCount); + return ::arrow::bit_util::PopCount(selectedBits); } else { - if (ARROW_PREDICT_FALSE(batch_size > upper_bound_remaining)) { + if (ARROW_PREDICT_FALSE(batchSize > upperBoundRemaining)) { VELOX_FAIL("Values read exceeded upper bound"); } - writer->AppendWord(defined_bitmap, batch_size); - return ::arrow::bit_util::PopCount(defined_bitmap); + writer->AppendWord(definedBitmap, batchSize); + return ::arrow::bit_util::PopCount(definedBitmap); } } -template <bool has_repeated_parent> +template <bool hasRepeatedParent> void DefLevelsToBitmapSimd( - const int16_t* def_levels, - int64_t num_def_levels, - LevelInfo level_info, + const int16_t* defLevels, + int64_t numDefLevels, + LevelInfo levelInfo, ValidityBitmapInputOutput* output) { ::arrow::internal::FirstTimeBitmapWriter writer( - output->valid_bits, - /*start_offset=*/output->valid_bits_offset, - /*length=*/output->values_read_upper_bound); - int64_t set_count = 0; - output->values_read = 0; - int64_t values_read_remaining = output->values_read_upper_bound; - while (num_def_levels > kExtractBitsSize) { - set_count += DefLevelsBatchToBitmap<has_repeated_parent>( - def_levels, - kExtractBitsSize, - values_read_remaining, - level_info, - &writer); - def_levels += kExtractBitsSize; - num_def_levels -= kExtractBitsSize; - values_read_remaining = output->values_read_upper_bound - writer.position(); + output->validBits, + /*start_offset=*/output->validBitsOffset, + /*length=*/output->valuesReadUpperBound); + int64_t setCount = 0; + output->valuesRead = 0; + int64_t valuesReadRemaining = output->valuesReadUpperBound; + while (numDefLevels > kExtractBitsSize) { + setCount += DefLevelsBatchToBitmap<hasRepeatedParent>( + defLevels, kExtractBitsSize, valuesReadRemaining, levelInfo, &writer); + defLevels += kExtractBitsSize; + numDefLevels -= kExtractBitsSize; + valuesReadRemaining = output->valuesReadUpperBound - writer.position(); } - set_count += DefLevelsBatchToBitmap<has_repeated_parent>( - def_levels, num_def_levels, values_read_remaining, level_info, &writer); + setCount += DefLevelsBatchToBitmap<hasRepeatedParent>( + defLevels, numDefLevels, valuesReadRemaining, levelInfo, &writer); - output->values_read = writer.position(); - output->null_count += output->values_read - set_count; + output->valuesRead = writer.position(); + output->nullCount += output->valuesRead - setCount; writer.Finish(); } diff --git a/velox/dwio/parquet/reader/CMakeLists.txt b/velox/dwio/parquet/reader/CMakeLists.txt index 611882ad586c1..2bda9d61d06b0 100644 --- a/velox/dwio/parquet/reader/CMakeLists.txt +++ b/velox/dwio/parquet/reader/CMakeLists.txt @@ -30,7 +30,7 @@ velox_add_library( velox_link_libraries( velox_dwio_native_parquet_reader velox_dwio_parquet_thrift - velox_dwio_native_parquet_common + velox_dwio_parquet_common velox_type velox_dwio_common velox_dwio_common_compression diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp index 7b1b61925843c..81f429c59a5a0 100644 --- a/velox/dwio/parquet/reader/PageReader.cpp +++ b/velox/dwio/parquet/reader/PageReader.cpp @@ -603,11 +603,11 @@ int32_t PageReader::getLengthsAndNulls( uint64_t* nulls, int32_t nullsStartIndex) const { ValidityBitmapInputOutput bits; - bits.values_read_upper_bound = maxItems; - bits.values_read = 0; - bits.null_count = 0; - bits.valid_bits = reinterpret_cast<uint8_t*>(nulls); - bits.valid_bits_offset = nullsStartIndex; + bits.valuesReadUpperBound = maxItems; + bits.valuesRead = 0; + bits.nullCount = 0; + bits.validBits = reinterpret_cast<uint8_t*>(nulls); + bits.validBitsOffset = nullsStartIndex; switch (mode) { case LevelMode::kNulls: @@ -623,7 +623,7 @@ int32_t PageReader::getLengthsAndNulls( &bits, lengths); // Convert offsets to lengths. - for (auto i = 0; i < bits.values_read; ++i) { + for (auto i = 0; i < bits.valuesRead; ++i) { lengths[i] = lengths[i + 1] - lengths[i]; } break; @@ -638,7 +638,7 @@ int32_t PageReader::getLengthsAndNulls( break; } } - return bits.values_read; + return bits.valuesRead; } void PageReader::makeDecoder() { diff --git a/velox/dwio/parquet/reader/StructColumnReader.cpp b/velox/dwio/parquet/reader/StructColumnReader.cpp index 0b808da74e9a5..28267a64367ad 100644 --- a/velox/dwio/parquet/reader/StructColumnReader.cpp +++ b/velox/dwio/parquet/reader/StructColumnReader.cpp @@ -174,7 +174,7 @@ void StructColumnReader::seekToEndOfPresetNulls() { } void StructColumnReader::setNullsFromRepDefs(PageReader& pageReader) { - if (levelInfo_.def_level == 0) { + if (levelInfo_.defLevel == 0) { return; } auto repDefRange = pageReader.repDefRange(); diff --git a/velox/dwio/parquet/tests/common/CMakeLists.txt b/velox/dwio/parquet/tests/common/CMakeLists.txt index 0e79ad0f803b3..c738566d6120c 100644 --- a/velox/dwio/parquet/tests/common/CMakeLists.txt +++ b/velox/dwio/parquet/tests/common/CMakeLists.txt @@ -18,6 +18,7 @@ add_executable(velox_dwio_parquet_common_test ThriftTransportTest.cpp add_test(velox_dwio_parquet_common_test velox_dwio_parquet_common_test) target_link_libraries( velox_dwio_parquet_common_test + velox_dwio_parquet_common arrow thrift velox_link_libs diff --git a/velox/dwio/parquet/tests/common/LevelConversionTest.cpp b/velox/dwio/parquet/tests/common/LevelConversionTest.cpp index 777a0e876dd89..2cd2f0ef660b0 100644 --- a/velox/dwio/parquet/tests/common/LevelConversionTest.cpp +++ b/velox/dwio/parquet/tests/common/LevelConversionTest.cpp @@ -18,8 +18,8 @@ #include "velox/dwio/parquet/common/LevelConversion.h" +#include "velox/common/base/Exceptions.h" #include "velox/dwio/parquet/common/LevelComparison.h" -#include "velox/dwio/parquet/writer/arrow/tests/TestUtil.h" #include <gmock/gmock.h> #include <gtest/gtest.h> @@ -32,73 +32,73 @@ #include "arrow/util/bitmap.h" #include "arrow/util/ubsan.h" -namespace facebook::velox::parquet::arrow { -namespace internal { +namespace facebook::velox::parquet { +namespace { using ::arrow::internal::Bitmap; using ::testing::ElementsAreArray; -std::string BitmapToString(const uint8_t* bitmap, int64_t bit_count) { - return ::arrow::internal::Bitmap(bitmap, /*offset*/ 0, /*length=*/bit_count) +std::string BitmapToString(const uint8_t* bitmap, int64_t bitCount) { + return ::arrow::internal::Bitmap(bitmap, /*offset*/ 0, /*length=*/bitCount) .ToString(); } std::string BitmapToString( const std::vector<uint8_t>& bitmap, - int64_t bit_count) { - return BitmapToString(bitmap.data(), bit_count); + int64_t bitCount) { + return BitmapToString(bitmap.data(), bitCount); } TEST(TestColumnReader, DefLevelsToBitmap) { // Bugs in this function were exposed in ARROW-3930 - std::vector<int16_t> def_levels = {3, 3, 3, 2, 3, 3, 3, 3, 3}; + std::vector<int16_t> defLevels = {3, 3, 3, 2, 3, 3, 3, 3, 3}; - std::vector<uint8_t> valid_bits(2, 0); + std::vector<uint8_t> validBits(2, 0); - LevelInfo level_info; - level_info.def_level = 3; - level_info.rep_level = 1; + LevelInfo levelInfo; + levelInfo.defLevel = 3; + levelInfo.repLevel = 1; ValidityBitmapInputOutput io; - io.values_read_upper_bound = def_levels.size(); - io.values_read = -1; - io.valid_bits = valid_bits.data(); + io.valuesReadUpperBound = defLevels.size(); + io.valuesRead = -1; + io.validBits = validBits.data(); - DefLevelsToBitmap(def_levels.data(), 9, level_info, &io); - ASSERT_EQ(9, io.values_read); - ASSERT_EQ(1, io.null_count); + DefLevelsToBitmap(defLevels.data(), 9, levelInfo, &io); + ASSERT_EQ(9, io.valuesRead); + ASSERT_EQ(1, io.nullCount); - // Call again with 0 definition levels, make sure that valid_bits is + // Call again with 0 definition levels, make sure that validBits is // unmodified - const uint8_t current_byte = valid_bits[1]; - io.null_count = 0; - DefLevelsToBitmap(def_levels.data(), 0, level_info, &io); + const uint8_t current_byte = validBits[1]; + io.nullCount = 0; + DefLevelsToBitmap(defLevels.data(), 0, levelInfo, &io); - ASSERT_EQ(0, io.values_read); - ASSERT_EQ(0, io.null_count); - ASSERT_EQ(current_byte, valid_bits[1]); + ASSERT_EQ(0, io.valuesRead); + ASSERT_EQ(0, io.nullCount); + ASSERT_EQ(current_byte, validBits[1]); } TEST(TestColumnReader, DefLevelsToBitmapPowerOfTwo) { // PARQUET-1623: Invalid memory access when decoding a valid bits vector that // has a length equal to a power of two and also using a non-zero - // valid_bits_offset. This should not fail when run with ASAN or valgrind. - std::vector<int16_t> def_levels = {3, 3, 3, 2, 3, 3, 3, 3}; - std::vector<uint8_t> valid_bits(1, 0); + // validBitsOffset. This should not fail when run with ASAN or valgrind. + std::vector<int16_t> defLevels = {3, 3, 3, 2, 3, 3, 3, 3}; + std::vector<uint8_t> validBits(1, 0); - LevelInfo level_info; - level_info.rep_level = 1; - level_info.def_level = 3; + LevelInfo levelInfo; + levelInfo.repLevel = 1; + levelInfo.defLevel = 3; ValidityBitmapInputOutput io; - io.values_read_upper_bound = def_levels.size(); - io.values_read = -1; - io.valid_bits = valid_bits.data(); + io.valuesReadUpperBound = defLevels.size(); + io.valuesRead = -1; + io.validBits = validBits.data(); // Read the latter half of the validity bitmap - DefLevelsToBitmap(def_levels.data() + 4, 4, level_info, &io); - ASSERT_EQ(4, io.values_read); - ASSERT_EQ(0, io.null_count); + DefLevelsToBitmap(defLevels.data() + 4, 4, levelInfo, &io); + ASSERT_EQ(4, io.valuesRead); + ASSERT_EQ(0, io.nullCount); } #if defined(ARROW_LITTLE_ENDIAN) @@ -126,33 +126,33 @@ TEST(DefLevelsToBitmap, WithRepetitionLevelFiltersOutEmptyListValues) { std::vector<uint8_t> validity_bitmap(/*count*/ 8, 0); ValidityBitmapInputOutput io; - io.values_read_upper_bound = 64; - io.values_read = 1; - io.null_count = 5; - io.valid_bits = validity_bitmap.data(); - io.valid_bits_offset = 1; - - LevelInfo level_info; - level_info.repeated_ancestor_def_level = 1; - level_info.def_level = 2; - level_info.rep_level = 1; + io.valuesReadUpperBound = 64; + io.valuesRead = 1; + io.nullCount = 5; + io.validBits = validity_bitmap.data(); + io.validBitsOffset = 1; + + LevelInfo levelInfo; + levelInfo.repeatedAncestorDefLevel = 1; + levelInfo.defLevel = 2; + levelInfo.repLevel = 1; // All zeros should be ignored, ones should be unset in the bitmp and 2 should // be set. - std::vector<int16_t> def_levels = {0, 0, 0, 2, 2, 1, 0, 2}; - DefLevelsToBitmap(def_levels.data(), def_levels.size(), level_info, &io); + std::vector<int16_t> defLevels = {0, 0, 0, 2, 2, 1, 0, 2}; + DefLevelsToBitmap(defLevels.data(), defLevels.size(), levelInfo, &io); - EXPECT_EQ(BitmapToString(validity_bitmap, /*bit_count=*/8), "01101000"); + EXPECT_EQ(BitmapToString(validity_bitmap, /*bitCount=*/8), "01101000"); for (size_t x = 1; x < validity_bitmap.size(); x++) { EXPECT_EQ(validity_bitmap[x], 0) << "index: " << x; } - EXPECT_EQ(io.null_count, /*5 + 1 =*/6); - EXPECT_EQ(io.values_read, 4); // value should get overwritten. + EXPECT_EQ(io.nullCount, /*5 + 1 =*/6); + EXPECT_EQ(io.valuesRead, 4); // value should get overwritten. } struct MultiLevelTestData { public: - std::vector<int16_t> def_levels; - std::vector<int16_t> rep_levels; + std::vector<int16_t> defLevels; + std::vector<int16_t> repLevels; }; MultiLevelTestData TriplyNestedList() { @@ -162,7 +162,7 @@ MultiLevelTestData TriplyNestedList() { // null, // [] return MultiLevelTestData{ - /*def_levels=*/std::vector<int16_t>{ + /*defLevels=*/std::vector<int16_t>{ 2, 7, 6, @@ -177,7 +177,7 @@ MultiLevelTestData TriplyNestedList() { 7, // second row 0, // third row 1}, - /*rep_levels=*/ + /*repLevels=*/ std::vector<int16_t>{ 0, 1, @@ -199,24 +199,24 @@ template <typename ConverterType> class NestedListTest : public testing::Test { public: void InitForLength(int length) { - this->validity_bits_.clear(); - this->validity_bits_.insert(this->validity_bits_.end(), length, 0); - validity_io_.valid_bits = validity_bits_.data(); - validity_io_.values_read_upper_bound = length; + this->validityBits_.clear(); + this->validityBits_.insert(this->validityBits_.end(), length, 0); + validityIo_.validBits = validityBits_.data(); + validityIo_.valuesReadUpperBound = length; offsets_.clear(); offsets_.insert(offsets_.end(), length + 1, 0); } typename ConverterType::OffsetsType* Run( - const MultiLevelTestData& test_data, - LevelInfo level_info) { + const MultiLevelTestData& testData, + LevelInfo levelInfo) { return this->converter_.ComputeListInfo( - test_data, level_info, &validity_io_, offsets_.data()); + testData, levelInfo, &validityIo_, offsets_.data()); } ConverterType converter_; - ValidityBitmapInputOutput validity_io_; - std::vector<uint8_t> validity_bits_; + ValidityBitmapInputOutput validityIo_; + std::vector<uint8_t> validityBits_; std::vector<typename ConverterType::OffsetsType> offsets_; }; @@ -224,18 +224,18 @@ template <typename IndexType> struct RepDefLevelConverter { using OffsetsType = IndexType; OffsetsType* ComputeListInfo( - const MultiLevelTestData& test_data, - LevelInfo level_info, + const MultiLevelTestData& testData, + LevelInfo levelInfo, ValidityBitmapInputOutput* output, IndexType* offsets) { DefRepLevelsToList( - test_data.def_levels.data(), - test_data.rep_levels.data(), - test_data.def_levels.size(), - level_info, + testData.defLevels.data(), + testData.repLevels.data(), + testData.defLevels.size(), + levelInfo, output, offsets); - return offsets + output->values_read; + return offsets + output->valuesRead; } }; @@ -250,21 +250,20 @@ TYPED_TEST(NestedListTest, OuterMostTest) { // null, // [] // -> 4 outer most lists (len(3), len(4), null, len(0)) - LevelInfo level_info; - level_info.rep_level = 1; - level_info.def_level = 2; + LevelInfo levelInfo; + levelInfo.repLevel = 1; + levelInfo.defLevel = 2; this->InitForLength(4); typename TypeParam::OffsetsType* next_position = - this->Run(TriplyNestedList(), level_info); + this->Run(TriplyNestedList(), levelInfo); EXPECT_EQ(next_position, this->offsets_.data() + 4); EXPECT_THAT(this->offsets_, testing::ElementsAre(0, 3, 7, 7, 7)); - EXPECT_EQ(this->validity_io_.values_read, 4); - EXPECT_EQ(this->validity_io_.null_count, 1); - EXPECT_EQ( - BitmapToString(this->validity_io_.valid_bits, /*length=*/4), "1101"); + EXPECT_EQ(this->validityIo_.valuesRead, 4); + EXPECT_EQ(this->validityIo_.nullCount, 1); + EXPECT_EQ(BitmapToString(this->validityIo_.validBits, /*length=*/4), "1101"); } TYPED_TEST(NestedListTest, MiddleListTest) { @@ -276,22 +275,22 @@ TYPED_TEST(NestedListTest, MiddleListTest) { // len(1), len(2), null, len(1), // N/A, // N/A - LevelInfo level_info; - level_info.rep_level = 2; - level_info.def_level = 4; - level_info.repeated_ancestor_def_level = 2; + LevelInfo levelInfo; + levelInfo.repLevel = 2; + levelInfo.defLevel = 4; + levelInfo.repeatedAncestorDefLevel = 2; this->InitForLength(7); typename TypeParam::OffsetsType* next_position = - this->Run(TriplyNestedList(), level_info); + this->Run(TriplyNestedList(), levelInfo); EXPECT_EQ(next_position, this->offsets_.data() + 7); EXPECT_THAT(this->offsets_, testing::ElementsAre(0, 0, 2, 2, 3, 5, 5, 6)); - EXPECT_EQ(this->validity_io_.values_read, 7); - EXPECT_EQ(this->validity_io_.null_count, 2); + EXPECT_EQ(this->validityIo_.valuesRead, 7); + EXPECT_EQ(this->validityIo_.nullCount, 2); EXPECT_EQ( - BitmapToString(this->validity_io_.valid_bits, /*length=*/7), "0111101"); + BitmapToString(this->validityIo_.validBits, /*length=*/7), "0111101"); } TYPED_TEST(NestedListTest, InnerMostListTest) { @@ -303,39 +302,39 @@ TYPED_TEST(NestedListTest, InnerMostListTest) { // len(0), [len(0), len(2)], N/A, len(1), // N/A, // N/A - LevelInfo level_info; - level_info.rep_level = 3; - level_info.def_level = 6; - level_info.repeated_ancestor_def_level = 4; + LevelInfo levelInfo; + levelInfo.repLevel = 3; + levelInfo.defLevel = 6; + levelInfo.repeatedAncestorDefLevel = 4; this->InitForLength(6); typename TypeParam::OffsetsType* next_position = - this->Run(TriplyNestedList(), level_info); + this->Run(TriplyNestedList(), levelInfo); EXPECT_EQ(next_position, this->offsets_.data() + 6); EXPECT_THAT(this->offsets_, testing::ElementsAre(0, 3, 3, 3, 3, 5, 6)); - EXPECT_EQ(this->validity_io_.values_read, 6); - EXPECT_EQ(this->validity_io_.null_count, 0); + EXPECT_EQ(this->validityIo_.valuesRead, 6); + EXPECT_EQ(this->validityIo_.nullCount, 0); EXPECT_EQ( - BitmapToString(this->validity_io_.valid_bits, /*length=*/6), "111111"); + BitmapToString(this->validityIo_.validBits, /*length=*/6), "111111"); } TYPED_TEST(NestedListTest, SimpleLongList) { - LevelInfo level_info; - level_info.rep_level = 1; - level_info.def_level = 2; - level_info.repeated_ancestor_def_level = 0; + LevelInfo levelInfo; + levelInfo.repLevel = 1; + levelInfo.defLevel = 2; + levelInfo.repeatedAncestorDefLevel = 0; - MultiLevelTestData test_data; + MultiLevelTestData testData; // No empty lists. - test_data.def_levels = std::vector<int16_t>(65 * 9, 2); + testData.defLevels = std::vector<int16_t>(65 * 9, 2); for (int x = 0; x < 65; x++) { - test_data.rep_levels.push_back(0); - test_data.rep_levels.insert( - test_data.rep_levels.end(), + testData.repLevels.push_back(0); + testData.repLevels.insert( + testData.repLevels.end(), 8, - /*rep_level=*/1); + /*repLevel=*/1); } std::vector<typename TypeParam::OffsetsType> expected_offsets(66, 0); @@ -344,15 +343,15 @@ TYPED_TEST(NestedListTest, SimpleLongList) { } this->InitForLength(65); typename TypeParam::OffsetsType* next_position = - this->Run(test_data, level_info); + this->Run(testData, levelInfo); EXPECT_EQ(next_position, this->offsets_.data() + 65); EXPECT_THAT(this->offsets_, testing::ElementsAreArray(expected_offsets)); - EXPECT_EQ(this->validity_io_.values_read, 65); - EXPECT_EQ(this->validity_io_.null_count, 0); + EXPECT_EQ(this->validityIo_.valuesRead, 65); + EXPECT_EQ(this->validityIo_.nullCount, 0); EXPECT_EQ( - BitmapToString(this->validity_io_.valid_bits, /*length=*/65), + BitmapToString(this->validityIo_.validBits, /*length=*/65), "11111111 " "11111111 " "11111111 " @@ -365,14 +364,14 @@ TYPED_TEST(NestedListTest, SimpleLongList) { } TYPED_TEST(NestedListTest, TestOverflow) { - LevelInfo level_info; - level_info.rep_level = 1; - level_info.def_level = 2; - level_info.repeated_ancestor_def_level = 0; + LevelInfo levelInfo; + levelInfo.repLevel = 1; + levelInfo.defLevel = 2; + levelInfo.repeatedAncestorDefLevel = 0; - MultiLevelTestData test_data; - test_data.def_levels = std::vector<int16_t>{2}; - test_data.rep_levels = std::vector<int16_t>{0}; + MultiLevelTestData testData; + testData.defLevels = std::vector<int16_t>{2}; + testData.repLevels = std::vector<int16_t>{0}; this->InitForLength(2); // Offsets is populated as the cumulative sum of all elements, @@ -382,18 +381,18 @@ TYPED_TEST(NestedListTest, TestOverflow) { std::numeric_limits<typename TypeParam::OffsetsType>::max(); this->offsets_[1] = std::numeric_limits<typename TypeParam::OffsetsType>::max(); - ASSERT_THROW(this->Run(test_data, level_info), ParquetException); + ASSERT_THROW(this->Run(testData, levelInfo), VeloxException); - ASSERT_THROW(this->Run(test_data, level_info), ParquetException); + ASSERT_THROW(this->Run(testData, levelInfo), VeloxException); // Same thing should happen if the list already existed. - test_data.rep_levels = std::vector<int16_t>{1}; - ASSERT_THROW(this->Run(test_data, level_info), ParquetException); + testData.repLevels = std::vector<int16_t>{1}; + ASSERT_THROW(this->Run(testData, levelInfo), VeloxException); // Should be OK because it shouldn't increment. - test_data.def_levels = std::vector<int16_t>{0}; - test_data.rep_levels = std::vector<int16_t>{0}; - this->Run(test_data, level_info); + testData.defLevels = std::vector<int16_t>{0}; + testData.repLevels = std::vector<int16_t>{0}; + this->Run(testData, levelInfo); } TEST(TestOnlyExtractBitsSoftware, BasicTest) { @@ -409,5 +408,5 @@ TEST(TestOnlyExtractBitsSoftware, BasicTest) { check(0xFECBDA9876543210ULL, 0xF00FF00FF00FF00FULL, 0xFBD87430ULL); } -} // namespace internal -} // namespace facebook::velox::parquet::arrow +} // namespace +} // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/tests/reader/CMakeLists.txt b/velox/dwio/parquet/tests/reader/CMakeLists.txt index b58429d73e933..0b7edede4a137 100644 --- a/velox/dwio/parquet/tests/reader/CMakeLists.txt +++ b/velox/dwio/parquet/tests/reader/CMakeLists.txt @@ -59,7 +59,7 @@ add_test( COMMAND velox_dwio_parquet_reader_test WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) target_link_libraries( - velox_dwio_parquet_reader_test velox_dwio_native_parquet_common + velox_dwio_parquet_reader_test velox_dwio_parquet_common velox_dwio_parquet_reader_benchmark_lib velox_link_libs) add_executable(velox_dwio_parquet_structure_decoder_test diff --git a/velox/dwio/parquet/writer/arrow/ArrowSchema.cpp b/velox/dwio/parquet/writer/arrow/ArrowSchema.cpp index 99949233e1e47..7738212c11f0e 100644 --- a/velox/dwio/parquet/writer/arrow/ArrowSchema.cpp +++ b/velox/dwio/parquet/writer/arrow/ArrowSchema.cpp @@ -64,7 +64,6 @@ using ParquetType = Type; namespace { - /// Increments levels according to the cardinality of node. void IncrementLevels(LevelInfo& current_levels, const schema::Node& node) { if (node.is_repeated()) { @@ -773,7 +772,7 @@ Status MapToSchemaField( out->level_info = current_levels; // At this point current levels contains the def level for this list, // we need to reset to the prior parent. - out->level_info.repeated_ancestor_def_level = repeated_ancestor_def_level; + out->level_info.repeatedAncestorDefLevel = repeated_ancestor_def_level; return Status::OK(); } @@ -867,7 +866,7 @@ Status ListToSchemaField( out->level_info = current_levels; // At this point current levels contains the def level for this list, // we need to reset to the prior parent. - out->level_info.repeated_ancestor_def_level = repeated_ancestor_def_level; + out->level_info.repeatedAncestorDefLevel = repeated_ancestor_def_level; return Status::OK(); } @@ -905,7 +904,7 @@ Status GroupToSchemaField( out->level_info = current_levels; // At this point current_levels contains this list as the def level, we need // to use the previous ancestor of this list. - out->level_info.repeated_ancestor_def_level = repeated_ancestor_def_level; + out->level_info.repeatedAncestorDefLevel = repeated_ancestor_def_level; return Status::OK(); } else { IncrementLevels(current_levels, node); @@ -966,7 +965,7 @@ Status NodeToSchemaField( out->level_info = current_levels; // At this point current_levels has consider this list the ancestor so // restore the actual ancestor. - out->level_info.repeated_ancestor_def_level = repeated_ancestor_def_level; + out->level_info.repeatedAncestorDefLevel = repeated_ancestor_def_level; return Status::OK(); } else { IncrementLevels(current_levels, node); diff --git a/velox/dwio/parquet/writer/arrow/ColumnWriter.cpp b/velox/dwio/parquet/writer/arrow/ColumnWriter.cpp index 33fd538c4c309..6c6b703c83469 100644 --- a/velox/dwio/parquet/writer/arrow/ColumnWriter.cpp +++ b/velox/dwio/parquet/writer/arrow/ColumnWriter.cpp @@ -167,8 +167,8 @@ struct ValueBufferSlicer { LevelInfo ComputeLevelInfo(const ColumnDescriptor* descr) { LevelInfo level_info; - level_info.def_level = descr->max_definition_level(); - level_info.rep_level = descr->max_repetition_level(); + level_info.defLevel = descr->max_definition_level(); + level_info.repLevel = descr->max_repetition_level(); int16_t min_spaced_def_level = descr->max_definition_level(); const schema::Node* node = descr->schema_node().get(); @@ -178,7 +178,7 @@ LevelInfo ComputeLevelInfo(const ColumnDescriptor* descr) { } node = node->parent(); } - level_info.repeated_ancestor_def_level = min_spaced_def_level; + level_info.repeatedAncestorDefLevel = min_spaced_def_level; return level_info; } @@ -1615,8 +1615,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, // Leaf nulls are canonical when there is only a single null element after a // list and it is at the leaf. bool single_nullable_element = - (level_info_.def_level == - level_info_.repeated_ancestor_def_level + 1) && + (level_info_.defLevel == level_info_.repeatedAncestorDefLevel + 1) && leaf_field_nullable; bool maybe_parent_nulls = level_info_.HasNullableValues() && !single_nullable_element; @@ -1813,7 +1812,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, int64_t* out_spaced_values_to_write, int64_t* null_count) { if (bits_buffer_ == nullptr) { - if (level_info_.def_level == 0) { + if (level_info_.defLevel == 0) { // In this case def levels should be null and we only // need to output counts which will always be equal to // the batch size passed in (max def_level == 0 indicates @@ -1824,10 +1823,9 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, *null_count = 0; } else { for (int x = 0; x < batch_size; x++) { - *out_values_to_write += - def_levels[x] == level_info_.def_level ? 1 : 0; + *out_values_to_write += def_levels[x] == level_info_.defLevel ? 1 : 0; *out_spaced_values_to_write += - def_levels[x] >= level_info_.repeated_ancestor_def_level ? 1 : 0; + def_levels[x] >= level_info_.repeatedAncestorDefLevel ? 1 : 0; } *null_count = batch_size - *out_values_to_write; } @@ -1842,12 +1840,12 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, bits_buffer_->ZeroPadding(); } ValidityBitmapInputOutput io; - io.valid_bits = bits_buffer_->mutable_data(); - io.values_read_upper_bound = batch_size; + io.validBits = bits_buffer_->mutable_data(); + io.valuesReadUpperBound = batch_size; DefLevelsToBitmap(def_levels, batch_size, level_info_, &io); - *out_values_to_write = io.values_read - io.null_count; - *out_spaced_values_to_write = io.values_read; - *null_count = io.null_count; + *out_values_to_write = io.valuesRead - io.nullCount; + *out_spaced_values_to_write = io.valuesRead; + *null_count = io.nullCount; } Result<std::shared_ptr<Array>> MaybeReplaceValidity( diff --git a/velox/dwio/parquet/writer/arrow/tests/CMakeLists.txt b/velox/dwio/parquet/writer/arrow/tests/CMakeLists.txt index 22064af5a83cc..044be2411bd08 100644 --- a/velox/dwio/parquet/writer/arrow/tests/CMakeLists.txt +++ b/velox/dwio/parquet/writer/arrow/tests/CMakeLists.txt @@ -33,7 +33,7 @@ add_test(velox_dwio_arrow_parquet_writer_test target_link_libraries( velox_dwio_arrow_parquet_writer_test velox_dwio_arrow_parquet_writer_test_lib - velox_dwio_native_parquet_common + velox_dwio_parquet_common GTest::gmock GTest::gtest GTest::gtest_main diff --git a/velox/dwio/parquet/writer/arrow/tests/ColumnReader.cpp b/velox/dwio/parquet/writer/arrow/tests/ColumnReader.cpp index 5bf459d3dd045..ae9557e645bff 100644 --- a/velox/dwio/parquet/writer/arrow/tests/ColumnReader.cpp +++ b/velox/dwio/parquet/writer/arrow/tests/ColumnReader.cpp @@ -718,8 +718,8 @@ class ColumnReaderImplBase { ColumnReaderImplBase(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool) : descr_(descr), - max_def_level_(descr->max_definition_level()), - max_rep_level_(descr->max_repetition_level()), + max_defLevel_(descr->max_definition_level()), + max_repLevel_(descr->max_repetition_level()), num_buffered_values_(0), num_decoded_values_(0), pool_(pool), @@ -741,28 +741,28 @@ class ColumnReaderImplBase { // Read up to batch_size values from the current data page into the // pre-allocated memory T*, leaving spaces for null entries according - // to the def_levels. + // to the defLevels. // // @returns: the number of values read into the out buffer int64_t ReadValuesSpaced( int64_t batch_size, T* out, - int64_t null_count, - uint8_t* valid_bits, - int64_t valid_bits_offset) { + int64_t nullCount, + uint8_t* validBits, + int64_t validBitsOffset) { return current_decoder_->DecodeSpaced( out, static_cast<int>(batch_size), - static_cast<int>(null_count), - valid_bits, - valid_bits_offset); + static_cast<int>(nullCount), + validBits, + validBitsOffset); } // Read multiple definition levels into preallocated memory // // Returns the number of decoded definition levels int64_t ReadDefinitionLevels(int64_t batch_size, int16_t* levels) { - if (max_def_level_ == 0) { + if (max_defLevel_ == 0) { return 0; } return definition_level_decoder_.Decode( @@ -784,7 +784,7 @@ class ColumnReaderImplBase { // Read multiple repetition levels into preallocated memory // Returns the number of decoded repetition levels int64_t ReadRepetitionLevels(int64_t batch_size, int16_t* levels) { - if (max_rep_level_ == 0) { + if (max_repLevel_ == 0) { return 0; } return repetition_level_decoder_.Decode( @@ -887,30 +887,30 @@ class ColumnReaderImplBase { // Data page Layout: Repetition Levels - Definition Levels - encoded values. // Levels are encoded as rle or bit-packed. // Init repetition levels - if (max_rep_level_ > 0) { - int32_t rep_levels_bytes = repetition_level_decoder_.SetData( + if (max_repLevel_ > 0) { + int32_t repLevels_bytes = repetition_level_decoder_.SetData( repetition_level_encoding, - max_rep_level_, + max_repLevel_, static_cast<int>(num_buffered_values_), buffer, max_size); - buffer += rep_levels_bytes; - levels_byte_size += rep_levels_bytes; - max_size -= rep_levels_bytes; + buffer += repLevels_bytes; + levels_byte_size += repLevels_bytes; + max_size -= repLevels_bytes; } - // TODO figure a way to set max_def_level_ to 0 + // TODO figure a way to set max_defLevel_ to 0 // if the initial value is invalid // Init definition levels - if (max_def_level_ > 0) { - int32_t def_levels_bytes = definition_level_decoder_.SetData( + if (max_defLevel_ > 0) { + int32_t defLevels_bytes = definition_level_decoder_.SetData( definition_level_encoding, - max_def_level_, + max_defLevel_, static_cast<int>(num_buffered_values_), buffer, max_size); - levels_byte_size += def_levels_bytes; - max_size -= def_levels_bytes; + levels_byte_size += defLevels_bytes; + max_size -= defLevels_bytes; } return levels_byte_size; @@ -933,22 +933,22 @@ class ColumnReaderImplBase { "Data page too small for levels (corrupt header?)"); } - if (max_rep_level_ > 0) { + if (max_repLevel_ > 0) { repetition_level_decoder_.SetDataV2( page.repetition_levels_byte_length(), - max_rep_level_, + max_repLevel_, static_cast<int>(num_buffered_values_), buffer); } - // ARROW-17453: Even if max_rep_level_ is 0, there may still be + // ARROW-17453: Even if max_repLevel_ is 0, there may still be // repetition level bytes written and/or reported in the header by // some writers (e.g. Athena) buffer += page.repetition_levels_byte_length(); - if (max_def_level_ > 0) { + if (max_defLevel_ > 0) { definition_level_decoder_.SetDataV2( page.definition_levels_byte_length(), - max_def_level_, + max_defLevel_, static_cast<int>(num_buffered_values_), buffer); } @@ -1038,8 +1038,8 @@ class ColumnReaderImplBase { } const ColumnDescriptor* descr_; - const int16_t max_def_level_; - const int16_t max_rep_level_; + const int16_t max_defLevel_; + const int16_t max_repLevel_; std::unique_ptr<PageReader> pager_; std::shared_ptr<Page> current_page_; @@ -1108,21 +1108,21 @@ class TypedColumnReaderImpl : public TypedColumnReader<DType>, int64_t ReadBatch( int64_t batch_size, - int16_t* def_levels, - int16_t* rep_levels, + int16_t* defLevels, + int16_t* repLevels, T* values, - int64_t* values_read) override; + int64_t* valuesRead) override; int64_t ReadBatchSpaced( int64_t batch_size, - int16_t* def_levels, - int16_t* rep_levels, + int16_t* defLevels, + int16_t* repLevels, T* values, - uint8_t* valid_bits, - int64_t valid_bits_offset, + uint8_t* validBits, + int64_t validBitsOffset, int64_t* levels_read, - int64_t* values_read, - int64_t* null_count) override; + int64_t* valuesRead, + int64_t* nullCount) override; int64_t Skip(int64_t num_values_to_skip) override; @@ -1140,8 +1140,8 @@ class TypedColumnReaderImpl : public TypedColumnReader<DType>, int64_t ReadBatchWithDictionary( int64_t batch_size, - int16_t* def_levels, - int16_t* rep_levels, + int16_t* defLevels, + int16_t* repLevels, int32_t* indices, int64_t* indices_read, const T** dict, @@ -1183,20 +1183,20 @@ class TypedColumnReaderImpl : public TypedColumnReader<DType>, // values. void ReadLevels( int64_t batch_size, - int16_t* def_levels, - int16_t* rep_levels, - int64_t* num_def_levels, + int16_t* defLevels, + int16_t* repLevels, + int64_t* num_defLevels, int64_t* values_to_read) { batch_size = std::min( batch_size, this->num_buffered_values_ - this->num_decoded_values_); // If the field is required and non-repeated, there are no definition levels - if (this->max_def_level_ > 0 && def_levels != nullptr) { - *num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels); + if (this->max_defLevel_ > 0 && defLevels != nullptr) { + *num_defLevels = this->ReadDefinitionLevels(batch_size, defLevels); // TODO(wesm): this tallying of values-to-decode can be performed with // better cache-efficiency if fused with the level decoding. - for (int64_t i = 0; i < *num_def_levels; ++i) { - if (def_levels[i] == this->max_def_level_) { + for (int64_t i = 0; i < *num_defLevels; ++i) { + if (defLevels[i] == this->max_defLevel_) { ++(*values_to_read); } } @@ -1206,10 +1206,9 @@ class TypedColumnReaderImpl : public TypedColumnReader<DType>, } // Not present for non-repeated fields - if (this->max_rep_level_ > 0 && rep_levels != nullptr) { - int64_t num_rep_levels = - this->ReadRepetitionLevels(batch_size, rep_levels); - if (def_levels != nullptr && *num_def_levels != num_rep_levels) { + if (this->max_repLevel_ > 0 && repLevels != nullptr) { + int64_t num_repLevels = this->ReadRepetitionLevels(batch_size, repLevels); + if (defLevels != nullptr && *num_defLevels != num_repLevels) { throw ParquetException( "Number of decoded rep / def levels did not match"); } @@ -1220,8 +1219,8 @@ class TypedColumnReaderImpl : public TypedColumnReader<DType>, template <typename DType> int64_t TypedColumnReaderImpl<DType>::ReadBatchWithDictionary( int64_t batch_size, - int16_t* def_levels, - int16_t* rep_levels, + int16_t* defLevels, + int16_t* repLevels, int32_t* indices, int64_t* indices_read, const T** dict, @@ -1251,14 +1250,14 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatchWithDictionary( } // Similar logic as ReadValues to get def levels and rep levels. - int64_t num_def_levels = 0; + int64_t num_defLevels = 0; int64_t indices_to_read = 0; ReadLevels( - batch_size, def_levels, rep_levels, &num_def_levels, &indices_to_read); + batch_size, defLevels, repLevels, &num_defLevels, &indices_to_read); // Read dictionary indices. *indices_read = ReadDictionaryIndices(indices_to_read, indices); - int64_t total_indices = std::max<int64_t>(num_def_levels, *indices_read); + int64_t total_indices = std::max<int64_t>(num_defLevels, *indices_read); // Some callers use a batch size of 0 just to get the dictionary. int64_t expected_values = std::min( batch_size, this->num_buffered_values_ - this->num_decoded_values_); @@ -1275,25 +1274,24 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatchWithDictionary( template <typename DType> int64_t TypedColumnReaderImpl<DType>::ReadBatch( int64_t batch_size, - int16_t* def_levels, - int16_t* rep_levels, + int16_t* defLevels, + int16_t* repLevels, T* values, - int64_t* values_read) { + int64_t* valuesRead) { // HasNext invokes ReadNewPage if (!HasNext()) { - *values_read = 0; + *valuesRead = 0; return 0; } // TODO(wesm): keep reading data pages until batch_size is reached, or the // row group is finished - int64_t num_def_levels = 0; + int64_t num_defLevels = 0; int64_t values_to_read = 0; - ReadLevels( - batch_size, def_levels, rep_levels, &num_def_levels, &values_to_read); + ReadLevels(batch_size, defLevels, repLevels, &num_defLevels, &values_to_read); - *values_read = this->ReadValues(values_to_read, values); - int64_t total_values = std::max<int64_t>(num_def_levels, *values_read); + *valuesRead = this->ReadValues(values_to_read, values); + int64_t total_values = std::max<int64_t>(num_defLevels, *valuesRead); int64_t expected_values = std::min( batch_size, this->num_buffered_values_ - this->num_decoded_values_); if (total_values == 0 && expected_values > 0) { @@ -1309,19 +1307,19 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatch( template <typename DType> int64_t TypedColumnReaderImpl<DType>::ReadBatchSpaced( int64_t batch_size, - int16_t* def_levels, - int16_t* rep_levels, + int16_t* defLevels, + int16_t* repLevels, T* values, - uint8_t* valid_bits, - int64_t valid_bits_offset, + uint8_t* validBits, + int64_t validBitsOffset, int64_t* levels_read, - int64_t* values_read, - int64_t* null_count_out) { + int64_t* valuesRead, + int64_t* nullCount_out) { // HasNext invokes ReadNewPage if (!HasNext()) { *levels_read = 0; - *values_read = 0; - *null_count_out = 0; + *valuesRead = 0; + *nullCount_out = 0; return 0; } @@ -1332,71 +1330,70 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatchSpaced( batch_size, this->num_buffered_values_ - this->num_decoded_values_); // If the field is required and non-repeated, there are no definition levels - if (this->max_def_level_ > 0) { - int64_t num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels); + if (this->max_defLevel_ > 0) { + int64_t num_defLevels = this->ReadDefinitionLevels(batch_size, defLevels); // Not present for non-repeated fields - if (this->max_rep_level_ > 0) { - int64_t num_rep_levels = - this->ReadRepetitionLevels(batch_size, rep_levels); - if (num_def_levels != num_rep_levels) { + if (this->max_repLevel_ > 0) { + int64_t num_repLevels = this->ReadRepetitionLevels(batch_size, repLevels); + if (num_defLevels != num_repLevels) { throw ParquetException( "Number of decoded rep / def levels did not match"); } } const bool has_spaced_values = HasSpacedValues(this->descr_); - int64_t null_count = 0; + int64_t nullCount = 0; if (!has_spaced_values) { int values_to_read = 0; - for (int64_t i = 0; i < num_def_levels; ++i) { - if (def_levels[i] == this->max_def_level_) { + for (int64_t i = 0; i < num_defLevels; ++i) { + if (defLevels[i] == this->max_defLevel_) { ++values_to_read; } } total_values = this->ReadValues(values_to_read, values); ::arrow::bit_util::SetBitsTo( - valid_bits, - valid_bits_offset, + validBits, + validBitsOffset, /*length=*/total_values, /*bits_are_set=*/true); - *values_read = total_values; + *valuesRead = total_values; } else { LevelInfo info; - info.repeated_ancestor_def_level = this->max_def_level_ - 1; - info.def_level = this->max_def_level_; - info.rep_level = this->max_rep_level_; + info.repeatedAncestorDefLevel = this->max_defLevel_ - 1; + info.defLevel = this->max_defLevel_; + info.repLevel = this->max_repLevel_; ValidityBitmapInputOutput validity_io; - validity_io.values_read_upper_bound = num_def_levels; - validity_io.valid_bits = valid_bits; - validity_io.valid_bits_offset = valid_bits_offset; - validity_io.null_count = null_count; - validity_io.values_read = *values_read; + validity_io.valuesReadUpperBound = num_defLevels; + validity_io.validBits = validBits; + validity_io.validBitsOffset = validBitsOffset; + validity_io.nullCount = nullCount; + validity_io.valuesRead = *valuesRead; - DefLevelsToBitmap(def_levels, num_def_levels, info, &validity_io); - null_count = validity_io.null_count; - *values_read = validity_io.values_read; + DefLevelsToBitmap(defLevels, num_defLevels, info, &validity_io); + nullCount = validity_io.nullCount; + *valuesRead = validity_io.valuesRead; total_values = this->ReadValuesSpaced( - *values_read, + *valuesRead, values, - static_cast<int>(null_count), - valid_bits, - valid_bits_offset); + static_cast<int>(nullCount), + validBits, + validBitsOffset); } - *levels_read = num_def_levels; - *null_count_out = null_count; + *levels_read = num_defLevels; + *nullCount_out = nullCount; } else { // Required field, read all values total_values = this->ReadValues(batch_size, values); ::arrow::bit_util::SetBitsTo( - valid_bits, - valid_bits_offset, + validBits, + validBitsOffset, /*length=*/total_values, /*bits_are_set=*/true); - *null_count_out = 0; - *values_read = total_values; + *nullCount_out = 0; + *valuesRead = total_values; *levels_read = total_values; } @@ -1428,19 +1425,19 @@ int64_t TypedColumnReaderImpl<DType>::Skip(int64_t num_values_to_skip) { } else { // We need to read this Page // Jump to the right offset in the Page - int64_t values_read = 0; + int64_t valuesRead = 0; InitScratchForSkip(); ARROW_DCHECK_NE(this->scratch_for_skip_, nullptr); do { int64_t batch_size = std::min(kSkipScratchBatchSize, values_to_skip); - values_read = ReadBatch( + valuesRead = ReadBatch( static_cast<int>(batch_size), reinterpret_cast<int16_t*>(this->scratch_for_skip_->mutable_data()), reinterpret_cast<int16_t*>(this->scratch_for_skip_->mutable_data()), reinterpret_cast<T*>(this->scratch_for_skip_->mutable_data()), - &values_read); - values_to_skip -= values_read; - } while (values_read > 0 && values_to_skip > 0); + &valuesRead); + values_to_skip -= valuesRead; + } while (valuesRead > 0 && values_to_skip > 0); } } return num_values_to_skip - values_to_skip; @@ -1577,23 +1574,23 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>, break; } - if (this->max_def_level_ > 0) { + if (this->max_defLevel_ > 0) { ReserveLevels(batch_size); - int16_t* def_levels = this->def_levels() + levels_written_; - int16_t* rep_levels = this->rep_levels() + levels_written_; + int16_t* defLevels = this->def_levels() + levels_written_; + int16_t* repLevels = this->rep_levels() + levels_written_; // Not present for non-repeated fields int64_t levels_read = 0; - if (this->max_rep_level_ > 0) { - levels_read = this->ReadDefinitionLevels(batch_size, def_levels); - if (this->ReadRepetitionLevels(batch_size, rep_levels) != + if (this->max_repLevel_ > 0) { + levels_read = this->ReadDefinitionLevels(batch_size, defLevels); + if (this->ReadRepetitionLevels(batch_size, repLevels) != levels_read) { throw ParquetException( "Number of decoded rep / def levels did not match"); } - } else if (this->max_def_level_ > 0) { - levels_read = this->ReadDefinitionLevels(batch_size, def_levels); + } else if (this->max_defLevel_ > 0) { + levels_read = this->ReadDefinitionLevels(batch_size, defLevels); } // Exhausted column chunk @@ -1620,7 +1617,7 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>, void ThrowAwayLevels(int64_t start_levels_position) { ARROW_DCHECK_LE(levels_position_, levels_written_); ARROW_DCHECK_LE(start_levels_position, levels_position_); - ARROW_DCHECK_GT(this->max_def_level_, 0); + ARROW_DCHECK_GT(this->max_defLevel_, 0); ARROW_DCHECK_NE(def_levels_, nullptr); int64_t gap = levels_position_ - start_levels_position; @@ -1642,7 +1639,7 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>, left_shift(def_levels_.get()); - if (this->max_rep_level_ > 0) { + if (this->max_repLevel_ > 0) { ARROW_DCHECK_NE(rep_levels_, nullptr); left_shift(rep_levels_.get()); } @@ -1655,7 +1652,7 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>, // Skip records that we have in our buffer. This function is only for // non-repeated fields. int64_t SkipRecordsInBufferNonRepeated(int64_t num_records) { - ARROW_DCHECK_EQ(this->max_rep_level_, 0); + ARROW_DCHECK_EQ(this->max_repLevel_, 0); if (!this->has_values_to_process() || num_records == 0) return 0; @@ -1668,21 +1665,21 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>, // We skipped the levels by incrementing 'levels_position_'. For values // we do not have a buffer, so we need to read them and throw them away. // First we need to figure out how many present/not-null values there are. - std::shared_ptr<::arrow::ResizableBuffer> valid_bits; - valid_bits = AllocateBuffer(this->pool_); - PARQUET_THROW_NOT_OK(valid_bits->Resize( + std::shared_ptr<::arrow::ResizableBuffer> validBits; + validBits = AllocateBuffer(this->pool_); + PARQUET_THROW_NOT_OK(validBits->Resize( ::arrow::bit_util::BytesForBits(skipped_records), /*shrink_to_fit=*/true)); ValidityBitmapInputOutput validity_io; - validity_io.values_read_upper_bound = skipped_records; - validity_io.valid_bits = valid_bits->mutable_data(); - validity_io.valid_bits_offset = 0; + validity_io.valuesReadUpperBound = skipped_records; + validity_io.validBits = validBits->mutable_data(); + validity_io.validBitsOffset = 0; DefLevelsToBitmap( def_levels() + start_levels_position, skipped_records, this->leaf_info_, &validity_io); - int64_t values_to_read = validity_io.values_read - validity_io.null_count; + int64_t values_to_read = validity_io.valuesRead - validity_io.nullCount; // Now that we have figured out number of values to read, we do not need // these levels anymore. We will remove these values from the buffer. @@ -1708,8 +1705,8 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>, if (num_records == 0) return 0; // Look at the buffered levels, delimit them based on - // (rep_level == 0), report back how many records are in there, and - // fill in how many not-null values (def_level == max_def_level_). + // (repLevel == 0), report back how many records are in there, and + // fill in how many not-null values (defLevel == max_defLevel_). // DelimitRecords updates levels_position_. int64_t start_levels_position = levels_position_; int64_t values_seen = 0; @@ -1730,7 +1727,7 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>, // reach the desired number of records or we run out of values in the column // chunk. Returns number of skipped records. int64_t SkipRecordsRepeated(int64_t num_records) { - ARROW_DCHECK_GT(this->max_rep_level_, 0); + ARROW_DCHECK_GT(this->max_repLevel_, 0); int64_t skipped_records = 0; // First consume what is in the buffer. @@ -1770,15 +1767,15 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>, } // For skipping we will read the levels and append them to the end - // of the def_levels and rep_levels just like for read. + // of the defLevels and repLevels just like for read. ReserveLevels(batch_size); - int16_t* def_levels = this->def_levels() + levels_written_; - int16_t* rep_levels = this->rep_levels() + levels_written_; + int16_t* defLevels = this->def_levels() + levels_written_; + int16_t* repLevels = this->rep_levels() + levels_written_; int64_t levels_read = 0; - levels_read = this->ReadDefinitionLevels(batch_size, def_levels); - if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) { + levels_read = this->ReadDefinitionLevels(batch_size, defLevels); + if (this->ReadRepetitionLevels(batch_size, repLevels) != levels_read) { throw ParquetException( "Number of decoded rep / def levels did not match"); } @@ -1796,7 +1793,7 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>, // Throws an error if it could not read 'num_values'. void ReadAndThrowAwayValues(int64_t num_values) { int64_t values_left = num_values; - int64_t values_read = 0; + int64_t valuesRead = 0; // Allocate enough scratch space to accommodate 16-bit levels or any // value type @@ -1805,11 +1802,11 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>, do { int64_t batch_size = std::min<int64_t>(kSkipScratchBatchSize, values_left); - values_read = this->ReadValues( + valuesRead = this->ReadValues( batch_size, reinterpret_cast<T*>(this->scratch_for_skip_->mutable_data())); - values_left -= values_read; - } while (values_read > 0 && values_left > 0); + values_left -= valuesRead; + } while (valuesRead > 0 && values_left > 0); if (values_left > 0) { std::stringstream ss; ss << "Could not read and throw away " << num_values << " values"; @@ -1823,11 +1820,11 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>, // Top level required field. Number of records equals to number of levels, // and there is not read-ahead for levels. - if (this->max_rep_level_ == 0 && this->max_def_level_ == 0) { + if (this->max_repLevel_ == 0 && this->max_defLevel_ == 0) { return this->Skip(num_records); } int64_t skipped_records = 0; - if (this->max_rep_level_ == 0) { + if (this->max_repLevel_ == 0) { // Non-repeated optional field. // First consume whatever is in the buffer. skipped_records = SkipRecordsInBufferNonRepeated(num_records); @@ -1887,15 +1884,15 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>, int64_t values_to_read = 0; int64_t records_read = 0; - const int16_t* def_levels = this->def_levels() + levels_position_; - const int16_t* rep_levels = this->rep_levels() + levels_position_; + const int16_t* defLevels = this->def_levels() + levels_position_; + const int16_t* repLevels = this->rep_levels() + levels_position_; - ARROW_DCHECK_GT(this->max_rep_level_, 0); + ARROW_DCHECK_GT(this->max_repLevel_, 0); // Count logical records and number of values to read while (levels_position_ < levels_written_) { - const int16_t rep_level = *rep_levels++; - if (rep_level == 0) { + const int16_t repLevel = *repLevels++; + if (repLevel == 0) { // If at_record_start_ is true, we are seeing the start of a record // for the second time, such as after repeated calls to // DelimitRecords. In this case we must continue until we find @@ -1915,8 +1912,8 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>, // must advance until we find another record boundary at_record_start_ = false; - const int16_t def_level = *def_levels++; - if (def_level == this->max_def_level_) { + const int16_t defLevel = *defLevels++; + if (defLevel == this->max_defLevel_) { ++values_to_read; } ++levels_position_; @@ -1948,7 +1945,7 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>, } void ReserveLevels(int64_t extra_levels) { - if (this->max_def_level_ > 0) { + if (this->max_defLevel_ > 0) { const int64_t new_levels_capacity = UpdateCapacity(levels_capacity_, levels_written_, extra_levels); if (new_levels_capacity > levels_capacity_) { @@ -1960,7 +1957,7 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>, } PARQUET_THROW_NOT_OK( def_levels_->Resize(capacity_in_bytes, /*shrink_to_fit=*/false)); - if (this->max_rep_level_ > 0) { + if (this->max_repLevel_ > 0) { PARQUET_THROW_NOT_OK( rep_levels_->Resize(capacity_in_bytes, /*shrink_to_fit=*/false)); } @@ -2030,16 +2027,16 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>, this->decoders_.clear(); } - virtual void ReadValuesSpaced(int64_t values_with_nulls, int64_t null_count) { - uint8_t* valid_bits = valid_bits_->mutable_data(); - const int64_t valid_bits_offset = values_written_; + virtual void ReadValuesSpaced(int64_t values_with_nulls, int64_t nullCount) { + uint8_t* validBits = valid_bits_->mutable_data(); + const int64_t validBitsOffset = values_written_; int64_t num_decoded = this->current_decoder_->DecodeSpaced( ValuesHead<T>(), static_cast<int>(values_with_nulls), - static_cast<int>(null_count), - valid_bits, - valid_bits_offset); + static_cast<int>(nullCount), + validBits, + validBitsOffset); CheckNumberDecoded(num_decoded, values_with_nulls); } @@ -2050,37 +2047,37 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>, } // Reads repeated records and returns number of records read. Fills in - // values_to_read and null_count. + // values_to_read and nullCount. int64_t ReadRepeatedRecords( int64_t num_records, int64_t* values_to_read, - int64_t* null_count) { + int64_t* nullCount) { const int64_t start_levels_position = levels_position_; // Note that repeated records may be required or nullable. If they have // an optional parent in the path, they will be nullable, otherwise, // they are required. We use leaf_info_->HasNullableValues() that looks - // at repeated_ancestor_def_level to determine if it is required or + // at repeatedAncestorDefLevel to determine if it is required or // nullable. Even if they are required, we may have to read ahead and // delimit the records to get the right number of values and they will // have associated levels. int64_t records_read = DelimitRecords(num_records, values_to_read); if (!nullable_values() || read_dense_for_nullable_) { ReadValuesDense(*values_to_read); - // null_count is always 0 for required. - ARROW_DCHECK_EQ(*null_count, 0); + // nullCount is always 0 for required. + ARROW_DCHECK_EQ(*nullCount, 0); } else { ReadSpacedForOptionalOrRepeated( - start_levels_position, values_to_read, null_count); + start_levels_position, values_to_read, nullCount); } return records_read; } // Reads optional records and returns number of records read. Fills in - // values_to_read and null_count. + // values_to_read and nullCount. int64_t ReadOptionalRecords( int64_t num_records, int64_t* values_to_read, - int64_t* null_count) { + int64_t* nullCount) { const int64_t start_levels_position = levels_position_; // No repetition levels, skip delimiting logic. Each level represents a // null or not null entry @@ -2092,12 +2089,12 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>, // Optional fields are always nullable. if (read_dense_for_nullable_) { ReadDenseForOptional(start_levels_position, values_to_read); - // We don't need to update null_count when reading dense. It should be + // We don't need to update nullCount when reading dense. It should be // already set to 0. - ARROW_DCHECK_EQ(*null_count, 0); + ARROW_DCHECK_EQ(*nullCount, 0); } else { ReadSpacedForOptionalOrRepeated( - start_levels_position, values_to_read, null_count); + start_levels_position, values_to_read, nullCount); } return records_read; } @@ -2120,9 +2117,9 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>, ARROW_DCHECK_GE(levels_position_, start_levels_position); // When reading dense we need to figure out number of values to read. - const int16_t* def_levels = this->def_levels(); + const int16_t* defLevels = this->def_levels(); for (int64_t i = start_levels_position; i < levels_position_; ++i) { - if (def_levels[i] == this->max_def_level_) { + if (defLevels[i] == this->max_defLevel_) { ++(*values_to_read); } } @@ -2133,30 +2130,29 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>, void ReadSpacedForOptionalOrRepeated( int64_t start_levels_position, int64_t* values_to_read, - int64_t* null_count) { + int64_t* nullCount) { // levels_position_ must already be incremented based on number of records // read. ARROW_DCHECK_GE(levels_position_, start_levels_position); ValidityBitmapInputOutput validity_io; - validity_io.values_read_upper_bound = - levels_position_ - start_levels_position; - validity_io.valid_bits = valid_bits_->mutable_data(); - validity_io.valid_bits_offset = values_written_; + validity_io.valuesReadUpperBound = levels_position_ - start_levels_position; + validity_io.validBits = valid_bits_->mutable_data(); + validity_io.validBitsOffset = values_written_; DefLevelsToBitmap( def_levels() + start_levels_position, levels_position_ - start_levels_position, leaf_info_, &validity_io); - *values_to_read = validity_io.values_read - validity_io.null_count; - *null_count = validity_io.null_count; + *values_to_read = validity_io.valuesRead - validity_io.nullCount; + *nullCount = validity_io.nullCount; ARROW_DCHECK_GE(*values_to_read, 0); - ARROW_DCHECK_GE(*null_count, 0); - ReadValuesSpaced(validity_io.values_read, *null_count); + ARROW_DCHECK_GE(*nullCount, 0); + ReadValuesSpaced(validity_io.valuesRead, *nullCount); } // Return number of logical records read. - // Updates levels_position_, values_written_, and null_count_. + // Updates levels_position_, values_written_, and nullCount_. int64_t ReadRecordData(int64_t num_records) { // Conservative upper bound const int64_t possible_num_values = @@ -2169,37 +2165,37 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>, // types. int64_t records_read = 0; int64_t values_to_read = 0; - int64_t null_count = 0; - if (this->max_rep_level_ > 0) { + int64_t nullCount = 0; + if (this->max_repLevel_ > 0) { // Repeated fields may be nullable or not. // This call updates levels_position_. records_read = - ReadRepeatedRecords(num_records, &values_to_read, &null_count); - } else if (this->max_def_level_ > 0) { + ReadRepeatedRecords(num_records, &values_to_read, &nullCount); + } else if (this->max_defLevel_ > 0) { // Non-repeated optional values are always nullable. // This call updates levels_position_. ARROW_DCHECK(nullable_values()); records_read = - ReadOptionalRecords(num_records, &values_to_read, &null_count); + ReadOptionalRecords(num_records, &values_to_read, &nullCount); } else { ARROW_DCHECK(!nullable_values()); records_read = ReadRequiredRecords(num_records, &values_to_read); - // We don't need to update null_count, since it is 0. + // We don't need to update nullCount, since it is 0. } ARROW_DCHECK_GE(records_read, 0); ARROW_DCHECK_GE(values_to_read, 0); - ARROW_DCHECK_GE(null_count, 0); + ARROW_DCHECK_GE(nullCount, 0); if (read_dense_for_nullable_) { values_written_ += values_to_read; - ARROW_DCHECK_EQ(null_count, 0); + ARROW_DCHECK_EQ(nullCount, 0); } else { - values_written_ += values_to_read + null_count; - null_count_ += null_count; + values_written_ += values_to_read + nullCount; + null_count_ += nullCount; } // Total values, including null spaces, if any - if (this->max_def_level_ > 0) { + if (this->max_defLevel_ > 0) { // Optional, repeated, or some mix thereof this->ConsumeBufferedValues(levels_position_ - start_levels_position); } else { @@ -2211,24 +2207,24 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>, } void DebugPrintState() override { - const int16_t* def_levels = this->def_levels(); - const int16_t* rep_levels = this->rep_levels(); + const int16_t* defLevels = this->def_levels(); + const int16_t* repLevels = this->rep_levels(); const int64_t total_levels_read = levels_position_; const T* vals = reinterpret_cast<const T*>(this->values()); - if (leaf_info_.def_level > 0) { + if (leaf_info_.defLevel > 0) { std::cout << "def levels: "; for (int64_t i = 0; i < total_levels_read; ++i) { - std::cout << def_levels[i] << " "; + std::cout << defLevels[i] << " "; } std::cout << std::endl; } - if (leaf_info_.rep_level > 0) { + if (leaf_info_.repLevel > 0) { std::cout << "rep levels: "; for (int64_t i = 0; i < total_levels_read; ++i) { - std::cout << rep_levels[i] << " "; + std::cout << repLevels[i] << " "; } std::cout << std::endl; } @@ -2301,21 +2297,21 @@ class FLBARecordReader : public TypedRecordReader<FLBAType>, ResetValues(); } - void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { - uint8_t* valid_bits = valid_bits_->mutable_data(); - const int64_t valid_bits_offset = values_written_; + void ReadValuesSpaced(int64_t values_to_read, int64_t nullCount) override { + uint8_t* validBits = valid_bits_->mutable_data(); + const int64_t validBitsOffset = values_written_; auto values = ValuesHead<FLBA>(); int64_t num_decoded = this->current_decoder_->DecodeSpaced( values, static_cast<int>(values_to_read), - static_cast<int>(null_count), - valid_bits, - valid_bits_offset); + static_cast<int>(nullCount), + validBits, + validBitsOffset); ARROW_DCHECK_EQ(num_decoded, values_to_read); for (int64_t i = 0; i < num_decoded; i++) { - if (::arrow::bit_util::GetBit(valid_bits, valid_bits_offset + i)) { + if (::arrow::bit_util::GetBit(validBits, validBitsOffset + i)) { PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr)); } else { PARQUET_THROW_NOT_OK(builder_->AppendNull()); @@ -2363,14 +2359,14 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>, ResetValues(); } - void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { + void ReadValuesSpaced(int64_t values_to_read, int64_t nullCount) override { int64_t num_decoded = this->current_decoder_->DecodeArrow( static_cast<int>(values_to_read), - static_cast<int>(null_count), + static_cast<int>(nullCount), valid_bits_->mutable_data(), values_written_, &accumulator_); - CheckNumberDecoded(num_decoded, values_to_read - null_count); + CheckNumberDecoded(num_decoded, values_to_read - nullCount); ResetValues(); } @@ -2444,21 +2440,21 @@ class ByteArrayDictionaryRecordReader : public TypedRecordReader<ByteArrayType>, CheckNumberDecoded(num_decoded, values_to_read); } - void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { + void ReadValuesSpaced(int64_t values_to_read, int64_t nullCount) override { int64_t num_decoded = 0; if (current_encoding_ == Encoding::RLE_DICTIONARY) { MaybeWriteNewDictionary(); auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_); num_decoded = decoder->DecodeIndicesSpaced( static_cast<int>(values_to_read), - static_cast<int>(null_count), + static_cast<int>(nullCount), valid_bits_->mutable_data(), values_written_, &builder_); } else { num_decoded = this->current_decoder_->DecodeArrow( static_cast<int>(values_to_read), - static_cast<int>(null_count), + static_cast<int>(nullCount), valid_bits_->mutable_data(), values_written_, &builder_); @@ -2466,7 +2462,7 @@ class ByteArrayDictionaryRecordReader : public TypedRecordReader<ByteArrayType>, /// Flush values since they have been copied into the builder ResetValues(); } - ARROW_DCHECK_EQ(num_decoded, values_to_read - null_count); + ARROW_DCHECK_EQ(num_decoded, values_to_read - nullCount); } private: diff --git a/velox/dwio/parquet/writer/arrow/tests/ColumnReader.h b/velox/dwio/parquet/writer/arrow/tests/ColumnReader.h index b2cbf63d795a1..6eba405e20ec2 100644 --- a/velox/dwio/parquet/writer/arrow/tests/ColumnReader.h +++ b/velox/dwio/parquet/writer/arrow/tests/ColumnReader.h @@ -23,6 +23,7 @@ #include <utility> #include <vector> +#include "velox/dwio/parquet/common/LevelConversion.h" #include "velox/dwio/parquet/writer/arrow/Exception.h" #include "velox/dwio/parquet/writer/arrow/Metadata.h" #include "velox/dwio/parquet/writer/arrow/Properties.h" @@ -30,7 +31,6 @@ #include "velox/dwio/parquet/writer/arrow/Types.h" #include "velox/dwio/parquet/writer/arrow/util/BitStreamUtilsInternal.h" #include "velox/dwio/parquet/writer/arrow/util/RleEncodingInternal.h" -#include "velox/dwio/parquet/common/LevelConversion.h" namespace arrow { diff --git a/velox/dwio/parquet/writer/arrow/tests/ColumnReaderTest.cpp b/velox/dwio/parquet/writer/arrow/tests/ColumnReaderTest.cpp index 8be18c40a8f72..e22c8fcad530c 100644 --- a/velox/dwio/parquet/writer/arrow/tests/ColumnReaderTest.cpp +++ b/velox/dwio/parquet/writer/arrow/tests/ColumnReaderTest.cpp @@ -819,8 +819,8 @@ namespace { LevelInfo ComputeLevelInfo(const ColumnDescriptor* descr) { LevelInfo level_info; - level_info.def_level = descr->max_definition_level(); - level_info.rep_level = descr->max_repetition_level(); + level_info.defLevel = descr->max_definition_level(); + level_info.repLevel = descr->max_repetition_level(); int16_t min_spaced_def_level = descr->max_definition_level(); const schema::Node* node = descr->schema_node().get(); @@ -830,7 +830,7 @@ LevelInfo ComputeLevelInfo(const ColumnDescriptor* descr) { } node = node->parent(); } - level_info.repeated_ancestor_def_level = min_spaced_def_level; + level_info.repeatedAncestorDefLevel = min_spaced_def_level; return level_info; } @@ -1863,8 +1863,8 @@ class FLBARecordReaderTest : public ::testing::TestWithParam<bool> { levels_per_page_ = levels_per_page; FLBA_type_length_ = FLBA_type_length; LevelInfo level_info; - level_info.def_level = 1; - level_info.rep_level = 0; + level_info.defLevel = 1; + level_info.repLevel = 0; NodePtr type = schema::PrimitiveNode::Make( "b", Repetition::OPTIONAL, @@ -1872,7 +1872,7 @@ class FLBARecordReaderTest : public ::testing::TestWithParam<bool> { ConvertedType::NONE, FLBA_type_length_); descr_ = std::make_unique<ColumnDescriptor>( - type, level_info.def_level, level_info.rep_level); + type, level_info.defLevel, level_info.repLevel); MakePages<FLBAType>( descr_.get(), num_pages, @@ -1975,11 +1975,11 @@ class ByteArrayRecordReaderTest : public ::testing::TestWithParam<bool> { void MakeRecordReader(int levels_per_page, int num_pages) { levels_per_page_ = levels_per_page; LevelInfo level_info; - level_info.def_level = 1; - level_info.rep_level = 0; + level_info.defLevel = 1; + level_info.repLevel = 0; NodePtr type = schema::ByteArray("b", Repetition::OPTIONAL); descr_ = std::make_unique<ColumnDescriptor>( - type, level_info.def_level, level_info.rep_level); + type, level_info.defLevel, level_info.repLevel); MakePages<ByteArrayType>( descr_.get(), num_pages, @@ -2134,21 +2134,20 @@ TEST_P(RecordReaderStressTest, StressTest) { // Define these boolean variables for improving readability below. bool repeated = false, required = false; if (GetParam() == Repetition::REQUIRED) { - level_info.def_level = 0; - level_info.rep_level = 0; + level_info.defLevel = 0; + level_info.repLevel = 0; required = true; } else if (GetParam() == Repetition::OPTIONAL) { - level_info.def_level = 1; - level_info.rep_level = 0; + level_info.defLevel = 1; + level_info.repLevel = 0; } else { - level_info.def_level = 1; - level_info.rep_level = 1; + level_info.defLevel = 1; + level_info.repLevel = 1; repeated = true; } NodePtr type = schema::Int32("b", GetParam()); - const ColumnDescriptor descr( - type, level_info.def_level, level_info.rep_level); + const ColumnDescriptor descr(type, level_info.defLevel, level_info.repLevel); auto seed1 = static_cast<uint32_t>(time(0)); std::default_random_engine gen(seed1); @@ -2234,7 +2233,7 @@ TEST_P(RecordReaderStressTest, StressTest) { } bool has_value = required || - (!required && def_levels[levels_index] == level_info.def_level); + (!required && def_levels[levels_index] == level_info.defLevel); // If we are not skipping, we need to update the expected values and // rep/defs. If we are skipping, we just keep going.