diff --git a/CMakeLists.txt b/CMakeLists.txt index f184d1093033c..3b023cf6c9756 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -88,6 +88,7 @@ option( VELOX_ENABLE_INT64_BUILD_PARTITION_BOUND "make buildPartitionBounds_ a vector int64 instead of int32 to avoid integer overflow when the hashtable has billions of records" OFF) +option(VELOX_ENABLE_QPL "Enable Intel QPL support" OFF) if(${VELOX_BUILD_MINIMAL}) # Enable and disable components for velox base build @@ -221,6 +222,11 @@ if(VELOX_ENABLE_REMOTE_FUNCTIONS) find_package(FBThrift CONFIG REQUIRED) endif() +if(VELOX_ENABLE_QPL) + add_definitions(-DVELOX_ENABLE_QPL) + message(STATUS "add VELOX_ENABLE_QPL") +endif() + # Turn on Codegen only for Clang and non Mac systems. if((NOT DEFINED VELOX_CODEGEN_SUPPORT) AND (CMAKE_CXX_COMPILER_ID MATCHES "Clang") diff --git a/third_party/CMakeLists.txt b/third_party/CMakeLists.txt index 0233f77b681a3..73bd7ac8f8bb1 100644 --- a/third_party/CMakeLists.txt +++ b/third_party/CMakeLists.txt @@ -91,3 +91,44 @@ if(VELOX_ENABLE_ARROW) ${ARROW_LIBDIR}/libparquet.a) endif() + +if(VELOX_ENABLE_QPL) + message(STATUS "Building QPL from source") + set(QPL_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/qpl_ep/install") + set(QPL_STATIC_LIB_NAME ${CMAKE_STATIC_LIBRARY_PREFIX}qpl${CMAKE_STATIC_LIBRARY_SUFFIX}) + set(QPL_STATIC_LIB "${QPL_PREFIX}/lib/${QPL_STATIC_LIB_NAME}") + + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -ldl -laccel-config -L/usr/lib64") + + set(QPL_CMAKE_ARGS + -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} + -DCMAKE_INSTALL_LIBDIR=${QPL_PREFIX}/lib + -DCMAKE_INSTALL_PREFIX=${QPL_PREFIX} + -DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS} + -DEFFICIENT_WAIT=OFF + -DQPL_BUILD_TESTS=OFF + -DCMAKE_C_COMPILER=gcc + -DCMAKE_CXX_COMPILER=g++ + -DQPL_LIB=ON) + +# todo: qpl newest version will released at Sep, to be updated + ExternalProject_Add(qpl_ep + ${EP_LOG_OPTIONS} + URL https://github.com/intel/qpl/archive/refs/tags/v1.2.0.tar.gz + BUILD_BYPRODUCTS "${QPL_STATIC_LIB}" + CMAKE_ARGS ${QPL_CMAKE_ARGS} + ) 
+ + file(MAKE_DIRECTORY "${QPL_PREFIX}/include") + + add_library(qpl::qpl STATIC IMPORTED GLOBAL) + set(QPL_LIBRARIES ${QPL_STATIC_LIB}) + set(QPL_INCLUDE_DIRS "${QPL_PREFIX}/include") + target_link_libraries(qpl::qpl INTERFACE /usr/lib64/libaccel-config.so) + set_target_properties(qpl::qpl + PROPERTIES IMPORTED_LOCATION ${QPL_LIBRARIES} + INTERFACE_INCLUDE_DIRECTORIES ${QPL_INCLUDE_DIRS}) + + add_dependencies(qpl::qpl qpl_ep) + +endif() diff --git a/velox/dwio/common/BitConcatenation.h b/velox/dwio/common/BitConcatenation.h index cc812035d8b82..99e37bb2a2463 100644 --- a/velox/dwio/common/BitConcatenation.h +++ b/velox/dwio/common/BitConcatenation.h @@ -15,6 +15,7 @@ */ #include "velox/buffer/Buffer.h" +#pragma once namespace facebook::velox::dwio::common { diff --git a/velox/dwio/common/CMakeLists.txt b/velox/dwio/common/CMakeLists.txt index 4d041b43bc575..a22faa33a710a 100644 --- a/velox/dwio/common/CMakeLists.txt +++ b/velox/dwio/common/CMakeLists.txt @@ -41,6 +41,7 @@ add_library( MetadataFilter.cpp Options.cpp OutputStream.cpp + QplJobPool.cpp Range.cpp Reader.cpp ReaderFactory.cpp @@ -56,6 +57,12 @@ add_library( WriterFactory.cpp) target_include_directories(velox_dwio_common PRIVATE ${Protobuf_INCLUDE_DIRS}) +set(QPL_STATIC_LINK_LIBS ${QPL_STATIC_LINK_LIBS}) +if(VELOX_ENABLE_QPL) + list(APPEND QPL_STATIC_LINK_LIBS qpl::qpl) +endif() + +message(STATUS "QPL_STATIC_LINK_LIBS: ${QPL_STATIC_LINK_LIBS}") target_link_libraries( velox_dwio_common @@ -70,4 +77,5 @@ target_link_libraries( velox_memory Boost::regex Folly::folly + ${QPL_STATIC_LINK_LIBS} glog::glog) diff --git a/velox/dwio/common/QplJobPool.cpp b/velox/dwio/common/QplJobPool.cpp new file mode 100644 index 0000000000000..34b079ab7ae4f --- /dev/null +++ b/velox/dwio/common/QplJobPool.cpp @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "velox/dwio/common/QplJobPool.h" +#include +#include +#include "velox/common/base/Exceptions.h" + +#ifdef VELOX_ENABLE_QPL + +namespace facebook::velox::dwio::common { + +std::array + QplJobHWPool::hw_job_ptr_pool; +std::array, QplJobHWPool::MAX_JOB_NUMBER> + QplJobHWPool::hw_job_ptr_locks; +bool QplJobHWPool::iaa_job_ready = false; +std::unique_ptr QplJobHWPool::hw_jobs_buffer; + +// static QplJobHWPool pool = QplJobHWPool::GetInstance(); + +QplJobHWPool& QplJobHWPool::GetInstance() { + static QplJobHWPool pool; + return pool; +} + +QplJobHWPool::QplJobHWPool() { + if (!iaa_job_ready) { + (void)AllocateQPLJob(); + } +} + +QplJobHWPool::~QplJobHWPool() { + for (uint32_t i = 0; i < MAX_JOB_NUMBER; ++i) { + if (hw_job_ptr_pool[i]) { + qpl_fini_job(hw_job_ptr_pool[i]); + hw_job_ptr_pool[i] = nullptr; + } + } + iaa_job_ready = false; +} + +bool QplJobHWPool::AllocateQPLJob() { + uint32_t job_size = 0; + + /// Get size required for saving a single qpl job object + qpl_get_job_size(qpl_path, &job_size); + /// Allocate entire buffer for storing all job objects + hw_jobs_buffer = std::make_unique(job_size * MAX_JOB_NUMBER); + /// Initialize pool for storing all job object pointers + /// Reallocate buffer by shifting address offset for each job object. 
+ for (uint32_t index = 0; index < MAX_JOB_NUMBER; ++index) { + qpl_job* qpl_job_ptr = + reinterpret_cast(hw_jobs_buffer.get() + index * job_size); + auto status = qpl_init_job(qpl_path, qpl_job_ptr); + if (status != QPL_STS_OK) { + iaa_job_ready = false; + LOG(WARNING) << "Initialization of hardware IAA failed, statsu: " + << status << ". Please check if Intel \ + In-Memory Analytics Accelerator (IAA) is properly set up!"; + return false; + } + this->hw_job_ptr_pool[index] = qpl_job_ptr; + hw_job_ptr_locks[index].store(false); + } + + iaa_job_ready = true; + return true; +} + +qpl_job* QplJobHWPool::AcquireDeflateJob(uint32_t& job_id) { + if (!job_ready()) { + return nullptr; + } + uint32_t retry = 0; + auto index = folly::Random::rand32(1, MAX_JOB_NUMBER - 1); + while (!tryLockJob(index)) { + index = folly::Random::rand32(1, MAX_JOB_NUMBER - 1); + retry++; + if (retry > MAX_JOB_NUMBER) { + return nullptr; + } + } + job_id = index; + if (index >= MAX_JOB_NUMBER) { + return nullptr; + } + + return hw_job_ptr_pool[index]; +} + +void QplJobHWPool::ReleaseJob(uint32_t job_id) { + if (job_id >= MAX_JOB_NUMBER) { + return; + } + assert(job_id < MAX_JOB_NUMBER); + hw_job_ptr_locks[job_id].store(false); + return; +} + +bool QplJobHWPool::tryLockJob(uint32_t index) { + bool expected = false; + assert(index < MAX_JOB_NUMBER); + return hw_job_ptr_locks[index].compare_exchange_strong(expected, true); +} + +} // namespace facebook::velox::dwio::common +#endif diff --git a/velox/dwio/common/QplJobPool.h b/velox/dwio/common/QplJobPool.h new file mode 100644 index 0000000000000..79c111cccfa34 --- /dev/null +++ b/velox/dwio/common/QplJobPool.h @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include + +#ifdef VELOX_ENABLE_QPL +#include "qpl/qpl.h" + +namespace facebook::velox::dwio::common { + +/// QplJobHWPool is resource pool to provide the job objects, which is +/// used for storing context information during. +/// Memory for QPL job will be allocated when the QPLJobHWPool instance is +/// created +/// +// QPL job can offload RLE-decoding/Filter/(De)compression works to hardware +// accelerator. +class QplJobHWPool { + public: + static QplJobHWPool& GetInstance(); + QplJobHWPool(); + ~QplJobHWPool(); + /// Acquire QPL job + /// + /// @param job_id QPL job id, used when release QPL job + /// \return Pointer to the QPL job. If acquire job failed, return nullptr. + qpl_job* AcquireDeflateJob(uint32_t& job_id); + + /// \brief Release QPL job by the job_id. + void ReleaseJob(uint32_t job_id); + + /// \brief Return if the QPL job is allocated sucessfully. 
+ const bool& job_ready() { + return iaa_job_ready; + } + + qpl_job* GetJobById(uint32_t job_id) { + return hw_job_ptr_pool[job_id]; + } + + static constexpr qpl_path_t qpl_path = qpl_path_hardware; + + static constexpr auto MAX_JOB_NUMBER = 1024; + + private: + bool tryLockJob(uint32_t index); + bool AllocateQPLJob(); + + /// Max jobs in QPL_JOB_POOL + /// Entire buffer for storing all job objects + static std::unique_ptr hw_jobs_buffer; + /// Job pool for storing all job object pointers + static std::array hw_job_ptr_pool; + + /// Locks for accessing each job object pointers + static bool iaa_job_ready; + static std::array, MAX_JOB_NUMBER> hw_job_ptr_locks; +}; + +} // namespace facebook::velox::dwio::common +#endif diff --git a/velox/dwio/parquet/reader/CMakeLists.txt b/velox/dwio/parquet/reader/CMakeLists.txt index 570f631173126..e19fb96c75294 100644 --- a/velox/dwio/parquet/reader/CMakeLists.txt +++ b/velox/dwio/parquet/reader/CMakeLists.txt @@ -20,6 +20,7 @@ add_library( PageReader.cpp ParquetColumnReader.cpp ParquetData.cpp + IAAPageReader.cpp RepeatedColumnReader.cpp RleBpDecoder.cpp Statistics.cpp diff --git a/velox/dwio/parquet/reader/IAAPageReader.cpp b/velox/dwio/parquet/reader/IAAPageReader.cpp new file mode 100644 index 0000000000000..c29cb74df4d8f --- /dev/null +++ b/velox/dwio/parquet/reader/IAAPageReader.cpp @@ -0,0 +1,1414 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox/dwio/parquet/reader/IAAPageReader.h" +#include +#include //@manual +#include +#include +#include "velox/dwio/common/BufferUtil.h" +#include "velox/dwio/common/ColumnVisitors.h" +#include "velox/dwio/parquet/reader/NestedStructureDecoder.h" +#include "velox/dwio/parquet/thrift/ThriftTransport.h" +#include "velox/vector/FlatVector.h" + +#ifdef VELOX_ENABLE_QPL +namespace facebook::velox::parquet { + +using thrift::Encoding; +using thrift::PageHeader; + +void IAAPageReader::preDecompressPage(bool& need_pre_decompress) { + for (;;) { + auto dataStart = pageStart_; + if (chunkSize_ <= pageStart_) { + // This may happen if seeking to exactly end of row group. + numRepDefsInPage_ = 0; + numRowsInPage_ = 0; + break; + } + PageHeader pageHeader = readPageHeader(); + pageStart_ = pageDataStart_ + pageHeader.compressed_page_size; + switch (pageHeader.type) { + case thrift::PageType::DATA_PAGE: + prefetchDataPageV1(pageHeader); + break; + case thrift::PageType::DATA_PAGE_V2: + prefetchDataPageV2(pageHeader); + break; + case thrift::PageType::DICTIONARY_PAGE: + prefetchDictionary(pageHeader); + continue; + default: + break; // ignore INDEX page type and any other custom extensions + } + break; + } + need_pre_decompress = isWinSizeFit; +} + +void IAAPageReader::seekToPage(int64_t row) { + defineDecoder_.reset(); + repeatDecoder_.reset(); + // 'rowOfPage_' is the row number of the first row of the next page. 
+ rowOfPage_ += numRowsInPage_; + bool has_qpl = false; + if (dict_qpl_job_id > 0) { + bool job_success = waitQplJob(dict_qpl_job_id); + prepareDict(dictPageHeader_, job_success); + dict_qpl_job_id = 0; + has_qpl = true; + } + if (data_qpl_job_id > 0) { + bool job_success = waitQplJob(data_qpl_job_id); + bool result = prepareData(dataPageHeader_, row, job_success); + if (!result) { + LOG(WARNING) << "Decompress w/IAA error, try again with software."; + pre_decompress_data = false; + result = prepareData(dataPageHeader_, row, job_success); + if (!result) { + VELOX_FAIL("Decomrpess fail!"); + } + } + data_qpl_job_id = 0; + has_qpl = true; + } + + if (has_qpl) { + if (row == kRepDefOnly || row < rowOfPage_ + numRowsInPage_) { + return; + } + updateRowInfoAfterPageSkipped(); + } + + for (;;) { + auto dataStart = pageStart_; + if (chunkSize_ <= pageStart_) { + // This may happen if seeking to exactly end of row group. + numRepDefsInPage_ = 0; + numRowsInPage_ = 0; + break; + } + PageHeader pageHeader = readPageHeader(); + pageStart_ = pageDataStart_ + pageHeader.compressed_page_size; + + switch (pageHeader.type) { + case thrift::PageType::DATA_PAGE: + prepareDataPageV1(pageHeader, row); + break; + case thrift::PageType::DATA_PAGE_V2: + prepareDataPageV2(pageHeader, row); + break; + case thrift::PageType::DICTIONARY_PAGE: + if (row == kRepDefOnly) { + skipBytes( + pageHeader.compressed_page_size, + inputStream_.get(), + bufferStart_, + bufferEnd_); + continue; + } + prepareDictionary(pageHeader); + continue; + default: + break; // ignore INDEX page type and any other custom extensions + } + if (row == kRepDefOnly || row < rowOfPage_ + numRowsInPage_) { + break; + } + updateRowInfoAfterPageSkipped(); + } +} + +PageHeader IAAPageReader::readPageHeader() { + if (bufferEnd_ == bufferStart_) { + const void* buffer; + int32_t size; + inputStream_->Next(&buffer, &size); + bufferStart_ = reinterpret_cast(buffer); + bufferEnd_ = bufferStart_ + size; + } + + std::shared_ptr 
transport = + std::make_shared( + inputStream_.get(), bufferStart_, bufferEnd_); + apache::thrift::protocol::TCompactProtocolT protocol( + transport); + PageHeader pageHeader; + uint64_t readBytes; + readBytes = pageHeader.read(&protocol); + + pageDataStart_ = pageStart_ + readBytes; + return pageHeader; +} + +const char* IAAPageReader::readBytes(int32_t size, BufferPtr& copy) { + if (bufferEnd_ == bufferStart_) { + const void* buffer = nullptr; + int32_t bufferSize = 0; + if (!inputStream_->Next(&buffer, &bufferSize)) { + VELOX_FAIL("Read past end"); + } + bufferStart_ = reinterpret_cast(buffer); + bufferEnd_ = bufferStart_ + bufferSize; + } + if (bufferEnd_ - bufferStart_ >= size) { + bufferStart_ += size; + return bufferStart_ - size; + } + dwio::common::ensureCapacity(copy, size, &pool_); + dwio::common::readBytes( + size, + inputStream_.get(), + copy->asMutable(), + bufferStart_, + bufferEnd_); + return copy->as(); +} + +const bool FOLLY_NONNULL IAAPageReader::uncompressQplData( + const char* pageData, + uint32_t compressedSize, + uint32_t uncompressedSize, + BufferPtr& uncompressedData, + uint32_t& qpl_job_id) { + dwio::common::ensureCapacity( + uncompressedData, uncompressedSize, &pool_); + + bool isGzip = codec_ == thrift::CompressionCodec::GZIP; + + if (!isWinSizeFit) { + // first time to check window size + int window_size = + getGzipWindowSize((const uint8_t*)pageData, uncompressedSize); + if (window_size == 12) { // window size is not 4KB + isWinSizeFit = true; + } else { + qpl_job_id = dwio::common::QplJobHWPool::GetInstance().MAX_JOB_NUMBER; + return false; + } + } + + qpl_job_id = this->DecompressAsync( + compressedSize, + (const uint8_t*)pageData, + uncompressedSize, + (uint8_t*)uncompressedData->asMutable(), + isGzip); + if (qpl_job_id >= dwio::common::QplJobHWPool::GetInstance().MAX_JOB_NUMBER) { + return false; + } + return true; +} + +uint32_t IAAPageReader::DecompressAsync( + int64_t input_length, + const uint8_t* input, + int64_t 
output_buffer_length, + uint8_t* output, + bool isGzip) { + // Reset the stream for this block + dwio::common::QplJobHWPool& qpl_job_pool = + dwio::common::QplJobHWPool::GetInstance(); + uint32_t job_id = 0; + qpl_job* job = qpl_job_pool.AcquireDeflateJob(job_id); + if (job == nullptr) { + LOG(WARNING) << "cannot AcquireDeflateJob "; + return qpl_job_pool.MAX_JOB_NUMBER; // Invalid job id to illustrate the + // failed decompress job. + } + job->op = qpl_op_decompress; + job->next_in_ptr = const_cast(input); + job->next_out_ptr = output; + job->available_in = input_length; + job->available_out = output_buffer_length; + job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST; + if (isGzip) { + job->flags |= QPL_FLAG_ZLIB_MODE; + } + + qpl_status status = qpl_submit_job(job); + if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) { + qpl_job_pool.ReleaseJob(job_id); + job = qpl_job_pool.AcquireDeflateJob(job_id); + if (job == nullptr) { + LOG(WARNING) + << "cannot acqure deflate job after QPL_STS_QUEUES_ARE_BUSY_ERR "; + return qpl_job_pool.MAX_JOB_NUMBER; // Invalid job id to illustrate the + // failed decompress job. + } + job->op = qpl_op_decompress; + job->next_in_ptr = const_cast(input); + job->next_out_ptr = output; + job->available_in = input_length; + job->available_out = output_buffer_length; + job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST; + if (isGzip) { + job->flags |= QPL_FLAG_ZLIB_MODE; + } + + status = qpl_submit_job(job); + } + if (status != QPL_STS_OK) { + qpl_job_pool.ReleaseJob(job_id); + LOG(WARNING) << "cannot submit job, error status: " << status; + return qpl_job_pool.MAX_JOB_NUMBER; // Invalid job id to illustrate the + // failed decompress job. 
+ } else { + return job_id; + } +} + +const char* FOLLY_NONNULL IAAPageReader::uncompressData( + const char* pageData, + uint32_t compressedSize, + uint32_t uncompressedSize) { + switch (codec_) { + case thrift::CompressionCodec::UNCOMPRESSED: + return pageData; + case thrift::CompressionCodec::SNAPPY: { + dwio::common::ensureCapacity( + uncompressedData_, uncompressedSize, &pool_); + + size_t sizeFromSnappy; + if (!snappy::GetUncompressedLength( + pageData, compressedSize, &sizeFromSnappy)) { + VELOX_FAIL("Snappy uncompressed size not available"); + } + VELOX_CHECK_EQ(uncompressedSize, sizeFromSnappy); + snappy::RawUncompress( + pageData, compressedSize, uncompressedData_->asMutable()); + return uncompressedData_->as(); + } + case thrift::CompressionCodec::ZSTD: { + dwio::common::ensureCapacity( + uncompressedData_, uncompressedSize, &pool_); + + auto ret = ZSTD_decompress( + uncompressedData_->asMutable(), + uncompressedSize, + pageData, + compressedSize); + VELOX_CHECK( + !ZSTD_isError(ret), + "ZSTD returned an error: ", + ZSTD_getErrorName(ret)); + return uncompressedData_->as(); + } + case thrift::CompressionCodec::GZIP: { + dwio::common::ensureCapacity( + uncompressedData_, uncompressedSize, &pool_); + z_stream stream; + memset(&stream, 0, sizeof(stream)); + constexpr int WINDOW_BITS = 15; + // Determine if this is libz or gzip from header. + constexpr int DETECT_CODEC = 32; + // Initialize decompressor. + auto ret = inflateInit2(&stream, WINDOW_BITS | DETECT_CODEC); + VELOX_CHECK( + (ret == Z_OK), + "zlib inflateInit failed: {}", + stream.msg ? stream.msg : ""); + auto inflateEndGuard = folly::makeGuard([&] { + if (inflateEnd(&stream) != Z_OK) { + LOG(WARNING) << "inflateEnd: " << (stream.msg ? stream.msg : ""); + } + }); + // Decompress. 
+ stream.next_in = + const_cast(reinterpret_cast(pageData)); + stream.avail_in = static_cast(compressedSize); + stream.next_out = + reinterpret_cast(uncompressedData_->asMutable()); + stream.avail_out = static_cast(uncompressedSize); + ret = inflate(&stream, Z_FINISH); + VELOX_CHECK( + ret == Z_STREAM_END, + "GZipCodec failed: {}", + stream.msg ? stream.msg : ""); + return uncompressedData_->as(); + } + default: + VELOX_FAIL("Unsupported Parquet compression type '{}'", codec_); + } +} + +/* Get the window size from zlib header(rfc1950). + 0 1 + +---+---+ + |CMF|FLG| (more-->) + +---+---+ + bits 0 to 3 CM Compression method + bits 4 to 7 CINFO Compression info + CM (Compression method) This identifies the compression method used in the + file. CM = 8 denotes the "deflate" compression method with a window size up + to 32K. CINFO (Compression info) For CM = 8, CINFO is the base-2 logarithm of + the LZ77 window size, minus eight (CINFO=7 indicates a 32K window size). +*/ +int IAAPageReader::getGzipWindowSize( + const uint8_t* stream_ptr, + uint32_t stream_size) { + if (stream_size < ZLIB_MIN_HEADER_SIZE) { + return -1; + } + const uint8_t compression_method_and_flag = *stream_ptr++; + const uint8_t compression_method = compression_method_and_flag & 0xf; + const uint8_t compression_info = + compression_method_and_flag >> ZLIB_INFO_OFFSET; + + if (CM_ZLIB_DEFAULT_VALUE != compression_method) { + return -1; + } + if (compression_info > 7) { + return -1; + } + return CM_ZLIB_DEFAULT_VALUE + compression_info; +} + +void IAAPageReader::setPageRowInfo(bool forRepDef) { + if (isTopLevel_ || forRepDef || maxRepeat_ == 0) { + numRowsInPage_ = numRepDefsInPage_; + } else if (hasChunkRepDefs_) { + ++pageIndex_; + VELOX_CHECK_LT( + pageIndex_, + numLeavesInPage_.size(), + "Seeking past known repdefs for non top level column page {}", + pageIndex_); + numRowsInPage_ = numLeavesInPage_[pageIndex_]; + } else { + numRowsInPage_ = kRowsUnknown; + } +} + +void 
IAAPageReader::readPageDefLevels() { + VELOX_CHECK(kRowsUnknown == numRowsInPage_ || maxDefine_ > 1); + definitionLevels_.resize(numRepDefsInPage_); + wideDefineDecoder_->GetBatch(definitionLevels_.data(), numRepDefsInPage_); + leafNulls_.resize(bits::nwords(numRepDefsInPage_)); + leafNullsSize_ = getLengthsAndNulls( + LevelMode::kNulls, + leafInfo_, + + 0, + numRepDefsInPage_, + numRepDefsInPage_, + nullptr, + leafNulls_.data(), + 0); + numRowsInPage_ = leafNullsSize_; + numLeafNullsConsumed_ = 0; +} + +void IAAPageReader::updateRowInfoAfterPageSkipped() { + rowOfPage_ += numRowsInPage_; + if (hasChunkRepDefs_) { + numLeafNullsConsumed_ = rowOfPage_; + } +} + +void IAAPageReader::prefetchDataPageV1(const thrift::PageHeader& pageHeader) { + dataPageHeader_ = pageHeader; + VELOX_CHECK( + pageHeader.type == thrift::PageType::DATA_PAGE && + pageHeader.__isset.data_page_header); + + dataPageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); + pre_decompress_data = uncompressQplData( + dataPageData_, + pageHeader.compressed_page_size, + pageHeader.uncompressed_page_size, + uncompressedDataV1Data_, + data_qpl_job_id); + return; +} + +void IAAPageReader::prefetchDataPageV2(const thrift::PageHeader& pageHeader) { + return; +} + +void IAAPageReader::prefetchDictionary(const thrift::PageHeader& pageHeader) { + dictionary_.numValues = pageHeader.dictionary_page_header.num_values; + dictionaryEncoding_ = pageHeader.dictionary_page_header.encoding; + dictionary_.sorted = pageHeader.dictionary_page_header.__isset.is_sorted && + pageHeader.dictionary_page_header.is_sorted; + dictPageHeader_ = pageHeader; + VELOX_CHECK( + dictionaryEncoding_ == Encoding::PLAIN_DICTIONARY || + dictionaryEncoding_ == Encoding::PLAIN); + dictPageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); + + pre_decompress_dict = uncompressQplData( + dictPageData_, + pageHeader.compressed_page_size, + pageHeader.uncompressed_page_size, + uncompressedDictData_, + dict_qpl_job_id); + 
+ return; +} + +void IAAPageReader::prepareDict( + const thrift::PageHeader& pageHeader, + bool job_success) { + if (!pre_decompress_dict || !job_success) { + LOG(WARNING) + << "Decompress w/IAA error, try to uncompress dict with software."; + dictPageData_ = uncompressData( + dictPageData_, + pageHeader.compressed_page_size, + pageHeader.uncompressed_page_size); + } else { + dictPageData_ = uncompressedDictData_->as(); + } + + auto parquetType = type_->parquetType_.value(); + switch (parquetType) { + case thrift::Type::INT32: + case thrift::Type::INT64: + case thrift::Type::FLOAT: + case thrift::Type::DOUBLE: { + int32_t typeSize = (parquetType == thrift::Type::INT32 || + parquetType == thrift::Type::FLOAT) + ? sizeof(float) + : sizeof(double); + auto numBytes = dictionary_.numValues * typeSize; + if (type_->type->isShortDecimal() && parquetType == thrift::Type::INT32) { + auto veloxTypeLength = type_->type->cppSizeInBytes(); + auto numVeloxBytes = dictionary_.numValues * veloxTypeLength; + dictionary_.values = + AlignedBuffer::allocate(numVeloxBytes, &pool_); + } else { + dictionary_.values = AlignedBuffer::allocate(numBytes, &pool_); + } + if (dictPageData_) { + memcpy(dictionary_.values->asMutable(), dictPageData_, numBytes); + } else { + dwio::common::readBytes( + numBytes, + inputStream_.get(), + dictionary_.values->asMutable(), + bufferStart_, + bufferEnd_); + } + if (type_->type->isShortDecimal() && parquetType == thrift::Type::INT32) { + auto values = dictionary_.values->asMutable(); + auto parquetValues = dictionary_.values->asMutable(); + for (auto i = dictionary_.numValues - 1; i >= 0; --i) { + // Expand the Parquet type length values to Velox type length. + // We start from the end to allow in-place expansion. 
+ values[i] = parquetValues[i]; + } + } + break; + } + case thrift::Type::BYTE_ARRAY: { + dictionary_.values = + AlignedBuffer::allocate(dictionary_.numValues, &pool_); + auto numBytes = pageHeader.uncompressed_page_size; + auto values = dictionary_.values->asMutable(); + dictionary_.strings = AlignedBuffer::allocate(numBytes, &pool_); + auto strings = dictionary_.strings->asMutable(); + if (dictPageData_) { + memcpy(strings, dictPageData_, numBytes); + } else { + dwio::common::readBytes( + numBytes, inputStream_.get(), strings, bufferStart_, bufferEnd_); + } + auto header = strings; + for (auto i = 0; i < dictionary_.numValues; ++i) { + auto length = *reinterpret_cast(header); + values[i] = StringView(header + sizeof(int32_t), length); + header += length + sizeof(int32_t); + } + VELOX_CHECK_EQ(header, strings + numBytes); + break; + } + case thrift::Type::FIXED_LEN_BYTE_ARRAY: { + auto parquetTypeLength = type_->typeLength_; + auto numParquetBytes = dictionary_.numValues * parquetTypeLength; + auto veloxTypeLength = type_->type->cppSizeInBytes(); + auto numVeloxBytes = dictionary_.numValues * veloxTypeLength; + dictionary_.values = AlignedBuffer::allocate(numVeloxBytes, &pool_); + auto data = dictionary_.values->asMutable(); + // Read the data bytes. + if (dictPageData_) { + memcpy(data, dictPageData_, numParquetBytes); + } else { + dwio::common::readBytes( + numParquetBytes, + inputStream_.get(), + data, + bufferStart_, + bufferEnd_); + } + if (type_->type->isShortDecimal()) { + // Parquet decimal values have a fixed typeLength_ and are in big-endian + // layout. + if (numParquetBytes < numVeloxBytes) { + auto values = dictionary_.values->asMutable(); + for (auto i = dictionary_.numValues - 1; i >= 0; --i) { + // Expand the Parquet type length values to Velox type length. + // We start from the end to allow in-place expansion. + auto sourceValue = data + (i * parquetTypeLength); + int64_t value = *sourceValue >= 0 ? 
0 : -1; + memcpy( + reinterpret_cast(&value) + veloxTypeLength - + parquetTypeLength, + sourceValue, + parquetTypeLength); + values[i] = value; + } + } + auto values = dictionary_.values->asMutable(); + for (auto i = 0; i < dictionary_.numValues; ++i) { + values[i] = __builtin_bswap64(values[i]); + } + break; + } else if (type_->type->isLongDecimal()) { + // Parquet decimal values have a fixed typeLength_ and are in big-endian + // layout. + if (numParquetBytes < numVeloxBytes) { + auto values = dictionary_.values->asMutable(); + for (auto i = dictionary_.numValues - 1; i >= 0; --i) { + // Expand the Parquet type length values to Velox type length. + // We start from the end to allow in-place expansion. + auto sourceValue = data + (i * parquetTypeLength); + int128_t value = *sourceValue >= 0 ? 0 : -1; + memcpy( + reinterpret_cast(&value) + veloxTypeLength - + parquetTypeLength, + sourceValue, + parquetTypeLength); + values[i] = value; + } + } + auto values = dictionary_.values->asMutable(); + for (auto i = 0; i < dictionary_.numValues; ++i) { + values[i] = dwio::common::builtin_bswap128(values[i]); + } + break; + } + VELOX_UNSUPPORTED( + "Parquet type {} not supported for dictionary", parquetType); + } + case thrift::Type::INT96: + default: + VELOX_UNSUPPORTED( + "Parquet type {} not supported for dictionary", parquetType); + } +} + +bool IAAPageReader::prepareData( + const thrift::PageHeader& pageHeader, + int64_t row, + bool job_success) { + if (!pre_decompress_data || !job_success) { + LOG(WARNING) << "Need to uncompress data with sw, pre_decompress_data: " + << pre_decompress_data << ", job_success: " << job_success; + pageData_ = uncompressData( + dataPageData_, + pageHeader.compressed_page_size, + pageHeader.uncompressed_page_size); + } else { + pageData_ = uncompressedDataV1Data_->as(); + } + numRepDefsInPage_ = pageHeader.data_page_header.num_values; + setPageRowInfo(row == kRepDefOnly); + + auto pageEnd = pageData_ + pageHeader.uncompressed_page_size; + if 
(maxRepeat_ > 0) { + uint32_t repeatLength = readField(pageData_); + repeatDecoder_ = std::make_unique( + reinterpret_cast(pageData_), + repeatLength, + arrow::bit_util::NumRequiredBits(maxRepeat_)); + + pageData_ += repeatLength; + } + + if (maxDefine_ > 0) { + auto defineLength = readField(pageData_); + if (maxDefine_ == 1) { + defineDecoder_ = std::make_unique( + pageData_, + pageData_ + defineLength, + arrow::bit_util::NumRequiredBits(maxDefine_)); + } else { + wideDefineDecoder_ = std::make_unique( + reinterpret_cast(pageData_), + defineLength, + arrow::bit_util::NumRequiredBits(maxDefine_)); + } + pageData_ += defineLength; + } + encodedDataSize_ = pageEnd - pageData_; + if (encodedDataSize_ > pageHeader.uncompressed_page_size) { + LOG(WARNING) << "Decompress w/IAA error, false encodedDataSize_: " + << encodedDataSize_ << ", actual encodedDataSize_: " + << pageHeader.uncompressed_page_size; + return false; + } + + encoding_ = pageHeader.data_page_header.encoding; + if (!hasChunkRepDefs_ && (numRowsInPage_ == kRowsUnknown || maxDefine_ > 1)) { + readPageDefLevels(); + } + if (row != kRepDefOnly) { + makeDecoder(); + } + return true; +} +void IAAPageReader::prepareDataPageV1( + const PageHeader& pageHeader, + int64_t row) { + VELOX_CHECK( + pageHeader.type == thrift::PageType::DATA_PAGE && + pageHeader.__isset.data_page_header); + numRepDefsInPage_ = pageHeader.data_page_header.num_values; + setPageRowInfo(row == kRepDefOnly); + if (row != kRepDefOnly && numRowsInPage_ != kRowsUnknown && + numRowsInPage_ + rowOfPage_ <= row) { + dwio::common::skipBytes( + pageHeader.compressed_page_size, + inputStream_.get(), + bufferStart_, + bufferEnd_); + + return; + } + pageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); + pageData_ = uncompressData( + pageData_, + pageHeader.compressed_page_size, + pageHeader.uncompressed_page_size); + auto pageEnd = pageData_ + pageHeader.uncompressed_page_size; + if (maxRepeat_ > 0) { + uint32_t repeatLength = 
readField(pageData_); + repeatDecoder_ = std::make_unique( + reinterpret_cast(pageData_), + repeatLength, + arrow::bit_util::NumRequiredBits(maxRepeat_)); + + pageData_ += repeatLength; + } + + if (maxDefine_ > 0) { + auto defineLength = readField(pageData_); + if (maxDefine_ == 1) { + defineDecoder_ = std::make_unique( + pageData_, + pageData_ + defineLength, + arrow::bit_util::NumRequiredBits(maxDefine_)); + } else { + wideDefineDecoder_ = std::make_unique( + reinterpret_cast(pageData_), + defineLength, + arrow::bit_util::NumRequiredBits(maxDefine_)); + } + pageData_ += defineLength; + } + encodedDataSize_ = pageEnd - pageData_; + + encoding_ = pageHeader.data_page_header.encoding; + if (!hasChunkRepDefs_ && (numRowsInPage_ == kRowsUnknown || maxDefine_ > 1)) { + readPageDefLevels(); + } + + if (row != kRepDefOnly) { + makeDecoder(); + } +} + +void IAAPageReader::prepareDataPageV2( + const PageHeader& pageHeader, + int64_t row) { + VELOX_CHECK(pageHeader.__isset.data_page_header_v2); + numRepDefsInPage_ = pageHeader.data_page_header_v2.num_values; + setPageRowInfo(row == kRepDefOnly); + if (row != kRepDefOnly && numRowsInPage_ != kRowsUnknown && + numRowsInPage_ + rowOfPage_ <= row) { + skipBytes( + pageHeader.compressed_page_size, + inputStream_.get(), + bufferStart_, + bufferEnd_); + return; + } + + uint32_t defineLength = maxDefine_ > 0 + ? pageHeader.data_page_header_v2.definition_levels_byte_length + : 0; + uint32_t repeatLength = maxRepeat_ > 0 + ? 
pageHeader.data_page_header_v2.repetition_levels_byte_length + : 0; + auto bytes = pageHeader.compressed_page_size; + pageData_ = readBytes(bytes, pageBuffer_); + + if (repeatLength) { + repeatDecoder_ = std::make_unique( + reinterpret_cast(pageData_), + repeatLength, + arrow::bit_util::NumRequiredBits(maxRepeat_)); + } + + if (maxDefine_ > 0) { + defineDecoder_ = std::make_unique( + pageData_ + repeatLength, + pageData_ + repeatLength + defineLength, + arrow::bit_util::NumRequiredBits(maxDefine_)); + } + auto levelsSize = repeatLength + defineLength; + pageData_ += levelsSize; + if (pageHeader.data_page_header_v2.__isset.is_compressed || + pageHeader.data_page_header_v2.is_compressed) { + pageData_ = uncompressData( + pageData_, + pageHeader.compressed_page_size - levelsSize, + pageHeader.uncompressed_page_size - levelsSize); + } + if (row == kRepDefOnly) { + skipBytes(bytes, inputStream_.get(), bufferStart_, bufferEnd_); + return; + } + + encodedDataSize_ = pageHeader.uncompressed_page_size - levelsSize; + encoding_ = pageHeader.data_page_header_v2.encoding; + if (numRowsInPage_ == kRowsUnknown) { + readPageDefLevels(); + } + if (row != kRepDefOnly) { + makeDecoder(); + } +} + +void IAAPageReader::prepareDictionary(const PageHeader& pageHeader) { + dictionary_.numValues = pageHeader.dictionary_page_header.num_values; + dictionaryEncoding_ = pageHeader.dictionary_page_header.encoding; + dictionary_.sorted = pageHeader.dictionary_page_header.__isset.is_sorted && + pageHeader.dictionary_page_header.is_sorted; + VELOX_CHECK( + dictionaryEncoding_ == Encoding::PLAIN_DICTIONARY || + dictionaryEncoding_ == Encoding::PLAIN); + + if (codec_ != thrift::CompressionCodec::UNCOMPRESSED) { + pageData_ = readBytes(pageHeader.compressed_page_size, pageBuffer_); + pageData_ = uncompressData( + pageData_, + pageHeader.compressed_page_size, + pageHeader.uncompressed_page_size); + } + + auto parquetType = type_->parquetType_.value(); + switch (parquetType) { + case 
thrift::Type::INT32: + case thrift::Type::INT64: + case thrift::Type::FLOAT: + case thrift::Type::DOUBLE: { + int32_t typeSize = (parquetType == thrift::Type::INT32 || + parquetType == thrift::Type::FLOAT) + ? sizeof(float) + : sizeof(double); + auto numBytes = dictionary_.numValues * typeSize; + if (type_->type->isShortDecimal() && parquetType == thrift::Type::INT32) { + auto veloxTypeLength = type_->type->cppSizeInBytes(); + auto numVeloxBytes = dictionary_.numValues * veloxTypeLength; + dictionary_.values = + AlignedBuffer::allocate(numVeloxBytes, &pool_); + } else { + dictionary_.values = AlignedBuffer::allocate(numBytes, &pool_); + } + if (pageData_) { + memcpy(dictionary_.values->asMutable(), pageData_, numBytes); + } else { + dwio::common::readBytes( + numBytes, + inputStream_.get(), + dictionary_.values->asMutable(), + bufferStart_, + bufferEnd_); + } + if (type_->type->isShortDecimal() && parquetType == thrift::Type::INT32) { + auto values = dictionary_.values->asMutable(); + auto parquetValues = dictionary_.values->asMutable(); + for (auto i = dictionary_.numValues - 1; i >= 0; --i) { + // Expand the Parquet type length values to Velox type length. + // We start from the end to allow in-place expansion. 
+ values[i] = parquetValues[i]; + } + } + break; + } + case thrift::Type::BYTE_ARRAY: { + dictionary_.values = + AlignedBuffer::allocate(dictionary_.numValues, &pool_); + auto numBytes = pageHeader.uncompressed_page_size; + auto values = dictionary_.values->asMutable(); + dictionary_.strings = AlignedBuffer::allocate(numBytes, &pool_); + auto strings = dictionary_.strings->asMutable(); + if (pageData_) { + memcpy(strings, pageData_, numBytes); + } else { + dwio::common::readBytes( + numBytes, inputStream_.get(), strings, bufferStart_, bufferEnd_); + } + auto header = strings; + for (auto i = 0; i < dictionary_.numValues; ++i) { + auto length = *reinterpret_cast(header); + values[i] = StringView(header + sizeof(int32_t), length); + header += length + sizeof(int32_t); + } + VELOX_CHECK_EQ(header, strings + numBytes); + break; + } + case thrift::Type::FIXED_LEN_BYTE_ARRAY: { + auto parquetTypeLength = type_->typeLength_; + auto numParquetBytes = dictionary_.numValues * parquetTypeLength; + auto veloxTypeLength = type_->type->cppSizeInBytes(); + auto numVeloxBytes = dictionary_.numValues * veloxTypeLength; + dictionary_.values = AlignedBuffer::allocate(numVeloxBytes, &pool_); + auto data = dictionary_.values->asMutable(); + // Read the data bytes. + if (pageData_) { + memcpy(data, pageData_, numParquetBytes); + } else { + dwio::common::readBytes( + numParquetBytes, + inputStream_.get(), + data, + bufferStart_, + bufferEnd_); + } + if (type_->type->isShortDecimal()) { + // Parquet decimal values have a fixed typeLength_ and are in big-endian + // layout. + if (numParquetBytes < numVeloxBytes) { + auto values = dictionary_.values->asMutable(); + for (auto i = dictionary_.numValues - 1; i >= 0; --i) { + // Expand the Parquet type length values to Velox type length. + // We start from the end to allow in-place expansion. + auto sourceValue = data + (i * parquetTypeLength); + int64_t value = *sourceValue >= 0 ? 
0 : -1; + memcpy( + reinterpret_cast(&value) + veloxTypeLength - + parquetTypeLength, + sourceValue, + parquetTypeLength); + values[i] = value; + } + } + auto values = dictionary_.values->asMutable(); + for (auto i = 0; i < dictionary_.numValues; ++i) { + values[i] = __builtin_bswap64(values[i]); + } + break; + } else if (type_->type->isLongDecimal()) { + // Parquet decimal values have a fixed typeLength_ and are in big-endian + // layout. + if (numParquetBytes < numVeloxBytes) { + auto values = dictionary_.values->asMutable(); + for (auto i = dictionary_.numValues - 1; i >= 0; --i) { + // Expand the Parquet type length values to Velox type length. + // We start from the end to allow in-place expansion. + auto sourceValue = data + (i * parquetTypeLength); + int128_t value = *sourceValue >= 0 ? 0 : -1; + memcpy( + reinterpret_cast(&value) + veloxTypeLength - + parquetTypeLength, + sourceValue, + parquetTypeLength); + values[i] = value; + } + } + auto values = dictionary_.values->asMutable(); + for (auto i = 0; i < dictionary_.numValues; ++i) { + values[i] = dwio::common::builtin_bswap128(values[i]); + } + break; + } + VELOX_UNSUPPORTED( + "Parquet type {} not supported for dictionary", parquetType); + } + case thrift::Type::INT96: + default: + VELOX_UNSUPPORTED( + "Parquet type {} not supported for dictionary", parquetType); + } +} + +void IAAPageReader::makeFilterCache(dwio::common::ScanState& state) { + VELOX_CHECK( + !state.dictionary2.values, "Parquet supports only one dictionary"); + state.filterCache.resize(state.dictionary.numValues); + simd::memset( + state.filterCache.data(), + dwio::common::FilterResult::kUnknown, + state.filterCache.size()); + state.rawState.filterCache = state.filterCache.data(); +} + +namespace { +int32_t parquetTypeBytes(thrift::Type::type type) { + switch (type) { + case thrift::Type::INT32: + case thrift::Type::FLOAT: + return 4; + case thrift::Type::INT64: + case thrift::Type::DOUBLE: + return 8; + default: + VELOX_FAIL("Type does 
not have a byte width {}", type); + } +} +} // namespace + +void IAAPageReader::preloadRepDefs() { + hasChunkRepDefs_ = true; + while (pageStart_ < chunkSize_) { + seekToPage(kRepDefOnly); + auto begin = definitionLevels_.size(); + auto numLevels = definitionLevels_.size() + numRepDefsInPage_; + definitionLevels_.resize(numLevels); + wideDefineDecoder_->GetBatch( + definitionLevels_.data() + begin, numRepDefsInPage_); + if (repeatDecoder_) { + repetitionLevels_.resize(numLevels); + + repeatDecoder_->GetBatch( + repetitionLevels_.data() + begin, numRepDefsInPage_); + } + leafNulls_.resize(bits::nwords(leafNullsSize_ + numRepDefsInPage_)); + auto numLeaves = getLengthsAndNulls( + LevelMode::kNulls, + leafInfo_, + begin, + begin + numRepDefsInPage_, + numRepDefsInPage_, + nullptr, + leafNulls_.data(), + leafNullsSize_); + leafNullsSize_ += numLeaves; + numLeavesInPage_.push_back(numLeaves); + } + + // Reset the input to start of column chunk. + std::vector rewind = {0}; + pageStart_ = 0; + dwio::common::PositionProvider position(rewind); + inputStream_->seekToPosition(position); + bufferStart_ = bufferEnd_ = nullptr; + rowOfPage_ = 0; + numRowsInPage_ = 0; + pageData_ = nullptr; + dictPageData_ = nullptr; + dataPageData_ = nullptr; +} + +void IAAPageReader::decodeRepDefs(int32_t numTopLevelRows) { + if (definitionLevels_.empty()) { + preloadRepDefs(); + } + repDefBegin_ = repDefEnd_; + int32_t numLevels = definitionLevels_.size(); + int32_t topFound = 0; + int32_t i = repDefBegin_; + if (maxRepeat_ > 0) { + for (; i < numLevels; ++i) { + if (repetitionLevels_[i] == 0) { + ++topFound; + if (topFound == numTopLevelRows + 1) { + break; + } + } + } + repDefEnd_ = i; + } else { + repDefEnd_ = i + numTopLevelRows; + } +} + +int32_t IAAPageReader::getLengthsAndNulls( + LevelMode mode, + const ::parquet::internal::LevelInfo& info, + int32_t begin, + int32_t end, + int32_t maxItems, + int32_t* lengths, + uint64_t* nulls, + int32_t nullsStartIndex) const { + 
::parquet::internal::ValidityBitmapInputOutput bits; + bits.values_read_upper_bound = maxItems; + bits.values_read = 0; + bits.null_count = 0; + bits.valid_bits = reinterpret_cast(nulls); + bits.valid_bits_offset = nullsStartIndex; + + switch (mode) { + case LevelMode::kNulls: + DefLevelsToBitmap( + definitionLevels_.data() + begin, end - begin, info, &bits); + break; + case LevelMode::kList: { + ::parquet::internal::DefRepLevelsToList( + definitionLevels_.data() + begin, + repetitionLevels_.data() + begin, + end - begin, + info, + &bits, + lengths); + // Convert offsets to lengths. + for (auto i = 0; i < bits.values_read; ++i) { + lengths[i] = lengths[i + 1] - lengths[i]; + } + break; + } + case LevelMode::kStructOverLists: { + DefRepLevelsToBitmap( + definitionLevels_.data() + begin, + repetitionLevels_.data() + begin, + end - begin, + info, + &bits); + break; + } + } + return bits.values_read; +} + +void IAAPageReader::makeDecoder() { + auto parquetType = type_->parquetType_.value(); + switch (encoding_) { + case Encoding::RLE_DICTIONARY: + case Encoding::PLAIN_DICTIONARY: + if (encodedDataSize_ > dataPageHeader_.uncompressed_page_size) { + std::cout << "encodedDataSize_: " << encodedDataSize_ + << ", pre_decompress_data: " << pre_decompress_data << true + << ", dataPageHeader_: " + << dataPageHeader_.uncompressed_page_size << std::endl; + } + dictionaryIdDecoder_ = std::make_unique( + pageData_ + 1, pageData_ + encodedDataSize_, pageData_[0]); + break; + case Encoding::PLAIN: + switch (parquetType) { + case thrift::Type::BOOLEAN: + booleanDecoder_ = std::make_unique( + pageData_, pageData_ + encodedDataSize_); + break; + case thrift::Type::BYTE_ARRAY: + stringDecoder_ = std::make_unique( + pageData_, pageData_ + encodedDataSize_); + break; + case thrift::Type::FIXED_LEN_BYTE_ARRAY: + directDecoder_ = std::make_unique>( + std::make_unique( + pageData_, encodedDataSize_), + false, + type_->typeLength_, + true); + break; + default: { + directDecoder_ = 
std::make_unique>( + std::make_unique( + pageData_, encodedDataSize_), + false, + parquetTypeBytes(type_->parquetType_.value())); + } + } + break; + case Encoding::DELTA_BINARY_PACKED: + default: + VELOX_UNSUPPORTED("Encoding not supported yet"); + } +} + +void IAAPageReader::skip(int64_t numRows) { + if (!numRows && firstUnvisited_ != rowOfPage_ + numRowsInPage_) { + // Return if no skip and position not at end of page or before first page. + return; + } + auto toSkip = numRows; + if (firstUnvisited_ + numRows >= rowOfPage_ + numRowsInPage_) { + seekToPage(firstUnvisited_ + numRows); + if (hasChunkRepDefs_) { + numLeafNullsConsumed_ = rowOfPage_; + } + toSkip -= rowOfPage_ - firstUnvisited_; + } + firstUnvisited_ += numRows; + + // Skip nulls + toSkip = skipNulls(toSkip); + + // Skip the decoder + if (isDictionary()) { + dictionaryIdDecoder_->skip(toSkip); + } else if (directDecoder_) { + directDecoder_->skip(toSkip); + } else if (stringDecoder_) { + stringDecoder_->skip(toSkip); + } else if (booleanDecoder_) { + booleanDecoder_->skip(toSkip); + } else { + VELOX_FAIL("No decoder to skip"); + } +} + +int32_t IAAPageReader::skipNulls(int32_t numValues) { + if (!defineDecoder_ && isTopLevel_) { + return numValues; + } + VELOX_CHECK(1 == maxDefine_ || !leafNulls_.empty()); + dwio::common::ensureCapacity(tempNulls_, numValues, &pool_); + tempNulls_->setSize(0); + if (isTopLevel_) { + bool allOnes; + defineDecoder_->readBits( + numValues, tempNulls_->asMutable(), &allOnes); + if (allOnes) { + return numValues; + } + } else { + readNulls(numValues, tempNulls_); + } + auto words = tempNulls_->as(); + return bits::countBits(words, 0, numValues); +} + +void IAAPageReader::skipNullsOnly(int64_t numRows) { + if (!numRows && firstUnvisited_ != rowOfPage_ + numRowsInPage_) { + // Return if no skip and position not at end of page or before first page. 
+ return; + } + auto toSkip = numRows; + if (firstUnvisited_ + numRows >= rowOfPage_ + numRowsInPage_) { + seekToPage(firstUnvisited_ + numRows); + firstUnvisited_ += numRows; + toSkip = firstUnvisited_ - rowOfPage_; + } else { + firstUnvisited_ += numRows; + } + + // Skip nulls + skipNulls(toSkip); +} + +void IAAPageReader::readNullsOnly(int64_t numValues, BufferPtr& buffer) { + VELOX_CHECK(!maxRepeat_); + auto toRead = numValues; + if (buffer) { + dwio::common::ensureCapacity(buffer, numValues, &pool_); + } + nullConcatenation_.reset(buffer); + while (toRead) { + auto availableOnPage = rowOfPage_ + numRowsInPage_ - firstUnvisited_; + if (!availableOnPage) { + seekToPage(firstUnvisited_); + availableOnPage = numRowsInPage_; + } + auto numRead = std::min(availableOnPage, toRead); + auto nulls = readNulls(numRead, nullsInReadRange_); + toRead -= numRead; + nullConcatenation_.append(nulls, 0, numRead); + firstUnvisited_ += numRead; + } + buffer = nullConcatenation_.buffer(); +} + +const uint64_t* FOLLY_NULLABLE +IAAPageReader::readNulls(int32_t numValues, BufferPtr& buffer) { + if (maxDefine_ == 0) { + buffer = nullptr; + return nullptr; + } + dwio::common::ensureCapacity(buffer, numValues, &pool_); + if (isTopLevel_) { + VELOX_CHECK_EQ(1, maxDefine_); + bool allOnes; + defineDecoder_->readBits( + numValues, buffer->asMutable(), &allOnes); + return allOnes ? 
nullptr : buffer->as(); + } + bits::copyBits( + leafNulls_.data(), + numLeafNullsConsumed_, + buffer->asMutable(), + 0, + numValues); + numLeafNullsConsumed_ += numValues; + return buffer->as(); +} + +void IAAPageReader::startVisit(folly::Range rows) { + visitorRows_ = rows.data(); + numVisitorRows_ = rows.size(); + currentVisitorRow_ = 0; + initialRowOfPage_ = rowOfPage_; + visitBase_ = firstUnvisited_; +} + +bool IAAPageReader::rowsForPage( + dwio::common::SelectiveColumnReader& reader, + bool hasFilter, + bool mayProduceNulls, + folly::Range& rows, + const uint64_t* FOLLY_NULLABLE& nulls) { + if (currentVisitorRow_ == numVisitorRows_) { + return false; + } + int32_t numToVisit; + // Check if the first row to go to is in the current page. If not, seek to the + // page that contains the row. + auto rowZero = visitBase_ + visitorRows_[currentVisitorRow_]; + if (rowZero >= rowOfPage_ + numRowsInPage_) { + seekToPage(rowZero); + if (hasChunkRepDefs_) { + numLeafNullsConsumed_ = rowOfPage_; + } + } + auto& scanState = reader.scanState(); + if (isDictionary()) { + if (scanState.dictionary.values != dictionary_.values) { + scanState.dictionary = dictionary_; + if (hasFilter) { + makeFilterCache(scanState); + } + scanState.updateRawState(); + } + } else { + if (scanState.dictionary.values) { + // If there are previous pages in the current read, nulls read + // from them are in 'nullConcatenation_' Put this into the + // reader for the time of dedictionarizing so we don't read + // undefined dictionary indices. + if (mayProduceNulls && reader.numValues()) { + reader.setNulls(nullConcatenation_.buffer()); + } + reader.dedictionarize(); + // The nulls across all pages are in nullConcatenation_. Clear + // the nulls and let the prepareNulls below reserve nulls for + // the new page. + reader.setNulls(nullptr); + scanState.dictionary.clear(); + } + } + + // Then check how many of the rows to visit are on the same page as the + // current one. 
+ int32_t firstOnNextPage = rowOfPage_ + numRowsInPage_ - visitBase_; + if (firstOnNextPage > visitorRows_[numVisitorRows_ - 1]) { + // All the remaining rows are on this page. + numToVisit = numVisitorRows_ - currentVisitorRow_; + } else { + // Find the last row in the rows to visit that is on this page. + auto rangeLeft = folly::Range( + visitorRows_ + currentVisitorRow_, + numVisitorRows_ - currentVisitorRow_); + auto it = + std::lower_bound(rangeLeft.begin(), rangeLeft.end(), firstOnNextPage); + assert(it != rangeLeft.end()); + assert(it != rangeLeft.begin()); + numToVisit = it - (visitorRows_ + currentVisitorRow_); + } + // If the page did not change and this is the first call, we can return a view + // on the original visitor rows. + if (rowOfPage_ == initialRowOfPage_ && currentVisitorRow_ == 0) { + nulls = + readNulls(visitorRows_[numToVisit - 1] + 1, reader.nullsInReadRange()); + rowNumberBias_ = 0; + rows = folly::Range(visitorRows_, numToVisit); + } else { + // We scale row numbers to be relative to first on this page. + auto pageOffset = rowOfPage_ - visitBase_; + rowNumberBias_ = visitorRows_[currentVisitorRow_]; + skip(rowNumberBias_ - pageOffset); + // The decoder is positioned at 'visitorRows_[currentVisitorRow_']' + // We copy the rows to visit with a bias, so that the first to visit has + // offset 0. + rowsCopy_->resize(numToVisit); + auto copy = rowsCopy_->data(); + // Subtract 'rowNumberBias_' from the rows to visit on this page. + // 'copy' has a writable tail of SIMD width, so no special case for end of + // loop. 
+ for (auto i = 0; i < numToVisit; i += xsimd::batch::size) { + auto numbers = xsimd::batch::load_unaligned( + &visitorRows_[i + currentVisitorRow_]) - + rowNumberBias_; + numbers.store_unaligned(copy); + copy += xsimd::batch::size; + } + nulls = readNulls(rowsCopy_->back() + 1, reader.nullsInReadRange()); + rows = folly::Range( + rowsCopy_->data(), rowsCopy_->size()); + } + reader.prepareNulls(rows, nulls != nullptr, currentVisitorRow_); + currentVisitorRow_ += numToVisit; + firstUnvisited_ = visitBase_ + visitorRows_[currentVisitorRow_ - 1] + 1; + return true; +} + +const VectorPtr& IAAPageReader::dictionaryValues(const TypePtr& type) { + if (!dictionaryValues_) { + dictionaryValues_ = std::make_shared>( + &pool_, + type, + nullptr, + dictionary_.numValues, + dictionary_.values, + std::vector{dictionary_.strings}); + } + return dictionaryValues_; +} + +bool IAAPageReader::waitQplJob(uint32_t job_id) { + dwio::common::QplJobHWPool& qpl_job_pool = + dwio::common::QplJobHWPool::GetInstance(); + if (job_id <= 0 || job_id >= qpl_job_pool.MAX_JOB_NUMBER) { + job_id = 0; + return true; + } + qpl_job* job = qpl_job_pool.GetJobById(job_id); + + auto status = qpl_wait_job(job); + qpl_job_pool.ReleaseJob(job_id); + if (status == QPL_STS_BAD_DIST_ERR) { + isWinSizeFit = false; + } + if (status != QPL_STS_OK) { + LOG(WARNING) << "Decompress w/IAA error, status: " << status; + return false; + } + return true; +} + +IAAPageReader::~IAAPageReader() { + dwio::common::QplJobHWPool& qpl_job_pool = + dwio::common::QplJobHWPool::GetInstance(); + if (dict_qpl_job_id > 0 && dict_qpl_job_id < qpl_job_pool.MAX_JOB_NUMBER) { + qpl_job* job = qpl_job_pool.GetJobById(dict_qpl_job_id); + qpl_job_pool.ReleaseJob(dict_qpl_job_id); + dict_qpl_job_id = 0; + } + if (data_qpl_job_id > 0 && data_qpl_job_id < qpl_job_pool.MAX_JOB_NUMBER) { + qpl_job* job = qpl_job_pool.GetJobById(data_qpl_job_id); + qpl_job_pool.ReleaseJob(data_qpl_job_id); + data_qpl_job_id = 0; + } +} + +} // namespace 
facebook::velox::parquet + +#endif \ No newline at end of file diff --git a/velox/dwio/parquet/reader/IAAPageReader.h b/velox/dwio/parquet/reader/IAAPageReader.h new file mode 100644 index 0000000000000..d9c4ccb61734c --- /dev/null +++ b/velox/dwio/parquet/reader/IAAPageReader.h @@ -0,0 +1,610 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include "velox/dwio/common/BitConcatenation.h" +#include "velox/dwio/common/DirectDecoder.h" +#include "velox/dwio/common/QplJobPool.h" +#include "velox/dwio/common/SelectiveColumnReader.h" +#include "velox/dwio/parquet/reader/BooleanDecoder.h" +#include "velox/dwio/parquet/reader/PageReaderBase.h" +#include "velox/dwio/parquet/reader/ParquetTypeWithId.h" +#include "velox/dwio/parquet/reader/RleBpDataDecoder.h" +#include "velox/dwio/parquet/reader/StringDecoder.h" +#include "velox/dwio/parquet/thrift/ParquetThriftTypes.h" +#include "velox/vector/BaseVector.h" + +#ifdef VELOX_ENABLE_QPL +namespace facebook::velox::parquet { + +/// Manages access to pages inside a ColumnChunk. Interprets page headers and +/// encodings and presents the combination of pages and encoded values as a +/// continuous stream accessible via readWithVisitor(). 
+class IAAPageReader : public parquet::PageReaderBase { + public: + IAAPageReader( + std::unique_ptr stream, + memory::MemoryPool& pool, + ParquetTypeWithIdPtr nodeType, + thrift::CompressionCodec::type codec, + int64_t chunkSize) + : pool_(pool), + inputStream_(std::move(stream)), + type_(std::move(nodeType)), + maxRepeat_(type_->maxRepeat_), + maxDefine_(type_->maxDefine_), + isTopLevel_(maxRepeat_ == 0 && maxDefine_ <= 1), + codec_(codec), + chunkSize_(chunkSize), + nullConcatenation_(pool_) { + type_->makeLevelInfo(leafInfo_); + dict_qpl_job_id = 0; + data_qpl_job_id = 0; + uncompressedDictData_ = nullptr; + uncompressedDataV1Data_ = nullptr; + } + + // This PageReader constructor is for unit test only. + IAAPageReader( + std::unique_ptr stream, + memory::MemoryPool& pool, + thrift::CompressionCodec::type codec, + int64_t chunkSize) + : pool_(pool), + inputStream_(std::move(stream)), + maxRepeat_(0), + maxDefine_(1), + isTopLevel_(maxRepeat_ == 0 && maxDefine_ <= 1), + codec_(codec), + chunkSize_(chunkSize), + nullConcatenation_(pool_) { + dict_qpl_job_id = 0; + data_qpl_job_id = 0; + } + + ~IAAPageReader(); + + /// Advances 'numRows' top level rows. + void skip(int64_t numRows); + void preDecompressPage(bool& need_pre_decompress); + + /// Decodes repdefs for 'numTopLevelRows'. Use getLengthsAndNulls() + /// to access the lengths and nulls for the different nesting + /// levels. + void decodeRepDefs(int32_t numTopLevelRows); + + /// Returns lengths and/or nulls from 'begin' to 'end' for the level of + /// 'info' using 'mode'. 'maxItems' is the maximum number of nulls and lengths + /// to produce. 'lengths' is only filled for mode kList. 'nulls' is filled + /// from bit position 'nullsStartIndex'. Returns the number of lengths/nulls + /// filled. 
+ int32_t getLengthsAndNulls( + LevelMode mode, + const ::parquet::internal::LevelInfo& info, + int32_t begin, + int32_t end, + int32_t maxItems, + int32_t* FOLLY_NULLABLE lengths, + uint64_t* FOLLY_NULLABLE nulls, + int32_t nullsStartIndex) const; + + /// Applies 'visitor' to values in the ColumnChunk of 'this'. The + /// operation to perform and The operand rows are given by + /// 'visitor'. The rows are relative to the current position. The + /// current position after readWithVisitor is immediately after the + /// last accessed row. + template + void readWithVisitor(Visitor& visitor); + + // skips 'numValues' top level rows, touching null flags only. Non-null values + // are not prepared for reading. + void skipNullsOnly(int64_t numValues); + + /// Reads 'numValues' null flags into 'nulls' and advances the + /// decoders by as much. The read may span several pages. If there + /// are no nulls, buffer may be set to nullptr. + void readNullsOnly(int64_t numValues, BufferPtr& buffer); + + // Returns the current string dictionary as a FlatVector. + const VectorPtr& dictionaryValues(const TypePtr& type); + + // True if the current page holds dictionary indices. + bool isDictionary() const { + return encoding_ == thrift::Encoding::PLAIN_DICTIONARY || + encoding_ == thrift::Encoding::RLE_DICTIONARY; + } + + void clearDictionary() { + dictionary_.clear(); + dictionaryValues_.reset(); + } + + PageReaderType getType() { + return PageReaderType::IAA; + } + + /// Returns the range of repdefs for the top level rows covered by the last + /// decoderepDefs(). + std::pair repDefRange() const { + return {repDefBegin_, repDefEnd_}; + } + + // Parses the PageHeader at 'inputStream_', and move the bufferStart_ and + // bufferEnd_ to the corresponding positions. + thrift::PageHeader readPageHeader(); + + private: + // Indicates that we only want the repdefs for the next page. Used when + // prereading repdefs with seekToPage. 
+ static constexpr int64_t kRepDefOnly = -1; + + // In 'numRowsInPage_', indicates that the page's def levels must be + // consulted to determine number of leaf values. + static constexpr int32_t kRowsUnknown = -1; + + // If the current page has nulls, returns a nulls bitmap owned by 'this'. This + // is filled for 'numRows' bits. + const uint64_t* FOLLY_NULLABLE readNulls(int32_t numRows, BufferPtr& buffer); + + void prepareDict(const thrift::PageHeader& pageHeader, bool job_success); + bool prepareData( + const thrift::PageHeader& pageHeader, + int64_t row, + bool job_success); + uint32_t DecompressAsync( + int64_t input_length, + const uint8_t* input, + int64_t output_buffer_length, + uint8_t* output, + bool isGzip); + + // Skips the define decoder, if any, for 'numValues' top level + // rows. Returns the number of non-nulls skipped. The range is the + // current page. + int32_t skipNulls(int32_t numRows); + + // Initializes a filter result cache for the dictionary in 'state'. + void makeFilterCache(dwio::common::ScanState& state); + + // Makes a decoder based on 'encoding_' for bytes from ''pageData_' to + // 'pageData_' + 'encodedDataSize_'. + void makedecoder(); + + // Reads and skips pages until finding a data page that contains + // 'row'. Reads and sets 'rowOfPage_' and 'numRowsInPage_' and + // initializes a decoder for the found page. row kRepDefOnly means + // getting repdefs for the next page. If non-top level column, 'row' + // is interpreted in terms of leaf rows, including leaf + // nulls. Seeking ahead of pages covered by decodeRepDefs is not + // allowed for non-top level columns. + void seekToPage(int64_t row); + + // Preloads the repdefs for the column chunk. To avoid preloading, + // would need a way too clone the input stream so that one stream + // reads ahead for repdefs and the other tracks the data. This is + // supported by CacheInputStream but not the other + // SeekableInputStreams. 
+ void preloadRepDefs(); + + // Sets row number info after reading a page header. If 'forRepDef', + // does not set non-top level row numbers by repdefs. This is on + // when seeking a non-top level page for the first time, i.e. for + // getting the repdefs. + void setPageRowInfo(bool forRepDef); + + // Updates row position / rep defs consumed info to refer to the first of the + // next page. + void updateRowInfoAfterPageSkipped(); + + void prepareDataPageV1(const thrift::PageHeader& pageHeader, int64_t row); + void prepareDataPageV2(const thrift::PageHeader& pageHeader, int64_t row); + void prepareDictionary(const thrift::PageHeader& pageHeader); + void makeDecoder(); + + void prefetchDataPageV1(const thrift::PageHeader& pageHeader); + void prefetchDataPageV2(const thrift::PageHeader& pageHeader); + void prefetchDictionary(const thrift::PageHeader& pageHeader); + bool waitQplJob(uint32_t job_id); + int getGzipWindowSize(const uint8_t* stream_ptr, uint32_t stream_size); + + // For a non-top level leaf, reads the defs and sets 'leafNulls_' and + // 'numRowsInPage_' accordingly. This is used for non-top level leaves when + // 'hasChunkRepDefs_' is false. + void readPageDefLevels(); + + // Returns a pointer to contiguous space for the next 'size' bytes + // from current position. Copies data into 'copy' if the range + // straddles buffers. Allocates or resizes 'copy' as needed. + const char* FOLLY_NONNULL readBytes(int32_t size, BufferPtr& copy); + + // Decompresses data starting at 'pageData_', consuming 'compressedsize' and + // producing up to 'uncompressedSize' bytes. The The start of the decoding + // result is returned. 
an intermediate copy may be made in 'uncompresseddata_' + const char* FOLLY_NONNULL uncompressData( + const char* FOLLY_NONNULL pageData, + uint32_t compressedSize, + uint32_t uncompressedSize); + + const bool FOLLY_NONNULL uncompressQplData( + const char* FOLLY_NONNULL pageData, + uint32_t compressedSize, + uint32_t uncompressedSize, + BufferPtr& uncompressedData, + uint32_t& qpl_job_id); + + template + T readField(const char* FOLLY_NONNULL& ptr) { + T data = *reinterpret_cast(ptr); + ptr += sizeof(T); + return data; + } + + // Starts iterating over 'rows', which may span multiple pages. 'rows' are + // relative to current position, with 0 meaning the first + // unprocessed value in the current page, i.e. the row after the + // last row touched on a previous call to skip() or + // readWithVisitor(). This is the first row of the first data page + // if first call. + void startVisit(folly::Range rows); + + // Seeks to the next page in a range given by startVisit(). Returns + // true if there are unprocessed rows in the set given to + // startVisit(). Seeks 'this' to the appropriate page and sets + // 'rowsForPage' to refer to the subset of 'rows' that are on the + // current page. The numbers in rowsForPage are relative to the + // first unprocessed value on the page, for a new page 0 means the + // first value. Reads possible nulls and sets 'reader's + // nullsInReadRange_' to that or to nullptr if no null + // flags. Returns the data of nullsInReadRange in 'nulls'. Copies + // dictionary information into 'reader'. If 'hasFilter' is true, + // sets up dictionary hit cache. If the new page is direct and + // previous pages are dictionary, converts any accumulated results + // into flat. 'mayProduceNulls' should be true if nulls may occur in + // the result if they occur in the data. 
+ bool rowsForPage( + dwio::common::SelectiveColumnReader& reader, + bool hasFilter, + bool mayProduceNulls, + folly::Range& rows, + const uint64_t* FOLLY_NULLABLE& nulls); + + // Calls the visitor, specialized on the data type since not all visitors + // apply to all types. + template < + typename Visitor, + typename std::enable_if< + !std::is_same_v && + !std::is_same_v, + int>::type = 0> + void callDecoder( + const uint64_t* FOLLY_NULLABLE nulls, + bool& nullsFromFastPath, + Visitor visitor) { + if (nulls) { + nullsFromFastPath = dwio::common::useFastPath(visitor) && + (!this->type_->type->isLongDecimal()) && + (this->type_->type->isShortDecimal() ? isDictionary() : true); + + if (isDictionary()) { + auto dictVisitor = visitor.toDictionaryColumnVisitor(); + dictionaryIdDecoder_->readWithVisitor(nulls, dictVisitor); + } else { + directDecoder_->readWithVisitor( + nulls, visitor, nullsFromFastPath); + } + } else { + if (isDictionary()) { + auto dictVisitor = visitor.toDictionaryColumnVisitor(); + dictionaryIdDecoder_->readWithVisitor(nullptr, dictVisitor); + } else { + directDecoder_->readWithVisitor( + nulls, visitor, !this->type_->type->isShortDecimal()); + } + } + } + + template < + typename Visitor, + typename std::enable_if< + std::is_same_v, + int>::type = 0> + void callDecoder( + const uint64_t* FOLLY_NULLABLE nulls, + bool& nullsFromFastPath, + Visitor visitor) { + if (nulls) { + if (isDictionary()) { + nullsFromFastPath = dwio::common::useFastPath(visitor); + auto dictVisitor = visitor.toStringDictionaryColumnVisitor(); + dictionaryIdDecoder_->readWithVisitor(nulls, dictVisitor); + } else { + nullsFromFastPath = false; + stringDecoder_->readWithVisitor(nulls, visitor); + } + } else { + if (isDictionary()) { + auto dictVisitor = visitor.toStringDictionaryColumnVisitor(); + dictionaryIdDecoder_->readWithVisitor(nullptr, dictVisitor); + } else { + stringDecoder_->readWithVisitor(nulls, visitor); + } + } + } + + template < + typename Visitor, + typename 
std::enable_if< + std::is_same_v, + int>::type = 0> + void callDecoder( + const uint64_t* FOLLY_NULLABLE nulls, + bool& nullsFromFastPath, + Visitor visitor) { + VELOX_CHECK(!isDictionary(), "BOOLEAN types are never dictionary-encoded") + if (nulls) { + nullsFromFastPath = false; + booleanDecoder_->readWithVisitor(nulls, visitor); + } else { + booleanDecoder_->readWithVisitor(nulls, visitor); + } + } + + // Returns the number of passed rows/values gathered by + // 'reader'. Only numRows() is set for a filter-only case, only + // numValues() is set for a non-filtered case. + template + static int32_t numRowsInReader( + const dwio::common::SelectiveColumnReader& reader) { + if (hasFilter) { + return reader.numRows(); + } else { + return reader.numValues(); + } + } + + memory::MemoryPool& pool_; + + std::unique_ptr inputStream_; + ParquetTypeWithIdPtr type_; + const int32_t maxRepeat_; + const int32_t maxDefine_; + const bool isTopLevel_; + + const thrift::CompressionCodec::type codec_; + const int64_t chunkSize_; + const char* FOLLY_NULLABLE bufferStart_{nullptr}; + const char* FOLLY_NULLABLE bufferEnd_{nullptr}; + BufferPtr tempNulls_; + BufferPtr nullsInReadRange_; + BufferPtr multiPageNulls_; + // Decoder for single bit definition levels. the arrow decoders are used for + // multibit levels pending fixing RleBpDecoder for the case. + std::unique_ptr defineDecoder_; + std::unique_ptr repeatDecoder_; + std::unique_ptr wideDefineDecoder_; + + // True for a leaf column for which repdefs are loaded for the whole column + // chunk. This is typically the leaftmost leaf of a list. Other leaves under + // the list can read repdefs as they go since the list lengths are already + // known. + bool hasChunkRepDefs_{false}; + + // index of current page in 'numLeavesInPage_' -1 means before first page. + int32_t pageIndex_{-1}; + + // Number of leaf values in each data page of column chunk. 
+ std::vector numLeavesInPage_; + + // First position in '*levels_' for the range of last decodeRepDefs(). + int32_t repDefBegin_{0}; + + // First position in '*levels_' after the range covered in last + // decodeRepDefs(). + int32_t repDefEnd_{0}; + + // Definition levels for the column chunk. + raw_vector definitionLevels_; + + // Repetition levels for the column chunk. + raw_vector repetitionLevels_; + + // Number of valid bits in 'leafNulls_' + int32_t leafNullsSize_{0}; + + // Number of leaf nulls read. + int32_t numLeafNullsConsumed_{0}; + + // Leaf nulls extracted from 'repetitionLevels_/definitionLevels_' + raw_vector leafNulls_; + + // Encoding of current page. + thrift::Encoding::type encoding_; + + // Row number of first value in current page from start of ColumnChunk. + int64_t rowOfPage_{0}; + + // Number of rows in current page. + int32_t numRowsInPage_{0}; + + // Number of repdefs in page. Not the same as number of rows for a non-top + // level column. + int32_t numRepDefsInPage_{0}; + + // Copy of data if data straddles buffer boundary. + BufferPtr pageBuffer_; + + // Uncompressed data for the page. Rep-def-data in V1, data alone in V2. + BufferPtr uncompressedData_; + BufferPtr uncompressedDictData_; + // char* uncompressedDictData_; + BufferPtr uncompressedDataV1Data_; + // char* uncompressedDataV1Data_; + + // First byte of uncompressed encoded data. Contains the encoded data as a + // contiguous run of bytes. + const char* FOLLY_NULLABLE pageData_{nullptr}; + + // Dictionary contents. + dwio::common::DictionaryValues dictionary_; + thrift::Encoding::type dictionaryEncoding_; + + // Offset of current page's header from start of ColumnChunk. + uint64_t pageStart_{0}; + + // Offset of first byte after current page' header. + uint64_t pageDataStart_{0}; + + // Number of bytes starting at pageData_ for current encoded data. + int32_t encodedDataSize_{0}; + + // Below members Keep state between calls to readWithVisitor(). 
+ + // Original rows in Visitor. + const vector_size_t* FOLLY_NULLABLE visitorRows_{nullptr}; + int32_t numVisitorRows_{0}; + + // 'rowOfPage_' at the start of readWithVisitor(). + int64_t initialRowOfPage_{0}; + + // Index in 'visitorRows_' for the first row that is beyond the + // current page. Equals 'numVisitorRows_' if all are on current page. + int32_t currentVisitorRow_{0}; + + // Row relative to ColumnChunk for first unvisited row. 0 if nothing + // visited. The rows passed to readWithVisitor from rowsForPage() + // are relative to this. + int64_t firstUnvisited_{0}; + + // Offset of 'visitorRows_[0]' relative too start of ColumnChunk. + int64_t visitBase_{0}; + + // Temporary for rewriting rows to access in readWithVisitor when moving + // between pages. Initialized from the visitor. + raw_vector* FOLLY_NULLABLE rowsCopy_{nullptr}; + + // If 'rowsCopy_' is used, this is the difference between the rows in + // 'rowsCopy_' and the row numbers in 'rows' given to readWithVisitor(). + int32_t rowNumberBias_{0}; + + // Manages concatenating null flags read from multiple pages. If a + // readWithVisitor is contined in one page, the visitor places the + // nulls in the reader. If many pages are covered, some with and + // some without nulls, we must make a a concatenated null flags to + // return to the caller. + dwio::common::BitConcatenation nullConcatenation_; + + // LevelInfo for reading nulls for the leaf column 'this' represents. + ::parquet::internal::LevelInfo leafInfo_; + + // Base values of dictionary when reading a string dictionary. + VectorPtr dictionaryValues_; + + // Decoders. Only one will be set at a time. + std::unique_ptr> directDecoder_; + std::unique_ptr dictionaryIdDecoder_; + std::unique_ptr stringDecoder_; + std::unique_ptr booleanDecoder_; + // Add decoders for other encodings here. 
+
+  thrift::PageHeader dictPageHeader_;
+  const char* FOLLY_NULLABLE dictPageData_{nullptr};
+  bool needUncompressDict{false};
+
+  thrift::PageHeader dataPageHeader_;
+  const char* FOLLY_NULLABLE dataPageData_{nullptr};
+
+  uint32_t dict_qpl_job_id{0};
+  uint32_t data_qpl_job_id{0};
+
+  bool pre_decompress_dict = false;
+  bool pre_decompress_data = false;
+  bool isWinSizeFit = false;
+  static constexpr uint8_t CM_ZLIB_DEFAULT_VALUE = 8u;
+  static constexpr uint32_t ZLIB_MIN_HEADER_SIZE = 2u;
+  static constexpr uint32_t ZLIB_INFO_OFFSET = 4u;
+};
+
+template <typename Visitor>
+void IAAPageReader::readWithVisitor(Visitor& visitor) {
+  constexpr bool hasFilter =
+      !std::is_same_v<typename Visitor::FilterType, common::AlwaysTrue>;
+  constexpr bool filterOnly =
+      std::is_same_v<typename Visitor::Extract, dwio::common::DropValues>;
+  bool mayProduceNulls = !filterOnly && visitor.allowNulls();
+  auto rows = visitor.rows();
+  auto numRows = visitor.numRows();
+  auto& reader = visitor.reader();
+  startVisit(folly::Range<const vector_size_t*>(rows, numRows));
+  rowsCopy_ = &visitor.rowsCopy();
+  folly::Range<const vector_size_t*> pageRows;
+  const uint64_t* nulls = nullptr;
+  bool isMultiPage = false;
+  while (rowsForPage(reader, hasFilter, mayProduceNulls, pageRows, nulls)) {
+    bool nullsFromFastPath = false;
+    int32_t numValuesBeforePage = numRowsInReader<hasFilter>(reader);
+    visitor.setNumValuesBias(numValuesBeforePage);
+    visitor.setRows(pageRows);
+    callDecoder(nulls, nullsFromFastPath, visitor);
+    if (currentVisitorRow_ < numVisitorRows_ || isMultiPage) {
+      if (mayProduceNulls) {
+        if (!isMultiPage) {
+          // Do not reuse nulls concatenation buffer if previous
+          // results are hanging on to it.
+          if (multiPageNulls_ && !multiPageNulls_->unique()) {
+            multiPageNulls_ = nullptr;
+          }
+          nullConcatenation_.reset(multiPageNulls_);
+        }
+        if (!nulls) {
+          nullConcatenation_.appendOnes(
+              numRowsInReader<hasFilter>(reader) - numValuesBeforePage);
+        } else if (reader.returnReaderNulls()) {
+          // Nulls from decoding go directly to result.
+          nullConcatenation_.append(
+              reader.nullsInReadRange()->template as<uint64_t>(),
+              0,
+              numRowsInReader<hasFilter>(reader) - numValuesBeforePage);
+        } else {
+          // Add the nulls produced from the decoder to the result.
+          auto firstNullIndex = nullsFromFastPath ? 0 : numValuesBeforePage;
+          nullConcatenation_.append(
+              reader.mutableNulls(0),
+              firstNullIndex,
+              firstNullIndex + numRowsInReader<hasFilter>(reader) -
+                  numValuesBeforePage);
+        }
+      }
+      isMultiPage = true;
+    }
+    // The passing rows on non-first pages are relative to the start
+    // of the page, adjust them to be relative to start of this
+    // read. This can happen on the first processed page as well if
+    // the first page of scan did not contain any of the rows to
+    // visit.
+    if (hasFilter && rowNumberBias_) {
+      reader.offsetOutputRows(numValuesBeforePage, rowNumberBias_);
+    }
+  }
+  if (isMultiPage) {
+    reader.setNulls(mayProduceNulls ? nullConcatenation_.buffer() : nullptr);
+  }
+}
+
+} // namespace facebook::velox::parquet
+
+#endif
\ No newline at end of file
diff --git a/velox/dwio/parquet/reader/PageReader.h b/velox/dwio/parquet/reader/PageReader.h
index 336ea5897e74f..51f5140b2bb93 100644
--- a/velox/dwio/parquet/reader/PageReader.h
+++ b/velox/dwio/parquet/reader/PageReader.h
@@ -21,6 +21,7 @@
 #include "velox/dwio/common/DirectDecoder.h"
 #include "velox/dwio/common/SelectiveColumnReader.h"
 #include "velox/dwio/parquet/reader/BooleanDecoder.h"
