Skip to content

Commit

Permalink
feat: Separate null count and minmax from column stats (facebookincub…
Browse files Browse the repository at this point in the history
…ator#11860)

Summary:
Pull Request resolved: facebookincubator#11860

PR to address facebookincubator#11741

- Removed the use of columnHasNulls in RowContainer and replaced them with row stats
- Separate null count/sumBytes from minmax. In the case of rows erasure, only min/max is invalidated.

Reviewed By: xiaoxmeng

Differential Revision: D67229925

fbshipit-source-id: ef66a1a419942f0b1d699749849c19e05be378e7
  • Loading branch information
zation99 authored and facebook-github-bot committed Dec 19, 2024
1 parent f3585b1 commit 7a30998
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 46 deletions.
4 changes: 2 additions & 2 deletions velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,8 @@ std::optional<uint64_t> HashProbe::estimatedRowSize(
std::vector<RowColumn::Stats> varSizeListColumnsStats;
varSizeListColumnsStats.reserve(varSizedColumns.size());
for (uint32_t i = 0; i < varSizedColumns.size(); ++i) {
auto statsOpt = columnStats(varSizedColumns[i]);
if (!statsOpt.has_value()) {
const auto statsOpt = columnStats(varSizedColumns[i]);
if (!statsOpt.has_value() || !statsOpt->minMaxColumnStatsValid()) {
return std::nullopt;
}
varSizeListColumnsStats.push_back(statsOpt.value());
Expand Down
58 changes: 43 additions & 15 deletions velox/exec/RowContainer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ RowContainer::RowContainer(
if (nullableKeys_) {
++nullOffset;
}
columnHasNulls_.push_back(false);
}
// Make offset at least sizeof pointer so that there is space for a
// free list next pointer below the bit at 'freeFlagOffset_'.
Expand Down Expand Up @@ -217,7 +216,6 @@ RowContainer::RowContainer(
nullOffsets_.push_back(nullOffset);
++nullOffset;
isVariableWidth |= !type->isFixedWidth();
columnHasNulls_.push_back(false);
}
if (hasProbedFlag) {
nullOffsets_.push_back(nullOffset);
Expand Down Expand Up @@ -336,16 +334,35 @@ char* RowContainer::initializeRow(char* row, bool reuse) {
return row;
}

void RowContainer::removeOrUpdateRowColumnStats(
const char* row,
bool setToNull) {
// Update row column stats accordingly
for (auto i = 0; i < types_.size(); i++) {
if (isNullAt(row, columnAt(i))) {
rowColumnsStats_[i].removeOrUpdateCellStats(0, true, setToNull);
} else if (types_[i]->isFixedWidth()) {
rowColumnsStats_[i].removeOrUpdateCellStats(
fixedSizeAt(i), false, setToNull);
} else {
rowColumnsStats_[i].removeOrUpdateCellStats(
variableSizeAt(row, i), false, setToNull);
}
}
invalidateMinMaxColumnStats();
}

void RowContainer::eraseRows(folly::Range<char**> rows) {
freeRowsExtraMemory(rows, /*freeNextRowVector=*/true);
for (auto* row : rows) {
VELOX_CHECK(!bits::isBitSet(row, freeFlagOffset_), "Double free of row");
removeOrUpdateRowColumnStats(row, /*setToNull=*/false);

bits::setBit(row, freeFlagOffset_);
nextFree(row) = firstFreeRow_;
firstFreeRow_ = row;
}
numFreeRows_ += rows.size();
invalidateColumnStats();
}

int32_t RowContainer::findRows(folly::Range<char**> rows, char** result) const {
Expand Down Expand Up @@ -466,11 +483,26 @@ void RowContainer::freeRowsExtraMemory(
numRows_ -= rows.size();
}

void RowContainer::invalidateColumnStats() {
if (rowColumnsStats_.empty()) {
return;
void RowColumn::Stats::removeOrUpdateCellStats(
int32_t bytes,
bool wasNull,
bool setToNull) {
// we only update nullCount, nonNullCount, and numBytes
// when the cell is removed. Because min/max need the
// full column data and not recorded in stats.
if (wasNull) {
VELOX_DCHECK_EQ(bytes, 0);
if (!setToNull) {
--nullCount_;
}
} else {
--nonNullCount_;
sumBytes_ -= bytes;
if (setToNull) {
++nullCount_;
}
}
rowColumnsStats_.clear();
invalidateMinMaxColumnStats();
}

// static
Expand Down Expand Up @@ -522,12 +554,6 @@ void RowContainer::updateColumnStats(

void RowContainer::updateColumnStats(char* row, int32_t columnIndex) {
const bool nullColumn = isNullAt(row, rowColumns_[columnIndex]);
updateColumnHasNulls(columnIndex, nullColumn);

if (rowColumnsStats_.empty()) {
// Column stats have been invalidated.
return;
}

auto& columnStats = rowColumnsStats_[columnIndex];
if (nullColumn) {
Expand Down Expand Up @@ -589,7 +615,7 @@ void RowContainer::store(
offsets_[column],
column);
} else {
const auto rowColumn = rowColumns_[column];
const auto& rowColumn = rowColumns_[column];
VELOX_DYNAMIC_TYPE_DISPATCH_ALL(
storeWithNullsBatch,
typeKinds_[column],
Expand Down Expand Up @@ -836,7 +862,6 @@ void RowContainer::storeComplexType(
if (decoded.isNullAt(index)) {
VELOX_DCHECK(nullMask);
row[nullByte] |= nullMask;
updateColumnHasNulls(column, true);
return;
}
RowSizeTracker tracker(row[rowSizeOffset_], *stringAllocator_);
Expand Down Expand Up @@ -1009,6 +1034,9 @@ void RowContainer::clear() {
normalizedKeySize_ = originalNormalizedKeySize_;
numFreeRows_ = 0;
firstFreeRow_ = nullptr;

rowColumnsStats_.clear();
rowColumnsStats_.resize(types_.size());
}

void RowContainer::setProbedFlag(char** rows, int32_t numRows) {
Expand Down
47 changes: 32 additions & 15 deletions velox/exec/RowContainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ class RowColumn {
++nullCount_;
}

void removeOrUpdateCellStats(int32_t bytes, bool wasNull, bool setToNull);

int32_t maxBytes() const {
return maxBytes_;
}
Expand Down Expand Up @@ -218,13 +220,22 @@ class RowColumn {
return nullCount_ + nonNullCount_;
}

void invalidateMinMaxColumnStats() {
minMaxStatsValid_ = false;
}

bool minMaxColumnStatsValid() const {
return minMaxStatsValid_;
}

/// Merges multiple aggregated stats of the same column into a single one.
static Stats merge(const std::vector<Stats>& statsList);

private:
// Aggregated stats for non-null rows of the column.
int32_t minBytes_{0};
int32_t maxBytes_{0};
bool minMaxStatsValid_{true};
uint64_t sumBytes_{0};

uint32_t nonNullCount_{0};
Expand Down Expand Up @@ -320,11 +331,11 @@ class RowContainer {
/// a row with uninitialized keys for aggregates with no-op partial
/// aggregation.
void setAllNull(char* row) {
removeOrUpdateRowColumnStats(row, /*setToNull=*/true);
if (!nullOffsets_.empty()) {
memset(row + nullByte(nullOffsets_[0]), 0xff, initialNulls_.size());
bits::clearBit(row, freeFlagOffset_);
}
invalidateColumnStats();
}

/// The row size excluding any out-of-line stored variable length values.
Expand Down Expand Up @@ -817,13 +828,18 @@ class RowContainer {
/// invalidated. Any row erase operations will invalidate column stats.
std::optional<RowColumn::Stats> columnStats(int32_t columnIndex) const;

uint32_t columnNullCount(int32_t columnIndex) {
return rowColumnsStats_[columnIndex].nullCount();
}

const auto& keyTypes() const {
return keyTypes_;
}

/// Returns true if specified column may have nulls, false otherwise.
/// Returns true if specified column has nulls, false otherwise.
inline bool columnHasNulls(int32_t columnIndex) const {
return columnHasNulls_[columnIndex];
return columnStats(columnIndex)->numCells() > 0 &&
columnStats(columnIndex)->nullCount() > 0;
}

const std::vector<Accumulator>& accumulators() const {
Expand Down Expand Up @@ -990,6 +1006,13 @@ class RowContainer {
}
}

/// Removes or updates the column stats of a given row by updating each column
/// stats.
/// @param row - Points to the row to be removed or updated.
/// @param setToNull - If true, the row stats is set to a null row,
/// otherwise, the stats is erased from the columns stats.
void removeOrUpdateRowColumnStats(const char* row, bool setToNull);

char*& nextFree(char* row) const {
return *reinterpret_cast<char**>(row + kNextFreeOffset);
}
Expand All @@ -1015,7 +1038,6 @@ class RowContainer {
// Do not leave an uninitialized value in the case of a
// null. This is an error with valgrind/asan.
*reinterpret_cast<T*>(row + offset) = T();
updateColumnHasNulls(columnIndex, true);
return;
}
if constexpr (std::is_same_v<T, StringView>) {
Expand Down Expand Up @@ -1469,14 +1491,12 @@ class RowContainer {
// Updates column stats for serialized row.
inline void updateColumnStats(char* row, int32_t columnIndex);

// Light weight aggregated column stats does not support row erasures. This
// Min/max column stats do not support row erasures. This
// method is called whenever a row is erased.
void invalidateColumnStats();

// Updates the specific column's columnHasNulls_ flag, if 'hasNulls' is true.
// columnHasNulls_ flag is false by default.
inline void updateColumnHasNulls(int32_t columnIndex, bool hasNulls) {
columnHasNulls_[columnIndex] = columnHasNulls_[columnIndex] || hasNulls;
void invalidateMinMaxColumnStats() {
for (auto columnStats : rowColumnsStats_) {
columnStats.invalidateMinMaxColumnStats();
}
}

const std::vector<TypePtr> keyTypes_;
Expand All @@ -1487,8 +1507,6 @@ class RowContainer {

const std::unique_ptr<HashStringAllocator> stringAllocator_;

std::vector<bool> columnHasNulls_;

// Indicates if we can add new row to this row container. It is set to false
// after user calls 'getRowPartitions()' to create 'rowPartitions' object for
// parallel join build.
Expand All @@ -1513,7 +1531,7 @@ class RowContainer {
// Offset and null indicator offset of non-aggregate fields as a single word.
// Corresponds pairwise to 'types_'.
std::vector<RowColumn> rowColumns_;
// Optional aggregated column stats(e.g. min/max size) for non-aggregate
// Aggregated column stats(e.g. min/max size) for non-aggregate
// fields. Index aligns with 'rowColumns_'.
std::vector<RowColumn::Stats> rowColumnsStats_;
// Bit offset of the probed flag for a full or right outer join payload. 0 if
Expand Down Expand Up @@ -1643,7 +1661,6 @@ inline void RowContainer::storeWithNulls<TypeKind::HUGEINT>(
if (decoded.isNullAt(rowIndex)) {
row[nullByte] |= nullMask;
memset(row + offset, 0, sizeof(int128_t));
updateColumnHasNulls(columnIndex, true);
return;
}
HugeInt::serialize(decoded.valueAt<int128_t>(rowIndex), row + offset);
Expand Down
63 changes: 49 additions & 14 deletions velox/exec/tests/RowContainerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,10 @@ class RowContainerTestHelper {
if (rowContainer_->types_[i]->isFixedWidth()) {
continue;
}
VELOX_CHECK_EQ(expectedStats.maxBytes(), storedStats.maxBytes());
VELOX_CHECK_EQ(expectedStats.minBytes(), storedStats.minBytes());
if (storedStats.minMaxColumnStatsValid()) {
VELOX_CHECK_EQ(expectedStats.maxBytes(), storedStats.maxBytes());
VELOX_CHECK_EQ(expectedStats.minBytes(), storedStats.minBytes());
}
VELOX_CHECK_EQ(expectedStats.sumBytes(), storedStats.sumBytes());
VELOX_CHECK_EQ(expectedStats.avgBytes(), storedStats.avgBytes());
VELOX_CHECK_EQ(
Expand Down Expand Up @@ -1974,25 +1976,25 @@ TEST_F(RowContainerTest, extractSerializedRow) {

auto rows = store(rowContainer, data);

for (int col = 0; col < rowType->size(); ++col) {
ASSERT_EQ(rowContainer.columnHasNulls(col), expectedColumnHasNulls[col]);
}

// Extract serialized rows.
auto serialized = BaseVector::create<FlatVector<StringView>>(
VARBINARY(), data->size(), pool());
rowContainer.extractSerializedRows(
folly::Range(rows.data(), rows.size()), serialized);

for (int col = 0; col < rowType->size(); ++col) {
ASSERT_EQ(rowContainer.columnHasNulls(col), expectedColumnHasNulls[col]);
}

rowContainer.clear();
// TODO: fix these once we reset the column stats when clear the row
// container.
for (int col = 0; col < rowType->size(); ++col) {
ASSERT_EQ(rowContainer.columnHasNulls(col), expectedColumnHasNulls[col]);
ASSERT_EQ(rowContainer.columnHasNulls(col), false);
}
rows.clear();
for (int col = 0; col < rowType->size(); ++col) {
ASSERT_EQ(rowContainer.columnHasNulls(col), expectedColumnHasNulls[col]);
}

// Load serialized rows back.
for (auto i = 0; i < data->size(); ++i) {
Expand Down Expand Up @@ -2171,7 +2173,7 @@ TEST_F(RowContainerTest, columnHasNulls) {
auto rowContainer =
makeRowContainer({BIGINT(), BIGINT()}, {BIGINT(), BIGINT()}, false);
for (int i = 0; i < rowContainer->columnTypes().size(); ++i) {
ASSERT_TRUE(!rowContainer->columnHasNulls(i));
ASSERT_FALSE(rowContainer->columnHasNulls(i));
}

const uint64_t kNumRows = 1000;
Expand Down Expand Up @@ -2554,11 +2556,11 @@ TEST_F(RowContainerTest, invalidatedColumnStats) {
invalidateFunc(rowContainer.get(), rows);
RowContainerTestHelper(rowContainer.get()).checkConsistency();

ASSERT_FALSE(rowContainer->columnStats(0).has_value());
ASSERT_FALSE(rowContainer->columnStats(1).has_value());
ASSERT_FALSE(rowContainer->columnStats(2).has_value());
ASSERT_FALSE(rowContainer->columnStats(3).has_value());
ASSERT_FALSE(rowContainer->columnStats(4).has_value());
ASSERT_TRUE(rowContainer->columnStats(0).has_value());
ASSERT_TRUE(rowContainer->columnStats(1).has_value());
ASSERT_TRUE(rowContainer->columnStats(2).has_value());
ASSERT_TRUE(rowContainer->columnStats(3).has_value());
ASSERT_TRUE(rowContainer->columnStats(4).has_value());
}
}

Expand Down Expand Up @@ -2621,6 +2623,22 @@ TEST_F(RowContainerTest, rowColumnStats) {
EXPECT_EQ(stats.nonNullCount(), 6);
EXPECT_EQ(stats.nullCount(), 4);
EXPECT_EQ(stats.numCells(), 10);

stats.removeOrUpdateCellStats(25, false, false);
EXPECT_EQ(stats.minMaxColumnStatsValid(), false);
EXPECT_EQ(stats.sumBytes(), 60);
EXPECT_EQ(stats.avgBytes(), 12);
EXPECT_EQ(stats.numCells(), 9);
EXPECT_EQ(stats.nonNullCount(), 5);
EXPECT_EQ(stats.nullCount(), 4);

stats.removeOrUpdateCellStats(0, true, false);
EXPECT_EQ(stats.minMaxColumnStatsValid(), false);
EXPECT_EQ(stats.sumBytes(), 60);
EXPECT_EQ(stats.avgBytes(), 12);
EXPECT_EQ(stats.numCells(), 8);
EXPECT_EQ(stats.nonNullCount(), 5);
EXPECT_EQ(stats.nullCount(), 3);
}

TEST_F(RowContainerTest, storeAndCollectColumnStats) {
Expand Down Expand Up @@ -2661,5 +2679,22 @@ TEST_F(RowContainerTest, storeAndCollectColumnStats) {
EXPECT_EQ(stats.avgBytes(), 13);
}
}

rowContainer->eraseRows(folly::Range(rows.data(), 10)); // there are 2 nulls
for (int i = 0; i < rowContainer->columnTypes().size(); ++i) {
const auto stats = rowContainer->columnStats(i).value();
EXPECT_EQ(stats.nonNullCount(), 849);
EXPECT_EQ(stats.nullCount(), 141);
EXPECT_EQ(stats.numCells(), kNumRows - 10);
if (rowVector->childAt(i)->typeKind() == TypeKind::VARCHAR) {
EXPECT_EQ(stats.sumBytes(), 11809);
EXPECT_EQ(stats.avgBytes(), 13);
}
}
rowContainer->clear();
for (int i = 0; i < rowContainer->columnTypes().size(); ++i) {
EXPECT_EQ(rowContainer->columnStats(i).value().numCells(), 0);
}
}

} // namespace facebook::velox::exec::test

0 comments on commit 7a30998

Please sign in to comment.