From bdfdc8af382eecb1196c67ca828f21c64fbfeb33 Mon Sep 17 00:00:00 2001 From: Jimmy Lu Date: Mon, 9 Dec 2024 07:58:56 -0800 Subject: [PATCH] fix: Optimize DWRF footer IO read count and size in Hive connector Summary: Presto's splitting policy sometimes over-splits files with large stripes (e.g. those with large flatmap columns), resulting in quite a lot of splits which actually do not contain any stripes. For those essentially empty splits, we still need to read the file footer in order to compare the split boundary with stripe boundaries. However, we can skip the stripe metadata cache in this case. We also reduce the number of tiny reads while reading the file footer, so most footers can be read in 1 read IO instead of 3. This combination of optimizations gives up to 2.5 times execution time reduction for some queries. I also tried caching the parsed footer in file handles; however, that does not work well, since Presto seems to send splits from the same file to different workers and the cache hit rate remains quite low. 
Differential Revision: D66943503 --- velox/connectors/hive/HiveConfig.cpp | 2 +- velox/dwio/common/BufferedInput.cpp | 2 +- velox/dwio/common/BufferedInput.h | 6 +- velox/dwio/common/CacheInputStream.cpp | 2 +- velox/dwio/common/CacheInputStream.h | 13 +- velox/dwio/common/CachedBufferedInput.cpp | 2 +- velox/dwio/common/CachedBufferedInput.h | 2 +- velox/dwio/common/DirectBufferedInput.cpp | 2 +- velox/dwio/common/DirectBufferedInput.h | 2 +- velox/dwio/common/DirectInputStream.cpp | 2 +- velox/dwio/common/DirectInputStream.h | 8 +- velox/dwio/common/SeekableInputStream.cpp | 78 ++++++++++- velox/dwio/common/SeekableInputStream.h | 53 ++++++- velox/dwio/common/Statistics.h | 32 ++++- .../common/compression/PagedInputStream.h | 2 +- velox/dwio/common/tests/CMakeLists.txt | 1 + .../common/tests/SeekableInputStreamTest.cpp | 110 +++++++++++++++ velox/dwio/dwrf/reader/DwrfReader.cpp | 3 + velox/dwio/dwrf/reader/DwrfReader.h | 1 + velox/dwio/dwrf/reader/ReaderBase.cpp | 130 ++++++++++++------ velox/dwio/dwrf/reader/ReaderBase.h | 33 +++-- velox/dwio/dwrf/test/ReaderTest.cpp | 6 +- velox/dwio/dwrf/test/TestDecompression.cpp | 2 +- velox/dwio/dwrf/test/WriterTest.cpp | 4 +- velox/exec/tests/PrintPlanWithStatsTest.cpp | 15 +- velox/exec/tests/TableScanTest.cpp | 25 +++- 26 files changed, 435 insertions(+), 103 deletions(-) create mode 100644 velox/dwio/common/tests/SeekableInputStreamTest.cpp diff --git a/velox/connectors/hive/HiveConfig.cpp b/velox/connectors/hive/HiveConfig.cpp index ba30a0c19e758..1f99a98a38058 100644 --- a/velox/connectors/hive/HiveConfig.cpp +++ b/velox/connectors/hive/HiveConfig.cpp @@ -266,7 +266,7 @@ uint64_t HiveConfig::sortWriterFinishTimeSliceLimitMs( } uint64_t HiveConfig::footerEstimatedSize() const { - return config_->get(kFooterEstimatedSize, 1UL << 20); + return config_->get(kFooterEstimatedSize, 256UL << 10); } uint64_t HiveConfig::filePreloadThreshold() const { diff --git a/velox/dwio/common/BufferedInput.cpp 
b/velox/dwio/common/BufferedInput.cpp index 8791e418f49e1..47d58d582ee37 100644 --- a/velox/dwio/common/BufferedInput.cpp +++ b/velox/dwio/common/BufferedInput.cpp @@ -213,7 +213,7 @@ bool BufferedInput::tryMerge(Region& first, const Region& second) { return false; } -std::unique_ptr BufferedInput::readBuffer( +std::unique_ptr BufferedInput::readBuffer( uint64_t offset, uint64_t length) const { const auto result = readInternal(offset, length); diff --git a/velox/dwio/common/BufferedInput.h b/velox/dwio/common/BufferedInput.h index ac969fdee2ccf..c5bda5c75cf69 100644 --- a/velox/dwio/common/BufferedInput.h +++ b/velox/dwio/common/BufferedInput.h @@ -89,9 +89,9 @@ class BufferedInput { return !!readBuffer(offset, length); } - virtual std::unique_ptr + virtual std::unique_ptr read(uint64_t offset, uint64_t length, LogType logType) const { - std::unique_ptr ret = readBuffer(offset, length); + auto ret = readBuffer(offset, length); if (ret != nullptr) { return ret; } @@ -171,7 +171,7 @@ class BufferedInput { memory::MemoryPool* const pool_; private: - std::unique_ptr readBuffer( + std::unique_ptr readBuffer( uint64_t offset, uint64_t length) const; diff --git a/velox/dwio/common/CacheInputStream.cpp b/velox/dwio/common/CacheInputStream.cpp index ca05a178e3b63..dedda0c72f6a7 100644 --- a/velox/dwio/common/CacheInputStream.cpp +++ b/velox/dwio/common/CacheInputStream.cpp @@ -160,7 +160,7 @@ std::string CacheInputStream::getName() const { return result; } -size_t CacheInputStream::positionSize() { +size_t CacheInputStream::positionSize() const { // not compressed, so only need 1 position (uncompressed position) return 1; } diff --git a/velox/dwio/common/CacheInputStream.h b/velox/dwio/common/CacheInputStream.h index 54650d9ebc435..8df46325a7974 100644 --- a/velox/dwio/common/CacheInputStream.h +++ b/velox/dwio/common/CacheInputStream.h @@ -27,7 +27,7 @@ namespace facebook::velox::dwio::common { class CachedBufferedInput; -class CacheInputStream : public SeekableInputStream 
{ +class CacheInputStream : public SeekableInputStreamWithKnownLength { public: CacheInputStream( CachedBufferedInput* cache, @@ -43,13 +43,22 @@ class CacheInputStream : public SeekableInputStream { ~CacheInputStream() override; + CacheInputStream& operator=(const CacheInputStream&) = delete; + CacheInputStream(const CacheInputStream&) = delete; + CacheInputStream& operator=(CacheInputStream&&) = delete; + CacheInputStream(CacheInputStream&&) = delete; + bool Next(const void** data, int* size) override; void BackUp(int count) override; bool SkipInt64(int64_t count) override; google::protobuf::int64 ByteCount() const override; void seekToPosition(PositionProvider& position) override; std::string getName() const override; - size_t positionSize() override; + size_t positionSize() const override; + + int64_t totalLength() const override { + return region_.length; + } /// Returns a copy of 'this', ranging over the same bytes. The clone is /// initially positioned at the position of 'this' and can be moved diff --git a/velox/dwio/common/CachedBufferedInput.cpp b/velox/dwio/common/CachedBufferedInput.cpp index 70e0b4f960f79..4cf94cc63115d 100644 --- a/velox/dwio/common/CachedBufferedInput.cpp +++ b/velox/dwio/common/CachedBufferedInput.cpp @@ -488,7 +488,7 @@ std::shared_ptr CachedBufferedInput::coalescedLoad( }); } -std::unique_ptr CachedBufferedInput::read( +std::unique_ptr CachedBufferedInput::read( uint64_t offset, uint64_t length, LogType /*logType*/) const { diff --git a/velox/dwio/common/CachedBufferedInput.h b/velox/dwio/common/CachedBufferedInput.h index 240ae8354bd8e..ea5401cfca35a 100644 --- a/velox/dwio/common/CachedBufferedInput.h +++ b/velox/dwio/common/CachedBufferedInput.h @@ -118,7 +118,7 @@ class CachedBufferedInput : public BufferedInput { bool isBuffered(uint64_t /*unused*/, uint64_t /*unused*/) const override; - std::unique_ptr + std::unique_ptr read(uint64_t offset, uint64_t length, LogType logType) const override; /// Schedules load of 'region' on 
'executor_'. Fails silently if no memory or diff --git a/velox/dwio/common/DirectBufferedInput.cpp b/velox/dwio/common/DirectBufferedInput.cpp index d580a14250ad1..d273f0153a1be 100644 --- a/velox/dwio/common/DirectBufferedInput.cpp +++ b/velox/dwio/common/DirectBufferedInput.cpp @@ -206,7 +206,7 @@ std::shared_ptr DirectBufferedInput::coalescedLoad( }); } -std::unique_ptr DirectBufferedInput::read( +std::unique_ptr DirectBufferedInput::read( uint64_t offset, uint64_t length, LogType /*logType*/) const { diff --git a/velox/dwio/common/DirectBufferedInput.h b/velox/dwio/common/DirectBufferedInput.h index 734b3bfeb026f..2430c6e2ad6e4 100644 --- a/velox/dwio/common/DirectBufferedInput.h +++ b/velox/dwio/common/DirectBufferedInput.h @@ -174,7 +174,7 @@ class DirectBufferedInput : public BufferedInput { std::shared_ptr coalescedLoad( const SeekableInputStream* stream); - std::unique_ptr + std::unique_ptr read(uint64_t offset, uint64_t length, LogType logType) const override; folly::Executor* executor() const override { diff --git a/velox/dwio/common/DirectInputStream.cpp b/velox/dwio/common/DirectInputStream.cpp index 3d8c8a13f636c..68173b40f259f 100644 --- a/velox/dwio/common/DirectInputStream.cpp +++ b/velox/dwio/common/DirectInputStream.cpp @@ -105,7 +105,7 @@ std::string DirectInputStream::getName() const { "DirectInputStream {} of {}", offsetInRegion_, region_.length); } -size_t DirectInputStream::positionSize() { +size_t DirectInputStream::positionSize() const { // not compressed, so only need 1 position (uncompressed position) return 1; } diff --git a/velox/dwio/common/DirectInputStream.h b/velox/dwio/common/DirectInputStream.h index 3715da6666822..fc613160650db 100644 --- a/velox/dwio/common/DirectInputStream.h +++ b/velox/dwio/common/DirectInputStream.h @@ -28,7 +28,7 @@ class DirectBufferedInput; /// An input stream over possibly coalesced loads. Created by /// DirectBufferedInput. Similar to CacheInputStream but does not use cache. 
-class DirectInputStream : public SeekableInputStream { +class DirectInputStream : public SeekableInputStreamWithKnownLength { public: DirectInputStream( DirectBufferedInput* bufferedInput, @@ -48,7 +48,11 @@ class DirectInputStream : public SeekableInputStream { void seekToPosition(PositionProvider& position) override; std::string getName() const override; - size_t positionSize() override; + size_t positionSize() const override; + + int64_t totalLength() const override { + return region_.length; + } /// Testing function to access loaded state. void testingData( diff --git a/velox/dwio/common/SeekableInputStream.cpp b/velox/dwio/common/SeekableInputStream.cpp index 7773445dea5d3..4dcdcdd4d2798 100644 --- a/velox/dwio/common/SeekableInputStream.cpp +++ b/velox/dwio/common/SeekableInputStream.cpp @@ -176,7 +176,7 @@ std::string SeekableArrayInputStream::getName() const { "SeekableArrayInputStream ", position_, " of ", length_); } -size_t SeekableArrayInputStream::positionSize() { +size_t SeekableArrayInputStream::positionSize() const { // not compressed, so only need 1 position (uncompressed position) return 1; } @@ -257,9 +257,83 @@ std::string SeekableFileInputStream::getName() const { input_->getName(), " from ", start_, " for ", length_); } -size_t SeekableFileInputStream::positionSize() { +size_t SeekableFileInputStream::positionSize() const { // not compressed, so only need 1 position (uncompressed position) return 1; } +SeekableConcatInputStream::SeekableConcatInputStream( + std::vector> streams) + : streams_(std::move(streams)) { + VELOX_CHECK(!streams_.empty()); + length_ = 0; + for (auto& stream : streams_) { + VELOX_CHECK_EQ(stream->positionSize(), 1); + length_ += stream->totalLength(); + } +} + +bool SeekableConcatInputStream::Next(const void** data, int32_t* size) { + while (streamIndex_ < streams_.size()) { + if (streams_[streamIndex_]->Next(data, size)) { + position_ += *size; + return true; + } + ++streamIndex_; + } + *size = 0; + return false; +} + 
+void SeekableConcatInputStream::BackUp(int32_t count) { + VELOX_CHECK_GE(count, 0); + while (count > 0) { + VELOX_CHECK_GE(streamIndex_, 0); + if (streamIndex_ >= streams_.size()) { + --streamIndex_; + continue; + } + auto streamByteCount = streams_[streamIndex_]->ByteCount(); + if (streamByteCount == 0) { + --streamIndex_; + continue; + } + auto streamBackup = std::min(count, streamByteCount); + streams_[streamIndex_]->BackUp(streamBackup); + position_ -= streamBackup; + count -= streamBackup; + } +} + +bool SeekableConcatInputStream::SkipInt64(int64_t count) { + VELOX_CHECK_GE(count, 0); + while (count > 0) { + if (streamIndex_ >= streams_.size()) { + return false; + } + auto& stream = streams_[streamIndex_]; + auto streamRemaining = stream->totalLength() - stream->ByteCount(); + if (streamRemaining == 0) { + ++streamIndex_; + continue; + } + auto skip = std::min(count, streamRemaining); + VELOX_CHECK(stream->SkipInt64(skip)); + position_ += skip; + count -= skip; + } + return true; +} + +void SeekableConcatInputStream::seekToPosition(PositionProvider& /*position*/) { + // We only use this class to read metadata for now, so this method is not + // used. + VELOX_NYI(); +} + +std::string SeekableConcatInputStream::getName() const { + return fmt::format( + "SeekableConcatInputStream of {} streams", streams_.size()); +} + } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/SeekableInputStream.h b/velox/dwio/common/SeekableInputStream.h index a91fbb379732d..a415edc905266 100644 --- a/velox/dwio/common/SeekableInputStream.h +++ b/velox/dwio/common/SeekableInputStream.h @@ -40,7 +40,7 @@ class SeekableInputStream : public google::protobuf::io::ZeroCopyInputStream { // Returns the number of position values this input stream uses to identify an // ORC/DWRF stream address. 
- virtual size_t positionSize() = 0; + virtual size_t positionSize() const = 0; virtual bool SkipInt64(int64_t count) = 0; @@ -51,10 +51,15 @@ class SeekableInputStream : public google::protobuf::io::ZeroCopyInputStream { void readFully(char* buffer, size_t bufferSize); }; +class SeekableInputStreamWithKnownLength : public SeekableInputStream { + public: + virtual int64_t totalLength() const = 0; +}; + /** * Create a seekable input stream based on a memory range. */ -class SeekableArrayInputStream : public SeekableInputStream { +class SeekableArrayInputStream : public SeekableInputStreamWithKnownLength { public: SeekableArrayInputStream( const unsigned char* list, @@ -82,7 +87,11 @@ class SeekableArrayInputStream : public SeekableInputStream { virtual google::protobuf::int64 ByteCount() const override; virtual void seekToPosition(PositionProvider& position) override; virtual std::string getName() const override; - virtual size_t positionSize() override; + virtual size_t positionSize() const override; + + int64_t totalLength() const override { + return length_; + } /// Return the total number of bytes returned from Next() calls. Intended to /// be used for test validation. @@ -106,7 +115,7 @@ class SeekableArrayInputStream : public SeekableInputStream { /** * Create a seekable input stream based on an io stream. 
*/ -class SeekableFileInputStream : public SeekableInputStream { +class SeekableFileInputStream : public SeekableInputStreamWithKnownLength { public: SeekableFileInputStream( std::shared_ptr input, @@ -123,7 +132,11 @@ class SeekableFileInputStream : public SeekableInputStream { virtual google::protobuf::int64 ByteCount() const override; virtual void seekToPosition(PositionProvider& position) override; virtual std::string getName() const override; - virtual size_t positionSize() override; + virtual size_t positionSize() const override; + + int64_t totalLength() const override { + return length_; + } private: const std::shared_ptr input_; @@ -138,4 +151,34 @@ class SeekableFileInputStream : public SeekableInputStream { uint64_t pushback_; }; +/// A SeekableInputStream that is the concatenation of multiple streams. +class SeekableConcatInputStream : public SeekableInputStreamWithKnownLength { + public: + explicit SeekableConcatInputStream( + std::vector> streams); + bool Next(const void** data, int32_t* size) override; + void BackUp(int32_t count) override; + bool SkipInt64(int64_t count) override; + void seekToPosition(PositionProvider& position) override; + std::string getName() const override; + + size_t positionSize() const override { + return 1; + } + + google::protobuf::int64 ByteCount() const override { + return position_; + } + + int64_t totalLength() const override { + return length_; + } + + private: + std::vector> streams_; + int64_t length_; + int streamIndex_ = 0; + int64_t position_ = 0; +}; + } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/Statistics.h b/velox/dwio/common/Statistics.h index db5a91b064135..a9f9179d38f01 100644 --- a/velox/dwio/common/Statistics.h +++ b/velox/dwio/common/Statistics.h @@ -535,16 +535,34 @@ struct RuntimeStatistics { // Number of strides (row groups) skipped based on statistics. 
int64_t skippedStrides{0}; + int64_t footerBufferOverread{0}; + ColumnReaderStatistics columnReaderStatistics; std::unordered_map toMap() { - return { - {"skippedSplits", RuntimeCounter(skippedSplits)}, - {"skippedSplitBytes", - RuntimeCounter(skippedSplitBytes, RuntimeCounter::Unit::kBytes)}, - {"skippedStrides", RuntimeCounter(skippedStrides)}, - {"flattenStringDictionaryValues", - RuntimeCounter(columnReaderStatistics.flattenStringDictionaryValues)}}; + std::unordered_map result; + if (skippedSplits > 0) { + result.emplace("skippedSplits", RuntimeCounter(skippedSplits)); + } + if (skippedSplitBytes > 0) { + result.emplace( + "skippedSplitBytes", + RuntimeCounter(skippedSplitBytes, RuntimeCounter::Unit::kBytes)); + } + if (skippedStrides > 0) { + result.emplace("skippedStrides", RuntimeCounter(skippedStrides)); + } + if (footerBufferOverread > 0) { + result.emplace( + "footerBufferOverread", + RuntimeCounter(footerBufferOverread, RuntimeCounter::Unit::kBytes)); + } + if (columnReaderStatistics.flattenStringDictionaryValues > 0) { + result.emplace( + "flattenStringDictionaryValues", + RuntimeCounter(columnReaderStatistics.flattenStringDictionaryValues)); + } + return result; } }; diff --git a/velox/dwio/common/compression/PagedInputStream.h b/velox/dwio/common/compression/PagedInputStream.h index b7ecd99fd4c94..15b1acd630a0f 100644 --- a/velox/dwio/common/compression/PagedInputStream.h +++ b/velox/dwio/common/compression/PagedInputStream.h @@ -74,7 +74,7 @@ class PagedInputStream : public dwio::common::SeekableInputStream { ")"); } - size_t positionSize() override { + size_t positionSize() const override { // not compressed, so need 2 positions (compressed position + uncompressed // position) return 2; diff --git a/velox/dwio/common/tests/CMakeLists.txt b/velox/dwio/common/tests/CMakeLists.txt index 6f3a36273c6db..0cda1d8be5654 100644 --- a/velox/dwio/common/tests/CMakeLists.txt +++ b/velox/dwio/common/tests/CMakeLists.txt @@ -51,6 +51,7 @@ add_executable( 
ReadFileInputStreamTests.cpp ReaderTest.cpp RetryTests.cpp + SeekableInputStreamTest.cpp TestBufferedInput.cpp ThrottlerTest.cpp TypeTests.cpp diff --git a/velox/dwio/common/tests/SeekableInputStreamTest.cpp b/velox/dwio/common/tests/SeekableInputStreamTest.cpp new file mode 100644 index 0000000000000..96da4294dedc2 --- /dev/null +++ b/velox/dwio/common/tests/SeekableInputStreamTest.cpp @@ -0,0 +1,110 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox/dwio/common/SeekableInputStream.h" + +#include + +namespace facebook::velox::dwio::common { +namespace { + +SeekableConcatInputStream createExampleSeekableConcatInputStream( + std::string& expected) { + for (int i = 0; i < 100; ++i) { + expected += std::to_string(i); + } + std::vector> streams; + const char* data = expected.data(); + auto size = expected.size(); + streams.push_back( + std::make_unique(data, expected.size() / 2)); + data += streams.back()->totalLength(); + size -= streams.back()->totalLength(); + streams.push_back(std::make_unique(data, size)); + return SeekableConcatInputStream(std::move(streams)); +} + +TEST(SeekableConcatInputStreamTest, next) { + std::string expected; + auto stream = createExampleSeekableConcatInputStream(expected); + ASSERT_EQ(stream.totalLength(), expected.size()); + std::string result; + result.reserve(expected.size()); + const void* data; + int32_t size; + while (stream.Next(&data, &size)) { + ASSERT_GT(size, 0); + auto oldSize = result.size(); + result.resize(oldSize + size); + memcpy(result.data() + oldSize, data, size); + } + ASSERT_EQ(size, 0); + ASSERT_EQ(result, expected); +} + +TEST(SeekableConcatInputStreamTest, backUp) { + std::string expected; + auto stream = createExampleSeekableConcatInputStream(expected); + const auto chunkSize = expected.size() / 2; + const void* data; + int32_t size; + ASSERT_TRUE(stream.Next(&data, &size)); + ASSERT_EQ(size, chunkSize); + ASSERT_EQ( + std::string_view(static_cast(data), size), + std::string_view(expected.data(), size)); + ASSERT_TRUE(stream.Next(&data, &size)); + ASSERT_EQ(size, chunkSize); + ASSERT_EQ( + std::string_view(static_cast(data), size), + std::string_view(expected.data() + chunkSize, size)); + ASSERT_FALSE(stream.Next(&data, &size)); + stream.BackUp(1 + chunkSize); + ASSERT_TRUE(stream.Next(&data, &size)); + ASSERT_EQ(size, 1); + ASSERT_EQ(static_cast(data)[0], expected[chunkSize - 1]); + ASSERT_TRUE(stream.Next(&data, &size)); + 
ASSERT_EQ(size, chunkSize); + ASSERT_EQ( + std::string_view(static_cast(data), size), + std::string_view(expected.data() + chunkSize, size)); + ASSERT_FALSE(stream.Next(&data, &size)); +} + +TEST(SeekableConcatInputStreamTest, skip) { + std::string expected; + auto stream = createExampleSeekableConcatInputStream(expected); + const auto chunkSize = expected.size() / 2; + const void* data; + int32_t size; + ASSERT_TRUE(stream.SkipInt64(1)); + ASSERT_TRUE(stream.Next(&data, &size)); + ASSERT_EQ(size, chunkSize - 1); + ASSERT_EQ( + std::string_view(static_cast(data), size), + std::string_view(expected.data() + 1, size)); + stream.BackUp(1); + ASSERT_TRUE(stream.SkipInt64(2)); + ASSERT_TRUE(stream.Next(&data, &size)); + ASSERT_EQ(size, chunkSize - 1); + ASSERT_EQ( + std::string_view(static_cast(data), size), + std::string_view(expected.data() + chunkSize + 1, size)); + ASSERT_FALSE(stream.SkipInt64(1)); +} + +} // namespace +} // namespace facebook::velox::dwio::common diff --git a/velox/dwio/dwrf/reader/DwrfReader.cpp b/velox/dwio/dwrf/reader/DwrfReader.cpp index ed9dede68142a..ff52cca42599a 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.cpp +++ b/velox/dwio/dwrf/reader/DwrfReader.cpp @@ -301,6 +301,9 @@ DwrfRowReader::DwrfRowReader( } unitLoader_ = getUnitLoader(); + if (!isEmptyFile()) { + getReader().loadCache(); + } } std::unique_ptr& DwrfRowReader::getColumnReader() { diff --git a/velox/dwio/dwrf/reader/DwrfReader.h b/velox/dwio/dwrf/reader/DwrfReader.h index a18b6982fa591..268b007de4b15 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.h +++ b/velox/dwio/dwrf/reader/DwrfReader.h @@ -108,6 +108,7 @@ class DwrfRowReader : public StrideIndexProvider, void updateRuntimeStats( dwio::common::RuntimeStatistics& stats) const override { stats.skippedStrides += skippedStrides_; + stats.footerBufferOverread += getReader().footerBufferOverread(); stats.columnReaderStatistics.flattenStringDictionaryValues += columnReaderStatistics_.flattenStringDictionaryValues; } diff --git 
a/velox/dwio/dwrf/reader/ReaderBase.cpp b/velox/dwio/dwrf/reader/ReaderBase.cpp index b81c208276e0a..4e80345d2cbbf 100644 --- a/velox/dwio/dwrf/reader/ReaderBase.cpp +++ b/velox/dwio/dwrf/reader/ReaderBase.cpp @@ -82,56 +82,74 @@ ReaderBase::ReaderBase( FileFormat fileFormat) : ReaderBase(createReaderOptions(pool, fileFormat), std::move(input)) {} +namespace { + +template +std::unique_ptr parsePostScript(const char* input, int size) { + auto impl = std::make_unique(); + VELOX_CHECK(impl->ParseFromArray(input, size)); + return std::make_unique(std::move(impl)); +} + +} // namespace + +template +std::unique_ptr ReaderBase::parseFooter( + std::unique_ptr input) { + auto* impl = google::protobuf::Arena::CreateMessage(arena_.get()); + auto decompressed = createDecompressedStream(std::move(input), "File Footer"); + VELOX_CHECK(impl->ParseFromZeroCopyStream(decompressed.get())); + return std::make_unique(impl); +} + ReaderBase::ReaderBase( const dwio::common::ReaderOptions& options, std::unique_ptr input) - : arena_(std::make_unique()), - options_{options}, + : options_{options}, input_(std::move(input)), - fileLength_(input_->getReadFile()->size()) { + fileLength_(input_->getReadFile()->size()), + arena_(std::make_unique()) { process::TraceContext trace("ReaderBase::ReaderBase"); // TODO: make a config DWIO_ENSURE(fileLength_ > 0, "ORC file is empty"); VELOX_CHECK_GE(fileLength_, 4, "File size too small"); const auto preloadFile = fileLength_ <= options_.filePreloadThreshold(); - const uint64_t readSize = preloadFile - ? fileLength_ - : std::min(fileLength_, options_.footerEstimatedSize()); + const int64_t bufSize = std::min(fileLength_, options_.footerEstimatedSize()); + const uint64_t readSize = preloadFile ? fileLength_ : bufSize; if (input_->supportSyncLoad()) { input_->enqueue({fileLength_ - readSize, readSize, "footer"}); input_->load(preloadFile ? 
LogType::FILE : LogType::FOOTER); } // TODO: read footer from spectrum - { - const void* buf; - int32_t ignored; - auto lastByteStream = input_->read(fileLength_ - 1, 1, LogType::FOOTER); - const bool ret = lastByteStream->Next(&buf, &ignored); - VELOX_CHECK(ret, "Failed to read"); - // Make sure 'lastByteStream' is live while dereferencing 'buf'. - psLength_ = *static_cast(buf) & 0xff; - } + footerBuffer_ = + AlignedBuffer::allocate(bufSize, &options_.memoryPool()); + auto* buf = footerBuffer_->asMutable(); + auto stream = input_->read(fileLength_ - bufSize, bufSize, LogType::FOOTER); + stream->readFully(buf, bufSize); + int offset = bufSize - 1; + psLength_ = static_cast(buf[offset]); VELOX_CHECK_LE( psLength_ + 4, // 1 byte for post script len, 3 byte "ORC" header. fileLength_, "Corrupted file, Post script size is invalid"); + VELOX_CHECK_GE(offset, psLength_); + offset -= psLength_; if (fileFormat() == FileFormat::DWRF) { - auto postScript = ProtoUtils::readProto( - input_->read(fileLength_ - psLength_ - 1, psLength_, LogType::FOOTER)); - postScript_ = std::make_unique(std::move(postScript)); + postScript_ = parsePostScript(buf + offset, psLength_); } else { - auto postScript = ProtoUtils::readProto( - input_->read(fileLength_ - psLength_ - 1, psLength_, LogType::FOOTER)); - postScript_ = std::make_unique(std::move(postScript)); + postScript_ = + parsePostScript(buf + offset, psLength_); } const uint64_t footerSize = postScript_->footerLength(); const uint64_t cacheSize = postScript_->hasCacheSize() ? postScript_->cacheSize() : 0; const uint64_t tailSize = 1 + psLength_ + footerSize + cacheSize; + footerBufferOverread_ = + std::max(0, bufSize - static_cast(tailSize)); // There are cases in warehouse, where RC/text files are stored // in ORC partition. This causes the Reader to SIGSEGV. 
The following @@ -154,29 +172,49 @@ ReaderBase::ReaderBase( input_->load(LogType::FOOTER); } - auto footerStream = input_->read( - fileLength_ - psLength_ - footerSize - 1, footerSize, LogType::FOOTER); + std::unique_ptr footerStream; + if (offset >= footerSize) { + offset -= footerSize; + footerStream = std::make_unique( + buf + offset, footerSize); + } else { + std::vector< + std::unique_ptr> + streams; + streams.push_back(input_->read( + fileLength_ - footerSize - psLength_ - 1, + footerSize - offset, + LogType::FOOTER)); + streams.push_back( + std::make_unique(buf, offset)); + footerStream = std::make_unique( + std::move(streams)); + offset = 0; + } if (fileFormat() == FileFormat::DWRF) { - auto footer = - google::protobuf::Arena::CreateMessage(arena_.get()); - ProtoUtils::readProtoInto( - createDecompressedStream(std::move(footerStream), "File Footer"), - footer); - footer_ = std::make_unique(footer); + footer_ = parseFooter(std::move(footerStream)); } else { - auto footer = google::protobuf::Arena::CreateMessage( - arena_.get()); - ProtoUtils::readProtoInto( - createDecompressedStream(std::move(footerStream), "File Footer"), - footer); - footer_ = std::make_unique(footer); + footer_ = parseFooter(std::move(footerStream)); } + footerBufferSize_ = offset; schema_ = std::dynamic_pointer_cast( convertType(*footer_, 0, options_.fileColumnNamesReadAsLowerCase())); VELOX_CHECK_NOT_NULL(schema_, "invalid schema"); - // load stripe index/footer cache + // initialize file decrypter + handler_ = + DecryptionHandler::create(*footer_, options_.decrypterFactory().get()); +} + +void ReaderBase::loadCache() { + if (!footerBuffer_) { + return; + } + const uint64_t footerSize = postScript_->footerLength(); + const uint64_t cacheSize = + postScript_->hasCacheSize() ? 
postScript_->cacheSize() : 0; + const uint64_t tailSize = 1 + psLength_ + footerSize + cacheSize; if (cacheSize > 0) { VELOX_CHECK_EQ(format(), DwrfFormat::kDwrf); const uint64_t cacheOffset = fileLength_ - tailSize; @@ -189,8 +227,19 @@ ReaderBase::ReaderBase( } else { auto cacheBuffer = std::make_shared>( options_.memoryPool(), cacheSize); - input_->read(cacheOffset, cacheSize, LogType::FOOTER) - ->readFully(cacheBuffer->data(), cacheSize); + auto* target = cacheBuffer->data(); + auto* source = footerBuffer_->as(); + auto size = cacheSize; + if (cacheSize > footerBufferSize_) { + auto remaining = cacheSize - footerBufferSize_; + auto stream = input_->read(cacheOffset, remaining, LogType::FOOTER); + stream->readFully(target, remaining); + target += remaining; + size -= remaining; + } else { + source += footerBufferSize_ - cacheSize; + } + memcpy(target, source, size); cache_ = std::make_unique( postScript_->cacheMode(), *footer_, std::move(cacheBuffer)); } @@ -208,9 +257,8 @@ ReaderBase::ReaderBase( input_->load(LogType::FOOTER); } } - // initialize file decrypter - handler_ = - DecryptionHandler::create(*footer_, options_.decrypterFactory().get()); + // Release the memory as we no longer need it. 
+ footerBuffer_.reset(); } std::vector ReaderBase::rowsPerStripe() const { diff --git a/velox/dwio/dwrf/reader/ReaderBase.h b/velox/dwio/dwrf/reader/ReaderBase.h index 11f9070610e71..90bb21be58307 100644 --- a/velox/dwio/dwrf/reader/ReaderBase.h +++ b/velox/dwio/dwrf/reader/ReaderBase.h @@ -80,13 +80,13 @@ class ReaderBase { const proto::Footer* footer, std::unique_ptr cache, std::unique_ptr handler = nullptr) - : postScript_{std::move(ps)}, - footer_{std::make_unique(footer)}, - cache_{std::move(cache)}, - handler_{std::move(handler)}, - options_{dwio::common::ReaderOptions(&pool)}, + : options_{dwio::common::ReaderOptions(&pool)}, input_{std::move(input)}, fileLength_{0}, + postScript_{std::move(ps)}, + footer_{std::make_unique(footer)}, + handler_{std::move(handler)}, + cache_{std::move(cache)}, schema_{ std::dynamic_pointer_cast(convertType(*footer_))}, psLength_{0} { @@ -150,6 +150,8 @@ class ReaderBase { return *input_; } + void loadCache(); + const std::unique_ptr& metadataCache() const { return cache_; } @@ -246,6 +248,10 @@ class ReaderBase { return options_.randomSkip(); } + int footerBufferOverread() const { + return footerBufferOverread_; + } + private: static std::shared_ptr convertType( const FooterWrapper& footer, @@ -260,16 +266,23 @@ class ReaderBase { return options; } - std::unique_ptr arena_; - std::unique_ptr postScript_; - std::unique_ptr footer_ = nullptr; - std::unique_ptr cache_; - std::unique_ptr handler_; + template + std::unique_ptr parseFooter( + std::unique_ptr input); const dwio::common::ReaderOptions options_; const std::unique_ptr input_; const uint64_t fileLength_; + BufferPtr footerBuffer_; + int footerBufferSize_{0}; + int footerBufferOverread_{0}; /* Both zero-initialized: the constructor taking a pre-parsed footer omits them from its initializer list, while footerBufferOverread() returns the value unconditionally. */ + std::unique_ptr arena_; + std::unique_ptr postScript_; + std::unique_ptr footer_; + std::unique_ptr handler_; + std::unique_ptr cache_; + RowTypePtr schema_; // Lazily populated mutable std::shared_ptr schemaWithId_; diff --git a/velox/dwio/dwrf/test/ReaderTest.cpp 
b/velox/dwio/dwrf/test/ReaderTest.cpp index fa2ee0259aa2b..ad26b7954e6c9 100644 --- a/velox/dwio/dwrf/test/ReaderTest.cpp +++ b/velox/dwio/dwrf/test/ReaderTest.cpp @@ -1392,7 +1392,7 @@ TEST_F(TestReader, fileColumnNamesReadAsLowerCaseComplexStruct) { TEST_F(TestReader, TestStripeSizeCallback) { dwio::common::ReaderOptions readerOpts{pool()}; readerOpts.setFilePreloadThreshold(0); - readerOpts.setFooterEstimatedSize(4); + readerOpts.setFooterEstimatedSize(17); RowReaderOptions rowReaderOpts; std::shared_ptr requestedType = std::dynamic_pointer_cast< @@ -1420,7 +1420,7 @@ TEST_F(TestReader, TestStripeSizeCallback) { TEST_F(TestReader, TestStripeSizeCallbackLimitsOneStripe) { dwio::common::ReaderOptions readerOpts{pool()}; readerOpts.setFilePreloadThreshold(0); - readerOpts.setFooterEstimatedSize(4); + readerOpts.setFooterEstimatedSize(17); RowReaderOptions rowReaderOpts; std::shared_ptr requestedType = std::dynamic_pointer_cast< @@ -1449,7 +1449,7 @@ TEST_F(TestReader, TestStripeSizeCallbackLimitsOneStripe) { TEST_F(TestReader, TestStripeSizeCallbackLimitsTwoStripe) { dwio::common::ReaderOptions readerOpts{pool()}; readerOpts.setFilePreloadThreshold(0); - readerOpts.setFooterEstimatedSize(4); + readerOpts.setFooterEstimatedSize(17); RowReaderOptions rowReaderOpts; std::shared_ptr requestedType = std::dynamic_pointer_cast< diff --git a/velox/dwio/dwrf/test/TestDecompression.cpp b/velox/dwio/dwrf/test/TestDecompression.cpp index fcba33d80cfdc..37b0166558ff9 100644 --- a/velox/dwio/dwrf/test/TestDecompression.cpp +++ b/velox/dwio/dwrf/test/TestDecompression.cpp @@ -1054,7 +1054,7 @@ class TestingSeekableInputStream : public SeekableInputStream { return "testing"; } - size_t positionSize() override { + size_t positionSize() const override { return 1; } diff --git a/velox/dwio/dwrf/test/WriterTest.cpp b/velox/dwio/dwrf/test/WriterTest.cpp index 1570f29888655..13d81ff9cb41b 100644 --- a/velox/dwio/dwrf/test/WriterTest.cpp +++ b/velox/dwio/dwrf/test/WriterTest.cpp @@ -62,7 
+62,9 @@ class WriterTest : public Test { auto readFile = std::make_shared(std::move(data)); auto input = std::make_unique(std::move(readFile), *pool_); dwio::common::ReaderOptions readerOpts{pool_.get()}; - return std::make_unique(readerOpts, std::move(input)); + auto reader = std::make_unique(readerOpts, std::move(input)); + reader->loadCache(); + return reader; } auto& getContext() { diff --git a/velox/exec/tests/PrintPlanWithStatsTest.cpp b/velox/exec/tests/PrintPlanWithStatsTest.cpp index de6dcd55bd990..47d00c7799b5f 100644 --- a/velox/exec/tests/PrintPlanWithStatsTest.cpp +++ b/velox/exec/tests/PrintPlanWithStatsTest.cpp @@ -190,7 +190,7 @@ TEST_F(PrintPlanWithStatsTest, innerJoinWithTableScan) { {" dataSourceAddSplitWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" dataSourceReadWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" dynamicFiltersAccepted[ ]* sum: 1, count: 1, min: 1, max: 1"}, - {" flattenStringDictionaryValues [ ]* sum: 0, count: 1, min: 0, max: 0"}, + {" footerBufferOverread[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" ioWaitWallNanos [ ]* sum: .+, count: .+ min: .+, max: .+"}, {" localReadBytes [ ]* sum: 0B, count: 1, min: 0B, max: 0B"}, {" maxSingleIoWaitWallNanos[ ]*sum: .+, count: 1, min: .+, max: .+"}, @@ -208,9 +208,6 @@ TEST_F(PrintPlanWithStatsTest, innerJoinWithTableScan) { {" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningFinishWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningGetOutputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, - {" skippedSplitBytes [ ]* sum: 0B, count: 1, min: 0B, max: 0B"}, - {" skippedSplits [ ]* sum: 0, count: 1, min: 0, max: 0"}, - {" skippedStrides [ ]* sum: 0, count: 1, min: 0, max: 0"}, {" storageReadBytes [ ]* sum: .+, count: 1, min: .+, max: .+"}, {" totalRemainingFilterTime\\s+sum: .+, count: .+, min: .+, max: .+"}, {" totalScanTime [ ]* sum: .+, count: .+, min: .+, max: .+"}, @@ -285,7 +282,7 @@ TEST_F(PrintPlanWithStatsTest, 
partialAggregateWithTableScan) { {" Input: 10000 rows \\(.+\\), Output: 10000 rows \\(.+\\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1, Splits: 1, CPU breakdown: B/I/O/F (.+/.+/.+/.+)"}, {" dataSourceAddSplitWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" dataSourceReadWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, - {" flattenStringDictionaryValues [ ]* sum: 0, count: 1, min: 0, max: 0"}, + {" footerBufferOverread[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" ioWaitWallNanos [ ]* sum: .+, count: .+ min: .+, max: .+"}, {" localReadBytes [ ]* sum: 0B, count: 1, min: 0B, max: 0B"}, {" maxSingleIoWaitWallNanos[ ]*sum: .+, count: 1, min: .+, max: .+"}, @@ -304,9 +301,6 @@ TEST_F(PrintPlanWithStatsTest, partialAggregateWithTableScan) { {" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningFinishWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningGetOutputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, - {" skippedSplitBytes[ ]* sum: 0B, count: 1, min: 0B, max: 0B"}, - {" skippedSplits [ ]* sum: 0, count: 1, min: 0, max: 0"}, - {" skippedStrides [ ]* sum: 0, count: 1, min: 0, max: 0"}, {" storageReadBytes [ ]* sum: .+, count: 1, min: .+, max: .+"}, {" totalRemainingFilterTime\\s+sum: .+, count: .+, min: .+, max: .+"}, {" totalScanTime [ ]* sum: .+, count: .+, min: .+, max: .+"}}); @@ -359,7 +353,7 @@ TEST_F(PrintPlanWithStatsTest, tableWriterWithTableScan) { {R"( Input: 100 rows \(.+\), Output: 100 rows \(.+\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1, Splits: 1, CPU breakdown: B/I/O/F (.+/.+/.+/.+))"}, {" dataSourceAddSplitWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" dataSourceReadWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, - {" flattenStringDictionaryValues [ ]* sum: 0, count: 1, min: 0, max: 0"}, + {" footerBufferOverread[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" ioWaitWallNanos [ ]* sum: .+, 
count: .+ min: .+, max: .+"}, {" localReadBytes [ ]* sum: 0B, count: 1, min: 0B, max: 0B"}, {" maxSingleIoWaitWallNanos[ ]*sum: .+, count: 1, min: .+, max: .+"}, @@ -378,9 +372,6 @@ TEST_F(PrintPlanWithStatsTest, tableWriterWithTableScan) { {" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningFinishWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningGetOutputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, - {" skippedSplitBytes[ ]* sum: 0B, count: 1, min: 0B, max: 0B"}, - {" skippedSplits [ ]* sum: 0, count: 1, min: 0, max: 0"}, - {" skippedStrides [ ]* sum: 0, count: 1, min: 0, max: 0"}, {" storageReadBytes [ ]* sum: .+, count: 1, min: .+, max: .+"}, {" totalRemainingFilterTime\\s+sum: .+, count: .+, min: .+, max: .+"}, {" totalScanTime [ ]* sum: .+, count: .+, min: .+, max: .+"}}); diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index d8292880caeec..64131a70165ce 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -2324,8 +2324,8 @@ TEST_F(TableScanTest, statsBasedSkippingNulls) { EXPECT_EQ(31'234, stats.rawInputRows); EXPECT_EQ(31'234, stats.inputRows); EXPECT_EQ(31'234, stats.outputRows); - ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedSplits").sum, 0); - ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedStrides").sum, 0); + ASSERT_EQ(getTableScanRuntimeStats(task).count("skippedSplits"), 0); + ASSERT_EQ(getTableScanRuntimeStats(task).count("skippedStrides"), 0); task = assertQuery("c0 IS NULL"); @@ -2334,7 +2334,7 @@ TEST_F(TableScanTest, statsBasedSkippingNulls) { EXPECT_EQ(0, stats.inputRows); EXPECT_EQ(0, stats.outputRows); ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedSplits").sum, 1); - ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedStrides").sum, 0); + ASSERT_EQ(getTableScanRuntimeStats(task).count("skippedStrides"), 0); // c1 IS NULL - first stride should be skipped based on stats task = assertQuery("c1 IS NULL"); @@ 
-2343,7 +2343,7 @@ TEST_F(TableScanTest, statsBasedSkippingNulls) { EXPECT_EQ(size - 10'000, stats.rawInputRows); EXPECT_EQ(size - 11'111, stats.inputRows); EXPECT_EQ(size - 11'111, stats.outputRows); - ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedSplits").sum, 0); + ASSERT_EQ(getTableScanRuntimeStats(task).count("skippedSplits"), 0); ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedStrides").sum, 1); // c1 IS NOT NULL - 3rd and 4th strides should be skipped based on stats @@ -2353,7 +2353,7 @@ TEST_F(TableScanTest, statsBasedSkippingNulls) { EXPECT_EQ(20'000, stats.rawInputRows); EXPECT_EQ(11'111, stats.inputRows); EXPECT_EQ(11'111, stats.outputRows); - ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedSplits").sum, 0); + ASSERT_EQ(getTableScanRuntimeStats(task).count("skippedSplits"), 0); ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedStrides").sum, 2); } @@ -5355,3 +5355,18 @@ TEST_F(TableScanTest, rowId) { AssertQueryBuilder(plan).split(split).assertResults(expected); } } + +TEST_F(TableScanTest, footerIOCount) { + auto vector = makeRowVector({makeFlatVector(10, folly::identity)}); + auto file = TempFilePath::create(); + writeToFile(file->getPath(), {vector}); + auto plan = PlanBuilder().tableScan(asRowType(vector->type())).planNode(); + auto task = + AssertQueryBuilder(plan) + .split(makeHiveConnectorSplit(file->getPath(), 10'000, 10'000)) + .assertResults( + BaseVector::create(vector->type(), 0, pool())); + auto stats = getTableScanRuntimeStats(task); + ASSERT_EQ(stats.at("numStorageRead").sum, 1); + ASSERT_GT(stats.at("footerBufferOverread").sum, 0); +}