Skip to content

Commit

Permalink
Fix Window operator with NaN as the frame bound (#11293)
Browse files Browse the repository at this point in the history
Summary:

NaN could appear as the frame bound of k-range frames in window 
operations. When NaN appear in either of the frame bounds, the window 
operation should return NULL at this row.

This diff fixes #11213.

Reviewed By: xiaoxmeng

Differential Revision: D64510519
  • Loading branch information
kagamiori authored and facebook-github-bot committed Nov 12, 2024
1 parent 7437093 commit 84c41ed
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 23 deletions.
13 changes: 12 additions & 1 deletion velox/exec/RowContainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -769,10 +769,21 @@ class RowContainer {
return (row[nullByte] & nullMask) != 0;
}

static inline bool isNullAt(const char* row, RowColumn rowColumn) {
static inline bool isNullAt(const char* row, const RowColumn& rowColumn) {
return (row[rowColumn.nullByte()] & rowColumn.nullMask()) != 0;
}

/// Returns true if the value at rowColumn in row is NaN.
template <
typename T,
std::enable_if_t<std::is_floating_point_v<T>, int32_t> = 0>
static inline bool isNanAt(const char* row, const RowColumn& rowColumn) {
if (isNullAt(row, rowColumn.nullByte(), rowColumn.nullMask())) {
return false;
}
return std::isnan(valueAt<T>(row, rowColumn.offset()));
}

/// Creates a container to store a partition number for each row in this row
/// container. This is used by parallel join build which is responsible for
/// filling this. This function also marks this row container as immutable
Expand Down
18 changes: 13 additions & 5 deletions velox/exec/Window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,8 @@ void Window::updateFrameBounds(
const vector_size_t numRows,
const vector_size_t* rawPeerStarts,
const vector_size_t* rawPeerEnds,
vector_size_t* rawFrameBounds) {
vector_size_t* rawFrameBounds,
SelectivityVector& validFrames) {
const auto windowType = windowFrame.type;
const auto boundType =
isStartBound ? windowFrame.startType : windowFrame.endType;
Expand Down Expand Up @@ -447,7 +448,8 @@ void Window::updateFrameBounds(
startRow,
numRows,
rawPeerBuffer,
rawFrameBounds);
rawFrameBounds,
validFrames);
}
break;
}
Expand All @@ -463,7 +465,8 @@ void Window::updateFrameBounds(
startRow,
numRows,
rawPeerBuffer,
rawFrameBounds);
rawFrameBounds,
validFrames);
}
break;
}
Expand All @@ -486,6 +489,9 @@ void computeValidFrames(
vector_size_t* rawFrameEnds,
SelectivityVector& validFrames) {
for (auto i = 0; i < numRows; ++i) {
if (!validFrames.isValid(i)) {
continue;
}
const vector_size_t frameStart = rawFrameStarts[i];
const vector_size_t frameEnd = rawFrameEnds[i];
// All valid frames require frameStart <= frameEnd to define the frame rows.
Expand Down Expand Up @@ -545,15 +551,17 @@ void Window::computePeerAndFrameBuffers(
numRows,
rawPeerStarts,
rawPeerEnds,
rawFrameStarts[i]);
rawFrameStarts[i],
validFrames_[i]);
updateFrameBounds(
windowFrame,
false,
startRow,
numRows,
rawPeerStarts,
rawPeerEnds,
rawFrameEnds[i]);
rawFrameEnds[i],
validFrames_[i]);
if (windowFrames_[i].start || windowFrames_[i].end) {
// k preceding and k following bounds can be problematic. They can go over
// the partition limits or result in empty frames. Fix the frame
Expand Down
6 changes: 5 additions & 1 deletion velox/exec/Window.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,18 @@ class Window : public Operator {
vector_size_t numRows,
vector_size_t* rawFrameBounds);

// Populate frame bounds in the current partition into rawFrameBounds.
// Unselect rows from validFrames where the frame bounds are NaN that are
// invalid.
void updateFrameBounds(
const WindowFrame& windowFrame,
const bool isStartBound,
const vector_size_t startRow,
const vector_size_t numRows,
const vector_size_t* rawPeerStarts,
const vector_size_t* rawPeerEnds,
vector_size_t* rawFrameBounds);
vector_size_t* rawFrameBounds,
SelectivityVector& validFrames);

const vector_size_t numInputColumns_;

Expand Down
66 changes: 53 additions & 13 deletions velox/exec/WindowPartition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ vector_size_t WindowPartition::linearSearchFrameValue(
return end == numRows() ? numRows() + 1 : -1;
}

