From 1efe071cd17a709bc57860ca0d792e846d47501a Mon Sep 17 00:00:00 2001 From: yaqi-zhao Date: Tue, 7 Nov 2023 15:38:46 +0800 Subject: [PATCH 1/2] pre-decompress with Intel IAA --- .../resolve_dependency_modules/inteliaa.cmake | 59 ++++ CMakeLists.txt | 11 + velox/dwio/common/CMakeLists.txt | 6 + velox/dwio/common/QplJobPool.cpp | 129 +++++++++ velox/dwio/common/QplJobPool.h | 91 ++++++ velox/dwio/common/compression/Compression.cpp | 124 ++++++++ velox/dwio/common/compression/Compression.h | 86 ++++++ velox/dwio/parquet/reader/PageReader.cpp | 267 +++++++++++++++++- velox/dwio/parquet/reader/PageReader.h | 59 +++- velox/dwio/parquet/reader/ParquetData.cpp | 36 +++ velox/dwio/parquet/reader/ParquetData.h | 6 +- velox/dwio/parquet/reader/ParquetReader.cpp | 5 + .../parquet/reader/RepeatedColumnReader.cpp | 23 ++ .../parquet/reader/RepeatedColumnReader.h | 2 + .../parquet/reader/StructColumnReader.cpp | 19 ++ .../dwio/parquet/reader/StructColumnReader.h | 3 + 16 files changed, 909 insertions(+), 17 deletions(-) create mode 100644 CMake/resolve_dependency_modules/inteliaa.cmake create mode 100644 velox/dwio/common/QplJobPool.cpp create mode 100644 velox/dwio/common/QplJobPool.h diff --git a/CMake/resolve_dependency_modules/inteliaa.cmake b/CMake/resolve_dependency_modules/inteliaa.cmake new file mode 100644 index 000000000000..452fb4547ba0 --- /dev/null +++ b/CMake/resolve_dependency_modules/inteliaa.cmake @@ -0,0 +1,59 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +include_guard(GLOBAL) + +set(VELOX_INTELIAA_VERSION 1.3.0) +set(VELOX_INTELIAA_BUILD_SHA256_CHECKSUM + c3eba4d04a9d7aabcf26c9eaf81f6e9b26d19cb1b87a4a5f197a652cfa98f310) +set(VELOX_INTELIAA_SOURCE_URL + "https://github.com/intel/qpl/archive/refs/tags/v${VELOX_INTELIAA_VERSION}.tar.gz" +) + +resolve_dependency_url(INTELIAA) + +message(STATUS "Building Intel IAA from source") + +set(QPL_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/qpl_ep/install") +set(QPL_STATIC_LIB_NAME + ${CMAKE_STATIC_LIBRARY_PREFIX}qpl${CMAKE_STATIC_LIBRARY_SUFFIX}) +set(QPL_STATIC_LIB "${QPL_PREFIX}/lib/${QPL_STATIC_LIB_NAME}") + +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -ldl -laccel-config -L/usr/lib64") + +set(QPL_CMAKE_ARGS + -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} + -DCMAKE_INSTALL_LIBDIR=${QPL_PREFIX}/lib + -DCMAKE_INSTALL_PREFIX=${QPL_PREFIX} + -DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS} + -DQPL_BUILD_TESTS=OFF + -DQPL_BUILD_EXAMPLES=OFF + -DQPL_LIB=ON) + +ExternalProject_Add( + intel_iaa + URL ${VELOX_INTELIAA_SOURCE_URL} + URL_HASH ${VELOX_INTELIAA_BUILD_SHA256_CHECKSUM} + BUILD_BYPRODUCTS "${QPL_STATIC_LIB}" + CMAKE_ARGS ${QPL_CMAKE_ARGS}) + +file(MAKE_DIRECTORY "${QPL_PREFIX}/include") + +add_library(iaa::iaa UNKNOWN IMPORTED) +set(QPL_LIBRARIES ${QPL_STATIC_LIB}) +set(QPL_INCLUDE_DIRS "${QPL_PREFIX}/include") +set_target_properties( + iaa::iaa PROPERTIES IMPORTED_LOCATION ${QPL_LIBRARIES} + INTERFACE_INCLUDE_DIRECTORIES ${QPL_INCLUDE_DIRS}) + +add_dependencies(iaa::iaa intel_iaa-build) diff --git a/CMakeLists.txt b/CMakeLists.txt index b9c88d4add33..8b1220546561 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -103,6 +103,7 @@ option( VELOX_ENABLE_INT64_BUILD_PARTITION_BOUND "make buildPartitionBounds_ a vector int64 instead of int32 to avoid integer overflow when the hashtable has billions of records" OFF) +option(VELOX_ENABLE_INTEL_IAA "Enable Intel IAA support" OFF) # Explicitly force 
compilers to generate colored output. Compilers usually do # this by default if they detect the output is a terminal, but this assumption @@ -256,6 +257,11 @@ if(VELOX_ENABLE_PARQUET) set(VELOX_ENABLE_ARROW ON) endif() +if(VELOX_ENABLE_INTEL_IAA) + add_definitions(-DVELOX_ENABLE_INTEL_IAA) + message(STATUS "Intel IAA acceleration enabled") +endif() + # define processor variable for conditional compilation if(${VELOX_CODEGEN_SUPPORT}) add_compile_definitions(CODEGEN_ENABLED=1) @@ -474,6 +480,11 @@ if(NOT ${VELOX_BUILD_MINIMAL}) include_directories(${Protobuf_INCLUDE_DIRS}) endif() +if(VELOX_ENABLE_INTEL_IAA) + set_source(inteliaa) + resolve_dependency(inteliaa) +endif() + # GCC needs to link a library to enable std::filesystem. if("${CMAKE_CXX_COMPILER_ID}" MATCHES "GNU") set(FILESYSTEM "stdc++fs") diff --git a/velox/dwio/common/CMakeLists.txt b/velox/dwio/common/CMakeLists.txt index 8334de75e0f5..c6562a4e2fcc 100644 --- a/velox/dwio/common/CMakeLists.txt +++ b/velox/dwio/common/CMakeLists.txt @@ -78,3 +78,9 @@ target_link_libraries( Boost::regex Folly::folly glog::glog) + +if(VELOX_ENABLE_INTEL_IAA) + add_library(velox_dwio_qpl QplJobPool.cpp) + target_link_libraries(velox_dwio_qpl iaa::iaa Folly::folly) + target_link_libraries(velox_dwio_common velox_dwio_qpl) +endif() diff --git a/velox/dwio/common/QplJobPool.cpp b/velox/dwio/common/QplJobPool.cpp new file mode 100644 index 000000000000..6a62ef6376cf --- /dev/null +++ b/velox/dwio/common/QplJobPool.cpp @@ -0,0 +1,129 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/dwio/common/QplJobPool.h" +#include +#include +#include "velox/common/base/Exceptions.h" + +namespace facebook::velox::dwio::common { + +// std::array +// QplJobHWPool::hwJobPtrPool; +std::array, QplJobHWPool::MAX_JOB_NUMBER> + QplJobHWPool::hwJobPtrLocks; +// bool QplJobHWPool::iaa_job_ready = false; +// std::unique_ptr QplJobHWPool::hwJobsBuffer; + +QplJobHWPool& QplJobHWPool::getInstance() { + static QplJobHWPool pool; + return pool; +} + +QplJobHWPool::QplJobHWPool() { + if (!iaaJobReady) { + allocateQPLJob(); + } +} + +QplJobHWPool::~QplJobHWPool() { + for (uint32_t i = 0; i < MAX_JOB_NUMBER; ++i) { + if (hwJobPtrPool[i]) { + qpl_fini_job(hwJobPtrPool[i]); + hwJobPtrPool[i] = nullptr; + } + } + iaaJobReady = false; +} + +void QplJobHWPool::allocateQPLJob() { + uint32_t job_size = 0; + + // Get size required for saving a single qpl job object + qpl_get_job_size(qpl_path, &job_size); + + // Allocate entire buffer for storing all job objects + hwJobsBuffer = std::make_unique(job_size * MAX_JOB_NUMBER); + + // Initialize pool for storing all job object pointers + // Allocate buffer by shifting address offset for each job object. + for (uint32_t i = 0; i < MAX_JOB_NUMBER; ++i) { + qpl_job* qplJobPtr = + reinterpret_cast(hwJobsBuffer.get() + i * job_size); + auto status = qpl_init_job(qpl_path, qplJobPtr); + if (status != QPL_STS_OK) { + iaaJobReady = false; + LOG(WARNING) << "Initialization of hardware IAA failed, statsu: " + << status << ". 
Please check if Intel \ + In-Memory Analytics Accelerator (IAA) is properly set up!"; + return; + } + this->hwJobPtrPool[i] = qplJobPtr; + hwJobPtrLocks[i].store(false); + } + + iaaJobReady = true; + return; +} + +/** + * Acquire a deflate job. + * QplJobHWPool maintains MAX_JOB_NUMBER job slot to avoid frequently allocate, + * initialize and release job. Random slots is used to select a job and + * tryLockJob will check if the job is free. + * @return job_id and qpl_job pointer + */ +std::pair QplJobHWPool::acquireDeflateJob() { + std::pair res; + res.first = -1; + res.second = nullptr; + if (!job_ready()) { + return res; + } + uint32_t retry = 0; + uint32_t index = folly::Random::rand32(1, MAX_JOB_NUMBER - 1); + while (!tryLockJob(index)) { + index = folly::Random::rand32(1, MAX_JOB_NUMBER - 1); + retry++; + if (retry > MAX_JOB_NUMBER) { + return res; + } + } + res.first = index; + if (index >= MAX_JOB_NUMBER) { + return res; + } + res.second = hwJobPtrPool[index]; + + return res; +} + +void QplJobHWPool::releaseJob(int job_id) { + if (job_id >= MAX_JOB_NUMBER || job_id <= 0) { + return; + } + VELOX_CHECK_LT(job_id, MAX_JOB_NUMBER); + hwJobPtrLocks[job_id].store(false); + return; +} + +bool QplJobHWPool::tryLockJob(uint32_t index) { + bool expected = false; + VELOX_CHECK_LT(index, MAX_JOB_NUMBER); + return hwJobPtrLocks[index].compare_exchange_strong(expected, true); +} + +} // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/QplJobPool.h b/velox/dwio/common/QplJobPool.h new file mode 100644 index 000000000000..ae3fe51b45f1 --- /dev/null +++ b/velox/dwio/common/QplJobPool.h @@ -0,0 +1,91 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "qpl/qpl.h" + +namespace facebook::velox::dwio::common { + +// QplJobHWPool is a resource pool that provides the jobs submitted to +// Intel® IAA. Memory for Intel® IAA jobs is allocated when the QplJobHWPool +// instance is created. +// +// The Intel® In-Memory Analytics Accelerator (Intel® IAA) is a hardware +// accelerator that provides very high throughput compression and decompression +// combined with primitive analytic functions. It primarily targets applications +// such as big-data and in-memory analytic databases, as well as +// application-transparent usages such as memory page compression. Intel® IAA +// contains two main functional blocks: Compression and Analytics. The Analytics +// pipe contains two sub-blocks: Decompress and Filter. These functions are tied +// together, so that each analytics operation can perform decompress-only, +// filter-only, or decompress-and-filter processing. +// +// Intel QPL is a library that provides an application programming interface +// (API) for interaction with Intel® In-Memory Analytics Accelerator hardware. +// +// Intel® IAA: +// https://www.intel.com/content/www/us/en/content-details/780887/intel-in-memory-analytics-accelerator-intel-iaa.html +// Intel QPL: +// https://intel.github.io/qpl/documentation/introduction_docs/introduction.html +class QplJobHWPool { + public: + static QplJobHWPool& getInstance(); + QplJobHWPool(); + ~QplJobHWPool(); + + // Release QPL job by the job_id. 
+ void releaseJob(int job_id); + + // Return if the QPL job is allocated sucessfully. + const bool& job_ready() { + return iaaJobReady; + } + + std::pair acquireDeflateJob(); + qpl_job* getJobById(int job_id) { + if (job_id >= MAX_JOB_NUMBER || job_id <= 0) { + return nullptr; + } + return hwJobPtrPool[job_id]; + } + + static constexpr uint32_t MAX_JOB_NUMBER = 1024; + + private: + bool tryLockJob(uint32_t index); + void allocateQPLJob(); + + qpl_path_t qpl_path = qpl_path_hardware; + + // Max jobs in QPL_JOB_POOL + // Entire buffer for storing all job objects + std::unique_ptr hwJobsBuffer; + + // Job pool for storing all job object pointers + std::array hwJobPtrPool; + + // Locks for accessing each job object pointers + bool iaaJobReady; + static std::array, MAX_JOB_NUMBER> hwJobPtrLocks; +}; + +} // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/compression/Compression.cpp b/velox/dwio/common/compression/Compression.cpp index dfe1e3f31564..e36d58e7756f 100644 --- a/velox/dwio/common/compression/Compression.cpp +++ b/velox/dwio/common/compression/Compression.cpp @@ -445,6 +445,113 @@ std::pair ZstdDecompressor::getDecompressedLength( return {uncompressedLength, true}; } +class GzipIAADecompressor : public AsyncDecompressor { + public: + explicit GzipIAADecompressor() {} + + explicit GzipIAADecompressor( + uint64_t blockSize, + const std::string& streamDebugInfo) + : AsyncDecompressor{blockSize, streamDebugInfo} {} + + int decompress( + const char* src, + uint64_t srcLength, + char* dest, + uint64_t destLength) override; + + bool waitResult(int job_id) override; + + void releaseJob(int job_id) override; +}; + +int GzipIAADecompressor::decompress( + const char* src, + uint64_t srcLength, + char* dest, + uint64_t destLength) { +#ifdef VELOX_ENABLE_INTEL_IAA + dwio::common::QplJobHWPool& qpl_job_pool = + dwio::common::QplJobHWPool::getInstance(); + // int job_id = 0; + auto deflate_job = qpl_job_pool.acquireDeflateJob(); + // qpl_job* job = 
qpl_job_pool.AcquireDeflateJob(job_id); + auto job = deflate_job.second; + if (job == nullptr) { + LOG(WARNING) << "cannot AcquireDeflateJob "; + return -1; // Invalid job id to illustrate the + // failed decompress job. + } + job->op = qpl_op_decompress; + job->next_in_ptr = reinterpret_cast(const_cast(src)); + job->next_out_ptr = reinterpret_cast(dest); + job->available_in = static_cast(srcLength); + job->available_out = static_cast(destLength); + job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_ZLIB_MODE; + + qpl_status status = qpl_submit_job(job); + if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) { + qpl_job_pool.releaseJob(deflate_job.first); + deflate_job = qpl_job_pool.acquireDeflateJob(); + job = deflate_job.second; + if (job == nullptr) { + LOG(WARNING) + << "cannot acqure deflate job after QPL_STS_QUEUES_ARE_BUSY_ERR "; + return -1; // Invalid job id to illustrate the + // failed decompress job. + } + job->op = qpl_op_decompress; + job->next_in_ptr = reinterpret_cast(const_cast(src)); + job->next_out_ptr = reinterpret_cast(dest); + job->available_in = static_cast(srcLength); + job->available_out = static_cast(destLength); + job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_ZLIB_MODE; + + status = qpl_submit_job(job); + } + if (status != QPL_STS_OK) { + qpl_job_pool.releaseJob(deflate_job.first); + LOG(WARNING) << "cannot submit job, error status: " << status; + return -1; // Invalid job id to illustrate the + // failed decompress job. 
+ } else { + return deflate_job.first; + } +#else + return -1; +#endif +} + +bool GzipIAADecompressor::waitResult(int job_id) { +#ifdef VELOX_ENABLE_INTEL_IAA + dwio::common::QplJobHWPool& qpl_job_pool = + dwio::common::QplJobHWPool::getInstance(); + if (job_id <= 0 || job_id >= qpl_job_pool.MAX_JOB_NUMBER) { + return true; + } + qpl_job* job = qpl_job_pool.getJobById(job_id); + + auto status = qpl_wait_job(job); + qpl_job_pool.releaseJob(job_id); + if (status == QPL_STS_OK) { + return true; + } + LOG(WARNING) << "Decompress w/IAA error, status: " << status; +#endif + return false; +} + +void GzipIAADecompressor::releaseJob(int job_id) { +#ifdef VELOX_ENABLE_INTEL_IAA + dwio::common::QplJobHWPool& qpl_job_pool = + dwio::common::QplJobHWPool::getInstance(); + if (job_id <= 0 || job_id >= qpl_job_pool.MAX_JOB_NUMBER) { + return; + } + return qpl_job_pool.releaseJob(job_id); +#endif +} + class SnappyDecompressor : public Decompressor { public: explicit SnappyDecompressor( @@ -726,4 +833,21 @@ std::unique_ptr createDecompressor( compressedLength); } +std::unique_ptr +createAsyncDecompressor( + facebook::velox::common::CompressionKind kind, + uint64_t bufferSize, + const std::string& streamDebugInfo) { + std::unique_ptr decompressor; + switch (static_cast(kind)) { + case CompressionKind::CompressionKind_GZIP: + return std::make_unique(bufferSize, streamDebugInfo); + default: + LOG(WARNING) << "Asynchronous mode not support for compression codec " + << kind; + return nullptr; + } + return nullptr; +} + } // namespace facebook::velox::dwio::common::compression diff --git a/velox/dwio/common/compression/Compression.h b/velox/dwio/common/compression/Compression.h index 3d26b3af98a4..f24c2c486782 100644 --- a/velox/dwio/common/compression/Compression.h +++ b/velox/dwio/common/compression/Compression.h @@ -19,6 +19,9 @@ #include "velox/common/compression/Compression.h" #include "velox/dwio/common/SeekableInputStream.h" #include "velox/dwio/common/encryption/Encryption.h" 
+#ifdef VELOX_ENABLE_INTEL_IAA +#include "velox/dwio/common/QplJobPool.h" +#endif namespace facebook::velox::dwio::common::compression { @@ -30,6 +33,7 @@ class Compressor { // https://zlib.net/manual.html static constexpr int DWRF_ORC_ZLIB_WINDOW_BITS = -15; static constexpr int PARQUET_ZLIB_WINDOW_BITS = 15; + static constexpr int PARQUET_ZLIB_WINDOW_BITS_4KB = 12; explicit Compressor(int32_t level) : level_{level} {} @@ -65,6 +69,37 @@ class Decompressor { const std::string streamDebugInfo_; }; +class AsyncDecompressor { + public: + explicit AsyncDecompressor(){}; + explicit AsyncDecompressor( + uint64_t blockSize, + const std::string& streamDebugInfo) + : blockSize_{blockSize}, streamDebugInfo_{streamDebugInfo} {} + + virtual ~AsyncDecompressor() = default; + + virtual uint64_t getUncompressedLength( + const char* /* unused */, + uint64_t /* unused */) const { + return blockSize_; + } + + virtual int decompress( + const char* src, + uint64_t srcLength, + char* dest, + uint64_t destLength) = 0; + + virtual bool waitResult(int job_id) = 0; + + virtual void releaseJob(int job_id) = 0; + + protected: + uint64_t blockSize_; + const std::string streamDebugInfo_; +}; + struct CompressionOptions { /// Format specific compression/decompression options union Format { @@ -89,6 +124,42 @@ struct CompressionOptions { uint32_t compressionThreshold; }; +/** + * Get the window size from zlib header(rfc1950). + * 0 1 + * +---+---+ + * |CMF|FLG| (more-->) + * +---+---+ + * bits 0 to 3 CM Compression method + * bits 4 to 7 CINFO Compression info + * CM (Compression method) This identifies the compression method used in the + * file. CM = 8 denotes the "deflate" compression method with a window size up + * to 32K. CINFO (Compression info) For CM = 8, CINFO is the base-2 logarithm of + * the LZ77 window size, minus eight (CINFO=7 indicates a 32K window size). 
+ * @param stream_ptr pointer to the first byte of the zlib stream + * @param stream_size number of bytes available at stream_ptr + */ +static int getZlibWindowBits(const uint8_t* stream_ptr, uint32_t stream_size) { + static constexpr uint8_t CM_ZLIB_DEFAULT_VALUE = 8u; + static constexpr uint32_t ZLIB_MIN_HEADER_SIZE = 2u; + static constexpr uint32_t ZLIB_INFO_OFFSET = 4u; + if (stream_size < ZLIB_MIN_HEADER_SIZE) { + return -1; + } + const uint8_t compression_method_and_flag = *stream_ptr++; + const uint8_t compression_method = compression_method_and_flag & 0xf; + const uint8_t compression_info = + compression_method_and_flag >> ZLIB_INFO_OFFSET; + + if (CM_ZLIB_DEFAULT_VALUE != compression_method) { + return -1; + } + if (compression_info > 7) { + return -1; + } + return CM_ZLIB_DEFAULT_VALUE + compression_info; +} + /** * Create a decompressor for the given compression kind. * @param kind The compression type to implement @@ -119,4 +190,19 @@ std::unique_ptr createCompressor( facebook::velox::common::CompressionKind kind, const CompressionOptions& options); +/** + * Create a decompressor for the given compression kind in asynchronous mode. 
+ * @param kind the compression type to implement + * @param bufferSize the block size passed to the decompressor's constructor + * @param streamDebugInfo debug description of the stream, passed to the + * decompressor's constructor + * @return an asynchronous decompressor when kind is GZIP; for any other + * codec a warning is logged and nullptr is returned, so callers must check + * the result + */ +std::unique_ptr +createAsyncDecompressor( + facebook::velox::common::CompressionKind kind, + uint64_t bufferSize, + const std::string& streamDebugInfo); } // namespace facebook::velox::dwio::common::compression diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp index e6f44f42ecca..691f56dbccad 100644 --- a/velox/dwio/parquet/reader/PageReader.cpp +++ b/velox/dwio/parquet/reader/PageReader.cpp @@ -28,11 +28,116 @@ namespace facebook::velox::parquet { using thrift::Encoding; using thrift::PageHeader; +void PageReader::preDecompressPage( + bool& need_pre_decompress, + int64_t numValues) { + if (codec_ != thrift::CompressionCodec::GZIP) { + need_pre_decompress = false; + return; + } + for (;;) { + auto dataStart = pageStart_; + if (chunkSize_ <= pageStart_) { + // This may happen if seeking to exactly end of row group. 
+ numRepDefsInPage_ = 0; + numRowsInPage_ = 0; + break; + } + PageHeader pageHeader = readPageHeader(); + pageStart_ = pageDataStart_ + pageHeader.compressed_page_size; + switch (pageHeader.type) { + case thrift::PageType::DATA_PAGE: + prefetchDataPageV1(pageHeader); + break; + case thrift::PageType::DATA_PAGE_V2: + prefetchDataPageV2(pageHeader); + break; + case thrift::PageType::DICTIONARY_PAGE: + prefetchDictionary(pageHeader); + continue; + default: + break; // ignore INDEX page type and any other custom extensions + } + break; + } + need_pre_decompress = isWinSizeFit; + rowGroupPageInfo_.numValues = numValues; + rowGroupPageInfo_.visitedRows = 0; +} + +void PageReader::prefetchNextPage() { + if (rowGroupPageInfo_.visitedRows + numRowsInPage_ >= + rowGroupPageInfo_.numValues) { + return; + } + if (chunkSize_ <= pageStart_) { + return; + } + PageHeader pageHeader = readPageHeader(); + rowGroupPageInfo_.pageStart = + pageDataStart_ + pageHeader.compressed_page_size; + switch (pageHeader.type) { + case thrift::PageType::DATA_PAGE: { + dataPageHeader_ = pageHeader; + VELOX_CHECK( + pageHeader.type == thrift::PageType::DATA_PAGE && + pageHeader.__isset.data_page_header); + rowGroupPageInfo_.dataPageData = + readBytes(pageHeader.compressed_page_size, pageBuffer_); + pre_decompress_data = iaaDecompressGzip( + rowGroupPageInfo_.dataPageData, + pageHeader.compressed_page_size, + pageHeader.uncompressed_page_size, + rowGroupPageInfo_.uncompressedDataV1Data, + data_qpl_job_id); + break; + } + case thrift::PageType::DATA_PAGE_V2: + LOG(WARNING) << "Data Page V2 not support "; + break; + case thrift::PageType::DICTIONARY_PAGE: + LOG(WARNING) << "Wrong path "; + break; + default: + break; // ignore INDEX page type and any other custom extensions + } +} + +bool PageReader::seekToPreDecompPage(int64_t row) { + bool has_qpl = false; + if (dict_qpl_job_id != 0) { + bool job_success = getDecompRes(dict_qpl_job_id); + prepareDictionary(dictPageHeader_, job_success); + 
dict_qpl_job_id = 0; + has_qpl = true; + } + + if (this->data_qpl_job_id != 0) { + bool job_success = getDecompRes(data_qpl_job_id); + prepareDataPageV1(dataPageHeader_, row, job_success); + data_qpl_job_id = 0; + has_qpl = true; + } + + if (has_qpl) { + if (row == kRepDefOnly || row < rowOfPage_ + numRowsInPage_) { + return true; + } + updateRowInfoAfterPageSkipped(); + } + return false; +} + void PageReader::seekToPage(int64_t row) { defineDecoder_.reset(); repeatDecoder_.reset(); // 'rowOfPage_' is the row number of the first row of the next page. rowOfPage_ += numRowsInPage_; + + if (seekToPreDecompPage(row)) { + return; + } + for (;;) { auto dataStart = pageStart_; if (chunkSize_ <= pageStart_) { @@ -189,7 +294,10 @@ void PageReader::updateRowInfoAfterPageSkipped() { } } -void PageReader::prepareDataPageV1(const PageHeader& pageHeader, int64_t row) { +void PageReader::prepareDataPageV1( + const PageHeader& pageHeader, + int64_t row, + bool job_success) { VELOX_CHECK( pageHeader.type == thrift::PageType::DATA_PAGE && pageHeader.__isset.data_page_header); @@ -205,11 +313,24 @@ void PageReader::prepareDataPageV1(const PageHeader& pageHeader, int64_t row) { return; } - pageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); - pageData_ = decompressData( - pageData_, - pageHeader.compressed_page_size, - pageHeader.uncompressed_page_size); + if (data_qpl_job_id != 0 && pre_decompress_data && job_success) { + if (rowGroupPageInfo_.visitedRows > 0) { + BufferPtr tmp = uncompressedDataV1Data_; + uncompressedDataV1Data_ = rowGroupPageInfo_.uncompressedDataV1Data; + rowGroupPageInfo_.uncompressedDataV1Data = tmp; + } + pageData_ = uncompressedDataV1Data_->as(); + } else { + if (data_qpl_job_id == 0) { + dataPageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); + } else if (rowGroupPageInfo_.visitedRows > 0) { + dataPageData_ = rowGroupPageInfo_.dataPageData; + } + pageData_ = decompressData( + dataPageData_, + pageHeader.compressed_page_size, 
+ pageHeader.uncompressed_page_size); + } auto pageEnd = pageData_ + pageHeader.uncompressed_page_size; if (maxRepeat_ > 0) { uint32_t repeatLength = readField(pageData_); @@ -307,7 +428,9 @@ void PageReader::prepareDataPageV2(const PageHeader& pageHeader, int64_t row) { } } -void PageReader::prepareDictionary(const PageHeader& pageHeader) { +void PageReader::prepareDictionary( + const PageHeader& pageHeader, + bool job_success) { dictionary_.numValues = pageHeader.dictionary_page_header.num_values; dictionaryEncoding_ = pageHeader.dictionary_page_header.encoding; dictionary_.sorted = pageHeader.dictionary_page_header.__isset.is_sorted && @@ -317,11 +440,17 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { dictionaryEncoding_ == Encoding::PLAIN); if (codec_ != common::CompressionKind::CompressionKind_NONE) { - pageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); - pageData_ = decompressData( - pageData_, - pageHeader.compressed_page_size, - pageHeader.uncompressed_page_size); + if (dict_qpl_job_id != 0 && pre_decompress_dict && job_success) { + pageData_ = uncompressedDictData_->as(); + } else { + if (dict_qpl_job_id == 0) { + pageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); + } + pageData_ = decompressData( + pageData_, + pageHeader.compressed_page_size, + pageHeader.uncompressed_page_size); + } } auto parquetType = type_->parquetType_.value(); @@ -663,6 +792,10 @@ void PageReader::skip(int64_t numRows) { numLeafNullsConsumed_ = rowOfPage_; } toSkip -= rowOfPage_ - firstUnvisited_; + if (isWinSizeFit) { + prefetchNextPage(); + } + rowGroupPageInfo_.visitedRows += numRowsInPage_; } firstUnvisited_ += numRows; @@ -716,6 +849,10 @@ void PageReader::skipNullsOnly(int64_t numRows) { seekToPage(firstUnvisited_ + numRows); firstUnvisited_ += numRows; toSkip = firstUnvisited_ - rowOfPage_; + if (isWinSizeFit) { + prefetchNextPage(); + } + rowGroupPageInfo_.visitedRows += numRowsInPage_; } else { firstUnvisited_ += 
numRows; } @@ -736,6 +873,10 @@ void PageReader::readNullsOnly(int64_t numValues, BufferPtr& buffer) { if (!availableOnPage) { seekToPage(firstUnvisited_); availableOnPage = numRowsInPage_; + if (isWinSizeFit) { + prefetchNextPage(); + } + rowGroupPageInfo_.visitedRows += numRowsInPage_; } auto numRead = std::min(availableOnPage, toRead); auto nulls = readNulls(numRead, nullsInReadRange_); @@ -796,6 +937,10 @@ bool PageReader::rowsForPage( if (hasChunkRepDefs_) { numLeafNullsConsumed_ = rowOfPage_; } + if (isWinSizeFit) { + prefetchNextPage(); + } + rowGroupPageInfo_.visitedRows += numRowsInPage_; } auto& scanState = reader.scanState(); if (isDictionary()) { @@ -891,4 +1036,102 @@ const VectorPtr& PageReader::dictionaryValues(const TypePtr& type) { return dictionaryValues_; } +void PageReader::prefetchDataPageV1(const thrift::PageHeader& pageHeader) { + dataPageHeader_ = pageHeader; + VELOX_CHECK( + pageHeader.type == thrift::PageType::DATA_PAGE && + pageHeader.__isset.data_page_header); + + dataPageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); + pre_decompress_data = iaaDecompressGzip( + dataPageData_, + pageHeader.compressed_page_size, + pageHeader.uncompressed_page_size, + uncompressedDataV1Data_, + data_qpl_job_id); + return; +} + +void PageReader::prefetchDataPageV2(const thrift::PageHeader& pageHeader) { + return; +} + +void PageReader::prefetchDictionary(const thrift::PageHeader& pageHeader) { + dictPageHeader_ = pageHeader; + dictionaryEncoding_ = pageHeader.dictionary_page_header.encoding; + VELOX_CHECK( + dictionaryEncoding_ == Encoding::PLAIN_DICTIONARY || + dictionaryEncoding_ == Encoding::PLAIN); + dictPageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); + + pre_decompress_dict = iaaDecompressGzip( + dictPageData_, + pageHeader.compressed_page_size, + pageHeader.uncompressed_page_size, + uncompressedDictData_, + dict_qpl_job_id); + + return; +} + +const bool PageReader::iaaDecompressGzip( + const char* pageData, + 
uint32_t compressedSize, + uint32_t uncompressedSize, + BufferPtr& uncompressedData, + int& qpl_job_id) { + dwio::common::ensureCapacity( + uncompressedData, uncompressedSize, &pool_); + + if (!isWinSizeFit) { + // window size should be 4KB for IAA + if (dwio::common::compression::Compressor::PARQUET_ZLIB_WINDOW_BITS_4KB == + dwio::common::compression::getZlibWindowBits( + (const uint8_t*)pageData, uncompressedSize)) { + isWinSizeFit = true; + } else { + qpl_job_id = -1; + return false; + } + } + auto streamDebugInfo = + fmt::format("Page Reader: Stream {}", inputStream_->getName()); + std::unique_ptr decompressor = + dwio::common::compression::createAsyncDecompressor( + thriftCodecToCompressionKind(), uncompressedSize, streamDebugInfo); + if (decompressor == nullptr) { + return false; + } + qpl_job_id = decompressor->decompress( + (const char*)pageData, + compressedSize, + (char*)uncompressedData->asMutable(), + uncompressedSize); + if (qpl_job_id < 0) { + return false; + } + return true; +} + +const bool PageReader::getDecompRes(int job_id) { + auto streamDebugInfo = + fmt::format("Page Reader: Stream {}", inputStream_->getName()); + std::unique_ptr decompressor = + dwio::common::compression::createAsyncDecompressor( + thriftCodecToCompressionKind(), 0, streamDebugInfo); + return decompressor->waitResult(job_id); +} + +PageReader::~PageReader() { + if (data_qpl_job_id > 0 || dict_qpl_job_id > 0) { + auto streamDebugInfo = + fmt::format("Page Reader: Stream {}", inputStream_->getName()); + std::unique_ptr decompressor = + dwio::common::compression::createAsyncDecompressor( + thriftCodecToCompressionKind(), 0, streamDebugInfo); + decompressor->releaseJob(data_qpl_job_id); + decompressor->releaseJob(dict_qpl_job_id); + } +} + } // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/reader/PageReader.h b/velox/dwio/parquet/reader/PageReader.h index 4ef9c959b0db..b88ad0393c3b 100644 --- a/velox/dwio/parquet/reader/PageReader.h +++ 
b/velox/dwio/parquet/reader/PageReader.h @@ -31,6 +31,15 @@ namespace facebook::velox::parquet { +struct PreDecompPageInfo { + int64_t numValues; // Number of values in this row group + int64_t visitedRows; // rows already read + uint64_t pageStart{0}; + thrift::PageHeader dataPageHeader; + const char* FOLLY_NULLABLE dataPageData{nullptr}; + BufferPtr uncompressedDataV1Data; +}; + /// Manages access to pages inside a ColumnChunk. Interprets page headers and /// encodings and presents the combination of pages and encoded values as a /// continuous stream accessible via readWithVisitor(). @@ -52,6 +61,10 @@ class PageReader { chunkSize_(chunkSize), nullConcatenation_(pool_) { type_->makeLevelInfo(leafInfo_); + dict_qpl_job_id = 0; + data_qpl_job_id = 0; + uncompressedDictData_ = nullptr; + uncompressedDataV1Data_ = nullptr; } // This PageReader constructor is for unit test only. @@ -68,10 +81,26 @@ class PageReader { codec_(codec), chunkSize_(chunkSize), nullConcatenation_(pool_) {} + ~PageReader(); /// Advances 'numRows' top level rows. void skip(int64_t numRows); + /// Pre-decompress GZIP page with IAA + void preDecompressPage(bool& need_pre_decompress, int64_t numValues); + void prefetchDataPageV1(const thrift::PageHeader& pageHeader); + void prefetchDataPageV2(const thrift::PageHeader& pageHeader); + void prefetchDictionary(const thrift::PageHeader& pageHeader); + const bool getDecompRes(int job_id); + void prefetchNextPage(); + bool seekToPreDecompPage(int64_t row); + const bool iaaDecompressGzip( + const char* FOLLY_NONNULL pageData, + uint32_t compressedSize, + uint32_t uncompressedSize, + BufferPtr& uncompressedData, + int& qpl_job_id); + /// Decodes repdefs for 'numTopLevelRows'. Use getLengthsAndNulls() /// to access the lengths and nulls for the different nesting /// levels. @@ -188,9 +217,14 @@ class PageReader { // next page. 
void updateRowInfoAfterPageSkipped(); - void prepareDataPageV1(const thrift::PageHeader& pageHeader, int64_t row); + void prepareDataPageV1( + const thrift::PageHeader& pageHeader, + int64_t row, + bool job_success = false); void prepareDataPageV2(const thrift::PageHeader& pageHeader, int64_t row); - void prepareDictionary(const thrift::PageHeader& pageHeader); + void prepareDictionary( + const thrift::PageHeader& pageHeader, + bool job_success = false); void makeDecoder(); // For a non-top level leaf, reads the defs and sets 'leafNulls_' and @@ -204,12 +238,12 @@ class PageReader { const char* FOLLY_NONNULL readBytes(int32_t size, BufferPtr& copy); // Decompresses data starting at 'pageData_', consuming 'compressedsize' and - // producing up to 'uncompressedSize' bytes. The start of the decoding + // producing up to 'decompressedSize' bytes. The The start of the decoding // result is returned. an intermediate copy may be made in 'decompresseddata_' const char* FOLLY_NONNULL decompressData( const char* FOLLY_NONNULL pageData, uint32_t compressedSize, - uint32_t uncompressedSize); + uint32_t decompressedSize); template T readField(const char* FOLLY_NONNULL& ptr) { @@ -486,6 +520,23 @@ class PageReader { std::unique_ptr booleanDecoder_; std::unique_ptr deltaBpDecoder_; // Add decoders for other encodings here. 
+ // Used for pre-decompress + BufferPtr uncompressedDictData_; + BufferPtr uncompressedDataV1Data_; + thrift::PageHeader dictPageHeader_; + const char* FOLLY_NULLABLE dictPageData_{nullptr}; + bool needUncompressDict; + + thrift::PageHeader dataPageHeader_; + const char* FOLLY_NULLABLE dataPageData_{nullptr}; + + int dict_qpl_job_id; + int data_qpl_job_id; + + bool pre_decompress_dict = false; + bool pre_decompress_data = false; + bool isWinSizeFit = false; + PreDecompPageInfo rowGroupPageInfo_; }; FOLLY_ALWAYS_INLINE dwio::common::compression::CompressionOptions diff --git a/velox/dwio/parquet/reader/ParquetData.cpp b/velox/dwio/parquet/reader/ParquetData.cpp index 283190bbfb0a..c844d7845595 100644 --- a/velox/dwio/parquet/reader/ParquetData.cpp +++ b/velox/dwio/parquet/reader/ParquetData.cpp @@ -18,6 +18,9 @@ #include "velox/dwio/common/BufferedInput.h" #include "velox/dwio/parquet/reader/Statistics.h" +#ifdef VELOX_ENABLE_INTEL_IAA +#include "velox/dwio/common/QplJobPool.h" +#endif namespace facebook::velox::parquet { @@ -81,6 +84,33 @@ bool ParquetData::rowGroupMatches( return true; } +bool ParquetData::preDecompRowGroup(uint32_t index) { +#ifdef VELOX_ENABLE_INTEL_IAA + if (!dwio::common::QplJobHWPool::getInstance().job_ready()) { + return false; + } +#else + return false; +#endif + auto& metaData = rowGroups_[index].columns[type_->column()].meta_data; + if (metaData.codec != thrift::CompressionCodec::GZIP || !needPreDecomp) { + LOG(WARNING) << "QPL Job not ready or zlib window size(" << needPreDecomp + << ") is not 4KB"; + return false; + } + + pageReaders_.resize(rowGroups_.size()); + auto iaaPageReader = std::make_unique( + std::move(streams_[index]), + pool_, + type_, + metaData.codec, + metaData.total_compressed_size); + iaaPageReader->preDecompressPage(needPreDecomp, metaData.num_values); + pageReaders_[index] = std::move(iaaPageReader); + return needPreDecomp; +} + void ParquetData::enqueueRowGroup( uint32_t index, dwio::common::BufferedInput& input) { 
@@ -113,6 +143,12 @@ dwio::common::PositionProvider ParquetData::seekToRowGroup(uint32_t index) { VELOX_CHECK_LT(index, streams_.size()); VELOX_CHECK(streams_[index], "Stream not enqueued for column"); auto metadata = fileMetaDataPtr_.rowGroup(index).columnChunk(type_->column()); + auto& metadata = rowGroups_[index].columns[type_->column()].meta_data; + if (metadata.codec == thrift::CompressionCodec::GZIP && + pageReaders_.size() > index && pageReaders_[index] != nullptr) { + reader_ = std::move(pageReaders_[index]); + return dwio::common::PositionProvider(empty); + } reader_ = std::make_unique( std::move(streams_[index]), pool_, diff --git a/velox/dwio/parquet/reader/ParquetData.h b/velox/dwio/parquet/reader/ParquetData.h index e75e69558b8e..51d472025ed7 100644 --- a/velox/dwio/parquet/reader/ParquetData.h +++ b/velox/dwio/parquet/reader/ParquetData.h @@ -62,6 +62,9 @@ class ParquetData : public dwio::common::FormatData { /// Prepares to read data for 'index'th row group. void enqueueRowGroup(uint32_t index, dwio::common::BufferedInput& input); + /// pre-decompress data for the 'index'th row group + bool preDecompRowGroup(uint32_t index); + /// Positions 'this' at 'index'th row group. loadRowGroup must be called /// first. The returned PositionProvider is empty and should not be used. /// Other formats may use it. @@ -203,7 +206,8 @@ class ParquetData : public dwio::common::FormatData { const uint32_t maxRepeat_; int64_t rowsInRowGroup_; std::unique_ptr reader_; - + std::vector> pageReaders_; + bool needPreDecomp = true; // Nulls derived from leaf repdefs for non-leaf readers. 
BufferPtr presetNulls_; diff --git a/velox/dwio/parquet/reader/ParquetReader.cpp b/velox/dwio/parquet/reader/ParquetReader.cpp index 7d9a7d97da19..b0f286123256 100644 --- a/velox/dwio/parquet/reader/ParquetReader.cpp +++ b/velox/dwio/parquet/reader/ParquetReader.cpp @@ -117,6 +117,7 @@ class ReaderBase { std::shared_ptr schemaWithId_; const bool binaryAsString = false; + bool needPreDecomp = true; // Map from row group index to pre-created loading BufferedInput. std::unordered_map> @@ -137,6 +138,7 @@ ReaderBase::ReaderBase( loadFileMetaData(); initializeSchema(); + needPreDecomp = true; } void ReaderBase::loadFileMetaData() { @@ -600,6 +602,9 @@ void ReaderBase::scheduleRowGroups( auto thisGroup = rowGroupIds[currentGroup + i]; if (!inputs_[thisGroup]) { inputs_[thisGroup] = reader.loadRowGroup(thisGroup, input_); + if (needPreDecomp) { + needPreDecomp = reader.preDecompRowGroup(thisGroup); + } } } diff --git a/velox/dwio/parquet/reader/RepeatedColumnReader.cpp b/velox/dwio/parquet/reader/RepeatedColumnReader.cpp index 250bd204e083..afb68ecd0a01 100644 --- a/velox/dwio/parquet/reader/RepeatedColumnReader.cpp +++ b/velox/dwio/parquet/reader/RepeatedColumnReader.cpp @@ -83,6 +83,21 @@ void skipUnreadLengthsAndNulls(dwio::common::SelectiveColumnReader& reader) { } } +void preDecompChild( + dwio::common::SelectiveColumnReader* reader, + uint32_t index) { +#ifdef VELOX_ENABLE_INTEL_IAA + auto children = reader->children(); + if (children.empty()) { + reader->formatData().as().preDecompRowGroup(index); + return; + } + for (auto* child : children) { + preDecompChild(child, index); + } +#endif +} + void enqueueChildren( dwio::common::SelectiveColumnReader* reader, uint32_t index, @@ -137,6 +152,10 @@ void MapColumnReader::enqueueRowGroup( enqueueChildren(this, index, input); } +void MapColumnReader::preDecompRowGroup(uint32_t index) { + preDecompChild(this, index); +} + void MapColumnReader::seekToRowGroup(uint32_t index) { SelectiveMapColumnReader::seekToRowGroup(index); 
readOffset_ = 0; @@ -243,6 +262,10 @@ void ListColumnReader::enqueueRowGroup( enqueueChildren(this, index, input); } +void ListColumnReader::preDecompRowGroup(uint32_t index) { + preDecompChild(this, index); +} + void ListColumnReader::seekToRowGroup(uint32_t index) { SelectiveListColumnReader::seekToRowGroup(index); readOffset_ = 0; diff --git a/velox/dwio/parquet/reader/RepeatedColumnReader.h b/velox/dwio/parquet/reader/RepeatedColumnReader.h index 3155e8d66478..7d9c831ff1c5 100644 --- a/velox/dwio/parquet/reader/RepeatedColumnReader.h +++ b/velox/dwio/parquet/reader/RepeatedColumnReader.h @@ -71,6 +71,7 @@ class MapColumnReader : public dwio::common::SelectiveMapColumnReader { void seekToRowGroup(uint32_t index) override; void enqueueRowGroup(uint32_t index, dwio::common::BufferedInput& input); + void preDecompRowGroup(uint32_t index); void read( vector_size_t offset, @@ -128,6 +129,7 @@ class ListColumnReader : public dwio::common::SelectiveListColumnReader { void enqueueRowGroup(uint32_t index, dwio::common::BufferedInput& input); + void preDecompRowGroup(uint32_t index); void read( vector_size_t offset, RowSet rows, diff --git a/velox/dwio/parquet/reader/StructColumnReader.cpp b/velox/dwio/parquet/reader/StructColumnReader.cpp index eca887eab155..bcbc65f82166 100644 --- a/velox/dwio/parquet/reader/StructColumnReader.cpp +++ b/velox/dwio/parquet/reader/StructColumnReader.cpp @@ -140,6 +140,25 @@ void StructColumnReader::enqueueRowGroup( } } +bool StructColumnReader::preDecompRowGroup(uint32_t index) { + for (auto& child : children_) { + if (!needPreDecomp) { + return false; + } + if (auto structChild = dynamic_cast(child)) { + structChild->preDecompRowGroup(index); + } else if (auto listChild = dynamic_cast(child)) { + listChild->preDecompRowGroup(index); + } else if (auto mapChild = dynamic_cast(child)) { + mapChild->preDecompRowGroup(index); + } else { + needPreDecomp = + child->formatData().as().preDecompRowGroup(index); + } + } + return needPreDecomp; +} + 
void StructColumnReader::seekToRowGroup(uint32_t index) { SelectiveStructColumnReader::seekToRowGroup(index); BufferPtr noBuffer; diff --git a/velox/dwio/parquet/reader/StructColumnReader.h b/velox/dwio/parquet/reader/StructColumnReader.h index f38c9e849c73..e9485554c7f0 100644 --- a/velox/dwio/parquet/reader/StructColumnReader.h +++ b/velox/dwio/parquet/reader/StructColumnReader.h @@ -49,6 +49,8 @@ class StructColumnReader : public dwio::common::SelectiveStructColumnReader { uint32_t index, const std::shared_ptr& input); + bool preDecompRowGroup(uint32_t index); + // No-op in Parquet. All readers switch row groups at the same time, there is // no on-demand skipping to a new row group. void advanceFieldReader( @@ -93,6 +95,7 @@ class StructColumnReader : public dwio::common::SelectiveStructColumnReader { // The level information for extracting nulls for 'this' from the // repdefs in a leaf PageReader. arrow::LevelInfo levelInfo_; + bool needPreDecomp = true; }; } // namespace facebook::velox::parquet From 4ab13b303114b51c7ae2a5220b6427362c8412ed Mon Sep 17 00:00:00 2001 From: yaqi-zhao Date: Wed, 15 Nov 2023 16:41:43 +0800 Subject: [PATCH 2/2] define async decompressor --- velox/dwio/common/QplJobPool.cpp | 11 +- velox/dwio/common/QplJobPool.h | 10 +- .../common/compression/AsyncCompression.h | 98 ++++ velox/dwio/common/compression/CMakeLists.txt | 8 + velox/dwio/common/compression/Compression.cpp | 124 ----- velox/dwio/common/compression/Compression.h | 86 --- .../common/compression/IAACompression.cpp | 116 ++++ velox/dwio/parquet/reader/CMakeLists.txt | 1 + velox/dwio/parquet/reader/IAAPageReader.cpp | 507 ++++++++++++++++++ velox/dwio/parquet/reader/IAAPageReader.h | 114 ++++ velox/dwio/parquet/reader/PageReader.cpp | 267 +-------- velox/dwio/parquet/reader/PageReader.h | 70 +-- velox/dwio/parquet/reader/PageReaderBase.h | 72 +++ velox/dwio/parquet/reader/ParquetData.cpp | 27 +- velox/dwio/parquet/reader/ParquetData.h | 13 +- 15 files changed, 972 insertions(+), 
552 deletions(-) create mode 100644 velox/dwio/common/compression/AsyncCompression.h create mode 100644 velox/dwio/common/compression/IAACompression.cpp create mode 100644 velox/dwio/parquet/reader/IAAPageReader.cpp create mode 100644 velox/dwio/parquet/reader/IAAPageReader.h create mode 100644 velox/dwio/parquet/reader/PageReaderBase.h diff --git a/velox/dwio/common/QplJobPool.cpp b/velox/dwio/common/QplJobPool.cpp index 6a62ef6376cf..bb5c7d108857 100644 --- a/velox/dwio/common/QplJobPool.cpp +++ b/velox/dwio/common/QplJobPool.cpp @@ -16,17 +16,12 @@ #include "velox/dwio/common/QplJobPool.h" #include -#include #include "velox/common/base/Exceptions.h" namespace facebook::velox::dwio::common { -// std::array -// QplJobHWPool::hwJobPtrPool; std::array, QplJobHWPool::MAX_JOB_NUMBER> QplJobHWPool::hwJobPtrLocks; -// bool QplJobHWPool::iaa_job_ready = false; -// std::unique_ptr QplJobHWPool::hwJobsBuffer; QplJobHWPool& QplJobHWPool::getInstance() { static QplJobHWPool pool; @@ -49,6 +44,9 @@ QplJobHWPool::~QplJobHWPool() { iaaJobReady = false; } +/** + * Allocate qpl job and put it into hwJobPtrPool + */ void QplJobHWPool::allocateQPLJob() { uint32_t job_size = 0; @@ -60,6 +58,7 @@ void QplJobHWPool::allocateQPLJob() { // Initialize pool for storing all job object pointers // Allocate buffer by shifting address offset for each job object. + hwJobPtrPool.resize(MAX_JOB_NUMBER); for (uint32_t i = 0; i < MAX_JOB_NUMBER; ++i) { qpl_job* qplJobPtr = reinterpret_cast(hwJobsBuffer.get() + i * job_size); @@ -84,7 +83,7 @@ void QplJobHWPool::allocateQPLJob() { * QplJobHWPool maintains MAX_JOB_NUMBER job slot to avoid frequently allocate, * initialize and release job. Random slots is used to select a job and * tryLockJob will check if the job is free. 
- * @return job_id and qpl_job pointer + * @return job_id and qpl_job pair */ std::pair QplJobHWPool::acquireDeflateJob() { std::pair res; diff --git a/velox/dwio/common/QplJobPool.h b/velox/dwio/common/QplJobPool.h index ae3fe51b45f1..f80f3a02a704 100644 --- a/velox/dwio/common/QplJobPool.h +++ b/velox/dwio/common/QplJobPool.h @@ -39,7 +39,7 @@ namespace facebook::velox::dwio::common { // together, so that each analytics operation can perform decompress-only, // filter-only, or decompress-and-filter processing. // -// Intel QPLis library to provide application programming interface (API) for +// Intel QPL is library to provide application programming interface (API) for // interaction with Intel® In-Memory Analytics Accelerator (Intel® IAA) hardware // // Intel® IAA: @@ -61,6 +61,12 @@ class QplJobHWPool { } std::pair acquireDeflateJob(); + + /** + * Get qpl job by job id + * @param job_id the job id or index in the qpl job pool + * @return nullptr if the job id is invalid + */ qpl_job* getJobById(int job_id) { if (job_id >= MAX_JOB_NUMBER || job_id <= 0) { return nullptr; @@ -81,7 +87,7 @@ class QplJobHWPool { std::unique_ptr hwJobsBuffer; // Job pool for storing all job object pointers - std::array hwJobPtrPool; + std::vector hwJobPtrPool; // Locks for accessing each job object pointers bool iaaJobReady; diff --git a/velox/dwio/common/compression/AsyncCompression.h b/velox/dwio/common/compression/AsyncCompression.h new file mode 100644 index 000000000000..b6c01bdb8ab7 --- /dev/null +++ b/velox/dwio/common/compression/AsyncCompression.h @@ -0,0 +1,98 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ASYNC_COMPRESSION_H_ +#define ASYNC_COMPRESSION_H_ + +#include +#include "velox/common/compression/Compression.h" + +namespace facebook::velox::dwio::common::compression { + +using facebook::velox::common::CompressionKind; + +class AsyncDecompressor { + public: + explicit AsyncDecompressor(){}; + + virtual ~AsyncDecompressor() = default; + + virtual folly::SemiFuture decompressAsync( + const char* src, + uint64_t srcLength, + char* dest, + uint64_t destLength) = 0; +}; + +std::unique_ptr MakeIAAGzipCodec(); + +/** + * Get the window size from zlib header(rfc1950). + * 0 1 + * +---+---+ + * |CMF|FLG| (more-->) + * +---+---+ + * bits 0 to 3 CM Compression method + * bits 4 to 7 CINFO Compression info + * CM (Compression method) This identifies the compression method used in the + * file. CM = 8 denotes the "deflate" compression method with a window size up + * to 32K. CINFO (Compression info) For CM = 8, CINFO is the base-2 logarithm of + * the LZ77 window size, minus eight (CINFO=7 indicates a 32K window size). 
+ * @param stream_ptr the compressed block length for raw decompression + * @param stream_size compression options to use + */ +static int getZlibWindowBits(const uint8_t* stream_ptr, uint32_t stream_size) { + static constexpr uint8_t CM_ZLIB_DEFAULT_VALUE = 8u; + static constexpr uint32_t ZLIB_MIN_HEADER_SIZE = 2u; + static constexpr uint32_t ZLIB_INFO_OFFSET = 4u; + if (stream_size < ZLIB_MIN_HEADER_SIZE) { + return -1; + } + const uint8_t compression_method_and_flag = *stream_ptr++; + const uint8_t compression_method = compression_method_and_flag & 0xf; + const uint8_t compression_info = + compression_method_and_flag >> ZLIB_INFO_OFFSET; + + if (CM_ZLIB_DEFAULT_VALUE != compression_method) { + return -1; + } + if (compression_info > 7) { + return -1; + } + return CM_ZLIB_DEFAULT_VALUE + compression_info; +} + +/** + * Create a decompressor for the given compression kind in asynchronous mode. + * @param kind the compression type to implement + */ +static std::unique_ptr +createAsyncDecompressor(facebook::velox::common::CompressionKind kind) { + switch (static_cast(kind)) { +#ifdef VELOX_ENABLE_INTEL_IAA + case CompressionKind::CompressionKind_GZIP: + return MakeIAAGzipCodec(); +#endif + default: + LOG(WARNING) << "Asynchronous mode not support for compression codec " + << kind; + return nullptr; + } + return nullptr; +} +} // namespace facebook::velox::dwio::common::compression + +#endif \ No newline at end of file diff --git a/velox/dwio/common/compression/CMakeLists.txt b/velox/dwio/common/compression/CMakeLists.txt index bda716ca7a25..579395281ae5 100644 --- a/velox/dwio/common/compression/CMakeLists.txt +++ b/velox/dwio/common/compression/CMakeLists.txt @@ -17,3 +17,11 @@ add_library(velox_dwio_common_compression Compression.cpp PagedInputStream.cpp target_link_libraries(velox_dwio_common_compression velox_dwio_common xsimd Folly::folly) + +if(VELOX_ENABLE_INTEL_IAA) + add_library(velox_dwio_common_iaa_compression IAACompression.cpp) + 
target_link_libraries(velox_dwio_common_iaa_compression velox_dwio_qpl + Folly::folly xsimd) + target_link_libraries(velox_dwio_common_compression + velox_dwio_common_iaa_compression) +endif() diff --git a/velox/dwio/common/compression/Compression.cpp b/velox/dwio/common/compression/Compression.cpp index e36d58e7756f..dfe1e3f31564 100644 --- a/velox/dwio/common/compression/Compression.cpp +++ b/velox/dwio/common/compression/Compression.cpp @@ -445,113 +445,6 @@ std::pair ZstdDecompressor::getDecompressedLength( return {uncompressedLength, true}; } -class GzipIAADecompressor : public AsyncDecompressor { - public: - explicit GzipIAADecompressor() {} - - explicit GzipIAADecompressor( - uint64_t blockSize, - const std::string& streamDebugInfo) - : AsyncDecompressor{blockSize, streamDebugInfo} {} - - int decompress( - const char* src, - uint64_t srcLength, - char* dest, - uint64_t destLength) override; - - bool waitResult(int job_id) override; - - void releaseJob(int job_id) override; -}; - -int GzipIAADecompressor::decompress( - const char* src, - uint64_t srcLength, - char* dest, - uint64_t destLength) { -#ifdef VELOX_ENABLE_INTEL_IAA - dwio::common::QplJobHWPool& qpl_job_pool = - dwio::common::QplJobHWPool::getInstance(); - // int job_id = 0; - auto deflate_job = qpl_job_pool.acquireDeflateJob(); - // qpl_job* job = qpl_job_pool.AcquireDeflateJob(job_id); - auto job = deflate_job.second; - if (job == nullptr) { - LOG(WARNING) << "cannot AcquireDeflateJob "; - return -1; // Invalid job id to illustrate the - // failed decompress job. 
- } - job->op = qpl_op_decompress; - job->next_in_ptr = reinterpret_cast(const_cast(src)); - job->next_out_ptr = reinterpret_cast(dest); - job->available_in = static_cast(srcLength); - job->available_out = static_cast(destLength); - job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_ZLIB_MODE; - - qpl_status status = qpl_submit_job(job); - if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) { - qpl_job_pool.releaseJob(deflate_job.first); - deflate_job = qpl_job_pool.acquireDeflateJob(); - job = deflate_job.second; - if (job == nullptr) { - LOG(WARNING) - << "cannot acqure deflate job after QPL_STS_QUEUES_ARE_BUSY_ERR "; - return -1; // Invalid job id to illustrate the - // failed decompress job. - } - job->op = qpl_op_decompress; - job->next_in_ptr = reinterpret_cast(const_cast(src)); - job->next_out_ptr = reinterpret_cast(dest); - job->available_in = static_cast(srcLength); - job->available_out = static_cast(destLength); - job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_ZLIB_MODE; - - status = qpl_submit_job(job); - } - if (status != QPL_STS_OK) { - qpl_job_pool.releaseJob(deflate_job.first); - LOG(WARNING) << "cannot submit job, error status: " << status; - return -1; // Invalid job id to illustrate the - // failed decompress job. 
- } else { - return deflate_job.first; - } -#else - return -1; -#endif -} - -bool GzipIAADecompressor::waitResult(int job_id) { -#ifdef VELOX_ENABLE_INTEL_IAA - dwio::common::QplJobHWPool& qpl_job_pool = - dwio::common::QplJobHWPool::getInstance(); - if (job_id <= 0 || job_id >= qpl_job_pool.MAX_JOB_NUMBER) { - return true; - } - qpl_job* job = qpl_job_pool.getJobById(job_id); - - auto status = qpl_wait_job(job); - qpl_job_pool.releaseJob(job_id); - if (status == QPL_STS_OK) { - return true; - } - LOG(WARNING) << "Decompress w/IAA error, status: " << status; -#endif - return false; -} - -void GzipIAADecompressor::releaseJob(int job_id) { -#ifdef VELOX_ENABLE_INTEL_IAA - dwio::common::QplJobHWPool& qpl_job_pool = - dwio::common::QplJobHWPool::getInstance(); - if (job_id <= 0 || job_id >= qpl_job_pool.MAX_JOB_NUMBER) { - return; - } - return qpl_job_pool.releaseJob(job_id); -#endif -} - class SnappyDecompressor : public Decompressor { public: explicit SnappyDecompressor( @@ -833,21 +726,4 @@ std::unique_ptr createDecompressor( compressedLength); } -std::unique_ptr -createAsyncDecompressor( - facebook::velox::common::CompressionKind kind, - uint64_t bufferSize, - const std::string& streamDebugInfo) { - std::unique_ptr decompressor; - switch (static_cast(kind)) { - case CompressionKind::CompressionKind_GZIP: - return std::make_unique(bufferSize, streamDebugInfo); - default: - LOG(WARNING) << "Asynchronous mode not support for compression codec " - << kind; - return nullptr; - } - return nullptr; -} - } // namespace facebook::velox::dwio::common::compression diff --git a/velox/dwio/common/compression/Compression.h b/velox/dwio/common/compression/Compression.h index f24c2c486782..3d26b3af98a4 100644 --- a/velox/dwio/common/compression/Compression.h +++ b/velox/dwio/common/compression/Compression.h @@ -19,9 +19,6 @@ #include "velox/common/compression/Compression.h" #include "velox/dwio/common/SeekableInputStream.h" #include "velox/dwio/common/encryption/Encryption.h" 
-#ifdef VELOX_ENABLE_INTEL_IAA -#include "velox/dwio/common/QplJobPool.h" -#endif namespace facebook::velox::dwio::common::compression { @@ -33,7 +30,6 @@ class Compressor { // https://zlib.net/manual.html static constexpr int DWRF_ORC_ZLIB_WINDOW_BITS = -15; static constexpr int PARQUET_ZLIB_WINDOW_BITS = 15; - static constexpr int PARQUET_ZLIB_WINDOW_BITS_4KB = 12; explicit Compressor(int32_t level) : level_{level} {} @@ -69,37 +65,6 @@ class Decompressor { const std::string streamDebugInfo_; }; -class AsyncDecompressor { - public: - explicit AsyncDecompressor(){}; - explicit AsyncDecompressor( - uint64_t blockSize, - const std::string& streamDebugInfo) - : blockSize_{blockSize}, streamDebugInfo_{streamDebugInfo} {} - - virtual ~AsyncDecompressor() = default; - - virtual uint64_t getUncompressedLength( - const char* /* unused */, - uint64_t /* unused */) const { - return blockSize_; - } - - virtual int decompress( - const char* src, - uint64_t srcLength, - char* dest, - uint64_t destLength) = 0; - - virtual bool waitResult(int job_id) = 0; - - virtual void releaseJob(int job_id) = 0; - - protected: - uint64_t blockSize_; - const std::string streamDebugInfo_; -}; - struct CompressionOptions { /// Format specific compression/decompression options union Format { @@ -124,42 +89,6 @@ struct CompressionOptions { uint32_t compressionThreshold; }; -/** - * Get the window size from zlib header(rfc1950). - * 0 1 - * +---+---+ - * |CMF|FLG| (more-->) - * +---+---+ - * bits 0 to 3 CM Compression method - * bits 4 to 7 CINFO Compression info - * CM (Compression method) This identifies the compression method used in the - * file. CM = 8 denotes the "deflate" compression method with a window size up - * to 32K. CINFO (Compression info) For CM = 8, CINFO is the base-2 logarithm of - * the LZ77 window size, minus eight (CINFO=7 indicates a 32K window size). 
- * @param stream_ptr the compressed block length for raw decompression - * @param stream_size compression options to use - */ -static int getZlibWindowBits(const uint8_t* stream_ptr, uint32_t stream_size) { - static constexpr uint8_t CM_ZLIB_DEFAULT_VALUE = 8u; - static constexpr uint32_t ZLIB_MIN_HEADER_SIZE = 2u; - static constexpr uint32_t ZLIB_INFO_OFFSET = 4u; - if (stream_size < ZLIB_MIN_HEADER_SIZE) { - return -1; - } - const uint8_t compression_method_and_flag = *stream_ptr++; - const uint8_t compression_method = compression_method_and_flag & 0xf; - const uint8_t compression_info = - compression_method_and_flag >> ZLIB_INFO_OFFSET; - - if (CM_ZLIB_DEFAULT_VALUE != compression_method) { - return -1; - } - if (compression_info > 7) { - return -1; - } - return CM_ZLIB_DEFAULT_VALUE + compression_info; -} - /** * Create a decompressor for the given compression kind. * @param kind The compression type to implement @@ -190,19 +119,4 @@ std::unique_ptr createCompressor( facebook::velox::common::CompressionKind kind, const CompressionOptions& options); -/** - * Create a decompressor for the given compression kind in asynchronous mode. 
- * @param kind the compression type to implement - * @param input the input stream that is the underlying source - * @param bufferSize the maximum size of the buffer - * @param pool the memory pool - * @param useRawDecompression specify whether to perform raw decompression - * @param compressedLength the compressed block length for raw decompression - * @param options compression options to use - */ -std::unique_ptr -createAsyncDecompressor( - facebook::velox::common::CompressionKind kind, - uint64_t bufferSize, - const std::string& streamDebugInfo); } // namespace facebook::velox::dwio::common::compression diff --git a/velox/dwio/common/compression/IAACompression.cpp b/velox/dwio/common/compression/IAACompression.cpp new file mode 100644 index 000000000000..139ec9494111 --- /dev/null +++ b/velox/dwio/common/compression/IAACompression.cpp @@ -0,0 +1,116 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox/common/base/Exceptions.h" +#include "velox/dwio/common/QplJobPool.h" +#include "velox/dwio/common/compression/AsyncCompression.h" + +namespace facebook::velox::dwio::common::compression { + +class GzipIAADecompressor : public AsyncDecompressor { + public: + explicit GzipIAADecompressor() {} + + folly::SemiFuture decompressAsync( + const char* src, + uint64_t srcLength, + char* dest, + uint64_t destLength) override; + + int waitResult(int job_id); +}; + +folly::SemiFuture GzipIAADecompressor::decompressAsync( + const char* src, + uint64_t srcLength, + char* dest, + uint64_t destLength) { + dwio::common::QplJobHWPool& qpl_job_pool = + dwio::common::QplJobHWPool::getInstance(); + auto deflate_job = qpl_job_pool.acquireDeflateJob(); + auto job = deflate_job.second; + if (job == nullptr) { + LOG(WARNING) << "cannot AcquireDeflateJob "; + return folly::makeSemiFutureWith([]() -> uint64_t { + throw std::runtime_error("Cannot acquire deflate job from pool"); + return 0; + }); + } + job->op = qpl_op_decompress; + job->next_in_ptr = reinterpret_cast(const_cast(src)); + job->next_out_ptr = reinterpret_cast(dest); + job->available_in = static_cast(srcLength); + job->available_out = static_cast(destLength); + job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_ZLIB_MODE; + + qpl_status status = qpl_submit_job(job); + if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) { + qpl_job_pool.releaseJob(deflate_job.first); + deflate_job = qpl_job_pool.acquireDeflateJob(); + job = deflate_job.second; + if (job == nullptr) { + LOG(WARNING) + << "cannot acqure deflate job after QPL_STS_QUEUES_ARE_BUSY_ERR "; + return folly::makeSemiFutureWith([]() -> uint64_t { + throw std::runtime_error( + "Cannot acqure deflate job from pool after QPL_STS_QUEUES_ARE_BUSY_ERR"); + return 0; + }); + } + job->op = qpl_op_decompress; + job->next_in_ptr = reinterpret_cast(const_cast(src)); + job->next_out_ptr = reinterpret_cast(dest); + job->available_in = static_cast(srcLength); + 
job->available_out = static_cast(destLength); + job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_ZLIB_MODE; + + status = qpl_submit_job(job); + } + if (status != QPL_STS_OK) { + qpl_job_pool.releaseJob(deflate_job.first); + LOG(WARNING) << "cannot submit job, error status: " << status; + return folly::makeSemiFutureWith([this, status]() -> uint64_t { + throw std::runtime_error("Cannot submit job, error status: " + status); + return 0; + }); + } else { + return folly::makeSemiFuture().deferValue( + [this, deflate_job](auto&&) -> uint64_t { + return this->waitResult(deflate_job.first); + }); + } +} + +int GzipIAADecompressor::waitResult(int job_id) { + dwio::common::QplJobHWPool& qpl_job_pool = + dwio::common::QplJobHWPool::getInstance(); + VELOX_CHECK_LT(job_id, qpl_job_pool.MAX_JOB_NUMBER); + qpl_job* job = qpl_job_pool.getJobById(job_id); + + auto status = qpl_wait_job(job); + qpl_job_pool.releaseJob(job_id); + if (status == QPL_STS_OK) { + return 1; + } + LOG(WARNING) << "Decompress w/IAA error, status: " << status; + return 0; +} + +std::unique_ptr MakeIAAGzipCodec() { + return std::make_unique(); +} + +} // namespace facebook::velox::dwio::common::compression \ No newline at end of file diff --git a/velox/dwio/parquet/reader/CMakeLists.txt b/velox/dwio/parquet/reader/CMakeLists.txt index 3fb5250b7e64..7254b9efec05 100644 --- a/velox/dwio/parquet/reader/CMakeLists.txt +++ b/velox/dwio/parquet/reader/CMakeLists.txt @@ -19,6 +19,7 @@ add_library( ParquetReader.cpp ParquetTypeWithId.cpp PageReader.cpp + IAAPageReader.cpp ParquetColumnReader.cpp ParquetData.cpp RepeatedColumnReader.cpp diff --git a/velox/dwio/parquet/reader/IAAPageReader.cpp b/velox/dwio/parquet/reader/IAAPageReader.cpp new file mode 100644 index 000000000000..07976158dd6e --- /dev/null +++ b/velox/dwio/parquet/reader/IAAPageReader.cpp @@ -0,0 +1,507 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/dwio/parquet/reader/IAAPageReader.h" +#include "velox/dwio/common/BufferUtil.h" + +namespace facebook::velox::parquet { + +using thrift::Encoding; +using thrift::PageHeader; + +void IAAPageReader::preDecompressPage( + bool& need_pre_decompress, + int64_t numValues) { + if (codec_ != common::CompressionKind::CompressionKind_GZIP) { + need_pre_decompress = false; + return; + } + for (;;) { + auto dataStart = pageStart_; + if (chunkSize_ <= pageStart_) { + // This may happen if seeking to exactly end of row group. 
+ numRepDefsInPage_ = 0; + numRowsInPage_ = 0; + break; + } + PageHeader pageHeader = readPageHeader(); + pageStart_ = pageDataStart_ + pageHeader.compressed_page_size; + switch (pageHeader.type) { + case thrift::PageType::DATA_PAGE: + prefetchDataPageV1(pageHeader); + break; + case thrift::PageType::DATA_PAGE_V2: + prefetchDataPageV2(pageHeader); + break; + case thrift::PageType::DICTIONARY_PAGE: + prefetchDictionary(pageHeader); + continue; + default: + break; // ignore INDEX page type and any other custom extensions + } + break; + } + need_pre_decompress = isWinSizeFit_; + rowGroupPageInfo_.numValues = numValues; + rowGroupPageInfo_.visitedRows = 0; +} + +void IAAPageReader::prefetchNextPage() { + if (rowGroupPageInfo_.visitedRows + numRowsInPage_ >= + rowGroupPageInfo_.numValues) { + return; + } + if (chunkSize_ <= pageStart_) { + return; + } + PageHeader pageHeader = readPageHeader(); + switch (pageHeader.type) { + case thrift::PageType::DATA_PAGE: { + dataPageHeader_ = pageHeader; + VELOX_CHECK( + pageHeader.type == thrift::PageType::DATA_PAGE && + pageHeader.__isset.data_page_header); + rowGroupPageInfo_.dataPageData = + readBytes(pageHeader.compressed_page_size, pageBuffer_); + preDecompressData_ = iaaDecompress( + rowGroupPageInfo_.dataPageData, + pageHeader.compressed_page_size, + pageHeader.uncompressed_page_size, + rowGroupPageInfo_.uncompressedData, + dataDecompFuture); + break; + } + case thrift::PageType::DATA_PAGE_V2: + LOG(WARNING) << "Data Page V2 not support "; + break; + case thrift::PageType::DICTIONARY_PAGE: + LOG(WARNING) << "Wrong path "; + break; + default: + break; // ignore INDEX page type and any other custom extensions + } +} + +bool IAAPageReader::seekToPreDecompPage(int64_t row) { + bool has_qpl = false; + if (this->dictDecompFuture.valid()) { + bool job_success = std::move(this->dictDecompFuture).get() > 0; + prepareDictionary(dictPageHeader_, job_success); + preDecompressDict_ = false; + has_qpl = true; + } + + if 
(dataDecompFuture.valid()) { + bool job_success = std::move(this->dataDecompFuture).get() > 0; + prepareDataPageV1(dataPageHeader_, row, job_success); + preDecompressData_ = false; + has_qpl = true; + } + + if (has_qpl) { + if (row == kRepDefOnly || row < rowOfPage_ + numRowsInPage_) { + return true; + } + updateRowInfoAfterPageSkipped(); + } + return false; +} + +void IAAPageReader::prefetchDataPageV1(const thrift::PageHeader& pageHeader) { + dataPageHeader_ = pageHeader; + VELOX_CHECK( + pageHeader.type == thrift::PageType::DATA_PAGE && + pageHeader.__isset.data_page_header); + + dataPageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); + preDecompressData_ = iaaDecompress( + dataPageData_, + pageHeader.compressed_page_size, + pageHeader.uncompressed_page_size, + uncompressedDataV1Data_, + dataDecompFuture); + return; +} + +void IAAPageReader::prefetchDataPageV2(const thrift::PageHeader& pageHeader) { + return; +} + +void IAAPageReader::prefetchDictionary(const thrift::PageHeader& pageHeader) { + dictPageHeader_ = pageHeader; + dictionaryEncoding_ = pageHeader.dictionary_page_header.encoding; + VELOX_CHECK( + dictionaryEncoding_ == Encoding::PLAIN_DICTIONARY || + dictionaryEncoding_ == Encoding::PLAIN); + dictPageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); + + preDecompressDict_ = iaaDecompress( + dictPageData_, + pageHeader.compressed_page_size, + pageHeader.uncompressed_page_size, + uncompressedDictData_, + dictDecompFuture); + + return; +} + +const bool IAAPageReader::iaaDecompress( + const char* pageData, + uint32_t compressedSize, + uint32_t uncompressedSize, + BufferPtr& uncompressedData, + folly::SemiFuture& future) { + dwio::common::ensureCapacity( + uncompressedData, uncompressedSize, &pool_); + static constexpr int PARQUET_ZLIB_WINDOW_BITS_4KB = 12; + future = folly::makeSemiFuture((uint64_t)0); + if (!isWinSizeFit_) { + // window size should be 4KB for IAA + if (PARQUET_ZLIB_WINDOW_BITS_4KB == + 
dwio::common::compression::getZlibWindowBits( + (const uint8_t*)pageData, uncompressedSize)) { + isWinSizeFit_ = true; + } else { + future = folly::makeSemiFuture((uint64_t)0); + return true; + } + } + std::unique_ptr decompressor = + dwio::common::compression::createAsyncDecompressor(codec_); + if (decompressor == nullptr) { + return true; + } + auto decompFuture = decompressor->decompressAsync( + (const char*)pageData, + compressedSize, + (char*)uncompressedData->asMutable(), + uncompressedSize); + if (decompFuture.isReady()) { + auto result = std::move(decompFuture).getTry(); + if (result.hasException()) { + future = folly::makeSemiFuture((uint64_t)0); + return true; + } + } + future = std::move(decompFuture); + return true; +} + +void IAAPageReader::seekToPage(int64_t row) { + this->defineDecoder_.reset(); + this->repeatDecoder_.reset(); + // 'rowOfPage_' is the row number of the first row of the next page. + this->rowOfPage_ += this->numRowsInPage_; + + if (seekToPreDecompPage(row)) { + if (isWinSizeFit_) { + prefetchNextPage(); + } + rowGroupPageInfo_.visitedRows += numRowsInPage_; + return; + } + + for (;;) { + auto dataStart = pageStart_; + if (chunkSize_ <= pageStart_) { + // This may happen if seeking to exactly end of row group. 
+ numRepDefsInPage_ = 0; + numRowsInPage_ = 0; + break; + } + PageHeader pageHeader = this->readPageHeader(); + pageStart_ = pageDataStart_ + pageHeader.compressed_page_size; + + switch (pageHeader.type) { + case thrift::PageType::DATA_PAGE: + prepareDataPageV1(pageHeader, row); + break; + case thrift::PageType::DATA_PAGE_V2: + prepareDataPageV2(pageHeader, row); + break; + case thrift::PageType::DICTIONARY_PAGE: + if (row == kRepDefOnly) { + skipBytes( + pageHeader.compressed_page_size, + inputStream_.get(), + bufferStart_, + bufferEnd_); + continue; + } + prepareDictionary(pageHeader); + continue; + default: + break; // ignore INDEX page type and any other custom extensions + } + if (row == kRepDefOnly || row < rowOfPage_ + numRowsInPage_) { + break; + } + this->updateRowInfoAfterPageSkipped(); + } + if (isWinSizeFit_) { + prefetchNextPage(); + } + rowGroupPageInfo_.visitedRows += numRowsInPage_; +} + +void IAAPageReader::prepareDataPageV1( + const PageHeader& pageHeader, + int64_t row, + bool job_success) { + VELOX_CHECK( + pageHeader.type == thrift::PageType::DATA_PAGE && + pageHeader.__isset.data_page_header); + numRepDefsInPage_ = pageHeader.data_page_header.num_values; + setPageRowInfo(row == kRepDefOnly); + if (row != kRepDefOnly && numRowsInPage_ != kRowsUnknown && + numRowsInPage_ + rowOfPage_ <= row) { + dwio::common::skipBytes( + pageHeader.compressed_page_size, + inputStream_.get(), + bufferStart_, + bufferEnd_); + + return; + } + if (job_success) { + if (rowGroupPageInfo_.visitedRows > 0) { + BufferPtr tmp = uncompressedDataV1Data_; + uncompressedDataV1Data_ = rowGroupPageInfo_.uncompressedData; + rowGroupPageInfo_.uncompressedData = tmp; + } + pageData_ = uncompressedDataV1Data_->as(); + } else { + if (!preDecompressData_) { + dataPageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); + } else if (rowGroupPageInfo_.visitedRows > 0) { + dataPageData_ = rowGroupPageInfo_.dataPageData; + } + pageData_ = decompressData( + dataPageData_, + 
pageHeader.compressed_page_size, + pageHeader.uncompressed_page_size); + } + auto pageEnd = pageData_ + pageHeader.uncompressed_page_size; + if (maxRepeat_ > 0) { + uint32_t repeatLength = readField(pageData_); + repeatDecoder_ = std::make_unique<::arrow::util::RleDecoder>( + reinterpret_cast(pageData_), + repeatLength, + ::arrow::bit_util::NumRequiredBits(maxRepeat_)); + + pageData_ += repeatLength; + } + + if (maxDefine_ > 0) { + auto defineLength = readField(pageData_); + if (maxDefine_ == 1) { + defineDecoder_ = std::make_unique( + pageData_, + pageData_ + defineLength, + ::arrow::bit_util::NumRequiredBits(maxDefine_)); + } + wideDefineDecoder_ = std::make_unique<::arrow::util::RleDecoder>( + reinterpret_cast(pageData_), + defineLength, + ::arrow::bit_util::NumRequiredBits(maxDefine_)); + pageData_ += defineLength; + } + encodedDataSize_ = pageEnd - pageData_; + + encoding_ = pageHeader.data_page_header.encoding; + if (!hasChunkRepDefs_ && (numRowsInPage_ == kRowsUnknown || maxDefine_ > 1)) { + readPageDefLevels(); + } + + if (row != kRepDefOnly) { + makeDecoder(); + } +} + +void IAAPageReader::prepareDictionary( + const PageHeader& pageHeader, + bool job_success) { + dictionary_.numValues = pageHeader.dictionary_page_header.num_values; + dictionaryEncoding_ = pageHeader.dictionary_page_header.encoding; + dictionary_.sorted = pageHeader.dictionary_page_header.__isset.is_sorted && + pageHeader.dictionary_page_header.is_sorted; + VELOX_CHECK( + dictionaryEncoding_ == Encoding::PLAIN_DICTIONARY || + dictionaryEncoding_ == Encoding::PLAIN); + + if (codec_ != common::CompressionKind::CompressionKind_NONE) { + if (job_success) { + pageData_ = uncompressedDictData_->as(); + } else { + if (!preDecompressDict_) { + dictPageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); + } + pageData_ = decompressData( + dictPageData_, + pageHeader.compressed_page_size, + pageHeader.uncompressed_page_size); + } + } + + auto parquetType = type_->parquetType_.value(); + 
switch (parquetType) { + case thrift::Type::INT32: + case thrift::Type::INT64: + case thrift::Type::FLOAT: + case thrift::Type::DOUBLE: { + int32_t typeSize = (parquetType == thrift::Type::INT32 || + parquetType == thrift::Type::FLOAT) + ? sizeof(float) + : sizeof(double); + auto numBytes = dictionary_.numValues * typeSize; + if (type_->type()->isShortDecimal() && + parquetType == thrift::Type::INT32) { + auto veloxTypeLength = type_->type()->cppSizeInBytes(); + auto numVeloxBytes = dictionary_.numValues * veloxTypeLength; + dictionary_.values = + AlignedBuffer::allocate(numVeloxBytes, &pool_); + } else { + dictionary_.values = AlignedBuffer::allocate(numBytes, &pool_); + } + if (pageData_) { + memcpy(dictionary_.values->asMutable(), pageData_, numBytes); + } else { + dwio::common::readBytes( + numBytes, + inputStream_.get(), + dictionary_.values->asMutable(), + bufferStart_, + bufferEnd_); + } + if (type_->type()->isShortDecimal() && + parquetType == thrift::Type::INT32) { + auto values = dictionary_.values->asMutable(); + auto parquetValues = dictionary_.values->asMutable(); + for (auto i = dictionary_.numValues - 1; i >= 0; --i) { + // Expand the Parquet type length values to Velox type length. + // We start from the end to allow in-place expansion. 
+ values[i] = parquetValues[i]; + } + } + break; + } + case thrift::Type::BYTE_ARRAY: { + dictionary_.values = + AlignedBuffer::allocate(dictionary_.numValues, &pool_); + auto numBytes = pageHeader.uncompressed_page_size; + auto values = dictionary_.values->asMutable(); + dictionary_.strings = AlignedBuffer::allocate(numBytes, &pool_); + auto strings = dictionary_.strings->asMutable(); + if (pageData_) { + memcpy(strings, pageData_, numBytes); + } else { + dwio::common::readBytes( + numBytes, inputStream_.get(), strings, bufferStart_, bufferEnd_); + } + auto header = strings; + for (auto i = 0; i < dictionary_.numValues; ++i) { + auto length = *reinterpret_cast(header); + values[i] = StringView(header + sizeof(int32_t), length); + header += length + sizeof(int32_t); + } + VELOX_CHECK_EQ(header, strings + numBytes); + break; + } + case thrift::Type::FIXED_LEN_BYTE_ARRAY: { + auto parquetTypeLength = type_->typeLength_; + auto numParquetBytes = dictionary_.numValues * parquetTypeLength; + auto veloxTypeLength = type_->type()->cppSizeInBytes(); + auto numVeloxBytes = dictionary_.numValues * veloxTypeLength; + dictionary_.values = AlignedBuffer::allocate(numVeloxBytes, &pool_); + auto data = dictionary_.values->asMutable(); + // Read the data bytes. + if (pageData_) { + memcpy(data, pageData_, numParquetBytes); + } else { + dwio::common::readBytes( + numParquetBytes, + inputStream_.get(), + data, + bufferStart_, + bufferEnd_); + } + if (type_->type()->isShortDecimal()) { + // Parquet decimal values have a fixed typeLength_ and are in big-endian + // layout. + if (numParquetBytes < numVeloxBytes) { + auto values = dictionary_.values->asMutable(); + for (auto i = dictionary_.numValues - 1; i >= 0; --i) { + // Expand the Parquet type length values to Velox type length. + // We start from the end to allow in-place expansion. + auto sourceValue = data + (i * parquetTypeLength); + int64_t value = *sourceValue >= 0 ? 
0 : -1; + memcpy( + reinterpret_cast(&value) + veloxTypeLength - + parquetTypeLength, + sourceValue, + parquetTypeLength); + values[i] = value; + } + } + auto values = dictionary_.values->asMutable(); + for (auto i = 0; i < dictionary_.numValues; ++i) { + values[i] = __builtin_bswap64(values[i]); + } + break; + } else if (type_->type()->isLongDecimal()) { + // Parquet decimal values have a fixed typeLength_ and are in big-endian + // layout. + if (numParquetBytes < numVeloxBytes) { + auto values = dictionary_.values->asMutable(); + for (auto i = dictionary_.numValues - 1; i >= 0; --i) { + // Expand the Parquet type length values to Velox type length. + // We start from the end to allow in-place expansion. + auto sourceValue = data + (i * parquetTypeLength); + int128_t value = *sourceValue >= 0 ? 0 : -1; + memcpy( + reinterpret_cast(&value) + veloxTypeLength - + parquetTypeLength, + sourceValue, + parquetTypeLength); + values[i] = value; + } + } + auto values = dictionary_.values->asMutable(); + for (auto i = 0; i < dictionary_.numValues; ++i) { + values[i] = bits::builtin_bswap128(values[i]); + } + break; + } + VELOX_UNSUPPORTED( + "Parquet type {} not supported for dictionary", parquetType); + } + case thrift::Type::INT96: + default: + VELOX_UNSUPPORTED( + "Parquet type {} not supported for dictionary", parquetType); + } +} + +IAAPageReader::~IAAPageReader() { + if (dataDecompFuture.valid()) { + std::move(dataDecompFuture).get(); + } + + if (dictDecompFuture.valid()) { + std::move(dictDecompFuture).get(); + } +} +} // namespace facebook::velox::parquet \ No newline at end of file diff --git a/velox/dwio/parquet/reader/IAAPageReader.h b/velox/dwio/parquet/reader/IAAPageReader.h new file mode 100644 index 000000000000..dc331465b798 --- /dev/null +++ b/velox/dwio/parquet/reader/IAAPageReader.h @@ -0,0 +1,114 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "velox/dwio/common/compression/AsyncCompression.h" +#include "velox/dwio/parquet/reader/PageReader.h" + +namespace facebook::velox::parquet { + +using folly::SemiFuture; + +struct PreDecompPageInfo { + int64_t numValues; // Number of values in this row group + int64_t visitedRows; // rows already read + const char* FOLLY_NULLABLE dataPageData{nullptr}; + BufferPtr uncompressedData; +}; + +class IAAPageReader : public PageReader { + public: + IAAPageReader( + std::unique_ptr stream, + memory::MemoryPool& pool, + ParquetTypeWithIdPtr fileType, + common::CompressionKind codec, + int64_t chunkSize) + : PageReader(std::move(stream), pool, fileType, codec, chunkSize) { + uncompressedDictData_ = nullptr; + uncompressedDataV1Data_ = nullptr; + } + ~IAAPageReader(); + + PageReaderType getType() { + return PageReaderType::IAA; + }; + + /** + * Submit decompression job to IAA, store the decompression future + * @param need_pre_decompress true if the codec and window bits are adaptable + * for IAA + * @param numValues number of values in row group. 
This value is stored in + * rowGroupPageInfo_ and used for determine whether there is need to + * pre-decompress by IAA + */ + void preDecompressPage(bool& need_pre_decompress, int64_t numValues); + + // Override method to call seekToPreDecompPage and + // prefetchNextPage in IAAPageReader + virtual void seekToPage(int64_t row); + + private: + void prefetchDataPageV1(const thrift::PageHeader& pageHeader); + void prefetchDataPageV2(const thrift::PageHeader& pageHeader); + void prefetchDictionary(const thrift::PageHeader& pageHeader); + + // Prefetch the next page if there are more than one page in the row group + // and then submit the page decompression job to IAA + void prefetchNextPage(); + + // Get decompressed page from IAA async decompressor. Then Reads and sets + // 'rowOfPage_' and 'numRowsInPage_' and initializes a decoder for the found + // page. + bool seekToPreDecompPage(int64_t row); + + void prepareDataPageV1( + const thrift::PageHeader& pageHeader, + int64_t row, + bool job_success = false); + void prepareDictionary( + const thrift::PageHeader& pageHeader, + bool job_success = false); + const bool iaaDecompress( + const char* FOLLY_NONNULL pageData, + uint32_t compressedSize, + uint32_t uncompressedSize, + BufferPtr& uncompressedData, + SemiFuture& future); + + // Used for pre-decompress + BufferPtr uncompressedDictData_; + BufferPtr uncompressedDataV1Data_; + thrift::PageHeader dictPageHeader_; + const char* FOLLY_NULLABLE dictPageData_{nullptr}; + bool needUncompressDict; + + thrift::PageHeader dataPageHeader_; + const char* FOLLY_NULLABLE dataPageData_{nullptr}; + + SemiFuture dictDecompFuture = SemiFuture::makeEmpty(); + SemiFuture dataDecompFuture = SemiFuture::makeEmpty(); + + bool preDecompressDict_ = false; + bool preDecompressData_ = false; + bool isWinSizeFit_ = false; + PreDecompPageInfo rowGroupPageInfo_; +}; + +} // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/reader/PageReader.cpp 
b/velox/dwio/parquet/reader/PageReader.cpp index 691f56dbccad..e6f44f42ecca 100644 --- a/velox/dwio/parquet/reader/PageReader.cpp +++ b/velox/dwio/parquet/reader/PageReader.cpp @@ -28,116 +28,11 @@ namespace facebook::velox::parquet { using thrift::Encoding; using thrift::PageHeader; -void PageReader::preDecompressPage( - bool& need_pre_decompress, - int64_t numValues) { - if (codec_ != thrift::CompressionCodec::GZIP) { - need_pre_decompress = false; - return; - } - for (;;) { - auto dataStart = pageStart_; - if (chunkSize_ <= pageStart_) { - // This may happen if seeking to exactly end of row group. - numRepDefsInPage_ = 0; - numRowsInPage_ = 0; - break; - } - PageHeader pageHeader = readPageHeader(); - pageStart_ = pageDataStart_ + pageHeader.compressed_page_size; - switch (pageHeader.type) { - case thrift::PageType::DATA_PAGE: - prefetchDataPageV1(pageHeader); - break; - case thrift::PageType::DATA_PAGE_V2: - prefetchDataPageV2(pageHeader); - break; - case thrift::PageType::DICTIONARY_PAGE: - prefetchDictionary(pageHeader); - continue; - default: - break; // ignore INDEX page type and any other custom extensions - } - break; - } - need_pre_decompress = isWinSizeFit; - rowGroupPageInfo_.numValues = numValues; - rowGroupPageInfo_.visitedRows = 0; -} - -void PageReader::prefetchNextPage() { - if (rowGroupPageInfo_.visitedRows + numRowsInPage_ >= - rowGroupPageInfo_.numValues) { - return; - } - if (chunkSize_ <= pageStart_) { - return; - } - PageHeader pageHeader = readPageHeader(); - rowGroupPageInfo_.pageStart = - pageDataStart_ + pageHeader.compressed_page_size; - switch (pageHeader.type) { - case thrift::PageType::DATA_PAGE: { - dataPageHeader_ = pageHeader; - VELOX_CHECK( - pageHeader.type == thrift::PageType::DATA_PAGE && - pageHeader.__isset.data_page_header); - rowGroupPageInfo_.dataPageData = - readBytes(pageHeader.compressed_page_size, pageBuffer_); - pre_decompress_data = iaaDecompressGzip( - rowGroupPageInfo_.dataPageData, - 
pageHeader.compressed_page_size, - pageHeader.uncompressed_page_size, - rowGroupPageInfo_.uncompressedDataV1Data, - data_qpl_job_id); - break; - } - case thrift::PageType::DATA_PAGE_V2: - LOG(WARNING) << "Data Page V2 not support "; - break; - case thrift::PageType::DICTIONARY_PAGE: - LOG(WARNING) << "Wrong path "; - break; - default: - break; // ignore INDEX page type and any other custom extensions - } -} - -bool PageReader::seekToPreDecompPage(int64_t row) { - bool has_qpl = false; - if (dict_qpl_job_id != 0) { - bool job_success = getDecompRes(dict_qpl_job_id); - prepareDictionary(dictPageHeader_, job_success); - dict_qpl_job_id = 0; - has_qpl = true; - } - - if (this->data_qpl_job_id != 0) { - bool job_success = getDecompRes(data_qpl_job_id); - prepareDataPageV1(dataPageHeader_, row, job_success); - data_qpl_job_id = 0; - has_qpl = true; - } - - if (has_qpl) { - if (row == kRepDefOnly || row < rowOfPage_ + numRowsInPage_) { - return true; - } - updateRowInfoAfterPageSkipped(); - } - return false; -} - void PageReader::seekToPage(int64_t row) { defineDecoder_.reset(); repeatDecoder_.reset(); // 'rowOfPage_' is the row number of the first row of the next page. 
rowOfPage_ += numRowsInPage_; - - if (seekToPreDecompPage(row)) { - return; - } - for (;;) { auto dataStart = pageStart_; if (chunkSize_ <= pageStart_) { @@ -294,10 +189,7 @@ void PageReader::updateRowInfoAfterPageSkipped() { } } -void PageReader::prepareDataPageV1( - const PageHeader& pageHeader, - int64_t row, - bool job_success) { +void PageReader::prepareDataPageV1(const PageHeader& pageHeader, int64_t row) { VELOX_CHECK( pageHeader.type == thrift::PageType::DATA_PAGE && pageHeader.__isset.data_page_header); @@ -313,24 +205,11 @@ void PageReader::prepareDataPageV1( return; } - if (data_qpl_job_id != 0 && pre_decompress_data && job_success) { - if (rowGroupPageInfo_.visitedRows > 0) { - BufferPtr tmp = uncompressedDataV1Data_; - uncompressedDataV1Data_ = rowGroupPageInfo_.uncompressedDataV1Data; - rowGroupPageInfo_.uncompressedDataV1Data = tmp; - } - pageData_ = uncompressedDataV1Data_->as(); - } else { - if (data_qpl_job_id == 0) { - dataPageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); - } else if (rowGroupPageInfo_.visitedRows > 0) { - dataPageData_ = rowGroupPageInfo_.dataPageData; - } - pageData_ = decompressData( - dataPageData_, - pageHeader.compressed_page_size, - pageHeader.uncompressed_page_size); - } + pageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); + pageData_ = decompressData( + pageData_, + pageHeader.compressed_page_size, + pageHeader.uncompressed_page_size); auto pageEnd = pageData_ + pageHeader.uncompressed_page_size; if (maxRepeat_ > 0) { uint32_t repeatLength = readField(pageData_); @@ -428,9 +307,7 @@ void PageReader::prepareDataPageV2(const PageHeader& pageHeader, int64_t row) { } } -void PageReader::prepareDictionary( - const PageHeader& pageHeader, - bool job_success) { +void PageReader::prepareDictionary(const PageHeader& pageHeader) { dictionary_.numValues = pageHeader.dictionary_page_header.num_values; dictionaryEncoding_ = pageHeader.dictionary_page_header.encoding; dictionary_.sorted = 
pageHeader.dictionary_page_header.__isset.is_sorted && @@ -440,17 +317,11 @@ void PageReader::prepareDictionary( dictionaryEncoding_ == Encoding::PLAIN); if (codec_ != common::CompressionKind::CompressionKind_NONE) { - if (dict_qpl_job_id != 0 && pre_decompress_dict && job_success) { - pageData_ = uncompressedDictData_->as(); - } else { - if (dict_qpl_job_id == 0) { - pageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); - } - pageData_ = decompressData( - pageData_, - pageHeader.compressed_page_size, - pageHeader.uncompressed_page_size); - } + pageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); + pageData_ = decompressData( + pageData_, + pageHeader.compressed_page_size, + pageHeader.uncompressed_page_size); } auto parquetType = type_->parquetType_.value(); @@ -792,10 +663,6 @@ void PageReader::skip(int64_t numRows) { numLeafNullsConsumed_ = rowOfPage_; } toSkip -= rowOfPage_ - firstUnvisited_; - if (isWinSizeFit) { - prefetchNextPage(); - } - rowGroupPageInfo_.visitedRows += numRowsInPage_; } firstUnvisited_ += numRows; @@ -849,10 +716,6 @@ void PageReader::skipNullsOnly(int64_t numRows) { seekToPage(firstUnvisited_ + numRows); firstUnvisited_ += numRows; toSkip = firstUnvisited_ - rowOfPage_; - if (isWinSizeFit) { - prefetchNextPage(); - } - rowGroupPageInfo_.visitedRows += numRowsInPage_; } else { firstUnvisited_ += numRows; } @@ -873,10 +736,6 @@ void PageReader::readNullsOnly(int64_t numValues, BufferPtr& buffer) { if (!availableOnPage) { seekToPage(firstUnvisited_); availableOnPage = numRowsInPage_; - if (isWinSizeFit) { - prefetchNextPage(); - } - rowGroupPageInfo_.visitedRows += numRowsInPage_; } auto numRead = std::min(availableOnPage, toRead); auto nulls = readNulls(numRead, nullsInReadRange_); @@ -937,10 +796,6 @@ bool PageReader::rowsForPage( if (hasChunkRepDefs_) { numLeafNullsConsumed_ = rowOfPage_; } - if (isWinSizeFit) { - prefetchNextPage(); - } - rowGroupPageInfo_.visitedRows += numRowsInPage_; } auto& scanState = 
reader.scanState(); if (isDictionary()) { @@ -1036,102 +891,4 @@ const VectorPtr& PageReader::dictionaryValues(const TypePtr& type) { return dictionaryValues_; } -void PageReader::prefetchDataPageV1(const thrift::PageHeader& pageHeader) { - dataPageHeader_ = pageHeader; - VELOX_CHECK( - pageHeader.type == thrift::PageType::DATA_PAGE && - pageHeader.__isset.data_page_header); - - dataPageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); - pre_decompress_data = iaaDecompressGzip( - dataPageData_, - pageHeader.compressed_page_size, - pageHeader.uncompressed_page_size, - uncompressedDataV1Data_, - data_qpl_job_id); - return; -} - -void PageReader::prefetchDataPageV2(const thrift::PageHeader& pageHeader) { - return; -} - -void PageReader::prefetchDictionary(const thrift::PageHeader& pageHeader) { - dictPageHeader_ = pageHeader; - dictionaryEncoding_ = pageHeader.dictionary_page_header.encoding; - VELOX_CHECK( - dictionaryEncoding_ == Encoding::PLAIN_DICTIONARY || - dictionaryEncoding_ == Encoding::PLAIN); - dictPageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); - - pre_decompress_dict = iaaDecompressGzip( - dictPageData_, - pageHeader.compressed_page_size, - pageHeader.uncompressed_page_size, - uncompressedDictData_, - dict_qpl_job_id); - - return; -} - -const bool PageReader::iaaDecompressGzip( - const char* pageData, - uint32_t compressedSize, - uint32_t uncompressedSize, - BufferPtr& uncompressedData, - int& qpl_job_id) { - dwio::common::ensureCapacity( - uncompressedData, uncompressedSize, &pool_); - - if (!isWinSizeFit) { - // window size should be 4KB for IAA - if (dwio::common::compression::Compressor::PARQUET_ZLIB_WINDOW_BITS_4KB == - dwio::common::compression::getZlibWindowBits( - (const uint8_t*)pageData, uncompressedSize)) { - isWinSizeFit = true; - } else { - qpl_job_id = -1; - return false; - } - } - auto streamDebugInfo = - fmt::format("Page Reader: Stream {}", inputStream_->getName()); - std::unique_ptr decompressor = - 
dwio::common::compression::createAsyncDecompressor( - thriftCodecToCompressionKind(), uncompressedSize, streamDebugInfo); - if (decompressor == nullptr) { - return false; - } - qpl_job_id = decompressor->decompress( - (const char*)pageData, - compressedSize, - (char*)uncompressedData->asMutable(), - uncompressedSize); - if (qpl_job_id < 0) { - return false; - } - return true; -} - -const bool PageReader::getDecompRes(int job_id) { - auto streamDebugInfo = - fmt::format("Page Reader: Stream {}", inputStream_->getName()); - std::unique_ptr decompressor = - dwio::common::compression::createAsyncDecompressor( - thriftCodecToCompressionKind(), 0, streamDebugInfo); - return decompressor->waitResult(job_id); -} - -PageReader::~PageReader() { - if (data_qpl_job_id > 0 || dict_qpl_job_id > 0) { - auto streamDebugInfo = - fmt::format("Page Reader: Stream {}", inputStream_->getName()); - std::unique_ptr decompressor = - dwio::common::compression::createAsyncDecompressor( - thriftCodecToCompressionKind(), 0, streamDebugInfo); - decompressor->releaseJob(data_qpl_job_id); - decompressor->releaseJob(dict_qpl_job_id); - } -} - } // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/reader/PageReader.h b/velox/dwio/parquet/reader/PageReader.h index b88ad0393c3b..ef75075bde2f 100644 --- a/velox/dwio/parquet/reader/PageReader.h +++ b/velox/dwio/parquet/reader/PageReader.h @@ -23,6 +23,7 @@ #include "velox/dwio/common/compression/Compression.h" #include "velox/dwio/parquet/reader/BooleanDecoder.h" #include "velox/dwio/parquet/reader/DeltaBpDecoder.h" +#include "velox/dwio/parquet/reader/PageReaderBase.h" #include "velox/dwio/parquet/reader/ParquetTypeWithId.h" #include "velox/dwio/parquet/reader/RleBpDataDecoder.h" #include "velox/dwio/parquet/reader/StringDecoder.h" @@ -31,19 +32,10 @@ namespace facebook::velox::parquet { -struct PreDecompPageInfo { - int64_t numValues; // Number of values in this row group - int64_t visitedRows; // rows already read - uint64_t 
pageStart{0}; - thrift::PageHeader dataPageHeader; - const char* FOLLY_NULLABLE dataPageData{nullptr}; - BufferPtr uncompressedDataV1Data; -}; - /// Manages access to pages inside a ColumnChunk. Interprets page headers and /// encodings and presents the combination of pages and encoded values as a /// continuous stream accessible via readWithVisitor(). -class PageReader { +class PageReader : public PageReaderBase { public: PageReader( std::unique_ptr stream, @@ -61,10 +53,6 @@ class PageReader { chunkSize_(chunkSize), nullConcatenation_(pool_) { type_->makeLevelInfo(leafInfo_); - dict_qpl_job_id = 0; - data_qpl_job_id = 0; - uncompressedDictData_ = nullptr; - uncompressedDataV1Data_ = nullptr; } // This PageReader constructor is for unit test only. @@ -81,26 +69,14 @@ class PageReader { codec_(codec), chunkSize_(chunkSize), nullConcatenation_(pool_) {} - ~PageReader(); + + PageReaderType getType() { + return PageReaderType::Common; + }; /// Advances 'numRows' top level rows. void skip(int64_t numRows); - /// Pre-decompress GZIP page with IAA - void preDecompressPage(bool& need_pre_decompress, int64_t numValues); - void prefetchDataPageV1(const thrift::PageHeader& pageHeader); - void prefetchDataPageV2(const thrift::PageHeader& pageHeader); - void prefetchDictionary(const thrift::PageHeader& pageHeader); - const bool getDecompRes(int job_id); - void prefetchNextPage(); - bool seekToPreDecompPage(int64_t row); - const bool iaaDecompressGzip( - const char* FOLLY_NONNULL pageData, - uint32_t compressedSize, - uint32_t uncompressedSize, - BufferPtr& uncompressedData, - int& qpl_job_id); - /// Decodes repdefs for 'numTopLevelRows'. Use getLengthsAndNulls() /// to access the lengths and nulls for the different nesting /// levels. @@ -166,7 +142,7 @@ class PageReader { // bufferEnd_ to the corresponding positions. thrift::PageHeader readPageHeader(); - private: + protected: // Indicates that we only want the repdefs for the next page. 
Used when // prereading repdefs with seekToPage. static constexpr int64_t kRepDefOnly = -1; @@ -198,7 +174,7 @@ class PageReader { // is interpreted in terms of leaf rows, including leaf // nulls. Seeking ahead of pages covered by decodeRepDefs is not // allowed for non-top level columns. - void seekToPage(int64_t row); + virtual void seekToPage(int64_t row); // Preloads the repdefs for the column chunk. To avoid preloading, // would need a way too clone the input stream so that one stream @@ -217,14 +193,9 @@ class PageReader { // next page. void updateRowInfoAfterPageSkipped(); - void prepareDataPageV1( - const thrift::PageHeader& pageHeader, - int64_t row, - bool job_success = false); + void prepareDataPageV1(const thrift::PageHeader& pageHeader, int64_t row); void prepareDataPageV2(const thrift::PageHeader& pageHeader, int64_t row); - void prepareDictionary( - const thrift::PageHeader& pageHeader, - bool job_success = false); + void prepareDictionary(const thrift::PageHeader& pageHeader); void makeDecoder(); // For a non-top level leaf, reads the defs and sets 'leafNulls_' and @@ -238,12 +209,12 @@ class PageReader { const char* FOLLY_NONNULL readBytes(int32_t size, BufferPtr& copy); // Decompresses data starting at 'pageData_', consuming 'compressedsize' and - // producing up to 'decompressedSize' bytes. The The start of the decoding + // producing up to 'uncompressedSize' bytes. The start of the decoding // result is returned. an intermediate copy may be made in 'decompresseddata_' const char* FOLLY_NONNULL decompressData( const char* FOLLY_NONNULL pageData, uint32_t compressedSize, - uint32_t decompressedSize); + uint32_t uncompressedSize); template T readField(const char* FOLLY_NONNULL& ptr) { @@ -520,23 +491,6 @@ class PageReader { std::unique_ptr booleanDecoder_; std::unique_ptr deltaBpDecoder_; // Add decoders for other encodings here. 
- // Used for pre-decompress - BufferPtr uncompressedDictData_; - BufferPtr uncompressedDataV1Data_; - thrift::PageHeader dictPageHeader_; - const char* FOLLY_NULLABLE dictPageData_{nullptr}; - bool needUncompressDict; - - thrift::PageHeader dataPageHeader_; - const char* FOLLY_NULLABLE dataPageData_{nullptr}; - - int dict_qpl_job_id; - int data_qpl_job_id; - - bool pre_decompress_dict = false; - bool pre_decompress_data = false; - bool isWinSizeFit = false; - PreDecompPageInfo rowGroupPageInfo_; }; FOLLY_ALWAYS_INLINE dwio::common::compression::CompressionOptions diff --git a/velox/dwio/parquet/reader/PageReaderBase.h b/velox/dwio/parquet/reader/PageReaderBase.h new file mode 100644 index 000000000000..51af5b8b7cca --- /dev/null +++ b/velox/dwio/parquet/reader/PageReaderBase.h @@ -0,0 +1,72 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace facebook::velox::parquet { + +enum PageReaderType { + Common = 0, + IAA = 1, +}; + +class PageReaderBase { + public: + // Interface only: all members except the destructor are pure virtual. + + virtual ~PageReaderBase(){}; + + virtual PageReaderType getType() = 0; + + /** + * Skips 'numValues' top level rows, touching null flags only. + * Non-null values are not prepared for reading.
+ * @param numValues + * @return void + */ + virtual void skipNullsOnly(int64_t numValues) = 0; + + /** + * Reads 'numValues' null flags into 'buffer' and advances the + * decoders by as much. The read may span several pages. If there + * are no nulls, buffer may be set to nullptr. + * @param numValues Number of null flags to read. + * @param buffer Receives the null flags; may be set to nullptr. + * @return void + */ + virtual void readNullsOnly(int64_t numValues, BufferPtr& buffer) = 0; + + /** + * Advances 'numRows' top level rows. + * @param numRows Number of top level rows to advance. + * @return void + */ + virtual void skip(int64_t numRows) = 0; + + /** Returns the current string dictionary as a FlatVector. + * @param type + * @return VectorPtr + */ + virtual const VectorPtr& dictionaryValues(const TypePtr& type) = 0; + + virtual bool isDictionary() const = 0; + + virtual void clearDictionary() = 0; + + virtual bool isDeltaBinaryPacked() const = 0; +}; + +} // namespace facebook::velox::parquet \ No newline at end of file diff --git a/velox/dwio/parquet/reader/ParquetData.cpp b/velox/dwio/parquet/reader/ParquetData.cpp index c844d7845595..0c95f065e01c 100644 --- a/velox/dwio/parquet/reader/ParquetData.cpp +++ b/velox/dwio/parquet/reader/ParquetData.cpp @@ -18,9 +18,6 @@ #include "velox/dwio/common/BufferedInput.h" #include "velox/dwio/parquet/reader/Statistics.h" -#ifdef VELOX_ENABLE_INTEL_IAA -#include "velox/dwio/common/QplJobPool.h" -#endif namespace facebook::velox::parquet { @@ -85,28 +82,25 @@ bool ParquetData::rowGroupMatches( } bool ParquetData::preDecompRowGroup(uint32_t index) { -#ifdef VELOX_ENABLE_INTEL_IAA - if (!dwio::common::QplJobHWPool::getInstance().job_ready()) { - return false; - } -#else +#ifndef VELOX_ENABLE_INTEL_IAA return false; #endif - auto& metaData = rowGroups_[index].columns[type_->column()].meta_data; - if (metaData.codec != thrift::CompressionCodec::GZIP || !needPreDecomp) { + auto metaData = fileMetaDataPtr_.rowGroup(index).columnChunk(type_->column()); + if (metaData.compression() != common::CompressionKind::CompressionKind_GZIP ||
!needPreDecomp) { LOG(WARNING) << "QPL Job not ready or zlib window size(" << needPreDecomp << ") is not 4KB"; return false; } - pageReaders_.resize(rowGroups_.size()); - auto iaaPageReader = std::make_unique( + pageReaders_.resize(fileMetaDataPtr_.numRowGroups()); + auto iaaPageReader = std::make_unique( std::move(streams_[index]), pool_, type_, - metaData.codec, - metaData.total_compressed_size); - iaaPageReader->preDecompressPage(needPreDecomp, metaData.num_values); + metaData.compression(), + metaData.totalCompressedSize()); + iaaPageReader->preDecompressPage(needPreDecomp, metaData.numValues()); pageReaders_[index] = std::move(iaaPageReader); return needPreDecomp; } @@ -143,8 +137,7 @@ dwio::common::PositionProvider ParquetData::seekToRowGroup(uint32_t index) { VELOX_CHECK_LT(index, streams_.size()); VELOX_CHECK(streams_[index], "Stream not enqueued for column"); auto metadata = fileMetaDataPtr_.rowGroup(index).columnChunk(type_->column()); - auto& metadata = rowGroups_[index].columns[type_->column()].meta_data; - if (metadata.codec == thrift::CompressionCodec::GZIP && + if (metadata.compression() == common::CompressionKind::CompressionKind_GZIP && pageReaders_.size() > index && pageReaders_[index] != nullptr) { reader_ = std::move(pageReaders_[index]); return dwio::common::PositionProvider(empty); diff --git a/velox/dwio/parquet/reader/ParquetData.h b/velox/dwio/parquet/reader/ParquetData.h index 51d472025ed7..bfc71d7e42cd 100644 --- a/velox/dwio/parquet/reader/ParquetData.h +++ b/velox/dwio/parquet/reader/ParquetData.h @@ -17,6 +17,7 @@ #pragma once #include "velox/dwio/common/BufferUtil.h" +#include "velox/dwio/parquet/reader/IAAPageReader.h" #include "velox/dwio/parquet/reader/Metadata.h" #include "velox/dwio/parquet/reader/PageReader.h" @@ -77,7 +78,7 @@ class ParquetData : public dwio::common::FormatData { FilterRowGroupsResult&) override; PageReader* FOLLY_NONNULL reader() const { - return reader_.get(); + return dynamic_cast(reader_.get()); } // Reads 
null flags for 'numValues' next top level rows. The first 'numValues' @@ -163,7 +164,11 @@ class ParquetData : public dwio::common::FormatData { /// PageReader::readWithVisitor(). template void readWithVisitor(Visitor visitor) { - reader_->readWithVisitor(visitor); + if (reader_->getType() == PageReaderType::IAA) { + dynamic_cast(reader_.get())->readWithVisitor(visitor); + } else { + dynamic_cast(reader_.get())->readWithVisitor(visitor); + } } const VectorPtr& dictionaryValues(const TypePtr& type) { @@ -205,8 +210,8 @@ class ParquetData : public dwio::common::FormatData { const uint32_t maxDefine_; const uint32_t maxRepeat_; int64_t rowsInRowGroup_; - std::unique_ptr reader_; - std::vector> pageReaders_; + std::unique_ptr reader_; + std::vector> pageReaders_; bool needPreDecomp = true; // Nulls derived from leaf repdefs for non-leaf readers. BufferPtr presetNulls_;