diff --git a/velox/common/memory/MemoryArbitrator.cpp b/velox/common/memory/MemoryArbitrator.cpp index eac6d149f4856..769630bf8bd32 100644 --- a/velox/common/memory/MemoryArbitrator.cpp +++ b/velox/common/memory/MemoryArbitrator.cpp @@ -174,8 +174,8 @@ void MemoryArbitrator::unregisterFactory(const std::string& kind) { return pool->shrink(targetBytes); } -std::unique_ptr MemoryReclaimer::create() { - return std::unique_ptr(new MemoryReclaimer()); +std::unique_ptr MemoryReclaimer::create(int32_t priority) { + return std::unique_ptr(new MemoryReclaimer(priority)); } // static @@ -234,8 +234,9 @@ uint64_t MemoryReclaimer::reclaim( return 0; } - // Sort the child pools based on their reserved memory and reclaim from the - // child pool with most reservation first. + // Sort the child pools based on their reclaimer priority and reserved memory. + // Reclaim from the child pool with highest priority and most reservation + // first. struct Candidate { std::shared_ptr pool; int64_t reclaimableBytes; @@ -247,8 +248,13 @@ uint64_t MemoryReclaimer::reclaim( for (auto& entry : pool->children_) { auto child = entry.second.lock(); if (child != nullptr) { - const int64_t reclaimableBytes = child->reclaimableBytes().value_or(0); - candidates.push_back(Candidate{std::move(child), reclaimableBytes}); + const auto reclaimableBytesOpt = child->reclaimableBytes(); + if (!reclaimableBytesOpt.has_value()) { + continue; + } + candidates.push_back(Candidate{ + std::move(child), + static_cast(reclaimableBytesOpt.value())}); } } } @@ -257,7 +263,12 @@ uint64_t MemoryReclaimer::reclaim( candidates.begin(), candidates.end(), [](const auto& lhs, const auto& rhs) { - return lhs.reclaimableBytes > rhs.reclaimableBytes; + const auto lhsPrio = lhs.pool->reclaimer()->priority(); + const auto rhsPrio = rhs.pool->reclaimer()->priority(); + if (lhsPrio == rhsPrio) { + return lhs.reclaimableBytes > rhs.reclaimableBytes; + } + return lhsPrio < rhsPrio; }); uint64_t reclaimedBytes{0}; diff --git a/velox/common/memory/MemoryArbitrator.h b/velox/common/memory/MemoryArbitrator.h index 969fbb0fa5a97..09f4953573003 100644 --- a/velox/common/memory/MemoryArbitrator.h +++ b/velox/common/memory/MemoryArbitrator.h @@ -287,7 +287,7 @@ class MemoryReclaimer { virtual ~MemoryReclaimer() = default; - static std::unique_ptr create(); + static std::unique_ptr create(int32_t priority = 0); /// Invoked memory reclaim function from 'pool' and record execution 'stats'. static uint64_t run(const std::function& func, Stats& stats); @@ -309,6 +309,26 @@ class MemoryReclaimer { /// enterArbitration has been called. virtual void leaveArbitration() noexcept {} + /// Invoked by upper layer reclaimer, to return the priority of this + /// reclaimer. The priority determines the reclaiming order of self among all + /// same level reclaimers. The smaller the number, the higher the priority. + /// Consider the following memory pool & reclaimer structure: + /// + /// rec1(pri 1) + /// / \ + /// / \ + /// / \ + /// rec2(pri 1) rec3(pri 3) + /// / \ / \ + /// / \ / \ + /// rec4(pri 1) rec5(pri 0) rec6(pri 0) rec7(pri 1) + /// + /// The reclaiming traversing order will be rec1 -> rec2 -> rec5 -> rec4 -> + /// rec3 -> rec6 -> rec7 + virtual int32_t priority() const { + return priority_; + }; + /// Invoked by the memory arbitrator to get the amount of memory bytes that /// can be reclaimed from 'pool'. The function returns true if 'pool' is /// reclaimable and returns the estimated reclaimable bytes in @@ -343,7 +363,10 @@ class MemoryReclaimer { virtual void abort(MemoryPool* pool, const std::exception_ptr& error); protected: - MemoryReclaimer() = default; + explicit MemoryReclaimer(int32_t priority) : priority_(priority){}; + + private: + const int32_t priority_; }; /// Helper class used to measure the memory bytes reclaimed from a memory pool diff --git a/velox/common/memory/tests/ArbitrationParticipantTest.cpp b/velox/common/memory/tests/ArbitrationParticipantTest.cpp index 5bb23d2c37292..c72bb476dea7d 100644 --- a/velox/common/memory/tests/ArbitrationParticipantTest.cpp +++ b/velox/common/memory/tests/ArbitrationParticipantTest.cpp @@ -131,7 +131,8 @@ class MockTask : public std::enable_shared_from_this { class RootMemoryReclaimer : public memory::MemoryReclaimer { public: - RootMemoryReclaimer(const std::shared_ptr& task) : task_(task) {} + RootMemoryReclaimer(const std::shared_ptr& task) + : memory::MemoryReclaimer(0), task_(task) {} static std::unique_ptr create( const std::shared_ptr& task) { @@ -179,7 +180,8 @@ class MockTask : public std::enable_shared_from_this { bool reclaimable, ReclaimInjectionCallback reclaimInjectCb = nullptr, ArbitrationInjectionCallback arbitrationInjectCb = nullptr) - : task_(task), + : memory::MemoryReclaimer(0), + task_(task), reclaimable_(reclaimable), reclaimInjectCb_(std::move(reclaimInjectCb)), arbitrationInjectCb_(std::move(arbitrationInjectCb)) {} diff --git a/velox/common/memory/tests/MemoryArbitratorTest.cpp b/velox/common/memory/tests/MemoryArbitratorTest.cpp index 1befc83a1a8fd..bdbf0cc5b3e65 100644 --- a/velox/common/memory/tests/MemoryArbitratorTest.cpp +++ b/velox/common/memory/tests/MemoryArbitratorTest.cpp @@ -505,7 +505,8 @@ class MockLeafMemoryReclaimer : public MemoryReclaimer { std::atomic& totalUsedBytes, bool reclaimable = true, bool* underArbitration = nullptr) - : reclaimable_(reclaimable), + : MemoryReclaimer(0), + reclaimable_(reclaimable), underArbitration_(underArbitration), totalUsedBytes_(totalUsedBytes) {} diff --git a/velox/common/memory/tests/MemoryPoolTest.cpp b/velox/common/memory/tests/MemoryPoolTest.cpp index 28e7732624a4d..648f29c60c2b9 100644 --- a/velox/common/memory/tests/MemoryPoolTest.cpp +++ b/velox/common/memory/tests/MemoryPoolTest.cpp @@ -3572,7 +3572,8 @@ class MockMemoryReclaimer : public MemoryReclaimer { } private: - explicit MockMemoryReclaimer(bool doThrow) : doThrow_(doThrow) {} + explicit MockMemoryReclaimer(bool doThrow) + : MemoryReclaimer(0), doThrow_(doThrow) {} const bool doThrow_; }; diff --git a/velox/common/memory/tests/MockSharedArbitratorTest.cpp b/velox/common/memory/tests/MockSharedArbitratorTest.cpp index 61efea38717b1..6573709a70275 100644 --- a/velox/common/memory/tests/MockSharedArbitratorTest.cpp +++ b/velox/common/memory/tests/MockSharedArbitratorTest.cpp @@ -92,7 +92,8 @@ class MockTask : public std::enable_shared_from_this { class MemoryReclaimer : public memory::MemoryReclaimer { public: - MemoryReclaimer(const std::shared_ptr& task) : task_(task) {} + MemoryReclaimer(const std::shared_ptr& task) + : memory::MemoryReclaimer(0), task_(task) {} static std::unique_ptr create( const std::shared_ptr& task) { @@ -178,7 +179,8 @@ class MockMemoryOperator { bool reclaimable, ReclaimInjectionCallback reclaimInjectCb = nullptr, ArbitrationInjectionCallback arbitrationInjectCb = nullptr) - : op_(op), + : memory::MemoryReclaimer(0), + op_(op), reclaimable_(reclaimable), reclaimInjectCb_(std::move(reclaimInjectCb)), arbitrationInjectCb_(std::move(arbitrationInjectCb)) {} diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index 5098bccbcd840..369fe3cd03fba 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -1125,7 +1125,7 @@ std::unique_ptr HiveDataSink::WriterReclaimer::create( HiveWriterInfo* writerInfo, io::IoStatistics* ioStats) { return std::unique_ptr( - new HiveDataSink::WriterReclaimer(dataSink, writerInfo, ioStats)); + new HiveDataSink::WriterReclaimer(dataSink, writerInfo, ioStats, 0)); } bool HiveDataSink::WriterReclaimer::reclaimableBytes( diff --git a/velox/connectors/hive/HiveDataSink.h b/velox/connectors/hive/HiveDataSink.h index f0d31f7654183..f54ef09c5b288 100644 --- a/velox/connectors/hive/HiveDataSink.h +++ b/velox/connectors/hive/HiveDataSink.h @@ -493,8 +493,9 @@ class HiveDataSink : public DataSink { WriterReclaimer( HiveDataSink* dataSink, HiveWriterInfo* writerInfo, - io::IoStatistics* ioStats) - : exec::MemoryReclaimer(), + io::IoStatistics* ioStats, + int32_t priority) + : exec::MemoryReclaimer(priority), dataSink_(dataSink), writerInfo_(writerInfo), ioStats_(ioStats) { diff --git a/velox/core/QueryCtx.h b/velox/core/QueryCtx.h index 5e89ccc4c9077..88ce04b1b9a00 100644 --- a/velox/core/QueryCtx.h +++ b/velox/core/QueryCtx.h @@ -170,8 +170,9 @@ class QueryCtx : public std::enable_shared_from_this { protected: MemoryReclaimer( const std::shared_ptr& queryCtx, - memory::MemoryPool* pool) - : queryCtx_(queryCtx), pool_(pool) { + memory::MemoryPool* pool, + int32_t priority = 0) + : memory::MemoryReclaimer(priority), queryCtx_(queryCtx), pool_(pool) { VELOX_CHECK_NOT_NULL(pool_); } diff --git a/velox/dwio/common/SortingWriter.cpp b/velox/dwio/common/SortingWriter.cpp index 00bab4ee5360f..5f82c66eed93b 100644 --- a/velox/dwio/common/SortingWriter.cpp +++ b/velox/dwio/common/SortingWriter.cpp @@ -154,8 +154,10 @@ vector_size_t SortingWriter::outputBatchRows() { } std::unique_ptr SortingWriter::MemoryReclaimer::create( - SortingWriter* writer) { - return std::unique_ptr(new MemoryReclaimer(writer)); + SortingWriter* writer, + int32_t priority) { + return std::unique_ptr( + new MemoryReclaimer(writer, priority)); } bool SortingWriter::MemoryReclaimer::reclaimableBytes( diff --git a/velox/dwio/common/SortingWriter.h b/velox/dwio/common/SortingWriter.h index a7edd69ed8e28..fdaa7669d0190 100644 --- a/velox/dwio/common/SortingWriter.h +++ b/velox/dwio/common/SortingWriter.h @@ -50,7 +50,8 @@ class SortingWriter : public Writer { class MemoryReclaimer : public exec::MemoryReclaimer { public: static std::unique_ptr create( - SortingWriter* writer); + SortingWriter* writer, + int32_t priority = 0); bool reclaimableBytes( const memory::MemoryPool& pool, @@ -63,8 +64,8 @@ class SortingWriter : public Writer { memory::MemoryReclaimer::Stats& stats) override; private: - explicit MemoryReclaimer(SortingWriter* writer) - : exec::MemoryReclaimer(), + MemoryReclaimer(SortingWriter* writer, int32_t priority) + : exec::MemoryReclaimer(priority), writer_(writer), canReclaim_(writer_->sortBuffer_->canSpill()) {} diff --git a/velox/dwio/dwrf/writer/Writer.cpp b/velox/dwio/dwrf/writer/Writer.cpp index e63666190e36a..06f2eeb84b8f2 100644 --- a/velox/dwio/dwrf/writer/Writer.cpp +++ b/velox/dwio/dwrf/writer/Writer.cpp @@ -715,9 +715,10 @@ void Writer::abort() { } std::unique_ptr Writer::MemoryReclaimer::create( - Writer* writer) { + Writer* writer, + int32_t priority) { return std::unique_ptr( - new Writer::MemoryReclaimer(writer)); + new Writer::MemoryReclaimer(writer, priority)); } bool Writer::MemoryReclaimer::reclaimableBytes( diff --git a/velox/dwio/dwrf/writer/Writer.h b/velox/dwio/dwrf/writer/Writer.h index 8cc82303e10fd..43cd4225bc875 100644 --- a/velox/dwio/dwrf/writer/Writer.h +++ b/velox/dwio/dwrf/writer/Writer.h @@ -150,7 +150,9 @@ class Writer : public dwio::common::Writer { private: class MemoryReclaimer : public exec::MemoryReclaimer { public: - static std::unique_ptr create(Writer* writer); + static std::unique_ptr create( + Writer* writer, + int32_t priority = 0); bool reclaimableBytes( const memory::MemoryPool& pool, @@ -163,7 +165,8 @@ class Writer : public dwio::common::Writer { memory::MemoryReclaimer::Stats& stats) override; private: - explicit MemoryReclaimer(Writer* writer) : writer_(writer) { + MemoryReclaimer(Writer* writer, int32_t priority) + : exec::MemoryReclaimer(priority), writer_(writer) { VELOX_CHECK_NOT_NULL(writer_); } diff --git a/velox/exec/HashJoinBridge.h b/velox/exec/HashJoinBridge.h index 40ba021988f67..d59c564cc7af1 100644 --- a/velox/exec/HashJoinBridge.h +++ b/velox/exec/HashJoinBridge.h @@ -174,9 +174,10 @@ bool isLeftNullAwareJoinWithFilter( class HashJoinMemoryReclaimer final : public MemoryReclaimer { public: static std::unique_ptr create( - std::shared_ptr joinBridge) { + std::shared_ptr joinBridge, + int32_t priority = 0) { return std::unique_ptr( - new HashJoinMemoryReclaimer(joinBridge)); + new HashJoinMemoryReclaimer(joinBridge, priority)); } uint64_t reclaim( @@ -186,9 +187,10 @@ class HashJoinMemoryReclaimer final : public MemoryReclaimer { memory::MemoryReclaimer::Stats& stats) final; private: - explicit HashJoinMemoryReclaimer( - const std::shared_ptr& joinBridge) - : MemoryReclaimer(), joinBridge_(joinBridge) {} + HashJoinMemoryReclaimer( + const std::shared_ptr& joinBridge, + int32_t priority) + : MemoryReclaimer(priority), joinBridge_(joinBridge) {} std::weak_ptr joinBridge_; }; diff --git a/velox/exec/MemoryReclaimer.cpp b/velox/exec/MemoryReclaimer.cpp index 4fa91f69433df..4aee429883db7 100644 --- a/velox/exec/MemoryReclaimer.cpp +++ b/velox/exec/MemoryReclaimer.cpp @@ -20,8 +20,13 @@ #include "velox/exec/Task.h" namespace facebook::velox::exec { -std::unique_ptr MemoryReclaimer::create() { - return std::unique_ptr(new MemoryReclaimer()); +MemoryReclaimer::MemoryReclaimer(int32_t priority) + : memory::MemoryReclaimer(priority) {} + +std::unique_ptr MemoryReclaimer::create( + int32_t priority) { + return std::unique_ptr( + new MemoryReclaimer(priority)); } void MemoryReclaimer::enterArbitration() { @@ -67,13 +72,15 @@ void MemoryReclaimer::abort( } /*static*/ std::unique_ptr -ParallelMemoryReclaimer::create(folly::Executor* executor) { +ParallelMemoryReclaimer::create(folly::Executor* executor, int32_t priority) { return std::unique_ptr( - new ParallelMemoryReclaimer(executor)); + new ParallelMemoryReclaimer(executor, priority)); } -ParallelMemoryReclaimer::ParallelMemoryReclaimer(folly::Executor* executor) - : executor_(executor) {} +ParallelMemoryReclaimer::ParallelMemoryReclaimer( + folly::Executor* executor, + int32_t priority) + : MemoryReclaimer(priority), executor_(executor) {} uint64_t ParallelMemoryReclaimer::reclaim( memory::MemoryPool* pool, @@ -85,8 +92,7 @@ uint64_t ParallelMemoryReclaimer::reclaim( pool, targetBytes, maxWaitMs, stats); } - // Sort the child pools based on their reserved memory and reclaim from the - // child pool with most reservation first. + // Sort candidates based on priority. struct Candidate { std::shared_ptr pool; int64_t reclaimableBytes; @@ -98,11 +104,17 @@ uint64_t ParallelMemoryReclaimer::reclaim( for (auto& entry : pool->children_) { auto child = entry.second.lock(); if (child != nullptr) { - const int64_t reclaimableBytes = child->reclaimableBytes().value_or(0); - candidates.push_back(Candidate{std::move(child), reclaimableBytes}); + const auto reclaimableBytesOpt = child->reclaimableBytes(); + if (!reclaimableBytesOpt.has_value()) { + continue; + } + candidates.push_back(Candidate{ + std::move(child), + static_cast(reclaimableBytesOpt.value())}); } } } + struct ReclaimResult { const uint64_t reclaimedBytes{0}; const Stats stats; diff --git a/velox/exec/MemoryReclaimer.h b/velox/exec/MemoryReclaimer.h index 7acc41cbed70d..d94799a73d1c0 100644 --- a/velox/exec/MemoryReclaimer.h +++ b/velox/exec/MemoryReclaimer.h @@ -25,7 +25,7 @@ class MemoryReclaimer : public memory::MemoryReclaimer { public: virtual ~MemoryReclaimer() = default; - static std::unique_ptr create(); + static std::unique_ptr create(int32_t priority = 0); void enterArbitration() override; @@ -35,7 +35,7 @@ class MemoryReclaimer : public memory::MemoryReclaimer { override; protected: - MemoryReclaimer() = default; + explicit MemoryReclaimer(int32_t priortity); }; /// Provides the parallel memory reclaimer implementation for velox task @@ -46,7 +46,8 @@ class ParallelMemoryReclaimer : public memory::MemoryReclaimer { virtual ~ParallelMemoryReclaimer() = default; static std::unique_ptr create( - folly::Executor* executor); + folly::Executor* executor, + int32_t priority = 0); uint64_t reclaim( memory::MemoryPool* pool, @@ -55,7 +56,7 @@ class ParallelMemoryReclaimer : public memory::MemoryReclaimer { Stats& stats) override; protected: - explicit ParallelMemoryReclaimer(folly::Executor* executor); + ParallelMemoryReclaimer(folly::Executor* executor, int32_t priority); folly::Executor* const executor_{nullptr}; }; diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 5ee37bd298149..b014d6b3470e8 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -633,9 +633,10 @@ void OperatorStats::clear() { std::unique_ptr Operator::MemoryReclaimer::create( DriverCtx* driverCtx, - Operator* op) { - return std::unique_ptr( - new Operator::MemoryReclaimer(driverCtx->driver->shared_from_this(), op)); + Operator* op, + int32_t priority) { + return std::unique_ptr(new Operator::MemoryReclaimer( + driverCtx->driver->shared_from_this(), op, priority)); } void Operator::MemoryReclaimer::enterArbitration() { diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index 25d355802d25a..a89e42d7a11af 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -685,9 +685,8 @@ class Operator : public BaseRuntimeStatWriter { class MemoryReclaimer : public memory::MemoryReclaimer { public: - static std::unique_ptr create( - DriverCtx* driverCtx, - Operator* op); + static std::unique_ptr + create(DriverCtx* driverCtx, Operator* op, int32_t priority = 0); void enterArbitration() override; @@ -707,8 +706,11 @@ class Operator : public BaseRuntimeStatWriter { override; protected: - MemoryReclaimer(const std::shared_ptr& driver, Operator* op) - : driver_(driver), op_(op) { + MemoryReclaimer( + const std::shared_ptr& driver, + Operator* op, + int32_t priority) + : memory::MemoryReclaimer(priority), driver_(driver), op_(op) { VELOX_CHECK_NOT_NULL(op_); } diff --git a/velox/exec/TableWriter.cpp b/velox/exec/TableWriter.cpp index 4899e4b4ebf89..330c1b76d3002 100644 --- a/velox/exec/TableWriter.cpp +++ b/velox/exec/TableWriter.cpp @@ -308,10 +308,11 @@ std::unique_ptr TableWriter::ConnectorReclaimer::create( const std::optional& spillConfig, DriverCtx* driverCtx, - Operator* op) { + Operator* op, + int32_t priority) { return std::unique_ptr( new TableWriter::ConnectorReclaimer( - spillConfig, driverCtx->driver->shared_from_this(), op)); + spillConfig, driverCtx->driver->shared_from_this(), op, priority)); } bool TableWriter::ConnectorReclaimer::reclaimableBytes( diff --git a/velox/exec/TableWriter.h b/velox/exec/TableWriter.h index b188019362fbd..9758286154ca7 100644 --- a/velox/exec/TableWriter.h +++ b/velox/exec/TableWriter.h @@ -149,7 +149,8 @@ class TableWriter : public Operator { static std::unique_ptr create( const std::optional& spillConfig, DriverCtx* driverCtx, - Operator* op); + Operator* op, + int32_t priority = 0); void enterArbitration() override {} @@ -176,9 +177,11 @@ class TableWriter : public Operator { ConnectorReclaimer( const std::optional& spillConfig, const std::shared_ptr& driver, - Operator* op) + Operator* op, + int32_t priority) : ParallelMemoryReclaimer( - spillConfig.has_value() ? spillConfig.value().executor : nullptr), + spillConfig.has_value() ? spillConfig.value().executor : nullptr, + priority), canReclaim_(spillConfig.has_value()), driver_(driver), op_(op) {} diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 7b10948b53a08..6e07f87033691 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -244,6 +244,7 @@ std::shared_ptr Task::create( std::shared_ptr queryCtx, ExecutionMode mode, Consumer consumer, + int32_t memoryArbitrationPriority, std::function onError) { return Task::create( taskId, @@ -253,7 +254,8 @@ std::shared_ptr Task::create( mode, (consumer ? [c = std::move(consumer)]() { return c; } : ConsumerSupplier{}), - std::move(onError)); + std::move(onError), + memoryArbitrationPriority); } // static @@ -264,6 +266,7 @@ std::shared_ptr Task::create( std::shared_ptr queryCtx, ExecutionMode mode, ConsumerSupplier consumerSupplier, + int32_t memoryArbitrationPriority, std::function onError) { auto task = std::shared_ptr(new Task( taskId, @@ -272,12 +275,61 @@ std::shared_ptr Task::create( std::move(queryCtx), mode, std::move(consumerSupplier), + memoryArbitrationPriority, std::move(onError))); task->initTaskPool(); task->addToTaskList(); return task; } +#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY +// TODO: Remove after callsites is cleaned up. +std::shared_ptr Task::create( + const std::string& taskId, + core::PlanFragment planFragment, + int destination, + std::shared_ptr queryCtx, + ExecutionMode mode, + Consumer consumer, + std::function onError, + int32_t memoryArbitrationPriority) { + return Task::create( + taskId, + std::move(planFragment), + destination, + std::move(queryCtx), + mode, + (consumer ? [c = std::move(consumer)]() { return c; } + : ConsumerSupplier{}), + std::move(onError), + memoryArbitrationPriority); +} + +// TODO: Remove after callsites is cleaned up. +std::shared_ptr Task::create( + const std::string& taskId, + core::PlanFragment planFragment, + int destination, + std::shared_ptr queryCtx, + ExecutionMode mode, + ConsumerSupplier consumerSupplier, + std::function onError, + int32_t memoryArbitrationPriority) { + auto task = std::shared_ptr(new Task( + taskId, + std::move(planFragment), + destination, + std::move(queryCtx), + mode, + std::move(consumerSupplier), + memoryArbitrationPriority, + std::move(onError))); + task->initTaskPool(); + task->addToTaskList(); + return task; +} +#endif + Task::Task( const std::string& taskId, core::PlanFragment planFragment, @@ -285,11 +337,13 @@ Task::Task( std::shared_ptr queryCtx, ExecutionMode mode, ConsumerSupplier consumerSupplier, + int32_t memoryArbitrationPriority, std::function onError) : uuid_{makeUuid()}, taskId_(taskId), destination_(destination), mode_(mode), + memoryArbitrationPriority_(memoryArbitrationPriority), queryCtx_(std::move(queryCtx)), planFragment_(std::move(planFragment)), traceConfig_(maybeMakeTraceConfig()), @@ -514,6 +568,7 @@ memory::MemoryPool* Task::getOrAddJoinNodePool( } childPools_.push_back(pool_->addAggregateChild( fmt::format("node.{}", nodeId), createNodeReclaimer([&]() { + // Set join reclaimer lower priority as cost of reclaiming join is high. return HashJoinMemoryReclaimer::create( getHashJoinBridgeLocked(splitGroupId, planNodeId)); }))); @@ -548,7 +603,8 @@ std::unique_ptr Task::createTaskReclaimer() { if (queryCtx_->pool()->reclaimer() == nullptr) { return nullptr; } - return Task::MemoryReclaimer::create(shared_from_this()); + return Task::MemoryReclaimer::create( + shared_from_this(), memoryArbitrationPriority_); } velox::memory::MemoryPool* Task::addOperatorPool( @@ -2984,9 +3040,10 @@ void Task::testingVisitDrivers(const std::function& callback) { } std::unique_ptr Task::MemoryReclaimer::create( - const std::shared_ptr& task) { + const std::shared_ptr& task, + int64_t priority) { return std::unique_ptr( - new Task::MemoryReclaimer(task)); + new Task::MemoryReclaimer(task, priority)); } uint64_t Task::MemoryReclaimer::reclaim( diff --git a/velox/exec/Task.h b/velox/exec/Task.h index d205b15178aff..8d854f7348cdd 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -69,6 +69,9 @@ class Task : public std::enable_shared_from_this { /// thread are passed on to a separate consumer. /// @param onError Optional callback to receive an exception if task /// execution fails. + /// @param memoryArbitrationPriority Optional priority on task that, in a + /// multi task system, is used for memory arbitration to decide the order of + /// reclaiming. static std::shared_ptr create( const std::string& taskId, core::PlanFragment planFragment, @@ -76,6 +79,7 @@ class Task : public std::enable_shared_from_this { std::shared_ptr queryCtx, ExecutionMode mode, Consumer consumer = nullptr, + int32_t memoryArbitrationPriority = 0, std::function onError = nullptr); static std::shared_ptr create( @@ -85,8 +89,33 @@ class Task : public std::enable_shared_from_this { std::shared_ptr queryCtx, ExecutionMode mode, ConsumerSupplier consumerSupplier, + int32_t memoryArbitrationPriority = 0, std::function onError = nullptr); +#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY + // TODO: Remove this overload once call sites are updated. + static std::shared_ptr create( + const std::string& taskId, + core::PlanFragment planFragment, + int destination, + std::shared_ptr queryCtx, + ExecutionMode mode, + Consumer consumer = nullptr, + std::function onError = nullptr, + int32_t memoryArbitrationPriority = 0); + + // TODO: Remove this overload once call sites are updated. + static std::shared_ptr create( + const std::string& taskId, + core::PlanFragment planFragment, + int destination, + std::shared_ptr queryCtx, + ExecutionMode mode, + ConsumerSupplier consumerSupplier, + std::function onError = nullptr, + int32_t memoryArbitrationPriority = 0); +#endif + /// Convenience function for shortening a Presto taskId. To be used /// in debugging messages and listings. static std::string shortId(const std::string& id); @@ -713,6 +742,7 @@ class Task : public std::enable_shared_from_this { std::shared_ptr queryCtx, ExecutionMode mode, ConsumerSupplier consumerSupplier, + int32_t memoryArbitrationPriority = 0, std::function onError = nullptr); // Invoked to add this to the system-wide running task list on task creation. @@ -811,7 +841,8 @@ class Task : public std::enable_shared_from_this { class MemoryReclaimer : public exec::MemoryReclaimer { public: static std::unique_ptr create( - const std::shared_ptr& task); + const std::shared_ptr& task, + int64_t priority = 0); uint64_t reclaim( memory::MemoryPool* pool, @@ -823,7 +854,8 @@ class Task : public std::enable_shared_from_this { override; private: - explicit MemoryReclaimer(const std::shared_ptr& task) : task_(task) { + MemoryReclaimer(const std::shared_ptr& task, int64_t priority) + : exec::MemoryReclaimer(priority), task_(task) { VELOX_CHECK_NOT_NULL(task); } @@ -1010,6 +1042,10 @@ class Task : public std::enable_shared_from_this { // executed in a single mode throughout its lifetime const ExecutionMode mode_; + // In a multi-task system, it is used to make cross task decisions by memory + // arbitration to determine which task to reclaim first. + const int32_t memoryArbitrationPriority_; + std::shared_ptr queryCtx_; core::PlanFragment planFragment_; diff --git a/velox/exec/tests/ExchangeClientTest.cpp b/velox/exec/tests/ExchangeClientTest.cpp index 60de09c2513f4..2ffcd78cbd7da 100644 --- a/velox/exec/tests/ExchangeClientTest.cpp +++ b/velox/exec/tests/ExchangeClientTest.cpp @@ -18,7 +18,6 @@ #include #include #include "velox/common/base/tests/GTestUtils.h" -// #include "velox/exec/Exchange.h" #include "velox/exec/OutputBufferManager.h" #include "velox/exec/Task.h" #include "velox/exec/tests/utils/LocalExchangeSource.h" diff --git a/velox/exec/tests/MemoryReclaimerTest.cpp b/velox/exec/tests/MemoryReclaimerTest.cpp index 19bed7fe8ef6b..2e12003f4bc8d 100644 --- a/velox/exec/tests/MemoryReclaimerTest.cpp +++ b/velox/exec/tests/MemoryReclaimerTest.cpp @@ -170,15 +170,18 @@ TEST(ReclaimableSectionGuard, basic) { } namespace { + +using ReclaimCallback = const std::function; + class MockMemoryReclaimer : public memory::MemoryReclaimer { public: static std::unique_ptr create( bool reclaimable, uint64_t memoryBytes, - const std::function& reclaimCallback = - nullptr) { - return std::unique_ptr( - new MockMemoryReclaimer(reclaimable, memoryBytes, reclaimCallback)); + ReclaimCallback& reclaimCallback = nullptr, + int32_t priority = 0) { + return std::unique_ptr(new MockMemoryReclaimer( + reclaimable, memoryBytes, reclaimCallback, priority)); } bool reclaimableBytes(const MemoryPool& pool, uint64_t& reclaimableBytes) @@ -210,18 +213,25 @@ class MockMemoryReclaimer : public memory::MemoryReclaimer { return memoryBytes_; } + int32_t priority() const override { + return priority_; + } + private: MockMemoryReclaimer( bool reclaimable, uint64_t memoryBytes, - const std::function& reclaimCallback) - : reclaimCallback_(reclaimCallback), + const std::function& reclaimCallback, + int32_t priority) + : MemoryReclaimer(priority), + reclaimCallback_(reclaimCallback), + priority_(priority), reclaimable_(reclaimable), memoryBytes_(memoryBytes) {} - const std::function reclaimCallback_; + const ReclaimCallback reclaimCallback_; + const int32_t priority_; bool reclaimable_{false}; - int reclaimCount_{0}; uint64_t memoryBytes_{0}; }; } // namespace @@ -313,3 +323,153 @@ TEST_F(MemoryReclaimerTest, recursiveArbitrationWithParallelReclaim) { } ASSERT_TRUE(reclaimExecuted); } + +TEST_F(MemoryReclaimerTest, reclaimerPriorities) { + auto rootPool = memory::memoryManager()->addRootPool( + "reclaimerPriorities", kMaxMemory, exec::MemoryReclaimer::create()); + std::vector> leafPools; + + const uint32_t kNumChildren = 10; + const uint64_t kPoolMemoryBytes = 1024; + std::vector priorityOrder; + priorityOrder.reserve(kNumChildren); + ReclaimCallback reclaimerCb = [&](MemoryPool* pool) { + auto* mockReclaimer = dynamic_cast(pool->reclaimer()); + ASSERT_TRUE(mockReclaimer != nullptr); + priorityOrder.push_back(mockReclaimer->priority()); + }; + + for (uint32_t i = 0; i < kNumChildren; ++i) { + auto reclaimer = MockMemoryReclaimer::create( + kPoolMemoryBytes, + kPoolMemoryBytes, + reclaimerCb, + static_cast(folly::Random::rand32(10000)) - 5000); + leafPools.push_back(rootPool->addLeafChild( + fmt::format("leaf-{}", i), true, std::move(reclaimer))); + } + + memory::ScopedMemoryArbitrationContext context(rootPool.get()); + memory::MemoryReclaimer::Stats stats; + rootPool->reclaim(kNumChildren * kPoolMemoryBytes, 0, stats); + + ASSERT_EQ(priorityOrder.size(), kNumChildren); + for (uint32_t i = 0; i < priorityOrder.size() - 1; i++) { + ASSERT_LE(priorityOrder[i], priorityOrder[i + 1]); + } +} + +TEST_F(MemoryReclaimerTest, multiLevelReclaimerPriorities) { + // Following tree structure is built with priorities + // rootPool + // ├── serial-aggr-0 (priority: 4) + // │ ├── serial-aggr-0.leaf-0 (priority: 0) + // │ ├── serial-aggr-0.leaf-1 (priority: 0) + // │ ├── serial-aggr-0.leaf-2 (priority: 1) + // │ ├── serial-aggr-0.leaf-3 (priority: 1) + // │ ├── ... + // │ └── serial-aggr-0.leaf-9 (priority: 4) + // ├── parallel-aggr-1 (priority: 3) + // │ ├── parallel-aggr-1.leaf-0 (priority: 0) + // │ ├── parallel-aggr-1.leaf-1 (priority: 0) + // │ ├── parallel-aggr-1.leaf-2 (priority: 1) + // │ ├── parallel-aggr-1.leaf-3 (priority: 1) + // │ ├── ... + // │ └── parallel-aggr-1.leaf-9 (priority: 4) + // ├── serial-aggr-2 (priority: 2) + // │ ├── serial-aggr-2.leaf-0 (priority: 0) + // │ ├── serial-aggr-2.leaf-1 (priority: 0) + // │ ├── serial-aggr-2.leaf-2 (priority: 1) + // │ ├── serial-aggr-2.leaf-3 (priority: 1) + // │ ├── ... + // │ └── serial-aggr-2.leaf-9 (priority: 4) + // └── parallel-aggr-3 (priority: 1) + // ├── parallel-aggr-3.leaf-0 (priority: 0) + // ├── parallel-aggr-3.leaf-1 (priority: 0) + // ├── parallel-aggr-3.leaf-2 (priority: 1) + // ├── parallel-aggr-3.leaf-3 (priority: 1) + // ├── ... + // └── parallel-aggr-3.leaf-9 (priority: 4) + auto rootPool = memory::memoryManager()->addRootPool( + "multiLevelReclaimerPriorities", + 32 << 20, + exec::MemoryReclaimer::create()); + const uint32_t kNumAggrPools = 4; + const uint32_t kNumLeafPerAggr = 10; + const uint64_t kPoolMemoryBytes = 1024; + std::vector> aggrPools; + std::vector> leafPools; + std::mutex reclaimOrderMutex; + std::vector reclaimOrder; + uint32_t poolIdx{0}; + for (uint32_t i = 0; i < kNumAggrPools; i++) { + if (i % 2 == 0) { + aggrPools.push_back(rootPool->addAggregateChild( + fmt::format("serial-aggr-{}", poolIdx++), + exec::MemoryReclaimer::create(kNumAggrPools - i))); + } else { + aggrPools.push_back(rootPool->addAggregateChild( + fmt::format("parallel-aggr-{}", poolIdx++), + exec::ParallelMemoryReclaimer::create( + executor_.get(), kNumAggrPools - i))); + } + auto& aggrPool = aggrPools.back(); + const auto aggrPoolName = aggrPool->name(); + for (uint32_t j = 0; j < kNumLeafPerAggr; j++) { + leafPools.push_back(aggrPools.back()->addLeafChild( + fmt::format("{}.leaf-{}", aggrPoolName, j), + true, + MockMemoryReclaimer::create( + kPoolMemoryBytes, + kPoolMemoryBytes, + [&](MemoryPool* pool) { + std::lock_guard l(reclaimOrderMutex); + reclaimOrder.push_back(pool); + }, + j / 2))); + } + } + + memory::ScopedMemoryArbitrationContext context(rootPool.get()); + memory::MemoryReclaimer::Stats stats; + rootPool->reclaim( + kNumAggrPools * kNumLeafPerAggr * kPoolMemoryBytes, 0, stats); + + ASSERT_EQ(reclaimOrder.size(), kNumAggrPools * kNumLeafPerAggr); + for (uint32_t i = 0; i < kNumAggrPools; i++) { + uint32_t start = i * kNumLeafPerAggr; + const auto poolName = reclaimOrder[start]->name(); + bool isParallel = false; + switch (i) { + case 0: + ASSERT_TRUE(poolName.find("parallel-aggr-3") != std::string::npos); + isParallel = true; + break; + case 1: + ASSERT_TRUE(poolName.find("serial-aggr-2") != std::string::npos); + break; + case 2: + ASSERT_TRUE(poolName.find("parallel-aggr-1") != std::string::npos); + isParallel = true; + break; + case 3: + ASSERT_TRUE(poolName.find("serial-aggr-0") != std::string::npos); + break; + default: + FAIL(); + } + for (uint32_t j = start; j < start + kNumLeafPerAggr - 1; j++) { + if (isParallel) { + // Priority is not applicable to parallel reclaimer. + continue; + } + const auto* firstReclaimer = + dynamic_cast(reclaimOrder[j]->reclaimer()); + ASSERT_TRUE(firstReclaimer != nullptr); + const auto* secondReclaimer = + dynamic_cast(reclaimOrder[j + 1]->reclaimer()); + ASSERT_TRUE(secondReclaimer != nullptr); + ASSERT_LE(firstReclaimer->priority(), secondReclaimer->priority()); + } + } +} diff --git a/velox/exec/tests/utils/ArbitratorTestUtil.h b/velox/exec/tests/utils/ArbitratorTestUtil.h index 748cb642f5155..42ad7d468163f 100644 --- a/velox/exec/tests/utils/ArbitratorTestUtil.h +++ b/velox/exec/tests/utils/ArbitratorTestUtil.h @@ -36,7 +36,7 @@ constexpr uint64_t kMemoryPoolInitCapacity = 16 * MB; class FakeMemoryReclaimer : public exec::MemoryReclaimer { public: - FakeMemoryReclaimer() = default; + FakeMemoryReclaimer() : exec::MemoryReclaimer(0) {} static std::unique_ptr create() { return std::make_unique();