Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Window operator with NaN as the frame bound #11293

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion velox/exec/RowContainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -769,10 +769,21 @@ class RowContainer {
return (row[nullByte] & nullMask) != 0;
}

static inline bool isNullAt(const char* row, RowColumn rowColumn) {
static inline bool isNullAt(const char* row, const RowColumn& rowColumn) {
return (row[rowColumn.nullByte()] & rowColumn.nullMask()) != 0;
}

/// Reports whether the value stored at 'rowColumn' in 'row' is NaN. A null
/// value is never reported as NaN.
/// @tparam T Floating-point C++ type used to read the stored value. This
/// overload does not participate for non-floating-point T.
template <
    typename T,
    std::enable_if_t<std::is_floating_point_v<T>, int32_t> = 0>
static inline bool isNanAt(const char* row, const RowColumn& rowColumn) {
  const bool valueIsNull =
      isNullAt(row, rowColumn.nullByte(), rowColumn.nullMask());
  // Short-circuiting keeps the value from being read when it is null.
  return !valueIsNull && std::isnan(valueAt<T>(row, rowColumn.offset()));
}

/// Creates a container to store a partition number for each row in this row
/// container. This is used by parallel join build which is responsible for
/// filling this. This function also marks this row container as immutable
Expand Down
18 changes: 13 additions & 5 deletions velox/exec/Window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,8 @@ void Window::updateFrameBounds(
const vector_size_t numRows,
const vector_size_t* rawPeerStarts,
const vector_size_t* rawPeerEnds,
vector_size_t* rawFrameBounds) {
vector_size_t* rawFrameBounds,
SelectivityVector& validFrames) {
const auto windowType = windowFrame.type;
const auto boundType =
isStartBound ? windowFrame.startType : windowFrame.endType;
Expand Down Expand Up @@ -447,7 +448,8 @@ void Window::updateFrameBounds(
startRow,
numRows,
rawPeerBuffer,
rawFrameBounds);
rawFrameBounds,
validFrames);
}
break;
}
Expand All @@ -463,7 +465,8 @@ void Window::updateFrameBounds(
startRow,
numRows,
rawPeerBuffer,
rawFrameBounds);
rawFrameBounds,
validFrames);
}
break;
}
Expand All @@ -486,6 +489,9 @@ void computeValidFrames(
vector_size_t* rawFrameEnds,
SelectivityVector& validFrames) {
for (auto i = 0; i < numRows; ++i) {
if (!validFrames.isValid(i)) {
continue;
}
const vector_size_t frameStart = rawFrameStarts[i];
const vector_size_t frameEnd = rawFrameEnds[i];
// All valid frames require frameStart <= frameEnd to define the frame rows.
Expand Down Expand Up @@ -545,15 +551,17 @@ void Window::computePeerAndFrameBuffers(
numRows,
rawPeerStarts,
rawPeerEnds,
rawFrameStarts[i]);
rawFrameStarts[i],
validFrames_[i]);
updateFrameBounds(
windowFrame,
false,
startRow,
numRows,
rawPeerStarts,
rawPeerEnds,
rawFrameEnds[i]);
rawFrameEnds[i],
validFrames_[i]);
if (windowFrames_[i].start || windowFrames_[i].end) {
// k preceding and k following bounds can be problematic. They can go over
// the partition limits or result in empty frames. Fix the frame
Expand Down
6 changes: 5 additions & 1 deletion velox/exec/Window.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,18 @@ class Window : public Operator {
vector_size_t numRows,
vector_size_t* rawFrameBounds);

// Populates the frame bounds of the current partition into rawFrameBounds.
// Unselects from validFrames the rows whose frame bounds are NaN and
// therefore invalid.
void updateFrameBounds(
const WindowFrame& windowFrame,
const bool isStartBound,
const vector_size_t startRow,
const vector_size_t numRows,
const vector_size_t* rawPeerStarts,
const vector_size_t* rawPeerEnds,
vector_size_t* rawFrameBounds);
vector_size_t* rawFrameBounds,
SelectivityVector& validFrames);

