From 7a3099846d269e0230442dbbd67c5a26e9a33a29 Mon Sep 17 00:00:00 2001 From: Jiaqi Zhang Date: Thu, 19 Dec 2024 01:20:23 -0800 Subject: [PATCH] feat: Separate null count and minmax from column stats (#11860) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/11860 PR to address #11741 - Removed the use of columnHasNulls in RowContainer and replaced them with row stats - Separate null count/sumBytes from minmax. In the case of rows erasure, only min/max is invalidated. Reviewed By: xiaoxmeng Differential Revision: D67229925 fbshipit-source-id: ef66a1a419942f0b1d699749849c19e05be378e7 --- velox/exec/HashProbe.cpp | 4 +- velox/exec/RowContainer.cpp | 58 +++++++++++++++++------- velox/exec/RowContainer.h | 47 +++++++++++++------- velox/exec/tests/RowContainerTest.cpp | 63 +++++++++++++++++++++------ 4 files changed, 126 insertions(+), 46 deletions(-) diff --git a/velox/exec/HashProbe.cpp b/velox/exec/HashProbe.cpp index a788cc8e2ae5..8fe132dc0dbb 100644 --- a/velox/exec/HashProbe.cpp +++ b/velox/exec/HashProbe.cpp @@ -302,8 +302,8 @@ std::optional HashProbe::estimatedRowSize( std::vector varSizeListColumnsStats; varSizeListColumnsStats.reserve(varSizedColumns.size()); for (uint32_t i = 0; i < varSizedColumns.size(); ++i) { - auto statsOpt = columnStats(varSizedColumns[i]); - if (!statsOpt.has_value()) { + const auto statsOpt = columnStats(varSizedColumns[i]); + if (!statsOpt.has_value() || !statsOpt->minMaxColumnStatsValid()) { return std::nullopt; } varSizeListColumnsStats.push_back(statsOpt.value()); diff --git a/velox/exec/RowContainer.cpp b/velox/exec/RowContainer.cpp index 9e9ace14ee0f..0cd6d3d83173 100644 --- a/velox/exec/RowContainer.cpp +++ b/velox/exec/RowContainer.cpp @@ -188,7 +188,6 @@ RowContainer::RowContainer( if (nullableKeys_) { ++nullOffset; } - columnHasNulls_.push_back(false); } // Make offset at least sizeof pointer so that there is space for a // free list next pointer below the bit at 'freeFlagOffset_'. @@ -217,7 +216,6 @@ RowContainer::RowContainer( nullOffsets_.push_back(nullOffset); ++nullOffset; isVariableWidth |= !type->isFixedWidth(); - columnHasNulls_.push_back(false); } if (hasProbedFlag) { nullOffsets_.push_back(nullOffset); @@ -336,16 +334,35 @@ char* RowContainer::initializeRow(char* row, bool reuse) { return row; } +void RowContainer::removeOrUpdateRowColumnStats( + const char* row, + bool setToNull) { + // Update row column stats accordingly + for (auto i = 0; i < types_.size(); i++) { + if (isNullAt(row, columnAt(i))) { + rowColumnsStats_[i].removeOrUpdateCellStats(0, true, setToNull); + } else if (types_[i]->isFixedWidth()) { + rowColumnsStats_[i].removeOrUpdateCellStats( + fixedSizeAt(i), false, setToNull); + } else { + rowColumnsStats_[i].removeOrUpdateCellStats( + variableSizeAt(row, i), false, setToNull); + } + } + invalidateMinMaxColumnStats(); +} + void RowContainer::eraseRows(folly::Range rows) { freeRowsExtraMemory(rows, /*freeNextRowVector=*/true); for (auto* row : rows) { VELOX_CHECK(!bits::isBitSet(row, freeFlagOffset_), "Double free of row"); + removeOrUpdateRowColumnStats(row, /*setToNull=*/false); + bits::setBit(row, freeFlagOffset_); nextFree(row) = firstFreeRow_; firstFreeRow_ = row; } numFreeRows_ += rows.size(); - invalidateColumnStats(); } int32_t RowContainer::findRows(folly::Range rows, char** result) const { @@ -466,11 +483,26 @@ void RowContainer::freeRowsExtraMemory( numRows_ -= rows.size(); } -void RowContainer::invalidateColumnStats() { - if (rowColumnsStats_.empty()) { - return; +void RowColumn::Stats::removeOrUpdateCellStats( + int32_t bytes, + bool wasNull, + bool setToNull) { + // we only update nullCount, nonNullCount, and numBytes + // when the cell is removed. Because min/max need the + // full column data and not recorded in stats. + if (wasNull) { + VELOX_DCHECK_EQ(bytes, 0); + if (!setToNull) { + --nullCount_; + } + } else { + --nonNullCount_; + sumBytes_ -= bytes; + if (setToNull) { + ++nullCount_; + } } - rowColumnsStats_.clear(); + invalidateMinMaxColumnStats(); } // static @@ -522,12 +554,6 @@ void RowContainer::updateColumnStats( void RowContainer::updateColumnStats(char* row, int32_t columnIndex) { const bool nullColumn = isNullAt(row, rowColumns_[columnIndex]); - updateColumnHasNulls(columnIndex, nullColumn); - - if (rowColumnsStats_.empty()) { - // Column stats have been invalidated. - return; - } auto& columnStats = rowColumnsStats_[columnIndex]; if (nullColumn) { @@ -589,7 +615,7 @@ void RowContainer::store( offsets_[column], column); } else { - const auto rowColumn = rowColumns_[column]; + const auto& rowColumn = rowColumns_[column]; VELOX_DYNAMIC_TYPE_DISPATCH_ALL( storeWithNullsBatch, typeKinds_[column], @@ -836,7 +862,6 @@ void RowContainer::storeComplexType( if (decoded.isNullAt(index)) { VELOX_DCHECK(nullMask); row[nullByte] |= nullMask; - updateColumnHasNulls(column, true); return; } RowSizeTracker tracker(row[rowSizeOffset_], *stringAllocator_); @@ -1009,6 +1034,9 @@ void RowContainer::clear() { normalizedKeySize_ = originalNormalizedKeySize_; numFreeRows_ = 0; firstFreeRow_ = nullptr; + + rowColumnsStats_.clear(); + rowColumnsStats_.resize(types_.size()); } void RowContainer::setProbedFlag(char** rows, int32_t numRows) { diff --git a/velox/exec/RowContainer.h b/velox/exec/RowContainer.h index 2d3f65346717..13e5bcdb87b8 100644 --- a/velox/exec/RowContainer.h +++ b/velox/exec/RowContainer.h @@ -187,6 +187,8 @@ class RowColumn { ++nullCount_; } + void removeOrUpdateCellStats(int32_t bytes, bool wasNull, bool setToNull); + int32_t maxBytes() const { return maxBytes_; } @@ -218,6 +220,14 @@ class RowColumn { return nullCount_ + nonNullCount_; } + void invalidateMinMaxColumnStats() { + minMaxStatsValid_ = false; + } + + bool minMaxColumnStatsValid() const { + return minMaxStatsValid_; + } + /// Merges multiple aggregated stats of the same column into a single one. static Stats merge(const std::vector& statsList); @@ -225,6 +235,7 @@ class RowColumn { // Aggregated stats for non-null rows of the column. int32_t minBytes_{0}; int32_t maxBytes_{0}; + bool minMaxStatsValid_{true}; uint64_t sumBytes_{0}; uint32_t nonNullCount_{0}; @@ -320,11 +331,11 @@ class RowContainer { /// a row with uninitialized keys for aggregates with no-op partial /// aggregation. void setAllNull(char* row) { + removeOrUpdateRowColumnStats(row, /*setToNull=*/true); if (!nullOffsets_.empty()) { memset(row + nullByte(nullOffsets_[0]), 0xff, initialNulls_.size()); bits::clearBit(row, freeFlagOffset_); } - invalidateColumnStats(); } /// The row size excluding any out-of-line stored variable length values. @@ -817,13 +828,18 @@ class RowContainer { /// invalidated. Any row erase operations will invalidate column stats. std::optional columnStats(int32_t columnIndex) const; + uint32_t columnNullCount(int32_t columnIndex) { + return rowColumnsStats_[columnIndex].nullCount(); + } + const auto& keyTypes() const { return keyTypes_; } - /// Returns true if specified column may have nulls, false otherwise. + /// Returns true if specified column has nulls, false otherwise. inline bool columnHasNulls(int32_t columnIndex) const { - return columnHasNulls_[columnIndex]; + return columnStats(columnIndex)->numCells() > 0 && + columnStats(columnIndex)->nullCount() > 0; } const std::vector& accumulators() const { @@ -990,6 +1006,13 @@ class RowContainer { } } + /// Removes or updates the column stats of a given row by updating each column + /// stats. + /// @param row - Points to the row to be removed or updated. + /// @param setToNull - If true, the row stats is set to a null row, + /// otherwise, the stats is erased from the columns stats. + void removeOrUpdateRowColumnStats(const char* row, bool setToNull); + char*& nextFree(char* row) const { return *reinterpret_cast(row + kNextFreeOffset); } @@ -1015,7 +1038,6 @@ class RowContainer { // Do not leave an uninitialized value in the case of a // null. This is an error with valgrind/asan. *reinterpret_cast(row + offset) = T(); - updateColumnHasNulls(columnIndex, true); return; } if constexpr (std::is_same_v) { @@ -1469,14 +1491,12 @@ class RowContainer { // Updates column stats for serialized row. inline void updateColumnStats(char* row, int32_t columnIndex); - // Light weight aggregated column stats does not support row erasures. This + // Min/max column stats do not support row erasures. This // method is called whenever a row is erased. - void invalidateColumnStats(); - - // Updates the specific column's columnHasNulls_ flag, if 'hasNulls' is true. - // columnHasNulls_ flag is false by default. - inline void updateColumnHasNulls(int32_t columnIndex, bool hasNulls) { - columnHasNulls_[columnIndex] = columnHasNulls_[columnIndex] || hasNulls; + void invalidateMinMaxColumnStats() { + for (auto columnStats : rowColumnsStats_) { + columnStats.invalidateMinMaxColumnStats(); + } } const std::vector keyTypes_; @@ -1487,8 +1507,6 @@ class RowContainer { const std::unique_ptr stringAllocator_; - std::vector columnHasNulls_; - // Indicates if we can add new row to this row container. It is set to false // after user calls 'getRowPartitions()' to create 'rowPartitions' object for // parallel join build. @@ -1513,7 +1531,7 @@ class RowContainer { // Offset and null indicator offset of non-aggregate fields as a single word. // Corresponds pairwise to 'types_'. std::vector rowColumns_; - // Optional aggregated column stats(e.g. min/max size) for non-aggregate + // Aggregated column stats(e.g. min/max size) for non-aggregate // fields. Index aligns with 'rowColumns_'. std::vector rowColumnsStats_; // Bit offset of the probed flag for a full or right outer join payload. 0 if @@ -1643,7 +1661,6 @@ inline void RowContainer::storeWithNulls( if (decoded.isNullAt(rowIndex)) { row[nullByte] |= nullMask; memset(row + offset, 0, sizeof(int128_t)); - updateColumnHasNulls(columnIndex, true); return; } HugeInt::serialize(decoded.valueAt(rowIndex), row + offset); diff --git a/velox/exec/tests/RowContainerTest.cpp b/velox/exec/tests/RowContainerTest.cpp index 4189adc5de72..9d0451838a40 100644 --- a/velox/exec/tests/RowContainerTest.cpp +++ b/velox/exec/tests/RowContainerTest.cpp @@ -90,8 +90,10 @@ class RowContainerTestHelper { if (rowContainer_->types_[i]->isFixedWidth()) { continue; } - VELOX_CHECK_EQ(expectedStats.maxBytes(), storedStats.maxBytes()); - VELOX_CHECK_EQ(expectedStats.minBytes(), storedStats.minBytes()); + if (storedStats.minMaxColumnStatsValid()) { + VELOX_CHECK_EQ(expectedStats.maxBytes(), storedStats.maxBytes()); + VELOX_CHECK_EQ(expectedStats.minBytes(), storedStats.minBytes()); + } VELOX_CHECK_EQ(expectedStats.sumBytes(), storedStats.sumBytes()); VELOX_CHECK_EQ(expectedStats.avgBytes(), storedStats.avgBytes()); VELOX_CHECK_EQ( @@ -1974,25 +1976,25 @@ TEST_F(RowContainerTest, extractSerializedRow) { auto rows = store(rowContainer, data); + for (int col = 0; col < rowType->size(); ++col) { + ASSERT_EQ(rowContainer.columnHasNulls(col), expectedColumnHasNulls[col]); + } + // Extract serialized rows. auto serialized = BaseVector::create>( VARBINARY(), data->size(), pool()); rowContainer.extractSerializedRows( folly::Range(rows.data(), rows.size()), serialized); + for (int col = 0; col < rowType->size(); ++col) { ASSERT_EQ(rowContainer.columnHasNulls(col), expectedColumnHasNulls[col]); } rowContainer.clear(); - // TODO: fix these once we reset the column stats when clear the row - // container. for (int col = 0; col < rowType->size(); ++col) { - ASSERT_EQ(rowContainer.columnHasNulls(col), expectedColumnHasNulls[col]); + ASSERT_EQ(rowContainer.columnHasNulls(col), false); } rows.clear(); - for (int col = 0; col < rowType->size(); ++col) { - ASSERT_EQ(rowContainer.columnHasNulls(col), expectedColumnHasNulls[col]); - } // Load serialized rows back. for (auto i = 0; i < data->size(); ++i) { @@ -2171,7 +2173,7 @@ TEST_F(RowContainerTest, columnHasNulls) { auto rowContainer = makeRowContainer({BIGINT(), BIGINT()}, {BIGINT(), BIGINT()}, false); for (int i = 0; i < rowContainer->columnTypes().size(); ++i) { - ASSERT_TRUE(!rowContainer->columnHasNulls(i)); + ASSERT_FALSE(rowContainer->columnHasNulls(i)); } const uint64_t kNumRows = 1000; @@ -2554,11 +2556,11 @@ TEST_F(RowContainerTest, invalidatedColumnStats) { invalidateFunc(rowContainer.get(), rows); RowContainerTestHelper(rowContainer.get()).checkConsistency(); - ASSERT_FALSE(rowContainer->columnStats(0).has_value()); - ASSERT_FALSE(rowContainer->columnStats(1).has_value()); - ASSERT_FALSE(rowContainer->columnStats(2).has_value()); - ASSERT_FALSE(rowContainer->columnStats(3).has_value()); - ASSERT_FALSE(rowContainer->columnStats(4).has_value()); + ASSERT_TRUE(rowContainer->columnStats(0).has_value()); + ASSERT_TRUE(rowContainer->columnStats(1).has_value()); + ASSERT_TRUE(rowContainer->columnStats(2).has_value()); + ASSERT_TRUE(rowContainer->columnStats(3).has_value()); + ASSERT_TRUE(rowContainer->columnStats(4).has_value()); } } @@ -2621,6 +2623,22 @@ TEST_F(RowContainerTest, rowColumnStats) { EXPECT_EQ(stats.nonNullCount(), 6); EXPECT_EQ(stats.nullCount(), 4); EXPECT_EQ(stats.numCells(), 10); + + stats.removeOrUpdateCellStats(25, false, false); + EXPECT_EQ(stats.minMaxColumnStatsValid(), false); + EXPECT_EQ(stats.sumBytes(), 60); + EXPECT_EQ(stats.avgBytes(), 12); + EXPECT_EQ(stats.numCells(), 9); + EXPECT_EQ(stats.nonNullCount(), 5); + EXPECT_EQ(stats.nullCount(), 4); + + stats.removeOrUpdateCellStats(0, true, false); + EXPECT_EQ(stats.minMaxColumnStatsValid(), false); + EXPECT_EQ(stats.sumBytes(), 60); + EXPECT_EQ(stats.avgBytes(), 12); + EXPECT_EQ(stats.numCells(), 8); + EXPECT_EQ(stats.nonNullCount(), 5); + EXPECT_EQ(stats.nullCount(), 3); } TEST_F(RowContainerTest, storeAndCollectColumnStats) { @@ -2661,5 +2679,22 @@ TEST_F(RowContainerTest, storeAndCollectColumnStats) { EXPECT_EQ(stats.avgBytes(), 13); } } + + rowContainer->eraseRows(folly::Range(rows.data(), 10)); // there are 2 nulls + for (int i = 0; i < rowContainer->columnTypes().size(); ++i) { + const auto stats = rowContainer->columnStats(i).value(); + EXPECT_EQ(stats.nonNullCount(), 849); + EXPECT_EQ(stats.nullCount(), 141); + EXPECT_EQ(stats.numCells(), kNumRows - 10); + if (rowVector->childAt(i)->typeKind() == TypeKind::VARCHAR) { + EXPECT_EQ(stats.sumBytes(), 11809); + EXPECT_EQ(stats.avgBytes(), 13); + } + } + rowContainer->clear(); + for (int i = 0; i < rowContainer->columnTypes().size(); ++i) { + EXPECT_EQ(rowContainer->columnStats(i).value().numCells(), 0); + } } + } // namespace facebook::velox::exec::test