From c042e46f4ad8414de507acddbe824f3ec48d6448 Mon Sep 17 00:00:00 2001 From: yaqi-zhao Date: Wed, 25 Oct 2023 16:14:28 +0800 Subject: [PATCH] map/struct/list --- velox/dwio/common/compression/Compression.cpp | 3 ++- velox/dwio/parquet/reader/PageReader.cpp | 6 ++--- velox/dwio/parquet/reader/PageReader.h | 4 ++-- .../parquet/reader/RepeatedColumnReader.cpp | 23 +++++++++++++++++++ .../parquet/reader/RepeatedColumnReader.h | 5 ++-- .../parquet/reader/StructColumnReader.cpp | 6 ++--- 6 files changed, 35 insertions(+), 12 deletions(-) diff --git a/velox/dwio/common/compression/Compression.cpp b/velox/dwio/common/compression/Compression.cpp index 1f07331c5d6be..bedf463d04dac 100644 --- a/velox/dwio/common/compression/Compression.cpp +++ b/velox/dwio/common/compression/Compression.cpp @@ -684,7 +684,8 @@ createAsyncDecompressor( case CompressionKind::CompressionKind_GZIP: return std::make_unique(bufferSize, streamDebugInfo); default: - LOG(WARNING) << "Asynchronous mode not support for compression codec " << kind; + LOG(WARNING) << "Asynchronous mode not support for compression codec " + << kind; return nullptr; } return nullptr; diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp index 617d6d3da6be4..f695655fbd749 100644 --- a/velox/dwio/parquet/reader/PageReader.cpp +++ b/velox/dwio/parquet/reader/PageReader.cpp @@ -1177,7 +1177,7 @@ void PageReader::prefetchDictionary(const thrift::PageHeader& pageHeader) { return; } -const bool FOLLY_NONNULL PageReader::iaaDecompressGzip( +const bool PageReader::iaaDecompressGzip( const char* pageData, uint32_t compressedSize, uint32_t uncompressedSize, @@ -1218,7 +1218,7 @@ const bool FOLLY_NONNULL PageReader::iaaDecompressGzip( return true; } -const bool FOLLY_NONNULL PageReader::getDecompRes(int job_id) { +const bool PageReader::getDecompRes(int job_id) { auto streamDebugInfo = fmt::format("Page Reader: Stream {}", inputStream_->getName()); std::unique_ptr decompressor = @@ -1228,7 +1228,7 @@ const bool FOLLY_NONNULL PageReader::getDecompRes(int job_id) { } PageReader::~PageReader() { - if (data_qpl_job_id >= 0 || dict_qpl_job_id >= 0) { + if (data_qpl_job_id > 0 || dict_qpl_job_id > 0) { auto streamDebugInfo = fmt::format("Page Reader: Stream {}", inputStream_->getName()); std::unique_ptr decompressor = diff --git a/velox/dwio/parquet/reader/PageReader.h b/velox/dwio/parquet/reader/PageReader.h index 2aeb876e692db..ef96188e307f4 100644 --- a/velox/dwio/parquet/reader/PageReader.h +++ b/velox/dwio/parquet/reader/PageReader.h @@ -129,10 +129,10 @@ class PageReader { void prefetchDataPageV1(const thrift::PageHeader& pageHeader); void prefetchDataPageV2(const thrift::PageHeader& pageHeader); void prefetchDictionary(const thrift::PageHeader& pageHeader); - const bool FOLLY_NONNULL getDecompRes(int job_id); + const bool getDecompRes(int job_id); void prefetchNextPage(); bool seekToPreDecompPage(int64_t row); - const bool FOLLY_NONNULL iaaDecompressGzip( + const bool iaaDecompressGzip( const char* FOLLY_NONNULL pageData, uint32_t compressedSize, uint32_t uncompressedSize, diff --git a/velox/dwio/parquet/reader/RepeatedColumnReader.cpp b/velox/dwio/parquet/reader/RepeatedColumnReader.cpp index 6b6657b057453..73a6d878c237c 100644 --- a/velox/dwio/parquet/reader/RepeatedColumnReader.cpp +++ b/velox/dwio/parquet/reader/RepeatedColumnReader.cpp @@ -81,6 +81,21 @@ void skipUnreadLengthsAndNulls(dwio::common::SelectiveColumnReader& reader) { } } +void preDecompChild( + dwio::common::SelectiveColumnReader* reader, + uint32_t index) { +#ifdef VELOX_ENABLE_QPL + auto children = reader->children(); + if (children.empty()) { + reader->formatData().as().preDecompRowGroup(index); + return; + } + for (auto* child : children) { + preDecompChild(child, index); + } +#endif +} + void enqueueChildren( dwio::common::SelectiveColumnReader* reader, uint32_t index, @@ -135,6 +150,10 @@ void MapColumnReader::enqueueRowGroup( enqueueChildren(this, index, input); } +void MapColumnReader::preDecompRowGroup(uint32_t index) { + preDecompChild(this, index); +} + void MapColumnReader::seekToRowGroup(uint32_t index) { SelectiveMapColumnReader::seekToRowGroup(index); readOffset_ = 0; @@ -241,6 +260,10 @@ void ListColumnReader::enqueueRowGroup( enqueueChildren(this, index, input); } +void ListColumnReader::preDecompRowGroup(uint32_t index) { + preDecompChild(this, index); +} + void ListColumnReader::seekToRowGroup(uint32_t index) { SelectiveListColumnReader::seekToRowGroup(index); readOffset_ = 0; diff --git a/velox/dwio/parquet/reader/RepeatedColumnReader.h b/velox/dwio/parquet/reader/RepeatedColumnReader.h index cf51b8175beac..ef8799e74af36 100644 --- a/velox/dwio/parquet/reader/RepeatedColumnReader.h +++ b/velox/dwio/parquet/reader/RepeatedColumnReader.h @@ -71,6 +71,7 @@ class MapColumnReader : public dwio::common::SelectiveMapColumnReader { void seekToRowGroup(uint32_t index) override; void enqueueRowGroup(uint32_t index, dwio::common::BufferedInput& input); + void preDecompRowGroup(uint32_t index); void read( vector_size_t offset, @@ -128,9 +129,7 @@ class ListColumnReader : public dwio::common::SelectiveListColumnReader { void enqueueRowGroup(uint32_t index, dwio::common::BufferedInput& input); - bool preDecompRowGroup(uint32_t index) { - return true; - } + void preDecompRowGroup(uint32_t index); void read( vector_size_t offset, RowSet rows, diff --git a/velox/dwio/parquet/reader/StructColumnReader.cpp b/velox/dwio/parquet/reader/StructColumnReader.cpp index 7bc038f49998c..291b895abc9ee 100644 --- a/velox/dwio/parquet/reader/StructColumnReader.cpp +++ b/velox/dwio/parquet/reader/StructColumnReader.cpp @@ -139,11 +139,11 @@ bool StructColumnReader::preDecompRowGroup(uint32_t index) { return false; } if (auto structChild = dynamic_cast(child)) { - continue; + structChild->preDecompRowGroup(index); } else if (auto listChild = dynamic_cast(child)) { - continue; + listChild->preDecompRowGroup(index); } else if (auto mapChild = dynamic_cast(child)) { - continue; + mapChild->preDecompRowGroup(index); } else { needPreDecomp = child->formatData().as().preDecompRowGroup(index);