Skip to content

Commit

Permalink
Record pause time and off thread time per driver (facebookincubator#9199
Browse files Browse the repository at this point in the history
)

Summary:

Collect driver pause and off thread time and records as runtime stats.

Reviewed By: xiaoxmeng

Differential Revision: D55159826
  • Loading branch information
Yuhta authored and facebook-github-bot committed Mar 25, 2024
1 parent 05cede5 commit e610b4d
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 1 deletion.
10 changes: 10 additions & 0 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,15 @@ void Driver::closeOperators() {
}
}

void Driver::updateStats() {
DriverStats stats;
stats.runtimeStats[DriverStats::kTotalPauseTime] = RuntimeMetric(
1'000'000 * state_.totalPauseTimeMs, RuntimeCounter::Unit::kNanos);
stats.runtimeStats[DriverStats::kTotalOffThreadTime] = RuntimeMetric(
1'000'000 * state_.totalOffThreadTimeMs, RuntimeCounter::Unit::kNanos);
task()->addDriverStats(ctx_->pipelineId, std::move(stats));
}

void Driver::close() {
if (closed_) {
// Already closed.
Expand All @@ -836,6 +845,7 @@ void Driver::close() {
LOG(FATAL) << "Driver::close is only allowed from the Driver's thread";
}
closeOperators();
updateStats();
closed_ = true;
Task::removeDriver(ctx_->task, this);
}
Expand Down
22 changes: 22 additions & 0 deletions velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ std::string stopReasonString(StopReason reason);

std::ostream& operator<<(std::ostream& out, const StopReason& reason);

struct DriverStats {
static constexpr const char* kTotalPauseTime = "totalDriverPauseWallNanos";
static constexpr const char* kTotalOffThreadTime =
"totalDriverOffThreadWallNanos";

std::unordered_map<std::string, RuntimeMetric> runtimeStats;
};

/// Represents a Driver's state. This is used for cancellation, forcing
/// release of and for waiting for memory. The fields are serialized on
/// the mutex of the Driver's Task.
Expand Down Expand Up @@ -110,6 +118,13 @@ struct ThreadState {
/// driver goes off thread. This is used to track the time that a driver has
/// continuously run on a thread for per-driver cpu time slice enforcement.
size_t startExecTimeMs{0};
/// The end execution time on thread in milliseconds. It is set when the
/// driver goes off thread and reset when the driver gets on a thread.
size_t endExecTimeMs{0};
/// Total time the driver in pause.
uint64_t totalPauseTimeMs{0};
/// Total off thread time (including blocked time and pause time).
uint64_t totalOffThreadTimeMs{0};

bool isOnThread() const {
return thread != std::thread::id();
Expand All @@ -118,6 +133,10 @@ struct ThreadState {
void setThread() {
thread = std::this_thread::get_id();
startExecTimeMs = getCurrentTimeMs();
if (endExecTimeMs != 0) {
totalOffThreadTimeMs += startExecTimeMs - endExecTimeMs;
endExecTimeMs = 0;
}
#if !defined(__APPLE__)
// This is a debugging feature disabled on the Mac since syscall
// is deprecated on that platform.
Expand All @@ -128,6 +147,7 @@ struct ThreadState {
void clearThread() {
thread = std::thread::id(); // no thread.
startExecTimeMs = 0;
endExecTimeMs = getCurrentTimeMs();
tid = 0;
}

Expand Down Expand Up @@ -445,6 +465,8 @@ class Driver : public std::enable_shared_from_this<Driver> {
std::shared_ptr<BlockingState>& blockingState,
RowVectorPtr& result);

void updateStats();

void close();

// Push down dynamic filters produced by the operator at the specified
Expand Down
10 changes: 10 additions & 0 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,10 @@ void Task::resume(std::shared_ptr<Task> self) {
}
VELOX_CHECK(!driver->isOnThread() && !driver->isTerminated());
if (!driver->state().hasBlockingFuture) {
if (driver->state().endExecTimeMs != 0) {
driver->state().totalPauseTimeMs +=
getCurrentTimeMs() - driver->state().endExecTimeMs;
}
// Do not continue a Driver that is blocked on external
// event. The Driver gets enqueued by the promise realization.
Driver::enqueue(driver);
Expand Down Expand Up @@ -1994,6 +1998,12 @@ void Task::addOperatorStats(OperatorStats& stats) {
.add(stats);
}

void Task::addDriverStats(int pipelineId, DriverStats stats) {
std::lock_guard<std::timed_mutex> l(mutex_);
VELOX_CHECK(0 <= pipelineId && pipelineId < taskStats_.pipelineStats.size());
taskStats_.pipelineStats[pipelineId].driverStats.push_back(std::move(stats));
}

TaskStats Task::taskStats() const {
std::lock_guard<std::timed_mutex> l(mutex_);

Expand Down
3 changes: 3 additions & 0 deletions velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,9 @@ class Task : public std::enable_shared_from_this<Task> {
/// stats. Called from Drivers upon their closure.
void addOperatorStats(OperatorStats& stats);

/// Adds per driver statistics. Called from Drivers upon their closure.
void addDriverStats(int pipelineId, DriverStats stats);

/// Returns kNone if no pause or terminate is requested. The thread count is
/// incremented if kNone is returned. If something else is returned the
/// calling thread should unwind and return itself to its pool. If 'this' goes
Expand Down
3 changes: 3 additions & 0 deletions velox/exec/TaskStats.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ struct PipelineStats {
// operator in the DriverFactory.
std::vector<OperatorStats> operatorStats;

// Runtime statistics per driver.
std::vector<DriverStats> driverStats;

// True if contains the source node for the task.
bool inputPipeline;

Expand Down
16 changes: 15 additions & 1 deletion velox/exec/tests/TaskTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1659,13 +1659,27 @@ DEBUG_ONLY_TEST_F(TaskTest, taskReclaimStats) {
arbitrator->testingFreeCapacity(reclaimedQueryCapacity);
}

const auto taskStats = task->taskStats();
auto taskStats = task->taskStats();
ASSERT_EQ(taskStats.memoryReclaimCount, numReclaims);
ASSERT_GT(taskStats.memoryReclaimMs, 0);

// Fail the task to finish test.
task->requestAbort();
ASSERT_TRUE(waitForTaskAborted(task.get()));

taskStats = task->taskStats();
ASSERT_EQ(taskStats.pipelineStats.size(), 1);
ASSERT_EQ(taskStats.pipelineStats[0].driverStats.size(), 1);
const auto& driverStats = taskStats.pipelineStats[0].driverStats[0];
const auto& totalPauseTime =
driverStats.runtimeStats.at(DriverStats::kTotalPauseTime);
ASSERT_EQ(totalPauseTime.count, 1);
ASSERT_GE(totalPauseTime.sum, 0);
const auto& totalOffThreadTime =
driverStats.runtimeStats.at(DriverStats::kTotalOffThreadTime);
ASSERT_EQ(totalOffThreadTime.count, 1);
ASSERT_GE(totalOffThreadTime.sum, 0);

task.reset();
waitForAllTasksToBeDeleted();
}
Expand Down

0 comments on commit e610b4d

Please sign in to comment.