Track per-driver pause time and off-thread time.

ThreadState records the time at which a driver leaves its thread
(clearThread) and adds the elapsed gap to totalOffThreadTimeMs when the
driver gets back on a thread (setThread). Task::resume adds the same gap to
totalPauseTimeMs for drivers that have no blocking future. When a driver is
closed, Driver::updateStats converts both totals from milliseconds to
nanoseconds and hands them to Task::addDriverStats, which appends them to
PipelineStats::driverStats for the driver's pipeline.

diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp
index 849252a58384..5bcb59d2a27f 100644
--- a/velox/exec/Driver.cpp
+++ b/velox/exec/Driver.cpp
@@ -827,6 +827,15 @@ void Driver::closeOperators() {
   }
 }
 
+void Driver::updateStats() {
+  DriverStats stats;
+  stats.runtimeStats[DriverStats::kTotalPauseTime] = RuntimeMetric(
+      1'000'000 * state_.totalPauseTimeMs, RuntimeCounter::Unit::kNanos);
+  stats.runtimeStats[DriverStats::kTotalOffThreadTime] = RuntimeMetric(
+      1'000'000 * state_.totalOffThreadTimeMs, RuntimeCounter::Unit::kNanos);
+  task()->addDriverStats(ctx_->pipelineId, std::move(stats));
+}
+
 void Driver::close() {
   if (closed_) {
     // Already closed.
@@ -836,6 +845,7 @@ void Driver::close() {
     LOG(FATAL) << "Driver::close is only allowed from the Driver's thread";
   }
   closeOperators();
+  updateStats();
   closed_ = true;
   Task::removeDriver(ctx_->task, this);
 }
diff --git a/velox/exec/Driver.h b/velox/exec/Driver.h
index ce310884bdc7..9c76ae0f319e 100644
--- a/velox/exec/Driver.h
+++ b/velox/exec/Driver.h
@@ -60,6 +60,15 @@ std::string stopReasonString(StopReason reason);
 
 std::ostream& operator<<(std::ostream& out, const StopReason& reason);
 
+/// Per-driver runtime statistics, keyed by the metric names declared below.
+struct DriverStats {
+  static constexpr const char* kTotalPauseTime = "totalDriverPauseWallNanos";
+  static constexpr const char* kTotalOffThreadTime =
+      "totalDriverOffThreadWallNanos";
+
+  std::unordered_map<std::string, RuntimeMetric> runtimeStats;
+};
+
 /// Represents a Driver's state. This is used for cancellation, forcing
 /// release of and for waiting for memory. The fields are serialized on
 /// the mutex of the Driver's Task.
@@ -110,6 +119,13 @@ struct ThreadState {
   /// driver goes off thread. This is used to track the time that a driver has
   /// continuously run on a thread for per-driver cpu time slice enforcement.
   size_t startExecTimeMs{0};
+  /// The end execution time on thread in milliseconds. It is set when the
+  /// driver goes off thread and reset when the driver gets on a thread.
+  size_t endExecTimeMs{0};
+  /// Total time the driver has spent paused, in milliseconds.
+  uint64_t totalPauseTimeMs{0};
+  /// Total off-thread time in milliseconds (including blocked and pause time).
+  uint64_t totalOffThreadTimeMs{0};
 
   bool isOnThread() const {
     return thread != std::thread::id();
@@ -118,6 +134,10 @@ struct ThreadState {
   void setThread() {
     thread = std::this_thread::get_id();
     startExecTimeMs = getCurrentTimeMs();
+    if (endExecTimeMs != 0) {
+      totalOffThreadTimeMs += startExecTimeMs - endExecTimeMs;
+      endExecTimeMs = 0;
+    }
 #if !defined(__APPLE__)
     // This is a debugging feature disabled on the Mac since syscall
     // is deprecated on that platform.
@@ -128,6 +148,7 @@ struct ThreadState {
   void clearThread() {
     thread = std::thread::id(); // no thread.
     startExecTimeMs = 0;
+    endExecTimeMs = getCurrentTimeMs();
     tid = 0;
   }
 
@@ -445,6 +466,10 @@ class Driver : public std::enable_shared_from_this<Driver> {
       std::shared_ptr<BlockingState>& blockingState,
       RowVectorPtr& result);
 
+  // Reports the pause and off-thread time accumulated in 'state_' to the task
+  // as this driver's DriverStats.
+  void updateStats();
+
   void close();
 
   // Push down dynamic filters produced by the operator at the specified
diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp
index 2b7b1c0366e2..b503ed80996e 100644
--- a/velox/exec/Task.cpp
+++ b/velox/exec/Task.cpp
@@ -874,6 +874,10 @@ void Task::resume(std::shared_ptr<Task> self) {
       }
       VELOX_CHECK(!driver->isOnThread() && !driver->isTerminated());
       if (!driver->state().hasBlockingFuture) {
+        if (driver->state().endExecTimeMs != 0) {
+          driver->state().totalPauseTimeMs +=
+              getCurrentTimeMs() - driver->state().endExecTimeMs;
+        }
         // Do not continue a Driver that is blocked on external
         // event. The Driver gets enqueued by the promise realization.
         Driver::enqueue(driver);
@@ -1994,6 +1998,12 @@ void Task::addOperatorStats(OperatorStats& stats) {
       .add(stats);
 }
 
+void Task::addDriverStats(int pipelineId, DriverStats stats) {
+  std::lock_guard l(mutex_);
+  VELOX_CHECK(0 <= pipelineId && pipelineId < taskStats_.pipelineStats.size());
+  taskStats_.pipelineStats[pipelineId].driverStats.push_back(std::move(stats));
+}
+
 TaskStats Task::taskStats() const {
   std::lock_guard l(mutex_);
 
diff --git a/velox/exec/Task.h b/velox/exec/Task.h
index 755de93e3a69..2f5a1081a223 100644
--- a/velox/exec/Task.h
+++ b/velox/exec/Task.h
@@ -509,6 +509,9 @@ class Task : public std::enable_shared_from_this<Task> {
   /// stats. Called from Drivers upon their closure.
   void addOperatorStats(OperatorStats& stats);
 
+  /// Adds per driver statistics. Called from Drivers upon their closure.
+  void addDriverStats(int pipelineId, DriverStats stats);
+
   /// Returns kNone if no pause or terminate is requested. The thread count is
   /// incremented if kNone is returned. If something else is returned the
   /// calling thread should unwind and return itself to its pool. If 'this' goes
diff --git a/velox/exec/TaskStats.h b/velox/exec/TaskStats.h
index 85e7690d6c50..9603909e3ea1 100644
--- a/velox/exec/TaskStats.h
+++ b/velox/exec/TaskStats.h
@@ -34,6 +34,9 @@ struct PipelineStats {
   // operator in the DriverFactory.
   std::vector<OperatorStats> operatorStats;
 
+  // Runtime statistics per driver.
+  std::vector<DriverStats> driverStats;
+
   // True if contains the source node for the task.
   bool inputPipeline;
 
diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp
index 8c411f45421a..91f23e075ebd 100644
--- a/velox/exec/tests/TaskTest.cpp
+++ b/velox/exec/tests/TaskTest.cpp
@@ -1659,13 +1659,27 @@ DEBUG_ONLY_TEST_F(TaskTest, taskReclaimStats) {
     arbitrator->testingFreeCapacity(reclaimedQueryCapacity);
   }
 
-  const auto taskStats = task->taskStats();
+  auto taskStats = task->taskStats();
   ASSERT_EQ(taskStats.memoryReclaimCount, numReclaims);
   ASSERT_GT(taskStats.memoryReclaimMs, 0);
 
   // Fail the task to finish test.
   task->requestAbort();
   ASSERT_TRUE(waitForTaskAborted(task.get()));
+
+  taskStats = task->taskStats();
+  ASSERT_EQ(taskStats.pipelineStats.size(), 1);
+  ASSERT_EQ(taskStats.pipelineStats[0].driverStats.size(), 1);
+  const auto& driverStats = taskStats.pipelineStats[0].driverStats[0];
+  const auto& totalPauseTime =
+      driverStats.runtimeStats.at(DriverStats::kTotalPauseTime);
+  ASSERT_EQ(totalPauseTime.count, 1);
+  ASSERT_GE(totalPauseTime.sum, 0);
+  const auto& totalOffThreadTime =
+      driverStats.runtimeStats.at(DriverStats::kTotalOffThreadTime);
+  ASSERT_EQ(totalOffThreadTime.count, 1);
+  ASSERT_GE(totalOffThreadTime.sum, 0);
+
   task.reset();
   waitForAllTasksToBeDeleted();
 }