Skip to content

Commit

Permalink
Fix Window operator with NaN as the frame bound (facebookincubator#11293
Browse files Browse the repository at this point in the history
)

Summary:

NaN could appear as the frame bound of k-range frames in window 
operations. When NaN appear in either of the frame bounds, the window 
operation should return NULL at this row.

This diff fixes facebookincubator#11213.

Differential Revision: D64510519
  • Loading branch information
kagamiori authored and facebook-github-bot committed Oct 29, 2024
1 parent b1834bd commit 4acb017
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 12 deletions.
12 changes: 12 additions & 0 deletions velox/exec/RowContainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,18 @@ class RowContainer {
return (row[rowColumn.nullByte()] & rowColumn.nullMask()) != 0;
}

/// Returns true if the column at columnIndex in row is NaN.
template <
typename T,
std::enable_if_t<std::is_floating_point_v<T>, int32_t> = 0>
bool isNanAt(const char* row, int32_t columnIndex) {
const auto column = columnAt(columnIndex);
if (isNullAt(row, column.nullByte(), column.nullMask())) {
return false;
}
return std::isnan(valueAt<T>(row, column.offset()));
}

/// Creates a container to store a partition number for each row in this row
/// container. This is used by parallel join build which is responsible for
/// filling this. This function also marks this row container as immutable
Expand Down
18 changes: 13 additions & 5 deletions velox/exec/Window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,8 @@ void Window::updateFrameBounds(
const vector_size_t numRows,
const vector_size_t* rawPeerStarts,
const vector_size_t* rawPeerEnds,
vector_size_t* rawFrameBounds) {
vector_size_t* rawFrameBounds,
SelectivityVector& validFrames) {
const auto windowType = windowFrame.type;
const auto boundType =
isStartBound ? windowFrame.startType : windowFrame.endType;
Expand Down Expand Up @@ -447,7 +448,8 @@ void Window::updateFrameBounds(
startRow,
numRows,
rawPeerBuffer,
rawFrameBounds);
rawFrameBounds,
validFrames);
}
break;
}
Expand All @@ -463,7 +465,8 @@ void Window::updateFrameBounds(
startRow,
numRows,
rawPeerBuffer,
rawFrameBounds);
rawFrameBounds,
validFrames);
}
break;
}
Expand All @@ -486,6 +489,9 @@ void computeValidFrames(
vector_size_t* rawFrameEnds,
SelectivityVector& validFrames) {
for (auto i = 0; i < numRows; ++i) {
if (!validFrames.isValid(i)) {
continue;
}
const vector_size_t frameStart = rawFrameStarts[i];
const vector_size_t frameEnd = rawFrameEnds[i];
// All valid frames require frameStart <= frameEnd to define the frame rows.
Expand Down Expand Up @@ -545,15 +551,17 @@ void Window::computePeerAndFrameBuffers(
numRows,
rawPeerStarts,
rawPeerEnds,
rawFrameStarts[i]);
rawFrameStarts[i],
validFrames_[i]);
updateFrameBounds(
windowFrame,
false,
startRow,
numRows,
rawPeerStarts,
rawPeerEnds,
rawFrameEnds[i]);
rawFrameEnds[i],
validFrames_[i]);
if (windowFrames_[i].start || windowFrames_[i].end) {
// k preceding and k following bounds can be problematic. They can go over
// the partition limits or result in empty frames. Fix the frame
Expand Down
6 changes: 5 additions & 1 deletion velox/exec/Window.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,18 @@ class Window : public Operator {
vector_size_t numRows,
vector_size_t* rawFrameBounds);

// Populate frame bounds in the current partition into rawFrameBounds.
// Unselect rows from validFrames where the frame bounds are NaN that are
// invalid.
void updateFrameBounds(
const WindowFrame& windowFrame,
const bool isStartBound,
const vector_size_t startRow,
const vector_size_t numRows,
const vector_size_t* rawPeerStarts,
const vector_size_t* rawPeerEnds,
vector_size_t* rawFrameBounds);
vector_size_t* rawFrameBounds,
SelectivityVector& validFrames);

const vector_size_t numInputColumns_;

