From 19fd5c5ceeae4b84837ab2aeae0fec78e9887791 Mon Sep 17 00:00:00 2001 From: Jimmy Lu Date: Fri, 13 Dec 2024 12:10:06 -0800 Subject: [PATCH] fix: Optimize DWRF footer IO read count and size in Hive connector (#11798) Summary: Presto splitting policy sometimes over-splits files with large stripes (e.g. those with large flatmap columns), resulting in quite a lot splits which actually do not contain any stripes. For those essentially empty splits, we still need to read the file footer in order to compare the split boundary with stripe boundaries. However we can skip the stripe metadata cache in this case. We also reduce the number of tiny reads while reading file footer, so most footers can be read in 1 read IO instead of 3. This combination of optimizations gives up to 2.5 times execution time reduction for some queries. I also tried caching the parsed footer in file handles; however that does not work well, since Presto seems sending splits from same file to different workers and the cache hit rate remains quite low. Reviewed By: xiaoxmeng, oerling Differential Revision: D66943503 --- velox/connectors/hive/HiveConfig.cpp | 2 +- velox/dwio/common/BufferedInput.h | 2 +- velox/dwio/common/CacheInputStream.cpp | 2 +- velox/dwio/common/CacheInputStream.h | 7 +- velox/dwio/common/DirectInputStream.cpp | 2 +- velox/dwio/common/DirectInputStream.h | 2 +- velox/dwio/common/SeekableInputStream.cpp | 4 +- velox/dwio/common/SeekableInputStream.h | 6 +- velox/dwio/common/Statistics.h | 37 ++++- .../common/compression/PagedInputStream.h | 2 +- velox/dwio/dwrf/reader/DwrfReader.cpp | 11 +- velox/dwio/dwrf/reader/DwrfReader.h | 4 +- velox/dwio/dwrf/reader/ReaderBase.cpp | 141 +++++++++++++----- velox/dwio/dwrf/reader/ReaderBase.h | 31 ++-- velox/dwio/dwrf/test/ReaderTest.cpp | 6 +- velox/dwio/dwrf/test/TestDecompression.cpp | 2 +- velox/dwio/dwrf/test/WriterTest.cpp | 4 +- velox/exec/tests/PrintPlanWithStatsTest.cpp | 18 +-- velox/exec/tests/TableScanTest.cpp | 27 +++- 19 files changed, 212 insertions(+), 98 deletions(-) diff --git a/velox/connectors/hive/HiveConfig.cpp b/velox/connectors/hive/HiveConfig.cpp index ba30a0c19e75..1f99a98a3805 100644 --- a/velox/connectors/hive/HiveConfig.cpp +++ b/velox/connectors/hive/HiveConfig.cpp @@ -266,7 +266,7 @@ uint64_t HiveConfig::sortWriterFinishTimeSliceLimitMs( } uint64_t HiveConfig::footerEstimatedSize() const { - return config_->get(kFooterEstimatedSize, 1UL << 20); + return config_->get(kFooterEstimatedSize, 256UL << 10); } uint64_t HiveConfig::filePreloadThreshold() const { diff --git a/velox/dwio/common/BufferedInput.h b/velox/dwio/common/BufferedInput.h index ac969fdee2cc..2a1f1eba9826 100644 --- a/velox/dwio/common/BufferedInput.h +++ b/velox/dwio/common/BufferedInput.h @@ -91,7 +91,7 @@ class BufferedInput { virtual std::unique_ptr read(uint64_t offset, uint64_t length, LogType logType) const { - std::unique_ptr ret = readBuffer(offset, length); + auto ret = readBuffer(offset, length); if (ret != nullptr) { return ret; } diff --git a/velox/dwio/common/CacheInputStream.cpp b/velox/dwio/common/CacheInputStream.cpp index ca05a178e3b6..dedda0c72f6a 100644 --- a/velox/dwio/common/CacheInputStream.cpp +++ b/velox/dwio/common/CacheInputStream.cpp @@ -160,7 +160,7 @@ std::string CacheInputStream::getName() const { return result; } -size_t CacheInputStream::positionSize() { +size_t CacheInputStream::positionSize() const { // not compressed, so only need 1 position (uncompressed position) return 1; } diff --git a/velox/dwio/common/CacheInputStream.h b/velox/dwio/common/CacheInputStream.h index 54650d9ebc43..195bdbdd1351 100644 --- a/velox/dwio/common/CacheInputStream.h +++ b/velox/dwio/common/CacheInputStream.h @@ -43,13 +43,18 @@ class CacheInputStream : public SeekableInputStream { ~CacheInputStream() override; + CacheInputStream& operator=(const CacheInputStream&) = delete; + CacheInputStream(const CacheInputStream&) = delete; + CacheInputStream& operator=(CacheInputStream&&) = delete; + CacheInputStream(CacheInputStream&&) = delete; + bool Next(const void** data, int* size) override; void BackUp(int count) override; bool SkipInt64(int64_t count) override; google::protobuf::int64 ByteCount() const override; void seekToPosition(PositionProvider& position) override; std::string getName() const override; - size_t positionSize() override; + size_t positionSize() const override; /// Returns a copy of 'this', ranging over the same bytes. The clone is /// initially positioned at the position of 'this' and can be moved diff --git a/velox/dwio/common/DirectInputStream.cpp b/velox/dwio/common/DirectInputStream.cpp index 3d8c8a13f636..68173b40f259 100644 --- a/velox/dwio/common/DirectInputStream.cpp +++ b/velox/dwio/common/DirectInputStream.cpp @@ -105,7 +105,7 @@ std::string DirectInputStream::getName() const { "DirectInputStream {} of {}", offsetInRegion_, region_.length); } -size_t DirectInputStream::positionSize() { +size_t DirectInputStream::positionSize() const { // not compressed, so only need 1 position (uncompressed position) return 1; } diff --git a/velox/dwio/common/DirectInputStream.h b/velox/dwio/common/DirectInputStream.h index 3715da666682..3d75b4459568 100644 --- a/velox/dwio/common/DirectInputStream.h +++ b/velox/dwio/common/DirectInputStream.h @@ -48,7 +48,7 @@ class DirectInputStream : public SeekableInputStream { void seekToPosition(PositionProvider& position) override; std::string getName() const override; - size_t positionSize() override; + size_t positionSize() const override; /// Testing function to access loaded state. void testingData( diff --git a/velox/dwio/common/SeekableInputStream.cpp b/velox/dwio/common/SeekableInputStream.cpp index 7773445dea5d..be2fa90f4104 100644 --- a/velox/dwio/common/SeekableInputStream.cpp +++ b/velox/dwio/common/SeekableInputStream.cpp @@ -176,7 +176,7 @@ std::string SeekableArrayInputStream::getName() const { "SeekableArrayInputStream ", position_, " of ", length_); } -size_t SeekableArrayInputStream::positionSize() { +size_t SeekableArrayInputStream::positionSize() const { // not compressed, so only need 1 position (uncompressed position) return 1; } @@ -257,7 +257,7 @@ std::string SeekableFileInputStream::getName() const { input_->getName(), " from ", start_, " for ", length_); } -size_t SeekableFileInputStream::positionSize() { +size_t SeekableFileInputStream::positionSize() const { // not compressed, so only need 1 position (uncompressed position) return 1; } diff --git a/velox/dwio/common/SeekableInputStream.h b/velox/dwio/common/SeekableInputStream.h index a91fbb379732..71dbdc23c3e9 100644 --- a/velox/dwio/common/SeekableInputStream.h +++ b/velox/dwio/common/SeekableInputStream.h @@ -40,7 +40,7 @@ class SeekableInputStream : public google::protobuf::io::ZeroCopyInputStream { // Returns the number of position values this input stream uses to identify an // ORC/DWRF stream address. - virtual size_t positionSize() = 0; + virtual size_t positionSize() const = 0; virtual bool SkipInt64(int64_t count) = 0; @@ -82,7 +82,7 @@ class SeekableArrayInputStream : public SeekableInputStream { virtual google::protobuf::int64 ByteCount() const override; virtual void seekToPosition(PositionProvider& position) override; virtual std::string getName() const override; - virtual size_t positionSize() override; + virtual size_t positionSize() const override; /// Return the total number of bytes returned from Next() calls. Intended to /// be used for test validation. @@ -123,7 +123,7 @@ class SeekableFileInputStream : public SeekableInputStream { virtual google::protobuf::int64 ByteCount() const override; virtual void seekToPosition(PositionProvider& position) override; virtual std::string getName() const override; - virtual size_t positionSize() override; + virtual size_t positionSize() const override; private: const std::shared_ptr input_; diff --git a/velox/dwio/common/Statistics.h b/velox/dwio/common/Statistics.h index db5a91b06413..699066c1df4d 100644 --- a/velox/dwio/common/Statistics.h +++ b/velox/dwio/common/Statistics.h @@ -535,16 +535,39 @@ struct RuntimeStatistics { // Number of strides (row groups) skipped based on statistics. int64_t skippedStrides{0}; + int64_t footerBufferOverread{0}; + + int64_t numStripes{0}; + ColumnReaderStatistics columnReaderStatistics; std::unordered_map toMap() { - return { - {"skippedSplits", RuntimeCounter(skippedSplits)}, - {"skippedSplitBytes", - RuntimeCounter(skippedSplitBytes, RuntimeCounter::Unit::kBytes)}, - {"skippedStrides", RuntimeCounter(skippedStrides)}, - {"flattenStringDictionaryValues", - RuntimeCounter(columnReaderStatistics.flattenStringDictionaryValues)}}; + std::unordered_map result; + if (skippedSplits > 0) { + result.emplace("skippedSplits", RuntimeCounter(skippedSplits)); + } + if (skippedSplitBytes > 0) { + result.emplace( + "skippedSplitBytes", + RuntimeCounter(skippedSplitBytes, RuntimeCounter::Unit::kBytes)); + } + if (skippedStrides > 0) { + result.emplace("skippedStrides", RuntimeCounter(skippedStrides)); + } + if (footerBufferOverread > 0) { + result.emplace( + "footerBufferOverread", + RuntimeCounter(footerBufferOverread, RuntimeCounter::Unit::kBytes)); + } + if (numStripes > 0) { + result.emplace("numStripes", RuntimeCounter(numStripes)); + } + if (columnReaderStatistics.flattenStringDictionaryValues > 0) { + result.emplace( + "flattenStringDictionaryValues", + RuntimeCounter(columnReaderStatistics.flattenStringDictionaryValues)); + } + return result; } }; diff --git a/velox/dwio/common/compression/PagedInputStream.h b/velox/dwio/common/compression/PagedInputStream.h index b7ecd99fd4c9..15b1acd630a0 100644 --- a/velox/dwio/common/compression/PagedInputStream.h +++ b/velox/dwio/common/compression/PagedInputStream.h @@ -74,7 +74,7 @@ class PagedInputStream : public dwio::common::SeekableInputStream { ")"); } - size_t positionSize() override { + size_t positionSize() const override { // not compressed, so need 2 positions (compressed position + uncompressed // position) return 2; diff --git a/velox/dwio/dwrf/reader/DwrfReader.cpp b/velox/dwio/dwrf/reader/DwrfReader.cpp index ed9dede68142..42fd560a5830 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.cpp +++ b/velox/dwio/dwrf/reader/DwrfReader.cpp @@ -301,6 +301,9 @@ DwrfRowReader::DwrfRowReader( } unitLoader_ = getUnitLoader(); + if (!emptyFile()) { + getReader().loadCache(); + } } std::unique_ptr& DwrfRowReader::getColumnReader() { @@ -339,7 +342,7 @@ std::unique_ptr DwrfRowReader::getUnitLoader() { uint64_t DwrfRowReader::seekToRow(uint64_t rowNumber) { // Empty file - if (isEmptyFile()) { + if (emptyFile()) { return 0; } nextRowNumber_.reset(); @@ -422,7 +425,7 @@ uint64_t DwrfRowReader::seekToRow(uint64_t rowNumber) { } uint64_t DwrfRowReader::skipRows(uint64_t numberOfRowsToSkip) { - if (isEmptyFile()) { + if (emptyFile()) { VLOG(1) << "Empty file, nothing to skip"; return 0; } @@ -586,7 +589,7 @@ uint64_t DwrfRowReader::rowNumber() { if (nextRow != kAtEnd) { return nextRow; } - if (isEmptyFile()) { + if (emptyFile()) { return 0; } return getReader().footer().numberOfRows(); @@ -615,7 +618,7 @@ uint64_t DwrfRowReader::next( const dwio::common::Mutation* mutation) { const auto nextRow = nextRowNumber(); if (nextRow == kAtEnd) { - if (!isEmptyFile()) { + if (!emptyFile()) { previousRow_ = firstRowOfStripe_[stripeCeiling_ - 1] + getReader().footer().stripes(stripeCeiling_ - 1).numberOfRows(); } else { diff --git a/velox/dwio/dwrf/reader/DwrfReader.h b/velox/dwio/dwrf/reader/DwrfReader.h index a18b6982fa59..39e6f2a534ab 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.h +++ b/velox/dwio/dwrf/reader/DwrfReader.h @@ -108,6 +108,8 @@ class DwrfRowReader : public StrideIndexProvider, void updateRuntimeStats( dwio::common::RuntimeStatistics& stats) const override { stats.skippedStrides += skippedStrides_; + stats.footerBufferOverread += getReader().footerBufferOverread(); + stats.numStripes += stripeCeiling_ - firstStripe_; stats.columnReaderStatistics.flattenStringDictionaryValues += columnReaderStatistics_.flattenStringDictionaryValues; } @@ -152,7 +154,7 @@ class DwrfRowReader : public StrideIndexProvider, const dwio::common::Statistics& stats, uint32_t nodeId) const; - bool isEmptyFile() const { + bool emptyFile() const { return stripeCeiling_ == firstStripe_; } diff --git a/velox/dwio/dwrf/reader/ReaderBase.cpp b/velox/dwio/dwrf/reader/ReaderBase.cpp index b81c208276e0..71dbc2e41510 100644 --- a/velox/dwio/dwrf/reader/ReaderBase.cpp +++ b/velox/dwio/dwrf/reader/ReaderBase.cpp @@ -82,56 +82,76 @@ ReaderBase::ReaderBase( FileFormat fileFormat) : ReaderBase(createReaderOptions(pool, fileFormat), std::move(input)) {} +namespace { + +template +std::unique_ptr parsePostScript(const char* input, int size) { + auto impl = std::make_unique(); + VELOX_CHECK(impl->ParseFromArray(input, size)); + return std::make_unique(std::move(impl)); +} + +template +std::unique_ptr parseFooter( + dwio::common::SeekableInputStream* input, + google::protobuf::Arena* arena) { + auto* impl = google::protobuf::Arena::CreateMessage(arena); + VELOX_CHECK(impl->ParseFromZeroCopyStream(input)); + return std::make_unique(impl); +} + +} // namespace + ReaderBase::ReaderBase( const dwio::common::ReaderOptions& options, std::unique_ptr input) - : arena_(std::make_unique()), - options_{options}, + : options_{options}, input_(std::move(input)), - fileLength_(input_->getReadFile()->size()) { + fileLength_(input_->getReadFile()->size()), + arena_(std::make_unique()) { process::TraceContext trace("ReaderBase::ReaderBase"); // TODO: make a config DWIO_ENSURE(fileLength_ > 0, "ORC file is empty"); VELOX_CHECK_GE(fileLength_, 4, "File size too small"); const auto preloadFile = fileLength_ <= options_.filePreloadThreshold(); - const uint64_t readSize = preloadFile - ? fileLength_ - : std::min(fileLength_, options_.footerEstimatedSize()); + const int64_t footerBufSize = + std::min(fileLength_, options_.footerEstimatedSize()); + const uint64_t readSize = preloadFile ? fileLength_ : footerBufSize; if (input_->supportSyncLoad()) { input_->enqueue({fileLength_ - readSize, readSize, "footer"}); input_->load(preloadFile ? LogType::FILE : LogType::FOOTER); } // TODO: read footer from spectrum - { - const void* buf; - int32_t ignored; - auto lastByteStream = input_->read(fileLength_ - 1, 1, LogType::FOOTER); - const bool ret = lastByteStream->Next(&buf, &ignored); - VELOX_CHECK(ret, "Failed to read"); - // Make sure 'lastByteStream' is live while dereferencing 'buf'. - psLength_ = *static_cast(buf) & 0xff; - } + auto footerBuffer = + AlignedBuffer::allocate(footerBufSize, &options_.memoryPool()); + auto* rawFooterBuffer = footerBuffer->asMutable(); + input_->read(fileLength_ - footerBufSize, footerBufSize, LogType::FOOTER) + ->readFully(rawFooterBuffer, footerBufSize); + int32_t footerOffset = footerBufSize - 1; + psLength_ = static_cast(rawFooterBuffer[footerOffset]); VELOX_CHECK_LE( psLength_ + 4, // 1 byte for post script len, 3 byte "ORC" header. fileLength_, "Corrupted file, Post script size is invalid"); + VELOX_CHECK_GE(footerOffset, psLength_); + footerOffset -= psLength_; if (fileFormat() == FileFormat::DWRF) { - auto postScript = ProtoUtils::readProto( - input_->read(fileLength_ - psLength_ - 1, psLength_, LogType::FOOTER)); - postScript_ = std::make_unique(std::move(postScript)); + postScript_ = parsePostScript( + rawFooterBuffer + footerOffset, psLength_); } else { - auto postScript = ProtoUtils::readProto( - input_->read(fileLength_ - psLength_ - 1, psLength_, LogType::FOOTER)); - postScript_ = std::make_unique(std::move(postScript)); + postScript_ = parsePostScript( + rawFooterBuffer + footerOffset, psLength_); } const uint64_t footerSize = postScript_->footerLength(); const uint64_t cacheSize = postScript_->hasCacheSize() ? postScript_->cacheSize() : 0; const uint64_t tailSize = 1 + psLength_ + footerSize + cacheSize; + footerBufferOverread_ = + std::max(0, footerBufSize - static_cast(tailSize)); // There are cases in warehouse, where RC/text files are stored // in ORC partition. This causes the Reader to SIGSEGV. The following @@ -154,29 +174,57 @@ ReaderBase::ReaderBase( input_->load(LogType::FOOTER); } - auto footerStream = input_->read( - fileLength_ - psLength_ - footerSize - 1, footerSize, LogType::FOOTER); + BufferPtr fullFooterBuffer; + char* footerStart; + if (footerOffset >= footerSize) { + footerOffset -= footerSize; + footerStart = rawFooterBuffer + footerOffset; + } else { + fullFooterBuffer = + AlignedBuffer::allocate(footerSize, &options_.memoryPool()); + footerStart = fullFooterBuffer->asMutable(); + auto remainingBytes = footerSize - footerOffset; + input_ + ->read( + fileLength_ - footerSize - psLength_ - 1, + remainingBytes, + LogType::FOOTER) + ->readFully(footerStart, remainingBytes); + ::memcpy(footerStart + remainingBytes, rawFooterBuffer, footerOffset); + footerOffset = 0; + } + auto decompressed = createDecompressedStream( + std::make_unique( + footerStart, footerSize), + "File Footer"); if (fileFormat() == FileFormat::DWRF) { - auto footer = - google::protobuf::Arena::CreateMessage(arena_.get()); - ProtoUtils::readProtoInto( - createDecompressedStream(std::move(footerStream), "File Footer"), - footer); - footer_ = std::make_unique(footer); + footer_ = parseFooter(decompressed.get(), arena_.get()); } else { - auto footer = google::protobuf::Arena::CreateMessage( - arena_.get()); - ProtoUtils::readProtoInto( - createDecompressedStream(std::move(footerStream), "File Footer"), - footer); - footer_ = std::make_unique(footer); + footer_ = parseFooter(decompressed.get(), arena_.get()); } + stripeMetadataCacheBuffer_ = footerBuffer; + stripeMetadataCacheBufferSize_ = footerOffset; + schema_ = std::dynamic_pointer_cast( convertType(*footer_, 0, options_.fileColumnNamesReadAsLowerCase())); VELOX_CHECK_NOT_NULL(schema_, "invalid schema"); - // load stripe index/footer cache + // initialize file decrypter + handler_ = + DecryptionHandler::create(*footer_, options_.decrypterFactory().get()); +} + +void ReaderBase::loadCache() { + if (!stripeMetadataCacheBuffer_) { + // NOTE: we only expect call this once as stripeMetadataCacheBuffer_ is + // reset on the first call. + return; + } + const uint64_t footerSize = postScript_->footerLength(); + const uint64_t cacheSize = + postScript_->hasCacheSize() ? postScript_->cacheSize() : 0; + const uint64_t tailSize = 1 + psLength_ + footerSize + cacheSize; if (cacheSize > 0) { VELOX_CHECK_EQ(format(), DwrfFormat::kDwrf); const uint64_t cacheOffset = fileLength_ - tailSize; @@ -189,8 +237,20 @@ ReaderBase::ReaderBase( } else { auto cacheBuffer = std::make_shared>( options_.memoryPool(), cacheSize); - input_->read(cacheOffset, cacheSize, LogType::FOOTER) - ->readFully(cacheBuffer->data(), cacheSize); + auto* target = cacheBuffer->data(); + auto* source = stripeMetadataCacheBuffer_->as(); + auto copySize = cacheSize; + if (cacheSize > stripeMetadataCacheBufferSize_) { + auto remainingBytes = cacheSize - stripeMetadataCacheBufferSize_; + auto stream = + input_->read(cacheOffset, remainingBytes, LogType::FOOTER); + stream->readFully(target, remainingBytes); + target += remainingBytes; + copySize -= remainingBytes; + } else { + source += stripeMetadataCacheBufferSize_ - cacheSize; + } + ::memcpy(target, source, copySize); cache_ = std::make_unique( postScript_->cacheMode(), *footer_, std::move(cacheBuffer)); } @@ -208,9 +268,8 @@ ReaderBase::ReaderBase( input_->load(LogType::FOOTER); } } - // initialize file decrypter - handler_ = - DecryptionHandler::create(*footer_, options_.decrypterFactory().get()); + // Release the memory as we no longer need it. + stripeMetadataCacheBuffer_.reset(); } std::vector ReaderBase::rowsPerStripe() const { diff --git a/velox/dwio/dwrf/reader/ReaderBase.h b/velox/dwio/dwrf/reader/ReaderBase.h index 11f9070610e7..d43c660dc1ab 100644 --- a/velox/dwio/dwrf/reader/ReaderBase.h +++ b/velox/dwio/dwrf/reader/ReaderBase.h @@ -80,13 +80,13 @@ class ReaderBase { const proto::Footer* footer, std::unique_ptr cache, std::unique_ptr handler = nullptr) - : postScript_{std::move(ps)}, - footer_{std::make_unique(footer)}, - cache_{std::move(cache)}, - handler_{std::move(handler)}, - options_{dwio::common::ReaderOptions(&pool)}, + : options_{dwio::common::ReaderOptions(&pool)}, input_{std::move(input)}, fileLength_{0}, + postScript_{std::move(ps)}, + footer_{std::make_unique(footer)}, + handler_{std::move(handler)}, + cache_{std::move(cache)}, schema_{ std::dynamic_pointer_cast(convertType(*footer_))}, psLength_{0} { @@ -150,6 +150,8 @@ class ReaderBase { return *input_; } + void loadCache(); + const std::unique_ptr& metadataCache() const { return cache_; } @@ -246,6 +248,10 @@ class ReaderBase { return options_.randomSkip(); } + int footerBufferOverread() const { + return footerBufferOverread_; + } + private: static std::shared_ptr convertType( const FooterWrapper& footer, @@ -260,16 +266,19 @@ class ReaderBase { return options; } - std::unique_ptr arena_; - std::unique_ptr postScript_; - std::unique_ptr footer_ = nullptr; - std::unique_ptr cache_; - std::unique_ptr handler_; - const dwio::common::ReaderOptions options_; const std::unique_ptr input_; const uint64_t fileLength_; + BufferPtr stripeMetadataCacheBuffer_; + int32_t stripeMetadataCacheBufferSize_; + int32_t footerBufferOverread_; + std::unique_ptr arena_; + std::unique_ptr postScript_; + std::unique_ptr footer_; + std::unique_ptr handler_; + std::unique_ptr cache_; + RowTypePtr schema_; // Lazily populated mutable std::shared_ptr schemaWithId_; diff --git a/velox/dwio/dwrf/test/ReaderTest.cpp b/velox/dwio/dwrf/test/ReaderTest.cpp index fa2ee0259aa2..ad26b7954e6c 100644 --- a/velox/dwio/dwrf/test/ReaderTest.cpp +++ b/velox/dwio/dwrf/test/ReaderTest.cpp @@ -1392,7 +1392,7 @@ TEST_F(TestReader, fileColumnNamesReadAsLowerCaseComplexStruct) { TEST_F(TestReader, TestStripeSizeCallback) { dwio::common::ReaderOptions readerOpts{pool()}; readerOpts.setFilePreloadThreshold(0); - readerOpts.setFooterEstimatedSize(4); + readerOpts.setFooterEstimatedSize(17); RowReaderOptions rowReaderOpts; std::shared_ptr requestedType = std::dynamic_pointer_cast< @@ -1420,7 +1420,7 @@ TEST_F(TestReader, TestStripeSizeCallback) { TEST_F(TestReader, TestStripeSizeCallbackLimitsOneStripe) { dwio::common::ReaderOptions readerOpts{pool()}; readerOpts.setFilePreloadThreshold(0); - readerOpts.setFooterEstimatedSize(4); + readerOpts.setFooterEstimatedSize(17); RowReaderOptions rowReaderOpts; std::shared_ptr requestedType = std::dynamic_pointer_cast< @@ -1449,7 +1449,7 @@ TEST_F(TestReader, TestStripeSizeCallbackLimitsOneStripe) { TEST_F(TestReader, TestStripeSizeCallbackLimitsTwoStripe) { dwio::common::ReaderOptions readerOpts{pool()}; readerOpts.setFilePreloadThreshold(0); - readerOpts.setFooterEstimatedSize(4); + readerOpts.setFooterEstimatedSize(17); RowReaderOptions rowReaderOpts; std::shared_ptr requestedType = std::dynamic_pointer_cast< diff --git a/velox/dwio/dwrf/test/TestDecompression.cpp b/velox/dwio/dwrf/test/TestDecompression.cpp index fcba33d80cfd..37b0166558ff 100644 --- a/velox/dwio/dwrf/test/TestDecompression.cpp +++ b/velox/dwio/dwrf/test/TestDecompression.cpp @@ -1054,7 +1054,7 @@ class TestingSeekableInputStream : public SeekableInputStream { return "testing"; } - size_t positionSize() override { + size_t positionSize() const override { return 1; } diff --git a/velox/dwio/dwrf/test/WriterTest.cpp b/velox/dwio/dwrf/test/WriterTest.cpp index 1570f2988865..13d81ff9cb41 100644 --- a/velox/dwio/dwrf/test/WriterTest.cpp +++ b/velox/dwio/dwrf/test/WriterTest.cpp @@ -62,7 +62,9 @@ class WriterTest : public Test { auto readFile = std::make_shared(std::move(data)); auto input = std::make_unique(std::move(readFile), *pool_); dwio::common::ReaderOptions readerOpts{pool_.get()}; - return std::make_unique(readerOpts, std::move(input)); + auto reader = std::make_unique(readerOpts, std::move(input)); + reader->loadCache(); + return reader; } auto& getContext() { diff --git a/velox/exec/tests/PrintPlanWithStatsTest.cpp b/velox/exec/tests/PrintPlanWithStatsTest.cpp index 440f3b47f072..eb9f9bff740c 100644 --- a/velox/exec/tests/PrintPlanWithStatsTest.cpp +++ b/velox/exec/tests/PrintPlanWithStatsTest.cpp @@ -190,12 +190,13 @@ TEST_F(PrintPlanWithStatsTest, innerJoinWithTableScan) { {" dataSourceAddSplitWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" dataSourceReadWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" dynamicFiltersAccepted[ ]* sum: 1, count: 1, min: 1, max: 1"}, - {" flattenStringDictionaryValues [ ]* sum: 0, count: 1, min: 0, max: 0"}, + {" footerBufferOverread[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" ioWaitWallNanos [ ]* sum: .+, count: .+ min: .+, max: .+"}, {" maxSingleIoWaitWallNanos[ ]*sum: .+, count: 1, min: .+, max: .+"}, {" numPrefetch [ ]* sum: .+, count: 1, min: .+, max: .+"}, {" numRamRead [ ]* sum: 60, count: 1, min: 60, max: 60"}, {" numStorageRead [ ]* sum: .+, count: 1, min: .+, max: .+"}, + {" numStripes[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" overreadBytes[ ]* sum: 0B, count: 1, min: 0B, max: 0B"}, {" prefetchBytes [ ]* sum: .+, count: 1, min: .+, max: .+"}, {" preloadedSplits[ ]+sum: .+, count: .+, min: .+, max: .+", @@ -206,9 +207,6 @@ TEST_F(PrintPlanWithStatsTest, innerJoinWithTableScan) { {" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningFinishWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningGetOutputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, - {" skippedSplitBytes [ ]* sum: 0B, count: 1, min: 0B, max: 0B"}, - {" skippedSplits [ ]* sum: 0, count: 1, min: 0, max: 0"}, - {" skippedStrides [ ]* sum: 0, count: 1, min: 0, max: 0"}, {" storageReadBytes [ ]* sum: .+, count: 1, min: .+, max: .+"}, {" totalRemainingFilterTime\\s+sum: .+, count: .+, min: .+, max: .+"}, {" totalScanTime [ ]* sum: .+, count: .+, min: .+, max: .+"}, @@ -283,12 +281,13 @@ TEST_F(PrintPlanWithStatsTest, partialAggregateWithTableScan) { {" Input: 10000 rows \\(.+\\), Output: 10000 rows \\(.+\\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1, Splits: 1, CPU breakdown: B/I/O/F (.+/.+/.+/.+)"}, {" dataSourceAddSplitWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" dataSourceReadWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, - {" flattenStringDictionaryValues [ ]* sum: 0, count: 1, min: 0, max: 0"}, + {" footerBufferOverread[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" ioWaitWallNanos [ ]* sum: .+, count: .+ min: .+, max: .+"}, {" maxSingleIoWaitWallNanos[ ]*sum: .+, count: 1, min: .+, max: .+"}, {" numPrefetch [ ]* sum: .+, count: .+, min: .+, max: .+"}, {" numRamRead [ ]* sum: 7, count: 1, min: 7, max: 7"}, {" numStorageRead [ ]* sum: .+, count: 1, min: .+, max: .+"}, + {" numStripes[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" overreadBytes[ ]* sum: 0B, count: 1, min: 0B, max: 0B"}, {" prefetchBytes [ ]* sum: .+, count: 1, min: .+, max: .+"}, @@ -300,9 +299,6 @@ TEST_F(PrintPlanWithStatsTest, partialAggregateWithTableScan) { {" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningFinishWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningGetOutputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, - {" skippedSplitBytes[ ]* sum: 0B, count: 1, min: 0B, max: 0B"}, - {" skippedSplits [ ]* sum: 0, count: 1, min: 0, max: 0"}, - {" skippedStrides [ ]* sum: 0, count: 1, min: 0, max: 0"}, {" storageReadBytes [ ]* sum: .+, count: 1, min: .+, max: .+"}, {" totalRemainingFilterTime\\s+sum: .+, count: .+, min: .+, max: .+"}, {" totalScanTime [ ]* sum: .+, count: .+, min: .+, max: .+"}}); @@ -355,12 +351,13 @@ TEST_F(PrintPlanWithStatsTest, tableWriterWithTableScan) { {R"( Input: 100 rows \(.+\), Output: 100 rows \(.+\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1, Splits: 1, CPU breakdown: B/I/O/F (.+/.+/.+/.+))"}, {" dataSourceAddSplitWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" dataSourceReadWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, - {" flattenStringDictionaryValues [ ]* sum: 0, count: 1, min: 0, max: 0"}, + {" footerBufferOverread[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" ioWaitWallNanos [ ]* sum: .+, count: .+ min: .+, max: .+"}, {" maxSingleIoWaitWallNanos[ ]*sum: .+, count: 1, min: .+, max: .+"}, {" numPrefetch [ ]* sum: .+, count: .+, min: .+, max: .+"}, {" numRamRead [ ]* sum: 7, count: 1, min: 7, max: 7"}, {" numStorageRead [ ]* sum: .+, count: 1, min: .+, max: .+"}, + {" numStripes[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" overreadBytes[ ]* sum: 0B, count: 1, min: 0B, max: 0B"}, {" prefetchBytes [ ]* sum: .+, count: 1, min: .+, max: .+"}, @@ -372,9 +369,6 @@ TEST_F(PrintPlanWithStatsTest, tableWriterWithTableScan) { {" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningFinishWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningGetOutputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, - {" skippedSplitBytes[ ]* sum: 0B, count: 1, min: 0B, max: 0B"}, - {" skippedSplits [ ]* sum: 0, count: 1, min: 0, max: 0"}, - {" skippedStrides [ ]* sum: 0, count: 1, min: 0, max: 0"}, {" storageReadBytes [ ]* sum: .+, count: 1, min: .+, max: .+"}, {" totalRemainingFilterTime\\s+sum: .+, count: .+, min: .+, max: .+"}, {" totalScanTime [ ]* sum: .+, count: .+, min: .+, max: .+"}}); diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index 035d1ed42d4d..10ea2cd0c454 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -2388,8 +2388,8 @@ TEST_F(TableScanTest, statsBasedSkippingNulls) { EXPECT_EQ(31'234, stats.rawInputRows); EXPECT_EQ(31'234, stats.inputRows); EXPECT_EQ(31'234, stats.outputRows); - ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedSplits").sum, 0); - ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedStrides").sum, 0); + ASSERT_EQ(getTableScanRuntimeStats(task).count("skippedSplits"), 0); + ASSERT_EQ(getTableScanRuntimeStats(task).count("skippedStrides"), 0); task = assertQuery("c0 IS NULL"); @@ -2398,7 +2398,7 @@ TEST_F(TableScanTest, statsBasedSkippingNulls) { EXPECT_EQ(0, stats.inputRows); EXPECT_EQ(0, stats.outputRows); ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedSplits").sum, 1); - ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedStrides").sum, 0); + ASSERT_EQ(getTableScanRuntimeStats(task).count("skippedStrides"), 0); // c1 IS NULL - first stride should be skipped based on stats task = assertQuery("c1 IS NULL"); @@ -2407,7 +2407,7 @@ TEST_F(TableScanTest, statsBasedSkippingNulls) { EXPECT_EQ(size - 10'000, stats.rawInputRows); EXPECT_EQ(size - 11'111, stats.inputRows); EXPECT_EQ(size - 11'111, stats.outputRows); - ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedSplits").sum, 0); + ASSERT_EQ(getTableScanRuntimeStats(task).count("skippedSplits"), 0); ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedStrides").sum, 1); // c1 IS NOT NULL - 3rd and 4th strides should be skipped based on stats @@ -2417,7 +2417,7 @@ TEST_F(TableScanTest, statsBasedSkippingNulls) { EXPECT_EQ(20'000, stats.rawInputRows); EXPECT_EQ(11'111, stats.inputRows); EXPECT_EQ(11'111, stats.outputRows); - ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedSplits").sum, 0); + ASSERT_EQ(getTableScanRuntimeStats(task).count("skippedSplits"), 0); ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedStrides").sum, 2); } @@ -5419,3 +5419,20 @@ TEST_F(TableScanTest, rowId) { AssertQueryBuilder(plan).split(split).assertResults(expected); } } + +TEST_F(TableScanTest, footerIOCount) { + // We should issue only 1 IO for a split range that does not contain any + // stripe. + auto vector = makeRowVector({makeFlatVector(10, folly::identity)}); + auto file = TempFilePath::create(); + writeToFile(file->getPath(), {vector}); + auto plan = PlanBuilder().tableScan(asRowType(vector->type())).planNode(); + auto task = + AssertQueryBuilder(plan) + .split(makeHiveConnectorSplit(file->getPath(), 10'000, 10'000)) + .assertResults( + BaseVector::create(vector->type(), 0, pool())); + auto stats = getTableScanRuntimeStats(task); + ASSERT_EQ(stats.at("numStorageRead").sum, 1); + ASSERT_GT(stats.at("footerBufferOverread").sum, 0); +}