diff --git a/velox/common/memory/MemoryArbitrator.cpp b/velox/common/memory/MemoryArbitrator.cpp index eac6d149f485..769630bf8bd3 100644 --- a/velox/common/memory/MemoryArbitrator.cpp +++ b/velox/common/memory/MemoryArbitrator.cpp @@ -174,8 +174,8 @@ void MemoryArbitrator::unregisterFactory(const std::string& kind) { return pool->shrink(targetBytes); } -std::unique_ptr MemoryReclaimer::create() { - return std::unique_ptr(new MemoryReclaimer()); +std::unique_ptr MemoryReclaimer::create(int32_t priority) { + return std::unique_ptr(new MemoryReclaimer(priority)); } // static @@ -234,8 +234,9 @@ uint64_t MemoryReclaimer::reclaim( return 0; } - // Sort the child pools based on their reserved memory and reclaim from the - // child pool with most reservation first. + // Sort the child pools based on their reclaimer priority and reserved memory. + // Reclaim from the child pool with highest priority and most reservation + // first. struct Candidate { std::shared_ptr pool; int64_t reclaimableBytes; @@ -247,8 +248,13 @@ uint64_t MemoryReclaimer::reclaim( for (auto& entry : pool->children_) { auto child = entry.second.lock(); if (child != nullptr) { - const int64_t reclaimableBytes = child->reclaimableBytes().value_or(0); - candidates.push_back(Candidate{std::move(child), reclaimableBytes}); + const auto reclaimableBytesOpt = child->reclaimableBytes(); + if (!reclaimableBytesOpt.has_value()) { + continue; + } + candidates.push_back(Candidate{ + std::move(child), + static_cast(reclaimableBytesOpt.value())}); } } } @@ -257,7 +263,12 @@ uint64_t MemoryReclaimer::reclaim( candidates.begin(), candidates.end(), [](const auto& lhs, const auto& rhs) { - return lhs.reclaimableBytes > rhs.reclaimableBytes; + const auto lhsPrio = lhs.pool->reclaimer()->priority(); + const auto rhsPrio = rhs.pool->reclaimer()->priority(); + if (lhsPrio == rhsPrio) { + return lhs.reclaimableBytes > rhs.reclaimableBytes; + } + return lhsPrio < rhsPrio; }); uint64_t reclaimedBytes{0}; diff --git a/velox/common/memory/MemoryArbitrator.h b/velox/common/memory/MemoryArbitrator.h index 969fbb0fa5a9..09f495357300 100644 --- a/velox/common/memory/MemoryArbitrator.h +++ b/velox/common/memory/MemoryArbitrator.h @@ -287,7 +287,7 @@ class MemoryReclaimer { virtual ~MemoryReclaimer() = default; - static std::unique_ptr create(); + static std::unique_ptr create(int32_t priority = 0); /// Invoked memory reclaim function from 'pool' and record execution 'stats'. static uint64_t run(const std::function& func, Stats& stats); @@ -309,6 +309,26 @@ class MemoryReclaimer { /// enterArbitration has been called. virtual void leaveArbitration() noexcept {} + /// Invoked by upper layer reclaimer, to return the priority of this + /// reclaimer. The priority determines the reclaiming order of self among all + /// same level reclaimers. The smaller the number, the higher the priority. + /// Consider the following memory pool & reclaimer structure: + /// + /// rec1(pri 1) + /// / \ + /// / \ + /// / \ + /// rec2(pri 1) rec3(pri 3) + /// / \ / \ + /// / \ / \ + /// rec4(pri 1) rec5(pri 0) rec6(pri 0) rec7(pri 1) + /// + /// The reclaiming traversing order will be rec1 -> rec2 -> rec5 -> rec4 -> + /// rec3 -> rec6 -> rec7 + virtual int32_t priority() const { + return priority_; + }; + /// Invoked by the memory arbitrator to get the amount of memory bytes that /// can be reclaimed from 'pool'. The function returns true if 'pool' is /// reclaimable and returns the estimated reclaimable bytes in @@ -343,7 +363,10 @@ class MemoryReclaimer { virtual void abort(MemoryPool* pool, const std::exception_ptr& error); protected: - MemoryReclaimer() = default; + explicit MemoryReclaimer(int32_t priority) : priority_(priority){}; + + private: + const int32_t priority_; }; /// Helper class used to measure the memory bytes reclaimed from a memory pool diff --git a/velox/common/memory/tests/ArbitrationParticipantTest.cpp b/velox/common/memory/tests/ArbitrationParticipantTest.cpp index 5bb23d2c3729..c72bb476dea7 100644 --- a/velox/common/memory/tests/ArbitrationParticipantTest.cpp +++ b/velox/common/memory/tests/ArbitrationParticipantTest.cpp @@ -131,7 +131,8 @@ class MockTask : public std::enable_shared_from_this { class RootMemoryReclaimer : public memory::MemoryReclaimer { public: - RootMemoryReclaimer(const std::shared_ptr& task) : task_(task) {} + RootMemoryReclaimer(const std::shared_ptr& task) + : memory::MemoryReclaimer(0), task_(task) {} static std::unique_ptr create( const std::shared_ptr& task) { @@ -179,7 +180,8 @@ class MockTask : public std::enable_shared_from_this { bool reclaimable, ReclaimInjectionCallback reclaimInjectCb = nullptr, ArbitrationInjectionCallback arbitrationInjectCb = nullptr) - : task_(task), + : memory::MemoryReclaimer(0), + task_(task), reclaimable_(reclaimable), reclaimInjectCb_(std::move(reclaimInjectCb)), arbitrationInjectCb_(std::move(arbitrationInjectCb)) {} diff --git a/velox/common/memory/tests/MemoryArbitratorTest.cpp b/velox/common/memory/tests/MemoryArbitratorTest.cpp index 1befc83a1a8f..bdbf0cc5b3e6 100644 --- a/velox/common/memory/tests/MemoryArbitratorTest.cpp +++ b/velox/common/memory/tests/MemoryArbitratorTest.cpp @@ -505,7 +505,8 @@ class MockLeafMemoryReclaimer : public MemoryReclaimer { std::atomic& totalUsedBytes, bool reclaimable = true, bool* underArbitration = nullptr) - : reclaimable_(reclaimable), + : MemoryReclaimer(0), + reclaimable_(reclaimable), underArbitration_(underArbitration), totalUsedBytes_(totalUsedBytes) {} diff --git a/velox/common/memory/tests/MemoryPoolTest.cpp b/velox/common/memory/tests/MemoryPoolTest.cpp index 28e7732624a4..648f29c60c2b 100644 --- a/velox/common/memory/tests/MemoryPoolTest.cpp +++ b/velox/common/memory/tests/MemoryPoolTest.cpp @@ -3572,7 +3572,8 @@ class MockMemoryReclaimer : public MemoryReclaimer { } private: - explicit MockMemoryReclaimer(bool doThrow) : doThrow_(doThrow) {} + explicit MockMemoryReclaimer(bool doThrow) + : MemoryReclaimer(0), doThrow_(doThrow) {} const bool doThrow_; }; diff --git a/velox/common/memory/tests/MockSharedArbitratorTest.cpp b/velox/common/memory/tests/MockSharedArbitratorTest.cpp index 61efea38717b..6573709a7027 100644 --- a/velox/common/memory/tests/MockSharedArbitratorTest.cpp +++ b/velox/common/memory/tests/MockSharedArbitratorTest.cpp @@ -92,7 +92,8 @@ class MockTask : public std::enable_shared_from_this { class MemoryReclaimer : public memory::MemoryReclaimer { public: - MemoryReclaimer(const std::shared_ptr& task) : task_(task) {} + MemoryReclaimer(const std::shared_ptr& task) + : memory::MemoryReclaimer(0), task_(task) {} static std::unique_ptr create( const std::shared_ptr& task) { @@ -178,7 +179,8 @@ class MockMemoryOperator { bool reclaimable, ReclaimInjectionCallback reclaimInjectCb = nullptr, ArbitrationInjectionCallback arbitrationInjectCb = nullptr) - : op_(op), + : memory::MemoryReclaimer(0), + op_(op), reclaimable_(reclaimable), reclaimInjectCb_(std::move(reclaimInjectCb)), arbitrationInjectCb_(std::move(arbitrationInjectCb)) {} diff --git a/velox/connectors/hive/HiveDataSink.h b/velox/connectors/hive/HiveDataSink.h index f0d31f765418..8274d8204f03 100644 --- a/velox/connectors/hive/HiveDataSink.h +++ b/velox/connectors/hive/HiveDataSink.h @@ -494,7 +494,7 @@ class HiveDataSink : public DataSink { HiveDataSink* dataSink, HiveWriterInfo* writerInfo, io::IoStatistics* ioStats) - : exec::MemoryReclaimer(), + : exec::MemoryReclaimer(0), dataSink_(dataSink), writerInfo_(writerInfo), ioStats_(ioStats) { diff --git a/velox/core/QueryCtx.h b/velox/core/QueryCtx.h index 5e89ccc4c907..88ce04b1b9a0 100644 --- a/velox/core/QueryCtx.h +++ b/velox/core/QueryCtx.h @@ -170,8 +170,9 @@ class QueryCtx : public std::enable_shared_from_this { protected: MemoryReclaimer( const std::shared_ptr& queryCtx, - memory::MemoryPool* pool) - : queryCtx_(queryCtx), pool_(pool) { + memory::MemoryPool* pool, + int32_t priority = 0) + : memory::MemoryReclaimer(priority), queryCtx_(queryCtx), pool_(pool) { VELOX_CHECK_NOT_NULL(pool_); } diff --git a/velox/dwio/common/SortingWriter.h b/velox/dwio/common/SortingWriter.h index a7edd69ed8e2..a136cff12387 100644 --- a/velox/dwio/common/SortingWriter.h +++ b/velox/dwio/common/SortingWriter.h @@ -63,8 +63,8 @@ class SortingWriter : public Writer { memory::MemoryReclaimer::Stats& stats) override; private: - explicit MemoryReclaimer(SortingWriter* writer) - : exec::MemoryReclaimer(), + MemoryReclaimer(SortingWriter* writer) + : exec::MemoryReclaimer(0), writer_(writer), canReclaim_(writer_->sortBuffer_->canSpill()) {} diff --git a/velox/dwio/dwrf/writer/Writer.h b/velox/dwio/dwrf/writer/Writer.h index 8cc82303e10f..dec42bc2cc71 100644 --- a/velox/dwio/dwrf/writer/Writer.h +++ b/velox/dwio/dwrf/writer/Writer.h @@ -163,7 +163,8 @@ class Writer : public dwio::common::Writer { memory::MemoryReclaimer::Stats& stats) override; private: - explicit MemoryReclaimer(Writer* writer) : writer_(writer) { + MemoryReclaimer(Writer* writer) + : exec::MemoryReclaimer(0), writer_(writer) { VELOX_CHECK_NOT_NULL(writer_); } diff --git a/velox/exec/HashJoinBridge.h b/velox/exec/HashJoinBridge.h index 40ba021988f6..d59c564cc7af 100644 --- a/velox/exec/HashJoinBridge.h +++ b/velox/exec/HashJoinBridge.h @@ -174,9 +174,10 @@ bool isLeftNullAwareJoinWithFilter( class HashJoinMemoryReclaimer final : public MemoryReclaimer { public: static std::unique_ptr create( - std::shared_ptr joinBridge) { + std::shared_ptr joinBridge, + int32_t priority = 0) { return std::unique_ptr( - new HashJoinMemoryReclaimer(joinBridge)); + new HashJoinMemoryReclaimer(joinBridge, priority)); } uint64_t reclaim( @@ -186,9 +187,10 @@ class HashJoinMemoryReclaimer final : public MemoryReclaimer { memory::MemoryReclaimer::Stats& stats) final; private: - explicit HashJoinMemoryReclaimer( - const std::shared_ptr& joinBridge) - : MemoryReclaimer(), joinBridge_(joinBridge) {} + HashJoinMemoryReclaimer( + const std::shared_ptr& joinBridge, + int32_t priority) + : MemoryReclaimer(priority), joinBridge_(joinBridge) {} std::weak_ptr joinBridge_; }; diff --git a/velox/exec/MemoryReclaimer.cpp b/velox/exec/MemoryReclaimer.cpp index 4fa91f69433d..4aee429883db 100644 --- a/velox/exec/MemoryReclaimer.cpp +++ b/velox/exec/MemoryReclaimer.cpp @@ -20,8 +20,13 @@ #include "velox/exec/Task.h" namespace facebook::velox::exec { -std::unique_ptr MemoryReclaimer::create() { - return std::unique_ptr(new MemoryReclaimer()); +MemoryReclaimer::MemoryReclaimer(int32_t priority) + : memory::MemoryReclaimer(priority) {} + +std::unique_ptr MemoryReclaimer::create( + int32_t priority) { + return std::unique_ptr( + new MemoryReclaimer(priority)); } void MemoryReclaimer::enterArbitration() { @@ -67,13 +72,15 @@ void MemoryReclaimer::abort( } /*static*/ std::unique_ptr -ParallelMemoryReclaimer::create(folly::Executor* executor) { +ParallelMemoryReclaimer::create(folly::Executor* executor, int32_t priority) { return std::unique_ptr( - new ParallelMemoryReclaimer(executor)); + new ParallelMemoryReclaimer(executor, priority)); } -ParallelMemoryReclaimer::ParallelMemoryReclaimer(folly::Executor* executor) - : executor_(executor) {} +ParallelMemoryReclaimer::ParallelMemoryReclaimer( + folly::Executor* executor, + int32_t priority) + : MemoryReclaimer(priority), executor_(executor) {} uint64_t ParallelMemoryReclaimer::reclaim( memory::MemoryPool* pool, @@ -85,8 +92,7 @@ uint64_t ParallelMemoryReclaimer::reclaim( pool, targetBytes, maxWaitMs, stats); } - // Sort the child pools based on their reserved memory and reclaim from the - // child pool with most reservation first. + // Sort candidates based on priority. struct Candidate { std::shared_ptr pool; int64_t reclaimableBytes; @@ -98,11 +104,17 @@ uint64_t ParallelMemoryReclaimer::reclaim( for (auto& entry : pool->children_) { auto child = entry.second.lock(); if (child != nullptr) { - const int64_t reclaimableBytes = child->reclaimableBytes().value_or(0); - candidates.push_back(Candidate{std::move(child), reclaimableBytes}); + const auto reclaimableBytesOpt = child->reclaimableBytes(); + if (!reclaimableBytesOpt.has_value()) { + continue; + } + candidates.push_back(Candidate{ + std::move(child), + static_cast(reclaimableBytesOpt.value())}); } } } + struct ReclaimResult { const uint64_t reclaimedBytes{0}; const Stats stats; diff --git a/velox/exec/MemoryReclaimer.h b/velox/exec/MemoryReclaimer.h index 7acc41cbed70..d94799a73d1c 100644 --- a/velox/exec/MemoryReclaimer.h +++ b/velox/exec/MemoryReclaimer.h @@ -25,7 +25,7 @@ class MemoryReclaimer : public memory::MemoryReclaimer { public: virtual ~MemoryReclaimer() = default; - static std::unique_ptr create(); + static std::unique_ptr create(int32_t priority = 0); void enterArbitration() override; @@ -35,7 +35,7 @@ class MemoryReclaimer : public memory::MemoryReclaimer { override; protected: - MemoryReclaimer() = default; + explicit MemoryReclaimer(int32_t priortity); }; /// Provides the parallel memory reclaimer implementation for velox task @@ -46,7 +46,8 @@ class ParallelMemoryReclaimer : public memory::MemoryReclaimer { virtual ~ParallelMemoryReclaimer() = default; static std::unique_ptr create( - folly::Executor* executor); + folly::Executor* executor, + int32_t priority = 0); uint64_t reclaim( memory::MemoryPool* pool, @@ -55,7 +56,7 @@ class ParallelMemoryReclaimer : public memory::MemoryReclaimer { Stats& stats) override; protected: - explicit ParallelMemoryReclaimer(folly::Executor* executor); + ParallelMemoryReclaimer(folly::Executor* executor, int32_t priority); folly::Executor* const executor_{nullptr}; }; diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index 25d355802d25..fe556a6d9d66 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -708,7 +708,7 @@ class Operator : public BaseRuntimeStatWriter { protected: MemoryReclaimer(const std::shared_ptr& driver, Operator* op) - : driver_(driver), op_(op) { + : memory::MemoryReclaimer(0), driver_(driver), op_(op) { VELOX_CHECK_NOT_NULL(op_); } diff --git a/velox/exec/TableWriter.h b/velox/exec/TableWriter.h index b188019362fb..b71f24c14588 100644 --- a/velox/exec/TableWriter.h +++ b/velox/exec/TableWriter.h @@ -178,7 +178,8 @@ class TableWriter : public Operator { const std::shared_ptr& driver, Operator* op) : ParallelMemoryReclaimer( - spillConfig.has_value() ? spillConfig.value().executor : nullptr), + spillConfig.has_value() ? spillConfig.value().executor : nullptr, + 0), canReclaim_(spillConfig.has_value()), driver_(driver), op_(op) {} diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index a97457a0b55a..b055105e2047 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -236,48 +236,6 @@ bool unregisterTaskListener(const std::shared_ptr& listener) { }); } -// static. -std::shared_ptr Task::create( - const std::string& taskId, - core::PlanFragment planFragment, - int destination, - std::shared_ptr queryCtx, - ExecutionMode mode, - Consumer consumer, - std::function onError) { - return Task::create( - taskId, - std::move(planFragment), - destination, - std::move(queryCtx), - mode, - (consumer ? [c = std::move(consumer)]() { return c; } - : ConsumerSupplier{}), - std::move(onError)); -} - -// static -std::shared_ptr Task::create( - const std::string& taskId, - core::PlanFragment planFragment, - int destination, - std::shared_ptr queryCtx, - ExecutionMode mode, - ConsumerSupplier consumerSupplier, - std::function onError) { - auto task = std::shared_ptr(new Task( - taskId, - std::move(planFragment), - destination, - std::move(queryCtx), - mode, - std::move(consumerSupplier), - std::move(onError))); - task->initTaskPool(); - task->addToTaskList(); - return task; -} - Task::Task( const std::string& taskId, core::PlanFragment planFragment, @@ -285,11 +243,13 @@ Task::Task( std::shared_ptr queryCtx, ExecutionMode mode, ConsumerSupplier consumerSupplier, + int32_t memoryArbitrationPriority, std::function onError) : uuid_{makeUuid()}, taskId_(taskId), destination_(destination), mode_(mode), + memoryArbitrationPriority_(memoryArbitrationPriority), queryCtx_(std::move(queryCtx)), planFragment_(std::move(planFragment)), traceConfig_(maybeMakeTraceConfig()), @@ -514,6 +474,7 @@ memory::MemoryPool* Task::getOrAddJoinNodePool( } childPools_.push_back(pool_->addAggregateChild( fmt::format("node.{}", nodeId), createNodeReclaimer([&]() { + // Set join reclaimer lower priority as cost of reclaiming join is high. return HashJoinMemoryReclaimer::create( getHashJoinBridgeLocked(splitGroupId, planNodeId)); }))); @@ -548,7 +509,8 @@ std::unique_ptr Task::createTaskReclaimer() { if (queryCtx_->pool()->reclaimer() == nullptr) { return nullptr; } - return Task::MemoryReclaimer::create(shared_from_this()); + return Task::MemoryReclaimer::create( + shared_from_this(), memoryArbitrationPriority_); } velox::memory::MemoryPool* Task::addOperatorPool( @@ -3023,9 +2985,10 @@ void Task::testingVisitDrivers(const std::function& callback) { } std::unique_ptr Task::MemoryReclaimer::create( - const std::shared_ptr& task) { + const std::shared_ptr& task, + int64_t priority) { return std::unique_ptr( - new Task::MemoryReclaimer(task)); + new Task::MemoryReclaimer(task, priority)); } uint64_t Task::MemoryReclaimer::reclaim( diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 74060b9b5034..4a81b3353f81 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -51,24 +51,29 @@ class Task : public std::enable_shared_from_this { kParallel, }; - /// Creates a task to execute a plan fragment, but doesn't start execution - /// until Task::start() method is called. - /// @param taskId Unique task identifier. - /// @param planFragment Plan fragment. - /// @param destination Partition number if task is expected to receive data - /// for a particular partition from a set of upstream tasks participating in a - /// distributed execution. Used to initialize an ExchangeClient. Ignored if - /// plan fragment doesn't have an ExchangeNode. - /// @param queryCtx Query context containing MemoryPool and MemoryAllocator - /// instances to use for memory allocations during execution, executor to - /// schedule operators on, and session properties. - /// @param mode Execution mode for this task. The task can be executed in - /// Serial and Parallel mode. - /// @param consumer Optional factory function to get callbacks to pass the - /// results of the execution. In a parallel execution mode, results from each - /// thread are passed on to a separate consumer. - /// @param onError Optional callback to receive an exception if task - /// execution fails. +/// Creates a task to execute a plan fragment, but doesn't start execution +/// until Task::start() method is called. +/// @param taskId Unique task identifier. +/// @param planFragment Plan fragment. +/// @param destination Partition number if task is expected to receive data +/// for a particular partition from a set of upstream tasks participating in a +/// distributed execution. Used to initialize an ExchangeClient. Ignored if +/// plan fragment doesn't have an ExchangeNode. +/// @param queryCtx Query context containing MemoryPool and MemoryAllocator +/// instances to use for memory allocations during execution, executor to +/// schedule operators on, and session properties. +/// @param mode Execution mode for this task. The task can be executed in +/// Serial and Parallel mode. +/// @param consumer Optional factory function to get callbacks to pass the +/// results of the execution. In a parallel execution mode, results from each +/// thread are passed on to a separate consumer. +/// @param onError Optional callback to receive an exception if task +/// execution fails. +/// @param memoryArbitrationPriority Optional priority on task that, in a +/// multi task system, is used for memory arbitration to decide the order of +/// reclaiming. +#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY + // TODO: Remove this overload once call sites are updated. static std::shared_ptr create( const std::string& taskId, core::PlanFragment planFragment, @@ -76,8 +81,19 @@ class Task : public std::enable_shared_from_this { std::shared_ptr queryCtx, ExecutionMode mode, Consumer consumer = nullptr, - std::function onError = nullptr); + std::function onError = nullptr) { + return Task::create( + taskId, + std::move(planFragment), + destination, + std::move(queryCtx), + mode, + (consumer ? [c = std::move(consumer)]() { return c; } + : ConsumerSupplier{}), + std::move(onError)); + } + // TODO: Remove this overload once call sites are updated. static std::shared_ptr create( const std::string& taskId, core::PlanFragment planFragment, @@ -85,7 +101,68 @@ class Task : public std::enable_shared_from_this { std::shared_ptr queryCtx, ExecutionMode mode, ConsumerSupplier consumerSupplier, - std::function onError = nullptr); + std::function onError = nullptr) { + auto task = std::shared_ptr(new Task( + taskId, + std::move(planFragment), + destination, + std::move(queryCtx), + mode, + std::move(consumerSupplier), + 0, + std::move(onError))); + task->initTaskPool(); + task->addToTaskList(); + return task; + } +#else + // TODO: Move the definition of this function to the cpp file after above is + // cleaned up. The temporary move of definition from cpp to header is because + // compatibility macro does not work in cpp file. + static std::shared_ptr create( + const std::string& taskId, + core::PlanFragment planFragment, + int destination, + std::shared_ptr queryCtx, + ExecutionMode mode, + Consumer consumer = nullptr, + int32_t memoryArbitrationPriority = 0, + std::function onError = nullptr) { + return Task::create( + taskId, + std::move(planFragment), + destination, + std::move(queryCtx), + mode, + (consumer ? [c = std::move(consumer)]() { return c; } + : ConsumerSupplier{}), + memoryArbitrationPriority, + std::move(onError)); + } + + static std::shared_ptr create( + const std::string& taskId, + core::PlanFragment planFragment, + int destination, + std::shared_ptr queryCtx, + ExecutionMode mode, + ConsumerSupplier consumerSupplier, + int32_t memoryArbitrationPriority = 0, + std::function onError = nullptr) { + auto task = std::shared_ptr(new Task( + taskId, + std::move(planFragment), + destination, + std::move(queryCtx), + mode, + std::move(consumerSupplier), + memoryArbitrationPriority, + std::move(onError))); + task->initTaskPool(); + task->addToTaskList(); + return task; + } +#endif /// Convenience function for shortening a Presto taskId. To be used /// in debugging messages and listings. @@ -718,6 +795,7 @@ class Task : public std::enable_shared_from_this { std::shared_ptr queryCtx, ExecutionMode mode, ConsumerSupplier consumerSupplier, + int32_t memoryArbitrationPriority = 0, std::function onError = nullptr); // Invoked to add this to the system-wide running task list on task creation. @@ -816,7 +894,8 @@ class Task : public std::enable_shared_from_this { class MemoryReclaimer : public exec::MemoryReclaimer { public: static std::unique_ptr create( - const std::shared_ptr& task); + const std::shared_ptr& task, + int64_t priority = 0); uint64_t reclaim( memory::MemoryPool* pool, @@ -828,7 +907,8 @@ class Task : public std::enable_shared_from_this { override; private: - explicit MemoryReclaimer(const std::shared_ptr& task) : task_(task) { + MemoryReclaimer(const std::shared_ptr& task, int64_t priority) + : exec::MemoryReclaimer(priority), task_(task) { VELOX_CHECK_NOT_NULL(task); } @@ -1017,6 +1097,10 @@ class Task : public std::enable_shared_from_this { // executed in a single mode throughout its lifetime const ExecutionMode mode_; + // In a multi-task system, it is used to make cross task decisions by memory + // arbitration to determine which task to reclaim first. + const int32_t memoryArbitrationPriority_; + std::shared_ptr queryCtx_; core::PlanFragment planFragment_; diff --git a/velox/exec/tests/ExchangeClientTest.cpp b/velox/exec/tests/ExchangeClientTest.cpp index 60de09c2513f..2ffcd78cbd7d 100644 --- a/velox/exec/tests/ExchangeClientTest.cpp +++ b/velox/exec/tests/ExchangeClientTest.cpp @@ -18,7 +18,6 @@ #include #include #include "velox/common/base/tests/GTestUtils.h" -// #include "velox/exec/Exchange.h" #include "velox/exec/OutputBufferManager.h" #include "velox/exec/Task.h" #include "velox/exec/tests/utils/LocalExchangeSource.h" diff --git a/velox/exec/tests/MemoryReclaimerTest.cpp b/velox/exec/tests/MemoryReclaimerTest.cpp index 19bed7fe8ef6..2e12003f4bc8 100644 --- a/velox/exec/tests/MemoryReclaimerTest.cpp +++ b/velox/exec/tests/MemoryReclaimerTest.cpp @@ -170,15 +170,18 @@ TEST(ReclaimableSectionGuard, basic) { } namespace { + +using ReclaimCallback = const std::function; + class MockMemoryReclaimer : public memory::MemoryReclaimer { public: static std::unique_ptr create( bool reclaimable, uint64_t memoryBytes, - const std::function& reclaimCallback = - nullptr) { - return std::unique_ptr( - new MockMemoryReclaimer(reclaimable, memoryBytes, reclaimCallback)); + ReclaimCallback& reclaimCallback = nullptr, + int32_t priority = 0) { + return std::unique_ptr(new MockMemoryReclaimer( + reclaimable, memoryBytes, reclaimCallback, priority)); } bool reclaimableBytes(const MemoryPool& pool, uint64_t& reclaimableBytes) @@ -210,18 +213,25 @@ class MockMemoryReclaimer : public memory::MemoryReclaimer { return memoryBytes_; } + int32_t priority() const override { + return priority_; + } + private: MockMemoryReclaimer( bool reclaimable, uint64_t memoryBytes, - const std::function& reclaimCallback) - : reclaimCallback_(reclaimCallback), + const std::function& reclaimCallback, + int32_t priority) + : MemoryReclaimer(priority), + reclaimCallback_(reclaimCallback), + priority_(priority), reclaimable_(reclaimable), memoryBytes_(memoryBytes) {} - const std::function reclaimCallback_; + const ReclaimCallback reclaimCallback_; + const int32_t priority_; bool reclaimable_{false}; - int reclaimCount_{0}; uint64_t memoryBytes_{0}; }; } // namespace @@ -313,3 +323,153 @@ TEST_F(MemoryReclaimerTest, recursiveArbitrationWithParallelReclaim) { } ASSERT_TRUE(reclaimExecuted); } + +TEST_F(MemoryReclaimerTest, reclaimerPriorities) { + auto rootPool = memory::memoryManager()->addRootPool( + "reclaimerPriorities", kMaxMemory, exec::MemoryReclaimer::create()); + std::vector> leafPools; + + const uint32_t kNumChildren = 10; + const uint64_t kPoolMemoryBytes = 1024; + std::vector priorityOrder; + priorityOrder.reserve(kNumChildren); + ReclaimCallback reclaimerCb = [&](MemoryPool* pool) { + auto* mockReclaimer = dynamic_cast(pool->reclaimer()); + ASSERT_TRUE(mockReclaimer != nullptr); + priorityOrder.push_back(mockReclaimer->priority()); + }; + + for (uint32_t i = 0; i < kNumChildren; ++i) { + auto reclaimer = MockMemoryReclaimer::create( + kPoolMemoryBytes, + kPoolMemoryBytes, + reclaimerCb, + static_cast(folly::Random::rand32(10000)) - 5000); + leafPools.push_back(rootPool->addLeafChild( + fmt::format("leaf-{}", i), true, std::move(reclaimer))); + } + + memory::ScopedMemoryArbitrationContext context(rootPool.get()); + memory::MemoryReclaimer::Stats stats; + rootPool->reclaim(kNumChildren * kPoolMemoryBytes, 0, stats); + + ASSERT_EQ(priorityOrder.size(), kNumChildren); + for (uint32_t i = 0; i < priorityOrder.size() - 1; i++) { + ASSERT_LE(priorityOrder[i], priorityOrder[i + 1]); + } +} + +TEST_F(MemoryReclaimerTest, multiLevelReclaimerPriorities) { + // Following tree structure is built with priorities + // rootPool + // ├── serial-aggr-0 (priority: 4) + // │ ├── serial-aggr-0.leaf-0 (priority: 0) + // │ ├── serial-aggr-0.leaf-1 (priority: 0) + // │ ├── serial-aggr-0.leaf-2 (priority: 1) + // │ ├── serial-aggr-0.leaf-3 (priority: 1) + // │ ├── ... + // │ └── serial-aggr-0.leaf-9 (priority: 4) + // ├── parallel-aggr-1 (priority: 3) + // │ ├── parallel-aggr-1.leaf-0 (priority: 0) + // │ ├── parallel-aggr-1.leaf-1 (priority: 0) + // │ ├── parallel-aggr-1.leaf-2 (priority: 1) + // │ ├── parallel-aggr-1.leaf-3 (priority: 1) + // │ ├── ... + // │ └── parallel-aggr-1.leaf-9 (priority: 4) + // ├── serial-aggr-2 (priority: 2) + // │ ├── serial-aggr-2.leaf-0 (priority: 0) + // │ ├── serial-aggr-2.leaf-1 (priority: 0) + // │ ├── serial-aggr-2.leaf-2 (priority: 1) + // │ ├── serial-aggr-2.leaf-3 (priority: 1) + // │ ├── ... + // │ └── serial-aggr-2.leaf-9 (priority: 4) + // └── parallel-aggr-3 (priority: 1) + // ├── parallel-aggr-3.leaf-0 (priority: 0) + // ├── parallel-aggr-3.leaf-1 (priority: 0) + // ├── parallel-aggr-3.leaf-2 (priority: 1) + // ├── parallel-aggr-3.leaf-3 (priority: 1) + // ├── ... + // └── parallel-aggr-3.leaf-9 (priority: 4) + auto rootPool = memory::memoryManager()->addRootPool( + "multiLevelReclaimerPriorities", + 32 << 20, + exec::MemoryReclaimer::create()); + const uint32_t kNumAggrPools = 4; + const uint32_t kNumLeafPerAggr = 10; + const uint64_t kPoolMemoryBytes = 1024; + std::vector> aggrPools; + std::vector> leafPools; + std::mutex reclaimOrderMutex; + std::vector reclaimOrder; + uint32_t poolIdx{0}; + for (uint32_t i = 0; i < kNumAggrPools; i++) { + if (i % 2 == 0) { + aggrPools.push_back(rootPool->addAggregateChild( + fmt::format("serial-aggr-{}", poolIdx++), + exec::MemoryReclaimer::create(kNumAggrPools - i))); + } else { + aggrPools.push_back(rootPool->addAggregateChild( + fmt::format("parallel-aggr-{}", poolIdx++), + exec::ParallelMemoryReclaimer::create( + executor_.get(), kNumAggrPools - i))); + } + auto& aggrPool = aggrPools.back(); + const auto aggrPoolName = aggrPool->name(); + for (uint32_t j = 0; j < kNumLeafPerAggr; j++) { + leafPools.push_back(aggrPools.back()->addLeafChild( + fmt::format("{}.leaf-{}", aggrPoolName, j), + true, + MockMemoryReclaimer::create( + kPoolMemoryBytes, + kPoolMemoryBytes, + [&](MemoryPool* pool) { + std::lock_guard l(reclaimOrderMutex); + reclaimOrder.push_back(pool); + }, + j / 2))); + } + } + + memory::ScopedMemoryArbitrationContext context(rootPool.get()); + memory::MemoryReclaimer::Stats stats; + rootPool->reclaim( + kNumAggrPools * kNumLeafPerAggr * kPoolMemoryBytes, 0, stats); + + ASSERT_EQ(reclaimOrder.size(), kNumAggrPools * kNumLeafPerAggr); + for (uint32_t i = 0; i < kNumAggrPools; i++) { + uint32_t start = i * kNumLeafPerAggr; + const auto poolName = reclaimOrder[start]->name(); + bool isParallel = false; + switch (i) { + case 0: + ASSERT_TRUE(poolName.find("parallel-aggr-3") != std::string::npos); + isParallel = true; + break; + case 1: + ASSERT_TRUE(poolName.find("serial-aggr-2") != std::string::npos); + break; + case 2: + ASSERT_TRUE(poolName.find("parallel-aggr-1") != std::string::npos); + isParallel = true; + break; + case 3: + ASSERT_TRUE(poolName.find("serial-aggr-0") != std::string::npos); + break; + default: + FAIL(); + } + for (uint32_t j = start; j < start + kNumLeafPerAggr - 1; j++) { + if (isParallel) { + // Priority is not applicable to parallel reclaimer. + continue; + } + const auto* firstReclaimer = + dynamic_cast(reclaimOrder[j]->reclaimer()); + ASSERT_TRUE(firstReclaimer != nullptr); + const auto* secondReclaimer = + dynamic_cast(reclaimOrder[j + 1]->reclaimer()); + ASSERT_TRUE(secondReclaimer != nullptr); + ASSERT_LE(firstReclaimer->priority(), secondReclaimer->priority()); + } + } +} diff --git a/velox/exec/tests/utils/ArbitratorTestUtil.h b/velox/exec/tests/utils/ArbitratorTestUtil.h index 748cb642f515..42ad7d468163 100644 --- a/velox/exec/tests/utils/ArbitratorTestUtil.h +++ b/velox/exec/tests/utils/ArbitratorTestUtil.h @@ -36,7 +36,7 @@ constexpr uint64_t kMemoryPoolInitCapacity = 16 * MB; class FakeMemoryReclaimer : public exec::MemoryReclaimer { public: - FakeMemoryReclaimer() = default; + FakeMemoryReclaimer() : exec::MemoryReclaimer(0) {} static std::unique_ptr create() { return std::make_unique(); diff --git a/velox/exec/tests/utils/Cursor.cpp b/velox/exec/tests/utils/Cursor.cpp index ad7b4133c6c7..eaa098997f15 100644 --- a/velox/exec/tests/utils/Cursor.cpp +++ b/velox/exec/tests/utils/Cursor.cpp @@ -243,6 +243,7 @@ class MultiThreadedTaskCursor : public TaskCursorBase { copy->copy(vector.get(), 0, 0, vector->size()); return queue->enqueue(std::move(copy), future); }, + 0, [queue](std::exception_ptr) { // onError close the queue to unblock producers and consumers. // moveNext will handle rethrowing the error once it's diff --git a/velox/runner/LocalRunner.cpp b/velox/runner/LocalRunner.cpp index d18a32528416..6e173a1a4e80 100644 --- a/velox/runner/LocalRunner.cpp +++ b/velox/runner/LocalRunner.cpp @@ -166,6 +166,7 @@ LocalRunner::makeStages() { params_.queryCtx, exec::Task::ExecutionMode::kParallel, consumer, + 0, onError); stages_.back().push_back(task); if (fragment.numBroadcastDestinations) {