diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index f2f43c6b451a..9c8899093dbd 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -853,6 +853,7 @@ void Driver::closeByTask() { VELOX_CHECK(isOnThread()); VELOX_CHECK(isTerminated()); closeOperators(); + updateStats(); closed_ = true; } diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp index 397341dcea46..8da0552ff8cb 100644 --- a/velox/exec/tests/TaskTest.cpp +++ b/velox/exec/tests/TaskTest.cpp @@ -1683,6 +1683,41 @@ DEBUG_ONLY_TEST_F(TaskTest, taskReclaimStats) { waitForAllTasksToBeDeleted(); } +TEST_F(TaskTest, updateStatsWhileCloseOffThreadDriver) { + const auto data = makeRowVector({ + makeFlatVector(50, folly::identity), + makeFlatVector(50, folly::identity), + }); + const auto plan = + PlanBuilder() + .tableScan(asRowType(data->type())) + .partitionedOutput({}, 1, std::vector{"c0"}) + .planFragment(); + auto task = Task::create( + "task", + std::move(plan), + 0, + std::make_shared(driverExecutor_.get())); + task->start(4, 1); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + task->testingVisitDrivers( + [](Driver* driver) { VELOX_CHECK(!driver->isOnThread()); }); + task->requestAbort(); + ASSERT_TRUE(waitForTaskAborted(task.get())); + auto taskStats = task->taskStats(); + ASSERT_EQ(taskStats.pipelineStats.size(), 1); + ASSERT_EQ(taskStats.pipelineStats[0].driverStats.size(), 4); + const auto& driverStats = taskStats.pipelineStats[0].driverStats[0]; + const auto& totalPauseTime = + driverStats.runtimeStats.at(DriverStats::kTotalPauseTime); + ASSERT_EQ(totalPauseTime.count, 1); + ASSERT_GE(totalPauseTime.sum, 0); + const auto& totalOffThreadTime = + driverStats.runtimeStats.at(DriverStats::kTotalOffThreadTime); + ASSERT_EQ(totalOffThreadTime.count, 1); + ASSERT_GE(totalOffThreadTime.sum, 0); +} + DEBUG_ONLY_TEST_F(TaskTest, driverEnqueAfterFailedAndPausedTask) { const auto data = makeRowVector({ makeFlatVector(50, [](auto row) { return row; }),