Skip to content

Commit

Permalink
Fix empty sources case in Merge operator in case of task termination … (
Browse files Browse the repository at this point in the history
facebookincubator#5004)

Summary:
There is a race condition that can trigger a Merge operator SEGV in the
Meta-internal Prestissimo shadowing test:
1. Presto coordinator sends all the splits to a worker (with no more split)
2. Presto coordinator (or some other worker side failure) triggers early
    task termination, which cleans up the pending splits to process
3. A running merge operator from the failed task calls isBlocked() for
    the first time and gets empty sources and a non-blocked reason, so it continues.
    Note: no source merge tree has been created at this point
4. The merge operator's getOutput() call runs into a segmentation fault as no source
    merge tree has been created.

Add a unit test that creates this race condition and verifies the fix, which marks the
operator as finished when the sources are empty.

Pull Request resolved: facebookincubator#5004

Reviewed By: Yuhta, amitkdutta, tanjialiang

Differential Revision: D46130001

Pulled By: xiaoxmeng

fbshipit-source-id: b0f372639a54bbf7fbd17ad6f09faa6020f2ac0e
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed May 24, 2023
1 parent 7f972cc commit b05cd61
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 1 deletion.
1 change: 0 additions & 1 deletion velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include <folly/ScopeGuard.h>
#include <folly/executors/QueuedImmediateExecutor.h>
#include <folly/executors/task_queue/UnboundedBlockingQueue.h>
#include <folly/executors/thread_factory/InitThreadFactory.h>
#include <gflags/gflags.h>
#include "velox/common/testutil/TestValue.h"
Expand Down
9 changes: 9 additions & 0 deletions velox/exec/Merge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,20 @@ void Merge::initializeTreeOfLosers() {
}

BlockingReason Merge::isBlocked(ContinueFuture* future) {
TestValue::adjust("facebook::velox::exec::Merge::isBlocked", this);

auto reason = addMergeSources(future);
if (reason != BlockingReason::kNotBlocked) {
return reason;
}

// NOTE: the task might terminate early which leaves empty sources. Once it
// happens, we shall simply mark the merge operator as finished.
if (sources_.empty()) {
finished_ = true;
return BlockingReason::kNotBlocked;
}

// No merging is needed if there is only one source.
if (streams_.empty() && sources_.size() > 1) {
initializeTreeOfLosers();
Expand Down
57 changes: 57 additions & 0 deletions velox/exec/tests/MultiFragmentTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ using namespace facebook::velox::connector::hive;
using namespace facebook::velox::common::testutil;
using namespace facebook::velox::memory;

using facebook::velox::common::testutil::TestValue;
using facebook::velox::test::BatchMaker;

class MultiFragmentTest : public HiveConnectorTestBase {
Expand Down Expand Up @@ -1288,3 +1289,59 @@ TEST_F(MultiFragmentTest, taskTerminateWithPendingOutputBuffers) {
ASSERT_EQ(task.use_count(), 1);
task.reset();
}

// Reproduces a task abort that lands while a MergeExchange operator is inside
// an isBlocked() call: a test-value hook parks the first such call until the
// remote splits have been added, then aborts the operator's own task. The test
// passes when the producer task completes and the consumer task ends up
// aborted.
DEBUG_ONLY_TEST_F(MultiFragmentTest, mergeWithEarlyTermination) {
  setupSources(10, 1000);

  // [begin, begin) is an empty iterator range, so this vector holds no file
  // paths and addHiveSplits() below is called with an empty list.
  // NOTE(review): presumably intentional so the producer has nothing to scan —
  // confirm.
  std::vector<std::shared_ptr<TempFilePath>> splitFilePaths(
      filePaths_.begin(), filePaths_.begin());

  // Producer fragment: table scan -> order by -> local merge -> partitioned
  // output. Its task id is what the consumer later receives as a remote split.
  auto producerTaskId = makeTaskId("orderby", 0);
  std::vector<std::string> partialSortTaskIds{producerTaskId};
  auto idGenerator = std::make_shared<core::PlanNodeIdGenerator>();
  auto partialSortPlan = PlanBuilder(idGenerator)
                             .localMerge(
                                 {"c0"},
                                 {PlanBuilder(idGenerator)
                                      .tableScan(rowType_)
                                      .orderBy({"c0"}, true)
                                      .planNode()})
                             .partitionedOutput({}, 1)
                             .planNode();

  auto partialSortTask = makeTask(producerTaskId, partialSortPlan, 1);
  Task::start(partialSortTask, 1);
  addHiveSplits(partialSortTask, splitFilePaths);

  // Only the first MergeExchange call that reaches the hook is parked; the
  // atomic flag lets every later call pass straight through.
  std::atomic<bool> blockMergeOnce{true};
  folly::EventCount mergeIsBlockedWait;
  // The wait key is taken before any notify() so the hook's wait() observes
  // the notification issued further down.
  auto mergeIsBlockedWaitKey = mergeIsBlockedWait.prepareWait();
  SCOPED_TESTVALUE_SET(
      "facebook::velox::exec::Merge::isBlocked",
      std::function<void(const Operator*)>([&](const Operator* op) {
        // Short-circuit keeps the flag untouched for non-MergeExchange
        // operators, exactly as two separate guards would.
        if (op->operatorType() != "MergeExchange" ||
            !blockMergeOnce.exchange(false)) {
          return;
        }
        mergeIsBlockedWait.wait(mergeIsBlockedWaitKey);
        // Trigger early termination.
        op->testingOperatorCtx()->task()->requestAbort();
      }));

  // Consumer fragment: merge exchange over the producer's output.
  auto finalSortTaskId = makeTaskId("orderby", 1);
  auto finalSortPlan =
      PlanBuilder()
          .mergeExchange(partialSortPlan->outputType(), {"c0"})
          .partitionedOutput({}, 1)
          .planNode();
  auto finalSortTask = makeTask(finalSortTaskId, finalSortPlan, 0);
  Task::start(finalSortTask, 1);
  addRemoteSplits(finalSortTask, partialSortTaskIds);

  // Release the parked hook now that the remote splits have been added.
  mergeIsBlockedWait.notify();

  ASSERT_TRUE(waitForTaskCompletion(partialSortTask.get(), 1'000'000'000));
  ASSERT_TRUE(waitForTaskAborted(finalSortTask.get(), 1'000'000'000));
}

0 comments on commit b05cd61

Please sign in to comment.