Skip to content

Commit

Permalink
refactor(parquet): Move arrow levelComparison to parquet common
Browse files Browse the repository at this point in the history
  • Loading branch information
majetideepak committed Dec 2, 2024
1 parent ac5c15e commit 3205afa
Show file tree
Hide file tree
Showing 30 changed files with 146 additions and 338 deletions.
2 changes: 1 addition & 1 deletion velox/dwio/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
# limitations under the License.

if(VELOX_ENABLE_PARQUET)
add_subdirectory(thrift)
add_subdirectory(common)
add_subdirectory(reader)
add_subdirectory(thrift)
add_subdirectory(writer)

if(${VELOX_BUILD_TESTING})
Expand Down
1 change: 1 addition & 0 deletions velox/dwio/parquet/common/BloomFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "velox/dwio/parquet/common/BloomFilter.h"
#include "velox/dwio/parquet/common/XxHasher.h"
#include "velox/dwio/parquet/thrift/ParquetThriftTypes.h"
#include "velox/dwio/parquet/thrift/ThriftTransport.h"

#include <thrift/protocol/TCompactProtocol.h>
Expand Down
1 change: 0 additions & 1 deletion velox/dwio/parquet/common/BloomFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include "velox/dwio/common/BufferedInput.h"
#include "velox/dwio/common/OutputStream.h"
#include "velox/dwio/parquet/common/Hasher.h"
#include "velox/dwio/parquet/thrift/ParquetThriftTypes.h"

#include <cmath>
#include <cstdint>
Expand Down
4 changes: 3 additions & 1 deletion velox/dwio/parquet/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

add_library(velox_dwio_native_parquet_common BloomFilter.cpp XxHasher.cpp)
add_library(
velox_dwio_native_parquet_common BloomFilter.cpp XxHasher.cpp
LevelComparison.cpp LevelConversion.cpp)

