Skip to content

Commit

Permalink
De-flake tests that verify task failures (facebookincubator#6907)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: facebookincubator#6907

Tests that run multi-threaded tasks which are expected to fail crash intermittently.

```
F1005 01:30:34.221653 2215122 DefaultKeepAliveExecutor.h:145] Check failed: keepAliveCount > 0
```

This happens because the Task and the Executor are destroyed while some drivers are still running. A fix is to wait for all drivers to complete before allowing the Task to go out of scope.

Reviewed By: xiaoxmeng

Differential Revision: D49947841

fbshipit-source-id: 10d4bb311e73aedb5e624ee885edf716601d30fc
  • Loading branch information
mbasmanova authored and facebook-github-bot committed Oct 5, 2023
1 parent 812f858 commit bf64482
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 20 deletions.
23 changes: 23 additions & 0 deletions velox/exec/tests/utils/Cursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,26 @@

namespace facebook::velox::exec::test {

/// Polls 'task' until all of its drivers have finished or until roughly
/// 'maxWaitMicros' microseconds of sleep time have accumulated.
///
/// The task must no longer be running when this is called; this is enforced
/// with VELOX_USER_CHECK. An error is logged if the wait times out.
///
/// @param task Task whose drivers are awaited.
/// @param maxWaitMicros Upper bound on the accumulated sleep time, in
/// microseconds.
/// @return true if all the drivers have finished, false on timeout.
bool waitForTaskDriversToFinish(exec::Task* task, uint64_t maxWaitMicros) {
  VELOX_USER_CHECK(!task->isRunning());

  // Polling interval. The time budget is tracked by summing the requested
  // sleep durations, not by reading a clock.
  constexpr uint64_t kWaitMicros = 1000;
  uint64_t waitMicros = 0;
  while ((task->numFinishedDrivers() != task->numTotalDrivers()) &&
         (waitMicros < maxWaitMicros)) {
    std::this_thread::sleep_for(std::chrono::microseconds(kWaitMicros));
    waitMicros += kWaitMicros;
  }

  // Read the counters once so that the logged values and the returned result
  // always agree. Re-reading them could log a timeout and then report success
  // if the last driver finished in between the two reads.
  const auto numFinishedDrivers = task->numFinishedDrivers();
  const auto numTotalDrivers = task->numTotalDrivers();
  if (numFinishedDrivers != numTotalDrivers) {
    LOG(ERROR)
        << "Timed out waiting for all task drivers to finish. Finished drivers: "
        << numFinishedDrivers << ". Total drivers: " << numTotalDrivers;
    return false;
  }

  return true;
}

exec::BlockingReason TaskQueue::enqueue(
RowVectorPtr vector,
velox::ContinueFuture* future) {
Expand Down Expand Up @@ -195,6 +215,9 @@ bool TaskCursor::moveNext() {
start();
current_ = queue_->dequeue();
if (task_->error()) {
// Wait for all task drivers to finish so that executor_ is not destroyed
// while task_ is still using it, which would cause a crash.
waitForTaskDriversToFinish(task_.get());
std::rethrow_exception(task_->error());
}
if (!current_) {
Expand Down
8 changes: 8 additions & 0 deletions velox/exec/tests/utils/Cursor.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@

namespace facebook::velox::exec::test {

/// Waits for all the drivers of 'task' to finish, giving up after
/// approximately 'maxWaitMicros' microseconds (one second by default).
///
/// @param task Task whose drivers are awaited.
/// @param maxWaitMicros Maximum time to wait, in microseconds.
/// @return true if all the drivers have finished, otherwise false.
///
/// NOTE: user must call this on a finished or failed task.
bool waitForTaskDriversToFinish(
exec::Task* task,
uint64_t maxWaitMicros = 1'000'000);

// Parameters for initializing a TaskCursor or RowCursor.
struct CursorParameters {
// Root node of the plan tree
Expand Down
12 changes: 0 additions & 12 deletions velox/exec/tests/utils/QueryAssertions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1279,18 +1279,6 @@ bool waitForTaskStateChange(
return task->state() == state;
}

/// Sleeps in fixed 1000-microsecond steps until every driver of 'task' has
/// finished or the accumulated sleep time reaches 'maxWaitMicros'. Returns
/// true if the finished driver count equals the total driver count on exit.
/// The task must not be running when this is called.
bool waitForTaskDriversToFinish(exec::Task* task, uint64_t maxWaitMicros) {
  VELOX_USER_CHECK(!task->isRunning());

  constexpr uint64_t kPollIntervalMicros = 1000;
  uint64_t elapsedMicros = 0;
  for (;;) {
    // Check driver completion first, then the remaining time budget.
    if (task->numFinishedDrivers() == task->numTotalDrivers()) {
      break;
    }
    if (elapsedMicros >= maxWaitMicros) {
      break;
    }
    std::this_thread::sleep_for(
        std::chrono::microseconds(kPollIntervalMicros));
    elapsedMicros += kPollIntervalMicros;
  }

  const bool allDriversFinished =
      task->numFinishedDrivers() == task->numTotalDrivers();
  return allDriversFinished;
}

void waitForAllTasksToBeDeleted(uint64_t maxWaitUs) {
const uint64_t numCreatedTasks = Task::numCreatedTasks();
uint64_t numDeletedTasks = Task::numDeletedTasks();
Expand Down
8 changes: 0 additions & 8 deletions velox/exec/tests/utils/QueryAssertions.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,6 @@ bool waitForTaskStateChange(
TaskState state,
uint64_t maxWaitMicros = 1'000'000);

/// Waits for all the drivers of 'task' to finish, giving up after
/// approximately 'maxWaitMicros' microseconds (one second by default).
///
/// @param task Task whose drivers are awaited.
/// @param maxWaitMicros Maximum time to wait, in microseconds.
/// @return true if all the drivers have finished, otherwise false.
///
/// NOTE: user must call this on a finished or failed task.
bool waitForTaskDriversToFinish(
exec::Task* task,
uint64_t maxWaitMicros = 1'000'000);

/// Invoked to wait for all the tasks created by the test to be deleted.
///
/// NOTE: it is assumed that there is no more task to be created after or
Expand Down

0 comments on commit bf64482

Please sign in to comment.