Skip to content

Commit

Permalink
feat: Add priority based memory reclaim framework
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Dec 3, 2024
1 parent 6ff029e commit d65eaf7
Show file tree
Hide file tree
Showing 24 changed files with 331 additions and 80 deletions.
25 changes: 18 additions & 7 deletions velox/common/memory/MemoryArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ void MemoryArbitrator::unregisterFactory(const std::string& kind) {
return pool->shrink(targetBytes);
}

std::unique_ptr<MemoryReclaimer> MemoryReclaimer::create() {
return std::unique_ptr<MemoryReclaimer>(new MemoryReclaimer());
std::unique_ptr<MemoryReclaimer> MemoryReclaimer::create(int32_t priority) {
return std::unique_ptr<MemoryReclaimer>(new MemoryReclaimer(priority));
}

// static
Expand Down Expand Up @@ -234,8 +234,9 @@ uint64_t MemoryReclaimer::reclaim(
return 0;
}

// Sort the child pools based on their reserved memory and reclaim from the
// child pool with most reservation first.
// Sort the child pools first by their reclaimer priority (a smaller value
// means a higher priority) and then by their reclaimable bytes. Reclaim from
// the child pool with the highest priority and the most reclaimable bytes
// first.
struct Candidate {
std::shared_ptr<memory::MemoryPool> pool;
int64_t reclaimableBytes;
Expand All @@ -247,8 +248,13 @@ uint64_t MemoryReclaimer::reclaim(
for (auto& entry : pool->children_) {
auto child = entry.second.lock();
if (child != nullptr) {
const int64_t reclaimableBytes = child->reclaimableBytes().value_or(0);
candidates.push_back(Candidate{std::move(child), reclaimableBytes});
const auto reclaimableBytesOpt = child->reclaimableBytes();
if (!reclaimableBytesOpt.has_value()) {
continue;
}
candidates.push_back(Candidate{
std::move(child),
static_cast<int64_t>(reclaimableBytesOpt.value())});
}
}
}
Expand All @@ -257,7 +263,12 @@ uint64_t MemoryReclaimer::reclaim(
candidates.begin(),
candidates.end(),
[](const auto& lhs, const auto& rhs) {
return lhs.reclaimableBytes > rhs.reclaimableBytes;
const auto lhsPrio = lhs.pool->reclaimer()->priority();
const auto rhsPrio = rhs.pool->reclaimer()->priority();
if (lhsPrio == rhsPrio) {
return lhs.reclaimableBytes > rhs.reclaimableBytes;
}
return lhsPrio < rhsPrio;
});

uint64_t reclaimedBytes{0};
Expand Down
27 changes: 25 additions & 2 deletions velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ class MemoryReclaimer {

virtual ~MemoryReclaimer() = default;

static std::unique_ptr<MemoryReclaimer> create();
static std::unique_ptr<MemoryReclaimer> create(int32_t priority = 0);

/// Invokes the memory reclaim function 'func' and records the execution 'stats'.
static uint64_t run(const std::function<int64_t()>& func, Stats& stats);
Expand All @@ -309,6 +309,26 @@ class MemoryReclaimer {
/// enterArbitration has been called.
virtual void leaveArbitration() noexcept {}

/// Returns the priority of this reclaimer. Invoked by the upper layer
/// reclaimer to determine the reclaiming order of this reclaimer among all
/// the reclaimers at the same level. The smaller the number, the higher the
/// priority. Consider the following memory pool & reclaimer structure:
///
///                 rec1(pri 1)
///                /           \
///               /             \
///              /               \
///      rec2(pri 1)           rec3(pri 3)
///       /      \               /      \
///      /        \             /        \
/// rec4(pri 1) rec5(pri 0) rec6(pri 0) rec7(pri 1)
///
/// The reclaiming traversing order will be rec1 -> rec2 -> rec5 -> rec4 ->
/// rec3 -> rec6 -> rec7
virtual int32_t priority() const {
  return priority_;
}

/// Invoked by the memory arbitrator to get the amount of memory bytes that
/// can be reclaimed from 'pool'. The function returns true if 'pool' is
/// reclaimable and returns the estimated reclaimable bytes in
Expand Down Expand Up @@ -343,7 +363,10 @@ class MemoryReclaimer {
virtual void abort(MemoryPool* pool, const std::exception_ptr& error);

protected:
MemoryReclaimer() = default;
/// Constructs a reclaimer with the given 'priority', which is the value
/// returned by priority().
explicit MemoryReclaimer(int32_t priority) : priority_(priority) {}

private:
const int32_t priority_;
};

/// Helper class used to measure the memory bytes reclaimed from a memory pool
Expand Down
6 changes: 4 additions & 2 deletions velox/common/memory/tests/ArbitrationParticipantTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ class MockTask : public std::enable_shared_from_this<MockTask> {

class RootMemoryReclaimer : public memory::MemoryReclaimer {
public:
RootMemoryReclaimer(const std::shared_ptr<MockTask>& task) : task_(task) {}
RootMemoryReclaimer(const std::shared_ptr<MockTask>& task)
: memory::MemoryReclaimer(0), task_(task) {}

static std::unique_ptr<MemoryReclaimer> create(
const std::shared_ptr<MockTask>& task) {
Expand Down Expand Up @@ -179,7 +180,8 @@ class MockTask : public std::enable_shared_from_this<MockTask> {
bool reclaimable,
ReclaimInjectionCallback reclaimInjectCb = nullptr,
ArbitrationInjectionCallback arbitrationInjectCb = nullptr)
: task_(task),
: memory::MemoryReclaimer(0),
task_(task),
reclaimable_(reclaimable),
reclaimInjectCb_(std::move(reclaimInjectCb)),
arbitrationInjectCb_(std::move(arbitrationInjectCb)) {}
Expand Down
3 changes: 2 additions & 1 deletion velox/common/memory/tests/MemoryArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,8 @@ class MockLeafMemoryReclaimer : public MemoryReclaimer {
std::atomic<uint64_t>& totalUsedBytes,
bool reclaimable = true,
bool* underArbitration = nullptr)
: reclaimable_(reclaimable),
: MemoryReclaimer(0),
reclaimable_(reclaimable),
underArbitration_(underArbitration),
totalUsedBytes_(totalUsedBytes) {}

Expand Down
3 changes: 2 additions & 1 deletion velox/common/memory/tests/MemoryPoolTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3572,7 +3572,8 @@ class MockMemoryReclaimer : public MemoryReclaimer {
}

private:
explicit MockMemoryReclaimer(bool doThrow) : doThrow_(doThrow) {}
explicit MockMemoryReclaimer(bool doThrow)
: MemoryReclaimer(0), doThrow_(doThrow) {}

const bool doThrow_;
};
Expand Down
6 changes: 4 additions & 2 deletions velox/common/memory/tests/MockSharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ class MockTask : public std::enable_shared_from_this<MockTask> {

class MemoryReclaimer : public memory::MemoryReclaimer {
public:
MemoryReclaimer(const std::shared_ptr<MockTask>& task) : task_(task) {}
MemoryReclaimer(const std::shared_ptr<MockTask>& task)
: memory::MemoryReclaimer(0), task_(task) {}

static std::unique_ptr<MemoryReclaimer> create(
const std::shared_ptr<MockTask>& task) {
Expand Down Expand Up @@ -178,7 +179,8 @@ class MockMemoryOperator {
bool reclaimable,
ReclaimInjectionCallback reclaimInjectCb = nullptr,
ArbitrationInjectionCallback arbitrationInjectCb = nullptr)
: op_(op),
: memory::MemoryReclaimer(0),
op_(op),
reclaimable_(reclaimable),
reclaimInjectCb_(std::move(reclaimInjectCb)),
arbitrationInjectCb_(std::move(arbitrationInjectCb)) {}
Expand Down
2 changes: 1 addition & 1 deletion velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1125,7 +1125,7 @@ std::unique_ptr<memory::MemoryReclaimer> HiveDataSink::WriterReclaimer::create(
HiveWriterInfo* writerInfo,
io::IoStatistics* ioStats) {
return std::unique_ptr<memory::MemoryReclaimer>(
new HiveDataSink::WriterReclaimer(dataSink, writerInfo, ioStats));
new HiveDataSink::WriterReclaimer(dataSink, writerInfo, ioStats, 0));
}

bool HiveDataSink::WriterReclaimer::reclaimableBytes(
Expand Down
5 changes: 3 additions & 2 deletions velox/connectors/hive/HiveDataSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -493,8 +493,9 @@ class HiveDataSink : public DataSink {
WriterReclaimer(
HiveDataSink* dataSink,
HiveWriterInfo* writerInfo,
io::IoStatistics* ioStats)
: exec::MemoryReclaimer(),
io::IoStatistics* ioStats,
int32_t priority)
: exec::MemoryReclaimer(priority),
dataSink_(dataSink),
writerInfo_(writerInfo),
ioStats_(ioStats) {
Expand Down
5 changes: 3 additions & 2 deletions velox/core/QueryCtx.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,9 @@ class QueryCtx : public std::enable_shared_from_this<QueryCtx> {
protected:
MemoryReclaimer(
const std::shared_ptr<QueryCtx>& queryCtx,
memory::MemoryPool* pool)
: queryCtx_(queryCtx), pool_(pool) {
memory::MemoryPool* pool,
int32_t priority = 0)
: memory::MemoryReclaimer(priority), queryCtx_(queryCtx), pool_(pool) {
VELOX_CHECK_NOT_NULL(pool_);
}

Expand Down
6 changes: 4 additions & 2 deletions velox/dwio/common/SortingWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,10 @@ vector_size_t SortingWriter::outputBatchRows() {
}

std::unique_ptr<memory::MemoryReclaimer> SortingWriter::MemoryReclaimer::create(
SortingWriter* writer) {
return std::unique_ptr<memory::MemoryReclaimer>(new MemoryReclaimer(writer));
SortingWriter* writer,
int32_t priority) {
return std::unique_ptr<memory::MemoryReclaimer>(
new MemoryReclaimer(writer, priority));
}

bool SortingWriter::MemoryReclaimer::reclaimableBytes(
Expand Down
7 changes: 4 additions & 3 deletions velox/dwio/common/SortingWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class SortingWriter : public Writer {
class MemoryReclaimer : public exec::MemoryReclaimer {
public:
static std::unique_ptr<memory::MemoryReclaimer> create(
SortingWriter* writer);
SortingWriter* writer,
int32_t priority = 0);

bool reclaimableBytes(
const memory::MemoryPool& pool,
Expand All @@ -63,8 +64,8 @@ class SortingWriter : public Writer {
memory::MemoryReclaimer::Stats& stats) override;

private:
explicit MemoryReclaimer(SortingWriter* writer)
: exec::MemoryReclaimer(),
MemoryReclaimer(SortingWriter* writer, int32_t priority)
: exec::MemoryReclaimer(priority),
writer_(writer),
canReclaim_(writer_->sortBuffer_->canSpill()) {}

Expand Down
5 changes: 3 additions & 2 deletions velox/dwio/dwrf/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -715,9 +715,10 @@ void Writer::abort() {
}

std::unique_ptr<memory::MemoryReclaimer> Writer::MemoryReclaimer::create(
Writer* writer) {
Writer* writer,
int32_t priority) {
return std::unique_ptr<memory::MemoryReclaimer>(
new Writer::MemoryReclaimer(writer));
new Writer::MemoryReclaimer(writer, priority));
}

bool Writer::MemoryReclaimer::reclaimableBytes(
Expand Down
7 changes: 5 additions & 2 deletions velox/dwio/dwrf/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ class Writer : public dwio::common::Writer {
private:
class MemoryReclaimer : public exec::MemoryReclaimer {
public:
static std::unique_ptr<memory::MemoryReclaimer> create(Writer* writer);
static std::unique_ptr<memory::MemoryReclaimer> create(
Writer* writer,
int32_t priority = 0);

bool reclaimableBytes(
const memory::MemoryPool& pool,
Expand All @@ -163,7 +165,8 @@ class Writer : public dwio::common::Writer {
memory::MemoryReclaimer::Stats& stats) override;

private:
explicit MemoryReclaimer(Writer* writer) : writer_(writer) {
MemoryReclaimer(Writer* writer, int32_t priority)
: exec::MemoryReclaimer(priority), writer_(writer) {
VELOX_CHECK_NOT_NULL(writer_);
}

Expand Down
12 changes: 7 additions & 5 deletions velox/exec/HashJoinBridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,10 @@ bool isLeftNullAwareJoinWithFilter(
class HashJoinMemoryReclaimer final : public MemoryReclaimer {
public:
static std::unique_ptr<memory::MemoryReclaimer> create(
std::shared_ptr<HashJoinBridge> joinBridge) {
std::shared_ptr<HashJoinBridge> joinBridge,
int32_t priority = 0) {
return std::unique_ptr<memory::MemoryReclaimer>(
new HashJoinMemoryReclaimer(joinBridge));
new HashJoinMemoryReclaimer(joinBridge, priority));
}

uint64_t reclaim(
Expand All @@ -186,9 +187,10 @@ class HashJoinMemoryReclaimer final : public MemoryReclaimer {
memory::MemoryReclaimer::Stats& stats) final;

private:
explicit HashJoinMemoryReclaimer(
const std::shared_ptr<HashJoinBridge>& joinBridge)
: MemoryReclaimer(), joinBridge_(joinBridge) {}
HashJoinMemoryReclaimer(
const std::shared_ptr<HashJoinBridge>& joinBridge,
int32_t priority)
: MemoryReclaimer(priority), joinBridge_(joinBridge) {}
std::weak_ptr<HashJoinBridge> joinBridge_;
};

Expand Down
32 changes: 22 additions & 10 deletions velox/exec/MemoryReclaimer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@
#include "velox/exec/Task.h"

namespace facebook::velox::exec {
std::unique_ptr<memory::MemoryReclaimer> MemoryReclaimer::create() {
return std::unique_ptr<memory::MemoryReclaimer>(new MemoryReclaimer());
MemoryReclaimer::MemoryReclaimer(int32_t priority)
: memory::MemoryReclaimer(priority) {}

std::unique_ptr<memory::MemoryReclaimer> MemoryReclaimer::create(
int32_t priority) {
return std::unique_ptr<memory::MemoryReclaimer>(
new MemoryReclaimer(priority));
}

void MemoryReclaimer::enterArbitration() {
Expand Down Expand Up @@ -67,13 +72,15 @@ void MemoryReclaimer::abort(
}

/*static*/ std::unique_ptr<memory::MemoryReclaimer>
ParallelMemoryReclaimer::create(folly::Executor* executor) {
ParallelMemoryReclaimer::create(folly::Executor* executor, int32_t priority) {
return std::unique_ptr<memory::MemoryReclaimer>(
new ParallelMemoryReclaimer(executor));
new ParallelMemoryReclaimer(executor, priority));
}

ParallelMemoryReclaimer::ParallelMemoryReclaimer(folly::Executor* executor)
: executor_(executor) {}
ParallelMemoryReclaimer::ParallelMemoryReclaimer(
folly::Executor* executor,
int32_t priority)
: MemoryReclaimer(priority), executor_(executor) {}

uint64_t ParallelMemoryReclaimer::reclaim(
memory::MemoryPool* pool,
Expand All @@ -85,8 +92,7 @@ uint64_t ParallelMemoryReclaimer::reclaim(
pool, targetBytes, maxWaitMs, stats);
}

// Sort the child pools based on their reserved memory and reclaim from the
// child pool with most reservation first.
// Sort candidates based on priority.
struct Candidate {
std::shared_ptr<memory::MemoryPool> pool;
int64_t reclaimableBytes;
Expand All @@ -98,11 +104,17 @@ uint64_t ParallelMemoryReclaimer::reclaim(
for (auto& entry : pool->children_) {
auto child = entry.second.lock();
if (child != nullptr) {
const int64_t reclaimableBytes = child->reclaimableBytes().value_or(0);
candidates.push_back(Candidate{std::move(child), reclaimableBytes});
const auto reclaimableBytesOpt = child->reclaimableBytes();
if (!reclaimableBytesOpt.has_value()) {
continue;
}
candidates.push_back(Candidate{
std::move(child),
static_cast<int64_t>(reclaimableBytesOpt.value())});
}
}
}

struct ReclaimResult {
const uint64_t reclaimedBytes{0};
const Stats stats;
Expand Down
9 changes: 5 additions & 4 deletions velox/exec/MemoryReclaimer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class MemoryReclaimer : public memory::MemoryReclaimer {
public:
virtual ~MemoryReclaimer() = default;

static std::unique_ptr<memory::MemoryReclaimer> create();
static std::unique_ptr<memory::MemoryReclaimer> create(int32_t priority = 0);

void enterArbitration() override;

Expand All @@ -35,7 +35,7 @@ class MemoryReclaimer : public memory::MemoryReclaimer {
override;

protected:
MemoryReclaimer() = default;
/// Constructs a reclaimer with the given 'priority', which is passed on to
/// the base memory::MemoryReclaimer.
explicit MemoryReclaimer(int32_t priority);
};

/// Provides the parallel memory reclaimer implementation for velox task
Expand All @@ -46,7 +46,8 @@ class ParallelMemoryReclaimer : public memory::MemoryReclaimer {
virtual ~ParallelMemoryReclaimer() = default;

static std::unique_ptr<memory::MemoryReclaimer> create(
folly::Executor* executor);
folly::Executor* executor,
int32_t priority = 0);

uint64_t reclaim(
memory::MemoryPool* pool,
Expand All @@ -55,7 +56,7 @@ class ParallelMemoryReclaimer : public memory::MemoryReclaimer {
Stats& stats) override;

protected:
explicit ParallelMemoryReclaimer(folly::Executor* executor);
ParallelMemoryReclaimer(folly::Executor* executor, int32_t priority);

folly::Executor* const executor_{nullptr};
};
Expand Down
Loading

0 comments on commit d65eaf7

Please sign in to comment.