Skip to content

Commit

Permalink
Add numTopBuffers to OutputBuffer::Stats
Browse files Browse the repository at this point in the history
Summary: This can be used to spot skewed exchange.

Differential Revision: D53827154
  • Loading branch information
Yuhta authored and facebook-github-bot committed Feb 15, 2024
1 parent ec6741c commit d33aab8
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 1 deletion.
34 changes: 34 additions & 0 deletions velox/exec/OutputBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,39 @@ int64_t OutputBuffer::getAverageBufferTimeMsLocked() const {
return 0;
}

namespace {

// Counts how many of the largest destination buffers together account for at
// least 80% of 'totalBytes'. A result that is small relative to
// bufferStats.size() means a few destinations carry most of the data, i.e. the
// exchange is skewed.
//
// @param bufferStats Per-destination stats; a buffer's size is taken as
// bytesBuffered + bytesSent.
// @param totalBytes Total number of bytes used as the base for the 80%
// threshold.
// @return The number of largest buffers needed to reach the threshold. Returns
// 0 if 'bufferStats' is empty, and bufferStats.size() if the threshold is never
// reached.
int32_t countTopBuffers(
    const std::vector<DestinationBuffer::Stats>& bufferStats,
    int64_t totalBytes) {
  std::vector<int64_t> bufferSizes;
  bufferSizes.reserve(bufferStats.size());
  for (const auto& stats : bufferStats) {
    bufferSizes.push_back(stats.bytesBuffered + stats.bytesSent);
  }

  // Sort descending so that the largest buffers are consumed first.
  std::sort(bufferSizes.begin(), bufferSizes.end(), std::greater<int64_t>());

  const auto limit = totalBytes * 0.8;
  int32_t numBuffers = 0;
  // Must be 64-bit: buffer sizes are int64_t byte counts and their sum easily
  // exceeds the range of int32_t, which would corrupt the comparison below.
  int64_t runningTotal = 0;
  for (const auto size : bufferSizes) {
    runningTotal += size;
    ++numBuffers;

    if (runningTotal >= limit) {
      break;
    }
  }

  return numBuffers;
}

} // namespace

OutputBuffer::Stats OutputBuffer::stats() {
std::lock_guard<std::mutex> l(mutex_);
std::vector<DestinationBuffer::Stats> bufferStats;
Expand Down Expand Up @@ -772,6 +805,7 @@ OutputBuffer::Stats OutputBuffer::stats() {
numOutputRows_,
numOutputPages_,
getAverageBufferTimeMsLocked(),
countTopBuffers(bufferStats, numOutputBytes_),
bufferStats);
}

Expand Down
5 changes: 5 additions & 0 deletions velox/exec/OutputBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ class OutputBuffer {
int64_t _totalRowsSent,
int64_t _totalPagesSent,
int64_t _averageBufferTimeMs,
int32_t _numTopBuffers,
const std::vector<DestinationBuffer::Stats>& _buffersStats)
: kind(_kind),
noMoreBuffers(_noMoreBuffers),
Expand All @@ -202,6 +203,7 @@ class OutputBuffer {
totalRowsSent(_totalRowsSent),
totalPagesSent(_totalPagesSent),
averageBufferTimeMs(_averageBufferTimeMs),
numTopBuffers(_numTopBuffers),
buffersStats(_buffersStats) {}

core::PartitionedOutputNode::Kind kind;
Expand All @@ -223,6 +225,9 @@ class OutputBuffer {
/// Average time each piece of data has been buffered for in milliseconds.
int64_t averageBufferTimeMs{0};

/// The number of largest buffers that handle 80% of the total data.
int32_t numTopBuffers{0};

/// Stats of the OutputBuffer's destinations.
std::vector<DestinationBuffer::Stats> buffersStats;

Expand Down
11 changes: 10 additions & 1 deletion velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1897,6 +1897,13 @@ ContinueFuture Task::terminate(TaskState terminalState) {
void Task::maybeRemoveFromOutputBufferManager() {
if (hasPartitionedOutput()) {
if (auto bufferManager = bufferManager_.lock()) {
// Capture output buffer stats before deleting the buffer.
{
std::lock_guard<std::timed_mutex> l(mutex_);
if (!taskStats_.outputBufferStats.has_value()) {
taskStats_.outputBufferStats = bufferManager->stats(taskId_);
}
}
bufferManager->removeTask(taskId_);
}
}
Expand Down Expand Up @@ -1982,7 +1989,9 @@ TaskStats Task::taskStats() const {
auto bufferManager = bufferManager_.lock();
taskStats.outputBufferUtilization = bufferManager->getUtilization(taskId_);
taskStats.outputBufferOverutilized = bufferManager->isOverutilized(taskId_);
taskStats.outputBufferStats = bufferManager->stats(taskId_);
if (!taskStats.outputBufferStats.has_value()) {
taskStats.outputBufferStats = bufferManager->stats(taskId_);
}
return taskStats;
}

Expand Down

0 comments on commit d33aab8

Please sign in to comment.