Skip to content

Commit

Permalink
Add stats collection to memory reclaimer
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Oct 4, 2023
1 parent f5bbde6 commit 52ca239
Show file tree
Hide file tree
Showing 23 changed files with 97 additions and 38 deletions.
23 changes: 16 additions & 7 deletions velox/common/memory/MemoryArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ bool MemoryReclaimer::reclaimableBytes(
return reclaimable;
}

uint64_t MemoryReclaimer::reclaim(MemoryPool* pool, uint64_t targetBytes) {
uint64_t
MemoryReclaimer::reclaim(MemoryPool* pool, uint64_t targetBytes, Stats& stats) {
if (pool->kind() == MemoryPool::Kind::kLeaf) {
return 0;
}
Expand Down Expand Up @@ -214,7 +215,7 @@ uint64_t MemoryReclaimer::reclaim(MemoryPool* pool, uint64_t targetBytes) {

uint64_t reclaimedBytes{0};
for (const auto& candidate : candidates) {
const auto bytes = candidate.pool->reclaim(targetBytes);
const auto bytes = candidate.pool->reclaim(targetBytes, stats);
reclaimedBytes += bytes;
if (targetBytes != 0) {
if (bytes >= targetBytes) {
Expand Down Expand Up @@ -254,7 +255,8 @@ MemoryArbitrator::Stats::Stats(
uint64_t _numReclaimedBytes,
uint64_t _maxCapacityBytes,
uint64_t _freeCapacityBytes,
uint64_t _reclaimTimeUs)
uint64_t _reclaimTimeUs,
uint64_t _numNonReclaimableAttempts)
: numRequests(_numRequests),
numSucceeded(_numSucceeded),
numAborted(_numAborted),
Expand All @@ -265,15 +267,17 @@ MemoryArbitrator::Stats::Stats(
numReclaimedBytes(_numReclaimedBytes),
maxCapacityBytes(_maxCapacityBytes),
freeCapacityBytes(_freeCapacityBytes),
reclaimTimeUs(_reclaimTimeUs) {}
reclaimTimeUs(_reclaimTimeUs),
numNonReclaimableAttempts(_numNonReclaimableAttempts) {}

std::string MemoryArbitrator::Stats::toString() const {
return fmt::format(
"STATS[numRequests {} numSucceeded {} numAborted {} numFailures {} queueTime {} arbitrationTime {} reclaimTime {} shrunkMemory {} reclaimedMemory {} maxCapacity {} freeCapacity {}]",
"STATS[numRequests {} numSucceeded {} numAborted {} numFailures {} numNonReclaimableAttempts {} queueTime {} arbitrationTime {} reclaimTime {} shrunkMemory {} reclaimedMemory {} maxCapacity {} freeCapacity {}]",
numRequests,
numSucceeded,
numAborted,
numFailures,
numNonReclaimableAttempts,
succinctMicros(queueTimeUs),
succinctMicros(arbitrationTimeUs),
succinctMicros(reclaimTimeUs),
Expand All @@ -297,6 +301,8 @@ MemoryArbitrator::Stats MemoryArbitrator::Stats::operator-(
result.maxCapacityBytes = maxCapacityBytes;
result.freeCapacityBytes = freeCapacityBytes;
result.reclaimTimeUs = reclaimTimeUs - other.reclaimTimeUs;
result.numNonReclaimableAttempts =
numNonReclaimableAttempts - other.numNonReclaimableAttempts;
return result;
}

Expand All @@ -312,7 +318,8 @@ bool MemoryArbitrator::Stats::operator==(const Stats& other) const {
numReclaimedBytes,
maxCapacityBytes,
freeCapacityBytes,
reclaimTimeUs) ==
reclaimTimeUs,
numNonReclaimableAttempts) ==
std::tie(
other.numRequests,
other.numSucceeded,
Expand All @@ -324,7 +331,8 @@ bool MemoryArbitrator::Stats::operator==(const Stats& other) const {
other.numReclaimedBytes,
other.maxCapacityBytes,
other.freeCapacityBytes,
other.reclaimTimeUs);
other.reclaimTimeUs,
other.numNonReclaimableAttempts);
}

bool MemoryArbitrator::Stats::operator!=(const Stats& other) const {
Expand Down Expand Up @@ -355,6 +363,7 @@ bool MemoryArbitrator::Stats::operator<(const Stats& other) const {
UPDATE_COUNTER(numShrunkBytes);
UPDATE_COUNTER(numReclaimedBytes);
UPDATE_COUNTER(reclaimTimeUs);
UPDATE_COUNTER(numNonReclaimableAttempts);
#undef UPDATE_COUNTER
VELOX_CHECK(
!((gtCount > 0) && (ltCount > 0)),
Expand Down
15 changes: 13 additions & 2 deletions velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ class MemoryArbitrator {
/// The sum of all reclaim operation durations during arbitration in
/// microseconds.
uint64_t reclaimTimeUs{0};
/// The total number of reclaim attempts that failed because the reclaim was
/// attempted at a non-reclaimable stage.
uint64_t numNonReclaimableAttempts{0};

Stats(
uint64_t _numRequests,
Expand All @@ -188,7 +191,8 @@ class MemoryArbitrator {
uint64_t _numReclaimedBytes,
uint64_t _maxCapacityBytes,
uint64_t _freeCapacityBytes,
uint64_t _reclaimTimeUs);
uint64_t _reclaimTimeUs,
uint64_t _numNonReclaimableAttempts);

Stats() = default;

Expand Down Expand Up @@ -252,6 +256,12 @@ FOLLY_ALWAYS_INLINE std::ostream& operator<<(
/// through techniques such as disks spilling.
class MemoryReclaimer {
public:
/// Counters describing the outcome of memory reclaim attempts. An instance is
/// passed by reference into reclaim() so that the callee can record what
/// happened during the attempt.
/// NOTE(review): the counter is atomic, presumably so that several reclaimers
/// can update one shared instance concurrently — confirm against the callers.
struct Stats {
/// The number of reclaim attempts that failed because the reclaim was
/// requested at a non-reclaimable stage.
std::atomic<uint64_t> numNonReclaimableAttempts{0};
};

virtual ~MemoryReclaimer() = default;

static std::unique_ptr<MemoryReclaimer> create();
Expand Down Expand Up @@ -286,7 +296,8 @@ class MemoryReclaimer {
/// memory bytes but there are no guarantees. If 'targetBytes' is zero, then it
/// reclaims all the reclaimable memory from the memory 'pool'. The function
/// returns the actual reclaimed memory bytes.
virtual uint64_t reclaim(MemoryPool* pool, uint64_t targetBytes);
virtual uint64_t
reclaim(MemoryPool* pool, uint64_t targetBytes, Stats& stats);

/// Invoked by the memory arbitrator to abort memory 'pool' and the associated
/// query execution when it encounters a non-recoverable memory reclaim error or
Expand Down
6 changes: 4 additions & 2 deletions velox/common/memory/MemoryPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -923,11 +923,13 @@ bool MemoryPoolImpl::reclaimableBytes(uint64_t& reclaimableBytes) const {
return reclaimer()->reclaimableBytes(*this, reclaimableBytes);
}

uint64_t MemoryPoolImpl::reclaim(uint64_t targetBytes) {
uint64_t MemoryPoolImpl::reclaim(
uint64_t targetBytes,
memory::MemoryReclaimer::Stats& stats) {
if (reclaimer() == nullptr) {
return 0;
}
return reclaimer()->reclaim(this, targetBytes);
return reclaimer()->reclaim(this, targetBytes, stats);
}

void MemoryPoolImpl::enterArbitration() {
Expand Down
7 changes: 5 additions & 2 deletions velox/common/memory/MemoryPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,9 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
/// noop if the reclaimer is not set, otherwise invoke the reclaimer's
/// corresponding method. The function returns the actually freed capacity
/// from the root of this memory pool.
virtual uint64_t reclaim(uint64_t targetBytes) = 0;
virtual uint64_t reclaim(
uint64_t targetBytes,
memory::MemoryReclaimer::Stats& stats) = 0;

/// Invoked by the memory arbitrator to abort a root memory pool. The function
/// forwards the request to the corresponding query object to abort its
Expand Down Expand Up @@ -627,7 +629,8 @@ class MemoryPoolImpl : public MemoryPool {

bool reclaimableBytes(uint64_t& reclaimableBytes) const override;

uint64_t reclaim(uint64_t targetBytes) override;
uint64_t reclaim(uint64_t targetBytes, memory::MemoryReclaimer::Stats& stats)
override;

uint64_t shrink(uint64_t targetBytes = 0) override;

Expand Down
3 changes: 2 additions & 1 deletion velox/common/memory/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ uint64_t SharedArbitrator::reclaim(
try {
freedBytes = pool->shrink(targetBytes);
if (freedBytes < targetBytes) {
pool->reclaim(targetBytes - freedBytes);
pool->reclaim(targetBytes - freedBytes, reclaimerStats_);
}
} catch (const std::exception& e) {
VELOX_MEM_LOG(ERROR) << "Failed to reclaim from memory pool "
Expand Down Expand Up @@ -492,6 +492,7 @@ MemoryArbitrator::Stats SharedArbitrator::statsLocked() const {
stats.maxCapacityBytes = capacity_;
stats.freeCapacityBytes = freeCapacity_;
stats.reclaimTimeUs = reclaimTimeUs_;
stats.numNonReclaimableAttempts = reclaimerStats_.numNonReclaimableAttempts;
return stats;
}

Expand Down
1 change: 1 addition & 0 deletions velox/common/memory/SharedArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,5 +200,6 @@ class SharedArbitrator : public MemoryArbitrator {
tsan_atomic<uint64_t> numShrunkBytes_{0};
tsan_atomic<uint64_t> numReclaimedBytes_{0};
tsan_atomic<uint64_t> reclaimTimeUs_{0};
MemoryReclaimer::Stats reclaimerStats_;
};
} // namespace facebook::velox::memory
6 changes: 4 additions & 2 deletions velox/common/memory/tests/MemoryArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,10 @@ class MockLeafMemoryReclaimer : public MemoryReclaimer {
return true;
}

uint64_t reclaim(MemoryPool* /*unused*/, uint64_t targetBytes) noexcept
override {
uint64_t reclaim(
MemoryPool* /*unused*/,
uint64_t targetBytes,
Stats& stats) noexcept override {
std::lock_guard<std::mutex> l(mu_);
uint64_t reclaimedBytes{0};
while (!allocations_.empty() &&
Expand Down
3 changes: 2 additions & 1 deletion velox/common/memory/tests/MockSharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ class MockMemoryOperator {
return op_->reclaimableBytes(pool, reclaimableBytes);
}

uint64_t reclaim(MemoryPool* pool, uint64_t targetBytes) override {
uint64_t reclaim(MemoryPool* pool, uint64_t targetBytes, Stats& stats)
override {
++numReclaims_;
if (!reclaimable_) {
return 0;
Expand Down
6 changes: 4 additions & 2 deletions velox/common/memory/tests/SharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ class FakeMemoryOperator : public Operator {
return canReclaim_;
}

void reclaim(uint64_t targetBytes) override {
void reclaim(uint64_t targetBytes, memory::MemoryReclaimer::Stats& stats)
override {
VELOX_CHECK(canReclaim());
auto* driver = operatorCtx_->driver();
VELOX_CHECK(!driver->state().isOnThread() || driver->state().isSuspended);
Expand Down Expand Up @@ -575,7 +576,8 @@ class TestMemoryReclaimer : public MemoryReclaimer {
TestMemoryReclaimer(std::function<void(MemoryPool*)> reclaimCb)
: reclaimCb_(std::move(reclaimCb)) {}

uint64_t reclaim(MemoryPool* pool, uint64_t targetBytes) override {
uint64_t reclaim(MemoryPool* pool, uint64_t targetBytes, Stats& stats)
override {
if (pool->kind() == MemoryPool::Kind::kLeaf) {
return 0;
}
Expand Down
4 changes: 3 additions & 1 deletion velox/exec/HashAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,9 @@ bool HashAggregation::isFinished() {
return finished_;
}

void HashAggregation::reclaim(uint64_t targetBytes) {
void HashAggregation::reclaim(
uint64_t targetBytes,
memory::MemoryReclaimer::Stats& stats) {
VELOX_CHECK(canReclaim());
auto* driver = operatorCtx_->driver();

Expand Down
3 changes: 2 additions & 1 deletion velox/exec/HashAggregation.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class HashAggregation : public Operator {

bool isFinished() override;

void reclaim(uint64_t targetBytes) override;
void reclaim(uint64_t targetBytes, memory::MemoryReclaimer::Stats& stats)
override;

void close() override;

Expand Down
4 changes: 3 additions & 1 deletion velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1070,7 +1070,9 @@ bool HashBuild::testingTriggerSpill() {
spillConfig()->testSpillPct;
}

void HashBuild::reclaim(uint64_t /*unused*/) {
void HashBuild::reclaim(
uint64_t /*unused*/,
memory::MemoryReclaimer::Stats& stats) {
VELOX_CHECK(canReclaim());
auto* driver = operatorCtx_->driver();

Expand Down
3 changes: 2 additions & 1 deletion velox/exec/HashBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ class HashBuild final : public Operator {

bool isFinished() override;

void reclaim(uint64_t targetBytes) override;
void reclaim(uint64_t targetBytes, memory::MemoryReclaimer::Stats& stats)
override;

void abort() override;

Expand Down
7 changes: 4 additions & 3 deletions velox/exec/HashJoinBridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,18 +181,19 @@ bool isLeftNullAwareJoinWithFilter(

uint64_t HashJoinMemoryReclaimer::reclaim(
memory::MemoryPool* pool,
uint64_t targetBytes) {
uint64_t targetBytes,
memory::MemoryReclaimer::Stats& stats) {
uint64_t reclaimedBytes{0};
pool->visitChildren(
[&targetBytes, &reclaimedBytes](memory::MemoryPool* child) {
[&targetBytes, &reclaimedBytes, &stats](memory::MemoryPool* child) {
VELOX_CHECK_EQ(child->kind(), memory::MemoryPool::Kind::kLeaf);
// The hash probe operator does not support memory reclaim.
if (!isHashBuildMemoryPool(*child)) {
return true;
}
// We only need to reclaim from any one of the hash build operators
// which will reclaim from all the peer hash build operators.
reclaimedBytes = child->reclaim(targetBytes);
reclaimedBytes = child->reclaim(targetBytes, stats);
return false;
});
return reclaimedBytes;
Expand Down
5 changes: 4 additions & 1 deletion velox/exec/HashJoinBridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,10 @@ class HashJoinMemoryReclaimer final : public MemoryReclaimer {
new HashJoinMemoryReclaimer());
}

uint64_t reclaim(memory::MemoryPool* pool, uint64_t targetBytes) final;
uint64_t reclaim(
memory::MemoryPool* pool,
uint64_t targetBytes,
memory::MemoryReclaimer::Stats& stats) final;

private:
HashJoinMemoryReclaimer() : MemoryReclaimer() {}
Expand Down
5 changes: 3 additions & 2 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,8 @@ bool Operator::MemoryReclaimer::reclaimableBytes(

uint64_t Operator::MemoryReclaimer::reclaim(
memory::MemoryPool* pool,
uint64_t targetBytes) {
uint64_t targetBytes,
memory::MemoryReclaimer::Stats& stats) {
std::shared_ptr<Driver> driver = ensureDriver();
if (FOLLY_UNLIKELY(driver == nullptr)) {
return 0;
Expand All @@ -575,7 +576,7 @@ uint64_t Operator::MemoryReclaimer::reclaim(
TestValue::adjust(
"facebook::velox::exec::Operator::MemoryReclaimer::reclaim", pool);

op_->reclaim(targetBytes);
op_->reclaim(targetBytes, stats);
return pool->shrink(targetBytes);
}

Expand Down
9 changes: 7 additions & 2 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,9 @@ class Operator : public BaseRuntimeStatWriter {
/// NOTE: this method doesn't return the actually freed memory bytes. The
/// caller needs to claim the actually freed memory space by shrinking the
/// associated root memory pool's capacity accordingly.
virtual void reclaim(uint64_t targetBytes) {}
virtual void reclaim(
uint64_t targetBytes,
memory::MemoryReclaimer::Stats& stats) {}

const core::PlanNodeId& planNodeId() const {
return operatorCtx_->planNodeId();
Expand Down Expand Up @@ -559,7 +561,10 @@ class Operator : public BaseRuntimeStatWriter {
const memory::MemoryPool& pool,
uint64_t& reclaimableBytes) const override;

uint64_t reclaim(memory::MemoryPool* pool, uint64_t targetBytes) override;
uint64_t reclaim(
memory::MemoryPool* pool,
uint64_t targetBytes,
memory::MemoryReclaimer::Stats& stats) override;

void abort(memory::MemoryPool* pool, const std::exception_ptr& /* error */)
override;
Expand Down
4 changes: 3 additions & 1 deletion velox/exec/OrderBy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ void OrderBy::addInput(RowVectorPtr input) {
sortBuffer_->addInput(input);
}

void OrderBy::reclaim(uint64_t targetBytes) {
void OrderBy::reclaim(
uint64_t targetBytes,
memory::MemoryReclaimer::Stats& stats) {
VELOX_CHECK(canReclaim());
auto* driver = operatorCtx_->driver();

Expand Down
3 changes: 2 additions & 1 deletion velox/exec/OrderBy.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ class OrderBy : public Operator {
return finished_;
}

void reclaim(uint64_t targetBytes) override;
void reclaim(uint64_t targetBytes, memory::MemoryReclaimer::Stats& stats)
override;

void abort() override;

Expand Down
3 changes: 2 additions & 1 deletion velox/exec/TableWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,8 @@ bool TableWriter::MemoryReclaimer::reclaimableBytes(

uint64_t TableWriter::MemoryReclaimer::reclaim(
memory::MemoryPool* pool,
uint64_t /*unused*/) {
uint64_t /*unused*/,
memory::MemoryReclaimer::Stats& /*unused*/) {
VELOX_CHECK(!pool->isLeaf());
return 0;
}
Expand Down
5 changes: 4 additions & 1 deletion velox/exec/TableWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,10 @@ class TableWriter : public Operator {
const memory::MemoryPool& pool,
uint64_t& reclaimableBytes) const override;

uint64_t reclaim(memory::MemoryPool* pool, uint64_t targetBytes) override;
uint64_t reclaim(
memory::MemoryPool* pool,
uint64_t targetBytes,
memory::MemoryReclaimer::Stats& stats) override;

void abort(memory::MemoryPool* pool, const std::exception_ptr& /* error */)
override;
Expand Down
Loading

0 comments on commit 52ca239

Please sign in to comment.