const vector_size_t numInputColumns_;

Expand Down
68 changes: 55 additions & 13 deletions velox/exec/WindowPartition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ vector_size_t WindowPartition::linearSearchFrameValue(
return end == numRows() ? numRows() + 1 : -1;
}

template <typename T>
void WindowPartition::updateKRangeFrameBounds(
bool firstMatch,
bool isPreceding,
Expand All @@ -357,21 +358,34 @@ void WindowPartition::updateKRangeFrameBounds(
vector_size_t numRows,
column_index_t frameColumn,
const vector_size_t* rawPeerBounds,
vector_size_t* rawFrameBounds) const {
vector_size_t* rawFrameBounds,
SelectivityVector& validFrames) const {
column_index_t orderByColumn = sortKeyInfo_[0].first;
column_index_t mappedFrameColumn = inputMapping_[frameColumn];

vector_size_t start = 0;
vector_size_t end;
// frameColumn is a column index into the original input rows, while
// orderByColumn is a column index into rows in data_ after the columns are
// reordered as per inputMapping_.
// orderByColumn and mappedFrameColumn are column indices into rows in data_
// after the columns are reordered as per inputMapping_.
VELOX_DEBUG_ONLY RowColumn frameRowColumn = columns_[frameColumn];
RowColumn orderByRowColumn = data_->columnAt(orderByColumn);
RowColumn mappedFrameRowColumn = data_->columnAt(mappedFrameColumn);
for (auto i = 0; i < numRows; i++) {
auto currentRow = startRow + i;
auto* partitionRow = partition_[currentRow];

// Mark the frame invalid if the frame bound is NaN, except if NaN in the
// frame column is derived from NaN in the order-by column.
// https://github.com/facebookincubator/velox/pull/11293#issuecomment-2475391888
if constexpr (std::is_floating_point_v<T>) {
if (data_->isNanAt<T>(partitionRow, mappedFrameRowColumn) &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a comment describing this edge case, as we discussed offline? Thanks!

!data_->isNanAt<T>(partitionRow, orderByRowColumn)) {
validFrames.setValid(currentRow, false);
continue;
}
}

// The user is expected to set the frame column equal to NULL when the
// ORDER BY value is NULL and not in any other case. Validate this
// assumption.
Expand Down Expand Up @@ -423,20 +437,48 @@ void WindowPartition::computeKRangeFrameBounds(
vector_size_t startRow,
vector_size_t numRows,
const vector_size_t* rawPeerBuffer,
vector_size_t* rawFrameBounds) const {
vector_size_t* rawFrameBounds,
SelectivityVector& validFrames) const {
CompareFlags flags;
flags.ascending = sortKeyInfo_[0].second.isAscending();
flags.nullsFirst = sortKeyInfo_[0].second.isNullsFirst();

// Start bounds require first match. End bounds require last match.
updateKRangeFrameBounds(
isStartBound,
isPreceding,
flags,
startRow,
numRows,
frameColumn,
rawPeerBuffer,
rawFrameBounds);
const auto frameType = data_->columnTypes()[inputMapping_[frameColumn]];
if (frameType->isReal()) {
updateKRangeFrameBounds<float>(
isStartBound,
isPreceding,
flags,
startRow,
numRows,
frameColumn,
rawPeerBuffer,
rawFrameBounds,
validFrames);
} else if (frameType->isDouble()) {
updateKRangeFrameBounds<double>(
isStartBound,
isPreceding,
flags,
startRow,
numRows,
frameColumn,
rawPeerBuffer,
rawFrameBounds,
validFrames);
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL here instead of passing void?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @xiaoxmeng, I intentionally attempted to avoid using VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL. This will cause the code instantiation for every type, but we don't really differentiate the code for non-floating-point types.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL here instead of passing void?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL here instead of passing void for non-float or double type?

updateKRangeFrameBounds<void>(
isStartBound,
isPreceding,
flags,
startRow,
numRows,
frameColumn,
rawPeerBuffer,
rawFrameBounds,
validFrames);
}
}

} // namespace facebook::velox::exec
17 changes: 14 additions & 3 deletions velox/exec/WindowPartition.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,18 @@ class WindowPartition {
/// @param numRows number of rows to compute buffer for.
/// @param rawPeerStarts buffer of peer row values for each row. If the frame
/// column is null, then its peer row value is the frame boundary.
/// @param validFrames SelectivityVector to keep track of valid frames.
/// This function unselects the rows in validFrames whose frame bounds are NaN
/// and therefore invalid.
void computeKRangeFrameBounds(
bool isStartBound,
bool isPreceding,
column_index_t frameColumn,
vector_size_t startRow,
vector_size_t numRows,
const vector_size_t* rawPeerStarts,
vector_size_t* rawFrameBounds) const;
vector_size_t* rawFrameBounds,
SelectivityVector& validFrames) const;

private:
WindowPartition(
Expand Down Expand Up @@ -216,7 +220,13 @@ class WindowPartition {
column_index_t frameColumn,
const CompareFlags& flags) const;

// Iterates over 'numBlockRows' and searches frame value for each row.
/// Iterates over 'numBlockRows' and searches frame value for each row.
/// @tparam T The C++ type of the order-by and frame columns. When T is float
/// or double, this method checks for rows with NaN frame bound(s) but non-NaN
/// order-by value. These frames are invalid and we unselect these rows from
/// 'validFrames'. If the order-by and frame columns are not of floating-point
/// types, T should be set to void.
template <typename T>
void updateKRangeFrameBounds(
bool firstMatch,
bool isPreceding,
Expand All @@ -225,7 +235,8 @@ class WindowPartition {
vector_size_t numRows,
column_index_t frameColumn,
const vector_size_t* rawPeerBounds,
vector_size_t* rawFrameBounds) const;
vector_size_t* rawFrameBounds,
SelectivityVector& validFrames) const;

// Indicates if this is a partial partition for RowStreamWindowBuild
// processing.
Expand Down
50 changes: 50 additions & 0 deletions velox/exec/tests/RowContainerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2290,3 +2290,53 @@ TEST_F(RowContainerTest, customComparisonRow) {
expectedOrder, BIGINT_TYPE_WITH_CUSTOM_COMPARISON())});
});
}

