Skip to content

Commit

Permalink
Add support for splitting up input in PartitionedOutput (#7059)
Browse files Browse the repository at this point in the history
Summary:
In case of a single partition or round robin partitioning, we end up
generating a single IndexRange with the full set of input rows to be
processed by the serializer. The current implementation only checks
if it needs to flush the serialized data after processing a complete
IndexRange. That check exists to ensure we stop serializing once we hit
either the memory or the row count limit. As a result, we can encounter
situations where the full input gets serialized and ends up way over those
limits, resulting in OOMs. This change adds support for partially
serializing an IndexRange and checks the limits after each row.
Additionally, this fixes a bug where the row count limit was incorrectly
evaluated, either because the number of IndexRanges (rather than the number
of rows) was being compared against it, or because the row count was not
carried over between serialization calls that did not result in a flush.

Fixes: #7048

Pull Request resolved: #7059

Test Plan:
Added unit tests that verify both paths and ensure an input IndexRange
can be split multiple ways.

Reviewed By: xiaoxmeng

Differential Revision: D50293366

Pulled By: bikramSingh91

fbshipit-source-id: dece682bbd7b10ac8acb21f4373c3171eb29b3f9
  • Loading branch information
bikramSingh91 authored and facebook-github-bot committed Oct 16, 2023
1 parent de33aef commit 56cb7e6
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 31 deletions.
58 changes: 30 additions & 28 deletions velox/exec/PartitionedOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,42 +39,43 @@ BlockingReason Destination::advance(
return flush(bufferManager, bufferReleaseFn, future);
}

auto firstRangeIdx = rangeIdx_;
for (; rangeIdx_ < ranges_.size(); ++rangeIdx_) {
// TODO: add support for serializing partial ranges if the full range is too
// big.
for (vector_size_t i = 0; i < ranges_[rangeIdx_].size; i++) {
bytesInCurrent_ += sizes[ranges_[rangeIdx_].begin + i];
// Collect ranges to serialize.
rangesToSerialize_.clear();
bool shouldFlush = false;
while (rangeIdx_ < ranges_.size() && !shouldFlush) {
auto& currRange = ranges_[rangeIdx_];
auto startRow = rowsInCurrentRange_;
for (; rowsInCurrentRange_ < currRange.size && !shouldFlush;
rowsInCurrentRange_++) {
++rowsInCurrent_;
bytesInCurrent_ += sizes[currRange.begin + rowsInCurrentRange_];
shouldFlush = bytesInCurrent_ >= adjustedMaxBytes ||
rowsInCurrent_ >= targetNumRows_;
}
if (bytesInCurrent_ >= adjustedMaxBytes ||
rangeIdx_ - firstRangeIdx >= targetNumRows_) {
serialize(output, firstRangeIdx, rangeIdx_ + 1);
if (rangeIdx_ == ranges_.size() - 1) {
*atEnd = true;
}
++rangeIdx_;
return flush(bufferManager, bufferReleaseFn, future);
rangesToSerialize_.push_back(
{currRange.begin + startRow, rowsInCurrentRange_ - startRow});
if (rowsInCurrentRange_ == currRange.size) {
rowsInCurrentRange_ = 0;
rangeIdx_++;
}
}
serialize(output, firstRangeIdx, rangeIdx_);
*atEnd = true;
return BlockingReason::kNotBlocked;
}

void Destination::serialize(
const RowVectorPtr& output,
vector_size_t begin,
vector_size_t end) {
// Serialize
if (!current_) {
current_ = std::make_unique<VectorStreamGroup>(pool_);
auto rowType = asRowType(output->type());
vector_size_t numRows = 0;
for (vector_size_t i = begin; i < end; i++) {
numRows += ranges_[i].size;
}
current_->createStreamTree(rowType, numRows);
current_->createStreamTree(rowType, rowsInCurrent_);
}
current_->append(output, folly::Range(&ranges_[begin], end - begin));
current_->append(
output, folly::Range(&rangesToSerialize_[0], rangesToSerialize_.size()));
// Update output state variable.
if (rangeIdx_ == ranges_.size()) {
*atEnd = true;
}
if (shouldFlush) {
return flush(bufferManager, bufferReleaseFn, future);
}
return BlockingReason::kNotBlocked;
}

BlockingReason Destination::flush(
Expand All @@ -95,6 +96,7 @@ BlockingReason Destination::flush(
current_->flush(&stream);
current_.reset();
bytesInCurrent_ = 0;
rowsInCurrent_ = 0;
setTargetSizePct();

bool blocked = bufferManager.enqueue(
Expand Down
18 changes: 15 additions & 3 deletions velox/exec/PartitionedOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ class Destination {
// Resets the destination before starting a new batch.
void beginBatch() {
ranges_.clear();
rangesToSerialize_.clear();
rangeIdx_ = 0;
rowsInCurrentRange_ = 0;
}

void addRow(vector_size_t row) {
Expand All @@ -47,6 +49,7 @@ class Destination {
ranges_.push_back(rows);
}

// Serializes rows from 'output' until either 'maxBytes' have been serialized or
// the target number of rows has been reached, flushing when a limit is hit.
BlockingReason advance(
uint64_t maxBytes,
const std::vector<vector_size_t>& sizes,
Expand Down Expand Up @@ -74,9 +77,6 @@ class Destination {
}

private:
void
serialize(const RowVectorPtr& input, vector_size_t begin, vector_size_t end);

// Sets the next target size for flushing. This is called at the
// start of each batch of output for the destination. The effect is
// to make different destinations ready at slightly different times
Expand All @@ -93,11 +93,23 @@ class Destination {
const std::string taskId_;
const int destination_;
memory::MemoryPool* const pool_;
// Bytes serialized in 'current_'
uint64_t bytesInCurrent_{0};
// Number of rows serialized in 'current_'
vector_size_t rowsInCurrent_{0};
std::vector<IndexRange> ranges_;
// List of ranges to be serialized. This is only used by
// Destination::advance() and defined as a member variable to reuse allocated
// capacity between calls.
std::vector<IndexRange> rangesToSerialize_;

// First range index of 'ranges_' that is not appended to 'current_'.
vector_size_t rangeIdx_{0};
// Number of rows serialized in the current range pointed to by 'rangeIdx_'.
// This is non-zero if the current range was partially serialized.
vector_size_t rowsInCurrentRange_{0};
// The current stream where the input is serialized to. This is cleared on
// every flush() call.
std::unique_ptr<VectorStreamGroup> current_;
bool finished_{false};

Expand Down
72 changes: 72 additions & 0 deletions velox/exec/tests/MultiFragmentTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,78 @@ TEST_F(MultiFragmentTest, partitionedOutput) {
}
}

// Checks that PartitionedOutput splits one large input vector into several
// output pages rather than serializing it in a single shot. Two plans are
// exercised: a single-partition output and a round-robin output with a fanout
// of two feeding intermediate tasks. In both cases the final exchange must
// report more than two input vectors and the leaf task must run to completion.
TEST_F(MultiFragmentTest, partitionedOutputWithLargeInput) {
  // Verify that partitionedOutput operator is able to split a single input
  // vector if it hits memory or row limits.
  // We create a large vector that hits the row limit (70% - 120% of 10,000)
  // which would hit a task level memory limit of 1MB unless it's split up.
  // This test exercises splitting up the input both from the edges and the
  // middle as it ends up splitting it in ~10 splits.
  setupSources(1, 100'000);
  const int64_t kRootMemoryLimit = 1 << 20; // 1MB

  // Single Partition
  {
    auto leafTaskId = makeTaskId("leaf", 0);
    auto leafPlan =
        PlanBuilder()
            .values(vectors_)
            .partitionedOutput({}, 1, {"c0", "c1", "c2", "c3", "c4"})
            .planNode();
    // The leaf task is created with the 1MB limit so that serializing the
    // whole input at once would exceed it.
    auto leafTask =
        makeTask(leafTaskId, leafPlan, 0, nullptr, kRootMemoryLimit);
    Task::start(leafTask, 1);
    auto op = PlanBuilder().exchange(leafPlan->outputType()).planNode();

    auto task =
        assertQuery(op, {leafTaskId}, "SELECT c0, c1, c2, c3, c4 FROM tmp");
    // More than two vectors arriving at the exchange shows the input was
    // split up by the producer.
    auto exchangeStats = task->taskStats().pipelineStats[0].operatorStats[0];
    ASSERT_GT(exchangeStats.inputVectors, 2);
    ASSERT_TRUE(waitForTaskCompletion(leafTask.get()))
        << leafTask->taskId() << " state: " << leafTask->state();
  }

  // Multiple partitions but round-robin.
  {
    constexpr int32_t kFanout = 2;
    auto leafTaskId = makeTaskId("leaf", 0);
    auto leafPlan =
        PlanBuilder()
            .values(vectors_)
            .partitionedOutput(
                {},
                kFanout,
                false,
                std::make_shared<exec::RoundRobinPartitionFunctionSpec>(),
                {"c0", "c1", "c2", "c3", "c4"})
            .planNode();
    // NOTE(review): unlike the single-partition case above, this leaf task is
    // created without kRootMemoryLimit - confirm that this is intentional.
    auto leafTask = makeTask(leafTaskId, leafPlan, 0);
    Task::start(leafTask, 1);

    auto intermediatePlan =
        PlanBuilder()
            .exchange(leafPlan->outputType())
            .partitionedOutput({}, 1, {"c0", "c1", "c2", "c3", "c4"})
            .planNode();
    // One intermediate task per leaf output partition; each one reads from the
    // leaf task and re-publishes its rows through a single partition.
    std::vector<std::string> intermediateTaskIds;
    for (auto i = 0; i < kFanout; ++i) {
      intermediateTaskIds.push_back(makeTaskId("intermediate", i));
      auto intermediateTask =
          makeTask(intermediateTaskIds.back(), intermediatePlan, i);
      Task::start(intermediateTask, 1);
      addRemoteSplits(intermediateTask, {leafTaskId});
    }

    auto op = PlanBuilder().exchange(intermediatePlan->outputType()).planNode();

    auto task = assertQuery(
        op, intermediateTaskIds, "SELECT c0, c1, c2, c3, c4 FROM tmp");
    auto exchangeStats = task->taskStats().pipelineStats[0].operatorStats[0];
    ASSERT_GT(exchangeStats.inputVectors, 2);
    ASSERT_TRUE(waitForTaskCompletion(leafTask.get()))
        << leafTask->taskId() << " state: " << leafTask->state();
  }
}

TEST_F(MultiFragmentTest, broadcast) {
auto data = makeRowVector(
{makeFlatVector<int32_t>(1'000, [](auto row) { return row; })});
Expand Down

0 comments on commit 56cb7e6

Please sign in to comment.