From ced346c17d9f8af8cff16b23f3fc0121e48f942e Mon Sep 17 00:00:00 2001 From: Jimmy Lu Date: Fri, 13 Dec 2024 07:39:34 -0800 Subject: [PATCH] fix: Optimize DWRF footer IO read count and size in Hive connector (#11798) Summary: Presto splitting policy sometimes over-splits files with large stripes (e.g. those with large flatmap columns), resulting in quite a lot of splits which actually do not contain any stripes. For those essentially empty splits, we still need to read the file footer in order to compare the split boundary with stripe boundaries. However, we can skip the stripe metadata cache in this case. We also reduce the number of tiny reads while reading the file footer, so most footers can be read in 1 read IO instead of 3. This combination of optimizations gives up to 2.5 times execution time reduction for some queries. I also tried caching the parsed footer in file handles; however, that does not work well, since Presto seems to send splits from the same file to different workers and the cache hit rate remains quite low. 
Reviewed By: oerling Differential Revision: D66943503 --- velox/connectors/hive/HiveConfig.cpp | 2 +- velox/dwio/common/BufferedInput.h | 2 +- velox/dwio/common/CacheInputStream.cpp | 2 +- velox/dwio/common/CacheInputStream.h | 7 +- velox/dwio/common/DirectInputStream.cpp | 2 +- velox/dwio/common/DirectInputStream.h | 2 +- velox/dwio/common/SeekableInputStream.cpp | 4 +- velox/dwio/common/SeekableInputStream.h | 6 +- velox/dwio/common/Statistics.h | 37 ++++- .../common/compression/PagedInputStream.h | 2 +- velox/dwio/dwrf/reader/DwrfReader.cpp | 11 +- velox/dwio/dwrf/reader/DwrfReader.h | 4 +- velox/dwio/dwrf/reader/ReaderBase.cpp | 143 ++++++++++++------ velox/dwio/dwrf/reader/ReaderBase.h | 31 ++-- velox/dwio/dwrf/test/ReaderTest.cpp | 6 +- velox/dwio/dwrf/test/TestDecompression.cpp | 2 +- velox/dwio/dwrf/test/WriterTest.cpp | 4 +- velox/exec/tests/PrintPlanWithStatsTest.cpp | 18 +-- velox/exec/tests/TableScanTest.cpp | 27 +++- 19 files changed, 212 insertions(+), 100 deletions(-) diff --git a/velox/connectors/hive/HiveConfig.cpp b/velox/connectors/hive/HiveConfig.cpp index ba30a0c19e758..1f99a98a38058 100644 --- a/velox/connectors/hive/HiveConfig.cpp +++ b/velox/connectors/hive/HiveConfig.cpp @@ -266,7 +266,7 @@ uint64_t HiveConfig::sortWriterFinishTimeSliceLimitMs( } uint64_t HiveConfig::footerEstimatedSize() const { - return config_->get(kFooterEstimatedSize, 1UL << 20); + return config_->get(kFooterEstimatedSize, 256UL << 10); } uint64_t HiveConfig::filePreloadThreshold() const { diff --git a/velox/dwio/common/BufferedInput.h b/velox/dwio/common/BufferedInput.h index ac969fdee2ccf..2a1f1eba9826e 100644 --- a/velox/dwio/common/BufferedInput.h +++ b/velox/dwio/common/BufferedInput.h @@ -91,7 +91,7 @@ class BufferedInput { virtual std::unique_ptr read(uint64_t offset, uint64_t length, LogType logType) const { - std::unique_ptr ret = readBuffer(offset, length); + auto ret = readBuffer(offset, length); if (ret != nullptr) { return ret; } diff --git 
a/velox/dwio/common/CacheInputStream.cpp b/velox/dwio/common/CacheInputStream.cpp index ca05a178e3b63..dedda0c72f6a7 100644 --- a/velox/dwio/common/CacheInputStream.cpp +++ b/velox/dwio/common/CacheInputStream.cpp @@ -160,7 +160,7 @@ std::string CacheInputStream::getName() const { return result; } -size_t CacheInputStream::positionSize() { +size_t CacheInputStream::positionSize() const { // not compressed, so only need 1 position (uncompressed position) return 1; } diff --git a/velox/dwio/common/CacheInputStream.h b/velox/dwio/common/CacheInputStream.h index 54650d9ebc435..195bdbdd1351f 100644 --- a/velox/dwio/common/CacheInputStream.h +++ b/velox/dwio/common/CacheInputStream.h @@ -43,13 +43,18 @@ class CacheInputStream : public SeekableInputStream { ~CacheInputStream() override; + CacheInputStream& operator=(const CacheInputStream&) = delete; + CacheInputStream(const CacheInputStream&) = delete; + CacheInputStream& operator=(CacheInputStream&&) = delete; + CacheInputStream(CacheInputStream&&) = delete; + bool Next(const void** data, int* size) override; void BackUp(int count) override; bool SkipInt64(int64_t count) override; google::protobuf::int64 ByteCount() const override; void seekToPosition(PositionProvider& position) override; std::string getName() const override; - size_t positionSize() override; + size_t positionSize() const override; /// Returns a copy of 'this', ranging over the same bytes. 
The clone is /// initially positioned at the position of 'this' and can be moved diff --git a/velox/dwio/common/DirectInputStream.cpp b/velox/dwio/common/DirectInputStream.cpp index 3d8c8a13f636c..68173b40f259f 100644 --- a/velox/dwio/common/DirectInputStream.cpp +++ b/velox/dwio/common/DirectInputStream.cpp @@ -105,7 +105,7 @@ std::string DirectInputStream::getName() const { "DirectInputStream {} of {}", offsetInRegion_, region_.length); } -size_t DirectInputStream::positionSize() { +size_t DirectInputStream::positionSize() const { // not compressed, so only need 1 position (uncompressed position) return 1; } diff --git a/velox/dwio/common/DirectInputStream.h b/velox/dwio/common/DirectInputStream.h index 3715da6666822..3d75b44595682 100644 --- a/velox/dwio/common/DirectInputStream.h +++ b/velox/dwio/common/DirectInputStream.h @@ -48,7 +48,7 @@ class DirectInputStream : public SeekableInputStream { void seekToPosition(PositionProvider& position) override; std::string getName() const override; - size_t positionSize() override; + size_t positionSize() const override; /// Testing function to access loaded state. 
void testingData( diff --git a/velox/dwio/common/SeekableInputStream.cpp b/velox/dwio/common/SeekableInputStream.cpp index 7773445dea5d3..be2fa90f41047 100644 --- a/velox/dwio/common/SeekableInputStream.cpp +++ b/velox/dwio/common/SeekableInputStream.cpp @@ -176,7 +176,7 @@ std::string SeekableArrayInputStream::getName() const { "SeekableArrayInputStream ", position_, " of ", length_); } -size_t SeekableArrayInputStream::positionSize() { +size_t SeekableArrayInputStream::positionSize() const { // not compressed, so only need 1 position (uncompressed position) return 1; } @@ -257,7 +257,7 @@ std::string SeekableFileInputStream::getName() const { input_->getName(), " from ", start_, " for ", length_); } -size_t SeekableFileInputStream::positionSize() { +size_t SeekableFileInputStream::positionSize() const { // not compressed, so only need 1 position (uncompressed position) return 1; } diff --git a/velox/dwio/common/SeekableInputStream.h b/velox/dwio/common/SeekableInputStream.h index a91fbb379732d..71dbdc23c3e97 100644 --- a/velox/dwio/common/SeekableInputStream.h +++ b/velox/dwio/common/SeekableInputStream.h @@ -40,7 +40,7 @@ class SeekableInputStream : public google::protobuf::io::ZeroCopyInputStream { // Returns the number of position values this input stream uses to identify an // ORC/DWRF stream address. - virtual size_t positionSize() = 0; + virtual size_t positionSize() const = 0; virtual bool SkipInt64(int64_t count) = 0; @@ -82,7 +82,7 @@ class SeekableArrayInputStream : public SeekableInputStream { virtual google::protobuf::int64 ByteCount() const override; virtual void seekToPosition(PositionProvider& position) override; virtual std::string getName() const override; - virtual size_t positionSize() override; + virtual size_t positionSize() const override; /// Return the total number of bytes returned from Next() calls. Intended to /// be used for test validation. 
@@ -123,7 +123,7 @@ class SeekableFileInputStream : public SeekableInputStream { virtual google::protobuf::int64 ByteCount() const override; virtual void seekToPosition(PositionProvider& position) override; virtual std::string getName() const override; - virtual size_t positionSize() override; + virtual size_t positionSize() const override; private: const std::shared_ptr input_; diff --git a/velox/dwio/common/Statistics.h b/velox/dwio/common/Statistics.h index db5a91b064135..699066c1df4dc 100644 --- a/velox/dwio/common/Statistics.h +++ b/velox/dwio/common/Statistics.h @@ -535,16 +535,39 @@ struct RuntimeStatistics { // Number of strides (row groups) skipped based on statistics. int64_t skippedStrides{0}; + int64_t footerBufferOverread{0}; + + int64_t numStripes{0}; + ColumnReaderStatistics columnReaderStatistics; std::unordered_map toMap() { - return { - {"skippedSplits", RuntimeCounter(skippedSplits)}, - {"skippedSplitBytes", - RuntimeCounter(skippedSplitBytes, RuntimeCounter::Unit::kBytes)}, - {"skippedStrides", RuntimeCounter(skippedStrides)}, - {"flattenStringDictionaryValues", - RuntimeCounter(columnReaderStatistics.flattenStringDictionaryValues)}}; + std::unordered_map result; + if (skippedSplits > 0) { + result.emplace("skippedSplits", RuntimeCounter(skippedSplits)); + } + if (skippedSplitBytes > 0) { + result.emplace( + "skippedSplitBytes", + RuntimeCounter(skippedSplitBytes, RuntimeCounter::Unit::kBytes)); + } + if (skippedStrides > 0) { + result.emplace("skippedStrides", RuntimeCounter(skippedStrides)); + } + if (footerBufferOverread > 0) { + result.emplace( + "footerBufferOverread", + RuntimeCounter(footerBufferOverread, RuntimeCounter::Unit::kBytes)); + } + if (numStripes > 0) { + result.emplace("numStripes", RuntimeCounter(numStripes)); + } + if (columnReaderStatistics.flattenStringDictionaryValues > 0) { + result.emplace( + "flattenStringDictionaryValues", + RuntimeCounter(columnReaderStatistics.flattenStringDictionaryValues)); + } + return result; } 
}; diff --git a/velox/dwio/common/compression/PagedInputStream.h b/velox/dwio/common/compression/PagedInputStream.h index b7ecd99fd4c94..15b1acd630a0f 100644 --- a/velox/dwio/common/compression/PagedInputStream.h +++ b/velox/dwio/common/compression/PagedInputStream.h @@ -74,7 +74,7 @@ class PagedInputStream : public dwio::common::SeekableInputStream { ")"); } - size_t positionSize() override { + size_t positionSize() const override { // not compressed, so need 2 positions (compressed position + uncompressed // position) return 2; diff --git a/velox/dwio/dwrf/reader/DwrfReader.cpp b/velox/dwio/dwrf/reader/DwrfReader.cpp index ed9dede68142a..42fd560a5830b 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.cpp +++ b/velox/dwio/dwrf/reader/DwrfReader.cpp @@ -301,6 +301,9 @@ DwrfRowReader::DwrfRowReader( } unitLoader_ = getUnitLoader(); + if (!emptyFile()) { + getReader().loadCache(); + } } std::unique_ptr& DwrfRowReader::getColumnReader() { @@ -339,7 +342,7 @@ std::unique_ptr DwrfRowReader::getUnitLoader() { uint64_t DwrfRowReader::seekToRow(uint64_t rowNumber) { // Empty file - if (isEmptyFile()) { + if (emptyFile()) { return 0; } nextRowNumber_.reset(); @@ -422,7 +425,7 @@ uint64_t DwrfRowReader::seekToRow(uint64_t rowNumber) { } uint64_t DwrfRowReader::skipRows(uint64_t numberOfRowsToSkip) { - if (isEmptyFile()) { + if (emptyFile()) { VLOG(1) << "Empty file, nothing to skip"; return 0; } @@ -586,7 +589,7 @@ uint64_t DwrfRowReader::rowNumber() { if (nextRow != kAtEnd) { return nextRow; } - if (isEmptyFile()) { + if (emptyFile()) { return 0; } return getReader().footer().numberOfRows(); @@ -615,7 +618,7 @@ uint64_t DwrfRowReader::next( const dwio::common::Mutation* mutation) { const auto nextRow = nextRowNumber(); if (nextRow == kAtEnd) { - if (!isEmptyFile()) { + if (!emptyFile()) { previousRow_ = firstRowOfStripe_[stripeCeiling_ - 1] + getReader().footer().stripes(stripeCeiling_ - 1).numberOfRows(); } else { diff --git a/velox/dwio/dwrf/reader/DwrfReader.h 
b/velox/dwio/dwrf/reader/DwrfReader.h index a18b6982fa591..39e6f2a534ab3 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.h +++ b/velox/dwio/dwrf/reader/DwrfReader.h @@ -108,6 +108,8 @@ class DwrfRowReader : public StrideIndexProvider, void updateRuntimeStats( dwio::common::RuntimeStatistics& stats) const override { stats.skippedStrides += skippedStrides_; + stats.footerBufferOverread += getReader().footerBufferOverread(); + stats.numStripes += stripeCeiling_ - firstStripe_; stats.columnReaderStatistics.flattenStringDictionaryValues += columnReaderStatistics_.flattenStringDictionaryValues; } @@ -152,7 +154,7 @@ class DwrfRowReader : public StrideIndexProvider, const dwio::common::Statistics& stats, uint32_t nodeId) const; - bool isEmptyFile() const { + bool emptyFile() const { return stripeCeiling_ == firstStripe_; } diff --git a/velox/dwio/dwrf/reader/ReaderBase.cpp b/velox/dwio/dwrf/reader/ReaderBase.cpp index b81c208276e0a..b0eec7dc1c32d 100644 --- a/velox/dwio/dwrf/reader/ReaderBase.cpp +++ b/velox/dwio/dwrf/reader/ReaderBase.cpp @@ -82,56 +82,76 @@ ReaderBase::ReaderBase( FileFormat fileFormat) : ReaderBase(createReaderOptions(pool, fileFormat), std::move(input)) {} +namespace { + +template +std::unique_ptr parsePostScript(const char* input, int size) { + auto impl = std::make_unique(); + VELOX_CHECK(impl->ParseFromArray(input, size)); + return std::make_unique(std::move(impl)); +} + +template +std::unique_ptr parseFooter( + dwio::common::SeekableInputStream* input, + google::protobuf::Arena* arena) { + auto* impl = google::protobuf::Arena::CreateMessage(arena); + VELOX_CHECK(impl->ParseFromZeroCopyStream(input)); + return std::make_unique(impl); +} + +} // namespace + ReaderBase::ReaderBase( const dwio::common::ReaderOptions& options, std::unique_ptr input) - : arena_(std::make_unique()), - options_{options}, + : options_{options}, input_(std::move(input)), - fileLength_(input_->getReadFile()->size()) { + fileLength_(input_->getReadFile()->size()), + 
arena_(std::make_unique()) { process::TraceContext trace("ReaderBase::ReaderBase"); // TODO: make a config DWIO_ENSURE(fileLength_ > 0, "ORC file is empty"); VELOX_CHECK_GE(fileLength_, 4, "File size too small"); const auto preloadFile = fileLength_ <= options_.filePreloadThreshold(); - const uint64_t readSize = preloadFile - ? fileLength_ - : std::min(fileLength_, options_.footerEstimatedSize()); + const int64_t footerBufSize = + std::min(fileLength_, options_.footerEstimatedSize()); + const uint64_t readSize = preloadFile ? fileLength_ : footerBufSize; if (input_->supportSyncLoad()) { input_->enqueue({fileLength_ - readSize, readSize, "footer"}); input_->load(preloadFile ? LogType::FILE : LogType::FOOTER); } // TODO: read footer from spectrum - { - const void* buf; - int32_t ignored; - auto lastByteStream = input_->read(fileLength_ - 1, 1, LogType::FOOTER); - const bool ret = lastByteStream->Next(&buf, &ignored); - VELOX_CHECK(ret, "Failed to read"); - // Make sure 'lastByteStream' is live while dereferencing 'buf'. - psLength_ = *static_cast(buf) & 0xff; - } + auto footerBuffer = + AlignedBuffer::allocate(footerBufSize, &options_.memoryPool()); + auto* rawFooterBuffer = footerBuffer->asMutable(); + input_->read(fileLength_ - footerBufSize, footerBufSize, LogType::FOOTER) + ->readFully(rawFooterBuffer, footerBufSize); + int32_t footerOffset = footerBufSize - 1; + psLength_ = static_cast(rawFooterBuffer[footerOffset]); VELOX_CHECK_LE( psLength_ + 4, // 1 byte for post script len, 3 byte "ORC" header. 
fileLength_, "Corrupted file, Post script size is invalid"); + VELOX_CHECK_GE(footerOffset, psLength_); + footerOffset -= psLength_; if (fileFormat() == FileFormat::DWRF) { - auto postScript = ProtoUtils::readProto( - input_->read(fileLength_ - psLength_ - 1, psLength_, LogType::FOOTER)); - postScript_ = std::make_unique(std::move(postScript)); + postScript_ = parsePostScript( + rawFooterBuffer + footerOffset, psLength_); } else { - auto postScript = ProtoUtils::readProto( - input_->read(fileLength_ - psLength_ - 1, psLength_, LogType::FOOTER)); - postScript_ = std::make_unique(std::move(postScript)); + postScript_ = parsePostScript( + rawFooterBuffer + footerOffset, psLength_); } const uint64_t footerSize = postScript_->footerLength(); const uint64_t cacheSize = postScript_->hasCacheSize() ? postScript_->cacheSize() : 0; const uint64_t tailSize = 1 + psLength_ + footerSize + cacheSize; + footerBufferOverread_ = + std::max(0, footerBufSize - static_cast(tailSize)); // There are cases in warehouse, where RC/text files are stored // in ORC partition. This causes the Reader to SIGSEGV. 
The following @@ -154,29 +174,55 @@ ReaderBase::ReaderBase( input_->load(LogType::FOOTER); } - auto footerStream = input_->read( - fileLength_ - psLength_ - footerSize - 1, footerSize, LogType::FOOTER); + BufferPtr remainingFooterBuffer; + char* footerStart; + if (footerOffset >= footerSize) { + footerOffset -= footerSize; + footerStart = rawFooterBuffer + footerOffset; + } else { + remainingFooterBuffer = + AlignedBuffer::allocate(footerSize, &options_.memoryPool()); + footerStart = remainingFooterBuffer->asMutable(); + auto remainingBytes = footerSize - footerOffset; + input_ + ->read( + fileLength_ - footerSize - psLength_ - 1, + remainingBytes, + LogType::FOOTER) + ->readFully(footerStart, remainingBytes); + ::memcpy(footerStart + remainingBytes, rawFooterBuffer, footerOffset); + footerOffset = 0; + } + auto decompressed = createDecompressedStream( + std::make_unique( + footerStart, footerSize), + "File Footer"); if (fileFormat() == FileFormat::DWRF) { - auto footer = - google::protobuf::Arena::CreateMessage(arena_.get()); - ProtoUtils::readProtoInto( - createDecompressedStream(std::move(footerStream), "File Footer"), - footer); - footer_ = std::make_unique(footer); + footer_ = parseFooter(decompressed.get(), arena_.get()); } else { - auto footer = google::protobuf::Arena::CreateMessage( - arena_.get()); - ProtoUtils::readProtoInto( - createDecompressedStream(std::move(footerStream), "File Footer"), - footer); - footer_ = std::make_unique(footer); + footer_ = parseFooter(decompressed.get(), arena_.get()); } + stripeMetadataCacheBuffer_ = footerBuffer; + stripeMetadataCacheBufferSize_ = footerOffset; + schema_ = std::dynamic_pointer_cast( convertType(*footer_, 0, options_.fileColumnNamesReadAsLowerCase())); VELOX_CHECK_NOT_NULL(schema_, "invalid schema"); - // load stripe index/footer cache + // initialize file decrypter + handler_ = + DecryptionHandler::create(*footer_, options_.decrypterFactory().get()); +} + +void ReaderBase::loadCache() { + if 
(!stripeMetadataCacheBuffer_) { + return; + } + const uint64_t footerSize = postScript_->footerLength(); + const uint64_t cacheSize = + postScript_->hasCacheSize() ? postScript_->cacheSize() : 0; + const uint64_t tailSize = 1 + psLength_ + footerSize + cacheSize; if (cacheSize > 0) { VELOX_CHECK_EQ(format(), DwrfFormat::kDwrf); const uint64_t cacheOffset = fileLength_ - tailSize; @@ -187,12 +233,24 @@ ReaderBase::ReaderBase( input_->read(cacheOffset, cacheSize, LogType::FOOTER)); input_->load(LogType::FOOTER); } else { - auto cacheBuffer = std::make_shared>( + auto fullCacheBuffer = std::make_shared>( options_.memoryPool(), cacheSize); - input_->read(cacheOffset, cacheSize, LogType::FOOTER) - ->readFully(cacheBuffer->data(), cacheSize); + auto* target = fullCacheBuffer->data(); + auto* source = stripeMetadataCacheBuffer_->as(); + auto copySize = cacheSize; + if (cacheSize > stripeMetadataCacheBufferSize_) { + auto remainingBytes = cacheSize - stripeMetadataCacheBufferSize_; + auto stream = + input_->read(cacheOffset, remainingBytes, LogType::FOOTER); + stream->readFully(target, remainingBytes); + target += remainingBytes; + copySize -= remainingBytes; + } else { + source += stripeMetadataCacheBufferSize_ - cacheSize; + } + ::memcpy(target, source, copySize); cache_ = std::make_unique( - postScript_->cacheMode(), *footer_, std::move(cacheBuffer)); + postScript_->cacheMode(), *footer_, std::move(fullCacheBuffer)); } } if (!cache_ && input_->shouldPrefetchStripes()) { @@ -208,9 +266,8 @@ ReaderBase::ReaderBase( input_->load(LogType::FOOTER); } } - // initialize file decrypter - handler_ = - DecryptionHandler::create(*footer_, options_.decrypterFactory().get()); + // Release the memory as we no longer need it. 
+ stripeMetadataCacheBuffer_.reset(); } std::vector ReaderBase::rowsPerStripe() const { diff --git a/velox/dwio/dwrf/reader/ReaderBase.h b/velox/dwio/dwrf/reader/ReaderBase.h index 11f9070610e71..d43c660dc1ab4 100644 --- a/velox/dwio/dwrf/reader/ReaderBase.h +++ b/velox/dwio/dwrf/reader/ReaderBase.h @@ -80,13 +80,13 @@ class ReaderBase { const proto::Footer* footer, std::unique_ptr cache, std::unique_ptr handler = nullptr) - : postScript_{std::move(ps)}, - footer_{std::make_unique(footer)}, - cache_{std::move(cache)}, - handler_{std::move(handler)}, - options_{dwio::common::ReaderOptions(&pool)}, + : options_{dwio::common::ReaderOptions(&pool)}, input_{std::move(input)}, fileLength_{0}, + postScript_{std::move(ps)}, + footer_{std::make_unique(footer)}, + handler_{std::move(handler)}, + cache_{std::move(cache)}, schema_{ std::dynamic_pointer_cast(convertType(*footer_))}, psLength_{0} { @@ -150,6 +150,8 @@ class ReaderBase { return *input_; } + void loadCache(); + const std::unique_ptr& metadataCache() const { return cache_; } @@ -246,6 +248,10 @@ class ReaderBase { return options_.randomSkip(); } + int footerBufferOverread() const { + return footerBufferOverread_; + } + private: static std::shared_ptr convertType( const FooterWrapper& footer, @@ -260,16 +266,19 @@ class ReaderBase { return options; } - std::unique_ptr arena_; - std::unique_ptr postScript_; - std::unique_ptr footer_ = nullptr; - std::unique_ptr cache_; - std::unique_ptr handler_; - const dwio::common::ReaderOptions options_; const std::unique_ptr input_; const uint64_t fileLength_; + BufferPtr stripeMetadataCacheBuffer_; + int32_t stripeMetadataCacheBufferSize_; + int32_t footerBufferOverread_; + std::unique_ptr arena_; + std::unique_ptr postScript_; + std::unique_ptr footer_; + std::unique_ptr handler_; + std::unique_ptr cache_; + RowTypePtr schema_; // Lazily populated mutable std::shared_ptr schemaWithId_; diff --git a/velox/dwio/dwrf/test/ReaderTest.cpp b/velox/dwio/dwrf/test/ReaderTest.cpp index 
fa2ee0259aa2b..ad26b7954e6c9 100644 --- a/velox/dwio/dwrf/test/ReaderTest.cpp +++ b/velox/dwio/dwrf/test/ReaderTest.cpp @@ -1392,7 +1392,7 @@ TEST_F(TestReader, fileColumnNamesReadAsLowerCaseComplexStruct) { TEST_F(TestReader, TestStripeSizeCallback) { dwio::common::ReaderOptions readerOpts{pool()}; readerOpts.setFilePreloadThreshold(0); - readerOpts.setFooterEstimatedSize(4); + readerOpts.setFooterEstimatedSize(17); RowReaderOptions rowReaderOpts; std::shared_ptr requestedType = std::dynamic_pointer_cast< @@ -1420,7 +1420,7 @@ TEST_F(TestReader, TestStripeSizeCallback) { TEST_F(TestReader, TestStripeSizeCallbackLimitsOneStripe) { dwio::common::ReaderOptions readerOpts{pool()}; readerOpts.setFilePreloadThreshold(0); - readerOpts.setFooterEstimatedSize(4); + readerOpts.setFooterEstimatedSize(17); RowReaderOptions rowReaderOpts; std::shared_ptr requestedType = std::dynamic_pointer_cast< @@ -1449,7 +1449,7 @@ TEST_F(TestReader, TestStripeSizeCallbackLimitsOneStripe) { TEST_F(TestReader, TestStripeSizeCallbackLimitsTwoStripe) { dwio::common::ReaderOptions readerOpts{pool()}; readerOpts.setFilePreloadThreshold(0); - readerOpts.setFooterEstimatedSize(4); + readerOpts.setFooterEstimatedSize(17); RowReaderOptions rowReaderOpts; std::shared_ptr requestedType = std::dynamic_pointer_cast< diff --git a/velox/dwio/dwrf/test/TestDecompression.cpp b/velox/dwio/dwrf/test/TestDecompression.cpp index fcba33d80cfdc..37b0166558ff9 100644 --- a/velox/dwio/dwrf/test/TestDecompression.cpp +++ b/velox/dwio/dwrf/test/TestDecompression.cpp @@ -1054,7 +1054,7 @@ class TestingSeekableInputStream : public SeekableInputStream { return "testing"; } - size_t positionSize() override { + size_t positionSize() const override { return 1; } diff --git a/velox/dwio/dwrf/test/WriterTest.cpp b/velox/dwio/dwrf/test/WriterTest.cpp index 1570f29888655..13d81ff9cb41b 100644 --- a/velox/dwio/dwrf/test/WriterTest.cpp +++ b/velox/dwio/dwrf/test/WriterTest.cpp @@ -62,7 +62,9 @@ class WriterTest : public Test { 
auto readFile = std::make_shared(std::move(data)); auto input = std::make_unique(std::move(readFile), *pool_); dwio::common::ReaderOptions readerOpts{pool_.get()}; - return std::make_unique(readerOpts, std::move(input)); + auto reader = std::make_unique(readerOpts, std::move(input)); + reader->loadCache(); + return reader; } auto& getContext() { diff --git a/velox/exec/tests/PrintPlanWithStatsTest.cpp b/velox/exec/tests/PrintPlanWithStatsTest.cpp index 440f3b47f0727..eb9f9bff740ce 100644 --- a/velox/exec/tests/PrintPlanWithStatsTest.cpp +++ b/velox/exec/tests/PrintPlanWithStatsTest.cpp @@ -190,12 +190,13 @@ TEST_F(PrintPlanWithStatsTest, innerJoinWithTableScan) { {" dataSourceAddSplitWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" dataSourceReadWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" dynamicFiltersAccepted[ ]* sum: 1, count: 1, min: 1, max: 1"}, - {" flattenStringDictionaryValues [ ]* sum: 0, count: 1, min: 0, max: 0"}, + {" footerBufferOverread[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" ioWaitWallNanos [ ]* sum: .+, count: .+ min: .+, max: .+"}, {" maxSingleIoWaitWallNanos[ ]*sum: .+, count: 1, min: .+, max: .+"}, {" numPrefetch [ ]* sum: .+, count: 1, min: .+, max: .+"}, {" numRamRead [ ]* sum: 60, count: 1, min: 60, max: 60"}, {" numStorageRead [ ]* sum: .+, count: 1, min: .+, max: .+"}, + {" numStripes[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" overreadBytes[ ]* sum: 0B, count: 1, min: 0B, max: 0B"}, {" prefetchBytes [ ]* sum: .+, count: 1, min: .+, max: .+"}, {" preloadedSplits[ ]+sum: .+, count: .+, min: .+, max: .+", @@ -206,9 +207,6 @@ TEST_F(PrintPlanWithStatsTest, innerJoinWithTableScan) { {" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningFinishWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningGetOutputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, - {" skippedSplitBytes [ ]* sum: 0B, count: 1, min: 0B, max: 0B"}, - {" skippedSplits [ ]* sum: 0, count: 1, min: 0, max: 0"}, - {" 
skippedStrides [ ]* sum: 0, count: 1, min: 0, max: 0"}, {" storageReadBytes [ ]* sum: .+, count: 1, min: .+, max: .+"}, {" totalRemainingFilterTime\\s+sum: .+, count: .+, min: .+, max: .+"}, {" totalScanTime [ ]* sum: .+, count: .+, min: .+, max: .+"}, @@ -283,12 +281,13 @@ TEST_F(PrintPlanWithStatsTest, partialAggregateWithTableScan) { {" Input: 10000 rows \\(.+\\), Output: 10000 rows \\(.+\\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1, Splits: 1, CPU breakdown: B/I/O/F (.+/.+/.+/.+)"}, {" dataSourceAddSplitWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" dataSourceReadWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, - {" flattenStringDictionaryValues [ ]* sum: 0, count: 1, min: 0, max: 0"}, + {" footerBufferOverread[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" ioWaitWallNanos [ ]* sum: .+, count: .+ min: .+, max: .+"}, {" maxSingleIoWaitWallNanos[ ]*sum: .+, count: 1, min: .+, max: .+"}, {" numPrefetch [ ]* sum: .+, count: .+, min: .+, max: .+"}, {" numRamRead [ ]* sum: 7, count: 1, min: 7, max: 7"}, {" numStorageRead [ ]* sum: .+, count: 1, min: .+, max: .+"}, + {" numStripes[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" overreadBytes[ ]* sum: 0B, count: 1, min: 0B, max: 0B"}, {" prefetchBytes [ ]* sum: .+, count: 1, min: .+, max: .+"}, @@ -300,9 +299,6 @@ TEST_F(PrintPlanWithStatsTest, partialAggregateWithTableScan) { {" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningFinishWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningGetOutputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, - {" skippedSplitBytes[ ]* sum: 0B, count: 1, min: 0B, max: 0B"}, - {" skippedSplits [ ]* sum: 0, count: 1, min: 0, max: 0"}, - {" skippedStrides [ ]* sum: 0, count: 1, min: 0, max: 0"}, {" storageReadBytes [ ]* sum: .+, count: 1, min: .+, max: .+"}, {" totalRemainingFilterTime\\s+sum: .+, count: .+, min: .+, max: .+"}, {" totalScanTime [ ]* sum: .+, count: .+, min: .+, max: .+"}}); 
@@ -355,12 +351,13 @@ TEST_F(PrintPlanWithStatsTest, tableWriterWithTableScan) { {R"( Input: 100 rows \(.+\), Output: 100 rows \(.+\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1, Splits: 1, CPU breakdown: B/I/O/F (.+/.+/.+/.+))"}, {" dataSourceAddSplitWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" dataSourceReadWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, - {" flattenStringDictionaryValues [ ]* sum: 0, count: 1, min: 0, max: 0"}, + {" footerBufferOverread[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" ioWaitWallNanos [ ]* sum: .+, count: .+ min: .+, max: .+"}, {" maxSingleIoWaitWallNanos[ ]*sum: .+, count: 1, min: .+, max: .+"}, {" numPrefetch [ ]* sum: .+, count: .+, min: .+, max: .+"}, {" numRamRead [ ]* sum: 7, count: 1, min: 7, max: 7"}, {" numStorageRead [ ]* sum: .+, count: 1, min: .+, max: .+"}, + {" numStripes[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" overreadBytes[ ]* sum: 0B, count: 1, min: 0B, max: 0B"}, {" prefetchBytes [ ]* sum: .+, count: 1, min: .+, max: .+"}, @@ -372,9 +369,6 @@ TEST_F(PrintPlanWithStatsTest, tableWriterWithTableScan) { {" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningFinishWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningGetOutputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, - {" skippedSplitBytes[ ]* sum: 0B, count: 1, min: 0B, max: 0B"}, - {" skippedSplits [ ]* sum: 0, count: 1, min: 0, max: 0"}, - {" skippedStrides [ ]* sum: 0, count: 1, min: 0, max: 0"}, {" storageReadBytes [ ]* sum: .+, count: 1, min: .+, max: .+"}, {" totalRemainingFilterTime\\s+sum: .+, count: .+, min: .+, max: .+"}, {" totalScanTime [ ]* sum: .+, count: .+, min: .+, max: .+"}}); diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index 035d1ed42d4d1..10ea2cd0c454b 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -2388,8 +2388,8 @@ TEST_F(TableScanTest, 
statsBasedSkippingNulls) { EXPECT_EQ(31'234, stats.rawInputRows); EXPECT_EQ(31'234, stats.inputRows); EXPECT_EQ(31'234, stats.outputRows); - ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedSplits").sum, 0); - ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedStrides").sum, 0); + ASSERT_EQ(getTableScanRuntimeStats(task).count("skippedSplits"), 0); + ASSERT_EQ(getTableScanRuntimeStats(task).count("skippedStrides"), 0); task = assertQuery("c0 IS NULL"); @@ -2398,7 +2398,7 @@ TEST_F(TableScanTest, statsBasedSkippingNulls) { EXPECT_EQ(0, stats.inputRows); EXPECT_EQ(0, stats.outputRows); ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedSplits").sum, 1); - ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedStrides").sum, 0); + ASSERT_EQ(getTableScanRuntimeStats(task).count("skippedStrides"), 0); // c1 IS NULL - first stride should be skipped based on stats task = assertQuery("c1 IS NULL"); @@ -2407,7 +2407,7 @@ TEST_F(TableScanTest, statsBasedSkippingNulls) { EXPECT_EQ(size - 10'000, stats.rawInputRows); EXPECT_EQ(size - 11'111, stats.inputRows); EXPECT_EQ(size - 11'111, stats.outputRows); - ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedSplits").sum, 0); + ASSERT_EQ(getTableScanRuntimeStats(task).count("skippedSplits"), 0); ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedStrides").sum, 1); // c1 IS NOT NULL - 3rd and 4th strides should be skipped based on stats @@ -2417,7 +2417,7 @@ TEST_F(TableScanTest, statsBasedSkippingNulls) { EXPECT_EQ(20'000, stats.rawInputRows); EXPECT_EQ(11'111, stats.inputRows); EXPECT_EQ(11'111, stats.outputRows); - ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedSplits").sum, 0); + ASSERT_EQ(getTableScanRuntimeStats(task).count("skippedSplits"), 0); ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedStrides").sum, 2); } @@ -5419,3 +5419,20 @@ TEST_F(TableScanTest, rowId) { AssertQueryBuilder(plan).split(split).assertResults(expected); } } + +TEST_F(TableScanTest, footerIOCount) { + // We should issue only 1 IO for a 
split range that does not contain any + // stripe. + auto vector = makeRowVector({makeFlatVector(10, folly::identity)}); + auto file = TempFilePath::create(); + writeToFile(file->getPath(), {vector}); + auto plan = PlanBuilder().tableScan(asRowType(vector->type())).planNode(); + auto task = + AssertQueryBuilder(plan) + .split(makeHiveConnectorSplit(file->getPath(), 10'000, 10'000)) + .assertResults( + BaseVector::create(vector->type(), 0, pool())); + auto stats = getTableScanRuntimeStats(task); + ASSERT_EQ(stats.at("numStorageRead").sum, 1); + ASSERT_GT(stats.at("footerBufferOverread").sum, 0); +}