template <typename T>
void WindowPartition::updateKRangeFrameBounds(
bool firstMatch,
bool isPreceding,
Expand All @@ -357,21 +358,32 @@ void WindowPartition::updateKRangeFrameBounds(
vector_size_t numRows,
column_index_t frameColumn,
const vector_size_t* rawPeerBounds,
vector_size_t* rawFrameBounds) const {
vector_size_t* rawFrameBounds,
SelectivityVector& validFrames) const {
column_index_t orderByColumn = sortKeyInfo_[0].first;
column_index_t mappedFrameColumn = inputMapping_[frameColumn];

vector_size_t start = 0;
vector_size_t end;
// frameColumn is a column index into the original input rows, while
// orderByColumn is a column index into rows in data_ after the columns are
// reordered as per inputMapping_.
// orderByColumn and mappedFrameColumn are column indices into rows in data_
// after the columns are reordered as per inputMapping_.
VELOX_DEBUG_ONLY RowColumn frameRowColumn = columns_[frameColumn];
RowColumn orderByRowColumn = data_->columnAt(orderByColumn);
RowColumn mappedFrameRowColumn = data_->columnAt(mappedFrameColumn);
for (auto i = 0; i < numRows; i++) {
auto currentRow = startRow + i;
auto* partitionRow = partition_[currentRow];

// Mark the frame invalid if the frame bound is NaN.
if constexpr (std::is_floating_point_v<T>) {
if (data_->isNanAt<T>(partitionRow, mappedFrameRowColumn) &&
!data_->isNanAt<T>(partitionRow, orderByRowColumn)) {
validFrames.setValid(currentRow, false);
continue;
}
}

// The user is expected to set the frame column equal to NULL when the
// ORDER BY value is NULL and not in any other case. Validate this
// assumption.
Expand Down Expand Up @@ -423,20 +435,48 @@ void WindowPartition::computeKRangeFrameBounds(
vector_size_t startRow,
vector_size_t numRows,
const vector_size_t* rawPeerBuffer,
vector_size_t* rawFrameBounds) const {
vector_size_t* rawFrameBounds,
SelectivityVector& validFrames) const {
CompareFlags flags;
flags.ascending = sortKeyInfo_[0].second.isAscending();
flags.nullsFirst = sortKeyInfo_[0].second.isNullsFirst();

// Start bounds require first match. End bounds require last match.
updateKRangeFrameBounds(
isStartBound,
isPreceding,
flags,
startRow,
numRows,
frameColumn,
rawPeerBuffer,
rawFrameBounds);
const auto frameType = data_->columnTypes()[inputMapping_[frameColumn]];
if (frameType->isReal()) {
updateKRangeFrameBounds<float>(
isStartBound,
isPreceding,
flags,
startRow,
numRows,
frameColumn,
rawPeerBuffer,
rawFrameBounds,
validFrames);
} else if (frameType->isDouble()) {
updateKRangeFrameBounds<double>(
isStartBound,
isPreceding,
flags,
startRow,
numRows,
frameColumn,
rawPeerBuffer,
rawFrameBounds,
validFrames);
} else {
updateKRangeFrameBounds<void>(
isStartBound,
isPreceding,
flags,
startRow,
numRows,
frameColumn,
rawPeerBuffer,
rawFrameBounds,
validFrames);
}
}

} // namespace facebook::velox::exec
17 changes: 14 additions & 3 deletions velox/exec/WindowPartition.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,18 @@ class WindowPartition {
/// @param numRows number of rows to compute buffer for.
/// @param rawPeerStarts buffer of peer row values for each row. If the frame
/// column is null, then its peer row value is the frame boundary.
/// @param validFrames SelectivityVector to keep track of valid frames.
/// This function unselect rows in validFrames where the frame bounds are NaN
/// that are invalid.
void computeKRangeFrameBounds(
bool isStartBound,
bool isPreceding,
column_index_t frameColumn,
vector_size_t startRow,
vector_size_t numRows,
const vector_size_t* rawPeerStarts,
vector_size_t* rawFrameBounds) const;
vector_size_t* rawFrameBounds,
SelectivityVector& validFrames) const;

private:
WindowPartition(
Expand Down Expand Up @@ -216,7 +220,13 @@ class WindowPartition {
column_index_t frameColumn,
const CompareFlags& flags) const;

// Iterates over 'numBlockRows' and searches frame value for each row.
/// Iterates over 'numBlockRows' and searches frame value for each row.
/// @tparam T The C++ type of the order-by and frame columns. When T is float
/// or double, this method checks for rows with NaN frame bound(s) but non-NaN
/// order-by value. These frames are invalid and we unselect these rows from
/// 'validFrames'. If the order-by and frame columns are not of floating-point
/// types, T is not used and should be set to a non-floating-point C++ type.
template <typename T>
void updateKRangeFrameBounds(
bool firstMatch,
bool isPreceding,
Expand All @@ -225,7 +235,8 @@ class WindowPartition {
vector_size_t numRows,
column_index_t frameColumn,
const vector_size_t* rawPeerBounds,
vector_size_t* rawFrameBounds) const;
vector_size_t* rawFrameBounds,
SelectivityVector& validFrames) const;

// Indicates if this is a partial partition for RowStreamWindowBuild
// processing.
Expand Down
50 changes: 50 additions & 0 deletions velox/exec/tests/RowContainerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2290,3 +2290,53 @@ TEST_F(RowContainerTest, customComparisonRow) {
expectedOrder, BIGINT_TYPE_WITH_CUSTOM_COMPARISON())});
});
}

