diff --git a/velox/core/PlanNode.cpp b/velox/core/PlanNode.cpp index 92e1e9e58e3c..f8a77828c686 100644 --- a/velox/core/PlanNode.cpp +++ b/velox/core/PlanNode.cpp @@ -1160,12 +1160,14 @@ WindowNode::WindowNode( std::vector sortingOrders, std::vector windowColumnNames, std::vector windowFunctions, + bool inputsSorted, PlanNodePtr source) : PlanNode(std::move(id)), partitionKeys_(std::move(partitionKeys)), sortingKeys_(std::move(sortingKeys)), sortingOrders_(std::move(sortingOrders)), windowFunctions_(std::move(windowFunctions)), + inputsSorted_(inputsSorted), sources_{std::move(source)}, outputType_(getWindowOutputType( sources_[0]->outputType(), @@ -1201,6 +1203,8 @@ void WindowNode::addDetails(std::stringstream& stream) const { stream << outputType_->names()[i] << " := "; addWindowFunction(stream, windowFunctions_[i - numInputCols]); } + + stream << " inputsSorted [" << inputsSorted_ << "]"; } namespace { @@ -1317,6 +1321,7 @@ folly::dynamic WindowNode::serialize() const { windowNames.push_back(outputType_->nameOf(i)); } obj["names"] = ISerializable::serialize(windowNames); + obj["inputsSorted"] = inputsSorted_; return obj; } @@ -1336,6 +1341,8 @@ PlanNodePtr WindowNode::create(const folly::dynamic& obj, void* context) { auto windowNames = deserializeStrings(obj["names"]); + auto inputsSorted = obj["inputsSorted"].asBool(); + return std::make_shared( deserializePlanNodeId(obj), partitionKeys, @@ -1343,6 +1350,7 @@ PlanNodePtr WindowNode::create(const folly::dynamic& obj, void* context) { sortingOrders, windowNames, functions, + inputsSorted, source); } diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index 76e53e01664d..6c1b118ce2e5 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -2002,6 +2002,7 @@ class WindowNode : public PlanNode { std::vector sortingOrders, std::vector windowColumnNames, std::vector windowFunctions, + bool inputsSorted, PlanNodePtr source); const std::vector& sources() const override { @@ -2030,6 +2031,10 @@ class 
WindowNode : public PlanNode { return windowFunctions_; } + bool inputsSorted() const { + return inputsSorted_; + } + std::string_view name() const override { return "Window"; } @@ -2048,6 +2053,8 @@ class WindowNode : public PlanNode { const std::vector windowFunctions_; + const bool inputsSorted_; + const std::vector sources_; const RowTypePtr outputType_; diff --git a/velox/docs/develop/operators.rst b/velox/docs/develop/operators.rst index 0fbce1b9211c..54374ab58945 100644 --- a/velox/docs/develop/operators.rst +++ b/velox/docs/develop/operators.rst @@ -55,7 +55,7 @@ LocalMergeNode LocalMerge LocalPartitionNode LocalPartition and LocalExchange EnforceSingleRowNode EnforceSingleRow AssignUniqueIdNode AssignUniqueId -WindowNode Window +WindowNode Window or StreamingWindow RowNumberNode RowNumber TopNRowNumberNode TopNRowNumber ========================== ============================================== =========================== @@ -550,6 +550,8 @@ If no sorting columns are specified then the order of the results is unspecified - Output column names for each window function invocation in windowFunctions list below. * - windowFunctions - Window function calls with the frame clause. e.g row_number(), first_value(name) between range 10 preceding and current row. The default frame is between range unbounded preceding and current row. + * - inputsSorted + - If the input data is already sorted, the StreamingWindow will be used to reduce the memory footprint. 
RowNumberNode ~~~~~~~~~~~~~ diff --git a/velox/exec/CMakeLists.txt b/velox/exec/CMakeLists.txt index 421e0b81fcb9..fb777c1b20ed 100644 --- a/velox/exec/CMakeLists.txt +++ b/velox/exec/CMakeLists.txt @@ -54,10 +54,12 @@ add_library( RowContainer.cpp RowNumber.cpp SortedAggregations.cpp + SortWindowBuild.cpp Spill.cpp SpillOperatorGroup.cpp Spiller.cpp StreamingAggregation.cpp + StreamingWindowBuild.cpp TableScan.cpp TableWriter.cpp Task.cpp @@ -67,6 +69,7 @@ add_library( Values.cpp VectorHasher.cpp Window.cpp + WindowBuild.cpp WindowFunction.cpp WindowPartition.cpp AssignUniqueId.cpp diff --git a/velox/exec/RowContainer.h b/velox/exec/RowContainer.h index c589e8076ee2..afd7cf75aed4 100644 --- a/velox/exec/RowContainer.h +++ b/velox/exec/RowContainer.h @@ -310,6 +310,14 @@ class RowContainer { vector_size_t resultOffset, const VectorPtr& result); + /// Sets in result all locations with null values in col for rows + /// (for numRows number of rows). + static void extractNulls( + const char* FOLLY_NONNULL const* FOLLY_NONNULL rows, + int32_t numRows, + RowColumn col, + const BufferPtr& result); + // Copies the values at 'columnIndex' into 'result' for the // 'numRows' rows pointed to by 'rows'. If an entry in 'rows' is null, sets // corresponding row in 'result' to null. @@ -350,6 +358,15 @@ class RowContainer { rows, rowNumbers, columnAt(columnIndex), resultOffset, result); } + /// Sets in result all locations with null values in columnIndex for rows. + void extractNulls( + const char* FOLLY_NONNULL const* FOLLY_NONNULL rows, + int32_t numRows, + int32_t columnIndex, + const BufferPtr& result) { + extractNulls(rows, numRows, columnAt(columnIndex), result); + } + /// Copies the 'probed' flags for the specified rows into 'result'. /// The 'result' is expected to be flat vector of type boolean. 
/// For rows with null keys, sets null in 'result' if 'setNullForNullKeysRow' @@ -1279,6 +1296,29 @@ inline void RowContainer::extractColumn( result); } +inline void RowContainer::extractNulls( + const char* FOLLY_NONNULL const* FOLLY_NONNULL rows, + int32_t numRows, + RowColumn column, + const BufferPtr& result) { + VELOX_DCHECK(result->size() >= bits::nbytes(numRows)); + auto* rawResult = result->asMutable(); + bits::fillBits(rawResult, 0, numRows, false); + + auto nullMask = column.nullMask(); + if (!nullMask) { + return; + } + + auto nullByte = column.nullByte(); + for (int32_t i = 0; i < numRows; ++i) { + const char* row = rows[i]; + if (row == nullptr || isNullAt(row, nullByte, nullMask)) { + bits::setBit(rawResult, i, true); + } + } +} + template inline bool RowContainer::equals( const char* FOLLY_NONNULL row, diff --git a/velox/exec/SortWindowBuild.cpp b/velox/exec/SortWindowBuild.cpp new file mode 100644 index 000000000000..32ec4aa97b46 --- /dev/null +++ b/velox/exec/SortWindowBuild.cpp @@ -0,0 +1,131 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox/exec/SortWindowBuild.h" + +namespace facebook::velox::exec { + +SortWindowBuild::SortWindowBuild( + const std::shared_ptr& windowNode, + velox::memory::MemoryPool* pool) + : WindowBuild(windowNode, pool) { + allKeyInfo_.reserve(partitionKeyInfo_.size() + sortKeyInfo_.size()); + allKeyInfo_.insert( + allKeyInfo_.cend(), partitionKeyInfo_.begin(), partitionKeyInfo_.end()); + allKeyInfo_.insert( + allKeyInfo_.cend(), sortKeyInfo_.begin(), sortKeyInfo_.end()); + partitionStartRows_.resize(0); +} + +void SortWindowBuild::addInput(RowVectorPtr input) { + for (auto col = 0; col < input->childrenSize(); ++col) { + decodedInputVectors_[col].decode(*input->childAt(col)); + } + + // Add all the rows into the RowContainer. + for (auto row = 0; row < input->size(); ++row) { + char* newRow = data_->newRow(); + + for (auto col = 0; col < input->childrenSize(); ++col) { + data_->store(decodedInputVectors_[col], row, newRow, col); + } + } + numRows_ += input->size(); +} + +void SortWindowBuild::computePartitionStartRows() { + partitionStartRows_.reserve(numRows_); + auto partitionCompare = [&](const char* lhs, const char* rhs) -> bool { + return compareRowsWithKeys(lhs, rhs, partitionKeyInfo_); + }; + + // Using a sequential traversal to find changing partitions. + // This algorithm is inefficient and can be changed + // i) Use a binary search kind of strategy. + // ii) If we use a Hashtable instead of a full sort then the count + // of rows in the partition can be directly used. + partitionStartRows_.push_back(0); + + VELOX_CHECK_GT(sortedRows_.size(), 0); + for (auto i = 1; i < sortedRows_.size(); i++) { + if (partitionCompare(sortedRows_[i - 1], sortedRows_[i])) { + partitionStartRows_.push_back(i); + } + } + + // Setting the startRow of the (last + 1) partition to be returningRows.size() + // to help for last partition related calculations. 
+ partitionStartRows_.push_back(sortedRows_.size()); +} + +void SortWindowBuild::sortPartitions() { + // This is a very inefficient but easy implementation to order the input rows + // by partition keys + sort keys. + // Sort the pointers to the rows in RowContainer (data_) instead of sorting + // the rows. + sortedRows_.resize(numRows_); + RowContainerIterator iter; + data_->listRows(&iter, numRows_, sortedRows_.data()); + + std::sort( + sortedRows_.begin(), + sortedRows_.end(), + [this](const char* leftRow, const char* rightRow) { + return compareRowsWithKeys(leftRow, rightRow, allKeyInfo_); + }); + + computePartitionStartRows(); +} + +void SortWindowBuild::noMoreInput() { + if (numRows_ == 0) { + return; + } + // At this point we have seen all the input rows. The operator is + // being prepared to output rows now. + // To prepare the rows for output in SortWindowBuild they need to + // be separated into partitions and sort by ORDER BY keys within + // the partition. This will order the rows for getOutput(). + sortPartitions(); +} + +std::unique_ptr SortWindowBuild::nextPartition() { + VELOX_CHECK(partitionStartRows_.size() > 0, "No window partitions available") + + currentPartition_++; + VELOX_CHECK( + currentPartition_ <= partitionStartRows_.size() - 2, + "All window partitions consumed"); + + auto windowPartition = std::make_unique( + data_.get(), inputColumns_, sortKeyInfo_); + // There is partition data available now. 
+ auto partitionSize = partitionStartRows_[currentPartition_ + 1] - + partitionStartRows_[currentPartition_]; + auto partition = folly::Range( + sortedRows_.data() + partitionStartRows_[currentPartition_], + partitionSize); + windowPartition->resetPartition(partition); + + return windowPartition; +} + +bool SortWindowBuild::hasNextPartition() { + return partitionStartRows_.size() > 0 && + currentPartition_ < int(partitionStartRows_.size() - 2); +} + +} // namespace facebook::velox::exec diff --git a/velox/exec/SortWindowBuild.h b/velox/exec/SortWindowBuild.h new file mode 100644 index 000000000000..5a3f78942260 --- /dev/null +++ b/velox/exec/SortWindowBuild.h @@ -0,0 +1,78 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/exec/WindowBuild.h" + +namespace facebook::velox::exec { + +// Sorts input data of the Window by {partition keys, sort keys} +// to identify window partitions. This sort fully orders +// rows as needed for window function computation. +class SortWindowBuild : public WindowBuild { + public: + SortWindowBuild( + const std::shared_ptr& windowNode, + velox::memory::MemoryPool* pool); + + bool needsInput() override { + // No partitions are available yet, so can consume input rows. 
+ return partitionStartRows_.size() == 0; + } + + void addInput(RowVectorPtr input) override; + + void noMoreInput() override; + + bool hasNextPartition() override; + + std::unique_ptr nextPartition() override; + + private: + // Main sorting function loop done after all input rows are received + // by WindowBuild. + void sortPartitions(); + + // Function to compute the partitionStartRows_ structure. + // partitionStartRows_ is vector of the starting rows index + // of each partition in the data. This is an auxiliary + // structure that helps simplify the window function computations. + void computePartitionStartRows(); + + // allKeyInfo_ is a combination of (partitionKeyInfo_ and sortKeyInfo_). + // It is used to perform a full sorting of the input rows to be able to + // separate partitions and sort the rows in it. The rows are output in + // this order by the operator. + std::vector> allKeyInfo_; + + // Vector of pointers to each input row in the data_ RowContainer. + // The rows are sorted by partitionKeys + sortKeys. This total + // ordering can be used to split partitions (with the correct + // order by) for the processing. + std::vector sortedRows_; + + // This is a vector that gives the index of the start row + // (in sortedRows_) of each partition in the RowContainer data_. + // This auxiliary structure helps demarcate partitions. + std::vector partitionStartRows_; + + // Current partition being output. Used to construct WindowPartitions + // during resetPartition. + vector_size_t currentPartition_ = -1; +}; + +} // namespace facebook::velox::exec diff --git a/velox/exec/StreamingWindowBuild.cpp b/velox/exec/StreamingWindowBuild.cpp new file mode 100644 index 000000000000..c845a7eceea4 --- /dev/null +++ b/velox/exec/StreamingWindowBuild.cpp @@ -0,0 +1,100 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/exec/StreamingWindowBuild.h" + +namespace facebook::velox::exec { + +StreamingWindowBuild::StreamingWindowBuild( + const std::shared_ptr& windowNode, + velox::memory::MemoryPool* pool) + : WindowBuild(windowNode, pool) { + partitionStartRows_.resize(0); + sortedRows_.resize(0); +} + +void StreamingWindowBuild::startNewPartition() { + partitionStartRows_.push_back(sortedRows_.size()); + numRows_ = sortedRows_.size(); + sortedRows_.insert( + sortedRows_.end(), partitionRows_.begin(), partitionRows_.end()); + partitionRows_.clear(); +} + +void StreamingWindowBuild::addInput(RowVectorPtr input) { + for (auto col = 0; col < input->childrenSize(); ++col) { + decodedInputVectors_[col].decode(*input->childAt(col)); + } + + for (auto row = 0; row < input->size(); ++row) { + char* newRow = data_->newRow(); + + for (auto col = 0; col < input->childrenSize(); ++col) { + data_->store(decodedInputVectors_[col], row, newRow, col); + } + + if (previousRow_ != nullptr && + compareRowsWithKeys(previousRow_, newRow, partitionKeyInfo_)) { + startNewPartition(); + } + + partitionRows_.push_back(newRow); + previousRow_ = newRow; + } +} + +void StreamingWindowBuild::noMoreInput() { + startNewPartition(); + partitionStartRows_.push_back(sortedRows_.size()); + numRows_ = sortedRows_.size(); +} + +std::unique_ptr StreamingWindowBuild::nextPartition() { + VELOX_CHECK(partitionStartRows_.size() > 0, "No window partitions available") + + currentPartition_++; + VELOX_CHECK( + currentPartition_ <= partitionStartRows_.size() - 2, + "All window partitions 
consumed"); + + // Erase previous partition. + if (currentPartition_ > 0) { + auto numPreviousPartitionRows = partitionStartRows_[currentPartition_] - + partitionStartRows_[currentPartition_ - 1]; + data_->eraseRows(folly::Range( + sortedRows_.data() + partitionStartRows_[currentPartition_ - 1], + numPreviousPartitionRows)); + } + + auto windowPartition = std::make_unique( + data_.get(), inputColumns_, sortKeyInfo_); + // There is partition data available now. + auto partitionSize = partitionStartRows_[currentPartition_ + 1] - + partitionStartRows_[currentPartition_]; + auto partition = folly::Range( + sortedRows_.data() + partitionStartRows_[currentPartition_], + partitionSize); + windowPartition->resetPartition(partition); + + return windowPartition; +} + +bool StreamingWindowBuild::hasNextPartition() { + return partitionStartRows_.size() > 0 && + currentPartition_ < int(partitionStartRows_.size() - 2); +} + +} // namespace facebook::velox::exec diff --git a/velox/exec/StreamingWindowBuild.h b/velox/exec/StreamingWindowBuild.h new file mode 100644 index 000000000000..1f296cf87157 --- /dev/null +++ b/velox/exec/StreamingWindowBuild.h @@ -0,0 +1,73 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/exec/WindowBuild.h" + +namespace facebook::velox::exec { + +// Need to ensure that the input data is already sorted. 
The previous partition +// will be erased when calling the next partition to reduce the memory +// footprint. +class StreamingWindowBuild : public WindowBuild { + public: + StreamingWindowBuild( + const std::shared_ptr& windowNode, + velox::memory::MemoryPool* pool); + + void addInput(RowVectorPtr input) override; + + void noMoreInput() override; + + std::unique_ptr nextPartition() override; + + bool hasNextPartition() override; + + bool needsInput() override { + // No partitions are available or the currentPartition is the last available + // one, so can consume input rows. + return partitionStartRows_.size() == 0 || + currentPartition_ == partitionStartRows_.size() - 2; + } + + private: + // Create a new partition. + void startNewPartition(); + + // Vector of pointers to each input row in the data_ RowContainer. + // The rows are sorted by partitionKeys + sortKeys. This total + // ordering can be used to split partitions (with the correct + // order by) for the processing. + std::vector sortedRows_; + + // Holds rows within the current partition. + std::vector partitionRows_; + + // This is a vector that gives the index of the start row + // (in sortedRows_) of each partition in the RowContainer data_. + // This auxiliary structure helps demarcate partitions. + std::vector partitionStartRows_; + + // Used to compare rows based on partitionKeys. + char* previousRow_ = nullptr; + + // Current partition being output. Used to construct WindowPartitions + // during resetPartition. 
+ vector_size_t currentPartition_ = -1; +}; + +} // namespace facebook::velox::exec diff --git a/velox/exec/Window.cpp b/velox/exec/Window.cpp index 0a57b6c497a8..1b40700efb88 100644 --- a/velox/exec/Window.cpp +++ b/velox/exec/Window.cpp @@ -15,35 +15,12 @@ */ #include "velox/exec/Window.h" #include "velox/exec/OperatorUtils.h" +#include "velox/exec/SortWindowBuild.h" +#include "velox/exec/StreamingWindowBuild.h" #include "velox/exec/Task.h" -DEFINE_bool(SkipRowSortInWindowOp, false, "Skip row sort"); - namespace facebook::velox::exec { -namespace { -void initKeyInfo( - const RowTypePtr& type, - const std::vector& keys, - const std::vector& orders, - std::vector>& keyInfo) { - const core::SortOrder defaultPartitionSortOrder(true, true); - - keyInfo.reserve(keys.size()); - for (auto i = 0; i < keys.size(); ++i) { - auto channel = exprToChannel(keys[i].get(), type); - VELOX_CHECK( - channel != kConstantChannel, - "Window doesn't allow constant partition or sort keys"); - if (i < orders.size()) { - keyInfo.push_back(std::make_pair(channel, orders[i])); - } else { - keyInfo.push_back(std::make_pair(channel, defaultPartitionSortOrder)); - } - } -} -}; // namespace - Window::Window( int32_t operatorId, DriverCtx* driverCtx, @@ -55,43 +32,35 @@ Window::Window( windowNode->id(), "Window"), numInputColumns_(windowNode->sources()[0]->outputType()->size()), - data_(std::make_unique( - windowNode->sources()[0]->outputType()->children(), - pool())), - decodedInputVectors_(numInputColumns_), + windowNode_(windowNode), + currentPartition_(nullptr), stringAllocator_(pool()) { - auto inputType = windowNode->sources()[0]->outputType(); - initKeyInfo(inputType, windowNode->partitionKeys(), {}, partitionKeyInfo_); - initKeyInfo( - inputType, - windowNode->sortingKeys(), - windowNode->sortingOrders(), - sortKeyInfo_); - allKeyInfo_.reserve(partitionKeyInfo_.size() + sortKeyInfo_.size()); - allKeyInfo_.insert( - allKeyInfo_.cend(), partitionKeyInfo_.begin(), partitionKeyInfo_.end()); - 
allKeyInfo_.insert( - allKeyInfo_.cend(), sortKeyInfo_.begin(), sortKeyInfo_.end()); - - std::vector inputColumns; - for (int i = 0; i < inputType->children().size(); i++) { - inputColumns.push_back(data_->columnAt(i)); + if (windowNode->inputsSorted()) { + windowBuild_ = std::make_unique(windowNode, pool()); + } else { + windowBuild_ = std::make_unique(windowNode, pool()); } - // The WindowPartition is structured over all the input columns data. - // Individual functions access its input argument column values from it. - // The RowColumns are copied by the WindowPartition, so its fine to use - // a local variable here. - windowPartition_ = - std::make_unique(inputColumns, inputType->children()); - - createWindowFunctions(windowNode, inputType); - - initRangeValuesMap(); + createWindowFunctions(); + createPeerAndFrameBuffers(); } Window::WindowFrame Window::createWindowFrame( core::WindowNode::Frame frame, const RowTypePtr& inputType) { + if (frame.type == core::WindowNode::WindowType::kRows) { + auto frameBoundCheck = [&](const core::TypedExprPtr& frame) -> void { + if (frame == nullptr) { + return; + } + + VELOX_USER_CHECK( + frame->type() == INTEGER() || frame->type() == BIGINT(), + "k frame bound must be INTEGER or BIGINT type"); + }; + frameBoundCheck(frame.startValue); + frameBoundCheck(frame.endValue); + } + auto createFrameChannelArg = [&](const core::TypedExprPtr& frame) -> std::optional { // frame is nullptr for non (kPreceding or kFollowing) frames. 
@@ -103,28 +72,21 @@ Window::WindowFrame Window::createWindowFrame( auto constant = std::dynamic_pointer_cast(frame) ->value(); - VELOX_CHECK(!constant.isNull(), "k in frame bounds must not be null"); + VELOX_CHECK(!constant.isNull(), "Window frame offset must not be null"); + auto value = VariantConverter::convert(constant, TypeKind::BIGINT) + .value(); VELOX_USER_CHECK_GE( - constant.value(), 1, "k in frame bounds must be at least 1"); - return std::make_optional(FrameChannelArg{ - kConstantChannel, nullptr, constant.value()}); + value, 0, "Window frame {} offset must not be negative", value); + return std::make_optional( + FrameChannelArg{kConstantChannel, nullptr, value}); } else { return std::make_optional(FrameChannelArg{ - frameChannel, BaseVector::create(BIGINT(), 0, pool()), std::nullopt}); + frameChannel, + BaseVector::create(frame->type(), 0, pool()), + std::nullopt}); } }; - // If this is a k Range frame bound, then its evaluation requires that the - // order by key be a single column (to add or subtract the k range value - // from). 
- if (frame.type == core::WindowNode::WindowType::kRange && - (frame.startValue || frame.endValue)) { - VELOX_USER_CHECK_EQ( - sortKeyInfo_.size(), - 1, - "Window frame of type RANGE PRECEDING or FOLLOWING requires single sort item in ORDER BY."); - } - return WindowFrame( {frame.type, frame.startType, @@ -133,13 +95,13 @@ Window::WindowFrame Window::createWindowFrame( createFrameChannelArg(frame.endValue)}); } -void Window::createWindowFunctions( - const std::shared_ptr& windowNode, - const RowTypePtr& inputType) { - for (const auto& windowNodeFunction : windowNode->windowFunctions()) { - VELOX_USER_CHECK( - !windowNodeFunction.ignoreNulls, - "Ignore nulls for window functions is not supported yet"); +void Window::createWindowFunctions() { + VELOX_CHECK_NOT_NULL(windowNode_); + VELOX_CHECK(windowFunctions_.empty()); + VELOX_CHECK(windowFrames_.empty()); + + const auto& inputType = windowNode_->sources()[0]->outputType(); + for (const auto& windowNodeFunction : windowNode_->windowFunctions()) { std::vector functionArgs; functionArgs.reserve(windowNodeFunction.functionCall->inputs().size()); for (auto& arg : windowNodeFunction.functionCall->inputs()) { @@ -166,64 +128,15 @@ void Window::createWindowFunctions( } } -void Window::initRangeValuesMap() { - auto isKBoundFrame = [](core::WindowNode::BoundType boundType) -> bool { - return boundType == core::WindowNode::BoundType::kPreceding || - boundType == core::WindowNode::BoundType::kFollowing; - }; - - hasKRangeFrames_ = false; - for (const auto& frame : windowFrames_) { - if (frame.type == core::WindowNode::WindowType::kRange && - (isKBoundFrame(frame.startType) || isKBoundFrame(frame.endType))) { - hasKRangeFrames_ = true; - rangeValuesMap_.rangeType = outputType_->childAt(sortKeyInfo_[0].first); - rangeValuesMap_.rangeValues = - BaseVector::create(rangeValuesMap_.rangeType, 0, pool()); - break; - } - } -} - void Window::addInput(RowVectorPtr input) { - for (auto col = 0; col < input->childrenSize(); ++col) { - 
decodedInputVectors_[col].decode(*input->childAt(col)); - } - - // Add all the rows into the RowContainer. - for (auto row = 0; row < input->size(); ++row) { - char* newRow = data_->newRow(); - - for (auto col = 0; col < input->childrenSize(); ++col) { - data_->store(decodedInputVectors_[col], row, newRow, col); - } - } - numRows_ += input->size(); -} - -inline bool Window::compareRowsWithKeys( - const char* lhs, - const char* rhs, - const std::vector>& keys) { - if (lhs == rhs) { - return false; - } - for (auto& key : keys) { - if (auto result = data_->compare( - lhs, - rhs, - key.first, - {key.second.isNullsFirst(), key.second.isAscending(), false})) { - return result < 0; - } - } - return false; + windowBuild_->addInput(input); + numRows_ = windowBuild_->numRows(); } void Window::createPeerAndFrameBuffers() { // TODO: This computation needs to be revised. It only takes into account // the input columns size. We need to also account for the output columns. - numRowsPerOutput_ = outputBatchRows(data_->estimateRowSize()); + numRowsPerOutput_ = outputBatchRows(windowBuild_->estimateRowSize()); peerStartBuffer_ = AlignedBuffer::allocate( numRowsPerOutput_, operatorCtx_->pool()); @@ -246,158 +159,21 @@ void Window::createPeerAndFrameBuffers() { } } -void Window::computePartitionStartRows() { - // Randomly assuming that max 10000 partitions are in the data. - partitionStartRows_.reserve(numRows_); - auto partitionCompare = [&](const char* lhs, const char* rhs) -> bool { - return compareRowsWithKeys(lhs, rhs, partitionKeyInfo_); - }; - - // Using a sequential traversal to find changing partitions. - // This algorithm is inefficient and can be changed - // i) Use a binary search kind of strategy. - // ii) If we use a Hashtable instead of a full sort then the count - // of rows in the partition can be directly used. 
- partitionStartRows_.push_back(0); - - VELOX_CHECK_GT(sortedRows_.size(), 0); - for (auto i = 1; i < sortedRows_.size(); i++) { - if (partitionCompare(sortedRows_[i - 1], sortedRows_[i])) { - partitionStartRows_.push_back(i); - } - } - - // Setting the startRow of the (last + 1) partition to be returningRows.size() - // to help for last partition related calculations. - partitionStartRows_.push_back(sortedRows_.size()); -} - -void Window::sortPartitions() { - // This is a very inefficient but easy implementation to order the input rows - // by partition keys + sort keys. - // Sort the pointers to the rows in RowContainer (data_) instead of sorting - // the rows. - sortedRows_.resize(numRows_); - RowContainerIterator iter; - data_->listRows(&iter, numRows_, sortedRows_.data()); - if (!FLAGS_SkipRowSortInWindowOp) { - std::sort( - sortedRows_.begin(), - sortedRows_.end(), - [this](const char* leftRow, const char* rightRow) { - return compareRowsWithKeys(leftRow, rightRow, allKeyInfo_); - }); - } - - computePartitionStartRows(); - - currentPartition_ = 0; -} - void Window::noMoreInput() { Operator::noMoreInput(); - // No data. - if (numRows_ == 0) { - finished_ = true; - return; - } - - // At this point we have seen all the input rows. We can start - // outputting rows now. - // However, some preparation is needed. The rows should be - // separated into partitions and sort by ORDER BY keys within - // the partition. This will order the rows for getOutput(). 
- sortPartitions(); - createPeerAndFrameBuffers(); -} - -void Window::computeRangeValuesMap() { - auto peerCompare = [&](const char* lhs, const char* rhs) -> bool { - return compareRowsWithKeys(lhs, rhs, sortKeyInfo_); - }; - auto firstPartitionRow = partitionStartRows_[currentPartition_]; - auto lastPartitionRow = partitionStartRows_[currentPartition_ + 1] - 1; - auto numRows = lastPartitionRow - firstPartitionRow + 1; - rangeValuesMap_.rangeValues->resize(numRows); - rangeValuesMap_.rowIndices.resize(numRows); - - rangeValuesMap_.rowIndices[0] = 0; - int j = 1; - for (auto i = firstPartitionRow + 1; i <= lastPartitionRow; i++) { - // Here, we removed the below check code, in order to keep raw values. - // if (peerCompare(sortedRows_[i - 1], sortedRows_[i])) { - // The order by values are extracted from the Window partition which - // starts from row number 0 for the firstPartitionRow. So the index - // requires adjustment. - rangeValuesMap_.rowIndices[j++] = i - firstPartitionRow; - // } - } - - // If sort key is desc then reverse the rowIndices so that the range values - // are guaranteed ascending for the further lookup logic. 
- auto valueIndexesRange = folly::Range(rangeValuesMap_.rowIndices.data(), j); - windowPartition_->extractColumn( - sortKeyInfo_[0].first, valueIndexesRange, 0, rangeValuesMap_.rangeValues); + windowBuild_->noMoreInput(); + numRows_ = windowBuild_->numRows(); } -void Window::callResetPartition(vector_size_t partitionNumber) { +void Window::callResetPartition() { partitionOffset_ = 0; - auto partitionSize = partitionStartRows_[partitionNumber + 1] - - partitionStartRows_[partitionNumber]; - auto partition = folly::Range( - sortedRows_.data() + partitionStartRows_[partitionNumber], partitionSize); - windowPartition_->resetPartition(partition); - for (int i = 0; i < windowFunctions_.size(); i++) { - windowFunctions_[i]->resetPartition(windowPartition_.get()); - } - - if (hasKRangeFrames_) { - computeRangeValuesMap(); - } -} - -void Window::updateKRowsFrameBounds( - bool isKPreceding, - const FrameChannelArg& frameArg, - vector_size_t startRow, - vector_size_t numRows, - vector_size_t* rawFrameBounds) { - auto firstPartitionRow = partitionStartRows_[currentPartition_]; - - if (frameArg.index == kConstantChannel) { - auto constantOffset = frameArg.constant.value(); - auto startValue = startRow + - (isKPreceding ? -constantOffset : constantOffset) - firstPartitionRow; - auto lastPartitionRow = partitionStartRows_[currentPartition_ + 1] - 1; - // TODO: check first partition boundary and validate the frame. 
- for (int i = 0; i < numRows; i++) { - if (startValue > lastPartitionRow) { - rawFrameBounds[i] = lastPartitionRow + 1; - } else { - rawFrameBounds[i] = startValue; - } - startValue++; - } - // std::iota(rawFrameBounds, rawFrameBounds + numRows, startValue); - } else { - windowPartition_->extractColumn( - frameArg.index, partitionOffset_, numRows, 0, frameArg.value); - auto offsets = frameArg.value->values()->as(); - for (auto i = 0; i < numRows; i++) { - VELOX_USER_CHECK( - !frameArg.value->isNullAt(i), "k in frame bounds cannot be null"); - VELOX_USER_CHECK_GE( - offsets[i], 1, "k in frame bounds must be at least 1"); - } - - // Preceding involves subtracting from the current position, while following - // moves ahead. - int precedingFactor = isKPreceding ? -1 : 1; - for (auto i = 0; i < numRows; i++) { - // TOOD: check whether the value is inside [firstPartitionRow, - // lastPartitionRow]. - rawFrameBounds[i] = (startRow + i) + - vector_size_t(precedingFactor * offsets[i]) - firstPartitionRow; + peerStartRow_ = 0; + peerEndRow_ = 0; + currentPartition_ = nullptr; + if (windowBuild_->hasNextPartition()) { + currentPartition_ = windowBuild_->nextPartition(); + for (int i = 0; i < windowFunctions_.size(); i++) { + windowFunctions_[i]->resetPartition(currentPartition_.get()); } } } @@ -405,180 +181,54 @@ void Window::updateKRowsFrameBounds( namespace { template -vector_size_t findIndex( - const T value, - vector_size_t leftBound, - vector_size_t rightBound, - const FlatVectorPtr& values, - bool findStart) { - vector_size_t originalRightBound = rightBound; - vector_size_t originalLeftBound = leftBound; - while (leftBound < rightBound) { - vector_size_t mid = round((leftBound + rightBound) / 2.0); - auto midValue = values->valueAt(mid); - if (value == midValue) { - return mid; - } - - if (value < midValue) { - rightBound = mid - 1; - } else { - leftBound = mid + 1; - } +void updateKRowsOffsetsColumn( + bool isKPreceding, + const VectorPtr& value, + vector_size_t 
startRow, + vector_size_t numRows, + vector_size_t* rawFrameBounds) { + auto offsets = value->values()->as(); + for (auto i = 0; i < numRows; i++) { + VELOX_USER_CHECK( + !value->isNullAt(i), "Window frame offset must not be null"); + VELOX_USER_CHECK_GE( + offsets[i], + 0, + "Window frame {} offset must not be negative", + offsets[i]); } - // The value is not found but leftBound == rightBound at this point. - // This could be a value which is the least number greater than - // or the largest number less than value. - // The semantics of this function are to always return the smallest larger - // value (or rightBound if end of range). - if (findStart) { - if (value <= values->valueAt(rightBound)) { - // return std::max(originalLeftBound, rightBound); - return rightBound; - } - return std::min(originalRightBound, rightBound + 1); - } - if (value < values->valueAt(rightBound)) { - return std::max(originalLeftBound, rightBound - 1); + // Preceding involves subtracting from the current position, while following + // moves ahead. + int precedingFactor = isKPreceding ? -1 : 1; + for (auto i = 0; i < numRows; i++) { + rawFrameBounds[i] = + (startRow + i) + vector_size_t(precedingFactor * offsets[i]); } - // std::max(originalLeftBound, rightBound)? - return std::min(originalRightBound, rightBound); -} - -} // namespace - -// TODO: unify into one function. -template -inline vector_size_t Window::kRangeStartBoundSearch( - const T value, - vector_size_t leftBound, - vector_size_t rightBound, - const FlatVectorPtr& valuesVector, - const vector_size_t* rawPeerStarts, - vector_size_t& indexFound) { - auto index = findIndex(value, leftBound, rightBound, valuesVector, true); - indexFound = index; - // Since this is a kPreceding bound it includes the row at the index. - return rangeValuesMap_.rowIndices[rawPeerStarts[index]]; } -// TODO: lastRightBoundRow looks useless. 
-template -vector_size_t Window::kRangeEndBoundSearch( - const T value, - vector_size_t leftBound, - vector_size_t rightBound, - vector_size_t lastRightBoundRow, - const FlatVectorPtr& valuesVector, - const vector_size_t* rawPeerEnds, - vector_size_t& indexFound) { - auto index = findIndex(value, leftBound, rightBound, valuesVector, false); - indexFound = index; - return rangeValuesMap_.rowIndices[rawPeerEnds[index]]; -} +}; // namespace -template -void Window::updateKRangeFrameBounds( +void Window::updateKRowsFrameBounds( bool isKPreceding, - bool isStartBound, const FrameChannelArg& frameArg, + vector_size_t startRow, vector_size_t numRows, - vector_size_t* rawFrameBounds, - const vector_size_t* rawPeerStarts, - const vector_size_t* rawPeerEnds) { - using NativeType = typename TypeTraits::NativeType; - // Extract the order by key column to calculate the range values for the frame - // boundaries. - std::shared_ptr sortKeyType = - outputType_->childAt(sortKeyInfo_[0].first); - auto orderByValues = BaseVector::create(sortKeyType, numRows, pool()); - windowPartition_->extractColumn( - sortKeyInfo_[0].first, partitionOffset_, numRows, 0, orderByValues); - auto* rangeValuesFlatVector = orderByValues->asFlatVector(); - auto* rawRangeValues = rangeValuesFlatVector->mutableRawValues(); - + vector_size_t* rawFrameBounds) { if (frameArg.index == kConstantChannel) { auto constantOffset = frameArg.constant.value(); - constantOffset = isKPreceding ? -constantOffset : constantOffset; - for (int i = 0; i < numRows; i++) { - rawRangeValues[i] = rangeValuesFlatVector->valueAt(i) + constantOffset; - } + auto startValue = + startRow + (isKPreceding ? 
-constantOffset : constantOffset); + std::iota(rawFrameBounds, rawFrameBounds + numRows, startValue); } else { - windowPartition_->extractColumn( + currentPartition_->extractColumn( frameArg.index, partitionOffset_, numRows, 0, frameArg.value); - auto offsets = frameArg.value->values()->as(); - for (auto i = 0; i < numRows; i++) { - VELOX_USER_CHECK( - !frameArg.value->isNullAt(i), "k in frame bounds cannot be null"); - VELOX_USER_CHECK_GE( - offsets[i], 1, "k in frame bounds must be at least 1"); - } - - auto precedingFactor = isKPreceding ? -1 : 1; - for (auto i = 0; i < numRows; i++) { - rawRangeValues[i] = rangeValuesFlatVector->valueAt(i) + - vector_size_t(precedingFactor * offsets[i]); - } - } - - // Set the frame bounds from looking up the rangeValues index. - vector_size_t leftBound = 0; - vector_size_t rightBound = rangeValuesMap_.rowIndices.size() - 1; - auto lastPartitionRow = partitionStartRows_[currentPartition_ + 1] - 1; - auto rangeIndexValues = std::dynamic_pointer_cast>( - rangeValuesMap_.rangeValues); - vector_size_t indexFound; - if (isStartBound) { - vector_size_t dynamicLeftBound = leftBound; - vector_size_t dynamicRightBound = 0; - for (auto i = 0; i < numRows; i++) { - // Handle null. - // Different with duckDB result. May need to separate the handling for - // spark & presto. - if (rangeValuesFlatVector->mayHaveNulls() && - rangeValuesFlatVector->isNullAt(i)) { - rawFrameBounds[i] = i; - continue; - } - // It is supposed the index being found is always on the left of the - // current handling position if we only consider positive lower value - // offset (>= 1). - dynamicRightBound = i; - rawFrameBounds[i] = kRangeStartBoundSearch( - rawRangeValues[i], - dynamicLeftBound, - dynamicRightBound, - rangeIndexValues, - rawPeerStarts, - indexFound); - dynamicLeftBound = indexFound; - } - } else { - vector_size_t dynamicRightBound = rightBound; - vector_size_t dynamicLeftBound = 0; - for (auto i = 0; i < numRows; i++) { - // Handle null. 
- // Different with duckDB result. May need to separate the handling for - // spark & presto. - if (rangeValuesFlatVector->mayHaveNulls() && - rangeValuesFlatVector->isNullAt(i)) { - rawFrameBounds[i] = i; - continue; - } - // It is supposed the index being found is always on the right of the - // current handling position if we only consider positive higher value - // offset (>= 1). - dynamicLeftBound = i; - rawFrameBounds[i] = kRangeEndBoundSearch( - rawRangeValues[i], - dynamicLeftBound, - dynamicRightBound, - lastPartitionRow, - rangeIndexValues, - rawPeerEnds, - indexFound); - dynamicRightBound = rightBound; + if (frameArg.value->typeKind() == TypeKind::INTEGER) { + updateKRowsOffsetsColumn( + isKPreceding, frameArg.value, startRow, numRows, rawFrameBounds); + } else { + updateKRowsOffsetsColumn( + isKPreceding, frameArg.value, startRow, numRows, rawFrameBounds); } } } @@ -591,8 +241,6 @@ void Window::updateFrameBounds( const vector_size_t* rawPeerStarts, const vector_size_t* rawPeerEnds, vector_size_t* rawFrameBounds) { - auto firstPartitionRow = partitionStartRows_[currentPartition_]; - auto lastPartitionRow = partitionStartRows_[currentPartition_ + 1] - 1; auto windowType = windowFrame.type; auto boundType = isStartBound ? windowFrame.startType : windowFrame.endType; auto frameArg = isStartBound ? 
windowFrame.start : windowFrame.end; @@ -602,8 +250,7 @@ void Window::updateFrameBounds( std::fill_n(rawFrameBounds, numRows, 0); break; case core::WindowNode::BoundType::kUnboundedFollowing: - std::fill_n( - rawFrameBounds, numRows, lastPartitionRow - firstPartitionRow); + std::fill_n(rawFrameBounds, numRows, currentPartition_->numRows() - 1); break; case core::WindowNode::BoundType::kCurrentRow: { if (windowType == core::WindowNode::WindowType::kRange) { @@ -613,12 +260,8 @@ void Window::updateFrameBounds( } else { // Fills the frameBound buffer with increasing value of row indices // (corresponding to CURRENT ROW) from the startRow of the current - // output buffer. The startRow has to be adjusted relative to the - // partition start row. - std::iota( - rawFrameBounds, - rawFrameBounds + numRows, - startRow - firstPartitionRow); + // output buffer. + std::iota(rawFrameBounds, rawFrameBounds + numRows, startRow); } break; } @@ -627,47 +270,7 @@ void Window::updateFrameBounds( updateKRowsFrameBounds( true, frameArg.value(), startRow, numRows, rawFrameBounds); } else { -#define VELOX_DYNAMIC_LIMITED_SCALAR_TYPE_DISPATCH( \ - TEMPLATE_FUNC, typeKind, ...) 
\ - [&]() { \ - switch (typeKind) { \ - case ::facebook::velox::TypeKind::INTEGER: { \ - return TEMPLATE_FUNC<::facebook::velox::TypeKind::INTEGER>( \ - __VA_ARGS__); \ - } \ - case ::facebook::velox::TypeKind::TINYINT: { \ - return TEMPLATE_FUNC<::facebook::velox::TypeKind::TINYINT>( \ - __VA_ARGS__); \ - } \ - case ::facebook::velox::TypeKind::SMALLINT: { \ - return TEMPLATE_FUNC<::facebook::velox::TypeKind::SMALLINT>( \ - __VA_ARGS__); \ - } \ - case ::facebook::velox::TypeKind::BIGINT: { \ - return TEMPLATE_FUNC<::facebook::velox::TypeKind::BIGINT>( \ - __VA_ARGS__); \ - } \ - case ::facebook::velox::TypeKind::DATE: { \ - return TEMPLATE_FUNC<::facebook::velox::TypeKind::DATE>(__VA_ARGS__); \ - } \ - default: \ - VELOX_FAIL( \ - "Not supported type for sort key!: {}", \ - mapTypeKindToName(typeKind)); \ - } \ - }() - // Sort key type. - auto sortKeyTypePtr = outputType_->childAt(sortKeyInfo_[0].first); - VELOX_DYNAMIC_LIMITED_SCALAR_TYPE_DISPATCH( - updateKRangeFrameBounds, - sortKeyTypePtr->kind(), - true, - isStartBound, - frameArg.value(), - numRows, - rawFrameBounds, - rawPeerStarts, - rawPeerEnds); + VELOX_NYI("k preceding frame is only supported in ROWS mode"); } break; } @@ -676,19 +279,7 @@ void Window::updateFrameBounds( updateKRowsFrameBounds( false, frameArg.value(), startRow, numRows, rawFrameBounds); } else { - // Sort key type. 
- auto sortKeyTypePtr = outputType_->childAt(sortKeyInfo_[0].first); - VELOX_DYNAMIC_LIMITED_SCALAR_TYPE_DISPATCH( - updateKRangeFrameBounds, - sortKeyTypePtr->kind(), - false, - isStartBound, - frameArg.value(), - numRows, - rawFrameBounds, - rawPeerStarts, - rawPeerEnds); -#undef VELOX_DYNAMIC_LIMITED_SCALAR_TYPE_DISPATCH + VELOX_NYI("k following frame is only supported in ROWS mode"); } break; } @@ -732,15 +323,9 @@ void computeValidFrames( }; // namespace -void Window::callApplyForPartitionRows( +void Window::computePeerAndFrameBuffers( vector_size_t startRow, - vector_size_t endRow, - const std::vector& result, - vector_size_t resultOffset) { - if (partitionStartRows_[currentPartition_] == startRow) { - callResetPartition(currentPartition_); - } - + vector_size_t endRow) { vector_size_t numRows = endRow - startRow; vector_size_t numFuncs = windowFunctions_.size(); @@ -765,39 +350,8 @@ void Window::callApplyForPartitionRows( rawFrameEnds.push_back(rawFrameEnd); } - auto peerCompare = [&](const char* lhs, const char* rhs) -> bool { - return compareRowsWithKeys(lhs, rhs, sortKeyInfo_); - }; - auto firstPartitionRow = partitionStartRows_[currentPartition_]; - auto lastPartitionRow = partitionStartRows_[currentPartition_ + 1] - 1; - for (auto i = startRow, j = 0; i < endRow; i++, j++) { - // When traversing input partition rows, the peers are the rows - // with the same values for the ORDER BY clause. These rows - // are equal in some ways and affect the results of ranking functions. - // This logic exploits the fact that all rows between the peerStartRow_ - // and peerEndRow_ have the same values for peerStartRow_ and peerEndRow_. - // So we can compute them just once and reuse across the rows in that peer - // interval. Note: peerStartRow_ and peerEndRow_ can be maintained across - // getOutput calls. - - // Compute peerStart and peerEnd rows for the first row of the partition or - // when past the previous peerGroup. 
- if (i == firstPartitionRow || i >= peerEndRow_) { - peerStartRow_ = i; - peerEndRow_ = i; - while (peerEndRow_ <= lastPartitionRow) { - if (peerCompare(sortedRows_[peerStartRow_], sortedRows_[peerEndRow_])) { - break; - } - peerEndRow_++; - } - } - - // Peer buffer values should be offsets from the start of the partition - // as WindowFunction only sees one partition at a time. - rawPeerStarts[j] = peerStartRow_ - firstPartitionRow; - rawPeerEnds[j] = peerEndRow_ - 1 - firstPartitionRow; - } + std::tie(peerStartRow_, peerEndRow_) = currentPartition_->computePeerBuffers( + startRow, endRow, peerStartRow_, peerEndRow_, rawPeerStarts, rawPeerEnds); for (auto i = 0; i < numFuncs; i++) { const auto& windowFrame = windowFrames_[i]; @@ -828,15 +382,36 @@ void Window::callApplyForPartitionRows( // Ranking functions do not care about frames. So the function decides // further what to do with empty frames. computeValidFrames( - lastPartitionRow - firstPartitionRow, + currentPartition_->numRows() - 1, numRows, rawFrameStarts[i], rawFrameEnds[i], validFrames_[i]); } } +} + +void Window::getInputColumns( + vector_size_t startRow, + vector_size_t endRow, + vector_size_t resultOffset, + const RowVectorPtr& result) { + auto numRows = endRow - startRow; + for (int i = 0; i < numInputColumns_; ++i) { + currentPartition_->extractColumn( + i, partitionOffset_, numRows, resultOffset, result->childAt(i)); + } +} + +void Window::callApplyForPartitionRows( + vector_size_t startRow, + vector_size_t endRow, + vector_size_t resultOffset, + const RowVectorPtr& result) { + getInputColumns(startRow, endRow, resultOffset, result); - // Invoke the apply method for the WindowFunctions. 
+ computePeerAndFrameBuffers(startRow, endRow); + vector_size_t numFuncs = windowFunctions_.size(); for (auto w = 0; w < numFuncs; w++) { windowFunctions_[w]->apply( peerStartBuffer_, @@ -845,46 +420,52 @@ void Window::callApplyForPartitionRows( frameEndBuffers_[w], validFrames_[w], resultOffset, - result[w]); + result->childAt(numInputColumns_ + w)); } + vector_size_t numRows = endRow - startRow; numProcessedRows_ += numRows; partitionOffset_ += numRows; - if (endRow == partitionStartRows_[currentPartition_ + 1]) { - currentPartition_++; - } } void Window::callApplyLoop( vector_size_t numOutputRows, - const std::vector& windowOutputs) { + const RowVectorPtr& result) { // Compute outputs by traversing as many partitions as possible. This // logic takes care of partial partitions output also. - vector_size_t resultIndex = 0; vector_size_t numOutputRowsLeft = numOutputRows; + + // This function requires that the currentPartition_ is available for output. + VELOX_DCHECK_NOT_NULL(currentPartition_); while (numOutputRowsLeft > 0) { auto rowsForCurrentPartition = - partitionStartRows_[currentPartition_ + 1] - numProcessedRows_; + currentPartition_->numRows() - partitionOffset_; if (rowsForCurrentPartition <= numOutputRowsLeft) { // Current partition can fit completely in the output buffer. // So output all its rows. callApplyForPartitionRows( - numProcessedRows_, - numProcessedRows_ + rowsForCurrentPartition, - windowOutputs, - resultIndex); + partitionOffset_, + partitionOffset_ + rowsForCurrentPartition, + resultIndex, + result); resultIndex += rowsForCurrentPartition; numOutputRowsLeft -= rowsForCurrentPartition; + callResetPartition(); + if (!currentPartition_) { + // The WindowBuild doesn't have any more partitions to process right + // now. So break until the next getOutput call. + break; + } } else { // Current partition can fit only partially in the output buffer. // Call apply for the rows that can fit in the buffer and break from // outputting. 
callApplyForPartitionRows( - numProcessedRows_, - numProcessedRows_ + numOutputRowsLeft, - windowOutputs, - resultIndex); + partitionOffset_, + partitionOffset_ + numOutputRowsLeft, + resultIndex, + result); numOutputRowsLeft = 0; break; } @@ -892,42 +473,36 @@ void Window::callApplyLoop( } RowVectorPtr Window::getOutput() { - if (finished_ || !noMoreInput_) { + if (numRows_ == 0) { return nullptr; } auto numRowsLeft = numRows_ - numProcessedRows_; + if (numRowsLeft == 0) { + return nullptr; + } + + if (!currentPartition_) { + callResetPartition(); + if (!currentPartition_) { + // WindowBuild doesn't have a partition to output. + return nullptr; + } + } + auto numOutputRows = std::min(numRowsPerOutput_, numRowsLeft); auto result = std::dynamic_pointer_cast( BaseVector::create(outputType_, numOutputRows, operatorCtx_->pool())); - // Set all passthrough input columns. - for (int i = 0; i < numInputColumns_; ++i) { - data_->extractColumn( - sortedRows_.data() + numProcessedRows_, - numOutputRows, - i, - result->childAt(i)); - } - - // Construct vectors for the window function output columns. - std::vector windowOutputs; - windowOutputs.reserve(windowFunctions_.size()); for (int i = numInputColumns_; i < outputType_->size(); i++) { auto output = BaseVector::create( outputType_->childAt(i), numOutputRows, operatorCtx_->pool()); - windowOutputs.emplace_back(std::move(output)); + result->childAt(i) = output; } // Compute the output values of window functions. 
- callApplyLoop(numOutputRows, windowOutputs); - - for (int j = numInputColumns_; j < outputType_->size(); j++) { - result->childAt(j) = windowOutputs[j - numInputColumns_]; - } - - finished_ = (numProcessedRows_ == sortedRows_.size()); + callApplyLoop(numOutputRows, result); return result; } -} // namespace facebook::velox::exec +} // namespace facebook::velox::exec \ No newline at end of file diff --git a/velox/exec/Window.h b/velox/exec/Window.h index 72f979894a9c..7065327422bb 100644 --- a/velox/exec/Window.h +++ b/velox/exec/Window.h @@ -17,6 +17,7 @@ #include "velox/exec/Operator.h" #include "velox/exec/RowContainer.h" +#include "velox/exec/WindowBuild.h" #include "velox/exec/WindowFunction.h" #include "velox/exec/WindowPartition.h" @@ -46,7 +47,7 @@ class Window : public Operator { RowVectorPtr getOutput() override; bool needsInput() const override { - return !noMoreInput_; + return !noMoreInput_ && windowBuild_->needsInput(); } void noMoreInput() override; @@ -56,7 +57,7 @@ class Window : public Operator { } bool isFinished() override { - return finished_; + return noMoreInput_ && numRows_ == numProcessedRows_; } private: @@ -80,75 +81,51 @@ class Window : public Operator { const std::optional end; }; - // Helper function to create WindowFunction and frame objects - // for this operator. - void createWindowFunctions( - const std::shared_ptr& windowNode, - const RowTypePtr& inputType); - - // Helper function to initialize range values map for k Range frames. - void initRangeValuesMap(); + // Creates WindowFunction and frame objects for this operator. + void createWindowFunctions(); - // Helper function to create the buffers for peer and frame - // row indices to send in window function apply invocations. + // Creates the buffers for peer and frame row + // indices to send in window function apply invocations. void createPeerAndFrameBuffers(); - // Function to compute the partitionStartRows_ structure. 
- // partitionStartRows_ is vector of the starting rows index - // of each partition in the data. This is an auxiliary - // structure that helps simplify the window function computations. - void computePartitionStartRows(); - - // This function is invoked after receiving all the input data. - // The input data needs to be separated into partitions and - // ordered within it (as that is the order in which the rows - // will be output for the partition). - // This function achieves this by ordering the input rows by - // (partition keys + order by keys). Doing so orders all rows - // of a partition adjacent to each other and sorted by the - // ORDER BY clause. - void sortPartitions(); - - // Helper function to call WindowFunction::resetPartition() for - // all WindowFunctions. - void callResetPartition(vector_size_t partitionNumber); - - // For k Range frames an auxiliary structure used to look up the index - // of frame values is required. This function computes that structure for - // each partition of rows. - void computeRangeValuesMap(); - - // Helper method to call WindowFunction::apply to all the rows - // of a partition between startRow and endRow. The outputs - // will be written to the vectors in windowFunctionOutputs - // starting at offset resultIndex. + // Compute the peer and frame buffers for rows between + // startRow and endRow in the current partition. + void computePeerAndFrameBuffers(vector_size_t startRow, vector_size_t endRow); + + // Updates all the state for the next partition. + void callResetPartition(); + + // Computes the result vector for a subset of the current + // partition rows starting from startRow to endRow. A single partition + // could span multiple output blocks and a single output block could + // also have multiple partitions in it. So resultOffset is the + // offset in the result vector corresponding to the current range of + // partition rows. 
void callApplyForPartitionRows( vector_size_t startRow, vector_size_t endRow, - const std::vector& result, - vector_size_t resultOffset); - - // Helper function to compare the rows at lhs and rhs pointers - // using the keyInfo in keys. This can be used to compare the - // rows for partitionKeys, orderByKeys or a combination of both. - inline bool compareRowsWithKeys( - const char* lhs, - const char* rhs, - const std::vector>& keys); - - // Function to compute window function values for the current output - // buffer. The buffer has numOutputRows number of rows. windowOutputs - // has the vectors for window function columns. - void callApplyLoop( - vector_size_t numOutputRows, - const std::vector& windowOutputs); - - // Helper function to convert WindowNode::Frame to Window::WindowFrame. + vector_size_t resultOffset, + const RowVectorPtr& result); + + // Gets the input columns of the current window partition + // between startRow and endRow in result at resultOffset. + void getInputColumns( + vector_size_t startRow, + vector_size_t endRow, + vector_size_t resultOffset, + const RowVectorPtr& result); + + // Computes the result vector for a single output block. The result + // consists of all the input columns followed by the results of the + // window function. + void callApplyLoop(vector_size_t numOutputRows, const RowVectorPtr& result); + + // Converts WindowNode::Frame to Window::WindowFrame. WindowFrame createWindowFrame( core::WindowNode::Frame frame, const RowTypePtr& inputType); - // Helper function to update frame bounds for kPreceding, kFollowing frames. + // Update frame bounds for kPreceding, kFollowing row frames. 
void updateKRowsFrameBounds( bool isKPreceding, const FrameChannelArg& frameArg, @@ -156,17 +133,6 @@ class Window : public Operator { vector_size_t numRows, vector_size_t* rawFrameBounds); - template - void updateKRangeFrameBounds( - bool isKPreceding, - bool isStartBound, - const FrameChannelArg& frameArg, - vector_size_t numRows, - vector_size_t* rawFrameBounds, - const vector_size_t* rawPeerStarts, - const vector_size_t* rawPeerEnds); - - // Helper function to update frame bounds. void updateFrameBounds( const WindowFrame& windowFrame, const bool isStartBound, @@ -176,54 +142,24 @@ class Window : public Operator { const vector_size_t* rawPeerEnds, vector_size_t* rawFrameBounds); - template - vector_size_t kRangeStartBoundSearch( - const T value, - vector_size_t leftBound, - vector_size_t rightBound, - const FlatVectorPtr& valuesVector, - const vector_size_t* rawPeerStarts, - vector_size_t& indexFound); - - template - vector_size_t kRangeEndBoundSearch( - const T value, - vector_size_t leftBound, - vector_size_t rightBound, - vector_size_t lastRightBoundRow, - const FlatVectorPtr& valuesVector, - const vector_size_t* rawPeerEnds, - vector_size_t& indexFound); - - bool finished_ = false; const vector_size_t numInputColumns_; - // The Window operator needs to see all the input rows before starting - // any function computation. As the Window operators gets input rows - // we store the rows in the RowContainer (data_). - std::unique_ptr data_; + // WindowBuild is used to store input rows and return WindowPartitions + // for the processing. + std::unique_ptr windowBuild_; + + // The cached window plan node used for window function initialization. It is + // reset after the initialization. + std::shared_ptr windowNode_; - // The decodedInputVectors_ are reused across addInput() calls to decode - // the partition and sort keys for the above RowContainer. 
- std::vector decodedInputVectors_; + // Used to access window partition rows and columns by the window + // operator and functions. This structure is owned by the WindowBuild. + std::unique_ptr currentPartition_; // HashStringAllocator required by functions that allocate out of line // buffers. HashStringAllocator stringAllocator_; - // The below 3 vectors represent the ChannelIndex of the partition keys, - // the order by keys and the concatenation of the 2. These keyInfo are - // used for sorting by those key combinations during the processing. - // partitionKeyInfo_ is used to separate partitions in the rows. - // sortKeyInfo_ is used to identify peer rows in a partition. - // allKeyInfo_ is a combination of (partitionKeyInfo_ and sortKeyInfo_). - // It is used to perform a full sorting of the input rows to be able to - // separate partitions and sort the rows in it. The rows are output in - // this order by the operator. - std::vector> partitionKeyInfo_; - std::vector> sortKeyInfo_; - std::vector> allKeyInfo_; - // Vector of WindowFunction objects required by this operator. // WindowFunction is the base API implemented by all the window functions. // The functions are ordered by their positions in the output columns. @@ -235,25 +171,9 @@ class Window : public Operator { // Number of input rows. vector_size_t numRows_ = 0; - // Vector of pointers to each input row in the data_ RowContainer. - // The rows are sorted by partitionKeys + sortKeys. This total - // ordering can be used to split partitions (with the correct - // order by) for the processing. - std::vector sortedRows_; - - // Window partition object used to provide per-partition - // data to the window function. - std::unique_ptr windowPartition_; - // Number of rows that be fit into an output block. vector_size_t numRowsPerOutput_; - // This is a vector that gives the index of the start row - // (in sortedRows_) of each partition in the RowContainer data_. 
- // This auxiliary structure helps demarcate partitions in - // getOutput calls. - std::vector partitionStartRows_; - // The following 4 Buffers are used to pass peer and frame start and // end values to the WindowFunction::apply method. These // buffers can be allocated once and reused across all the getOutput @@ -278,36 +198,14 @@ class Window : public Operator { // There is one SelectivityVector per window function. std::vector validFrames_; - // When computing k Range frames, the range value for the frame index needs - // to be mapped to the partition row for the value. - // This is an auxiliary structure to materialize a mapping from - // range value -> row index (in RowContainer) for that purpose. - // It uses a vector of the ordered range values and another vector of the - // corresponding row indices. Ideally a binary search - // tree or B-tree index (especially if the data is spilled to disk) should be - // used. - struct RangeValuesMap { - TypePtr rangeType; - // The range values appear in sorted order in this vector. - VectorPtr rangeValues; - // TODO (Make this a BufferPtr so that it can be allocated in the - // MemoryPool) ? - std::vector rowIndices; - }; - RangeValuesMap rangeValuesMap_; - - // The above mapping is built only if required for k range frames. - bool hasKRangeFrames_; - // Number of rows output from the WindowOperator so far. The rows // are output in the same order of the pointers in sortedRows. This // value is updated as the WindowFunction::apply() function is // called on the partition blocks. vector_size_t numProcessedRows_ = 0; - // Current partition being output. The partition might be - // output across multiple getOutput() calls so this needs to - // be tracked in the operator. - vector_size_t currentPartition_; + + // Tracks how far along the partition rows have been output. + vector_size_t partitionOffset_ = 0; // When traversing input partition rows, the peers are the rows // with the same values for the ORDER BY clause. 
These rows @@ -315,12 +213,10 @@ class Window : public Operator { // Since all rows between the peerStartRow_ and peerEndRow_ have the same // values for peerStartRow_ and peerEndRow_, we needn't compute // them for each row independently. Since these rows might - // cross getOutput boundaries they are saved in the operator. + // cross getOutput boundaries and be called in subsequent calls to + // computePeerBuffers they are saved here. vector_size_t peerStartRow_ = 0; vector_size_t peerEndRow_ = 0; - - // Tracks how far along the partition rows have been output. - vector_size_t partitionOffset_ = 0; }; -} // namespace facebook::velox::exec +} // namespace facebook::velox::exec \ No newline at end of file diff --git a/velox/exec/WindowBuild.cpp b/velox/exec/WindowBuild.cpp new file mode 100644 index 000000000000..7142749d9f26 --- /dev/null +++ b/velox/exec/WindowBuild.cpp @@ -0,0 +1,86 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox/exec/WindowBuild.h" + +#include "velox/exec/Operator.h" + +namespace facebook::velox::exec { + +namespace { +void initKeyInfo( + const RowTypePtr& type, + const std::vector& keys, + const std::vector& orders, + std::vector>& keyInfo) { + const core::SortOrder defaultPartitionSortOrder(true, true); + + keyInfo.reserve(keys.size()); + for (auto i = 0; i < keys.size(); ++i) { + auto channel = exprToChannel(keys[i].get(), type); + VELOX_CHECK( + channel != kConstantChannel, + "Window doesn't allow constant partition or sort keys"); + if (i < orders.size()) { + keyInfo.push_back(std::make_pair(channel, orders[i])); + } else { + keyInfo.push_back(std::make_pair(channel, defaultPartitionSortOrder)); + } + } +} +}; // namespace + +WindowBuild::WindowBuild( + const std::shared_ptr& windowNode, + velox::memory::MemoryPool* pool) + : numInputColumns_(windowNode->sources()[0]->outputType()->size()), + data_(std::make_unique( + windowNode->sources()[0]->outputType()->children(), + pool)), + decodedInputVectors_(windowNode->sources()[0]->outputType()->size()) { + auto inputType = windowNode->sources()[0]->outputType(); + for (int i = 0; i < inputType->children().size(); i++) { + inputColumns_.push_back(data_->columnAt(i)); + } + + initKeyInfo(inputType, windowNode->partitionKeys(), {}, partitionKeyInfo_); + initKeyInfo( + inputType, + windowNode->sortingKeys(), + windowNode->sortingOrders(), + sortKeyInfo_); +} + +bool WindowBuild::compareRowsWithKeys( + const char* lhs, + const char* rhs, + const std::vector>& keys) { + if (lhs == rhs) { + return false; + } + for (auto& key : keys) { + if (auto result = data_->compare( + lhs, + rhs, + key.first, + {key.second.isNullsFirst(), key.second.isAscending(), false})) { + return result < 0; + } + } + return false; +} + +} // namespace facebook::velox::exec diff --git a/velox/exec/WindowBuild.h b/velox/exec/WindowBuild.h new file mode 100644 index 000000000000..3a3c8800f333 --- /dev/null +++ 
b/velox/exec/WindowBuild.h
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "velox/exec/RowContainer.h"
+#include "velox/exec/WindowPartition.h"
+
+namespace facebook::velox::exec {
+
+// The Window operator needs to see all input rows, and separate them into
+// partitions based on a partitioning key. There are many approaches to do
+// this. e.g with a full-sort, HashTable, streaming etc. This abstraction
+// is used by the Window operator to hold the input rows and provide
+// partitions to it for processing. Varied implementations of the
+// WindowBuild can use different algorithms.
+class WindowBuild {
+ public:
+  WindowBuild(
+      const std::shared_ptr<const core::WindowNode>& windowNode,
+      velox::memory::MemoryPool* pool);
+
+  virtual ~WindowBuild() = default;
+
+  // The Window operator invokes this function to check if the WindowBuild can
+  // accept input. The Streaming Window build doesn't accept input if it has a
+  // partition to output.
+  virtual bool needsInput() = 0;
+
+  // Adds new input rows to the WindowBuild.
+  virtual void addInput(RowVectorPtr input) = 0;
+
+  // The Window operator invokes this function to indicate that no
+  // more input rows will be passed from the Window operator to the
+  // WindowBuild.
+  // When using a sort based build, all input rows need to be
+  // seen before any partitions are determined. So this function is
+  // used to indicate to the WindowBuild that it can proceed with
+  // building partitions.
+  virtual void noMoreInput() = 0;
+
+  // Returns true if a new Window partition is available for the Window
+  // operator to consume.
+  virtual bool hasNextPartition() = 0;
+
+  // The Window operator invokes this function to get the next Window partition
+  // to pass along to the WindowFunction. The WindowPartition has APIs to access
+  // the underlying columns of Window partition data.
+  // Check hasNextPartition() before invoking this function. This function fails
+  // if called when no partition is available.
+  virtual std::unique_ptr<WindowPartition> nextPartition() = 0;
+
+  // Returns the average size of input rows in bytes stored in the
+  // data container of the WindowBuild.
+  std::optional<int64_t> estimateRowSize() const {
+    return data_->estimateRowSize();
+  }
+
+  // Returns the number of input rows tracked in numRows_.
+  int64_t numRows() const {
+    return numRows_;
+  }
+
+ protected:
+  // Returns true if row 'lhs' orders strictly before row 'rhs' on 'keys'.
+  bool compareRowsWithKeys(
+      const char* lhs,
+      const char* rhs,
+      const std::vector<std::pair<column_index_t, core::SortOrder>>& keys);
+
+  // The below 2 vectors represent the ChannelIndex of the partition keys
+  // and the order by keys. These keyInfo are used for sorting by those
+  // key combinations during the processing.
+  // partitionKeyInfo_ is used to separate partitions in the rows.
+  // sortKeyInfo_ is used to identify peer rows in a partition.
+  std::vector<std::pair<column_index_t, core::SortOrder>> partitionKeyInfo_;
+  std::vector<std::pair<column_index_t, core::SortOrder>> sortKeyInfo_;
+
+  const vector_size_t numInputColumns_;
+
+  // The RowContainer holds all the input rows in WindowBuild.
+  std::unique_ptr<RowContainer> data_;
+
+  // The decodedInputVectors_ are reused across addInput() calls to decode
+  // the partition and sort keys for the above RowContainer.
+  std::vector<DecodedVector> decodedInputVectors_;
+
+  // RowColumns for window build used to construct WindowPartition.
+  std::vector<RowColumn> inputColumns_;
+
+  // Number of input rows.
+  vector_size_t numRows_ = 0;
+};
+
+} // namespace facebook::velox::exec
diff --git a/velox/exec/WindowPartition.cpp b/velox/exec/WindowPartition.cpp
index ef8697579fd6..7f87b03611ae 100644
--- a/velox/exec/WindowPartition.cpp
+++ b/velox/exec/WindowPartition.cpp
@@ -18,9 +18,10 @@
 namespace facebook::velox::exec {
 
 WindowPartition::WindowPartition(
+    RowContainer* data,
     const std::vector& columns,
-    const std::vector& /* argTypes */)
-    : columns_(columns) {}
+    const std::vector<std::pair<column_index_t, core::SortOrder>>& sortKeyInfo)
+    : data_(data), columns_(columns), sortKeyInfo_(sortKeyInfo) {}
 
 void WindowPartition::resetPartition(const folly::Range& rows) {
   partition_ = rows;
@@ -53,4 +54,121 @@ void WindowPartition::extractColumn(
       result);
 }
 
-} // namespace facebook::velox::exec
+// Extracts the null flags of column 'columnIndex' for 'numRows' partition rows
+// starting at 'partitionOffset' into 'nullsBuffer'.
+void WindowPartition::extractNulls(
+    int32_t columnIndex,
+    vector_size_t partitionOffset,
+    vector_size_t numRows,
+    const BufferPtr& nullsBuffer) const {
+  RowContainer::extractNulls(
+      partition_.data() + partitionOffset,
+      numRows,
+      columns_[columnIndex],
+      nullsBuffer);
+}
+
+namespace {
+
+// Returns {smallest frame start, largest frame end} over the rows selected in
+// 'validRows'. 'validRows' must have at least one selected row.
+std::pair<vector_size_t, vector_size_t> findMinMaxFrameBounds(
+    const SelectivityVector& validRows,
+    const BufferPtr& frameStarts,
+    const BufferPtr& frameEnds) {
+  auto rawFrameStarts = frameStarts->as<vector_size_t>();
+  auto rawFrameEnds = frameEnds->as<vector_size_t>();
+
+  auto firstValidRow = validRows.begin();
+  vector_size_t minFrame = rawFrameStarts[firstValidRow];
+  vector_size_t maxFrame = rawFrameEnds[firstValidRow];
+  validRows.applyToSelected([&](auto i) {
+    minFrame = std::min(minFrame, rawFrameStarts[i]);
+    maxFrame = std::max(maxFrame, rawFrameEnds[i]);
+  });
+  return {minFrame, maxFrame};
+}
+
+} // namespace
+
+// Extracts the null flags of column 'col' for the rows spanned by the frames
+// of 'validRows' into 'nulls'. Returns {first extracted row, number of rows
+// extracted} if any extracted row is null, std::nullopt otherwise.
+std::optional<std::pair<vector_size_t, vector_size_t>>
+WindowPartition::extractNulls(
+    column_index_t col,
+    const SelectivityVector& validRows,
+    const BufferPtr& frameStarts,
+    const BufferPtr& frameEnds,
+    BufferPtr* nulls) const {
+  VELOX_CHECK(validRows.hasSelections(), "Buffer has no active rows");
+  auto [minFrame, maxFrame] =
+      findMinMaxFrameBounds(validRows, frameStarts, frameEnds);
+
+  // Add 1 since maxFrame is the index of the frame end row.
+  auto framesSize = maxFrame - minFrame + 1;
+  AlignedBuffer::reallocate<bool>(nulls, framesSize);
+
+  extractNulls(col, minFrame, framesSize, *nulls);
+  auto foundNull =
+      bits::findFirstBit((*nulls)->as<uint64_t>(), 0, framesSize) >= 0;
+  return foundNull ? std::make_optional(std::make_pair(minFrame, framesSize))
+                   : std::nullopt;
+}
+
+// Returns true if row 'lhs' orders strictly before row 'rhs' on the ORDER BY
+// keys. Returns false for the same row or rows equal on every key.
+bool WindowPartition::compareRowsWithSortKeys(const char* lhs, const char* rhs)
+    const {
+  if (lhs == rhs) {
+    return false;
+  }
+  for (const auto& key : sortKeyInfo_) {
+    if (auto result = data_->compare(
+            lhs,
+            rhs,
+            key.first,
+            {key.second.isNullsFirst(), key.second.isAscending(), false})) {
+      return result < 0;
+    }
+  }
+  return false;
+}
+
+std::pair<vector_size_t, vector_size_t> WindowPartition::computePeerBuffers(
+    vector_size_t start,
+    vector_size_t end,
+    vector_size_t prevPeerStart,
+    vector_size_t prevPeerEnd,
+    vector_size_t* rawPeerStarts,
+    vector_size_t* rawPeerEnds) const {
+  VELOX_CHECK_LE(end, numRows());
+
+  auto lastPartitionRow = numRows() - 1;
+  auto peerStart = prevPeerStart;
+  auto peerEnd = prevPeerEnd;
+  for (auto i = start, j = 0; i < end; i++, j++) {
+    // When traversing input partition rows, the peers are the rows
+    // with the same values for the ORDER BY clause. These rows
+    // are equal in some ways and affect the results of ranking functions.
+    // This logic exploits the fact that all rows between the peerStart
+    // and peerEnd have the same values for rawPeerStarts and rawPeerEnds.
+    // So we can compute them just once and reuse across the rows in that peer
+    // interval. Note: peerStart and peerEnd can be maintained across
+    // getOutput calls. Hence, they are returned to the caller.
+
+    if (i == 0 || i >= peerEnd) {
+      // Compute peerStart and peerEnd rows for the first row of the partition
+      // or when past the previous peerGroup.
+      peerStart = i;
+      peerEnd = i;
+      // Advance peerEnd to the first row that orders after the peerStart row
+      // (or one past the last partition row).
+      while (peerEnd <= lastPartitionRow &&
+             !compareRowsWithSortKeys(
+                 partition_[peerStart], partition_[peerEnd])) {
+        peerEnd++;
+      }
+    }
+
+    rawPeerStarts[j] = peerStart;
+    // peerEnd is exclusive here; the output buffer stores the inclusive end.
+    rawPeerEnds[j] = peerEnd - 1;
+  }
+  return {peerStart, peerEnd};
+}
+
+} // namespace facebook::velox::exec
diff --git a/velox/exec/WindowPartition.h b/velox/exec/WindowPartition.h
index 003bf232ac9f..faf3a0e58b66 100644
--- a/velox/exec/WindowPartition.h
+++ b/velox/exec/WindowPartition.h
@@ -25,9 +25,18 @@ namespace facebook::velox::exec {
 
 class WindowPartition {
  public:
+  /// The WindowPartition is used by the Window operator and WindowFunction
+  /// objects to access the underlying data and columns of a partition of rows.
+  /// The WindowPartition is constructed by WindowBuild from the input data.
+  /// 'data' : Underlying RowContainer of the WindowBuild.
+  /// 'columns' : Input rows of 'data' used for accessing column data from it.
+  /// 'sortKeyInfo' : Order by columns used by the Window operator. Used to
+  /// get peer rows from the input partition.
   WindowPartition(
+      RowContainer* data,
       const std::vector& columns,
-      const std::vector& types);
+      const std::vector<std::pair<column_index_t, core::SortOrder>>&
+          sortKeyInfo);
 
   /// Returns the number of rows in the current WindowPartition.
   vector_size_t numRows() const {
@@ -55,21 +64,72 @@ class WindowPartition {
       vector_size_t resultOffset,
       const VectorPtr& result) const;
 
+  /// Extracts null positions at 'columnIndex' into 'nullsBuffer' for
+  /// 'numRows' starting at positions 'partitionOffset' in the partition
+  /// input data.
+  void extractNulls(
+      int32_t columnIndex,
+      vector_size_t partitionOffset,
+      vector_size_t numRows,
+      const BufferPtr& nullsBuffer) const;
+
+  /// Extracts null positions at 'col' into 'nulls'. The null positions
+  /// are from the smallest 'frameStarts' value to the greatest 'frameEnds'
+  /// value for 'validRows'. Both 'frameStarts' and 'frameEnds' are buffers
+  /// of type vector_size_t.
+ /// The returned value is an optional pair of vector_size_t. + /// The pair is returned only if null values are found in the nulls + /// extracted. The first value of the pair is the smallest frameStart for + /// nulls. The second is the number of frames extracted. + std::optional> extractNulls( + column_index_t col, + const SelectivityVector& validRows, + const BufferPtr& frameStarts, + const BufferPtr& frameEnds, + BufferPtr* nulls) const; + + /// Sets in 'rawPeerStarts' and in 'rawPeerEnds' the peer start and peer end + /// offsets of the rows between 'start' and 'end' of the current partition. + /// 'peer' row are all the rows having the same value of the order by columns + /// as the current row. + /// computePeerBuffers is called multiple times for each partition. It is + /// called in sequential order of start to end rows. + /// The peerStarts/peerEnds of the startRow could be same as the last row in + /// the previous call to computePeerBuffers (if they have the same order by + /// keys). So peerStart and peerEnd of the last row of this call are returned + /// to be passed as prevPeerStart and prevPeerEnd to the subsequent + /// call to computePeerBuffers. + std::pair computePeerBuffers( + vector_size_t start, + vector_size_t end, + vector_size_t prevPeerStart, + vector_size_t prevPeerEnd, + vector_size_t* rawPeerStarts, + vector_size_t* rawPeerEnds) const; + private: - // This is a copy of the input RowColumn objects that are used for + bool compareRowsWithSortKeys(const char* lhs, const char* rhs) const; + + // The RowContainer associated with the partition. + // It is owned by the WindowBuild that creates the partition. + RowContainer* data_; + + // Copy of the input RowColumn objects that are used for // accessing the partition row columns. These RowColumn objects - // index into the Window Operator RowContainer and can retrieve - // the column values. + // index into RowContainer data_ above and can retrieve the column values. 
// The order of these columns is the same as that of the input row // of the Window operator. The WindowFunctions know the // corresponding indexes of their input arguments into this vector. // They will request for column vector values at the respective index. std::vector columns_; - // This folly::Range is for the partition rows iterator provided by the + // folly::Range is for the partition rows iterator provided by the // Window operator. The pointers are to rows from a RowContainer owned // by the operator. We can assume these are valid values for the lifetime // of WindowPartition. folly::Range partition_; + + // ORDER BY column info for this partition. + const std::vector> sortKeyInfo_; }; -} // namespace facebook::velox::exec +} // namespace facebook::velox::exec \ No newline at end of file diff --git a/velox/exec/tests/PlanBuilderTest.cpp b/velox/exec/tests/PlanBuilderTest.cpp index f9ff97758018..06e1e6116828 100644 --- a/velox/exec/tests/PlanBuilderTest.cpp +++ b/velox/exec/tests/PlanBuilderTest.cpp @@ -106,7 +106,7 @@ TEST_F(PlanBuilderTest, windowFunctionCall) { .planNode() ->toString(true, false), "-- Window[partition by [a] order by [b ASC NULLS LAST] " - "d := window1(ROW[\"c\"]) RANGE between UNBOUNDED PRECEDING and CURRENT ROW] " + "d := window1(ROW[\"c\"]) RANGE between UNBOUNDED PRECEDING and CURRENT ROW inputsSorted [0]] " "-> a:VARCHAR, b:BIGINT, c:BIGINT, d:BIGINT\n"); VELOX_CHECK_EQ( @@ -116,7 +116,7 @@ TEST_F(PlanBuilderTest, windowFunctionCall) { .planNode() ->toString(true, false), "-- Window[partition by [a] order by [] " - "d := window1(ROW[\"c\"]) RANGE between UNBOUNDED PRECEDING and CURRENT ROW] " + "d := window1(ROW[\"c\"]) RANGE between UNBOUNDED PRECEDING and CURRENT ROW inputsSorted [0]] " "-> a:VARCHAR, b:BIGINT, c:BIGINT, d:BIGINT\n"); VELOX_CHECK_EQ( @@ -126,7 +126,7 @@ TEST_F(PlanBuilderTest, windowFunctionCall) { .planNode() ->toString(true, false), "-- Window[partition by [] order by [] " - "w0 := window1(ROW[\"c\"]) RANGE 
between UNBOUNDED PRECEDING and CURRENT ROW] " + "w0 := window1(ROW[\"c\"]) RANGE between UNBOUNDED PRECEDING and CURRENT ROW inputsSorted [0]] " "-> a:VARCHAR, b:BIGINT, c:BIGINT, w0:BIGINT\n"); VELOX_ASSERT_THROW( @@ -175,7 +175,7 @@ TEST_F(PlanBuilderTest, windowFrame) { "d7 := window1(ROW[\"c\"]) ROWS between CURRENT ROW and UNBOUNDED FOLLOWING, " "d8 := window1(ROW[\"c\"]) RANGE between CURRENT ROW and UNBOUNDED FOLLOWING, " "d9 := window1(ROW[\"c\"]) RANGE between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING, " - "d10 := window1(ROW[\"c\"]) RANGE between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING] " + "d10 := window1(ROW[\"c\"]) RANGE between UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING inputsSorted [0]] " "-> a:VARCHAR, b:BIGINT, c:BIGINT, d1:BIGINT, d2:BIGINT, d3:BIGINT, d4:BIGINT, " "d5:BIGINT, d6:BIGINT, d7:BIGINT, d8:BIGINT, d9:BIGINT, d10:BIGINT\n"); diff --git a/velox/exec/tests/PlanNodeSerdeTest.cpp b/velox/exec/tests/PlanNodeSerdeTest.cpp index e4e8a347fd90..071af57153d4 100644 --- a/velox/exec/tests/PlanNodeSerdeTest.cpp +++ b/velox/exec/tests/PlanNodeSerdeTest.cpp @@ -399,6 +399,23 @@ TEST_F(PlanNodeSerdeTest, window) { .planNode(); testSerde(plan); + + // Test StreamingWindow serde. 
+ plan = + PlanBuilder() + .values({data_}) + .streamingWindow( + {"sum(c0) over (partition by c1 order by c2 rows between 10 preceding and 5 following)"}) + .planNode(); + + testSerde(plan); + + plan = PlanBuilder() + .values({data_}) + .streamingWindow({"sum(c0) over (partition by c1 order by c2)"}) + .planNode(); + + testSerde(plan); } TEST_F(PlanNodeSerdeTest, rowNumber) { diff --git a/velox/exec/tests/PlanNodeToStringTest.cpp b/velox/exec/tests/PlanNodeToStringTest.cpp index 44050cbb232f..259f7d8cacc1 100644 --- a/velox/exec/tests/PlanNodeToStringTest.cpp +++ b/velox/exec/tests/PlanNodeToStringTest.cpp @@ -684,7 +684,7 @@ TEST_F(PlanNodeToStringTest, window) { ASSERT_EQ("-- Window\n", plan->toString()); ASSERT_EQ( "-- Window[partition by [a] order by [b ASC NULLS LAST] " - "d := window1(ROW[\"c\"]) RANGE between 10 PRECEDING and UNBOUNDED FOLLOWING] " + "d := window1(ROW[\"c\"]) RANGE between 10 PRECEDING and UNBOUNDED FOLLOWING inputsSorted [0]] " "-> a:VARCHAR, b:BIGINT, c:BIGINT, d:BIGINT\n", plan->toString(true, false)); @@ -696,7 +696,32 @@ TEST_F(PlanNodeToStringTest, window) { ASSERT_EQ("-- Window\n", plan->toString()); ASSERT_EQ( "-- Window[partition by [a] order by [] " - "w0 := window1(ROW[\"c\"]) RANGE between CURRENT ROW and b FOLLOWING] " + "w0 := window1(ROW[\"c\"]) RANGE between CURRENT ROW and b FOLLOWING inputsSorted [0]] " + "-> a:VARCHAR, b:BIGINT, c:BIGINT, w0:BIGINT\n", + plan->toString(true, false)); + + plan = PlanBuilder() + .tableScan(ROW({"a", "b", "c"}, {VARCHAR(), BIGINT(), BIGINT()})) + .streamingWindow( + {"window1(c) over (partition by a order by b " + "range between 10 preceding and unbounded following) AS d"}) + .planNode(); + ASSERT_EQ("-- Window\n", plan->toString()); + ASSERT_EQ( + "-- Window[partition by [a] order by [b ASC NULLS LAST] " + "d := window1(ROW[\"c\"]) RANGE between 10 PRECEDING and UNBOUNDED FOLLOWING inputsSorted [1]] " + "-> a:VARCHAR, b:BIGINT, c:BIGINT, d:BIGINT\n", + plan->toString(true, false)); + + plan 
= PlanBuilder()
+             .tableScan(ROW({"a", "b", "c"}, {VARCHAR(), BIGINT(), BIGINT()}))
+             .streamingWindow({"window1(c) over (partition by a "
+                               "range between current row and b following)"})
+             .planNode();
+  ASSERT_EQ("-- Window\n", plan->toString());
+  ASSERT_EQ(
+      "-- Window[partition by [a] order by [] "
+      "w0 := window1(ROW[\"c\"]) RANGE between CURRENT ROW and b FOLLOWING inputsSorted [1]] "
       "-> a:VARCHAR, b:BIGINT, c:BIGINT, w0:BIGINT\n",
       plan->toString(true, false));
 }
diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp
index 426d25f2da1b..c01feef6dc5b 100644
--- a/velox/exec/tests/utils/PlanBuilder.cpp
+++ b/velox/exec/tests/utils/PlanBuilder.cpp
@@ -1381,6 +1381,94 @@ PlanBuilder& PlanBuilder::window(
       sortingOrders,
       windowNames,
       windowNodeFunctions,
+      false,
+      planNode_);
+  return *this;
+}
+
+// Adds a WindowNode with inputsSorted = true on top of the current plan node.
+// Every string in 'windowFunctions' is a window function call with its OVER
+// clause; all of them must share the same PARTITION BY and ORDER BY clauses,
+// otherwise a user error is raised. Calls without an alias are named w0, w1,
+// ... in order of appearance.
+PlanBuilder& PlanBuilder::streamingWindow(
+    const std::vector<std::string>& windowFunctions) {
+  VELOX_CHECK_GT(
+      windowFunctions.size(),
+      0,
+      "Window Node requires at least one window function.");
+
+  std::vector<core::FieldAccessTypedExprPtr> partitionKeys;
+  std::vector<core::FieldAccessTypedExprPtr> sortingKeys;
+  std::vector<core::SortOrder> sortingOrders;
+  std::vector<core::WindowNode::Function> windowNodeFunctions;
+  std::vector<std::string> windowNames;
+  windowNodeFunctions.reserve(windowFunctions.size());
+  windowNames.reserve(windowFunctions.size());
+
+  bool first = true;
+  const auto inputType = planNode_->outputType();
+  // Counter for the generated names of calls that carry no alias.
+  int32_t numUnnamed = 0;
+
+  auto errorOnMismatch = [&](const std::string& windowString,
+                             const std::string& mismatchTypeString) -> void {
+    VELOX_USER_FAIL(
+        "Window function invocations {} and {} do not match {} clauses.",
+        windowString,
+        windowFunctions[0],
+        mismatchTypeString);
+  };
+
+  WindowTypeResolver windowResolver;
+  facebook::velox::duckdb::ParseOptions options;
+  options.parseIntegerAsBigint = options_.parseIntegerAsBigint;
+  for (const auto& windowString : windowFunctions) {
+    const auto windowExpr = duckdb::parseWindowExpr(windowString, options);
+    // All window function SQL strings in the list are expected to have the same
+    // PARTITION BY and ORDER BY clauses. Validate this assumption.
+    if (first) {
+      partitionKeys =
+          parsePartitionKeys(windowExpr, windowString, inputType, pool_);
+      auto sortPair =
+          parseOrderByKeys(windowExpr, windowString, inputType, pool_);
+      sortingKeys = std::move(sortPair.first);
+      sortingOrders = std::move(sortPair.second);
+      first = false;
+    } else {
+      auto latestPartitionKeys =
+          parsePartitionKeys(windowExpr, windowString, inputType, pool_);
+      auto [latestSortingKeys, latestSortingOrders] =
+          parseOrderByKeys(windowExpr, windowString, inputType, pool_);
+
+      if (!equalFieldAccessTypedExprPtrList(
+              partitionKeys, latestPartitionKeys)) {
+        errorOnMismatch(windowString, "PARTITION BY");
+      }
+
+      if (!equalFieldAccessTypedExprPtrList(sortingKeys, latestSortingKeys)) {
+        errorOnMismatch(windowString, "ORDER BY");
+      }
+
+      if (!equalSortOrderList(sortingOrders, latestSortingOrders)) {
+        errorOnMismatch(windowString, "ORDER BY");
+      }
+    }
+
+    auto windowCall = std::dynamic_pointer_cast<const core::CallTypedExpr>(
+        core::Expressions::inferTypes(
+            windowExpr.functionCall, inputType, pool_));
+    windowNodeFunctions.push_back(
+        {std::move(windowCall),
+         createWindowFrame(windowExpr.frame, inputType, pool_),
+         windowExpr.ignoreNulls});
+    if (windowExpr.functionCall->alias().has_value()) {
+      windowNames.push_back(windowExpr.functionCall->alias().value());
+    } else {
+      windowNames.push_back(fmt::format("w{}", numUnnamed++));
+    }
+  }
+
+  planNode_ = std::make_shared<core::WindowNode>(
+      nextPlanNodeId(),
+      partitionKeys,
+      sortingKeys,
+      sortingOrders,
+      windowNames,
+      windowNodeFunctions,
+      true,
       planNode_);
   return *this;
 }
diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h
index 705f0273e166..7cbe9890943f 100644
--- a/velox/exec/tests/utils/PlanBuilder.h
+++ b/velox/exec/tests/utils/PlanBuilder.h
@@ -709,6 +709,9 @@ class PlanBuilder {
   /// rows between a + 10 preceding and 10 following)"
   PlanBuilder& window(const std::vector& windowFunctions);
 
+  // Add a Streaming WindowNode to compute one or more
windowFunctions.
+  PlanBuilder& streamingWindow(const std::vector<std::string>& windowFunctions);
+
   /// Add a RowNumberNode to compute single row_number window function with an
   /// optional limit and no sorting.
   PlanBuilder& rowNumber(
diff --git a/velox/functions/lib/window/tests/WindowTestBase.cpp b/velox/functions/lib/window/tests/WindowTestBase.cpp
index 82aefcfa24ce..d0d934ce8f90 100644
--- a/velox/functions/lib/window/tests/WindowTestBase.cpp
+++ b/velox/functions/lib/window/tests/WindowTestBase.cpp
@@ -18,6 +18,7 @@
 
 #include
 #include
+#include <sstream>
 #include "velox/common/base/tests/GTestUtils.h"
 #include "velox/exec/tests/utils/OperatorTestBase.h"
 #include "velox/exec/tests/utils/PlanBuilder.h"
@@ -48,6 +49,81 @@ WindowTestBase::QueryInfo WindowTestBase::buildWindowQuery(
   return {op, functionSql, querySql};
 }
 
+namespace {
+// Splits overClause[startIdx, startIdx + length) on ',' and appends each piece
+// to 'orderByClauses'. Pieces are appended as-is, surrounding spaces included.
+void splitOrderBy(
+    const std::string& overClause,
+    size_t startIdx,
+    size_t length,
+    std::vector<std::string>& orderByClauses) {
+  std::istringstream tokenStream(overClause.substr(startIdx, length));
+  std::string token;
+  while (std::getline(tokenStream, token, ',')) {
+    orderByClauses.push_back(token);
+  }
+}
+} // namespace
+
+// Builds a plan that sorts 'input' on the partition keys followed by the
+// order-by keys of 'overClause' and feeds the result to a streaming window
+// computing 'function'. Returns the plan together with the function SQL and
+// the equivalent reference query over table 'tmp'.
+// The clause is located with case-sensitive searches for "partition by" and
+// "order by", with "partition by" expected to come first.
+WindowTestBase::QueryInfo WindowTestBase::makeStreamingWindow(
+    const std::vector<RowVectorPtr>& input,
+    const std::string& function,
+    const std::string& overClause,
+    const std::string& frameClause) {
+  const std::string functionSql =
+      fmt::format("{} over ({} {})", function, overClause, frameClause);
+  std::vector<std::string> orderByClauses;
+
+  const std::string partitionByStr = "partition by";
+  const auto partitionByStartIdx = overClause.find(partitionByStr);
+
+  const std::string orderByStr = "order by";
+  const auto orderByStartIdx = overClause.find(orderByStr);
+
+  // Extract the partition by keys: everything between "partition by" and
+  // "order by", or up to the end of the clause when there is no "order by".
+  if (partitionByStartIdx != std::string::npos) {
+    const size_t startIdx = partitionByStartIdx + partitionByStr.length();
+    const size_t endIdx = orderByStartIdx != std::string::npos
+        ? orderByStartIdx
+        : overClause.length();
+    splitOrderBy(overClause, startIdx, endIdx - startIdx, orderByClauses);
+  }
+
+  // Add NULLS FIRST after partition by keys. Because the
+  // defaultPartitionSortOrder in Window.cpp is NULLS FIRST.
+  for (auto& clause : orderByClauses) {
+    if (clause.find("NULLS") == std::string::npos &&
+        clause.find("nulls") == std::string::npos) {
+      clause += " NULLS FIRST";
+    }
+  }
+
+  // Extract the order by keys.
+  if (orderByStartIdx != std::string::npos) {
+    const size_t startIdx = orderByStartIdx + orderByStr.length();
+    splitOrderBy(
+        overClause,
+        startIdx,
+        overClause.length() - startIdx,
+        orderByClauses);
+  }
+
+  // Sort the input data before streaming window. An over clause with neither
+  // partition by nor order by yields no sort keys, so no sort is added then.
+  PlanBuilder builder;
+  builder.setParseOptions(options_).values(input);
+  if (!orderByClauses.empty()) {
+    builder.orderBy(orderByClauses, false);
+  }
+  const auto op = builder.streamingWindow({functionSql}).planNode();
+
+  const auto rowType = asRowType(input[0]->type());
+  const std::string columnsString = folly::join(", ", rowType->names());
+  const std::string querySql =
+      fmt::format("SELECT {}, {} FROM tmp", columnsString, functionSql);
+
+  return {op, functionSql, querySql};
+}
+
 RowVectorPtr WindowTestBase::makeSimpleVector(vector_size_t size) {
   return makeRowVector({
       makeFlatVector(size, [](auto row) { return row % 5; }),
@@ -112,6 +188,7 @@ void WindowTestBase::testWindowFunction(
   if (createTable) {
     createDuckDbTable(input);
   }
+
   for (const auto& overClause : overClauses) {
     for (auto& frameClause : frameClauses) {
       auto queryInfo =
@@ -157,6 +234,26 @@ void WindowTestBase::testKRangeFrames(const std::string& function) {
   testWindowFunction({vectors}, function, {overClause}, kRangeFrames);
 }
 
+// Runs 'function' through a streaming window plan for every combination of
+// 'overClauses' and 'frameClauses' and checks each result with assertQuery
+// against the reference query. When 'createTable' is true the DuckDB table is
+// (re)created from 'input' first.
+void WindowTestBase::testStreamingWindowFunction(
+    const std::vector<RowVectorPtr>& input,
+    const std::string& function,
+    const std::vector<std::string>& overClauses,
+    const std::vector<std::string>& frameClauses,
+    bool createTable) {
+  if (createTable) {
+    createDuckDbTable(input);
+  }
+
+  for (const auto& overClause : overClauses) {
+    for (const auto& frameClause : frameClauses) {
+      auto queryInfo =
+          makeStreamingWindow(input, function, overClause, frameClause);
+      SCOPED_TRACE(queryInfo.functionSql);
+      assertQuery(queryInfo.planNode, queryInfo.querySql);
+    }
+  }
+}
+
 void WindowTestBase::assertWindowFunctionError(
     const std::vector& input,
     const std::string& function,
diff --git a/velox/functions/lib/window/tests/WindowTestBase.h b/velox/functions/lib/window/tests/WindowTestBase.h
index 3703e439e7ff..55ecebe00dfa 100644
--- a/velox/functions/lib/window/tests/WindowTestBase.h
+++ b/velox/functions/lib/window/tests/WindowTestBase.h
@@ -88,13 +88,13 @@ inline const std::vector kFrameClauses = {
     "rows between c3 preceding and c2 following",
     "rows between c2 preceding and c3 following",
 
-    // Frame
clauses with invalid frames. - "rows between unbounded preceding and 1 preceding", - "rows between 1 preceding and 4 preceding", - "rows between 1 following and unbounded following", - "rows between 4 following and 1 following", - "rows between c2 preceding and c3 preceding", - "rows between c2 following and c3 following", + // // Frame clauses with invalid frames. + // "rows between unbounded preceding and 1 preceding", + // "rows between 1 preceding and 4 preceding", + // "rows between 1 following and unbounded following", + // "rows between 4 following and 1 following", + // "rows between c2 preceding and c3 preceding", + // "rows between c2 following and c3 following", }; class WindowTestBase : public exec::test::OperatorTestBase { @@ -139,6 +139,14 @@ class WindowTestBase : public exec::test::OperatorTestBase { const std::string& overClause, const std::string& frameClause); + // This function is used to test the StreamingWindow. It will add the order by + // action to ensure the data is ordered. + QueryInfo makeStreamingWindow( + const std::vector& input, + const std::string& function, + const std::string& overClause, + const std::string& frameClause); + /// This function tests SQL queries for the window function and /// the specified overClauses and frameClauses with the input RowVectors. /// Note : 'function' should be a full window function invocation string @@ -155,6 +163,20 @@ class WindowTestBase : public exec::test::OperatorTestBase { void testKRangeFrames(const std::string& function); + /// This function tests SQL queries for the window function and + /// the specified overClauses and frameClauses with the input RowVectors. + /// Note : 'function' should be a full window function invocation string + /// including input parameters and open/close braces. e.g. rank(), ntile(5). 
+ /// If the frameClauses is not specified, then the default is a single empty + /// clause that corresponds to the default frame of RANGE UNBOUNDED PRECEDING + /// AND CURRENT ROW. + void testStreamingWindowFunction( + const std::vector& input, + const std::string& function, + const std::vector& overClauses, + const std::vector& frameClauses = {""}, + bool createTable = true); + /// This function tests the SQL query for the window function and overClause /// combination with the input RowVectors. It is expected that query execution /// will throw an exception with the errorMessage specified. diff --git a/velox/functions/sparksql/window/tests/CMakeLists.txt b/velox/functions/sparksql/window/tests/CMakeLists.txt index 1f2549aa72bd..d3b7f841e997 100644 --- a/velox/functions/sparksql/window/tests/CMakeLists.txt +++ b/velox/functions/sparksql/window/tests/CMakeLists.txt @@ -18,6 +18,7 @@ set(CMAKE_WINDOW_TEST_LINK_LIBRARIES velox_exec_test_lib velox_functions_spark_window velox_functions_window_test_lib + velox_window velox_vector_fuzzer velox_vector_test_lib gflags::gflags @@ -36,3 +37,14 @@ add_test( target_link_libraries(velox_spark_windows_value_test ${CMAKE_WINDOW_TEST_LINK_LIBRARIES}) + +add_executable(velox_spark_streaming_window_test + StreamingWindowTest.cpp ${CMAKE_WINDOW_TEST_MAIN_FILES}) + +add_test( + NAME velox_spark_streaming_window_test + COMMAND velox_spark_streaming_window_test + WORKING_DIRECTORY .) + +target_link_libraries(velox_spark_streaming_window_test + ${CMAKE_WINDOW_TEST_LINK_LIBRARIES}) diff --git a/velox/functions/sparksql/window/tests/StreamingWindowTest.cpp b/velox/functions/sparksql/window/tests/StreamingWindowTest.cpp new file mode 100644 index 000000000000..a02bbe58ba08 --- /dev/null +++ b/velox/functions/sparksql/window/tests/StreamingWindowTest.cpp @@ -0,0 +1,124 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. 
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "velox/common/base/tests/GTestUtils.h"
+#include "velox/functions/lib/window/tests/WindowTestBase.h"
+#include "velox/functions/prestosql/window/WindowFunctionsRegistration.h"
+#include "velox/functions/sparksql/window/WindowFunctionsRegistration.h"
+
+using namespace facebook::velox::exec::test;
+
+namespace facebook::velox::window::test {
+
+namespace {
+
+// Window function invocations exercised by every test in this file.
+const std::vector<std::string> kWindowFunctions = {
+    "rank()",
+    "dense_rank()",
+    "percent_rank()",
+    "cume_dist()",
+    "row_number()",
+    "ntile(1)",
+    "sum(c2)",
+    "min(c2)",
+    "max(c2)",
+    "count(c2)",
+    "avg(c2)",
+    "sum(1)"};
+
+// Parameter of the streaming window tests: one window function invocation,
+// one over clause and one frame clause. Each test instance checks exactly
+// this combination.
+struct StreamingWindowTestParam {
+  const std::string function;
+  const std::string overClause;
+  const std::string frameClause;
+};
+
+// Test fixture holding the (function, over clause, frame clause) combination
+// under test. SetUp() registers the Presto and Spark window functions.
+class StreamingWindowTestBase : public WindowTestBase {
+ protected:
+  explicit StreamingWindowTestBase(const StreamingWindowTestParam& testParam)
+      : function_(testParam.function),
+        overClause_(testParam.overClause),
+        frameClause_(testParam.frameClause) {}
+
+  // Runs the combination held by this fixture over 'vectors'.
+  void testStreamingWindowFunction(const std::vector<RowVectorPtr>& vectors) {
+    WindowTestBase::testStreamingWindowFunction(
+        vectors, function_, {overClause_}, {frameClause_});
+  }
+
+  void SetUp() override {
+    WindowTestBase::SetUp();
+    window::prestosql::registerAllWindowFunctions();
+    velox::functions::window::sparksql::registerWindowFunctions("");
+  }
+
+  const std::string function_;
+  const std::string overClause_;
+  const std::string frameClause_;
+};
+
+// Returns the cross product of kWindowFunctions, kOverClauses and
+// kFrameClauses.
+std::vector<StreamingWindowTestParam> getStreamingWindowTestParams() {
+  std::vector<StreamingWindowTestParam> params;
+  params.reserve(
+      kWindowFunctions.size() * kOverClauses.size() * kFrameClauses.size());
+  for (const auto& function : kWindowFunctions) {
+    for (const auto& overClause : kOverClauses) {
+      for (const auto& frameClause : kFrameClauses) {
+        params.push_back({function, overClause, frameClause});
+      }
+    }
+  }
+  return params;
+}
+
+class StreamingWindowTest
+    : public StreamingWindowTestBase,
+      public testing::WithParamInterface<StreamingWindowTestParam> {
+ public:
+  StreamingWindowTest() : StreamingWindowTestBase(GetParam()) {}
+};
+
+// Tests all functions with a dataset with uniform distribution of partitions.
+TEST_P(StreamingWindowTest, basic) {
+  testStreamingWindowFunction({makeSimpleVector(40)});
+}
+
+// Tests all functions with a dataset with all rows in a single partition,
+// but in 2 input vectors.
+TEST_P(StreamingWindowTest, singlePartition) {
+  testStreamingWindowFunction(
+      {makeSinglePartitionVector(40), makeSinglePartitionVector(50)});
+}
+
+// Tests all functions with a dataset in which all partitions have a single row.
+TEST_P(StreamingWindowTest, singleRowPartitions) {
+  testStreamingWindowFunction({makeSingleRowPartitionsVector(40)});
+}
+
+// Tests all functions with a dataset with randomly generated data.
+TEST_P(StreamingWindowTest, randomInput) {
+  testStreamingWindowFunction({makeRandomInputVector(30)});
+}
+
+// Run above tests for all combinations of window function, over clause and
+// frame clause.
+VELOX_INSTANTIATE_TEST_SUITE_P(
+    StreamingWindowTestInstantiation,
+    StreamingWindowTest,
+    testing::ValuesIn(getStreamingWindowTestParams()));
+
+} // namespace
+} // namespace facebook::velox::window::test