diff --git a/velox/exec/RowContainer.h b/velox/exec/RowContainer.h index b74f0b821173f..f05b66e0c735d 100644 --- a/velox/exec/RowContainer.h +++ b/velox/exec/RowContainer.h @@ -769,10 +769,21 @@ class RowContainer { return (row[nullByte] & nullMask) != 0; } - static inline bool isNullAt(const char* row, RowColumn rowColumn) { + static inline bool isNullAt(const char* row, const RowColumn& rowColumn) { return (row[rowColumn.nullByte()] & rowColumn.nullMask()) != 0; } + /// Returns true if the value at rowColumn in row is NaN. + template < + typename T, + std::enable_if_t, int32_t> = 0> + static inline bool isNanAt(const char* row, const RowColumn& rowColumn) { + if (isNullAt(row, rowColumn.nullByte(), rowColumn.nullMask())) { + return false; + } + return std::isnan(valueAt(row, rowColumn.offset())); + } + /// Creates a container to store a partition number for each row in this row /// container. This is used by parallel join build which is responsible for /// filling this. This function also marks this row container as immutable diff --git a/velox/exec/Window.cpp b/velox/exec/Window.cpp index a414f0e20f78d..40ea5752709ff 100644 --- a/velox/exec/Window.cpp +++ b/velox/exec/Window.cpp @@ -409,7 +409,8 @@ void Window::updateFrameBounds( const vector_size_t numRows, const vector_size_t* rawPeerStarts, const vector_size_t* rawPeerEnds, - vector_size_t* rawFrameBounds) { + vector_size_t* rawFrameBounds, + SelectivityVector& validFrames) { const auto windowType = windowFrame.type; const auto boundType = isStartBound ? windowFrame.startType : windowFrame.endType; @@ -447,7 +448,8 @@ void Window::updateFrameBounds( startRow, numRows, rawPeerBuffer, - rawFrameBounds); + rawFrameBounds, + validFrames); } break; } @@ -463,7 +465,8 @@ void Window::updateFrameBounds( startRow, numRows, rawPeerBuffer, - rawFrameBounds); + rawFrameBounds, + validFrames); } break; } @@ -486,6 +489,9 @@ void computeValidFrames( vector_size_t* rawFrameEnds, SelectivityVector& validFrames) { for (auto i = 0; i < numRows; ++i) { + if (!validFrames.isValid(i)) { + continue; + } const vector_size_t frameStart = rawFrameStarts[i]; const vector_size_t frameEnd = rawFrameEnds[i]; // All valid frames require frameStart <= frameEnd to define the frame rows. @@ -545,7 +551,8 @@ void Window::computePeerAndFrameBuffers( numRows, rawPeerStarts, rawPeerEnds, - rawFrameStarts[i]); + rawFrameStarts[i], + validFrames_[i]); updateFrameBounds( windowFrame, false, @@ -553,7 +560,8 @@ void Window::computePeerAndFrameBuffers( numRows, rawPeerStarts, rawPeerEnds, - rawFrameEnds[i]); + rawFrameEnds[i], + validFrames_[i]); if (windowFrames_[i].start || windowFrames_[i].end) { // k preceding and k following bounds can be problematic. They can go over // the partition limits or result in empty frames. Fix the frame diff --git a/velox/exec/Window.h b/velox/exec/Window.h index eb3389369a230..8cbd3b1ee85c8 100644 --- a/velox/exec/Window.h +++ b/velox/exec/Window.h @@ -149,6 +149,9 @@ class Window : public Operator { vector_size_t numRows, vector_size_t* rawFrameBounds); + // Populate frame bounds in the current partition into rawFrameBounds. + // Unselect rows from validFrames where the frame bounds are NaN that are + // invalid. void updateFrameBounds( const WindowFrame& windowFrame, const bool isStartBound, @@ -156,7 +159,8 @@ class Window : public Operator { const vector_size_t numRows, const vector_size_t* rawPeerStarts, const vector_size_t* rawPeerEnds, - vector_size_t* rawFrameBounds); + vector_size_t* rawFrameBounds, + SelectivityVector& validFrames); const vector_size_t numInputColumns_; diff --git a/velox/exec/WindowPartition.cpp b/velox/exec/WindowPartition.cpp index 8ab0b451dea38..c85bdd8f4555f 100644 --- a/velox/exec/WindowPartition.cpp +++ b/velox/exec/WindowPartition.cpp @@ -349,6 +349,7 @@ vector_size_t WindowPartition::linearSearchFrameValue( return end == numRows() ? numRows() + 1 : -1; } +template void WindowPartition::updateKRangeFrameBounds( bool firstMatch, bool isPreceding, @@ -357,21 +358,32 @@ void WindowPartition::updateKRangeFrameBounds( vector_size_t numRows, column_index_t frameColumn, const vector_size_t* rawPeerBounds, - vector_size_t* rawFrameBounds) const { + vector_size_t* rawFrameBounds, + SelectivityVector& validFrames) const { column_index_t orderByColumn = sortKeyInfo_[0].first; column_index_t mappedFrameColumn = inputMapping_[frameColumn]; vector_size_t start = 0; vector_size_t end; // frameColumn is a column index into the original input rows, while - // orderByColumn is a column index into rows in data_ after the columns are - // reordered as per inputMapping_. + // orderByColumn and mappedFrameColumn are column indices into rows in data_ + // after the columns are reordered as per inputMapping_. VELOX_DEBUG_ONLY RowColumn frameRowColumn = columns_[frameColumn]; RowColumn orderByRowColumn = data_->columnAt(orderByColumn); + RowColumn mappedFrameRowColumn = data_->columnAt(mappedFrameColumn); for (auto i = 0; i < numRows; i++) { auto currentRow = startRow + i; auto* partitionRow = partition_[currentRow]; + // Mark the frame invalid if the frame bound is NaN. + if constexpr (std::is_floating_point_v) { + if (data_->isNanAt(partitionRow, mappedFrameRowColumn) && + !data_->isNanAt(partitionRow, orderByRowColumn)) { + validFrames.setValid(currentRow, false); + continue; + } + } + // The user is expected to set the frame column equal to NULL when the // ORDER BY value is NULL and not in any other case. Validate this // assumption. @@ -423,20 +435,48 @@ void WindowPartition::computeKRangeFrameBounds( vector_size_t startRow, vector_size_t numRows, const vector_size_t* rawPeerBuffer, - vector_size_t* rawFrameBounds) const { + vector_size_t* rawFrameBounds, + SelectivityVector& validFrames) const { CompareFlags flags; flags.ascending = sortKeyInfo_[0].second.isAscending(); flags.nullsFirst = sortKeyInfo_[0].second.isNullsFirst(); + // Start bounds require first match. End bounds require last match. - updateKRangeFrameBounds( - isStartBound, - isPreceding, - flags, - startRow, - numRows, - frameColumn, - rawPeerBuffer, - rawFrameBounds); + const auto frameType = data_->columnTypes()[inputMapping_[frameColumn]]; + if (frameType->isReal()) { + updateKRangeFrameBounds( + isStartBound, + isPreceding, + flags, + startRow, + numRows, + frameColumn, + rawPeerBuffer, + rawFrameBounds, + validFrames); + } else if (frameType->isDouble()) { + updateKRangeFrameBounds( + isStartBound, + isPreceding, + flags, + startRow, + numRows, + frameColumn, + rawPeerBuffer, + rawFrameBounds, + validFrames); + } else { + updateKRangeFrameBounds( + isStartBound, + isPreceding, + flags, + startRow, + numRows, + frameColumn, + rawPeerBuffer, + rawFrameBounds, + validFrames); + } } } // namespace facebook::velox::exec diff --git a/velox/exec/WindowPartition.h b/velox/exec/WindowPartition.h index 7948611a98295..42ad6fe234f79 100644 --- a/velox/exec/WindowPartition.h +++ b/velox/exec/WindowPartition.h @@ -158,6 +158,9 @@ class WindowPartition { /// @param numRows number of rows to compute buffer for. /// @param rawPeerStarts buffer of peer row values for each row. If the frame /// column is null, then its peer row value is the frame boundary. + /// @param validFrames SelectivityVector to keep track of valid frames. + /// This function unselect rows in validFrames where the frame bounds are NaN + /// that are invalid. void computeKRangeFrameBounds( bool isStartBound, bool isPreceding, @@ -165,7 +168,8 @@ class WindowPartition { vector_size_t startRow, vector_size_t numRows, const vector_size_t* rawPeerStarts, - vector_size_t* rawFrameBounds) const; + vector_size_t* rawFrameBounds, + SelectivityVector& validFrames) const; private: WindowPartition( @@ -216,7 +220,13 @@ class WindowPartition { column_index_t frameColumn, const CompareFlags& flags) const; - // Iterates over 'numBlockRows' and searches frame value for each row. + /// Iterates over 'numBlockRows' and searches frame value for each row. + /// @tparam T The C++ type of the order-by and frame columns. When T is float + /// or double, this method checks for rows with NaN frame bound(s) but non-NaN + /// order-by value. These frames are invalid and we unselect these rows from + /// 'validFrames'. If the order-by and frame columns are not of floating-point + /// types, T is not used and should be set to a non-floating-point C++ type. + template void updateKRangeFrameBounds( bool firstMatch, bool isPreceding, @@ -225,7 +235,8 @@ class WindowPartition { vector_size_t numRows, column_index_t frameColumn, const vector_size_t* rawPeerBounds, - vector_size_t* rawFrameBounds) const; + vector_size_t* rawFrameBounds, + SelectivityVector& validFrames) const; // Indicates if this is a partial partition for RowStreamWindowBuild // processing. diff --git a/velox/exec/tests/RowContainerTest.cpp b/velox/exec/tests/RowContainerTest.cpp index eb510c3c9ba2f..b1ff6dc6a412b 100644 --- a/velox/exec/tests/RowContainerTest.cpp +++ b/velox/exec/tests/RowContainerTest.cpp @@ -2290,3 +2290,53 @@ TEST_F(RowContainerTest, customComparisonRow) { expectedOrder, BIGINT_TYPE_WITH_CUSTOM_COMPARISON())}); }); } + +TEST_F(RowContainerTest, isNanAt) { + const auto kNan = std::numeric_limits::quiet_NaN(); + const auto kNanF = std::numeric_limits::quiet_NaN(); + auto rowVector = makeRowVector({ + makeFlatVector({kNanF, 0.2, 0.3, 0.4}), + makeFlatVector({0.1, kNan, 0.3, 0.4}), + makeFlatVector({0.1, 0.2, kNanF, 0.4}), + makeFlatVector({0.1, 0.2, 0.3, kNan}), + }); + const auto kNumRows = rowVector->size(); + + auto rowContainer = + makeRowContainer({REAL(), DOUBLE()}, {REAL(), DOUBLE()}, false); + std::vector rows; + rows.reserve(kNumRows); + + SelectivityVector allRows(kNumRows); + for (size_t i = 0; i < kNumRows; i++) { + auto row = rowContainer->newRow(); + rows.push_back(row); + } + for (int j = 0; j < rowContainer->columnTypes().size(); ++j) { + DecodedVector decoded(*rowVector->childAt(j), allRows); + rowContainer->store(decoded, folly::Range(rows.data(), kNumRows), j); + } + + for (int i = 0; i < kNumRows; ++i) { + const auto* row = rows[i]; + for (int j = 0; j < 4; ++j) { + if (i % 2 == 0) { + if (i == j) { + EXPECT_TRUE( + rowContainer->isNanAt(row, rowContainer->columnAt(j))); + } else { + EXPECT_FALSE( + rowContainer->isNanAt(row, rowContainer->columnAt(j))); + } + } else { + if (i == j) { + EXPECT_TRUE( + rowContainer->isNanAt(row, rowContainer->columnAt(j))); + } else { + EXPECT_FALSE( + rowContainer->isNanAt(row, rowContainer->columnAt(j))); + } + } + } + } +} diff --git a/velox/exec/tests/WindowTest.cpp b/velox/exec/tests/WindowTest.cpp index 23981e743c14e..a725d359207d8 100644 --- a/velox/exec/tests/WindowTest.cpp +++ b/velox/exec/tests/WindowTest.cpp @@ -692,5 +692,65 @@ DEBUG_ONLY_TEST_F(WindowTest, reserveMemorySort) { } } +TEST_F(WindowTest, NaNFrameBound) { + const auto kNan = std::numeric_limits::quiet_NaN(); + auto data = makeRowVector( + {"c0", "s0", "off0", "off1"}, + { + makeFlatVector({1, 2, 3, 4}), + makeFlatVector({1.0, 2.0, 3.0, kNan}), + makeFlatVector({0.1, 2.0, 1.9, kNan}), + makeFlatVector({kNan, 2.0, kNan, kNan}), + }); + + const auto makeFrames = [](const std::string& call) { + std::vector frames; + + std::vector orders{"asc", "desc"}; + std::vector bounds{"preceding", "following"}; + for (const std::string& order : orders) { + for (const std::string& startBound : bounds) { + for (const std::string& endBound : bounds) { + // Frames starting from following and ending at preceding are not + // allowed. + if (startBound == "following" && endBound == "preceding") { + continue; + } + frames.push_back(fmt::format( + "{} over (order by s0 {} range between off0 {} and off1 {})", + call, + order, + startBound, + endBound)); + frames.push_back(fmt::format( + "{} over (order by s0 {} range between off1 {} and off0 {})", + call, + order, + startBound, + endBound)); + } + } + } + return frames; + }; + + auto expected = makeRowVector( + {makeNullableFlatVector({std::nullopt, 2, std::nullopt, 4})}); + for (const auto& frame : makeFrames("sum(c0)")) { + auto plan = + PlanBuilder().values({data}).window({frame}).project({"w0"}).planNode(); + AssertQueryBuilder(plan).assertResults(expected); + } + + // rank() should not be affected by the frames, so added this test to ensure + // rank() produces correct results even if the frame bounds contain NaN. + expected = makeRowVector({makeFlatVector({1, 2, 3, 4})}); + for (const auto& frame : makeFrames("rank()")) { + auto plan = + PlanBuilder().values({data}).window({frame}).project({"w0"}).planNode(); + AssertQueryBuilder(plan).assertResults(expected); + } +} + } // namespace } // namespace facebook::velox::exec