diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp
index 1debe9540150..61b44d9789be 100644
--- a/velox/exec/Task.cpp
+++ b/velox/exec/Task.cpp
@@ -209,9 +209,6 @@ std::string taskStateString(TaskState state) {
   }
 }
 
-std::atomic<uint64_t> Task::numCreatedTasks_ = 0;
-std::atomic<uint64_t> Task::numDeletedTasks_ = 0;
-
 bool registerTaskListener(std::shared_ptr<TaskListener> listener) {
   return listeners().withWLock([&](auto& listeners) {
     for (const auto& existingListener : listeners) {
@@ -277,6 +274,7 @@ std::shared_ptr<Task> Task::create(
       std::move(consumerSupplier),
       std::move(onError)));
   task->initTaskPool();
+  task->addToTaskList();
   return task;
 }
 
@@ -310,6 +308,8 @@ Task::Task(
 }
 
 Task::~Task() {
+  removeFromTaskList();
+
   // TODO(spershin): Temporary code designed to reveal what causes SIGABRT in
   // jemalloc when destroying some Tasks.
   std::string clearStage;
@@ -355,6 +355,62 @@ Task::~Task() {
   }
 }
 
+// Returns the process-wide intrusive list of tasks. The list is a
+// function-local static, so it is constructed on first use.
+Task::TaskList& Task::taskList() {
+  static TaskList taskList;
+  return taskList;
+}
+
+// Returns the mutex guarding taskList(); also a function-local static.
+folly::SharedMutex& Task::taskListLock() {
+  static folly::SharedMutex lock;
+  return lock;
+}
+
+// Returns the number of entries currently linked into the task list, read
+// under a shared lock.
+size_t Task::numRunningTasks() {
+  std::shared_lock guard{taskListLock()};
+  return taskList().size();
+}
+
+// Returns strong references to every listed task that is still alive. Entries
+// whose weak pointer can no longer be locked are skipped, so the result may
+// be smaller than numRunningTasks().
+std::vector<std::shared_ptr<Task>> Task::getRunningTasks() {
+  std::vector<std::shared_ptr<Task>> tasks;
+  std::shared_lock guard(taskListLock());
+  tasks.reserve(taskList().size());
+  // Iterate by reference: copying a TaskListEntry would copy its weak_ptr and
+  // list hook for every element while the lock is held.
+  for (const auto& taskEntry : taskList()) {
+    if (auto task = taskEntry.taskPtr.lock()) {
+      tasks.push_back(std::move(task));
+    }
+  }
+  return tasks;
+}
+
+// Links this task into the task list. Called from Task::create() once the
+// task is owned by a shared_ptr, which shared_from_this() requires.
+void Task::addToTaskList() {
+  VELOX_CHECK(!taskListEntry_.listHook.is_linked());
+  taskListEntry_.taskPtr = shared_from_this();
+
+  std::unique_lock guard{taskListLock()};
+  taskList().push_back(taskListEntry_);
+}
+
+// Unlinks this task from the task list if it is linked. Called first thing in
+// ~Task().
+void Task::removeFromTaskList() {
+  std::unique_lock guard{taskListLock()};
+  if (taskListEntry_.listHook.is_linked()) {
+    taskListEntry_.listHook.unlink();
+  }
+}
+
 uint64_t Task::timeSinceStartMsLocked() const {
   if (taskStats_.executionStartTimeMs == 0UL) {
     return 0UL;
diff --git a/velox/exec/Task.h
b/velox/exec/Task.h index 2f63fa4bf3d6..9d66c1e1548c 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -640,16 +640,6 @@ class Task : public std::enable_shared_from_this { return driverFactories_[driver->driverCtx()->pipelineId]->numDrivers; } - /// Returns the number of created and deleted tasks since the velox engine - /// starts running so far. - static uint64_t numCreatedTasks() { - return numCreatedTasks_; - } - - static uint64_t numDeletedTasks() { - return numDeletedTasks_; - } - const std::string& spillDirectory() const { return spillDirectory_; } @@ -677,6 +667,12 @@ class Task : public std::enable_shared_from_this { ++numThreads_; } + /// Returns the number of running tasks from velox runtime. + static size_t numRunningTasks(); + + /// Returns the list of running tasks from velox runtime. + static std::vector> getRunningTasks(); + /// Invoked to run provided 'callback' on each alive driver of the task. void testingVisitDrivers(const std::function& callback); @@ -686,6 +682,20 @@ class Task : public std::enable_shared_from_this { } private: + // Hook of system-wide running task list. + struct TaskListEntry { + std::weak_ptr taskPtr; + folly::IntrusiveListHook listHook; + }; + using TaskList = + folly::IntrusiveList; + + // Returns the system-wide running task list. + FOLLY_EXPORT static TaskList& taskList(); + + // Returns the lock that protects the system-wide running task list. + FOLLY_EXPORT static folly::SharedMutex& taskListLock(); + Task( const std::string& taskId, core::PlanFragment planFragment, @@ -695,6 +705,13 @@ class Task : public std::enable_shared_from_this { ConsumerSupplier consumerSupplier, std::function onError = nullptr); + // Invoked to add this to the system-wide running task list on task creation. + void addToTaskList(); + + // Invoked to remove this from the system-wide running task list on task + // destruction. 
+ void removeFromTaskList(); + // Consistency check of the task execution to make sure the execution mode // stays the same. void checkExecutionMode(ExecutionMode mode); @@ -816,22 +833,6 @@ class Task : public std::enable_shared_from_this { std::weak_ptr task_; }; - // Counts the number of created tasks which is incremented on each task - // creation. - static std::atomic numCreatedTasks_; - - // Counts the number of deleted tasks which is incremented on each task - // destruction. - static std::atomic numDeletedTasks_; - - static void taskCreated() { - ++numCreatedTasks_; - } - - static void taskDeleted() { - ++numDeletedTasks_; - } - /// Returns true if state is 'running'. bool isRunningLocked() const; @@ -984,26 +985,6 @@ class Task : public std::enable_shared_from_this { // trace enabled. void maybeInitQueryTrace(); - // The helper class used to maintain 'numCreatedTasks_' and 'numDeletedTasks_' - // on task construction and destruction. - class TaskCounter { - public: - TaskCounter() { - Task::taskCreated(); - } - ~TaskCounter() { - Task::taskDeleted(); - } - }; - friend class Task::TaskCounter; - - // NOTE: keep 'taskCount_' the first member so that it will be the first - // constructed member and the last destructed one. The purpose is to make - // 'numCreatedTasks_' and 'numDeletedTasks_' counting more robust to the - // timing race condition when used in scenarios such as waiting for all the - // tasks to be destructed in test. - const TaskCounter taskCounter_; - // Universally unique identifier of the task. Used to identify the task when // calling TaskListener. const std::string uuid_; @@ -1020,6 +1001,9 @@ class Task : public std::enable_shared_from_this { // executed in a single mode throughout its lifetime const ExecutionMode mode_; + // Hook in the system wide task list. + TaskListEntry taskListEntry_; + // Root MemoryPool for this Task. All member variables that hold references // to pool_ must be defined after pool_, childPools_. 
std::shared_ptr pool_; diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index fd982240c2c6..db96a5089460 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -1539,7 +1539,7 @@ TEST_F(TableScanTest, preloadingSplitClose) { latch.count_down(); }); } - ASSERT_EQ(Task::numCreatedTasks(), Task::numDeletedTasks()); + ASSERT_EQ(Task::numRunningTasks(), 0); auto task = assertQuery(tableScanNode(), filePaths, "SELECT * FROM tmp", 2); auto stats = getTableScanRuntimeStats(task); @@ -1547,9 +1547,8 @@ TEST_F(TableScanTest, preloadingSplitClose) { ASSERT_GT(stats.at("preloadedSplits").sum, 1); task.reset(); - // Once all task references are cleared, the count of deleted tasks should - // promptly match the count of created tasks. - ASSERT_EQ(Task::numCreatedTasks(), Task::numDeletedTasks()); + // Once all task references are cleared, all the tasks should be destroyed. + ASSERT_EQ(Task::numRunningTasks(), 0); // Clean blocking items in the IO thread pool. 
for (auto& baton : batons) { baton.post(); diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp index 307d168a3f1a..84f4d23e1b41 100644 --- a/velox/exec/tests/TaskTest.cpp +++ b/velox/exec/tests/TaskTest.cpp @@ -466,8 +466,9 @@ class TaskTest : public HiveConnectorTestBase { core::PlanFragment plan, const std::unordered_map>& filePaths = {}) { + static std::atomic_uint64_t taskId{0}; auto task = Task::create( - "single.execution.task.0", + fmt::format("single.execution.task.{}", taskId++), plan, 0, core::QueryCtx::create(), @@ -743,15 +744,11 @@ TEST_F(TaskTest, serialExecution) { makeFlatVector(100, [](auto row) { return row + 5; }), }); - uint64_t numCreatedTasks = Task::numCreatedTasks(); - uint64_t numDeletedTasks = Task::numDeletedTasks(); { auto [task, results] = executeSerial(plan); assertEqualResults( std::vector{expectedResult, expectedResult}, results); } - ASSERT_EQ(numCreatedTasks + 1, Task::numCreatedTasks()); - ASSERT_EQ(numDeletedTasks + 1, Task::numDeletedTasks()); // Project + Aggregation. plan = PlanBuilder() @@ -776,14 +773,10 @@ TEST_F(TaskTest, serialExecution) { 995 / 2.0 + 4}), }); - ++numCreatedTasks; - ++numDeletedTasks; { auto [task, results] = executeSerial(plan); assertEqualResults({expectedResult}, results); } - ASSERT_EQ(numCreatedTasks + 1, Task::numCreatedTasks()); - ASSERT_EQ(numDeletedTasks + 1, Task::numDeletedTasks()); // Project + Aggregation over TableScan. auto filePath = TempFilePath::create(); @@ -808,6 +801,65 @@ TEST_F(TaskTest, serialExecution) { VELOX_ASSERT_THROW(executeSerial(plan), "division by zero"); } +// The purpose of the test is to check the running task list APIs. 
+TEST_F(TaskTest, runningTaskList) { + const auto data = makeRowVector({ + makeFlatVector(1'000, [](auto row) { return row; }), + }); + + ASSERT_EQ(Task::numRunningTasks(), 0); + ASSERT_TRUE(Task::getRunningTasks().empty()); + + const auto plan = PlanBuilder() + .values({data, data}) + .filter("c0 < 100") + .project({"c0 + 5"}) + .planFragment(); + + // This is to verify that runningTaskList API returns a completed task which + // still has pending references. + std::vector> expectedRunningTasks; + expectedRunningTasks.push_back(executeSerial(plan).first); + ASSERT_EQ(Task::numRunningTasks(), 1); + ASSERT_EQ(Task::getRunningTasks().size(), 1); + + expectedRunningTasks.push_back(executeSerial(plan).first); + ASSERT_EQ(Task::numRunningTasks(), 2); + ASSERT_EQ(Task::getRunningTasks().size(), 2); + + expectedRunningTasks.push_back(executeSerial(plan).first); + ASSERT_EQ(Task::numRunningTasks(), 3); + ASSERT_EQ(Task::getRunningTasks().size(), 3); + + std::set expectedTaskIdSet; + for (const auto& task : expectedRunningTasks) { + expectedTaskIdSet.insert(task->taskId()); + } + ASSERT_EQ(expectedTaskIdSet.size(), 3); + std::vector> runningTasks = Task::getRunningTasks(); + ASSERT_EQ(runningTasks.size(), 3); + for (const auto& task : runningTasks) { + ASSERT_EQ(expectedTaskIdSet.count(task->taskId()), 1); + } + + expectedTaskIdSet.erase(expectedRunningTasks.back()->taskId()); + expectedRunningTasks.pop_back(); + ASSERT_EQ(expectedTaskIdSet.size(), 2); + + runningTasks.clear(); + runningTasks = Task::getRunningTasks(); + ASSERT_EQ(runningTasks.size(), 2); + for (const auto& task : runningTasks) { + ASSERT_EQ(expectedTaskIdSet.count(task->taskId()), 1); + } + + runningTasks.clear(); + expectedRunningTasks.clear(); + + ASSERT_EQ(Task::numRunningTasks(), 0); + ASSERT_TRUE(Task::getRunningTasks().empty()); +} + TEST_F(TaskTest, serialHashJoin) { auto left = makeRowVector( {"t_c0", "t_c1"}, diff --git a/velox/exec/tests/utils/QueryAssertions.cpp 
b/velox/exec/tests/utils/QueryAssertions.cpp index 50e7a668a94e..f7944e6a9dea 100644 --- a/velox/exec/tests/utils/QueryAssertions.cpp +++ b/velox/exec/tests/utils/QueryAssertions.cpp @@ -1474,48 +1474,28 @@ bool waitForTaskStateChange( } void waitForAllTasksToBeDeleted(uint64_t maxWaitUs) { - const uint64_t numCreatedTasks = Task::numCreatedTasks(); - uint64_t numDeletedTasks = Task::numDeletedTasks(); uint64_t waitUs = 0; - while (numCreatedTasks > numDeletedTasks) { + while (Task::numRunningTasks() != 0) { constexpr uint64_t kWaitInternalUs = 1'000; std::this_thread::sleep_for(std::chrono::microseconds(kWaitInternalUs)); waitUs += kWaitInternalUs; - numDeletedTasks = Task::numDeletedTasks(); if (waitUs >= maxWaitUs) { break; } } - VELOX_CHECK_EQ( - numDeletedTasks, - numCreatedTasks, - "{} tasks have been created while only {} have been deleted after waiting for {} us", - numCreatedTasks, - numDeletedTasks, - waitUs); -} - -void waitForAllTasksToBeDeleted( - uint64_t expectedDeletedTasks, - uint64_t maxWaitUs) { - uint64_t numDeletedTasks = Task::numDeletedTasks(); - uint64_t waitUs = 0; - while (expectedDeletedTasks > numDeletedTasks) { - constexpr uint64_t kWaitInternalUs = 1'000; - std::this_thread::sleep_for(std::chrono::microseconds(kWaitInternalUs)); - waitUs += kWaitInternalUs; - numDeletedTasks = Task::numDeletedTasks(); - if (waitUs >= maxWaitUs) { - break; - } + std::vector> pendingTasks = Task::getRunningTasks(); + if (pendingTasks.empty()) { + return; } - VELOX_CHECK_EQ( - numDeletedTasks, - expectedDeletedTasks, - "expected {} tasks to be deleted but only {} have been deleted after waiting for {} us", - expectedDeletedTasks, - numDeletedTasks, - waitUs); + std::vector pendingTaskStats; + pendingTaskStats.reserve(pendingTasks.size()); + for (const auto& task : pendingTasks) { + pendingTaskStats.push_back(task->toString()); + } + VELOX_FAIL( + "{} pending tasks\n{}", + pendingTasks.size(), + folly::join("\n", pendingTaskStats)); } std::shared_ptr 
assertQuery( diff --git a/velox/exec/tests/utils/QueryAssertions.h b/velox/exec/tests/utils/QueryAssertions.h index 217d12351e32..d1bbe253a2ff 100644 --- a/velox/exec/tests/utils/QueryAssertions.h +++ b/velox/exec/tests/utils/QueryAssertions.h @@ -213,12 +213,6 @@ bool waitForTaskStateChange( /// during this wait call. This is for testing purpose for now. void waitForAllTasksToBeDeleted(uint64_t maxWaitUs = 3'000'000); -/// Similar to above test utility except waiting for a specific number of -/// tasks to be deleted. -void waitForAllTasksToBeDeleted( - uint64_t expectedDeletedTasks, - uint64_t maxWaitUs); - std::shared_ptr assertQuery( const core::PlanNodePtr& plan, const std::string& duckDbSql,