Skip to content

Commit

Permalink
Resolve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf authored and binwei committed May 25, 2024
1 parent ad65eb5 commit bf520c0
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 18 deletions.
22 changes: 12 additions & 10 deletions velox/exec/Window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -635,15 +635,17 @@ vector_size_t Window::callApplyLoop(
resultIndex += rowsForCurrentPartition;
numOutputRowsLeft -= rowsForCurrentPartition;

if (currentPartition_->isComplete()) {
callResetPartition();

if (!currentPartition_) {
// The WindowBuild doesn't have any more partitions to process right
// now. So break until the next getOutput call.
break;
}
} else {
if (!currentPartition_->isComplete()) {
// Still more data for the current partition would need to be processed.
// So resume on the next getOutput call.
break;
}

callResetPartition();

if (!currentPartition_) {
// The WindowBuild doesn't have any more partitions to process right
// now. So break until the next getOutput call.
break;
}

Expand Down Expand Up @@ -693,7 +695,7 @@ RowVectorPtr Window::getOutput() {

// Compute the output values of window functions.
auto numResultRows = callApplyLoop(numOutputRows, result);
if (currentPartition_) {
if (currentPartition_ && currentPartition_->isPartial()) {
currentPartition_->clearOutputRows(numResultRows);
}

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/WindowBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class WindowBuild {
// Number of input rows.
vector_size_t numRows_ = 0;

// Number of rows that be fit into an output block.
// The maximum number of rows that can fit into an output block.
vector_size_t numRowsPerOutput_;
};

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/WindowFunction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ bool registerWindowFunction(
WindowFunctionMetadata metadata) {
auto sanitizedName = sanitizeName(name);
windowFunctions()[sanitizedName] = {
std::move(signatures), std::move(factory), metadata};
std::move(signatures), std::move(factory), std::move(metadata)};
return true;
}

Expand Down
14 changes: 9 additions & 5 deletions velox/exec/WindowFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,19 @@ struct WindowFunctionArg {
std::optional<const column_index_t> index;
};

/// The scope for calculating the window function in a streaming
/// manner. kRows indicates that the calculation begins as soon as rows are
/// available within a single partition, without waiting for all data in the
/// partition to be ready. kPartition indicates that the calculation begins only
/// when all rows in a partition are ready.
/// The scope for calculating the window function. kRows indicates that the
/// calculation begins as soon as rows are available within a single partition,
/// without waiting for all data in the partition to be ready. kPartition
/// indicates that the calculation begins only when all rows in a partition are
/// ready.
enum class Scope {
/// Calculation begins only when all rows in a partition are ready.
kPartition,
/// Calculation begins as soon as rows are available within a single
/// partition, without waiting for all data in the partition to be ready.
kRows,
};

/// A sliding window frame is a dynamic set of rows that moves relative to the
/// current row being processed, allowing for calculations over a range of rows
/// that can change as the query progresses.
struct WindowFunctionMetadata {
Scope scope;
bool supportsSlidingFrame;
Expand Down Expand Up @@ -178,6 +181,7 @@ struct WindowFunctionEntry {
WindowFunctionMetadata metadata;
};

/// Returns std::nullopt if the function doesn't exist in the WindowFunctionMap.
std::optional<WindowFunctionMetadata> getWindowFunctionMetadata(
const std::string& name);

Expand Down
1 change: 1 addition & 0 deletions velox/exec/WindowPartition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ WindowPartition::WindowPartition(
rows_.clear();
partition_ = folly::Range(rows_.data(), rows_.size());
complete_ = false;
partial_ = true;
}

void WindowPartition::addRows(const std::vector<char*>& rows) {
Expand Down
13 changes: 13 additions & 0 deletions velox/exec/WindowPartition.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@

/// Simple WindowPartition that builds over the RowContainer used for storing
/// the input rows in the Window Operator. This works completely in-memory.
/// WindowPartition supports partial window partitioning to facilitate
/// RowsStreamingWindowBuild, which means that subsequent calculations within
/// the WindowPartition do not need to wait until the current partition is fully
/// ready before commencing. Calculations can begin as soon as a portion of the
/// rows are ready.
/// TODO: This implementation will be revised for Spill to disk semantics.

namespace facebook::velox::exec {
Expand Down Expand Up @@ -68,6 +73,10 @@ class WindowPartition {
return complete_;
}

/// Returns the value of partial_, i.e. whether this is a partial window
/// partition.
bool isPartial() const {
return partial_;
}

/// Sets complete_ to true, marking this partition as complete.
void setComplete() {
complete_ = true;
}
Expand Down Expand Up @@ -221,6 +230,10 @@ class WindowPartition {
// The partition offset of the first row in rows_.
vector_size_t startRow_ = 0;

// True once the partition has been marked complete via setComplete(). While
// false, more data for this partition still remains to be processed.
bool complete_ = false;

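// True when this is a partial window partition, i.e. calculations can begin
// as soon as a portion of the rows are ready rather than waiting for the
// whole partition.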
bool partial_ = false;
};
} // namespace facebook::velox::exec
2 changes: 1 addition & 1 deletion velox/functions/lib/window/Rank.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ void registerRankInternal(
return std::make_unique<RankFunction<TRank, TResult>>(resultType);
};

if constexpr (TRank == RankType::kRank) {
if constexpr (TRank == RankType::kRank || TRank == RankType::kDenseRank) {
exec::registerWindowFunction(
name,
std::move(signatures),
Expand Down

0 comments on commit bf520c0

Please sign in to comment.