From 9a3270bb0785cbb84dfd73a2c9120b136007eecc Mon Sep 17 00:00:00 2001
From: Jialiang Tan
Date: Fri, 15 Nov 2024 17:21:06 -0800
Subject: [PATCH] feat(hashjoin): Add fast path to list join result

---
 velox/exec/GroupingSet.cpp                    |   2 +
 velox/exec/HashProbe.cpp                      |  58 +++-
 velox/exec/HashProbe.h                        |  17 ++
 velox/exec/HashTable.cpp                      |  36 ++-
 velox/exec/HashTable.h                        |  32 ++-
 velox/exec/RowContainer.cpp                   | 109 ++++---
 velox/exec/RowContainer.h                     | 168 ++++++++---
 velox/exec/StreamingAggregation.cpp           |   1 +
 .../tests/AggregateSpillBenchmarkBase.cpp     |   1 +
 velox/exec/tests/AggregationTest.cpp          |   2 +
 velox/exec/tests/HashJoinTest.cpp             |  74 +++--
 velox/exec/tests/HashTableTest.cpp            |  26 +-
 velox/exec/tests/RowContainerTest.cpp         | 266 +++++++++++++++++-
 velox/exec/tests/utils/RowContainerTestBase.h |   1 +
 14 files changed, 646 insertions(+), 147 deletions(-)

diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp
index 31d1d3e2ad0da..4613947f1f040 100644
--- a/velox/exec/GroupingSet.cpp
+++ b/velox/exec/GroupingSet.cpp
@@ -1064,6 +1064,7 @@ bool GroupingSet::getOutputWithSpill(
       false,
       false,
       false,
+      false,
       &pool_);

   initializeAggregates(aggregates_, *mergeRows_, false);
@@ -1282,6 +1283,7 @@ void GroupingSet::abandonPartialAggregation() {
       false,
       false,
       false,
+      false,
       &pool_);
   initializeAggregates(aggregates_, *intermediateRows_, true);
   table_.reset();
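
A note on these call sites: the extra `false` threaded through them is the new `collectColumnStats` flag added to the RowContainer constructor (see the RowContainer.h hunk further down). A minimal sketch of the call shape, with argument names assumed from that hunk rather than copied from GroupingSet.cpp:

    auto rows = std::make_unique<RowContainer>(
        keyTypes,
        nullableKeys,
        accumulators,
        dependentTypes,
        hasNext,
        isJoinBuild,        // false for aggregation containers
        hasProbedFlag,      // false
        hasNormalizedKeys,  // false
        false,              // collectColumnStats: newly added, off by default
        &pool_);
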
diff --git a/velox/exec/HashProbe.cpp b/velox/exec/HashProbe.cpp
index f8a820529fe88..e1ad73e57b8f3 100644
--- a/velox/exec/HashProbe.cpp
+++ b/velox/exec/HashProbe.cpp
@@ -292,6 +292,57 @@ void HashProbe::maybeSetupSpillInputReader(
   inputSpillPartitionSet_.erase(iter);
 }

+std::optional<uint64_t> HashProbe::estimatedRowSize(
+    const std::vector<vector_size_t>& varSizedColumns,
+    uint64_t totalFixedColumnsBytes) {
+  static const double kToleranceRatio = 10.0;
+  std::vector<RowColumn::Stats> varSizeListColumnsStats;
+  varSizeListColumnsStats.reserve(varSizedColumns.size());
+  for (uint32_t i = 0; i < varSizedColumns.size(); ++i) {
+    auto statsOpt = columnStats(varSizedColumns[i]);
+    if (!statsOpt.has_value()) {
+      return std::nullopt;
+    }
+    varSizeListColumnsStats.push_back(statsOpt.value());
+  }
+
+  uint64_t totalAvgBytes{totalFixedColumnsBytes};
+  uint64_t totalMaxBytes{totalFixedColumnsBytes};
+  for (const auto& stats : varSizeListColumnsStats) {
+    totalAvgBytes += stats.avgBytes();
+    totalMaxBytes += stats.maxBytes();
+  }
+  if (totalAvgBytes == 0) {
+    if (totalMaxBytes == 0) {
+      return 0;
+    }
+    // Return nullopt to prevent memory from exploding in extreme size skew
+    // cases: e.g. 1 row very large and all other rows of size 0.
+    return std::nullopt;
+  }
+  if (totalMaxBytes / totalAvgBytes >= kToleranceRatio) {
+    return std::nullopt;
+  }
+  // Bound the total per-batch size by 2x 'outputBatchSize_':
+  // worst case size = (outputBatchSize_ / estimated size) * totalMaxBytes
+  return (totalMaxBytes + totalAvgBytes) / 2;
+}
+
+std::optional<RowColumn::Stats> HashProbe::columnStats(
+    int32_t columnIndex) const {
+  std::vector<RowColumn::Stats> columnStats;
+  const auto rowContainers = table_->allRows();
+  for (const auto* rowContainer : rowContainers) {
+    VELOX_CHECK_NOT_NULL(rowContainer);
+    auto statsOpt = rowContainer->columnStats(columnIndex);
+    if (!statsOpt.has_value()) {
+      return std::nullopt;
+    }
+    columnStats.push_back(statsOpt.value());
+  }
+  return RowColumn::Stats::merge(columnStats);
+}
+
 void HashProbe::initializeResultIter() {
   VELOX_CHECK_NOT_NULL(table_);
   if (resultIter_ != nullptr) {
@@ -312,8 +363,12 @@ void HashProbe::initializeResultIter() {
       varSizeListColumns.push_back(column);
     }
   }
+
+  // TODO: Make the tolerance ratio configurable if needed.
   resultIter_ = std::make_unique<BaseHashTable::JoinResultIterator>(
-      std::move(varSizeListColumns), fixedSizeListColumnsSizeSum);
+      std::move(varSizeListColumns),
+      fixedSizeListColumnsSizeSum,
+      estimatedRowSize(varSizeListColumns, fixedSizeListColumnsSizeSum));
 }

 void HashProbe::asyncWaitForHashTable() {
@@ -1987,5 +2042,4 @@ void HashProbe::clearBuffers() {
   operatorCtx_->execCtx()->vectorPool()->clear();
   filter_->clearCache();
 }
-
 } // namespace facebook::velox::exec
diff --git a/velox/exec/HashProbe.h b/velox/exec/HashProbe.h
index 44775dacf5671..f4e0c20b0c985 100644
--- a/velox/exec/HashProbe.h
+++ b/velox/exec/HashProbe.h
@@ -312,6 +312,23 @@ class HashProbe : public Operator {
   // memory reclamation or operator close.
   void clearBuffers();

+  // Returns the estimated row size of the projected output columns. nullopt
+  // is returned if insufficient column stats are present in 'table_', or if
+  // the row size variation is too large. The variation is considered too
+  // large when the ratio of the max row size to the avg row size exceeds
+  // 'kToleranceRatio', which is set to 10.
+  std::optional<uint64_t> estimatedRowSize(
+      const std::vector<vector_size_t>& varSizedColumns,
+      uint64_t totalFixedColumnsBytes);
+
+  // Returns the aggregated column stats at 'columnIndex' of 'table_'. Returns
+  // nullopt if the column stats are not available.
+  //
+  // NOTE: column stats are collected by default for a hash join table, but
+  // they can be invalidated by spilling. We never expect to use an
+  // invalidated table, however, as we always spill the entire table.
+  std::optional<RowColumn::Stats> columnStats(int32_t columnIndex) const;
+
   // TODO: Define batch size as bytes based on RowContainer row sizes.
   const vector_size_t outputBatchSize_;
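
To make the heuristic concrete, here is a small standalone sketch of the same arithmetic (names hypothetical, not part of the patch), including the reasoning step for why the midpoint of avg and max bounds a batch at 2x its byte target:

    #include <cstdint>
    #include <optional>

    std::optional<uint64_t> estimateRowSize(uint64_t avgBytes, uint64_t maxBytes) {
      static const double kToleranceRatio = 10.0;
      if (avgBytes == 0) {
        return maxBytes == 0 ? std::optional<uint64_t>(0) : std::nullopt;
      }
      if (maxBytes / avgBytes >= kToleranceRatio) {
        return std::nullopt; // too skewed to estimate safely
      }
      // With estimate e = (max + avg) / 2, a batch of (batchBytes / e) rows
      // occupies at most batchBytes * max / e <= 2 * batchBytes, since max <= 2e.
      return (maxBytes + avgBytes) / 2;
    }

    // e.g. avg = 64, max = 256: ratio 4 < 10, estimate (256 + 64) / 2 = 160.
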
diff --git a/velox/exec/HashTable.cpp b/velox/exec/HashTable.cpp
index 7fa52f394c262..707b5d784d4e3 100644
--- a/velox/exec/HashTable.cpp
+++ b/velox/exec/HashTable.cpp
@@ -73,6 +73,7 @@ HashTable<ignoreNullKeys>::HashTable(
       isJoinBuild,
       hasProbedFlag,
       hashMode_ != HashMode::kHash,
+      isJoinBuild,
       pool);
   nextOffset_ = rows_->nextOffset();
 }
@@ -1826,10 +1827,9 @@ int32_t HashTable<ignoreNullKeys>::listJoinResults(
     uint64_t maxBytes) {
   VELOX_CHECK_LE(inputRows.size(), hits.size());

-  if (iter.varSizeListColumns.empty() && !hasDuplicates_) {
-    // When there is no duplicates, and no variable length columns are selected
-    // to be projected, we are able to calculate fixed length columns total size
-    // directly and go through fast path.
+  if (iter.estimatedRowSize.has_value() && !hasDuplicates_) {
+    // When there are no duplicates and the row size is estimable, we can take
+    // the fast path.
     return listJoinResultsFastPath(
         iter, includeMisses, inputRows, hits, maxBytes);
   }
@@ -1859,9 +1859,10 @@ int32_t HashTable<ignoreNullKeys>::listJoinResults(
       hits[numOut] = hit;
       numOut++;
       iter.lastRowIndex++;
-      totalBytes +=
-          (joinProjectedVarColumnsSize(iter.varSizeListColumns, hit) +
-           iter.fixedSizeListColumnsSizeSum);
+      totalBytes += iter.estimatedRowSize.has_value()
+          ? iter.estimatedRowSize.value()
+          : (joinProjectedVarColumnsSize(iter.varSizeListColumns, hit) +
+             iter.fixedSizeListColumnsSizeSum);
     } else {
       const auto numRows = rows->size();
       auto num =
@@ -1873,11 +1874,16 @@ int32_t HashTable<ignoreNullKeys>::listJoinResults(
           num * sizeof(char*));
       iter.lastDuplicateRowIndex += num;
       numOut += num;
-      for (const auto* dupRow : *rows) {
-        totalBytes +=
-            joinProjectedVarColumnsSize(iter.varSizeListColumns, dupRow);
+      if (iter.estimatedRowSize.has_value()) {
+        totalBytes += iter.estimatedRowSize.value() * numRows;
+      } else {
+        for (const auto* dupRow : *rows) {
+          totalBytes +=
+              joinProjectedVarColumnsSize(iter.varSizeListColumns, dupRow) +
+              iter.fixedSizeListColumnsSizeSum;
+        }
       }
-      totalBytes += (iter.fixedSizeListColumnsSizeSum * numRows);
       if (iter.lastDuplicateRowIndex >= numRows) {
         iter.lastDuplicateRowIndex = 0;
         iter.lastRowIndex++;
@@ -1900,8 +1906,8 @@ int32_t HashTable<ignoreNullKeys>::listJoinResultsFastPath(
   int32_t numOut = 0;
   const auto maxOut = std::min(
       static_cast<uint64_t>(inputRows.size()),
-      (iter.fixedSizeListColumnsSizeSum != 0
-           ? maxBytes / iter.fixedSizeListColumnsSizeSum
+      (iter.estimatedRowSize.value() != 0
+           ? maxBytes / iter.estimatedRowSize.value()
           : std::numeric_limits<uint64_t>::max()));
   int32_t i = iter.lastRowIndex;
   const auto numRows = iter.rows->size();
   // We pass the pointers as int64_t's in 'hitWords'.
   auto resultHits = reinterpret_cast<int64_t*>(hits.data());
   auto resultRows = inputRows.data();
-  const auto outLimit = maxOut - kWidth;
-  for (; i + kWidth <= numRows && numOut < outLimit; i += kWidth) {
+  const int64_t simdOutLimit = maxOut - kWidth;
+  for (; i + kWidth <= numRows && numOut < simdOutLimit; i += kWidth) {
     auto indices = simd::loadGatherIndices<int64_t, int32_t>(sourceRows + i);
     auto hitWords = simd::gather(sourceHits, indices);
     auto misses = includeMisses ? 0 : simd::toBitMask(hitWords == 0);
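
The fast path above replaces per-row byte accounting with a single division. A hedged standalone sketch of that budget computation (helper name hypothetical, not the Velox API):

    #include <algorithm>
    #include <cstdint>
    #include <optional>

    // Rows allowed in one output batch, given a byte cap and an optional
    // per-row size estimate.
    uint64_t rowBudget(
        uint64_t maxBytes,
        std::optional<uint64_t> estimatedRowSize,
        uint64_t maxRows) {
      if (!estimatedRowSize.has_value()) {
        return maxRows; // caller falls back to accumulating actual row sizes
      }
      if (estimatedRowSize.value() == 0) {
        return maxRows; // zero-width rows: only the row-count limit applies
      }
      return std::min<uint64_t>(maxRows, maxBytes / estimatedRowSize.value());
    }
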
diff --git a/velox/exec/HashTable.h b/velox/exec/HashTable.h
index 434ca9570976f..56f045c52afc4 100644
--- a/velox/exec/HashTable.h
+++ b/velox/exec/HashTable.h
@@ -142,8 +142,10 @@ class BaseHashTable {
   struct JoinResultIterator {
     JoinResultIterator(
         std::vector<vector_size_t>&& _varSizeListColumns,
-        uint64_t _fixedSizeListColumnsSizeSum)
-        : varSizeListColumns(std::move(_varSizeListColumns)),
+        uint64_t _fixedSizeListColumnsSizeSum,
+        std::optional<uint64_t> _estimatedRowSize)
+        : estimatedRowSize(_estimatedRowSize),
+          varSizeListColumns(std::move(_varSizeListColumns)),
           fixedSizeListColumnsSizeSum(_fixedSizeListColumnsSizeSum) {}

     void reset(const HashLookup& lookup) {
@@ -157,6 +159,8 @@ class BaseHashTable {
       return !rows || lastRowIndex == rows->size();
     }

+    /// The row size estimation of the projected output columns, if applicable.
+    const std::optional<uint64_t> estimatedRowSize;
     /// The indexes of the build side projected columns that are variable
     /// sized.
     const std::vector<vector_size_t> varSizeListColumns;
     /// The per row total bytes of the build side projected columns that are
     /// fixed sized.
@@ -635,18 +639,6 @@ class HashTable : public BaseHashTable {
   /// purpose.
   void checkConsistency() const;

-  auto& testingOtherTables() const {
-    return otherTables_;
-  }
-
-  uint64_t testingRehashSize() const {
-    return rehashSize();
-  }
-
-  char** testingTable() const {
-    return table_;
-  }
-
   void extractColumn(
       folly::Range<char* const*> rows,
       int32_t columnIndex,
...
       result);
   }

+  auto& testingOtherTables() const {
+    return otherTables_;
+  }
+
+  uint64_t testingRehashSize() const {
+    return rehashSize();
+  }
+
+  char** testingTable() const {
+    return table_;
+  }
+
  private:
  // Enables debug stats for collisions for debug build.
 #ifdef NDEBUG
diff --git a/velox/exec/RowContainer.cpp b/velox/exec/RowContainer.cpp
index 2fafa805e3d25..f763b5d98fafb 100644
--- a/velox/exec/RowContainer.cpp
+++ b/velox/exec/RowContainer.cpp
@@ -139,11 +139,13 @@ RowContainer::RowContainer(
     bool isJoinBuild,
     bool hasProbedFlag,
     bool hasNormalizedKeys,
+    bool collectColumnStats,
     memory::MemoryPool* pool)
     : keyTypes_(keyTypes),
       nullableKeys_(nullableKeys),
       isJoinBuild_(isJoinBuild),
       hasNormalizedKeys_(hasNormalizedKeys),
+      collectColumnStats_(collectColumnStats),
       stringAllocator_(std::make_unique<HashStringAllocator>(pool)),
       accumulators_(accumulators),
       rows_(pool) {
@@ -283,6 +285,9 @@ RowContainer::RowContainer(
       ++nullOffsetsPos;
     }
   }
+  if (collectColumnStats_) {
+    rowColumnsStats_.resize(types_.size());
+  }
 }

 RowContainer::~RowContainer() {
@@ -344,6 +349,7 @@ void RowContainer::eraseRows(folly::Range<char**> rows) {
     firstFreeRow_ = row;
   }
   numFreeRows_ += rows.size();
+  invalidateColumnStats();
 }

 int32_t RowContainer::findRows(folly::Range<char**> rows, char** result) const {
@@ -424,33 +430,6 @@ void RowContainer::freeVariableWidthFields(folly::Range<char**> rows) {
   }
 }

-void RowContainer::checkConsistency() const {
-  constexpr int32_t kBatch = 1000;
-  std::vector<char*> rows(kBatch);
-
-  RowContainerIterator iter;
-  int64_t allocatedRows = 0;
-  for (;;) {
-    int64_t numRows = listRows(&iter, kBatch, rows.data());
-    if (!numRows) {
-      break;
-    }
-    for (auto i = 0; i < numRows; ++i) {
-      auto row = rows[i];
-      VELOX_CHECK(!bits::isBitSet(row, freeFlagOffset_));
-      ++allocatedRows;
-    }
-  }
-
-  size_t numFree = 0;
-  for (auto free = firstFreeRow_; free; free = nextFree(free)) {
-    ++numFree;
-    VELOX_CHECK(bits::isBitSet(free, freeFlagOffset_));
-  }
-  VELOX_CHECK_EQ(numFree, numFreeRows_);
-  VELOX_CHECK_EQ(allocatedRows, numRows_);
-}
-
 void RowContainer::freeAggregates(folly::Range<char**> rows) {
   for (auto& accumulator : accumulators_) {
     accumulator.destroy(rows);
@@ -491,37 +470,93 @@ void RowContainer::freeRowsExtraMemory(
   numRows_ -= rows.size();
 }

+void RowContainer::invalidateColumnStats() {
+  if (!collectColumnStats_ || rowColumnsStats_.empty()) {
+    return;
+  }
+  rowColumnsStats_.clear();
+}
+
+// static
+RowColumn::Stats RowColumn::Stats::merge(
+    const std::vector<RowColumn::Stats>& statsList) {
+  RowColumn::Stats mergedStats;
+  for (const auto& stats : statsList) {
+    if (mergedStats.numCells() == 0) {
+      mergedStats.minBytes_ = stats.minBytes_;
+      mergedStats.maxBytes_ = stats.maxBytes_;
+    } else {
+      mergedStats.minBytes_ = std::min(mergedStats.minBytes_, stats.minBytes_);
+      mergedStats.maxBytes_ = std::max(mergedStats.maxBytes_, stats.maxBytes_);
+    }
+    mergedStats.nullCount_ += stats.nullCount_;
+    mergedStats.nonNullCount_ += stats.nonNullCount_;
+    mergedStats.sumBytes_ += stats.sumBytes_;
+  }
+  return mergedStats;
+}
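
A usage sketch of the merge semantics just defined, assuming the Stats accessors from the RowColumn::Stats class in the RowContainer.h hunk below (values hypothetical; expected results annotated inline):

    #include "velox/exec/RowContainer.h"

    using facebook::velox::exec::RowColumn;

    void statsMergeExample() {
      RowColumn::Stats a;
      a.addCellSize(8);
      a.addCellSize(32); // a: min 8, max 32, sum 40, nonNull 2

      RowColumn::Stats b;
      b.addCellSize(16);
      b.addNullCell(); // b: min 16, max 16, sum 16, nonNull 1, null 1

      const auto merged = RowColumn::Stats::merge({a, b});
      // merged: min 8, max 32, sum 56, nonNull 3, null 1, avg 56 / 3 = 18.
    }
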
+std::optional<RowColumn::Stats> RowContainer::columnStats(
+    int32_t columnIndex) const {
+  VELOX_CHECK(collectColumnStats_);
+  if (rowColumnsStats_.empty()) {
+    return std::nullopt;
+  }
+  return rowColumnsStats_[columnIndex];
+}
+
+void RowContainer::updateColumnStats(
+    const DecodedVector& decoded,
+    vector_size_t rowIndex,
+    char* row,
+    int32_t columnIndex) {
+  if (!collectColumnStats_ || rowColumnsStats_.empty()) {
+    // Column stats are not enabled, or have been invalidated.
+    return;
+  }
+
+  auto& columnStats = rowColumnsStats_[columnIndex];
+  if (decoded.isNullAt(rowIndex)) {
+    columnStats.addNullCell();
+  } else if (types_[columnIndex]->isFixedWidth()) {
+    columnStats.addCellSize(fixedSizeAt(columnIndex));
+  } else {
+    columnStats.addCellSize(variableSizeAt(row, columnIndex));
+  }
+}
+
 void RowContainer::store(
     const DecodedVector& decoded,
-    vector_size_t index,
+    vector_size_t rowIndex,
     char* row,
-    int32_t column) {
+    int32_t columnIndex) {
   auto numKeys = keyTypes_.size();
-  bool isKey = column < numKeys;
+  bool isKey = columnIndex < numKeys;
   if (isKey && !nullableKeys_) {
     VELOX_DYNAMIC_TYPE_DISPATCH(
         storeNoNulls,
-        typeKinds_[column],
+        typeKinds_[columnIndex],
         decoded,
-        index,
+        rowIndex,
         isKey,
         row,
-        offsets_[column]);
+        offsets_[columnIndex]);
   } else {
     VELOX_DCHECK(isKey || accumulators_.empty());
-    auto rowColumn = rowColumns_[column];
+    auto rowColumn = rowColumns_[columnIndex];
     VELOX_DYNAMIC_TYPE_DISPATCH_ALL(
         storeWithNulls,
-        typeKinds_[column],
+        typeKinds_[columnIndex],
         decoded,
-        index,
+        rowIndex,
         isKey,
         row,
         rowColumn.offset(),
         rowColumn.nullByte(),
         rowColumn.nullMask(),
-        column);
+        columnIndex);
   }
+  updateColumnStats(decoded, rowIndex, row, columnIndex);
 }

 void RowContainer::store(
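
The stats are deliberately append-only: there is no per-row subtraction path, so any erase clears the whole vector and columnStats() reports nullopt from then on. A hedged sketch of the observable contract (container setup elided; assumes a RowContainer built with collectColumnStats = true):

    #include "velox/exec/RowContainer.h"

    void statsInvalidationSketch(
        facebook::velox::exec::RowContainer& rows,
        folly::Range<char**> toErase) {
      auto before = rows.columnStats(0); // engaged after store() calls
      rows.eraseRows(toErase);           // drops all column stats wholesale
      auto after = rows.columnStats(0);  // now std::nullopt
      (void)before;
      (void)after;
    }
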
diff --git a/velox/exec/RowContainer.h b/velox/exec/RowContainer.h
index f05b66e0c735d..771aa0f0a3500 100644
--- a/velox/exec/RowContainer.h
+++ b/velox/exec/RowContainer.h
@@ -24,6 +24,9 @@
 #include "velox/vector/VectorTypeUtils.h"

 namespace facebook::velox::exec {
+namespace test {
+class RowContainerTestHelper;
+}

 using NextRowVector = std::vector<char*, memory::StlAllocator<char*>>;
@@ -163,6 +166,71 @@ class RowColumn {
     return nullMask() << 1;
   }

+  /// Aggregated stats of a column in the 'RowContainer'.
+  class Stats {
+   public:
+    Stats() = default;
+
+    void addCellSize(int32_t bytes) {
+      if (UNLIKELY(nonNullCount_ == 0)) {
+        minBytes_ = bytes;
+        maxBytes_ = bytes;
+      } else {
+        minBytes_ = std::min(minBytes_, bytes);
+        maxBytes_ = std::max(maxBytes_, bytes);
+      }
+      sumBytes_ += bytes;
+      ++nonNullCount_;
+    }
+
+    void addNullCell() {
+      ++nullCount_;
+    }
+
+    int32_t maxBytes() const {
+      return maxBytes_;
+    }
+
+    int32_t minBytes() const {
+      return minBytes_;
+    }
+
+    uint64_t sumBytes() const {
+      return sumBytes_;
+    }
+
+    uint64_t avgBytes() const {
+      if (nonNullCount_ == 0) {
+        return 0;
+      }
+      return sumBytes_ / nonNullCount_;
+    }
+
+    uint32_t nonNullCount() const {
+      return nonNullCount_;
+    }
+
+    uint32_t nullCount() const {
+      return nullCount_;
+    }
+
+    uint32_t numCells() const {
+      return nullCount_ + nonNullCount_;
+    }
+
+    /// Merges multiple aggregated stats of the same column into a single one.
+    static Stats merge(const std::vector<Stats>& statsList);
+
+   private:
+    // Aggregated stats for non-null rows of the column.
+    int32_t minBytes_{0};
+    int32_t maxBytes_{0};
+    uint64_t sumBytes_{0};
+
+    uint32_t nonNullCount_{0};
+    uint32_t nullCount_{0};
+  };
+
  private:
   static uint64_t PackOffsets(int32_t offset, int32_t nullOffset) {
     if (nullOffset == kNotNullOffset) {
@@ -205,6 +273,7 @@ class RowContainer {
             false, // isJoinBuild
             false, // hasProbedFlag
             false, // hasNormalizedKey
+            false, // collectColumnStats
             pool) {}

   ~RowContainer();
@@ -236,6 +305,7 @@ class RowContainer {
       bool isJoinBuild,
       bool hasProbedFlag,
       bool hasNormalizedKey,
+      bool collectColumnStats,
       memory::MemoryPool* pool);

   /// Allocates a new row and initializes possible aggregates to null.
@@ -256,6 +326,7 @@ class RowContainer {
       memset(row + nullByte(nullOffsets_[0]), 0xff, initialNulls_.size());
       bits::clearBit(row, freeFlagOffset_);
     }
+    invalidateColumnStats();
   }

   /// The row size excluding any out-of-line stored variable length values.
@@ -294,7 +365,7 @@ class RowContainer {
   /// Stores the 'index'th value in 'decoded' into 'row' at 'columnIndex'.
   void store(
       const DecodedVector& decoded,
-      vector_size_t index,
+      vector_size_t rowIndex,
       char* row,
       int32_t columnIndex);
@@ -743,6 +814,11 @@ class RowContainer {
     return types_;
   }

+  /// Returns the aggregated column stats of the column at 'columnIndex'.
+  /// nullopt is returned if the column stats were previously invalidated.
+  /// Any row erase operation invalidates the column stats.
+  std::optional<RowColumn::Stats> columnStats(int32_t columnIndex) const;
+
   const auto& keyTypes() const {
     return keyTypes_;
   }
@@ -760,10 +836,6 @@ class RowContainer {
     return *stringAllocator_;
   }

-  /// Checks that row and free row counts match and that free list membership is
-  /// consistent with free flag.
-  void checkConsistency() const;
-
   static inline bool
   isNullAt(const char* row, int32_t nullByte, uint8_t nullMask) {
     return (row[nullByte] & nullMask) != 0;
   }
@@ -932,43 +1004,45 @@ class RowContainer {
   template <TypeKind Kind>
   inline void storeWithNulls(
       const DecodedVector& decoded,
-      vector_size_t index,
+      vector_size_t rowIndex,
       bool isKey,
       char* row,
       int32_t offset,
       int32_t nullByte,
       uint8_t nullMask,
-      int32_t column) {
+      int32_t columnIndex) {
     using T = typename TypeTraits<Kind>::NativeType;
-    if (decoded.isNullAt(index)) {
+    if (decoded.isNullAt(rowIndex)) {
       row[nullByte] |= nullMask;
       // Do not leave an uninitialized value in the case of a
       // null. This is an error with valgrind/asan.
       *reinterpret_cast<T*>(row + offset) = T();
-      updateColumnHasNulls(column, true);
+      updateColumnHasNulls(columnIndex, true);
       return;
     }
     if constexpr (std::is_same_v<T, StringView>) {
       RowSizeTracker tracker(row[rowSizeOffset_], *stringAllocator_);
-      stringAllocator_->copyMultipart(decoded.valueAt<T>(index), row, offset);
+      stringAllocator_->copyMultipart(
+          decoded.valueAt<T>(rowIndex), row, offset);
     } else {
-      *reinterpret_cast<T*>(row + offset) = decoded.valueAt<T>(index);
+      *reinterpret_cast<T*>(row + offset) = decoded.valueAt<T>(rowIndex);
     }
   }

   template <TypeKind Kind>
   inline void storeNoNulls(
       const DecodedVector& decoded,
-      vector_size_t index,
+      vector_size_t rowIndex,
       bool isKey,
-      char* group,
+      char* row,
       int32_t offset) {
     using T = typename TypeTraits<Kind>::NativeType;
     if constexpr (std::is_same_v<T, StringView>) {
-      RowSizeTracker tracker(group[rowSizeOffset_], *stringAllocator_);
-      stringAllocator_->copyMultipart(decoded.valueAt<T>(index), group, offset);
+      RowSizeTracker tracker(row[rowSizeOffset_], *stringAllocator_);
+      stringAllocator_->copyMultipart(
+          decoded.valueAt<T>(rowIndex), row, offset);
     } else {
-      *reinterpret_cast<T*>(group + offset) = decoded.valueAt<T>(index);
+      *reinterpret_cast<T*>(row + offset) = decoded.valueAt<T>(rowIndex);
     }
   }
@@ -1385,6 +1459,16 @@ class RowContainer {
   void freeRowsExtraMemory(folly::Range<char**> rows, bool freeNextRowVector);

+  inline void updateColumnStats(
+      const DecodedVector& decoded,
+      vector_size_t rowIndex,
+      char* row,
+      int32_t columnIndex);
+
+  // The lightweight aggregated column stats do not support row erasure. This
+  // method is called whenever a row is erased.
+  void invalidateColumnStats();
+
   // Updates the specific column's columnHasNulls_ flag, if 'hasNulls' is true.
   // columnHasNulls_ flag is false by default.
   inline void updateColumnHasNulls(int32_t columnIndex, bool hasNulls) {
@@ -1396,6 +1480,8 @@ class RowContainer {
   const bool isJoinBuild_;
   // True if normalized keys are enabled in initial state.
   const bool hasNormalizedKeys_;
+  const bool collectColumnStats_;
+
   const std::unique_ptr<HashStringAllocator> stringAllocator_;

   std::vector<bool> columnHasNulls_;
@@ -1424,6 +1510,10 @@ class RowContainer {
   // Offset and null indicator offset of non-aggregate fields as a single word.
   // Corresponds pairwise to 'types_'.
   std::vector<RowColumn> rowColumns_;
+  // Optional aggregated column stats (e.g. min/max size) for non-aggregate
+  // fields. Index aligns with 'rowColumns_'. Column stats are only enabled
+  // if 'collectColumnStats_' is true.
+  std::vector<RowColumn::Stats> rowColumnsStats_;
   // Bit offset of the probed flag for a full or right outer join payload. 0 if
   // not applicable.
   int32_t probedFlagOffset_ = 0;
@@ -1455,6 +1545,8 @@ class RowContainer {
   memory::AllocationPool rows_;

   int alignment_ = 1;
+
+  friend class test::RowContainerTestHelper;
 };

 template <>
 inline int128_t RowContainer::valueAt<int128_t>(
@@ -1467,102 +1559,102 @@ inline int128_t RowContainer::valueAt<int128_t>(
 template <>
 inline void RowContainer::storeWithNulls<TypeKind::ROW>(
     const DecodedVector& decoded,
-    vector_size_t index,
+    vector_size_t rowIndex,
     bool isKey,
     char* row,
     int32_t offset,
     int32_t nullByte,
     uint8_t nullMask,
-    int32_t column) {
+    int32_t columnIndex) {
   storeComplexType(
-      decoded, index, isKey, row, offset, nullByte, nullMask, column);
+      decoded, rowIndex, isKey, row, offset, nullByte, nullMask, columnIndex);
 }

 template <>
 inline void RowContainer::storeNoNulls<TypeKind::ROW>(
     const DecodedVector& decoded,
-    vector_size_t index,
+    vector_size_t rowIndex,
     bool isKey,
     char* row,
     int32_t offset) {
-  storeComplexType(decoded, index, isKey, row, offset);
+  storeComplexType(decoded, rowIndex, isKey, row, offset);
 }

 template <>
 inline void RowContainer::storeWithNulls<TypeKind::ARRAY>(
     const DecodedVector& decoded,
-    vector_size_t index,
+    vector_size_t rowIndex,
     bool isKey,
     char* row,
     int32_t offset,
     int32_t nullByte,
     uint8_t nullMask,
-    int32_t column) {
+    int32_t columnIndex) {
   storeComplexType(
-      decoded, index, isKey, row, offset, nullByte, nullMask, column);
+      decoded, rowIndex, isKey, row, offset, nullByte, nullMask, columnIndex);
 }

 template <>
 inline void RowContainer::storeNoNulls<TypeKind::ARRAY>(
     const DecodedVector& decoded,
-    vector_size_t index,
+    vector_size_t rowIndex,
     bool isKey,
     char* row,
     int32_t offset) {
-  storeComplexType(decoded, index, isKey, row, offset);
+  storeComplexType(decoded, rowIndex, isKey, row, offset);
 }

 template <>
 inline void RowContainer::storeWithNulls<TypeKind::MAP>(
     const DecodedVector& decoded,
-    vector_size_t index,
+    vector_size_t rowIndex,
     bool isKey,
     char* row,
     int32_t offset,
     int32_t nullByte,
     uint8_t nullMask,
-    int32_t column) {
+    int32_t columnIndex) {
   storeComplexType(
-      decoded, index, isKey, row, offset, nullByte, nullMask, column);
+      decoded, rowIndex, isKey, row, offset, nullByte, nullMask, columnIndex);
 }

 template <>
 inline void RowContainer::storeNoNulls<TypeKind::MAP>(
     const DecodedVector& decoded,
-    vector_size_t index,
+    vector_size_t rowIndex,
     bool isKey,
     char* row,
     int32_t offset) {
-  storeComplexType(decoded, index, isKey, row, offset);
+  storeComplexType(decoded, rowIndex, isKey, row, offset);
 }

 template <>
 inline void RowContainer::storeWithNulls<TypeKind::HUGEINT>(
     const DecodedVector& decoded,
-    vector_size_t index,
+    vector_size_t rowIndex,
     bool /*isKey*/,
     char* row,
     int32_t offset,
     int32_t nullByte,
     uint8_t nullMask,
-    int32_t column) {
-  if (decoded.isNullAt(index)) {
+    int32_t columnIndex) {
+  if (decoded.isNullAt(rowIndex)) {
     row[nullByte] |= nullMask;
     memset(row + offset, 0, sizeof(int128_t));
-    updateColumnHasNulls(column, true);
+    updateColumnHasNulls(columnIndex, true);
     return;
   }
-  HugeInt::serialize(decoded.valueAt<int128_t>(index), row + offset);
+  HugeInt::serialize(decoded.valueAt<int128_t>(rowIndex), row + offset);
 }

 template <>
 inline void RowContainer::storeNoNulls<TypeKind::HUGEINT>(
     const DecodedVector& decoded,
-    vector_size_t index,
+    vector_size_t rowIndex,
     bool /*isKey*/,
     char* row,
     int32_t offset) {
-  HugeInt::serialize(decoded.valueAt<int128_t>(index), row + offset);
+  HugeInt::serialize(decoded.valueAt<int128_t>(rowIndex), row + offset);
 }

 template <>
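
For readers unfamiliar with how these `storeWithNulls<Kind>` specializations get selected: the `VELOX_DYNAMIC_TYPE_DISPATCH` macros in RowContainer::store() map a runtime TypeKind to the matching template instance. A rough standalone analogue of that pattern (toy enum and widths, not the Velox macro):

    #include <cstdint>
    #include <stdexcept>

    enum class TypeKind { BIGINT, VARCHAR, HUGEINT };

    template <TypeKind Kind>
    int32_t fixedWidth(); // specialized per kind, as the store routines are above

    template <>
    int32_t fixedWidth<TypeKind::BIGINT>() { return 8; }
    template <>
    int32_t fixedWidth<TypeKind::VARCHAR>() { return 16; } // in-row StringView slot
    template <>
    int32_t fixedWidth<TypeKind::HUGEINT>() { return 16; }

    // Runtime kind -> compile-time template instance.
    int32_t dispatchFixedWidth(TypeKind kind) {
      switch (kind) {
        case TypeKind::BIGINT:
          return fixedWidth<TypeKind::BIGINT>();
        case TypeKind::VARCHAR:
          return fixedWidth<TypeKind::VARCHAR>();
        case TypeKind::HUGEINT:
          return fixedWidth<TypeKind::HUGEINT>();
      }
      throw std::logic_error("unknown kind");
    }
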
diff --git a/velox/exec/StreamingAggregation.cpp b/velox/exec/StreamingAggregation.cpp
index e4b0aa598c8e0..692432ca461d2 100644
--- a/velox/exec/StreamingAggregation.cpp
+++ b/velox/exec/StreamingAggregation.cpp
@@ -343,6 +343,7 @@ std::unique_ptr<RowContainer> StreamingAggregation::makeRowContainer(
       false,
       false,
       false,
+      false,
       pool());
 }

diff --git a/velox/exec/tests/AggregateSpillBenchmarkBase.cpp b/velox/exec/tests/AggregateSpillBenchmarkBase.cpp
index 8464157a7e096..7fd44fc0a13a5 100644
--- a/velox/exec/tests/AggregateSpillBenchmarkBase.cpp
+++ b/velox/exec/tests/AggregateSpillBenchmarkBase.cpp
@@ -40,6 +40,7 @@ std::unique_ptr<RowContainer> makeRowContainer(
       false, // isJoinBuild
       false, // hasProbedFlag
       false, // hasNormalizedKey
+      false, // collectColumnStats
       pool.get());
 }

diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp
index 6f55a5876fdd1..1e9dd7147ef0f 100644
--- a/velox/exec/tests/AggregationTest.cpp
+++ b/velox/exec/tests/AggregationTest.cpp
@@ -377,6 +377,7 @@ class AggregationTest : public OperatorTestBase {
         false,
         true,
         true,
+        false,
         pool_.get());
   }

@@ -3635,6 +3636,7 @@ TEST_F(AggregationTest, destroyAfterPartialInitialization) {
       false, // isJoinBuild
      false, // hasProbedFlag
       false, // hasNormalizedKeys
+      false, // collectColumnStats
       pool());
   const auto rowColumn = rows.columnAt(0);
   agg.setOffsets(
diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp
index 9c6da3a9b437b..c3ddaf3f32836 100644
--- a/velox/exec/tests/HashJoinTest.cpp
+++ b/velox/exec/tests/HashJoinTest.cpp
@@ -5564,27 +5564,42 @@ TEST_F(HashJoinTest, dynamicFilterOnPartitionKey) {
 }

 TEST_F(HashJoinTest, probeMemoryLimitOnBuildProjection) {
+  const uint64_t numBuildRows = 20;
   std::vector<RowVectorPtr> probeVectors =
       makeBatches(10, [&](int32_t /*unused*/) {
-        return makeRowVector(
-            {makeFlatVector<int32_t>(1'000, [](auto row) { return row % 5; })});
+        return makeRowVector({makeFlatVector<int32_t>(
+            1'000, [](auto row) { return row % 25; })});
       });

-  // Build side has 4KB + 4B per row.
   std::vector<RowVectorPtr> buildVectors =
       makeBatches(1, [&](int32_t /*unused*/) {
         return makeRowVector(
-            {"u_c0", "u_c1", "u_c2"},
-            {makeFlatVector<int32_t>({0, 1, 2}),
-             makeFlatVector<std::string>({
-                 std::string(4096, 'a'),
-                 std::string(4096, 'b'),
-                 std::string(4096, 'c'),
-             }),
-             makeFlatVector<std::string>({
-                 std::string(4096, 'd'),
-                 std::string(4096, 'e'),
-                 std::string(4096, 'f'),
+            {"u_c0", "u_c1", "u_c2", "u_c3", "u_c4"},
+            {makeFlatVector<int32_t>(
+                 numBuildRows, [](auto row) { return row; }),
+             makeFlatVector<std::string>(
+                 numBuildRows,
+                 [](auto /* row */) { return std::string(4096, 'a'); }),
+             makeFlatVector<std::string>(
+                 numBuildRows,
+                 [](auto /* row */) { return std::string(4096, 'a'); }),
+             makeFlatVector<std::string>(
+                 numBuildRows,
+                 [](auto row) {
+                   // Values with too large a size variation to estimate.
+                   if (row == 0) {
+                     return std::string(4096, 'a');
+                   } else {
+                     return std::string(1, 'a');
+                   }
+                 }),
+             makeFlatVector<std::string>(numBuildRows, [](auto row) {
+               // Values with a tolerable size variation.
+               if (row == 0) {
+                 return std::string(4096, 'a');
+               } else {
+                 return std::string(256, 'a');
+               }
+             })});
       });

   createDuckDbTable("u", {buildVectors});

   struct TestParam {
-    int32_t numVarSizeColumn;
+    std::vector<int32_t> varSizeColumns;
     int32_t numExpectedBatches;
     std::string referenceQuery;

     std::string debugString() const {
-      return fmt::format(
-          "numVarSizeColumn {}, numExpectedBatches {}, referenceQuery '{}'",
-          numVarSizeColumn,
-          numExpectedBatches,
-          referenceQuery);
+      std::stringstream ss;
+      ss << "varSizeColumns [";
+      for (const auto& columnIndex : varSizeColumns) {
+        ss << columnIndex << ", ";
+      }
+      ss << "] ";
+      ss << "numExpectedBatches " << numExpectedBatches
+         << ", referenceQuery '" << referenceQuery << "'";
+      return ss.str();
     }
   };

   std::vector<TestParam> testParams{
-      {0, 10, "SELECT t.c0 FROM t JOIN u ON t.c0 = u.u_c0"},
-      {1, 3000, "SELECT t.c0, u.u_c1 FROM t JOIN u ON t.c0 = u.u_c0"},
-      {2, 6000, "SELECT t.c0, u.u_c1, u.u_c2 FROM t JOIN u ON t.c0 = u.u_c0"}};
+      {{}, 10, "SELECT t.c0 FROM t JOIN u ON t.c0 = u.u_c0"},
+      {{1}, 4000, "SELECT t.c0, u.u_c1 FROM t JOIN u ON t.c0 = u.u_c0"},
+      {{1, 2},
+       8000,
+       "SELECT t.c0, u.u_c1, u.u_c2 FROM t JOIN u ON t.c0 = u.u_c0"},
+      {{3}, 210, "SELECT t.c0, u.u_c3 FROM t JOIN u ON t.c0 = u.u_c0"},
+      {{4}, 2670, "SELECT t.c0, u.u_c4 FROM t JOIN u ON t.c0 = u.u_c0"}};
+
   for (const auto& testParam : testParams) {
     SCOPED_TRACE(testParam.debugString());
     core::PlanNodeId joinNodeId;
     std::vector<std::string> outputLayout;
     outputLayout.push_back("c0");
-    for (int32_t i = 0; i < testParam.numVarSizeColumn; i++) {
-      outputLayout.push_back(fmt::format("u_c{}", i + 1));
+    for (size_t i = 0; i < testParam.varSizeColumns.size(); i++) {
+      outputLayout.push_back(
+          fmt::format("u_c{}", testParam.varSizeColumns[i]));
     }
     auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
     auto plan = PlanBuilder(planNodeIdGenerator)
diff --git a/velox/exec/tests/HashTableTest.cpp b/velox/exec/tests/HashTableTest.cpp
index afd51b6d6e71c..630f259ec1319 100644
--- a/velox/exec/tests/HashTableTest.cpp
+++ b/velox/exec/tests/HashTableTest.cpp
@@ -887,6 +887,7 @@ TEST_P(HashTableTest, listJoinResultsSize) {
   }

   struct TestParam {
+    std::optional<uint64_t> estimatedRowSize;
     std::vector<vector_size_t> varSizeListColumns;
     std::vector<vector_size_t> fixedSizeListColumns;
     uint64_t maxBytes;

     std::string debugString() const {
       std::stringstream ss;
+      ss << "estimatedRowSize "
+         << (estimatedRowSize.has_value()
+                 ? std::to_string(estimatedRowSize.value())
+                 : "null");
       ss << "varSizeListColumns ";
       ss << "[";
       for (auto i = 0; i < varSizeListColumns.size(); i++) {
@@ -918,14 +922,17 @@ TEST_P(HashTableTest, listJoinResultsSize) {
   // Key types: BIGINT, VARCHAR, ROW(BIGINT, VARCHAR)
   // Dependent types: BIGINT, VARCHAR
   std::vector<TestParam> testParams{
-      {{}, {0}, 1024, 128},
-      {{1}, {}, 2048, 20},
-      {{1}, {}, 1 << 20, 1024},
-      {{1}, {}, 1 << 14, 154},
-      {{1}, {0}, 2048, 18},
-      {{}, {0, 3}, 1024, 64},
-      {{2}, {}, 2048, 17},
-      {{1, 2, 4}, {0, 3}, 1 << 14, 66}};
+      {std::nullopt, {}, {0}, 1024, 128},
+      {std::nullopt, {1}, {}, 2048, 20},
+      {std::nullopt, {1}, {}, 1 << 20, 1024},
+      {std::nullopt, {1}, {}, 1 << 14, 154},
+      {std::nullopt, {1}, {0}, 2048, 18},
+      {std::nullopt, {}, {0, 3}, 1024, 64},
+      {std::nullopt, {2}, {}, 2048, 17},
+      {std::nullopt, {1, 2, 4}, {0, 3}, 1 << 14, 66},
+      {std::nullopt, {2}, {}, 2048, 17},
+      {std::make_optional(128), {}, {0}, 1024, 8},
+      {std::make_optional(0), {1}, {0}, 1024, 1024}};
   for (const auto& testParam : testParams) {
     SCOPED_TRACE(testParam.debugString());
     uint64_t fixedColumnSizeSum{0};
     }
     BaseHashTable::JoinResultIterator iter(
         std::vector<vector_size_t>(testParam.varSizeListColumns),
-        fixedColumnSizeSum);
+        fixedColumnSizeSum,
+        testParam.estimatedRowSize);
     iter.reset(lookup);
     auto numRows = table->listJoinResults(
         iter, true, inputRows, outputRows, testParam.maxBytes);
diff --git a/velox/exec/tests/RowContainerTest.cpp b/velox/exec/tests/RowContainerTest.cpp
index b1ff6dc6a412b..5271347328ac8 100644
--- a/velox/exec/tests/RowContainerTest.cpp
+++ b/velox/exec/tests/RowContainerTest.cpp
@@ -27,6 +27,85 @@ using namespace facebook::velox::exec;
 using namespace facebook::velox::test;
 using namespace facebook::velox::common::testutil;

+namespace facebook::velox::exec::test {
+class RowContainerTestHelper {
+ public:
+  explicit RowContainerTestHelper(RowContainer* rowContainer)
+      : rowContainer_(rowContainer) {}
+
+  void checkConsistency() const {
+    static constexpr int32_t kBatch = 1000;
+    std::vector<char*> rows(kBatch);
+    RowContainerIterator iter;
+    int64_t allocatedRows = 0;
+    std::vector<RowColumn::Stats> columnsStats;
+    columnsStats.resize(rowContainer_->rowColumnsStats_.size());
+    const bool isColumnStatsValid = rowContainer_->collectColumnStats_ &&
+        !rowContainer_->rowColumnsStats_.empty();
+    for (;;) {
+      int64_t numRows = rowContainer_->listRows(&iter, kBatch, rows.data());
+      if (!numRows) {
+        break;
+      }
+      for (auto rowIndex = 0; rowIndex < numRows; ++rowIndex) {
+        auto row = rows[rowIndex];
+        VELOX_CHECK(!bits::isBitSet(row, rowContainer_->freeFlagOffset_));
+        ++allocatedRows;
+
+        if (isColumnStatsValid) {
+          for (uint32_t columnIndex = 0;
+               columnIndex < rowContainer_->rowColumnsStats_.size();
+               columnIndex++) {
+            if (rowContainer_->types_[columnIndex]->isFixedWidth()) {
+              continue;
+            }
+            if (rowContainer_->isNullAt(
+                    row, rowContainer_->columnAt(columnIndex))) {
+              columnsStats[columnIndex].addNullCell();
+            } else {
+              columnsStats[columnIndex].addCellSize(
+                  rowContainer_->variableSizeAt(row, columnIndex));
+            }
+          }
+        }
+      }
+    }
+
+    size_t numFree = 0;
+    for (auto free = rowContainer_->firstFreeRow_; free;
+         free = rowContainer_->nextFree(free)) {
+      ++numFree;
+      VELOX_CHECK(bits::isBitSet(free, rowContainer_->freeFlagOffset_));
+    }
+    VELOX_CHECK_EQ(numFree, rowContainer_->numFreeRows_);
+    VELOX_CHECK_EQ(allocatedRows, rowContainer_->numRows_);
+
+    if (isColumnStatsValid) {
+      VELOX_CHECK_EQ(
+          rowContainer_->types_.size(), rowContainer_->rowColumnsStats_.size());
+
+      for (uint32_t i = 0; i < rowContainer_->rowColumnsStats_.size(); i++) {
+        const auto& storedStats = rowContainer_->rowColumnsStats_[i];
+        const auto& expectedStats = columnsStats[i];
+        if (rowContainer_->types_[i]->isFixedWidth()) {
+          continue;
+        }
+        VELOX_CHECK_EQ(expectedStats.maxBytes(), storedStats.maxBytes());
+        VELOX_CHECK_EQ(expectedStats.minBytes(), storedStats.minBytes());
+        VELOX_CHECK_EQ(expectedStats.sumBytes(), storedStats.sumBytes());
+        VELOX_CHECK_EQ(expectedStats.avgBytes(), storedStats.avgBytes());
+        VELOX_CHECK_EQ(
+            expectedStats.nonNullCount(), storedStats.nonNullCount());
+        VELOX_CHECK_EQ(expectedStats.nullCount(), storedStats.nullCount());
+      }
+    }
+  }
+
+ private:
+  RowContainer* const rowContainer_;
+};
+
 class RowContainerTest : public exec::test::RowContainerTestBase {
  protected:
   static void SetUpTestCase() {
@@ -965,7 +1044,7 @@ TEST_F(RowContainerTest, types) {
     }
   }
   checkSizes(rows, *data);
-  data->checkConsistency();
+  RowContainerTestHelper(data.get()).checkConsistency();

   auto copy = BaseVector::create(batch->type(), batch->size(), pool_.get());
   for (auto column = 0; column < batch->childrenSize(); ++column) {
@@ -1149,7 +1228,7 @@ TEST_F(RowContainerTest, erase) {
     erased.push_back(rows[i]);
   }
   data->eraseRows(folly::Range<char**>(erased.data(), erased.size()));
-  data->checkConsistency();
+  RowContainerTestHelper(data.get()).checkConsistency();

   std::vector<char*> remaining(data->numRows());
   iter.reset();
@@ -1164,13 +1243,13 @@ TEST_F(RowContainerTest, erase) {
   // The next row will be a new one.
   auto newRow = data->newRow();
   EXPECT_EQ(rowSet.end(), rowSet.find(newRow));
-  data->checkConsistency();
+  RowContainerTestHelper(data.get()).checkConsistency();
   data->clear();
   EXPECT_EQ(0, data->numRows());
   auto free = data->freeSpace();
   EXPECT_EQ(0, free.first);
   EXPECT_EQ(0, free.second);
-  data->checkConsistency();
+  RowContainerTestHelper(data.get()).checkConsistency();
 }

 TEST_F(RowContainerTest, initialNulls) {
@@ -1338,6 +1417,7 @@ TEST_F(RowContainerTest, alignment) {
       false,
       true,
       true,
+      true,
       pool_.get());
   constexpr int kNumRows = 100;
   char* rows[kNumRows];
@@ -1505,6 +1585,7 @@ TEST_F(RowContainerTest, probedFlag) {
       true, // isJoinBuild
       true, // hasProbedFlag
       false, // hasNormalizedKey
+      true, // collectColumnStats
       pool_.get());

   auto input = makeRowVector({
@@ -1662,8 +1743,8 @@ TEST_F(RowContainerTest, mixedFree) {
   data2->eraseRows(folly::Range<char**>(result2.data(), kNumRows));
   EXPECT_EQ(0, data1->numRows());
   EXPECT_EQ(0, data2->numRows());
-  data1->checkConsistency();
-  data2->checkConsistency();
+  RowContainerTestHelper(data1.get()).checkConsistency();
+  RowContainerTestHelper(data2.get()).checkConsistency();
 }

 TEST_F(RowContainerTest, unknown) {
@@ -1999,7 +2080,7 @@ TEST_F(RowContainerTest, nextRowVector) {
     data->eraseRows(
         folly::Range<char**>(erasingRows.data(), erasingRows.size()));
     validateNextRowVector();
-    data->checkConsistency();
+    RowContainerTestHelper(data.get()).checkConsistency();
   };

   nextRowVectorAppendValidation();
@@ -2340,3 +2421,174 @@ TEST_F(RowContainerTest, isNanAt) {
     }
   }
 }
+
+TEST_F(RowContainerTest, invalidatedColumnStats) {
+  constexpr int32_t kNumRows = 100;
+  auto batch = makeDataset(
+      ROW(
+          {{"bool_val", BOOLEAN()},
+           {"int_val", INTEGER()},
+           {"string_val", VARCHAR()},
+           {"array_val", ARRAY(VARCHAR())},
+           {"struct_val2",
+            ROW({{"s_int", INTEGER()}, {"s_array", ARRAY(REAL())}})}}),
+      kNumRows,
+      [](RowVectorPtr rows) {
+        auto strings =
+            rows->childAt(
+                    rows->type()->as<TypeKind::ROW>().getChildIdx("string_val"))
+                ->as<FlatVector<StringView>>();
+        for (auto i = 0; i < strings->size(); i += 11) {
+          std::string chars;
+          makeLargeString(i * 10000, chars);
+          strings->set(i, StringView(chars));
+        }
+      });
+
+  const auto& types = batch->type()->as<TypeKind::ROW>().children();
+  std::vector<TypePtr> keys;
+  // We have the same types twice, once as non-null keys, next as
+  // nullable non-keys.
+  keys.insert(keys.begin(), types.begin(), types.begin() + types.size() / 2);
+  std::vector<TypePtr> dependents;
+  dependents.insert(
+      dependents.begin(), types.begin() + types.size() / 2, types.end());
+
+  auto createRowContainer = [&](std::vector<char*>& rows) {
+    auto data = makeRowContainer(keys, dependents);
+
+    EXPECT_FALSE(data->columnStats(0).has_value());
+    EXPECT_FALSE(data->columnStats(1).has_value());
+    EXPECT_TRUE(data->columnStats(2).has_value());
+    EXPECT_EQ(data->columnStats(2)->nonNullCount(), 0);
+    EXPECT_EQ(data->columnStats(2)->nullCount(), 0);
+    EXPECT_TRUE(data->columnStats(3).has_value());
+    EXPECT_EQ(data->columnStats(3)->nonNullCount(), 0);
+    EXPECT_EQ(data->columnStats(3)->nullCount(), 0);
+    EXPECT_TRUE(data->columnStats(4).has_value());
+    EXPECT_EQ(data->columnStats(4)->nonNullCount(), 0);
+    EXPECT_EQ(data->columnStats(4)->nullCount(), 0);
+
+    for (int i = 0; i < kNumRows; ++i) {
+      data->newRow();
+    }
+    EXPECT_EQ(kNumRows, data->numRows());
+    RowContainerIterator iter;
+
+    EXPECT_EQ(data->listRows(&iter, kNumRows, rows.data()), kNumRows);
+    EXPECT_EQ(data->listRows(&iter, kNumRows, rows.data()), 0);
+
+    checkSizes(rows, *data);
+    SelectivityVector allRows(kNumRows);
+    for (auto column = 0; column < batch->childrenSize(); ++column) {
+      if (column < keys.size()) {
+        makeNonNull(batch, column);
+        EXPECT_EQ(data->columnAt(column).nullMask(), 0);
+      } else {
+        EXPECT_NE(data->columnAt(column).nullMask(), 0);
+      }
+      DecodedVector decoded(*batch->childAt(column), allRows);
+      for (auto index = 0; index < kNumRows; ++index) {
+        data->store(decoded, index, rows[index], column);
+      }
+    }
+    checkSizes(rows, *data);
+    RowContainerTestHelper(data.get()).checkConsistency();
+    return data;
+  };
+
+  std::vector<std::function<void(RowContainer*, std::vector<char*>&)>>
+      invalidateFuncs{
+          [](RowContainer* rowContainer, std::vector<char*>& rows) {
+            rowContainer->setAllNull(rows[0]);
+          },
+          [](RowContainer* rowContainer, std::vector<char*>& rows) {
+            rowContainer->eraseRows(
+                folly::Range<char**>(rows.data(), rows.size()));
+          }};
+  for (auto& invalidateFunc : invalidateFuncs) {
+    std::vector<char*> rows(kNumRows);
+    auto rowContainer = createRowContainer(rows);
+
+    ASSERT_FALSE(rowContainer->columnStats(0).has_value());
+    ASSERT_FALSE(rowContainer->columnStats(1).has_value());
+    ASSERT_TRUE(rowContainer->columnStats(2).has_value());
+    ASSERT_GT(
+        rowContainer->columnStats(2)->nonNullCount() +
+            rowContainer->columnStats(2)->nullCount(),
+        0);
+    ASSERT_TRUE(rowContainer->columnStats(3).has_value());
+    ASSERT_GT(
+        rowContainer->columnStats(3)->nonNullCount() +
+            rowContainer->columnStats(3)->nullCount(),
+        0);
+    ASSERT_TRUE(rowContainer->columnStats(4).has_value());
+    ASSERT_GT(
+        rowContainer->columnStats(4)->nonNullCount() +
+            rowContainer->columnStats(4)->nullCount(),
+        0);
+
+    invalidateFunc(rowContainer.get(), rows);
+    RowContainerTestHelper(rowContainer.get()).checkConsistency();
+
+    ASSERT_FALSE(rowContainer->columnStats(0).has_value());
+    ASSERT_FALSE(rowContainer->columnStats(1).has_value());
+    ASSERT_FALSE(rowContainer->columnStats(2).has_value());
+    ASSERT_FALSE(rowContainer->columnStats(3).has_value());
+    ASSERT_FALSE(rowContainer->columnStats(4).has_value());
+  }
+}
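
A note on the averages asserted in the next test: avgBytes() uses integer division, so running averages truncate. A quick standalone check of the two non-obvious values (35 bytes over 3 cells, 85 bytes over 6 cells):

    static_assert(35 / 3 == 11, "sum 35 over 3 non-null cells truncates to 11");
    static_assert(85 / 6 == 14, "sum 85 over 6 non-null cells truncates to 14");
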
+TEST_F(RowContainerTest, rowColumnStats) { + RowColumn::Stats stats; + EXPECT_EQ(stats.maxBytes(), 0); + EXPECT_EQ(stats.minBytes(), 0); + EXPECT_EQ(stats.sumBytes(), 0); + EXPECT_EQ(stats.avgBytes(), 0); + EXPECT_EQ(stats.nonNullCount(), 0); + EXPECT_EQ(stats.nullCount(), 0); + + stats.addCellSize(10); + EXPECT_EQ(stats.maxBytes(), 10); + EXPECT_EQ(stats.minBytes(), 10); + EXPECT_EQ(stats.sumBytes(), 10); + EXPECT_EQ(stats.avgBytes(), 10); + EXPECT_EQ(stats.nonNullCount(), 1); + EXPECT_EQ(stats.nullCount(), 0); + + stats.addCellSize(20); + EXPECT_EQ(stats.maxBytes(), 20); + EXPECT_EQ(stats.minBytes(), 10); + EXPECT_EQ(stats.sumBytes(), 30); + EXPECT_EQ(stats.avgBytes(), 15); + EXPECT_EQ(stats.nonNullCount(), 2); + EXPECT_EQ(stats.nullCount(), 0); + + stats.addCellSize(5); + EXPECT_EQ(stats.maxBytes(), 20); + EXPECT_EQ(stats.minBytes(), 5); + EXPECT_EQ(stats.sumBytes(), 35); + EXPECT_EQ(stats.avgBytes(), 11); + EXPECT_EQ(stats.nonNullCount(), 3); + EXPECT_EQ(stats.nullCount(), 0); + + stats.addNullCell(); + EXPECT_EQ(stats.nullCount(), 1); + EXPECT_EQ(stats.nonNullCount(), 3); + + stats.addNullCell(); + EXPECT_EQ(stats.nullCount(), 2); + EXPECT_EQ(stats.nonNullCount(), 3); + + stats.addCellSize(15); + stats.addNullCell(); + stats.addCellSize(25); + stats.addNullCell(); + stats.addCellSize(10); + EXPECT_EQ(stats.maxBytes(), 25); + EXPECT_EQ(stats.minBytes(), 5); + EXPECT_EQ(stats.sumBytes(), 85); + EXPECT_EQ(stats.avgBytes(), 14); + EXPECT_EQ(stats.nonNullCount(), 6); + EXPECT_EQ(stats.nullCount(), 4); +} +} // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/RowContainerTestBase.h b/velox/exec/tests/utils/RowContainerTestBase.h index c9d126fa869ef..c8ccfe33f06d2 100644 --- a/velox/exec/tests/utils/RowContainerTestBase.h +++ b/velox/exec/tests/utils/RowContainerTestBase.h @@ -86,6 +86,7 @@ class RowContainerTestBase : public testing::Test, isJoinBuild, true, true, + true, pool_.get()); VELOX_CHECK(container->testingMutable()); return container;