Skip to content

Commit

Permalink
Fix LocalExchangeSource timeout (#8101)
Browse files Browse the repository at this point in the history
Summary:
Timeout can happen concurrently with the data callback from the output buffer manager. Only one of these must happen. These are not naturally serialized because these do not realize the same promise and do all kinds of other actions in addition to the promise.

Overlap of timeout and data makes exchange fuzzer flaky.

Pull Request resolved: #8101

Reviewed By: Yuhta

Differential Revision: D52287073

Pulled By: oerling

fbshipit-source-id: 3c82d66d1b187b4c98ac2a1b8b26bd81461c9325
  • Loading branch information
Orri Erling authored and facebook-github-bot committed Jan 5, 2024
1 parent 8321b3e commit 7376fb2
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 13 deletions.
47 changes: 47 additions & 0 deletions velox/exec/tests/ExchangeClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ class ExchangeClientTest : public testing::Test,
bool atEnd;
ContinueFuture future;
auto pages = client.next(1, &atEnd, &future);
if (pages.empty()) {
auto& exec = folly::QueuedImmediateExecutor::instance();
std::move(future).via(&exec).wait();
pages = client.next(1, &atEnd, &future);
}
ASSERT_EQ(1, pages.size());
}
}
Expand Down Expand Up @@ -344,5 +349,47 @@ TEST_F(ExchangeClientTest, sourceTimeout) {
ASSERT_TRUE(atEnd);
}

TEST_F(ExchangeClientTest, timeoutDuringValueCallback) {
common::testutil::TestValue::enable();
auto row = makeRowVector({makeFlatVector<int32_t>({1, 2, 3})});

auto plan = test::PlanBuilder()
.values({row})
.partitionedOutput({"c0"}, 100)
.planNode();
auto taskId = "local://t1";
auto task = makeTask(taskId, plan);

bufferManager_->initializeTask(
task, core::PartitionedOutputNode::Kind::kPartitioned, 100, 16);

ExchangeClient client(
"t", 17, pool(), ExchangeClient::kDefaultMaxQueuedBytes);
client.addRemoteTaskId(taskId);
int32_t numTimeouts = 0;
SCOPED_TESTVALUE_SET(
"facebook::velox::exec::test::LocalExchangeSource::timeout",
std::function<void(void*)>(([&](void* /*ignore*/) { ++numTimeouts; })));

SCOPED_TESTVALUE_SET(
"facebook::velox::exec::test::LocalExchangeSource",
std::function<void(void*)>(([&](void* /*pages*/) {
std::this_thread::sleep_for(
std::chrono::seconds(2 * ExchangeClient::kDefaultMaxWaitSeconds));
})));

auto thread = std::thread([&]() {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
enqueue(taskId, 17, row);
});

fetchPages(client, 1);
thread.join();
EXPECT_EQ(0, numTimeouts);

task->requestCancel();
bufferManager_->removeTask(taskId);
}

} // namespace
} // namespace facebook::velox::exec
76 changes: 63 additions & 13 deletions velox/exec/tests/utils/LocalExchangeSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,24 @@ class LocalExchangeSource : public exec::ExchangeSource {
VELOX_CHECK(requestPending_);
auto requestedSequence = sequence_;
auto self = shared_from_this();

// Have a flag shared between the data available and timeout callbacks. Only
// one of these must run but they could overlap at call time.
static std::mutex realizeMutex;
auto state = std::make_shared<State>(State::kPending);

// Since this lambda may outlive 'this', we need to capture a
// shared_ptr to the current object (self).
auto resultCallback = [self, requestedSequence, buffers, this](
auto resultCallback = [self, requestedSequence, buffers, state, this](
std::vector<std::unique_ptr<folly::IOBuf>> data,
int64_t sequence) {
{
std::lock_guard<std::mutex> l(realizeMutex);
if (*state != State::kPending) {
return;
}
*state = State::kResultReceived;
}
if (requestedSequence > sequence) {
VLOG(2) << "Receives earlier sequence than requested: task " << taskId_
<< ", destination " << destination_ << ", requested "
Expand Down Expand Up @@ -127,24 +140,55 @@ class LocalExchangeSource : public exec::ExchangeSource {
if (!requestPromise.isFulfilled()) {
requestPromise.setValue(Response{totalBytes, atEnd_});
}
{
std::lock_guard<std::mutex> l(realizeMutex);
*state = State::kResultProcessed;
}
};

// Call the callback in any case after timeout.
auto& exec = folly::QueuedImmediateExecutor::instance();

future = std::move(future).via(&exec).onTimeout(
std::chrono::seconds(maxWaitSeconds), [self, this] {
common::testutil::TestValue::adjust(
"facebook::velox::exec::test::LocalExchangeSource::timeout",
this);
VeloxPromise<Response> requestPromise;
{
std::lock_guard<std::mutex> l(queue_->mutex());
requestPending_ = false;
requestPromise = std::move(promise_);
}
std::chrono::seconds(maxWaitSeconds), [self, state, this] {
// The timeout callback detects if a result is being
// processed. If so, it waits for the result processing to be
// complete. It must not realize promises while a result is
// being processed. After the result is processed, returning a
// value should be no-op since the promise already has a value.
bool done = false;
bool timeout = false;
do {
{
std::lock_guard<std::mutex> l(realizeMutex);
if (*state == State::kPending) {
*state = State::kTimeout;
timeout = true;
done = true;
} else if (*state == State::kResultReceived) {
done = true;
}
}
if (!done) {
// wait for the result callback to finish on another thread. Must
// not set the future until the other thread is finished.
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
} while (!done);
Response response = {0, false};
if (!requestPromise.isFulfilled()) {
requestPromise.setValue(response);
if (timeout) {
common::testutil::TestValue::adjust(
"facebook::velox::exec::test::LocalExchangeSource::timeout",
this);
VeloxPromise<Response> requestPromise;
{
std::lock_guard<std::mutex> l(queue_->mutex());
requestPending_ = false;
requestPromise = std::move(promise_);
}
if (!requestPromise.isFulfilled()) {
requestPromise.setValue(response);
}
}
return response;
});
Expand Down Expand Up @@ -173,6 +217,12 @@ class LocalExchangeSource : public exec::ExchangeSource {
}

private:
// state for serializing concurrent result and timeout. If timeout
// happens when state is kResultReceived, it must wait until state
// is kResultProcessed. If result arrives when state != kPending,
// the result is ignored.
enum class State { kPending, kResultReceived, kResultProcessed, kTimeout };

bool checkSetRequestPromise() {
VeloxPromise<Response> promise;
{
Expand Down

0 comments on commit 7376fb2

Please sign in to comment.