From 20afe686428dc40574ec01d14857170fb8779f9d Mon Sep 17 00:00:00 2001 From: Jialiang Tan Date: Sun, 26 May 2024 11:22:37 -0700 Subject: [PATCH] Let operator not shrink pool after reclaim (#9895) Summary: Currently, the operator shrinks the pool after reclaim. This change moves the shrink up to the arbitrator level to achieve better global pool accounting consistency. It also changes the memory_reclaim_bytes metric from a sum to a histogram. Pull Request resolved: https://github.com/facebookincubator/velox/pull/9895 Reviewed By: xiaoxmeng Differential Revision: D57708523 Pulled By: tanjialiang fbshipit-source-id: e998b579b183064348c46e8e9ea5e9665d893834 --- velox/common/base/Counters.cpp | 13 +++- velox/common/memory/MemoryArbitrator.cpp | 2 +- velox/common/memory/SharedArbitrator.cpp | 3 +- .../memory/tests/MockSharedArbitratorTest.cpp | 10 +++- velox/docs/monitoring/metrics.rst | 5 +- velox/exec/Operator.cpp | 5 +- velox/exec/tests/AggregationTest.cpp | 28 +++------ velox/exec/tests/HashJoinTest.cpp | 59 ++++++++++--------- velox/exec/tests/OrderByTest.cpp | 25 +++----- 9 files changed, 74 insertions(+), 76 deletions(-) diff --git a/velox/common/base/Counters.cpp b/velox/common/base/Counters.cpp index a75d53a43ca6..1e0d446ba4b1 100644 --- a/velox/common/base/Counters.cpp +++ b/velox/common/base/Counters.cpp @@ -274,8 +274,17 @@ void registerVeloxMetrics() { DEFINE_HISTOGRAM_METRIC( kMetricMemoryReclaimExecTimeMs, 30'000, 0, 600'000, 50, 90, 99, 100); - // Tracks op memory reclaim bytes. - DEFINE_METRIC(kMetricMemoryReclaimedBytes, facebook::velox::StatType::SUM); + // Tracks op memory reclaim bytes distribution in range of [0, 4GB] with 64 + // buckets and reports P50, P90, P99, and P100 + DEFINE_HISTOGRAM_METRIC( + kMetricMemoryReclaimedBytes, + 67'108'864, + 0, + 4'294'967'296, + 50, + 90, + 99, + 100); // Tracks the memory reclaim count on an operator. 
DEFINE_METRIC( diff --git a/velox/common/memory/MemoryArbitrator.cpp b/velox/common/memory/MemoryArbitrator.cpp index f8477cf04ae9..6dd15c59be9b 100644 --- a/velox/common/memory/MemoryArbitrator.cpp +++ b/velox/common/memory/MemoryArbitrator.cpp @@ -179,7 +179,7 @@ uint64_t MemoryReclaimer::run( stats.reclaimedBytes += reclaimedBytes; RECORD_HISTOGRAM_METRIC_VALUE( kMetricMemoryReclaimExecTimeMs, execTimeUs / 1'000); - RECORD_METRIC_VALUE(kMetricMemoryReclaimedBytes, reclaimedBytes); + RECORD_HISTOGRAM_METRIC_VALUE(kMetricMemoryReclaimedBytes, reclaimedBytes); RECORD_METRIC_VALUE(kMetricMemoryReclaimCount); addThreadLocalRuntimeStat( "memoryReclaimWallNanos", diff --git a/velox/common/memory/SharedArbitrator.cpp b/velox/common/memory/SharedArbitrator.cpp index ea350455b6d9..a85839c8484d 100644 --- a/velox/common/memory/SharedArbitrator.cpp +++ b/velox/common/memory/SharedArbitrator.cpp @@ -758,10 +758,9 @@ uint64_t SharedArbitrator::reclaim( VELOX_MEM_LOG(ERROR) << "Failed to reclaim from memory pool " << pool->name() << ", aborting it: " << e.what(); abort(pool, std::current_exception()); - // Free up all the free capacity from the aborted pool as the associated - // query has failed at this point. 
pool->shrink(); } + pool->shrink(bytesToReclaim); const uint64_t newCapacity = pool->capacity(); VELOX_CHECK_GE(oldCapacity, newCapacity); reclaimedBytes = oldCapacity - newCapacity; diff --git a/velox/common/memory/tests/MockSharedArbitratorTest.cpp b/velox/common/memory/tests/MockSharedArbitratorTest.cpp index a649a8537b9c..e26c7ac3cfa4 100644 --- a/velox/common/memory/tests/MockSharedArbitratorTest.cpp +++ b/velox/common/memory/tests/MockSharedArbitratorTest.cpp @@ -323,10 +323,13 @@ class MockMemoryOperator { allocIt = allocations_.erase(allocIt); } totalBytes_ -= bytesReclaimed; + const auto oldReservedBytes = pool_->reservedBytes(); for (const auto& allocation : allocationsToFree) { pool_->free(allocation.buffer, allocation.size); } - return pool_->shrink(targetBytes); + const auto newReservedBytes = pool_->reservedBytes(); + VELOX_CHECK_GE(oldReservedBytes, newReservedBytes); + return oldReservedBytes - newReservedBytes; } void abort(MemoryPool* pool) { @@ -676,11 +679,12 @@ TEST_F(MockSharedArbitrationTest, shrinkPools) { std::string debugString() const { std::stringstream tasksOss; for (const auto& testTask : testTasks) { + tasksOss << "["; tasksOss << testTask.debugString(); - tasksOss << ","; + tasksOss << "], "; } return fmt::format( - "taskTests: [{}], targetBytes: {}, expectedFreedBytes: {}, expectedFreeCapacity: {}, expectedReservedFreeCapacity: {}, allowSpill: {}, allowAbort: {}", + "testTasks: [{}], targetBytes: {}, expectedFreedBytes: {}, expectedFreeCapacity: {}, expectedReservedFreeCapacity: {}, allowSpill: {}, allowAbort: {}", tasksOss.str(), succinctBytes(targetBytes), succinctBytes(expectedFreedBytes), diff --git a/velox/docs/monitoring/metrics.rst b/velox/docs/monitoring/metrics.rst index e43a7ade93f5..177a45520c1b 100644 --- a/velox/docs/monitoring/metrics.rst +++ b/velox/docs/monitoring/metrics.rst @@ -102,8 +102,9 @@ Memory Management with 20 buckets. It is configured to report latency at P50, P90, P99, and P100 percentiles. 
* - memory_reclaim_bytes - - Sum - - The sum of reclaimed memory bytes. + - Histogram + - The distribution of reclaimed bytes in the range of [0, 4GB] with 64 buckets. + It is configured to report P50, P90, P99, and P100 percentiles. * - task_memory_reclaim_count - Count - The count of task memory reclaims. diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 3e1009e381af..178b64fd6ab0 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -649,8 +649,11 @@ uint64_t Operator::MemoryReclaimer::reclaim( auto reclaimBytes = memory::MemoryReclaimer::run( [&]() { + const auto reservedBytesBeforeReclaim = pool->reservedBytes(); op_->reclaim(targetBytes, stats); - return pool->shrink(targetBytes); + const auto reservedBytesAfterReclaim = pool->reservedBytes(); + VELOX_CHECK_GE(reservedBytesBeforeReclaim, reservedBytesAfterReclaim); + return reservedBytesBeforeReclaim - reservedBytesAfterReclaim; }, stats); diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp index 5672eca7e407..f9edadb4bd1e 100644 --- a/velox/exec/tests/AggregationTest.cpp +++ b/velox/exec/tests/AggregationTest.cpp @@ -376,16 +376,6 @@ class AggregationTest : public OperatorTestBase { pool_.get()); } - static void reclaimAndRestoreCapacity( - const Operator* op, - uint64_t targetBytes, - memory::MemoryReclaimer::Stats& reclaimerStats) { - const auto oldCapacity = op->pool()->capacity(); - op->pool()->reclaim(targetBytes, 0, reclaimerStats); - dynamic_cast(op->pool()) - ->testingSetCapacity(oldCapacity); - } - RowTypePtr rowType_{ ROW({"c0", "c1", "c2", "c3", "c4", "c5", "c6"}, {BIGINT(), @@ -2109,9 +2099,9 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringInputProcessing) { if (testData.expectedReclaimable) { const auto usedMemory = op->pool()->usedBytes(); - reclaimAndRestoreCapacity( - op, + op->pool()->reclaim( folly::Random::oneIn(2) ? 
0 : folly::Random::rand32(rng_), + 0, reclaimerStats_); ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0); ASSERT_GT(reclaimerStats_.reclaimedBytes, 0); @@ -2233,12 +2223,12 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringReserve) { ASSERT_GT(reclaimableBytes, 0); const auto usedMemory = op->pool()->usedBytes(); - reclaimAndRestoreCapacity( - op, + op->pool()->reclaim( folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_), + 0, reclaimerStats_); ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0); - ASSERT_GT(reclaimerStats_.reclaimedBytes, 0); + ASSERT_GE(reclaimerStats_.reclaimedBytes, 0); reclaimerStats_.reset(); // The hash table itself in the grouping set is not cleared so it still // uses some memory. @@ -2476,9 +2466,9 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringOutputProcessing) { if (enableSpilling) { ASSERT_GT(reclaimableBytes, 0); const auto usedMemory = op->pool()->usedBytes(); - reclaimAndRestoreCapacity( - op, + op->pool()->reclaim( folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_), + 0, reclaimerStats_); ASSERT_EQ(reclaimerStats_.numNonReclaimableAttempts, 0); ASSERT_GT(usedMemory, op->pool()->usedBytes()); @@ -2764,9 +2754,9 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimWithEmptyAggregationTable) { if (enableSpilling) { ASSERT_EQ(reclaimableBytes, 0); const auto usedMemory = op->pool()->usedBytes(); - reclaimAndRestoreCapacity( - op, + op->pool()->reclaim( folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_), + 0, reclaimerStats_); ASSERT_EQ(reclaimerStats_, memory::MemoryReclaimer::Stats{}); // No reclaim as the operator has started output processing. 
diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index 0ec555deb6ba..1a9ea6624503 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -897,17 +897,6 @@ class HashJoinTest : public HiveConnectorTestBase { joinNode->outputType()); } - static void reclaimAndRestoreCapacity( - const Operator* op, - uint64_t targetBytes, - memory::MemoryReclaimer::Stats& reclaimerStats) { - memory::ScopedMemoryArbitrationContext ctx(op->pool()); - const auto oldCapacity = op->pool()->capacity(); - op->pool()->reclaim(targetBytes, 0, reclaimerStats); - dynamic_cast(op->pool()) - ->testingSetCapacity(oldCapacity); - } - const int32_t numDrivers_; // The default left and right table types used for test. @@ -5502,10 +5491,13 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringInputProcessing) { } if (testData.expectedReclaimable) { - reclaimAndRestoreCapacity( - op, - folly::Random::oneIn(2) ? 0 : folly::Random::rand32(), - reclaimerStats_); + { + memory::ScopedMemoryArbitrationContext ctx(op->pool()); + op->pool()->reclaim( + folly::Random::oneIn(2) ? 0 : folly::Random::rand32(), + 0, + reclaimerStats_); + } ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0); ASSERT_GT(reclaimerStats_.reclaimedBytes, 0); reclaimerStats_.reset(); @@ -5639,10 +5631,13 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringReserve) { ASSERT_TRUE(reclaimable); ASSERT_GT(reclaimableBytes, 0); - reclaimAndRestoreCapacity( - op, - folly::Random::oneIn(2) ? 0 : folly::Random::rand32(), - reclaimerStats_); + { + memory::ScopedMemoryArbitrationContext ctx(op->pool()); + op->pool()->reclaim( + folly::Random::oneIn(2) ? 
0 : folly::Random::rand32(), + 0, + reclaimerStats_); + } ASSERT_GT(reclaimerStats_.reclaimedBytes, 0); ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0); ASSERT_EQ(op->pool()->usedBytes(), 0); @@ -5891,11 +5886,14 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringOutputProcessing) { if (enableSpilling) { ASSERT_GT(reclaimableBytes, 0); const auto usedMemoryBytes = op->pool()->usedBytes(); - reclaimAndRestoreCapacity( - op, - folly::Random::oneIn(2) ? 0 : folly::Random::rand32(), - reclaimerStats_); - ASSERT_GT(reclaimerStats_.reclaimedBytes, 0); + { + memory::ScopedMemoryArbitrationContext ctx(op->pool()); + op->pool()->reclaim( + folly::Random::oneIn(2) ? 0 : folly::Random::rand32(), + 0, + reclaimerStats_); + } + ASSERT_GE(reclaimerStats_.reclaimedBytes, 0); ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0); // No reclaim as the operator has started output processing. ASSERT_EQ(usedMemoryBytes, op->pool()->usedBytes()); @@ -6036,11 +6034,14 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringWaitForProbe) { const auto usedMemoryBytes = op->pool()->usedBytes(); reclaimerStats_.reset(); - reclaimAndRestoreCapacity( - op, - folly::Random::oneIn(2) ? 0 : folly::Random::rand32(), - reclaimerStats_); - ASSERT_GT(reclaimerStats_.reclaimedBytes, 0); + { + memory::ScopedMemoryArbitrationContext ctx(op->pool()); + op->pool()->reclaim( + folly::Random::oneIn(2) ? 0 : folly::Random::rand32(), + 0, + reclaimerStats_); + } + ASSERT_GE(reclaimerStats_.reclaimedBytes, 0); ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0); // No reclaim as the build operator is not in building table state. 
ASSERT_EQ(usedMemoryBytes, op->pool()->usedBytes()); diff --git a/velox/exec/tests/OrderByTest.cpp b/velox/exec/tests/OrderByTest.cpp index 0a3cfcb5fc5c..90c3a56f07e9 100644 --- a/velox/exec/tests/OrderByTest.cpp +++ b/velox/exec/tests/OrderByTest.cpp @@ -232,16 +232,6 @@ class OrderByTest : public OperatorTestBase { } } - static void reclaimAndRestoreCapacity( - const Operator* op, - uint64_t targetBytes, - memory::MemoryReclaimer::Stats& reclaimerStats) { - const auto oldCapacity = op->pool()->capacity(); - op->pool()->reclaim(targetBytes, 0, reclaimerStats); - dynamic_cast(op->pool()) - ->testingSetCapacity(oldCapacity); - } - std::vector makeVectors( const RowTypePtr& rowType, int32_t numVectors, @@ -661,9 +651,9 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringInputProcessing) { } if (testData.expectedReclaimable) { - reclaimAndRestoreCapacity( - op, + op->pool()->reclaim( folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_), + 0, reclaimerStats_); ASSERT_GT(reclaimerStats_.reclaimedBytes, 0); ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0); @@ -784,9 +774,9 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringReserve) { ASSERT_TRUE(reclaimable); ASSERT_GT(reclaimableBytes, 0); - reclaimAndRestoreCapacity( - op, + op->pool()->reclaim( folly::Random::oneIn(2) ? 
0 : folly::Random::rand32(rng_), + 0, reclaimerStats_); ASSERT_GT(reclaimerStats_.reclaimedBytes, 0); ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0); @@ -939,7 +929,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringOutputProcessing) { constexpr int64_t kMaxBytes = 1LL << 30; // 1GB auto rowType = ROW({"c0", "c1", "c2"}, {INTEGER(), INTEGER(), INTEGER()}); VectorFuzzer fuzzer({.vectorSize = 1000}, pool()); - const int32_t numBatches = 10; + const int32_t numBatches = 200; std::vector batches; for (int32_t i = 0; i < numBatches; ++i) { batches.push_back(fuzzer.fuzzRow(rowType)); @@ -1032,8 +1022,9 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringOutputProcessing) { if (enableSpilling) { ASSERT_GT(reclaimableBytes, 0); reclaimerStats_.reset(); - reclaimAndRestoreCapacity(op, reclaimableBytes, reclaimerStats_); - ASSERT_EQ(reclaimerStats_.reclaimedBytes, reclaimableBytes); + op->pool()->reclaim(reclaimableBytes, 0, reclaimerStats_); + ASSERT_GT(reclaimerStats_.reclaimedBytes, 0); + ASSERT_LE(reclaimerStats_.reclaimedBytes, reclaimableBytes); ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0); } else { ASSERT_EQ(reclaimableBytes, 0);