Skip to content

Commit

Permalink
Update driver and operator stats when close by task (facebookincubato…
Browse files Browse the repository at this point in the history
…r#9327)

Summary:

Currently the stats are not updated if close is issued by task.

Fix facebookincubator#9301

Reviewed By: xiaoxmeng

Differential Revision: D55601334
  • Loading branch information
Yuhta authored and facebook-github-bot committed Apr 1, 2024
1 parent 73a7f71 commit ebc3696
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 0 deletions.
1 change: 1 addition & 0 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,7 @@ void Driver::closeByTask() {
VELOX_CHECK(isOnThread());
VELOX_CHECK(isTerminated());
closeOperators();
updateStats();
closed_ = true;
}

Expand Down
35 changes: 35 additions & 0 deletions velox/exec/tests/TaskTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1683,6 +1683,41 @@ DEBUG_ONLY_TEST_F(TaskTest, taskReclaimStats) {
waitForAllTasksToBeDeleted();
}

TEST_F(TaskTest, updateStatsWhileCloseOffThreadDriver) {
const auto data = makeRowVector({
makeFlatVector<int64_t>(50, folly::identity),
makeFlatVector<int64_t>(50, folly::identity),
});
const auto plan =
PlanBuilder()
.tableScan(asRowType(data->type()))
.partitionedOutput({}, 1, std::vector<std::string>{"c0"})
.planFragment();
auto task = Task::create(
"task",
std::move(plan),
0,
std::make_shared<core::QueryCtx>(driverExecutor_.get()));
task->start(4, 1);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
task->testingVisitDrivers(
[](Driver* driver) { VELOX_CHECK(!driver->isOnThread()); });
task->requestAbort();
ASSERT_TRUE(waitForTaskAborted(task.get()));
auto taskStats = task->taskStats();
ASSERT_EQ(taskStats.pipelineStats.size(), 1);
ASSERT_EQ(taskStats.pipelineStats[0].driverStats.size(), 4);
const auto& driverStats = taskStats.pipelineStats[0].driverStats[0];
const auto& totalPauseTime =
driverStats.runtimeStats.at(DriverStats::kTotalPauseTime);
ASSERT_EQ(totalPauseTime.count, 1);
ASSERT_GE(totalPauseTime.sum, 0);
const auto& totalOffThreadTime =
driverStats.runtimeStats.at(DriverStats::kTotalOffThreadTime);
ASSERT_EQ(totalOffThreadTime.count, 1);
ASSERT_GE(totalOffThreadTime.sum, 0);
}

DEBUG_ONLY_TEST_F(TaskTest, driverEnqueAfterFailedAndPausedTask) {
const auto data = makeRowVector({
makeFlatVector<int64_t>(50, [](auto row) { return row; }),
Expand Down

0 comments on commit ebc3696

Please sign in to comment.