Skip to content

Commit

Permalink
Fix a task early termination with group execution
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaoxmeng committed Mar 12, 2024
1 parent 2a1a8ba commit deffc3a
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 2 deletions.
9 changes: 7 additions & 2 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1371,7 +1371,12 @@ bool Task::checkNoMoreSplitGroupsLocked() {
// we should review the total number of drivers, which initially is set to
// process all split groups, but in reality workers share split groups and
// each worker processes only a part of them, meaning much less than all.
if (allNodesReceivedNoMoreSplitsMessageLocked()) {
//
// NOTE: we shall only do task finish check after the task has been started
// which initializes 'numDriversPerSplitGroup_', otherwise the task will
// finish early.
if ((numDriversPerSplitGroup_ != 0) &&
allNodesReceivedNoMoreSplitsMessageLocked()) {
numTotalDrivers_ = seenSplitGroups_.size() * numDriversPerSplitGroup_ +
numDriversUngrouped_;
if (groupedPartitionedOutput_) {
Expand Down Expand Up @@ -1586,7 +1591,7 @@ bool Task::checkIfFinishedLocked() {
// TODO Add support for terminating processing early in grouped execution.
bool allFinished = numFinishedDrivers_ == numTotalDrivers_;
if (!allFinished && isUngroupedExecution()) {
auto outputPipelineId = getOutputPipelineId();
const auto outputPipelineId = getOutputPipelineId();
if (splitGroupStates_[kUngroupedGroupId].numFinishedOutputDrivers ==
numDrivers(outputPipelineId)) {
allFinished = true;
Expand Down
46 changes: 46 additions & 0 deletions velox/exec/tests/GroupedExecutionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -540,4 +540,50 @@ TEST_F(GroupedExecutionTest, groupedExecution) {
EXPECT_EQ(numRead, numSplits * 10'000);
}

// Checks that a grouped-execution task runs to completion when all of its
// splits, together with the no-more-splits signal, are delivered before the
// task is started.
TEST_F(GroupedExecutionTest, allGroupSplitsReceivedBeforeTaskStart) {
  // A single source file is scanned through 6 splits; the final assertion
  // expects each split to contribute 10'000 rows.
  const size_t numSplits{6};
  auto sourceVectors = makeVectors(10, 1'000);
  auto sourceFile = TempFilePath::create();
  writeToFile(sourceFile->path, sourceVectors);

  // Grouped execution over a column-less table scan: one driver, three split
  // groups, one group running at a time.
  CursorParameters cursorParams;
  cursorParams.planNode = tableScanNode(ROW({}, {}));
  cursorParams.maxDrivers = 1;
  cursorParams.executionStrategy = core::ExecutionStrategy::kGrouped;
  cursorParams.groupedExecutionLeafNodeIds.emplace(
      cursorParams.planNode->id());
  cursorParams.numSplitGroups = 3;
  cursorParams.numConcurrentSplitGroups = 1;

  // Creating the cursor creates the underlying task but does not start it.
  auto cursor = TaskCursor::create(cursorParams);
  auto task = cursor->task();

  // Hand over every split before the task starts: two passes over groups
  // 0, 1, 2 (same order as adding them one by one), then signal that no more
  // splits will arrive.
  for (int pass = 0; pass < 2; ++pass) {
    for (int groupId = 0; groupId < 3; ++groupId) {
      task->addSplit("0", makeHiveSplitWithGroup(sourceFile->path, groupId));
    }
  }
  task->noMoreSplits("0");

  // Only now start the task and wait for three drivers to finish.
  cursor->start();
  waitForFinishedDrivers(task, 3);
  ASSERT_EQ(
      getCompletedSplitGroups(task), std::unordered_set<int32_t>({0, 1, 2}));

  // Drain the cursor, tallying rows; the scan projects no columns, so every
  // batch must have zero children.
  int32_t numReadRows{0};
  while (cursor->moveNext()) {
    auto batch = cursor->current();
    EXPECT_EQ(batch->childrenSize(), 0);
    numReadRows += batch->size();
  }

  // With all output consumed the task must have reached the finished state
  // and produced the full row count.
  ASSERT_EQ(task->state(), exec::TaskState::kFinished);
  ASSERT_EQ(numSplits * 10'000, numReadRows);
}
} // namespace facebook::velox::exec::test

0 comments on commit deffc3a

Please sign in to comment.