// Verifies RowContainer::isNanAt() on a container with alternating REAL and
// DOUBLE columns. Row i stores NaN in column i and a regular value in every
// other column, so isNanAt() must return true exactly on the diagonal.
TEST_F(RowContainerTest, isNanAt) {
  const auto kNan = std::numeric_limits<double>::quiet_NaN();
  const auto kNanF = std::numeric_limits<float>::quiet_NaN();
  auto rowVector = makeRowVector({
      makeFlatVector<float>({kNanF, 0.2, 0.3, 0.4}),
      makeFlatVector<double>({0.1, kNan, 0.3, 0.4}),
      makeFlatVector<float>({0.1, 0.2, kNanF, 0.4}),
      makeFlatVector<double>({0.1, 0.2, 0.3, kNan}),
  });
  const auto kNumRows = rowVector->size();

  auto rowContainer =
      makeRowContainer({REAL(), DOUBLE()}, {REAL(), DOUBLE()}, false);
  const auto& columnTypes = rowContainer->columnTypes();
  const int numColumns = static_cast<int>(columnTypes.size());

  std::vector<char*> rows;
  rows.reserve(kNumRows);
  for (int i = 0; i < kNumRows; ++i) {
    rows.push_back(rowContainer->newRow());
  }

  SelectivityVector allRows(kNumRows);
  for (int j = 0; j < numColumns; ++j) {
    DecodedVector decoded(*rowVector->childAt(j), allRows);
    rowContainer->store(decoded, folly::Range(rows.data(), kNumRows), j);
  }

  for (int i = 0; i < kNumRows; ++i) {
    const auto* row = rows[i];
    for (int j = 0; j < numColumns; ++j) {
      const auto column = rowContainer->columnAt(j);
      // Read the value with the C++ type that matches the column's own type.
      // Choosing the type from the row index would read DOUBLE columns as
      // float and REAL columns as double for every off-diagonal check.
      const bool isNan = columnTypes[j]->isReal()
          ? rowContainer->isNanAt<float>(row, column)
          : rowContainer->isNanAt<double>(row, column);
      EXPECT_EQ(isNan, i == j) << "row " << i << ", column " << j;
    }
  }
}
60 changes: 60 additions & 0 deletions velox/exec/tests/WindowTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -692,5 +692,65 @@ DEBUG_ONLY_TEST_F(WindowTest, reserveMemorySort) {
}
}