Expand Down
26 changes: 23 additions & 3 deletions velox/exec/WindowPartition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,8 @@ void WindowPartition::updateKRangeFrameBounds(
vector_size_t numRows,
column_index_t frameColumn,
const vector_size_t* rawPeerBounds,
vector_size_t* rawFrameBounds) const {
vector_size_t* rawFrameBounds,
SelectivityVector& validFrames) const {
column_index_t orderByColumn = sortKeyInfo_[0].first;
column_index_t mappedFrameColumn = inputMapping_[frameColumn];

Expand All @@ -368,6 +369,23 @@ void WindowPartition::updateKRangeFrameBounds(
// reordered as per inputMapping_.
RowColumn frameRowColumn = columns_[frameColumn];
RowColumn orderByRowColumn = data_->columnAt(orderByColumn);

// Mark the frame invalid if the frame bound is NaN.
auto frameType = data_->columnTypes()[mappedFrameColumn];
if (frameType->isReal()) {
for (auto i = startRow; i < startRow + numRows; i++) {
if (data_->isNanAt<float>(partition_[i], mappedFrameColumn)) {
validFrames.setValid(i, false);
}
}
} else if (frameType->isDouble()) {
for (auto i = startRow; i < startRow + numRows; i++) {
if (data_->isNanAt<double>(partition_[i], mappedFrameColumn)) {
validFrames.setValid(i, false);
}
}
}

for (auto i = 0; i < numRows; i++) {
auto currentRow = startRow + i;
auto* partitionRow = partition_[currentRow];
Expand Down Expand Up @@ -423,7 +441,8 @@ void WindowPartition::computeKRangeFrameBounds(
vector_size_t startRow,
vector_size_t numRows,
const vector_size_t* rawPeerBuffer,
vector_size_t* rawFrameBounds) const {
vector_size_t* rawFrameBounds,
SelectivityVector& validFrames) const {
CompareFlags flags;
flags.ascending = sortKeyInfo_[0].second.isAscending();
flags.nullsFirst = sortKeyInfo_[0].second.isNullsFirst();
Expand All @@ -436,7 +455,8 @@ void WindowPartition::computeKRangeFrameBounds(
numRows,
frameColumn,
rawPeerBuffer,
rawFrameBounds);
rawFrameBounds,
validFrames);
}

} // namespace facebook::velox::exec
12 changes: 9 additions & 3 deletions velox/exec/WindowPartition.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,18 @@ class WindowPartition {
/// @param numRows number of rows to compute buffer for.
/// @param rawPeerStarts buffer of peer row values for each row. If the frame
/// column is null, then its peer row value is the frame boundary.
/// @param validFrames SelectivityVector to keep track of valid frames.
/// This function unselect rows in validFrames where the frame bounds are NaN
/// that are invalid.
void computeKRangeFrameBounds(
bool isStartBound,
bool isPreceding,
column_index_t frameColumn,
vector_size_t startRow,
vector_size_t numRows,
const vector_size_t* rawPeerStarts,
vector_size_t* rawFrameBounds) const;
vector_size_t* rawFrameBounds,
SelectivityVector& validFrames) const;

private:
WindowPartition(
Expand Down Expand Up @@ -216,7 +220,8 @@ class WindowPartition {
column_index_t frameColumn,
const CompareFlags& flags) const;

// Iterates over 'numBlockRows' and searches frame value for each row.
// Iterates over 'numBlockRows' and searches frame value for each row. If the
// frame bound is NaN that is invalid, unselect the row from 'validFrames'.
void updateKRangeFrameBounds(
bool firstMatch,
bool isPreceding,
Expand All @@ -225,7 +230,8 @@ class WindowPartition {
vector_size_t numRows,
column_index_t frameColumn,
const vector_size_t* rawPeerBounds,
vector_size_t* rawFrameBounds) const;
vector_size_t* rawFrameBounds,
SelectivityVector& validFrames) const;

// Indicates if this is a partial partition for RowStreamWindowBuild
// processing.
Expand Down
57 changes: 57 additions & 0 deletions velox/exec/tests/WindowTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -679,5 +679,62 @@ DEBUG_ONLY_TEST_F(WindowTest, reserveMemorySort) {
}
}

TEST_F(WindowTest, NaNFrameBound) {
const auto kNan = std::numeric_limits<double>::quiet_NaN();
auto data = makeRowVector(
{"c0", "s0", "off0", "off1"},
{
makeFlatVector<int64_t>({1, 2, 3}),
makeFlatVector<double>({1.0, 1.0, 1.0}),
makeFlatVector<double>({1.0, 1.0, 1.0}),
makeFlatVector<double>({kNan, kNan, kNan}),
});

const auto makeFrames = [](const std::string& call) {
std::vector<std::string> frames;

std::vector<std::string> orders{"asc", "desc"};
std::vector<std::string> bounds{"preceding", "following"};
for (const std::string& order : orders) {
for (const std::string& startBound : bounds) {
for (const std::string& endBound : bounds) {
// Frames starting from following and ending at preceding are not
// allowed.
if (startBound == "following" && endBound == "preceding") {
continue;
}
frames.push_back(fmt::format(
"{} over (order by s0 {} range between off0 {} and off1 {})",
call,
order,
startBound,
endBound));
frames.push_back(fmt::format(
"{} over (order by s0 {} range between off1 {} and off0 {})",
call,
order,
startBound,
endBound));
}
}
}
return frames;
};

auto expected = makeRowVector({makeNullConstant(TypeKind::BIGINT, 3)});
for (const auto& frame : makeFrames("sum(c0)")) {
auto plan =
PlanBuilder().values({data}).window({frame}).project({"w0"}).planNode();
AssertQueryBuilder(plan).assertResults(expected);
}

expected = makeRowVector({makeFlatVector<int64_t>({1, 1, 1})});
for (const auto& frame : makeFrames("rank()")) {
auto plan =
PlanBuilder().values({data}).window({frame}).project({"w0"}).planNode();
AssertQueryBuilder(plan).assertResults(expected);
}
}

} // namespace
} // namespace facebook::velox::exec

0 comments on commit 4acb017

Please sign in to comment.