From bf520c042793f38a5727bc29d50b1bece3e8143f Mon Sep 17 00:00:00 2001 From: Jia Ke Date: Mon, 20 May 2024 21:14:50 +0800 Subject: [PATCH] Resolve comments --- velox/exec/Window.cpp | 22 ++++++++++++---------- velox/exec/WindowBuild.h | 2 +- velox/exec/WindowFunction.cpp | 2 +- velox/exec/WindowFunction.h | 14 +++++++++----- velox/exec/WindowPartition.cpp | 1 + velox/exec/WindowPartition.h | 13 +++++++++++++ velox/functions/lib/window/Rank.cpp | 2 +- 7 files changed, 38 insertions(+), 18 deletions(-) diff --git a/velox/exec/Window.cpp b/velox/exec/Window.cpp index 73d982f17c86..d60f0f37ef11 100644 --- a/velox/exec/Window.cpp +++ b/velox/exec/Window.cpp @@ -635,15 +635,17 @@ vector_size_t Window::callApplyLoop( resultIndex += rowsForCurrentPartition; numOutputRowsLeft -= rowsForCurrentPartition; - if (currentPartition_->isComplete()) { - callResetPartition(); - - if (!currentPartition_) { - // The WindowBuild doesn't have any more partitions to process right - // now. So break until the next getOutput call. - break; - } - } else { + if (!currentPartition_->isComplete()) { + // Still more data for the current partition would need to be processed. + // So resume on the next getOutput call. + break; + } + + callResetPartition(); + + if (!currentPartition_) { + // The WindowBuild doesn't have any more partitions to process right + // now. So break until the next getOutput call. break; } @@ -693,7 +695,7 @@ RowVectorPtr Window::getOutput() { // Compute the output values of window functions. auto numResultRows = callApplyLoop(numOutputRows, result); - if (currentPartition_) { + if (currentPartition_ && currentPartition_->isPartial()) { currentPartition_->clearOutputRows(numResultRows); } diff --git a/velox/exec/WindowBuild.h b/velox/exec/WindowBuild.h index b247e67074d4..db8f828ee774 100644 --- a/velox/exec/WindowBuild.h +++ b/velox/exec/WindowBuild.h @@ -118,7 +118,7 @@ class WindowBuild { // Number of input rows. vector_size_t numRows_ = 0; - // Number of rows that be fit into an output block. + // The maximum number of rows that can fit into an output block. vector_size_t numRowsPerOutput_; }; diff --git a/velox/exec/WindowFunction.cpp b/velox/exec/WindowFunction.cpp index 231d14cd8760..b3419c3ba380 100644 --- a/velox/exec/WindowFunction.cpp +++ b/velox/exec/WindowFunction.cpp @@ -45,7 +45,7 @@ bool registerWindowFunction( WindowFunctionMetadata metadata) { auto sanitizedName = sanitizeName(name); windowFunctions()[sanitizedName] = { - std::move(signatures), std::move(factory), metadata}; + std::move(signatures), std::move(factory), std::move(metadata)}; return true; } diff --git a/velox/exec/WindowFunction.h b/velox/exec/WindowFunction.h index a64eb4a9f5c8..fb6727d7903e 100644 --- a/velox/exec/WindowFunction.h +++ b/velox/exec/WindowFunction.h @@ -31,16 +31,19 @@ struct WindowFunctionArg { std::optional index; }; -/// The scope for calculating the window function in a streaming -/// manner. kRows indicates that the calculation begins as soon as rows are -/// available within a single partition, without waiting for all data in the -/// partition to be ready. kPartition indicates that the calculation begins only -/// when all rows in a partition are ready. +/// The scope for calculating the window function. kRows indicates that the +/// calculation begins as soon as rows are available within a single partition, +/// without waiting for all data in the partition to be ready. kPartition +/// indicates that the calculation begins only when all rows in a partition are +/// ready. enum class Scope { kPartition, kRows, }; +/// A sliding window frame is a dynamic set of rows that moves relative to the +/// current row being processed, allowing for calculations over a range of rows +/// that can change as the query progresses. struct WindowFunctionMetadata { Scope scope; bool supportsSlidingFrame; @@ -178,6 +181,7 @@ struct WindowFunctionEntry { WindowFunctionMetadata metadata; }; +/// Returns std::nullopt if the function doesn't exist in the WindowFunctionMap. std::optional getWindowFunctionMetadata( const std::string& name); diff --git a/velox/exec/WindowPartition.cpp b/velox/exec/WindowPartition.cpp index 1efd9206cfef..065339e7ee2b 100644 --- a/velox/exec/WindowPartition.cpp +++ b/velox/exec/WindowPartition.cpp @@ -45,6 +45,7 @@ WindowPartition::WindowPartition( rows_.clear(); partition_ = folly::Range(rows_.data(), rows_.size()); complete_ = false; + partial_ = true; } void WindowPartition::addRows(const std::vector& rows) { diff --git a/velox/exec/WindowPartition.h b/velox/exec/WindowPartition.h index 794f1417680f..38a7efc269fc 100644 --- a/velox/exec/WindowPartition.h +++ b/velox/exec/WindowPartition.h @@ -20,6 +20,11 @@ /// Simple WindowPartition that builds over the RowContainer used for storing /// the input rows in the Window Operator. This works completely in-memory. +/// WindowPartition supports partial window partitioning to facilitate +/// RowsStreamingWindowBuild, which means that subsequent calculations within +/// the WindowPartition do not need to wait until the current partition is fully +/// ready before commencing. Calculations can begin as soon as a portion of the +/// rows are ready. /// TODO: This implementation will be revised for Spill to disk semantics. namespace facebook::velox::exec { @@ -68,6 +73,10 @@ class WindowPartition { return complete_; } + bool isPartial() const { + return partial_; + } + void setComplete() { complete_ = true; } @@ -221,6 +230,10 @@ class WindowPartition { // The partition offset of the first row in rows_. vector_size_t startRow_ = 0; + // Indicates that the partial window partitioning process has been completed. bool complete_ = false; + + // Indicates partial window partition. + bool partial_ = false; }; } // namespace facebook::velox::exec diff --git a/velox/functions/lib/window/Rank.cpp b/velox/functions/lib/window/Rank.cpp index 6e59324a764b..a66a186d41f8 100644 --- a/velox/functions/lib/window/Rank.cpp +++ b/velox/functions/lib/window/Rank.cpp @@ -109,7 +109,7 @@ void registerRankInternal( return std::make_unique>(resultType); }; - if constexpr (TRank == RankType::kRank) { + if constexpr (TRank == RankType::kRank || TRank == RankType::kDenseRank) { exec::registerWindowFunction( name, std::move(signatures),