Skip to content

Commit

Permalink
Fix and extend arbitration related metrics
Browse files Browse the repository at this point in the history
Fixes accounting of kMetricArbitratorLocalArbitrationCountwhich was
previously sometimes incremented for global arbitration.

Also adds additional operator level metrics for keeping track of
global and local arbitration attempts initiated by them.
  • Loading branch information
bikramSingh91 committed Apr 8, 2024
1 parent 41bed84 commit 0dfd1f0
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 9 deletions.
1 change: 1 addition & 0 deletions velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ class MemoryManager {
std::vector<std::shared_ptr<MemoryPool>> sharedLeafPools_;

mutable folly::SharedMutex mutex_;
// All user root pools allocated from 'this'.
std::unordered_map<std::string, std::weak_ptr<MemoryPool>> pools_;
};

Expand Down
27 changes: 21 additions & 6 deletions velox/common/memory/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ bool SharedArbitrator::ensureCapacity(
if (checkCapacityGrowth(*requestor, targetBytes)) {
return true;
}
const uint64_t reclaimedBytes = reclaim(requestor, targetBytes);
const uint64_t reclaimedBytes = reclaim(requestor, targetBytes, true);
// NOTE: return the reclaimed bytes back to the arbitrator and let the memory
// arbitration process to grow the requestor's memory capacity accordingly.
incrementFreeCapacity(reclaimedBytes);
Expand Down Expand Up @@ -426,7 +426,7 @@ bool SharedArbitrator::arbitrateMemory(
}

VELOX_CHECK_LT(freedBytes, growTarget);
RECORD_METRIC_VALUE(kMetricArbitratorGlobalArbitrationCount);
incrementGlobalArbitrationCount();
freedBytes += reclaimUsedMemoryFromCandidatesBySpill(
requestor, candidates, growTarget - freedBytes);
if (requestor->aborted()) {
Expand Down Expand Up @@ -494,7 +494,7 @@ uint64_t SharedArbitrator::reclaimUsedMemoryFromCandidatesBySpill(
const int64_t bytesToReclaim = std::max<int64_t>(
targetBytes - freedBytes, memoryPoolTransferCapacity_);
VELOX_CHECK_GT(bytesToReclaim, 0);
freedBytes += reclaim(candidate.pool, bytesToReclaim);
freedBytes += reclaim(candidate.pool, bytesToReclaim, false);
if ((freedBytes >= targetBytes) ||
(requestor != nullptr && requestor->aborted())) {
break;
Expand Down Expand Up @@ -531,7 +531,8 @@ uint64_t SharedArbitrator::reclaimUsedMemoryFromCandidatesByAbort(

uint64_t SharedArbitrator::reclaim(
MemoryPool* pool,
uint64_t targetBytes) noexcept {
uint64_t targetBytes,
bool isLocalArbitration) noexcept {
uint64_t reclaimDurationUs{0};
uint64_t reclaimedBytes{0};
uint64_t freedBytes{0};
Expand All @@ -542,7 +543,9 @@ uint64_t SharedArbitrator::reclaim(
try {
freedBytes = pool->shrink(targetBytes);
if (freedBytes < targetBytes) {
RECORD_METRIC_VALUE(kMetricArbitratorLocalArbitrationCount);
if (isLocalArbitration) {
incrementLocalArbitrationCount();
}
pool->reclaim(
targetBytes - freedBytes, memoryReclaimWaitMs_, reclaimerStats);
}
Expand Down Expand Up @@ -706,7 +709,7 @@ SharedArbitrator::ScopedArbitration::~ScopedArbitration() {
RECORD_HISTOGRAM_METRIC_VALUE(
kMetricArbitratorArbitrationTimeMs, arbitrationTimeUs / 1'000);
addThreadLocalRuntimeStat(
"memoryArbitrationWallNanos",
kMemoryArbitrationWallNanos,
RuntimeCounter(arbitrationTimeUs * 1'000, RuntimeCounter::Unit::kNanos));
arbitrator_->arbitrationTimeUs_ += arbitrationTimeUs;
arbitrator_->finishArbitration();
Expand Down Expand Up @@ -773,4 +776,16 @@ void SharedArbitrator::registerFactory() {
void SharedArbitrator::unregisterFactory() {
MemoryArbitrator::unregisterFactory(kind_);
}

void SharedArbitrator::incrementGlobalArbitrationCount() {
RECORD_METRIC_VALUE(kMetricArbitratorGlobalArbitrationCount);
addThreadLocalRuntimeStat(
kGlobalArbitrationCount, RuntimeCounter(1, RuntimeCounter::Unit::kNone));
}

void SharedArbitrator::incrementLocalArbitrationCount() {
RECORD_METRIC_VALUE(kMetricArbitratorLocalArbitrationCount);
addThreadLocalRuntimeStat(
kLocalArbitrationCount, RuntimeCounter(1, RuntimeCounter::Unit::kNone));
}
} // namespace facebook::velox::memory
23 changes: 20 additions & 3 deletions velox/common/memory/SharedArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ class SharedArbitrator : public memory::MemoryArbitrator {
/// Returns 'freeCapacity' back to the arbitrator for testing.
void testingFreeCapacity(uint64_t freeCapacity);

/// Operator level runtime stats that are reported during a shared arbitration
/// attempt.
static inline const std::string kMemoryArbitrationWallNanos{
"memoryArbitrationWallNanos"};
static inline const std::string kGlobalArbitrationCount{
"globalArbitrationCount"};
static inline const std::string kLocalArbitrationCount{
"localArbitrationCount"};

private:
// The kind string of shared arbitrator.
inline static const std::string kind_{"SHARED"};
Expand Down Expand Up @@ -151,9 +160,14 @@ class SharedArbitrator : public memory::MemoryArbitrator {
std::vector<Candidate>& candidates,
uint64_t targetBytes);

// Invoked to reclaim used memory from 'pool' with specified 'targetBytes'.
// The function returns the actually freed capacity.
uint64_t reclaim(MemoryPool* pool, uint64_t targetBytes) noexcept;
// Invoked to reclaim used memory from 'targetPool' with specified
// 'targetBytes'. The function returns the actually freed capacity.
// 'isLocalArbitration' is true when the reclaim attempt is within a local
// arbitration.
uint64_t reclaim(
MemoryPool* targetPool,
uint64_t targetBytes,
bool isLocalArbitration) noexcept;

// Invoked to abort memory 'pool'.
void abort(MemoryPool* pool, const std::exception_ptr& error);
Expand Down Expand Up @@ -182,6 +196,9 @@ class SharedArbitrator : public memory::MemoryArbitrator {

Stats statsLocked() const;

void incrementGlobalArbitrationCount();
void incrementLocalArbitrationCount();

mutable std::mutex mutex_;
uint64_t freeCapacity_{0};
// Indicates if there is a running arbitration request or not.
Expand Down
50 changes: 50 additions & 0 deletions velox/common/memory/tests/SharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "velox/dwio/dwrf/writer/Writer.h"
#include "velox/exec/Driver.h"
#include "velox/exec/HashBuild.h"
#include "velox/exec/PlanNodeStats.h"
#include "velox/exec/TableWriter.h"
#include "velox/exec/Values.h"
#include "velox/exec/tests/utils/ArbitratorTestUtil.h"
Expand Down Expand Up @@ -271,6 +272,30 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase {
numAddedPools_ = 0;
}

void checkOperatorStatsForArbitration(
PlanNodeStats& stats,
bool expectGlobalArbitration) {
if (expectGlobalArbitration) {
VELOX_CHECK_EQ(
stats.customStats.count(SharedArbitrator::kGlobalArbitrationCount),
1);
VELOX_CHECK_EQ(
stats.customStats.at(SharedArbitrator::kGlobalArbitrationCount).sum,
1);
VELOX_CHECK_EQ(
stats.customStats.count(SharedArbitrator::kLocalArbitrationCount), 0);
} else {
VELOX_CHECK_EQ(
stats.customStats.count(SharedArbitrator::kLocalArbitrationCount), 1);
VELOX_CHECK_EQ(
stats.customStats.at(SharedArbitrator::kLocalArbitrationCount).sum,
1);
VELOX_CHECK_EQ(
stats.customStats.count(SharedArbitrator::kGlobalArbitrationCount),
0);
}
}

static inline FakeMemoryOperatorFactory* fakeOperatorFactory_;
std::unique_ptr<memory::MemoryManager> memoryManager_;
SharedArbitrator* arbitrator_;
Expand Down Expand Up @@ -340,14 +365,20 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) {
([&](Task* /*unused*/) { taskPauseWait.notify(); })));

std::thread orderByThread([&]() {
core::PlanNodeId orderByNodeId;
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(orderByQueryCtx)
.plan(PlanBuilder()
.values(vectors)
.orderBy({"c0 ASC NULLS LAST"}, false)
.capturePlanNodeId(orderByNodeId)
.planNode())
.assertResults("SELECT * FROM tmp ORDER BY c0 ASC NULLS LAST");
auto taskStats = exec::toPlanStats(task->taskStats());
auto& stats = taskStats.at(orderByNodeId);
checkOperatorStatsForArbitration(
stats, !sameQuery /*expectGlobalArbitration*/);
});

std::thread memThread([&]() {
Expand Down Expand Up @@ -434,15 +465,21 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) {
([&](Task* /*unused*/) { taskPauseWait.notify(); })));

std::thread aggregationThread([&]() {
core::PlanNodeId aggregationNodeId;
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(aggregationQueryCtx)
.plan(PlanBuilder()
.values(vectors)
.singleAggregation({"c0", "c1"}, {"array_agg(c2)"})
.capturePlanNodeId(aggregationNodeId)
.planNode())
.assertResults(
"SELECT c0, c1, array_agg(c2) FROM tmp GROUP BY c0, c1");
auto taskStats = exec::toPlanStats(task->taskStats());
auto& stats = taskStats.at(aggregationNodeId);
checkOperatorStatsForArbitration(
stats, !sameQuery /*expectGlobalArbitration*/);
});

std::thread memThread([&]() {
Expand Down Expand Up @@ -530,6 +567,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToJoinBuilder) {

std::thread joinThread([&]() {
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
core::PlanNodeId joinNodeId;
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(joinQueryCtx)
Expand All @@ -546,9 +584,14 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToJoinBuilder) {
"",
{"t1"},
core::JoinType::kAnti)
.capturePlanNodeId(joinNodeId)
.planNode())
.assertResults(
"SELECT c1 FROM tmp WHERE c0 NOT IN (SELECT c0 FROM tmp)");
auto taskStats = exec::toPlanStats(task->taskStats());
auto& stats = taskStats.at(joinNodeId);
checkOperatorStatsForArbitration(
stats, !sameQuery /*expectGlobalArbitration*/);
});

std::thread memThread([&]() {
Expand Down Expand Up @@ -984,13 +1027,15 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, arbitrateMemoryFromOtherOperator) {
})));

std::shared_ptr<Task> task;
core::PlanNodeId aggregationNodeId;
std::thread queryThread([&]() {
if (sameDriver) {
task = AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(queryCtx)
.plan(PlanBuilder()
.values(vectors)
.singleAggregation({"c0", "c1"}, {"array_agg(c2)"})
.capturePlanNodeId(aggregationNodeId)
.localPartition(std::vector<std::string>{})
.planNode())
.assertResults(
Expand All @@ -1002,13 +1047,18 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, arbitrateMemoryFromOtherOperator) {
.values(vectors)
.localPartition({"c0", "c1"})
.singleAggregation({"c0", "c1"}, {"array_agg(c2)"})
.capturePlanNodeId(aggregationNodeId)
.planNode())
.assertResults(
"SELECT c0, c1, array_agg(c2) FROM tmp GROUP BY c0, c1");
}
});

queryThread.join();
auto taskStats = exec::toPlanStats(task->taskStats());
auto& aggNodeStats = taskStats.at(aggregationNodeId);
checkOperatorStatsForArbitration(
aggNodeStats, false /*expectGlobalArbitration*/);
ASSERT_TRUE(buffer != nullptr);
ASSERT_TRUE(bufferPool != nullptr);
bufferPool.load()->free(buffer, initialBufferLen);
Expand Down
11 changes: 11 additions & 0 deletions velox/docs/monitoring/stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ These stats are reported by all operators.
- bytes
- The reclaimed memory bytes of an operator during the memory arbitration.
This stats only applies for spillable operators.
* - globalArbitrationCount
-
- The number of times a request for more memory hit the arbitrator's
capacity limit and initiated a global arbitration attempt where
memory is reclaimed from viable candidates chosen among all running
queries based on a criterion.
* - localArbitrationCount
-
- The number of times a request for more memory hit the query memory
limit and initiated a local arbitration attempt where memory is
reclaimed from the requestor itself.

HashBuild, HashAggregation
--------------------------
Expand Down

0 comments on commit 0dfd1f0

Please sign in to comment.