From 078c12a2cc625c840c8c43304561bdb436c92c7c Mon Sep 17 00:00:00 2001 From: Jialiang Tan Date: Tue, 3 Dec 2024 15:40:43 -0800 Subject: [PATCH] feat: Add priority based memory reclaim framework (#11598) Summary: * Adds priority base reclaiming to memory reclaim framework. The priority determines which memory pool to reclaim first on the same level. This would help to make reclaim more application logic aware. * Make join node reclaim priority lower than others. This is because cost of reclaiming (spilling) on join node is high compared to other nodes. Reviewed By: xiaoxmeng Differential Revision: D66261340 Pulled By: tanjialiang --- velox/common/memory/MemoryArbitrator.cpp | 25 ++- velox/common/memory/MemoryArbitrator.h | 27 ++- .../tests/ArbitrationParticipantTest.cpp | 6 +- .../memory/tests/MemoryArbitratorTest.cpp | 3 +- velox/common/memory/tests/MemoryPoolTest.cpp | 3 +- .../memory/tests/MockSharedArbitratorTest.cpp | 6 +- velox/connectors/hive/HiveDataSink.cpp | 2 +- velox/connectors/hive/HiveDataSink.h | 5 +- velox/core/QueryCtx.h | 5 +- velox/dwio/common/SortingWriter.cpp | 6 +- velox/dwio/common/SortingWriter.h | 7 +- velox/dwio/dwrf/writer/Writer.cpp | 5 +- velox/dwio/dwrf/writer/Writer.h | 7 +- velox/exec/HashJoinBridge.h | 12 +- velox/exec/MemoryReclaimer.cpp | 32 +++- velox/exec/MemoryReclaimer.h | 9 +- velox/exec/Operator.cpp | 7 +- velox/exec/Operator.h | 12 +- velox/exec/TableWriter.cpp | 5 +- velox/exec/TableWriter.h | 9 +- velox/exec/Task.cpp | 63 ++++++- velox/exec/Task.h | 40 +++- velox/exec/tests/ExchangeClientTest.cpp | 1 - velox/exec/tests/MemoryReclaimerTest.cpp | 176 +++++++++++++++++- velox/exec/tests/utils/ArbitratorTestUtil.h | 2 +- velox/exec/tests/utils/Cursor.cpp | 1 + velox/runner/LocalRunner.cpp | 1 + 27 files changed, 401 insertions(+), 76 deletions(-) diff --git a/velox/common/memory/MemoryArbitrator.cpp b/velox/common/memory/MemoryArbitrator.cpp index eac6d149f485..769630bf8bd3 100644 --- a/velox/common/memory/MemoryArbitrator.cpp +++ b/velox/common/memory/MemoryArbitrator.cpp @@ -174,8 +174,8 @@ void MemoryArbitrator::unregisterFactory(const std::string& kind) { return pool->shrink(targetBytes); } -std::unique_ptr MemoryReclaimer::create() { - return std::unique_ptr(new MemoryReclaimer()); +std::unique_ptr MemoryReclaimer::create(int32_t priority) { + return std::unique_ptr(new MemoryReclaimer(priority)); } // static @@ -234,8 +234,9 @@ uint64_t MemoryReclaimer::reclaim( return 0; } - // Sort the child pools based on their reserved memory and reclaim from the - // child pool with most reservation first. + // Sort the child pools based on their reclaimer priority and reserved memory. + // Reclaim from the child pool with highest priority and most reservation + // first. struct Candidate { std::shared_ptr pool; int64_t reclaimableBytes; @@ -247,8 +248,13 @@ uint64_t MemoryReclaimer::reclaim( for (auto& entry : pool->children_) { auto child = entry.second.lock(); if (child != nullptr) { - const int64_t reclaimableBytes = child->reclaimableBytes().value_or(0); - candidates.push_back(Candidate{std::move(child), reclaimableBytes}); + const auto reclaimableBytesOpt = child->reclaimableBytes(); + if (!reclaimableBytesOpt.has_value()) { + continue; + } + candidates.push_back(Candidate{ + std::move(child), + static_cast(reclaimableBytesOpt.value())}); } } } @@ -257,7 +263,12 @@ uint64_t MemoryReclaimer::reclaim( candidates.begin(), candidates.end(), [](const auto& lhs, const auto& rhs) { - return lhs.reclaimableBytes > rhs.reclaimableBytes; + const auto lhsPrio = lhs.pool->reclaimer()->priority(); + const auto rhsPrio = rhs.pool->reclaimer()->priority(); + if (lhsPrio == rhsPrio) { + return lhs.reclaimableBytes > rhs.reclaimableBytes; + } + return lhsPrio < rhsPrio; }); uint64_t reclaimedBytes{0}; diff --git a/velox/common/memory/MemoryArbitrator.h b/velox/common/memory/MemoryArbitrator.h index 969fbb0fa5a9..09f495357300 100644 --- a/velox/common/memory/MemoryArbitrator.h +++ b/velox/common/memory/MemoryArbitrator.h @@ -287,7 +287,7 @@ class MemoryReclaimer { virtual ~MemoryReclaimer() = default; - static std::unique_ptr create(); + static std::unique_ptr create(int32_t priority = 0); /// Invoked memory reclaim function from 'pool' and record execution 'stats'. static uint64_t run(const std::function& func, Stats& stats); @@ -309,6 +309,26 @@ class MemoryReclaimer { /// enterArbitration has been called. virtual void leaveArbitration() noexcept {} + /// Invoked by upper layer reclaimer, to return the priority of this + /// reclaimer. The priority determines the reclaiming order of self among all + /// same level reclaimers. The smaller the number, the higher the priority. + /// Consider the following memory pool & reclaimer structure: + /// + /// rec1(pri 1) + /// / \ + /// / \ + /// / \ + /// rec2(pri 1) rec3(pri 3) + /// / \ / \ + /// / \ / \ + /// rec4(pri 1) rec5(pri 0) rec6(pri 0) rec7(pri 1) + /// + /// The reclaiming traversing order will be rec1 -> rec2 -> rec5 -> rec4 -> + /// rec3 -> rec6 -> rec7 + virtual int32_t priority() const { + return priority_; + }; + /// Invoked by the memory arbitrator to get the amount of memory bytes that /// can be reclaimed from 'pool'. The function returns true if 'pool' is /// reclaimable and returns the estimated reclaimable bytes in @@ -343,7 +363,10 @@ class MemoryReclaimer { virtual void abort(MemoryPool* pool, const std::exception_ptr& error); protected: - MemoryReclaimer() = default; + explicit MemoryReclaimer(int32_t priority) : priority_(priority){}; + + private: + const int32_t priority_; }; /// Helper class used to measure the memory bytes reclaimed from a memory pool diff --git a/velox/common/memory/tests/ArbitrationParticipantTest.cpp b/velox/common/memory/tests/ArbitrationParticipantTest.cpp index 5bb23d2c3729..c72bb476dea7 100644 --- a/velox/common/memory/tests/ArbitrationParticipantTest.cpp +++ b/velox/common/memory/tests/ArbitrationParticipantTest.cpp @@ -131,7 +131,8 @@ class MockTask : public std::enable_shared_from_this { class RootMemoryReclaimer : public memory::MemoryReclaimer { public: - RootMemoryReclaimer(const std::shared_ptr& task) : task_(task) {} + RootMemoryReclaimer(const std::shared_ptr& task) + : memory::MemoryReclaimer(0), task_(task) {} static std::unique_ptr create( const std::shared_ptr& task) { @@ -179,7 +180,8 @@ class MockTask : public std::enable_shared_from_this { bool reclaimable, ReclaimInjectionCallback reclaimInjectCb = nullptr, ArbitrationInjectionCallback arbitrationInjectCb = nullptr) - : task_(task), + : memory::MemoryReclaimer(0), + task_(task), reclaimable_(reclaimable), reclaimInjectCb_(std::move(reclaimInjectCb)), arbitrationInjectCb_(std::move(arbitrationInjectCb)) {} diff --git a/velox/common/memory/tests/MemoryArbitratorTest.cpp b/velox/common/memory/tests/MemoryArbitratorTest.cpp index 1befc83a1a8f..bdbf0cc5b3e6 100644 --- a/velox/common/memory/tests/MemoryArbitratorTest.cpp +++ b/velox/common/memory/tests/MemoryArbitratorTest.cpp @@ -505,7 +505,8 @@ class MockLeafMemoryReclaimer : public MemoryReclaimer { std::atomic& totalUsedBytes, bool reclaimable = true, bool* underArbitration = nullptr) - : reclaimable_(reclaimable), + : MemoryReclaimer(0), + reclaimable_(reclaimable), underArbitration_(underArbitration), totalUsedBytes_(totalUsedBytes) {} diff --git a/velox/common/memory/tests/MemoryPoolTest.cpp b/velox/common/memory/tests/MemoryPoolTest.cpp index 28e7732624a4..648f29c60c2b 100644 --- a/velox/common/memory/tests/MemoryPoolTest.cpp +++ b/velox/common/memory/tests/MemoryPoolTest.cpp @@ -3572,7 +3572,8 @@ class MockMemoryReclaimer : public MemoryReclaimer { } private: - explicit MockMemoryReclaimer(bool doThrow) : doThrow_(doThrow) {} + explicit MockMemoryReclaimer(bool doThrow) + : MemoryReclaimer(0), doThrow_(doThrow) {} const bool doThrow_; }; diff --git a/velox/common/memory/tests/MockSharedArbitratorTest.cpp b/velox/common/memory/tests/MockSharedArbitratorTest.cpp index 61efea38717b..6573709a7027 100644 --- a/velox/common/memory/tests/MockSharedArbitratorTest.cpp +++ b/velox/common/memory/tests/MockSharedArbitratorTest.cpp @@ -92,7 +92,8 @@ class MockTask : public std::enable_shared_from_this { class MemoryReclaimer : public memory::MemoryReclaimer { public: - MemoryReclaimer(const std::shared_ptr& task) : task_(task) {} + MemoryReclaimer(const std::shared_ptr& task) + : memory::MemoryReclaimer(0), task_(task) {} static std::unique_ptr create( const std::shared_ptr& task) { @@ -178,7 +179,8 @@ class MockMemoryOperator { bool reclaimable, ReclaimInjectionCallback reclaimInjectCb = nullptr, ArbitrationInjectionCallback arbitrationInjectCb = nullptr) - : op_(op), + : memory::MemoryReclaimer(0), + op_(op), reclaimable_(reclaimable), reclaimInjectCb_(std::move(reclaimInjectCb)), arbitrationInjectCb_(std::move(arbitrationInjectCb)) {} diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index 5098bccbcd84..369fe3cd03fb 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -1125,7 +1125,7 @@ std::unique_ptr HiveDataSink::WriterReclaimer::create( HiveWriterInfo* writerInfo, io::IoStatistics* ioStats) { return std::unique_ptr( - new HiveDataSink::WriterReclaimer(dataSink, writerInfo, ioStats)); + new HiveDataSink::WriterReclaimer(dataSink, writerInfo, ioStats, 0)); } bool HiveDataSink::WriterReclaimer::reclaimableBytes( diff --git a/velox/connectors/hive/HiveDataSink.h b/velox/connectors/hive/HiveDataSink.h index f0d31f765418..f54ef09c5b28 100644 --- a/velox/connectors/hive/HiveDataSink.h +++ b/velox/connectors/hive/HiveDataSink.h @@ -493,8 +493,9 @@ class HiveDataSink : public DataSink { WriterReclaimer( HiveDataSink* dataSink, HiveWriterInfo* writerInfo, - io::IoStatistics* ioStats) - : exec::MemoryReclaimer(), + io::IoStatistics* ioStats, + int32_t priority) + : exec::MemoryReclaimer(priority), dataSink_(dataSink), writerInfo_(writerInfo), ioStats_(ioStats) { diff --git a/velox/core/QueryCtx.h b/velox/core/QueryCtx.h index 5e89ccc4c907..88ce04b1b9a0 100644 --- a/velox/core/QueryCtx.h +++ b/velox/core/QueryCtx.h @@ -170,8 +170,9 @@ class QueryCtx : public std::enable_shared_from_this { protected: MemoryReclaimer( const std::shared_ptr& queryCtx, - memory::MemoryPool* pool) - : queryCtx_(queryCtx), pool_(pool) { + memory::MemoryPool* pool, + int32_t priority = 0) + : memory::MemoryReclaimer(priority), queryCtx_(queryCtx), pool_(pool) { VELOX_CHECK_NOT_NULL(pool_); } diff --git a/velox/dwio/common/SortingWriter.cpp b/velox/dwio/common/SortingWriter.cpp index 00bab4ee5360..5f82c66eed93 100644 --- a/velox/dwio/common/SortingWriter.cpp +++ b/velox/dwio/common/SortingWriter.cpp @@ -154,8 +154,10 @@ vector_size_t SortingWriter::outputBatchRows() { } std::unique_ptr SortingWriter::MemoryReclaimer::create( - SortingWriter* writer) { - return std::unique_ptr(new MemoryReclaimer(writer)); + SortingWriter* writer, + int32_t priority) { + return std::unique_ptr( + new MemoryReclaimer(writer, priority)); } bool SortingWriter::MemoryReclaimer::reclaimableBytes( diff --git a/velox/dwio/common/SortingWriter.h b/velox/dwio/common/SortingWriter.h index a7edd69ed8e2..fdaa7669d019 100644 --- a/velox/dwio/common/SortingWriter.h +++ b/velox/dwio/common/SortingWriter.h @@ -50,7 +50,8 @@ class SortingWriter : public Writer { class MemoryReclaimer : public exec::MemoryReclaimer { public: static std::unique_ptr create( - SortingWriter* writer); + SortingWriter* writer, + int32_t priority = 0); bool reclaimableBytes( const memory::MemoryPool& pool, @@ -63,8 +64,8 @@ class SortingWriter : public Writer { memory::MemoryReclaimer::Stats& stats) override; private: - explicit MemoryReclaimer(SortingWriter* writer) - : exec::MemoryReclaimer(), + MemoryReclaimer(SortingWriter* writer, int32_t priority) + : exec::MemoryReclaimer(priority), writer_(writer), canReclaim_(writer_->sortBuffer_->canSpill()) {} diff --git a/velox/dwio/dwrf/writer/Writer.cpp b/velox/dwio/dwrf/writer/Writer.cpp index e63666190e36..06f2eeb84b8f 100644 --- a/velox/dwio/dwrf/writer/Writer.cpp +++ b/velox/dwio/dwrf/writer/Writer.cpp @@ -715,9 +715,10 @@ void Writer::abort() { } std::unique_ptr Writer::MemoryReclaimer::create( - Writer* writer) { + Writer* writer, + int32_t priority) { return std::unique_ptr( - new Writer::MemoryReclaimer(writer)); + new Writer::MemoryReclaimer(writer, priority)); } bool Writer::MemoryReclaimer::reclaimableBytes( diff --git a/velox/dwio/dwrf/writer/Writer.h b/velox/dwio/dwrf/writer/Writer.h index 8cc82303e10f..43cd4225bc87 100644 --- a/velox/dwio/dwrf/writer/Writer.h +++ b/velox/dwio/dwrf/writer/Writer.h @@ -150,7 +150,9 @@ class Writer : public dwio::common::Writer { private: class MemoryReclaimer : public exec::MemoryReclaimer { public: - static std::unique_ptr create(Writer* writer); + static std::unique_ptr create( + Writer* writer, + int32_t priority = 0); bool reclaimableBytes( const memory::MemoryPool& pool, @@ -163,7 +165,8 @@ class Writer : public dwio::common::Writer { memory::MemoryReclaimer::Stats& stats) override; private: - explicit MemoryReclaimer(Writer* writer) : writer_(writer) { + MemoryReclaimer(Writer* writer, int32_t priority) + : exec::MemoryReclaimer(priority), writer_(writer) { VELOX_CHECK_NOT_NULL(writer_); } diff --git a/velox/exec/HashJoinBridge.h b/velox/exec/HashJoinBridge.h index 40ba021988f6..d59c564cc7af 100644 --- a/velox/exec/HashJoinBridge.h +++ b/velox/exec/HashJoinBridge.h @@ -174,9 +174,10 @@ bool isLeftNullAwareJoinWithFilter( class HashJoinMemoryReclaimer final : public MemoryReclaimer { public: static std::unique_ptr create( - std::shared_ptr joinBridge) { + std::shared_ptr joinBridge, + int32_t priority = 0) { return std::unique_ptr( - new HashJoinMemoryReclaimer(joinBridge)); + new HashJoinMemoryReclaimer(joinBridge, priority)); } uint64_t reclaim( @@ -186,9 +187,10 @@ class HashJoinMemoryReclaimer final : public MemoryReclaimer { memory::MemoryReclaimer::Stats& stats) final; private: - explicit HashJoinMemoryReclaimer( - const std::shared_ptr& joinBridge) - : MemoryReclaimer(), joinBridge_(joinBridge) {} + HashJoinMemoryReclaimer( + const std::shared_ptr& joinBridge, + int32_t priority) + : MemoryReclaimer(priority), joinBridge_(joinBridge) {} std::weak_ptr joinBridge_; }; diff --git a/velox/exec/MemoryReclaimer.cpp b/velox/exec/MemoryReclaimer.cpp index 4fa91f69433d..4aee429883db 100644 --- a/velox/exec/MemoryReclaimer.cpp +++ b/velox/exec/MemoryReclaimer.cpp @@ -20,8 +20,13 @@ #include "velox/exec/Task.h" namespace facebook::velox::exec { -std::unique_ptr MemoryReclaimer::create() { - return std::unique_ptr(new MemoryReclaimer()); +MemoryReclaimer::MemoryReclaimer(int32_t priority) + : memory::MemoryReclaimer(priority) {} + +std::unique_ptr MemoryReclaimer::create( + int32_t priority) { + return std::unique_ptr( + new MemoryReclaimer(priority)); } void MemoryReclaimer::enterArbitration() { @@ -67,13 +72,15 @@ void MemoryReclaimer::abort( } /*static*/ std::unique_ptr -ParallelMemoryReclaimer::create(folly::Executor* executor) { +ParallelMemoryReclaimer::create(folly::Executor* executor, int32_t priority) { return std::unique_ptr( - new ParallelMemoryReclaimer(executor)); + new ParallelMemoryReclaimer(executor, priority)); } -ParallelMemoryReclaimer::ParallelMemoryReclaimer(folly::Executor* executor) - : executor_(executor) {} +ParallelMemoryReclaimer::ParallelMemoryReclaimer( + folly::Executor* executor, + int32_t priority) + : MemoryReclaimer(priority), executor_(executor) {} uint64_t ParallelMemoryReclaimer::reclaim( memory::MemoryPool* pool, @@ -85,8 +92,7 @@ uint64_t ParallelMemoryReclaimer::reclaim( pool, targetBytes, maxWaitMs, stats); } - // Sort the child pools based on their reserved memory and reclaim from the - // child pool with most reservation first. + // Sort candidates based on priority. struct Candidate { std::shared_ptr pool; int64_t reclaimableBytes; @@ -98,11 +104,17 @@ uint64_t ParallelMemoryReclaimer::reclaim( for (auto& entry : pool->children_) { auto child = entry.second.lock(); if (child != nullptr) { - const int64_t reclaimableBytes = child->reclaimableBytes().value_or(0); - candidates.push_back(Candidate{std::move(child), reclaimableBytes}); + const auto reclaimableBytesOpt = child->reclaimableBytes(); + if (!reclaimableBytesOpt.has_value()) { + continue; + } + candidates.push_back(Candidate{ + std::move(child), + static_cast(reclaimableBytesOpt.value())}); } } } + struct ReclaimResult { const uint64_t reclaimedBytes{0}; const Stats stats; diff --git a/velox/exec/MemoryReclaimer.h b/velox/exec/MemoryReclaimer.h index 7acc41cbed70..d94799a73d1c 100644 --- a/velox/exec/MemoryReclaimer.h +++ b/velox/exec/MemoryReclaimer.h @@ -25,7 +25,7 @@ class MemoryReclaimer : public memory::MemoryReclaimer { public: virtual ~MemoryReclaimer() = default; - static std::unique_ptr create(); + static std::unique_ptr create(int32_t priority = 0); void enterArbitration() override; @@ -35,7 +35,7 @@ class MemoryReclaimer : public memory::MemoryReclaimer { override; protected: - MemoryReclaimer() = default; + explicit MemoryReclaimer(int32_t priortity); }; /// Provides the parallel memory reclaimer implementation for velox task @@ -46,7 +46,8 @@ class ParallelMemoryReclaimer : public memory::MemoryReclaimer { virtual ~ParallelMemoryReclaimer() = default; static std::unique_ptr create( - folly::Executor* executor); + folly::Executor* executor, + int32_t priority = 0); uint64_t reclaim( memory::MemoryPool* pool, @@ -55,7 +56,7 @@ class ParallelMemoryReclaimer : public memory::MemoryReclaimer { Stats& stats) override; protected: - explicit ParallelMemoryReclaimer(folly::Executor* executor); + ParallelMemoryReclaimer(folly::Executor* executor, int32_t priority); folly::Executor* const executor_{nullptr}; }; diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 5ee37bd29814..b014d6b3470e 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -633,9 +633,10 @@ void OperatorStats::clear() { std::unique_ptr Operator::MemoryReclaimer::create( DriverCtx* driverCtx, - Operator* op) { - return std::unique_ptr( - new Operator::MemoryReclaimer(driverCtx->driver->shared_from_this(), op)); + Operator* op, + int32_t priority) { + return std::unique_ptr(new Operator::MemoryReclaimer( + driverCtx->driver->shared_from_this(), op, priority)); } void Operator::MemoryReclaimer::enterArbitration() { diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index 25d355802d25..a89e42d7a11a 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -685,9 +685,8 @@ class Operator : public BaseRuntimeStatWriter { class MemoryReclaimer : public memory::MemoryReclaimer { public: - static std::unique_ptr create( - DriverCtx* driverCtx, - Operator* op); + static std::unique_ptr + create(DriverCtx* driverCtx, Operator* op, int32_t priority = 0); void enterArbitration() override; @@ -707,8 +706,11 @@ class Operator : public BaseRuntimeStatWriter { override; protected: - MemoryReclaimer(const std::shared_ptr& driver, Operator* op) - : driver_(driver), op_(op) { + MemoryReclaimer( + const std::shared_ptr& driver, + Operator* op, + int32_t priority) + : memory::MemoryReclaimer(priority), driver_(driver), op_(op) { VELOX_CHECK_NOT_NULL(op_); } diff --git a/velox/exec/TableWriter.cpp b/velox/exec/TableWriter.cpp index 4899e4b4ebf8..330c1b76d300 100644 --- a/velox/exec/TableWriter.cpp +++ b/velox/exec/TableWriter.cpp @@ -308,10 +308,11 @@ std::unique_ptr TableWriter::ConnectorReclaimer::create( const std::optional& spillConfig, DriverCtx* driverCtx, - Operator* op) { + Operator* op, + int32_t priority) { return std::unique_ptr( new TableWriter::ConnectorReclaimer( - spillConfig, driverCtx->driver->shared_from_this(), op)); + spillConfig, driverCtx->driver->shared_from_this(), op, priority)); } bool TableWriter::ConnectorReclaimer::reclaimableBytes( diff --git a/velox/exec/TableWriter.h b/velox/exec/TableWriter.h index b188019362fb..9758286154ca 100644 --- a/velox/exec/TableWriter.h +++ b/velox/exec/TableWriter.h @@ -149,7 +149,8 @@ class TableWriter : public Operator { static std::unique_ptr create( const std::optional& spillConfig, DriverCtx* driverCtx, - Operator* op); + Operator* op, + int32_t priority = 0); void enterArbitration() override {} @@ -176,9 +177,11 @@ class TableWriter : public Operator { ConnectorReclaimer( const std::optional& spillConfig, const std::shared_ptr& driver, - Operator* op) + Operator* op, + int32_t priority) : ParallelMemoryReclaimer( - spillConfig.has_value() ? spillConfig.value().executor : nullptr), + spillConfig.has_value() ? spillConfig.value().executor : nullptr, + priority), canReclaim_(spillConfig.has_value()), driver_(driver), op_(op) {} diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 7b10948b53a0..80f9d0b0b701 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -244,6 +244,7 @@ std::shared_ptr Task::create( std::shared_ptr queryCtx, ExecutionMode mode, Consumer consumer, + int32_t memoryArbitrationPriority, std::function onError) { return Task::create( taskId, @@ -253,6 +254,7 @@ std::shared_ptr Task::create( mode, (consumer ? [c = std::move(consumer)]() { return c; } : ConsumerSupplier{}), + memoryArbitrationPriority, std::move(onError)); } @@ -264,6 +266,7 @@ std::shared_ptr Task::create( std::shared_ptr queryCtx, ExecutionMode mode, ConsumerSupplier consumerSupplier, + int32_t memoryArbitrationPriority, std::function onError) { auto task = std::shared_ptr(new Task( taskId, @@ -272,12 +275,61 @@ std::shared_ptr Task::create( std::move(queryCtx), mode, std::move(consumerSupplier), + memoryArbitrationPriority, std::move(onError))); task->initTaskPool(); task->addToTaskList(); return task; } +#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY +// TODO: Remove after callsites is cleaned up. +std::shared_ptr Task::create( + const std::string& taskId, + core::PlanFragment planFragment, + int destination, + std::shared_ptr queryCtx, + ExecutionMode mode, + Consumer consumer, + std::function onError, + int32_t memoryArbitrationPriority) { + return Task::create( + taskId, + std::move(planFragment), + destination, + std::move(queryCtx), + mode, + (consumer ? [c = std::move(consumer)]() { return c; } + : ConsumerSupplier{}), + std::move(onError), + memoryArbitrationPriority); +} + +// TODO: Remove after callsites is cleaned up. +std::shared_ptr Task::create( + const std::string& taskId, + core::PlanFragment planFragment, + int destination, + std::shared_ptr queryCtx, + ExecutionMode mode, + ConsumerSupplier consumerSupplier, + std::function onError, + int32_t memoryArbitrationPriority) { + auto task = std::shared_ptr(new Task( + taskId, + std::move(planFragment), + destination, + std::move(queryCtx), + mode, + std::move(consumerSupplier), + memoryArbitrationPriority, + std::move(onError))); + task->initTaskPool(); + task->addToTaskList(); + return task; +} +#endif + Task::Task( const std::string& taskId, core::PlanFragment planFragment, @@ -285,11 +337,13 @@ Task::Task( std::shared_ptr queryCtx, ExecutionMode mode, ConsumerSupplier consumerSupplier, + int32_t memoryArbitrationPriority, std::function onError) : uuid_{makeUuid()}, taskId_(taskId), destination_(destination), mode_(mode), + memoryArbitrationPriority_(memoryArbitrationPriority), queryCtx_(std::move(queryCtx)), planFragment_(std::move(planFragment)), traceConfig_(maybeMakeTraceConfig()), @@ -514,6 +568,7 @@ memory::MemoryPool* Task::getOrAddJoinNodePool( } childPools_.push_back(pool_->addAggregateChild( fmt::format("node.{}", nodeId), createNodeReclaimer([&]() { + // Set join reclaimer lower priority as cost of reclaiming join is high. return HashJoinMemoryReclaimer::create( getHashJoinBridgeLocked(splitGroupId, planNodeId)); }))); @@ -548,7 +603,8 @@ std::unique_ptr Task::createTaskReclaimer() { if (queryCtx_->pool()->reclaimer() == nullptr) { return nullptr; } - return Task::MemoryReclaimer::create(shared_from_this()); + return Task::MemoryReclaimer::create( + shared_from_this(), memoryArbitrationPriority_); } velox::memory::MemoryPool* Task::addOperatorPool( @@ -2984,9 +3040,10 @@ void Task::testingVisitDrivers(const std::function& callback) { } std::unique_ptr Task::MemoryReclaimer::create( - const std::shared_ptr& task) { + const std::shared_ptr& task, + int64_t priority) { return std::unique_ptr( - new Task::MemoryReclaimer(task)); + new Task::MemoryReclaimer(task, priority)); } uint64_t Task::MemoryReclaimer::reclaim( diff --git a/velox/exec/Task.h b/velox/exec/Task.h index d205b15178af..8d854f7348cd 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -69,6 +69,9 @@ class Task : public std::enable_shared_from_this { /// thread are passed on to a separate consumer. /// @param onError Optional callback to receive an exception if task /// execution fails. + /// @param memoryArbitrationPriority Optional priority on task that, in a + /// multi task system, is used for memory arbitration to decide the order of + /// reclaiming. static std::shared_ptr create( const std::string& taskId, core::PlanFragment planFragment, @@ -76,6 +79,7 @@ class Task : public std::enable_shared_from_this { std::shared_ptr queryCtx, ExecutionMode mode, Consumer consumer = nullptr, + int32_t memoryArbitrationPriority = 0, std::function onError = nullptr); static std::shared_ptr create( @@ -85,8 +89,33 @@ class Task : public std::enable_shared_from_this { std::shared_ptr queryCtx, ExecutionMode mode, ConsumerSupplier consumerSupplier, + int32_t memoryArbitrationPriority = 0, std::function onError = nullptr); +#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY + // TODO: Remove this overload once call sites are updated. + static std::shared_ptr create( + const std::string& taskId, + core::PlanFragment planFragment, + int destination, + std::shared_ptr queryCtx, + ExecutionMode mode, + Consumer consumer = nullptr, + std::function onError = nullptr, + int32_t memoryArbitrationPriority = 0); + + // TODO: Remove this overload once call sites are updated. + static std::shared_ptr create( + const std::string& taskId, + core::PlanFragment planFragment, + int destination, + std::shared_ptr queryCtx, + ExecutionMode mode, + ConsumerSupplier consumerSupplier, + std::function onError = nullptr, + int32_t memoryArbitrationPriority = 0); +#endif + /// Convenience function for shortening a Presto taskId. To be used /// in debugging messages and listings. static std::string shortId(const std::string& id); @@ -713,6 +742,7 @@ class Task : public std::enable_shared_from_this { std::shared_ptr queryCtx, ExecutionMode mode, ConsumerSupplier consumerSupplier, + int32_t memoryArbitrationPriority = 0, std::function onError = nullptr); // Invoked to add this to the system-wide running task list on task creation. @@ -811,7 +841,8 @@ class Task : public std::enable_shared_from_this { class MemoryReclaimer : public exec::MemoryReclaimer { public: static std::unique_ptr create( - const std::shared_ptr& task); + const std::shared_ptr& task, + int64_t priority = 0); uint64_t reclaim( memory::MemoryPool* pool, @@ -823,7 +854,8 @@ class Task : public std::enable_shared_from_this { override; private: - explicit MemoryReclaimer(const std::shared_ptr& task) : task_(task) { + MemoryReclaimer(const std::shared_ptr& task, int64_t priority) + : exec::MemoryReclaimer(priority), task_(task) { VELOX_CHECK_NOT_NULL(task); } @@ -1010,6 +1042,10 @@ class Task : public std::enable_shared_from_this { // executed in a single mode throughout its lifetime const ExecutionMode mode_; + // In a multi-task system, it is used to make cross task decisions by memory + // arbitration to determine which task to reclaim first. + const int32_t memoryArbitrationPriority_; + std::shared_ptr queryCtx_; core::PlanFragment planFragment_; diff --git a/velox/exec/tests/ExchangeClientTest.cpp b/velox/exec/tests/ExchangeClientTest.cpp index 60de09c2513f..2ffcd78cbd7d 100644 --- a/velox/exec/tests/ExchangeClientTest.cpp +++ b/velox/exec/tests/ExchangeClientTest.cpp @@ -18,7 +18,6 @@ #include #include #include "velox/common/base/tests/GTestUtils.h" -// #include "velox/exec/Exchange.h" #include "velox/exec/OutputBufferManager.h" #include "velox/exec/Task.h" #include "velox/exec/tests/utils/LocalExchangeSource.h" diff --git a/velox/exec/tests/MemoryReclaimerTest.cpp b/velox/exec/tests/MemoryReclaimerTest.cpp index 19bed7fe8ef6..2e12003f4bc8 100644 --- a/velox/exec/tests/MemoryReclaimerTest.cpp +++ b/velox/exec/tests/MemoryReclaimerTest.cpp @@ -170,15 +170,18 @@ TEST(ReclaimableSectionGuard, basic) { } namespace { + +using ReclaimCallback = const std::function; + class MockMemoryReclaimer : public memory::MemoryReclaimer { public: static std::unique_ptr create( bool reclaimable, uint64_t memoryBytes, - const std::function& reclaimCallback = - nullptr) { - return std::unique_ptr( - new MockMemoryReclaimer(reclaimable, memoryBytes, reclaimCallback)); + ReclaimCallback& reclaimCallback = nullptr, + int32_t priority = 0) { + return std::unique_ptr(new MockMemoryReclaimer( + reclaimable, memoryBytes, reclaimCallback, priority)); } bool reclaimableBytes(const MemoryPool& pool, uint64_t& reclaimableBytes) @@ -210,18 +213,25 @@ class MockMemoryReclaimer : public memory::MemoryReclaimer { return memoryBytes_; } + int32_t priority() const override { + return priority_; + } + private: MockMemoryReclaimer( bool reclaimable, uint64_t memoryBytes, - const std::function& reclaimCallback) - : reclaimCallback_(reclaimCallback), + const std::function& reclaimCallback, + int32_t priority) + : MemoryReclaimer(priority), + reclaimCallback_(reclaimCallback), + priority_(priority), reclaimable_(reclaimable), memoryBytes_(memoryBytes) {} - const std::function reclaimCallback_; + const ReclaimCallback reclaimCallback_; + const int32_t priority_; bool reclaimable_{false}; - int reclaimCount_{0}; uint64_t memoryBytes_{0}; }; } // namespace @@ -313,3 +323,153 @@ TEST_F(MemoryReclaimerTest, recursiveArbitrationWithParallelReclaim) { } ASSERT_TRUE(reclaimExecuted); } + +TEST_F(MemoryReclaimerTest, reclaimerPriorities) { + auto rootPool = memory::memoryManager()->addRootPool( + "reclaimerPriorities", kMaxMemory, exec::MemoryReclaimer::create()); + std::vector> leafPools; + + const uint32_t kNumChildren = 10; + const uint64_t kPoolMemoryBytes = 1024; + std::vector priorityOrder; + priorityOrder.reserve(kNumChildren); + ReclaimCallback reclaimerCb = [&](MemoryPool* pool) { + auto* mockReclaimer = dynamic_cast(pool->reclaimer()); + ASSERT_TRUE(mockReclaimer != nullptr); + priorityOrder.push_back(mockReclaimer->priority()); + }; + + for (uint32_t i = 0; i < kNumChildren; ++i) { + auto reclaimer = MockMemoryReclaimer::create( + kPoolMemoryBytes, + kPoolMemoryBytes, + reclaimerCb, + static_cast(folly::Random::rand32(10000)) - 5000); + leafPools.push_back(rootPool->addLeafChild( + fmt::format("leaf-{}", i), true, std::move(reclaimer))); + } + + memory::ScopedMemoryArbitrationContext context(rootPool.get()); + memory::MemoryReclaimer::Stats stats; + rootPool->reclaim(kNumChildren * kPoolMemoryBytes, 0, stats); + + ASSERT_EQ(priorityOrder.size(), kNumChildren); + for (uint32_t i = 0; i < priorityOrder.size() - 1; i++) { + ASSERT_LE(priorityOrder[i], priorityOrder[i + 1]); + } +} + +TEST_F(MemoryReclaimerTest, multiLevelReclaimerPriorities) { + // Following tree structure is built with priorities + // rootPool + // ├── serial-aggr-0 (priority: 4) + // │ ├── serial-aggr-0.leaf-0 (priority: 0) + // │ ├── serial-aggr-0.leaf-1 (priority: 0) + // │ ├── serial-aggr-0.leaf-2 (priority: 1) + // │ ├── serial-aggr-0.leaf-3 (priority: 1) + // │ ├── ... + // │ └── serial-aggr-0.leaf-9 (priority: 4) + // ├── parallel-aggr-1 (priority: 3) + // │ ├── parallel-aggr-1.leaf-0 (priority: 0) + // │ ├── parallel-aggr-1.leaf-1 (priority: 0) + // │ ├── parallel-aggr-1.leaf-2 (priority: 1) + // │ ├── parallel-aggr-1.leaf-3 (priority: 1) + // │ ├── ... + // │ └── parallel-aggr-1.leaf-9 (priority: 4) + // ├── serial-aggr-2 (priority: 2) + // │ ├── serial-aggr-2.leaf-0 (priority: 0) + // │ ├── serial-aggr-2.leaf-1 (priority: 0) + // │ ├── serial-aggr-2.leaf-2 (priority: 1) + // │ ├── serial-aggr-2.leaf-3 (priority: 1) + // │ ├── ... + // │ └── serial-aggr-2.leaf-9 (priority: 4) + // └── parallel-aggr-3 (priority: 1) + // ├── parallel-aggr-3.leaf-0 (priority: 0) + // ├── parallel-aggr-3.leaf-1 (priority: 0) + // ├── parallel-aggr-3.leaf-2 (priority: 1) + // ├── parallel-aggr-3.leaf-3 (priority: 1) + // ├── ... + // └── parallel-aggr-3.leaf-9 (priority: 4) + auto rootPool = memory::memoryManager()->addRootPool( + "multiLevelReclaimerPriorities", + 32 << 20, + exec::MemoryReclaimer::create()); + const uint32_t kNumAggrPools = 4; + const uint32_t kNumLeafPerAggr = 10; + const uint64_t kPoolMemoryBytes = 1024; + std::vector> aggrPools; + std::vector> leafPools; + std::mutex reclaimOrderMutex; + std::vector reclaimOrder; + uint32_t poolIdx{0}; + for (uint32_t i = 0; i < kNumAggrPools; i++) { + if (i % 2 == 0) { + aggrPools.push_back(rootPool->addAggregateChild( + fmt::format("serial-aggr-{}", poolIdx++), + exec::MemoryReclaimer::create(kNumAggrPools - i))); + } else { + aggrPools.push_back(rootPool->addAggregateChild( + fmt::format("parallel-aggr-{}", poolIdx++), + exec::ParallelMemoryReclaimer::create( + executor_.get(), kNumAggrPools - i))); + } + auto& aggrPool = aggrPools.back(); + const auto aggrPoolName = aggrPool->name(); + for (uint32_t j = 0; j < kNumLeafPerAggr; j++) { + leafPools.push_back(aggrPools.back()->addLeafChild( + fmt::format("{}.leaf-{}", aggrPoolName, j), + true, + MockMemoryReclaimer::create( + kPoolMemoryBytes, + kPoolMemoryBytes, + [&](MemoryPool* pool) { + std::lock_guard l(reclaimOrderMutex); + reclaimOrder.push_back(pool); + }, + j / 2))); + } + } + + memory::ScopedMemoryArbitrationContext context(rootPool.get()); + memory::MemoryReclaimer::Stats stats; + rootPool->reclaim( + kNumAggrPools * kNumLeafPerAggr * kPoolMemoryBytes, 0, stats); + + ASSERT_EQ(reclaimOrder.size(), kNumAggrPools * kNumLeafPerAggr); + for (uint32_t i = 0; i < kNumAggrPools; i++) { + uint32_t start = i * kNumLeafPerAggr; + const auto poolName = reclaimOrder[start]->name(); + bool isParallel = false; + switch (i) { + case 0: + ASSERT_TRUE(poolName.find("parallel-aggr-3") != std::string::npos); + isParallel = true; + break; + case 1: + ASSERT_TRUE(poolName.find("serial-aggr-2") != std::string::npos); + break; + case 2: + ASSERT_TRUE(poolName.find("parallel-aggr-1") != std::string::npos); + isParallel = true; + break; + case 3: + ASSERT_TRUE(poolName.find("serial-aggr-0") != std::string::npos); + break; + default: + FAIL(); + } + for (uint32_t j = start; j < start + kNumLeafPerAggr - 1; j++) { + if (isParallel) { + // Priority is not applicable to parallel reclaimer. + continue; + } + const auto* firstReclaimer = + dynamic_cast(reclaimOrder[j]->reclaimer()); + ASSERT_TRUE(firstReclaimer != nullptr); + const auto* secondReclaimer = + dynamic_cast(reclaimOrder[j + 1]->reclaimer()); + ASSERT_TRUE(secondReclaimer != nullptr); + ASSERT_LE(firstReclaimer->priority(), secondReclaimer->priority()); + } + } +} diff --git a/velox/exec/tests/utils/ArbitratorTestUtil.h b/velox/exec/tests/utils/ArbitratorTestUtil.h index 748cb642f515..42ad7d468163 100644 --- a/velox/exec/tests/utils/ArbitratorTestUtil.h +++ b/velox/exec/tests/utils/ArbitratorTestUtil.h @@ -36,7 +36,7 @@ constexpr uint64_t kMemoryPoolInitCapacity = 16 * MB; class FakeMemoryReclaimer : public exec::MemoryReclaimer { public: - FakeMemoryReclaimer() = default; + FakeMemoryReclaimer() : exec::MemoryReclaimer(0) {} static std::unique_ptr create() { return std::make_unique(); diff --git a/velox/exec/tests/utils/Cursor.cpp b/velox/exec/tests/utils/Cursor.cpp index ad7b4133c6c7..eaa098997f15 100644 --- a/velox/exec/tests/utils/Cursor.cpp +++ b/velox/exec/tests/utils/Cursor.cpp @@ -243,6 +243,7 @@ class MultiThreadedTaskCursor : public TaskCursorBase { copy->copy(vector.get(), 0, 0, vector->size()); return queue->enqueue(std::move(copy), future); }, + 0, [queue](std::exception_ptr) { // onError close the queue to unblock producers and consumers. // moveNext will handle rethrowing the error once it's diff --git a/velox/runner/LocalRunner.cpp b/velox/runner/LocalRunner.cpp index d18a32528416..6e173a1a4e80 100644 --- a/velox/runner/LocalRunner.cpp +++ b/velox/runner/LocalRunner.cpp @@ -166,6 +166,7 @@ LocalRunner::makeStages() { params_.queryCtx, exec::Task::ExecutionMode::kParallel, consumer, + 0, onError); stages_.back().push_back(task); if (fragment.numBroadcastDestinations) {