From 6ff1026505dcc2feca4b5d26d3f18f44efb32a6e Mon Sep 17 00:00:00 2001 From: yaqi-zhao Date: Wed, 23 Aug 2023 13:14:58 +0800 Subject: [PATCH] rebase on #5914 --- velox/dwio/common/BitConcatenation.h | 1 - velox/dwio/common/CMakeLists.txt | 14 +- velox/dwio/common/QplJobPool.cpp | 20 +- velox/dwio/common/QplJobPool.h | 46 +- velox/dwio/common/compression/Compression.cpp | 106 ++ velox/dwio/common/compression/Compression.h | 85 + velox/dwio/parquet/reader/CMakeLists.txt | 1 - velox/dwio/parquet/reader/IAAPageReader.cpp | 1414 ----------------- velox/dwio/parquet/reader/IAAPageReader.h | 610 ------- velox/dwio/parquet/reader/PageReader.cpp | 193 ++- velox/dwio/parquet/reader/PageReader.h | 49 +- velox/dwio/parquet/reader/PageReaderBase.h | 81 - velox/dwio/parquet/reader/ParquetData.cpp | 39 +- velox/dwio/parquet/reader/ParquetData.h | 17 +- .../parquet/reader/StructColumnReader.cpp | 3 - 15 files changed, 472 insertions(+), 2207 deletions(-) delete mode 100644 velox/dwio/parquet/reader/IAAPageReader.cpp delete mode 100644 velox/dwio/parquet/reader/IAAPageReader.h delete mode 100644 velox/dwio/parquet/reader/PageReaderBase.h diff --git a/velox/dwio/common/BitConcatenation.h b/velox/dwio/common/BitConcatenation.h index 99e37bb2a2463..cc812035d8b82 100644 --- a/velox/dwio/common/BitConcatenation.h +++ b/velox/dwio/common/BitConcatenation.h @@ -15,7 +15,6 @@ */ #include "velox/buffer/Buffer.h" -#pragma once namespace facebook::velox::dwio::common { diff --git a/velox/dwio/common/CMakeLists.txt b/velox/dwio/common/CMakeLists.txt index 3f0337d1b6978..1276bdb7801d0 100644 --- a/velox/dwio/common/CMakeLists.txt +++ b/velox/dwio/common/CMakeLists.txt @@ -41,7 +41,6 @@ add_library( MetadataFilter.cpp Options.cpp OutputStream.cpp - QplJobPool.cpp Range.cpp Reader.cpp ReaderFactory.cpp @@ -57,12 +56,6 @@ add_library( WriterFactory.cpp) target_include_directories(velox_dwio_common PRIVATE ${Protobuf_INCLUDE_DIRS}) -set(QPL_STATIC_LINK_LIBS ${QPL_STATIC_LINK_LIBS}) -if(VELOX_ENABLE_QPL) - list(APPEND QPL_STATIC_LINK_LIBS qpl::qpl) -endif() - -message(STATUS "QPL_STATIC_LINK_LIBS: ${QPL_STATIC_LINK_LIBS}") target_link_libraries( velox_dwio_common @@ -77,5 +70,10 @@ target_link_libraries( velox_memory Boost::regex Folly::folly - ${QPL_STATIC_LINK_LIBS} glog::glog) + +if(VELOX_ENABLE_QPL) + add_library(velox_dwio_qpl QplJobPool.cpp) + target_link_libraries(velox_dwio_qpl qpl::qpl Folly::folly) + target_link_libraries(velox_dwio_common velox_dwio_qpl) +endif() diff --git a/velox/dwio/common/QplJobPool.cpp b/velox/dwio/common/QplJobPool.cpp index d39800a4f7ec6..42ff29c2b1be6 100644 --- a/velox/dwio/common/QplJobPool.cpp +++ b/velox/dwio/common/QplJobPool.cpp @@ -19,8 +19,6 @@ #include #include "velox/common/base/Exceptions.h" -#ifdef VELOX_ENABLE_QPL - namespace facebook::velox::dwio::common { std::array @@ -30,8 +28,6 @@ std::array, QplJobHWPool::MAX_JOB_NUMBER> bool QplJobHWPool::iaa_job_ready = false; std::unique_ptr QplJobHWPool::hw_jobs_buffer; -// static QplJobHWPool pool = QplJobHWPool::GetInstance(); - QplJobHWPool& QplJobHWPool::GetInstance() { static QplJobHWPool pool; return pool; @@ -56,12 +52,12 @@ QplJobHWPool::~QplJobHWPool() { bool QplJobHWPool::AllocateQPLJob() { uint32_t job_size = 0; - /// Get size required for saving a single qpl job object + // Get size required for saving a single qpl job object qpl_get_job_size(qpl_path, &job_size); - /// Allocate entire buffer for storing all job objects + // Allocate entire buffer for storing all job objects hw_jobs_buffer = std::make_unique(job_size * MAX_JOB_NUMBER); - /// Initialize pool for storing all job object pointers - /// Reallocate buffer by shifting address offset for each job object. + // Initialize pool for storing all job object pointers + // Allocate buffer by shifting address offset for each job object. for (uint32_t index = 0; index < MAX_JOB_NUMBER; ++index) { qpl_job* qpl_job_ptr = reinterpret_cast(hw_jobs_buffer.get() + index * job_size); @@ -81,7 +77,8 @@ bool QplJobHWPool::AllocateQPLJob() { return true; } -qpl_job* QplJobHWPool::AcquireDeflateJob(uint32_t& job_id) { +qpl_job* QplJobHWPool::AcquireDeflateJob(int& job_id) { + job_id = -1; if (!job_ready()) { return nullptr; } @@ -102,8 +99,8 @@ qpl_job* QplJobHWPool::AcquireDeflateJob(uint32_t& job_id) { return hw_job_ptr_pool[index]; } -void QplJobHWPool::ReleaseJob(uint32_t job_id) { - if (job_id >= MAX_JOB_NUMBER) { +void QplJobHWPool::ReleaseJob(int job_id) { + if (job_id >= MAX_JOB_NUMBER || job_id <= 0) { return; } assert(job_id < MAX_JOB_NUMBER); @@ -118,4 +115,3 @@ bool QplJobHWPool::tryLockJob(uint32_t index) { } } // namespace facebook::velox::dwio::common -#endif diff --git a/velox/dwio/common/QplJobPool.h b/velox/dwio/common/QplJobPool.h index 785bc84e93afa..6a7f0c89cbf17 100644 --- a/velox/dwio/common/QplJobPool.h +++ b/velox/dwio/common/QplJobPool.h @@ -20,60 +20,52 @@ #include #include #include - -#ifdef VELOX_ENABLE_QPL #include "qpl/qpl.h" namespace facebook::velox::dwio::common { -/// QplJobHWPool is resource pool to provide the job objects, which is -/// used for storing context information during. -/// Memory for QPL job will be allocated when the QPLJobHWPool instance is -/// created -/// -// QPL job can offload RLE-decoding/Filter/(De)compression works to hardware -// accelerator. +// QplJobHWPool is resource pool to provide the job objects, which is +// used for storing context information. +// Memory for QPL job will be allocated when the QPLJobHWPool instance is +// created class QplJobHWPool { public: static QplJobHWPool& GetInstance(); QplJobHWPool(); ~QplJobHWPool(); - /// Acquire QPL job - /// - /// @param job_id QPL job id, used when release QPL job - /// \return Pointer to the QPL job. If acquire job failed, return nullptr. - qpl_job* AcquireDeflateJob(uint32_t& job_id); - /// \brief Release QPL job by the job_id. - void ReleaseJob(uint32_t job_id); + // Release QPL job by the job_id. + void ReleaseJob(int job_id); - /// \brief Return if the QPL job is allocated sucessfully. + // Return if the QPL job is allocated sucessfully. const bool& job_ready() { return iaa_job_ready; } - qpl_job* GetJobById(uint32_t job_id) { + qpl_job* AcquireDeflateJob(int& job_id); + qpl_job* GetJobById(int job_id) { + if (job_id >= MAX_JOB_NUMBER || job_id <= 0) { + return nullptr; + } return hw_job_ptr_pool[job_id]; } - static constexpr qpl_path_t qpl_path = qpl_path_hardware; - + // Max jobs in QPL_JOB_POOL static constexpr auto MAX_JOB_NUMBER = 1024; private: bool tryLockJob(uint32_t index); bool AllocateQPLJob(); + static constexpr qpl_path_t qpl_path = qpl_path_hardware; + // Job pool for storing all job object pointers + static std::array hw_job_ptr_pool; - /// Max jobs in QPL_JOB_POOL - /// Entire buffer for storing all job objects + // Entire buffer for storing all job objects static std::unique_ptr hw_jobs_buffer; - /// Job pool for storing all job object pointers - static std::array hw_job_ptr_pool; - /// Locks for accessing each job object pointers - static bool iaa_job_ready; + // Locks for accessing each job object pointers static std::array, MAX_JOB_NUMBER> hw_job_ptr_locks; + static bool iaa_job_ready; }; } // namespace facebook::velox::dwio::common -#endif diff --git a/velox/dwio/common/compression/Compression.cpp b/velox/dwio/common/compression/Compression.cpp index aaf79be6b6bd0..7d261353ad8e7 100644 --- a/velox/dwio/common/compression/Compression.cpp +++ b/velox/dwio/common/compression/Compression.cpp @@ -194,6 +194,97 @@ uint64_t ZlibDecompressor::decompress( return destLength - zstream_.avail_out; } +class GzipIAADecompressor : public AsyncDecompressor { + public: + explicit GzipIAADecompressor() {} + + explicit GzipIAADecompressor( + uint64_t blockSize, + const std::string& streamDebugInfo) + : AsyncDecompressor{blockSize, streamDebugInfo} {} + + int decompress( + const char* src, + uint64_t srcLength, + char* dest, + uint64_t destLength) override; + + bool waitResult(int job_id) override; +}; + +int GzipIAADecompressor::decompress( + const char* src, + uint64_t srcLength, + char* dest, + uint64_t destLength) { +#ifdef VELOX_ENABLE_QPL + dwio::common::QplJobHWPool& qpl_job_pool = + dwio::common::QplJobHWPool::GetInstance(); + int job_id = 0; + qpl_job* job = qpl_job_pool.AcquireDeflateJob(job_id); + if (job == nullptr) { + LOG(WARNING) << "cannot AcquireDeflateJob "; + return -1; // Invalid job id to illustrate the + // failed decompress job. + } + job->op = qpl_op_decompress; + job->next_in_ptr = reinterpret_cast(const_cast(src)); + job->next_out_ptr = reinterpret_cast(dest); + job->available_in = static_cast(srcLength); + job->available_out = static_cast(destLength); + job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_ZLIB_MODE; + + qpl_status status = qpl_submit_job(job); + if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) { + qpl_job_pool.ReleaseJob(job_id); + job = qpl_job_pool.AcquireDeflateJob(job_id); + if (job == nullptr) { + LOG(WARNING) + << "cannot acqure deflate job after QPL_STS_QUEUES_ARE_BUSY_ERR "; + return -1; // Invalid job id to illustrate the + // failed decompress job. + } + job->op = qpl_op_decompress; + job->next_in_ptr = reinterpret_cast(const_cast(src)); + job->next_out_ptr = reinterpret_cast(dest); + job->available_in = static_cast(srcLength); + job->available_out = static_cast(destLength); + job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_ZLIB_MODE; + + status = qpl_submit_job(job); + } + if (status != QPL_STS_OK) { + qpl_job_pool.ReleaseJob(job_id); + LOG(WARNING) << "cannot submit job, error status: " << status; + return -1; // Invalid job id to illustrate the + // failed decompress job. + } else { + return job_id; + } +#else + return -1; +#endif +} + +bool GzipIAADecompressor::waitResult(int job_id) { +#ifdef VELOX_ENABLE_QPL + dwio::common::QplJobHWPool& qpl_job_pool = + dwio::common::QplJobHWPool::GetInstance(); + if (job_id <= 0 || job_id >= qpl_job_pool.MAX_JOB_NUMBER) { + return true; + } + qpl_job* job = qpl_job_pool.GetJobById(job_id); + + auto status = qpl_wait_job(job); + qpl_job_pool.ReleaseJob(job_id); + if (status == QPL_STS_OK) { + return true; + } + LOG(WARNING) << "Decompress w/IAA error, status: " << status; +#endif + return false; +} + class LzoDecompressor : public Decompressor { public: explicit LzoDecompressor( @@ -573,4 +664,19 @@ std::unique_ptr createDecompressor( compressedLength); } +std::unique_ptr +createAsyncDecompressor( + facebook::velox::common::CompressionKind kind, + uint64_t bufferSize, + const std::string& streamDebugInfo) { + std::unique_ptr decompressor; + switch (static_cast(kind)) { + case CompressionKind::CompressionKind_GZIP: + return std::make_unique(bufferSize, streamDebugInfo); + default: + DWIO_RAISE("Asynchronous mode not support for compression codec ", kind); + } + return nullptr; +} + } // namespace facebook::velox::dwio::common::compression diff --git a/velox/dwio/common/compression/Compression.h b/velox/dwio/common/compression/Compression.h index 0e3d3c7b9a7bd..3f6bd8989af1b 100644 --- a/velox/dwio/common/compression/Compression.h +++ b/velox/dwio/common/compression/Compression.h @@ -21,6 +21,9 @@ #include "velox/dwio/common/SeekableInputStream.h" #include "velox/dwio/common/compression/CompressionBufferPool.h" #include "velox/dwio/common/encryption/Encryption.h" +#ifdef VELOX_ENABLE_QPL +#include "velox/dwio/common/QplJobPool.h" +#endif namespace facebook::velox::dwio::common::compression { @@ -32,6 +35,7 @@ class Compressor { // https://zlib.net/manual.html static constexpr int DWRF_ORC_ZLIB_WINDOW_BITS = -15; static constexpr int PARQUET_ZLIB_WINDOW_BITS = 15; + static constexpr int PARQUET_ZLIB_WINDOW_BITS_4KB = 12; explicit Compressor(int32_t level) : level_{level} {} @@ -67,6 +71,35 @@ class Decompressor { const std::string streamDebugInfo_; }; +class AsyncDecompressor { + public: + explicit AsyncDecompressor(){}; + explicit AsyncDecompressor( + uint64_t blockSize, + const std::string& streamDebugInfo) + : blockSize_{blockSize}, streamDebugInfo_{streamDebugInfo} {} + + virtual ~AsyncDecompressor() = default; + + virtual uint64_t getUncompressedLength( + const char* /* unused */, + uint64_t /* unused */) const { + return blockSize_; + } + + virtual int decompress( + const char* src, + uint64_t srcLength, + char* dest, + uint64_t destLength) = 0; + + virtual bool waitResult(int job_id) = 0; + + protected: + uint64_t blockSize_; + const std::string streamDebugInfo_; +}; + struct CompressionOptions { union Format { struct { @@ -111,6 +144,42 @@ static CompressionOptions getParquetDecompressionOptions() { return options; } +/** + * Get the window size from zlib header(rfc1950). + * 0 1 + * +---+---+ + * |CMF|FLG| (more-->) + * +---+---+ + * bits 0 to 3 CM Compression method + * bits 4 to 7 CINFO Compression info + * CM (Compression method) This identifies the compression method used in the + * file. CM = 8 denotes the "deflate" compression method with a window size up + * to 32K. CINFO (Compression info) For CM = 8, CINFO is the base-2 logarithm of + * the LZ77 window size, minus eight (CINFO=7 indicates a 32K window size). + * @param stream_ptr the compressed block length for raw decompression + * @param stream_size compression options to use + */ +static int getZlibWindowBits(const uint8_t* stream_ptr, uint32_t stream_size) { + static constexpr uint8_t CM_ZLIB_DEFAULT_VALUE = 8u; + static constexpr uint32_t ZLIB_MIN_HEADER_SIZE = 2u; + static constexpr uint32_t ZLIB_INFO_OFFSET = 4u; + if (stream_size < ZLIB_MIN_HEADER_SIZE) { + return -1; + } + const uint8_t compression_method_and_flag = *stream_ptr++; + const uint8_t compression_method = compression_method_and_flag & 0xf; + const uint8_t compression_info = + compression_method_and_flag >> ZLIB_INFO_OFFSET; + + if (CM_ZLIB_DEFAULT_VALUE != compression_method) { + return -1; + } + if (compression_info > 7) { + return -1; + } + return CM_ZLIB_DEFAULT_VALUE + compression_info; +} + /** * Create a decompressor for the given compression kind. * @param kind the compression type to implement @@ -132,6 +201,22 @@ std::unique_ptr createDecompressor( size_t compressedLength = 0, CompressionOptions options = getDwrfOrcDecompressionOptions()); +/** + * Create a decompressor for the given compression kind in asynchronous mode. + * @param kind the compression type to implement + * @param input the input stream that is the underlying source + * @param bufferSize the maximum size of the buffer + * @param pool the memory pool + * @param useRawDecompression specify whether to perform raw decompression + * @param compressedLength the compressed block length for raw decompression + * @param options compression options to use + */ +std::unique_ptr +createAsyncDecompressor( + facebook::velox::common::CompressionKind kind, + uint64_t bufferSize, + const std::string& streamDebugInfo); + /** * Create a compressor for the given compression kind. * @param kind the compression type to implement diff --git a/velox/dwio/parquet/reader/CMakeLists.txt b/velox/dwio/parquet/reader/CMakeLists.txt index 6320324edfbba..82e712c7870ae 100644 --- a/velox/dwio/parquet/reader/CMakeLists.txt +++ b/velox/dwio/parquet/reader/CMakeLists.txt @@ -20,7 +20,6 @@ add_library( PageReader.cpp ParquetColumnReader.cpp ParquetData.cpp - IAAPageReader.cpp RepeatedColumnReader.cpp RleBpDecoder.cpp Statistics.cpp diff --git a/velox/dwio/parquet/reader/IAAPageReader.cpp b/velox/dwio/parquet/reader/IAAPageReader.cpp deleted file mode 100644 index c29cb74df4d8f..0000000000000 --- a/velox/dwio/parquet/reader/IAAPageReader.cpp +++ /dev/null @@ -1,1414 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "velox/dwio/parquet/reader/IAAPageReader.h" -#include -#include //@manual -#include -#include -#include "velox/dwio/common/BufferUtil.h" -#include "velox/dwio/common/ColumnVisitors.h" -#include "velox/dwio/parquet/reader/NestedStructureDecoder.h" -#include "velox/dwio/parquet/thrift/ThriftTransport.h" -#include "velox/vector/FlatVector.h" - -#ifdef VELOX_ENABLE_QPL -namespace facebook::velox::parquet { - -using thrift::Encoding; -using thrift::PageHeader; - -void IAAPageReader::preDecompressPage(bool& need_pre_decompress) { - for (;;) { - auto dataStart = pageStart_; - if (chunkSize_ <= pageStart_) { - // This may happen if seeking to exactly end of row group. - numRepDefsInPage_ = 0; - numRowsInPage_ = 0; - break; - } - PageHeader pageHeader = readPageHeader(); - pageStart_ = pageDataStart_ + pageHeader.compressed_page_size; - switch (pageHeader.type) { - case thrift::PageType::DATA_PAGE: - prefetchDataPageV1(pageHeader); - break; - case thrift::PageType::DATA_PAGE_V2: - prefetchDataPageV2(pageHeader); - break; - case thrift::PageType::DICTIONARY_PAGE: - prefetchDictionary(pageHeader); - continue; - default: - break; // ignore INDEX page type and any other custom extensions - } - break; - } - need_pre_decompress = isWinSizeFit; -} - -void IAAPageReader::seekToPage(int64_t row) { - defineDecoder_.reset(); - repeatDecoder_.reset(); - // 'rowOfPage_' is the row number of the first row of the next page. - rowOfPage_ += numRowsInPage_; - bool has_qpl = false; - if (dict_qpl_job_id > 0) { - bool job_success = waitQplJob(dict_qpl_job_id); - prepareDict(dictPageHeader_, job_success); - dict_qpl_job_id = 0; - has_qpl = true; - } - if (data_qpl_job_id > 0) { - bool job_success = waitQplJob(data_qpl_job_id); - bool result = prepareData(dataPageHeader_, row, job_success); - if (!result) { - LOG(WARNING) << "Decompress w/IAA error, try again with software."; - pre_decompress_data = false; - result = prepareData(dataPageHeader_, row, job_success); - if (!result) { - VELOX_FAIL("Decomrpess fail!"); - } - } - data_qpl_job_id = 0; - has_qpl = true; - } - - if (has_qpl) { - if (row == kRepDefOnly || row < rowOfPage_ + numRowsInPage_) { - return; - } - updateRowInfoAfterPageSkipped(); - } - - for (;;) { - auto dataStart = pageStart_; - if (chunkSize_ <= pageStart_) { - // This may happen if seeking to exactly end of row group. - numRepDefsInPage_ = 0; - numRowsInPage_ = 0; - break; - } - PageHeader pageHeader = readPageHeader(); - pageStart_ = pageDataStart_ + pageHeader.compressed_page_size; - - switch (pageHeader.type) { - case thrift::PageType::DATA_PAGE: - prepareDataPageV1(pageHeader, row); - break; - case thrift::PageType::DATA_PAGE_V2: - prepareDataPageV2(pageHeader, row); - break; - case thrift::PageType::DICTIONARY_PAGE: - if (row == kRepDefOnly) { - skipBytes( - pageHeader.compressed_page_size, - inputStream_.get(), - bufferStart_, - bufferEnd_); - continue; - } - prepareDictionary(pageHeader); - continue; - default: - break; // ignore INDEX page type and any other custom extensions - } - if (row == kRepDefOnly || row < rowOfPage_ + numRowsInPage_) { - break; - } - updateRowInfoAfterPageSkipped(); - } -} - -PageHeader IAAPageReader::readPageHeader() { - if (bufferEnd_ == bufferStart_) { - const void* buffer; - int32_t size; - inputStream_->Next(&buffer, &size); - bufferStart_ = reinterpret_cast(buffer); - bufferEnd_ = bufferStart_ + size; - } - - std::shared_ptr transport = - std::make_shared( - inputStream_.get(), bufferStart_, bufferEnd_); - apache::thrift::protocol::TCompactProtocolT protocol( - transport); - PageHeader pageHeader; - uint64_t readBytes; - readBytes = pageHeader.read(&protocol); - - pageDataStart_ = pageStart_ + readBytes; - return pageHeader; -} - -const char* IAAPageReader::readBytes(int32_t size, BufferPtr& copy) { - if (bufferEnd_ == bufferStart_) { - const void* buffer = nullptr; - int32_t bufferSize = 0; - if (!inputStream_->Next(&buffer, &bufferSize)) { - VELOX_FAIL("Read past end"); - } - bufferStart_ = reinterpret_cast(buffer); - bufferEnd_ = bufferStart_ + bufferSize; - } - if (bufferEnd_ - bufferStart_ >= size) { - bufferStart_ += size; - return bufferStart_ - size; - } - dwio::common::ensureCapacity(copy, size, &pool_); - dwio::common::readBytes( - size, - inputStream_.get(), - copy->asMutable(), - bufferStart_, - bufferEnd_); - return copy->as(); -} - -const bool FOLLY_NONNULL IAAPageReader::uncompressQplData( - const char* pageData, - uint32_t compressedSize, - uint32_t uncompressedSize, - BufferPtr& uncompressedData, - uint32_t& qpl_job_id) { - dwio::common::ensureCapacity( - uncompressedData, uncompressedSize, &pool_); - - bool isGzip = codec_ == thrift::CompressionCodec::GZIP; - - if (!isWinSizeFit) { - // first time to check window size - int window_size = - getGzipWindowSize((const uint8_t*)pageData, uncompressedSize); - if (window_size == 12) { // window size is not 4KB - isWinSizeFit = true; - } else { - qpl_job_id = dwio::common::QplJobHWPool::GetInstance().MAX_JOB_NUMBER; - return false; - } - } - - qpl_job_id = this->DecompressAsync( - compressedSize, - (const uint8_t*)pageData, - uncompressedSize, - (uint8_t*)uncompressedData->asMutable(), - isGzip); - if (qpl_job_id >= dwio::common::QplJobHWPool::GetInstance().MAX_JOB_NUMBER) { - return false; - } - return true; -} - -uint32_t IAAPageReader::DecompressAsync( - int64_t input_length, - const uint8_t* input, - int64_t output_buffer_length, - uint8_t* output, - bool isGzip) { - // Reset the stream for this block - dwio::common::QplJobHWPool& qpl_job_pool = - dwio::common::QplJobHWPool::GetInstance(); - uint32_t job_id = 0; - qpl_job* job = qpl_job_pool.AcquireDeflateJob(job_id); - if (job == nullptr) { - LOG(WARNING) << "cannot AcquireDeflateJob "; - return qpl_job_pool.MAX_JOB_NUMBER; // Invalid job id to illustrate the - // failed decompress job. - } - job->op = qpl_op_decompress; - job->next_in_ptr = const_cast(input); - job->next_out_ptr = output; - job->available_in = input_length; - job->available_out = output_buffer_length; - job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST; - if (isGzip) { - job->flags |= QPL_FLAG_ZLIB_MODE; - } - - qpl_status status = qpl_submit_job(job); - if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) { - qpl_job_pool.ReleaseJob(job_id); - job = qpl_job_pool.AcquireDeflateJob(job_id); - if (job == nullptr) { - LOG(WARNING) - << "cannot acqure deflate job after QPL_STS_QUEUES_ARE_BUSY_ERR "; - return qpl_job_pool.MAX_JOB_NUMBER; // Invalid job id to illustrate the - // failed decompress job. - } - job->op = qpl_op_decompress; - job->next_in_ptr = const_cast(input); - job->next_out_ptr = output; - job->available_in = input_length; - job->available_out = output_buffer_length; - job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST; - if (isGzip) { - job->flags |= QPL_FLAG_ZLIB_MODE; - } - - status = qpl_submit_job(job); - } - if (status != QPL_STS_OK) { - qpl_job_pool.ReleaseJob(job_id); - LOG(WARNING) << "cannot submit job, error status: " << status; - return qpl_job_pool.MAX_JOB_NUMBER; // Invalid job id to illustrate the - // failed decompress job. - } else { - return job_id; - } -} - -const char* FOLLY_NONNULL IAAPageReader::uncompressData( - const char* pageData, - uint32_t compressedSize, - uint32_t uncompressedSize) { - switch (codec_) { - case thrift::CompressionCodec::UNCOMPRESSED: - return pageData; - case thrift::CompressionCodec::SNAPPY: { - dwio::common::ensureCapacity( - uncompressedData_, uncompressedSize, &pool_); - - size_t sizeFromSnappy; - if (!snappy::GetUncompressedLength( - pageData, compressedSize, &sizeFromSnappy)) { - VELOX_FAIL("Snappy uncompressed size not available"); - } - VELOX_CHECK_EQ(uncompressedSize, sizeFromSnappy); - snappy::RawUncompress( - pageData, compressedSize, uncompressedData_->asMutable()); - return uncompressedData_->as(); - } - case thrift::CompressionCodec::ZSTD: { - dwio::common::ensureCapacity( - uncompressedData_, uncompressedSize, &pool_); - - auto ret = ZSTD_decompress( - uncompressedData_->asMutable(), - uncompressedSize, - pageData, - compressedSize); - VELOX_CHECK( - !ZSTD_isError(ret), - "ZSTD returned an error: ", - ZSTD_getErrorName(ret)); - return uncompressedData_->as(); - } - case thrift::CompressionCodec::GZIP: { - dwio::common::ensureCapacity( - uncompressedData_, uncompressedSize, &pool_); - z_stream stream; - memset(&stream, 0, sizeof(stream)); - constexpr int WINDOW_BITS = 15; - // Determine if this is libz or gzip from header. - constexpr int DETECT_CODEC = 32; - // Initialize decompressor. - auto ret = inflateInit2(&stream, WINDOW_BITS | DETECT_CODEC); - VELOX_CHECK( - (ret == Z_OK), - "zlib inflateInit failed: {}", - stream.msg ? stream.msg : ""); - auto inflateEndGuard = folly::makeGuard([&] { - if (inflateEnd(&stream) != Z_OK) { - LOG(WARNING) << "inflateEnd: " << (stream.msg ? stream.msg : ""); - } - }); - // Decompress. - stream.next_in = - const_cast(reinterpret_cast(pageData)); - stream.avail_in = static_cast(compressedSize); - stream.next_out = - reinterpret_cast(uncompressedData_->asMutable()); - stream.avail_out = static_cast(uncompressedSize); - ret = inflate(&stream, Z_FINISH); - VELOX_CHECK( - ret == Z_STREAM_END, - "GZipCodec failed: {}", - stream.msg ? stream.msg : ""); - return uncompressedData_->as(); - } - default: - VELOX_FAIL("Unsupported Parquet compression type '{}'", codec_); - } -} - -/* Get the window size from zlib header(rfc1950). - 0 1 - +---+---+ - |CMF|FLG| (more-->) - +---+---+ - bits 0 to 3 CM Compression method - bits 4 to 7 CINFO Compression info - CM (Compression method) This identifies the compression method used in the - file. CM = 8 denotes the "deflate" compression method with a window size up - to 32K. CINFO (Compression info) For CM = 8, CINFO is the base-2 logarithm of - the LZ77 window size, minus eight (CINFO=7 indicates a 32K window size). -*/ -int IAAPageReader::getGzipWindowSize( - const uint8_t* stream_ptr, - uint32_t stream_size) { - if (stream_size < ZLIB_MIN_HEADER_SIZE) { - return -1; - } - const uint8_t compression_method_and_flag = *stream_ptr++; - const uint8_t compression_method = compression_method_and_flag & 0xf; - const uint8_t compression_info = - compression_method_and_flag >> ZLIB_INFO_OFFSET; - - if (CM_ZLIB_DEFAULT_VALUE != compression_method) { - return -1; - } - if (compression_info > 7) { - return -1; - } - return CM_ZLIB_DEFAULT_VALUE + compression_info; -} - -void IAAPageReader::setPageRowInfo(bool forRepDef) { - if (isTopLevel_ || forRepDef || maxRepeat_ == 0) { - numRowsInPage_ = numRepDefsInPage_; - } else if (hasChunkRepDefs_) { - ++pageIndex_; - VELOX_CHECK_LT( - pageIndex_, - numLeavesInPage_.size(), - "Seeking past known repdefs for non top level column page {}", - pageIndex_); - numRowsInPage_ = numLeavesInPage_[pageIndex_]; - } else { - numRowsInPage_ = kRowsUnknown; - } -} - -void IAAPageReader::readPageDefLevels() { - VELOX_CHECK(kRowsUnknown == numRowsInPage_ || maxDefine_ > 1); - definitionLevels_.resize(numRepDefsInPage_); - wideDefineDecoder_->GetBatch(definitionLevels_.data(), numRepDefsInPage_); - leafNulls_.resize(bits::nwords(numRepDefsInPage_)); - leafNullsSize_ = getLengthsAndNulls( - LevelMode::kNulls, - leafInfo_, - - 0, - numRepDefsInPage_, - numRepDefsInPage_, - nullptr, - leafNulls_.data(), - 0); - numRowsInPage_ = leafNullsSize_; - numLeafNullsConsumed_ = 0; -} - -void IAAPageReader::updateRowInfoAfterPageSkipped() { - rowOfPage_ += numRowsInPage_; - if (hasChunkRepDefs_) { - numLeafNullsConsumed_ = rowOfPage_; - } -} - -void IAAPageReader::prefetchDataPageV1(const thrift::PageHeader& pageHeader) { - dataPageHeader_ = pageHeader; - VELOX_CHECK( - pageHeader.type == thrift::PageType::DATA_PAGE && - pageHeader.__isset.data_page_header); - - dataPageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); - pre_decompress_data = uncompressQplData( - dataPageData_, - pageHeader.compressed_page_size, - pageHeader.uncompressed_page_size, - uncompressedDataV1Data_, - data_qpl_job_id); - return; -} - -void IAAPageReader::prefetchDataPageV2(const thrift::PageHeader& pageHeader) { - return; -} - -void IAAPageReader::prefetchDictionary(const thrift::PageHeader& pageHeader) { - dictionary_.numValues = pageHeader.dictionary_page_header.num_values; - dictionaryEncoding_ = pageHeader.dictionary_page_header.encoding; - dictionary_.sorted = pageHeader.dictionary_page_header.__isset.is_sorted && - pageHeader.dictionary_page_header.is_sorted; - dictPageHeader_ = pageHeader; - VELOX_CHECK( - dictionaryEncoding_ == Encoding::PLAIN_DICTIONARY || - dictionaryEncoding_ == Encoding::PLAIN); - dictPageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); - - pre_decompress_dict = uncompressQplData( - dictPageData_, - pageHeader.compressed_page_size, - pageHeader.uncompressed_page_size, - uncompressedDictData_, - dict_qpl_job_id); - - return; -} - -void IAAPageReader::prepareDict( - const thrift::PageHeader& pageHeader, - bool job_success) { - if (!pre_decompress_dict || !job_success) { - LOG(WARNING) - << "Decompress w/IAA error, try to uncompress dict with software."; - dictPageData_ = uncompressData( - dictPageData_, - pageHeader.compressed_page_size, - pageHeader.uncompressed_page_size); - } else { - dictPageData_ = uncompressedDictData_->as(); - } - - auto parquetType = type_->parquetType_.value(); - switch (parquetType) { - case thrift::Type::INT32: - case thrift::Type::INT64: - case thrift::Type::FLOAT: - case thrift::Type::DOUBLE: { - int32_t typeSize = (parquetType == thrift::Type::INT32 || - parquetType == thrift::Type::FLOAT) - ? sizeof(float) - : sizeof(double); - auto numBytes = dictionary_.numValues * typeSize; - if (type_->type->isShortDecimal() && parquetType == thrift::Type::INT32) { - auto veloxTypeLength = type_->type->cppSizeInBytes(); - auto numVeloxBytes = dictionary_.numValues * veloxTypeLength; - dictionary_.values = - AlignedBuffer::allocate(numVeloxBytes, &pool_); - } else { - dictionary_.values = AlignedBuffer::allocate(numBytes, &pool_); - } - if (dictPageData_) { - memcpy(dictionary_.values->asMutable(), dictPageData_, numBytes); - } else { - dwio::common::readBytes( - numBytes, - inputStream_.get(), - dictionary_.values->asMutable(), - bufferStart_, - bufferEnd_); - } - if (type_->type->isShortDecimal() && parquetType == thrift::Type::INT32) { - auto values = dictionary_.values->asMutable(); - auto parquetValues = dictionary_.values->asMutable(); - for (auto i = dictionary_.numValues - 1; i >= 0; --i) { - // Expand the Parquet type length values to Velox type length. - // We start from the end to allow in-place expansion. - values[i] = parquetValues[i]; - } - } - break; - } - case thrift::Type::BYTE_ARRAY: { - dictionary_.values = - AlignedBuffer::allocate(dictionary_.numValues, &pool_); - auto numBytes = pageHeader.uncompressed_page_size; - auto values = dictionary_.values->asMutable(); - dictionary_.strings = AlignedBuffer::allocate(numBytes, &pool_); - auto strings = dictionary_.strings->asMutable(); - if (dictPageData_) { - memcpy(strings, dictPageData_, numBytes); - } else { - dwio::common::readBytes( - numBytes, inputStream_.get(), strings, bufferStart_, bufferEnd_); - } - auto header = strings; - for (auto i = 0; i < dictionary_.numValues; ++i) { - auto length = *reinterpret_cast(header); - values[i] = StringView(header + sizeof(int32_t), length); - header += length + sizeof(int32_t); - } - VELOX_CHECK_EQ(header, strings + numBytes); - break; - } - case thrift::Type::FIXED_LEN_BYTE_ARRAY: { - auto parquetTypeLength = type_->typeLength_; - auto numParquetBytes = dictionary_.numValues * parquetTypeLength; - auto veloxTypeLength = type_->type->cppSizeInBytes(); - auto numVeloxBytes = dictionary_.numValues * veloxTypeLength; - dictionary_.values = AlignedBuffer::allocate(numVeloxBytes, &pool_); - auto data = dictionary_.values->asMutable(); - // Read the data bytes. - if (dictPageData_) { - memcpy(data, dictPageData_, numParquetBytes); - } else { - dwio::common::readBytes( - numParquetBytes, - inputStream_.get(), - data, - bufferStart_, - bufferEnd_); - } - if (type_->type->isShortDecimal()) { - // Parquet decimal values have a fixed typeLength_ and are in big-endian - // layout. - if (numParquetBytes < numVeloxBytes) { - auto values = dictionary_.values->asMutable(); - for (auto i = dictionary_.numValues - 1; i >= 0; --i) { - // Expand the Parquet type length values to Velox type length. - // We start from the end to allow in-place expansion. - auto sourceValue = data + (i * parquetTypeLength); - int64_t value = *sourceValue >= 0 ? 0 : -1; - memcpy( - reinterpret_cast(&value) + veloxTypeLength - - parquetTypeLength, - sourceValue, - parquetTypeLength); - values[i] = value; - } - } - auto values = dictionary_.values->asMutable(); - for (auto i = 0; i < dictionary_.numValues; ++i) { - values[i] = __builtin_bswap64(values[i]); - } - break; - } else if (type_->type->isLongDecimal()) { - // Parquet decimal values have a fixed typeLength_ and are in big-endian - // layout. - if (numParquetBytes < numVeloxBytes) { - auto values = dictionary_.values->asMutable(); - for (auto i = dictionary_.numValues - 1; i >= 0; --i) { - // Expand the Parquet type length values to Velox type length. - // We start from the end to allow in-place expansion. - auto sourceValue = data + (i * parquetTypeLength); - int128_t value = *sourceValue >= 0 ? 0 : -1; - memcpy( - reinterpret_cast(&value) + veloxTypeLength - - parquetTypeLength, - sourceValue, - parquetTypeLength); - values[i] = value; - } - } - auto values = dictionary_.values->asMutable(); - for (auto i = 0; i < dictionary_.numValues; ++i) { - values[i] = dwio::common::builtin_bswap128(values[i]); - } - break; - } - VELOX_UNSUPPORTED( - "Parquet type {} not supported for dictionary", parquetType); - } - case thrift::Type::INT96: - default: - VELOX_UNSUPPORTED( - "Parquet type {} not supported for dictionary", parquetType); - } -} - -bool IAAPageReader::prepareData( - const thrift::PageHeader& pageHeader, - int64_t row, - bool job_success) { - if (!pre_decompress_data || !job_success) { - LOG(WARNING) << "Need to uncompress data with sw, pre_decompress_data: " - << pre_decompress_data << ", job_success: " << job_success; - pageData_ = uncompressData( - dataPageData_, - pageHeader.compressed_page_size, - pageHeader.uncompressed_page_size); - } else { - pageData_ = uncompressedDataV1Data_->as(); - } - numRepDefsInPage_ = pageHeader.data_page_header.num_values; - setPageRowInfo(row == kRepDefOnly); - - auto pageEnd = pageData_ + pageHeader.uncompressed_page_size; - if (maxRepeat_ > 0) { - uint32_t repeatLength = readField(pageData_); - repeatDecoder_ = std::make_unique( - reinterpret_cast(pageData_), - repeatLength, - arrow::bit_util::NumRequiredBits(maxRepeat_)); - - pageData_ += repeatLength; - } - - if (maxDefine_ > 0) { - auto defineLength = readField(pageData_); - if (maxDefine_ == 1) { - defineDecoder_ = std::make_unique( - pageData_, - pageData_ + defineLength, - arrow::bit_util::NumRequiredBits(maxDefine_)); - } else { - wideDefineDecoder_ = std::make_unique( - reinterpret_cast(pageData_), - defineLength, - arrow::bit_util::NumRequiredBits(maxDefine_)); - } - pageData_ += defineLength; - } - encodedDataSize_ = pageEnd - pageData_; - if (encodedDataSize_ > pageHeader.uncompressed_page_size) { - LOG(WARNING) << "Decompress w/IAA error, false encodedDataSize_: " - << encodedDataSize_ << ", actual encodedDataSize_: " - << pageHeader.uncompressed_page_size; - return false; - } - - encoding_ = pageHeader.data_page_header.encoding; - if (!hasChunkRepDefs_ && (numRowsInPage_ == kRowsUnknown || maxDefine_ > 1)) { - readPageDefLevels(); - } - if (row != kRepDefOnly) { - makeDecoder(); - } - return true; -} -void IAAPageReader::prepareDataPageV1( - const PageHeader& pageHeader, - int64_t row) { - VELOX_CHECK( - pageHeader.type == thrift::PageType::DATA_PAGE && - pageHeader.__isset.data_page_header); - numRepDefsInPage_ = pageHeader.data_page_header.num_values; - setPageRowInfo(row == kRepDefOnly); - if (row != kRepDefOnly && numRowsInPage_ != kRowsUnknown && - numRowsInPage_ + rowOfPage_ <= row) { - dwio::common::skipBytes( - pageHeader.compressed_page_size, - inputStream_.get(), - bufferStart_, - bufferEnd_); - - return; - } - pageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); - pageData_ = uncompressData( - pageData_, - pageHeader.compressed_page_size, - pageHeader.uncompressed_page_size); - auto pageEnd = pageData_ + pageHeader.uncompressed_page_size; - if (maxRepeat_ > 0) { - uint32_t repeatLength = readField(pageData_); - repeatDecoder_ = std::make_unique( - reinterpret_cast(pageData_), - repeatLength, - arrow::bit_util::NumRequiredBits(maxRepeat_)); - - pageData_ += repeatLength; - } - - if (maxDefine_ > 0) { - auto defineLength = readField(pageData_); - if (maxDefine_ == 1) { - defineDecoder_ = std::make_unique( - pageData_, - pageData_ + defineLength, - arrow::bit_util::NumRequiredBits(maxDefine_)); - } else { - wideDefineDecoder_ = std::make_unique( - reinterpret_cast(pageData_), - defineLength, - arrow::bit_util::NumRequiredBits(maxDefine_)); - } - pageData_ += defineLength; - } - encodedDataSize_ = pageEnd - pageData_; - - encoding_ = pageHeader.data_page_header.encoding; - if (!hasChunkRepDefs_ && (numRowsInPage_ == kRowsUnknown || maxDefine_ > 1)) { - readPageDefLevels(); - } - - if (row != kRepDefOnly) { - makeDecoder(); - } -} - -void IAAPageReader::prepareDataPageV2( - const PageHeader& pageHeader, - int64_t row) { - VELOX_CHECK(pageHeader.__isset.data_page_header_v2); - numRepDefsInPage_ = pageHeader.data_page_header_v2.num_values; - setPageRowInfo(row == kRepDefOnly); - if (row != kRepDefOnly && numRowsInPage_ != kRowsUnknown && - numRowsInPage_ + rowOfPage_ <= row) { - skipBytes( - pageHeader.compressed_page_size, - inputStream_.get(), - bufferStart_, - bufferEnd_); - return; - } - - uint32_t defineLength = maxDefine_ > 0 - ? pageHeader.data_page_header_v2.definition_levels_byte_length - : 0; - uint32_t repeatLength = maxRepeat_ > 0 - ? pageHeader.data_page_header_v2.repetition_levels_byte_length - : 0; - auto bytes = pageHeader.compressed_page_size; - pageData_ = readBytes(bytes, pageBuffer_); - - if (repeatLength) { - repeatDecoder_ = std::make_unique( - reinterpret_cast(pageData_), - repeatLength, - arrow::bit_util::NumRequiredBits(maxRepeat_)); - } - - if (maxDefine_ > 0) { - defineDecoder_ = std::make_unique( - pageData_ + repeatLength, - pageData_ + repeatLength + defineLength, - arrow::bit_util::NumRequiredBits(maxDefine_)); - } - auto levelsSize = repeatLength + defineLength; - pageData_ += levelsSize; - if (pageHeader.data_page_header_v2.__isset.is_compressed || - pageHeader.data_page_header_v2.is_compressed) { - pageData_ = uncompressData( - pageData_, - pageHeader.compressed_page_size - levelsSize, - pageHeader.uncompressed_page_size - levelsSize); - } - if (row == kRepDefOnly) { - skipBytes(bytes, inputStream_.get(), bufferStart_, bufferEnd_); - return; - } - - encodedDataSize_ = pageHeader.uncompressed_page_size - levelsSize; - encoding_ = pageHeader.data_page_header_v2.encoding; - if (numRowsInPage_ == kRowsUnknown) { - readPageDefLevels(); - } - if (row != kRepDefOnly) { - makeDecoder(); - } -} - -void IAAPageReader::prepareDictionary(const PageHeader& pageHeader) { - dictionary_.numValues = pageHeader.dictionary_page_header.num_values; - dictionaryEncoding_ = pageHeader.dictionary_page_header.encoding; - dictionary_.sorted = pageHeader.dictionary_page_header.__isset.is_sorted && - pageHeader.dictionary_page_header.is_sorted; - VELOX_CHECK( - dictionaryEncoding_ == Encoding::PLAIN_DICTIONARY || - dictionaryEncoding_ == Encoding::PLAIN); - - if (codec_ != thrift::CompressionCodec::UNCOMPRESSED) { - pageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); - pageData_ = uncompressData( - pageData_, - pageHeader.compressed_page_size, - pageHeader.uncompressed_page_size); - } - - auto parquetType = type_->parquetType_.value(); - switch (parquetType) { - case thrift::Type::INT32: - case thrift::Type::INT64: - case thrift::Type::FLOAT: - case thrift::Type::DOUBLE: { - int32_t typeSize = (parquetType == thrift::Type::INT32 || - parquetType == thrift::Type::FLOAT) - ? sizeof(float) - : sizeof(double); - auto numBytes = dictionary_.numValues * typeSize; - if (type_->type->isShortDecimal() && parquetType == thrift::Type::INT32) { - auto veloxTypeLength = type_->type->cppSizeInBytes(); - auto numVeloxBytes = dictionary_.numValues * veloxTypeLength; - dictionary_.values = - AlignedBuffer::allocate(numVeloxBytes, &pool_); - } else { - dictionary_.values = AlignedBuffer::allocate(numBytes, &pool_); - } - if (pageData_) { - memcpy(dictionary_.values->asMutable(), pageData_, numBytes); - } else { - dwio::common::readBytes( - numBytes, - inputStream_.get(), - dictionary_.values->asMutable(), - bufferStart_, - bufferEnd_); - } - if (type_->type->isShortDecimal() && parquetType == thrift::Type::INT32) { - auto values = dictionary_.values->asMutable(); - auto parquetValues = dictionary_.values->asMutable(); - for (auto i = dictionary_.numValues - 1; i >= 0; --i) { - // Expand the Parquet type length values to Velox type length. - // We start from the end to allow in-place expansion. - values[i] = parquetValues[i]; - } - } - break; - } - case thrift::Type::BYTE_ARRAY: { - dictionary_.values = - AlignedBuffer::allocate(dictionary_.numValues, &pool_); - auto numBytes = pageHeader.uncompressed_page_size; - auto values = dictionary_.values->asMutable(); - dictionary_.strings = AlignedBuffer::allocate(numBytes, &pool_); - auto strings = dictionary_.strings->asMutable(); - if (pageData_) { - memcpy(strings, pageData_, numBytes); - } else { - dwio::common::readBytes( - numBytes, inputStream_.get(), strings, bufferStart_, bufferEnd_); - } - auto header = strings; - for (auto i = 0; i < dictionary_.numValues; ++i) { - auto length = *reinterpret_cast(header); - values[i] = StringView(header + sizeof(int32_t), length); - header += length + sizeof(int32_t); - } - VELOX_CHECK_EQ(header, strings + numBytes); - break; - } - case thrift::Type::FIXED_LEN_BYTE_ARRAY: { - auto parquetTypeLength = type_->typeLength_; - auto numParquetBytes = dictionary_.numValues * parquetTypeLength; - auto veloxTypeLength = type_->type->cppSizeInBytes(); - auto numVeloxBytes = dictionary_.numValues * veloxTypeLength; - dictionary_.values = AlignedBuffer::allocate(numVeloxBytes, &pool_); - auto data = dictionary_.values->asMutable(); - // Read the data bytes. - if (pageData_) { - memcpy(data, pageData_, numParquetBytes); - } else { - dwio::common::readBytes( - numParquetBytes, - inputStream_.get(), - data, - bufferStart_, - bufferEnd_); - } - if (type_->type->isShortDecimal()) { - // Parquet decimal values have a fixed typeLength_ and are in big-endian - // layout. - if (numParquetBytes < numVeloxBytes) { - auto values = dictionary_.values->asMutable(); - for (auto i = dictionary_.numValues - 1; i >= 0; --i) { - // Expand the Parquet type length values to Velox type length. - // We start from the end to allow in-place expansion. - auto sourceValue = data + (i * parquetTypeLength); - int64_t value = *sourceValue >= 0 ? 0 : -1; - memcpy( - reinterpret_cast(&value) + veloxTypeLength - - parquetTypeLength, - sourceValue, - parquetTypeLength); - values[i] = value; - } - } - auto values = dictionary_.values->asMutable(); - for (auto i = 0; i < dictionary_.numValues; ++i) { - values[i] = __builtin_bswap64(values[i]); - } - break; - } else if (type_->type->isLongDecimal()) { - // Parquet decimal values have a fixed typeLength_ and are in big-endian - // layout. - if (numParquetBytes < numVeloxBytes) { - auto values = dictionary_.values->asMutable(); - for (auto i = dictionary_.numValues - 1; i >= 0; --i) { - // Expand the Parquet type length values to Velox type length. - // We start from the end to allow in-place expansion. - auto sourceValue = data + (i * parquetTypeLength); - int128_t value = *sourceValue >= 0 ? 0 : -1; - memcpy( - reinterpret_cast(&value) + veloxTypeLength - - parquetTypeLength, - sourceValue, - parquetTypeLength); - values[i] = value; - } - } - auto values = dictionary_.values->asMutable(); - for (auto i = 0; i < dictionary_.numValues; ++i) { - values[i] = dwio::common::builtin_bswap128(values[i]); - } - break; - } - VELOX_UNSUPPORTED( - "Parquet type {} not supported for dictionary", parquetType); - } - case thrift::Type::INT96: - default: - VELOX_UNSUPPORTED( - "Parquet type {} not supported for dictionary", parquetType); - } -} - -void IAAPageReader::makeFilterCache(dwio::common::ScanState& state) { - VELOX_CHECK( - !state.dictionary2.values, "Parquet supports only one dictionary"); - state.filterCache.resize(state.dictionary.numValues); - simd::memset( - state.filterCache.data(), - dwio::common::FilterResult::kUnknown, - state.filterCache.size()); - state.rawState.filterCache = state.filterCache.data(); -} - -namespace { -int32_t parquetTypeBytes(thrift::Type::type type) { - switch (type) { - case thrift::Type::INT32: - case thrift::Type::FLOAT: - return 4; - case thrift::Type::INT64: - case thrift::Type::DOUBLE: - return 8; - default: - VELOX_FAIL("Type does not have a byte width {}", type); - } -} -} // namespace - -void IAAPageReader::preloadRepDefs() { - hasChunkRepDefs_ = true; - while (pageStart_ < chunkSize_) { - seekToPage(kRepDefOnly); - auto begin = definitionLevels_.size(); - auto numLevels = definitionLevels_.size() + numRepDefsInPage_; - definitionLevels_.resize(numLevels); - wideDefineDecoder_->GetBatch( - definitionLevels_.data() + begin, numRepDefsInPage_); - if (repeatDecoder_) { - repetitionLevels_.resize(numLevels); - - repeatDecoder_->GetBatch( - repetitionLevels_.data() + begin, numRepDefsInPage_); - } - leafNulls_.resize(bits::nwords(leafNullsSize_ + numRepDefsInPage_)); - auto numLeaves = getLengthsAndNulls( - LevelMode::kNulls, - leafInfo_, - begin, - begin + numRepDefsInPage_, - numRepDefsInPage_, - nullptr, - leafNulls_.data(), - leafNullsSize_); - leafNullsSize_ += numLeaves; - numLeavesInPage_.push_back(numLeaves); - } - - // Reset the input to start of column chunk. - std::vector rewind = {0}; - pageStart_ = 0; - dwio::common::PositionProvider position(rewind); - inputStream_->seekToPosition(position); - bufferStart_ = bufferEnd_ = nullptr; - rowOfPage_ = 0; - numRowsInPage_ = 0; - pageData_ = nullptr; - dictPageData_ = nullptr; - dataPageData_ = nullptr; -} - -void IAAPageReader::decodeRepDefs(int32_t numTopLevelRows) { - if (definitionLevels_.empty()) { - preloadRepDefs(); - } - repDefBegin_ = repDefEnd_; - int32_t numLevels = definitionLevels_.size(); - int32_t topFound = 0; - int32_t i = repDefBegin_; - if (maxRepeat_ > 0) { - for (; i < numLevels; ++i) { - if (repetitionLevels_[i] == 0) { - ++topFound; - if (topFound == numTopLevelRows + 1) { - break; - } - } - } - repDefEnd_ = i; - } else { - repDefEnd_ = i + numTopLevelRows; - } -} - -int32_t IAAPageReader::getLengthsAndNulls( - LevelMode mode, - const ::parquet::internal::LevelInfo& info, - int32_t begin, - int32_t end, - int32_t maxItems, - int32_t* lengths, - uint64_t* nulls, - int32_t nullsStartIndex) const { - ::parquet::internal::ValidityBitmapInputOutput bits; - bits.values_read_upper_bound = maxItems; - bits.values_read = 0; - bits.null_count = 0; - bits.valid_bits = reinterpret_cast(nulls); - bits.valid_bits_offset = nullsStartIndex; - - switch (mode) { - case LevelMode::kNulls: - DefLevelsToBitmap( - definitionLevels_.data() + begin, end - begin, info, &bits); - break; - case LevelMode::kList: { - ::parquet::internal::DefRepLevelsToList( - definitionLevels_.data() + begin, - repetitionLevels_.data() + begin, - end - begin, - info, - &bits, - lengths); - // Convert offsets to lengths. - for (auto i = 0; i < bits.values_read; ++i) { - lengths[i] = lengths[i + 1] - lengths[i]; - } - break; - } - case LevelMode::kStructOverLists: { - DefRepLevelsToBitmap( - definitionLevels_.data() + begin, - repetitionLevels_.data() + begin, - end - begin, - info, - &bits); - break; - } - } - return bits.values_read; -} - -void IAAPageReader::makeDecoder() { - auto parquetType = type_->parquetType_.value(); - switch (encoding_) { - case Encoding::RLE_DICTIONARY: - case Encoding::PLAIN_DICTIONARY: - if (encodedDataSize_ > dataPageHeader_.uncompressed_page_size) { - std::cout << "encodedDataSize_: " << encodedDataSize_ - << ", pre_decompress_data: " << pre_decompress_data << true - << ", dataPageHeader_: " - << dataPageHeader_.uncompressed_page_size << std::endl; - } - dictionaryIdDecoder_ = std::make_unique( - pageData_ + 1, pageData_ + encodedDataSize_, pageData_[0]); - break; - case Encoding::PLAIN: - switch (parquetType) { - case thrift::Type::BOOLEAN: - booleanDecoder_ = std::make_unique( - pageData_, pageData_ + encodedDataSize_); - break; - case thrift::Type::BYTE_ARRAY: - stringDecoder_ = std::make_unique( - pageData_, pageData_ + encodedDataSize_); - break; - case thrift::Type::FIXED_LEN_BYTE_ARRAY: - directDecoder_ = std::make_unique>( - std::make_unique( - pageData_, encodedDataSize_), - false, - type_->typeLength_, - true); - break; - default: { - directDecoder_ = std::make_unique>( - std::make_unique( - pageData_, encodedDataSize_), - false, - parquetTypeBytes(type_->parquetType_.value())); - } - } - break; - case Encoding::DELTA_BINARY_PACKED: - default: - VELOX_UNSUPPORTED("Encoding not supported yet"); - } -} - -void IAAPageReader::skip(int64_t numRows) { - if (!numRows && firstUnvisited_ != rowOfPage_ + numRowsInPage_) { - // Return if no skip and position not at end of page or before first page. - return; - } - auto toSkip = numRows; - if (firstUnvisited_ + numRows >= rowOfPage_ + numRowsInPage_) { - seekToPage(firstUnvisited_ + numRows); - if (hasChunkRepDefs_) { - numLeafNullsConsumed_ = rowOfPage_; - } - toSkip -= rowOfPage_ - firstUnvisited_; - } - firstUnvisited_ += numRows; - - // Skip nulls - toSkip = skipNulls(toSkip); - - // Skip the decoder - if (isDictionary()) { - dictionaryIdDecoder_->skip(toSkip); - } else if (directDecoder_) { - directDecoder_->skip(toSkip); - } else if (stringDecoder_) { - stringDecoder_->skip(toSkip); - } else if (booleanDecoder_) { - booleanDecoder_->skip(toSkip); - } else { - VELOX_FAIL("No decoder to skip"); - } -} - -int32_t IAAPageReader::skipNulls(int32_t numValues) { - if (!defineDecoder_ && isTopLevel_) { - return numValues; - } - VELOX_CHECK(1 == maxDefine_ || !leafNulls_.empty()); - dwio::common::ensureCapacity(tempNulls_, numValues, &pool_); - tempNulls_->setSize(0); - if (isTopLevel_) { - bool allOnes; - defineDecoder_->readBits( - numValues, tempNulls_->asMutable(), &allOnes); - if (allOnes) { - return numValues; - } - } else { - readNulls(numValues, tempNulls_); - } - auto words = tempNulls_->as(); - return bits::countBits(words, 0, numValues); -} - -void IAAPageReader::skipNullsOnly(int64_t numRows) { - if (!numRows && firstUnvisited_ != rowOfPage_ + numRowsInPage_) { - // Return if no skip and position not at end of page or before first page. - return; - } - auto toSkip = numRows; - if (firstUnvisited_ + numRows >= rowOfPage_ + numRowsInPage_) { - seekToPage(firstUnvisited_ + numRows); - firstUnvisited_ += numRows; - toSkip = firstUnvisited_ - rowOfPage_; - } else { - firstUnvisited_ += numRows; - } - - // Skip nulls - skipNulls(toSkip); -} - -void IAAPageReader::readNullsOnly(int64_t numValues, BufferPtr& buffer) { - VELOX_CHECK(!maxRepeat_); - auto toRead = numValues; - if (buffer) { - dwio::common::ensureCapacity(buffer, numValues, &pool_); - } - nullConcatenation_.reset(buffer); - while (toRead) { - auto availableOnPage = rowOfPage_ + numRowsInPage_ - firstUnvisited_; - if (!availableOnPage) { - seekToPage(firstUnvisited_); - availableOnPage = numRowsInPage_; - } - auto numRead = std::min(availableOnPage, toRead); - auto nulls = readNulls(numRead, nullsInReadRange_); - toRead -= numRead; - nullConcatenation_.append(nulls, 0, numRead); - firstUnvisited_ += numRead; - } - buffer = nullConcatenation_.buffer(); -} - -const uint64_t* FOLLY_NULLABLE -IAAPageReader::readNulls(int32_t numValues, BufferPtr& buffer) { - if (maxDefine_ == 0) { - buffer = nullptr; - return nullptr; - } - dwio::common::ensureCapacity(buffer, numValues, &pool_); - if (isTopLevel_) { - VELOX_CHECK_EQ(1, maxDefine_); - bool allOnes; - defineDecoder_->readBits( - numValues, buffer->asMutable(), &allOnes); - return allOnes ? nullptr : buffer->as(); - } - bits::copyBits( - leafNulls_.data(), - numLeafNullsConsumed_, - buffer->asMutable(), - 0, - numValues); - numLeafNullsConsumed_ += numValues; - return buffer->as(); -} - -void IAAPageReader::startVisit(folly::Range rows) { - visitorRows_ = rows.data(); - numVisitorRows_ = rows.size(); - currentVisitorRow_ = 0; - initialRowOfPage_ = rowOfPage_; - visitBase_ = firstUnvisited_; -} - -bool IAAPageReader::rowsForPage( - dwio::common::SelectiveColumnReader& reader, - bool hasFilter, - bool mayProduceNulls, - folly::Range& rows, - const uint64_t* FOLLY_NULLABLE& nulls) { - if (currentVisitorRow_ == numVisitorRows_) { - return false; - } - int32_t numToVisit; - // Check if the first row to go to is in the current page. If not, seek to the - // page that contains the row. - auto rowZero = visitBase_ + visitorRows_[currentVisitorRow_]; - if (rowZero >= rowOfPage_ + numRowsInPage_) { - seekToPage(rowZero); - if (hasChunkRepDefs_) { - numLeafNullsConsumed_ = rowOfPage_; - } - } - auto& scanState = reader.scanState(); - if (isDictionary()) { - if (scanState.dictionary.values != dictionary_.values) { - scanState.dictionary = dictionary_; - if (hasFilter) { - makeFilterCache(scanState); - } - scanState.updateRawState(); - } - } else { - if (scanState.dictionary.values) { - // If there are previous pages in the current read, nulls read - // from them are in 'nullConcatenation_' Put this into the - // reader for the time of dedictionarizing so we don't read - // undefined dictionary indices. - if (mayProduceNulls && reader.numValues()) { - reader.setNulls(nullConcatenation_.buffer()); - } - reader.dedictionarize(); - // The nulls across all pages are in nullConcatenation_. Clear - // the nulls and let the prepareNulls below reserve nulls for - // the new page. - reader.setNulls(nullptr); - scanState.dictionary.clear(); - } - } - - // Then check how many of the rows to visit are on the same page as the - // current one. - int32_t firstOnNextPage = rowOfPage_ + numRowsInPage_ - visitBase_; - if (firstOnNextPage > visitorRows_[numVisitorRows_ - 1]) { - // All the remaining rows are on this page. - numToVisit = numVisitorRows_ - currentVisitorRow_; - } else { - // Find the last row in the rows to visit that is on this page. - auto rangeLeft = folly::Range( - visitorRows_ + currentVisitorRow_, - numVisitorRows_ - currentVisitorRow_); - auto it = - std::lower_bound(rangeLeft.begin(), rangeLeft.end(), firstOnNextPage); - assert(it != rangeLeft.end()); - assert(it != rangeLeft.begin()); - numToVisit = it - (visitorRows_ + currentVisitorRow_); - } - // If the page did not change and this is the first call, we can return a view - // on the original visitor rows. - if (rowOfPage_ == initialRowOfPage_ && currentVisitorRow_ == 0) { - nulls = - readNulls(visitorRows_[numToVisit - 1] + 1, reader.nullsInReadRange()); - rowNumberBias_ = 0; - rows = folly::Range(visitorRows_, numToVisit); - } else { - // We scale row numbers to be relative to first on this page. - auto pageOffset = rowOfPage_ - visitBase_; - rowNumberBias_ = visitorRows_[currentVisitorRow_]; - skip(rowNumberBias_ - pageOffset); - // The decoder is positioned at 'visitorRows_[currentVisitorRow_']' - // We copy the rows to visit with a bias, so that the first to visit has - // offset 0. - rowsCopy_->resize(numToVisit); - auto copy = rowsCopy_->data(); - // Subtract 'rowNumberBias_' from the rows to visit on this page. - // 'copy' has a writable tail of SIMD width, so no special case for end of - // loop. - for (auto i = 0; i < numToVisit; i += xsimd::batch::size) { - auto numbers = xsimd::batch::load_unaligned( - &visitorRows_[i + currentVisitorRow_]) - - rowNumberBias_; - numbers.store_unaligned(copy); - copy += xsimd::batch::size; - } - nulls = readNulls(rowsCopy_->back() + 1, reader.nullsInReadRange()); - rows = folly::Range( - rowsCopy_->data(), rowsCopy_->size()); - } - reader.prepareNulls(rows, nulls != nullptr, currentVisitorRow_); - currentVisitorRow_ += numToVisit; - firstUnvisited_ = visitBase_ + visitorRows_[currentVisitorRow_ - 1] + 1; - return true; -} - -const VectorPtr& IAAPageReader::dictionaryValues(const TypePtr& type) { - if (!dictionaryValues_) { - dictionaryValues_ = std::make_shared>( - &pool_, - type, - nullptr, - dictionary_.numValues, - dictionary_.values, - std::vector{dictionary_.strings}); - } - return dictionaryValues_; -} - -bool IAAPageReader::waitQplJob(uint32_t job_id) { - dwio::common::QplJobHWPool& qpl_job_pool = - dwio::common::QplJobHWPool::GetInstance(); - if (job_id <= 0 || job_id >= qpl_job_pool.MAX_JOB_NUMBER) { - job_id = 0; - return true; - } - qpl_job* job = qpl_job_pool.GetJobById(job_id); - - auto status = qpl_wait_job(job); - qpl_job_pool.ReleaseJob(job_id); - if (status == QPL_STS_BAD_DIST_ERR) { - isWinSizeFit = false; - } - if (status != QPL_STS_OK) { - LOG(WARNING) << "Decompress w/IAA error, status: " << status; - return false; - } - return true; -} - -IAAPageReader::~IAAPageReader() { - dwio::common::QplJobHWPool& qpl_job_pool = - dwio::common::QplJobHWPool::GetInstance(); - if (dict_qpl_job_id > 0 && dict_qpl_job_id < qpl_job_pool.MAX_JOB_NUMBER) { - qpl_job* job = qpl_job_pool.GetJobById(dict_qpl_job_id); - qpl_job_pool.ReleaseJob(dict_qpl_job_id); - dict_qpl_job_id = 0; - } - if (data_qpl_job_id > 0 && data_qpl_job_id < qpl_job_pool.MAX_JOB_NUMBER) { - qpl_job* job = qpl_job_pool.GetJobById(data_qpl_job_id); - qpl_job_pool.ReleaseJob(data_qpl_job_id); - data_qpl_job_id = 0; - } -} - -} // namespace facebook::velox::parquet - -#endif \ No newline at end of file diff --git a/velox/dwio/parquet/reader/IAAPageReader.h b/velox/dwio/parquet/reader/IAAPageReader.h deleted file mode 100644 index d9c4ccb61734c..0000000000000 --- a/velox/dwio/parquet/reader/IAAPageReader.h +++ /dev/null @@ -1,610 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include "velox/dwio/common/BitConcatenation.h" -#include "velox/dwio/common/DirectDecoder.h" -#include "velox/dwio/common/QplJobPool.h" -#include "velox/dwio/common/SelectiveColumnReader.h" -#include "velox/dwio/parquet/reader/BooleanDecoder.h" -#include "velox/dwio/parquet/reader/PageReaderBase.h" -#include "velox/dwio/parquet/reader/ParquetTypeWithId.h" -#include "velox/dwio/parquet/reader/RleBpDataDecoder.h" -#include "velox/dwio/parquet/reader/StringDecoder.h" -#include "velox/dwio/parquet/thrift/ParquetThriftTypes.h" -#include "velox/vector/BaseVector.h" - -#ifdef VELOX_ENABLE_QPL -namespace facebook::velox::parquet { - -/// Manages access to pages inside a ColumnChunk. Interprets page headers and -/// encodings and presents the combination of pages and encoded values as a -/// continuous stream accessible via readWithVisitor(). -class IAAPageReader : public parquet::PageReaderBase { - public: - IAAPageReader( - std::unique_ptr stream, - memory::MemoryPool& pool, - ParquetTypeWithIdPtr nodeType, - thrift::CompressionCodec::type codec, - int64_t chunkSize) - : pool_(pool), - inputStream_(std::move(stream)), - type_(std::move(nodeType)), - maxRepeat_(type_->maxRepeat_), - maxDefine_(type_->maxDefine_), - isTopLevel_(maxRepeat_ == 0 && maxDefine_ <= 1), - codec_(codec), - chunkSize_(chunkSize), - nullConcatenation_(pool_) { - type_->makeLevelInfo(leafInfo_); - dict_qpl_job_id = 0; - data_qpl_job_id = 0; - uncompressedDictData_ = nullptr; - uncompressedDataV1Data_ = nullptr; - } - - // This PageReader constructor is for unit test only. - IAAPageReader( - std::unique_ptr stream, - memory::MemoryPool& pool, - thrift::CompressionCodec::type codec, - int64_t chunkSize) - : pool_(pool), - inputStream_(std::move(stream)), - maxRepeat_(0), - maxDefine_(1), - isTopLevel_(maxRepeat_ == 0 && maxDefine_ <= 1), - codec_(codec), - chunkSize_(chunkSize), - nullConcatenation_(pool_) { - dict_qpl_job_id = 0; - data_qpl_job_id = 0; - } - - ~IAAPageReader(); - - /// Advances 'numRows' top level rows. - void skip(int64_t numRows); - void preDecompressPage(bool& need_pre_decompress); - - /// Decodes repdefs for 'numTopLevelRows'. Use getLengthsAndNulls() - /// to access the lengths and nulls for the different nesting - /// levels. - void decodeRepDefs(int32_t numTopLevelRows); - - /// Returns lengths and/or nulls from 'begin' to 'end' for the level of - /// 'info' using 'mode'. 'maxItems' is the maximum number of nulls and lengths - /// to produce. 'lengths' is only filled for mode kList. 'nulls' is filled - /// from bit position 'nullsStartIndex'. Returns the number of lengths/nulls - /// filled. - int32_t getLengthsAndNulls( - LevelMode mode, - const ::parquet::internal::LevelInfo& info, - int32_t begin, - int32_t end, - int32_t maxItems, - int32_t* FOLLY_NULLABLE lengths, - uint64_t* FOLLY_NULLABLE nulls, - int32_t nullsStartIndex) const; - - /// Applies 'visitor' to values in the ColumnChunk of 'this'. The - /// operation to perform and The operand rows are given by - /// 'visitor'. The rows are relative to the current position. The - /// current position after readWithVisitor is immediately after the - /// last accessed row. - template - void readWithVisitor(Visitor& visitor); - - // skips 'numValues' top level rows, touching null flags only. Non-null values - // are not prepared for reading. - void skipNullsOnly(int64_t numValues); - - /// Reads 'numValues' null flags into 'nulls' and advances the - /// decoders by as much. The read may span several pages. If there - /// are no nulls, buffer may be set to nullptr. - void readNullsOnly(int64_t numValues, BufferPtr& buffer); - - // Returns the current string dictionary as a FlatVector. - const VectorPtr& dictionaryValues(const TypePtr& type); - - // True if the current page holds dictionary indices. - bool isDictionary() const { - return encoding_ == thrift::Encoding::PLAIN_DICTIONARY || - encoding_ == thrift::Encoding::RLE_DICTIONARY; - } - - void clearDictionary() { - dictionary_.clear(); - dictionaryValues_.reset(); - } - - PageReaderType getType() { - return PageReaderType::IAA; - } - - /// Returns the range of repdefs for the top level rows covered by the last - /// decoderepDefs(). - std::pair repDefRange() const { - return {repDefBegin_, repDefEnd_}; - } - - // Parses the PageHeader at 'inputStream_', and move the bufferStart_ and - // bufferEnd_ to the corresponding positions. - thrift::PageHeader readPageHeader(); - - private: - // Indicates that we only want the repdefs for the next page. Used when - // prereading repdefs with seekToPage. - static constexpr int64_t kRepDefOnly = -1; - - // In 'numRowsInPage_', indicates that the page's def levels must be - // consulted to determine number of leaf values. - static constexpr int32_t kRowsUnknown = -1; - - // If the current page has nulls, returns a nulls bitmap owned by 'this'. This - // is filled for 'numRows' bits. - const uint64_t* FOLLY_NULLABLE readNulls(int32_t numRows, BufferPtr& buffer); - - void prepareDict(const thrift::PageHeader& pageHeader, bool job_success); - bool prepareData( - const thrift::PageHeader& pageHeader, - int64_t row, - bool job_success); - uint32_t DecompressAsync( - int64_t input_length, - const uint8_t* input, - int64_t output_buffer_length, - uint8_t* output, - bool isGzip); - - // Skips the define decoder, if any, for 'numValues' top level - // rows. Returns the number of non-nulls skipped. The range is the - // current page. - int32_t skipNulls(int32_t numRows); - - // Initializes a filter result cache for the dictionary in 'state'. - void makeFilterCache(dwio::common::ScanState& state); - - // Makes a decoder based on 'encoding_' for bytes from ''pageData_' to - // 'pageData_' + 'encodedDataSize_'. - void makedecoder(); - - // Reads and skips pages until finding a data page that contains - // 'row'. Reads and sets 'rowOfPage_' and 'numRowsInPage_' and - // initializes a decoder for the found page. row kRepDefOnly means - // getting repdefs for the next page. If non-top level column, 'row' - // is interpreted in terms of leaf rows, including leaf - // nulls. Seeking ahead of pages covered by decodeRepDefs is not - // allowed for non-top level columns. - void seekToPage(int64_t row); - - // Preloads the repdefs for the column chunk. To avoid preloading, - // would need a way too clone the input stream so that one stream - // reads ahead for repdefs and the other tracks the data. This is - // supported by CacheInputStream but not the other - // SeekableInputStreams. - void preloadRepDefs(); - - // Sets row number info after reading a page header. If 'forRepDef', - // does not set non-top level row numbers by repdefs. This is on - // when seeking a non-top level page for the first time, i.e. for - // getting the repdefs. - void setPageRowInfo(bool forRepDef); - - // Updates row position / rep defs consumed info to refer to the first of the - // next page. - void updateRowInfoAfterPageSkipped(); - - void prepareDataPageV1(const thrift::PageHeader& pageHeader, int64_t row); - void prepareDataPageV2(const thrift::PageHeader& pageHeader, int64_t row); - void prepareDictionary(const thrift::PageHeader& pageHeader); - void makeDecoder(); - - void prefetchDataPageV1(const thrift::PageHeader& pageHeader); - void prefetchDataPageV2(const thrift::PageHeader& pageHeader); - void prefetchDictionary(const thrift::PageHeader& pageHeader); - bool waitQplJob(uint32_t job_id); - int getGzipWindowSize(const uint8_t* stream_ptr, uint32_t stream_size); - - // For a non-top level leaf, reads the defs and sets 'leafNulls_' and - // 'numRowsInPage_' accordingly. This is used for non-top level leaves when - // 'hasChunkRepDefs_' is false. - void readPageDefLevels(); - - // Returns a pointer to contiguous space for the next 'size' bytes - // from current position. Copies data into 'copy' if the range - // straddles buffers. Allocates or resizes 'copy' as needed. - const char* FOLLY_NONNULL readBytes(int32_t size, BufferPtr& copy); - - // Decompresses data starting at 'pageData_', consuming 'compressedsize' and - // producing up to 'uncompressedSize' bytes. The The start of the decoding - // result is returned. an intermediate copy may be made in 'uncompresseddata_' - const char* FOLLY_NONNULL uncompressData( - const char* FOLLY_NONNULL pageData, - uint32_t compressedSize, - uint32_t uncompressedSize); - - const bool FOLLY_NONNULL uncompressQplData( - const char* FOLLY_NONNULL pageData, - uint32_t compressedSize, - uint32_t uncompressedSize, - BufferPtr& uncompressedData, - uint32_t& qpl_job_id); - - template - T readField(const char* FOLLY_NONNULL& ptr) { - T data = *reinterpret_cast(ptr); - ptr += sizeof(T); - return data; - } - - // Starts iterating over 'rows', which may span multiple pages. 'rows' are - // relative to current position, with 0 meaning the first - // unprocessed value in the current page, i.e. the row after the - // last row touched on a previous call to skip() or - // readWithVisitor(). This is the first row of the first data page - // if first call. - void startVisit(folly::Range rows); - - // Seeks to the next page in a range given by startVisit(). Returns - // true if there are unprocessed rows in the set given to - // startVisit(). Seeks 'this' to the appropriate page and sets - // 'rowsForPage' to refer to the subset of 'rows' that are on the - // current page. The numbers in rowsForPage are relative to the - // first unprocessed value on the page, for a new page 0 means the - // first value. Reads possible nulls and sets 'reader's - // nullsInReadRange_' to that or to nullptr if no null - // flags. Returns the data of nullsInReadRange in 'nulls'. Copies - // dictionary information into 'reader'. If 'hasFilter' is true, - // sets up dictionary hit cache. If the new page is direct and - // previous pages are dictionary, converts any accumulated results - // into flat. 'mayProduceNulls' should be true if nulls may occur in - // the result if they occur in the data. - bool rowsForPage( - dwio::common::SelectiveColumnReader& reader, - bool hasFilter, - bool mayProduceNulls, - folly::Range& rows, - const uint64_t* FOLLY_NULLABLE& nulls); - - // Calls the visitor, specialized on the data type since not all visitors - // apply to all types. - template < - typename Visitor, - typename std::enable_if< - !std::is_same_v && - !std::is_same_v, - int>::type = 0> - void callDecoder( - const uint64_t* FOLLY_NULLABLE nulls, - bool& nullsFromFastPath, - Visitor visitor) { - if (nulls) { - nullsFromFastPath = dwio::common::useFastPath(visitor) && - (!this->type_->type->isLongDecimal()) && - (this->type_->type->isShortDecimal() ? isDictionary() : true); - - if (isDictionary()) { - auto dictVisitor = visitor.toDictionaryColumnVisitor(); - dictionaryIdDecoder_->readWithVisitor(nulls, dictVisitor); - } else { - directDecoder_->readWithVisitor( - nulls, visitor, nullsFromFastPath); - } - } else { - if (isDictionary()) { - auto dictVisitor = visitor.toDictionaryColumnVisitor(); - dictionaryIdDecoder_->readWithVisitor(nullptr, dictVisitor); - } else { - directDecoder_->readWithVisitor( - nulls, visitor, !this->type_->type->isShortDecimal()); - } - } - } - - template < - typename Visitor, - typename std::enable_if< - std::is_same_v, - int>::type = 0> - void callDecoder( - const uint64_t* FOLLY_NULLABLE nulls, - bool& nullsFromFastPath, - Visitor visitor) { - if (nulls) { - if (isDictionary()) { - nullsFromFastPath = dwio::common::useFastPath(visitor); - auto dictVisitor = visitor.toStringDictionaryColumnVisitor(); - dictionaryIdDecoder_->readWithVisitor(nulls, dictVisitor); - } else { - nullsFromFastPath = false; - stringDecoder_->readWithVisitor(nulls, visitor); - } - } else { - if (isDictionary()) { - auto dictVisitor = visitor.toStringDictionaryColumnVisitor(); - dictionaryIdDecoder_->readWithVisitor(nullptr, dictVisitor); - } else { - stringDecoder_->readWithVisitor(nulls, visitor); - } - } - } - - template < - typename Visitor, - typename std::enable_if< - std::is_same_v, - int>::type = 0> - void callDecoder( - const uint64_t* FOLLY_NULLABLE nulls, - bool& nullsFromFastPath, - Visitor visitor) { - VELOX_CHECK(!isDictionary(), "BOOLEAN types are never dictionary-encoded") - if (nulls) { - nullsFromFastPath = false; - booleanDecoder_->readWithVisitor(nulls, visitor); - } else { - booleanDecoder_->readWithVisitor(nulls, visitor); - } - } - - // Returns the number of passed rows/values gathered by - // 'reader'. Only numRows() is set for a filter-only case, only - // numValues() is set for a non-filtered case. - template - static int32_t numRowsInReader( - const dwio::common::SelectiveColumnReader& reader) { - if (hasFilter) { - return reader.numRows(); - } else { - return reader.numValues(); - } - } - - memory::MemoryPool& pool_; - - std::unique_ptr inputStream_; - ParquetTypeWithIdPtr type_; - const int32_t maxRepeat_; - const int32_t maxDefine_; - const bool isTopLevel_; - - const thrift::CompressionCodec::type codec_; - const int64_t chunkSize_; - const char* FOLLY_NULLABLE bufferStart_{nullptr}; - const char* FOLLY_NULLABLE bufferEnd_{nullptr}; - BufferPtr tempNulls_; - BufferPtr nullsInReadRange_; - BufferPtr multiPageNulls_; - // Decoder for single bit definition levels. the arrow decoders are used for - // multibit levels pending fixing RleBpDecoder for the case. - std::unique_ptr defineDecoder_; - std::unique_ptr repeatDecoder_; - std::unique_ptr wideDefineDecoder_; - - // True for a leaf column for which repdefs are loaded for the whole column - // chunk. This is typically the leaftmost leaf of a list. Other leaves under - // the list can read repdefs as they go since the list lengths are already - // known. - bool hasChunkRepDefs_{false}; - - // index of current page in 'numLeavesInPage_' -1 means before first page. - int32_t pageIndex_{-1}; - - // Number of leaf values in each data page of column chunk. - std::vector numLeavesInPage_; - - // First position in '*levels_' for the range of last decodeRepDefs(). - int32_t repDefBegin_{0}; - - // First position in '*levels_' after the range covered in last - // decodeRepDefs(). - int32_t repDefEnd_{0}; - - // Definition levels for the column chunk. - raw_vector definitionLevels_; - - // Repetition levels for the column chunk. - raw_vector repetitionLevels_; - - // Number of valid bits in 'leafNulls_' - int32_t leafNullsSize_{0}; - - // Number of leaf nulls read. - int32_t numLeafNullsConsumed_{0}; - - // Leaf nulls extracted from 'repetitionLevels_/definitionLevels_' - raw_vector leafNulls_; - - // Encoding of current page. - thrift::Encoding::type encoding_; - - // Row number of first value in current page from start of ColumnChunk. - int64_t rowOfPage_{0}; - - // Number of rows in current page. - int32_t numRowsInPage_{0}; - - // Number of repdefs in page. Not the same as number of rows for a non-top - // level column. - int32_t numRepDefsInPage_{0}; - - // Copy of data if data straddles buffer boundary. - BufferPtr pageBuffer_; - - // Uncompressed data for the page. Rep-def-data in V1, data alone in V2. - BufferPtr uncompressedData_; - BufferPtr uncompressedDictData_; - // char* uncompressedDictData_; - BufferPtr uncompressedDataV1Data_; - // char* uncompressedDataV1Data_; - - // First byte of uncompressed encoded data. Contains the encoded data as a - // contiguous run of bytes. - const char* FOLLY_NULLABLE pageData_{nullptr}; - - // Dictionary contents. - dwio::common::DictionaryValues dictionary_; - thrift::Encoding::type dictionaryEncoding_; - - // Offset of current page's header from start of ColumnChunk. - uint64_t pageStart_{0}; - - // Offset of first byte after current page' header. - uint64_t pageDataStart_{0}; - - // Number of bytes starting at pageData_ for current encoded data. - int32_t encodedDataSize_{0}; - - // Below members Keep state between calls to readWithVisitor(). - - // Original rows in Visitor. - const vector_size_t* FOLLY_NULLABLE visitorRows_{nullptr}; - int32_t numVisitorRows_{0}; - - // 'rowOfPage_' at the start of readWithVisitor(). - int64_t initialRowOfPage_{0}; - - // Index in 'visitorRows_' for the first row that is beyond the - // current page. Equals 'numVisitorRows_' if all are on current page. - int32_t currentVisitorRow_{0}; - - // Row relative to ColumnChunk for first unvisited row. 0 if nothing - // visited. The rows passed to readWithVisitor from rowsForPage() - // are relative to this. - int64_t firstUnvisited_{0}; - - // Offset of 'visitorRows_[0]' relative too start of ColumnChunk. - int64_t visitBase_{0}; - - // Temporary for rewriting rows to access in readWithVisitor when moving - // between pages. Initialized from the visitor. - raw_vector* FOLLY_NULLABLE rowsCopy_{nullptr}; - - // If 'rowsCopy_' is used, this is the difference between the rows in - // 'rowsCopy_' and the row numbers in 'rows' given to readWithVisitor(). - int32_t rowNumberBias_{0}; - - // Manages concatenating null flags read from multiple pages. If a - // readWithVisitor is contined in one page, the visitor places the - // nulls in the reader. If many pages are covered, some with and - // some without nulls, we must make a a concatenated null flags to - // return to the caller. - dwio::common::BitConcatenation nullConcatenation_; - - // LevelInfo for reading nulls for the leaf column 'this' represents. - ::parquet::internal::LevelInfo leafInfo_; - - // Base values of dictionary when reading a string dictionary. - VectorPtr dictionaryValues_; - - // Decoders. Only one will be set at a time. - std::unique_ptr> directDecoder_; - std::unique_ptr dictionaryIdDecoder_; - std::unique_ptr stringDecoder_; - std::unique_ptr booleanDecoder_; - // Add decoders for other encodings here. - - thrift::PageHeader dictPageHeader_; - const char* FOLLY_NULLABLE dictPageData_{nullptr}; - bool needUncompressDict; - - thrift::PageHeader dataPageHeader_; - const char* FOLLY_NULLABLE dataPageData_{nullptr}; - - uint32_t dict_qpl_job_id; - uint32_t data_qpl_job_id; - - bool pre_decompress_dict = false; - bool pre_decompress_data = false; - bool isWinSizeFit = false; - static constexpr uint8_t CM_ZLIB_DEFAULT_VALUE = 8u; - static constexpr uint32_t ZLIB_MIN_HEADER_SIZE = 2u; - static constexpr uint32_t ZLIB_INFO_OFFSET = 4u; -}; - -template -void IAAPageReader::readWithVisitor(Visitor& visitor) { - constexpr bool hasFilter = - !std::is_same_v; - constexpr bool filterOnly = - std::is_same_v; - bool mayProduceNulls = !filterOnly && visitor.allowNulls(); - auto rows = visitor.rows(); - auto numRows = visitor.numRows(); - auto& reader = visitor.reader(); - startVisit(folly::Range(rows, numRows)); - rowsCopy_ = &visitor.rowsCopy(); - folly::Range pageRows; - const uint64_t* nulls = nullptr; - bool isMultiPage = false; - while (rowsForPage(reader, hasFilter, mayProduceNulls, pageRows, nulls)) { - bool nullsFromFastPath = false; - int32_t numValuesBeforePage = numRowsInReader(reader); - visitor.setNumValuesBias(numValuesBeforePage); - visitor.setRows(pageRows); - callDecoder(nulls, nullsFromFastPath, visitor); - if (currentVisitorRow_ < numVisitorRows_ || isMultiPage) { - if (mayProduceNulls) { - if (!isMultiPage) { - // Do not reuse nulls concatenation buffer if previous - // results are hanging on to it. - if (multiPageNulls_ && !multiPageNulls_->unique()) { - multiPageNulls_ = nullptr; - } - nullConcatenation_.reset(multiPageNulls_); - } - if (!nulls) { - nullConcatenation_.appendOnes( - numRowsInReader(reader) - numValuesBeforePage); - } else if (reader.returnReaderNulls()) { - // Nulls from decoding go directly to result. - nullConcatenation_.append( - reader.nullsInReadRange()->template as(), - 0, - numRowsInReader(reader) - numValuesBeforePage); - } else { - // Add the nulls produced from the decoder to the result. - auto firstNullIndex = nullsFromFastPath ? 0 : numValuesBeforePage; - nullConcatenation_.append( - reader.mutableNulls(0), - firstNullIndex, - firstNullIndex + numRowsInReader(reader) - - numValuesBeforePage); - } - } - isMultiPage = true; - } - // The passing rows on non-first pages are relative to the start - // of the page, adjust them to be relative to start of this - // read. This can happen on the first processed page as well if - // the first page of scan did not contain any of the rows to - // visit. - if (hasFilter && rowNumberBias_) { - reader.offsetOutputRows(numValuesBeforePage, rowNumberBias_); - } - } - if (isMultiPage) { - reader.setNulls(mayProduceNulls ? nullConcatenation_.buffer() : nullptr); - } -} - -} // namespace facebook::velox::parquet - -#endif \ No newline at end of file diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp index 5b83f77b2ea51..4bce8ad4b67e4 100644 --- a/velox/dwio/parquet/reader/PageReader.cpp +++ b/velox/dwio/parquet/reader/PageReader.cpp @@ -34,11 +34,74 @@ namespace facebook::velox::parquet { using thrift::Encoding; using thrift::PageHeader; +void PageReader::preDecompressPage(bool& need_pre_decompress) { + if (codec_ != thrift::CompressionCodec::GZIP) { + need_pre_decompress = false; + return; + } + for (;;) { + auto dataStart = pageStart_; + if (chunkSize_ <= pageStart_) { + // This may happen if seeking to exactly end of row group. + numRepDefsInPage_ = 0; + numRowsInPage_ = 0; + break; + } + PageHeader pageHeader = readPageHeader(); + pageStart_ = pageDataStart_ + pageHeader.compressed_page_size; + switch (pageHeader.type) { + case thrift::PageType::DATA_PAGE: + prefetchDataPageV1(pageHeader); + break; + case thrift::PageType::DATA_PAGE_V2: + prefetchDataPageV2(pageHeader); + break; + case thrift::PageType::DICTIONARY_PAGE: + prefetchDictionary(pageHeader); + continue; + default: + break; // ignore INDEX page type and any other custom extensions + } + break; + } + need_pre_decompress = isWinSizeFit; +} + +bool PageReader::seekToPreDecompPage(int64_t row) { + bool has_qpl = false; + if (dict_qpl_job_id != 0) { + bool job_success = getDecompRes(dict_qpl_job_id); + prepareDictionary(dictPageHeader_, job_success); + dict_qpl_job_id = 0; + has_qpl = true; + } + + if (this->data_qpl_job_id != 0) { + bool job_success = getDecompRes(data_qpl_job_id); + prepareDataPageV1(dataPageHeader_, row, job_success); + data_qpl_job_id = 0; + has_qpl = true; + } + + if (has_qpl) { + if (row == kRepDefOnly || row < rowOfPage_ + numRowsInPage_) { + return true; + } + updateRowInfoAfterPageSkipped(); + } + return false; +} + void PageReader::seekToPage(int64_t row) { defineDecoder_.reset(); repeatDecoder_.reset(); // 'rowOfPage_' is the row number of the first row of the next page. rowOfPage_ += numRowsInPage_; + + if (seekToPreDecompPage(row)) { + return; + } + for (;;) { auto dataStart = pageStart_; if (chunkSize_ <= pageStart_) { @@ -304,7 +367,10 @@ void PageReader::updateRowInfoAfterPageSkipped() { } } -void PageReader::prepareDataPageV1(const PageHeader& pageHeader, int64_t row) { +void PageReader::prepareDataPageV1( + const PageHeader& pageHeader, + int64_t row, + bool job_success) { VELOX_CHECK( pageHeader.type == thrift::PageType::DATA_PAGE && pageHeader.__isset.data_page_header); @@ -320,12 +386,20 @@ void PageReader::prepareDataPageV1(const PageHeader& pageHeader, int64_t row) { return; } - pageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); - pageData_ = decompressData( - pageData_, - pageHeader.compressed_page_size, - pageHeader.uncompressed_page_size); + + if (data_qpl_job_id != 0 && pre_decompress_data && job_success) { + pageData_ = uncompressedDataV1Data_->as(); + } else { + if (data_qpl_job_id == 0) { + dataPageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); + } + pageData_ = decompressData( + dataPageData_, + pageHeader.compressed_page_size, + pageHeader.uncompressed_page_size); + } auto pageEnd = pageData_ + pageHeader.uncompressed_page_size; + if (maxRepeat_ > 0) { uint32_t repeatLength = readField(pageData_); repeatDecoder_ = std::make_unique( @@ -422,7 +496,9 @@ void PageReader::prepareDataPageV2(const PageHeader& pageHeader, int64_t row) { } } -void PageReader::prepareDictionary(const PageHeader& pageHeader) { +void PageReader::prepareDictionary( + const PageHeader& pageHeader, + bool job_success) { dictionary_.numValues = pageHeader.dictionary_page_header.num_values; dictionaryEncoding_ = pageHeader.dictionary_page_header.encoding; dictionary_.sorted = pageHeader.dictionary_page_header.__isset.is_sorted && @@ -432,11 +508,17 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { dictionaryEncoding_ == Encoding::PLAIN); if (codec_ != thrift::CompressionCodec::UNCOMPRESSED) { - pageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); - pageData_ = decompressData( - pageData_, - pageHeader.compressed_page_size, - pageHeader.uncompressed_page_size); + if (dict_qpl_job_id != 0 && pre_decompress_dict && job_success) { + pageData_ = uncompressedDictData_->as(); + } else { + if (dict_qpl_job_id == 0) { + dictPageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); + } + pageData_ = decompressData( + dictPageData_, + pageHeader.compressed_page_size, + pageHeader.uncompressed_page_size); + } } auto parquetType = type_->parquetType_.value(); @@ -994,4 +1076,91 @@ const VectorPtr& PageReader::dictionaryValues(const TypePtr& type) { return dictionaryValues_; } +void PageReader::prefetchDataPageV1(const thrift::PageHeader& pageHeader) { + dataPageHeader_ = pageHeader; + VELOX_CHECK( + pageHeader.type == thrift::PageType::DATA_PAGE && + pageHeader.__isset.data_page_header); + + dataPageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); + pre_decompress_data = iaaDecompressGzip( + dataPageData_, + pageHeader.compressed_page_size, + pageHeader.uncompressed_page_size, + uncompressedDataV1Data_, + data_qpl_job_id); + return; +} + +void PageReader::prefetchDataPageV2(const thrift::PageHeader& pageHeader) { + return; +} + +void PageReader::prefetchDictionary(const thrift::PageHeader& pageHeader) { + dictPageHeader_ = pageHeader; + dictionaryEncoding_ = pageHeader.dictionary_page_header.encoding; + VELOX_CHECK( + dictionaryEncoding_ == Encoding::PLAIN_DICTIONARY || + dictionaryEncoding_ == Encoding::PLAIN); + dictPageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); + + pre_decompress_dict = iaaDecompressGzip( + dictPageData_, + pageHeader.compressed_page_size, + pageHeader.uncompressed_page_size, + uncompressedDictData_, + dict_qpl_job_id); + + return; +} + +const bool FOLLY_NONNULL PageReader::iaaDecompressGzip( + const char* pageData, + uint32_t compressedSize, + uint32_t uncompressedSize, + BufferPtr& uncompressedData, + int& qpl_job_id) { + dwio::common::ensureCapacity( + uncompressedData, uncompressedSize, &pool_); + + if (!isWinSizeFit) { + // window size should be 4KB for IAA + if (dwio::common::compression::Compressor::PARQUET_ZLIB_WINDOW_BITS_4KB == + dwio::common::compression::getZlibWindowBits( + (const uint8_t*)pageData, uncompressedSize)) { + isWinSizeFit = true; + } else { + qpl_job_id = -1; + return false; + } + } + auto streamDebugInfo = + fmt::format("Page Reader: Stream {}", inputStream_->getName()); + std::unique_ptr decompressor = + dwio::common::compression::createAsyncDecompressor( + ThriftCodecToCompressionKind(codec_), + uncompressedSize, + streamDebugInfo); + if (decompressor == nullptr) { + return false; + } + qpl_job_id = decompressor->decompress( + (const char*)pageData, + compressedSize, + (char*)uncompressedData->asMutable(), + uncompressedSize); + if (qpl_job_id < 0) { + return false; + } + return true; +} + +const bool FOLLY_NONNULL PageReader::getDecompRes(int job_id) { + auto streamDebugInfo = + fmt::format("Page Reader: Stream {}", inputStream_->getName()); + std::unique_ptr decompressor = + dwio::common::compression::createAsyncDecompressor( + ThriftCodecToCompressionKind(codec_), 0, streamDebugInfo); + return decompressor->waitResult(job_id); +} } // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/reader/PageReader.h b/velox/dwio/parquet/reader/PageReader.h index d8c7729016a5c..570680dc0d81d 100644 --- a/velox/dwio/parquet/reader/PageReader.h +++ b/velox/dwio/parquet/reader/PageReader.h @@ -22,7 +22,6 @@ #include "velox/dwio/common/DirectDecoder.h" #include "velox/dwio/common/SelectiveColumnReader.h" #include "velox/dwio/parquet/reader/BooleanDecoder.h" -#include "velox/dwio/parquet/reader/PageReaderBase.h" #include "velox/dwio/parquet/reader/ParquetTypeWithId.h" #include "velox/dwio/parquet/reader/RleBpDataDecoder.h" #include "velox/dwio/parquet/reader/StringDecoder.h" @@ -33,7 +32,7 @@ namespace facebook::velox::parquet { /// Manages access to pages inside a ColumnChunk. Interprets page headers and /// encodings and presents the combination of pages and encoded values as a /// continuous stream accessible via readWithVisitor(). -class PageReader : public parquet::PageReaderBase { +class PageReader { public: PageReader( std::unique_ptr stream, @@ -51,6 +50,10 @@ class PageReader : public parquet::PageReaderBase { chunkSize_(chunkSize), nullConcatenation_(pool_) { type_->makeLevelInfo(leafInfo_); + dict_qpl_job_id = 0; + data_qpl_job_id = 0; + uncompressedDictData_ = nullptr; + uncompressedDataV1Data_ = nullptr; } // This PageReader constructor is for unit test only. @@ -102,9 +105,19 @@ class PageReader : public parquet::PageReaderBase { /// Advances 'numRows' top level rows. void skip(int64_t numRows); - PageReaderType getType() { - return PageReaderType::COMMON; - } + /// Pre-decompress GZIP page with IAA + void preDecompressPage(bool& need_pre_decompress); + void prefetchDataPageV1(const thrift::PageHeader& pageHeader); + void prefetchDataPageV2(const thrift::PageHeader& pageHeader); + void prefetchDictionary(const thrift::PageHeader& pageHeader); + const bool FOLLY_NONNULL getDecompRes(int job_id); + bool seekToPreDecompPage(int64_t row); + const bool FOLLY_NONNULL iaaDecompressGzip( + const char* FOLLY_NONNULL pageData, + uint32_t compressedSize, + uint32_t uncompressedSize, + BufferPtr& uncompressedData, + int& qpl_job_id); /// Decodes repdefs for 'numTopLevelRows'. Use getLengthsAndNulls() /// to access the lengths and nulls for the different nesting @@ -218,9 +231,14 @@ class PageReader : public parquet::PageReaderBase { // next page. void updateRowInfoAfterPageSkipped(); - void prepareDataPageV1(const thrift::PageHeader& pageHeader, int64_t row); + void prepareDataPageV1( + const thrift::PageHeader& pageHeader, + int64_t row, + bool job_success = false); void prepareDataPageV2(const thrift::PageHeader& pageHeader, int64_t row); - void prepareDictionary(const thrift::PageHeader& pageHeader); + void prepareDictionary( + const thrift::PageHeader& pageHeader, + bool job_success = false); void makeDecoder(); // For a non-top level leaf, reads the defs and sets 'leafNulls_' and @@ -510,6 +528,23 @@ class PageReader : public parquet::PageReaderBase { std::unique_ptr stringDecoder_; std::unique_ptr booleanDecoder_; // Add decoders for other encodings here. + + // Used for pre-decompress + BufferPtr uncompressedDictData_; + BufferPtr uncompressedDataV1Data_; + thrift::PageHeader dictPageHeader_; + const char* FOLLY_NULLABLE dictPageData_{nullptr}; + bool needUncompressDict; + + thrift::PageHeader dataPageHeader_; + const char* FOLLY_NULLABLE dataPageData_{nullptr}; + + int dict_qpl_job_id; + int data_qpl_job_id; + + bool pre_decompress_dict = false; + bool pre_decompress_data = false; + bool isWinSizeFit = false; }; template diff --git a/velox/dwio/parquet/reader/PageReaderBase.h b/velox/dwio/parquet/reader/PageReaderBase.h deleted file mode 100644 index 7534dca843e8b..0000000000000 --- a/velox/dwio/parquet/reader/PageReaderBase.h +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include "velox/dwio/common/BitConcatenation.h" -#include "velox/dwio/common/DirectDecoder.h" -#include "velox/dwio/common/SelectiveColumnReader.h" -#include "velox/dwio/parquet/reader/BooleanDecoder.h" -#include "velox/dwio/parquet/reader/ParquetTypeWithId.h" -#include "velox/dwio/parquet/reader/RleBpDataDecoder.h" -#include "velox/dwio/parquet/reader/StringDecoder.h" -#include "velox/vector/BaseVector.h" - -namespace facebook::velox::parquet { -enum PageReaderType { COMMON = 0, IAA = 1 }; -/** - * Abstract page reader class. - * - * Reader object is used to process a page. - * - */ -class PageReaderBase { - public: - virtual PageReaderType getType() = 0; - virtual ~PageReaderBase() = default; - - /** - * Skips the define decoder, if any, for 'numValues' top level - * rows. Returns the number of non-nulls skipped. The range is the - * current page. - * @return the rows numbers skiped - */ - virtual int32_t skipNulls(int32_t numRows) = 0; - virtual void skipNullsOnly(int64_t numValues) = 0; - - /** - * Advances 'numRows' top level rows. - * @param numRows - */ - virtual void skip(int64_t numRows) = 0; - - /* Applies 'visitor' to values in the ColumnChunk of 'this'. The - * operation to perform and The operand rows are given by - * 'visitor'. The rows are relative to the current position. The - * current position after readWithVisitor is immediately after the - * last accessed row. - */ - // template - // virtual readWithVisitor(Visitor& visitor) = 0; - - virtual void clearDictionary() = 0; - - /* True if the current page holds dictionary indices. - */ - virtual bool isDictionary() const = 0; - - /* Reads 'numValues' null flags into 'nulls' and advances the - * decoders by as much. The read may span several pages. If there - * are no nulls, buffer may be set to nullptr. - */ - virtual void readNullsOnly(int64_t numValues, BufferPtr& buffer) = 0; - ; - - virtual const VectorPtr& dictionaryValues(const TypePtr& type) = 0; -}; -} // namespace facebook::velox::parquet \ No newline at end of file diff --git a/velox/dwio/parquet/reader/ParquetData.cpp b/velox/dwio/parquet/reader/ParquetData.cpp index 7635de06ae750..274aa03684d40 100644 --- a/velox/dwio/parquet/reader/ParquetData.cpp +++ b/velox/dwio/parquet/reader/ParquetData.cpp @@ -16,6 +16,9 @@ #include "velox/dwio/parquet/reader/ParquetData.h" #include "velox/dwio/parquet/reader/Statistics.h" +#ifdef VELOX_ENABLE_QPL +#include "velox/dwio/common/QplJobPool.h" +#endif namespace facebook::velox::parquet { @@ -84,30 +87,30 @@ bool ParquetData::rowGroupMatches( bool ParquetData::preDecompRowGroup(uint32_t index) { #ifdef VELOX_ENABLE_QPL - if (!dwio::common::QplJobHWPool::GetInstance().job_ready() || - !needPreDecomp) { - LOG(WARNING) << "QPL Job not ready or zlib window size(" << needPreDecomp - << ") is not 4KB"; + if (!dwio::common::QplJobHWPool::GetInstance().job_ready()) { return false; } - +#else + return false; +#endif auto& chunk = rowGroups_[index].columns[type_->column]; auto& metaData = chunk.meta_data; - if (metaData.codec == thrift::CompressionCodec::GZIP) { - bool isWinSizeFit; - pageReaders_.resize(rowGroups_.size()); - auto iaaPageReader = std::make_unique( - std::move(streams_[index]), - pool_, - type_, - metaData.codec, - metaData.total_compressed_size); - iaaPageReader->preDecompressPage(needPreDecomp); - pageReaders_[index] = std::move(iaaPageReader); + if (metaData.codec != thrift::CompressionCodec::GZIP || !needPreDecomp) { + LOG(WARNING) << "QPL Job not ready or zlib window size(" << needPreDecomp + << ") is not 4KB"; + return false; } + + pageReaders_.resize(rowGroups_.size()); + auto iaaPageReader = std::make_unique( + std::move(streams_[index]), + pool_, + type_, + metaData.codec, + metaData.total_compressed_size); + iaaPageReader->preDecompressPage(needPreDecomp); + pageReaders_[index] = std::move(iaaPageReader); return needPreDecomp; -#endif - return false; } void ParquetData::enqueueRowGroup( diff --git a/velox/dwio/parquet/reader/ParquetData.h b/velox/dwio/parquet/reader/ParquetData.h index f801cea08c377..2c502a77e41c2 100644 --- a/velox/dwio/parquet/reader/ParquetData.h +++ b/velox/dwio/parquet/reader/ParquetData.h @@ -21,7 +21,6 @@ #include "velox/dwio/common/BufferUtil.h" #include "velox/dwio/common/BufferedInput.h" #include "velox/dwio/common/ScanSpec.h" -#include "velox/dwio/parquet/reader/IAAPageReader.h" #include "velox/dwio/parquet/reader/PageReader.h" #include "velox/dwio/parquet/thrift/ParquetThriftTypes.h" #include "velox/dwio/parquet/thrift/ThriftTransport.h" @@ -71,7 +70,7 @@ class ParquetData : public dwio::common::FormatData { FilterRowGroupsResult&) override; PageReader* FOLLY_NONNULL reader() const { - return dynamic_cast(reader_.get()); + return reader_.get(); } // Reads null flags for 'numValues' next top level rows. The first 'numValues' @@ -157,13 +156,7 @@ class ParquetData : public dwio::common::FormatData { /// PageReader::readWithVisitor(). template void readWithVisitor(Visitor visitor) { -#ifdef VELOX_ENABLE_QPL - if (reader_->getType() == PageReaderType::IAA) { - return dynamic_cast(reader_.get()) - ->readWithVisitor(visitor); - } -#endif - dynamic_cast(reader_.get())->readWithVisitor(visitor); + reader_->readWithVisitor(visitor); } const VectorPtr& dictionaryValues(const TypePtr& type) { @@ -201,11 +194,9 @@ class ParquetData : public dwio::common::FormatData { const uint32_t maxDefine_; const uint32_t maxRepeat_; int64_t rowsInRowGroup_; - std::unique_ptr reader_; -#ifdef VELOX_ENABLE_QPL - std::vector> pageReaders_; + std::unique_ptr reader_; + std::vector> pageReaders_; bool needPreDecomp = true; -#endif // Nulls derived from leaf repdefs for non-leaf readers. BufferPtr presetNulls_; diff --git a/velox/dwio/parquet/reader/StructColumnReader.cpp b/velox/dwio/parquet/reader/StructColumnReader.cpp index 0c90dfce3d374..4b974b9330c07 100644 --- a/velox/dwio/parquet/reader/StructColumnReader.cpp +++ b/velox/dwio/parquet/reader/StructColumnReader.cpp @@ -130,7 +130,6 @@ void StructColumnReader::enqueueRowGroup( } bool StructColumnReader::preDecompRowGroup(uint32_t index) { -#ifdef VELOX_ENABLE_QPL for (auto& child : children_) { if (!needPreDecomp) { return false; @@ -147,8 +146,6 @@ bool StructColumnReader::preDecompRowGroup(uint32_t index) { } } return needPreDecomp; -#endif - return false; } void StructColumnReader::seekToRowGroup(uint32_t index) {