Skip to content

Commit

Permalink
feat: Add priority based memory reclaim framework (facebookincubator#…
Browse files Browse the repository at this point in the history
…11598)

Summary:
* Adds priority base reclaiming to memory reclaim framework. The priority determines which memory pool to reclaim first on the same level. This would help to make reclaim more application logic aware. 
* Make join node reclaim priority lower than others. This is because cost of reclaiming (spilling) on join node is high compared to other nodes.


Reviewed By: xiaoxmeng

Differential Revision: D66261340

Pulled By: tanjialiang
  • Loading branch information
tanjialiang authored and facebook-github-bot committed Dec 4, 2024
1 parent 28c319e commit d9e761c
Show file tree
Hide file tree
Showing 27 changed files with 417 additions and 138 deletions.
25 changes: 18 additions & 7 deletions velox/common/memory/MemoryArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ void MemoryArbitrator::unregisterFactory(const std::string& kind) {
return pool->shrink(targetBytes);
}

std::unique_ptr<MemoryReclaimer> MemoryReclaimer::create() {
return std::unique_ptr<MemoryReclaimer>(new MemoryReclaimer());
std::unique_ptr<MemoryReclaimer> MemoryReclaimer::create(int32_t priority) {
return std::unique_ptr<MemoryReclaimer>(new MemoryReclaimer(priority));
}

// static
Expand Down Expand Up @@ -234,8 +234,9 @@ uint64_t MemoryReclaimer::reclaim(
return 0;
}

// Sort the child pools based on their reserved memory and reclaim from the
// child pool with most reservation first.
// Sort the child pools based on their reclaimer priority and reserved memory.
// Reclaim from the child pool with highest priority and most reservation
// first.
struct Candidate {
std::shared_ptr<memory::MemoryPool> pool;
int64_t reclaimableBytes;
Expand All @@ -247,8 +248,13 @@ uint64_t MemoryReclaimer::reclaim(
for (auto& entry : pool->children_) {
auto child = entry.second.lock();
if (child != nullptr) {
const int64_t reclaimableBytes = child->reclaimableBytes().value_or(0);
candidates.push_back(Candidate{std::move(child), reclaimableBytes});
const auto reclaimableBytesOpt = child->reclaimableBytes();
if (!reclaimableBytesOpt.has_value()) {
continue;
}
candidates.push_back(Candidate{
std::move(child),
static_cast<int64_t>(reclaimableBytesOpt.value())});
}
}
}
Expand All @@ -257,7 +263,12 @@ uint64_t MemoryReclaimer::reclaim(
candidates.begin(),
candidates.end(),
[](const auto& lhs, const auto& rhs) {
return lhs.reclaimableBytes > rhs.reclaimableBytes;
const auto lhsPrio = lhs.pool->reclaimer()->priority();
const auto rhsPrio = rhs.pool->reclaimer()->priority();
if (lhsPrio == rhsPrio) {
return lhs.reclaimableBytes > rhs.reclaimableBytes;
}
return lhsPrio < rhsPrio;
});

uint64_t reclaimedBytes{0};
Expand Down
27 changes: 25 additions & 2 deletions velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ class MemoryReclaimer {

virtual ~MemoryReclaimer() = default;

static std::unique_ptr<MemoryReclaimer> create();
static std::unique_ptr<MemoryReclaimer> create(int32_t priority = 0);

/// Invoked memory reclaim function from 'pool' and record execution 'stats'.
static uint64_t run(const std::function<int64_t()>& func, Stats& stats);
Expand All @@ -309,6 +309,26 @@ class MemoryReclaimer {
/// enterArbitration has been called.
virtual void leaveArbitration() noexcept {}

/// Invoked by upper layer reclaimer, to return the priority of this
/// reclaimer. The priority determines the reclaiming order of self among all
/// same level reclaimers. The smaller the number, the higher the priority.
/// Consider the following memory pool & reclaimer structure:
///
/// rec1(pri 1)
/// / \
/// / \
/// / \
/// rec2(pri 1) rec3(pri 3)
/// / \ / \
/// / \ / \
/// rec4(pri 1) rec5(pri 0) rec6(pri 0) rec7(pri 1)
///
/// The reclaiming traversing order will be rec1 -> rec2 -> rec5 -> rec4 ->
/// rec3 -> rec6 -> rec7
virtual int32_t priority() const {
return priority_;
};

/// Invoked by the memory arbitrator to get the amount of memory bytes that
/// can be reclaimed from 'pool'. The function returns true if 'pool' is
/// reclaimable and returns the estimated reclaimable bytes in
Expand Down Expand Up @@ -343,7 +363,10 @@ class MemoryReclaimer {
virtual void abort(MemoryPool* pool, const std::exception_ptr& error);

protected:
MemoryReclaimer() = default;
explicit MemoryReclaimer(int32_t priority) : priority_(priority){};

private:
const int32_t priority_;
};

/// Helper class used to measure the memory bytes reclaimed from a memory pool
Expand Down
6 changes: 4 additions & 2 deletions velox/common/memory/tests/ArbitrationParticipantTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ class MockTask : public std::enable_shared_from_this<MockTask> {

class RootMemoryReclaimer : public memory::MemoryReclaimer {
public:
RootMemoryReclaimer(const std::shared_ptr<MockTask>& task) : task_(task) {}
RootMemoryReclaimer(const std::shared_ptr<MockTask>& task)
: memory::MemoryReclaimer(0), task_(task) {}

static std::unique_ptr<MemoryReclaimer> create(
const std::shared_ptr<MockTask>& task) {
Expand Down Expand Up @@ -179,7 +180,8 @@ class MockTask : public std::enable_shared_from_this<MockTask> {
bool reclaimable,
ReclaimInjectionCallback reclaimInjectCb = nullptr,
ArbitrationInjectionCallback arbitrationInjectCb = nullptr)
: task_(task),
: memory::MemoryReclaimer(0),
task_(task),
reclaimable_(reclaimable),
reclaimInjectCb_(std::move(reclaimInjectCb)),
arbitrationInjectCb_(std::move(arbitrationInjectCb)) {}
Expand Down
3 changes: 2 additions & 1 deletion velox/common/memory/tests/MemoryArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,8 @@ class MockLeafMemoryReclaimer : public MemoryReclaimer {
std::atomic<uint64_t>& totalUsedBytes,
bool reclaimable = true,
bool* underArbitration = nullptr)
: reclaimable_(reclaimable),
: MemoryReclaimer(0),
reclaimable_(reclaimable),
underArbitration_(underArbitration),
totalUsedBytes_(totalUsedBytes) {}

Expand Down
3 changes: 2 additions & 1 deletion velox/common/memory/tests/MemoryPoolTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3562,7 +3562,8 @@ class MockMemoryReclaimer : public MemoryReclaimer {
}

private:
explicit MockMemoryReclaimer(bool doThrow) : doThrow_(doThrow) {}
explicit MockMemoryReclaimer(bool doThrow)
: MemoryReclaimer(0), doThrow_(doThrow) {}

const bool doThrow_;
};
Expand Down
6 changes: 4 additions & 2 deletions velox/common/memory/tests/MockSharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ class MockTask : public std::enable_shared_from_this<MockTask> {

class MemoryReclaimer : public memory::MemoryReclaimer {
public:
MemoryReclaimer(const std::shared_ptr<MockTask>& task) : task_(task) {}
MemoryReclaimer(const std::shared_ptr<MockTask>& task)
: memory::MemoryReclaimer(0), task_(task) {}

static std::unique_ptr<MemoryReclaimer> create(
const std::shared_ptr<MockTask>& task) {
Expand Down Expand Up @@ -178,7 +179,8 @@ class MockMemoryOperator {
bool reclaimable,
ReclaimInjectionCallback reclaimInjectCb = nullptr,
ArbitrationInjectionCallback arbitrationInjectCb = nullptr)
: op_(op),
: memory::MemoryReclaimer(0),
op_(op),
reclaimable_(reclaimable),
reclaimInjectCb_(std::move(reclaimInjectCb)),
arbitrationInjectCb_(std::move(arbitrationInjectCb)) {}
Expand Down
2 changes: 1 addition & 1 deletion velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1125,7 +1125,7 @@ std::unique_ptr<memory::MemoryReclaimer> HiveDataSink::WriterReclaimer::create(
HiveWriterInfo* writerInfo,
io::IoStatistics* ioStats) {
return std::unique_ptr<memory::MemoryReclaimer>(
new HiveDataSink::WriterReclaimer(dataSink, writerInfo, ioStats));
new HiveDataSink::WriterReclaimer(dataSink, writerInfo, ioStats, 0));
}

bool HiveDataSink::WriterReclaimer::reclaimableBytes(
Expand Down
5 changes: 3 additions & 2 deletions velox/connectors/hive/HiveDataSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -493,8 +493,9 @@ class HiveDataSink : public DataSink {
WriterReclaimer(
HiveDataSink* dataSink,
HiveWriterInfo* writerInfo,
io::IoStatistics* ioStats)
: exec::MemoryReclaimer(),
io::IoStatistics* ioStats,
int32_t priority)
: exec::MemoryReclaimer(priority),
dataSink_(dataSink),
writerInfo_(writerInfo),
ioStats_(ioStats) {
Expand Down
5 changes: 3 additions & 2 deletions velox/core/QueryCtx.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,9 @@ class QueryCtx : public std::enable_shared_from_this<QueryCtx> {
protected:
MemoryReclaimer(
const std::shared_ptr<QueryCtx>& queryCtx,
memory::MemoryPool* pool)
: queryCtx_(queryCtx), pool_(pool) {
memory::MemoryPool* pool,
int32_t priority = 0)
: memory::MemoryReclaimer(priority), queryCtx_(queryCtx), pool_(pool) {
VELOX_CHECK_NOT_NULL(pool_);
}

Expand Down
6 changes: 4 additions & 2 deletions velox/dwio/common/SortingWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,10 @@ vector_size_t SortingWriter::outputBatchRows() {
}

std::unique_ptr<memory::MemoryReclaimer> SortingWriter::MemoryReclaimer::create(
SortingWriter* writer) {
return std::unique_ptr<memory::MemoryReclaimer>(new MemoryReclaimer(writer));
SortingWriter* writer,
int32_t priority) {
return std::unique_ptr<memory::MemoryReclaimer>(
new MemoryReclaimer(writer, priority));
}

bool SortingWriter::MemoryReclaimer::reclaimableBytes(
Expand Down
7 changes: 4 additions & 3 deletions velox/dwio/common/SortingWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class SortingWriter : public Writer {
class MemoryReclaimer : public exec::MemoryReclaimer {
public:
static std::unique_ptr<memory::MemoryReclaimer> create(
SortingWriter* writer);
SortingWriter* writer,
int32_t priority = 0);

bool reclaimableBytes(
const memory::MemoryPool& pool,
Expand All @@ -63,8 +64,8 @@ class SortingWriter : public Writer {
memory::MemoryReclaimer::Stats& stats) override;

private:
explicit MemoryReclaimer(SortingWriter* writer)
: exec::MemoryReclaimer(),
MemoryReclaimer(SortingWriter* writer, int32_t priority)
: exec::MemoryReclaimer(priority),
writer_(writer),
canReclaim_(writer_->sortBuffer_->canSpill()) {}

Expand Down
5 changes: 3 additions & 2 deletions velox/dwio/dwrf/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -715,9 +715,10 @@ void Writer::abort() {
}

std::unique_ptr<memory::MemoryReclaimer> Writer::MemoryReclaimer::create(
Writer* writer) {
Writer* writer,
int32_t priority) {
return std::unique_ptr<memory::MemoryReclaimer>(
new Writer::MemoryReclaimer(writer));
new Writer::MemoryReclaimer(writer, priority));
}

bool Writer::MemoryReclaimer::reclaimableBytes(
Expand Down
7 changes: 5 additions & 2 deletions velox/dwio/dwrf/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ class Writer : public dwio::common::Writer {
private:
class MemoryReclaimer : public exec::MemoryReclaimer {
public:
static std::unique_ptr<memory::MemoryReclaimer> create(Writer* writer);
static std::unique_ptr<memory::MemoryReclaimer> create(
Writer* writer,
int32_t priority = 0);

bool reclaimableBytes(
const memory::MemoryPool& pool,
Expand All @@ -163,7 +165,8 @@ class Writer : public dwio::common::Writer {
memory::MemoryReclaimer::Stats& stats) override;

private:
explicit MemoryReclaimer(Writer* writer) : writer_(writer) {
MemoryReclaimer(Writer* writer, int32_t priority)
: exec::MemoryReclaimer(priority), writer_(writer) {
VELOX_CHECK_NOT_NULL(writer_);
}

Expand Down
12 changes: 7 additions & 5 deletions velox/exec/HashJoinBridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,10 @@ bool isLeftNullAwareJoinWithFilter(
class HashJoinMemoryReclaimer final : public MemoryReclaimer {
public:
static std::unique_ptr<memory::MemoryReclaimer> create(
std::shared_ptr<HashJoinBridge> joinBridge) {
std::shared_ptr<HashJoinBridge> joinBridge,
int32_t priority = 0) {
return std::unique_ptr<memory::MemoryReclaimer>(
new HashJoinMemoryReclaimer(joinBridge));
new HashJoinMemoryReclaimer(joinBridge, priority));
}

uint64_t reclaim(
Expand All @@ -186,9 +187,10 @@ class HashJoinMemoryReclaimer final : public MemoryReclaimer {
memory::MemoryReclaimer::Stats& stats) final;

private:
explicit HashJoinMemoryReclaimer(
const std::shared_ptr<HashJoinBridge>& joinBridge)
: MemoryReclaimer(), joinBridge_(joinBridge) {}
HashJoinMemoryReclaimer(
const std::shared_ptr<HashJoinBridge>& joinBridge,
int32_t priority)
: MemoryReclaimer(priority), joinBridge_(joinBridge) {}
std::weak_ptr<HashJoinBridge> joinBridge_;
};

Expand Down
32 changes: 22 additions & 10 deletions velox/exec/MemoryReclaimer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@
#include "velox/exec/Task.h"

namespace facebook::velox::exec {
std::unique_ptr<memory::MemoryReclaimer> MemoryReclaimer::create() {
return std::unique_ptr<memory::MemoryReclaimer>(new MemoryReclaimer());
MemoryReclaimer::MemoryReclaimer(int32_t priority)
: memory::MemoryReclaimer(priority) {}

std::unique_ptr<memory::MemoryReclaimer> MemoryReclaimer::create(
int32_t priority) {
return std::unique_ptr<memory::MemoryReclaimer>(
new MemoryReclaimer(priority));
}

void MemoryReclaimer::enterArbitration() {
Expand Down Expand Up @@ -67,13 +72,15 @@ void MemoryReclaimer::abort(
}

/*static*/ std::unique_ptr<memory::MemoryReclaimer>
ParallelMemoryReclaimer::create(folly::Executor* executor) {
ParallelMemoryReclaimer::create(folly::Executor* executor, int32_t priority) {
return std::unique_ptr<memory::MemoryReclaimer>(
new ParallelMemoryReclaimer(executor));
new ParallelMemoryReclaimer(executor, priority));
}

ParallelMemoryReclaimer::ParallelMemoryReclaimer(folly::Executor* executor)
: executor_(executor) {}
ParallelMemoryReclaimer::ParallelMemoryReclaimer(
folly::Executor* executor,
int32_t priority)
: MemoryReclaimer(priority), executor_(executor) {}

uint64_t ParallelMemoryReclaimer::reclaim(
memory::MemoryPool* pool,
Expand All @@ -85,8 +92,7 @@ uint64_t ParallelMemoryReclaimer::reclaim(
pool, targetBytes, maxWaitMs, stats);
}

// Sort the child pools based on their reserved memory and reclaim from the
// child pool with most reservation first.
// Sort candidates based on priority.
struct Candidate {
std::shared_ptr<memory::MemoryPool> pool;
int64_t reclaimableBytes;
Expand All @@ -98,11 +104,17 @@ uint64_t ParallelMemoryReclaimer::reclaim(
for (auto& entry : pool->children_) {
auto child = entry.second.lock();
if (child != nullptr) {
const int64_t reclaimableBytes = child->reclaimableBytes().value_or(0);
candidates.push_back(Candidate{std::move(child), reclaimableBytes});
const auto reclaimableBytesOpt = child->reclaimableBytes();
if (!reclaimableBytesOpt.has_value()) {
continue;
}
candidates.push_back(Candidate{
std::move(child),
static_cast<int64_t>(reclaimableBytesOpt.value())});
}
}
}

struct ReclaimResult {
const uint64_t reclaimedBytes{0};
const Stats stats;
Expand Down
9 changes: 5 additions & 4 deletions velox/exec/MemoryReclaimer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class MemoryReclaimer : public memory::MemoryReclaimer {
public:
virtual ~MemoryReclaimer() = default;

static std::unique_ptr<memory::MemoryReclaimer> create();
static std::unique_ptr<memory::MemoryReclaimer> create(int32_t priority = 0);

void enterArbitration() override;

Expand All @@ -35,7 +35,7 @@ class MemoryReclaimer : public memory::MemoryReclaimer {
override;

protected:
MemoryReclaimer() = default;
explicit MemoryReclaimer(int32_t priortity);
};

/// Provides the parallel memory reclaimer implementation for velox task
Expand All @@ -46,7 +46,8 @@ class ParallelMemoryReclaimer : public memory::MemoryReclaimer {
virtual ~ParallelMemoryReclaimer() = default;

static std::unique_ptr<memory::MemoryReclaimer> create(
folly::Executor* executor);
folly::Executor* executor,
int32_t priority = 0);

uint64_t reclaim(
memory::MemoryPool* pool,
Expand All @@ -55,7 +56,7 @@ class ParallelMemoryReclaimer : public memory::MemoryReclaimer {
Stats& stats) override;

protected:
explicit ParallelMemoryReclaimer(folly::Executor* executor);
ParallelMemoryReclaimer(folly::Executor* executor, int32_t priority);

folly::Executor* const executor_{nullptr};
};
Expand Down
Loading

0 comments on commit d9e761c

Please sign in to comment.