diff --git a/velox/core/PlanNode.cpp b/velox/core/PlanNode.cpp index cdb724f1ef0f0..7eee84e4a75d9 100644 --- a/velox/core/PlanNode.cpp +++ b/velox/core/PlanNode.cpp @@ -1747,8 +1747,41 @@ PlanNodePtr RowNumberNode::create(const folly::dynamic& obj, void* context) { source); } +namespace { +std::unordered_map +rankFunctionNames() { + return { + {TopNRowNumberNode::RankFunction::kRowNumber, "row_number"}, + {TopNRowNumberNode::RankFunction::kRank, "rank"}, + {TopNRowNumberNode::RankFunction::kDenseRank, "dense_rank"}, + }; +} +} // namespace + +// static +const char* TopNRowNumberNode::rankFunctionName( + TopNRowNumberNode::RankFunction function) { + static const auto kFunctionNames = rankFunctionNames(); + auto it = kFunctionNames.find(function); + VELOX_CHECK( + it != kFunctionNames.end(), + "Invalid window type {}", + static_cast(function)); + return it->second.c_str(); +} + +// static +TopNRowNumberNode::RankFunction TopNRowNumberNode::rankFunctionFromName( + const std::string& name) { + static const auto kFunctionNames = invertMap(rankFunctionNames()); + auto it = kFunctionNames.find(name); + VELOX_CHECK(it != kFunctionNames.end(), "Invalid rank function " + name); + return it->second; +} + TopNRowNumberNode::TopNRowNumberNode( PlanNodeId id, + RankFunction function, std::vector partitionKeys, std::vector sortingKeys, std::vector sortingOrders, @@ -1756,6 +1789,7 @@ TopNRowNumberNode::TopNRowNumberNode( int32_t limit, PlanNodePtr source) : PlanNode(std::move(id)), + function_(function), partitionKeys_{std::move(partitionKeys)}, sortingKeys_{std::move(sortingKeys)}, sortingOrders_{std::move(sortingOrders)}, @@ -1793,6 +1827,8 @@ TopNRowNumberNode::TopNRowNumberNode( } void TopNRowNumberNode::addDetails(std::stringstream& stream) const { + stream << rankFunctionName(function_) << " "; + if (!partitionKeys_.empty()) { stream << "partition by ("; addFields(stream, partitionKeys_); @@ -1808,6 +1844,7 @@ void TopNRowNumberNode::addDetails(std::stringstream& stream) 
const { folly::dynamic TopNRowNumberNode::serialize() const { auto obj = PlanNode::serialize(); + obj["function"] = rankFunctionName(function_); obj["partitionKeys"] = ISerializable::serialize(partitionKeys_); obj["sortingKeys"] = ISerializable::serialize(sortingKeys_); obj["sortingOrders"] = serializeSortingOrders(sortingOrders_); @@ -1823,6 +1860,7 @@ PlanNodePtr TopNRowNumberNode::create( const folly::dynamic& obj, void* context) { auto source = deserializeSingleSource(obj, context); + auto function = rankFunctionFromName(obj["function"].asString()); auto partitionKeys = deserializeFields(obj["partitionKeys"], context); auto sortingKeys = deserializeFields(obj["sortingKeys"], context); @@ -1835,6 +1873,7 @@ PlanNodePtr TopNRowNumberNode::create( return std::make_shared( deserializePlanNodeId(obj), + function, partitionKeys, sortingKeys, sortingOrders, diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index b4449989b8d57..82cf670fc0f3b 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -2339,24 +2339,36 @@ class MarkDistinctNode : public PlanNode { const RowTypePtr outputType_; }; -/// Optimized version of a WindowNode for a single row_number function with a -/// limit over sorted partitions. -/// The output of this node contains all input columns followed by an optional +/// Optimized version of a WindowNode for a single row_number, rank or +/// dense_rank function with a limit over sorted partitions. The output of this +/// node contains all input columns followed by an optional /// 'rowNumberColumnName' BIGINT column. class TopNRowNumberNode : public PlanNode { public: + enum class RankFunction { + kRowNumber, + kRank, + kDenseRank, + }; + + static const char* rankFunctionName(TopNRowNumberNode::RankFunction function); + + static RankFunction rankFunctionFromName(const std::string& name); + + /// @param function Rank function (row_number, rank or dense_rank) for TopN. /// @param partitionKeys Partitioning keys. May be empty. 
/// @param sortingKeys Sorting keys. May not be empty and may not intersect /// with 'partitionKeys'. /// @param sortingOrders Sorting orders, one per sorting key. /// @param rowNumberColumnName Optional name of the column containing row - /// numbers. If not specified, the output doesn't include 'row number' - /// column. This is used when computing partial results. + /// numbers (or rank and dense_rank). If not specified, the output doesn't + /// include 'row number' column. This is used when computing partial results. /// @param limit Per-partition limit. The number of /// rows produced by this node will not exceed this value for any given /// partition. Extra rows will be dropped. TopNRowNumberNode( PlanNodeId id, + RankFunction function, std::vector partitionKeys, std::vector sortingKeys, std::vector sortingOrders, @@ -2364,6 +2376,36 @@ class TopNRowNumberNode : public PlanNode { int32_t limit, PlanNodePtr source); +#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY + /// @param partitionKeys Partitioning keys. May be empty. + /// @param sortingKeys Sorting keys. May not be empty and may not intersect + /// with 'partitionKeys'. + /// @param sortingOrders Sorting orders, one per sorting key. + /// @param rowNumberColumnName Optional name of the column containing row + /// numbers (or rank and dense_rank). If not specified, the output doesn't + /// include 'row number' column. This is used when computing partial results. + /// @param limit Per-partition limit. The number of + /// rows produced by this node will not exceed this value for any given + /// partition. Extra rows will be dropped. 
+ TopNRowNumberNode( + PlanNodeId id, + std::vector partitionKeys, + std::vector sortingKeys, + std::vector sortingOrders, + const std::optional& rowNumberColumnName, + int32_t limit, + PlanNodePtr source) + : TopNRowNumberNode( + std::move(id), + RankFunction::kRowNumber, + std::move(partitionKeys), + std::move(sortingKeys), + std::move(sortingOrders), + rowNumberColumnName, + limit, + std::move(source)) {} +#endif + const std::vector& sources() const override { return sources_; } @@ -2396,6 +2438,10 @@ class TopNRowNumberNode : public PlanNode { return limit_; } + RankFunction rankFunction() const { + return function_; + } + bool generateRowNumber() const { return outputType_->size() > sources_[0]->outputType()->size(); } @@ -2411,6 +2457,8 @@ class TopNRowNumberNode : public PlanNode { private: void addDetails(std::stringstream& stream) const override; + const RankFunction function_; + const std::vector partitionKeys_; const std::vector sortingKeys_; diff --git a/velox/exec/RowContainer.cpp b/velox/exec/RowContainer.cpp index 2fafa805e3d25..2e0a11da6bd88 100644 --- a/velox/exec/RowContainer.cpp +++ b/velox/exec/RowContainer.cpp @@ -1263,6 +1263,22 @@ bool RowComparator::operator()(const char* lhs, const char* rhs) { return false; } +int32_t RowComparator::compare(const char* lhs, const char* rhs) { + if (lhs == rhs) { + return 0; // Identical row pointers compare equal. + } + for (auto& key : keyInfo_) { + if (auto result = rowContainer_->compare( + lhs, + rhs, + key.first, + {key.second.isNullsFirst(), key.second.isAscending(), false})) { + return result; + } + } + return 0; +} + bool RowComparator::operator()( const std::vector& decodedVectors, vector_size_t index, @@ -1279,4 +1295,21 @@ bool RowComparator::operator()( } return false; } + +int32_t RowComparator::compare( + const std::vector& decodedVectors, + vector_size_t index, + const char* rhs) { + for (auto& key : keyInfo_) { + if (auto result = rowContainer_->compare( + rhs, + rowContainer_->columnAt(key.first), + decodedVectors[key.first], + index, + {key.second.isNullsFirst(), 
key.second.isAscending(), false})) { + return result; + } + } + return 0; +} } // namespace facebook::velox::exec diff --git a/velox/exec/RowContainer.h b/velox/exec/RowContainer.h index b74f0b821173f..23a5cbcf1ae1a 100644 --- a/velox/exec/RowContainer.h +++ b/velox/exec/RowContainer.h @@ -1762,12 +1762,22 @@ class RowComparator { /// Returns true if lhs < rhs, false otherwise. bool operator()(const char* lhs, const char* rhs); + /// Returns 0 for equal, < 0 for 'lhs' < 'rhs', > 0 otherwise. + int32_t compare(const char* lhs, const char* rhs); + /// Returns true if decodeVectors[index] < rhs, false otherwise. bool operator()( const std::vector& decodedVectors, vector_size_t index, const char* rhs); + /// Returns 0 for equal, > 0 for 'decodedVectors[index]' < 'rhs', + /// < 0 otherwise. + int32_t compare( + const std::vector& decodedVectors, + vector_size_t index, + const char* rhs); + private: std::vector> keyInfo_; RowContainer* rowContainer_; diff --git a/velox/exec/TopNRowNumber.cpp b/velox/exec/TopNRowNumber.cpp index bbd592cc09daa..557a116ed10aa 100644 --- a/velox/exec/TopNRowNumber.cpp +++ b/velox/exec/TopNRowNumber.cpp @@ -20,6 +20,28 @@ namespace facebook::velox::exec { namespace { +#define RANK_FUNCTION_DISPATCH(TEMPLATE_FUNC, functionKind, ...) 
\ + [&]() { \ + switch (functionKind) { \ + case core::TopNRowNumberNode::RankFunction::kRowNumber: { \ + return TEMPLATE_FUNC< \ + core::TopNRowNumberNode::RankFunction::kRowNumber>(__VA_ARGS__); \ + } \ + case core::TopNRowNumberNode::RankFunction::kRank: { \ + return TEMPLATE_FUNC( \ + __VA_ARGS__); \ + } \ + case core::TopNRowNumberNode::RankFunction::kDenseRank: { \ + return TEMPLATE_FUNC< \ + core::TopNRowNumberNode::RankFunction::kDenseRank>(__VA_ARGS__); \ + } \ + default: \ + VELOX_FAIL( \ + "not a rank function kind: {}", \ + core::TopNRowNumberNode::rankFunctionName(functionKind)); \ + } \ + }() + std::vector reorderInputChannels( const RowTypePtr& inputType, const std::vector& partitionKeys, @@ -114,9 +136,11 @@ TopNRowNumber::TopNRowNumber( node->canSpill(driverCtx->queryConfig()) ? driverCtx->makeSpillConfig(operatorId) : std::nullopt), + rankFunction_(node->rankFunction()), limit_{node->limit()}, generateRowNumber_{node->generateRowNumber()}, numPartitionKeys_{node->partitionKeys().size()}, + numSortingKeys_{node->sortingKeys().size()}, inputChannels_{reorderInputChannels( node->inputType(), node->partitionKeys(), @@ -201,11 +225,12 @@ void TopNRowNumber::addInput(RowVectorPtr input) { // Process input rows. For each row, lookup the partition. If number of rows // in that partition is less than limit, add the new row. Otherwise, check // if row should replace an existing row or be discarded. - for (auto i = 0; i < numInput; ++i) { - auto& partition = partitionAt(lookup_->hits[i]); - processInputRow(i, partition); - } + RANK_FUNCTION_DISPATCH(processInputRowLoop, rankFunction_, numInput, true); + // It is determined that the TopNRowNumber (as a partial) is not rejecting + // enough input rows to make the duplicate detection worthwhile. Hence, + // abandon the processing at this partial TopN and let the final TopN do + // the processing. 
if (abandonPartialEarly()) { abandonedPartial_ = true; addRuntimeStat("abandonedPartial", RuntimeCounter(1)); @@ -215,9 +240,7 @@ void TopNRowNumber::addInput(RowVectorPtr input) { outputRows_.resize(outputBatchSize_); } } else { - for (auto i = 0; i < numInput; ++i) { - processInputRow(i, *singlePartition_); - } + RANK_FUNCTION_DISPATCH(processInputRowLoop, rankFunction_, numInput, false); } } @@ -242,25 +265,127 @@ void TopNRowNumber::initializeNewPartitions() { } } +template <> +void TopNRowNumber::processRowBelowLimit< + core::TopNRowNumberNode::RankFunction::kRank>( + vector_size_t index, + TopRows& partition) { + auto& topRows = partition.rows; + if (topRows.empty()) { + partition.currentLimit = 1; + } else { + auto topRow = topRows.top(); + auto result = comparator_.compare(decodedVectors_, index, topRow); + if (result > 0) { + partition.currentLimit += 1; + } else if (result < 0) { + // This value is greater than the current highest rank. So the new + // rank is incremented by the number of rows at the top rank. + partition.currentLimit += partition.numTopRankRows(); + } + } +} + +template <> +void TopNRowNumber::processRowBelowLimit< + core::TopNRowNumberNode::RankFunction::kDenseRank>( + vector_size_t index, + TopRows& partition) { + if (!partition.isDuplicate(decodedVectors_, index)) { + partition.currentLimit++; + } +} + +template <> +void TopNRowNumber::processRowBelowLimit< + core::TopNRowNumberNode::RankFunction::kRowNumber>( + vector_size_t index, + TopRows& partition) { + partition.currentLimit++; +} + +template <> +char* TopNRowNumber::processRowAboveLimit< + core::TopNRowNumberNode::RankFunction::kRank>( + vector_size_t index, + TopRows& partition) { + auto& topRows = partition.rows; + + char* topRow = partition.removeTopRankRows(); + char* newRow = data_->initializeRow(topRow, true /* reuse */); + // If limit = 1, then the queue becomes empty. 
+ if (topRows.empty()) { + partition.currentLimit = 1; + } else { + auto numNewTopRankRows = partition.numTopRankRows(); + topRow = topRows.top(); + // Depending on whether the new row is < or = the new top rank row + // the new top rank changes. + if (comparator_.compare(decodedVectors_, index, topRow) == 0) { + partition.currentLimit = topRows.size() - numNewTopRankRows + 1; + } else { + partition.currentLimit = topRows.size() - numNewTopRankRows + 2; + } + } + return newRow; +} + +template <> +char* TopNRowNumber::processRowAboveLimit< + core::TopNRowNumberNode::RankFunction::kDenseRank>( + vector_size_t index, + TopRows& partition) { + char* newRow = nullptr; + if (!partition.isDuplicate(decodedVectors_, index)) { + char* topRow = partition.removeTopRankRows(); + newRow = data_->initializeRow(topRow, true /* reuse */); + } else { + newRow = data_->newRow(); + } + return newRow; +} + +template <> +char* TopNRowNumber::processRowAboveLimit< + core::TopNRowNumberNode::RankFunction::kRowNumber>( + vector_size_t /*index*/, + TopRows& partition) { + // Replace existing row and reuse its memory. + auto& topRows = partition.rows; + char* topRow = topRows.top(); + topRows.pop(); + return data_->initializeRow(topRow, true /* reuse */); +} + +template void TopNRowNumber::processInputRow(vector_size_t index, TopRows& partition) { auto& topRows = partition.rows; char* newRow = nullptr; - if (topRows.size() < limit_) { + if (partition.currentLimit < limit_) { newRow = data_->newRow(); + processRowBelowLimit(index, partition); } else { + // partition.limit >= limit case. char* topRow = topRows.top(); - - if (!comparator_(decodedVectors_, index, topRow)) { - // Drop this input row. + auto result = comparator_.compare(decodedVectors_, index, topRow); + if (result < 0) { return; } + if (result == 0) { + if (rankFunction_ == core::TopNRowNumberNode::RankFunction::kRowNumber) { + // Row number ignores such rows. + return; + } - // Replace existing row. 
- topRows.pop(); - - // Reuse the topRow's memory. - newRow = data_->initializeRow(topRow, true /* reuse */); + // This row has the same value as the largest value in partition.rows. + // So it needs to be pushed in partition.rows. The currentLimit (highest + // rank) remains unchanged. + newRow = data_->newRow(); + } + if (result > 0) { + newRow = processRowAboveLimit(index, partition); + } } for (auto col = 0; col < decodedVectors_.size(); ++col) { @@ -270,6 +395,19 @@ void TopNRowNumber::processInputRow(vector_size_t index, TopRows& partition) { topRows.push(newRow); } +template +void TopNRowNumber::processInputRowLoop(vector_size_t numInput, bool doLookup) { + if (doLookup) { + for (auto i = 0; i < numInput; ++i) { + processInputRow(i, partitionAt(lookup_->hits[i])); + } + } else { + for (auto i = 0; i < numInput; ++i) { + processInputRow(i, *singlePartition_); + } + } +} + void TopNRowNumber::noMoreInput() { Operator::noMoreInput(); @@ -311,16 +449,30 @@ void TopNRowNumber::updateEstimatedOutputRowSize() { } } +vector_size_t TopNRowNumber::computeTopRank(TopRows& partition) { + if (rankFunction_ == core::TopNRowNumberNode::RankFunction::kRank) { + if (partition.currentLimit > limit_) { + partition.removeTopRankRows(); + auto numNewTopRankRows = partition.numTopRankRows(); + partition.currentLimit = partition.rows.size() - numNewTopRankRows + 1; + } + } + + return partition.currentLimit; +} + TopNRowNumber::TopRows* TopNRowNumber::nextPartition() { if (!table_) { - if (!currentPartition_) { - currentPartition_ = 0; + if (!currentPartitionNumber_) { + currentPartitionNumber_ = 0; + nextRank_ = computeTopRank(*singlePartition_); + numPeers_ = 1; return singlePartition_.get(); } return nullptr; } - if (!currentPartition_) { + if (!currentPartitionNumber_) { numPartitions_ = table_->listAllRows( &partitionIt_, partitions_.size(), @@ -330,44 +482,65 @@ TopNRowNumber::TopRows* TopNRowNumber::nextPartition() { // No more partitions. 
return nullptr; } - - currentPartition_ = 0; + currentPartitionNumber_ = 0; } else { - ++currentPartition_.value(); - if (currentPartition_ >= numPartitions_) { - currentPartition_.reset(); + ++currentPartitionNumber_.value(); + if (currentPartitionNumber_ >= numPartitions_) { + currentPartitionNumber_.reset(); return nextPartition(); } } - return &currentPartition(); + auto partition = &partitionAt(partitions_[currentPartitionNumber_.value()]); + nextRank_ = computeTopRank(*partition); + numPeers_ = 1; + return partition; } -TopNRowNumber::TopRows& TopNRowNumber::currentPartition() { - VELOX_CHECK(currentPartition_.has_value()); +void TopNRowNumber::computeNextRankInMemory( + const TopRows& partition, + vector_size_t outputIndex) { + if (rankFunction_ == core::TopNRowNumberNode::RankFunction::kRowNumber) { + nextRank_ -= 1; + return; + } - if (!table_) { - return *singlePartition_; + // This is the logic for rank() and dense_rank(). + // Is the next row a peer of the current one ? If yes, then the rank is + // unchanged, but the number of peers incremented. + if (comparator_.compare(outputRows_[outputIndex], partition.rows.top()) == + 0) { + numPeers_ += 1; + return; } - return partitionAt(partitions_[currentPartition_.value()]); + // The new row is not a peer of the current one. So dense_rank drops the + // rank by 1, but rank drops it by the number of peers (which is then reset). + if (rankFunction_ == core::TopNRowNumberNode::RankFunction::kDenseRank) { + nextRank_ -= 1; + } else { + nextRank_ -= numPeers_; + numPeers_ = 1; + } } void TopNRowNumber::appendPartitionRows( TopRows& partition, - vector_size_t start, - vector_size_t size, + vector_size_t numRows, vector_size_t outputOffset, - FlatVector* rowNumbers) { - // Append 'size' partition rows in reverse order starting from 'start' row. 
- auto rowNumber = partition.rows.size() - start; - for (auto i = 0; i < size; ++i) { - const auto index = outputOffset + size - i - 1; - if (rowNumbers) { - rowNumbers->set(index, rowNumber--); + FlatVector* rankValues) { + // The partition.rows priority queue pops rows in order of reverse + // ranks. Output rows based on nextRank_ and update it with each row. + for (auto i = 0; i < numRows; ++i) { + auto index = outputOffset + i; + if (rankValues) { + rankValues->set(index, nextRank_); } outputRows_[index] = partition.rows.top(); partition.rows.pop(); + if (!partition.rows.empty()) { + computeNextRankInMemory(partition, index); + } } } @@ -383,7 +556,7 @@ RowVectorPtr TopNRowNumber::getOutput() { return output; } - // We may have input accumulated in 'data_'. + // There could be older rows accumulated in 'data_'. if (data_->numRows() > 0) { return getOutputFromMemory(); } @@ -391,7 +564,7 @@ RowVectorPtr TopNRowNumber::getOutput() { if (noMoreInput_) { finished_ = true; } - + // There is no data to return at this moment. return nullptr; } @@ -399,6 +572,8 @@ RowVectorPtr TopNRowNumber::getOutput() { return nullptr; } + // All the input data is received, so the operator can start producing + // output. RowVectorPtr output; if (merge_ != nullptr) { output = getOutputFromSpill(); @@ -425,37 +600,34 @@ RowVectorPtr TopNRowNumber::getOutputFromMemory() { } vector_size_t offset = 0; - if (remainingRowsInPartition_ > 0) { - auto& partition = currentPartition(); - auto start = partition.rows.size() - remainingRowsInPartition_; - const auto numRows = - std::min(outputBatchSize_, remainingRowsInPartition_); - appendPartitionRows(partition, start, numRows, offset, rowNumbers); - offset += numRows; - remainingRowsInPartition_ -= numRows; - } - + // Continue to output as many remaining partitions as possible. 
while (offset < outputBatchSize_) { - auto* partition = nextPartition(); - if (!partition) { - break; + // No previous partition to output (since this is the first partition). + if (!currentPartition_) { + currentPartition_ = nextPartition(); + if (!currentPartition_) { + break; + } } - auto numRows = partition->rows.size(); - if (offset + numRows > outputBatchSize_) { - remainingRowsInPartition_ = offset + numRows - outputBatchSize_; - - // Add a subset of partition rows. - numRows -= remainingRowsInPartition_; - appendPartitionRows(*partition, 0, numRows, offset, rowNumbers); - offset += numRows; + auto numOutputRowsLeft = outputBatchSize_ - offset; + if (currentPartition_->rows.size() > numOutputRowsLeft) { + // Only a partial partition can be output in this getOutput() call. + // Output as many rows as possible. + appendPartitionRows( + *currentPartition_, numOutputRowsLeft, offset, rowNumbers); + offset += numOutputRowsLeft; break; } // Add all partition rows. - appendPartitionRows(*partition, 0, numRows, offset, rowNumbers); - offset += numRows; - remainingRowsInPartition_ = 0; + auto numPartitionRows = currentPartition_->rows.size(); + appendPartitionRows( + *currentPartition_, numPartitionRows, offset, rowNumbers); + offset += numPartitionRows; + + // Move to the next partition. 
+ currentPartition_ = nextPartition(); } if (offset == 0) { @@ -498,22 +670,64 @@ bool TopNRowNumber::isNewPartition( return false; } +bool TopNRowNumber::isNewPeer( + const RowVectorPtr& output, + vector_size_t index, + SpillMergeStream* next) { + VELOX_CHECK_GT(index, 0); + + for (auto i = numPartitionKeys_; i < numPartitionKeys_ + numSortingKeys_; + ++i) { + if (!output->childAt(inputChannels_[i]) + ->equalValueAt( + next->current().childAt(i).get(), + index - 1, + next->currentIndex())) { + return true; + } + } + return false; +} + void TopNRowNumber::setupNextOutput( const RowVectorPtr& output, - int32_t rowNumber) { + int32_t currentRank, + int32_t numPeers) { auto* lookAhead = merge_->next(); if (lookAhead == nullptr) { - nextRowNumber_ = 0; + nextRank_ = 1; + numPeers_ = 1; return; } if (isNewPartition(output, output->size(), lookAhead)) { - nextRowNumber_ = 0; + nextRank_ = 1; + numPeers_ = 1; return; } - nextRowNumber_ = rowNumber; - if (nextRowNumber_ < limit_) { + nextRank_ = currentRank; + numPeers_ = numPeers; + // This row belongs to the same partition as the previous row. However, + // it should be determined if it is a peer row as well. If peer, then rank + // is not increased. + if (isNewPeer(output, output->size(), lookAhead)) { + if (rankFunction_ == core::TopNRowNumberNode::RankFunction::kDenseRank) { + nextRank_ += 1; + numPeers_ = 1; + } else if (rankFunction_ == core::TopNRowNumberNode::RankFunction::kRank) { + nextRank_ += numPeers_; + numPeers_ = 1; + } + } else { + numPeers_ += 1; + } + + if (rankFunction_ == core::TopNRowNumberNode::RankFunction::kRowNumber) { + nextRank_ += 1; + } + + if (nextRank_ <= limit_) { return; } @@ -521,14 +735,16 @@ void TopNRowNumber::setupNextOutput( lookAhead->pop(); while (auto* next = merge_->next()) { if (isNewPartition(output, output->size(), next)) { - nextRowNumber_ = 0; + nextRank_ = 1; + numPeers_ = 1; return; } next->pop(); } // This partition is the last partition. 
- nextRowNumber_ = 0; + nextRank_ = 1; + numPeers_ = 1; } RowVectorPtr TopNRowNumber::getOutputFromSpill() { @@ -541,34 +757,63 @@ RowVectorPtr TopNRowNumber::getOutputFromSpill() { // row number to zero. Once row number reaches the 'limit_', we'll start // dropping rows until the next partition starts. // We'll emit output every time we accumulate 'outputBatchSize_' rows. - auto output = BaseVector::create(outputType_, outputBatchSize_, pool()); - FlatVector* rowNumbers = nullptr; + FlatVector* rankValues = nullptr; if (generateRowNumber_) { - rowNumbers = output->children().back()->as>(); + rankValues = output->children().back()->as>(); } // Index of the next row to append to output. vector_size_t index = 0; - // Row number of the next row in the current partition. - vector_size_t rowNumber = nextRowNumber_; - VELOX_CHECK_LT(rowNumber, limit_); + // Rank of the next row in the current partition. + vector_size_t rank = nextRank_; + VELOX_CHECK_LE(rank, limit_); + // Tracks the number of peers of the current row seen thus far. + // This is used to increment ranks for the rank function. + vector_size_t numPeers = numPeers_; for (;;) { auto next = merge_->next(); if (next == nullptr) { break; } - // Check if this row comes from a new partition. - if (index > 0 && isNewPartition(output, index, next)) { - rowNumber = 0; + if (index > 0) { + // Check if this row comes from a new partition. + if (isNewPartition(output, index, next)) { + rank = 1; + numPeers = 1; + } else { + // This row is the same partition as the previous. Check if it is a + // peer or not. If it is a new peer (so different order by value) then + // the ranks are updated. 
+ if (isNewPeer(output, index, next)) { + if (rankFunction_ == core::TopNRowNumberNode::RankFunction::kRank) { + rank += numPeers; + } else if ( + rankFunction_ == + core::TopNRowNumberNode::RankFunction::kDenseRank) { + rank += 1; + } + numPeers = 1; + } else { + // This row was a peer with the same order by values as the previous + // row so the rank remains the same. + numPeers += 1; + } + + // Rank is always incremented for Row number function. + if (rankFunction_ == + core::TopNRowNumberNode::RankFunction::kRowNumber) { + rank += 1; + } + } } // Copy this row to the output buffer if this partition has // < limit_ rows output. - if (rowNumber < limit_) { + if (rank <= limit_) { for (auto i = 0; i < inputChannels_.size(); ++i) { output->childAt(inputChannels_[i]) ->copy( @@ -577,12 +822,10 @@ RowVectorPtr TopNRowNumber::getOutputFromSpill() { next->currentIndex(), 1); } - if (rowNumbers) { - // Row numbers start with 1. - rowNumbers->set(index, rowNumber + 1); + if (rankValues) { + rankValues->set(index, rank); } ++index; - ++rowNumber; } // Pop this row from the spill. @@ -593,8 +836,9 @@ RowVectorPtr TopNRowNumber::getOutputFromSpill() { // Prepare the next batch : // i) If 'limit_' is reached for this partition, then skip the rows // until the next partition. - // ii) If the next row is from a new partition, then reset rowNumber_. - setupNextOutput(output, rowNumber); + // ii) If the next row is from a new partition, then reset rank_. 
+ setupNextOutput(output, rank, numPeers); + return output; } } @@ -760,4 +1004,82 @@ void TopNRowNumber::setupSpiller() { &spillConfig_.value(), &spillStats_); } + +char* TopNRowNumber::TopRows::removeTopRankRows() { + VELOX_CHECK(!rows.empty()); + + char* topRow = rows.top(); + rows.pop(); + + while (!rows.empty()) { + char* newTopRow = rows.top(); + if (rowComparator.compare(topRow, newTopRow) != 0) { + return topRow; + } + rows.pop(); + } + return topRow; +} + +vector_size_t TopNRowNumber::TopRows::numTopRankRows() { + // auto& topRows = rows; + VELOX_CHECK(!rows.empty()); + + std::vector allTopRows{}; + allTopRows.reserve(rows.size()); + + auto pushTopRows = [&]() -> void { + for (auto row : allTopRows) { + rows.push(row); + } + }; + + auto popTopRows = [&]() -> void { + allTopRows.push_back(rows.top()); + rows.pop(); + }; + + char* topRow = rows.top(); + popTopRows(); + vector_size_t numRows = 1; + while (!rows.empty()) { + char* newTopRow = rows.top(); + if (rowComparator.compare(topRow, newTopRow) != 0) { + pushTopRows(); + return numRows; + } + numRows += 1; + popTopRows(); + } + + // All rows in the topRows have the same value. So the top rank = 1. 
+ pushTopRows(); + return numRows; +} + +namespace { +template +S& PriorityQueueVector(std::priority_queue& q) { + struct PrivateQueue : private std::priority_queue { + static S& Container(std::priority_queue& q) { + return q.*&PrivateQueue::c; + } + }; + return PrivateQueue::Container(q); +} +} // namespace + +bool TopNRowNumber::TopRows::isDuplicate( + const std::vector& decodedVectors, + vector_size_t index) { + const std::vector> partitionRowsVector = + PriorityQueueVector(rows); + for (const char* row : partitionRowsVector) { + if (rowComparator.compare(decodedVectors, index, row) == 0) { + return true; + } + } + return false; +} + } // namespace facebook::velox::exec diff --git a/velox/exec/TopNRowNumber.h b/velox/exec/TopNRowNumber.h index 1136422a3afce..8772d6e8dbe77 100644 --- a/velox/exec/TopNRowNumber.h +++ b/velox/exec/TopNRowNumber.h @@ -82,8 +82,27 @@ class TopNRowNumber : public Operator { std::priority_queue>, Compare> rows; + RowComparator& rowComparator; + + // This is the highest rank seen so far in the input rows. It is compared + // with the limit for the operator. + int64_t currentLimit = 0; + + // Number of rows with the highest rank in the partition. + vector_size_t numTopRankRows(); + + // Remove all rows with the highest rank in the partition. + char* removeTopRankRows(); + + // Returns true if the row at decodedVectors[index] has the same order by + // keys as another row in the partition's top rows. + bool isDuplicate( + const std::vector& decodedVectors, + vector_size_t index); + TopRows(HashStringAllocator* allocator, RowComparator& comparator) - : rows{{comparator}, StlAllocator(allocator)} {} + : rows{{comparator}, StlAllocator(allocator)}, + rowComparator(comparator) {} }; void initializeNewPartitions(); @@ -93,24 +112,38 @@ class TopNRowNumber : public Operator { } // Adds input row to a partition or discards the row. 
+ template void processInputRow(vector_size_t index, TopRows& partition); + template + void processInputRowLoop(vector_size_t numInput, bool lookup); + + // Input row processing varies if the limit is not yet reached in the + // input rows seen thus far. + template + void processRowBelowLimit(vector_size_t index, TopRows& partition); + + // Input row processing after the limit is reached in the partition. + template + char* processRowAboveLimit(vector_size_t index, TopRows& partition); + // Returns next partition to add to output or nullptr if there are no // partitions left. TopRows* nextPartition(); - // Returns partition that was partially added to the previous output batch. - TopRows& currentPartition(); - - // Appends partition rows to outputRows_ and optionally populates row - // numbers. + // Appends numRows of partition rows to outputRows_. Note : partition.rows + // tops rows in reverse row number order. void appendPartitionRows( TopRows& partition, - vector_size_t start, - vector_size_t size, + vector_size_t numRows, vector_size_t outputOffset, FlatVector* rowNumbers); + // Computes the rank for the next row to be output. + void computeNextRankInMemory( + const TopRows& partition, + vector_size_t outputIndex); + bool spillEnabled() const { return spillConfig_.has_value(); } @@ -133,15 +166,26 @@ class TopNRowNumber : public Operator { vector_size_t index, SpillMergeStream* next); + // Returns true if 'next' row is a new peer (rows differ on order by keys) + // of the previous row in the partition (at output[index] of the + // output block). + bool isNewPeer( + const RowVectorPtr& output, + vector_size_t index, + SpillMergeStream* next); + // Sets nextRowNumber_ to rowNumber. Checks if next row in 'merge_' belongs to // a different partition than last row in 'output' and if so updates - // nextRowNumber_ to 0. 
Also, checks current partition reached the limit on - // number of rows and if so advances 'merge_' to the first row on the next - // partition and sets nextRowNumber_ to 0. + // nextRank_ and numPeers_ to 1. Also, checks current partition reached + // the limit on rank and if so advances 'merge_' to the first row on the next + // partition and sets nextRank_ and numPeers_ to 1. // // @post 'merge_->next()' is either at end or points to a row that should be // included in the next output batch using 'nextRowNumber_'. - void setupNextOutput(const RowVectorPtr& output, int32_t rowNumber); + void setupNextOutput( + const RowVectorPtr& output, + int32_t currentRank, + int32_t numPeers); // Called in noMoreInput() and spill(). void updateEstimatedOutputRowSize(); @@ -150,9 +194,15 @@ class TopNRowNumber : public Operator { // cardinality sufficiently. Returns false if spilling was triggered earlier. bool abandonPartialEarly() const; + vector_size_t computeTopRank(TopRows& partition); + + // Rank function semantics of operator. + const core::TopNRowNumberNode::RankFunction rankFunction_; + const int32_t limit_; const bool generateRowNumber_; const size_t numPartitionKeys_; + const size_t numSortingKeys_; // Input columns in the order of: partition keys, sorting keys, the rest. const std::vector inputChannels_; @@ -208,8 +258,11 @@ class TopNRowNumber : public Operator { // Maximum number of rows in the output batch. vector_size_t outputBatchSize_; - std::vector outputRows_; + // The below variables are used when outputting from memory. + // Vector of pointers to individual rows in the RowContainer for the current + // output block. + std::vector outputRows_; // Number of partitions to fetch from a HashTable in a single listAllRows // call. 
static const size_t kPartitionBatchSize = 100; @@ -217,16 +270,25 @@ class TopNRowNumber : public Operator { BaseHashTable::RowsIterator partitionIt_; std::vector partitions_{kPartitionBatchSize}; size_t numPartitions_{0}; - std::optional currentPartition_; - vector_size_t remainingRowsInPartition_{0}; - + // This is the index of the current partition within partitions_ which is + // obtained from the HashTable iterator. + std::optional currentPartitionNumber_; + // This is the current partition being output. It is possible that the + // partition is output across multiple output blocks. + TopNRowNumber::TopRows* currentPartition_{nullptr}; + + // The below variables are used when outputting from the spiller. // Spiller for contents of the 'data_'. std::unique_ptr spiller_; // Used to sort-merge spilled data. std::unique_ptr> merge_; - // Row number for the first row in the next output batch. - int32_t nextRowNumber_{0}; + // Row number/rank or dense_rank for the first row in the next output batch + // from the spiller. + vector_size_t nextRank_{1}; + // Number of peers of first row in the previous output batch. This is used + // in rank calculation.
+ vector_size_t numPeers_{1}; }; } // namespace facebook::velox::exec diff --git a/velox/exec/tests/PlanNodeSerdeTest.cpp b/velox/exec/tests/PlanNodeSerdeTest.cpp index 03159c2e276f5..d381c17078416 100644 --- a/velox/exec/tests/PlanNodeSerdeTest.cpp +++ b/velox/exec/tests/PlanNodeSerdeTest.cpp @@ -537,24 +537,35 @@ TEST_F(PlanNodeSerdeTest, scan) { testSerde(plan); } -TEST_F(PlanNodeSerdeTest, topNRowNumber) { - auto plan = PlanBuilder() - .values({data_}) - .topNRowNumber({}, {"c0", "c2"}, 10, false) - .planNode(); +#define TOPN_SERDE_TEST(funcname) \ + auto plan = PlanBuilder() \ + .values({data_}) \ + .funcname({}, {"c0", "c2"}, 10, false) \ + .planNode(); \ + testSerde(plan); \ + \ + plan = PlanBuilder() \ + .values({data_}) \ + .funcname({}, {"c0", "c2"}, 10, true) \ + .planNode(); \ + testSerde(plan); \ + \ + plan = PlanBuilder() \ + .values({data_}) \ + .funcname({"c0"}, {"c1", "c2"}, 10, false) \ + .planNode(); \ testSerde(plan); - plan = PlanBuilder() - .values({data_}) - .topNRowNumber({}, {"c0", "c2"}, 10, true) - .planNode(); - testSerde(plan); +TEST_F(PlanNodeSerdeTest, topNRowNumber) { + TOPN_SERDE_TEST(topNRowNumber); +} - plan = PlanBuilder() - .values({data_}) - .topNRowNumber({"c0"}, {"c1", "c2"}, 10, false) - .planNode(); - testSerde(plan); +TEST_F(PlanNodeSerdeTest, topNRank) { + TOPN_SERDE_TEST(topNRank); +} + +TEST_F(PlanNodeSerdeTest, topNDenseRank) { + TOPN_SERDE_TEST(topNDenseRank); } TEST_F(PlanNodeSerdeTest, write) { diff --git a/velox/exec/tests/PlanNodeToStringTest.cpp b/velox/exec/tests/PlanNodeToStringTest.cpp index 280f484798b57..4ab1739185b91 100644 --- a/velox/exec/tests/PlanNodeToStringTest.cpp +++ b/velox/exec/tests/PlanNodeToStringTest.cpp @@ -925,37 +925,54 @@ TEST_F(PlanNodeToStringTest, rowNumber) { plan->toString(true, false)); } -TEST_F(PlanNodeToStringTest, topNRowNumber) { - auto rowType = ROW({"a", "b"}, {BIGINT(), VARCHAR()}); - auto plan = PlanBuilder() - .tableScan(rowType) - .topNRowNumber({}, {"a DESC"}, 10, false)
- .planNode(); - - ASSERT_EQ("-- TopNRowNumber[1]\n", plan->toString()); - ASSERT_EQ( - "-- TopNRowNumber[1][order by (a DESC NULLS LAST) limit 10] -> a:BIGINT, b:VARCHAR\n", +#define TOPN_PLANNODE_TO_STRING_TEST(nodename, functionname) \ + auto rowType = ROW({"a", "b"}, {BIGINT(), VARCHAR()}); \ + auto plan = PlanBuilder() \ + .tableScan(rowType) \ + .nodename({}, {"a DESC"}, 10, false) \ + .planNode(); \ + \ + ASSERT_EQ("-- TopNRowNumber[1]\n", plan->toString()); \ + ASSERT_EQ( \ + fmt::format( \ + "-- TopNRowNumber[1][{} order by (a DESC NULLS LAST) limit 10] -> a:BIGINT, b:VARCHAR\n", \ + functionname), \ + plan->toString(true, false)); \ + \ + plan = PlanBuilder() \ + .tableScan(rowType) \ + .nodename({}, {"a DESC"}, 10, true) \ + .planNode(); \ + \ + ASSERT_EQ("-- TopNRowNumber[1]\n", plan->toString()); \ + ASSERT_EQ( \ + fmt::format( \ + "-- TopNRowNumber[1][{} order by (a DESC NULLS LAST) limit 10] -> a:BIGINT, b:VARCHAR, row_number:BIGINT\n", \ + functionname), \ + plan->toString(true, false)); \ + \ + plan = PlanBuilder() \ + .tableScan(rowType) \ + .nodename({"a"}, {"b"}, 10, false) \ + .planNode(); \ + \ + ASSERT_EQ("-- TopNRowNumber[1]\n", plan->toString()); \ + ASSERT_EQ( \ + fmt::format( \ + "-- TopNRowNumber[1][{} partition by (a) order by (b ASC NULLS LAST) limit 10] -> a:BIGINT, b:VARCHAR\n", \ + functionname), \ plan->toString(true, false)); - plan = PlanBuilder() - .tableScan(rowType) - .topNRowNumber({}, {"a DESC"}, 10, true) - .planNode(); - - ASSERT_EQ("-- TopNRowNumber[1]\n", plan->toString()); - ASSERT_EQ( - "-- TopNRowNumber[1][order by (a DESC NULLS LAST) limit 10] -> a:BIGINT, b:VARCHAR, row_number:BIGINT\n", - plan->toString(true, false)); +TEST_F(PlanNodeToStringTest, topNRowNumber) { + TOPN_PLANNODE_TO_STRING_TEST(topNRowNumber, "row_number"); +} - plan = PlanBuilder() - .tableScan(rowType) - .topNRowNumber({"a"}, {"b"}, 10, false) - .planNode(); +TEST_F(PlanNodeToStringTest, topNRank) { + TOPN_PLANNODE_TO_STRING_TEST(topNRank, 
"rank"); +} - ASSERT_EQ("-- TopNRowNumber[1]\n", plan->toString()); - ASSERT_EQ( - "-- TopNRowNumber[1][partition by (a) order by (b ASC NULLS LAST) limit 10] -> a:BIGINT, b:VARCHAR\n", - plan->toString(true, false)); +TEST_F(PlanNodeToStringTest, topNDenseRank) { + TOPN_PLANNODE_TO_STRING_TEST(topNDenseRank, "dense_rank"); } TEST_F(PlanNodeToStringTest, markDistinct) { diff --git a/velox/exec/tests/TopNRowNumberTest.cpp b/velox/exec/tests/TopNRowNumberTest.cpp index 10f2fffca4e04..5e02fbc0ba165 100644 --- a/velox/exec/tests/TopNRowNumberTest.cpp +++ b/velox/exec/tests/TopNRowNumberTest.cpp @@ -27,14 +27,101 @@ namespace facebook::velox::exec { namespace { +#define BUILD_TOPN(nodename) \ + planBuilder.values(values) \ + .nodename(partitionKeys, sortingKeys, limit, generateRowNumber) \ + .planNode() + +#define BUILD_TOPN_PLANNODEID(nodename) \ + planBuilder.values(values) \ + .nodename(partitionKeys, sortingKeys, limit, generateRowNumber) \ + .capturePlanNodeId(planNodeId) \ + .planNode() + +#define BUILD_TOPN_PARTIAL_FINAL(nodename) \ + planBuilder.values(values) \ + .nodename(partitionKeys, sortingKeys, limit, false) \ + .capturePlanNodeId(planNodeId) \ + .nodename(partitionKeys, sortingKeys, limit, true) \ + .planNode() + class TopNRowNumberTest : public OperatorTestBase { protected: - TopNRowNumberTest() { + explicit TopNRowNumberTest(core::TopNRowNumberNode::RankFunction function) + : function_(function) {} + + void SetUp() override { + exec::test::OperatorTestBase::SetUp(); filesystems::registerLocalFileSystem(); } + + protected: + const core::PlanNodePtr& topnNode( + PlanBuilder& planBuilder, + const std::vector& values, + const std::vector& partitionKeys, + const std::vector& sortingKeys, + int32_t limit, + bool generateRowNumber) { + switch (function_) { + case core::TopNRowNumberNode::RankFunction::kRowNumber: + return BUILD_TOPN(topNRowNumber); + case core::TopNRowNumberNode::RankFunction::kRank: + return BUILD_TOPN(topNRank); + case 
core::TopNRowNumberNode::RankFunction::kDenseRank: + return BUILD_TOPN(topNDenseRank); + } + VELOX_UNREACHABLE(); + } + + const core::PlanNodePtr& topnNodeId( + PlanBuilder& planBuilder, + core::PlanNodeId& planNodeId, + const std::vector& values, + const std::vector& partitionKeys, + const std::vector& sortingKeys, + int32_t limit, + bool generateRowNumber) { + switch (function_) { + case core::TopNRowNumberNode::RankFunction::kRowNumber: + return BUILD_TOPN_PLANNODEID(topNRowNumber); + case core::TopNRowNumberNode::RankFunction::kRank: + return BUILD_TOPN_PLANNODEID(topNRank); + case core::TopNRowNumberNode::RankFunction::kDenseRank: + return BUILD_TOPN_PLANNODEID(topNDenseRank); + } + VELOX_UNREACHABLE(); + } + + const core::PlanNodePtr& topnNodePartialFinal( + PlanBuilder& planBuilder, + core::PlanNodeId& planNodeId, + const std::vector& values, + const std::vector& partitionKeys, + const std::vector& sortingKeys, + int32_t limit) { + switch (function_) { + case core::TopNRowNumberNode::RankFunction::kRowNumber: + return BUILD_TOPN_PARTIAL_FINAL(topNRowNumber); + case core::TopNRowNumberNode::RankFunction::kRank: + return BUILD_TOPN_PARTIAL_FINAL(topNRank); + case core::TopNRowNumberNode::RankFunction::kDenseRank: + return BUILD_TOPN_PARTIAL_FINAL(topNDenseRank); + } + VELOX_UNREACHABLE(); + } + + const core::TopNRowNumberNode::RankFunction function_; }; -TEST_F(TopNRowNumberTest, basic) { +class MultiTopNRowNumberTest : public TopNRowNumberTest, + public testing::WithParamInterface< + core::TopNRowNumberNode::RankFunction> { + public: + MultiTopNRowNumberTest() : TopNRowNumberTest(GetParam()) {} +}; + +TEST_P(MultiTopNRowNumberTest, basic) { auto data = makeRowVector({ // Partitioning key. makeFlatVector({1, 1, 2, 2, 1, 2, 1}), @@ -48,40 +135,37 @@ TEST_F(TopNRowNumberTest, basic) { auto testLimit = [&](auto limit) { // Emit row numbers. 
- auto plan = PlanBuilder() - .values({data}) - .topNRowNumber({"c0"}, {"c1"}, limit, true) - .planNode(); + auto planBuilder = PlanBuilder(); + auto plan = topnNode(planBuilder, {data}, {"c0"}, {"c1"}, limit, true); assertQuery( plan, fmt::format( - "SELECT * FROM (SELECT *, row_number() over (partition by c0 order by c1) as rn FROM tmp) " + "SELECT * FROM (SELECT *, {}() over (partition by c0 order by c1) as rn FROM tmp) " " WHERE rn <= {}", + core::TopNRowNumberNode::rankFunctionName(function_), limit)); // Do not emit row numbers. - plan = PlanBuilder() - .values({data}) - .topNRowNumber({"c0"}, {"c1"}, limit, false) - .planNode(); + auto planBuilder2 = PlanBuilder(); + plan = topnNode(planBuilder2, {data}, {"c0"}, {"c1"}, limit, false); assertQuery( plan, fmt::format( - "SELECT c0, c1, c2 FROM (SELECT *, row_number() over (partition by c0 order by c1) as rn FROM tmp) " + "SELECT c0, c1, c2 FROM (SELECT *, {}() over (partition by c0 order by c1) as rn FROM tmp) " " WHERE rn <= {}", + core::TopNRowNumberNode::rankFunctionName(function_), limit)); // No partitioning keys. - plan = PlanBuilder() - .values({data}) - .topNRowNumber({}, {"c1"}, limit, true) - .planNode(); + auto planBuilder3 = PlanBuilder(); + plan = topnNode(planBuilder3, {data}, {}, {"c1"}, limit, true); assertQuery( plan, fmt::format( - "SELECT * FROM (SELECT *, row_number() over (order by c1) as rn FROM tmp) " + "SELECT * FROM (SELECT *, {}() over (order by c1) as rn FROM tmp) " " WHERE rn <= {}", + core::TopNRowNumberNode::rankFunctionName(function_), limit)); }; @@ -91,7 +175,7 @@ TEST_F(TopNRowNumberTest, basic) { testLimit(5); } -TEST_F(TopNRowNumberTest, largeOutput) { +TEST_P(MultiTopNRowNumberTest, largeOutput) { // Make 10 vectors. Use different types for partitioning key, sorting key and // data. Use order of columns different from partitioning keys, followed by // sorting keys, followed by data. 
@@ -117,15 +201,14 @@ TEST_F(TopNRowNumberTest, largeOutput) { auto testLimit = [&](auto limit) { SCOPED_TRACE(fmt::format("Limit: {}", limit)); core::PlanNodeId topNRowNumberId; - auto plan = PlanBuilder() - .values(data) - .topNRowNumber({"p"}, {"s"}, limit, true) - .capturePlanNodeId(topNRowNumberId) - .planNode(); + auto planBuilder = PlanBuilder(); + auto plan = topnNodeId( + planBuilder, topNRowNumberId, data, {"p"}, {"s"}, limit, true); auto sql = fmt::format( - "SELECT * FROM (SELECT *, row_number() over (partition by p order by s) as rn FROM tmp) " + "SELECT * FROM (SELECT *, {}() over (partition by p order by s) as rn FROM tmp) " " WHERE rn <= {}", + core::TopNRowNumberNode::rankFunctionName(function_), limit); AssertQueryBuilder(plan, duckDbQueryRunner_) .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") @@ -152,16 +235,15 @@ TEST_F(TopNRowNumberTest, largeOutput) { } // No partitioning keys. - plan = PlanBuilder() - .values(data) - .topNRowNumber({}, {"s"}, limit, true) - .planNode(); + auto planBuilder2 = PlanBuilder(); + plan = topnNode(planBuilder2, data, {}, {"s"}, limit, true); AssertQueryBuilder(plan, duckDbQueryRunner_) .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") .assertResults(fmt::format( - "SELECT * FROM (SELECT *, row_number() over (order by s) as rn FROM tmp) " + "SELECT * FROM (SELECT *, {}() over (order by s) as rn FROM tmp) " " WHERE rn <= {}", + core::TopNRowNumberNode::rankFunctionName(function_), limit)); }; @@ -172,7 +254,7 @@ TEST_F(TopNRowNumberTest, largeOutput) { testLimit(2000); } -TEST_F(TopNRowNumberTest, manyPartitions) { +TEST_P(MultiTopNRowNumberTest, manyPartitions) { const vector_size_t size = 10'000; auto data = split( makeRowVector( @@ -201,15 +283,14 @@ TEST_F(TopNRowNumberTest, manyPartitions) { auto testLimit = [&](auto limit, size_t outputBatchBytes = 1024) { SCOPED_TRACE(fmt::format("Limit: {}", limit)); core::PlanNodeId topNRowNumberId; - auto plan = PlanBuilder() - .values(data) - 
.topNRowNumber({"p"}, {"s"}, limit, true) - .capturePlanNodeId(topNRowNumberId) - .planNode(); + auto planBuilder = PlanBuilder(); + auto plan = topnNodeId( + planBuilder, topNRowNumberId, data, {"p"}, {"s"}, limit, true); auto sql = fmt::format( - "SELECT * FROM (SELECT *, row_number() over (partition by p order by s) as rn FROM tmp) " + "SELECT * FROM (SELECT *, {}() over (partition by p order by s) as rn FROM tmp) " " WHERE rn <= {}", + core::TopNRowNumberNode::rankFunctionName(function_), limit); assertQuery(plan, sql); @@ -243,7 +324,73 @@ TEST_F(TopNRowNumberTest, manyPartitions) { testLimit(1, 1); } -TEST_F(TopNRowNumberTest, abandonPartialEarly) { +TEST_P(MultiTopNRowNumberTest, fewPartitions) { + const vector_size_t size = 10'000; + auto data = split( + makeRowVector( + {"d", "s", "p"}, + { + // Data. Make it a constant to avoid ordering issues. + makeConstant((int64_t)123'456, size), + // Sorting key. Ensure enough repetition as we are testing + // rank and dense_rank. + makeFlatVector( + size, + [](auto row) { return (row % 10) * 10; }/*, + [](auto row) { return (row % 50) == 0; }*/), + // Partitioning key. Each partition has 2000 rows. + makeFlatVector( + size, [](auto row) { return row % 5; }/*, nullEvery(7)*/), + }), + 10); + + createDuckDbTable(data); + + auto spillDirectory = exec::test::TempDirectoryPath::create(); + + auto testLimit = [&](auto limit, size_t outputBatchBytes = 1024) { + SCOPED_TRACE(fmt::format("Limit: {}", limit)); + core::PlanNodeId topNRowNumberId; + auto planBuilder = PlanBuilder(); + auto plan = topnNodeId( + planBuilder, topNRowNumberId, data, {"p"}, {"s"}, limit, true); + + auto sql = fmt::format( + "SELECT * FROM (SELECT *, {}() over (partition by p order by s) as rn FROM tmp) " + " WHERE rn <= {}", + core::TopNRowNumberNode::rankFunctionName(function_), + limit); + assertQuery(plan, sql); + + // Spilling.
+ { + TestScopedSpillInjection scopedSpillInjection(100); + auto task = + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config( + core::QueryConfig::kPreferredOutputBatchBytes, + fmt::format("{}", outputBatchBytes)) + .config(core::QueryConfig::kSpillEnabled, "true") + .config(core::QueryConfig::kTopNRowNumberSpillEnabled, "true") + .spillDirectory(spillDirectory->getPath()) + .assertResults(sql); + + auto taskStats = exec::toPlanStats(task->taskStats()); + const auto& stats = taskStats.at(topNRowNumberId); + + ASSERT_GT(stats.spilledBytes, 0); + ASSERT_GT(stats.spilledRows, 0); + ASSERT_GT(stats.spilledFiles, 0); + ASSERT_GT(stats.spilledPartitions, 0); + } + }; + + testLimit(10); + testLimit(20); + testLimit(100); +} + +TEST_P(MultiTopNRowNumberTest, abandonPartialEarly) { auto data = makeRowVector( {"p", "s"}, { @@ -255,21 +402,19 @@ TEST_F(TopNRowNumberTest, abandonPartialEarly) { core::PlanNodeId topNRowNumberId; auto runPlan = [&](int32_t minRows) { - auto plan = PlanBuilder() - .values(split(data, 10)) - .topNRowNumber({"p"}, {"s"}, 99, false) - .capturePlanNodeId(topNRowNumberId) - .topNRowNumber({"p"}, {"s"}, 99, true) - .planNode(); + auto planBuilder = PlanBuilder(); + auto plan = topnNodePartialFinal( + planBuilder, topNRowNumberId, split(data, 10), {"p"}, {"s"}, 99); auto task = AssertQueryBuilder(plan, duckDbQueryRunner_) .config( core::QueryConfig::kAbandonPartialTopNRowNumberMinRows, fmt::format("{}", minRows)) .config(core::QueryConfig::kAbandonPartialTopNRowNumberMinPct, "80") - .assertResults( - "SELECT * FROM (SELECT *, row_number() over (partition by p order by s) as rn FROM tmp) " - "WHERE rn <= 99"); + .assertResults(fmt::format( + "SELECT * FROM (SELECT *, {}() over (partition by p order by s) as rn FROM tmp) " + "WHERE rn <= 99", + core::TopNRowNumberNode::rankFunctionName(function_))); return exec::toPlanStats(task->taskStats()); }; @@ -291,7 +436,7 @@ TEST_F(TopNRowNumberTest, abandonPartialEarly) { } } -TEST_F(TopNRowNumberTest, 
planNodeValidation) { +TEST_P(MultiTopNRowNumberTest, planNodeValidation) { auto data = makeRowVector( ROW({"a", "b", "c", "d", "e"}, { @@ -306,10 +451,8 @@ TEST_F(TopNRowNumberTest, planNodeValidation) { auto plan = [&](const std::vector& partitionKeys, const std::vector& sortingKeys, int32_t limit = 10) { - PlanBuilder() - .values({data}) - .topNRowNumber(partitionKeys, sortingKeys, limit, true) - .planNode(); + auto planBuilder = PlanBuilder(); + topnNode(planBuilder, {data}, partitionKeys, sortingKeys, limit, true); }; VELOX_ASSERT_THROW( @@ -334,15 +477,14 @@ TEST_F(TopNRowNumberTest, planNodeValidation) { plan({"a", "b"}, {"c"}, 0), "Limit must be greater than zero"); } -TEST_F(TopNRowNumberTest, maxSpillBytes) { +TEST_P(MultiTopNRowNumberTest, maxSpillBytes) { const auto rowType = ROW({"c0", "c1", "c2"}, {INTEGER(), INTEGER(), VARCHAR()}); const auto vectors = createVectors(rowType, 1024, 15 << 20); auto planNodeIdGenerator = std::make_shared(); - auto plan = PlanBuilder(planNodeIdGenerator) - .values(vectors) - .topNRowNumber({"c0"}, {"c1"}, 100, true) - .planNode(); + auto planBuilder = PlanBuilder(planNodeIdGenerator); + auto plan = topnNode(planBuilder, vectors, {"c0"}, {"c1"}, 100, true); + struct { int32_t maxSpilledBytes; bool expectedExceedLimit; @@ -382,7 +524,7 @@ TEST_F(TopNRowNumberTest, maxSpillBytes) { // This test verifies that TopNRowNumber operator reclaim all the memory after // spill. 
-DEBUG_ONLY_TEST_F(TopNRowNumberTest, memoryUsageCheckAfterReclaim) { +DEBUG_ONLY_TEST_P(MultiTopNRowNumberTest, memoryUsageCheckAfterReclaim) { std::atomic_int inputCount{0}; SCOPED_TESTVALUE_SET( "facebook::velox::exec::Driver::runInternal::addInput", @@ -425,15 +567,14 @@ DEBUG_ONLY_TEST_F(TopNRowNumberTest, memoryUsageCheckAfterReclaim) { auto spillDirectory = exec::test::TempDirectoryPath::create(); core::PlanNodeId topNRowNumberId; - auto plan = PlanBuilder() - .values(data) - .topNRowNumber({"p"}, {"s"}, 1'000, true) - .capturePlanNodeId(topNRowNumberId) - .planNode(); - - const auto sql = - "SELECT * FROM (SELECT *, row_number() over (partition by p order by s) as rn FROM tmp) " - " WHERE rn <= 1000"; + auto planBuilder = PlanBuilder(); + auto plan = + topnNodeId(planBuilder, topNRowNumberId, data, {"p"}, {"s"}, 1'000, true); + + const auto sql = fmt::format( + "SELECT * FROM (SELECT *, {}() over (partition by p order by s) as rn FROM tmp) " + " WHERE rn <= 1000", + core::TopNRowNumberNode::rankFunctionName(function_)); auto task = AssertQueryBuilder(plan, duckDbQueryRunner_) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kTopNRowNumberSpillEnabled, "true") @@ -451,7 +592,7 @@ DEBUG_ONLY_TEST_F(TopNRowNumberTest, memoryUsageCheckAfterReclaim) { // This test verifies that TopNRowNumber operator can be closed twice which // might be triggered by memory pool abort. 
-DEBUG_ONLY_TEST_F(TopNRowNumberTest, doubleClose) { +DEBUG_ONLY_TEST_P(MultiTopNRowNumberTest, doubleClose) { const std::string errorMessage("doubleClose"); SCOPED_TESTVALUE_SET( "facebook::velox::exec::Driver::runInternal::noMoreInput", @@ -485,17 +626,24 @@ DEBUG_ONLY_TEST_F(TopNRowNumberTest, doubleClose) { 10); core::PlanNodeId topNRowNumberId; - auto plan = PlanBuilder() - .values(data) - .topNRowNumber({"p"}, {"s"}, 1'000, true) - .capturePlanNodeId(topNRowNumberId) - .planNode(); + auto planBuilder = PlanBuilder(); + auto plan = + topnNodeId(planBuilder, topNRowNumberId, data, {"p"}, {"s"}, 1'000, true); - const auto sql = - "SELECT * FROM (SELECT *, row_number() over (partition by p order by s) as rn FROM tmp) " - " WHERE rn <= 1000"; + const auto sql = fmt::format( + "SELECT * FROM (SELECT *, {}() over (partition by p order by s) as rn FROM tmp) " + " WHERE rn <= 1000", + core::TopNRowNumberNode::rankFunctionName(function_)); VELOX_ASSERT_THROW(assertQuery(plan, sql), errorMessage); } + +VELOX_INSTANTIATE_TEST_SUITE_P( + TopNRowNumberTest, + MultiTopNRowNumberTest, + testing::ValuesIn(std::vector( + {core::TopNRowNumberNode::RankFunction::kRowNumber, + core::TopNRowNumberNode::RankFunction::kRank, + core::TopNRowNumberNode::RankFunction::kDenseRank}))); } // namespace } // namespace facebook::velox::exec diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index e95ea01f101e1..606c2f9d679e5 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -1862,7 +1862,8 @@ PlanBuilder& PlanBuilder::rowNumber( return *this; } -PlanBuilder& PlanBuilder::topNRowNumber( +PlanBuilder& PlanBuilder::topNRowNumberBase( + core::TopNRowNumberNode::RankFunction function, const std::vector& partitionKeys, const std::vector& sortingKeys, int32_t limit, @@ -1876,6 +1877,7 @@ PlanBuilder& PlanBuilder::topNRowNumber( } planNode_ = std::make_shared( nextPlanNodeId(), + function, 
fields(partitionKeys), sortingFields, sortingOrders, @@ -1885,6 +1887,45 @@ PlanBuilder& PlanBuilder::topNRowNumber( return *this; } +PlanBuilder& PlanBuilder::topNRowNumber( + const std::vector& partitionKeys, + const std::vector& sortingKeys, + int32_t limit, + bool generateRowNumber) { + return topNRowNumberBase( + core::TopNRowNumberNode::RankFunction::kRowNumber, + partitionKeys, + sortingKeys, + limit, + generateRowNumber); +} + +PlanBuilder& PlanBuilder::topNRank( + const std::vector& partitionKeys, + const std::vector& sortingKeys, + int32_t limit, + bool generateRank) { + return topNRowNumberBase( + core::TopNRowNumberNode::RankFunction::kRank, + partitionKeys, + sortingKeys, + limit, + generateRank); +} + +PlanBuilder& PlanBuilder::topNDenseRank( + const std::vector& partitionKeys, + const std::vector& sortingKeys, + int32_t limit, + bool generateRank) { + return topNRowNumberBase( + core::TopNRowNumberNode::RankFunction::kDenseRank, + partitionKeys, + sortingKeys, + limit, + generateRank); +} + PlanBuilder& PlanBuilder::markDistinct( std::string markerKey, const std::vector& distinctKeys) { diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h index f380ef4f03c7d..054a74f967e83 100644 --- a/velox/exec/tests/utils/PlanBuilder.h +++ b/velox/exec/tests/utils/PlanBuilder.h @@ -1003,6 +1003,22 @@ class PlanBuilder { int32_t limit, bool generateRowNumber); + /// Add a TopNRowNumberNode to compute single rank window function with + /// a limit applied to sorted partitions. + PlanBuilder& topNRank( + const std::vector& partitionKeys, + const std::vector& sortingKeys, + int32_t limit, + bool generateRank); + + /// Add a TopNRowNumberNode to compute single dense_rank window function with + /// a limit applied to sorted partitions. 
+ PlanBuilder& topNDenseRank( + const std::vector& partitionKeys, + const std::vector& sortingKeys, + int32_t limit, + bool generateRank); + /// Add a MarkDistinctNode to compute aggregate mask channel /// @param markerKey Name of output mask channel /// @param distinctKeys List of columns to be marked distinct. @@ -1125,6 +1141,13 @@ class PlanBuilder { const std::vector& windowFunctions, bool inputSorted); + PlanBuilder& topNRowNumberBase( + core::TopNRowNumberNode::RankFunction function, + const std::vector& partitionKeys, + const std::vector& sortingKeys, + int32_t limit, + bool generateRowNumber); + protected: core::PlanNodePtr planNode_; parse::ParseOptions options_;