Skip to content

Commit

Permalink
[facebookincubator#9025] Add RowsStreamingWindow support
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf authored and zhztheplayer committed Jul 26, 2024
1 parent a1d0aac commit 1b9e0e0
Show file tree
Hide file tree
Showing 23 changed files with 652 additions and 47 deletions.
3 changes: 2 additions & 1 deletion velox/exec/AggregateWindow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,8 @@ void registerAggregateWindowFunction(const std::string& name) {
pool,
stringAllocator,
config);
});
},
{exec::ProcessedUnit::kRows, true});
}
}
} // namespace facebook::velox::exec
1 change: 1 addition & 0 deletions velox/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ velox_add_library(
PlanNodeStats.cpp
PrefixSort.cpp
ProbeOperatorState.cpp
RowsStreamingWindowBuild.cpp
RowContainer.cpp
RowNumber.cpp
SortBuffer.cpp
Expand Down
86 changes: 86 additions & 0 deletions velox/exec/RowsStreamingWindowBuild.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/exec/RowsStreamingWindowBuild.h"
#include "velox/common/testutil/TestValue.h"

namespace facebook::velox::exec {

RowsStreamingWindowBuild::RowsStreamingWindowBuild(
const std::shared_ptr<const core::WindowNode>& windowNode,
velox::memory::MemoryPool* pool,
const common::SpillConfig* spillConfig,
tsan_atomic<bool>* nonReclaimableSection)
: WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) {}

void RowsStreamingWindowBuild::buildNextInputOrPartition(bool isFinished) {
if (windowPartitions_.size() <= inputPartition_) {
windowPartitions_.push_back(std::make_shared<WindowPartition>(
data_.get(), inversedInputChannels_, sortKeyInfo_));
}

windowPartitions_[inputPartition_]->addRows(inputRows_);

if (isFinished) {
windowPartitions_[inputPartition_]->setComplete();
inputPartition_++;
}

inputRows_.clear();
}

void RowsStreamingWindowBuild::addInput(RowVectorPtr input) {
for (auto i = 0; i < inputChannels_.size(); ++i) {
decodedInputVectors_[i].decode(*input->childAt(inputChannels_[i]));
}

for (auto row = 0; row < input->size(); ++row) {
char* newRow = data_->newRow();

for (auto col = 0; col < input->childrenSize(); ++col) {
data_->store(decodedInputVectors_[col], row, newRow, col);
}

if (previousRow_ != nullptr &&
compareRowsWithKeys(previousRow_, newRow, partitionKeyInfo_)) {
buildNextInputOrPartition(true);
}

if (previousRow_ != nullptr && inputRows_.size() >= numRowsPerOutput_) {
buildNextInputOrPartition(false);
}

inputRows_.push_back(newRow);
previousRow_ = newRow;
}
}

void RowsStreamingWindowBuild::noMoreInput() {
buildNextInputOrPartition(true);
}

std::shared_ptr<WindowPartition> RowsStreamingWindowBuild::nextPartition() {
// The previous partition has already been set to nullptr by the
// Window.cpp#callResetPartition() method.
return windowPartitions_[++outputPartition_];
}

bool RowsStreamingWindowBuild::hasNextPartition() {
return windowPartitions_.size() > 0 &&
outputPartition_ <= int(windowPartitions_.size() - 2);
}

} // namespace facebook::velox::exec
80 changes: 80 additions & 0 deletions velox/exec/RowsStreamingWindowBuild.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/exec/WindowBuild.h"

namespace facebook::velox::exec {

/// Unlike StreamingWindowBuild, RowsStreamingWindowBuild is capable of
/// processing window functions as rows arrive within a single partition,
/// without the need to wait for the entire partition to be ready. This approach
/// can significantly reduce memory usage, especially when a single partition
/// contains a large amount of data. It is particularly suited for optimizing
/// rank and row_number functions, as well as aggregate window functions with a
/// default frame.
class RowsStreamingWindowBuild : public WindowBuild {
public:
RowsStreamingWindowBuild(
const std::shared_ptr<const core::WindowNode>& windowNode,
velox::memory::MemoryPool* pool,
const common::SpillConfig* spillConfig,
tsan_atomic<bool>* nonReclaimableSection);

void addInput(RowVectorPtr input) override;

void spill() override {
VELOX_UNREACHABLE();
}

std::optional<common::SpillStats> spilledStats() const override {
return std::nullopt;
}

void noMoreInput() override;

bool hasNextPartition() override;

std::shared_ptr<WindowPartition> nextPartition() override;

bool needsInput() override {
// No partitions are available or the currentPartition is the last available
// one, so can consume input rows.
return windowPartitions_.size() == 0 ||
outputPartition_ == windowPartitions_.size() - 1;
}

private:
void buildNextInputOrPartition(bool isFinished);

// Holds input rows within the current partition.
std::vector<char*> inputRows_;

// Used to compare rows based on partitionKeys.
char* previousRow_ = nullptr;

// Current partition being output. Used to return the WidnowPartitions.
vector_size_t outputPartition_ = -1;

// Current partition when adding input. Used to construct WindowPartitions.
vector_size_t inputPartition_ = 0;

// Holds all the WindowPartitions.
std::vector<std::shared_ptr<WindowPartition>> windowPartitions_;
};

} // namespace facebook::velox::exec
6 changes: 3 additions & 3 deletions velox/exec/SortWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,11 +294,11 @@ void SortWindowBuild::loadNextPartitionFromSpill() {
}
}

