Skip to content

Commit

Permalink
Expose metrics related to memory arbitration
Browse files Browse the repository at this point in the history
This adds the following stats:

"velox.arbitrator_requests_count"
"velox.arbitrator_aborted_count"
"velox.arbitrator_failures_count"
"velox.arbitrator_queue_time_ms"
"velox.arbitrator_arbitration_time_ms"
"velox.arbitrator_shrunk_bytes"
"velox.arbitrator_free_capacity_bytes"

Also fixed accounting for:
"velox.memory_non_reclaimable_count"
  • Loading branch information
bikramSingh91 committed Dec 8, 2023
1 parent ddc3471 commit d8e94b3
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 4 deletions.
36 changes: 34 additions & 2 deletions velox/common/base/Counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ void registerVeloxMetrics() {
DEFINE_HISTOGRAM_METRIC(
kMetricCacheShrinkTimeMs, 10, 0, 100'000, 50, 90, 99, 100);

// Tracks the number of times that we hit the max spill level limit.
DEFINE_METRIC(
kMetricMaxSpillLevelExceededCount, facebook::velox::StatType::COUNT);

/// ================== Memory Arbitration Counters =================

// Tracks memory reclaim exec time in range of [0, 600s] and reports
// P50, P90, P99, and P100.
DEFINE_HISTOGRAM_METRIC(
Expand All @@ -55,8 +61,34 @@ void registerVeloxMetrics() {
DEFINE_METRIC(
kMetricMemoryNonReclaimableCount, facebook::velox::StatType::COUNT);

// Tracks the number of times that we hit the max spill level limit.
// The number of arbitration requests.
DEFINE_METRIC(
kMetricMaxSpillLevelExceededCount, facebook::velox::StatType::COUNT);
kMetricArbitratorRequestsCount, facebook::velox::StatType::COUNT);

// The number of aborted arbitration requests.
DEFINE_METRIC(
kMetricArbitratorAbortedCount, facebook::velox::StatType::COUNT);

// The number of arbitration request failures.
DEFINE_METRIC(
kMetricArbitratorFailuresCount, facebook::velox::StatType::COUNT);

// Tracks the arbitration request queue times in range of [0, 100s] and
// reports P50, P90, P99, and P100.
DEFINE_HISTOGRAM_METRIC(
kMetricArbitratorQueueTimeMs, 10, 0, 100'000, 50, 90, 99, 100);

// Tracks the arbitration run times in range of [0, 100s] and reports P50,
// P90, P99, and P100.
DEFINE_HISTOGRAM_METRIC(
kMetricArbitratorArbitrationTimeMs, 10, 0, 100'000, 50, 90, 99, 100);

// Tracks the number of bytes freed by an arbitration call by reducing the
// memory pool's capacity without actually freeing memory.
DEFINE_METRIC(kMetricArbitratorShrunkBytes, facebook::velox::StatType::SUM);

// Tracks the free memory capacity managed by the arbitrator in bytes.
DEFINE_METRIC(
kMetricArbitratorFreeCapacityBytes, facebook::velox::StatType::SUM);
}
} // namespace facebook::velox
25 changes: 23 additions & 2 deletions velox/common/base/Counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ constexpr folly::StringPiece kMetricCacheShrinkCount{

// Metric name for the cache shrink time, in milliseconds.
constexpr folly::StringPiece kMetricCacheShrinkTimeMs{"velox.cache_shrink_ms"};

// Metric name for the number of times the max spill level limit was hit.
constexpr folly::StringPiece kMetricMaxSpillLevelExceededCount{
"velox.spill_max_level_exceeded_count"};

// Metric name for the memory reclaim execution time, in milliseconds.
constexpr folly::StringPiece kMetricMemoryReclaimExecTimeMs{
"velox.memory_reclaim_exec_ms"};

Expand All @@ -52,6 +55,24 @@ constexpr folly::StringPiece kMetricMemoryReclaimWaitTimeoutCount{
constexpr folly::StringPiece kMetricMemoryNonReclaimableCount{
"velox.memory_non_reclaimable_count"};

constexpr folly::StringPiece kMetricMaxSpillLevelExceededCount{
"velox.spill_max_level_exceeded_count"};
// Metric name for the number of arbitration requests.
constexpr folly::StringPiece kMetricArbitratorRequestsCount{
"velox.arbitrator_requests_count"};

// Metric name for the number of aborted arbitration requests.
constexpr folly::StringPiece kMetricArbitratorAbortedCount{
"velox.arbitrator_aborted_count"};

// Metric name for the number of arbitration request failures.
constexpr folly::StringPiece kMetricArbitratorFailuresCount{
"velox.arbitrator_failures_count"};

// Metric name for the arbitration request queue time, in milliseconds.
constexpr folly::StringPiece kMetricArbitratorQueueTimeMs{
"velox.arbitrator_queue_time_ms"};

// Metric name for the arbitration run time, in milliseconds.
constexpr folly::StringPiece kMetricArbitratorArbitrationTimeMs{
"velox.arbitrator_arbitration_time_ms"};

// Metric name for the bytes freed by reducing a memory pool's capacity
// without actually freeing memory.
constexpr folly::StringPiece kMetricArbitratorShrunkBytes{
"velox.arbitrator_shrunk_bytes"};

// Metric name for the free memory capacity managed by the arbitrator, in
// bytes.
constexpr folly::StringPiece kMetricArbitratorFreeCapacityBytes{
"velox.arbitrator_free_capacity_bytes"};
} // namespace facebook::velox
3 changes: 3 additions & 0 deletions velox/dwio/common/SortingWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/

#include "velox/dwio/common/SortingWriter.h"
#include "velox/common/base/Counters.h"
#include "velox/common/base/StatsReporter.h"

namespace facebook::velox::dwio::common {

Expand Down Expand Up @@ -94,6 +96,7 @@ uint64_t SortingWriter::reclaim(
}

if (!isRunning()) {
RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount);
LOG(WARNING) << "Can't reclaim from a not running hive sort writer pool: "
<< sortPool_->name() << ", state: " << state()
<< "used memory: " << succinctBytes(sortPool_->currentBytes())
Expand Down
2 changes: 2 additions & 0 deletions velox/dwio/dwrf/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -736,13 +736,15 @@ uint64_t Writer::MemoryReclaimer::reclaim(
return 0;
}
if (!writer_->isRunning()) {
RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount);
LOG(WARNING) << "Can't reclaim from a not running dwrf writer: "
<< pool->name() << ", state: " << writer_->state();
++stats.numNonReclaimableAttempts;
return 0;
}
const uint64_t memoryUsage = writer_->getContext().getTotalMemoryUsage();
if (memoryUsage < writer_->spillConfig_->writerFlushThresholdSize) {
RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount);
LOG(WARNING)
<< "Can't reclaim memory from dwrf writer pool " << pool->name()
<< " which doesn't have sufficient memory to flush, writer memory usage: "
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,7 @@ void HashBuild::reclaim(
// build processing and is not under non-reclaimable execution section.
if (nonReclaimableState()) {
// TODO: reduce the log frequency if it is too verbose.
RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount);
++stats.numNonReclaimableAttempts;
LOG(WARNING) << "Can't reclaim from hash build operator, state_["
<< stateName(state_) << "], nonReclaimableSection_["
Expand All @@ -1107,6 +1108,7 @@ void HashBuild::reclaim(
VELOX_CHECK(buildOp->canReclaim());
if (buildOp->nonReclaimableState()) {
// TODO: reduce the log frequency if it is too verbose.
RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount);
++stats.numNonReclaimableAttempts;
LOG(WARNING) << "Can't reclaim from hash build operator, state_["
<< stateName(buildOp->state_) << "], nonReclaimableSection_["
Expand Down
18 changes: 18 additions & 0 deletions velox/exec/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

#include "velox/exec/SharedArbitrator.h"

#include "velox/common/base/Counters.h"
#include "velox/common/base/Exceptions.h"
#include "velox/common/base/StatsReporter.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/common/time/Timer.h"

Expand Down Expand Up @@ -61,6 +63,7 @@ std::string memoryPoolAbortMessage(

// Builds a shared arbitrator from 'config'. The free capacity starts out equal
// to capacity_ (presumably set by the MemoryArbitrator base from 'config' --
// TODO confirm), and that initial amount is reported to the
// kMetricArbitratorFreeCapacityBytes metric before the kind check below runs.
SharedArbitrator::SharedArbitrator(const MemoryArbitrator::Config& config)
: MemoryArbitrator(config), freeCapacity_(capacity_) {
RECORD_METRIC_VALUE(kMetricArbitratorFreeCapacityBytes, capacity_);
// Sanity check that the configured kind matches this arbitrator's kind_.
VELOX_CHECK_EQ(kind_, config.kind);
}

Expand Down Expand Up @@ -190,11 +193,13 @@ bool SharedArbitrator::growMemory(
ScopedArbitration scopedArbitration(pool, this);
MemoryPool* requestor = pool->root();
if (FOLLY_UNLIKELY(requestor->aborted())) {
RECORD_METRIC_VALUE(kMetricArbitratorFailuresCount);
++numFailures_;
VELOX_MEM_POOL_ABORTED("The requestor has already been aborted");
}

if (FOLLY_UNLIKELY(!ensureCapacity(requestor, targetBytes))) {
RECORD_METRIC_VALUE(kMetricArbitratorFailuresCount);
++numFailures_;
VELOX_MEM_LOG(ERROR) << "Can't grow " << requestor->name()
<< " capacity to "
Expand Down Expand Up @@ -227,6 +232,7 @@ bool SharedArbitrator::growMemory(
<< requestor->name() << ", request " << succinctBytes(targetBytes)
<< " after " << numRetries
<< " retries, Arbitrator state: " << toString();
RECORD_METRIC_VALUE(kMetricArbitratorFailuresCount);
++numFailures_;
return false;
}
Expand All @@ -253,6 +259,7 @@ bool SharedArbitrator::ensureCapacity(
incrementFreeCapacity(reclaimedBytes);
// Check if the requestor has been aborted in reclaim operation above.
if (requestor->aborted()) {
RECORD_METRIC_VALUE(kMetricArbitratorFailuresCount);
++numFailures_;
VELOX_MEM_POOL_ABORTED("The requestor pool has been aborted");
}
Expand Down Expand Up @@ -327,6 +334,7 @@ bool SharedArbitrator::arbitrateMemory(
freedBytes += reclaimUsedMemoryFromCandidates(
requestor, candidates, growTarget - freedBytes);
if (requestor->aborted()) {
RECORD_METRIC_VALUE(kMetricArbitratorFailuresCount);
++numFailures_;
VELOX_MEM_POOL_ABORTED("The requestor pool has been aborted.");
}
Expand Down Expand Up @@ -370,6 +378,7 @@ uint64_t SharedArbitrator::reclaimFreeMemoryFromCandidates(
break;
}
}
RECORD_METRIC_VALUE(kMetricArbitratorShrunkBytes, freedBytes);
numShrunkBytes_ += freedBytes;
return freedBytes;
}
Expand Down Expand Up @@ -427,6 +436,7 @@ uint64_t SharedArbitrator::reclaim(
reclaimedBytes = oldCapacity - newCapacity;
}
numReclaimedBytes_ += reclaimedBytes - freedBytes;
RECORD_METRIC_VALUE(kMetricArbitratorShrunkBytes, freedBytes);
numShrunkBytes_ += freedBytes;
reclaimTimeUs_ += reclaimDurationUs;
numNonReclaimableAttempts_ += reclaimerStats.numNonReclaimableAttempts;
Expand All @@ -442,6 +452,7 @@ uint64_t SharedArbitrator::reclaim(
void SharedArbitrator::abort(
MemoryPool* pool,
const std::exception_ptr& error) {
RECORD_METRIC_VALUE(kMetricArbitratorAbortedCount);
++numAborted_;
try {
pool->abort(error);
Expand All @@ -463,6 +474,7 @@ uint64_t SharedArbitrator::decrementFreeCapacity(uint64_t bytes) {
// Takes up to 'bytes' out of the arbitrator's free capacity and returns the
// amount actually taken, which is capped at the current freeCapacity_. The
// same amount is reported as a decrease of kMetricArbitratorFreeCapacityBytes.
//
// NOTE(review): the metric delta is computed in unsigned arithmetic, so the
// value handed to RECORD_METRIC_VALUE is the modular wrap of the amount
// rather than a signed negative number -- confirm the stats reporter treats
// it as intended for a SUM stat.
uint64_t SharedArbitrator::decrementFreeCapacityLocked(uint64_t bytes) {
  // Never hand out more than what is currently free.
  const uint64_t targetBytes = (bytes < freeCapacity_) ? bytes : freeCapacity_;
  VELOX_CHECK_LE(targetBytes, freeCapacity_);
  RECORD_METRIC_VALUE(kMetricArbitratorFreeCapacityBytes, -1 * targetBytes);
  freeCapacity_ = freeCapacity_ - targetBytes;
  return targetBytes;
}
Expand All @@ -473,6 +485,7 @@ void SharedArbitrator::incrementFreeCapacity(uint64_t bytes) {
}

void SharedArbitrator::incrementFreeCapacityLocked(uint64_t bytes) {
RECORD_METRIC_VALUE(kMetricArbitratorFreeCapacityBytes, bytes);
freeCapacity_ += bytes;
if (FOLLY_UNLIKELY(freeCapacity_ > capacity_)) {
VELOX_FAIL(
Expand Down Expand Up @@ -539,6 +552,8 @@ SharedArbitrator::ScopedArbitration::~ScopedArbitration() {
const auto arbitrationTime =
std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now() - startTime_);
RECORD_HISTOGRAM_METRIC_VALUE(
kMetricArbitratorArbitrationTimeMs, arbitrationTime.count() / 1'000);
arbitrator_->arbitrationTimeUs_ += arbitrationTime.count();
arbitrator_->finishArbitration();
}
Expand All @@ -548,6 +563,7 @@ void SharedArbitrator::startArbitration(MemoryPool* requestor) {
ContinueFuture waitPromise{ContinueFuture::makeEmpty()};
{
std::lock_guard<std::mutex> l(mutex_);
RECORD_METRIC_VALUE(kMetricArbitratorRequestsCount);
++numRequests_;
if (running_) {
waitPromises_.emplace_back(fmt::format(
Expand All @@ -570,6 +586,8 @@ void SharedArbitrator::startArbitration(MemoryPool* requestor) {
MicrosecondTimer timer(&waitTimeUs);
waitPromise.wait();
}
RECORD_HISTOGRAM_METRIC_VALUE(
kMetricArbitratorQueueTimeMs, waitTimeUs / 1'000);
queueTimeUs_ += waitTimeUs;
}
}
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/TableWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include "velox/exec/TableWriter.h"

#include "HashAggregation.h"
#include "velox/common/base/Counters.h"
#include "velox/common/base/StatsReporter.h"
#include "velox/exec/Task.h"

namespace facebook::velox::exec {
Expand Down Expand Up @@ -322,6 +324,7 @@ uint64_t TableWriter::ConnectorReclaimer::reclaim(
auto* writer = dynamic_cast<TableWriter*>(op_);
if (writer->closed_) {
// TODO: reduce the log frequency if it is too verbose.
RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount);
++stats.numNonReclaimableAttempts;
LOG(WARNING) << "Can't reclaim from a closed writer connector pool: "
<< pool->name()
Expand All @@ -331,6 +334,7 @@ uint64_t TableWriter::ConnectorReclaimer::reclaim(

if (writer->dataSink_ == nullptr) {
// TODO: reduce the log frequency if it is too verbose.
RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount);
++stats.numNonReclaimableAttempts;
LOG(WARNING)
<< "Can't reclaim from a writer connector pool which hasn't initialized yet: "
Expand Down
3 changes: 3 additions & 0 deletions velox/exec/TopNRowNumber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
* limitations under the License.
*/
#include "velox/exec/TopNRowNumber.h"
#include "velox/common/base/Counters.h"
#include "velox/common/base/StatsReporter.h"
#include "velox/exec/OperatorUtils.h"

namespace facebook::velox::exec {
Expand Down Expand Up @@ -641,6 +643,7 @@ void TopNRowNumber::reclaim(
}

if (noMoreInput_) {
RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount);
++stats.numNonReclaimableAttempts;
// TODO Add support for spilling after noMoreInput().
LOG(WARNING)
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/Window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
* limitations under the License.
*/
#include "velox/exec/Window.h"

#include "velox/common/base/Counters.h"
#include "velox/common/base/StatsReporter.h"
#include "velox/exec/OperatorUtils.h"
#include "velox/exec/SortWindowBuild.h"
#include "velox/exec/StreamingWindowBuild.h"
Expand Down Expand Up @@ -152,6 +155,7 @@ void Window::reclaim(
VELOX_CHECK(!nonReclaimableSection_);

if (noMoreInput_) {
RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount);
++stats.numNonReclaimableAttempts;
// TODO Add support for spilling after noMoreInput().
LOG(WARNING)
Expand Down

0 comments on commit d8e94b3

Please sign in to comment.