Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add priority based memory reclaim framework #11598

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions velox/common/memory/MemoryArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ void MemoryArbitrator::unregisterFactory(const std::string& kind) {
return pool->shrink(targetBytes);
}

std::unique_ptr<MemoryReclaimer> MemoryReclaimer::create() {
return std::unique_ptr<MemoryReclaimer>(new MemoryReclaimer());
std::unique_ptr<MemoryReclaimer> MemoryReclaimer::create(int32_t priority) {
return std::unique_ptr<MemoryReclaimer>(new MemoryReclaimer(priority));
}

// static
Expand Down Expand Up @@ -234,8 +234,9 @@ uint64_t MemoryReclaimer::reclaim(
return 0;
}

// Sort the child pools based on their reserved memory and reclaim from the
// child pool with most reservation first.
// Sort the child pools based on their reclaimer priority and reserved memory.
// Reclaim from the child pool with highest priority and most reservation
// first.
struct Candidate {
std::shared_ptr<memory::MemoryPool> pool;
int64_t reclaimableBytes;
Expand All @@ -247,8 +248,13 @@ uint64_t MemoryReclaimer::reclaim(
for (auto& entry : pool->children_) {
auto child = entry.second.lock();
if (child != nullptr) {
const int64_t reclaimableBytes = child->reclaimableBytes().value_or(0);
candidates.push_back(Candidate{std::move(child), reclaimableBytes});
const auto reclaimableBytesOpt = child->reclaimableBytes();
if (!reclaimableBytesOpt.has_value()) {
tanjialiang marked this conversation as resolved.
Show resolved Hide resolved
continue;
}
candidates.push_back(Candidate{
std::move(child),
static_cast<int64_t>(reclaimableBytesOpt.value())});
}
}
}
Expand All @@ -257,7 +263,12 @@ uint64_t MemoryReclaimer::reclaim(
candidates.begin(),
candidates.end(),
[](const auto& lhs, const auto& rhs) {
return lhs.reclaimableBytes > rhs.reclaimableBytes;
const auto lhsPrio = lhs.pool->reclaimer()->priority();
const auto rhsPrio = rhs.pool->reclaimer()->priority();
if (lhsPrio == rhsPrio) {
return lhs.reclaimableBytes > rhs.reclaimableBytes;
}
return lhsPrio < rhsPrio;
});

uint64_t reclaimedBytes{0};
Expand Down
27 changes: 25 additions & 2 deletions velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ class MemoryReclaimer {

virtual ~MemoryReclaimer() = default;

static std::unique_ptr<MemoryReclaimer> create();
static std::unique_ptr<MemoryReclaimer> create(int32_t priority = 0);

/// Invoked memory reclaim function from 'pool' and record execution 'stats'.
static uint64_t run(const std::function<int64_t()>& func, Stats& stats);
Expand All @@ -309,6 +309,26 @@ class MemoryReclaimer {
/// enterArbitration has been called.
virtual void leaveArbitration() noexcept {}

/// Invoked by upper layer reclaimer, to return the priority of this
/// reclaimer. The priority determines the reclaiming order of self among all
/// same level reclaimers. The smaller the number, the higher the priority.
/// Consider the following memory pool & reclaimer structure:
///
/// rec1(pri 1)
/// / \
/// / \
/// / \
/// rec2(pri 1) rec3(pri 3)
/// / \ / \
/// / \ / \
/// rec4(pri 1) rec5(pri 0) rec6(pri 0) rec7(pri 1)
///
/// The reclaiming traversing order will be rec1 -> rec2 -> rec5 -> rec4 ->
/// rec3 -> rec6 -> rec7
virtual int32_t priority() const {
return priority_;
};

/// Invoked by the memory arbitrator to get the amount of memory bytes that
/// can be reclaimed from 'pool'. The function returns true if 'pool' is
/// reclaimable and returns the estimated reclaimable bytes in
Expand Down Expand Up @@ -343,7 +363,10 @@ class MemoryReclaimer {
virtual void abort(MemoryPool* pool, const std::exception_ptr& error);

protected:
MemoryReclaimer() = default;
explicit MemoryReclaimer(int32_t priority) : priority_(priority){};

private:
const int32_t priority_;
};

/// Helper class used to measure the memory bytes reclaimed from a memory pool
Expand Down
6 changes: 4 additions & 2 deletions velox/common/memory/tests/ArbitrationParticipantTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ class MockTask : public std::enable_shared_from_this<MockTask> {

class RootMemoryReclaimer : public memory::MemoryReclaimer {
public:
RootMemoryReclaimer(const std::shared_ptr<MockTask>& task) : task_(task) {}
RootMemoryReclaimer(const std::shared_ptr<MockTask>& task)
: memory::MemoryReclaimer(0), task_(task) {}

static std::unique_ptr<MemoryReclaimer> create(
const std::shared_ptr<MockTask>& task) {
Expand Down Expand Up @@ -179,7 +180,8 @@ class MockTask : public std::enable_shared_from_this<MockTask> {
bool reclaimable,
ReclaimInjectionCallback reclaimInjectCb = nullptr,
ArbitrationInjectionCallback arbitrationInjectCb = nullptr)
: task_(task),
: memory::MemoryReclaimer(0),
task_(task),
reclaimable_(reclaimable),
reclaimInjectCb_(std::move(reclaimInjectCb)),
arbitrationInjectCb_(std::move(arbitrationInjectCb)) {}
Expand Down
3 changes: 2 additions & 1 deletion velox/common/memory/tests/MemoryArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,8 @@ class MockLeafMemoryReclaimer : public MemoryReclaimer {
std::atomic<uint64_t>& totalUsedBytes,
bool reclaimable = true,
bool* underArbitration = nullptr)
: reclaimable_(reclaimable),
: MemoryReclaimer(0),
reclaimable_(reclaimable),
underArbitration_(underArbitration),
totalUsedBytes_(totalUsedBytes) {}

Expand Down
3 changes: 2 additions & 1 deletion velox/common/memory/tests/MemoryPoolTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3572,7 +3572,8 @@ class MockMemoryReclaimer : public MemoryReclaimer {
}

private:
explicit MockMemoryReclaimer(bool doThrow) : doThrow_(doThrow) {}
explicit MockMemoryReclaimer(bool doThrow)
: MemoryReclaimer(0), doThrow_(doThrow) {}

const bool doThrow_;
};
Expand Down
6 changes: 4 additions & 2 deletions velox/common/memory/tests/MockSharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ class MockTask : public std::enable_shared_from_this<MockTask> {

class MemoryReclaimer : public memory::MemoryReclaimer {
public:
MemoryReclaimer(const std::shared_ptr<MockTask>& task) : task_(task) {}
MemoryReclaimer(const std::shared_ptr<MockTask>& task)
: memory::MemoryReclaimer(0), task_(task) {}

static std::unique_ptr<MemoryReclaimer> create(
const std::shared_ptr<MockTask>& task) {
Expand Down Expand Up @@ -178,7 +179,8 @@ class MockMemoryOperator {
bool reclaimable,
ReclaimInjectionCallback reclaimInjectCb = nullptr,
ArbitrationInjectionCallback arbitrationInjectCb = nullptr)
: op_(op),
: memory::MemoryReclaimer(0),
op_(op),
reclaimable_(reclaimable),
reclaimInjectCb_(std::move(reclaimInjectCb)),
arbitrationInjectCb_(std::move(arbitrationInjectCb)) {}
Expand Down
2 changes: 1 addition & 1 deletion velox/connectors/hive/HiveDataSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ class HiveDataSink : public DataSink {
HiveDataSink* dataSink,
HiveWriterInfo* writerInfo,
io::IoStatistics* ioStats)
: exec::MemoryReclaimer(),
: exec::MemoryReclaimer(0),
dataSink_(dataSink),
writerInfo_(writerInfo),
ioStats_(ioStats) {
Expand Down
5 changes: 3 additions & 2 deletions velox/core/QueryCtx.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,9 @@ class QueryCtx : public std::enable_shared_from_this<QueryCtx> {
protected:
MemoryReclaimer(
const std::shared_ptr<QueryCtx>& queryCtx,
memory::MemoryPool* pool)
: queryCtx_(queryCtx), pool_(pool) {
memory::MemoryPool* pool,
int32_t priority = 0)
: memory::MemoryReclaimer(priority), queryCtx_(queryCtx), pool_(pool) {
VELOX_CHECK_NOT_NULL(pool_);
}

Expand Down
4 changes: 2 additions & 2 deletions velox/dwio/common/SortingWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ class SortingWriter : public Writer {
memory::MemoryReclaimer::Stats& stats) override;

private:
explicit MemoryReclaimer(SortingWriter* writer)
: exec::MemoryReclaimer(),
MemoryReclaimer(SortingWriter* writer)
: exec::MemoryReclaimer(0),
writer_(writer),
canReclaim_(writer_->sortBuffer_->canSpill()) {}

Expand Down
3 changes: 2 additions & 1 deletion velox/dwio/dwrf/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ class Writer : public dwio::common::Writer {
memory::MemoryReclaimer::Stats& stats) override;

private:
explicit MemoryReclaimer(Writer* writer) : writer_(writer) {
MemoryReclaimer(Writer* writer)
: exec::MemoryReclaimer(0), writer_(writer) {
VELOX_CHECK_NOT_NULL(writer_);
}

Expand Down
12 changes: 7 additions & 5 deletions velox/exec/HashJoinBridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,10 @@ bool isLeftNullAwareJoinWithFilter(
class HashJoinMemoryReclaimer final : public MemoryReclaimer {
public:
static std::unique_ptr<memory::MemoryReclaimer> create(
std::shared_ptr<HashJoinBridge> joinBridge) {
std::shared_ptr<HashJoinBridge> joinBridge,
int32_t priority = 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we enforce a priority input later?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah we can probably enforce that later. But now we don't want to enforce as the current priority setting is on Task level.

return std::unique_ptr<memory::MemoryReclaimer>(
new HashJoinMemoryReclaimer(joinBridge));
new HashJoinMemoryReclaimer(joinBridge, priority));
}

uint64_t reclaim(
Expand All @@ -186,9 +187,10 @@ class HashJoinMemoryReclaimer final : public MemoryReclaimer {
memory::MemoryReclaimer::Stats& stats) final;

private:
explicit HashJoinMemoryReclaimer(
const std::shared_ptr<HashJoinBridge>& joinBridge)
: MemoryReclaimer(), joinBridge_(joinBridge) {}
HashJoinMemoryReclaimer(
const std::shared_ptr<HashJoinBridge>& joinBridge,
int32_t priority)
: MemoryReclaimer(priority), joinBridge_(joinBridge) {}
std::weak_ptr<HashJoinBridge> joinBridge_;
};

Expand Down
32 changes: 22 additions & 10 deletions velox/exec/MemoryReclaimer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@
#include "velox/exec/Task.h"

namespace facebook::velox::exec {
std::unique_ptr<memory::MemoryReclaimer> MemoryReclaimer::create() {
return std::unique_ptr<memory::MemoryReclaimer>(new MemoryReclaimer());
MemoryReclaimer::MemoryReclaimer(int32_t priority)
: memory::MemoryReclaimer(priority) {}

std::unique_ptr<memory::MemoryReclaimer> MemoryReclaimer::create(
int32_t priority) {
return std::unique_ptr<memory::MemoryReclaimer>(
new MemoryReclaimer(priority));
}

void MemoryReclaimer::enterArbitration() {
Expand Down Expand Up @@ -67,13 +72,15 @@ void MemoryReclaimer::abort(
}

/*static*/ std::unique_ptr<memory::MemoryReclaimer>
ParallelMemoryReclaimer::create(folly::Executor* executor) {
ParallelMemoryReclaimer::create(folly::Executor* executor, int32_t priority) {
return std::unique_ptr<memory::MemoryReclaimer>(
new ParallelMemoryReclaimer(executor));
new ParallelMemoryReclaimer(executor, priority));
}

ParallelMemoryReclaimer::ParallelMemoryReclaimer(folly::Executor* executor)
: executor_(executor) {}
ParallelMemoryReclaimer::ParallelMemoryReclaimer(
folly::Executor* executor,
int32_t priority)
: MemoryReclaimer(priority), executor_(executor) {}

uint64_t ParallelMemoryReclaimer::reclaim(
memory::MemoryPool* pool,
Expand All @@ -85,8 +92,7 @@ uint64_t ParallelMemoryReclaimer::reclaim(
pool, targetBytes, maxWaitMs, stats);
}

// Sort the child pools based on their reserved memory and reclaim from the
// child pool with most reservation first.
// Sort candidates based on priority.
struct Candidate {
std::shared_ptr<memory::MemoryPool> pool;
int64_t reclaimableBytes;
Expand All @@ -98,11 +104,17 @@ uint64_t ParallelMemoryReclaimer::reclaim(
for (auto& entry : pool->children_) {
auto child = entry.second.lock();
if (child != nullptr) {
const int64_t reclaimableBytes = child->reclaimableBytes().value_or(0);
candidates.push_back(Candidate{std::move(child), reclaimableBytes});
const auto reclaimableBytesOpt = child->reclaimableBytes();
if (!reclaimableBytesOpt.has_value()) {
continue;
}
candidates.push_back(Candidate{
std::move(child),
static_cast<int64_t>(reclaimableBytesOpt.value())});
}
}
}

struct ReclaimResult {
const uint64_t reclaimedBytes{0};
const Stats stats;
Expand Down
9 changes: 5 additions & 4 deletions velox/exec/MemoryReclaimer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class MemoryReclaimer : public memory::MemoryReclaimer {
public:
virtual ~MemoryReclaimer() = default;

static std::unique_ptr<memory::MemoryReclaimer> create();
static std::unique_ptr<memory::MemoryReclaimer> create(int32_t priority = 0);

void enterArbitration() override;

Expand All @@ -35,7 +35,7 @@ class MemoryReclaimer : public memory::MemoryReclaimer {
override;

protected:
MemoryReclaimer() = default;
explicit MemoryReclaimer(int32_t priortity);
};

/// Provides the parallel memory reclaimer implementation for velox task
Expand All @@ -46,7 +46,8 @@ class ParallelMemoryReclaimer : public memory::MemoryReclaimer {
virtual ~ParallelMemoryReclaimer() = default;

static std::unique_ptr<memory::MemoryReclaimer> create(
folly::Executor* executor);
folly::Executor* executor,
int32_t priority = 0);

uint64_t reclaim(
memory::MemoryPool* pool,
Expand All @@ -55,7 +56,7 @@ class ParallelMemoryReclaimer : public memory::MemoryReclaimer {
Stats& stats) override;

protected:
explicit ParallelMemoryReclaimer(folly::Executor* executor);
ParallelMemoryReclaimer(folly::Executor* executor, int32_t priority);

folly::Executor* const executor_{nullptr};
};
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ class Operator : public BaseRuntimeStatWriter {

protected:
MemoryReclaimer(const std::shared_ptr<Driver>& driver, Operator* op)
: driver_(driver), op_(op) {
: memory::MemoryReclaimer(0), driver_(driver), op_(op) {
VELOX_CHECK_NOT_NULL(op_);
}

Expand Down
3 changes: 2 additions & 1 deletion velox/exec/TableWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ class TableWriter : public Operator {
const std::shared_ptr<Driver>& driver,
Operator* op)
: ParallelMemoryReclaimer(
spillConfig.has_value() ? spillConfig.value().executor : nullptr),
spillConfig.has_value() ? spillConfig.value().executor : nullptr,
0),
canReclaim_(spillConfig.has_value()),
driver_(driver),
op_(op) {}
Expand Down
Loading
Loading