From 5b84c3fddae050d624e9e6f259bde4efb7e23d92 Mon Sep 17 00:00:00 2001 From: yaqi-zhao Date: Mon, 21 Aug 2023 09:28:50 +0800 Subject: [PATCH] pre-decompress gzip w/IAA --- CMakeLists.txt | 6 + third_party/CMakeLists.txt | 51 ++++ velox/dwio/common/CMakeLists.txt | 9 + velox/dwio/common/QplJobPool.cpp | 117 ++++++++ velox/dwio/common/QplJobPool.h | 73 +++++ velox/dwio/common/compression/Compression.cpp | 138 ++++++++-- velox/dwio/common/compression/Compression.h | 128 ++++++--- velox/dwio/dwrf/common/Compression.h | 1 - velox/dwio/parquet/reader/PageReader.cpp | 259 ++++++++++++++++-- velox/dwio/parquet/reader/PageReader.h | 54 +++- velox/dwio/parquet/reader/ParquetData.cpp | 37 ++- velox/dwio/parquet/reader/ParquetData.h | 6 +- velox/dwio/parquet/reader/ParquetReader.cpp | 8 + .../parquet/reader/RepeatedColumnReader.h | 3 + .../parquet/reader/StructColumnReader.cpp | 19 ++ .../dwio/parquet/reader/StructColumnReader.h | 3 + 16 files changed, 830 insertions(+), 82 deletions(-) create mode 100644 velox/dwio/common/QplJobPool.cpp create mode 100644 velox/dwio/common/QplJobPool.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 31df59045c044..d1843b3425aa8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -89,6 +89,7 @@ option( VELOX_ENABLE_INT64_BUILD_PARTITION_BOUND "make buildPartitionBounds_ a vector int64 instead of int32 to avoid integer overflow when the hashtable has billions of records" OFF) +option(VELOX_ENABLE_QPL "Enable Intel QPL support" OFF) if(${VELOX_BUILD_MINIMAL}) # Enable and disable components for velox base build @@ -235,6 +236,11 @@ if(VELOX_ENABLE_REMOTE_FUNCTIONS) find_package(FBThrift CONFIG REQUIRED) endif() +if(VELOX_ENABLE_QPL) + add_definitions(-DVELOX_ENABLE_QPL) + message(STATUS "add VELOX_ENABLE_QPL") +endif() + # Turn on Codegen only for Clang and non Mac systems. 
if((NOT DEFINED VELOX_CODEGEN_SUPPORT) AND (CMAKE_CXX_COMPILER_ID MATCHES "Clang") diff --git a/third_party/CMakeLists.txt b/third_party/CMakeLists.txt index 380bef472243d..d24399499bead 100644 --- a/third_party/CMakeLists.txt +++ b/third_party/CMakeLists.txt @@ -87,3 +87,54 @@ if(VELOX_ENABLE_ARROW) ${ARROW_LIBDIR}/libparquet.a) endif() + +if(VELOX_ENABLE_QPL) + message(STATUS "Building QPL from source") + set(QPL_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/qpl_ep/install") + set(QPL_STATIC_LIB_NAME + ${CMAKE_STATIC_LIBRARY_PREFIX}qpl${CMAKE_STATIC_LIBRARY_SUFFIX}) + set(QPL_STATIC_LIB "${QPL_PREFIX}/lib/${QPL_STATIC_LIB_NAME}") + + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -ldl -laccel-config -L/usr/lib64") + + set(QPL_CMAKE_ARGS + -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} + -DCMAKE_INSTALL_LIBDIR=${QPL_PREFIX}/lib + -DCMAKE_INSTALL_PREFIX=${QPL_PREFIX} + -DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS} + -DEFFICIENT_WAIT=OFF + -DQPL_BUILD_TESTS=OFF + -DCMAKE_C_COMPILER=gcc + -DCMAKE_CXX_COMPILER=g++ + -DQPL_BUILD_EXAMPLES=OFF + -DQPL_LIB=ON) + + # TODO: switch to a tagged QPL release once the next version ships (expected in September). NOTE(review): the repository used below looks Intel-internal, origin/develop is a moving ref, and git documents the TLS option as http.sslVerify (not https.sslVerify); confirm all three before merging. +# ExternalProject_Add( +# qpl_ep +# ${EP_LOG_OPTIONS} +# URL https://github.com/intel/qpl/archive/refs/tags/v1.2.0.tar.gz +# BUILD_BYPRODUCTS "${QPL_STATIC_LIB}" +# CMAKE_ARGS ${QPL_CMAKE_ARGS}) + ExternalProject_Add(qpl_ep + ${EP_LOG_OPTIONS} + GIT_REPOSITORY https://github.com/intel-innersource/libraries.performance.accelerators.qpl.qpl-library.git + GIT_TAG origin/develop + GIT_CONFIG https.sslVerify=false + BUILD_BYPRODUCTS "${QPL_STATIC_LIB}" + CMAKE_ARGS ${QPL_CMAKE_ARGS} + ) + + file(MAKE_DIRECTORY "${QPL_PREFIX}/include") + + add_library(qpl::qpl STATIC IMPORTED GLOBAL) + set(QPL_LIBRARIES ${QPL_STATIC_LIB}) + set(QPL_INCLUDE_DIRS "${QPL_PREFIX}/include") + target_link_libraries(qpl::qpl INTERFACE /usr/lib64/libaccel-config.so) + set_target_properties( + qpl::qpl PROPERTIES IMPORTED_LOCATION ${QPL_LIBRARIES} + INTERFACE_INCLUDE_DIRECTORIES ${QPL_INCLUDE_DIRS}) + + 
add_dependencies(qpl::qpl qpl_ep) + +endif() diff --git a/velox/dwio/common/CMakeLists.txt b/velox/dwio/common/CMakeLists.txt index d2d8fd3389963..c913b2cc275e5 100644 --- a/velox/dwio/common/CMakeLists.txt +++ b/velox/dwio/common/CMakeLists.txt @@ -59,6 +59,8 @@ add_library( target_include_directories(velox_dwio_common PRIVATE ${Protobuf_INCLUDE_DIRS}) +message(STATUS "QPL_STATIC_LINK_LIBS: ${QPL_STATIC_LINK_LIBS}") + target_link_libraries( velox_dwio_common velox_buffer @@ -73,4 +75,11 @@ target_link_libraries( velox_exec Boost::regex Folly::folly + ${QPL_STATIC_LINK_LIBS} glog::glog) + +if(VELOX_ENABLE_QPL) + add_library(velox_dwio_qpl QplJobPool.cpp) + target_link_libraries(velox_dwio_qpl qpl::qpl Folly::folly) + target_link_libraries(velox_dwio_common velox_dwio_qpl) +endif() \ No newline at end of file diff --git a/velox/dwio/common/QplJobPool.cpp b/velox/dwio/common/QplJobPool.cpp new file mode 100644 index 0000000000000..42ff29c2b1be6 --- /dev/null +++ b/velox/dwio/common/QplJobPool.cpp @@ -0,0 +1,117 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox/dwio/common/QplJobPool.h" +#include +#include +#include "velox/common/base/Exceptions.h" + +namespace facebook::velox::dwio::common { + +std::array + QplJobHWPool::hw_job_ptr_pool; +std::array, QplJobHWPool::MAX_JOB_NUMBER> + QplJobHWPool::hw_job_ptr_locks; +bool QplJobHWPool::iaa_job_ready = false; +std::unique_ptr QplJobHWPool::hw_jobs_buffer; + +QplJobHWPool& QplJobHWPool::GetInstance() { + static QplJobHWPool pool; + return pool; +} + +QplJobHWPool::QplJobHWPool() { + if (!iaa_job_ready) { + (void)AllocateQPLJob(); + } +} + +QplJobHWPool::~QplJobHWPool() { + for (uint32_t i = 0; i < MAX_JOB_NUMBER; ++i) { + if (hw_job_ptr_pool[i]) { + qpl_fini_job(hw_job_ptr_pool[i]); + hw_job_ptr_pool[i] = nullptr; + } + } + iaa_job_ready = false; +} + +bool QplJobHWPool::AllocateQPLJob() { + uint32_t job_size = 0; + + // Query how many bytes a single qpl_job object needs. + qpl_get_job_size(qpl_path, &job_size); + // One contiguous buffer holds all MAX_JOB_NUMBER job objects. + hw_jobs_buffer = std::make_unique(job_size * MAX_JOB_NUMBER); + // Slot 'index' lives at byte offset index * job_size in that buffer; + // initialize each job and record its pointer in hw_job_ptr_pool. + for (uint32_t index = 0; index < MAX_JOB_NUMBER; ++index) { + qpl_job* qpl_job_ptr = + reinterpret_cast(hw_jobs_buffer.get() + index * job_size); + auto status = qpl_init_job(qpl_path, qpl_job_ptr); + if (status != QPL_STS_OK) { + iaa_job_ready = false; + LOG(WARNING) << "Initialization of hardware IAA failed, status: " + << status << ". 
Please check if Intel " + "In-Memory Analytics Accelerator (IAA) is properly set up!"; + return false; + } + this->hw_job_ptr_pool[index] = qpl_job_ptr; + hw_job_ptr_locks[index].store(false); + } + + iaa_job_ready = true; + return true; +} + +qpl_job* QplJobHWPool::AcquireDeflateJob(int& job_id) { + job_id = -1; + if (!job_ready()) { + return nullptr; + } + uint32_t retry = 0; + auto index = folly::Random::rand32(1, MAX_JOB_NUMBER - 1); // never 0 + while (!tryLockJob(index)) { + index = folly::Random::rand32(1, MAX_JOB_NUMBER - 1); + retry++; + if (retry > MAX_JOB_NUMBER) { + return nullptr; + } + } + job_id = index; + if (index >= MAX_JOB_NUMBER) { + return nullptr; + } + + return hw_job_ptr_pool[index]; +} + +void QplJobHWPool::ReleaseJob(int job_id) { + if (job_id >= MAX_JOB_NUMBER || job_id <= 0) { + return; + } + assert(job_id < MAX_JOB_NUMBER); + hw_job_ptr_locks[job_id].store(false); + return; +} + +bool QplJobHWPool::tryLockJob(uint32_t index) { + bool expected = false; + assert(index < MAX_JOB_NUMBER); + return hw_job_ptr_locks[index].compare_exchange_strong(expected, true); +} + +} // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/QplJobPool.h b/velox/dwio/common/QplJobPool.h new file mode 100644 index 0000000000000..526c598cfbab3 --- /dev/null +++ b/velox/dwio/common/QplJobPool.h @@ -0,0 +1,73 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include +#include +#include + +#include "qpl/qpl.h" + +namespace facebook::velox::dwio::common { + +// QplJobHWPool is a pool of QPL job objects; each job stores the context +// information of one QPL operation. +// Memory for all QPL jobs is allocated when the QplJobHWPool instance is +// created. +class QplJobHWPool { + public: + static QplJobHWPool& GetInstance(); + QplJobHWPool(); + ~QplJobHWPool(); + + // Release the QPL job with the given job_id; invalid ids are ignored. + void ReleaseJob(int job_id); + + // Return whether the QPL jobs were allocated successfully. + const bool& job_ready() { + return iaa_job_ready; + } + + qpl_job* AcquireDeflateJob(int& job_id); + qpl_job* GetJobById(int job_id) { + if (job_id >= MAX_JOB_NUMBER || job_id <= 0) { + return nullptr; + } + return hw_job_ptr_pool[job_id]; + } + + static constexpr auto MAX_JOB_NUMBER = 1024; + + private: + bool tryLockJob(uint32_t index); + bool AllocateQPLJob(); + + static constexpr qpl_path_t qpl_path = qpl_path_hardware; + // Single allocation that backs all MAX_JOB_NUMBER job objects; the entries + // of hw_job_ptr_pool point into this buffer. + static std::unique_ptr hw_jobs_buffer; + + // Job pool for storing all job object pointers + static std::array hw_job_ptr_pool; + + // Pool-ready flag, followed by one busy flag per hw_job_ptr_pool slot. + static bool iaa_job_ready; + static std::array, MAX_JOB_NUMBER> hw_job_ptr_locks; +}; + +} // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/compression/Compression.cpp b/velox/dwio/common/compression/Compression.cpp index b31de09704e45..67a464c8189c9 100644 --- a/velox/dwio/common/compression/Compression.cpp +++ b/velox/dwio/common/compression/Compression.cpp @@ -113,13 +113,8 @@ class ZlibDecompressor : public Decompressor { public: explicit ZlibDecompressor( uint64_t blockSize, -<<<<<<< HEAD int windowBits, const std::string& streamDebugInfo, -======= - const std::string& streamDebugInfo, - int windowBits, ->>>>>>> 5573ae233 (Refactor parquet PageReader to use PagedInputStream for decompression) bool izGzip 
= false); ~ZlibDecompressor() override; @@ -144,13 +139,8 @@ class ZlibDecompressor : public Decompressor { ZlibDecompressor::ZlibDecompressor( uint64_t blockSize, -<<<<<<< HEAD int windowBits, const std::string& streamDebugInfo, -======= - const std::string& streamDebugInfo, - int windowBits, ->>>>>>> 5573ae233 (Refactor parquet PageReader to use PagedInputStream for decompression) bool isGzip) : Decompressor{blockSize, streamDebugInfo} { zstream_.next_in = Z_NULL; @@ -165,11 +155,7 @@ ZlibDecompressor::ZlibDecompressor( if (isGzip) { zlibWindowBits = zlibWindowBits | GZIP_DETECT_CODE; } -<<<<<<< HEAD const auto result = inflateInit2(&zstream_, zlibWindowBits); -======= - auto result = inflateInit2(&zstream_, zlibWindowBits); ->>>>>>> 5573ae233 (Refactor parquet PageReader to use PagedInputStream for decompression) DWIO_ENSURE_EQ( result, Z_OK, @@ -313,6 +299,97 @@ std::pair ZstdDecompressor::getDecompressedLength( return {uncompressedLength, true}; } +class GzipIAADecompressor : public AsyncDecompressor { + public: + explicit GzipIAADecompressor() {} + + explicit GzipIAADecompressor( + uint64_t blockSize, + const std::string& streamDebugInfo) + : AsyncDecompressor{blockSize, streamDebugInfo} {} + + int decompress( + const char* src, + uint64_t srcLength, + char* dest, + uint64_t destLength) override; + + bool waitResult(int job_id) override; +}; + +int GzipIAADecompressor::decompress( + const char* src, + uint64_t srcLength, + char* dest, + uint64_t destLength) { +#ifdef VELOX_ENABLE_QPL + dwio::common::QplJobHWPool& qpl_job_pool = + dwio::common::QplJobHWPool::GetInstance(); + int job_id = 0; + qpl_job* job = qpl_job_pool.AcquireDeflateJob(job_id); + if (job == nullptr) { + LOG(WARNING) << "cannot AcquireDeflateJob "; + return -1; // Invalid job id to indicate the + // failed decompress job. 
+ } + job->op = qpl_op_decompress; + job->next_in_ptr = reinterpret_cast(const_cast(src)); + job->next_out_ptr = reinterpret_cast(dest); + job->available_in = static_cast(srcLength); + job->available_out = static_cast(destLength); + job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_ZLIB_MODE; + + qpl_status status = qpl_submit_job(job); + if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) { + qpl_job_pool.ReleaseJob(job_id); + job = qpl_job_pool.AcquireDeflateJob(job_id); + if (job == nullptr) { + LOG(WARNING) + << "cannot acquire deflate job after QPL_STS_QUEUES_ARE_BUSY_ERR "; + return -1; // Invalid job id to indicate the + // failed decompress job. + } + job->op = qpl_op_decompress; + job->next_in_ptr = reinterpret_cast(const_cast(src)); + job->next_out_ptr = reinterpret_cast(dest); + job->available_in = static_cast(srcLength); + job->available_out = static_cast(destLength); + job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_ZLIB_MODE; + + status = qpl_submit_job(job); + } + if (status != QPL_STS_OK) { + qpl_job_pool.ReleaseJob(job_id); + LOG(WARNING) << "cannot submit job, error status: " << status; + return -1; // Invalid job id to indicate the + // failed decompress job. 
+ } else { + return job_id; + } +#else + return -1; +#endif +} + +bool GzipIAADecompressor::waitResult(int job_id) { +#ifdef VELOX_ENABLE_QPL + dwio::common::QplJobHWPool& qpl_job_pool = + dwio::common::QplJobHWPool::GetInstance(); + if (job_id <= 0 || job_id >= qpl_job_pool.MAX_JOB_NUMBER) { + return true; // NOTE(review): invalid ids report success; confirm intended + } + qpl_job* job = qpl_job_pool.GetJobById(job_id); + + auto status = qpl_wait_job(job); + qpl_job_pool.ReleaseJob(job_id); + if (status == QPL_STS_OK) { + return true; + } + LOG(WARNING) << "Decompress w/IAA error, status: " << status; +#endif + return false; +} + class SnappyDecompressor : public Decompressor { public: explicit SnappyDecompressor( @@ -370,13 +447,13 @@ class ZlibDecompressionStream : public PagedInputStream, std::unique_ptr inStream, uint64_t blockSize, MemoryPool& pool, - const std::string& streamDebugInfo, int windowBits, + const std::string& streamDebugInfo, bool isGzip = false, bool useRawDecompression = false, size_t compressedLength = 0) : PagedInputStream{std::move(inStream), pool, streamDebugInfo, useRawDecompression, compressedLength}, - ZlibDecompressor{blockSize, streamDebugInfo, windowBits, isGzip} {} + ZlibDecompressor{blockSize, windowBits, streamDebugInfo, isGzip} {} ~ZlibDecompressionStream() override = default; bool readOrSkip(const void** data, int32_t* size) override; @@ -483,8 +560,8 @@ std::unique_ptr createCompressor( CompressionKind kind, CompressionBufferPool& bufferPool, DataBufferHolder& bufferHolder, - CompressionOptions options, uint8_t pageHeaderSize, + const CompressionOptions& options, const Encrypter* encrypter) { std::unique_ptr compressor; switch (kind) { @@ -531,11 +608,11 @@ std::unique_ptr createDecompressor( std::unique_ptr input, uint64_t blockSize, MemoryPool& pool, + const CompressionOptions& options, const std::string& streamDebugInfo, const Decrypter* decrypter, bool useRawDecompression, - size_t compressedLength, - CompressionOptions options) { + size_t compressedLength) { std::unique_ptr 
decompressor; switch (static_cast(kind)) { case CompressionKind::CompressionKind_NONE: @@ -552,14 +629,14 @@ std::unique_ptr createDecompressor( std::move(input), blockSize, pool, - streamDebugInfo, options.format.zlib.windowBits, + streamDebugInfo, false, useRawDecompression, compressedLength); } decompressor = std::make_unique( - blockSize, streamDebugInfo, options.format.zlib.windowBits, false); + blockSize, options.format.zlib.windowBits, streamDebugInfo, false); break; case CompressionKind::CompressionKind_GZIP: if (!decrypter) { @@ -569,14 +646,14 @@ std::unique_ptr createDecompressor( std::move(input), blockSize, pool, - streamDebugInfo, options.format.zlib.windowBits, + streamDebugInfo, true, useRawDecompression, compressedLength); } decompressor = std::make_unique( - blockSize, streamDebugInfo, options.format.zlib.windowBits, true); + blockSize, options.format.zlib.windowBits, streamDebugInfo, true); break; case CompressionKind::CompressionKind_SNAPPY: decompressor = @@ -607,4 +684,19 @@ std::unique_ptr createDecompressor( compressedLength); } +std::unique_ptr +createAsyncDecompressor( + facebook::velox::common::CompressionKind kind, + uint64_t bufferSize, + const std::string& streamDebugInfo) { + // Only GZIP can be decompressed asynchronously; other codecs raise below. + switch (static_cast(kind)) { + case CompressionKind::CompressionKind_GZIP: + return std::make_unique(bufferSize, streamDebugInfo); + default: + DWIO_RAISE("Asynchronous mode not support for compression codec ", kind); + } + return nullptr; +} + } // namespace facebook::velox::dwio::common::compression diff --git a/velox/dwio/common/compression/Compression.h b/velox/dwio/common/compression/Compression.h index 993b6a6911431..e68b50dd4b07c 100644 --- a/velox/dwio/common/compression/Compression.h +++ b/velox/dwio/common/compression/Compression.h @@ -21,6 +21,9 @@ #include "velox/dwio/common/SeekableInputStream.h" #include "velox/dwio/common/compression/CompressionBufferPool.h" #include "velox/dwio/common/encryption/Encryption.h" 
+#ifdef VELOX_ENABLE_QPL +#include "velox/dwio/common/QplJobPool.h" +#endif namespace facebook::velox::dwio::common::compression { @@ -32,6 +35,7 @@ class Compressor { // https://zlib.net/manual.html static constexpr int DWRF_ORC_ZLIB_WINDOW_BITS = -15; static constexpr int PARQUET_ZLIB_WINDOW_BITS = 15; + static constexpr int PARQUET_ZLIB_WINDOW_BITS_4KB = 12; // 2^12 = 4KB window explicit Compressor(int32_t level) : level_{level} {} @@ -67,10 +71,44 @@ class Decompressor { const std::string streamDebugInfo_; }; +class AsyncDecompressor { + public: + explicit AsyncDecompressor() {} + explicit AsyncDecompressor( + uint64_t blockSize, + const std::string& streamDebugInfo) + : blockSize_{blockSize}, streamDebugInfo_{streamDebugInfo} {} + + virtual ~AsyncDecompressor() = default; + + virtual uint64_t getUncompressedLength( + const char* /* unused */, + uint64_t /* unused */) const { + return blockSize_; + } + + virtual int decompress( + const char* src, + uint64_t srcLength, + char* dest, + uint64_t destLength) = 0; + + virtual bool waitResult(int job_id) = 0; + + protected: + uint64_t blockSize_{0}; // stays 0 when default-constructed + const std::string streamDebugInfo_; +}; + struct CompressionOptions { + /// Format specific compression/decompression options union Format { struct { + /// Window bits determines the history buffer size and whether + /// header/trailer is added to the compression block. int windowBits; + /// Compression level determines the compression ratio. 
Zlib supports + /// values ranging from 0 (no compression) to 9 (max compression) int32_t compressionLevel; } zlib; @@ -82,44 +120,51 @@ struct CompressionOptions { uint32_t compressionThreshold; }; -static CompressionOptions getDwrfOrcCompressionOptions( - velox::common::CompressionKind kind, - uint32_t compressionThreshold, - int32_t zlibCompressionLevel, - int32_t zstdCompressionLevel) { - CompressionOptions options; - options.compressionThreshold = compressionThreshold; - - if (kind == velox::common::CompressionKind_ZLIB) { - options.format.zlib.windowBits = Compressor::DWRF_ORC_ZLIB_WINDOW_BITS; - options.format.zlib.compressionLevel = zlibCompressionLevel; - } else if (kind == velox::common::CompressionKind_ZSTD) { - options.format.zstd.compressionLevel = zstdCompressionLevel; +/** + * Get the zlib window bits from an RFC 1950 header; returns -1 if invalid. + * 0 1 + * +---+---+ + * |CMF|FLG| (more-->) + * +---+---+ + * bits 0 to 3 CM Compression method + * bits 4 to 7 CINFO Compression info + * CM (Compression method) This identifies the compression method used in the + * file. CM = 8 denotes the "deflate" compression method with a window size up + * to 32K. CINFO (Compression info) For CM = 8, CINFO is the base-2 logarithm of + * the LZ77 window size, minus eight (CINFO=7 indicates a 32K window size). 
+ * @param stream_ptr pointer to the first byte of the zlib stream + * @param stream_size number of readable bytes at stream_ptr + */ +static int getZlibWindowBits(const uint8_t* stream_ptr, uint32_t stream_size) { + static constexpr uint8_t CM_ZLIB_DEFAULT_VALUE = 8u; + static constexpr uint32_t ZLIB_MIN_HEADER_SIZE = 2u; + static constexpr uint32_t ZLIB_INFO_OFFSET = 4u; + if (stream_ptr == nullptr || stream_size < ZLIB_MIN_HEADER_SIZE) { + return -1; } - return options; -} - -static CompressionOptions getDwrfOrcDecompressionOptions() { - CompressionOptions options; - options.format.zlib.windowBits = Compressor::DWRF_ORC_ZLIB_WINDOW_BITS; - return options; -} + const uint8_t compression_method_and_flag = *stream_ptr; + const uint8_t compression_method = compression_method_and_flag & 0xf; + const uint8_t compression_info = + compression_method_and_flag >> ZLIB_INFO_OFFSET; -static CompressionOptions getParquetDecompressionOptions() { - CompressionOptions options; - options.format.zlib.windowBits = Compressor::PARQUET_ZLIB_WINDOW_BITS; - return options; + if (CM_ZLIB_DEFAULT_VALUE != compression_method) { + return -1; + } + if (compression_info > 7) { + return -1; + } + return CM_ZLIB_DEFAULT_VALUE + compression_info; } /** * Create a decompressor for the given compression kind. 
- * @param kind the compression type to implement - * @param input the input stream that is the underlying source - * @param bufferSize the maximum size of the buffer - * @param pool the memory pool - * @param useRawDecompression specify whether to perform raw decompression - * @param compressedLength the compressed block length for raw decompression - * @param options compression options to use + * @param kind The compression type to implement + * @param input The input stream that is the underlying source + * @param bufferSize The maximum size of the buffer + * @param pool The memory pool + * @param options The compression options to use + * @param useRawDecompression Specify whether to perform raw decompression + * @param compressedLength The compressed block length for raw decompression */ std::unique_ptr createDecompressor( facebook::velox::common::CompressionKind kind, @@ -130,8 +175,7 @@ std::unique_ptr createDecompressor( const std::string& streamDebugInfo, const dwio::common::encryption::Decrypter* decryptr = nullptr, bool useRawDecompression = false, - size_t compressedLength = 0, - CompressionOptions options = getDwrfOrcDecompressionOptions()); + size_t compressedLength = 0); /** * Create a compressor for the given compression kind. @@ -146,9 +190,23 @@ std::unique_ptr createCompressor( facebook::velox::common::CompressionKind kind, CompressionBufferPool& bufferPool, DataBufferHolder& bufferHolder, - CompressionOptions options, uint8_t pageHeaderSize, const CompressionOptions& options, const dwio::common::encryption::Encrypter* encrypter = nullptr); +/** + * Create a decompressor for the given compression kind in asynchronous mode. 
+ * @param kind the compression type to implement; only GZIP is supported in + * asynchronous mode, any other kind raises an error + * @param bufferSize the maximum size of the buffer + * @param streamDebugInfo debug description of the stream being decompressed + * @return an AsyncDecompressor: decompress() submits a job and returns its id + * (negative on failure), waitResult() waits for that job and reports whether + * it succeeded + */ +std::unique_ptr +createAsyncDecompressor( + facebook::velox::common::CompressionKind kind, + uint64_t bufferSize, + const std::string& streamDebugInfo); } // namespace facebook::velox::dwio::common::compression diff --git a/velox/dwio/dwrf/common/Compression.h b/velox/dwio/dwrf/common/Compression.h index 8d485ff037039..ce69430b52e06 100644 --- a/velox/dwio/dwrf/common/Compression.h +++ b/velox/dwio/dwrf/common/Compression.h @@ -71,7 +71,6 @@ static std::unique_ptr createCompressor( kind, bufferPool, bufferHolder, - dwrfOrcCompressionOptions, PAGE_HEADER_SIZE, dwrfOrcCompressionOptions, encrypter); diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp index dbe8481e33a64..cd8ab33bedd9e 100644 --- a/velox/dwio/parquet/reader/PageReader.cpp +++ b/velox/dwio/parquet/reader/PageReader.cpp @@ -34,11 +34,116 @@ namespace facebook::velox::parquet { using thrift::Encoding; using thrift::PageHeader; +void PageReader::preDecompressPage( + bool& need_pre_decompress, + int64_t numValues) { + if (codec_ != thrift::CompressionCodec::GZIP) { + need_pre_decompress = false; + return; + } + for (;;) { + auto dataStart = pageStart_; + if (chunkSize_ <= pageStart_) { + // This may happen if seeking to exactly end of row group. 
+ numRepDefsInPage_ = 0; + numRowsInPage_ = 0; + break; + } + PageHeader pageHeader = readPageHeader(); + pageStart_ = pageDataStart_ + pageHeader.compressed_page_size; + switch (pageHeader.type) { + case thrift::PageType::DATA_PAGE: + prefetchDataPageV1(pageHeader); + break; + case thrift::PageType::DATA_PAGE_V2: + prefetchDataPageV2(pageHeader); + break; + case thrift::PageType::DICTIONARY_PAGE: + prefetchDictionary(pageHeader); + continue; + default: + break; // ignore INDEX page type and any other custom extensions + } + break; + } + need_pre_decompress = isWinSizeFit; // set by iaaDecompressGzip() + rowGroupPageInfo_.numValues = numValues; + rowGroupPageInfo_.visitedRows = 0; +} + +void PageReader::prefetchNextPage() { + if (rowGroupPageInfo_.visitedRows + numRowsInPage_ >= + rowGroupPageInfo_.numValues) { + return; + } + if (chunkSize_ <= pageStart_) { + return; + } + PageHeader pageHeader = readPageHeader(); + rowGroupPageInfo_.pageStart = + pageDataStart_ + pageHeader.compressed_page_size; + switch (pageHeader.type) { + case thrift::PageType::DATA_PAGE: { + dataPageHeader_ = pageHeader; + VELOX_CHECK( + pageHeader.type == thrift::PageType::DATA_PAGE && + pageHeader.__isset.data_page_header); + rowGroupPageInfo_.dataPageData = + readBytes(pageHeader.compressed_page_size, pageBuffer_); + pre_decompress_data = iaaDecompressGzip( + rowGroupPageInfo_.dataPageData, + pageHeader.compressed_page_size, + pageHeader.uncompressed_page_size, + rowGroupPageInfo_.uncompressedDataV1Data, + data_qpl_job_id); + break; + } + case thrift::PageType::DATA_PAGE_V2: + LOG(WARNING) << "Data Page V2 not support "; // NOTE(review): header already read above; confirm V2 pages still decode + break; + case thrift::PageType::DICTIONARY_PAGE: + LOG(WARNING) << "Wrong path "; + break; + default: + break; // ignore INDEX page type and any other custom extensions + } +} + +bool PageReader::seekToPreDecompPage(int64_t row) { + bool has_qpl = false; + if (dict_qpl_job_id != 0) { + bool job_success = getDecompRes(dict_qpl_job_id); + prepareDictionary(dictPageHeader_, job_success); + 
dict_qpl_job_id = 0; + has_qpl = true; + } + + if (this->data_qpl_job_id != 0) { + bool job_success = getDecompRes(data_qpl_job_id); + prepareDataPageV1(dataPageHeader_, row, job_success); + data_qpl_job_id = 0; + has_qpl = true; + } + + if (has_qpl) { + if (row == kRepDefOnly || row < rowOfPage_ + numRowsInPage_) { + return true; + } + updateRowInfoAfterPageSkipped(); + } + return false; +} + void PageReader::seekToPage(int64_t row) { defineDecoder_.reset(); repeatDecoder_.reset(); // 'rowOfPage_' is the row number of the first row of the next page. rowOfPage_ += numRowsInPage_; + + if (seekToPreDecompPage(row)) { + return; + } + for (;;) { auto dataStart = pageStart_; if (chunkSize_ <= pageStart_) { @@ -225,14 +330,14 @@ const char* FOLLY_NONNULL decompressLz4AndLzo( const char* FOLLY_NONNULL PageReader::decompressData( const char* pageData, uint32_t compressedSize, - uint32_t decompressedSize) { + uint32_t uncompressedSize) { if (codec_ == thrift::CompressionCodec::LZ4 || codec_ == thrift::CompressionCodec::LZO) { return decompressLz4AndLzo( pageData, decompressedData_, compressedSize, - decompressedSize, + uncompressedSize, pool_, codec_); } @@ -246,18 +351,18 @@ const char* FOLLY_NONNULL PageReader::decompressData( dwio::common::compression::createDecompressor( ThriftCodecToCompressionKind(codec_), std::move(inputStream), - decompressedSize, + uncompressedSize, pool_, + getParquetDecompressionOptions(), streamDebugInfo, nullptr, true, - compressedSize, - dwio::common::compression::getParquetDecompressionOptions()); + compressedSize); dwio::common::ensureCapacity( - decompressedData_, decompressedSize, &pool_); + decompressedData_, uncompressedSize, &pool_); decompressedStream->readFully( - decompressedData_->asMutable(), decompressedSize); + decompressedData_->asMutable(), uncompressedSize); return decompressedData_->as(); } @@ -304,7 +409,10 @@ void PageReader::updateRowInfoAfterPageSkipped() { } } -void PageReader::prepareDataPageV1(const PageHeader& 
pageHeader, int64_t row) { +void PageReader::prepareDataPageV1( + const PageHeader& pageHeader, + int64_t row, + bool job_success) { VELOX_CHECK( pageHeader.type == thrift::PageType::DATA_PAGE && pageHeader.__isset.data_page_header); @@ -320,11 +428,24 @@ void PageReader::prepareDataPageV1(const PageHeader& pageHeader, int64_t row) { return; } - pageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); - pageData_ = decompressData( - pageData_, - pageHeader.compressed_page_size, - pageHeader.uncompressed_page_size); + if (data_qpl_job_id != 0 && pre_decompress_data && job_success) { + if (rowGroupPageInfo_.visitedRows > 0) { + BufferPtr tmp = uncompressedDataV1Data_; + uncompressedDataV1Data_ = rowGroupPageInfo_.uncompressedDataV1Data; + rowGroupPageInfo_.uncompressedDataV1Data = tmp; + } + pageData_ = uncompressedDataV1Data_->as(); + } else { + if (data_qpl_job_id == 0) { + dataPageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); + } else if (rowGroupPageInfo_.visitedRows > 0) { + dataPageData_ = rowGroupPageInfo_.dataPageData; + } + pageData_ = decompressData( + dataPageData_, + pageHeader.compressed_page_size, + pageHeader.uncompressed_page_size); + } auto pageEnd = pageData_ + pageHeader.uncompressed_page_size; if (maxRepeat_ > 0) { uint32_t repeatLength = readField(pageData_); @@ -422,7 +543,9 @@ void PageReader::prepareDataPageV2(const PageHeader& pageHeader, int64_t row) { } } -void PageReader::prepareDictionary(const PageHeader& pageHeader) { +void PageReader::prepareDictionary( + const PageHeader& pageHeader, + bool job_success) { dictionary_.numValues = pageHeader.dictionary_page_header.num_values; dictionaryEncoding_ = pageHeader.dictionary_page_header.encoding; dictionary_.sorted = pageHeader.dictionary_page_header.__isset.is_sorted && @@ -432,11 +555,17 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { dictionaryEncoding_ == Encoding::PLAIN); if (codec_ != thrift::CompressionCodec::UNCOMPRESSED) { - 
pageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); - pageData_ = decompressData( - pageData_, - pageHeader.compressed_page_size, - pageHeader.uncompressed_page_size); + if (dict_qpl_job_id != 0 && pre_decompress_dict && job_success) { + pageData_ = uncompressedDictData_->as(); + } else { + if (dict_qpl_job_id == 0) { + dictPageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); + } + pageData_ = decompressData( + dictPageData_, + pageHeader.compressed_page_size, + pageHeader.uncompressed_page_size); + } } auto parquetType = type_->parquetType_.value(); @@ -899,6 +1028,10 @@ bool PageReader::rowsForPage( if (hasChunkRepDefs_) { numLeafNullsConsumed_ = rowOfPage_; } + if (isWinSizeFit) { + prefetchNextPage(); + } + rowGroupPageInfo_.visitedRows += numRowsInPage_; } auto& scanState = reader.scanState(); if (isDictionary()) { @@ -994,4 +1127,92 @@ const VectorPtr& PageReader::dictionaryValues(const TypePtr& type) { return dictionaryValues_; } +void PageReader::prefetchDataPageV1(const thrift::PageHeader& pageHeader) { + dataPageHeader_ = pageHeader; + VELOX_CHECK( + pageHeader.type == thrift::PageType::DATA_PAGE && + pageHeader.__isset.data_page_header); + + dataPageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); + pre_decompress_data = iaaDecompressGzip( + dataPageData_, + pageHeader.compressed_page_size, + pageHeader.uncompressed_page_size, + uncompressedDataV1Data_, + data_qpl_job_id); + return; +} + +void PageReader::prefetchDataPageV2(const thrift::PageHeader& pageHeader) { + return; +} + +void PageReader::prefetchDictionary(const thrift::PageHeader& pageHeader) { + dictPageHeader_ = pageHeader; + dictionaryEncoding_ = pageHeader.dictionary_page_header.encoding; + VELOX_CHECK( + dictionaryEncoding_ == Encoding::PLAIN_DICTIONARY || + dictionaryEncoding_ == Encoding::PLAIN); + dictPageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); + + pre_decompress_dict = iaaDecompressGzip( + dictPageData_, + 
pageHeader.compressed_page_size, + pageHeader.uncompressed_page_size, + uncompressedDictData_, + dict_qpl_job_id); + + return; +} + +const bool FOLLY_NONNULL PageReader::iaaDecompressGzip( + const char* pageData, + uint32_t compressedSize, + uint32_t uncompressedSize, + BufferPtr& uncompressedData, + int& qpl_job_id) { + dwio::common::ensureCapacity( + uncompressedData, uncompressedSize, &pool_); + + if (!isWinSizeFit) { + // window size should be 4KB for IAA + if (dwio::common::compression::Compressor::PARQUET_ZLIB_WINDOW_BITS_4KB == + dwio::common::compression::getZlibWindowBits( + (const uint8_t*)pageData, uncompressedSize)) { + isWinSizeFit = true; + } else { + qpl_job_id = -1; + return false; + } + } + auto streamDebugInfo = + fmt::format("Page Reader: Stream {}", inputStream_->getName()); + std::unique_ptr decompressor = + dwio::common::compression::createAsyncDecompressor( + ThriftCodecToCompressionKind(codec_), + uncompressedSize, + streamDebugInfo); + if (decompressor == nullptr) { + return false; + } + qpl_job_id = decompressor->decompress( + (const char*)pageData, + compressedSize, + (char*)uncompressedData->asMutable(), + uncompressedSize); + if (qpl_job_id < 0) { + return false; + } + return true; +} + +const bool FOLLY_NONNULL PageReader::getDecompRes(int job_id) { + auto streamDebugInfo = + fmt::format("Page Reader: Stream {}", inputStream_->getName()); + std::unique_ptr decompressor = + dwio::common::compression::createAsyncDecompressor( + ThriftCodecToCompressionKind(codec_), 0, streamDebugInfo); + return decompressor->waitResult(job_id); +} + } // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/reader/PageReader.h b/velox/dwio/parquet/reader/PageReader.h index 06844ee3573c2..9d5d3036eb5d0 100644 --- a/velox/dwio/parquet/reader/PageReader.h +++ b/velox/dwio/parquet/reader/PageReader.h @@ -30,6 +30,15 @@ namespace facebook::velox::parquet { +struct PreDecompPageInfo { + int64_t numValues; // Number of values in this row group + 
int64_t visitedRows; // rows already read + uint64_t pageStart{0}; + thrift::PageHeader dataPageHeader; + const char* FOLLY_NULLABLE dataPageData{nullptr}; + BufferPtr uncompressedDataV1Data; +}; + /// Manages access to pages inside a ColumnChunk. Interprets page headers and /// encodings and presents the combination of pages and encoded values as a /// continuous stream accessible via readWithVisitor(). @@ -51,6 +60,10 @@ class PageReader { chunkSize_(chunkSize), nullConcatenation_(pool_) { type_->makeLevelInfo(leafInfo_); + dict_qpl_job_id = 0; + data_qpl_job_id = 0; + uncompressedDictData_ = nullptr; + uncompressedDataV1Data_ = nullptr; } // This PageReader constructor is for unit test only. @@ -110,6 +123,21 @@ class PageReader { /// Advances 'numRows' top level rows. void skip(int64_t numRows); + /// Pre-decompress GZIP page with IAA + void preDecompressPage(bool& need_pre_decompress, int64_t numValues); + void prefetchDataPageV1(const thrift::PageHeader& pageHeader); + void prefetchDataPageV2(const thrift::PageHeader& pageHeader); + void prefetchDictionary(const thrift::PageHeader& pageHeader); + const bool FOLLY_NONNULL getDecompRes(int job_id); + void prefetchNextPage(); + bool seekToPreDecompPage(int64_t row); + const bool FOLLY_NONNULL iaaDecompressGzip( + const char* FOLLY_NONNULL pageData, + uint32_t compressedSize, + uint32_t uncompressedSize, + BufferPtr& uncompressedData, + int& qpl_job_id); + /// Decodes repdefs for 'numTopLevelRows'. Use getLengthsAndNulls() /// to access the lengths and nulls for the different nesting /// levels. @@ -222,9 +250,14 @@ class PageReader { // next page. 
void updateRowInfoAfterPageSkipped(); - void prepareDataPageV1(const thrift::PageHeader& pageHeader, int64_t row); + void prepareDataPageV1( + const thrift::PageHeader& pageHeader, + int64_t row, + bool job_success = false); void prepareDataPageV2(const thrift::PageHeader& pageHeader, int64_t row); - void prepareDictionary(const thrift::PageHeader& pageHeader); + void prepareDictionary( + const thrift::PageHeader& pageHeader, + bool job_success = false); void makeDecoder(); // For a non-top level leaf, reads the defs and sets 'leafNulls_' and @@ -514,6 +547,23 @@ class PageReader { std::unique_ptr stringDecoder_; std::unique_ptr booleanDecoder_; // Add decoders for other encodings here. + // Used for pre-decompress + BufferPtr uncompressedDictData_; + BufferPtr uncompressedDataV1Data_; + thrift::PageHeader dictPageHeader_; + const char* FOLLY_NULLABLE dictPageData_{nullptr}; + bool needUncompressDict; + + thrift::PageHeader dataPageHeader_; + const char* FOLLY_NULLABLE dataPageData_{nullptr}; + + int dict_qpl_job_id; + int data_qpl_job_id; + + bool pre_decompress_dict = false; + bool pre_decompress_data = false; + bool isWinSizeFit = false; + PreDecompPageInfo rowGroupPageInfo_; }; template diff --git a/velox/dwio/parquet/reader/ParquetData.cpp b/velox/dwio/parquet/reader/ParquetData.cpp index f94ed2936895b..d1ae582685416 100644 --- a/velox/dwio/parquet/reader/ParquetData.cpp +++ b/velox/dwio/parquet/reader/ParquetData.cpp @@ -16,6 +16,9 @@ #include "velox/dwio/parquet/reader/ParquetData.h" #include "velox/dwio/parquet/reader/Statistics.h" +#ifdef VELOX_ENABLE_QPL +#include "velox/dwio/common/QplJobPool.h" +#endif namespace facebook::velox::parquet { @@ -82,6 +85,33 @@ bool ParquetData::rowGroupMatches( return true; } +bool ParquetData::preDecompRowGroup(uint32_t index) { +#ifdef VELOX_ENABLE_QPL + if (!dwio::common::QplJobHWPool::GetInstance().job_ready()) { + return false; + } +#else + return false; +#endif + auto& metaData = 
rowGroups_[index].columns[type_->column()].meta_data; + if (metaData.codec != thrift::CompressionCodec::GZIP || !needPreDecomp) { + LOG(WARNING) << "QPL Job not ready or zlib window size(" << needPreDecomp + << ") is not 4KB"; + return false; + } + + pageReaders_.resize(rowGroups_.size()); + auto iaaPageReader = std::make_unique( + std::move(streams_[index]), + pool_, + type_, + metaData.codec, + metaData.total_compressed_size); + iaaPageReader->preDecompressPage(needPreDecomp, metaData.num_values); + pageReaders_[index] = std::move(iaaPageReader); + return needPreDecomp; +} + void ParquetData::enqueueRowGroup( uint32_t index, dwio::common::BufferedInput& input) { @@ -112,8 +142,13 @@ void ParquetData::enqueueRowGroup( dwio::common::PositionProvider ParquetData::seekToRowGroup(uint32_t index) { static std::vector empty; VELOX_CHECK_LT(index, streams_.size()); - VELOX_CHECK(streams_[index], "Stream not enqueued for column"); + // VELOX_CHECK(streams_[index], "Stream not enqueued for column"); auto& metadata = rowGroups_[index].columns[type_->column()].meta_data; + if (metadata.codec == thrift::CompressionCodec::GZIP && + pageReaders_.size() > index && pageReaders_[index] != nullptr) { + reader_ = std::move(pageReaders_[index]); + return dwio::common::PositionProvider(empty); + } reader_ = std::make_unique( std::move(streams_[index]), pool_, diff --git a/velox/dwio/parquet/reader/ParquetData.h b/velox/dwio/parquet/reader/ParquetData.h index cf400165da180..2c502a77e41c2 100644 --- a/velox/dwio/parquet/reader/ParquetData.h +++ b/velox/dwio/parquet/reader/ParquetData.h @@ -55,6 +55,9 @@ class ParquetData : public dwio::common::FormatData { /// Prepares to read data for 'index'th row group. void enqueueRowGroup(uint32_t index, dwio::common::BufferedInput& input); + /// pre-decompress data for the 'index'th row group + bool preDecompRowGroup(uint32_t index); + /// Positions 'this' at 'index'th row group. loadRowGroup must be called /// first. 
The returned PositionProvider is empty and should not be used. /// Other formats may use it. @@ -192,7 +195,8 @@ class ParquetData : public dwio::common::FormatData { const uint32_t maxRepeat_; int64_t rowsInRowGroup_; std::unique_ptr reader_; - + std::vector> pageReaders_; + bool needPreDecomp = true; // Nulls derived from leaf repdefs for non-leaf readers. BufferPtr presetNulls_; diff --git a/velox/dwio/parquet/reader/ParquetReader.cpp b/velox/dwio/parquet/reader/ParquetReader.cpp index d0738bc055268..ea7beaf5ca15e 100644 --- a/velox/dwio/parquet/reader/ParquetReader.cpp +++ b/velox/dwio/parquet/reader/ParquetReader.cpp @@ -119,6 +119,7 @@ class ReaderBase { std::shared_ptr schemaWithId_; const bool binaryAsString = false; + bool needPreDecomp = true; // Map from row group index to pre-created loading BufferedInput. std::unordered_map> @@ -139,6 +140,7 @@ ReaderBase::ReaderBase( loadFileMetaData(); initializeSchema(); + needPreDecomp = true; } void ReaderBase::loadFileMetaData() { @@ -577,12 +579,18 @@ void ReaderBase::scheduleRowGroups( auto input = inputs_[thisGroup].get(); if (!input) { inputs_[thisGroup] = reader.loadRowGroup(thisGroup, input_); + if (needPreDecomp) { + needPreDecomp = reader.preDecompRowGroup(thisGroup); + } } for (auto counter = 0; counter < FLAGS_parquet_prefetch_rowgroups; ++counter) { if (nextGroup) { if (inputs_.count(nextGroup) != 0) { inputs_[nextGroup] = reader.loadRowGroup(thisGroup, input_); + if (needPreDecomp) { + needPreDecomp = reader.preDecompRowGroup(thisGroup); + } } } else { break; diff --git a/velox/dwio/parquet/reader/RepeatedColumnReader.h b/velox/dwio/parquet/reader/RepeatedColumnReader.h index aaa05eff2a286..7264a729ff781 100644 --- a/velox/dwio/parquet/reader/RepeatedColumnReader.h +++ b/velox/dwio/parquet/reader/RepeatedColumnReader.h @@ -128,6 +128,9 @@ class ListColumnReader : public dwio::common::SelectiveListColumnReader { void enqueueRowGroup(uint32_t index, dwio::common::BufferedInput& input); + bool 
preDecompRowGroup(uint32_t index) { + return true; + } void read( vector_size_t offset, RowSet rows, diff --git a/velox/dwio/parquet/reader/StructColumnReader.cpp b/velox/dwio/parquet/reader/StructColumnReader.cpp index a0af6324477f6..c4bdfcabf4bb4 100644 --- a/velox/dwio/parquet/reader/StructColumnReader.cpp +++ b/velox/dwio/parquet/reader/StructColumnReader.cpp @@ -133,6 +133,25 @@ void StructColumnReader::enqueueRowGroup( } } +bool StructColumnReader::preDecompRowGroup(uint32_t index) { + for (auto& child : children_) { + if (!needPreDecomp) { + return false; + } + if (auto structChild = dynamic_cast(child)) { + continue; + } else if (auto listChild = dynamic_cast(child)) { + continue; + } else if (auto mapChild = dynamic_cast(child)) { + continue; + } else { + needPreDecomp = + child->formatData().as().preDecompRowGroup(index); + } + } + return needPreDecomp; +} + void StructColumnReader::seekToRowGroup(uint32_t index) { SelectiveStructColumnReader::seekToRowGroup(index); BufferPtr noBuffer; diff --git a/velox/dwio/parquet/reader/StructColumnReader.h b/velox/dwio/parquet/reader/StructColumnReader.h index 30e8ebf6d62c6..1ad45458d2edf 100644 --- a/velox/dwio/parquet/reader/StructColumnReader.h +++ b/velox/dwio/parquet/reader/StructColumnReader.h @@ -41,6 +41,8 @@ class StructColumnReader : public dwio::common::SelectiveStructColumnReader { uint32_t index, const std::shared_ptr& input); + bool preDecompRowGroup(uint32_t index); + // No-op in Parquet. All readers switch row groups at the same time, there is // no on-demand skipping to a new row group. void advanceFieldReader( @@ -85,6 +87,7 @@ class StructColumnReader : public dwio::common::SelectiveStructColumnReader { // The level information for extracting nulls for 'this' from the // repdefs in a leaf PageReader. ::parquet::internal::LevelInfo levelInfo_; + bool needPreDecomp = true; }; } // namespace facebook::velox::parquet