Skip to content

Commit

Permalink
feat: Fix the task hanging under serialized execution mode (facebooki…
Browse files Browse the repository at this point in the history
…ncubator#11747)

Summary:
Pull Request resolved: facebookincubator#11747

This is a resubmit of facebookincubator#11647 with a fix for the Meta internal streaming use case.

Reviewed By: Yuhta, vudung45, weijiadeng-uber

Differential Revision: D66708173

fbshipit-source-id: 1c91570401d5a95350582ffa7d66ac912c1eebf6
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Dec 5, 2024
1 parent e40259f commit 627adac
Show file tree
Hide file tree
Showing 3 changed files with 385 additions and 6 deletions.
99 changes: 93 additions & 6 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,11 @@ RowVectorPtr Task::next(ContinueFuture* future) {
}

drivers_ = std::move(drivers);
driverBlockingStates_.reserve(drivers_.size());
for (auto i = 0; i < drivers_.size(); ++i) {
driverBlockingStates_.emplace_back(
std::make_unique<DriverBlockingState>(drivers_[i].get()));
}
}

// Run drivers one at a time. If a driver blocks, continue running the other
Expand All @@ -696,7 +701,10 @@ RowVectorPtr Task::next(ContinueFuture* future) {
int runnableDrivers = 0;
int blockedDrivers = 0;
for (auto i = 0; i < numDrivers; ++i) {
if (drivers_[i] == nullptr) {
// Hold a reference to the driver while accessing it, since an asynchronous
// task termination might remove drivers from their 'drivers_' slots.
auto driver = getDriver(i);
if (driver == nullptr) {
// This driver has finished processing.
continue;
}
Expand All @@ -707,16 +715,25 @@ RowVectorPtr Task::next(ContinueFuture* future) {
continue;
}

ContinueFuture blockFuture = ContinueFuture::makeEmpty();
if (driverBlockingStates_[i]->blocked(&blockFuture)) {
VELOX_CHECK(blockFuture.valid());
futures[i] = std::move(blockFuture);
// This driver is still blocked.
++blockedDrivers;
continue;
}
++runnableDrivers;

ContinueFuture driverFuture = ContinueFuture::makeEmpty();
auto result = drivers_[i]->next(&driverFuture);
if (result) {
auto result = driver->next(&driverFuture);
if (result != nullptr) {
VELOX_CHECK(!driverFuture.valid());
return result;
}

if (driverFuture.valid()) {
futures[i] = std::move(driverFuture);
driverBlockingStates_[i]->setDriverFuture(driverFuture);
}

if (error()) {
Expand All @@ -726,7 +743,7 @@ RowVectorPtr Task::next(ContinueFuture* future) {

if (runnableDrivers == 0) {
if (blockedDrivers > 0) {
if (!future) {
if (future == nullptr) {
VELOX_FAIL(
"Cannot make progress as all remaining drivers are blocked and user are not expected to wait.");
} else {
Expand All @@ -736,7 +753,7 @@ RowVectorPtr Task::next(ContinueFuture* future) {
notReadyFutures.emplace_back(std::move(continueFuture));
}
}
*future = folly::collectAll(std::move(notReadyFutures)).unit();
*future = folly::collectAny(std::move(notReadyFutures)).unit();
}
}
return nullptr;
Expand Down Expand Up @@ -792,6 +809,12 @@ void Task::start(uint32_t maxDrivers, uint32_t concurrentSplitGroups) {
}
}

std::shared_ptr<Driver> Task::getDriver(uint32_t driverId) const {
VELOX_CHECK_LT(driverId, drivers_.size());
std::unique_lock<std::timed_mutex> l(mutex_);
return drivers_[driverId];
}

// Verifies that the requested execution 'mode' equals this task's 'mode_'.
// The check fails with "Inconsistent task execution mode." if they differ.
void Task::checkExecutionMode(ExecutionMode mode) {
VELOX_CHECK_EQ(mode, mode_, "Inconsistent task execution mode.");
}
Expand Down Expand Up @@ -3116,4 +3139,68 @@ void Task::MemoryReclaimer::abort(
<< "Timeout waiting for task to complete during query memory aborting.";
}
}

// Marks the associated driver as blocked on 'driverFuture' and attaches
// continuations to it (run through the inline executor) that clear the blocked
// state once the future is realized.
//
// On success, all promises handed out by blocked() are fulfilled. On error,
// the failure is recorded in 'error_' (to be rethrown by the next blocked()
// call) and the pending promises are fulfilled as well, so that a caller
// waiting on them is woken up and gets to observe the error instead of
// waiting forever.
//
// 'driverFuture' is consumed (moved from). The driver must not already be
// blocked, and there must be no pending promises or recorded error.
void Task::DriverBlockingState::setDriverFuture(ContinueFuture& driverFuture) {
  {
    std::lock_guard<std::mutex> l(mutex_);
    // 'blocked_' is checked under the mutex like every other access to it.
    VELOX_CHECK(!blocked_);
    VELOX_CHECK(promises_.empty());
    VELOX_CHECK_NULL(error_);
    blocked_ = true;
  }
  // Each callback holds a shared reference to the driver so that 'driver_'
  // stays valid for as long as the callback may still run.
  std::move(driverFuture)
      .via(&folly::InlineExecutor::instance())
      .thenValue(
          [this, driverHolder = driver_->shared_from_this()](
              auto&& /* unused */) {
            std::vector<std::unique_ptr<ContinuePromise>> promises;
            {
              std::lock_guard<std::mutex> l(mutex_);
              VELOX_CHECK(blocked_);
              VELOX_CHECK_NULL(error_);
              promises = std::move(promises_);
              blocked_ = false;
            }
            // Fulfill the promises outside of the lock as the continuations
            // attached to them might run inline.
            for (auto& promise : promises) {
              promise->setValue();
            }
          })
      .thenError(
          folly::tag_t<std::exception>{},
          [this, driverHolder = driver_->shared_from_this()](
              std::exception const& e) {
            std::vector<std::unique_ptr<ContinuePromise>> promises;
            {
              std::lock_guard<std::mutex> l(mutex_);
              VELOX_CHECK(blocked_);
              VELOX_CHECK_NULL(error_);
              // Throw and catch to capture the error as a VeloxException
              // carrying the task context.
              try {
                VELOX_FAIL(
                    "A driver future from task {} was realized with error: {}",
                    driver_->task()->taskId(),
                    e.what());
              } catch (const VeloxException&) {
                error_ = std::current_exception();
              }
              promises = std::move(promises_);
              blocked_ = false;
            }
            // Wake up the waiters; they observe the error through the next
            // blocked() call, which rethrows 'error_'.
            for (auto& promise : promises) {
              promise->setValue();
            }
          });
}

// Reports whether the associated driver is currently blocked. If it is, a new
// promise is registered in 'promises_' and its future is stored in '*future'.
// Returns false and leaves '*future' untouched if the driver is not blocked.
// Rethrows the recorded error if the driver future was realized with one.
// 'future' must not be null.
bool Task::DriverBlockingState::blocked(ContinueFuture* future) {
  VELOX_CHECK_NOT_NULL(future);
  std::lock_guard<std::mutex> guard(mutex_);
  if (error_ != nullptr) {
    std::rethrow_exception(error_);
  }
  if (blocked_) {
    // Hand out a fresh promise/future pair per call.
    auto [waitPromise, waitFuture] =
        makeVeloxContinuePromiseContract(fmt::format(
            "DriverBlockingState {} from task {}",
            driver_->driverCtx()->driverId,
            driver_->task()->taskId()));
    *future = std::move(waitFuture);
    promises_.emplace_back(
        std::make_unique<ContinuePromise>(std::move(waitPromise)));
    return true;
  }
  // No promise may be pending while the driver is not blocked.
  VELOX_CHECK(promises_.empty());
  return false;
}
} // namespace facebook::velox::exec
35 changes: 35 additions & 0 deletions velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -1001,6 +1001,8 @@ class Task : public std::enable_shared_from_this<Task> {
// trace enabled.
void maybeInitTrace();

std::shared_ptr<Driver> getDriver(uint32_t driverId) const;

// Universally unique identifier of the task. Used to identify the task when
// calling TaskListener.
const std::string uuid_;
Expand Down Expand Up @@ -1072,6 +1074,39 @@ class Task : public std::enable_shared_from_this<Task> {

std::vector<std::unique_ptr<DriverFactory>> driverFactories_;
std::vector<std::shared_ptr<Driver>> drivers_;

// Tracks the blocking state of a single driver under serialized execution
// mode. The state is guarded by an internal mutex.
class DriverBlockingState {
 public:
  /// 'driver' is the driver whose blocking state is tracked; it must not be
  /// null.
  explicit DriverBlockingState(const Driver* driver) : driver_(driver) {
    VELOX_CHECK_NOT_NULL(driver_);
  }

  /// Marks the driver as blocked on 'driverFuture' and sets the continuation
  /// callback on it via the inline executor. 'driverFuture' is consumed by
  /// this call.
  void setDriverFuture(ContinueFuture& driverFuture);

  /// Indicates if the associated driver is blocked or not. If blocked,
  /// 'future' is set which becomes realized when the driver is unblocked.
  ///
  /// NOTE: the function throws if the driver has encountered error.
  bool blocked(ContinueFuture* future);

 private:
  // The driver whose blocking state is tracked. Not owned.
  const Driver* const driver_;

  // Guards the state below.
  mutable std::mutex mutex_;
  // Indicates if the associated driver is blocked or not.
  bool blocked_{false};
  // The error the driver future was realized with, or null if there is none.
  // Once set, it is rethrown by blocked().
  std::exception_ptr error_{nullptr};
  // Promises to fulfill when the driver is unblocked.
  std::vector<std::unique_ptr<ContinuePromise>> promises_;
};

// Tracks the driver blocking state under serialized execution mode.
std::vector<std::unique_ptr<DriverBlockingState>> driverBlockingStates_;

// When Drivers are closed by the Task, there is a chance that race and/or
// bugs can cause such Drivers to be held forever, in turn holding a pointer
// to the Task making it a zombie Tasks. This vector is used to keep track of
Expand Down
Loading

0 comments on commit 627adac

Please sign in to comment.