Skip to content

Commit

Permalink
feat: Add priority based memory reclaim framework
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Nov 20, 2024
1 parent af6609d commit d5693b4
Show file tree
Hide file tree
Showing 20 changed files with 407 additions and 108 deletions.
25 changes: 18 additions & 7 deletions velox/common/memory/MemoryArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ void MemoryArbitrator::unregisterFactory(const std::string& kind) {
return pool->shrink(targetBytes);
}

std::unique_ptr<MemoryReclaimer> MemoryReclaimer::create() {
return std::unique_ptr<MemoryReclaimer>(new MemoryReclaimer());
// Factory for the base reclaimer. 'priority' is forwarded unchanged to the
// MemoryReclaimer constructor.
std::unique_ptr<MemoryReclaimer> MemoryReclaimer::create(int32_t priority) {
  // The constructor is declared protected, so the object is allocated with
  // 'new' inside this static member and handed to the owning pointer right
  // away.
  auto* reclaimer = new MemoryReclaimer(priority);
  return std::unique_ptr<MemoryReclaimer>(reclaimer);
}

// static
Expand Down Expand Up @@ -234,8 +234,9 @@ uint64_t MemoryReclaimer::reclaim(
return 0;
}

// Sort the child pools based on their reserved memory and reclaim from the
// child pool with most reservation first.
// Sort the child pools by their reclaimer priority first and by their
// reclaimable bytes second. Reclaim from the child pool with the highest
// priority (smallest priority value) and the most reclaimable bytes first.
struct Candidate {
std::shared_ptr<memory::MemoryPool> pool;
int64_t reclaimableBytes;
Expand All @@ -247,8 +248,13 @@ uint64_t MemoryReclaimer::reclaim(
for (auto& entry : pool->children_) {
auto child = entry.second.lock();
if (child != nullptr) {
const int64_t reclaimableBytes = child->reclaimableBytes().value_or(0);
candidates.push_back(Candidate{std::move(child), reclaimableBytes});
const auto reclaimableBytesOpt = child->reclaimableBytes();
if (!reclaimableBytesOpt.has_value()) {
continue;
}
candidates.push_back(Candidate{
std::move(child),
static_cast<int64_t>(reclaimableBytesOpt.value())});
}
}
}
Expand All @@ -257,7 +263,12 @@ uint64_t MemoryReclaimer::reclaim(
candidates.begin(),
candidates.end(),
[](const auto& lhs, const auto& rhs) {
return lhs.reclaimableBytes > rhs.reclaimableBytes;
const auto lPriority = lhs.pool->reclaimer()->priority();
const auto rPriority = rhs.pool->reclaimer()->priority();
if (lPriority == rPriority) {
return lhs.reclaimableBytes > rhs.reclaimableBytes;
}
return lPriority < rPriority;
});

uint64_t reclaimedBytes{0};
Expand Down
27 changes: 25 additions & 2 deletions velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ class MemoryReclaimer {

virtual ~MemoryReclaimer() = default;

static std::unique_ptr<MemoryReclaimer> create();
static std::unique_ptr<MemoryReclaimer> create(int32_t priority = 0);

/// Invokes the memory reclaim function 'func' and records its execution in
/// 'stats'.
static uint64_t run(const std::function<int64_t()>& func, Stats& stats);
Expand All @@ -309,6 +309,26 @@ class MemoryReclaimer {
/// enterArbitration has been called.
virtual void leaveArbitration() noexcept {}

/// Invoked by the upper-layer reclaimer to get the priority of this reclaimer.
/// The priority determines the order in which this reclaimer is visited among
/// all reclaimers at the same level. The smaller the number, the higher the
/// priority. The default implementation returns the value passed to the
/// constructor. Consider the following memory pool & reclaimer structure:
///
///   rec1(pri 1)
///   +-- rec2(pri 1)
///   |   +-- rec4(pri 1)
///   |   +-- rec5(pri 0)
///   +-- rec3(pri 3)
///       +-- rec6(pri 0)
///       +-- rec7(pri 1)
///
/// The reclaim traversal order will be rec1 -> rec2 -> rec5 -> rec4 -> rec3 ->
/// rec6 -> rec7.
virtual int32_t priority() const {
  return priority_;
}

/// Invoked by the memory arbitrator to get the amount of memory bytes that
/// can be reclaimed from 'pool'. The function returns true if 'pool' is
/// reclaimable and returns the estimated reclaimable bytes in
Expand Down Expand Up @@ -343,7 +363,10 @@ class MemoryReclaimer {
virtual void abort(MemoryPool* pool, const std::exception_ptr& error);

protected:
MemoryReclaimer() = default;
/// Creates a reclaimer with the given 'priority', which is the value that
/// priority() returns by default. The smaller the number, the higher the
/// priority.
explicit MemoryReclaimer(int32_t priority) : priority_(priority) {}

private:
const int32_t priority_;
};

/// Helper class used to measure the memory bytes reclaimed from a memory pool
Expand Down
6 changes: 4 additions & 2 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1123,9 +1123,11 @@ LocationHandlePtr LocationHandle::create(const folly::dynamic& obj) {
std::unique_ptr<memory::MemoryReclaimer> HiveDataSink::WriterReclaimer::create(
HiveDataSink* dataSink,
HiveWriterInfo* writerInfo,
io::IoStatistics* ioStats) {
io::IoStatistics* ioStats,
int32_t priority) {
return std::unique_ptr<memory::MemoryReclaimer>(
new HiveDataSink::WriterReclaimer(dataSink, writerInfo, ioStats));
new HiveDataSink::WriterReclaimer(
dataSink, writerInfo, ioStats, priority));
}

bool HiveDataSink::WriterReclaimer::reclaimableBytes(
Expand Down
8 changes: 5 additions & 3 deletions velox/connectors/hive/HiveDataSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,8 @@ class HiveDataSink : public DataSink {
static std::unique_ptr<memory::MemoryReclaimer> create(
HiveDataSink* dataSink,
HiveWriterInfo* writerInfo,
io::IoStatistics* ioStats);
io::IoStatistics* ioStats,
int32_t priority = 0);

bool reclaimableBytes(
const memory::MemoryPool& pool,
Expand All @@ -493,8 +494,9 @@ class HiveDataSink : public DataSink {
WriterReclaimer(
HiveDataSink* dataSink,
HiveWriterInfo* writerInfo,
io::IoStatistics* ioStats)
: exec::MemoryReclaimer(),
io::IoStatistics* ioStats,
int32_t priority)
: exec::MemoryReclaimer(priority),
dataSink_(dataSink),
writerInfo_(writerInfo),
ioStats_(ioStats) {
Expand Down
5 changes: 3 additions & 2 deletions velox/core/QueryCtx.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,9 @@ class QueryCtx : public std::enable_shared_from_this<QueryCtx> {
protected:
MemoryReclaimer(
const std::shared_ptr<QueryCtx>& queryCtx,
memory::MemoryPool* pool)
: queryCtx_(queryCtx), pool_(pool) {
memory::MemoryPool* pool,
int32_t priority = 0)
: memory::MemoryReclaimer(priority), queryCtx_(queryCtx), pool_(pool) {
VELOX_CHECK_NOT_NULL(pool_);
}

Expand Down
6 changes: 4 additions & 2 deletions velox/dwio/common/SortingWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,10 @@ vector_size_t SortingWriter::outputBatchRows() {
}

std::unique_ptr<memory::MemoryReclaimer> SortingWriter::MemoryReclaimer::create(
SortingWriter* writer) {
return std::unique_ptr<memory::MemoryReclaimer>(new MemoryReclaimer(writer));
SortingWriter* writer,
int32_t priority) {
return std::unique_ptr<memory::MemoryReclaimer>(
new MemoryReclaimer(writer, priority));
}

bool SortingWriter::MemoryReclaimer::reclaimableBytes(
Expand Down
7 changes: 4 additions & 3 deletions velox/dwio/common/SortingWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class SortingWriter : public Writer {
class MemoryReclaimer : public exec::MemoryReclaimer {
public:
static std::unique_ptr<memory::MemoryReclaimer> create(
SortingWriter* writer);
SortingWriter* writer,
int32_t priority = 0);

bool reclaimableBytes(
const memory::MemoryPool& pool,
Expand All @@ -63,8 +64,8 @@ class SortingWriter : public Writer {
memory::MemoryReclaimer::Stats& stats) override;

private:
explicit MemoryReclaimer(SortingWriter* writer)
: exec::MemoryReclaimer(),
MemoryReclaimer(SortingWriter* writer, int32_t priority)
: exec::MemoryReclaimer(priority),
writer_(writer),
canReclaim_(writer_->sortBuffer_->canSpill()) {}

Expand Down
5 changes: 3 additions & 2 deletions velox/dwio/dwrf/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -715,9 +715,10 @@ void Writer::abort() {
}

std::unique_ptr<memory::MemoryReclaimer> Writer::MemoryReclaimer::create(
Writer* writer) {
Writer* writer,
int32_t priority) {
return std::unique_ptr<memory::MemoryReclaimer>(
new Writer::MemoryReclaimer(writer));
new Writer::MemoryReclaimer(writer, priority));
}

bool Writer::MemoryReclaimer::reclaimableBytes(
Expand Down
7 changes: 5 additions & 2 deletions velox/dwio/dwrf/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ class Writer : public dwio::common::Writer {
private:
class MemoryReclaimer : public exec::MemoryReclaimer {
public:
static std::unique_ptr<memory::MemoryReclaimer> create(Writer* writer);
static std::unique_ptr<memory::MemoryReclaimer> create(
Writer* writer,
int32_t priority = 0);

bool reclaimableBytes(
const memory::MemoryPool& pool,
Expand All @@ -163,7 +165,8 @@ class Writer : public dwio::common::Writer {
memory::MemoryReclaimer::Stats& stats) override;

private:
explicit MemoryReclaimer(Writer* writer) : writer_(writer) {
MemoryReclaimer(Writer* writer, int32_t priority)
: exec::MemoryReclaimer(priority), writer_(writer) {
VELOX_CHECK_NOT_NULL(writer_);
}

Expand Down
10 changes: 6 additions & 4 deletions velox/exec/HashJoinBridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,10 @@ bool isLeftNullAwareJoinWithFilter(
class HashJoinMemoryReclaimer final : public MemoryReclaimer {
public:
static std::unique_ptr<memory::MemoryReclaimer> create(
std::shared_ptr<HashJoinBridge> joinBridge) {
std::shared_ptr<HashJoinBridge> joinBridge,
int32_t priority) {
return std::unique_ptr<memory::MemoryReclaimer>(
new HashJoinMemoryReclaimer(joinBridge));
new HashJoinMemoryReclaimer(joinBridge, priority));
}

uint64_t reclaim(
Expand All @@ -187,8 +188,9 @@ class HashJoinMemoryReclaimer final : public MemoryReclaimer {

private:
explicit HashJoinMemoryReclaimer(
const std::shared_ptr<HashJoinBridge>& joinBridge)
: MemoryReclaimer(), joinBridge_(joinBridge) {}
const std::shared_ptr<HashJoinBridge>& joinBridge,
int32_t priority)
: MemoryReclaimer(priority), joinBridge_(joinBridge) {}
std::weak_ptr<HashJoinBridge> joinBridge_;
};

Expand Down
Loading

0 comments on commit d5693b4

Please sign in to comment.