TEST_F(RowContainerTest, isNanAt) {
const auto kNan = std::numeric_limits<double>::quiet_NaN();
const auto kNanF = std::numeric_limits<float>::quiet_NaN();
auto rowVector = makeRowVector({
makeFlatVector<float>({kNanF, 0.2, 0.3, 0.4}),
makeFlatVector<double>({0.1, kNan, 0.3, 0.4}),
makeFlatVector<float>({0.1, 0.2, kNanF, 0.4}),
makeFlatVector<double>({0.1, 0.2, 0.3, kNan}),
});
const auto kNumRows = rowVector->size();

auto rowContainer =
makeRowContainer({REAL(), DOUBLE()}, {REAL(), DOUBLE()}, false);
std::vector<char*> rows;
rows.reserve(kNumRows);

SelectivityVector allRows(kNumRows);
for (size_t i = 0; i < kNumRows; i++) {
auto row = rowContainer->newRow();
rows.push_back(row);
}
for (int j = 0; j < rowContainer->columnTypes().size(); ++j) {
DecodedVector decoded(*rowVector->childAt(j), allRows);
rowContainer->store(decoded, folly::Range(rows.data(), kNumRows), j);
}

for (int i = 0; i < kNumRows; ++i) {
const auto* row = rows[i];
for (int j = 0; j < 4; ++j) {
if (i % 2 == 0) {
if (i == j) {
EXPECT_TRUE(
rowContainer->isNanAt<float>(row, rowContainer->columnAt(j)));
} else {
EXPECT_FALSE(
rowContainer->isNanAt<float>(row, rowContainer->columnAt(j)));
}
} else {
if (i == j) {
EXPECT_TRUE(
rowContainer->isNanAt<double>(row, rowContainer->columnAt(j)));
} else {
EXPECT_FALSE(
rowContainer->isNanAt<double>(row, rowContainer->columnAt(j)));
}
}
}
}
}
60 changes: 60 additions & 0 deletions velox/exec/tests/WindowTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -692,5 +692,65 @@ DEBUG_ONLY_TEST_F(WindowTest, reserveMemorySort) {
}
}

TEST_F(WindowTest, NaNFrameBound) {
const auto kNan = std::numeric_limits<double>::quiet_NaN();
auto data = makeRowVector(
{"c0", "s0", "off0", "off1"},
{
makeFlatVector<int64_t>({1, 2, 3, 4}),
makeFlatVector<double>({1.0, 2.0, 3.0, kNan}),
makeFlatVector<double>({0.1, 2.0, 1.9, kNan}),
makeFlatVector<double>({kNan, 2.0, kNan, kNan}),
});

const auto makeFrames = [](const std::string& call) {
std::vector<std::string> frames;

std::vector<std::string> orders{"asc", "desc"};
std::vector<std::string> bounds{"preceding", "following"};
for (const std::string& order : orders) {
for (const std::string& startBound : bounds) {
for (const std::string& endBound : bounds) {
// Frames starting from following and ending at preceding are not
// allowed.
if (startBound == "following" && endBound == "preceding") {
continue;
}
frames.push_back(fmt::format(
"{} over (order by s0 {} range between off0 {} and off1 {})",
call,
order,
startBound,
endBound));
frames.push_back(fmt::format(
"{} over (order by s0 {} range between off1 {} and off0 {})",
call,
order,
startBound,
endBound));
}
}
}
return frames;
};

auto expected = makeRowVector(
{makeNullableFlatVector<int64_t>({std::nullopt, 2, std::nullopt, 4})});
for (const auto& frame : makeFrames("sum(c0)")) {
auto plan =
PlanBuilder().values({data}).window({frame}).project({"w0"}).planNode();
AssertQueryBuilder(plan).assertResults(expected);
}

// rank() should not be affected by the frames, so added this test to ensure
// rank() produces correct results even if the frame bounds contain NaN.
expected = makeRowVector({makeFlatVector<int64_t>({1, 2, 3, 4})});
for (const auto& frame : makeFrames("rank()")) {
auto plan =
PlanBuilder().values({data}).window({frame}).project({"w0"}).planNode();
AssertQueryBuilder(plan).assertResults(expected);
}
}

} // namespace
} // namespace facebook::velox::exec

0 comments on commit 84c41ed

Please sign in to comment.