Skip to content

Commit

Permalink
Add reserved memory capacity pool in arbitrator
Browse files Browse the repository at this point in the history
To ensure small queries having enough memory capacity to run through without the interference
of the other large queries by slow memory arbitration. We reserve a small percentage of memory
capacity which is only used for memory reservation when we start a query execution to ensure that
each query has a minimal amount of memory capacity to run. Correspondingly, when we free unused
memory capacity from a query memory pool, we make sure the reclaimed memory pool still has the
minimal amount of capacity after the shrink.
  • Loading branch information
xiaoxmeng committed Apr 17, 2024
1 parent a3b4849 commit 6ed5e68
Show file tree
Hide file tree
Showing 14 changed files with 461 additions and 158 deletions.
4 changes: 4 additions & 0 deletions velox/common/base/Counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ void registerVeloxMetrics() {
DEFINE_METRIC(
kMetricArbitratorFreeCapacityBytes, facebook::velox::StatType::AVG);

DEFINE_METRIC(
kMetricArbitratorFreeReservedCapacityBytes,
facebook::velox::StatType::AVG);

// Tracks the leaf memory pool usage leak in bytes.
DEFINE_METRIC(
kMetricMemoryPoolUsageLeakBytes, facebook::velox::StatType::SUM);
Expand Down
3 changes: 3 additions & 0 deletions velox/common/base/Counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ constexpr folly::StringPiece kMetricArbitratorArbitrationTimeMs{
constexpr folly::StringPiece kMetricArbitratorFreeCapacityBytes{
"velox.arbitrator_free_capacity_bytes"};

constexpr folly::StringPiece kMetricArbitratorFreeReservedCapacityBytes{
"velox.arbitrator_free_reserved_capacity_bytes"};

constexpr folly::StringPiece kMetricDriverYieldCount{
"velox.driver_yield_count"};

Expand Down
4 changes: 3 additions & 1 deletion velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ std::unique_ptr<MemoryArbitrator> createArbitrator(
{.kind = options.arbitratorKind,
.capacity =
std::min(options.arbitratorCapacity, options.allocatorCapacity),
.reservedCapacity = options.arbitratorReservedCapacity,
.memoryPoolReservedCapacity = options.memoryPoolReservedCapacity,
.memoryPoolTransferCapacity = options.memoryPoolTransferCapacity,
.memoryReclaimWaitMs = options.memoryReclaimWaitMs,
.arbitrationStateCheckCb = options.arbitrationStateCheckCb,
Expand Down Expand Up @@ -249,7 +251,7 @@ void MemoryManager::dropPool(MemoryPool* pool) {
}
pools_.erase(it);
VELOX_DCHECK_EQ(pool->currentBytes(), 0);
arbitrator_->shrinkCapacity(pool, 0);
arbitrator_->releaseCapacity(pool);
}

MemoryPool& MemoryManager::deprecatedSharedLeafPool() {
Expand Down
14 changes: 12 additions & 2 deletions velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,18 +145,28 @@ struct MemoryManagerOptions {
/// reservation capacity for system usage.
int64_t arbitratorCapacity{kMaxMemory};

/// Memory capacity reserved to ensure that a query has minimal memory
/// capacity to run. This capacity should be less than 'arbitratorCapacity'.
/// A query's minimal memory capacity is defined by
/// 'memoryPoolReservedCapacity'.
int64_t arbitratorReservedCapacity{0};

/// The string kind of memory arbitrator used in the memory manager.
///
/// NOTE: the arbitrator will only be created if its kind is set explicitly.
/// Otherwise MemoryArbitrator::create returns a nullptr.
std::string arbitratorKind{};

/// The initial memory capacity to reserve for a newly created memory pool.
/// The initial memory capacity to reserve for a newly created query memory
/// pool.
uint64_t memoryPoolInitCapacity{256 << 20};

/// The minimal memory capacity reserved for a query memory pool to run.
uint64_t memoryPoolReservedCapacity{64 << 20};

/// The minimal memory capacity to transfer out of or into a memory pool
/// during the memory arbitration.
uint64_t memoryPoolTransferCapacity{32 << 20};
uint64_t memoryPoolTransferCapacity{128 << 20};

/// Specifies the max time to wait for memory reclaim by arbitration. The
/// memory reclaim might fail if the max wait time has exceeded. If it is
Expand Down
12 changes: 9 additions & 3 deletions velox/common/memory/MemoryArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class NoopArbitrator : public MemoryArbitrator {

// Noop arbitrator has no memory capacity limit so no operation needed for
// memory pool capacity release.
uint64_t shrinkCapacity(MemoryPool* pool, uint64_t targetBytes) override {
uint64_t releaseCapacity(MemoryPool* pool) override {
// No-op
return 0;
}
Expand Down Expand Up @@ -307,6 +307,7 @@ MemoryArbitrator::Stats::Stats(
uint64_t _numReclaimedBytes,
uint64_t _maxCapacityBytes,
uint64_t _freeCapacityBytes,
uint64_t _freeReservedCapacityBytes,
uint64_t _reclaimTimeUs,
uint64_t _numNonReclaimableAttempts,
uint64_t _numReserves,
Expand All @@ -321,6 +322,7 @@ MemoryArbitrator::Stats::Stats(
numReclaimedBytes(_numReclaimedBytes),
maxCapacityBytes(_maxCapacityBytes),
freeCapacityBytes(_freeCapacityBytes),
freeReservedCapacityBytes(_freeReservedCapacityBytes),
reclaimTimeUs(_reclaimTimeUs),
numNonReclaimableAttempts(_numNonReclaimableAttempts),
numReserves(_numReserves),
Expand All @@ -331,7 +333,7 @@ std::string MemoryArbitrator::Stats::toString() const {
"STATS[numRequests {} numSucceeded {} numAborted {} numFailures {} "
"numNonReclaimableAttempts {} numReserves {} numReleases {} "
"queueTime {} arbitrationTime {} reclaimTime {} shrunkMemory {} "
"reclaimedMemory {} maxCapacity {} freeCapacity {}]",
"reclaimedMemory {} maxCapacity {} freeCapacity {} freeReservedCapacity {}]",
numRequests,
numSucceeded,
numAborted,
Expand All @@ -345,7 +347,8 @@ std::string MemoryArbitrator::Stats::toString() const {
succinctBytes(numShrunkBytes),
succinctBytes(numReclaimedBytes),
succinctBytes(maxCapacityBytes),
succinctBytes(freeCapacityBytes));
succinctBytes(freeCapacityBytes),
succinctBytes(freeReservedCapacityBytes));
}

MemoryArbitrator::Stats MemoryArbitrator::Stats::operator-(
Expand All @@ -361,6 +364,7 @@ MemoryArbitrator::Stats MemoryArbitrator::Stats::operator-(
result.numReclaimedBytes = numReclaimedBytes - other.numReclaimedBytes;
result.maxCapacityBytes = maxCapacityBytes;
result.freeCapacityBytes = freeCapacityBytes;
result.freeReservedCapacityBytes = freeReservedCapacityBytes;
result.reclaimTimeUs = reclaimTimeUs - other.reclaimTimeUs;
result.numNonReclaimableAttempts =
numNonReclaimableAttempts - other.numNonReclaimableAttempts;
Expand All @@ -381,6 +385,7 @@ bool MemoryArbitrator::Stats::operator==(const Stats& other) const {
numReclaimedBytes,
maxCapacityBytes,
freeCapacityBytes,
freeReservedCapacityBytes,
reclaimTimeUs,
numNonReclaimableAttempts,
numReserves,
Expand All @@ -396,6 +401,7 @@ bool MemoryArbitrator::Stats::operator==(const Stats& other) const {
other.numReclaimedBytes,
other.maxCapacityBytes,
other.freeCapacityBytes,
other.freeReservedCapacityBytes,
other.reclaimTimeUs,
other.numNonReclaimableAttempts,
other.numReserves,
Expand Down
32 changes: 24 additions & 8 deletions velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,16 @@ class MemoryArbitrator {
/// manager.
int64_t capacity;

/// The memory capacity reserved to ensure each running query has minimal
/// capacity of 'memoryPoolReservedCapacity' to run.
int64_t reservedCapacity{4LL << 30};

/// The minimal amount of memory capacity reserved for each query to run.
uint64_t memoryPoolReservedCapacity{64UL << 20};

/// The minimal memory capacity to transfer out of or into a memory pool
/// during the memory arbitration.
uint64_t memoryPoolTransferCapacity{32 << 20};
uint64_t memoryPoolTransferCapacity{128 << 20};

/// Specifies the max time to wait for memory reclaim by arbitration. The
/// memory reclaim might fail if the max time has exceeded. This prevents
Expand Down Expand Up @@ -142,12 +149,6 @@ class MemoryArbitrator {
const std::vector<std::shared_ptr<MemoryPool>>& candidatePools,
uint64_t targetBytes) = 0;

/// Invoked by the memory manager to shrink up to 'targetBytes' free capacity
/// from a memory 'pool', and returns them back to the arbitrator. If
/// 'targetBytes' is zero, we shrink all the free capacity from the memory
/// pool. The function returns the actual freed capacity from 'pool'.
virtual uint64_t shrinkCapacity(MemoryPool* pool, uint64_t targetBytes) = 0;

/// Invoked by the memory manager to shrink memory capacity from a given list
/// of memory pools by reclaiming free and used memory. The freed memory
/// capacity is given back to the arbitrator. If 'targetBytes' is zero, then
Expand All @@ -163,6 +164,12 @@ class MemoryArbitrator {
bool allowSpill = true,
bool allowAbort = false) = 0;

/// Invoked by the memory manager to shrink up to 'targetBytes' free capacity
/// from a memory 'pool', and returns them back to the arbitrator. If
/// 'targetBytes' is zero, we shrink all the free capacity from the memory
/// pool. The function returns the actual freed capacity from 'pool'.
virtual uint64_t releaseCapacity(MemoryPool* pool) = 0;

/// The internal execution stats of the memory arbitrator.
struct Stats {
/// The number of arbitration requests.
Expand All @@ -186,6 +193,8 @@ class MemoryArbitrator {
uint64_t maxCapacityBytes{0};
/// The free memory capacity in bytes.
uint64_t freeCapacityBytes{0};
/// The free reserved memory capacity in bytes.
uint64_t freeReservedCapacityBytes{0};
/// The sum of all reclaim operation durations during arbitration in
/// microseconds.
uint64_t reclaimTimeUs{0};
Expand All @@ -208,6 +217,7 @@ class MemoryArbitrator {
uint64_t _numReclaimedBytes,
uint64_t _maxCapacityBytes,
uint64_t _freeCapacityBytes,
uint64_t _freeReservedCapacityBytes,
uint64_t _reclaimTimeUs,
uint64_t _numNonReclaimableAttempts,
uint64_t _numReserves,
Expand Down Expand Up @@ -239,12 +249,18 @@ class MemoryArbitrator {
protected:
explicit MemoryArbitrator(const Config& config)
: capacity_(config.capacity),
reservedCapacity_(config.reservedCapacity),
memoryPoolReservedCapacity_(config.memoryPoolReservedCapacity),
memoryPoolTransferCapacity_(config.memoryPoolTransferCapacity),
memoryReclaimWaitMs_(config.memoryReclaimWaitMs),
arbitrationStateCheckCb_(config.arbitrationStateCheckCb),
checkUsageLeak_(config.checkUsageLeak) {}
checkUsageLeak_(config.checkUsageLeak) {
VELOX_CHECK_LE(reservedCapacity_, capacity_);
}

const uint64_t capacity_;
const uint64_t reservedCapacity_;
const uint64_t memoryPoolReservedCapacity_;
const uint64_t memoryPoolTransferCapacity_;
const uint64_t memoryReclaimWaitMs_;
const MemoryArbitrationStateCheckCB arbitrationStateCheckCb_;
Expand Down
Loading

0 comments on commit 6ed5e68

Please sign in to comment.