target_link_libraries(
velox_dwio_native_parquet_common
Expand Down
57 changes: 57 additions & 0 deletions velox/dwio/parquet/common/LevelComparison.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Adapted from Apache Arrow.

#include "velox/dwio/parquet/common/LevelComparison.h"
#include "arrow/util/endian.h"

namespace facebook::velox::parquet {
namespace {
/// Packs the result of `predicate` over `levels[0 .. numLevels)` into a 64-bit
/// mask: bit `i` is set when `predicate(levels[i])` is true. The mask is passed
/// through ::arrow::bit_util::ToLittleEndian before being returned.
///
/// @param levels Pointer to at least `numLevels` readable int16_t values.
/// @param numLevels Number of levels to examine. Must not exceed 64: the
/// result has only 64 bits and shifting a uint64_t by 64 or more is undefined.
/// @param predicate Callable invoked with each level; its result is tested for
/// truthiness.
/// @return The packed mask; 0 when `numLevels` is 0 or negative.
template <typename Predicate>
inline uint64_t
LevelsToBitmap(const int16_t* levels, int64_t numLevels, Predicate predicate) {
  // Both clang and GCC can vectorize this automatically with SSE4/AVX2.
  uint64_t mask = 0;
  // The index has the same type as the bound so the comparison and increment
  // are done at a single width (the original used a plain `int`).
  for (int64_t i = 0; i < numLevels; ++i) {
    mask |= static_cast<uint64_t>(predicate(levels[i]) ? 1 : 0) << i;
  }
  return ::arrow::bit_util::ToLittleEndian(mask);
}

/// Scans `levels[0 .. numLevels)` and returns the smallest and largest value
/// seen.
///
/// @param levels Pointer to at least `numLevels` readable int16_t values.
/// @param numLevels Number of levels to scan.
/// @return A MinMax holding the minimum and maximum level. When `numLevels` is
/// 0 or negative the loop does not run and the result keeps its initial
/// sentinels: min == numeric_limits<int16_t>::max() and
/// max == numeric_limits<int16_t>::min().
inline MinMax FindMinMaxImpl(const int16_t* levels, int64_t numLevels) {
  MinMax out{
      std::numeric_limits<int16_t>::max(), std::numeric_limits<int16_t>::min()};
  // Use a 64-bit index to match `numLevels`; a plain `int` index would
  // overflow (undefined behavior) for counts above INT_MAX.
  for (int64_t i = 0; i < numLevels; ++i) {
    out.min = std::min(levels[i], out.min);
    out.max = std::max(levels[i], out.max);
  }
  return out;
}

} // namespace

/// Builds a bitmap over `levels[0 .. numLevels)` by handing LevelsToBitmap a
/// predicate that is true for every level strictly greater than `rhs`.
uint64_t
GreaterThanBitmap(const int16_t* levels, int64_t numLevels, int16_t rhs) {
  // Named predicate: true when a single level exceeds the threshold.
  const auto exceedsRhs = [rhs](int16_t level) { return level > rhs; };
  return LevelsToBitmap(levels, numLevels, exceedsRhs);
}

/// Public entry point: forwards to the file-local FindMinMaxImpl and returns
/// its result unchanged.
MinMax FindMinMax(const int16_t* levels, int64_t numLevels) {
  MinMax range = FindMinMaxImpl(levels, numLevels);
  return range;
}

} // namespace facebook::velox::parquet
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,18 @@
#include <algorithm>
#include <cstdint>

#include "velox/dwio/parquet/writer/arrow/Platform.h"

namespace facebook::velox::parquet::arrow::internal {
namespace facebook::velox::parquet {

/// Builds a bitmap where each set bit indicates the corresponding level is
/// greater than rhs.
uint64_t PARQUET_EXPORT
GreaterThanBitmap(const int16_t* levels, int64_t num_levels, int16_t rhs);
uint64_t
GreaterThanBitmap(const int16_t* levels, int64_t numLevels, int16_t rhs);

struct MinMax {
int16_t min;
int16_t max;
};

MinMax FindMinMax(const int16_t* levels, int64_t num_levels);
MinMax FindMinMax(const int16_t* levels, int64_t numLevels);

} // namespace facebook::velox::parquet::arrow::internal
} // namespace facebook::velox::parquet
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,22 @@

// Adapted from Apache Arrow.

#include "velox/dwio/parquet/writer/arrow/LevelConversion.h"
#include "velox/dwio/parquet/common/LevelConversion.h"
#include "velox/dwio/parquet/common/LevelConversionUtil.h"

#include <algorithm>
#include <limits>
#include <optional>

#include "arrow/util/bit_run_reader.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_writer.h"
#include "arrow/util/cpu_info.h"
#include "arrow/util/logging.h"
#include "velox/dwio/parquet/writer/arrow/Exception.h"
#include "velox/common/base/Exceptions.h"
#include "velox/dwio/parquet/common/LevelComparison.h"

#include "velox/dwio/parquet/writer/arrow/LevelComparison.h"
#define PARQUET_IMPL_NAMESPACE standard
#include "velox/dwio/parquet/writer/arrow/LevelConversionInc.h"
#undef PARQUET_IMPL_NAMESPACE

namespace facebook::velox::parquet::arrow {
namespace facebook::velox::parquet {
namespace {

using ::arrow::internal::CpuInfo;
Expand Down Expand Up @@ -70,7 +68,7 @@ void DefRepLevelsToListInfo(
if (offsets != nullptr) {
if (ARROW_PREDICT_FALSE(
*offsets == std::numeric_limits<OffsetType>::max())) {
throw ParquetException("List index overflow.");
VELOX_FAIL("List index overflow.");
}
*offsets += 1;
}
Expand All @@ -80,10 +78,7 @@ void DefRepLevelsToListInfo(
valid_bits_writer->position() >=
output->values_read_upper_bound) ||
(offsets - orig_pos) >= output->values_read_upper_bound)) {
std::stringstream ss;
ss << "Definition levels exceeded upper bound: "
<< output->values_read_upper_bound;
throw ParquetException(ss.str());
VELOX_FAIL("Definition levels exceeded upper bound: {}", output->values_read_upper_bound);
}

// current_rep < list rep_level i.e. start of a list (ancestor empty lists
Expand All @@ -98,7 +93,7 @@ void DefRepLevelsToListInfo(
if (def_levels[x] >= level_info.def_level) {
if (ARROW_PREDICT_FALSE(
*offsets == std::numeric_limits<OffsetType>::max())) {
throw ParquetException("List index overflow.");
VELOX_FAIL("List index overflow.");
}
*offsets += 1;
}
Expand Down Expand Up @@ -126,23 +121,14 @@ void DefRepLevelsToListInfo(
output->values_read = valid_bits_writer->position();
}
if (output->null_count > 0 && level_info.null_slot_usage > 1) {
throw ParquetException(
VELOX_FAIL(
"Null values with null_slot_usage > 1 not supported."
"(i.e. FixedSizeLists with null values are not supported)");
}
}

} // namespace

#if defined(ARROW_HAVE_RUNTIME_BMI2)
// defined in level_conversion_bmi2.cc for dynamic dispatch.
void DefLevelsToBitmapBmi2WithRepeatedParent(
const int16_t* def_levels,
int64_t num_def_levels,
LevelInfo level_info,
ValidityBitmapInputOutput* output);
#endif

void DefLevelsToBitmap(
const int16_t* def_levels,
int64_t num_def_levels,
Expand All @@ -151,22 +137,16 @@ void DefLevelsToBitmap(
// It is simpler to rely on rep_level here until PARQUET-1899 is done and the
// code is deleted in a follow-up release.
if (level_info.rep_level > 0) {
#if defined(ARROW_HAVE_RUNTIME_BMI2)
if (CpuInfo::GetInstance()->HasEfficientBmi2()) {
return DefLevelsToBitmapBmi2WithRepeatedParent(
def_levels, num_def_levels, level_info, output);
}
#endif
internal::standard::DefLevelsToBitmapSimd</*has_repeated_parent=*/true>(
DefLevelsToBitmapSimd</*has_repeated_parent=*/true>(
def_levels, num_def_levels, level_info, output);
} else {
internal::standard::DefLevelsToBitmapSimd</*has_repeated_parent=*/false>(
DefLevelsToBitmapSimd</*has_repeated_parent=*/false>(
def_levels, num_def_levels, level_info, output);
}
}

uint64_t TestOnlyExtractBitsSoftware(uint64_t bitmap, uint64_t select_bitmap) {
return internal::standard::ExtractBitsSoftware(bitmap, select_bitmap);
return ExtractBitsSoftware(bitmap, select_bitmap);
}

void DefRepLevelsToList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,9 @@

#include <cstdint>

#include "arrow/util/endian.h"
#include "velox/dwio/parquet/writer/arrow/Platform.h"
#include "velox/dwio/parquet/writer/arrow/Schema.h"
namespace facebook::velox::parquet {

namespace facebook::velox::parquet::arrow {

struct PARQUET_EXPORT LevelInfo {
struct LevelInfo {
LevelInfo()
: null_slot_usage(1),
def_level(0),
Expand Down Expand Up @@ -97,19 +93,7 @@ struct PARQUET_EXPORT LevelInfo {
//
int16_t repeated_ancestor_def_level = 0;

/// Increments levels according to the cardinality of node.
void Increment(const schema::Node& node) {
if (node.is_repeated()) {
IncrementRepeated();
return;
}
if (node.is_optional()) {
IncrementOptional();
return;
}
}

/// Incremetns level for a optional node.
/// Increments level for an optional node.
void IncrementOptional() {
def_level++;
}
Expand All @@ -131,7 +115,7 @@ struct PARQUET_EXPORT LevelInfo {
repeated_ancestor_def_level = def_level;
return last_repeated_ancestor;
}

/*
friend std::ostream& operator<<(std::ostream& os, const LevelInfo& levels) {
// This print method is to silence valgrind issues. What's printed
// is not important because all asserts happen directly on
Expand All @@ -143,11 +127,11 @@ struct PARQUET_EXPORT LevelInfo {
}
os << "}";
return os;
}
}*/
};

// Input/Output structure for reconstructed validity bitmaps.
struct PARQUET_EXPORT ValidityBitmapInputOutput {
struct ValidityBitmapInputOutput {
// Input only.
// The maximum number of values_read expected (actual
// values read must be less than or equal to this value).
Expand All @@ -163,7 +147,7 @@ struct PARQUET_EXPORT ValidityBitmapInputOutput {
int64_t null_count = 0;
// Output only. The validity bitmap to populate. May be null only
// for DefRepLevelsToListInfo (if all that is needed is list offsets).
uint8_t* valid_bits = NULLPTR;
uint8_t* valid_bits = nullptr;
// Input only, offset into valid_bits to start at.
int64_t valid_bits_offset = 0;
};
Expand All @@ -172,7 +156,7 @@ struct PARQUET_EXPORT ValidityBitmapInputOutput {
// have at least one member that is not a list and has no list descendants. For
// lists use DefRepLevelsToList and structs where all descendants contain a
// list use DefRepLevelsToBitmap.
void PARQUET_EXPORT DefLevelsToBitmap(
void DefLevelsToBitmap(
const int16_t* def_levels,
int64_t num_def_levels,
LevelInfo level_info,
Expand All @@ -186,14 +170,14 @@ void PARQUET_EXPORT DefLevelsToBitmap(
// reconstruction.
//
// Offsets must be sized to 1 + values_read_upper_bound.
void PARQUET_EXPORT DefRepLevelsToList(
void DefRepLevelsToList(
const int16_t* def_levels,
const int16_t* rep_levels,
int64_t num_def_levels,
LevelInfo level_info,
ValidityBitmapInputOutput* output,
int32_t* offsets);
void PARQUET_EXPORT DefRepLevelsToList(
void DefRepLevelsToList(
const int16_t* def_levels,
const int16_t* rep_levels,
int64_t num_def_levels,
Expand All @@ -204,16 +188,10 @@ void PARQUET_EXPORT DefRepLevelsToList(
// Reconstructs a validity bitmap for a struct where every member is a list or
// has a list descendant. See the documentation on DefLevelsToBitmap for more
// details on this method compared to the other ones defined above.
void PARQUET_EXPORT DefRepLevelsToBitmap(
void DefRepLevelsToBitmap(
const int16_t* def_levels,
const int16_t* rep_levels,
int64_t num_def_levels,
LevelInfo level_info,
ValidityBitmapInputOutput* output);

// This is exposed to ensure we can properly test a software simulated pext
// function (i.e. it isn't hidden by runtime dispatch).
uint64_t PARQUET_EXPORT
TestOnlyExtractBitsSoftware(uint64_t bitmap, uint64_t selection);

} // namespace facebook::velox::parquet::arrow
} // namespace facebook::velox::parquet
Loading

0 comments on commit 3205afa

Please sign in to comment.