std::unique_ptr<WindowPartition> SortWindowBuild::nextPartition() {
std::shared_ptr<WindowPartition> SortWindowBuild::nextPartition() {
if (merge_ != nullptr) {
VELOX_CHECK(!sortedRows_.empty(), "No window partitions available")
auto partition = folly::Range(sortedRows_.data(), sortedRows_.size());
return std::make_unique<WindowPartition>(
return std::make_shared<WindowPartition>(
data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
}

Expand All @@ -316,7 +316,7 @@ std::unique_ptr<WindowPartition> SortWindowBuild::nextPartition() {
auto partition = folly::Range(
sortedRows_.data() + partitionStartRows_[currentPartition_],
partitionSize);
return std::make_unique<WindowPartition>(
return std::make_shared<WindowPartition>(
data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
}

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/SortWindowBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class SortWindowBuild : public WindowBuild {

bool hasNextPartition() override;

std::unique_ptr<WindowPartition> nextPartition() override;
std::shared_ptr<WindowPartition> nextPartition() override;

private:
void ensureInputFits(const RowVectorPtr& input);
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/StreamingWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void StreamingWindowBuild::noMoreInput() {
partitionStartRows_.push_back(sortedRows_.size());
}

std::unique_ptr<WindowPartition> StreamingWindowBuild::nextPartition() {
std::shared_ptr<WindowPartition> StreamingWindowBuild::nextPartition() {
VELOX_CHECK_GT(
partitionStartRows_.size(), 0, "No window partitions available")

Expand Down Expand Up @@ -89,7 +89,7 @@ std::unique_ptr<WindowPartition> StreamingWindowBuild::nextPartition() {
sortedRows_.data() + partitionStartRows_[currentPartition_],
partitionSize);

return std::make_unique<WindowPartition>(
return std::make_shared<WindowPartition>(
data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
}

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/StreamingWindowBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class StreamingWindowBuild : public WindowBuild {

bool hasNextPartition() override;

std::unique_ptr<WindowPartition> nextPartition() override;
std::shared_ptr<WindowPartition> nextPartition() override;

bool needsInput() override {
// No partitions are available or the currentPartition is the last available
Expand Down
82 changes: 76 additions & 6 deletions velox/exec/Window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
#include "velox/exec/Window.h"
#include "velox/exec/OperatorUtils.h"
#include "velox/exec/RowsStreamingWindowBuild.h"
#include "velox/exec/SortWindowBuild.h"
#include "velox/exec/StreamingWindowBuild.h"
#include "velox/exec/Task.h"
Expand All @@ -41,8 +42,13 @@ Window::Window(
auto* spillConfig =
spillConfig_.has_value() ? &spillConfig_.value() : nullptr;
if (windowNode->inputsSorted()) {
windowBuild_ = std::make_unique<StreamingWindowBuild>(
windowNode, pool(), spillConfig, &nonReclaimableSection_);
if (supportRowsStreaming()) {
windowBuild_ = std::make_unique<RowsStreamingWindowBuild>(
windowNode_, pool(), spillConfig, &nonReclaimableSection_);
} else {
windowBuild_ = std::make_unique<StreamingWindowBuild>(
windowNode, pool(), spillConfig, &nonReclaimableSection_);
}
} else {
windowBuild_ = std::make_unique<SortWindowBuild>(
windowNode, pool(), spillConfig, &nonReclaimableSection_, &spillStats_);
Expand All @@ -54,6 +60,7 @@ void Window::initialize() {
VELOX_CHECK_NOT_NULL(windowNode_);
createWindowFunctions();
createPeerAndFrameBuffers();
windowBuild_->setNumRowsPerOutput(numRowsPerOutput_);
windowNode_.reset();
}

Expand Down Expand Up @@ -187,6 +194,46 @@ void Window::createWindowFunctions() {
}
}

// Support 'rank', 'dense_rank' and
// 'row_number' functions and the agg window function with default frame.
bool Window::supportRowsStreaming() {
bool supportsStreaming = false;

for (const auto& windowNodeFunction : windowNode_->windowFunctions()) {
const auto& functionName = windowNodeFunction.functionCall->name();
auto windowFunctionMetadata =
exec::getWindowFunctionMetadata(functionName).value();

if (windowFunctionMetadata.processedUnit == ProcessedUnit::kRows) {
const auto& frame = windowNodeFunction.frame;
bool isDefaultFrame =
(frame.startType ==
core::WindowNode::BoundType::kUnboundedPreceding &&
frame.endType == core::WindowNode::BoundType::kCurrentRow);
if (!windowFunctionMetadata.isAggregateWindow || isDefaultFrame) {
supportsStreaming = true;
velox::common::testutil::TestValue::adjust(
"facebook::velox::exec::Window::supportRowsStreaming",
&supportsStreaming);
} else {
supportsStreaming = false;
velox::common::testutil::TestValue::adjust(
"facebook::velox::exec::Window::supportRowsStreaming",
&supportsStreaming);
break;
}
} else {
supportsStreaming = false;
velox::common::testutil::TestValue::adjust(
"facebook::velox::exec::Window::supportRowsStreaming",
&supportsStreaming);
break;
}
}

return supportsStreaming;
}

void Window::addInput(RowVectorPtr input) {
windowBuild_->addInput(input);
numRows_ += input->size();
Expand Down Expand Up @@ -543,9 +590,12 @@ void Window::callApplyForPartitionRows(
vector_size_t endRow,
vector_size_t resultOffset,
const RowVectorPtr& result) {
getInputColumns(startRow, endRow, resultOffset, result);

// The lastRow that was retained from the previous batch will be deleted in
// the computePeerAndFrameBuffers method after peer group compare. Thereforre,
// the getInputColumns method need to be called subsequently.
computePeerAndFrameBuffers(startRow, endRow);

getInputColumns(startRow, endRow, resultOffset, result);
vector_size_t numFuncs = windowFunctions_.size();
for (auto w = 0; w < numFuncs; w++) {
windowFunctions_[w]->apply(
Expand All @@ -561,6 +611,10 @@ void Window::callApplyForPartitionRows(
vector_size_t numRows = endRow - startRow;
numProcessedRows_ += numRows;
partitionOffset_ += numRows;

if (currentPartition_->isPartial()) {
currentPartition_->clearOutputRows(numRows);
}
}

vector_size_t Window::callApplyLoop(
Expand All @@ -574,8 +628,9 @@ vector_size_t Window::callApplyLoop(
// This function requires that the currentPartition_ is available for output.
VELOX_DCHECK_NOT_NULL(currentPartition_);
while (numOutputRowsLeft > 0) {
auto rowsForCurrentPartition =
currentPartition_->numRows() - partitionOffset_;
auto rowsForCurrentPartition = currentPartition_->isPartial()
? currentPartition_->numRowsForProcessing()
: currentPartition_->numRowsForProcessing() - partitionOffset_;
if (rowsForCurrentPartition <= numOutputRowsLeft) {
// Current partition can fit completely in the output buffer.
// So output all its rows.
Expand All @@ -586,6 +641,13 @@ vector_size_t Window::callApplyLoop(
result);
resultIndex += rowsForCurrentPartition;
numOutputRowsLeft -= rowsForCurrentPartition;

if (!currentPartition_->isComplete()) {
// Still more data for the current partition would need to be processed.
// So resume on the next getOutput call.
break;
}

callResetPartition();
if (!currentPartition_) {
// The WindowBuild doesn't have any more partitions to process right
Expand Down Expand Up @@ -628,6 +690,14 @@ RowVectorPtr Window::getOutput() {
}
}

if (!currentPartition_->isComplete() &&
(currentPartition_->numRowsForProcessing() == 0)) {
// The numRows may be 1, because we keep the last row in previous batch to
// compare with the first row in next batch to determine whether they are in
// same peer group.
return nullptr;
}

auto numOutputRows = std::min(numRowsPerOutput_, numRowsLeft);
auto result = BaseVector::create<RowVector>(
outputType_, numOutputRows, operatorCtx_->pool());
Expand Down
Loading

0 comments on commit 1b9e0e0

Please sign in to comment.