Skip to content

Commit

Permalink
Add stats collection to memory reclaimer
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Oct 6, 2023
1 parent f5bbde6 commit bdb72fe
Show file tree
Hide file tree
Showing 28 changed files with 296 additions and 106 deletions.
34 changes: 27 additions & 7 deletions velox/common/memory/MemoryArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ bool MemoryReclaimer::reclaimableBytes(
return reclaimable;
}

uint64_t MemoryReclaimer::reclaim(MemoryPool* pool, uint64_t targetBytes) {
uint64_t
MemoryReclaimer::reclaim(MemoryPool* pool, uint64_t targetBytes, Stats& stats) {
if (pool->kind() == MemoryPool::Kind::kLeaf) {
return 0;
}
Expand Down Expand Up @@ -214,7 +215,7 @@ uint64_t MemoryReclaimer::reclaim(MemoryPool* pool, uint64_t targetBytes) {

uint64_t reclaimedBytes{0};
for (const auto& candidate : candidates) {
const auto bytes = candidate.pool->reclaim(targetBytes);
const auto bytes = candidate.pool->reclaim(targetBytes, stats);
reclaimedBytes += bytes;
if (targetBytes != 0) {
if (bytes >= targetBytes) {
Expand Down Expand Up @@ -243,6 +244,17 @@ void MemoryReclaimer::abort(MemoryPool* pool, const std::exception_ptr& error) {
});
}

// Two reclaimer stats objects are equal when every counter matches.
// 'numNonReclaimableAttempts' is the only counter compared here.
bool MemoryReclaimer::Stats::operator==(
    const MemoryReclaimer::Stats& other) const {
  return numNonReclaimableAttempts == other.numNonReclaimableAttempts;
}

// Inequality is defined strictly as the negation of operator== so the two
// comparisons cannot drift apart.
bool MemoryReclaimer::Stats::operator!=(
    const MemoryReclaimer::Stats& other) const {
  const bool same = operator==(other);
  return !same;
}

MemoryArbitrator::Stats::Stats(
uint64_t _numRequests,
uint64_t _numSucceeded,
Expand All @@ -254,7 +266,8 @@ MemoryArbitrator::Stats::Stats(
uint64_t _numReclaimedBytes,
uint64_t _maxCapacityBytes,
uint64_t _freeCapacityBytes,
uint64_t _reclaimTimeUs)
uint64_t _reclaimTimeUs,
uint64_t _numNonReclaimableAttempts)
: numRequests(_numRequests),
numSucceeded(_numSucceeded),
numAborted(_numAborted),
Expand All @@ -265,15 +278,17 @@ MemoryArbitrator::Stats::Stats(
numReclaimedBytes(_numReclaimedBytes),
maxCapacityBytes(_maxCapacityBytes),
freeCapacityBytes(_freeCapacityBytes),
reclaimTimeUs(_reclaimTimeUs) {}
reclaimTimeUs(_reclaimTimeUs),
numNonReclaimableAttempts(_numNonReclaimableAttempts) {}

std::string MemoryArbitrator::Stats::toString() const {
return fmt::format(
"STATS[numRequests {} numSucceeded {} numAborted {} numFailures {} queueTime {} arbitrationTime {} reclaimTime {} shrunkMemory {} reclaimedMemory {} maxCapacity {} freeCapacity {}]",
"STATS[numRequests {} numSucceeded {} numAborted {} numFailures {} numNonReclaimableAttempts {} queueTime {} arbitrationTime {} reclaimTime {} shrunkMemory {} reclaimedMemory {} maxCapacity {} freeCapacity {}]",
numRequests,
numSucceeded,
numAborted,
numFailures,
numNonReclaimableAttempts,
succinctMicros(queueTimeUs),
succinctMicros(arbitrationTimeUs),
succinctMicros(reclaimTimeUs),
Expand All @@ -297,6 +312,8 @@ MemoryArbitrator::Stats MemoryArbitrator::Stats::operator-(
result.maxCapacityBytes = maxCapacityBytes;
result.freeCapacityBytes = freeCapacityBytes;
result.reclaimTimeUs = reclaimTimeUs - other.reclaimTimeUs;
result.numNonReclaimableAttempts =
numNonReclaimableAttempts - other.numNonReclaimableAttempts;
return result;
}

Expand All @@ -312,7 +329,8 @@ bool MemoryArbitrator::Stats::operator==(const Stats& other) const {
numReclaimedBytes,
maxCapacityBytes,
freeCapacityBytes,
reclaimTimeUs) ==
reclaimTimeUs,
numNonReclaimableAttempts) ==
std::tie(
other.numRequests,
other.numSucceeded,
Expand All @@ -324,7 +342,8 @@ bool MemoryArbitrator::Stats::operator==(const Stats& other) const {
other.numReclaimedBytes,
other.maxCapacityBytes,
other.freeCapacityBytes,
other.reclaimTimeUs);
other.reclaimTimeUs,
other.numNonReclaimableAttempts);
}

bool MemoryArbitrator::Stats::operator!=(const Stats& other) const {
Expand Down Expand Up @@ -355,6 +374,7 @@ bool MemoryArbitrator::Stats::operator<(const Stats& other) const {
UPDATE_COUNTER(numShrunkBytes);
UPDATE_COUNTER(numReclaimedBytes);
UPDATE_COUNTER(reclaimTimeUs);
UPDATE_COUNTER(numNonReclaimableAttempts);
#undef UPDATE_COUNTER
VELOX_CHECK(
!((gtCount > 0) && (ltCount > 0)),
Expand Down
20 changes: 18 additions & 2 deletions velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ class MemoryArbitrator {
/// The sum of all reclaim operation durations during arbitration in
/// microseconds.
uint64_t reclaimTimeUs{0};
/// The total number of reclaim attempts that failed because the reclaim was
/// requested at a non-reclaimable stage.
uint64_t numNonReclaimableAttempts{0};

Stats(
uint64_t _numRequests,
Expand All @@ -188,7 +191,8 @@ class MemoryArbitrator {
uint64_t _numReclaimedBytes,
uint64_t _maxCapacityBytes,
uint64_t _freeCapacityBytes,
uint64_t _reclaimTimeUs);
uint64_t _reclaimTimeUs,
uint64_t _numNonReclaimableAttempts);

Stats() = default;

Expand Down Expand Up @@ -252,6 +256,17 @@ FOLLY_ALWAYS_INLINE std::ostream& operator<<(
/// through techniques such as disks spilling.
class MemoryReclaimer {
public:
/// Collects execution stats for memory reclaim operations. An instance is
/// passed by reference to reclaim() so that the stats can be filled in during
/// the reclaim.
struct Stats {
/// The total number of reclaim attempts that failed because the reclaim was
/// requested at a non-reclaimable stage.
uint64_t numNonReclaimableAttempts{0};

/// Returns true if 'other' has the same 'numNonReclaimableAttempts' value.
bool operator==(const Stats& other) const;

/// Returns the negation of operator==.
bool operator!=(const Stats& other) const;
};

virtual ~MemoryReclaimer() = default;

static std::unique_ptr<MemoryReclaimer> create();
Expand Down Expand Up @@ -286,7 +301,8 @@ class MemoryReclaimer {
/// memory bytes but there are no guarantees. If 'targetBytes' is zero, then it
/// reclaims all the reclaimable memory from the memory 'pool'. The function
/// returns the actual reclaimed memory bytes.
virtual uint64_t reclaim(MemoryPool* pool, uint64_t targetBytes);
virtual uint64_t
reclaim(MemoryPool* pool, uint64_t targetBytes, Stats& stats);

/// Invoked by the memory arbitrator to abort memory 'pool' and the associated
/// query execution when it encounters a non-recoverable memory reclaim error or
Expand Down
6 changes: 4 additions & 2 deletions velox/common/memory/MemoryPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -923,11 +923,13 @@ bool MemoryPoolImpl::reclaimableBytes(uint64_t& reclaimableBytes) const {
return reclaimer()->reclaimableBytes(*this, reclaimableBytes);
}

uint64_t MemoryPoolImpl::reclaim(uint64_t targetBytes) {
uint64_t MemoryPoolImpl::reclaim(
uint64_t targetBytes,
memory::MemoryReclaimer::Stats& stats) {
if (reclaimer() == nullptr) {
return 0;
}
return reclaimer()->reclaim(this, targetBytes);
return reclaimer()->reclaim(this, targetBytes, stats);
}

void MemoryPoolImpl::enterArbitration() {
Expand Down
7 changes: 5 additions & 2 deletions velox/common/memory/MemoryPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,9 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
/// noop if the reclaimer is not set, otherwise invoke the reclaimer's
/// corresponding method. The function returns the actually freed capacity
/// from the root of this memory pool.
virtual uint64_t reclaim(uint64_t targetBytes) = 0;
virtual uint64_t reclaim(
uint64_t targetBytes,
memory::MemoryReclaimer::Stats& stats) = 0;

/// Invoked by the memory arbitrator to abort a root memory pool. The function
/// forwards the request to the corresponding query object to abort its
Expand Down Expand Up @@ -627,7 +629,8 @@ class MemoryPoolImpl : public MemoryPool {

bool reclaimableBytes(uint64_t& reclaimableBytes) const override;

uint64_t reclaim(uint64_t targetBytes) override;
uint64_t reclaim(uint64_t targetBytes, memory::MemoryReclaimer::Stats& stats)
override;

uint64_t shrink(uint64_t targetBytes = 0) override;

Expand Down
5 changes: 4 additions & 1 deletion velox/common/memory/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,13 +398,14 @@ uint64_t SharedArbitrator::reclaim(
uint64_t reclaimDurationUs{0};
uint64_t reclaimedBytes{0};
uint64_t freedBytes{0};
MemoryReclaimer::Stats reclaimerStats;
{
MicrosecondTimer reclaimTimer(&reclaimDurationUs);
const uint64_t oldCapacity = pool->capacity();
try {
freedBytes = pool->shrink(targetBytes);
if (freedBytes < targetBytes) {
pool->reclaim(targetBytes - freedBytes);
pool->reclaim(targetBytes - freedBytes, reclaimerStats);
}
} catch (const std::exception& e) {
VELOX_MEM_LOG(ERROR) << "Failed to reclaim from memory pool "
Expand All @@ -421,6 +422,7 @@ uint64_t SharedArbitrator::reclaim(
numReclaimedBytes_ += reclaimedBytes - freedBytes;
numShrunkBytes_ += freedBytes;
reclaimTimeUs_ += reclaimDurationUs;
numNonReclaimableAttempts_ += reclaimerStats.numNonReclaimableAttempts;
VELOX_MEM_LOG(INFO) << "Reclaimed from memory pool " << pool->name()
<< " with target of " << succinctBytes(targetBytes)
<< ", actually reclaimed " << succinctBytes(freedBytes)
Expand Down Expand Up @@ -492,6 +494,7 @@ MemoryArbitrator::Stats SharedArbitrator::statsLocked() const {
stats.maxCapacityBytes = capacity_;
stats.freeCapacityBytes = freeCapacity_;
stats.reclaimTimeUs = reclaimTimeUs_;
stats.numNonReclaimableAttempts = numNonReclaimableAttempts_;
return stats;
}

Expand Down
1 change: 1 addition & 0 deletions velox/common/memory/SharedArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,5 +200,6 @@ class SharedArbitrator : public MemoryArbitrator {
tsan_atomic<uint64_t> numShrunkBytes_{0};
tsan_atomic<uint64_t> numReclaimedBytes_{0};
tsan_atomic<uint64_t> reclaimTimeUs_{0};
tsan_atomic<uint64_t> numNonReclaimableAttempts_{0};
};
} // namespace facebook::velox::memory
Loading

0 comments on commit bdb72fe

Please sign in to comment.