TEST_F(WindowTest, NaNFrameBound) {
const auto kNan = std::numeric_limits<double>::quiet_NaN();
auto data = makeRowVector(
{"c0", "s0", "off0", "off1"},
{
makeFlatVector<int64_t>({1, 2, 3, 4}),
makeFlatVector<double>({1.0, 2.0, 3.0, kNan}),
makeFlatVector<double>({0.1, 2.0, 1.9, kNan}),
makeFlatVector<double>({kNan, 2.0, kNan, kNan}),
});

// Builds every RANGE frame clause exercised by this test for the window
// function 'call': both sort directions of s0, combined with each permitted
// preceding/following pairing, with off0 and off1 used in either role.
const auto makeFrames = [](const std::string& call) {
  const std::vector<std::string> sortDirections{"asc", "desc"};
  const std::vector<std::string> boundKinds{"preceding", "following"};

  std::vector<std::string> result;
  for (const auto& direction : sortDirections) {
    for (const auto& startKind : boundKinds) {
      for (const auto& endKind : boundKinds) {
        // Frames that start at a following bound and end at a preceding bound
        // are not allowed, so that pairing is left out.
        if (startKind != "following" || endKind != "preceding") {
          result.emplace_back(fmt::format(
              "{} over (order by s0 {} range between off0 {} and off1 {})",
              call,
              direction,
              startKind,
              endKind));
          result.emplace_back(fmt::format(
              "{} over (order by s0 {} range between off1 {} and off0 {})",
              call,
              direction,
              startKind,
              endKind));
        }
      }
    }
  }
  return result;
};

auto expected = makeRowVector(
{makeNullableFlatVector<int64_t>({std::nullopt, 2, std::nullopt, 4})});
for (const auto& frame : makeFrames("sum(c0)")) {
auto plan =
PlanBuilder().values({data}).window({frame}).project({"w0"}).planNode();
AssertQueryBuilder(plan).assertResults(expected);
}

// rank() should not be affected by the frames, so added this test to ensure
// rank() produces correct results even if the frame bounds contain NaN.
expected = makeRowVector({makeFlatVector<int64_t>({1, 2, 3, 4})});
for (const auto& frame : makeFrames("rank()")) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kagamiori : rank() is not affected by frame values. Only value functions (first_value, last_value, nth_value) and aggregate functions are affected by frame boundaries. Might be better to use one of those functions in the test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @aditi-pandit, I intentionally added the test case of rank() to ensure it is not affected by the handling of NaN frame bound as you point out. The regular case is tested by the sum() function above.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kagamiori : I see. Maybe then add a comment to indicate this is for a function not affected by the NaN frames.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aditi-pandit, sure, will do. Thanks for the suggestion!

auto plan =
PlanBuilder().values({data}).window({frame}).project({"w0"}).planNode();
AssertQueryBuilder(plan).assertResults(expected);
}
}

} // namespace
} // namespace facebook::velox::exec
Loading