From f64795f55aa9cfc5e8b02b890b76b0b53e663219 Mon Sep 17 00:00:00 2001 From: Jimmy Lu Date: Mon, 1 Apr 2024 18:21:36 -0700 Subject: [PATCH] Update driver and operator stats when close by task (#9327) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/9327 Currently the stats are not updated if close is issued by task. Fix https://github.com/facebookincubator/velox/issues/9301 Reviewed By: xiaoxmeng Differential Revision: D55601334 fbshipit-source-id: 501fc1493e966fccbeb1d1a41e52b5cab635a06f --- velox/exec/Driver.cpp | 1 + velox/exec/tests/TaskTest.cpp | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index f2f43c6b451a..9c8899093dbd 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -853,6 +853,7 @@ void Driver::closeByTask() { VELOX_CHECK(isOnThread()); VELOX_CHECK(isTerminated()); closeOperators(); + updateStats(); closed_ = true; } diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp index 397341dcea46..8da0552ff8cb 100644 --- a/velox/exec/tests/TaskTest.cpp +++ b/velox/exec/tests/TaskTest.cpp @@ -1683,6 +1683,41 @@ DEBUG_ONLY_TEST_F(TaskTest, taskReclaimStats) { waitForAllTasksToBeDeleted(); } +TEST_F(TaskTest, updateStatsWhileCloseOffThreadDriver) { + const auto data = makeRowVector({ + makeFlatVector(50, folly::identity), + makeFlatVector(50, folly::identity), + }); + const auto plan = + PlanBuilder() + .tableScan(asRowType(data->type())) + .partitionedOutput({}, 1, std::vector{"c0"}) + .planFragment(); + auto task = Task::create( + "task", + std::move(plan), + 0, + std::make_shared(driverExecutor_.get())); + task->start(4, 1); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + task->testingVisitDrivers( + [](Driver* driver) { VELOX_CHECK(!driver->isOnThread()); }); + task->requestAbort(); + ASSERT_TRUE(waitForTaskAborted(task.get())); + auto taskStats = task->taskStats(); + ASSERT_EQ(taskStats.pipelineStats.size(), 1); + ASSERT_EQ(taskStats.pipelineStats[0].driverStats.size(), 4); + const auto& driverStats = taskStats.pipelineStats[0].driverStats[0]; + const auto& totalPauseTime = + driverStats.runtimeStats.at(DriverStats::kTotalPauseTime); + ASSERT_EQ(totalPauseTime.count, 1); + ASSERT_GE(totalPauseTime.sum, 0); + const auto& totalOffThreadTime = + driverStats.runtimeStats.at(DriverStats::kTotalOffThreadTime); + ASSERT_EQ(totalOffThreadTime.count, 1); + ASSERT_GE(totalOffThreadTime.sum, 0); +} + DEBUG_ONLY_TEST_F(TaskTest, driverEnqueAfterFailedAndPausedTask) { const auto data = makeRowVector({ makeFlatVector(50, [](auto row) { return row; }),