Skip to content

Commit

Permalink
Add reserved memory capacity in arbitrator
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaoxmeng committed Apr 16, 2024
1 parent a3b4849 commit 4aab044
Show file tree
Hide file tree
Showing 11 changed files with 198 additions and 76 deletions.
4 changes: 4 additions & 0 deletions velox/common/base/Counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ void registerVeloxMetrics() {
DEFINE_METRIC(
kMetricArbitratorFreeCapacityBytes, facebook::velox::StatType::AVG);

DEFINE_METRIC(
kMetricArbitratorFreeReservedCapacityBytes,
facebook::velox::StatType::AVG);

// Tracks the leaf memory pool usage leak in bytes.
DEFINE_METRIC(
kMetricMemoryPoolUsageLeakBytes, facebook::velox::StatType::SUM);
Expand Down
3 changes: 3 additions & 0 deletions velox/common/base/Counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ constexpr folly::StringPiece kMetricArbitratorArbitrationTimeMs{
constexpr folly::StringPiece kMetricArbitratorFreeCapacityBytes{
"velox.arbitrator_free_capacity_bytes"};

constexpr folly::StringPiece kMetricArbitratorFreeReservedCapacityBytes{
"velox.arbitrator_free_reserved_capacity_bytes"};

constexpr folly::StringPiece kMetricDriverYieldCount{
"velox.driver_yield_count"};

Expand Down
5 changes: 3 additions & 2 deletions velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ std::unique_ptr<MemoryArbitrator> createArbitrator(
{.kind = options.arbitratorKind,
.capacity =
std::min(options.arbitratorCapacity, options.allocatorCapacity),
.reservedCapacity = options.arbitratorReservedCapacity,
.memoryPoolTransferCapacity = options.memoryPoolTransferCapacity,
.memoryReclaimWaitMs = options.memoryReclaimWaitMs,
.arbitrationStateCheckCb = options.arbitrationStateCheckCb,
Expand Down Expand Up @@ -208,7 +209,7 @@ std::shared_ptr<MemoryPool> MemoryManager::addRootPool(
options);
pools_.emplace(poolName, pool);
VELOX_CHECK_EQ(pool->capacity(), 0);
arbitrator_->growCapacity(
arbitrator_->reserveCapacity(
pool.get(), std::min<uint64_t>(poolInitCapacity_, maxCapacity));
RECORD_HISTOGRAM_METRIC_VALUE(
kMetricMemoryPoolInitialCapacityBytes, pool->capacity());
Expand Down Expand Up @@ -249,7 +250,7 @@ void MemoryManager::dropPool(MemoryPool* pool) {
}
pools_.erase(it);
VELOX_DCHECK_EQ(pool->currentBytes(), 0);
arbitrator_->shrinkCapacity(pool, 0);
arbitrator_->releaseCapacity(pool);
}

MemoryPool& MemoryManager::deprecatedSharedLeafPool() {
Expand Down
14 changes: 12 additions & 2 deletions velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,18 +145,28 @@ struct MemoryManagerOptions {
/// reservation capacity for system usage.
int64_t arbitratorCapacity{kMaxMemory};

/// Memory capacity reserved to ensure that a query has minimal memory
/// capacity to run. This capacity should be less than 'arbitratorCapacity'.
/// A query's minimal memory capacity is defined by
/// 'memoryPoolReservedCapacity'.
int64_t arbitratorReservedCapacity{0};

/// The string kind of memory arbitrator used in the memory manager.
///
/// NOTE: the arbitrator will only be created if its kind is set explicitly.
/// Otherwise MemoryArbitrator::create returns a nullptr.
std::string arbitratorKind{};

/// The initial memory capacity to reserve for a newly created memory pool.
/// The initial memory capacity to reserve for a newly created query memory
/// pool.
uint64_t memoryPoolInitCapacity{256 << 20};

/// The minimal memory capacity reserved for a query memory pool to run.
uint64_t memoryPoolReservedCapacity{64 << 20};

/// The minimal memory capacity to transfer out of or into a memory pool
/// during the memory arbitration.
uint64_t memoryPoolTransferCapacity{32 << 20};
uint64_t memoryPoolTransferCapacity{128 << 20};

/// Specifies the max time to wait for memory reclaim by arbitration. The
/// memory reclaim might fail if the max wait time has exceeded. If it is
Expand Down
14 changes: 10 additions & 4 deletions velox/common/memory/MemoryArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class NoopArbitrator : public MemoryArbitrator {

// Noop arbitrator has no memory capacity limit so no operation needed for
// memory pool capacity reserve.
uint64_t growCapacity(MemoryPool* pool, uint64_t /*unused*/) override {
uint64_t reserveCapacity(MemoryPool* pool, uint64_t /*unused*/) override {
pool->grow(pool->maxCapacity());
return pool->maxCapacity();
}
Expand All @@ -109,7 +109,7 @@ class NoopArbitrator : public MemoryArbitrator {

// Noop arbitrator has no memory capacity limit so no operation needed for
// memory pool capacity release.
uint64_t shrinkCapacity(MemoryPool* pool, uint64_t targetBytes) override {
uint64_t releaseCapacity(MemoryPool* pool) override {
// No-op
return 0;
}
Expand Down Expand Up @@ -307,6 +307,7 @@ MemoryArbitrator::Stats::Stats(
uint64_t _numReclaimedBytes,
uint64_t _maxCapacityBytes,
uint64_t _freeCapacityBytes,
uint64_t _freeReservedCapacityBytes,
uint64_t _reclaimTimeUs,
uint64_t _numNonReclaimableAttempts,
uint64_t _numReserves,
Expand All @@ -321,6 +322,7 @@ MemoryArbitrator::Stats::Stats(
numReclaimedBytes(_numReclaimedBytes),
maxCapacityBytes(_maxCapacityBytes),
freeCapacityBytes(_freeCapacityBytes),
freeReservedCapacityBytes(_freeReservedCapacityBytes),
reclaimTimeUs(_reclaimTimeUs),
numNonReclaimableAttempts(_numNonReclaimableAttempts),
numReserves(_numReserves),
Expand All @@ -331,7 +333,7 @@ std::string MemoryArbitrator::Stats::toString() const {
"STATS[numRequests {} numSucceeded {} numAborted {} numFailures {} "
"numNonReclaimableAttempts {} numReserves {} numReleases {} "
"queueTime {} arbitrationTime {} reclaimTime {} shrunkMemory {} "
"reclaimedMemory {} maxCapacity {} freeCapacity {}]",
"reclaimedMemory {} maxCapacity {} freeCapacity {} freeReservedCapacity {}]",
numRequests,
numSucceeded,
numAborted,
Expand All @@ -345,7 +347,8 @@ std::string MemoryArbitrator::Stats::toString() const {
succinctBytes(numShrunkBytes),
succinctBytes(numReclaimedBytes),
succinctBytes(maxCapacityBytes),
succinctBytes(freeCapacityBytes));
succinctBytes(freeCapacityBytes),
succinctBytes(freeReservedCapacityBytes));
}

MemoryArbitrator::Stats MemoryArbitrator::Stats::operator-(
Expand All @@ -361,6 +364,7 @@ MemoryArbitrator::Stats MemoryArbitrator::Stats::operator-(
result.numReclaimedBytes = numReclaimedBytes - other.numReclaimedBytes;
result.maxCapacityBytes = maxCapacityBytes;
result.freeCapacityBytes = freeCapacityBytes;
result.freeReservedCapacityBytes = freeReservedCapacityBytes;
result.reclaimTimeUs = reclaimTimeUs - other.reclaimTimeUs;
result.numNonReclaimableAttempts =
numNonReclaimableAttempts - other.numNonReclaimableAttempts;
Expand All @@ -381,6 +385,7 @@ bool MemoryArbitrator::Stats::operator==(const Stats& other) const {
numReclaimedBytes,
maxCapacityBytes,
freeCapacityBytes,
freeReservedCapacityBytes,
reclaimTimeUs,
numNonReclaimableAttempts,
numReserves,
Expand All @@ -396,6 +401,7 @@ bool MemoryArbitrator::Stats::operator==(const Stats& other) const {
other.numReclaimedBytes,
other.maxCapacityBytes,
other.freeCapacityBytes,
other.freeReservedCapacityBytes,
other.reclaimTimeUs,
other.numNonReclaimableAttempts,
other.numReserves,
Expand Down
32 changes: 23 additions & 9 deletions velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,14 @@ class MemoryArbitrator {
/// manager.
int64_t capacity;

int64_t reservedCapacity{6LL << 30};

/// The min reserved capacity for each query.
uint64_t memoryPoolReservedCapacity{64UL << 20};

/// The minimal memory capacity to transfer out of or into a memory pool
/// during the memory arbitration.
uint64_t memoryPoolTransferCapacity{32 << 20};
uint64_t memoryPoolTransferCapacity{128 << 20};

/// Specifies the max time to wait for memory reclaim by arbitration. The
/// memory reclaim might fail if the max time has exceeded. This prevents
Expand Down Expand Up @@ -123,7 +128,13 @@ class MemoryArbitrator {
/// grow the memory pool's capacity based on the free available memory
/// capacity in the arbitrator, and returns the actual growed capacity in
/// bytes.
virtual uint64_t growCapacity(MemoryPool* pool, uint64_t bytes) = 0;
virtual uint64_t reserveCapacity(MemoryPool* pool, uint64_t bytes) = 0;

/// Invoked by the memory manager to shrink up to 'targetBytes' free capacity
/// from a memory 'pool', and returns them back to the arbitrator. If
/// 'targetBytes' is zero, we shrink all the free capacity from the memory
/// pool. The function returns the actual freed capacity from 'pool'.
virtual uint64_t releaseCapacity(MemoryPool* pool) = 0;

/// Invoked by the memory manager to grow a memory pool's capacity.
/// 'pool' is the memory pool to request to grow. 'candidates' is a list
Expand All @@ -142,12 +153,6 @@ class MemoryArbitrator {
const std::vector<std::shared_ptr<MemoryPool>>& candidatePools,
uint64_t targetBytes) = 0;

/// Invoked by the memory manager to shrink up to 'targetBytes' free capacity
/// from a memory 'pool', and returns them back to the arbitrator. If
/// 'targetBytes' is zero, we shrink all the free capacity from the memory
/// pool. The function returns the actual freed capacity from 'pool'.
virtual uint64_t shrinkCapacity(MemoryPool* pool, uint64_t targetBytes) = 0;

/// Invoked by the memory manager to shrink memory capacity from a given list
/// of memory pools by reclaiming free and used memory. The freed memory
/// capacity is given back to the arbitrator. If 'targetBytes' is zero, then
Expand Down Expand Up @@ -186,6 +191,8 @@ class MemoryArbitrator {
uint64_t maxCapacityBytes{0};
/// The free memory capacity in bytes.
uint64_t freeCapacityBytes{0};
/// The free memory capacity in bytes.
uint64_t freeReservedCapacityBytes{0};
/// The sum of all reclaim operation durations during arbitration in
/// microseconds.
uint64_t reclaimTimeUs{0};
Expand All @@ -208,6 +215,7 @@ class MemoryArbitrator {
uint64_t _numReclaimedBytes,
uint64_t _maxCapacityBytes,
uint64_t _freeCapacityBytes,
uint64_t _freeReservedCapacityBytes,
uint64_t _reclaimTimeUs,
uint64_t _numNonReclaimableAttempts,
uint64_t _numReserves,
Expand Down Expand Up @@ -239,12 +247,18 @@ class MemoryArbitrator {
protected:
explicit MemoryArbitrator(const Config& config)
: capacity_(config.capacity),
reservedCapacity_(config.reservedCapacity),
memoryPoolReservedCapacity_(config.memoryPoolReservedCapacity),
memoryPoolTransferCapacity_(config.memoryPoolTransferCapacity),
memoryReclaimWaitMs_(config.memoryReclaimWaitMs),
arbitrationStateCheckCb_(config.arbitrationStateCheckCb),
checkUsageLeak_(config.checkUsageLeak) {}
checkUsageLeak_(config.checkUsageLeak) {
VELOX_CHECK_LE(reservedCapacity_, capacity_);
}

const uint64_t capacity_;
const uint64_t reservedCapacity_;
const uint64_t memoryPoolReservedCapacity_;
const uint64_t memoryPoolTransferCapacity_;
const uint64_t memoryReclaimWaitMs_;
const MemoryArbitrationStateCheckCB arbitrationStateCheckCb_;
Expand Down
Loading

0 comments on commit 4aab044

Please sign in to comment.