From b249ee470092dfbd7db2e3683833fcf9472721b8 Mon Sep 17 00:00:00 2001 From: Wei He Date: Thu, 17 Oct 2024 14:08:04 -0700 Subject: [PATCH] Fix Window operator with NaN as the frame bound (#11293) Summary: NaN could appear as the frame bound of k-range frames in window operations. When NaN appear in either of the frame bounds, the window operation should return NULL at this row. This diff fixes https://github.com/facebookincubator/velox/issues/11213. Differential Revision: D64510519 --- velox/exec/RowContainer.h | 12 +++++++ velox/exec/Window.cpp | 18 ++++++++--- velox/exec/Window.h | 6 +++- velox/exec/WindowPartition.cpp | 23 +++++++++++-- velox/exec/WindowPartition.h | 12 +++++-- velox/exec/tests/WindowTest.cpp | 57 +++++++++++++++++++++++++++++++++ 6 files changed, 116 insertions(+), 12 deletions(-) diff --git a/velox/exec/RowContainer.h b/velox/exec/RowContainer.h index 7dc2303d582ec..5fffc44cf4b12 100644 --- a/velox/exec/RowContainer.h +++ b/velox/exec/RowContainer.h @@ -773,6 +773,18 @@ class RowContainer { return (row[rowColumn.nullByte()] & rowColumn.nullMask()) != 0; } + /// Returns true if the column at columnIndex in row is NaN. + template < + typename T, + std::enable_if_t, int32_t> = 0> + bool isNanAt(const char* row, int32_t columnIndex) { + auto column = columnAt(columnIndex); + if (isNullAt(row, column.nullByte(), column.nullMask())) { + return false; + } + return std::isnan(valueAt(row, column.offset())); + } + /// Creates a container to store a partition number for each row in this row /// container. This is used by parallel join build which is responsible for /// filling this. This function also marks this row container as immutable diff --git a/velox/exec/Window.cpp b/velox/exec/Window.cpp index a414f0e20f78d..40ea5752709ff 100644 --- a/velox/exec/Window.cpp +++ b/velox/exec/Window.cpp @@ -409,7 +409,8 @@ void Window::updateFrameBounds( const vector_size_t numRows, const vector_size_t* rawPeerStarts, const vector_size_t* rawPeerEnds, - vector_size_t* rawFrameBounds) { + vector_size_t* rawFrameBounds, + SelectivityVector& validFrames) { const auto windowType = windowFrame.type; const auto boundType = isStartBound ? windowFrame.startType : windowFrame.endType; @@ -447,7 +448,8 @@ void Window::updateFrameBounds( startRow, numRows, rawPeerBuffer, - rawFrameBounds); + rawFrameBounds, + validFrames); } break; } @@ -463,7 +465,8 @@ void Window::updateFrameBounds( startRow, numRows, rawPeerBuffer, - rawFrameBounds); + rawFrameBounds, + validFrames); } break; } @@ -486,6 +489,9 @@ void computeValidFrames( vector_size_t* rawFrameEnds, SelectivityVector& validFrames) { for (auto i = 0; i < numRows; ++i) { + if (!validFrames.isValid(i)) { + continue; + } const vector_size_t frameStart = rawFrameStarts[i]; const vector_size_t frameEnd = rawFrameEnds[i]; // All valid frames require frameStart <= frameEnd to define the frame rows. @@ -545,7 +551,8 @@ void Window::computePeerAndFrameBuffers( numRows, rawPeerStarts, rawPeerEnds, - rawFrameStarts[i]); + rawFrameStarts[i], + validFrames_[i]); updateFrameBounds( windowFrame, false, @@ -553,7 +560,8 @@ void Window::computePeerAndFrameBuffers( numRows, rawPeerStarts, rawPeerEnds, - rawFrameEnds[i]); + rawFrameEnds[i], + validFrames_[i]); if (windowFrames_[i].start || windowFrames_[i].end) { // k preceding and k following bounds can be problematic. They can go over // the partition limits or result in empty frames. Fix the frame diff --git a/velox/exec/Window.h b/velox/exec/Window.h index eb3389369a230..8cbd3b1ee85c8 100644 --- a/velox/exec/Window.h +++ b/velox/exec/Window.h @@ -149,6 +149,9 @@ class Window : public Operator { vector_size_t numRows, vector_size_t* rawFrameBounds); + // Populate frame bounds in the current partition into rawFrameBounds. + // Unselect rows from validFrames where the frame bounds are NaN that are + // invalid. void updateFrameBounds( const WindowFrame& windowFrame, const bool isStartBound, @@ -156,7 +159,8 @@ class Window : public Operator { const vector_size_t numRows, const vector_size_t* rawPeerStarts, const vector_size_t* rawPeerEnds, - vector_size_t* rawFrameBounds); + vector_size_t* rawFrameBounds, + SelectivityVector& validFrames); const vector_size_t numInputColumns_; diff --git a/velox/exec/WindowPartition.cpp b/velox/exec/WindowPartition.cpp index 6995493b4c019..02b9a169bf417 100644 --- a/velox/exec/WindowPartition.cpp +++ b/velox/exec/WindowPartition.cpp @@ -357,7 +357,8 @@ void WindowPartition::updateKRangeFrameBounds( vector_size_t numRows, column_index_t frameColumn, const vector_size_t* rawPeerBounds, - vector_size_t* rawFrameBounds) const { + vector_size_t* rawFrameBounds, + SelectivityVector& validFrames) const { column_index_t orderByColumn = sortKeyInfo_[0].first; column_index_t mappedFrameColumn = inputMapping_[frameColumn]; @@ -383,6 +384,20 @@ void WindowPartition::updateKRangeFrameBounds( orderByRowColumn.nullByte(), orderByRowColumn.nullMask())); + // Mark the frame invalid if the frame bound is NaN. + auto frameType = data_->columnTypes()[mappedFrameColumn]; + if (frameType->isReal()) { + if (data_->isNanAt(partitionRow, mappedFrameColumn)) { + validFrames.setValid(currentRow, false); + continue; + } + } else if (frameType->isDouble()) { + if (data_->isNanAt(partitionRow, mappedFrameColumn)) { + validFrames.setValid(currentRow, false); + continue; + } + } + // If the frame is NULL or 0 preceding or 0 following then the current row // has same values for order by and frame column. In that case // the bound matches the peer row for this row. @@ -423,7 +438,8 @@ void WindowPartition::computeKRangeFrameBounds( vector_size_t startRow, vector_size_t numRows, const vector_size_t* rawPeerBuffer, - vector_size_t* rawFrameBounds) const { + vector_size_t* rawFrameBounds, + SelectivityVector& validFrames) const { CompareFlags flags; flags.ascending = sortKeyInfo_[0].second.isAscending(); flags.nullsFirst = sortKeyInfo_[0].second.isNullsFirst(); @@ -436,7 +452,8 @@ void WindowPartition::computeKRangeFrameBounds( numRows, frameColumn, rawPeerBuffer, - rawFrameBounds); + rawFrameBounds, + validFrames); } } // namespace facebook::velox::exec diff --git a/velox/exec/WindowPartition.h b/velox/exec/WindowPartition.h index 7948611a98295..a15ff859d11dd 100644 --- a/velox/exec/WindowPartition.h +++ b/velox/exec/WindowPartition.h @@ -158,6 +158,9 @@ class WindowPartition { /// @param numRows number of rows to compute buffer for. /// @param rawPeerStarts buffer of peer row values for each row. If the frame /// column is null, then its peer row value is the frame boundary. + /// @param rawFrameBounds SelectivityVector to keep track of valid frames. + /// This function unselect rows in validFrames where the frame bounds are NaN + /// that are invalid. void computeKRangeFrameBounds( bool isStartBound, bool isPreceding, @@ -165,7 +168,8 @@ class WindowPartition { vector_size_t startRow, vector_size_t numRows, const vector_size_t* rawPeerStarts, - vector_size_t* rawFrameBounds) const; + vector_size_t* rawFrameBounds, + SelectivityVector& validFrames) const; private: WindowPartition( @@ -216,7 +220,8 @@ class WindowPartition { column_index_t frameColumn, const CompareFlags& flags) const; - // Iterates over 'numBlockRows' and searches frame value for each row. + // Iterates over 'numBlockRows' and searches frame value for each row. If the + // frame bound is NaN that is invalid, unselect the row from 'validFrames'. void updateKRangeFrameBounds( bool firstMatch, bool isPreceding, @@ -225,7 +230,8 @@ class WindowPartition { vector_size_t numRows, column_index_t frameColumn, const vector_size_t* rawPeerBounds, - vector_size_t* rawFrameBounds) const; + vector_size_t* rawFrameBounds, + SelectivityVector& validFrames) const; // Indicates if this is a partial partition for RowStreamWindowBuild // processing. diff --git a/velox/exec/tests/WindowTest.cpp b/velox/exec/tests/WindowTest.cpp index d9b315ee5bf00..a0f4828fe5404 100644 --- a/velox/exec/tests/WindowTest.cpp +++ b/velox/exec/tests/WindowTest.cpp @@ -567,5 +567,62 @@ DEBUG_ONLY_TEST_F(WindowTest, frameColumnNullCheck) { AssertQueryBuilder(makePlan(inputNoThrow)).copyResults(pool())); } +TEST_F(WindowTest, NaNFrameBound) { + const auto kNan = std::numeric_limits::quiet_NaN(); + auto data = makeRowVector( + {"c0", "s0", "off0", "off1"}, + { + makeFlatVector({1, 2, 3}), + makeFlatVector({1.0, 1.0, 1.0}), + makeFlatVector({1.0, 1.0, 1.0}), + makeFlatVector({kNan, kNan, kNan}), + }); + + const auto makeFrames = [](const std::string& call) { + std::vector frames; + + std::vector orders{"asc", "desc"}; + std::vector bounds{"preceding", "following"}; + for (const std::string& order : orders) { + for (const std::string& startBound : bounds) { + for (const std::string& endBound : bounds) { + // Frames starting from following and ending at preceding are not + // allowed. + if (startBound == "following" && endBound == "preceding") { + continue; + } + frames.push_back(fmt::format( + "{} over (order by s0 {} range between off0 {} and off1 {})", + call, + order, + startBound, + endBound)); + frames.push_back(fmt::format( + "{} over (order by s0 {} range between off1 {} and off0 {})", + call, + order, + startBound, + endBound)); + } + } + } + return frames; + }; + + auto expected = makeRowVector({makeNullConstant(TypeKind::BIGINT, 3)}); + for (const auto& frame : makeFrames("sum(c0)")) { + auto plan = + PlanBuilder().values({data}).window({frame}).project({"w0"}).planNode(); + AssertQueryBuilder(plan).assertResults(expected); + } + + expected = makeRowVector({makeFlatVector({1, 1, 1})}); + for (const auto& frame : makeFrames("rank()")) { + auto plan = + PlanBuilder().values({data}).window({frame}).project({"w0"}).planNode(); + AssertQueryBuilder(plan).assertResults(expected); + } +} + } // namespace } // namespace facebook::velox::exec