From 88a7f41503d1de9797819162281ded84f9783869 Mon Sep 17 00:00:00 2001 From: Jialiang Tan Date: Wed, 4 Oct 2023 13:36:37 -0700 Subject: [PATCH] Add stats collection to memory reclaimer --- velox/common/memory/MemoryArbitrator.cpp | 33 ++++++-- velox/common/memory/MemoryArbitrator.h | 20 ++++- velox/common/memory/MemoryPool.cpp | 6 +- velox/common/memory/MemoryPool.h | 7 +- velox/common/memory/SharedArbitrator.cpp | 5 +- velox/common/memory/SharedArbitrator.h | 1 + .../memory/tests/MemoryArbitratorTest.cpp | 68 ++++++++++------- .../common/memory/tests/MemoryManagerTest.cpp | 2 +- velox/common/memory/tests/MemoryPoolTest.cpp | 7 +- .../memory/tests/MockSharedArbitratorTest.cpp | 3 +- .../memory/tests/SharedArbitratorTest.cpp | 76 +++++++++++++++++-- velox/dwio/dwrf/test/WriterFlushTest.cpp | 4 +- velox/exec/HashAggregation.cpp | 8 +- velox/exec/HashAggregation.h | 3 +- velox/exec/HashBuild.cpp | 12 +-- velox/exec/HashBuild.h | 3 +- velox/exec/HashJoinBridge.cpp | 7 +- velox/exec/HashJoinBridge.h | 5 +- velox/exec/Operator.cpp | 5 +- velox/exec/Operator.h | 9 ++- velox/exec/OrderBy.cpp | 8 +- velox/exec/OrderBy.h | 3 +- velox/exec/TableWriter.cpp | 3 +- velox/exec/TableWriter.h | 5 +- velox/exec/Task.cpp | 5 +- velox/exec/Task.h | 5 +- velox/exec/tests/AggregationTest.cpp | 38 +++++++--- velox/exec/tests/HashJoinTest.cpp | 38 +++++++--- velox/exec/tests/OrderByTest.cpp | 30 ++++++-- 29 files changed, 311 insertions(+), 108 deletions(-) diff --git a/velox/common/memory/MemoryArbitrator.cpp b/velox/common/memory/MemoryArbitrator.cpp index 45270870368a..f6f21f7e369b 100644 --- a/velox/common/memory/MemoryArbitrator.cpp +++ b/velox/common/memory/MemoryArbitrator.cpp @@ -184,7 +184,8 @@ bool MemoryReclaimer::reclaimableBytes( return reclaimable; } -uint64_t MemoryReclaimer::reclaim(MemoryPool* pool, uint64_t targetBytes) { +uint64_t +MemoryReclaimer::reclaim(MemoryPool* pool, uint64_t targetBytes, Stats& stats) { if (pool->kind() == MemoryPool::Kind::kLeaf) { return 0; } 
@@ -214,7 +215,7 @@ uint64_t MemoryReclaimer::reclaim(MemoryPool* pool, uint64_t targetBytes) { uint64_t reclaimedBytes{0}; for (const auto& candidate : candidates) { - const auto bytes = candidate.pool->reclaim(targetBytes); + const auto bytes = candidate.pool->reclaim(targetBytes, stats); reclaimedBytes += bytes; if (targetBytes != 0) { if (bytes >= targetBytes) { @@ -243,6 +244,16 @@ void MemoryReclaimer::abort(MemoryPool* pool, const std::exception_ptr& error) { }); } +bool MemoryReclaimer::Stats::operator==( + const MemoryReclaimer::Stats& other) const { + return numNonReclaimableAttempts == other.numNonReclaimableAttempts; +} + +bool MemoryReclaimer::Stats::operator!=( + const MemoryReclaimer::Stats& other) const { + return !(*this == other); +} + MemoryArbitrator::Stats::Stats( uint64_t _numRequests, uint64_t _numSucceeded, @@ -254,7 +265,8 @@ MemoryArbitrator::Stats::Stats( uint64_t _numReclaimedBytes, uint64_t _maxCapacityBytes, uint64_t _freeCapacityBytes, - uint64_t _reclaimTimeUs) + uint64_t _reclaimTimeUs, + uint64_t _numNonReclaimableAttempts) : numRequests(_numRequests), numSucceeded(_numSucceeded), numAborted(_numAborted), @@ -265,15 +277,17 @@ MemoryArbitrator::Stats::Stats( numReclaimedBytes(_numReclaimedBytes), maxCapacityBytes(_maxCapacityBytes), freeCapacityBytes(_freeCapacityBytes), - reclaimTimeUs(_reclaimTimeUs) {} + reclaimTimeUs(_reclaimTimeUs), + numNonReclaimableAttempts(_numNonReclaimableAttempts) {} std::string MemoryArbitrator::Stats::toString() const { return fmt::format( - "STATS[numRequests {} numSucceeded {} numAborted {} numFailures {} queueTime {} arbitrationTime {} reclaimTime {} shrunkMemory {} reclaimedMemory {} maxCapacity {} freeCapacity {}]", + "STATS[numRequests {} numSucceeded {} numAborted {} numFailures {} numNonReclaimableAttempts {} queueTime {} arbitrationTime {} reclaimTime {} shrunkMemory {} reclaimedMemory {} maxCapacity {} freeCapacity {}]", numRequests, numSucceeded, numAborted, numFailures, + 
numNonReclaimableAttempts, succinctMicros(queueTimeUs), succinctMicros(arbitrationTimeUs), succinctMicros(reclaimTimeUs), @@ -297,6 +311,8 @@ MemoryArbitrator::Stats MemoryArbitrator::Stats::operator-( result.maxCapacityBytes = maxCapacityBytes; result.freeCapacityBytes = freeCapacityBytes; result.reclaimTimeUs = reclaimTimeUs - other.reclaimTimeUs; + result.numNonReclaimableAttempts = + numNonReclaimableAttempts - other.numNonReclaimableAttempts; return result; } @@ -312,7 +328,8 @@ bool MemoryArbitrator::Stats::operator==(const Stats& other) const { numReclaimedBytes, maxCapacityBytes, freeCapacityBytes, - reclaimTimeUs) == + reclaimTimeUs, + numNonReclaimableAttempts) == std::tie( other.numRequests, other.numSucceeded, @@ -324,7 +341,8 @@ bool MemoryArbitrator::Stats::operator==(const Stats& other) const { other.numReclaimedBytes, other.maxCapacityBytes, other.freeCapacityBytes, - other.reclaimTimeUs); + other.reclaimTimeUs, + other.numNonReclaimableAttempts); } bool MemoryArbitrator::Stats::operator!=(const Stats& other) const { @@ -355,6 +373,7 @@ bool MemoryArbitrator::Stats::operator<(const Stats& other) const { UPDATE_COUNTER(numShrunkBytes); UPDATE_COUNTER(numReclaimedBytes); UPDATE_COUNTER(reclaimTimeUs); + UPDATE_COUNTER(numNonReclaimableAttempts); #undef UPDATE_COUNTER VELOX_CHECK( !((gtCount > 0) && (ltCount > 0)), diff --git a/velox/common/memory/MemoryArbitrator.h b/velox/common/memory/MemoryArbitrator.h index 93c8a742937e..a73fea87c00a 100644 --- a/velox/common/memory/MemoryArbitrator.h +++ b/velox/common/memory/MemoryArbitrator.h @@ -176,6 +176,9 @@ class MemoryArbitrator { /// The sum of all reclaim operation durations during arbitration in /// microseconds. uint64_t reclaimTimeUs{0}; + /// The total number of reclaim attempts that failed because the reclaim + /// was attempted at a non-reclaimable stage. 
+ uint64_t numNonReclaimableAttempts{0}; Stats( uint64_t _numRequests, @@ -188,7 +191,8 @@ class MemoryArbitrator { uint64_t _numReclaimedBytes, uint64_t _maxCapacityBytes, uint64_t _freeCapacityBytes, - uint64_t _reclaimTimeUs); + uint64_t _reclaimTimeUs, + uint64_t _numNonReclaimableAttempts); Stats() = default; @@ -252,6 +256,17 @@ FOLLY_ALWAYS_INLINE std::ostream& operator<<( /// through techniques such as disks spilling. class MemoryReclaimer { public: + /// Used to collect memory reclaim execution stats. + struct Stats { + /// The total number of reclaim attempts that failed because the reclaim + /// was attempted at a non-reclaimable stage. + uint64_t numNonReclaimableAttempts{0}; + + bool operator==(const Stats& other) const; + + bool operator!=(const Stats& other) const; + }; + virtual ~MemoryReclaimer() = default; static std::unique_ptr create(); @@ -286,7 +301,8 @@ class MemoryReclaimer { /// memory bytes but there is no guarantees. If 'targetBytes' is zero, then it /// reclaims all the reclaimable memory from the memory 'pool'. The function /// returns the actual reclaimed memory bytes. 
- virtual uint64_t reclaim(MemoryPool* pool, uint64_t targetBytes); + virtual uint64_t + reclaim(MemoryPool* pool, uint64_t targetBytes, Stats& stats); /// Invoked by the memory arbitrator to abort memory 'pool' and the associated /// query execution when encounters non-recoverable memory reclaim error or diff --git a/velox/common/memory/MemoryPool.cpp b/velox/common/memory/MemoryPool.cpp index 63c7dd49515d..ab2c3b026f47 100644 --- a/velox/common/memory/MemoryPool.cpp +++ b/velox/common/memory/MemoryPool.cpp @@ -923,11 +923,13 @@ bool MemoryPoolImpl::reclaimableBytes(uint64_t& reclaimableBytes) const { return reclaimer()->reclaimableBytes(*this, reclaimableBytes); } -uint64_t MemoryPoolImpl::reclaim(uint64_t targetBytes) { +uint64_t MemoryPoolImpl::reclaim( + uint64_t targetBytes, + memory::MemoryReclaimer::Stats& stats) { if (reclaimer() == nullptr) { return 0; } - return reclaimer()->reclaim(this, targetBytes); + return reclaimer()->reclaim(this, targetBytes, stats); } void MemoryPoolImpl::enterArbitration() { diff --git a/velox/common/memory/MemoryPool.h b/velox/common/memory/MemoryPool.h index b65dfbc45e74..b12e0799a29c 100644 --- a/velox/common/memory/MemoryPool.h +++ b/velox/common/memory/MemoryPool.h @@ -408,7 +408,9 @@ class MemoryPool : public std::enable_shared_from_this { /// noop if the reclaimer is not set, otherwise invoke the reclaimer's /// corresponding method. The function returns the actually freed capacity /// from the root of this memory pool. - virtual uint64_t reclaim(uint64_t targetBytes) = 0; + virtual uint64_t reclaim( + uint64_t targetBytes, + memory::MemoryReclaimer::Stats& stats) = 0; /// Invoked by the memory arbitrator to abort a root memory pool. 
The function /// forwards the request to the corresponding query object to abort its @@ -627,7 +629,8 @@ class MemoryPoolImpl : public MemoryPool { bool reclaimableBytes(uint64_t& reclaimableBytes) const override; - uint64_t reclaim(uint64_t targetBytes) override; + uint64_t reclaim(uint64_t targetBytes, memory::MemoryReclaimer::Stats& stats) + override; uint64_t shrink(uint64_t targetBytes = 0) override; diff --git a/velox/common/memory/SharedArbitrator.cpp b/velox/common/memory/SharedArbitrator.cpp index ff51e8c60996..611e9fa65fea 100644 --- a/velox/common/memory/SharedArbitrator.cpp +++ b/velox/common/memory/SharedArbitrator.cpp @@ -398,13 +398,14 @@ uint64_t SharedArbitrator::reclaim( uint64_t reclaimDurationUs{0}; uint64_t reclaimedBytes{0}; uint64_t freedBytes{0}; + MemoryReclaimer::Stats reclaimerStats; { MicrosecondTimer reclaimTimer(&reclaimDurationUs); const uint64_t oldCapacity = pool->capacity(); try { freedBytes = pool->shrink(targetBytes); if (freedBytes < targetBytes) { - pool->reclaim(targetBytes - freedBytes); + pool->reclaim(targetBytes - freedBytes, reclaimerStats); } } catch (const std::exception& e) { VELOX_MEM_LOG(ERROR) << "Failed to reclaim from memory pool " @@ -421,6 +422,7 @@ uint64_t SharedArbitrator::reclaim( numReclaimedBytes_ += reclaimedBytes - freedBytes; numShrunkBytes_ += freedBytes; reclaimTimeUs_ += reclaimDurationUs; + numNonReclaimableAttempts_ += reclaimerStats.numNonReclaimableAttempts; VELOX_MEM_LOG(INFO) << "Reclaimed from memory pool " << pool->name() << " with target of " << succinctBytes(targetBytes) << ", actually reclaimed " << succinctBytes(freedBytes) @@ -492,6 +494,7 @@ MemoryArbitrator::Stats SharedArbitrator::statsLocked() const { stats.maxCapacityBytes = capacity_; stats.freeCapacityBytes = freeCapacity_; stats.reclaimTimeUs = reclaimTimeUs_; + stats.numNonReclaimableAttempts = numNonReclaimableAttempts_; return stats; } diff --git a/velox/common/memory/SharedArbitrator.h b/velox/common/memory/SharedArbitrator.h 
index e6dfd31efcea..b291b2af3bb1 100644 --- a/velox/common/memory/SharedArbitrator.h +++ b/velox/common/memory/SharedArbitrator.h @@ -200,5 +200,6 @@ class SharedArbitrator : public MemoryArbitrator { tsan_atomic numShrunkBytes_{0}; tsan_atomic numReclaimedBytes_{0}; tsan_atomic reclaimTimeUs_{0}; + tsan_atomic numNonReclaimableAttempts_{0}; }; } // namespace facebook::velox::memory diff --git a/velox/common/memory/tests/MemoryArbitratorTest.cpp b/velox/common/memory/tests/MemoryArbitratorTest.cpp index 22367a5e19c8..8371cdb7cb38 100644 --- a/velox/common/memory/tests/MemoryArbitratorTest.cpp +++ b/velox/common/memory/tests/MemoryArbitratorTest.cpp @@ -44,9 +44,10 @@ TEST_F(MemoryArbitrationTest, stats) { stats.numShrunkBytes = 100'000'000; stats.numReclaimedBytes = 10'000; stats.reclaimTimeUs = 1'000; + stats.numNonReclaimableAttempts = 5; ASSERT_EQ( stats.toString(), - "STATS[numRequests 2 numSucceeded 0 numAborted 3 numFailures 100 queueTime 230.00ms arbitrationTime 1.02ms reclaimTime 1.00ms shrunkMemory 95.37MB reclaimedMemory 9.77KB maxCapacity 0B freeCapacity 0B]"); + "STATS[numRequests 2 numSucceeded 0 numAborted 3 numFailures 100 numNonReclaimableAttempts 5 queueTime 230.00ms arbitrationTime 1.02ms reclaimTime 1.00ms shrunkMemory 95.37MB reclaimedMemory 9.77KB maxCapacity 0B freeCapacity 0B]"); } TEST_F(MemoryArbitrationTest, create) { @@ -120,9 +121,9 @@ TEST_F(MemoryArbitrationTest, queryMemoryCapacity) { TEST_F(MemoryArbitrationTest, arbitratorStats) { const MemoryArbitrator::Stats emptyStats; ASSERT_TRUE(emptyStats.empty()); - const MemoryArbitrator::Stats anchorStats(5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5); + const MemoryArbitrator::Stats anchorStats(5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5); ASSERT_FALSE(anchorStats.empty()); - const MemoryArbitrator::Stats largeStats(8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8); + const MemoryArbitrator::Stats largeStats(8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8); ASSERT_FALSE(largeStats.empty()); ASSERT_TRUE(!(anchorStats == largeStats)); 
ASSERT_TRUE(anchorStats != largeStats); @@ -131,9 +132,9 @@ TEST_F(MemoryArbitrationTest, arbitratorStats) { ASSERT_TRUE(anchorStats <= largeStats); ASSERT_TRUE(!(anchorStats >= largeStats)); const auto delta = largeStats - anchorStats; - ASSERT_EQ(delta, MemoryArbitrator::Stats(3, 3, 3, 3, 3, 3, 3, 3, 8, 8, 3)); + ASSERT_EQ(delta, MemoryArbitrator::Stats(3, 3, 3, 3, 3, 3, 3, 3, 8, 8, 3, 3)); - const MemoryArbitrator::Stats smallStats(2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2); + const MemoryArbitrator::Stats smallStats(2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2); ASSERT_TRUE(!(anchorStats == smallStats)); ASSERT_TRUE(anchorStats != smallStats); ASSERT_TRUE(!(anchorStats < smallStats)); @@ -141,7 +142,8 @@ TEST_F(MemoryArbitrationTest, arbitratorStats) { ASSERT_TRUE(!(anchorStats <= smallStats)); ASSERT_TRUE(anchorStats >= smallStats); - const MemoryArbitrator::Stats invalidStats(2, 2, 2, 2, 2, 2, 8, 8, 8, 8, 8); + const MemoryArbitrator::Stats invalidStats( + 2, 2, 2, 2, 2, 2, 8, 8, 8, 8, 8, 2); ASSERT_TRUE(!(anchorStats == invalidStats)); ASSERT_TRUE(anchorStats != invalidStats); ASSERT_THROW(anchorStats < invalidStats, VeloxException); @@ -242,9 +244,10 @@ class MemoryReclaimerTest : public testing::Test { void TearDown() override {} folly::Random::DefaultGenerator rng_; + MemoryReclaimer::Stats stats_; }; -TEST_F(MemoryReclaimerTest, default) { +TEST_F(MemoryReclaimerTest, common) { struct { int numChildren; int numGrandchildren; @@ -312,10 +315,11 @@ TEST_F(MemoryReclaimerTest, default) { uint64_t reclaimableBytes; ASSERT_FALSE(pool->reclaimableBytes(reclaimableBytes)); ASSERT_EQ(reclaimableBytes, 0); - ASSERT_EQ(pool->reclaim(0), 0); - ASSERT_EQ(pool->reclaim(100), 0); - ASSERT_EQ(pool->reclaim(kMaxMemory), 0); + ASSERT_EQ(pool->reclaim(0, stats_), 0); + ASSERT_EQ(pool->reclaim(100, stats_), 0); + ASSERT_EQ(pool->reclaim(kMaxMemory, stats_), 0); } + ASSERT_EQ(stats_, MemoryReclaimer::Stats{}); for (const auto& allocation : allocations) { 
allocation.pool->free(allocation.buffer, allocation.size); } @@ -338,8 +342,10 @@ class MockLeafMemoryReclaimer : public MemoryReclaimer { return true; } - uint64_t reclaim(MemoryPool* /*unused*/, uint64_t targetBytes) noexcept - override { + uint64_t reclaim( + MemoryPool* /*unused*/, + uint64_t targetBytes, + Stats& stats) noexcept override { std::lock_guard l(mu_); uint64_t reclaimedBytes{0}; while (!allocations_.empty() && @@ -427,20 +433,21 @@ TEST_F(MemoryReclaimerTest, mockReclaim) { const int numReclaims = 5; const int numBytesToReclaim = allocBytes * 3; for (int iter = 0; iter < numReclaims; ++iter) { - const auto reclaimedBytes = root->reclaim(numBytesToReclaim); + const auto reclaimedBytes = root->reclaim(numBytesToReclaim, stats_); ASSERT_EQ(reclaimedBytes, numBytesToReclaim); ASSERT_TRUE(root->reclaimableBytes(reclaimableBytes)); ASSERT_EQ(reclaimableBytes, totalUsedBytes); } ASSERT_TRUE(root->reclaimableBytes(reclaimableBytes)); ASSERT_EQ(totalUsedBytes, reclaimableBytes); - ASSERT_EQ(root->reclaim(allocBytes + 1), 2 * allocBytes); - ASSERT_EQ(root->reclaim(allocBytes - 1), allocBytes); + ASSERT_EQ(root->reclaim(allocBytes + 1, stats_), 2 * allocBytes); + ASSERT_EQ(root->reclaim(allocBytes - 1, stats_), allocBytes); const uint64_t expectedReclaimedBytes = totalUsedBytes; - ASSERT_EQ(root->reclaim(0), expectedReclaimedBytes); + ASSERT_EQ(root->reclaim(0, stats_), expectedReclaimedBytes); ASSERT_EQ(totalUsedBytes, 0); ASSERT_TRUE(root->reclaimableBytes(reclaimableBytes)); ASSERT_EQ(reclaimableBytes, 0); + ASSERT_EQ(stats_, MemoryReclaimer::Stats{}); } TEST_F(MemoryReclaimerTest, mockReclaimMoreThanAvailable) { @@ -470,10 +477,12 @@ TEST_F(MemoryReclaimerTest, mockReclaimMoreThanAvailable) { ASSERT_TRUE(root->reclaimableBytes(reclaimableBytes)); ASSERT_EQ(reclaimableBytes, totalUsedBytes); const uint64_t expectedReclaimedBytes = totalUsedBytes; - ASSERT_EQ(root->reclaim(totalUsedBytes + 100), expectedReclaimedBytes); + ASSERT_EQ( + 
root->reclaim(totalUsedBytes + 100, stats_), expectedReclaimedBytes); ASSERT_EQ(totalUsedBytes, 0); ASSERT_TRUE(root->reclaimableBytes(reclaimableBytes)); ASSERT_EQ(reclaimableBytes, 0); + ASSERT_EQ(stats_, MemoryReclaimer::Stats{}); } TEST_F(MemoryReclaimerTest, orderedReclaim) { @@ -528,7 +537,7 @@ TEST_F(MemoryReclaimerTest, orderedReclaim) { // child. // So expected reclaimable allocation units are {10, 11, 8, *14*, 5} ASSERT_EQ( - root->reclaimer()->reclaim(root.get(), 2 * allocUnitBytes), + root->reclaimer()->reclaim(root.get(), 2 * allocUnitBytes, stats_), 2 * allocUnitBytes); totalAllocUnits -= 2; verify({10, 11, 8, 14, 5}); @@ -537,7 +546,7 @@ TEST_F(MemoryReclaimerTest, orderedReclaim) { // child. // So expected reclaimable allocation units are {10, 11, 8, *12*, 5} ASSERT_EQ( - root->reclaimer()->reclaim(root.get(), 2 * allocUnitBytes), + root->reclaimer()->reclaim(root.get(), 2 * allocUnitBytes, stats_), 2 * allocUnitBytes); totalAllocUnits -= 2; verify({10, 11, 8, 12, 5}); @@ -546,7 +555,7 @@ TEST_F(MemoryReclaimerTest, orderedReclaim) { // child. // So expected reclaimable allocation units are {10, 11, 8, *4*, 5} ASSERT_EQ( - root->reclaimer()->reclaim(root.get(), 8 * allocUnitBytes), + root->reclaimer()->reclaim(root.get(), 8 * allocUnitBytes, stats_), 8 * allocUnitBytes); totalAllocUnits -= 8; verify({10, 11, 8, 4, 5}); @@ -555,7 +564,7 @@ TEST_F(MemoryReclaimerTest, orderedReclaim) { // child. // So expected reclaimable allocation gunits are {10, *9*, 8, 4, 5} ASSERT_EQ( - root->reclaimer()->reclaim(root.get(), 2 * allocUnitBytes), + root->reclaimer()->reclaim(root.get(), 2 * allocUnitBytes, stats_), 2 * allocUnitBytes); totalAllocUnits -= 2; verify({10, 9, 8, 4, 5}); @@ -564,7 +573,7 @@ TEST_F(MemoryReclaimerTest, orderedReclaim) { // child. 
// So expected reclaimable allocation units are {*7*, 9, 8, 4, 5} ASSERT_EQ( - root->reclaimer()->reclaim(root.get(), 3 * allocUnitBytes), + root->reclaimer()->reclaim(root.get(), 3 * allocUnitBytes, stats_), 3 * allocUnitBytes); totalAllocUnits -= 3; verify({7, 9, 8, 4, 5}); @@ -573,7 +582,7 @@ TEST_F(MemoryReclaimerTest, orderedReclaim) { // child and two from 2nd child. // So expected reclaimable allocation units are {7, *0*, *6*, 4, 5} ASSERT_EQ( - root->reclaimer()->reclaim(root.get(), 11 * allocUnitBytes), + root->reclaimer()->reclaim(root.get(), 11 * allocUnitBytes, stats_), 11 * allocUnitBytes); totalAllocUnits -= 11; verify({7, 0, 6, 4, 5}); @@ -582,7 +591,7 @@ TEST_F(MemoryReclaimerTest, orderedReclaim) { // child and three from 2nd child. // So expected reclaimable allocation units are {*0*, 0, *3*, 4, 5} ASSERT_EQ( - root->reclaimer()->reclaim(root.get(), 10 * allocUnitBytes), + root->reclaimer()->reclaim(root.get(), 10 * allocUnitBytes, stats_), 10 * allocUnitBytes); totalAllocUnits -= 10; verify({0, 0, 3, 4, 5}); @@ -591,7 +600,7 @@ TEST_F(MemoryReclaimerTest, orderedReclaim) { // child and 4 from 4th child and 1 from 2nd. // So expected reclaimable allocation units are {0, 0, 2, *0*, *0*} ASSERT_EQ( - root->reclaimer()->reclaim(root.get(), 10 * allocUnitBytes), + root->reclaimer()->reclaim(root.get(), 10 * allocUnitBytes, stats_), 10 * allocUnitBytes); totalAllocUnits -= 10; verify({0, 0, 2, 0, 0}); @@ -599,10 +608,12 @@ TEST_F(MemoryReclaimerTest, orderedReclaim) { // Reclaim all the remaining units and expect all reclaimable bytes got // cleared. 
ASSERT_EQ( - root->reclaimer()->reclaim(root.get(), totalAllocUnits * allocUnitBytes), + root->reclaimer()->reclaim( + root.get(), totalAllocUnits * allocUnitBytes, stats_), totalAllocUnits * allocUnitBytes); totalAllocUnits = 0; verify({0, 0, 0, 0, 0}); + ASSERT_EQ(stats_, MemoryReclaimer::Stats{}); } TEST_F(MemoryReclaimerTest, arbitrationContext) { @@ -685,7 +696,7 @@ TEST_F(MemoryReclaimerTest, concurrentRandomMockReclaims) { bytesToReclaim = 0; } } - const auto reclaimedBytes = root->reclaim(bytesToReclaim); + const auto reclaimedBytes = root->reclaim(bytesToReclaim, stats_); if (reclaimedBytes < bytesToReclaim) { ASSERT_GT(bytesToReclaim, oldUsedBytes); } @@ -723,10 +734,11 @@ TEST_F(MemoryReclaimerTest, concurrentRandomMockReclaims) { ASSERT_TRUE(root->reclaimableBytes(reclaimableBytes)); ASSERT_EQ(reclaimableBytes, totalUsedBytes); - root->reclaim(0); + root->reclaim(0, stats_); ASSERT_TRUE(root->reclaimableBytes(reclaimableBytes)); ASSERT_EQ(reclaimableBytes, 0); ASSERT_EQ(totalUsedBytes, 0); + ASSERT_EQ(stats_, MemoryReclaimer::Stats{}); } } // namespace facebook::velox::memory diff --git a/velox/common/memory/tests/MemoryManagerTest.cpp b/velox/common/memory/tests/MemoryManagerTest.cpp index 2da2ddb2581c..bdeb39b7eb3d 100644 --- a/velox/common/memory/tests/MemoryManagerTest.cpp +++ b/velox/common/memory/tests/MemoryManagerTest.cpp @@ -96,7 +96,7 @@ TEST_F(MemoryManagerTest, Ctor) { ASSERT_EQ(arbitrator->stats().maxCapacityBytes, kCapacity); ASSERT_EQ( manager.toString(), - "Memory Manager[capacity 4.00GB alignment 64B usedBytes 0B number of pools 0\nList of root pools:\n\t__default_root__\nMemory Allocator[MALLOC capacity 4.00GB allocated bytes 0 allocated pages 0 mapped pages 0]\nARBITRATOR[SHARED CAPACITY[4.00GB] STATS[numRequests 0 numSucceeded 0 numAborted 0 numFailures 0 queueTime 0us arbitrationTime 0us reclaimTime 0us shrunkMemory 0B reclaimedMemory 0B maxCapacity 4.00GB freeCapacity 4.00GB]]]"); + "Memory Manager[capacity 4.00GB alignment 64B 
usedBytes 0B number of pools 0\nList of root pools:\n\t__default_root__\nMemory Allocator[MALLOC capacity 4.00GB allocated bytes 0 allocated pages 0 mapped pages 0]\nARBITRATOR[SHARED CAPACITY[4.00GB] STATS[numRequests 0 numSucceeded 0 numAborted 0 numFailures 0 numNonReclaimableAttempts 0 queueTime 0us arbitrationTime 0us reclaimTime 0us shrunkMemory 0B reclaimedMemory 0B maxCapacity 4.00GB freeCapacity 4.00GB]]]"); } { // Test construction failure due to inconsistent allocator capacity setting. diff --git a/velox/common/memory/tests/MemoryPoolTest.cpp b/velox/common/memory/tests/MemoryPoolTest.cpp index 9982454a3b1a..2b9c45a64bd2 100644 --- a/velox/common/memory/tests/MemoryPoolTest.cpp +++ b/velox/common/memory/tests/MemoryPoolTest.cpp @@ -152,6 +152,7 @@ class MemoryPoolTest : public testing::TestWithParam { std::shared_ptr allocator_; std::shared_ptr manager_; std::shared_ptr cache_; + MemoryReclaimer::Stats stats_; }; TEST_P(MemoryPoolTest, Ctor) { @@ -2778,9 +2779,9 @@ TEST_P(MemoryPoolTest, reclaimAPIsWithDefaultReclaimer) { uint64_t reclaimableBytes{100}; ASSERT_FALSE(pool->reclaimableBytes(reclaimableBytes)); ASSERT_EQ(reclaimableBytes, 0); - ASSERT_EQ(pool->reclaim(0), 0); - ASSERT_EQ(pool->reclaim(100), 0); - ASSERT_EQ(pool->reclaim(kMaxMemory), 0); + ASSERT_EQ(pool->reclaim(0, stats_), 0); + ASSERT_EQ(pool->reclaim(100, stats_), 0); + ASSERT_EQ(pool->reclaim(kMaxMemory, stats_), 0); } for (const auto& allocation : allocations) { allocation.pool->free(allocation.buffer, allocation.size); diff --git a/velox/common/memory/tests/MockSharedArbitratorTest.cpp b/velox/common/memory/tests/MockSharedArbitratorTest.cpp index 72d804713050..8f092cc64e20 100644 --- a/velox/common/memory/tests/MockSharedArbitratorTest.cpp +++ b/velox/common/memory/tests/MockSharedArbitratorTest.cpp @@ -161,7 +161,8 @@ class MockMemoryOperator { return op_->reclaimableBytes(pool, reclaimableBytes); } - uint64_t reclaim(MemoryPool* pool, uint64_t targetBytes) override { + uint64_t 
reclaim(MemoryPool* pool, uint64_t targetBytes, Stats& stats) + override { ++numReclaims_; if (!reclaimable_) { return 0; diff --git a/velox/common/memory/tests/SharedArbitratorTest.cpp b/velox/common/memory/tests/SharedArbitratorTest.cpp index 4a03e909aa71..547d818f5c47 100644 --- a/velox/common/memory/tests/SharedArbitratorTest.cpp +++ b/velox/common/memory/tests/SharedArbitratorTest.cpp @@ -100,8 +100,9 @@ class FakeMemoryNode : public core::PlanNode { }; using AllocationCallback = std::function; -using ReclaimInjectionCallback = - std::function; +// If the callback returns true, the caller stops and returns early. +using ReclaimInjectionCallback = std::function< + bool(MemoryPool* pool, uint64_t targetByte, MemoryReclaimer::Stats& stats)>; // Custom operator for the custom factory. class FakeMemoryOperator : public Operator { @@ -163,15 +164,16 @@ class FakeMemoryOperator : public Operator { return canReclaim_; } - void reclaim(uint64_t targetBytes) override { + void reclaim(uint64_t targetBytes, memory::MemoryReclaimer::Stats& stats) + override { VELOX_CHECK(canReclaim()); auto* driver = operatorCtx_->driver(); VELOX_CHECK(!driver->state().isOnThread() || driver->state().isSuspended); VELOX_CHECK(driver->task()->pauseRequested()); VELOX_CHECK_GT(targetBytes, 0); - if (reclaimCb_ != nullptr) { - reclaimCb_(pool(), targetBytes); + if (reclaimCb_ != nullptr && reclaimCb_(pool(), targetBytes, stats)) { + return; } uint64_t bytesReclaimed{0}; @@ -575,7 +577,8 @@ class TestMemoryReclaimer : public MemoryReclaimer { TestMemoryReclaimer(std::function reclaimCb) : reclaimCb_(std::move(reclaimCb)) {} - uint64_t reclaim(MemoryPool* pool, uint64_t targetBytes) override { + uint64_t reclaim(MemoryPool* pool, uint64_t targetBytes, Stats& stats) + override { if (pool->kind() == MemoryPool::Kind::kLeaf) { return 0; } @@ -599,7 +602,7 @@ class TestMemoryReclaimer : public MemoryReclaimer { uint64_t reclaimedBytes{0}; for (const auto& candidate : candidates) { - const auto 
bytes = candidate.pool->reclaim(targetBytes); + const auto bytes = candidate.pool->reclaim(targetBytes, stats); if (reclaimCb_ != nullptr) { reclaimCb_(candidate.pool); } @@ -1729,6 +1732,7 @@ DEBUG_ONLY_TEST_F( // We only expect to reclaim from one hash build operator once. ASSERT_EQ(numHashBuildReclaims, 1); waitForAllTasksToBeDeleted(); + ASSERT_EQ(arbitrator_->stats().numNonReclaimableAttempts, 1); } DEBUG_ONLY_TEST_F( @@ -2147,6 +2151,59 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, driverInitTriggeredArbitration) { .assertResults(expectedVector); } +DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimerStats) { + std::vector vectors; + const int vectorSize = 100; + fuzzerOpts_.vectorSize = vectorSize; + vectors.push_back(newVector()); + + createDuckDbTable(vectors); + setupMemory(32 * MB, 0); + std::shared_ptr queryCtx = newQueryCtx(32 * MB); + ASSERT_EQ(queryCtx->pool()->capacity(), 0); + ASSERT_EQ(queryCtx->pool()->maxCapacity(), 32 * MB); + auto additionalCtxLeafPool = queryCtx->pool()->addLeafChild("ctx_leaf"); + TestAllocation outerAlloc; + outerAlloc.buffer = additionalCtxLeafPool->allocate(8 * MB); + outerAlloc.pool = additionalCtxLeafPool.get(); + outerAlloc.size = 8 * MB; + fakeOperatorFactory_->setAllocationCallback([&](Operator* op) { + TestAllocation allocation0; + TestAllocation allocation1; + auto guard = folly::makeGuard([&]() { + allocation0.free(); + allocation1.free(); + }); + allocation0.buffer = op->pool()->allocate(16 * MB); + allocation0.pool = op->pool(); + allocation0.size = 16 * MB; + + allocation1.buffer = op->pool()->allocate(16 * MB); + allocation1.pool = op->pool(); + allocation1.size = 16 * MB; + + return TestAllocation{}; + }); + fakeOperatorFactory_->setReclaimCallback([&](MemoryPool* /*unused*/, + uint64_t /*unused*/, + MemoryReclaimer::Stats& stats) { + ++stats.numNonReclaimableAttempts; + outerAlloc.free(); + return true; + }); + auto planNodeIdGenerator = std::make_shared(); + AssertQueryBuilder(duckDbQueryRunner_) + 
.queryCtx(queryCtx) + .plan(PlanBuilder(planNodeIdGenerator, pool()) + .values(vectors) + .addNode([&](std::string id, core::PlanNodePtr input) { + return std::make_shared(id, input); + }) + .planNode()) + .assertResults(vectors); + ASSERT_EQ(arbitrator_->stats().numNonReclaimableAttempts, 1); +} + DEBUG_ONLY_TEST_F( SharedArbitrationTest, DISABLED_raceBetweenTaskTerminateAndReclaim) { @@ -2646,10 +2703,13 @@ TEST_F(SharedArbitrationTest, concurrentArbitration) { fakeOperatorFactory_->setMaxDrivers(numDrivers); const std::string injectReclaimErrorMessage("Inject reclaim failure"); fakeOperatorFactory_->setReclaimCallback( - [&](MemoryPool* /*unused*/, uint64_t /*unused*/) { + [&](MemoryPool* /*unused*/, + uint64_t /*unused*/, + MemoryReclaimer::Stats& /*unused*/) { if (folly::Random::oneIn(10)) { VELOX_FAIL(injectReclaimErrorMessage); } + return false; }); const int numThreads = 30; diff --git a/velox/dwio/dwrf/test/WriterFlushTest.cpp b/velox/dwio/dwrf/test/WriterFlushTest.cpp index 6e45907e7e34..2fdcba761ac2 100644 --- a/velox/dwio/dwrf/test/WriterFlushTest.cpp +++ b/velox/dwio/dwrf/test/WriterFlushTest.cpp @@ -200,7 +200,9 @@ class MockMemoryPool : public velox::memory::MemoryPool { VELOX_UNSUPPORTED("{} unsupported", __FUNCTION__); } - uint64_t reclaim(uint64_t targetBytes) override { + uint64_t reclaim( + uint64_t /*unused*/, + velox::memory::MemoryReclaimer::Stats& /*unused*/) override { VELOX_UNSUPPORTED("{} unsupported", __FUNCTION__); } diff --git a/velox/exec/HashAggregation.cpp b/velox/exec/HashAggregation.cpp index cf006caa2f5e..e0b4ab944f8e 100644 --- a/velox/exec/HashAggregation.cpp +++ b/velox/exec/HashAggregation.cpp @@ -481,15 +481,17 @@ bool HashAggregation::isFinished() { return finished_; } -void HashAggregation::reclaim(uint64_t targetBytes) { +void HashAggregation::reclaim( + uint64_t targetBytes, + memory::MemoryReclaimer::Stats& stats) { VELOX_CHECK(canReclaim()); auto* driver = operatorCtx_->driver(); /// NOTE: an aggregation operator is 
reclaimable if it hasn't started output /// processing and is not under non-reclaimable execution section. if (noMoreInput_ || nonReclaimableSection_) { - // TODO: add stats to record the non-reclaimable case and reduce the log - // frequency if it is too verbose. + // TODO: reduce the log frequency if it is too verbose. + ++stats.numNonReclaimableAttempts; LOG(WARNING) << "Can't reclaim from aggregation operator, noMoreInput_[" << noMoreInput_ << "], nonReclaimableSection_[" << nonReclaimableSection_ << "], " << toString(); diff --git a/velox/exec/HashAggregation.h b/velox/exec/HashAggregation.h index ea2e71528743..51e01897fb02 100644 --- a/velox/exec/HashAggregation.h +++ b/velox/exec/HashAggregation.h @@ -43,7 +43,8 @@ class HashAggregation : public Operator { bool isFinished() override; - void reclaim(uint64_t targetBytes) override; + void reclaim(uint64_t targetBytes, memory::MemoryReclaimer::Stats& stats) + override; void close() override; diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp index b50c82fc1514..847012b7425b 100644 --- a/velox/exec/HashBuild.cpp +++ b/velox/exec/HashBuild.cpp @@ -1070,7 +1070,9 @@ bool HashBuild::testingTriggerSpill() { spillConfig()->testSpillPct; } -void HashBuild::reclaim(uint64_t /*unused*/) { +void HashBuild::reclaim( + uint64_t /*unused*/, + memory::MemoryReclaimer::Stats& stats) { VELOX_CHECK(canReclaim()); auto* driver = operatorCtx_->driver(); @@ -1080,8 +1082,8 @@ void HashBuild::reclaim(uint64_t /*unused*/) { // build processing and is not under non-reclaimable execution section. if ((state_ != State::kRunning && state_ != State::kWaitForBuild) || nonReclaimableSection_) { - // TODO: add stats to record the non-reclaimable case and reduce the log - // frequency if it is too verbose. + // TODO: reduce the log frequency if it is too verbose. 
+ ++stats.numNonReclaimableAttempts; LOG(WARNING) << "Can't reclaim from hash build operator, state_[" << stateName(state_) << "], nonReclaimableSection_[" << nonReclaimableSection_ << "], " << toString(); @@ -1099,8 +1101,8 @@ void HashBuild::reclaim(uint64_t /*unused*/) { if ((buildOp->state_ != State::kRunning && buildOp->state_ != State::kWaitForBuild) || buildOp->nonReclaimableSection_) { - // TODO: add stats to record the non-reclaimable case and reduce the log - // frequency if it is too verbose. + // TODO: reduce the log frequency if it is too verbose. + ++stats.numNonReclaimableAttempts; LOG(WARNING) << "Can't reclaim from hash build operator, state_[" << stateName(buildOp->state_) << "], nonReclaimableSection_[" << buildOp->nonReclaimableSection_ << "], " diff --git a/velox/exec/HashBuild.h b/velox/exec/HashBuild.h index 35f3e29e14da..f94025d01f64 100644 --- a/velox/exec/HashBuild.h +++ b/velox/exec/HashBuild.h @@ -77,7 +77,8 @@ class HashBuild final : public Operator { bool isFinished() override; - void reclaim(uint64_t targetBytes) override; + void reclaim(uint64_t targetBytes, memory::MemoryReclaimer::Stats& stats) + override; void abort() override; diff --git a/velox/exec/HashJoinBridge.cpp b/velox/exec/HashJoinBridge.cpp index daa5ee4bb29a..8e8d585178aa 100644 --- a/velox/exec/HashJoinBridge.cpp +++ b/velox/exec/HashJoinBridge.cpp @@ -181,10 +181,11 @@ bool isLeftNullAwareJoinWithFilter( uint64_t HashJoinMemoryReclaimer::reclaim( memory::MemoryPool* pool, - uint64_t targetBytes) { + uint64_t targetBytes, + memory::MemoryReclaimer::Stats& stats) { uint64_t reclaimedBytes{0}; pool->visitChildren( - [&targetBytes, &reclaimedBytes](memory::MemoryPool* child) { + [&targetBytes, &reclaimedBytes, &stats](memory::MemoryPool* child) { VELOX_CHECK_EQ(child->kind(), memory::MemoryPool::Kind::kLeaf); // The hash probe operator do not support memory reclaim. 
if (!isHashBuildMemoryPool(*child)) { @@ -192,7 +193,7 @@ uint64_t HashJoinMemoryReclaimer::reclaim( } // We only need to reclaim from any one of the hash build operators // which will reclaim from all the peer hash build operators. - reclaimedBytes = child->reclaim(targetBytes); + reclaimedBytes = child->reclaim(targetBytes, stats); return false; }); return reclaimedBytes; diff --git a/velox/exec/HashJoinBridge.h b/velox/exec/HashJoinBridge.h index 4eadffb09516..39d01d02fe30 100644 --- a/velox/exec/HashJoinBridge.h +++ b/velox/exec/HashJoinBridge.h @@ -143,7 +143,10 @@ class HashJoinMemoryReclaimer final : public MemoryReclaimer { new HashJoinMemoryReclaimer()); } - uint64_t reclaim(memory::MemoryPool* pool, uint64_t targetBytes) final; + uint64_t reclaim( + memory::MemoryPool* pool, + uint64_t targetBytes, + memory::MemoryReclaimer::Stats& stats) final; private: HashJoinMemoryReclaimer() : MemoryReclaimer() {} diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 172f0acb81d8..a9491eb0e871 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -558,7 +558,8 @@ bool Operator::MemoryReclaimer::reclaimableBytes( uint64_t Operator::MemoryReclaimer::reclaim( memory::MemoryPool* pool, - uint64_t targetBytes) { + uint64_t targetBytes, + memory::MemoryReclaimer::Stats& stats) { std::shared_ptr driver = ensureDriver(); if (FOLLY_UNLIKELY(driver == nullptr)) { return 0; @@ -575,7 +576,7 @@ uint64_t Operator::MemoryReclaimer::reclaim( TestValue::adjust( "facebook::velox::exec::Operator::MemoryReclaimer::reclaim", pool); - op_->reclaim(targetBytes); + op_->reclaim(targetBytes, stats); return pool->shrink(targetBytes); } diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index fad3883b005d..1ec304e4cfa0 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -480,7 +480,9 @@ class Operator : public BaseRuntimeStatWriter { /// NOTE: this method doesn't return the actually freed memory bytes. 
The /// caller need to claim the actually freed memory space by shrinking the /// associated root memory pool's capacity accordingly. - virtual void reclaim(uint64_t targetBytes) {} + virtual void reclaim( + uint64_t targetBytes, + memory::MemoryReclaimer::Stats& stats) {} const core::PlanNodeId& planNodeId() const { return operatorCtx_->planNodeId(); @@ -559,7 +561,10 @@ class Operator : public BaseRuntimeStatWriter { const memory::MemoryPool& pool, uint64_t& reclaimableBytes) const override; - uint64_t reclaim(memory::MemoryPool* pool, uint64_t targetBytes) override; + uint64_t reclaim( + memory::MemoryPool* pool, + uint64_t targetBytes, + memory::MemoryReclaimer::Stats& stats) override; void abort(memory::MemoryPool* pool, const std::exception_ptr& /* error */) override; diff --git a/velox/exec/OrderBy.cpp b/velox/exec/OrderBy.cpp index 0d7981f3869e..6f7ae2e7d3ef 100644 --- a/velox/exec/OrderBy.cpp +++ b/velox/exec/OrderBy.cpp @@ -76,15 +76,17 @@ void OrderBy::addInput(RowVectorPtr input) { sortBuffer_->addInput(input); } -void OrderBy::reclaim(uint64_t targetBytes) { +void OrderBy::reclaim( + uint64_t targetBytes, + memory::MemoryReclaimer::Stats& stats) { VELOX_CHECK(canReclaim()); auto* driver = operatorCtx_->driver(); // NOTE: an order by operator is reclaimable if it hasn't started output // processing and is not under non-reclaimable execution section. if (noMoreInput_ || nonReclaimableSection_) { - // TODO: add stats to record the non-reclaimable case and reduce the log - // frequency if it is too verbose. + // TODO: reduce the log frequency if it is too verbose. 
+ ++stats.numNonReclaimableAttempts; LOG(WARNING) << "Can't reclaim from order by operator, noMoreInput_[" << noMoreInput_ << "], nonReclaimableSection_[" << nonReclaimableSection_ << "], " << toString(); diff --git a/velox/exec/OrderBy.h b/velox/exec/OrderBy.h index 2f6b8f69b043..7d83a7cbff07 100644 --- a/velox/exec/OrderBy.h +++ b/velox/exec/OrderBy.h @@ -57,7 +57,8 @@ class OrderBy : public Operator { return finished_; } - void reclaim(uint64_t targetBytes) override; + void reclaim(uint64_t targetBytes, memory::MemoryReclaimer::Stats& stats) + override; void abort() override; diff --git a/velox/exec/TableWriter.cpp b/velox/exec/TableWriter.cpp index 0d0aa6446074..56a04e335995 100644 --- a/velox/exec/TableWriter.cpp +++ b/velox/exec/TableWriter.cpp @@ -280,7 +280,8 @@ bool TableWriter::MemoryReclaimer::reclaimableBytes( uint64_t TableWriter::MemoryReclaimer::reclaim( memory::MemoryPool* pool, - uint64_t /*unused*/) { + uint64_t /*unused*/, + memory::MemoryReclaimer::Stats& /*unused*/) { VELOX_CHECK(!pool->isLeaf()); return 0; } diff --git a/velox/exec/TableWriter.h b/velox/exec/TableWriter.h index d22bc5d622f4..930e52522ba1 100644 --- a/velox/exec/TableWriter.h +++ b/velox/exec/TableWriter.h @@ -144,7 +144,10 @@ class TableWriter : public Operator { const memory::MemoryPool& pool, uint64_t& reclaimableBytes) const override; - uint64_t reclaim(memory::MemoryPool* pool, uint64_t targetBytes) override; + uint64_t reclaim( + memory::MemoryPool* pool, + uint64_t targetBytes, + memory::MemoryReclaimer::Stats& stats) override; void abort(memory::MemoryPool* pool, const std::exception_ptr& /* error */) override; diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 6c1ab0cb4735..7183cf0c7244 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -2424,7 +2424,8 @@ std::unique_ptr Task::MemoryReclaimer::create( uint64_t Task::MemoryReclaimer::reclaim( memory::MemoryPool* pool, - uint64_t targetBytes) { + uint64_t targetBytes, + 
memory::MemoryReclaimer::Stats& stats) { auto task = ensureTask(); if (FOLLY_UNLIKELY(task == nullptr)) { return 0; @@ -2443,7 +2444,7 @@ uint64_t Task::MemoryReclaimer::reclaim( if (task->isCancelled()) { return 0; } - return memory::MemoryReclaimer::reclaim(pool, targetBytes); + return memory::MemoryReclaimer::reclaim(pool, targetBytes, stats); } void Task::MemoryReclaimer::abort( diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 3eb2ea22d8d5..53d306dcfbcf 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -651,7 +651,10 @@ class Task : public std::enable_shared_from_this { static std::unique_ptr create( const std::shared_ptr& task); - uint64_t reclaim(memory::MemoryPool* pool, uint64_t targetBytes) override; + uint64_t reclaim( + memory::MemoryPool* pool, + uint64_t targetBytes, + memory::MemoryReclaimer::Stats& stats) override; void abort(memory::MemoryPool* pool, const std::exception_ptr& error) override; diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp index b8fbffa1920f..1cdf7b9831fb 100644 --- a/velox/exec/tests/AggregationTest.cpp +++ b/velox/exec/tests/AggregationTest.cpp @@ -351,6 +351,7 @@ class AggregationTest : public OperatorTestBase { DOUBLE(), VARCHAR()})}; folly::Random::DefaultGenerator rng_; + memory::MemoryReclaimer::Stats reclaimerStats_; }; template <> @@ -2045,14 +2046,17 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringInputProcessing) { if (testData.expectedReclaimable) { const auto usedMemory = op->pool()->currentBytes(); - op->reclaim(folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_)); + op->reclaim( + folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_), + reclaimerStats_); // The hash table itself in the grouping set is not cleared so it still // uses some memory. ASSERT_LT(op->pool()->currentBytes(), usedMemory); } else { VELOX_ASSERT_THROW( op->reclaim( - folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_)), + folly::Random::oneIn(2) ? 
0 : folly::Random::rand32(rng_), + reclaimerStats_), ""); } @@ -2070,6 +2074,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringInputProcessing) { } OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); } + ASSERT_EQ(reclaimerStats_, memory::MemoryReclaimer::Stats{}); } DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringReserve) { @@ -2163,7 +2168,9 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringReserve) { ASSERT_GT(reclaimableBytes, 0); const auto usedMemory = op->pool()->currentBytes(); - op->reclaim(folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_)); + op->reclaim( + folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_), + reclaimerStats_); // The hash table itself in the grouping set is not cleared so it still // uses some memory. ASSERT_LT(op->pool()->currentBytes(), usedMemory); @@ -2176,6 +2183,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringReserve) { ASSERT_GT(stats[0].operatorStats[1].spilledBytes, 0); ASSERT_EQ(stats[0].operatorStats[1].spilledPartitions, 4); OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); + ASSERT_EQ(reclaimerStats_, memory::MemoryReclaimer::Stats{}); } DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringAllocation) { @@ -2290,14 +2298,17 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringAllocation) { if (enableSpilling) { ASSERT_GT(reclaimableBytes, 0); const auto usedMemory = op->pool()->currentBytes(); - op->reclaim(folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_)); + op->reclaim( + folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_), + reclaimerStats_); // No reclaim as the operator is under non-reclaimable section. ASSERT_EQ(usedMemory, op->pool()->currentBytes()); } else { ASSERT_EQ(reclaimableBytes, 0); VELOX_ASSERT_THROW( op->reclaim( - folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_)), + folly::Random::oneIn(2) ? 
0 : folly::Random::rand32(rng_), + reclaimerStats_), ""); } @@ -2311,6 +2322,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringAllocation) { ASSERT_EQ(stats[0].operatorStats[1].spilledPartitions, 0); OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); } + ASSERT_EQ(reclaimerStats_, memory::MemoryReclaimer::Stats{1}); } DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringOutputProcessing) { @@ -2411,14 +2423,17 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringOutputProcessing) { if (enableSpilling) { ASSERT_GT(reclaimableBytes, 0); const auto usedMemory = op->pool()->currentBytes(); - op->reclaim(folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_)); + op->reclaim( + folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_), + reclaimerStats_); // No reclaim as the operator has started output processing. ASSERT_EQ(usedMemory, op->pool()->currentBytes()); } else { ASSERT_EQ(reclaimableBytes, 0); VELOX_ASSERT_THROW( op->reclaim( - folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_)), + folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_), + reclaimerStats_), ""); } @@ -2431,6 +2446,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringOutputProcessing) { ASSERT_EQ(stats[0].operatorStats[1].spilledPartitions, 0); OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); } + ASSERT_EQ(reclaimerStats_, memory::MemoryReclaimer::Stats{1}); } DEBUG_ONLY_TEST_F(AggregationTest, reclaimWithEmptyAggregationTable) { @@ -2525,14 +2541,17 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimWithEmptyAggregationTable) { if (enableSpilling) { ASSERT_EQ(reclaimableBytes, 0); const auto usedMemory = op->pool()->currentBytes(); - op->reclaim(folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_)); + op->reclaim( + folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_), + reclaimerStats_); // No reclaim as the operator has started output processing. 
ASSERT_EQ(usedMemory, op->pool()->currentBytes()); } else { ASSERT_EQ(reclaimableBytes, 0); VELOX_ASSERT_THROW( op->reclaim( - folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_)), + folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_), + reclaimerStats_), ""); } @@ -2545,6 +2564,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimWithEmptyAggregationTable) { ASSERT_EQ(stats[0].operatorStats[1].spilledPartitions, 0); OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); } + ASSERT_EQ(reclaimerStats_, memory::MemoryReclaimer::Stats{}); } namespace { diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index 671664f92e9a..2a5fc1233f93 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -845,6 +845,7 @@ class HashJoinTest : public HiveConnectorTestBase { RowTypePtr probeType_; RowTypePtr buildType_; + memory::MemoryReclaimer::Stats reclaimerStats_; friend class HashJoinBuilder; }; @@ -5028,11 +5029,15 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringInputProcessing) { } if (testData.expectedReclaimable) { - op->reclaim(folly::Random::oneIn(2) ? 0 : folly::Random::rand32()); + op->reclaim( + folly::Random::oneIn(2) ? 0 : folly::Random::rand32(), + reclaimerStats_); ASSERT_EQ(op->pool()->currentBytes(), 0); } else { VELOX_ASSERT_THROW( - op->reclaim(folly::Random::oneIn(2) ? 0 : folly::Random::rand32()), + op->reclaim( + folly::Random::oneIn(2) ? 0 : folly::Random::rand32(), + reclaimerStats_), ""); } @@ -5041,6 +5046,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringInputProcessing) { taskThread.join(); } + ASSERT_EQ(reclaimerStats_, memory::MemoryReclaimer::Stats{}); } DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringReserve) { @@ -5152,7 +5158,8 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringReserve) { ASSERT_TRUE(reclaimable); ASSERT_GT(reclaimableBytes, 0); - op->reclaim(folly::Random::oneIn(2) ? 0 : folly::Random::rand32()); + op->reclaim( + folly::Random::oneIn(2) ? 
0 : folly::Random::rand32(), reclaimerStats_); ASSERT_EQ(op->pool()->currentBytes(), 0); driverWait.notify(); @@ -5160,6 +5167,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringReserve) { task.reset(); taskThread.join(); + ASSERT_EQ(reclaimerStats_, memory::MemoryReclaimer::Stats{}); } DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringAllocation) { @@ -5276,10 +5284,14 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringAllocation) { ASSERT_EQ(reclaimable, enableSpilling); if (enableSpilling) { ASSERT_GE(reclaimableBytes, 0); - op->reclaim(folly::Random::oneIn(2) ? 0 : folly::Random::rand32()); + op->reclaim( + folly::Random::oneIn(2) ? 0 : folly::Random::rand32(), + reclaimerStats_); } else { VELOX_ASSERT_THROW( - op->reclaim(folly::Random::oneIn(2) ? 0 : folly::Random::rand32()), + op->reclaim( + folly::Random::oneIn(2) ? 0 : folly::Random::rand32(), + reclaimerStats_), ""); ASSERT_EQ(reclaimableBytes, 0); } @@ -5290,6 +5302,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringAllocation) { taskThread.join(); } + ASSERT_EQ(reclaimerStats_, memory::MemoryReclaimer::Stats{1}); } DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringOutputProcessing) { @@ -5395,13 +5408,17 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringOutputProcessing) { if (enableSpilling) { ASSERT_GT(reclaimableBytes, 0); const auto usedMemoryBytes = op->pool()->currentBytes(); - op->reclaim(folly::Random::oneIn(2) ? 0 : folly::Random::rand32()); + op->reclaim( + folly::Random::oneIn(2) ? 0 : folly::Random::rand32(), + reclaimerStats_); // No reclaim as the operator has started output processing. ASSERT_EQ(usedMemoryBytes, op->pool()->currentBytes()); } else { ASSERT_EQ(reclaimableBytes, 0); VELOX_ASSERT_THROW( - op->reclaim(folly::Random::oneIn(2) ? 0 : folly::Random::rand32()), + op->reclaim( + folly::Random::oneIn(2) ? 
0 : folly::Random::rand32(), + reclaimerStats_), ""); } @@ -5410,6 +5427,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringOutputProcessing) { taskThread.join(); } + ASSERT_EQ(reclaimerStats_, memory::MemoryReclaimer::Stats{1}); } DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringWaitForProbe) { @@ -5469,7 +5487,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringWaitForProbe) { SuspendedSection suspendedSection(driver); auto taskPauseWait = task->requestPause(); taskPauseWait.wait(); - op->reclaim(0); + op->reclaim(0, reclaimerStats_); Task::resume(task); }))); @@ -5529,7 +5547,8 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringWaitForProbe) { ASSERT_GT(reclaimableBytes, 0); const auto usedMemoryBytes = op->pool()->currentBytes(); - op->reclaim(folly::Random::oneIn(2) ? 0 : folly::Random::rand32()); + op->reclaim( + folly::Random::oneIn(2) ? 0 : folly::Random::rand32(), reclaimerStats_); // No reclaim as the build operator is not in building table state. ASSERT_EQ(usedMemoryBytes, op->pool()->currentBytes()); @@ -5538,6 +5557,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringWaitForProbe) { task.reset(); taskThread.join(); + ASSERT_EQ(reclaimerStats_, memory::MemoryReclaimer::Stats{1}); } DEBUG_ONLY_TEST_F(HashJoinTest, hashBuildAbortDuringOutputProcessing) { diff --git a/velox/exec/tests/OrderByTest.cpp b/velox/exec/tests/OrderByTest.cpp index 29c2e4bc0f16..c7b6bc910497 100644 --- a/velox/exec/tests/OrderByTest.cpp +++ b/velox/exec/tests/OrderByTest.cpp @@ -216,6 +216,7 @@ class OrderByTest : public OperatorTestBase { } folly::Random::DefaultGenerator rng_; + memory::MemoryReclaimer::Stats reclaimerStats_; }; TEST_F(OrderByTest, selectiveFilter) { @@ -686,12 +687,15 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringInputProcessing) { } if (testData.expectedReclaimable) { - op->reclaim(folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_)); + op->reclaim( + folly::Random::oneIn(2) ? 
0 : folly::Random::rand32(rng_), + reclaimerStats_); ASSERT_EQ(op->pool()->currentBytes(), 0); } else { VELOX_ASSERT_THROW( op->reclaim( - folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_)), + folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_), + reclaimerStats_), ""); } @@ -709,6 +713,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringInputProcessing) { } OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); } + ASSERT_EQ(reclaimerStats_, memory::MemoryReclaimer::Stats{}); } DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringReserve) { @@ -802,7 +807,9 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringReserve) { ASSERT_TRUE(reclaimable); ASSERT_GT(reclaimableBytes, 0); - op->reclaim(folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_)); + op->reclaim( + folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_), + reclaimerStats_); ASSERT_EQ(op->pool()->currentBytes(), 0); driverWait.notify(); @@ -814,6 +821,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringReserve) { ASSERT_GT(stats[0].operatorStats[1].spilledBytes, 0); ASSERT_EQ(stats[0].operatorStats[1].spilledPartitions, 1); OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); + ASSERT_EQ(reclaimerStats_, memory::MemoryReclaimer::Stats{}); } DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringAllocation) { @@ -929,11 +937,14 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringAllocation) { } if (enableSpilling) { - op->reclaim(folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_)); + op->reclaim( + folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_), + reclaimerStats_); } else { VELOX_ASSERT_THROW( op->reclaim( - folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_)), + folly::Random::oneIn(2) ? 
0 : folly::Random::rand32(rng_), + reclaimerStats_), ""); } @@ -947,6 +958,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringAllocation) { ASSERT_EQ(stats[0].operatorStats[1].spilledPartitions, 0); OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); } + ASSERT_EQ(reclaimerStats_, memory::MemoryReclaimer::Stats{1}); } DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringOutputProcessing) { @@ -1047,14 +1059,17 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringOutputProcessing) { if (enableSpilling) { ASSERT_GT(reclaimableBytes, 0); const auto usedMemoryBytes = op->pool()->currentBytes(); - op->reclaim(folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_)); + op->reclaim( + folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_), + reclaimerStats_); // No reclaim as the operator has started output processing. ASSERT_EQ(usedMemoryBytes, op->pool()->currentBytes()); } else { ASSERT_EQ(reclaimableBytes, 0); VELOX_ASSERT_THROW( op->reclaim( - folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_)), + folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_), + reclaimerStats_), ""); } @@ -1066,6 +1081,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringOutputProcessing) { ASSERT_EQ(stats[0].operatorStats[1].spilledPartitions, 0); OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); } + ASSERT_EQ(reclaimerStats_, memory::MemoryReclaimer::Stats{1}); } DEBUG_ONLY_TEST_F(OrderByTest, abortDuringOutputProcessing) {