From 939c102f088ff74c86498e17f9e06147c5321aa2 Mon Sep 17 00:00:00 2001 From: Jialiang Tan Date: Thu, 5 Dec 2024 21:54:32 -0800 Subject: [PATCH] feat: Add priority based memory reclaim framework (#11598) Summary: X-link: https://github.com/facebookincubator/nimble/pull/111 * Adds priority base reclaiming to memory reclaim framework. The priority determines which memory pool to reclaim first on the same level. This would help to make reclaim more application logic aware. * Make join node reclaim priority lower than others. This is because cost of reclaiming (spilling) on join node is high compared to other nodes. Pull Request resolved: https://github.com/facebookincubator/velox/pull/11598 Reviewed By: xiaoxmeng Differential Revision: D66261340 Pulled By: tanjialiang fbshipit-source-id: c03b2ef25b39dc8771f66321731d5a88da26638e --- velox/common/memory/MemoryArbitrator.cpp | 25 ++- velox/common/memory/MemoryArbitrator.h | 27 ++- .../tests/ArbitrationParticipantTest.cpp | 6 +- .../memory/tests/MemoryArbitratorTest.cpp | 3 +- velox/common/memory/tests/MemoryPoolTest.cpp | 3 +- .../memory/tests/MockSharedArbitratorTest.cpp | 6 +- velox/connectors/hive/HiveDataSink.h | 2 +- velox/core/QueryCtx.h | 5 +- velox/dwio/common/SortingWriter.h | 4 +- velox/dwio/dwrf/writer/Writer.h | 3 +- velox/exec/HashJoinBridge.h | 12 +- velox/exec/MemoryReclaimer.cpp | 32 +++- velox/exec/MemoryReclaimer.h | 9 +- velox/exec/Operator.h | 2 +- velox/exec/TableWriter.h | 3 +- velox/exec/Task.cpp | 53 +----- velox/exec/Task.h | 128 ++++++++++--- velox/exec/tests/ExchangeClientTest.cpp | 1 - velox/exec/tests/MemoryReclaimerTest.cpp | 176 +++++++++++++++++- velox/exec/tests/utils/ArbitratorTestUtil.h | 2 +- velox/exec/tests/utils/Cursor.cpp | 1 + velox/runner/LocalRunner.cpp | 1 + 22 files changed, 385 insertions(+), 119 deletions(-) diff --git a/velox/common/memory/MemoryArbitrator.cpp b/velox/common/memory/MemoryArbitrator.cpp index eac6d149f485..769630bf8bd3 100644 --- a/velox/common/memory/MemoryArbitrator.cpp +++ b/velox/common/memory/MemoryArbitrator.cpp @@ -174,8 +174,8 @@ void MemoryArbitrator::unregisterFactory(const std::string& kind) { return pool->shrink(targetBytes); } -std::unique_ptr MemoryReclaimer::create() { - return std::unique_ptr(new MemoryReclaimer()); +std::unique_ptr MemoryReclaimer::create(int32_t priority) { + return std::unique_ptr(new MemoryReclaimer(priority)); } // static @@ -234,8 +234,9 @@ uint64_t MemoryReclaimer::reclaim( return 0; } - // Sort the child pools based on their reserved memory and reclaim from the - // child pool with most reservation first. + // Sort the child pools based on their reclaimer priority and reserved memory. + // Reclaim from the child pool with highest priority and most reservation + // first. struct Candidate { std::shared_ptr pool; int64_t reclaimableBytes; @@ -247,8 +248,13 @@ uint64_t MemoryReclaimer::reclaim( for (auto& entry : pool->children_) { auto child = entry.second.lock(); if (child != nullptr) { - const int64_t reclaimableBytes = child->reclaimableBytes().value_or(0); - candidates.push_back(Candidate{std::move(child), reclaimableBytes}); + const auto reclaimableBytesOpt = child->reclaimableBytes(); + if (!reclaimableBytesOpt.has_value()) { + continue; + } + candidates.push_back(Candidate{ + std::move(child), + static_cast(reclaimableBytesOpt.value())}); } } } @@ -257,7 +263,12 @@ uint64_t MemoryReclaimer::reclaim( candidates.begin(), candidates.end(), [](const auto& lhs, const auto& rhs) { - return lhs.reclaimableBytes > rhs.reclaimableBytes; + const auto lhsPrio = lhs.pool->reclaimer()->priority(); + const auto rhsPrio = rhs.pool->reclaimer()->priority(); + if (lhsPrio == rhsPrio) { + return lhs.reclaimableBytes > rhs.reclaimableBytes; + } + return lhsPrio < rhsPrio; }); uint64_t reclaimedBytes{0}; diff --git a/velox/common/memory/MemoryArbitrator.h b/velox/common/memory/MemoryArbitrator.h index 969fbb0fa5a9..09f495357300 100644 --- a/velox/common/memory/MemoryArbitrator.h +++ b/velox/common/memory/MemoryArbitrator.h @@ -287,7 +287,7 @@ class MemoryReclaimer { virtual ~MemoryReclaimer() = default; - static std::unique_ptr create(); + static std::unique_ptr create(int32_t priority = 0); /// Invoked memory reclaim function from 'pool' and record execution 'stats'. static uint64_t run(const std::function& func, Stats& stats); @@ -309,6 +309,26 @@ class MemoryReclaimer { /// enterArbitration has been called. virtual void leaveArbitration() noexcept {} + /// Invoked by upper layer reclaimer, to return the priority of this + /// reclaimer. The priority determines the reclaiming order of self among all + /// same level reclaimers. The smaller the number, the higher the priority. + /// Consider the following memory pool & reclaimer structure: + /// + /// rec1(pri 1) + /// / \ + /// / \ + /// / \ + /// rec2(pri 1) rec3(pri 3) + /// / \ / \ + /// / \ / \ + /// rec4(pri 1) rec5(pri 0) rec6(pri 0) rec7(pri 1) + /// + /// The reclaiming traversing order will be rec1 -> rec2 -> rec5 -> rec4 -> + /// rec3 -> rec6 -> rec7 + virtual int32_t priority() const { + return priority_; + }; + /// Invoked by the memory arbitrator to get the amount of memory bytes that /// can be reclaimed from 'pool'. The function returns true if 'pool' is /// reclaimable and returns the estimated reclaimable bytes in @@ -343,7 +363,10 @@ class MemoryReclaimer { virtual void abort(MemoryPool* pool, const std::exception_ptr& error); protected: - MemoryReclaimer() = default; + explicit MemoryReclaimer(int32_t priority) : priority_(priority){}; + + private: + const int32_t priority_; }; /// Helper class used to measure the memory bytes reclaimed from a memory pool diff --git a/velox/common/memory/tests/ArbitrationParticipantTest.cpp b/velox/common/memory/tests/ArbitrationParticipantTest.cpp index 5bb23d2c3729..c72bb476dea7 100644 --- a/velox/common/memory/tests/ArbitrationParticipantTest.cpp +++ b/velox/common/memory/tests/ArbitrationParticipantTest.cpp @@ -131,7 +131,8 @@ class MockTask : public std::enable_shared_from_this { class RootMemoryReclaimer : public memory::MemoryReclaimer { public: - RootMemoryReclaimer(const std::shared_ptr& task) : task_(task) {} + RootMemoryReclaimer(const std::shared_ptr& task) + : memory::MemoryReclaimer(0), task_(task) {} static std::unique_ptr create( const std::shared_ptr& task) { @@ -179,7 +180,8 @@ class MockTask : public std::enable_shared_from_this { bool reclaimable, ReclaimInjectionCallback reclaimInjectCb = nullptr, ArbitrationInjectionCallback arbitrationInjectCb = nullptr) - : task_(task), + : memory::MemoryReclaimer(0), + task_(task), reclaimable_(reclaimable), reclaimInjectCb_(std::move(reclaimInjectCb)), arbitrationInjectCb_(std::move(arbitrationInjectCb)) {} diff --git a/velox/common/memory/tests/MemoryArbitratorTest.cpp b/velox/common/memory/tests/MemoryArbitratorTest.cpp index 1befc83a1a8f..bdbf0cc5b3e6 100644 --- a/velox/common/memory/tests/MemoryArbitratorTest.cpp +++ b/velox/common/memory/tests/MemoryArbitratorTest.cpp @@ -505,7 +505,8 @@ class MockLeafMemoryReclaimer : public MemoryReclaimer { std::atomic& totalUsedBytes, bool reclaimable = true, bool* underArbitration = nullptr) - : reclaimable_(reclaimable), + : MemoryReclaimer(0), + reclaimable_(reclaimable), underArbitration_(underArbitration), totalUsedBytes_(totalUsedBytes) {} diff --git a/velox/common/memory/tests/MemoryPoolTest.cpp b/velox/common/memory/tests/MemoryPoolTest.cpp index 28e7732624a4..648f29c60c2b 100644 --- a/velox/common/memory/tests/MemoryPoolTest.cpp +++ b/velox/common/memory/tests/MemoryPoolTest.cpp @@ -3572,7 +3572,8 @@ class MockMemoryReclaimer : public MemoryReclaimer { } private: - explicit MockMemoryReclaimer(bool doThrow) : doThrow_(doThrow) {} + explicit MockMemoryReclaimer(bool doThrow) + : MemoryReclaimer(0), doThrow_(doThrow) {} const bool doThrow_; }; diff --git a/velox/common/memory/tests/MockSharedArbitratorTest.cpp b/velox/common/memory/tests/MockSharedArbitratorTest.cpp index 61efea38717b..6573709a7027 100644 --- a/velox/common/memory/tests/MockSharedArbitratorTest.cpp +++ b/velox/common/memory/tests/MockSharedArbitratorTest.cpp @@ -92,7 +92,8 @@ class MockTask : public std::enable_shared_from_this { class MemoryReclaimer : public memory::MemoryReclaimer { public: - MemoryReclaimer(const std::shared_ptr& task) : task_(task) {} + MemoryReclaimer(const std::shared_ptr& task) + : memory::MemoryReclaimer(0), task_(task) {} static std::unique_ptr create( const std::shared_ptr& task) { @@ -178,7 +179,8 @@ class MockMemoryOperator { bool reclaimable, ReclaimInjectionCallback reclaimInjectCb = nullptr, ArbitrationInjectionCallback arbitrationInjectCb = nullptr) - : op_(op), + : memory::MemoryReclaimer(0), + op_(op), reclaimable_(reclaimable), reclaimInjectCb_(std::move(reclaimInjectCb)), arbitrationInjectCb_(std::move(arbitrationInjectCb)) {} diff --git a/velox/connectors/hive/HiveDataSink.h b/velox/connectors/hive/HiveDataSink.h index f0d31f765418..8274d8204f03 100644 --- a/velox/connectors/hive/HiveDataSink.h +++ b/velox/connectors/hive/HiveDataSink.h @@ -494,7 +494,7 @@ class HiveDataSink : public DataSink { HiveDataSink* dataSink, HiveWriterInfo* writerInfo, io::IoStatistics* ioStats) - : exec::MemoryReclaimer(), + : exec::MemoryReclaimer(0), dataSink_(dataSink), writerInfo_(writerInfo), ioStats_(ioStats) { diff --git a/velox/core/QueryCtx.h b/velox/core/QueryCtx.h index 5e89ccc4c907..88ce04b1b9a0 100644 --- a/velox/core/QueryCtx.h +++ b/velox/core/QueryCtx.h @@ -170,8 +170,9 @@ class QueryCtx : public std::enable_shared_from_this { protected: MemoryReclaimer( const std::shared_ptr& queryCtx, - memory::MemoryPool* pool) - : queryCtx_(queryCtx), pool_(pool) { + memory::MemoryPool* pool, + int32_t priority = 0) + : memory::MemoryReclaimer(priority), queryCtx_(queryCtx), pool_(pool) { VELOX_CHECK_NOT_NULL(pool_); } diff --git a/velox/dwio/common/SortingWriter.h b/velox/dwio/common/SortingWriter.h index a7edd69ed8e2..a136cff12387 100644 --- a/velox/dwio/common/SortingWriter.h +++ b/velox/dwio/common/SortingWriter.h @@ -63,8 +63,8 @@ class SortingWriter : public Writer { memory::MemoryReclaimer::Stats& stats) override; private: - explicit MemoryReclaimer(SortingWriter* writer) - : exec::MemoryReclaimer(), + MemoryReclaimer(SortingWriter* writer) + : exec::MemoryReclaimer(0), writer_(writer), canReclaim_(writer_->sortBuffer_->canSpill()) {} diff --git a/velox/dwio/dwrf/writer/Writer.h b/velox/dwio/dwrf/writer/Writer.h index 8cc82303e10f..dec42bc2cc71 100644 --- a/velox/dwio/dwrf/writer/Writer.h +++ b/velox/dwio/dwrf/writer/Writer.h @@ -163,7 +163,8 @@ class Writer : public dwio::common::Writer { memory::MemoryReclaimer::Stats& stats) override; private: - explicit MemoryReclaimer(Writer* writer) : writer_(writer) { + MemoryReclaimer(Writer* writer) + : exec::MemoryReclaimer(0), writer_(writer) { VELOX_CHECK_NOT_NULL(writer_); } diff --git a/velox/exec/HashJoinBridge.h b/velox/exec/HashJoinBridge.h index 40ba021988f6..d59c564cc7af 100644 --- a/velox/exec/HashJoinBridge.h +++ b/velox/exec/HashJoinBridge.h @@ -174,9 +174,10 @@ bool isLeftNullAwareJoinWithFilter( class HashJoinMemoryReclaimer final : public MemoryReclaimer { public: static std::unique_ptr create( - std::shared_ptr joinBridge) { + std::shared_ptr joinBridge, + int32_t priority = 0) { return std::unique_ptr( - new HashJoinMemoryReclaimer(joinBridge)); + new HashJoinMemoryReclaimer(joinBridge, priority)); } uint64_t reclaim( @@ -186,9 +187,10 @@ class HashJoinMemoryReclaimer final : public MemoryReclaimer { memory::MemoryReclaimer::Stats& stats) final; private: - explicit HashJoinMemoryReclaimer( - const std::shared_ptr& joinBridge) - : MemoryReclaimer(), joinBridge_(joinBridge) {} + HashJoinMemoryReclaimer( + const std::shared_ptr& joinBridge, + int32_t priority) + : MemoryReclaimer(priority), joinBridge_(joinBridge) {} std::weak_ptr joinBridge_; }; diff --git a/velox/exec/MemoryReclaimer.cpp b/velox/exec/MemoryReclaimer.cpp index 4fa91f69433d..4aee429883db 100644 --- a/velox/exec/MemoryReclaimer.cpp +++ b/velox/exec/MemoryReclaimer.cpp @@ -20,8 +20,13 @@ #include "velox/exec/Task.h" namespace facebook::velox::exec { -std::unique_ptr MemoryReclaimer::create() { - return std::unique_ptr(new MemoryReclaimer()); +MemoryReclaimer::MemoryReclaimer(int32_t priority) + : memory::MemoryReclaimer(priority) {} + +std::unique_ptr MemoryReclaimer::create( + int32_t priority) { + return std::unique_ptr( + new MemoryReclaimer(priority)); } void MemoryReclaimer::enterArbitration() { @@ -67,13 +72,15 @@ void MemoryReclaimer::abort( } /*static*/ std::unique_ptr -ParallelMemoryReclaimer::create(folly::Executor* executor) { +ParallelMemoryReclaimer::create(folly::Executor* executor, int32_t priority) { return std::unique_ptr( - new ParallelMemoryReclaimer(executor)); + new ParallelMemoryReclaimer(executor, priority)); } -ParallelMemoryReclaimer::ParallelMemoryReclaimer(folly::Executor* executor) - : executor_(executor) {} +ParallelMemoryReclaimer::ParallelMemoryReclaimer( + folly::Executor* executor, + int32_t priority) + : MemoryReclaimer(priority), executor_(executor) {} uint64_t ParallelMemoryReclaimer::reclaim( memory::MemoryPool* pool, @@ -85,8 +92,7 @@ uint64_t ParallelMemoryReclaimer::reclaim( pool, targetBytes, maxWaitMs, stats); } - // Sort the child pools based on their reserved memory and reclaim from the - // child pool with most reservation first. + // Sort candidates based on priority. struct Candidate { std::shared_ptr pool; int64_t reclaimableBytes; @@ -98,11 +104,17 @@ uint64_t ParallelMemoryReclaimer::reclaim( for (auto& entry : pool->children_) { auto child = entry.second.lock(); if (child != nullptr) { - const int64_t reclaimableBytes = child->reclaimableBytes().value_or(0); - candidates.push_back(Candidate{std::move(child), reclaimableBytes}); + const auto reclaimableBytesOpt = child->reclaimableBytes(); + if (!reclaimableBytesOpt.has_value()) { + continue; + } + candidates.push_back(Candidate{ + std::move(child), + static_cast(reclaimableBytesOpt.value())}); } } } + struct ReclaimResult { const uint64_t reclaimedBytes{0}; const Stats stats; diff --git a/velox/exec/MemoryReclaimer.h b/velox/exec/MemoryReclaimer.h index 7acc41cbed70..d94799a73d1c 100644 --- a/velox/exec/MemoryReclaimer.h +++ b/velox/exec/MemoryReclaimer.h @@ -25,7 +25,7 @@ class MemoryReclaimer : public memory::MemoryReclaimer { public: virtual ~MemoryReclaimer() = default; - static std::unique_ptr create(); + static std::unique_ptr create(int32_t priority = 0); void enterArbitration() override; @@ -35,7 +35,7 @@ class MemoryReclaimer : public memory::MemoryReclaimer { override; protected: - MemoryReclaimer() = default; + explicit MemoryReclaimer(int32_t priortity); }; /// Provides the parallel memory reclaimer implementation for velox task @@ -46,7 +46,8 @@ class ParallelMemoryReclaimer : public memory::MemoryReclaimer { virtual ~ParallelMemoryReclaimer() = default; static std::unique_ptr create( - folly::Executor* executor); + folly::Executor* executor, + int32_t priority = 0); uint64_t reclaim( memory::MemoryPool* pool, @@ -55,7 +56,7 @@ class ParallelMemoryReclaimer : public memory::MemoryReclaimer { Stats& stats) override; protected: - explicit ParallelMemoryReclaimer(folly::Executor* executor); + ParallelMemoryReclaimer(folly::Executor* executor, int32_t priority); folly::Executor* const executor_{nullptr}; }; diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index 58bc2f53e6a6..db48b26df11a 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -708,7 +708,7 @@ class Operator : public BaseRuntimeStatWriter { protected: MemoryReclaimer(const std::shared_ptr& driver, Operator* op) - : driver_(driver), op_(op) { + : memory::MemoryReclaimer(0), driver_(driver), op_(op) { VELOX_CHECK_NOT_NULL(op_); } diff --git a/velox/exec/TableWriter.h b/velox/exec/TableWriter.h index b188019362fb..b71f24c14588 100644 --- a/velox/exec/TableWriter.h +++ b/velox/exec/TableWriter.h @@ -178,7 +178,8 @@ class TableWriter : public Operator { const std::shared_ptr& driver, Operator* op) : ParallelMemoryReclaimer( - spillConfig.has_value() ? spillConfig.value().executor : nullptr), + spillConfig.has_value() ? spillConfig.value().executor : nullptr, + 0), canReclaim_(spillConfig.has_value()), driver_(driver), op_(op) {} diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index a97457a0b55a..b055105e2047 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -236,48 +236,6 @@ bool unregisterTaskListener(const std::shared_ptr& listener) { }); } -// static. -std::shared_ptr Task::create( - const std::string& taskId, - core::PlanFragment planFragment, - int destination, - std::shared_ptr queryCtx, - ExecutionMode mode, - Consumer consumer, - std::function onError) { - return Task::create( - taskId, - std::move(planFragment), - destination, - std::move(queryCtx), - mode, - (consumer ? [c = std::move(consumer)]() { return c; } - : ConsumerSupplier{}), - std::move(onError)); -} - -// static -std::shared_ptr Task::create( - const std::string& taskId, - core::PlanFragment planFragment, - int destination, - std::shared_ptr queryCtx, - ExecutionMode mode, - ConsumerSupplier consumerSupplier, - std::function onError) { - auto task = std::shared_ptr(new Task( - taskId, - std::move(planFragment), - destination, - std::move(queryCtx), - mode, - std::move(consumerSupplier), - std::move(onError))); - task->initTaskPool(); - task->addToTaskList(); - return task; -} - Task::Task( const std::string& taskId, core::PlanFragment planFragment, @@ -285,11 +243,13 @@ Task::Task( std::shared_ptr queryCtx, ExecutionMode mode, ConsumerSupplier consumerSupplier, + int32_t memoryArbitrationPriority, std::function onError) : uuid_{makeUuid()}, taskId_(taskId), destination_(destination), mode_(mode), + memoryArbitrationPriority_(memoryArbitrationPriority), queryCtx_(std::move(queryCtx)), planFragment_(std::move(planFragment)), traceConfig_(maybeMakeTraceConfig()), @@ -514,6 +474,7 @@ memory::MemoryPool* Task::getOrAddJoinNodePool( } childPools_.push_back(pool_->addAggregateChild( fmt::format("node.{}", nodeId), createNodeReclaimer([&]() { + // Set join reclaimer lower priority as cost of reclaiming join is high. return HashJoinMemoryReclaimer::create( getHashJoinBridgeLocked(splitGroupId, planNodeId)); }))); @@ -548,7 +509,8 @@ std::unique_ptr Task::createTaskReclaimer() { if (queryCtx_->pool()->reclaimer() == nullptr) { return nullptr; } - return Task::MemoryReclaimer::create(shared_from_this()); + return Task::MemoryReclaimer::create( + shared_from_this(), memoryArbitrationPriority_); } velox::memory::MemoryPool* Task::addOperatorPool( @@ -3023,9 +2985,10 @@ void Task::testingVisitDrivers(const std::function& callback) { } std::unique_ptr Task::MemoryReclaimer::create( - const std::shared_ptr& task) { + const std::shared_ptr& task, + int64_t priority) { return std::unique_ptr( - new Task::MemoryReclaimer(task)); + new Task::MemoryReclaimer(task, priority)); } uint64_t Task::MemoryReclaimer::reclaim( diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 74060b9b5034..4a81b3353f81 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -51,24 +51,29 @@ class Task : public std::enable_shared_from_this { kParallel, }; - /// Creates a task to execute a plan fragment, but doesn't start execution - /// until Task::start() method is called. - /// @param taskId Unique task identifier. - /// @param planFragment Plan fragment. - /// @param destination Partition number if task is expected to receive data - /// for a particular partition from a set of upstream tasks participating in a - /// distributed execution. Used to initialize an ExchangeClient. Ignored if - /// plan fragment doesn't have an ExchangeNode. - /// @param queryCtx Query context containing MemoryPool and MemoryAllocator - /// instances to use for memory allocations during execution, executor to - /// schedule operators on, and session properties. - /// @param mode Execution mode for this task. The task can be executed in - /// Serial and Parallel mode. - /// @param consumer Optional factory function to get callbacks to pass the - /// results of the execution. In a parallel execution mode, results from each - /// thread are passed on to a separate consumer. - /// @param onError Optional callback to receive an exception if task - /// execution fails. +/// Creates a task to execute a plan fragment, but doesn't start execution +/// until Task::start() method is called. +/// @param taskId Unique task identifier. +/// @param planFragment Plan fragment. +/// @param destination Partition number if task is expected to receive data +/// for a particular partition from a set of upstream tasks participating in a +/// distributed execution. Used to initialize an ExchangeClient. Ignored if +/// plan fragment doesn't have an ExchangeNode. +/// @param queryCtx Query context containing MemoryPool and MemoryAllocator +/// instances to use for memory allocations during execution, executor to +/// schedule operators on, and session properties. +/// @param mode Execution mode for this task. The task can be executed in +/// Serial and Parallel mode. +/// @param consumer Optional factory function to get callbacks to pass the +/// results of the execution. In a parallel execution mode, results from each +/// thread are passed on to a separate consumer. +/// @param onError Optional callback to receive an exception if task +/// execution fails. +/// @param memoryArbitrationPriority Optional priority on task that, in a +/// multi task system, is used for memory arbitration to decide the order of +/// reclaiming. +#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY + // TODO: Remove this overload once call sites are updated. static std::shared_ptr create( const std::string& taskId, core::PlanFragment planFragment, @@ -76,8 +81,19 @@ class Task : public std::enable_shared_from_this { std::shared_ptr queryCtx, ExecutionMode mode, Consumer consumer = nullptr, - std::function onError = nullptr); + std::function onError = nullptr) { + return Task::create( + taskId, + std::move(planFragment), + destination, + std::move(queryCtx), + mode, + (consumer ? [c = std::move(consumer)]() { return c; } + : ConsumerSupplier{}), + std::move(onError)); + } + // TODO: Remove this overload once call sites are updated. static std::shared_ptr create( const std::string& taskId, core::PlanFragment planFragment, @@ -85,7 +101,68 @@ class Task : public std::enable_shared_from_this { std::shared_ptr queryCtx, ExecutionMode mode, ConsumerSupplier consumerSupplier, - std::function onError = nullptr); + std::function onError = nullptr) { + auto task = std::shared_ptr(new Task( + taskId, + std::move(planFragment), + destination, + std::move(queryCtx), + mode, + std::move(consumerSupplier), + 0, + std::move(onError))); + task->initTaskPool(); + task->addToTaskList(); + return task; + } +#else + // TODO: Move the definition of this function to the cpp file after above is + // cleaned up. The temporary move of definition from cpp to header is because + // compatibility macro does not work in cpp file. + static std::shared_ptr create( + const std::string& taskId, + core::PlanFragment planFragment, + int destination, + std::shared_ptr queryCtx, + ExecutionMode mode, + Consumer consumer = nullptr, + int32_t memoryArbitrationPriority = 0, + std::function onError = nullptr) { + return Task::create( + taskId, + std::move(planFragment), + destination, + std::move(queryCtx), + mode, + (consumer ? [c = std::move(consumer)]() { return c; } + : ConsumerSupplier{}), + memoryArbitrationPriority, + std::move(onError)); + } + + static std::shared_ptr create( + const std::string& taskId, + core::PlanFragment planFragment, + int destination, + std::shared_ptr queryCtx, + ExecutionMode mode, + ConsumerSupplier consumerSupplier, + int32_t memoryArbitrationPriority = 0, + std::function onError = nullptr) { + auto task = std::shared_ptr(new Task( + taskId, + std::move(planFragment), + destination, + std::move(queryCtx), + mode, + std::move(consumerSupplier), + memoryArbitrationPriority, + std::move(onError))); + task->initTaskPool(); + task->addToTaskList(); + return task; + } +#endif /// Convenience function for shortening a Presto taskId. To be used /// in debugging messages and listings. @@ -718,6 +795,7 @@ class Task : public std::enable_shared_from_this { std::shared_ptr queryCtx, ExecutionMode mode, ConsumerSupplier consumerSupplier, + int32_t memoryArbitrationPriority = 0, std::function onError = nullptr); // Invoked to add this to the system-wide running task list on task creation. @@ -816,7 +894,8 @@ class Task : public std::enable_shared_from_this { class MemoryReclaimer : public exec::MemoryReclaimer { public: static std::unique_ptr create( - const std::shared_ptr& task); + const std::shared_ptr& task, + int64_t priority = 0); uint64_t reclaim( memory::MemoryPool* pool, @@ -828,7 +907,8 @@ class Task : public std::enable_shared_from_this { override; private: - explicit MemoryReclaimer(const std::shared_ptr& task) : task_(task) { + MemoryReclaimer(const std::shared_ptr& task, int64_t priority) + : exec::MemoryReclaimer(priority), task_(task) { VELOX_CHECK_NOT_NULL(task); } @@ -1017,6 +1097,10 @@ class Task : public std::enable_shared_from_this { // executed in a single mode throughout its lifetime const ExecutionMode mode_; + // In a multi-task system, it is used to make cross task decisions by memory + // arbitration to determine which task to reclaim first. + const int32_t memoryArbitrationPriority_; + std::shared_ptr queryCtx_; core::PlanFragment planFragment_; diff --git a/velox/exec/tests/ExchangeClientTest.cpp b/velox/exec/tests/ExchangeClientTest.cpp index 60de09c2513f..2ffcd78cbd7d 100644 --- a/velox/exec/tests/ExchangeClientTest.cpp +++ b/velox/exec/tests/ExchangeClientTest.cpp @@ -18,7 +18,6 @@ #include #include #include "velox/common/base/tests/GTestUtils.h" -// #include "velox/exec/Exchange.h" #include "velox/exec/OutputBufferManager.h" #include "velox/exec/Task.h" #include "velox/exec/tests/utils/LocalExchangeSource.h" diff --git a/velox/exec/tests/MemoryReclaimerTest.cpp b/velox/exec/tests/MemoryReclaimerTest.cpp index 19bed7fe8ef6..2e12003f4bc8 100644 --- a/velox/exec/tests/MemoryReclaimerTest.cpp +++ b/velox/exec/tests/MemoryReclaimerTest.cpp @@ -170,15 +170,18 @@ TEST(ReclaimableSectionGuard, basic) { } namespace { + +using ReclaimCallback = const std::function; + class MockMemoryReclaimer : public memory::MemoryReclaimer { public: static std::unique_ptr create( bool reclaimable, uint64_t memoryBytes, - const std::function& reclaimCallback = - nullptr) { - return std::unique_ptr( - new MockMemoryReclaimer(reclaimable, memoryBytes, reclaimCallback)); + ReclaimCallback& reclaimCallback = nullptr, + int32_t priority = 0) { + return std::unique_ptr(new MockMemoryReclaimer( + reclaimable, memoryBytes, reclaimCallback, priority)); } bool reclaimableBytes(const MemoryPool& pool, uint64_t& reclaimableBytes) @@ -210,18 +213,25 @@ class MockMemoryReclaimer : public memory::MemoryReclaimer { return memoryBytes_; } + int32_t priority() const override { + return priority_; + } + private: MockMemoryReclaimer( bool reclaimable, uint64_t memoryBytes, - const std::function& reclaimCallback) - : reclaimCallback_(reclaimCallback), + const std::function& reclaimCallback, + int32_t priority) + : MemoryReclaimer(priority), + reclaimCallback_(reclaimCallback), + priority_(priority), reclaimable_(reclaimable), memoryBytes_(memoryBytes) {} - const std::function reclaimCallback_; + const ReclaimCallback reclaimCallback_; + const int32_t priority_; bool reclaimable_{false}; - int reclaimCount_{0}; uint64_t memoryBytes_{0}; }; } // namespace @@ -313,3 +323,153 @@ TEST_F(MemoryReclaimerTest, recursiveArbitrationWithParallelReclaim) { } ASSERT_TRUE(reclaimExecuted); } + +TEST_F(MemoryReclaimerTest, reclaimerPriorities) { + auto rootPool = memory::memoryManager()->addRootPool( + "reclaimerPriorities", kMaxMemory, exec::MemoryReclaimer::create()); + std::vector> leafPools; + + const uint32_t kNumChildren = 10; + const uint64_t kPoolMemoryBytes = 1024; + std::vector priorityOrder; + priorityOrder.reserve(kNumChildren); + ReclaimCallback reclaimerCb = [&](MemoryPool* pool) { + auto* mockReclaimer = dynamic_cast(pool->reclaimer()); + ASSERT_TRUE(mockReclaimer != nullptr); + priorityOrder.push_back(mockReclaimer->priority()); + }; + + for (uint32_t i = 0; i < kNumChildren; ++i) { + auto reclaimer = MockMemoryReclaimer::create( + kPoolMemoryBytes, + kPoolMemoryBytes, + reclaimerCb, + static_cast(folly::Random::rand32(10000)) - 5000); + leafPools.push_back(rootPool->addLeafChild( + fmt::format("leaf-{}", i), true, std::move(reclaimer))); + } + + memory::ScopedMemoryArbitrationContext context(rootPool.get()); + memory::MemoryReclaimer::Stats stats; + rootPool->reclaim(kNumChildren * kPoolMemoryBytes, 0, stats); + + ASSERT_EQ(priorityOrder.size(), kNumChildren); + for (uint32_t i = 0; i < priorityOrder.size() - 1; i++) { + ASSERT_LE(priorityOrder[i], priorityOrder[i + 1]); + } +} + +TEST_F(MemoryReclaimerTest, multiLevelReclaimerPriorities) { + // Following tree structure is built with priorities + // rootPool + // ├── serial-aggr-0 (priority: 4) + // │ ├── serial-aggr-0.leaf-0 (priority: 0) + // │ ├── serial-aggr-0.leaf-1 (priority: 0) + // │ ├── serial-aggr-0.leaf-2 (priority: 1) + // │ ├── serial-aggr-0.leaf-3 (priority: 1) + // │ ├── ... + // │ └── serial-aggr-0.leaf-9 (priority: 4) + // ├── parallel-aggr-1 (priority: 3) + // │ ├── parallel-aggr-1.leaf-0 (priority: 0) + // │ ├── parallel-aggr-1.leaf-1 (priority: 0) + // │ ├── parallel-aggr-1.leaf-2 (priority: 1) + // │ ├── parallel-aggr-1.leaf-3 (priority: 1) + // │ ├── ... + // │ └── parallel-aggr-1.leaf-9 (priority: 4) + // ├── serial-aggr-2 (priority: 2) + // │ ├── serial-aggr-2.leaf-0 (priority: 0) + // │ ├── serial-aggr-2.leaf-1 (priority: 0) + // │ ├── serial-aggr-2.leaf-2 (priority: 1) + // │ ├── serial-aggr-2.leaf-3 (priority: 1) + // │ ├── ... + // │ └── serial-aggr-2.leaf-9 (priority: 4) + // └── parallel-aggr-3 (priority: 1) + // ├── parallel-aggr-3.leaf-0 (priority: 0) + // ├── parallel-aggr-3.leaf-1 (priority: 0) + // ├── parallel-aggr-3.leaf-2 (priority: 1) + // ├── parallel-aggr-3.leaf-3 (priority: 1) + // ├── ... + // └── parallel-aggr-3.leaf-9 (priority: 4) + auto rootPool = memory::memoryManager()->addRootPool( + "multiLevelReclaimerPriorities", + 32 << 20, + exec::MemoryReclaimer::create()); + const uint32_t kNumAggrPools = 4; + const uint32_t kNumLeafPerAggr = 10; + const uint64_t kPoolMemoryBytes = 1024; + std::vector> aggrPools; + std::vector> leafPools; + std::mutex reclaimOrderMutex; + std::vector reclaimOrder; + uint32_t poolIdx{0}; + for (uint32_t i = 0; i < kNumAggrPools; i++) { + if (i % 2 == 0) { + aggrPools.push_back(rootPool->addAggregateChild( + fmt::format("serial-aggr-{}", poolIdx++), + exec::MemoryReclaimer::create(kNumAggrPools - i))); + } else { + aggrPools.push_back(rootPool->addAggregateChild( + fmt::format("parallel-aggr-{}", poolIdx++), + exec::ParallelMemoryReclaimer::create( + executor_.get(), kNumAggrPools - i))); + } + auto& aggrPool = aggrPools.back(); + const auto aggrPoolName = aggrPool->name(); + for (uint32_t j = 0; j < kNumLeafPerAggr; j++) { + leafPools.push_back(aggrPools.back()->addLeafChild( + fmt::format("{}.leaf-{}", aggrPoolName, j), + true, + MockMemoryReclaimer::create( + kPoolMemoryBytes, + kPoolMemoryBytes, + [&](MemoryPool* pool) { + std::lock_guard l(reclaimOrderMutex); + reclaimOrder.push_back(pool); + }, + j / 2))); + } + } + + memory::ScopedMemoryArbitrationContext context(rootPool.get()); + memory::MemoryReclaimer::Stats stats; + rootPool->reclaim( + kNumAggrPools * kNumLeafPerAggr * kPoolMemoryBytes, 0, stats); + + ASSERT_EQ(reclaimOrder.size(), kNumAggrPools * kNumLeafPerAggr); + for (uint32_t i = 0; i < kNumAggrPools; i++) { + uint32_t start = i * kNumLeafPerAggr; + const auto poolName = reclaimOrder[start]->name(); + bool isParallel = false; + switch (i) { + case 0: + ASSERT_TRUE(poolName.find("parallel-aggr-3") != std::string::npos); + isParallel = true; + break; + case 1: + ASSERT_TRUE(poolName.find("serial-aggr-2") != std::string::npos); + break; + case 2: + ASSERT_TRUE(poolName.find("parallel-aggr-1") != std::string::npos); + isParallel = true; + break; + case 3: + ASSERT_TRUE(poolName.find("serial-aggr-0") != std::string::npos); + break; + default: + FAIL(); + } + for (uint32_t j = start; j < start + kNumLeafPerAggr - 1; j++) { + if (isParallel) { + // Priority is not applicable to parallel reclaimer. + continue; + } + const auto* firstReclaimer = + dynamic_cast(reclaimOrder[j]->reclaimer()); + ASSERT_TRUE(firstReclaimer != nullptr); + const auto* secondReclaimer = + dynamic_cast(reclaimOrder[j + 1]->reclaimer()); + ASSERT_TRUE(secondReclaimer != nullptr); + ASSERT_LE(firstReclaimer->priority(), secondReclaimer->priority()); + } + } +} diff --git a/velox/exec/tests/utils/ArbitratorTestUtil.h b/velox/exec/tests/utils/ArbitratorTestUtil.h index 748cb642f515..42ad7d468163 100644 --- a/velox/exec/tests/utils/ArbitratorTestUtil.h +++ b/velox/exec/tests/utils/ArbitratorTestUtil.h @@ -36,7 +36,7 @@ constexpr uint64_t kMemoryPoolInitCapacity = 16 * MB; class FakeMemoryReclaimer : public exec::MemoryReclaimer { public: - FakeMemoryReclaimer() = default; + FakeMemoryReclaimer() : exec::MemoryReclaimer(0) {} static std::unique_ptr create() { return std::make_unique(); diff --git a/velox/exec/tests/utils/Cursor.cpp b/velox/exec/tests/utils/Cursor.cpp index ad7b4133c6c7..eaa098997f15 100644 --- a/velox/exec/tests/utils/Cursor.cpp +++ b/velox/exec/tests/utils/Cursor.cpp @@ -243,6 +243,7 @@ class MultiThreadedTaskCursor : public TaskCursorBase { copy->copy(vector.get(), 0, 0, vector->size()); return queue->enqueue(std::move(copy), future); }, + 0, [queue](std::exception_ptr) { // onError close the queue to unblock producers and consumers. // moveNext will handle rethrowing the error once it's diff --git a/velox/runner/LocalRunner.cpp b/velox/runner/LocalRunner.cpp index d18a32528416..6e173a1a4e80 100644 --- a/velox/runner/LocalRunner.cpp +++ b/velox/runner/LocalRunner.cpp @@ -166,6 +166,7 @@ LocalRunner::makeStages() { params_.queryCtx, exec::Task::ExecutionMode::kParallel, consumer, + 0, onError); stages_.back().push_back(task); if (fragment.numBroadcastDestinations) {