diff --git a/velox/dwio/parquet/CMakeLists.txt b/velox/dwio/parquet/CMakeLists.txt
index dca38504baa0..8a6d3034f3c1 100644
--- a/velox/dwio/parquet/CMakeLists.txt
+++ b/velox/dwio/parquet/CMakeLists.txt
@@ -14,6 +14,7 @@
 if(VELOX_ENABLE_PARQUET)
   add_subdirectory(thrift)
+  add_subdirectory(common)
   add_subdirectory(reader)
   add_subdirectory(writer)
diff --git a/velox/dwio/parquet/common/BloomFilter.cpp b/velox/dwio/parquet/common/BloomFilter.cpp
new file mode 100644
index 000000000000..17a01674f08f
--- /dev/null
+++ b/velox/dwio/parquet/common/BloomFilter.cpp
@@ -0,0 +1,247 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Adapted from Apache Arrow.
+
+#include "velox/dwio/parquet/common/BloomFilter.h"
+#include "velox/dwio/parquet/common/XxHasher.h"
+#include "velox/dwio/parquet/thrift/ThriftTransport.h"
+
+#include <thrift/protocol/TCompactProtocol.h>
+#include <thrift/transport/TBufferTransports.h>
+
+#include <cstdint>
+#include <cstring>
+#include <memory>
+
+namespace facebook::velox::parquet {
+
+constexpr uint32_t BlockSplitBloomFilter::SALT[kBitsSetPerBlock];
+
+BlockSplitBloomFilter::BlockSplitBloomFilter(memory::MemoryPool* pool)
+    : pool_(pool),
+      hashStrategy_(HashStrategy::XXHASH),
+      algorithm_(Algorithm::BLOCK),
+      compressionStrategy_(CompressionStrategy::UNCOMPRESSED) {}
+
+// Allocates a zeroed bitset of 'numBytes' bytes after clamping the request to
+// [kMinimumBloomFilterBytes, kMaximumBloomFilterBytes] and rounding it up to a
+// power of two.
+void BlockSplitBloomFilter::init(uint32_t numBytes) {
+  static_assert(
+      (kMaximumBloomFilterBytes & (kMaximumBloomFilterBytes - 1)) == 0,
+      "The upper bound must be a power of two so that rounding up a clamped "
+      "size cannot exceed it");
+
+  // Clamp BEFORE rounding. Rounding first would turn any request above 2^31
+  // into 2^32, which truncates to 0 in uint32_t and slips past the upper-bound
+  // check, leaving a zero-byte filter behind.
+  if (numBytes < kMinimumBloomFilterBytes) {
+    numBytes = kMinimumBloomFilterBytes;
+  }
+  if (numBytes > kMaximumBloomFilterBytes) {
+    numBytes = kMaximumBloomFilterBytes;
+  }
+
+  // Get next power of 2 if it is not power of 2.
+  if ((numBytes & (numBytes - 1)) != 0) {
+    numBytes = static_cast<uint32_t>(bits::nextPowerOfTwo(numBytes));
+  }
+
+  numBytes_ = numBytes;
+  data_ = AlignedBuffer::allocate<char>(numBytes_, pool_);
+  memset(data_->asMutable<char>(), 0, numBytes_);
+
+  this->hasher_ = std::make_unique<XxHasher>();
+}
+
+// Copies an existing bitset into a freshly allocated buffer. Fails if
+// 'numBytes' is outside the supported range or is not a power of two.
+void BlockSplitBloomFilter::init(const uint8_t* bitset, uint32_t numBytes) {
+  VELOX_CHECK(bitset != nullptr);
+
+  if (numBytes < kMinimumBloomFilterBytes ||
+      numBytes > kMaximumBloomFilterBytes || (numBytes & (numBytes - 1)) != 0) {
+    VELOX_FAIL("Given length of bitset is illegal");
+  }
+
+  numBytes_ = numBytes;
+  data_ = AlignedBuffer::allocate<char>(numBytes_, pool_);
+  memcpy(data_->asMutable<char>(), bitset, numBytes_);
+
+  this->hasher_ = std::make_unique<XxHasher>();
+}
+
+// Rejects headers describing anything other than an uncompressed split-block
+// filter hashed with xxHash, or whose size is outside
+// (0, kMaximumBloomFilterBytes].
+static void validateBloomFilterHeader(const thrift::BloomFilterHeader& header) {
+  std::stringstream error;
+  if (!header.algorithm.__isset.BLOCK) {
+    error << "Unsupported Bloom filter algorithm: " << header.algorithm;
+    VELOX_FAIL(error.str());
+  }
+
+  if (!header.hash.__isset.XXHASH) {
+    error << "Unsupported Bloom filter hash: " << header.hash;
+    VELOX_FAIL(error.str());
+  }
+
+  if (!header.compression.__isset.UNCOMPRESSED) {
+    error << "Unsupported Bloom filter compression: " << header.compression;
+    VELOX_FAIL(error.str());
+  }
+
+  if (header.numBytes <= 0 ||
+      static_cast<uint32_t>(header.numBytes) >
+          BloomFilter::kMaximumBloomFilterBytes) {
+    error << "Bloom filter size is incorrect: " << header.numBytes
+          << ". Must be in range (" << 0 << ", "
+          << BloomFilter::kMaximumBloomFilterBytes << "].";
+    VELOX_FAIL(error.str());
+  }
+}
+
+// Reads a thrift BloomFilterHeader followed by the bitset from 'input'.
+BlockSplitBloomFilter BlockSplitBloomFilter::deserialize(
+    dwio::common::SeekableInputStream* input,
+    memory::MemoryPool& pool) {
+  VELOX_CHECK(input != nullptr);
+
+  const void* headerBuffer = nullptr;
+  int32_t size = 0;
+  // NOTE(review): assumes Next() reports failure through a bool return value,
+  // as the ZeroCopyInputStream-style interface does; confirm against
+  // SeekableInputStream.
+  VELOX_CHECK(
+      input->Next(&headerBuffer, &size),
+      "Failed to read the Bloom filter header from the input stream");
+  VELOX_CHECK_GE(size, 0);
+  const char* bufferStart = reinterpret_cast<const char*>(headerBuffer);
+  const char* bufferEnd = bufferStart + size;
+
+  std::shared_ptr<thrift::ThriftTransport> transport =
+      std::make_shared<thrift::ThriftStreamingTransport>(
+          input, bufferStart, bufferEnd);
+  apache::thrift::protocol::TCompactProtocolT<thrift::ThriftTransport> protocol(
+      transport);
+  thrift::BloomFilterHeader header;
+  const uint32_t headerSize = header.read(&protocol);
+  validateBloomFilterHeader(header);
+
+  // validateBloomFilterHeader guarantees 0 < numBytes <= 128MB, so the
+  // conversions below are lossless.
+  const auto bloomFilterSize = static_cast<uint32_t>(header.numBytes);
+  const auto bufferedSize = static_cast<uint32_t>(size);
+
+  // The code below indexes into 'headerBuffer' using 'headerSize'. If the
+  // header ran past that buffer the unsigned subtraction further down would
+  // wrap around and drive an enormous memcpy, so fail explicitly instead.
+  VELOX_CHECK_LE(
+      headerSize,
+      bufferedSize,
+      "Bloom filter header does not fit in the first buffer of the stream");
+
+  if (static_cast<uint64_t>(headerSize) + bloomFilterSize <= bufferedSize) {
+    // The bloom filter data is entirely contained in the buffer we just read
+    // => just return it.
+    BlockSplitBloomFilter bloomFilter(&pool);
+    bloomFilter.init(
+        reinterpret_cast<const uint8_t*>(headerBuffer) + headerSize,
+        bloomFilterSize);
+    return bloomFilter;
+  }
+  // We have read a part of the bloom filter already, copy it to the target
+  // buffer and read the remaining part from the InputStream.
+  auto buffer = AlignedBuffer::allocate<char>(bloomFilterSize, &pool);
+
+  const uint32_t bloomFilterSizeInHeaderBuffer = bufferedSize - headerSize;
+  if (bloomFilterSizeInHeaderBuffer > 0) {
+    std::memcpy(
+        buffer->asMutable<char>(),
+        reinterpret_cast<const uint8_t*>(headerBuffer) + headerSize,
+        bloomFilterSizeInHeaderBuffer);
+  }
+  const uint32_t requiredReadSize =
+      bloomFilterSize - bloomFilterSizeInHeaderBuffer;
+
+  input->readFully(
+      buffer->asMutable<char>() + bloomFilterSizeInHeaderBuffer,
+      requiredReadSize);
+  VELOX_CHECK_EQ(
+      buffer->size(),
+      bloomFilterSize,
+      "Bloom Filter read failed: not enough data, read size: {}, actual size: {}",
+      buffer->size(),
+      bloomFilterSize);
+  BlockSplitBloomFilter bloomFilter(&pool);
+  bloomFilter.init(
+      reinterpret_cast<const uint8_t*>(buffer->as<char>()), bloomFilterSize);
+  return bloomFilter;
+}
+
+// Serializes the thrift header followed by the raw bitset into 'sink'.
+void BlockSplitBloomFilter::writeTo(
+    velox::dwio::common::AppendOnlyBufferedStream* sink) const {
+  VELOX_CHECK(sink != nullptr);
+
+  thrift::BloomFilterHeader header;
+  if (algorithm_ != BloomFilter::Algorithm::BLOCK) {
+    VELOX_FAIL("BloomFilter does not support Algorithm other than BLOCK");
+  }
+  header.algorithm.__set_BLOCK(thrift::SplitBlockAlgorithm());
+  if (hashStrategy_ != HashStrategy::XXHASH) {
+    VELOX_FAIL("BloomFilter does not support Hash other than XXHASH");
+  }
+  header.hash.__set_XXHASH(thrift::XxHash());
+  if (compressionStrategy_ != CompressionStrategy::UNCOMPRESSED) {
+    VELOX_FAIL(
+        "BloomFilter does not support Compression other than UNCOMPRESSED");
+  }
+  header.compression.__set_UNCOMPRESSED(thrift::Uncompressed());
+  header.__set_numBytes(numBytes_);
+
+  auto memBuffer =
+      std::make_shared<apache::thrift::transport::TMemoryBuffer>();
+  apache::thrift::protocol::TCompactProtocolFactoryT<
+      apache::thrift::transport::TMemoryBuffer>
+      factory;
+  auto protocol = factory.getProtocol(memBuffer);
+  try {
+    memBuffer->resetBuffer();
+    header.write(protocol.get());
+  } catch (const std::exception& e) {
+    std::stringstream ss;
+    ss << "Couldn't serialize thrift: " << e.what() << "\n";
+    VELOX_FAIL(ss.str());
+  }
+  uint8_t* outBuffer;
+  uint32_t outLength;
+  memBuffer->getBuffer(&outBuffer, &outLength);
+  // write header
+  sink->write(reinterpret_cast<const char*>(outBuffer), outLength);
+  // write bitset
+  sink->write(data_->as<char>(), numBytes_);
+}
+
+// Returns false as soon as one of the eight probed bits is clear.
+bool BlockSplitBloomFilter::findHash(uint64_t hash) const {
+  // The upper 32 bits of the hash select the block; the lower 32 bits select
+  // one bit in each of the block's eight 32-bit words.
+  const uint32_t bucketIndex = static_cast<uint32_t>(
+      ((hash >> 32) * (numBytes_ / kBytesPerFilterBlock)) >> 32);
+  const uint32_t key = static_cast<uint32_t>(hash);
+  const uint32_t* bitset32 = data_->as<uint32_t>();
+
+  for (int i = 0; i < kBitsSetPerBlock; ++i) {
+    // Calculate mask for key in the given bitset.
+    const uint32_t mask = UINT32_C(0x1) << ((key * SALT[i]) >> 27);
+    if (0 == (bitset32[kBitsSetPerBlock * bucketIndex + i] & mask)) {
+      return false;
+    }
+  }
+  return true;
+}
+
+// Sets the eight bits that findHash() probes for 'hash'.
+void BlockSplitBloomFilter::insertHashImpl(uint64_t hash) {
+  const uint32_t bucketIndex = static_cast<uint32_t>(
+      ((hash >> 32) * (numBytes_ / kBytesPerFilterBlock)) >> 32);
+  const uint32_t key = static_cast<uint32_t>(hash);
+  uint32_t* bitset32 = data_->asMutable<uint32_t>();
+
+  for (int i = 0; i < kBitsSetPerBlock; i++) {
+    // Calculate mask for key in the given bitset.
+    const uint32_t mask = UINT32_C(0x1) << ((key * SALT[i]) >> 27);
+    bitset32[bucketIndex * kBitsSetPerBlock + i] |= mask;
+  }
+}
+
+void BlockSplitBloomFilter::insertHash(uint64_t hash) {
+  insertHashImpl(hash);
+}
+
+void BlockSplitBloomFilter::insertHashes(
+    const uint64_t* hashes,
+    int numValues) {
+  for (int i = 0; i < numValues; ++i) {
+    insertHashImpl(hashes[i]);
+  }
+}
+
+} // namespace facebook::velox::parquet
diff --git a/velox/dwio/parquet/common/BloomFilter.h b/velox/dwio/parquet/common/BloomFilter.h
new file mode 100644
index 000000000000..c2a5e97aec9f
--- /dev/null
+++ b/velox/dwio/parquet/common/BloomFilter.h
@@ -0,0 +1,360 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Adapted from Apache Arrow.
+
+#pragma once
+
+#include "velox/common/base/BitUtil.h"
+#include "velox/dwio/common/BufferedInput.h"
+#include "velox/dwio/common/OutputStream.h"
+#include "velox/dwio/parquet/common/Hasher.h"
+#include "velox/dwio/parquet/thrift/ParquetThriftTypes.h"
+
+#include <cmath>
+#include <cstdint>
+#include <memory>
+
+namespace facebook::velox::parquet {
+
+// A Bloom filter is a compact structure to indicate whether an item is not in a
+// set or probably in a set. The Bloom filter usually consists of a bit set that
+// represents a set of elements, a hash strategy and a Bloom filter algorithm.
+class BloomFilter {
+ public:
+  // Maximum Bloom filter size, it sets to HDFS default block size 128MB
+  // This value will be reconsidered when implementing Bloom filter producer.
+  static constexpr uint32_t kMaximumBloomFilterBytes = 128 * 1024 * 1024;
+
+  /// Determine whether an element exists in the set or not.
+  ///
+  /// @param hash the hash of the element to look up.
+  /// @return false if value is definitely not in set, and true means PROBABLY
+  /// in set.
+  virtual bool findHash(uint64_t hash) const = 0;
+
+  /// Insert element to set represented by Bloom filter bitset.
+  /// @param hash the hash of value to insert into Bloom filter.
+  virtual void insertHash(uint64_t hash) = 0;
+
+  /// Insert elements to set represented by Bloom filter bitset.
+  /// @param hashes the hash values to insert into Bloom filter.
+  /// @param numValues the number of hash values to insert.
+  virtual void insertHashes(const uint64_t* hashes, int numValues) = 0;
+
+  /// Write this Bloom filter to an output stream. A Bloom filter structure
+  /// should include bitset length, hash strategy, algorithm, and bitset.
+  ///
+  /// @param sink the output stream to write
+  virtual void writeTo(
+      velox::dwio::common::AppendOnlyBufferedStream* sink) const = 0;
+
+  /// Get the number of bytes of bitset
+  virtual uint32_t getBitsetSize() const = 0;
+
+  /// Compute hash for 32 bits value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t hash(int32_t value) const = 0;
+
+  /// Compute hash for 64 bits value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t hash(int64_t value) const = 0;
+
+  /// Compute hash for float value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t hash(float value) const = 0;
+
+  /// Compute hash for double value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t hash(double value) const = 0;
+
+  /// Compute hash for bytearray by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t hash(const ByteArray* value) const = 0;
+
+  /// Batch compute hashes for 32 bits values by using its plain encoding
+  /// result.
+  ///
+  /// @param values a pointer to the values to hash.
+  /// @param numValues the number of values to hash.
+  /// @param hashes a pointer to the output hash values, its length should be
+  /// equal to numValues.
+  virtual void hashes(const int32_t* values, int numValues, uint64_t* hashes)
+      const = 0;
+
+  /// Batch compute hashes for 64 bits values by using its plain encoding
+  /// result.
+  ///
+  /// @param values a pointer to the values to hash.
+  /// @param numValues the number of values to hash.
+  /// @param hashes a pointer to the output hash values, its length should be
+  /// equal to numValues.
+  virtual void hashes(const int64_t* values, int numValues, uint64_t* hashes)
+      const = 0;
+
+  /// Batch compute hashes for float values by using its plain encoding result.
+  ///
+  /// @param values a pointer to the values to hash.
+  /// @param numValues the number of values to hash.
+  /// @param hashes a pointer to the output hash values, its length should be
+  /// equal to numValues.
+  virtual void hashes(const float* values, int numValues, uint64_t* hashes)
+      const = 0;
+
+  /// Batch compute hashes for double values by using its plain encoding result.
+  ///
+  /// @param values a pointer to the values to hash.
+  /// @param numValues the number of values to hash.
+  /// @param hashes a pointer to the output hash values, its length should be
+  /// equal to numValues.
+  virtual void hashes(const double* values, int numValues, uint64_t* hashes)
+      const = 0;
+
+  /// Batch compute hashes for bytearray values by using its plain encoding
+  /// result.
+  ///
+  /// @param values a pointer to the values to hash.
+  /// @param numValues the number of values to hash.
+  /// @param hashes a pointer to the output hash values, its length should be
+  /// equal to numValues.
+  virtual void hashes(const ByteArray* values, int numValues, uint64_t* hashes)
+      const = 0;
+
+  virtual ~BloomFilter() = default;
+
+ protected:
+  // Hash strategy available for Bloom filter.
+  enum class HashStrategy : uint32_t { XXHASH = 0 };
+
+  // Bloom filter algorithm.
+  enum class Algorithm : uint32_t { BLOCK = 0 };
+
+  enum class CompressionStrategy : uint32_t { UNCOMPRESSED = 0 };
+};
+
+/// The BlockSplitBloomFilter is implemented using block-based Bloom filters
+/// from Putze et al.'s "Cache-,Hash- and Space-Efficient Bloom filters". The
+/// basic idea is to hash the item to a tiny Bloom filter which size fit a
+/// single cache line or smaller.
+///
+/// This implementation sets 8 bits in each tiny Bloom filter. Each tiny Bloom
+/// filter is 32 bytes to take advantage of 32-byte SIMD instructions.
+class BlockSplitBloomFilter : public BloomFilter {
+ public:
+  /// The constructor of BlockSplitBloomFilter. It uses XXH64 as hash function.
+  /// One of the init() overloads must be called before the filter is used.
+  ///
+  /// @param pool memory pool to use.
+  explicit BlockSplitBloomFilter(memory::MemoryPool* pool);
+
+  /// Initialize the BlockSplitBloomFilter. The range of numBytes should be
+  /// within [kMinimumBloomFilterBytes, kMaximumBloomFilterBytes], it will be
+  /// rounded up/down to lower/upper bound if numBytes is out of range and also
+  /// will be rounded up to a power of 2.
+  ///
+  /// @param numBytes The number of bytes to store Bloom filter bitset.
+  void init(uint32_t numBytes);
+
+  /// Initialize the BlockSplitBloomFilter. It copies the bitset as underlying
+  /// bitset because the given bitset may not satisfy the 32-byte alignment
+  /// requirement which may lead to segfault when performing SIMD instructions.
+  /// It is the caller's responsibility to free the bitset passed in. This is
+  /// used when reconstructing a Bloom filter from a parquet file.
+  ///
+  /// @param bitset The given bitset to initialize the Bloom filter.
+  /// @param numBytes The number of bytes of given bitset.
+  void init(const uint8_t* bitset, uint32_t numBytes);
+
+  /// Minimum Bloom filter size, it sets to 32 bytes to fit a tiny Bloom filter.
+  static constexpr uint32_t kMinimumBloomFilterBytes = 32;
+
+  /// Calculate optimal size according to the number of distinct values and
+  /// false positive probability.
+  ///
+  /// @param ndv The number of distinct values.
+  /// @param fpp The false positive probability.
+  /// @return it always return a value between kMinimumBloomFilterBytes and
+  /// kMaximumBloomFilterBytes, and the return value is always a power of 2
+  static uint32_t optimalNumOfBytes(uint32_t ndv, double fpp) {
+    uint32_t optimalNumBits = optimalNumOfBits(ndv, fpp);
+    VELOX_CHECK(optimalNumBits % 8 == 0);
+    return optimalNumBits >> 3;
+  }
+
+  /// Calculate optimal size according to the number of distinct values and
+  /// false positive probability.
+  ///
+  /// @param ndv The number of distinct values.
+  /// @param fpp The false positive probability, must be in (0, 1).
+  /// @return it always return a value between kMinimumBloomFilterBytes * 8 and
+  /// kMaximumBloomFilterBytes * 8, and the return value is always a power of 2
+  static uint32_t optimalNumOfBits(uint32_t ndv, double fpp) {
+    VELOX_CHECK(fpp > 0.0 && fpp < 1.0);
+    const double m = -8.0 * ndv / log(1 - pow(fpp, 1.0 / 8));
+    uint32_t numBits;
+
+    // Handle overflow.
+    if (m < 0 || m > kMaximumBloomFilterBytes << 3) {
+      numBits = static_cast<uint32_t>(kMaximumBloomFilterBytes << 3);
+    } else {
+      numBits = static_cast<uint32_t>(m);
+    }
+
+    // Round up to lower bound
+    if (numBits < kMinimumBloomFilterBytes << 3) {
+      numBits = kMinimumBloomFilterBytes << 3;
+    }
+
+    // Get next power of 2 if bits is not power of 2.
+    if ((numBits & (numBits - 1)) != 0) {
+      numBits = static_cast<uint32_t>(bits::nextPowerOfTwo(numBits));
+    }
+
+    // Round down to upper bound
+    if (numBits > kMaximumBloomFilterBytes << 3) {
+      numBits = kMaximumBloomFilterBytes << 3;
+    }
+
+    return numBits;
+  }
+
+  bool findHash(uint64_t hash) const override;
+  void insertHash(uint64_t hash) override;
+  void insertHashes(const uint64_t* hashes, int numValues) override;
+  void writeTo(
+      velox::dwio::common::AppendOnlyBufferedStream* sink) const override;
+  uint32_t getBitsetSize() const override {
+    return numBytes_;
+  }
+
+  uint64_t hash(int32_t value) const override {
+    return hasher_->hash(value);
+  }
+  uint64_t hash(int64_t value) const override {
+    return hasher_->hash(value);
+  }
+  uint64_t hash(float value) const override {
+    return hasher_->hash(value);
+  }
+  uint64_t hash(double value) const override {
+    return hasher_->hash(value);
+  }
+  uint64_t hash(const ByteArray* value) const override {
+    return hasher_->hash(value);
+  }
+
+  void hashes(const int32_t* values, int numValues, uint64_t* hashes)
+      const override {
+    hasher_->hashes(values, numValues, hashes);
+  }
+  void hashes(const int64_t* values, int numValues, uint64_t* hashes)
+      const override {
+    hasher_->hashes(values, numValues, hashes);
+  }
+  void hashes(const float* values, int numValues, uint64_t* hashes)
+      const override {
+    hasher_->hashes(values, numValues, hashes);
+  }
+  void hashes(const double* values, int numValues, uint64_t* hashes)
+      const override {
+    hasher_->hashes(values, numValues, hashes);
+  }
+  void hashes(const ByteArray* values, int numValues, uint64_t* hashes)
+      const override {
+    hasher_->hashes(values, numValues, hashes);
+  }
+
+  // Pointer flavours of hash() for the fixed-width types; they hash the
+  // pointed-to value.
+  uint64_t hash(const int32_t* value) const {
+    return hasher_->hash(*value);
+  }
+  uint64_t hash(const int64_t* value) const {
+    return hasher_->hash(*value);
+  }
+  uint64_t hash(const float* value) const {
+    return hasher_->hash(*value);
+  }
+  uint64_t hash(const double* value) const {
+    return hasher_->hash(*value);
+  }
+
+  /// Deserialize the Bloom filter from an input stream. It is used when
+  /// reconstructing a Bloom filter from a parquet file.
+  ///
+  /// @param input The input stream from which to construct the Bloom filter.
+  /// @param pool The memory pool used to allocate the bitset.
+  /// @return The BlockSplitBloomFilter.
+  static BlockSplitBloomFilter deserialize(
+      dwio::common::SeekableInputStream* input,
+      memory::MemoryPool& pool);
+
+ private:
+  inline void insertHashImpl(uint64_t hash);
+
+  // Bytes in a tiny Bloom filter block.
+  static constexpr int kBytesPerFilterBlock = 32;
+
+  // The number of bits to be set in each tiny Bloom filter
+  static constexpr int kBitsSetPerBlock = 8;
+
+  // A mask structure used to set bits in each tiny Bloom filter.
+  struct BlockMask {
+    uint32_t item[kBitsSetPerBlock];
+  };
+
+  // The block-based algorithm needs eight odd SALT values to calculate eight
+  // indexes of bit to set, one bit in each 32-bit word.
+  static constexpr uint32_t SALT[kBitsSetPerBlock] = {
+      0x47b6137bU,
+      0x44974d91U,
+      0x8824ad5bU,
+      0xa2b7289dU,
+      0x705495c7U,
+      0x2df1424bU,
+      0x9efc4947U,
+      0x5c6bfb31U};
+
+  // Memory pool to allocate aligned buffer for bitset
+  memory::MemoryPool* pool_;
+
+  // The underlying buffer of bitset.
+  BufferPtr data_;
+
+  // The number of bytes of Bloom filter bitset. Zero until init() is called,
+  // so getBitsetSize() never reads an indeterminate value.
+  uint32_t numBytes_{0};
+
+  // Hash strategy used in this Bloom filter.
+  HashStrategy hashStrategy_;
+
+  // Algorithm used in this Bloom filter.
+  Algorithm algorithm_;
+
+  // Compression used in this Bloom filter.
+  CompressionStrategy compressionStrategy_;
+
+  // The hash pointer points to actual hash class used.
+  std::unique_ptr<Hasher> hasher_;
+};
+
+} // namespace facebook::velox::parquet
diff --git a/velox/dwio/parquet/common/CMakeLists.txt b/velox/dwio/parquet/common/CMakeLists.txt
new file mode 100644
index 000000000000..cbffcbd7eee6
--- /dev/null
+++ b/velox/dwio/parquet/common/CMakeLists.txt
@@ -0,0 +1,27 @@
+# Copyright (c) Facebook, Inc. and its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+add_library(velox_dwio_native_parquet_common BloomFilter.cpp XxHasher.cpp)
+
+target_link_libraries(
+  velox_dwio_native_parquet_common
+  velox_dwio_parquet_thrift
+  velox_type
+  velox_dwio_common
+  velox_dwio_common_compression
+  fmt::fmt
+  arrow
+  Snappy::snappy
+  thrift
+  zstd::zstd)
diff --git a/velox/dwio/parquet/common/Hasher.h b/velox/dwio/parquet/common/Hasher.h
new file mode 100644
index 000000000000..3f3a907d06b4
--- /dev/null
+++ b/velox/dwio/parquet/common/Hasher.h
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Adapted from Apache Arrow.
+
+#pragma once
+
+#include <cstdint>
+#include <string_view>
+
+namespace facebook::velox::parquet {
+
+// Non-owning (length, pointer) view over a run of bytes.
+struct ByteArray {
+  ByteArray() : len(0), ptr(nullptr) {}
+  ByteArray(uint32_t len, const uint8_t* ptr) : len(len), ptr(ptr) {}
+
+  // The view's size is narrowed to 32 bits.
+  ByteArray(::std::string_view view) // NOLINT implicit conversion
+      : ByteArray(
+            static_cast<uint32_t>(view.size()),
+            reinterpret_cast<const uint8_t*>(view.data())) {}
+
+  explicit operator std::string_view() const {
+    return std::string_view{reinterpret_cast<const char*>(ptr), len};
+  }
+
+  uint32_t len;
+  const uint8_t* ptr;
+};
+
+// Abstract class for hash
+class Hasher {
+ public:
+  /// Compute hash for 32 bits value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t hash(int32_t value) const = 0;
+
+  /// Compute hash for 64 bits value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t hash(int64_t value) const = 0;
+
+  /// Compute hash for float value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t hash(float value) const = 0;
+
+  /// Compute hash for double value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t hash(double value) const = 0;
+
+  /// Compute hash for ByteArray value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t hash(const ByteArray* value) const = 0;
+
+  /// Batch compute hashes for 32 bits values by using its plain encoding
+  /// result.
+  ///
+  /// @param values a pointer to the values to hash.
+  /// @param numValues the number of values to hash.
+  /// @param hashes a pointer to the output hash values, its length should be
+  /// equal to numValues.
+  virtual void hashes(const int32_t* values, int numValues, uint64_t* hashes)
+      const = 0;
+
+  /// Batch compute hashes for 64 bits values by using its plain encoding
+  /// result.
+  ///
+  /// @param values a pointer to the values to hash.
+  /// @param numValues the number of values to hash.
+  /// @param hashes a pointer to the output hash values, its length should be
+  /// equal to numValues.
+  virtual void hashes(const int64_t* values, int numValues, uint64_t* hashes)
+      const = 0;
+
+  /// Batch compute hashes for float values by using its plain encoding result.
+  ///
+  /// @param values a pointer to the values to hash.
+  /// @param numValues the number of values to hash.
+  /// @param hashes a pointer to the output hash values, its length should be
+  /// equal to numValues.
+  virtual void hashes(const float* values, int numValues, uint64_t* hashes)
+      const = 0;
+
+  /// Batch compute hashes for double values by using its plain encoding result.
+  ///
+  /// @param values a pointer to the values to hash.
+  /// @param numValues the number of values to hash.
+  /// @param hashes a pointer to the output hash values, its length should be
+  /// equal to numValues.
+  virtual void hashes(const double* values, int numValues, uint64_t* hashes)
+      const = 0;
+
+  /// Batch compute hashes for ByteArray values by using its plain encoding
+  /// result.
+  ///
+  /// @param values a pointer to the values to hash.
+  /// @param numValues the number of values to hash.
+  /// @param hashes a pointer to the output hash values, its length should be
+  /// equal to numValues.
+  virtual void hashes(const ByteArray* values, int numValues, uint64_t* hashes)
+      const = 0;
+
+  virtual ~Hasher() = default;
+};
+
+} // namespace facebook::velox::parquet
diff --git a/velox/dwio/parquet/common/XxHasher.cpp b/velox/dwio/parquet/common/XxHasher.cpp
new file mode 100644
index 000000000000..255f62c6253c
--- /dev/null
+++ b/velox/dwio/parquet/common/XxHasher.cpp
@@ -0,0 +1,98 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Adapted from Apache Arrow.
+
+#include "XxHasher.h"
+
+#define XXH_INLINE_ALL
+#include <xxhash.h>
+
+namespace facebook::velox::parquet {
+
+namespace {
+// Hashes the in-memory bytes of a single fixed-width value.
+template <typename T>
+uint64_t XxHashHelper(T value, uint32_t seed) {
+  const void* rawBytes = reinterpret_cast<const void*>(&value);
+  return XXH64(rawBytes, sizeof(T), seed);
+}
+
+// Hashes 'numValues' fixed-width values, writing one result per input.
+template <typename T>
+void XxHashesHelper(
+    const T* values,
+    uint32_t seed,
+    int numValues,
+    uint64_t* results) {
+  int index = 0;
+  while (index < numValues) {
+    results[index] = XxHashHelper(values[index], seed);
+    ++index;
+  }
+}
+
+} // namespace
+
+uint64_t XxHasher::hash(int32_t value) const {
+  return XxHashHelper(value, kParquetBloomXxHashSeed);
+}
+
+uint64_t XxHasher::hash(int64_t value) const {
+  return XxHashHelper(value, kParquetBloomXxHashSeed);
+}
+
+uint64_t XxHasher::hash(float value) const {
+  return XxHashHelper(value, kParquetBloomXxHashSeed);
+}
+
+uint64_t XxHasher::hash(double value) const {
+  return XxHashHelper(value, kParquetBloomXxHashSeed);
+}
+
+// Hashes the 'len' bytes that 'value' points at.
+uint64_t XxHasher::hash(const ByteArray* value) const {
+  const void* payload = reinterpret_cast<const void*>(value->ptr);
+  return XXH64(payload, value->len, kParquetBloomXxHashSeed);
+}
+
+void XxHasher::hashes(const int32_t* values, int numValues, uint64_t* hashes)
+    const {
+  XxHashesHelper(values, kParquetBloomXxHashSeed, numValues, hashes);
+}
+
+void XxHasher::hashes(const int64_t* values, int numValues, uint64_t* hashes)
+    const {
+  XxHashesHelper(values, kParquetBloomXxHashSeed, numValues, hashes);
+}
+
+void XxHasher::hashes(const float* values, int numValues, uint64_t* hashes)
+    const {
+  XxHashesHelper(values, kParquetBloomXxHashSeed, numValues, hashes);
+}
+
+void XxHasher::hashes(const double* values, int numValues, uint64_t* hashes)
+    const {
+  XxHashesHelper(values, kParquetBloomXxHashSeed, numValues, hashes);
+}
+
+// Byte arrays are variable-length, so each one is hashed over its own bytes.
+void XxHasher::hashes(const ByteArray* values, int numValues, uint64_t* hashes)
+    const {
+  for (int i = 0; i < numValues; ++i) {
+    const ByteArray& current = values[i];
+    hashes[i] = XXH64(
+        reinterpret_cast<const void*>(current.ptr),
+        current.len,
+        kParquetBloomXxHashSeed);
+  }
+}
+
+} // namespace facebook::velox::parquet
diff --git 
a/velox/dwio/parquet/common/XxHasher.h b/velox/dwio/parquet/common/XxHasher.h
new file mode 100644
index 000000000000..07c37f762f36
--- /dev/null
+++ b/velox/dwio/parquet/common/XxHasher.h
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Adapted from Apache Arrow.
+
+#pragma once
+
+#include <cstdint>
+
+#include "velox/dwio/parquet/common/Hasher.h"
+
+namespace facebook::velox::parquet {
+
+/// Hasher implementation that computes XXH64 with kParquetBloomXxHashSeed.
+class XxHasher : public Hasher {
+ public:
+  uint64_t hash(int32_t value) const override;
+  uint64_t hash(int64_t value) const override;
+  uint64_t hash(float value) const override;
+  uint64_t hash(double value) const override;
+  uint64_t hash(const ByteArray* value) const override;
+
+  void hashes(const int32_t* values, int numValues, uint64_t* hashes)
+      const override;
+  void hashes(const int64_t* values, int numValues, uint64_t* hashes)
+      const override;
+  void hashes(const float* values, int numValues, uint64_t* hashes)
+      const override;
+  void hashes(const double* values, int numValues, uint64_t* hashes)
+      const override;
+  void hashes(const ByteArray* values, int numValues, uint64_t* hashes)
+      const override;
+
+  // Seed passed to every XXH64 call.
+  static constexpr int kParquetBloomXxHashSeed = 0;
+};
+
+} // namespace facebook::velox::parquet
diff --git a/velox/dwio/parquet/tests/reader/BloomFilterTest.cpp b/velox/dwio/parquet/tests/reader/BloomFilterTest.cpp
new file mode 100644
index 000000000000..91ba224d0c81
--- /dev/null
+++ 
b/velox/dwio/parquet/tests/reader/BloomFilterTest.cpp
@@ -0,0 +1,397 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Adapted from Apache Arrow.
+
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <random>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "velox/dwio/common/OutputStream.h"
+#include "velox/dwio/parquet/common/BloomFilter.h"
+#include "velox/dwio/parquet/common/XxHasher.h"
+#include "velox/dwio/parquet/reader/ParquetData.h"
+#include "velox/dwio/parquet/reader/ParquetReader.h"
+#include "velox/dwio/parquet/tests/ParquetTestBase.h"
+
+using namespace facebook::velox;
+using namespace facebook::velox::parquet;
+
+class BloomFilterTest : public ParquetTestBase {};
+
+// Checks which sizes the two init() overloads accept.
+TEST_F(BloomFilterTest, ConstructorTest) {
+  BlockSplitBloomFilter filter(leafPool_.get());
+
+  // The size-only overload adjusts an arbitrary size instead of rejecting it.
+  EXPECT_NO_THROW(filter.init(1000));
+
+  // The bitset overload rejects a length of zero.
+  const auto zeroLengthBitset = std::make_unique<uint8_t[]>(1024);
+  EXPECT_THROW(filter.init(zeroLengthBitset.get(), 0), VeloxRuntimeError);
+
+  // The bitset overload rejects a length that is not a power of 2.
+  const auto oddLengthBitset = std::make_unique<uint8_t[]>(1024);
+  EXPECT_THROW(filter.init(oddLengthBitset.get(), 1023), VeloxRuntimeError);
+}
+
+// The BasicTest is used to test basic operations including InsertHash, FindHash
+// and serializing and de-serializing.
+TEST_F(BloomFilterTest, BasicTest) { + const std::vector kBloomFilterSizes = { + 32, + 64, + 128, + 256, + 512, + 1024, + 2048, + }; + const std::vector kIntInserts = { + 1, 2, 3, 5, 6, 7, 8, 9, 10, 42, -1, 1 << 29, 1 << 30}; + const std::vector kFloatInserts = { + 1.5, -1.5, 3.0, 6.0, 0.0, 123.456, 1e6, 1e7, 1e8}; + const std::vector kNegativeIntLookups = { + 0, 11, 12, 13, -2, -3, 43, 1 << 27, 1 << 28}; + + for (const auto bloomFilterBytes : kBloomFilterSizes) { + BlockSplitBloomFilter bloomFilter(leafPool_.get()); + bloomFilter.init(bloomFilterBytes); + + // Empty bloom filter deterministically returns false + for (const auto v : kIntInserts) { + EXPECT_FALSE(bloomFilter.findHash(bloomFilter.hash(v))); + } + for (const auto v : kFloatInserts) { + EXPECT_FALSE(bloomFilter.findHash(bloomFilter.hash(v))); + } + + // Insert all values + for (const auto v : kIntInserts) { + bloomFilter.insertHash(bloomFilter.hash(v)); + } + for (const auto v : kFloatInserts) { + bloomFilter.insertHash(bloomFilter.hash(v)); + } + + // They should always lookup successfully + for (const auto v : kIntInserts) { + EXPECT_TRUE(bloomFilter.findHash(bloomFilter.hash(v))); + } + for (const auto v : kFloatInserts) { + EXPECT_TRUE(bloomFilter.findHash(bloomFilter.hash(v))); + } + + // Values not inserted in the filter should only rarely lookup successfully + int falsePositives = 0; + for (const auto v : kNegativeIntLookups) { + falsePositives += bloomFilter.findHash(bloomFilter.hash(v)); + } + // (this is a crude check, see FPPTest below for a more rigorous formula) + EXPECT_LE(falsePositives, 2); + + // Serialize Bloom filter to memory output stream + uint64_t blockSize = 1024; + dwio::common::DataBufferHolder bufferHolder{*leafPool_.get(), blockSize}; + // dwio::common::BufferedOutputStream bufferedOutputStream(bufferHolder); + dwio::common::AppendOnlyBufferedStream sink( + std::make_unique(bufferHolder)); + bloomFilter.writeTo(&sink); + sink.flush(); + std::string buffer; + + for (auto& 
tmpBuffer : bufferHolder.getBuffers()) { + buffer.append(tmpBuffer.data(), tmpBuffer.size()); + } + + // Deserialize Bloom filter from memory + dwio::common::SeekableArrayInputStream* source = + new dwio::common::SeekableArrayInputStream( + buffer.c_str(), buffer.size(), blockSize); + + BlockSplitBloomFilter deBloom = + BlockSplitBloomFilter::deserialize(source, *leafPool_.get()); + + // Lookup previously inserted values + for (const auto v : kIntInserts) { + EXPECT_TRUE(deBloom.findHash(deBloom.hash(v))); + } + for (const auto v : kFloatInserts) { + EXPECT_TRUE(deBloom.findHash(deBloom.hash(v))); + } + falsePositives = 0; + for (const auto v : kNegativeIntLookups) { + falsePositives += deBloom.findHash(deBloom.hash(v)); + } + EXPECT_LE(falsePositives, 2); + } +} + +// Helper function to generate random string. +std::string GetRandomString(uint32_t length) { + // Character set used to generate random string + const std::string charset = + "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; + + std::default_random_engine gen(42); + std::uniform_int_distribution dist( + 0, static_cast(charset.size() - 1)); + std::string ret(length, 'x'); + + for (uint32_t i = 0; i < length; i++) { + ret[i] = charset[dist(gen)]; + } + return ret; +} + +TEST_F(BloomFilterTest, FPPTest) { + // It counts the number of times FindHash returns true. 
+ int exist = 0; + + // Total count of elements that will be used +#ifdef PARQUET_VALGRIND + const int totalCount = 5000; +#else + const int totalCount = 100000; +#endif + + // Bloom filter fpp parameter + const double fpp = 0.01; + + std::vector members; + BlockSplitBloomFilter bloomFilter(leafPool_.get()); + bloomFilter.init(BlockSplitBloomFilter::optimalNumOfBytes(totalCount, fpp)); + + // Insert elements into the Bloom filter + for (int i = 0; i < totalCount; i++) { + // Insert random string which length is 8 + std::string tmp = GetRandomString(8); + const ByteArray byte_array( + 8, reinterpret_cast(tmp.c_str())); + members.push_back(tmp); + bloomFilter.insertHash(bloomFilter.hash(&byte_array)); + } + + for (int i = 0; i < totalCount; i++) { + const ByteArray byte_array1( + 8, reinterpret_cast(members[i].c_str())); + ASSERT_TRUE(bloomFilter.findHash(bloomFilter.hash(&byte_array1))); + std::string tmp = GetRandomString(7); + const ByteArray byte_array2( + 7, reinterpret_cast(tmp.c_str())); + + if (bloomFilter.findHash(bloomFilter.hash(&byte_array2))) { + exist++; + } + } + + // The exist should be probably less than 1000 according default FPP 0.01. + EXPECT_LT(exist, totalCount * fpp); +} + +// The CompatibilityTest is used to test cross compatibility with parquet-mr, it +// reads the Bloom filter binary generated by the Bloom filter class in the +// parquet-mr project and tests whether the values inserted before could be +// filtered or not. + +// TODO: disabled as it requires Arrow parquet data dir. +// The Bloom filter binary is generated by three steps in from Parquet-mr. +// Step 1: Construct a Bloom filter with 1024 bytes bitset. +// Step 2: Insert "hello", "parquet", "bloom", "filter" to Bloom filter. +// Step 3: Call writeTo API to write to File. 
+/*
+TEST(CompatibilityTest, TestBloomFilter) {
+  const std::string test_string[4] = {"hello", "parquet", "bloom", "filter"};
+  const std::string bloom_filter_test_binary =
+      std::string(test::get_data_dir()) + "/bloom_filter.xxhash.bin";
+
+  PARQUET_ASSIGN_OR_THROW(auto handle,
+      ::arrow::io::ReadableFile::Open(bloom_filter_test_binary));
+  PARQUET_ASSIGN_OR_THROW(int64_t size, handle->GetSize());
+
+  // 16 bytes (thrift header) + 1024 bytes (bitset)
+  EXPECT_EQ(size, 1040);
+
+  std::unique_ptr<uint8_t[]> bitset(new uint8_t[size]());
+  PARQUET_ASSIGN_OR_THROW(auto buffer, handle->Read(size));
+
+  ::arrow::io::BufferReader source(buffer);
+  ReaderProperties reader_properties;
+  BlockSplitBloomFilter bloom_filter1 =
+      BlockSplitBloomFilter::Deserialize(reader_properties, &source);
+
+  for (int i = 0; i < 4; i++) {
+    const ByteArray tmp(static_cast<uint32_t>(test_string[i].length()),
+        reinterpret_cast<const uint8_t*>(test_string[i].c_str()));
+    EXPECT_TRUE(bloom_filter1.findHash(bloom_filter1.hash(&tmp)));
+  }
+
+  // The following checks whether a newly created Bloom filter in parquet-cpp
+  // is byte-for-byte identical to the file at bloom_filter_test_binary, which
+  // was created by parquet-mr with the same inserted hashes.
+  BlockSplitBloomFilter bloom_filter2;
+  bloom_filter2.Init(bloom_filter1.GetBitsetSize());
+  for (int i = 0; i < 4; i++) {
+    const ByteArray byte_array(static_cast<uint32_t>(test_string[i].length()),
+        reinterpret_cast<const uint8_t*>(test_string[i].c_str()));
+    bloom_filter2.InsertHash(bloom_filter2.hash(&byte_array));
+  }
+
+  // Serialize Bloom filter to memory output stream
+  auto sink = CreateOutputStream();
+  bloom_filter2.WriteTo(sink.get());
+  PARQUET_ASSIGN_OR_THROW(auto buffer1, sink->Finish());
+
+  PARQUET_THROW_NOT_OK(handle->Seek(0));
+  PARQUET_ASSIGN_OR_THROW(size, handle->GetSize());
+  PARQUET_ASSIGN_OR_THROW(auto buffer2, handle->Read(size));
+
+  EXPECT_TRUE((*buffer1).Equals(*buffer2));
+}
+*/
+
+// OptimalValueTest is used to test whether optimalNumOfBits returns expected
+// numbers according to formula:
+//     num_of_bits = -8.0 * ndv / log(1 - pow(fpp, 1.0 / 8.0))
+// where ndv is the number of distinct values and fpp is the false positive
+// probability. It is also used to test whether optimalNumOfBits returns a
+// value between [kMinimumBloomFilterBytes * 8, kMaximumBloomFilterBytes * 8].
+TEST_F(BloomFilterTest, OptimalValueTest) {
+  // Asserts that, for `ndv` distinct values at false-positive probability
+  // `fpp`, the estimator returns exactly `numBits` bits and numBits / 8 bytes.
+  auto testOptimalNumEstimation = [](uint32_t ndv,
+                                     double fpp,
+                                     uint32_t numBits) {
+    EXPECT_EQ(BlockSplitBloomFilter::optimalNumOfBits(ndv, fpp), numBits);
+    EXPECT_EQ(BlockSplitBloomFilter::optimalNumOfBytes(ndv, fpp), numBits / 8);
+  };
+
+  testOptimalNumEstimation(256, 0.01, UINT32_C(4096));
+  testOptimalNumEstimation(512, 0.01, UINT32_C(8192));
+  testOptimalNumEstimation(1024, 0.01, UINT32_C(16384));
+  testOptimalNumEstimation(2048, 0.01, UINT32_C(32768));
+
+  testOptimalNumEstimation(200, 0.01, UINT32_C(2048));
+  testOptimalNumEstimation(300, 0.01, UINT32_C(4096));
+  testOptimalNumEstimation(700, 0.01, UINT32_C(8192));
+  testOptimalNumEstimation(1500, 0.01, UINT32_C(16384));
+
+  testOptimalNumEstimation(200, 0.025, UINT32_C(2048));
+  testOptimalNumEstimation(300, 0.025, UINT32_C(4096));
+  testOptimalNumEstimation(700, 0.025, UINT32_C(8192));
+  testOptimalNumEstimation(1500, 0.025, UINT32_C(16384));
+
+  testOptimalNumEstimation(200, 0.05, UINT32_C(2048));
+  testOptimalNumEstimation(300, 0.05, UINT32_C(4096));
+  testOptimalNumEstimation(700, 0.05, UINT32_C(8192));
+  testOptimalNumEstimation(1500, 0.05, UINT32_C(16384));
+
+  // Boundary check: a tiny ndv is expected to yield the minimum size and the
+  // largest representable ndv the maximum size.
+  testOptimalNumEstimation(
+      4, 0.01, BlockSplitBloomFilter::kMinimumBloomFilterBytes * 8);
+  testOptimalNumEstimation(
+      4, 0.25, BlockSplitBloomFilter::kMinimumBloomFilterBytes * 8);
+
+  testOptimalNumEstimation(
+      std::numeric_limits<uint32_t>::max(),
+      0.01,
+      BlockSplitBloomFilter::kMaximumBloomFilterBytes * 8);
+  testOptimalNumEstimation(
+      std::numeric_limits<uint32_t>::max(),
+      0.25,
+      BlockSplitBloomFilter::kMaximumBloomFilterBytes * 8);
+}
+
+// The test below is plainly copied from parquet-mr and serves as a basic sanity
+// check of our XXH64 wrapper.
+const int64_t HASHES_OF_LOOPING_BYTES_WITH_SEED_0[32] = { + -1205034819632174695L, -1642502924627794072L, 5216751715308240086L, + -1889335612763511331L, -13835840860730338L, -2521325055659080948L, + 4867868962443297827L, 1498682999415010002L, -8626056615231480947L, + 7482827008138251355L, -617731006306969209L, 7289733825183505098L, + 4776896707697368229L, 1428059224718910376L, 6690813482653982021L, + -6248474067697161171L, 4951407828574235127L, 6198050452789369270L, + 5776283192552877204L, -626480755095427154L, -6637184445929957204L, + 8370873622748562952L, -1705978583731280501L, -7898818752540221055L, + -2516210193198301541L, 8356900479849653862L, -4413748141896466000L, + -6040072975510680789L, 1451490609699316991L, -7948005844616396060L, + 8567048088357095527L, -4375578310507393311L}; + +/** + * Test data is output of the following program with xxHash implementation + * from https://github.com/Cyan4973/xxHash with commit + * c8c4cc0f812719ce1f5b2c291159658980e7c255 + * + * #define XXH_INLINE_ALL + * #include "xxhash.h" + * #include + * #include + * int main() + * { + * char* src = (char*) malloc(32); + * const int N = 32; + * for (int i = 0; i < N; i++) { + * src[i] = (char) i; + * } + * + * printf("without seed\n"); + * for (int i = 0; i <= N; i++) { + * printf("%lldL,\n", (long long) XXH64(src, i, 0)); + * } + * } + */ +TEST_F(BloomFilterTest, XxHashTest) { + constexpr int kNumValues = 32; + uint8_t bytes[kNumValues] = {}; + + for (int i = 0; i < kNumValues; i++) { + ByteArray byteArray(i, bytes); + bytes[i] = i; + + auto hasherSeed0 = std::make_unique(); + EXPECT_EQ( + HASHES_OF_LOOPING_BYTES_WITH_SEED_0[i], hasherSeed0->hash(&byteArray)) + << "Hash with seed 0 Error: " << i; + } +} + +// Same as TestBloomFilter but using Batch interface +TEST_F(BloomFilterTest, TestBloomFilterHashes) { + constexpr int kNumValues = 32; + uint8_t bytes[kNumValues] = {}; + + std::vector byteArrayVector; + for (int i = 0; i < kNumValues; i++) { + bytes[i] = i; + 
byteArrayVector.emplace_back(i, bytes); + } + auto hasherSeed0 = std::make_unique(); + std::vector hashes; + hashes.resize(kNumValues); + hasherSeed0->hashes( + byteArrayVector.data(), + static_cast(byteArrayVector.size()), + hashes.data()); + for (int i = 0; i < kNumValues; i++) { + EXPECT_EQ(HASHES_OF_LOOPING_BYTES_WITH_SEED_0[i], hashes[i]) + << "Hash with seed 0 Error: " << i; + } +} diff --git a/velox/dwio/parquet/tests/reader/CMakeLists.txt b/velox/dwio/parquet/tests/reader/CMakeLists.txt index e01126e3115f..b58429d73e93 100644 --- a/velox/dwio/parquet/tests/reader/CMakeLists.txt +++ b/velox/dwio/parquet/tests/reader/CMakeLists.txt @@ -51,15 +51,16 @@ add_executable(velox_dwio_parquet_reader_benchmark target_link_libraries( velox_dwio_parquet_reader_benchmark velox_dwio_parquet_reader_benchmark_lib) -add_executable(velox_dwio_parquet_reader_test ParquetReaderTest.cpp - ParquetReaderBenchmarkTest.cpp) +add_executable( + velox_dwio_parquet_reader_test + ParquetReaderTest.cpp ParquetReaderBenchmarkTest.cpp BloomFilterTest.cpp) add_test( NAME velox_dwio_parquet_reader_test COMMAND velox_dwio_parquet_reader_test WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) target_link_libraries( - velox_dwio_parquet_reader_test velox_dwio_parquet_reader_benchmark_lib - velox_link_libs) + velox_dwio_parquet_reader_test velox_dwio_native_parquet_common + velox_dwio_parquet_reader_benchmark_lib velox_link_libs) add_executable(velox_dwio_parquet_structure_decoder_test NestedStructureDecoderTest.cpp)