Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Separate null count and minmax from column stats #11860

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,8 @@ std::optional<uint64_t> HashProbe::estimatedRowSize(
std::vector<RowColumn::Stats> varSizeListColumnsStats;
varSizeListColumnsStats.reserve(varSizedColumns.size());
for (uint32_t i = 0; i < varSizedColumns.size(); ++i) {
auto statsOpt = columnStats(varSizedColumns[i]);
if (!statsOpt.has_value()) {
const auto statsOpt = columnStats(varSizedColumns[i]);
if (!statsOpt.has_value() || !statsOpt->minMaxColumnStatsValid()) {
return std::nullopt;
}
varSizeListColumnsStats.push_back(statsOpt.value());
Expand Down
58 changes: 43 additions & 15 deletions velox/exec/RowContainer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ RowContainer::RowContainer(
if (nullableKeys_) {
++nullOffset;
}
columnHasNulls_.push_back(false);
}
// Make offset at least sizeof pointer so that there is space for a
// free list next pointer below the bit at 'freeFlagOffset_'.
Expand Down Expand Up @@ -217,7 +216,6 @@ RowContainer::RowContainer(
nullOffsets_.push_back(nullOffset);
++nullOffset;
isVariableWidth |= !type->isFixedWidth();
columnHasNulls_.push_back(false);
}
if (hasProbedFlag) {
nullOffsets_.push_back(nullOffset);
Expand Down Expand Up @@ -336,16 +334,35 @@ char* RowContainer::initializeRow(char* row, bool reuse) {
return row;
}

void RowContainer::removeOrUpdateRowColumnStats(
const char* row,
bool setToNull) {
// Update row column stats accordingly
for (auto i = 0; i < types_.size(); i++) {
if (isNullAt(row, columnAt(i))) {
rowColumnsStats_[i].removeOrUpdateCellStats(0, true, setToNull);
} else if (types_[i]->isFixedWidth()) {
rowColumnsStats_[i].removeOrUpdateCellStats(
fixedSizeAt(i), false, setToNull);
} else {
rowColumnsStats_[i].removeOrUpdateCellStats(
variableSizeAt(row, i), false, setToNull);
}
}
invalidateMinMaxColumnStats();
}

void RowContainer::eraseRows(folly::Range<char**> rows) {
freeRowsExtraMemory(rows, /*freeNextRowVector=*/true);
for (auto* row : rows) {
VELOX_CHECK(!bits::isBitSet(row, freeFlagOffset_), "Double free of row");
removeOrUpdateRowColumnStats(row, /*setToNull=*/false);

bits::setBit(row, freeFlagOffset_);
nextFree(row) = firstFreeRow_;
firstFreeRow_ = row;
}
numFreeRows_ += rows.size();
invalidateColumnStats();
}

int32_t RowContainer::findRows(folly::Range<char**> rows, char** result) const {
Expand Down Expand Up @@ -466,11 +483,26 @@ void RowContainer::freeRowsExtraMemory(
numRows_ -= rows.size();
}

void RowContainer::invalidateColumnStats() {
if (rowColumnsStats_.empty()) {
return;
void RowColumn::Stats::removeOrUpdateCellStats(
int32_t bytes,
bool wasNull,
bool setToNull) {
// we only update nullCount, nonNullCount, and numBytes
// when the cell is removed. Because min/max need the
// full column data and not recorded in stats.
if (wasNull) {
VELOX_DCHECK_EQ(bytes, 0);
if (!setToNull) {
--nullCount_;
}
} else {
--nonNullCount_;
sumBytes_ -= bytes;
if (setToNull) {
++nullCount_;
}
}
rowColumnsStats_.clear();
invalidateMinMaxColumnStats();
}

// static
Expand Down Expand Up @@ -522,12 +554,6 @@ void RowContainer::updateColumnStats(

void RowContainer::updateColumnStats(char* row, int32_t columnIndex) {
const bool nullColumn = isNullAt(row, rowColumns_[columnIndex]);
updateColumnHasNulls(columnIndex, nullColumn);

if (rowColumnsStats_.empty()) {
// Column stats have been invalidated.
return;
}

auto& columnStats = rowColumnsStats_[columnIndex];
if (nullColumn) {
Expand Down Expand Up @@ -589,7 +615,7 @@ void RowContainer::store(
offsets_[column],
column);
} else {
const auto rowColumn = rowColumns_[column];
const auto& rowColumn = rowColumns_[column];
VELOX_DYNAMIC_TYPE_DISPATCH_ALL(
storeWithNullsBatch,
typeKinds_[column],
Expand Down Expand Up @@ -836,7 +862,6 @@ void RowContainer::storeComplexType(
if (decoded.isNullAt(index)) {
VELOX_DCHECK(nullMask);
row[nullByte] |= nullMask;
updateColumnHasNulls(column, true);
return;
}
RowSizeTracker tracker(row[rowSizeOffset_], *stringAllocator_);
Expand Down Expand Up @@ -1009,6 +1034,9 @@ void RowContainer::clear() {
normalizedKeySize_ = originalNormalizedKeySize_;
numFreeRows_ = 0;
firstFreeRow_ = nullptr;

rowColumnsStats_.clear();
rowColumnsStats_.resize(types_.size());
}

void RowContainer::setProbedFlag(char** rows, int32_t numRows) {
Expand Down
47 changes: 32 additions & 15 deletions velox/exec/RowContainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ class RowColumn {
++nullCount_;
}

void removeOrUpdateCellStats(int32_t bytes, bool wasNull, bool setToNull);

int32_t maxBytes() const {
return maxBytes_;
}
Expand Down Expand Up @@ -218,13 +220,22 @@ class RowColumn {
return nullCount_ + nonNullCount_;
}

void invalidateMinMaxColumnStats() {
minMaxStatsValid_ = false;
}

bool minMaxColumnStatsValid() const {
return minMaxStatsValid_;
}

/// Merges multiple aggregated stats of the same column into a single one.
static Stats merge(const std::vector<Stats>& statsList);

private:
// Aggregated stats for non-null rows of the column.
int32_t minBytes_{0};
int32_t maxBytes_{0};
bool minMaxStatsValid_{true};
Copy link
Contributor

@tanjialiang tanjialiang Dec 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make the name of this variable more general? We could potentially have some more stats that could be invalidable.
And shall we add comments to the part of the stats that are invalidable and the other part of the stats that are not? Something like:

int32_t minBytes_{0};
int32_t maxBytes_{0};

// Flag that ... above stats ...
bool fragileStatsValid_{true};

uint64_t sumBytes_{0};
uint32_t nonNullCount_{0};
uint32_t nullCount_{0};

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tend to leave minMax name here for now to make it more explicit so people easily know what stats are invalidated. But in the future, if there are more stats that cannot be tracked, yes I think we should rename it.

uint64_t sumBytes_{0};

uint32_t nonNullCount_{0};
Expand Down Expand Up @@ -320,11 +331,11 @@ class RowContainer {
/// a row with uninitialized keys for aggregates with no-op partial
/// aggregation.
void setAllNull(char* row) {
removeOrUpdateRowColumnStats(row, /*setToNull=*/true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to call removeOrUpdateRowColumnStats here? I saw setAllNull is only used for a new row in

intermediateRows_->setAllNull(intermediateGroups_[i]);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It breaks the related unit tests. So I guess although it is only used in new row for now, the assumption is that it can be used in other places?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. We actually should implement setAllNull as add null for all the columns involved.

if (!nullOffsets_.empty()) {
memset(row + nullByte(nullOffsets_[0]), 0xff, initialNulls_.size());
bits::clearBit(row, freeFlagOffset_);
}
invalidateColumnStats();
}

/// The row size excluding any out-of-line stored variable length values.
Expand Down Expand Up @@ -817,13 +828,18 @@ class RowContainer {
/// invalidated. Any row erase operations will invalidate column stats.
std::optional<RowColumn::Stats> columnStats(int32_t columnIndex) const;

uint32_t columnNullCount(int32_t columnIndex) {
return rowColumnsStats_[columnIndex].nullCount();
}

const auto& keyTypes() const {
return keyTypes_;
}

/// Returns true if specified column may have nulls, false otherwise.
/// Returns true if specified column has nulls, false otherwise.
inline bool columnHasNulls(int32_t columnIndex) const {
return columnHasNulls_[columnIndex];
return columnStats(columnIndex)->numCells() > 0 &&
columnStats(columnIndex)->nullCount() > 0;
}

const std::vector<Accumulator>& accumulators() const {
Expand Down Expand Up @@ -990,6 +1006,13 @@ class RowContainer {
}
}

/// Removes or updates the column stats of a given row by updating each column
/// stats.
/// @param row - Points to the row to be removed or updated.
/// @param setToNull - If true, the row stats is set to a null row,
/// otherwise, the stats is erased from the columns stats.
void removeOrUpdateRowColumnStats(const char* row, bool setToNull);

char*& nextFree(char* row) const {
return *reinterpret_cast<char**>(row + kNextFreeOffset);
}
Expand All @@ -1015,7 +1038,6 @@ class RowContainer {
// Do not leave an uninitialized value in the case of a
// null. This is an error with valgrind/asan.
*reinterpret_cast<T*>(row + offset) = T();
updateColumnHasNulls(columnIndex, true);
return;
}
if constexpr (std::is_same_v<T, StringView>) {
Expand Down Expand Up @@ -1469,14 +1491,12 @@ class RowContainer {
// Updates column stats for serialized row.
inline void updateColumnStats(char* row, int32_t columnIndex);

// Light weight aggregated column stats does not support row erasures. This
// Min/max column stats do not support row erasures. This
// method is called whenever a row is erased.
void invalidateColumnStats();

// Updates the specific column's columnHasNulls_ flag, if 'hasNulls' is true.
// columnHasNulls_ flag is false by default.
inline void updateColumnHasNulls(int32_t columnIndex, bool hasNulls) {
columnHasNulls_[columnIndex] = columnHasNulls_[columnIndex] || hasNulls;
void invalidateMinMaxColumnStats() {
zation99 marked this conversation as resolved.
Show resolved Hide resolved
for (auto columnStats : rowColumnsStats_) {
columnStats.invalidateMinMaxColumnStats();
}
}

const std::vector<TypePtr> keyTypes_;
Expand All @@ -1487,8 +1507,6 @@ class RowContainer {

const std::unique_ptr<HashStringAllocator> stringAllocator_;

std::vector<bool> columnHasNulls_;

// Indicates if we can add new row to this row container. It is set to false
// after user calls 'getRowPartitions()' to create 'rowPartitions' object for
// parallel join build.
Expand All @@ -1513,7 +1531,7 @@ class RowContainer {
// Offset and null indicator offset of non-aggregate fields as a single word.
// Corresponds pairwise to 'types_'.
std::vector<RowColumn> rowColumns_;
// Optional aggregated column stats(e.g. min/max size) for non-aggregate
// Aggregated column stats(e.g. min/max size) for non-aggregate
// fields. Index aligns with 'rowColumns_'.
std::vector<RowColumn::Stats> rowColumnsStats_;
// Bit offset of the probed flag for a full or right outer join payload. 0 if
Expand Down Expand Up @@ -1643,7 +1661,6 @@ inline void RowContainer::storeWithNulls<TypeKind::HUGEINT>(
if (decoded.isNullAt(rowIndex)) {
row[nullByte] |= nullMask;
memset(row + offset, 0, sizeof(int128_t));
updateColumnHasNulls(columnIndex, true);
return;
}
HugeInt::serialize(decoded.valueAt<int128_t>(rowIndex), row + offset);
Expand Down
63 changes: 49 additions & 14 deletions velox/exec/tests/RowContainerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,10 @@ class RowContainerTestHelper {
if (rowContainer_->types_[i]->isFixedWidth()) {
continue;
}
VELOX_CHECK_EQ(expectedStats.maxBytes(), storedStats.maxBytes());
VELOX_CHECK_EQ(expectedStats.minBytes(), storedStats.minBytes());
if (storedStats.minMaxColumnStatsValid()) {
VELOX_CHECK_EQ(expectedStats.maxBytes(), storedStats.maxBytes());
VELOX_CHECK_EQ(expectedStats.minBytes(), storedStats.minBytes());
}
VELOX_CHECK_EQ(expectedStats.sumBytes(), storedStats.sumBytes());
VELOX_CHECK_EQ(expectedStats.avgBytes(), storedStats.avgBytes());
VELOX_CHECK_EQ(
Expand Down Expand Up @@ -1974,25 +1976,25 @@ TEST_F(RowContainerTest, extractSerializedRow) {

auto rows = store(rowContainer, data);

for (int col = 0; col < rowType->size(); ++col) {
ASSERT_EQ(rowContainer.columnHasNulls(col), expectedColumnHasNulls[col]);
}

// Extract serialized rows.
auto serialized = BaseVector::create<FlatVector<StringView>>(
VARBINARY(), data->size(), pool());
rowContainer.extractSerializedRows(
folly::Range(rows.data(), rows.size()), serialized);

for (int col = 0; col < rowType->size(); ++col) {
ASSERT_EQ(rowContainer.columnHasNulls(col), expectedColumnHasNulls[col]);
}

rowContainer.clear();
// TODO: fix these once we reset the column stats when clear the row
// container.
for (int col = 0; col < rowType->size(); ++col) {
ASSERT_EQ(rowContainer.columnHasNulls(col), expectedColumnHasNulls[col]);
ASSERT_EQ(rowContainer.columnHasNulls(col), false);
}
rows.clear();
for (int col = 0; col < rowType->size(); ++col) {
ASSERT_EQ(rowContainer.columnHasNulls(col), expectedColumnHasNulls[col]);
}

// Load serialized rows back.
for (auto i = 0; i < data->size(); ++i) {
Expand Down Expand Up @@ -2171,7 +2173,7 @@ TEST_F(RowContainerTest, columnHasNulls) {
auto rowContainer =
makeRowContainer({BIGINT(), BIGINT()}, {BIGINT(), BIGINT()}, false);
for (int i = 0; i < rowContainer->columnTypes().size(); ++i) {
ASSERT_TRUE(!rowContainer->columnHasNulls(i));
ASSERT_FALSE(rowContainer->columnHasNulls(i));
}

const uint64_t kNumRows = 1000;
Expand Down Expand Up @@ -2554,11 +2556,11 @@ TEST_F(RowContainerTest, invalidatedColumnStats) {
invalidateFunc(rowContainer.get(), rows);
RowContainerTestHelper(rowContainer.get()).checkConsistency();

ASSERT_FALSE(rowContainer->columnStats(0).has_value());
ASSERT_FALSE(rowContainer->columnStats(1).has_value());
ASSERT_FALSE(rowContainer->columnStats(2).has_value());
ASSERT_FALSE(rowContainer->columnStats(3).has_value());
ASSERT_FALSE(rowContainer->columnStats(4).has_value());
ASSERT_TRUE(rowContainer->columnStats(0).has_value());
ASSERT_TRUE(rowContainer->columnStats(1).has_value());
ASSERT_TRUE(rowContainer->columnStats(2).has_value());
ASSERT_TRUE(rowContainer->columnStats(3).has_value());
ASSERT_TRUE(rowContainer->columnStats(4).has_value());
}
}

Expand Down Expand Up @@ -2621,6 +2623,22 @@ TEST_F(RowContainerTest, rowColumnStats) {
EXPECT_EQ(stats.nonNullCount(), 6);
EXPECT_EQ(stats.nullCount(), 4);
EXPECT_EQ(stats.numCells(), 10);

stats.removeOrUpdateCellStats(25, false, false);
EXPECT_EQ(stats.minMaxColumnStatsValid(), false);
EXPECT_EQ(stats.sumBytes(), 60);
EXPECT_EQ(stats.avgBytes(), 12);
EXPECT_EQ(stats.numCells(), 9);
EXPECT_EQ(stats.nonNullCount(), 5);
EXPECT_EQ(stats.nullCount(), 4);

stats.removeOrUpdateCellStats(0, true, false);
EXPECT_EQ(stats.minMaxColumnStatsValid(), false);
EXPECT_EQ(stats.sumBytes(), 60);
EXPECT_EQ(stats.avgBytes(), 12);
EXPECT_EQ(stats.numCells(), 8);
EXPECT_EQ(stats.nonNullCount(), 5);
EXPECT_EQ(stats.nullCount(), 3);
}

TEST_F(RowContainerTest, storeAndCollectColumnStats) {
Expand Down Expand Up @@ -2661,5 +2679,22 @@ TEST_F(RowContainerTest, storeAndCollectColumnStats) {
EXPECT_EQ(stats.avgBytes(), 13);
}
}

rowContainer->eraseRows(folly::Range(rows.data(), 10)); // there are 2 nulls
for (int i = 0; i < rowContainer->columnTypes().size(); ++i) {
const auto stats = rowContainer->columnStats(i).value();
EXPECT_EQ(stats.nonNullCount(), 849);
EXPECT_EQ(stats.nullCount(), 141);
EXPECT_EQ(stats.numCells(), kNumRows - 10);
if (rowVector->childAt(i)->typeKind() == TypeKind::VARCHAR) {
EXPECT_EQ(stats.sumBytes(), 11809);
EXPECT_EQ(stats.avgBytes(), 13);
}
}
xiaoxmeng marked this conversation as resolved.
Show resolved Hide resolved
rowContainer->clear();
for (int i = 0; i < rowContainer->columnTypes().size(); ++i) {
EXPECT_EQ(rowContainer->columnStats(i).value().numCells(), 0);
}
}

} // namespace facebook::velox::exec::test
Loading