From 97160cdc385e5212c3b89a200690e0e85d0ff19f Mon Sep 17 00:00:00 2001 From: xiaoxmeng Date: Fri, 26 Apr 2024 10:41:58 -0700 Subject: [PATCH] =?UTF-8?q?Avoid=20unnecessary=20memory=20capacity=20growt?= =?UTF-8?q?h=20in=20case=20of=20concurrent=20arbitr=E2=80=A6=20(#9557)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: This PR avoids unnecessary memory capacity growth in case of concurrent arbitration requests from the same query. The first arbitration request might have reserved enough capacity from the arbitrator (we allocate more than request to reduce the number of arbitrations). We avoid this by checking if there is sufficient free capacity in the request pool itself before allocating more capacity from the arbitrator. Also to avoid unnecessary retries from the memory pool, we support to commit the reservation bytes before return arbitration success back to the memory pool. Correspondingly, the memory pool doesn't have to increase the reservation and check for retry on capacity grow success from the arbitrator. Pull Request resolved: https://github.com/facebookincubator/velox/pull/9557 Reviewed By: tanjialiang, oerling Differential Revision: D56444509 Pulled By: xiaoxmeng fbshipit-source-id: adbff6ba18389c30c601e627a325c6d7df1f907c --- velox/common/file/tests/FaultyFile.cpp | 5 +- velox/common/file/tests/FaultyFile.h | 4 +- velox/common/file/tests/FileTest.cpp | 2 +- velox/common/memory/Memory.cpp | 7 +- velox/common/memory/Memory.h | 10 +- velox/common/memory/MemoryArbitrator.cpp | 4 +- velox/common/memory/MemoryArbitrator.h | 2 +- velox/common/memory/MemoryPool.cpp | 53 +++--- velox/common/memory/MemoryPool.h | 18 ++- velox/common/memory/SharedArbitrator.cpp | 105 ++++++------ velox/common/memory/SharedArbitrator.h | 32 ++-- .../memory/tests/MemoryArbitratorTest.cpp | 41 ++--- velox/common/memory/tests/MemoryPoolTest.cpp | 113 ++++++++----- .../memory/tests/MockSharedArbitratorTest.cpp | 61 +++++-- velox/dwio/dwrf/test/WriterFlushTest.cpp | 3 +- velox/exec/tests/HashJoinTest.cpp | 109 +------------ velox/exec/tests/OrderByTest.cpp | 153 ++++-------------- velox/exec/tests/TableScanTest.cpp | 3 +- velox/exec/tests/TableWriteTest.cpp | 13 +- 19 files changed, 328 insertions(+), 410 deletions(-) diff --git a/velox/common/file/tests/FaultyFile.cpp b/velox/common/file/tests/FaultyFile.cpp index a8342506dd51..80bc292052f2 100644 --- a/velox/common/file/tests/FaultyFile.cpp +++ b/velox/common/file/tests/FaultyFile.cpp @@ -80,10 +80,9 @@ void FaultyWriteFile::append(std::string_view data) { if (injectionHook_ != nullptr) { FaultFileWriteOperation op(path_, data); injectionHook_(&op); - if (op.delegate) { - delegatedFile_->append(op.data); + if (!op.delegate) { + return; } - return; } delegatedFile_->append(data); } diff --git a/velox/common/file/tests/FaultyFile.h b/velox/common/file/tests/FaultyFile.h index c5dfaa721ead..4bba0faf89aa 100644 --- a/velox/common/file/tests/FaultyFile.h +++ b/velox/common/file/tests/FaultyFile.h @@ -87,13 +87,13 @@ struct FaultFileReadvOperation : FaultFileOperation { /// Fault injection parameters for file write API. struct FaultFileWriteOperation : FaultFileOperation { - std::string_view data; + std::string_view* data; FaultFileWriteOperation( const std::string& _path, const std::string_view& _data) : FaultFileOperation(FaultFileOperation::Type::kWrite, _path), - data(_data) {} + data(const_cast(&_data)) {} }; /// The fault injection hook on the file operation path. diff --git a/velox/common/file/tests/FileTest.cpp b/velox/common/file/tests/FileTest.cpp index 11a914336a2e..b1eb01399275 100644 --- a/velox/common/file/tests/FileTest.cpp +++ b/velox/common/file/tests/FileTest.cpp @@ -642,7 +642,7 @@ TEST_F(FaultyFsTest, fileWriteFaultHookInjection) { return; } auto* writeOp = static_cast(op); - writeOp->data = "Error data"; + *writeOp->data = "Error data"; }); { auto writeFile = fs_->openFileForWrite(path1, {}); diff --git a/velox/common/memory/Memory.cpp b/velox/common/memory/Memory.cpp index 241501e6546a..d96474a5febc 100644 --- a/velox/common/memory/Memory.cpp +++ b/velox/common/memory/Memory.cpp @@ -105,7 +105,12 @@ MemoryManager::MemoryManager(const MemoryManagerOptions& options) VELOX_USER_CHECK_GE(capacity(), 0); VELOX_CHECK_GE(allocator_->capacity(), arbitrator_->capacity()); MemoryAllocator::alignmentCheck(0, alignment_); - defaultRoot_->grow(defaultRoot_->maxCapacity()); + const bool ret = defaultRoot_->grow(defaultRoot_->maxCapacity(), 0); + VELOX_CHECK( + ret, + "Failed to set max capacity {} for {}", + succinctBytes(defaultRoot_->maxCapacity()), + defaultRoot_->name()); const size_t numSharedPools = std::max(1, FLAGS_velox_memory_num_shared_leaf_pools); sharedLeafPools_.reserve(numSharedPools); diff --git a/velox/common/memory/Memory.h b/velox/common/memory/Memory.h index ee163da915bf..35bda7bf2463 100644 --- a/velox/common/memory/Memory.h +++ b/velox/common/memory/Memory.h @@ -161,7 +161,11 @@ struct MemoryManagerOptions { /// pool. uint64_t memoryPoolInitCapacity{256 << 20}; - /// The minimal memory capacity reserved for a query memory pool to run. + /// The minimal query memory pool capacity that is ensured during arbitration. + /// During arbitration, memory arbitrator ensures the participants' memory + /// pool capacity to be no less than this value on a best-effort basis, for + /// more smooth executions of the queries, to avoid frequent arbitration + /// requests. uint64_t memoryPoolReservedCapacity{0}; /// The minimal memory capacity to transfer out of or into a memory pool @@ -287,10 +291,6 @@ class MemoryManager { return sharedLeafPools_; } - bool testingGrowPool(MemoryPool* pool, uint64_t incrementBytes) { - return growPool(pool, incrementBytes); - } - private: void dropPool(MemoryPool* pool); diff --git a/velox/common/memory/MemoryArbitrator.cpp b/velox/common/memory/MemoryArbitrator.cpp index e379a78c1f10..a0466f544f00 100644 --- a/velox/common/memory/MemoryArbitrator.cpp +++ b/velox/common/memory/MemoryArbitrator.cpp @@ -94,8 +94,8 @@ class NoopArbitrator : public MemoryArbitrator { // Noop arbitrator has no memory capacity limit so no operation needed for // memory pool capacity reserve. uint64_t growCapacity(MemoryPool* pool, uint64_t /*unused*/) override { - pool->grow(pool->maxCapacity()); - return pool->maxCapacity(); + pool->grow(pool->maxCapacity(), 0); + return pool->capacity(); } // Noop arbitrator has no memory capacity limit so no operation needed for diff --git a/velox/common/memory/MemoryArbitrator.h b/velox/common/memory/MemoryArbitrator.h index 6107ca825314..ca66ff2e7833 100644 --- a/velox/common/memory/MemoryArbitrator.h +++ b/velox/common/memory/MemoryArbitrator.h @@ -128,7 +128,7 @@ class MemoryArbitrator { /// Invoked by the memory manager to allocate up to 'targetBytes' of free /// memory capacity without triggering memory arbitration. The function will /// grow the memory pool's capacity based on the free available memory - /// capacity in the arbitrator, and returns the actual growed capacity in + /// capacity in the arbitrator, and returns the actual grown capacity in /// bytes. virtual uint64_t growCapacity(MemoryPool* pool, uint64_t bytes) = 0; diff --git a/velox/common/memory/MemoryPool.cpp b/velox/common/memory/MemoryPool.cpp index 5b99577d8b05..4939931b1530 100644 --- a/velox/common/memory/MemoryPool.cpp +++ b/velox/common/memory/MemoryPool.cpp @@ -809,15 +809,9 @@ bool MemoryPoolImpl::incrementReservationThreadSafe( TestValue::adjust( "facebook::velox::memory::MemoryPoolImpl::incrementReservationThreadSafe::AfterGrowCallback", this); - // NOTE: the memory reservation might still fail even if the memory grow - // callback succeeds. The reason is that we don't hold the root tracker's - // mutex lock while running the grow callback. Therefore, there is a - // possibility in theory that a concurrent memory reservation request - // might steal away the increased memory capacity after the grow callback - // finishes and before we increase the reservation. If it happens, we can - // simply fall back to retry the memory reservation from the leaf memory - // pool which should happen rarely. - return maybeIncrementReservation(size); + // NOTE: if memory arbitration succeeds, it should have already committed + // the reservation 'size' in the root memory pool. + return true; } VELOX_MEM_POOL_CAP_EXCEEDED(fmt::format( "Exceeded memory pool cap of {} with max {} when requesting {}, memory " @@ -846,12 +840,16 @@ bool MemoryPoolImpl::maybeIncrementReservation(uint64_t size) { return false; } } - reservationBytes_ += size; + incrementReservationLocked(size); + return true; +} + +void MemoryPoolImpl::incrementReservationLocked(uint64_t bytes) { + reservationBytes_ += bytes; if (!isLeaf()) { - cumulativeBytes_ += size; + cumulativeBytes_ += bytes; maybeUpdatePeakBytesLocked(reservationBytes_); } - return true; } void MemoryPoolImpl::release() { @@ -1029,20 +1027,29 @@ uint64_t MemoryPoolImpl::shrink(uint64_t targetBytes) { return freeBytes; } -uint64_t MemoryPoolImpl::grow(uint64_t bytes) noexcept { +bool MemoryPoolImpl::grow(uint64_t growBytes, uint64_t reservationBytes) { if (parent_ != nullptr) { - return parent_->grow(bytes); + return parent_->grow(growBytes, reservationBytes); } // TODO: add to prevent from growing beyond the max capacity and the // corresponding support in memory arbitrator. std::lock_guard l(mutex_); // We don't expect to grow a memory pool without capacity limit. VELOX_CHECK_NE(capacity_, kMaxMemory, "Can't grow with unlimited capacity"); - VELOX_CHECK_LE( - capacity_ + bytes, maxCapacity_, "Can't grow beyond the max capacity"); - capacity_ += bytes; - VELOX_CHECK_GE(capacity_, bytes); - return capacity_; + if (capacity_ + growBytes > maxCapacity_) { + return false; + } + if (reservationBytes_ + reservationBytes > capacity_ + growBytes) { + return false; + } + + capacity_ += growBytes; + VELOX_CHECK_GE(capacity_, growBytes); + if (reservationBytes > 0) { + incrementReservationLocked(reservationBytes); + VELOX_CHECK_LE(reservationBytes, reservationBytes_); + } + return true; } void MemoryPoolImpl::abort(const std::exception_ptr& error) { @@ -1081,6 +1088,14 @@ void MemoryPoolImpl::testingSetCapacity(int64_t bytes) { capacity_ = bytes; } +void MemoryPoolImpl::testingSetReservation(int64_t bytes) { + if (parent_ != nullptr) { + return toImpl(parent_)->testingSetReservation(bytes); + } + std::lock_guard l(mutex_); + reservationBytes_ = bytes; +} + bool MemoryPoolImpl::needRecordDbg(bool /* isAlloc */) { if (!debugPoolNameRegex_.empty()) { return RE2::FullMatch(name_, debugPoolNameRegex_); diff --git a/velox/common/memory/MemoryPool.h b/velox/common/memory/MemoryPool.h index bbbe06352747..de8a864a02f4 100644 --- a/velox/common/memory/MemoryPool.h +++ b/velox/common/memory/MemoryPool.h @@ -362,9 +362,15 @@ class MemoryPool : public std::enable_shared_from_this { /// 'targetBytes' is zero, the function frees all the free memory capacity. virtual uint64_t shrink(uint64_t targetBytes = 0) = 0; - /// Invoked to increase the memory pool's capacity by 'bytes'. The function - /// returns the memory pool's capacity after the growth. - virtual uint64_t grow(uint64_t bytes) noexcept = 0; + /// Invoked to increase the memory pool's capacity by 'growBytes' and commit + /// the reservation by 'reservationBytes'. The function makes the two updates + /// atomic. The function returns true if the updates succeed, otherwise false + /// and neither change will apply. + /// + /// NOTE: this should only be called by memory arbitrator when a root memory + /// pool tries to grow its capacity for a new reservation request which + /// exceeds its current capacity limit. + virtual bool grow(uint64_t growBytes, uint64_t reservationBytes = 0) = 0; /// Sets the memory reclaimer for this memory pool. /// @@ -646,7 +652,7 @@ class MemoryPoolImpl : public MemoryPool { uint64_t shrink(uint64_t targetBytes = 0) override; - uint64_t grow(uint64_t bytes) noexcept override; + bool grow(uint64_t growBytes, uint64_t reservationBytes = 0) override; void abort(const std::exception_ptr& error) override; @@ -684,6 +690,8 @@ class MemoryPoolImpl : public MemoryPool { void testingSetCapacity(int64_t bytes); + void testingSetReservation(int64_t bytes); + MemoryManager* testingManager() const { return manager_; } @@ -845,6 +853,8 @@ class MemoryPoolImpl : public MemoryPool { // returns true, otherwise the function returns false. bool maybeIncrementReservation(uint64_t size); + void incrementReservationLocked(uint64_t bytes); + // Release memory reservation for an allocation free or memory release with // specified 'size'. If 'releaseOnly' is true, then we only release the unused // reservation if 'minReservationBytes_' is set. 'releaseThreadSafe' processes diff --git a/velox/common/memory/SharedArbitrator.cpp b/velox/common/memory/SharedArbitrator.cpp index b03badb51a62..01bd3802f6de 100644 --- a/velox/common/memory/SharedArbitrator.cpp +++ b/velox/common/memory/SharedArbitrator.cpp @@ -79,7 +79,7 @@ void sortCandidatesByReclaimableFreeCapacity( &candidates); } -void sortCandidatesByReclaimableUsedMemory( +void sortCandidatesByReclaimableUsedCapacity( std::vector& candidates) { std::sort( candidates.begin(), @@ -90,7 +90,7 @@ void sortCandidatesByReclaimableUsedMemory( }); TestValue::adjust( - "facebook::velox::memory::SharedArbitrator::sortCandidatesByReclaimableUsedMemory", + "facebook::velox::memory::SharedArbitrator::sortCandidatesByReclaimableUsedCapacity", &candidates); } @@ -162,23 +162,9 @@ std::string SharedArbitrator::Candidate::toString() const { } SharedArbitrator::~SharedArbitrator() { - if (freeReservedCapacity_ != reservedCapacity_) { - const std::string errMsg = fmt::format( - "There is unexpected free reserved capacity not given back to arbitrator " - "on destruction: freeReservedCapacity_{} != reservedCapacity_{}\\n{}", - freeReservedCapacity_, - reservedCapacity_, - toString()); - if (checkUsageLeak_) { - VELOX_FAIL(errMsg); - } else { - VELOX_MEM_LOG(ERROR) << errMsg; - } - } if (freeNonReservedCapacity_ + freeReservedCapacity_ != capacity_) { const std::string errMsg = fmt::format( - "There is unexpected free capacity not given back to arbitrator " - "on destruction: freeNonReservedCapacity_[{}] + freeReservedCapacity_[{}] != capacity_[{}])\\n{}", + "Unexpected free capacity leak in arbitrator: freeNonReservedCapacity_[{}] + freeReservedCapacity_[{}] != capacity_[{}])\\n{}", freeNonReservedCapacity_, freeReservedCapacity_, capacity_, @@ -214,18 +200,18 @@ void SharedArbitrator::updateFreeCapacityMetrics() const { kMetricArbitratorFreeReservedCapacityBytes, freeReservedCapacity_); } -int64_t SharedArbitrator::reclaimableCapacity(const MemoryPool& pool) const { +int64_t SharedArbitrator::maxReclaimableCapacity(const MemoryPool& pool) const { return std::max(0, pool.capacity() - memoryPoolReservedCapacity_); } int64_t SharedArbitrator::reclaimableFreeCapacity( const MemoryPool& pool) const { - return std::min(pool.freeBytes(), reclaimableCapacity(pool)); + return std::min(pool.freeBytes(), maxReclaimableCapacity(pool)); } int64_t SharedArbitrator::reclaimableUsedCapacity( const MemoryPool& pool) const { - const auto maxReclaimableBytes = reclaimableCapacity(pool); + const auto maxReclaimableBytes = maxReclaimableCapacity(pool); const auto reclaimableBytes = pool.reclaimableBytes(); return std::min(maxReclaimableBytes, reclaimableBytes.value_or(0)); } @@ -240,9 +226,8 @@ int64_t SharedArbitrator::minGrowCapacity(const MemoryPool& pool) const { uint64_t SharedArbitrator::growCapacity( MemoryPool* pool, uint64_t targetBytes) { - const auto freeCapacityUpdateCb = + const auto freeCapacityMetricUpdateCb = folly::makeGuard([this]() { updateFreeCapacityMetrics(); }); - uint64_t reservedBytes{0}; { std::lock_guard l(mutex_); @@ -252,33 +237,36 @@ uint64_t SharedArbitrator::growCapacity( const int64_t minBytesToReserve = minGrowCapacity(*pool); reservedBytes = decrementFreeCapacityLocked(maxBytesToReserve, minBytesToReserve); - pool->grow(reservedBytes); + try { + checkedGrow(pool, reservedBytes, 0); + } catch (const VeloxRuntimeError& error) { + reservedBytes = 0; + } } return reservedBytes; } uint64_t SharedArbitrator::decrementFreeCapacity( - uint64_t maxBytes, - uint64_t minBytes) { + uint64_t maxBytesToReserve, + uint64_t minBytesToReserve) { uint64_t reservedBytes{0}; { std::lock_guard l(mutex_); - reservedBytes = decrementFreeCapacityLocked(maxBytes, minBytes); + reservedBytes = + decrementFreeCapacityLocked(maxBytesToReserve, minBytesToReserve); } return reservedBytes; } uint64_t SharedArbitrator::decrementFreeCapacityLocked( - uint64_t maxBytes, - uint64_t minBytes) { + uint64_t maxBytesToReserve, + uint64_t minBytesToReserve) { uint64_t allocatedBytes = - std::min(freeNonReservedCapacity_, maxBytes); - VELOX_CHECK_LE(allocatedBytes, freeNonReservedCapacity_); + std::min(freeNonReservedCapacity_, maxBytesToReserve); freeNonReservedCapacity_ -= allocatedBytes; - if (allocatedBytes < minBytes) { - const uint64_t reservedBytes = - std::min(minBytes - allocatedBytes, freeReservedCapacity_); - VELOX_CHECK_LE(reservedBytes, freeReservedCapacity_); + if (allocatedBytes < minBytesToReserve) { + const uint64_t reservedBytes = std::min( + minBytesToReserve - allocatedBytes, freeReservedCapacity_); freeReservedCapacity_ -= reservedBytes; allocatedBytes += reservedBytes; } @@ -349,6 +337,10 @@ void SharedArbitrator::testingFreeCapacity(uint64_t capacity) { incrementFreeCapacityLocked(capacity); } +uint64_t SharedArbitrator::testingNumRequests() const { + return numRequests_; +} + bool SharedArbitrator::growCapacity( MemoryPool* pool, const std::vector>& candidatePools, @@ -364,6 +356,16 @@ bool SharedArbitrator::growCapacity( VELOX_MEM_POOL_ABORTED("The requestor pool has been aborted"); } + // Checks if the request pool already has enough free capacity for the new + // request. This could happen if there is multiple concurrent arbitration + // requests from the same query. When the first served request succeeds, it + // might have reserved enough memory capacity for the followup requests. + if (requestor->freeBytes() >= targetBytes) { + if (requestor->grow(0, targetBytes)) { + return true; + } + } + if (!ensureCapacity(requestor, targetBytes)) { RECORD_METRIC_VALUE(kMetricArbitratorFailuresCount); ++numFailures_; @@ -466,6 +468,19 @@ bool SharedArbitrator::handleOOM( return true; } +void SharedArbitrator::checkedGrow( + MemoryPool* pool, + uint64_t growBytes, + uint64_t reservationBytes) { + const auto ret = pool->grow(growBytes, reservationBytes); + VELOX_CHECK( + ret, + "Failed to grow pool {} with {} and commit {} used reservation", + pool->name(), + succinctBytes(growBytes), + succinctBytes(reservationBytes)); +} + bool SharedArbitrator::arbitrateMemory( MemoryPool* requestor, std::vector& candidates, @@ -476,24 +491,24 @@ bool SharedArbitrator::arbitrateMemory( std::max(memoryPoolTransferCapacity_, targetBytes)); const uint64_t minGrowTarget = minGrowCapacity(*requestor); uint64_t freedBytes = decrementFreeCapacity(growTarget, minGrowTarget); - if (freedBytes >= targetBytes) { - requestor->grow(freedBytes); - return true; - } - VELOX_CHECK_LT(freedBytes, growTarget); - auto freeGuard = folly::makeGuard([&]() { // Returns the unused freed memory capacity back to the arbitrator. if (freedBytes > 0) { incrementFreeCapacity(freedBytes); } }); + if (freedBytes >= targetBytes) { + checkedGrow(requestor, freedBytes, targetBytes); + freedBytes = 0; + return true; + } + VELOX_CHECK_LT(freedBytes, growTarget); freedBytes += reclaimFreeMemoryFromCandidates(candidates, growTarget - freedBytes); if (freedBytes >= targetBytes) { const uint64_t bytesToGrow = std::min(growTarget, freedBytes); - requestor->grow(bytesToGrow); + checkedGrow(requestor, bytesToGrow, targetBytes); freedBytes -= bytesToGrow; return true; } @@ -519,7 +534,7 @@ bool SharedArbitrator::arbitrateMemory( } const uint64_t bytesToGrow = std::min(freedBytes, growTarget); - requestor->grow(bytesToGrow); + checkedGrow(requestor, bytesToGrow, targetBytes); freedBytes -= bytesToGrow; return true; } @@ -556,7 +571,7 @@ uint64_t SharedArbitrator::reclaimUsedMemoryFromCandidatesBySpill( std::vector& candidates, uint64_t targetBytes) { // Sort candidate memory pools based on their reclaimable used capacity. - sortCandidatesByReclaimableUsedMemory(candidates); + sortCandidatesByReclaimableUsedCapacity(candidates); uint64_t freedBytes{0}; for (const auto& candidate : candidates) { @@ -608,7 +623,7 @@ uint64_t SharedArbitrator::reclaim( bool isLocalArbitration) noexcept { int64_t bytesToReclaim = std::min( std::max(targetBytes, memoryPoolTransferCapacity_), - reclaimableCapacity(*pool)); + maxReclaimableCapacity(*pool)); if (bytesToReclaim == 0) { return 0; } @@ -803,7 +818,7 @@ void SharedArbitrator::startArbitration(const std::string& contextMsg) { } TestValue::adjust( - "facebook::velox::memory::SharedArbitrator::startArbitration", nullptr); + "facebook::velox::memory::SharedArbitrator::startArbitration", this); if (waitPromise.valid()) { uint64_t waitTimeUs{0}; diff --git a/velox/common/memory/SharedArbitrator.h b/velox/common/memory/SharedArbitrator.h index 3332c770322b..50dc8c015d18 100644 --- a/velox/common/memory/SharedArbitrator.h +++ b/velox/common/memory/SharedArbitrator.h @@ -75,6 +75,8 @@ class SharedArbitrator : public memory::MemoryArbitrator { /// Returns 'freeCapacity' back to the arbitrator for testing. void testingFreeCapacity(uint64_t freeCapacity); + uint64_t testingNumRequests() const; + /// Operator level runtime stats that are reported during a shared arbitration /// attempt. static inline const std::string kMemoryArbitrationWallNanos{ @@ -160,12 +162,18 @@ class SharedArbitrator : public memory::MemoryArbitrator { std::vector& candidates, uint64_t targetBytes); - // Invoded to reclaim used memroy capacity from 'candidates' by aborting the + // Invoked to reclaim used memory capacity from 'candidates' by aborting the // top memory users' queries. uint64_t reclaimUsedMemoryFromCandidatesByAbort( std::vector& candidates, uint64_t targetBytes); + // Invoked to grow 'pool' capacity by 'growBytes' and commit used reservation + // by 'reservationBytes'. The function throws if the grow fails from memory + // pool. + void + checkedGrow(MemoryPool* pool, uint64_t growBytes, uint64_t reservationBytes); + // Invoked to reclaim used memory from 'targetPool' with specified // 'targetBytes'. The function returns the actually freed capacity. // 'isLocalArbitration' is true when the reclaim attempt is within a local @@ -188,14 +196,18 @@ class SharedArbitrator : public memory::MemoryArbitrator { uint64_t targetBytes, std::vector& candidates); - // Decrements free capacity from the arbitrator with up to 'maxBytes'. The - // arbitrator might have less free available capacity. The function returns - // the actual decremented free capacity bytes. If 'minBytes' is not zero and - // there is less than 'minBytes' available in non-reserved capacity, then - // the arbitrator tries to decrement up to 'minBytes' from the reserved - // capacity. - uint64_t decrementFreeCapacity(uint64_t maxBytes, uint64_t minBytes); - uint64_t decrementFreeCapacityLocked(uint64_t maxBytes, uint64_t minBytes); + // Decrements free capacity from the arbitrator with up to + // 'maxBytesToReserve'. The arbitrator might have less free available + // capacity. The function returns the actual decremented free capacity bytes. + // If 'minBytesToReserve' is not zero and there is less than 'minBytes' + // available in non-reserved capacity, then the arbitrator tries to decrement + // up to 'minBytes' from the reserved capacity. + uint64_t decrementFreeCapacity( + uint64_t maxBytesToReserve, + uint64_t minBytesToReserve); + uint64_t decrementFreeCapacityLocked( + uint64_t maxBytesToReserve, + uint64_t minBytesToReserve); // Increment free capacity by 'bytes'. void incrementFreeCapacity(uint64_t bytes); @@ -213,7 +225,7 @@ class SharedArbitrator : public memory::MemoryArbitrator { // Returns the max reclaimable capacity from 'pool' which includes both used // and free capacities. - int64_t reclaimableCapacity(const MemoryPool& pool) const; + int64_t maxReclaimableCapacity(const MemoryPool& pool) const; // Returns the free memory capacity that can be reclaimed from 'pool' by // shrink. diff --git a/velox/common/memory/tests/MemoryArbitratorTest.cpp b/velox/common/memory/tests/MemoryArbitratorTest.cpp index 347c7eaa9c8f..8153b32a905a 100644 --- a/velox/common/memory/tests/MemoryArbitratorTest.cpp +++ b/velox/common/memory/tests/MemoryArbitratorTest.cpp @@ -293,36 +293,21 @@ TEST_F(MemoryArbitrationTest, reservedCapacityFreeByPoolShrink) { options.memoryPoolInitCapacity = 2 << 20; options.memoryPoolReservedCapacity = 1 << 20; - for (bool shrinkByCapacityGrow : {false, true}) { - SCOPED_TRACE(fmt::format("shrinkByCapacityGrow {}", shrinkByCapacityGrow)); - - MemoryManager manager(options); - auto* arbitrator = manager.arbitrator(); - const int numPools = 6; - std::vector> pools; - for (int i = 0; i < numPools; ++i) { - pools.push_back(manager.addRootPool("", kMaxMemory)); - ASSERT_GE(pools.back()->capacity(), 1 << 20); - } - ASSERT_EQ(arbitrator->stats().freeCapacityBytes, 0); + MemoryManager manager(options); + auto* arbitrator = manager.arbitrator(); + const int numPools = 6; + std::vector> pools; + for (int i = 0; i < numPools; ++i) { pools.push_back(manager.addRootPool("", kMaxMemory)); - - ASSERT_GE(pools.back()->capacity(), 0); - if (shrinkByCapacityGrow) { - ASSERT_TRUE( - arbitrator->growCapacity(pools[numPools - 1].get(), pools, 1 << 20)); - ASSERT_EQ(arbitrator->stats().freeReservedCapacityBytes, 0); - ASSERT_EQ(arbitrator->stats().freeCapacityBytes, 0); - ASSERT_EQ( - arbitrator->growCapacity(pools[numPools - 1].get(), 1 << 20), 0); - ASSERT_EQ(arbitrator->growCapacity(pools.back().get(), 2 << 20), 0); - } else { - ASSERT_EQ(arbitrator->shrinkCapacity(pools, 1 << 20), 2 << 20); - ASSERT_EQ( - arbitrator->growCapacity(pools[numPools - 1].get(), 1 << 20), 0); - ASSERT_EQ(arbitrator->growCapacity(pools.back().get(), 2 << 20), 1 << 20); - } + ASSERT_GE(pools.back()->capacity(), 1 << 20); } + ASSERT_EQ(arbitrator->stats().freeCapacityBytes, 0); + pools.push_back(manager.addRootPool("", kMaxMemory)); + + ASSERT_GE(pools.back()->capacity(), 0); + ASSERT_EQ(arbitrator->shrinkCapacity(pools, 1 << 20), 2 << 20); + ASSERT_EQ(arbitrator->growCapacity(pools[numPools - 1].get(), 1 << 20), 0); + ASSERT_EQ(arbitrator->growCapacity(pools.back().get(), 2 << 20), 1 << 20); } TEST_F(MemoryArbitrationTest, arbitratorStats) { diff --git a/velox/common/memory/tests/MemoryPoolTest.cpp b/velox/common/memory/tests/MemoryPoolTest.cpp index a5e4ec8415d8..19d359f674bc 100644 --- a/velox/common/memory/tests/MemoryPoolTest.cpp +++ b/velox/common/memory/tests/MemoryPoolTest.cpp @@ -401,27 +401,89 @@ TEST_P(MemoryPoolTest, DISABLED_memoryLeakCheck) { child->free(oneChunk, kChunkSize); } -TEST_P(MemoryPoolTest, DISABLED_growBeyondMaxCapacity) { - gflags::FlagSaver flagSaver; - testing::FLAGS_gtest_death_test_style = "fast"; +TEST_P(MemoryPoolTest, growFailures) { auto manager = getMemoryManager(); + // Grow beyond limit. { auto poolWithoutLimit = manager->addRootPool("poolWithoutLimit"); ASSERT_EQ(poolWithoutLimit->capacity(), kMaxMemory); - ASSERT_DEATH( - poolWithoutLimit->grow(1), "Can't grow with unlimited capacity"); + ASSERT_EQ(poolWithoutLimit->currentBytes(), 0); + VELOX_ASSERT_THROW( + poolWithoutLimit->grow(1, 0), "Can't grow with unlimited capacity"); + ASSERT_EQ(poolWithoutLimit->currentBytes(), 0); + VELOX_ASSERT_THROW( + poolWithoutLimit->grow(1, 1'000), "Can't grow with unlimited capacity"); + ASSERT_EQ(poolWithoutLimit->currentBytes(), 0); } { const int64_t capacity = 4 * GB; auto poolWithLimit = manager->addRootPool("poolWithLimit", capacity); ASSERT_EQ(poolWithLimit->capacity(), capacity); + ASSERT_EQ(poolWithLimit->currentBytes(), 0); ASSERT_EQ(poolWithLimit->shrink(poolWithLimit->currentBytes()), capacity); - ASSERT_EQ(poolWithLimit->grow(capacity / 2), capacity / 2); - ASSERT_DEATH( - poolWithLimit->grow(capacity), "Can't grow beyond the max capacity"); + ASSERT_EQ(poolWithLimit->currentBytes(), 0); + ASSERT_TRUE(poolWithLimit->grow(capacity / 2, 0)); + ASSERT_EQ(poolWithLimit->currentBytes(), 0); + ASSERT_FALSE(poolWithLimit->grow(capacity, 0)); + ASSERT_EQ(poolWithLimit->currentBytes(), 0); + ASSERT_EQ(poolWithLimit->capacity(), capacity / 2); + ASSERT_FALSE(poolWithLimit->grow(capacity, 1'000)); + ASSERT_EQ(poolWithLimit->currentBytes(), 0); + } + + // Insufficient capacity for new reservation. + { + const int64_t capacity = 4 * GB; + auto poolWithLimit = manager->addRootPool("poolWithLimit", capacity); + ASSERT_EQ(poolWithLimit->capacity(), capacity); + ASSERT_EQ(poolWithLimit->currentBytes(), 0); + ASSERT_EQ(poolWithLimit->shrink(poolWithLimit->capacity()), capacity); + ASSERT_EQ(poolWithLimit->currentBytes(), 0); + ASSERT_EQ(poolWithLimit->capacity(), 0); + + ASSERT_FALSE(poolWithLimit->grow(capacity / 2, capacity)); + ASSERT_EQ(poolWithLimit->currentBytes(), 0); + ASSERT_EQ(poolWithLimit->capacity(), 0); + + ASSERT_FALSE(poolWithLimit->grow(0, capacity)); + ASSERT_EQ(poolWithLimit->currentBytes(), 0); + ASSERT_EQ(poolWithLimit->capacity(), 0); + ASSERT_EQ(poolWithLimit->currentBytes(), 0); } } +TEST_P(MemoryPoolTest, grow) { + auto manager = getMemoryManager(); + const int64_t capacity = 4 * GB; + auto root = manager->addRootPool("grow", capacity); + root->shrink(capacity / 2); + ASSERT_EQ(root->capacity(), capacity / 2); + + auto leaf = root->addLeafChild("leafPool"); + void* buf = leaf->allocate(1 * MB); + ASSERT_EQ(root->capacity(), capacity / 2); + ASSERT_EQ(root->currentBytes(), 1 * MB); + + ASSERT_TRUE(root->grow(0, 2 * MB)); + ASSERT_EQ(root->currentBytes(), 3 * MB); + ASSERT_EQ(root->capacity(), capacity / 2); + + ASSERT_TRUE(root->grow(0, 4 * MB)); + ASSERT_EQ(root->currentBytes(), 7 * MB); + ASSERT_EQ(root->capacity(), capacity / 2); + + ASSERT_TRUE(root->grow(1 * MB, 2 * MB)); + ASSERT_EQ(root->currentBytes(), 9 * MB); + ASSERT_EQ(root->capacity(), capacity / 2 + 1 * MB); + + ASSERT_TRUE(root->grow(6 * MB, 4 * MB)); + ASSERT_EQ(root->currentBytes(), 13 * MB); + ASSERT_EQ(root->capacity(), capacity / 2 + 7 * MB); + + static_cast(root.get())->testingSetReservation(1 * MB); + leaf->free(buf, 1 * MB); +} + TEST_P(MemoryPoolTest, ReallocTestSameSize) { auto manager = getMemoryManager(); auto root = manager->addRootPool(); @@ -431,7 +493,6 @@ TEST_P(MemoryPoolTest, ReallocTestSameSize) { const int64_t kChunkSize{32L * MB}; // Realloc the same size. - void* oneChunk = pool->allocate(kChunkSize); ASSERT_EQ(kChunkSize, pool->currentBytes()); ASSERT_EQ(kChunkSize, pool->stats().peakBytes); @@ -2708,11 +2769,14 @@ TEST_P(MemoryPoolTest, shrinkAndGrowAPIs) { for (int i = 0; i < step; ++i) { const int expectedCapacity = (i + 1) * allocationSize; if (i % 3 == 0) { - ASSERT_EQ(leafPool->grow(allocationSize), expectedCapacity); + ASSERT_TRUE(leafPool->grow(allocationSize, 0)); + ASSERT_EQ(leafPool->capacity(), expectedCapacity); } else if (i % 3 == 1) { - ASSERT_EQ(aggregationPool->grow(allocationSize), expectedCapacity); + ASSERT_TRUE(aggregationPool->grow(allocationSize, 0)); + ASSERT_EQ(leafPool->capacity(), expectedCapacity); } else { - ASSERT_EQ(rootPool->grow(allocationSize), expectedCapacity); + ASSERT_TRUE(rootPool->grow(allocationSize, 0)); + ASSERT_EQ(leafPool->capacity(), expectedCapacity); } ASSERT_EQ(leafPool->capacity(), expectedCapacity); ASSERT_EQ(aggregationPool->capacity(), expectedCapacity); @@ -3287,31 +3351,6 @@ TEST_P(MemoryPoolTest, maybeReserveFailWithAbort) { child->maybeReserve(2 * kMaxSize), "Manual MemoryPool Abortion"); } -// Model implementation of a GrowCallback. -bool grow(int64_t size, int64_t hardLimit, MemoryPool& pool) { - static std::mutex mutex; - // The calls from different threads on the same tracker must be serialized. - std::lock_guard l(mutex); - // The total includes the allocation that exceeded the limit. This - // function's job is to raise the limit to >= current + size. - auto current = pool.reservedBytes(); - auto limit = pool.capacity(); - if (current + size <= limit) { - // No need to increase. It could be another thread already - // increased the cap far enough while this thread was waiting to - // enter the lock_guard. - return true; - } - if (current + size > hardLimit) { - // The caller will revert the allocation that called this and signal an - // error. - return false; - } - // We set the new limit to be the requested size. - static_cast(&pool)->testingSetCapacity(current + size); - return true; -} - DEBUG_ONLY_TEST_P(MemoryPoolTest, raceBetweenFreeAndFailedAllocation) { if (!isLeafThreadSafe_) { return; diff --git a/velox/common/memory/tests/MockSharedArbitratorTest.cpp b/velox/common/memory/tests/MockSharedArbitratorTest.cpp index 1c0e70c11536..261a996d14f5 100644 --- a/velox/common/memory/tests/MockSharedArbitratorTest.cpp +++ b/velox/common/memory/tests/MockSharedArbitratorTest.cpp @@ -519,7 +519,7 @@ TEST_F(MockSharedArbitrationTest, constructor) { if (i < nonReservedCapacity / kMemoryPoolInitCapacity) { ASSERT_EQ(task->capacity(), kMemoryPoolInitCapacity); } else { - ASSERT_EQ(task->capacity(), kMemoryPoolReservedCapacity); + ASSERT_EQ(task->capacity(), kMemoryPoolReservedCapacity) << i; } remainingFreeCapacity -= task->capacity(); tasks.push_back(std::move(task)); @@ -586,8 +586,8 @@ TEST_F(MockSharedArbitrationTest, arbitrationFailsTask) { // handleOOM(). auto growTask = addTask(328 * MB); auto* growOp = growTask->addMemoryOp(false); - auto* bufGrow = growOp->allocate(64 * MB); - ASSERT_NO_THROW(manager_->testingGrowPool(growOp->pool(), 128 * MB)); + auto* bufGrow1 = growOp->allocate(64 * MB); + auto* bufGrow2 = growOp->allocate(128 * MB); ASSERT_NE(nonReclaimTask->error(), nullptr); try { std::rethrow_exception(nonReclaimTask->error()); @@ -1639,7 +1639,7 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, orderedArbitration) { } }))); SCOPED_TESTVALUE_SET( - "facebook::velox::memory::SharedArbitrator::sortCandidatesByReclaimableUsedMemory", + "facebook::velox::memory::SharedArbitrator::sortCandidatesByReclaimableUsedCapacity", std::function*)>( ([&](const std::vector* candidates) { for (int i = 1; i < candidates->size(); ++i) { @@ -1977,8 +1977,8 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, failedToReclaimFromRequestor) { std::atomic_bool arbitrationStarted{false}; SCOPED_TESTVALUE_SET( "facebook::velox::memory::SharedArbitrator::startArbitration", - std::function( - ([&](const MemoryPool* /*unused*/) { + std::function( + ([&](const SharedArbitrator* /*unused*/) { if (!arbitrationStarted) { return; } @@ -2167,8 +2167,8 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, failedToReclaimFromOtherTask) { std::atomic arbitrationStarted{false}; SCOPED_TESTVALUE_SET( "facebook::velox::memory::SharedArbitrator::startArbitration", - std::function( - ([&](const MemoryPool* /*unsed*/) { + std::function( + ([&](const SharedArbitrator* /*unsed*/) { if (!arbitrationStarted) { return; } @@ -2308,6 +2308,40 @@ TEST_F(MockSharedArbitrationTest, memoryPoolAbortThrow) { ASSERT_EQ(arbitrator_->stats().numAborted, 1); } +// This test makes sure the memory capacity grows as expected. +DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, concurrentArbitrationRequests) { + setupMemory(kMemoryCapacity, 0, 0, 0, 128 << 20); + std::shared_ptr task = addTask(); + MockMemoryOperator* op1 = addMemoryOp(task); + MockMemoryOperator* op2 = addMemoryOp(task); + + std::atomic_bool arbitrationWaitFlag{true}; + folly::EventCount arbitrationWait; + std::atomic_bool injectOnce{true}; + SCOPED_TESTVALUE_SET( + "facebook::velox::memory::SharedArbitrator::startArbitration", + std::function( + ([&](const SharedArbitrator* arbitrator) { + if (!injectOnce.exchange(false)) { + return; + } + arbitrationWaitFlag = false; + arbitrationWait.notifyAll(); + while (arbitrator->testingNumRequests() != 2) { + std::this_thread::sleep_for(std::chrono::seconds(5)); // NOLINT + } + }))); + + std::thread firstArbitrationThread([&]() { op1->allocate(64 << 20); }); + + std::thread secondArbitrationThread([&]() { op2->allocate(64 << 20); }); + + firstArbitrationThread.join(); + secondArbitrationThread.join(); + + ASSERT_EQ(task->capacity(), 128 << 20); +} + DEBUG_ONLY_TEST_F( MockSharedArbitrationTest, freeUnusedCapacityWhenReclaimMemoryPool) { @@ -2326,7 +2360,7 @@ DEBUG_ONLY_TEST_F( folly::EventCount reclaimBlock; auto reclaimBlockKey = reclaimBlock.prepareWait(); SCOPED_TESTVALUE_SET( - "facebook::velox::memory::SharedArbitrator::sortCandidatesByReclaimableUsedMemory", + "facebook::velox::memory::SharedArbitrator::sortCandidatesByReclaimableUsedCapacity", std::function(([&](const MemoryPool* /*unsed*/) { reclaimWait.notify(); reclaimBlock.wait(reclaimBlockKey); @@ -2368,10 +2402,11 @@ DEBUG_ONLY_TEST_F( SCOPED_TESTVALUE_SET( "facebook::velox::memory::SharedArbitrator::startArbitration", - std::function(([&](const MemoryPool* /*unsed*/) { - arbitrationRun.notify(); - arbitrationBlock.wait(arbitrationBlockKey); - }))); + std::function( + ([&](const SharedArbitrator* /*unsed*/) { + arbitrationRun.notify(); + arbitrationBlock.wait(arbitrationBlockKey); + }))); std::thread allocThread([&]() { // Allocate more than its capacity to trigger arbitration which is blocked diff --git a/velox/dwio/dwrf/test/WriterFlushTest.cpp b/velox/dwio/dwrf/test/WriterFlushTest.cpp index ae7d6ea3173e..140c8dd975e3 100644 --- a/velox/dwio/dwrf/test/WriterFlushTest.cpp +++ b/velox/dwio/dwrf/test/WriterFlushTest.cpp @@ -167,7 +167,6 @@ class MockMemoryPool : public velox::memory::MemoryPool { } MOCK_CONST_METHOD0(peakBytes, int64_t()); - // MOCK_CONST_METHOD0(getMaxBytes, int64_t()); MOCK_METHOD1(updateSubtreeMemoryUsage, int64_t(int64_t)); @@ -207,7 +206,7 @@ class MockMemoryPool : public velox::memory::MemoryPool { VELOX_UNSUPPORTED("{} unsupported", __FUNCTION__); } - uint64_t grow(uint64_t /*unused*/) noexcept override { + bool grow(uint64_t /*unused*/, uint64_t /*unused*/) noexcept override { VELOX_UNSUPPORTED("{} unsupported", __FUNCTION__); } diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index f4be9ddb1142..601a0d90a5e5 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -6512,8 +6512,6 @@ TEST_F(HashJoinTest, reclaimFromJoinBuilderWithMultiDrivers) { DEBUG_ONLY_TEST_F( HashJoinTest, failedToReclaimFromHashJoinBuildersInNonReclaimableSection) { - std::unique_ptr memoryManager = createMemoryManager(); - const auto& arbitrator = memoryManager->arbitrator(); auto rowType = ROW({ {"c0", INTEGER()}, {"c1", INTEGER()}, @@ -6522,7 +6520,7 @@ DEBUG_ONLY_TEST_F( const auto vectors = createVectors(rowType, 64 << 20, fuzzerOpts_); const int numDrivers = 1; std::shared_ptr queryCtx = - newQueryCtx(memoryManager.get(), executor_.get(), kMemoryCapacity); + newQueryCtx(memory::memoryManager(), executor_.get(), 512 << 20); const auto expectedResult = runHashJoinTask(vectors, queryCtx, numDrivers, pool(), false).data; @@ -6564,14 +6562,12 @@ DEBUG_ONLY_TEST_F( ASSERT_EQ(planStats.spilledBytes, 0); }); - auto fakePool = queryCtx->pool()->addLeafChild( - "fakePool", true, FakeMemoryReclaimer::create()); // Wait for the hash build operators to enter into non-reclaimable section. nonReclaimableSectionWait.await( [&]() { return !nonReclaimableSectionWaitFlag.load(); }); // We expect capacity grow fails as we can't reclaim from hash join operators. - ASSERT_FALSE(memoryManager->testingGrowPool(fakePool.get(), kMemoryCapacity)); + memory::testingRunArbitration(); // Notify the hash build operator that memory arbitration has been done. memoryArbitrationWaitFlag = false; @@ -6583,7 +6579,9 @@ DEBUG_ONLY_TEST_F( // one. We need to make sure any used memory got cleaned up before exiting // the scope waitForAllTasksToBeDeleted(); - ASSERT_EQ(arbitrator->stats().numNonReclaimableAttempts, 2); + ASSERT_EQ( + memory::memoryManager()->arbitrator()->stats().numNonReclaimableAttempts, + 2); } DEBUG_ONLY_TEST_F(HashJoinTest, reclaimFromHashJoinBuildInWaitForTableBuild) { @@ -6769,103 +6767,6 @@ DEBUG_ONLY_TEST_F(HashJoinTest, arbitrationTriggeredByEnsureJoinTableFit) { .run(); } -DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringJoinTableBuild) { - std::unique_ptr memoryManager = createMemoryManager(); - const auto& arbitrator = memoryManager->arbitrator(); - auto rowType = ROW({ - {"c0", INTEGER()}, - {"c1", INTEGER()}, - {"c2", VARCHAR()}, - }); - // Build a large vector to trigger memory arbitration. - fuzzerOpts_.vectorSize = 10'000; - std::vector vectors = createVectors(2, rowType, fuzzerOpts_); - createDuckDbTable(vectors); - - std::shared_ptr joinQueryCtx = - newQueryCtx(memoryManager.get(), executor_.get(), kMemoryCapacity); - - std::atomic blockTableBuildOpOnce{true}; - std::atomic tableBuildBlocked{false}; - folly::EventCount tableBuildBlockWait; - std::atomic unblockTableBuild{false}; - folly::EventCount unblockTableBuildWait; - SCOPED_TESTVALUE_SET( - "facebook::velox::exec::HashTable::parallelJoinBuild", - std::function(([&](memory::MemoryPool* pool) { - if (!blockTableBuildOpOnce.exchange(false)) { - return; - } - tableBuildBlocked = true; - tableBuildBlockWait.notifyAll(); - unblockTableBuildWait.await([&]() { return unblockTableBuild.load(); }); - void* buffer = pool->allocate(kMemoryCapacity / 4); - pool->free(buffer, kMemoryCapacity / 4); - }))); - - std::thread joinThread([&]() { - auto planNodeIdGenerator = std::make_shared(); - const auto spillDirectory = exec::test::TempDirectoryPath::create(); - auto task = - AssertQueryBuilder(duckDbQueryRunner_) - .spillDirectory(spillDirectory->getPath()) - .config(core::QueryConfig::kSpillEnabled, true) - .config(core::QueryConfig::kJoinSpillEnabled, true) - .config(core::QueryConfig::kSpillNumPartitionBits, 2) - // Set multiple hash build drivers to trigger parallel build. - .maxDrivers(4) - .queryCtx(joinQueryCtx) - .plan(PlanBuilder(planNodeIdGenerator) - .values(vectors, true) - .project({"c0 AS t0", "c1 AS t1", "c2 AS t2"}) - .hashJoin( - {"t0", "t1"}, - {"u1", "u0"}, - PlanBuilder(planNodeIdGenerator) - .values(vectors, true) - .project({"c0 AS u0", "c1 AS u1", "c2 AS u2"}) - .planNode(), - "", - {"t1"}, - core::JoinType::kInner) - .planNode()) - .assertResults( - "SELECT t.c1 FROM tmp as t, tmp AS u WHERE t.c0 == u.c1 AND t.c1 == u.c0"); - }); - - tableBuildBlockWait.await([&]() { return tableBuildBlocked.load(); }); - - folly::EventCount taskPauseWait; - std::atomic taskPaused{false}; - SCOPED_TESTVALUE_SET( - "facebook::velox::exec::Task::requestPauseLocked", - std::function(([&](Task* /*unused*/) { - taskPaused = true; - taskPauseWait.notifyAll(); - }))); - - std::thread memThread([&]() { - std::shared_ptr fakeCtx = - newQueryCtx(memoryManager.get(), executor_.get(), kMemoryCapacity); - auto fakePool = fakeCtx->pool()->addLeafChild("fakePool"); - ASSERT_FALSE(memoryManager->testingGrowPool( - fakePool.get(), memoryManager->arbitrator()->capacity())); - }); - - taskPauseWait.await([&]() { return taskPaused.load(); }); - - unblockTableBuild = true; - unblockTableBuildWait.notifyAll(); - - joinThread.join(); - memThread.join(); - - // This test uses on-demand created memory manager instead of the global - // one. We need to make sure any used memory got cleaned up before exiting - // the scope - waitForAllTasksToBeDeleted(); -} - DEBUG_ONLY_TEST_F(HashJoinTest, joinBuildSpillError) { const int kMemoryCapacity = 32 << 20; // Set a small memory capacity to trigger spill. diff --git a/velox/exec/tests/OrderByTest.cpp b/velox/exec/tests/OrderByTest.cpp index d2ad623e3be7..4f7fff3cf56c 100644 --- a/velox/exec/tests/OrderByTest.cpp +++ b/velox/exec/tests/OrderByTest.cpp @@ -1294,78 +1294,39 @@ TEST_F(OrderByTest, maxSpillBytes) { DEBUG_ONLY_TEST_F(OrderByTest, reclaimFromOrderBy) { std::vector vectors = createVectors(8, rowType_, fuzzerOpts_); createDuckDbTable(vectors); - std::unique_ptr memoryManager = createMemoryManager(); - std::vector sameQueries = {false, true}; - for (bool sameQuery : sameQueries) { - SCOPED_TRACE(fmt::format("sameQuery {}", sameQuery)); - const auto spillDirectory = exec::test::TempDirectoryPath::create(); - std::shared_ptr fakeQueryCtx = - newQueryCtx(memoryManager.get(), executor_.get(), kMemoryCapacity * 2); - std::shared_ptr orderByQueryCtx; - if (sameQuery) { - orderByQueryCtx = fakeQueryCtx; - } else { - orderByQueryCtx = newQueryCtx( - memoryManager.get(), executor_.get(), kMemoryCapacity * 2); - } - - folly::EventCount arbitrationWait; - std::atomic_bool arbitrationWaitFlag{true}; - folly::EventCount taskPauseWait; - std::atomic_bool taskPauseWaitFlag{true}; - - std::atomic_int numInputs{0}; - SCOPED_TESTVALUE_SET( - "facebook::velox::exec::Driver::runInternal::addInput", - std::function(([&](Operator* op) { - if (op->operatorType() != "OrderBy") { - return; - } - if (++numInputs != 5) { - return; - } - arbitrationWaitFlag = false; - arbitrationWait.notifyAll(); - - // Wait for task pause to be triggered. - taskPauseWait.await([&] { return !taskPauseWaitFlag.load(); }); - }))); - - SCOPED_TESTVALUE_SET( - "facebook::velox::exec::Task::requestPauseLocked", - std::function(([&](Task* /*unused*/) { - taskPauseWaitFlag = false; - taskPauseWait.notifyAll(); - }))); - - std::thread orderByThread([&]() { - auto task = - AssertQueryBuilder(duckDbQueryRunner_) - .spillDirectory(spillDirectory->getPath()) - .config(core::QueryConfig::kSpillEnabled, true) - .config(core::QueryConfig::kOrderBySpillEnabled, true) - .queryCtx(orderByQueryCtx) - .plan(PlanBuilder() - .values(vectors) - .orderBy({"c0 ASC NULLS LAST"}, false) - .planNode()) - .assertResults("SELECT * FROM tmp ORDER BY c0 ASC NULLS LAST"); - auto stats = task->taskStats().pipelineStats; - ASSERT_GT(stats[0].operatorStats[1].spilledBytes, 0); - }); - - arbitrationWait.await([&] { return !arbitrationWaitFlag.load(); }); - - auto fakePool = fakeQueryCtx->pool()->addLeafChild( - "fakePool", true, FakeMemoryReclaimer::create()); - - memoryManager->testingGrowPool( - fakePool.get(), memoryManager->arbitrator()->capacity()); - - orderByThread.join(); + std::atomic_int numInputs{0}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Driver::runInternal::addInput", + std::function(([&](Operator* op) { + if (op->operatorType() != "OrderBy") { + return; + } + if (++numInputs != 5) { + return; + } + auto* driver = op->testingOperatorCtx()->driver(); + SuspendedSection suspendedSection(driver); + memory::testingRunArbitration(); + }))); - waitForAllTasksToBeDeleted(); - } + const auto spillDirectory = exec::test::TempDirectoryPath::create(); + core::PlanNodeId orderById; + auto task = + AssertQueryBuilder(duckDbQueryRunner_) + .spillDirectory(spillDirectory->getPath()) + .config(core::QueryConfig::kSpillEnabled, true) + .config(core::QueryConfig::kOrderBySpillEnabled, true) + .plan(PlanBuilder() + .values(vectors) + .orderBy({"c0 ASC NULLS LAST"}, false) + .capturePlanNodeId(orderById) + .planNode()) + .assertResults("SELECT * FROM tmp ORDER BY c0 ASC NULLS LAST"); + auto taskStats = exec::toPlanStats(task->taskStats()); + auto& planStats = taskStats.at(orderById); + ASSERT_GT(planStats.spilledBytes, 0); + task.reset(); + waitForAllTasksToBeDeleted(); } DEBUG_ONLY_TEST_F(OrderByTest, reclaimFromEmptyOrderBy) { @@ -1403,54 +1364,4 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimFromEmptyOrderBy) { ASSERT_EQ(stats[0].operatorStats[1].spilledBytes, 0); ASSERT_EQ(stats[0].operatorStats[1].spilledPartitions, 0); } - -TEST_F(OrderByTest, reclaimFromCompletedOrderBy) { - std::vector vectors = createVectors(8, rowType_, fuzzerOpts_); - createDuckDbTable(vectors); - std::unique_ptr memoryManager = createMemoryManager(); - std::vector sameQueries = {false, true}; - for (bool sameQuery : sameQueries) { - SCOPED_TRACE(fmt::format("sameQuery {}", sameQuery)); - const auto spillDirectory = exec::test::TempDirectoryPath::create(); - std::shared_ptr fakeQueryCtx = - newQueryCtx(memoryManager.get(), executor_.get(), kMemoryCapacity * 2); - std::shared_ptr orderByQueryCtx; - if (sameQuery) { - orderByQueryCtx = fakeQueryCtx; - } else { - orderByQueryCtx = newQueryCtx( - memoryManager.get(), executor_.get(), kMemoryCapacity * 2); - } - - folly::EventCount arbitrationWait; - std::atomic_bool arbitrationWaitFlag{true}; - - std::thread orderByThread([&]() { - auto task = - AssertQueryBuilder(duckDbQueryRunner_) - .queryCtx(orderByQueryCtx) - .plan(PlanBuilder() - .values(vectors) - .orderBy({"c0 ASC NULLS LAST"}, false) - .planNode()) - .assertResults("SELECT * FROM tmp ORDER BY c0 ASC NULLS LAST"); - waitForTaskCompletion(task.get()); - arbitrationWaitFlag = false; - arbitrationWait.notifyAll(); - }); - - arbitrationWait.await([&] { return !arbitrationWaitFlag.load(); }); - - auto fakePool = fakeQueryCtx->pool()->addLeafChild( - "fakePool", true, FakeMemoryReclaimer::create()); - - memoryManager->testingGrowPool( - fakePool.get(), memoryManager->arbitrator()->capacity()); - - orderByThread.join(); - - waitForAllTasksToBeDeleted(); - } -} - } // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index 0fa6e5b272e6..400e55c76758 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -4241,7 +4241,8 @@ DEBUG_ONLY_TEST_F( if (!injectOnce.exchange(false)) { return; } - memory::memoryManager()->testingGrowPool(pool, 1 << 20); + VELOX_ASSERT_THROW( + pool->allocate(memory::memoryManager()->capacity()), ""); })); auto op = diff --git a/velox/exec/tests/TableWriteTest.cpp b/velox/exec/tests/TableWriteTest.cpp index 9ab7f52865b4..5a4a45b151ea 100644 --- a/velox/exec/tests/TableWriteTest.cpp +++ b/velox/exec/tests/TableWriteTest.cpp @@ -4205,11 +4205,8 @@ DEBUG_ONLY_TEST_F( createVectors(rowType_, memoryCapacity / 8, fuzzerOpts_); const auto expectedResult = runWriteTask(vectors, nullptr, 1, pool(), kHiveConnectorId, false).data; - - auto memoryManager = createMemoryManager(memoryCapacity); - auto arbitrator = memoryManager->arbitrator(); auto queryCtx = - newQueryCtx(memoryManager.get(), executor_.get(), kMemoryCapacity); + newQueryCtx(memory::memoryManager(), executor_.get(), memoryCapacity); std::atomic_bool writerCloseWaitFlag{true}; folly::EventCount writerCloseWait; @@ -4237,13 +4234,7 @@ DEBUG_ONLY_TEST_F( writerCloseWait.await([&]() { return !writerCloseWaitFlag.load(); }); - // Creates a fake pool to trigger memory arbitration. - auto fakePool = queryCtx->pool()->addLeafChild( - "fakePool", true, FakeMemoryReclaimer::create()); - ASSERT_TRUE(memoryManager->testingGrowPool( - fakePool.get(), - arbitrator->stats().freeCapacityBytes + - queryCtx->pool()->capacity() / 2)); + memory::testingRunArbitration(); queryThread.join(); waitForAllTasksToBeDeleted();