diff --git a/velox/common/memory/MemoryArbitrator.cpp b/velox/common/memory/MemoryArbitrator.cpp index eac6d149f4856..8614a394e3ae1 100644 --- a/velox/common/memory/MemoryArbitrator.cpp +++ b/velox/common/memory/MemoryArbitrator.cpp @@ -174,8 +174,8 @@ void MemoryArbitrator::unregisterFactory(const std::string& kind) { return pool->shrink(targetBytes); } -std::unique_ptr MemoryReclaimer::create() { - return std::unique_ptr(new MemoryReclaimer()); +std::unique_ptr MemoryReclaimer::create(int32_t priority) { + return std::unique_ptr(new MemoryReclaimer(priority)); } // static @@ -234,8 +234,9 @@ uint64_t MemoryReclaimer::reclaim( return 0; } - // Sort the child pools based on their reserved memory and reclaim from the - // child pool with most reservation first. + // Sort the child pools based on their reclaimer priority and reserved memory. + // Reclaim from the child pool with highest priority and most reservation + // first. struct Candidate { std::shared_ptr pool; int64_t reclaimableBytes; @@ -247,8 +248,13 @@ uint64_t MemoryReclaimer::reclaim( for (auto& entry : pool->children_) { auto child = entry.second.lock(); if (child != nullptr) { - const int64_t reclaimableBytes = child->reclaimableBytes().value_or(0); - candidates.push_back(Candidate{std::move(child), reclaimableBytes}); + const auto reclaimableBytesOpt = child->reclaimableBytes(); + if (!reclaimableBytesOpt.has_value()) { + continue; + } + candidates.push_back(Candidate{ + std::move(child), + static_cast(reclaimableBytesOpt.value())}); } } } @@ -257,7 +263,12 @@ uint64_t MemoryReclaimer::reclaim( candidates.begin(), candidates.end(), [](const auto& lhs, const auto& rhs) { - return lhs.reclaimableBytes > rhs.reclaimableBytes; + const auto lPriority = lhs.pool->reclaimer()->priority(); + const auto rPriority = rhs.pool->reclaimer()->priority(); + if (lPriority == rPriority) { + return lhs.reclaimableBytes > rhs.reclaimableBytes; + } + return lPriority < rPriority; }); uint64_t reclaimedBytes{0}; diff 
--git a/velox/common/memory/MemoryArbitrator.h b/velox/common/memory/MemoryArbitrator.h index 969fbb0fa5a97..09f4953573003 100644 --- a/velox/common/memory/MemoryArbitrator.h +++ b/velox/common/memory/MemoryArbitrator.h @@ -287,7 +287,7 @@ class MemoryReclaimer { virtual ~MemoryReclaimer() = default; - static std::unique_ptr create(); + static std::unique_ptr create(int32_t priority = 0); /// Invoked memory reclaim function from 'pool' and record execution 'stats'. static uint64_t run(const std::function& func, Stats& stats); @@ -309,6 +309,26 @@ class MemoryReclaimer { /// enterArbitration has been called. virtual void leaveArbitration() noexcept {} + /// Invoked by upper layer reclaimer, to return the priority of this + /// reclaimer. The priority determines the reclaiming order of self among all + /// same level reclaimers. The smaller the number, the higher the priority. + /// Consider the following memory pool & reclaimer structure: + /// + /// rec1(pri 1) + /// / \ + /// / \ + /// / \ + /// rec2(pri 1) rec3(pri 3) + /// / \ / \ + /// / \ / \ + /// rec4(pri 1) rec5(pri 0) rec6(pri 0) rec7(pri 1) + /// + /// The reclaiming traversing order will be rec1 -> rec2 -> rec5 -> rec4 -> + /// rec3 -> rec6 -> rec7 + virtual int32_t priority() const { + return priority_; + }; + /// Invoked by the memory arbitrator to get the amount of memory bytes that /// can be reclaimed from 'pool'. 
The function returns true if 'pool' is /// reclaimable and returns the estimated reclaimable bytes in @@ -343,7 +363,10 @@ class MemoryReclaimer { virtual void abort(MemoryPool* pool, const std::exception_ptr& error); protected: - MemoryReclaimer() = default; + explicit MemoryReclaimer(int32_t priority) : priority_(priority){}; + + private: + const int32_t priority_; }; /// Helper class used to measure the memory bytes reclaimed from a memory pool diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index 5098bccbcd840..75caf8e09e2fa 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -1123,9 +1123,11 @@ LocationHandlePtr LocationHandle::create(const folly::dynamic& obj) { std::unique_ptr HiveDataSink::WriterReclaimer::create( HiveDataSink* dataSink, HiveWriterInfo* writerInfo, - io::IoStatistics* ioStats) { + io::IoStatistics* ioStats, + int32_t priority) { return std::unique_ptr( - new HiveDataSink::WriterReclaimer(dataSink, writerInfo, ioStats)); + new HiveDataSink::WriterReclaimer( + dataSink, writerInfo, ioStats, priority)); } bool HiveDataSink::WriterReclaimer::reclaimableBytes( diff --git a/velox/connectors/hive/HiveDataSink.h b/velox/connectors/hive/HiveDataSink.h index f0d31f7654183..0bb89e5b1fc61 100644 --- a/velox/connectors/hive/HiveDataSink.h +++ b/velox/connectors/hive/HiveDataSink.h @@ -477,7 +477,8 @@ class HiveDataSink : public DataSink { static std::unique_ptr create( HiveDataSink* dataSink, HiveWriterInfo* writerInfo, - io::IoStatistics* ioStats); + io::IoStatistics* ioStats, + int32_t priority = 0); bool reclaimableBytes( const memory::MemoryPool& pool, @@ -493,8 +494,9 @@ class HiveDataSink : public DataSink { WriterReclaimer( HiveDataSink* dataSink, HiveWriterInfo* writerInfo, - io::IoStatistics* ioStats) - : exec::MemoryReclaimer(), + io::IoStatistics* ioStats, + int32_t priority) + : exec::MemoryReclaimer(priority), dataSink_(dataSink), 
writerInfo_(writerInfo), ioStats_(ioStats) { diff --git a/velox/core/QueryCtx.h b/velox/core/QueryCtx.h index 5e89ccc4c9077..88ce04b1b9a00 100644 --- a/velox/core/QueryCtx.h +++ b/velox/core/QueryCtx.h @@ -170,8 +170,9 @@ class QueryCtx : public std::enable_shared_from_this { protected: MemoryReclaimer( const std::shared_ptr& queryCtx, - memory::MemoryPool* pool) - : queryCtx_(queryCtx), pool_(pool) { + memory::MemoryPool* pool, + int32_t priority = 0) + : memory::MemoryReclaimer(priority), queryCtx_(queryCtx), pool_(pool) { VELOX_CHECK_NOT_NULL(pool_); } diff --git a/velox/dwio/common/SortingWriter.cpp b/velox/dwio/common/SortingWriter.cpp index 00bab4ee5360f..5f82c66eed93b 100644 --- a/velox/dwio/common/SortingWriter.cpp +++ b/velox/dwio/common/SortingWriter.cpp @@ -154,8 +154,10 @@ vector_size_t SortingWriter::outputBatchRows() { } std::unique_ptr SortingWriter::MemoryReclaimer::create( - SortingWriter* writer) { - return std::unique_ptr(new MemoryReclaimer(writer)); + SortingWriter* writer, + int32_t priority) { + return std::unique_ptr( + new MemoryReclaimer(writer, priority)); } bool SortingWriter::MemoryReclaimer::reclaimableBytes( diff --git a/velox/dwio/common/SortingWriter.h b/velox/dwio/common/SortingWriter.h index a7edd69ed8e28..fdaa7669d0190 100644 --- a/velox/dwio/common/SortingWriter.h +++ b/velox/dwio/common/SortingWriter.h @@ -50,7 +50,8 @@ class SortingWriter : public Writer { class MemoryReclaimer : public exec::MemoryReclaimer { public: static std::unique_ptr create( - SortingWriter* writer); + SortingWriter* writer, + int32_t priority = 0); bool reclaimableBytes( const memory::MemoryPool& pool, @@ -63,8 +64,8 @@ class SortingWriter : public Writer { memory::MemoryReclaimer::Stats& stats) override; private: - explicit MemoryReclaimer(SortingWriter* writer) - : exec::MemoryReclaimer(), + MemoryReclaimer(SortingWriter* writer, int32_t priority) + : exec::MemoryReclaimer(priority), writer_(writer), canReclaim_(writer_->sortBuffer_->canSpill()) {} 
diff --git a/velox/dwio/dwrf/writer/Writer.cpp b/velox/dwio/dwrf/writer/Writer.cpp index e63666190e36a..06f2eeb84b8f2 100644 --- a/velox/dwio/dwrf/writer/Writer.cpp +++ b/velox/dwio/dwrf/writer/Writer.cpp @@ -715,9 +715,10 @@ void Writer::abort() { } std::unique_ptr Writer::MemoryReclaimer::create( - Writer* writer) { + Writer* writer, + int32_t priority) { return std::unique_ptr( - new Writer::MemoryReclaimer(writer)); + new Writer::MemoryReclaimer(writer, priority)); } bool Writer::MemoryReclaimer::reclaimableBytes( diff --git a/velox/dwio/dwrf/writer/Writer.h b/velox/dwio/dwrf/writer/Writer.h index 8cc82303e10fd..43cd4225bc875 100644 --- a/velox/dwio/dwrf/writer/Writer.h +++ b/velox/dwio/dwrf/writer/Writer.h @@ -150,7 +150,9 @@ class Writer : public dwio::common::Writer { private: class MemoryReclaimer : public exec::MemoryReclaimer { public: - static std::unique_ptr create(Writer* writer); + static std::unique_ptr create( + Writer* writer, + int32_t priority = 0); bool reclaimableBytes( const memory::MemoryPool& pool, @@ -163,7 +165,8 @@ class Writer : public dwio::common::Writer { memory::MemoryReclaimer::Stats& stats) override; private: - explicit MemoryReclaimer(Writer* writer) : writer_(writer) { + MemoryReclaimer(Writer* writer, int32_t priority) + : exec::MemoryReclaimer(priority), writer_(writer) { VELOX_CHECK_NOT_NULL(writer_); } diff --git a/velox/exec/HashJoinBridge.h b/velox/exec/HashJoinBridge.h index 40ba021988f67..c7fa5cfce6058 100644 --- a/velox/exec/HashJoinBridge.h +++ b/velox/exec/HashJoinBridge.h @@ -174,9 +174,10 @@ bool isLeftNullAwareJoinWithFilter( class HashJoinMemoryReclaimer final : public MemoryReclaimer { public: static std::unique_ptr create( - std::shared_ptr joinBridge) { + std::shared_ptr joinBridge, + int32_t priority) { return std::unique_ptr( - new HashJoinMemoryReclaimer(joinBridge)); + new HashJoinMemoryReclaimer(joinBridge, priority)); } uint64_t reclaim( @@ -187,8 +188,9 @@ class HashJoinMemoryReclaimer final : public 
MemoryReclaimer { private: explicit HashJoinMemoryReclaimer( - const std::shared_ptr& joinBridge) - : MemoryReclaimer(), joinBridge_(joinBridge) {} + const std::shared_ptr& joinBridge, + int32_t priority) + : MemoryReclaimer(priority), joinBridge_(joinBridge) {} std::weak_ptr joinBridge_; }; diff --git a/velox/exec/MemoryReclaimer.cpp b/velox/exec/MemoryReclaimer.cpp index 4fa91f69433df..0aaad4bd2ad22 100644 --- a/velox/exec/MemoryReclaimer.cpp +++ b/velox/exec/MemoryReclaimer.cpp @@ -20,8 +20,13 @@ #include "velox/exec/Task.h" namespace facebook::velox::exec { -std::unique_ptr MemoryReclaimer::create() { - return std::unique_ptr(new MemoryReclaimer()); +MemoryReclaimer::MemoryReclaimer(int32_t priority) + : memory::MemoryReclaimer(priority) {} + +std::unique_ptr MemoryReclaimer::create( + int32_t priority) { + return std::unique_ptr( + new MemoryReclaimer(priority)); } void MemoryReclaimer::enterArbitration() { @@ -67,13 +72,15 @@ void MemoryReclaimer::abort( } /*static*/ std::unique_ptr -ParallelMemoryReclaimer::create(folly::Executor* executor) { +ParallelMemoryReclaimer::create(folly::Executor* executor, int32_t priority) { return std::unique_ptr( - new ParallelMemoryReclaimer(executor)); + new ParallelMemoryReclaimer(executor, priority)); } -ParallelMemoryReclaimer::ParallelMemoryReclaimer(folly::Executor* executor) - : executor_(executor) {} +ParallelMemoryReclaimer::ParallelMemoryReclaimer( + folly::Executor* executor, + int32_t priority) + : MemoryReclaimer(priority), executor_(executor) {} uint64_t ParallelMemoryReclaimer::reclaim( memory::MemoryPool* pool, @@ -85,8 +92,7 @@ uint64_t ParallelMemoryReclaimer::reclaim( pool, targetBytes, maxWaitMs, stats); } - // Sort the child pools based on their reserved memory and reclaim from the - // child pool with most reservation first. + // Sort candidates based on priority. 
struct Candidate { std::shared_ptr pool; int64_t reclaimableBytes; @@ -98,11 +104,24 @@ uint64_t ParallelMemoryReclaimer::reclaim( for (auto& entry : pool->children_) { auto child = entry.second.lock(); if (child != nullptr) { - const int64_t reclaimableBytes = child->reclaimableBytes().value_or(0); - candidates.push_back(Candidate{std::move(child), reclaimableBytes}); + const auto reclaimableBytesOpt = child->reclaimableBytes(); + if (!reclaimableBytesOpt.has_value()) { + continue; + } + candidates.push_back(Candidate{ + std::move(child), + static_cast(reclaimableBytesOpt.value())}); } } } + std::sort( + candidates.begin(), + candidates.end(), + [](const Candidate& lhs, const Candidate& rhs) { + return lhs.pool->reclaimer()->priority() < + rhs.pool->reclaimer()->priority(); + }); + struct ReclaimResult { const uint64_t reclaimedBytes{0}; const Stats stats; @@ -117,51 +136,66 @@ uint64_t ParallelMemoryReclaimer::reclaim( error(nullptr) {} }; - std::vector>> reclaimTasks; - for (const auto& candidate : candidates) { - if (candidate.reclaimableBytes == 0) { - continue; + uint64_t reclaimedBytes{0}; + for (uint32_t i = 0, j = i; i < candidates.size(); i = j) { + while (j < candidates.size() && + candidates[i].pool->reclaimer()->priority() == + candidates[j].pool->reclaimer()->priority()) { + j++; } - reclaimTasks.push_back(memory::createAsyncMemoryReclaimTask( - [&, reclaimPool = candidate.pool]() { - try { - Stats reclaimStats; - const auto bytes = - reclaimPool->reclaim(targetBytes, maxWaitMs, reclaimStats); - return std::make_unique( - bytes, std::move(reclaimStats)); - } catch (const std::exception& e) { - VELOX_MEM_LOG(ERROR) << "Reclaim from memory pool " << pool->name() - << " failed: " << e.what(); - // The exception is captured and thrown by the caller. 
- return std::make_unique(std::current_exception()); - } - })); - if (reclaimTasks.size() > 1) { - executor_->add([source = reclaimTasks.back()]() { source->prepare(); }); + + std::vector>> reclaimTasks; + for (uint32_t k = i; k < j; k++) { + auto& candidate = candidates[k]; + if (candidate.reclaimableBytes == 0) { + continue; + } + reclaimTasks.push_back( + memory::createAsyncMemoryReclaimTask( + [&, reclaimPool = candidate.pool]() { + try { + Stats reclaimStats; + const auto bytes = reclaimPool->reclaim( + targetBytes, maxWaitMs, reclaimStats); + return std::make_unique( + bytes, std::move(reclaimStats)); + } catch (const std::exception& e) { + VELOX_MEM_LOG(ERROR) + << "Reclaim from memory pool " << pool->name() + << " failed: " << e.what(); + // The exception is captured and thrown by the caller. + return std::make_unique( + std::current_exception()); + } + })); + if (reclaimTasks.size() > 1) { + executor_->add([source = reclaimTasks.back()]() { source->prepare(); }); + } } - } - auto syncGuard = folly::makeGuard([&]() { + auto syncGuard = folly::makeGuard([&]() { + for (auto& reclaimTask : reclaimTasks) { + // We consume the result for the pending tasks. This is a cleanup in the + // guard and must not throw. The first error is already captured before + // this runs. + try { + reclaimTask->move(); + } catch (const std::exception&) { + } + } + }); + for (auto& reclaimTask : reclaimTasks) { - // We consume the result for the pending tasks. This is a cleanup in the - // guard and must not throw. The first error is already captured before - // this runs. 
- try { - reclaimTask->move(); - } catch (const std::exception&) { + const auto result = reclaimTask->move(); + if (result->error) { + std::rethrow_exception(result->error); } + stats += result->stats; + reclaimedBytes += result->reclaimedBytes; } - }); - - uint64_t reclaimedBytes{0}; - for (auto& reclaimTask : reclaimTasks) { - const auto result = reclaimTask->move(); - if (result->error) { - std::rethrow_exception(result->error); + if (reclaimedBytes >= targetBytes) { + break; } - stats += result->stats; - reclaimedBytes += result->reclaimedBytes; } return reclaimedBytes; } diff --git a/velox/exec/MemoryReclaimer.h b/velox/exec/MemoryReclaimer.h index 7acc41cbed70d..d94799a73d1c0 100644 --- a/velox/exec/MemoryReclaimer.h +++ b/velox/exec/MemoryReclaimer.h @@ -25,7 +25,7 @@ class MemoryReclaimer : public memory::MemoryReclaimer { public: virtual ~MemoryReclaimer() = default; - static std::unique_ptr create(); + static std::unique_ptr create(int32_t priority = 0); void enterArbitration() override; @@ -35,7 +35,7 @@ class MemoryReclaimer : public memory::MemoryReclaimer { override; protected: - MemoryReclaimer() = default; + explicit MemoryReclaimer(int32_t priority); }; /// Provides the parallel memory reclaimer implementation for velox task @@ -46,7 +46,8 @@ class ParallelMemoryReclaimer : public memory::MemoryReclaimer { virtual ~ParallelMemoryReclaimer() = default; static std::unique_ptr create( - folly::Executor* executor); + folly::Executor* executor, + int32_t priority = 0); uint64_t reclaim( memory::MemoryPool* pool, @@ -55,7 +56,7 @@ class ParallelMemoryReclaimer : public memory::MemoryReclaimer { Stats& stats) override; protected: - explicit ParallelMemoryReclaimer(folly::Executor* executor); + ParallelMemoryReclaimer(folly::Executor* executor, int32_t priority); folly::Executor* const executor_{nullptr}; }; diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 4ed9544352926..e591739e4c199 100644 --- a/velox/exec/Operator.cpp +++ 
b/velox/exec/Operator.cpp @@ -631,9 +631,10 @@ void OperatorStats::clear() { std::unique_ptr Operator::MemoryReclaimer::create( DriverCtx* driverCtx, - Operator* op) { - return std::unique_ptr( - new Operator::MemoryReclaimer(driverCtx->driver->shared_from_this(), op)); + Operator* op, + int32_t priority) { + return std::unique_ptr(new Operator::MemoryReclaimer( + driverCtx->driver->shared_from_this(), op, priority)); } void Operator::MemoryReclaimer::enterArbitration() { diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index 25d355802d25a..cd692005d3203 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -687,7 +687,8 @@ class Operator : public BaseRuntimeStatWriter { public: static std::unique_ptr create( DriverCtx* driverCtx, - Operator* op); + Operator* op, + int32_t priority = 0); void enterArbitration() override; @@ -707,8 +708,11 @@ class Operator : public BaseRuntimeStatWriter { override; protected: - MemoryReclaimer(const std::shared_ptr& driver, Operator* op) - : driver_(driver), op_(op) { + MemoryReclaimer( + const std::shared_ptr& driver, + Operator* op, + int32_t priority) + : memory::MemoryReclaimer(priority), driver_(driver), op_(op) { VELOX_CHECK_NOT_NULL(op_); } diff --git a/velox/exec/TableWriter.cpp b/velox/exec/TableWriter.cpp index 4899e4b4ebf89..330c1b76d3002 100644 --- a/velox/exec/TableWriter.cpp +++ b/velox/exec/TableWriter.cpp @@ -308,10 +308,11 @@ std::unique_ptr TableWriter::ConnectorReclaimer::create( const std::optional& spillConfig, DriverCtx* driverCtx, - Operator* op) { + Operator* op, + int32_t priority) { return std::unique_ptr( new TableWriter::ConnectorReclaimer( - spillConfig, driverCtx->driver->shared_from_this(), op)); + spillConfig, driverCtx->driver->shared_from_this(), op, priority)); } bool TableWriter::ConnectorReclaimer::reclaimableBytes( diff --git a/velox/exec/TableWriter.h b/velox/exec/TableWriter.h index b188019362fbd..9758286154ca7 100644 --- a/velox/exec/TableWriter.h +++ 
b/velox/exec/TableWriter.h @@ -149,7 +149,8 @@ class TableWriter : public Operator { static std::unique_ptr create( const std::optional& spillConfig, DriverCtx* driverCtx, - Operator* op); + Operator* op, + int32_t priority = 0); void enterArbitration() override {} @@ -176,9 +177,11 @@ class TableWriter : public Operator { ConnectorReclaimer( const std::optional& spillConfig, const std::shared_ptr& driver, - Operator* op) + Operator* op, + int32_t priority) : ParallelMemoryReclaimer( - spillConfig.has_value() ? spillConfig.value().executor : nullptr), + spillConfig.has_value() ? spillConfig.value().executor : nullptr, + priority), canReclaim_(spillConfig.has_value()), driver_(driver), op_(op) {} diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 0bce65f91d14e..aaaaa4c07c2ab 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -503,8 +503,9 @@ memory::MemoryPool* Task::getOrAddJoinNodePool( } childPools_.push_back(pool_->addAggregateChild( fmt::format("node.{}", nodeId), createNodeReclaimer([&]() { + // Set join reclaimer lower priority as cost of reclaiming join is high. 
return HashJoinMemoryReclaimer::create( - getHashJoinBridgeLocked(splitGroupId, planNodeId)); + getHashJoinBridgeLocked(splitGroupId, planNodeId), 1); }))); auto* nodePool = childPools_.back().get(); nodePools_[nodeId] = nodePool; @@ -2974,9 +2975,10 @@ void Task::testingVisitDrivers(const std::function& callback) { } std::unique_ptr Task::MemoryReclaimer::create( - const std::shared_ptr& task) { + const std::shared_ptr& task, + int32_t priority) { return std::unique_ptr( - new Task::MemoryReclaimer(task)); + new Task::MemoryReclaimer(task, priority)); } uint64_t Task::MemoryReclaimer::reclaim( diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 7f421952ce51c..661c59485d975 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -801,7 +801,7 @@ class Task : public std::enable_shared_from_this { class MemoryReclaimer : public exec::MemoryReclaimer { public: static std::unique_ptr create( - const std::shared_ptr& task); + const std::shared_ptr& task, int32_t priority = 0); uint64_t reclaim( memory::MemoryPool* pool, @@ -813,7 +813,8 @@ class Task : public std::enable_shared_from_this { override; private: - explicit MemoryReclaimer(const std::shared_ptr& task) : task_(task) { + MemoryReclaimer(const std::shared_ptr& task, int32_t priority) + : exec::MemoryReclaimer(priority), task_(task) { VELOX_CHECK_NOT_NULL(task); } diff --git a/velox/exec/tests/MemoryReclaimerTest.cpp b/velox/exec/tests/MemoryReclaimerTest.cpp index 19bed7fe8ef6b..286b40fd856a6 100644 --- a/velox/exec/tests/MemoryReclaimerTest.cpp +++ b/velox/exec/tests/MemoryReclaimerTest.cpp @@ -170,15 +170,18 @@ TEST(ReclaimableSectionGuard, basic) { } namespace { + +using ReclaimCallback = const std::function; + class MockMemoryReclaimer : public memory::MemoryReclaimer { public: static std::unique_ptr create( bool reclaimable, uint64_t memoryBytes, - const std::function& reclaimCallback = - nullptr) { - return std::unique_ptr( - new MockMemoryReclaimer(reclaimable, memoryBytes, reclaimCallback)); + 
ReclaimCallback& reclaimCallback = nullptr, + int32_t priority = 0) { + return std::unique_ptr(new MockMemoryReclaimer( + reclaimable, memoryBytes, reclaimCallback, priority)); } bool reclaimableBytes(const MemoryPool& pool, uint64_t& reclaimableBytes) @@ -210,18 +213,25 @@ class MockMemoryReclaimer : public memory::MemoryReclaimer { return memoryBytes_; } + int32_t priority() const override { + return priority_; + } + private: MockMemoryReclaimer( bool reclaimable, uint64_t memoryBytes, - const std::function& reclaimCallback) - : reclaimCallback_(reclaimCallback), + const std::function& reclaimCallback, + int32_t priority) + : MemoryReclaimer(priority), + reclaimCallback_(reclaimCallback), + priority_(priority), reclaimable_(reclaimable), memoryBytes_(memoryBytes) {} - const std::function reclaimCallback_; + const ReclaimCallback reclaimCallback_; + const int32_t priority_; bool reclaimable_{false}; - int reclaimCount_{0}; uint64_t memoryBytes_{0}; }; } // namespace @@ -313,3 +323,198 @@ TEST_F(MemoryReclaimerTest, recursiveArbitrationWithParallelReclaim) { } ASSERT_TRUE(reclaimExecuted); } + +TEST_F(MemoryReclaimerTest, reclaimerPriorities) { + auto rootPool = memory::memoryManager()->addRootPool( + "reclaimerPriorities", kMaxMemory, exec::MemoryReclaimer::create()); + std::vector> leafPools; + + const uint32_t kNumChildren = 10; + const uint64_t kPoolMemoryBytes = 1024; + std::vector priorityOrder; + priorityOrder.reserve(kNumChildren); + ReclaimCallback reclaimerCb = [&](MemoryPool* pool) { + auto* mockReclaimer = dynamic_cast(pool->reclaimer()); + ASSERT_TRUE(mockReclaimer != nullptr); + priorityOrder.push_back(mockReclaimer->priority()); + }; + + for (uint32_t i = 0; i < kNumChildren; ++i) { + auto reclaimer = MockMemoryReclaimer::create( + kPoolMemoryBytes, + kPoolMemoryBytes, + reclaimerCb, + static_cast<int32_t>(folly::Random::rand32(20000)) - 10000); + leafPools.push_back(rootPool->addLeafChild( + fmt::format("leaf-{}", i), true, std::move(reclaimer))); + } + + 
memory::ScopedMemoryArbitrationContext context(rootPool.get()); + memory::MemoryReclaimer::Stats stats; + rootPool->reclaim(kNumChildren * kPoolMemoryBytes, 0, stats); + + ASSERT_EQ(priorityOrder.size(), kNumChildren); + for (uint32_t i = 0; i < priorityOrder.size() - 1; i++) { + ASSERT_LE(priorityOrder[i], priorityOrder[i + 1]); + } +} + +TEST_F(MemoryReclaimerTest, parallelReclaimerPriorities) { + const uint32_t kNumPriorities = 10; + const uint64_t kPoolMemoryBytes = 1024; + + for (const auto numSamePriorityChildren : {1, 10}) { + auto rootPool = memory::memoryManager()->addRootPool( + "parallelReclaimerPriorities", + 32 << 20, + exec::ParallelMemoryReclaimer::create(executor_.get())); + std::vector> leafPools; + std::mutex priorityOrderMutex; + std::vector priorityOrder; + priorityOrder.reserve(kNumPriorities * numSamePriorityChildren); + ReclaimCallback reclaimerCb = [&](MemoryPool* pool) { + auto* mockReclaimer = + dynamic_cast(pool->reclaimer()); + ASSERT_TRUE(mockReclaimer != nullptr); + std::lock_guard l(priorityOrderMutex); + priorityOrder.push_back(mockReclaimer->priority()); + }; + + for (uint32_t i = 0; i < kNumPriorities; ++i) { + const int32_t priority = static_cast<int32_t>(folly::Random::rand32(20000)) - 10000; + for (uint32_t j = 0; j < numSamePriorityChildren; j++) { + auto reclaimer = MockMemoryReclaimer::create( + kPoolMemoryBytes, kPoolMemoryBytes, reclaimerCb, priority); + leafPools.push_back(rootPool->addLeafChild( + fmt::format("leaf-{}-{}", i, j), true, std::move(reclaimer))); + } + } + + memory::ScopedMemoryArbitrationContext context(rootPool.get()); + memory::MemoryReclaimer::Stats stats; + rootPool->reclaim( + kNumPriorities * numSamePriorityChildren * kPoolMemoryBytes, 0, stats); + + ASSERT_EQ(priorityOrder.size(), kNumPriorities * numSamePriorityChildren); + for (uint32_t i = 0; i < kNumPriorities; i++) { + for (uint32_t j = 0; j < numSamePriorityChildren - 1; j++) { + ASSERT_EQ( + priorityOrder[i * numSamePriorityChildren + j], + priorityOrder[i * 
numSamePriorityChildren + j + 1]); + } + if (i < kNumPriorities - 1) { + ASSERT_LE( + priorityOrder[(i + 1) * numSamePriorityChildren - 1], + priorityOrder[(i + 1) * numSamePriorityChildren]); + } + } + } +} + +TEST_F(MemoryReclaimerTest, multiLevelReclaimerPriorities) { + // Following tree structure is built with priorities + // rootPool + // ├── serial-aggr-0 (priority: 4) + // │ ├── serial-aggr-0.leaf-0 (priority: 0) + // │ ├── serial-aggr-0.leaf-1 (priority: 0) + // │ ├── serial-aggr-0.leaf-2 (priority: 1) + // │ ├── serial-aggr-0.leaf-3 (priority: 1) + // │ ├── ... + // │ └── serial-aggr-0.leaf-9 (priority: 4) + // ├── parallel-aggr-1 (priority: 3) + // │ ├── parallel-aggr-1.leaf-0 (priority: 0) + // │ ├── parallel-aggr-1.leaf-1 (priority: 0) + // │ ├── parallel-aggr-1.leaf-2 (priority: 1) + // │ ├── parallel-aggr-1.leaf-3 (priority: 1) + // │ ├── ... + // │ └── parallel-aggr-1.leaf-9 (priority: 4) + // ├── serial-aggr-2 (priority: 2) + // │ ├── serial-aggr-2.leaf-0 (priority: 0) + // │ ├── serial-aggr-2.leaf-1 (priority: 0) + // │ ├── serial-aggr-2.leaf-2 (priority: 1) + // │ ├── serial-aggr-2.leaf-3 (priority: 1) + // │ ├── ... + // │ └── serial-aggr-2.leaf-9 (priority: 4) + // └── parallel-aggr-3 (priority: 1) + // ├── parallel-aggr-3.leaf-0 (priority: 0) + // ├── parallel-aggr-3.leaf-1 (priority: 0) + // ├── parallel-aggr-3.leaf-2 (priority: 1) + // ├── parallel-aggr-3.leaf-3 (priority: 1) + // ├── ... 
+ // └── parallel-aggr-3.leaf-9 (priority: 4) + auto rootPool = memory::memoryManager()->addRootPool( + "multiLevelReclaimerPriorities", + 32 << 20, + exec::MemoryReclaimer::create()); + const uint32_t kNumAggrPools = 4; + const uint32_t kNumLeafPerAggr = 10; + const uint64_t kPoolMemoryBytes = 1024; + std::vector> aggrPools; + std::vector> leafPools; + std::mutex reclaimOrderMutex; + std::vector reclaimOrder; + uint32_t poolIdx{0}; + for (uint32_t i = 0; i < kNumAggrPools; i++) { + if (i % 2 == 0) { + aggrPools.push_back(rootPool->addAggregateChild( + fmt::format("serial-aggr-{}", poolIdx++), + exec::MemoryReclaimer::create(kNumAggrPools - i))); + } else { + aggrPools.push_back(rootPool->addAggregateChild( + fmt::format("parallel-aggr-{}", poolIdx++), + exec::ParallelMemoryReclaimer::create( + executor_.get(), kNumAggrPools - i))); + } + auto& aggrPool = aggrPools.back(); + const auto aggrPoolName = aggrPool->name(); + for (uint32_t j = 0; j < kNumLeafPerAggr; j++) { + leafPools.push_back(aggrPools.back()->addLeafChild( + fmt::format("{}.leaf-{}", aggrPoolName, j), + true, + MockMemoryReclaimer::create( + kPoolMemoryBytes, + kPoolMemoryBytes, + [&](MemoryPool* pool) { + std::lock_guard l(reclaimOrderMutex); + reclaimOrder.push_back(pool); + }, + j / 2))); + } + } + + memory::ScopedMemoryArbitrationContext context(rootPool.get()); + memory::MemoryReclaimer::Stats stats; + rootPool->reclaim( + kNumAggrPools * kNumLeafPerAggr * kPoolMemoryBytes, 0, stats); + + ASSERT_EQ(reclaimOrder.size(), kNumAggrPools * kNumLeafPerAggr); + for (uint32_t i = 0; i < kNumAggrPools; i++) { + uint32_t start = i * kNumLeafPerAggr; + const auto poolName = reclaimOrder[start]->name(); + switch (i) { + case 0: + ASSERT_TRUE(poolName.find("parallel-aggr-3") != std::string::npos); + break; + case 1: + ASSERT_TRUE(poolName.find("serial-aggr-2") != std::string::npos); + break; + case 2: + ASSERT_TRUE(poolName.find("parallel-aggr-1") != std::string::npos); + break; + case 3: + 
ASSERT_TRUE(poolName.find("serial-aggr-0") != std::string::npos); + break; + default: + FAIL(); + } + for (uint32_t j = start; j < start + kNumLeafPerAggr - 1; j++) { + const auto* firstReclaimer = + dynamic_cast(reclaimOrder[j]->reclaimer()); + ASSERT_TRUE(firstReclaimer != nullptr); + const auto* secondReclaimer = + dynamic_cast(reclaimOrder[j + 1]->reclaimer()); + ASSERT_TRUE(secondReclaimer != nullptr); + ASSERT_LE(firstReclaimer->priority(), secondReclaimer->priority()); + } + } +} diff --git a/velox/exec/tests/utils/ArbitratorTestUtil.h b/velox/exec/tests/utils/ArbitratorTestUtil.h index 19cb362bc3303..6565a25b87219 100644 --- a/velox/exec/tests/utils/ArbitratorTestUtil.h +++ b/velox/exec/tests/utils/ArbitratorTestUtil.h @@ -36,7 +36,7 @@ constexpr uint64_t kMemoryPoolInitCapacity = 16 * MB; class FakeMemoryReclaimer : public exec::MemoryReclaimer { public: - FakeMemoryReclaimer() = default; + FakeMemoryReclaimer() : exec::MemoryReclaimer(0) {} static std::unique_ptr create() { return std::make_unique();