Skip to content

Commit

Permalink
Add to track running tasks for velox runtime (#11102)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #11102

Track all live tasks in the Velox runtime so that runtime task stats can be
collected. A follow-up change can periodically print or report the task stats
from the Velox runtime stats reporter, depending on what each query system needs.

Reviewed By: tanjialiang, oerling

Differential Revision: D63444008

fbshipit-source-id: 4e533470e417951cd86456c439101160ab1a05ca
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Sep 27, 2024
1 parent ce2b907 commit 28a8979
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 101 deletions.
48 changes: 45 additions & 3 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,6 @@ std::string taskStateString(TaskState state) {
}
}

std::atomic<uint64_t> Task::numCreatedTasks_ = 0;
std::atomic<uint64_t> Task::numDeletedTasks_ = 0;

bool registerTaskListener(std::shared_ptr<TaskListener> listener) {
return listeners().withWLock([&](auto& listeners) {
for (const auto& existingListener : listeners) {
Expand Down Expand Up @@ -277,6 +274,7 @@ std::shared_ptr<Task> Task::create(
std::move(consumerSupplier),
std::move(onError)));
task->initTaskPool();
task->addToTaskList();
return task;
}

Expand Down Expand Up @@ -310,6 +308,8 @@ Task::Task(
}

Task::~Task() {
removeFromTaskList();

// TODO(spershin): Temporary code designed to reveal what causes SIGABRT in
// jemalloc when destroying some Tasks.
std::string clearStage;
Expand Down Expand Up @@ -355,6 +355,48 @@ Task::~Task() {
}
}

// Returns the system-wide running task list. The list is a function-local
// static, so it is constructed the first time this function is called.
Task::TaskList& Task::taskList() {
  static TaskList runningTasks;
  return runningTasks;
}

// Returns the lock that guards the system-wide running task list. The mutex
// is a function-local static, so it is constructed on first use.
folly::SharedMutex& Task::taskListLock() {
  static folly::SharedMutex listMutex;
  return listMutex;
}

size_t Task::numRunningTasks() {
std::shared_lock guard{taskListLock()};
return taskList().size();
}

// Returns strong references to all tasks currently linked into the
// system-wide running task list.
//
// Each list entry only holds a weak reference to its task. An entry whose
// weak reference can no longer be locked is skipped, so the returned vector
// may be shorter than the list itself.
std::vector<std::shared_ptr<Task>> Task::getRunningTasks() {
  std::vector<std::shared_ptr<Task>> tasks;
  // The list is only read here, so shared (reader) access is sufficient.
  std::shared_lock guard(taskListLock());
  tasks.reserve(taskList().size());
  // Iterate by const reference: taking each entry by value would copy the
  // TaskListEntry (its weak_ptr and its intrusive hook) on every iteration
  // while the list lock is held.
  for (const auto& taskEntry : taskList()) {
    if (auto task = taskEntry.taskPtr.lock()) {
      tasks.push_back(std::move(task));
    }
  }
  return tasks;
}

void Task::addToTaskList() {
VELOX_CHECK(!taskListEntry_.listHook.is_linked());
taskListEntry_.taskPtr = shared_from_this();

std::unique_lock guard{taskListLock()};
taskList().push_back(taskListEntry_);
}

void Task::removeFromTaskList() {
std::unique_lock guard{taskListLock()};
if (taskListEntry_.listHook.is_linked()) {
taskListEntry_.listHook.unlink();
}
}

uint64_t Task::timeSinceStartMsLocked() const {
if (taskStats_.executionStartTimeMs == 0UL) {
return 0UL;
Expand Down
76 changes: 30 additions & 46 deletions velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -640,16 +640,6 @@ class Task : public std::enable_shared_from_this<Task> {
return driverFactories_[driver->driverCtx()->pipelineId]->numDrivers;
}

/// Returns the number of created and deleted tasks since the velox engine
/// starts running so far.
static uint64_t numCreatedTasks() {
  // Atomically read the current value of the creation counter.
  return numCreatedTasks_.load();
}

/// Returns the current value of the deleted-task counter.
static uint64_t numDeletedTasks() {
  // Atomically read the current value of the deletion counter.
  return numDeletedTasks_.load();
}

/// Returns the spill directory stored in 'spillDirectory_' by const reference.
const std::string& spillDirectory() const {
return spillDirectory_;
}
Expand Down Expand Up @@ -677,6 +667,12 @@ class Task : public std::enable_shared_from_this<Task> {
++numThreads_;
}

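/// Returns the number of tasks in the system-wide running task list, i.e.
/// tasks that have been created and not yet destroyed. This includes
/// completed tasks that still have pending references.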
static size_t numRunningTasks();

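/// Returns the tasks in the system-wide running task list that are still
/// alive, i.e. tasks that have been created and not yet destroyed. This
/// includes completed tasks that still have pending references.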
static std::vector<std::shared_ptr<Task>> getRunningTasks();

/// Invoked to run provided 'callback' on each alive driver of the task.
void testingVisitDrivers(const std::function<void(Driver*)>& callback);

Expand All @@ -686,6 +682,20 @@ class Task : public std::enable_shared_from_this<Task> {
}

private:
// Entry that links a task into the system-wide running task list.
struct TaskListEntry {
// Weak reference back to the task; Task::getRunningTasks() locks it to obtain
// a shared_ptr and skips the entry if the lock fails.
std::weak_ptr<Task> taskPtr;
// Intrusive hook through which 'TaskList' links this entry into the list.
folly::IntrusiveListHook listHook;
};
using TaskList =
folly::IntrusiveList<TaskListEntry, &TaskListEntry::listHook>;

// Returns the system-wide running task list.
FOLLY_EXPORT static TaskList& taskList();

// Returns the lock that protects the system-wide running task list.
FOLLY_EXPORT static folly::SharedMutex& taskListLock();

Task(
const std::string& taskId,
core::PlanFragment planFragment,
Expand All @@ -695,6 +705,13 @@ class Task : public std::enable_shared_from_this<Task> {
ConsumerSupplier consumerSupplier,
std::function<void(std::exception_ptr)> onError = nullptr);

// Invoked to add this to the system-wide running task list on task creation.
void addToTaskList();

// Invoked to remove this from the system-wide running task list on task
// destruction.
void removeFromTaskList();

// Consistency check of the task execution to make sure the execution mode
// stays the same.
void checkExecutionMode(ExecutionMode mode);
Expand Down Expand Up @@ -816,22 +833,6 @@ class Task : public std::enable_shared_from_this<Task> {
std::weak_ptr<Task> task_;
};

// Counts the number of created tasks which is incremented on each task
// creation.
static std::atomic<uint64_t> numCreatedTasks_;

// Counts the number of deleted tasks which is incremented on each task
// destruction.
static std::atomic<uint64_t> numDeletedTasks_;

// Atomically bumps the created-task counter by one.
static void taskCreated() {
  numCreatedTasks_.fetch_add(1);
}

// Atomically bumps the deleted-task counter by one.
static void taskDeleted() {
  numDeletedTasks_.fetch_add(1);
}

/// Returns true if state is 'running'.
bool isRunningLocked() const;

Expand Down Expand Up @@ -984,26 +985,6 @@ class Task : public std::enable_shared_from_this<Task> {
// trace enabled.
void maybeInitQueryTrace();

// The helper class used to maintain 'numCreatedTasks_' and 'numDeletedTasks_'
// on task construction and destruction.
class TaskCounter {
public:
// Records a task creation via Task::taskCreated() when constructed.
TaskCounter() {
Task::taskCreated();
}
// Records a task deletion via Task::taskDeleted() when destroyed.
~TaskCounter() {
Task::taskDeleted();
}
};
friend class Task::TaskCounter;

// NOTE: keep 'taskCounter_' the first member so that it will be the first
// constructed member and the last destructed one. The purpose is to make
// 'numCreatedTasks_' and 'numDeletedTasks_' counting more robust to the
// timing race condition when used in scenarios such as waiting for all the
// tasks to be destructed in test.
const TaskCounter taskCounter_;

// Universally unique identifier of the task. Used to identify the task when
// calling TaskListener.
const std::string uuid_;
Expand All @@ -1020,6 +1001,9 @@ class Task : public std::enable_shared_from_this<Task> {
// executed in a single mode throughout its lifetime
const ExecutionMode mode_;

// Hook in the system wide task list.
TaskListEntry taskListEntry_;

// Root MemoryPool for this Task. All member variables that hold references
// to pool_ must be defined after pool_, childPools_.
std::shared_ptr<memory::MemoryPool> pool_;
Expand Down
7 changes: 3 additions & 4 deletions velox/exec/tests/TableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1539,17 +1539,16 @@ TEST_F(TableScanTest, preloadingSplitClose) {
latch.count_down();
});
}
ASSERT_EQ(Task::numCreatedTasks(), Task::numDeletedTasks());
ASSERT_EQ(Task::numRunningTasks(), 0);
auto task = assertQuery(tableScanNode(), filePaths, "SELECT * FROM tmp", 2);
auto stats = getTableScanRuntimeStats(task);

// Verify that split preloading is enabled.
ASSERT_GT(stats.at("preloadedSplits").sum, 1);

task.reset();
// Once all task references are cleared, the count of deleted tasks should
// promptly match the count of created tasks.
ASSERT_EQ(Task::numCreatedTasks(), Task::numDeletedTasks());
// Once all task references are cleared, all the tasks should be destroyed.
ASSERT_EQ(Task::numRunningTasks(), 0);
// Clean blocking items in the IO thread pool.
for (auto& baton : batons) {
baton.post();
Expand Down
70 changes: 61 additions & 9 deletions velox/exec/tests/TaskTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,9 @@ class TaskTest : public HiveConnectorTestBase {
core::PlanFragment plan,
const std::unordered_map<std::string, std::vector<std::string>>&
filePaths = {}) {
static std::atomic_uint64_t taskId{0};
auto task = Task::create(
"single.execution.task.0",
fmt::format("single.execution.task.{}", taskId++),
plan,
0,
core::QueryCtx::create(),
Expand Down Expand Up @@ -743,15 +744,11 @@ TEST_F(TaskTest, serialExecution) {
makeFlatVector<int64_t>(100, [](auto row) { return row + 5; }),
});

uint64_t numCreatedTasks = Task::numCreatedTasks();
uint64_t numDeletedTasks = Task::numDeletedTasks();
{
auto [task, results] = executeSerial(plan);
assertEqualResults(
std::vector<RowVectorPtr>{expectedResult, expectedResult}, results);
}
ASSERT_EQ(numCreatedTasks + 1, Task::numCreatedTasks());
ASSERT_EQ(numDeletedTasks + 1, Task::numDeletedTasks());

// Project + Aggregation.
plan = PlanBuilder()
Expand All @@ -776,14 +773,10 @@ TEST_F(TaskTest, serialExecution) {
995 / 2.0 + 4}),
});

++numCreatedTasks;
++numDeletedTasks;
{
auto [task, results] = executeSerial(plan);
assertEqualResults({expectedResult}, results);
}
ASSERT_EQ(numCreatedTasks + 1, Task::numCreatedTasks());
ASSERT_EQ(numDeletedTasks + 1, Task::numDeletedTasks());

// Project + Aggregation over TableScan.
auto filePath = TempFilePath::create();
Expand All @@ -808,6 +801,65 @@ TEST_F(TaskTest, serialExecution) {
VELOX_ASSERT_THROW(executeSerial(plan), "division by zero");
}

// The purpose of the test is to check the running task list APIs.
TEST_F(TaskTest, runningTaskList) {
  const auto input = makeRowVector({
      makeFlatVector<int64_t>(1'000, [](auto row) { return row; }),
  });

  // No task may be alive before this test creates its own.
  ASSERT_EQ(Task::numRunningTasks(), 0);
  ASSERT_TRUE(Task::getRunningTasks().empty());

  const auto plan = PlanBuilder()
                        .values({input, input})
                        .filter("c0 < 100")
                        .project({"c0 + 5"})
                        .planFragment();

  // Every executed task has completed but is still referenced from
  // 'heldTasks', so it must keep showing up in the running task list.
  std::vector<std::shared_ptr<Task>> heldTasks;
  for (int numHeld = 1; numHeld <= 3; ++numHeld) {
    heldTasks.push_back(executeSerial(plan).first);
    ASSERT_EQ(Task::numRunningTasks(), numHeld);
    ASSERT_EQ(Task::getRunningTasks().size(), numHeld);
  }

  // The reported tasks must be exactly the ones held by this test.
  std::set<std::string> heldTaskIds;
  for (const auto& task : heldTasks) {
    heldTaskIds.insert(task->taskId());
  }
  ASSERT_EQ(heldTaskIds.size(), 3);

  auto listedTasks = Task::getRunningTasks();
  ASSERT_EQ(listedTasks.size(), 3);
  for (const auto& task : listedTasks) {
    ASSERT_EQ(heldTaskIds.count(task->taskId()), 1);
  }

  // Drop the test's own reference to the most recently created task. The
  // snapshot in 'listedTasks' still references it until it is cleared below.
  heldTaskIds.erase(heldTasks.back()->taskId());
  heldTasks.pop_back();
  ASSERT_EQ(heldTaskIds.size(), 2);

  // After the snapshot is released, only the two held tasks remain listed.
  listedTasks.clear();
  listedTasks = Task::getRunningTasks();
  ASSERT_EQ(listedTasks.size(), 2);
  for (const auto& task : listedTasks) {
    ASSERT_EQ(heldTaskIds.count(task->taskId()), 1);
  }

  // Releasing every remaining reference must empty the running task list.
  listedTasks.clear();
  heldTasks.clear();

  ASSERT_EQ(Task::numRunningTasks(), 0);
  ASSERT_TRUE(Task::getRunningTasks().empty());
}

TEST_F(TaskTest, serialHashJoin) {
auto left = makeRowVector(
{"t_c0", "t_c1"},
Expand Down
46 changes: 13 additions & 33 deletions velox/exec/tests/utils/QueryAssertions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1474,48 +1474,28 @@ bool waitForTaskStateChange(
}

void waitForAllTasksToBeDeleted(uint64_t maxWaitUs) {
const uint64_t numCreatedTasks = Task::numCreatedTasks();
uint64_t numDeletedTasks = Task::numDeletedTasks();
uint64_t waitUs = 0;
while (numCreatedTasks > numDeletedTasks) {
while (Task::numRunningTasks() != 0) {
constexpr uint64_t kWaitInternalUs = 1'000;
std::this_thread::sleep_for(std::chrono::microseconds(kWaitInternalUs));
waitUs += kWaitInternalUs;
numDeletedTasks = Task::numDeletedTasks();
if (waitUs >= maxWaitUs) {
break;
}
}
VELOX_CHECK_EQ(
numDeletedTasks,
numCreatedTasks,
"{} tasks have been created while only {} have been deleted after waiting for {} us",
numCreatedTasks,
numDeletedTasks,
waitUs);
}

void waitForAllTasksToBeDeleted(
uint64_t expectedDeletedTasks,
uint64_t maxWaitUs) {
uint64_t numDeletedTasks = Task::numDeletedTasks();
uint64_t waitUs = 0;
while (expectedDeletedTasks > numDeletedTasks) {
constexpr uint64_t kWaitInternalUs = 1'000;
std::this_thread::sleep_for(std::chrono::microseconds(kWaitInternalUs));
waitUs += kWaitInternalUs;
numDeletedTasks = Task::numDeletedTasks();
if (waitUs >= maxWaitUs) {
break;
}
std::vector<std::shared_ptr<Task>> pendingTasks = Task::getRunningTasks();
if (pendingTasks.empty()) {
return;
}
VELOX_CHECK_EQ(
numDeletedTasks,
expectedDeletedTasks,
"expected {} tasks to be deleted but only {} have been deleted after waiting for {} us",
expectedDeletedTasks,
numDeletedTasks,
waitUs);
std::vector<std::string> pendingTaskStats;
pendingTaskStats.reserve(pendingTasks.size());
for (const auto& task : pendingTasks) {
pendingTaskStats.push_back(task->toString());
}
VELOX_FAIL(
"{} pending tasks\n{}",
pendingTasks.size(),
folly::join("\n", pendingTaskStats));
}

std::shared_ptr<Task> assertQuery(
Expand Down
Loading

0 comments on commit 28a8979

Please sign in to comment.