+#include "velox/dwio/parquet/reader/PageReaderBase.h"
 #include "velox/dwio/parquet/reader/ParquetTypeWithId.h"
 #include "velox/dwio/parquet/reader/RleBpDataDecoder.h"
 #include "velox/dwio/parquet/reader/StringDecoder.h"
@@ -31,7 +32,7 @@ namespace facebook::velox::parquet {
 /// Manages access to pages inside a ColumnChunk. Interprets page headers and
 /// encodings and presents the combination of pages and encoded values as a
 /// continuous stream accessible via readWithVisitor().
-class PageReader {
+class PageReader : public parquet::PageReaderBase {
  public:
   PageReader(
       std::unique_ptr<dwio::common::SeekableInputStream> stream,
@@ -69,6 +70,10 @@ class PageReader {
   /// Advances 'numRows' top level rows.
   void skip(int64_t numRows);
 
+  PageReaderType getType() override {
+    return PageReaderType::COMMON;
+  }
+
   /// Decodes repdefs for 'numTopLevelRows'. Use getLengthsAndNulls()
   /// to access the lengths and nulls for the different nesting
   /// levels.
diff --git a/velox/dwio/parquet/reader/PageReaderBase.h b/velox/dwio/parquet/reader/PageReaderBase.h
new file mode 100644
index 0000000000000..7534dca843e8b
--- /dev/null
+++ b/velox/dwio/parquet/reader/PageReaderBase.h
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include "velox/dwio/common/BitConcatenation.h"
+#include "velox/dwio/common/DirectDecoder.h"
+#include "velox/dwio/common/SelectiveColumnReader.h"
+#include "velox/dwio/parquet/reader/BooleanDecoder.h"
+#include "velox/dwio/parquet/reader/ParquetTypeWithId.h"
+#include "velox/dwio/parquet/reader/RleBpDataDecoder.h"
+#include "velox/dwio/parquet/reader/StringDecoder.h"
+#include "velox/vector/BaseVector.h"
+
+namespace facebook::velox::parquet {
+enum PageReaderType { COMMON = 0, IAA = 1 };
+/**
+ * Abstract page reader class.
+ *
+ * Reader object is used to process a page.
+ *
+ */
+class PageReaderBase {
+ public:
+  virtual PageReaderType getType() = 0;
+  virtual ~PageReaderBase() = default;
+
+  /**
+   * Skips the define decoder, if any, for 'numValues' top level
+   * rows. Returns the number of non-nulls skipped. The range is the
+   * current page.
+   * @return the number of rows skipped
+   */
+  virtual int32_t skipNulls(int32_t numRows) = 0;
+  virtual void skipNullsOnly(int64_t numValues) = 0;
+
+  /**
+   * Advances 'numRows' top level rows.
+   * @param numRows
+   */
+  virtual void skip(int64_t numRows) = 0;
+
+  /* Applies 'visitor' to values in the ColumnChunk of 'this'. The
+   * operation to perform and the operand rows are given by
+   * 'visitor'. The rows are relative to the current position. The
+   * current position after readWithVisitor is immediately after the
+   * last accessed row.
+   */
+  // Note: readWithVisitor() is a template and therefore cannot be virtual;
+  // concrete readers implement it and callers dispatch on getType().
+
+  virtual void clearDictionary() = 0;
+
+  /* True if the current page holds dictionary indices.
+   */
+  virtual bool isDictionary() const = 0;
+
+  /* Reads 'numValues' null flags into 'nulls' and advances the
+   * decoders by as much. The read may span several pages. If there
+   * are no nulls, buffer may be set to nullptr.
+   */
+  virtual void readNullsOnly(int64_t numValues, BufferPtr& buffer) = 0;
+
+
+  virtual const VectorPtr& dictionaryValues(const TypePtr& type) = 0;
+};
+} // namespace facebook::velox::parquet
\ No newline at end of file
diff --git a/velox/dwio/parquet/reader/ParquetData.cpp b/velox/dwio/parquet/reader/ParquetData.cpp
index 8a11c73a0da7a..d6fa90b46f6e6 100644
--- a/velox/dwio/parquet/reader/ParquetData.cpp
+++ b/velox/dwio/parquet/reader/ParquetData.cpp
@@ -82,6 +82,34 @@ bool ParquetData::rowGroupMatches(
   return true;
 }
 
+bool ParquetData::preDecompRowGroup(uint32_t index) {
+#ifdef VELOX_ENABLE_QPL
+  if (!dwio::common::QplJobHWPool::GetInstance().job_ready() ||
+      !needPreDecomp) {
+    LOG(WARNING) << "QPL job pool not ready or pre-decompression disabled"
+                 << " (needPreDecomp=" << needPreDecomp << ")";
+    return false;
+  }
+
+  auto& chunk = rowGroups_[index].columns[type_->column];
+  auto& metaData = chunk.meta_data;
+  if (metaData.codec == thrift::CompressionCodec::GZIP) {
+    pageReaders_.resize(rowGroups_.size());
+    auto iaaPageReader = std::make_unique<IAAPageReader>(
+        std::move(streams_[index]),
+        pool_,
+        type_,
+        metaData.codec,
+        metaData.total_compressed_size);
+    iaaPageReader->preDecompressPage(needPreDecomp);
+    pageReaders_[index] = std::move(iaaPageReader);
+  }
+  return needPreDecomp;
+#else
+  return false;
+#endif
+}
+
 void ParquetData::enqueueRowGroup(
     uint32_t index,
     dwio::common::BufferedInput& input) {
@@ -112,8 +140,16 @@ void ParquetData::enqueueRowGroup(
 dwio::common::PositionProvider ParquetData::seekToRowGroup(uint32_t index) {
   static std::vector<uint64_t> empty;
   VELOX_CHECK_LT(index, streams_.size());
-  VELOX_CHECK(streams_[index], "Stream not enqueued for column");
   auto& metadata = rowGroups_[index].columns[type_->column].meta_data;
+#ifdef VELOX_ENABLE_QPL
+  if (metadata.codec == thrift::CompressionCodec::GZIP &&
+      dwio::common::QplJobHWPool::GetInstance().job_ready() &&
+      pageReaders_.size() > index && pageReaders_[index] != nullptr) {
+    reader_ = std::move(pageReaders_[index]);
+    return dwio::common::PositionProvider(empty);
+  }
+#endif
+  VELOX_CHECK(streams_[index], "Stream not enqueued for column");
   reader_ = std::make_unique<PageReader>(
       std::move(streams_[index]),
       pool_,
diff --git a/velox/dwio/parquet/reader/ParquetData.h b/velox/dwio/parquet/reader/ParquetData.h
index cf400165da180..f801cea08c377 100644
--- a/velox/dwio/parquet/reader/ParquetData.h
+++ b/velox/dwio/parquet/reader/ParquetData.h
@@ -21,6 +21,7 @@
 #include "velox/dwio/common/BufferUtil.h"
 #include "velox/dwio/common/BufferedInput.h"
 #include "velox/dwio/common/ScanSpec.h"
+#include "velox/dwio/parquet/reader/IAAPageReader.h"
 #include "velox/dwio/parquet/reader/PageReader.h"
 #include "velox/dwio/parquet/thrift/ParquetThriftTypes.h"
 #include "velox/dwio/parquet/thrift/ThriftTransport.h"
@@ -55,6 +56,9 @@ class ParquetData : public dwio::common::FormatData {
   /// Prepares to read data for 'index'th row group.
   void enqueueRowGroup(uint32_t index, dwio::common::BufferedInput& input);
 
+  /// Pre-decompresses data for the 'index'th row group.
+  bool preDecompRowGroup(uint32_t index);
+
   /// Positions 'this' at 'index'th row group. loadRowGroup must be called
   /// first. The returned PositionProvider is empty and should not be used.
   /// Other formats may use it.
@@ -67,7 +71,7 @@ class ParquetData : public dwio::common::FormatData {
     FilterRowGroupsResult&) override;
 
   PageReader* FOLLY_NONNULL reader() const {
-    return reader_.get();
+    return dynamic_cast<PageReader*>(reader_.get());
   }
 
   // Reads null flags for 'numValues' next top level rows. The first 'numValues'
@@ -153,7 +157,13 @@ class ParquetData : public dwio::common::FormatData {
   /// PageReader::readWithVisitor().
template void readWithVisitor(Visitor visitor) { - reader_->readWithVisitor(visitor); +#ifdef VELOX_ENABLE_QPL + if (reader_->getType() == PageReaderType::IAA) { + return dynamic_cast(reader_.get()) + ->readWithVisitor(visitor); + } +#endif + dynamic_cast(reader_.get())->readWithVisitor(visitor); } const VectorPtr& dictionaryValues(const TypePtr& type) { @@ -191,8 +201,11 @@ class ParquetData : public dwio::common::FormatData { const uint32_t maxDefine_; const uint32_t maxRepeat_; int64_t rowsInRowGroup_; - std::unique_ptr reader_; - + std::unique_ptr reader_; +#ifdef VELOX_ENABLE_QPL + std::vector> pageReaders_; + bool needPreDecomp = true; +#endif // Nulls derived from leaf repdefs for non-leaf readers. BufferPtr presetNulls_; diff --git a/velox/dwio/parquet/reader/ParquetReader.cpp b/velox/dwio/parquet/reader/ParquetReader.cpp index 495befba42587..fdfd6cd1fa60c 100644 --- a/velox/dwio/parquet/reader/ParquetReader.cpp +++ b/velox/dwio/parquet/reader/ParquetReader.cpp @@ -117,6 +117,7 @@ class ReaderBase { std::shared_ptr schemaWithId_; const bool binaryAsString = false; + bool needPreDecomp = true; // Map from row group index to pre-created loading BufferedInput. 
std::unordered_map> @@ -137,6 +138,7 @@ ReaderBase::ReaderBase( loadFileMetaData(); initializeSchema(); + needPreDecomp = true; } void ReaderBase::loadFileMetaData() { @@ -575,12 +577,18 @@ void ReaderBase::scheduleRowGroups( auto input = inputs_[thisGroup].get(); if (!input) { inputs_[thisGroup] = reader.loadRowGroup(thisGroup, input_); + if (needPreDecomp) { + needPreDecomp = reader.preDecompRowGroup(thisGroup); + } } for (auto counter = 0; counter < FLAGS_parquet_prefetch_rowgroups; ++counter) { if (nextGroup) { if (inputs_.count(nextGroup) != 0) { inputs_[nextGroup] = reader.loadRowGroup(thisGroup, input_); + if (needPreDecomp) { + needPreDecomp = reader.preDecompRowGroup(thisGroup); + } } } else { break; diff --git a/velox/dwio/parquet/reader/RepeatedColumnReader.h b/velox/dwio/parquet/reader/RepeatedColumnReader.h index 6fc9afaaddab2..076cb76280252 100644 --- a/velox/dwio/parquet/reader/RepeatedColumnReader.h +++ b/velox/dwio/parquet/reader/RepeatedColumnReader.h @@ -126,6 +126,9 @@ class ListColumnReader : public dwio::common::SelectiveListColumnReader { void enqueueRowGroup(uint32_t index, dwio::common::BufferedInput& input); + bool preDecompRowGroup(uint32_t index) { + return true; + } void read( vector_size_t offset, RowSet rows, diff --git a/velox/dwio/parquet/reader/StructColumnReader.cpp b/velox/dwio/parquet/reader/StructColumnReader.cpp index cec542c487caf..ee49208cc693f 100644 --- a/velox/dwio/parquet/reader/StructColumnReader.cpp +++ b/velox/dwio/parquet/reader/StructColumnReader.cpp @@ -129,6 +129,28 @@ void StructColumnReader::enqueueRowGroup( } } +bool StructColumnReader::preDecompRowGroup(uint32_t index) { +#ifdef VELOX_ENABLE_QPL + for (auto& child : children_) { + if (!needPreDecomp) { + return false; + } + if (auto structChild = dynamic_cast(child)) { + continue; + } else if (auto listChild = dynamic_cast(child)) { + continue; + } else if (auto mapChild = dynamic_cast(child)) { + continue; + } else { + needPreDecomp = + 
child->formatData().as().preDecompRowGroup(index); + } + } + return needPreDecomp; +#endif + return false; +} + void StructColumnReader::seekToRowGroup(uint32_t index) { SelectiveStructColumnReader::seekToRowGroup(index); BufferPtr noBuffer; diff --git a/velox/dwio/parquet/reader/StructColumnReader.h b/velox/dwio/parquet/reader/StructColumnReader.h index 2713c4218f789..2e3c215a3f6db 100644 --- a/velox/dwio/parquet/reader/StructColumnReader.h +++ b/velox/dwio/parquet/reader/StructColumnReader.h @@ -40,6 +40,8 @@ class StructColumnReader : public dwio::common::SelectiveStructColumnReader { uint32_t index, const std::shared_ptr& input); + bool preDecompRowGroup(uint32_t index); + // No-op in Parquet. All readers switch row groups at the same time, there is // no on-demand skipping to a new row group. void advanceFieldReader( @@ -84,6 +86,7 @@ class StructColumnReader : public dwio::common::SelectiveStructColumnReader { // The level information for extracting nulls for 'this' from the // repdefs in a leaf PageReader. ::parquet::internal::LevelInfo levelInfo_; + bool needPreDecomp = true; }; } // namespace facebook::velox::parquet