From 530b446f8adaf1a62a84b6981c5d18681eb7b4eb Mon Sep 17 00:00:00 2001 From: Xiaoxuan Meng Date: Tue, 8 Oct 2024 18:29:11 -0700 Subject: [PATCH] Avoid additional memory allocation during sort output spill (#11199) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/11199 The memory arbitration fuzzer is flaky because of an unexpected memory allocation after the sort output spill finishes. The reason is that when we finish the spill, we set up the merge reader to prepare for reading the spilled data back. The merge reader might use a non-trivial amount of data, which causes additional memory consumption. We should also keep the spill critical path as fast as possible. This PR moves the merge reader setup from the spill path to the first get-output call, and adds unit tests. It also improves the fuzzer test logging a bit to help with debugging. Reviewed By: bikramSingh91 Differential Revision: D64072660 fbshipit-source-id: 01e409a92a329b1634cf746af940a9e2b8af11c4 --- velox/exec/SortBuffer.cpp | 23 ++- velox/exec/SortBuffer.h | 4 + velox/exec/fuzzer/MemoryArbitrationFuzzer.cpp | 8 +- velox/exec/tests/SortBufferTest.cpp | 171 ++++++++++++++---- 4 files changed, 163 insertions(+), 43 deletions(-) diff --git a/velox/exec/SortBuffer.cpp b/velox/exec/SortBuffer.cpp index 2def45fb9cfb..000583fadf07 100644 --- a/velox/exec/SortBuffer.cpp +++ b/velox/exec/SortBuffer.cpp @@ -82,6 +82,9 @@ SortBuffer::~SortBuffer() { } void SortBuffer::addInput(const VectorPtr& input) { + velox::common::testutil::TestValue::adjust( + "facebook::velox::exec::SortBuffer::addInput", this); + VELOX_CHECK(!noMoreInput_); ensureInputFits(input); @@ -348,6 +351,7 @@ void SortBuffer::prepareOutput(vector_size_t maxOutputRows) { if (spiller_ != nullptr) { spillSources_.resize(maxOutputRows); spillSourceRows_.resize(maxOutputRows); + prepareOutputWithSpill(); } VELOX_CHECK_GT(output_->size(), 0); @@ -414,11 +418,22 @@ void SortBuffer::getOutputWithSpill() { void SortBuffer::finishSpill() { VELOX_CHECK_NULL(spillMerger_); - 
SpillPartitionSet spillPartitionSet; - spiller_->finishSpill(spillPartitionSet); - VELOX_CHECK_EQ(spillPartitionSet.size(), 1); - spillMerger_ = spillPartitionSet.begin()->second->createOrderedReader( + VELOX_CHECK(spillPartitionSet_.empty()); + spiller_->finishSpill(spillPartitionSet_); + VELOX_CHECK_EQ(spillPartitionSet_.size(), 1); +} + +void SortBuffer::prepareOutputWithSpill() { + VELOX_CHECK_NOT_NULL(spiller_); + if (spillMerger_ != nullptr) { + VELOX_CHECK(spillPartitionSet_.empty()); + return; + } + + VELOX_CHECK_EQ(spillPartitionSet_.size(), 1); + spillMerger_ = spillPartitionSet_.begin()->second->createOrderedReader( spillConfig_->readBufferSize, pool(), spillStats_); + spillPartitionSet_.clear(); } } // namespace facebook::velox::exec diff --git a/velox/exec/SortBuffer.h b/velox/exec/SortBuffer.h index a08052b7b968..3791bcf71258 100644 --- a/velox/exec/SortBuffer.h +++ b/velox/exec/SortBuffer.h @@ -77,6 +77,9 @@ class SortBuffer { void updateEstimatedOutputRowSize(); // Invoked to initialize or reset the reusable output buffer to get output. void prepareOutput(vector_size_t maxOutputRows); + // Invoked to initialize reader to read the spilled data from storage for + // output processing. + void prepareOutputWithSpill(); void getOutputWithoutSpill(); void getOutputWithSpill(); // Spill during input stage. @@ -116,6 +119,7 @@ class SortBuffer { // sort key columns are stored first then the non-sorted data columns. RowTypePtr spillerStoreType_; std::unique_ptr spiller_; + SpillPartitionSet spillPartitionSet_; // Used to merge the sorted runs from in-memory rows and spilled rows on disk. std::unique_ptr> spillMerger_; // Records the source rows to copy to 'output_' in order. 
diff --git a/velox/exec/fuzzer/MemoryArbitrationFuzzer.cpp b/velox/exec/fuzzer/MemoryArbitrationFuzzer.cpp index ec8df7f45f34..5e3e6939d952 100644 --- a/velox/exec/fuzzer/MemoryArbitrationFuzzer.cpp +++ b/velox/exec/fuzzer/MemoryArbitrationFuzzer.cpp @@ -85,15 +85,13 @@ class MemoryArbitrationFuzzer { struct Stats { size_t successCount{0}; - size_t failureCount{0}; size_t oomCount{0}; size_t abortCount{0}; void print() const { std::stringstream ss; - ss << "Success count = " << successCount - << ", failure count = " << failureCount - << ". OOM count = " << oomCount << " Abort count = " << abortCount; + ss << "Success count = " << successCount << ". OOM count = " << oomCount + << " Abort count = " << abortCount; LOG(INFO) << ss.str(); } }; @@ -714,7 +712,7 @@ void MemoryArbitrationFuzzer::verify() { } else if (e.errorCode() == error_code::kMemAborted.c_str()) { ++lockedStats->abortCount; } else { - ++lockedStats->failureCount; + LOG(ERROR) << "Unexpected exception: " << e.what(); std::rethrow_exception(std::current_exception()); } } diff --git a/velox/exec/tests/SortBufferTest.cpp b/velox/exec/tests/SortBufferTest.cpp index 45c50f7ea5ae..c09b1c724927 100644 --- a/velox/exec/tests/SortBufferTest.cpp +++ b/velox/exec/tests/SortBufferTest.cpp @@ -456,24 +456,9 @@ TEST_F(SortBufferTest, spill) { } } -DEBUG_ONLY_TEST_F(SortBufferTest, reserveMemoryGetOutput) { +DEBUG_ONLY_TEST_F(SortBufferTest, spillDuringInput) { auto spillDirectory = exec::test::TempDirectoryPath::create(); - auto spillConfig = common::SpillConfig( - [&]() -> const std::string& { return spillDirectory->getPath(); }, - [&](uint64_t) {}, - "0.0.0", - 1000, - 0, - 1 << 20, - executor_.get(), - 100, - 100000, - 0, - 0, - 0, - 0, - 0, - "none"); + const auto spillConfig = getSpillConfig(spillDirectory->getPath()); folly::Synchronized spillStats; auto sortBuffer = std::make_unique( inputType_, @@ -485,34 +470,152 @@ DEBUG_ONLY_TEST_F(SortBufferTest, reserveMemoryGetOutput) { &spillConfig, &spillStats); + 
const int numInputs = 10; + const int numSpilledInputs = 10 / 2; + std::atomic_int processedInputs{0}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::SortBuffer::addInput", + std::function(([&](SortBuffer* sortBuffer) { + if (processedInputs++ != numSpilledInputs) { + return; + } + ASSERT_GT(sortBuffer->pool()->usedBytes(), 0); + sortBuffer->spill(); + ASSERT_EQ(sortBuffer->pool()->usedBytes(), 0); + }))); + const std::shared_ptr fuzzerPool = - memory::memoryManager()->addLeafPool("spillSource"); + memory::memoryManager()->addLeafPool("spillDuringInput"); VectorFuzzer fuzzer({.vectorSize = 1024}, fuzzerPool.get()); + uint64_t totalNumInput{0}; + + ASSERT_EQ(memory::spillMemoryPool()->stats().usedBytes, 0); + const auto peakSpillMemoryUsage = + memory::spillMemoryPool()->stats().peakBytes; - TestScopedSpillInjection scopedSpillInjection(0); - for (int i = 0; i < 3; ++i) { + for (int i = 0; i < numInputs; ++i) { sortBuffer->addInput(fuzzer.fuzzRow(inputType_)); } + sortBuffer->noMoreInput(); + + ASSERT_FALSE(spillStats.rlock()->empty()); + ASSERT_GT(spillStats.rlock()->spilledRows, 0); + ASSERT_EQ(spillStats.rlock()->spilledRows, numInputs * 1024); + ASSERT_GT(spillStats.rlock()->spilledBytes, 0); + ASSERT_EQ(spillStats.rlock()->spilledPartitions, 1); + ASSERT_EQ(spillStats.rlock()->spilledFiles, 2); + + ASSERT_EQ(memory::spillMemoryPool()->stats().usedBytes, 0); + if (memory::spillMemoryPool()->trackUsage()) { + ASSERT_GT(memory::spillMemoryPool()->stats().peakBytes, 0); + ASSERT_GE( + memory::spillMemoryPool()->stats().peakBytes, peakSpillMemoryUsage); + } +} - std::atomic_bool noMoreInput{false}; +DEBUG_ONLY_TEST_F(SortBufferTest, spillDuringOutput) { + auto spillDirectory = exec::test::TempDirectoryPath::create(); + const auto spillConfig = getSpillConfig(spillDirectory->getPath()); + folly::Synchronized spillStats; + auto sortBuffer = std::make_unique( + inputType_, + sortColumnIndices_, + sortCompareFlags_, + pool_.get(), + &nonReclaimableSection_, + 
prefixSortConfig_, + &spillConfig, + &spillStats); + + const int numInputs = 10; SCOPED_TESTVALUE_SET( "facebook::velox::exec::SortBuffer::noMoreInput", - std::function( - ([&](SortBuffer* sortBuffer) { noMoreInput.store(true); }))); + std::function(([&](SortBuffer* sortBuffer) { + ASSERT_GT(sortBuffer->pool()->usedBytes(), 0); + sortBuffer->spill(); + ASSERT_EQ(sortBuffer->pool()->usedBytes(), 0); + }))); - std::atomic_int numInputs{0}; - SCOPED_TESTVALUE_SET( - "facebook::velox::common::memory::MemoryPoolImpl::maybeReserve", - std::function( - ([&](memory::MemoryPoolImpl* pool) { - if (noMoreInput) { - ++numInputs; - } - }))); + const std::shared_ptr fuzzerPool = + memory::memoryManager()->addLeafPool("spillDuringOutput"); + VectorFuzzer fuzzer({.vectorSize = 1024}, fuzzerPool.get()); + uint64_t totalNumInput{0}; + + ASSERT_EQ(memory::spillMemoryPool()->stats().usedBytes, 0); + const auto peakSpillMemoryUsage = + memory::spillMemoryPool()->stats().peakBytes; + for (int i = 0; i < numInputs; ++i) { + sortBuffer->addInput(fuzzer.fuzzRow(inputType_)); + } sortBuffer->noMoreInput(); - sortBuffer->getOutput(10000); - ASSERT_EQ(numInputs, 1); + + ASSERT_FALSE(spillStats.rlock()->empty()); + ASSERT_GT(spillStats.rlock()->spilledRows, 0); + ASSERT_EQ(spillStats.rlock()->spilledRows, numInputs * 1024); + ASSERT_GT(spillStats.rlock()->spilledBytes, 0); + ASSERT_EQ(spillStats.rlock()->spilledPartitions, 1); + ASSERT_EQ(spillStats.rlock()->spilledFiles, 1); + + ASSERT_EQ(memory::spillMemoryPool()->stats().usedBytes, 0); + if (memory::spillMemoryPool()->trackUsage()) { + ASSERT_GT(memory::spillMemoryPool()->stats().peakBytes, 0); + ASSERT_GE( + memory::spillMemoryPool()->stats().peakBytes, peakSpillMemoryUsage); + } +} + +DEBUG_ONLY_TEST_F(SortBufferTest, reserveMemoryGetOutput) { + for (bool spillEnabled : {false, true}) { + SCOPED_TRACE(fmt::format("spillEnabled {}", spillEnabled)); + + auto spillDirectory = exec::test::TempDirectoryPath::create(); + const auto spillConfig 
= getSpillConfig(spillDirectory->getPath()); + folly::Synchronized spillStats; + auto sortBuffer = std::make_unique( + inputType_, + sortColumnIndices_, + sortCompareFlags_, + pool_.get(), + &nonReclaimableSection_, + prefixSortConfig_, + spillEnabled ? &spillConfig : nullptr, + &spillStats); + + const std::shared_ptr fuzzerPool = + memory::memoryManager()->addLeafPool("reserveMemoryGetOutput"); + VectorFuzzer fuzzer({.vectorSize = 1024}, fuzzerPool.get()); + + const int numInputs{10}; + for (int i = 0; i < numInputs; ++i) { + sortBuffer->addInput(fuzzer.fuzzRow(inputType_)); + } + + std::atomic_bool noMoreInput{false}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::SortBuffer::noMoreInput", + std::function( + ([&](SortBuffer* sortBuffer) { noMoreInput.store(true); }))); + + std::atomic_int numReserves{0}; + SCOPED_TESTVALUE_SET( + "facebook::velox::common::memory::MemoryPoolImpl::maybeReserve", + std::function( + ([&](memory::MemoryPoolImpl* pool) { + if (noMoreInput) { + ++numReserves; + } + }))); + + sortBuffer->noMoreInput(); + // Sets an extreme large value to get output once to avoid test flakiness. + sortBuffer->getOutput(1'000'000); + if (spillEnabled) { + ASSERT_EQ(numReserves, 1); + } else { + ASSERT_EQ(numReserves, 0); + } + } } TEST_F(SortBufferTest, emptySpill) {