From ccfd4e738d5bc542b3e9097911cdbc3fce0d3f57 Mon Sep 17 00:00:00 2001 From: Jialiang Tan Date: Fri, 15 Nov 2024 17:21:06 -0800 Subject: [PATCH] feat(hashjoin): Add fast path to list join result --- .../storage_adapters/test_common/InsertTest.h | 2 +- .../hive/tests/HivePartitionFunctionTest.cpp | 3 +- .../hive/tests/HivePartitionUtilTest.cpp | 5 +- .../hive/tests/PartitionIdGeneratorTest.cpp | 2 +- velox/dwio/common/tests/ReaderTest.cpp | 2 +- velox/dwio/parquet/tests/ParquetTestBase.h | 3 +- .../parquet/tests/reader/E2EFilterTest.cpp | 4 +- velox/exec/GroupingSet.cpp | 2 + velox/exec/HashProbe.cpp | 58 +++- velox/exec/HashProbe.h | 17 ++ velox/exec/HashTable.cpp | 36 ++- velox/exec/HashTable.h | 32 +- velox/exec/RowContainer.cpp | 109 ++++--- velox/exec/RowContainer.h | 168 ++++++++--- velox/exec/StreamingAggregation.cpp | 1 + .../HashJoinListResultBenchmark.cpp | 4 +- .../benchmarks/SetAccumulatorBenchmark.cpp | 3 +- .../tests/AddressableNonNullValueListTest.cpp | 2 +- .../tests/AggregateSpillBenchmarkBase.cpp | 1 + velox/exec/tests/AggregationTest.cpp | 2 + velox/exec/tests/HashBitRangeTest.cpp | 4 +- velox/exec/tests/HashJoinTest.cpp | 74 +++-- .../exec/tests/HashPartitionFunctionTest.cpp | 3 +- velox/exec/tests/HashTableTest.cpp | 27 +- velox/exec/tests/PlanNodeToStringTest.cpp | 4 +- velox/exec/tests/PrestoQueryRunnerTest.cpp | 1 + .../tests/RoundRobinPartitionFunctionTest.cpp | 3 +- velox/exec/tests/RowContainerTest.cpp | 275 +++++++++++++++++- velox/exec/tests/SpillTest.cpp | 1 + .../exec/tests/UnorderedStreamReaderTest.cpp | 3 +- velox/exec/tests/utils/RowContainerTestBase.h | 1 + velox/expression/tests/EvalErrorsTest.cpp | 3 +- .../lib/tests/LambdaFunctionUtilTest.cpp | 2 +- .../aggregates/tests/MapAccumulatorTest.cpp | 3 +- .../tests/SimpleComparisonMatcherTest.cpp | 2 +- .../fuzzer/tests/SparkQueryRunnerTest.cpp | 1 + .../tests/CompactRowSerializerTest.cpp | 2 +- .../tests/UnsafeRowSerializerTest.cpp | 3 +- velox/vector/tests/EncodingTest.cpp | 3 +- .../tests/VectorEstimateFlatSizeTest.cpp | 5 +- velox/vector/tests/VectorPoolTest.cpp | 3 +- .../tests/VectorPrepareForReuseTest.cpp | 3 +- velox/vector/tests/VectorTest.cpp | 2 +- 43 files changed, 709 insertions(+), 175 deletions(-) diff --git a/velox/connectors/hive/storage_adapters/test_common/InsertTest.h b/velox/connectors/hive/storage_adapters/test_common/InsertTest.h index e215ea871b837..178d08a0670be 100644 --- a/velox/connectors/hive/storage_adapters/test_common/InsertTest.h +++ b/velox/connectors/hive/storage_adapters/test_common/InsertTest.h @@ -28,7 +28,7 @@ namespace facebook::velox::test { -class InsertTest : public test::VectorTestBase { +class InsertTest : public velox::test::VectorTestBase { public: void runInsertTest( std::string_view outputDirectory, diff --git a/velox/connectors/hive/tests/HivePartitionFunctionTest.cpp b/velox/connectors/hive/tests/HivePartitionFunctionTest.cpp index 20a090cea5b75..610e0d177ef66 100644 --- a/velox/connectors/hive/tests/HivePartitionFunctionTest.cpp +++ b/velox/connectors/hive/tests/HivePartitionFunctionTest.cpp @@ -21,10 +21,11 @@ #include "velox/vector/fuzzer/VectorFuzzer.h" #include "velox/vector/tests/utils/VectorTestBase.h" +using namespace facebook; using namespace facebook::velox; class HivePartitionFunctionTest : public ::testing::Test, - public test::VectorTestBase { + public velox::test::VectorTestBase { protected: static void SetUpTestCase() { memory::MemoryManager::testingSetInstance({}); diff --git a/velox/connectors/hive/tests/HivePartitionUtilTest.cpp b/velox/connectors/hive/tests/HivePartitionUtilTest.cpp index 187e5c49b9755..7942233b615a0 100644 --- a/velox/connectors/hive/tests/HivePartitionUtilTest.cpp +++ b/velox/connectors/hive/tests/HivePartitionUtilTest.cpp @@ -21,12 +21,13 @@ #include "gtest/gtest.h" -using namespace facebook::velox::connector::hive; +using namespace facebook; using namespace facebook::velox; +using namespace facebook::velox::connector::hive; using namespace facebook::velox::dwio::catalog::fbhive; class HivePartitionUtilTest : public ::testing::Test, - public test::VectorTestBase { + public velox::test::VectorTestBase { protected: template VectorPtr makeDictionary(const std::vector& data) { diff --git a/velox/connectors/hive/tests/PartitionIdGeneratorTest.cpp b/velox/connectors/hive/tests/PartitionIdGeneratorTest.cpp index 7f980cffe7b52..d544d071797f8 100644 --- a/velox/connectors/hive/tests/PartitionIdGeneratorTest.cpp +++ b/velox/connectors/hive/tests/PartitionIdGeneratorTest.cpp @@ -23,7 +23,7 @@ namespace facebook::velox::connector::hive { class PartitionIdGeneratorTest : public ::testing::Test, - public test::VectorTestBase { + public velox::test::VectorTestBase { protected: static void SetUpTestCase() { memory::MemoryManager::testingSetInstance({}); diff --git a/velox/dwio/common/tests/ReaderTest.cpp b/velox/dwio/common/tests/ReaderTest.cpp index f4d658583e73f..c6bb23d0f6008 100644 --- a/velox/dwio/common/tests/ReaderTest.cpp +++ b/velox/dwio/common/tests/ReaderTest.cpp @@ -25,7 +25,7 @@ namespace { using namespace facebook::velox::common; -class ReaderTest : public testing::Test, public test::VectorTestBase { +class ReaderTest : public testing::Test, public velox::test::VectorTestBase { protected: static void SetUpTestCase() { memory::MemoryManager::testingSetInstance({}); diff --git a/velox/dwio/parquet/tests/ParquetTestBase.h b/velox/dwio/parquet/tests/ParquetTestBase.h index 0195447dfbda2..9b04ad56b5005 100644 --- a/velox/dwio/parquet/tests/ParquetTestBase.h +++ b/velox/dwio/parquet/tests/ParquetTestBase.h @@ -31,7 +31,8 @@ namespace facebook::velox::parquet { -class ParquetTestBase : public testing::Test, public test::VectorTestBase { +class ParquetTestBase : public testing::Test, + public velox::test::VectorTestBase { protected: static void SetUpTestCase() { memory::MemoryManager::testingSetInstance({}); diff --git a/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp b/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp index 8450b01523b4e..1e635e68a7060 100644 --- a/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp +++ b/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp @@ -22,6 +22,7 @@ #include +using namespace facebook; using namespace facebook::velox; using namespace facebook::velox::common; using namespace facebook::velox::dwio::common; @@ -29,7 +30,8 @@ using namespace facebook::velox::parquet; using dwio::common::MemorySink; -class E2EFilterTest : public E2EFilterTestBase, public test::VectorTestBase { +class E2EFilterTest : public E2EFilterTestBase, + public velox::test::VectorTestBase { protected: void SetUp() override { E2EFilterTestBase::SetUp(); diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp index 31d1d3e2ad0da..4613947f1f040 100644 --- a/velox/exec/GroupingSet.cpp +++ b/velox/exec/GroupingSet.cpp @@ -1064,6 +1064,7 @@ bool GroupingSet::getOutputWithSpill( false, false, false, + false, &pool_); initializeAggregates(aggregates_, *mergeRows_, false); @@ -1282,6 +1283,7 @@ void GroupingSet::abandonPartialAggregation() { false, false, false, + false, &pool_); initializeAggregates(aggregates_, *intermediateRows_, true); table_.reset(); diff --git a/velox/exec/HashProbe.cpp b/velox/exec/HashProbe.cpp index f8a820529fe88..e1ad73e57b8f3 100644 --- a/velox/exec/HashProbe.cpp +++ b/velox/exec/HashProbe.cpp @@ -292,6 +292,57 @@ void HashProbe::maybeSetupSpillInputReader( inputSpillPartitionSet_.erase(iter); } +std::optional HashProbe::estimatedRowSize( + const std::vector& varSizedColumns, + uint64_t totalFixedColumnsBytes) { + static const double kToleranceRatio = 10.0; + std::vector varSizeListColumnsStats; + varSizeListColumnsStats.reserve(varSizedColumns.size()); + for (uint32_t i = 0; i < varSizedColumns.size(); ++i) { + auto statsOpt = columnStats(varSizedColumns[i]); + if (!statsOpt.has_value()) { + return std::nullopt; + } + varSizeListColumnsStats.push_back(statsOpt.value()); + } + + uint64_t totalAvgBytes{totalFixedColumnsBytes}; + uint64_t totalMaxBytes{totalFixedColumnsBytes}; + for (const auto& stats : varSizeListColumnsStats) { + totalAvgBytes += stats.avgBytes(); + totalMaxBytes += stats.maxBytes(); + } + if (totalAvgBytes == 0) { + if (totalMaxBytes == 0) { + return 0; + } + // Return nullopt to prevent memory exploding in extreme size skew cases: + // e.g. 1 row very large and all other rows of size 0. + return std::nullopt; + } + if (totalMaxBytes / totalAvgBytes >= kToleranceRatio) { + return std::nullopt; + } + // Make the total per batch size to be bounded by 2x 'outputBatchSize_': + // worst case size = (outputBatchSize_ / estimated size) * totalMaxBytes + return (totalMaxBytes + totalAvgBytes) / 2; +} + +std::optional HashProbe::columnStats( + int32_t columnIndex) const { + std::vector columnStats; + const auto rowContainers = table_->allRows(); + for (const auto* rowContainer : rowContainers) { + VELOX_CHECK_NOT_NULL(rowContainer); + auto statsOpt = rowContainer->columnStats(columnIndex); + if (!statsOpt.has_value()) { + return std::nullopt; + } + columnStats.push_back(statsOpt.value()); + } + return RowColumn::Stats::merge(columnStats); +} + void HashProbe::initializeResultIter() { VELOX_CHECK_NOT_NULL(table_); if (resultIter_ != nullptr) { @@ -312,8 +363,12 @@ void HashProbe::initializeResultIter() { varSizeListColumns.push_back(column); } } + + // TODO: Make tolerance ratio configurable if needed. resultIter_ = std::make_unique( - std::move(varSizeListColumns), fixedSizeListColumnsSizeSum); + std::move(varSizeListColumns), + fixedSizeListColumnsSizeSum, + estimatedRowSize(varSizeListColumns, fixedSizeListColumnsSizeSum)); } void HashProbe::asyncWaitForHashTable() { @@ -1987,5 +2042,4 @@ void HashProbe::clearBuffers() { operatorCtx_->execCtx()->vectorPool()->clear(); filter_->clearCache(); } - } // namespace facebook::velox::exec diff --git a/velox/exec/HashProbe.h b/velox/exec/HashProbe.h index 44775dacf5671..f4e0c20b0c985 100644 --- a/velox/exec/HashProbe.h +++ b/velox/exec/HashProbe.h @@ -312,6 +312,23 @@ class HashProbe : public Operator { // memory reclamation or operator close. void clearBuffers(); + // Returns the estimated row size of the projected output columns. nullopt + // will be returned if insufficient column stats is presented in 'table_', or + // the row size variation is too large. The row size is too large if ratio of + // max row size and avg row size is larger than 'kToleranceRatio' which is set + // to 10. + std::optional estimatedRowSize( + const std::vector& varColumnsStats, + uint64_t totalFixedColumnsBytes); + + // Returns the aggregated column stats at 'columnIndex' of 'table_'. Returns + // nullopt if the column stats is not available. + // + // NOTE: The column stats is collected by default for hash join table but it + // could be invalidated in case of spilling. But we should never expect usage + // of an invalidated table as we always spill the entire table. + std::optional columnStats(int32_t columnIndex) const; + // TODO: Define batch size as bytes based on RowContainer row sizes. const vector_size_t outputBatchSize_; diff --git a/velox/exec/HashTable.cpp b/velox/exec/HashTable.cpp index 7fa52f394c262..707b5d784d4e3 100644 --- a/velox/exec/HashTable.cpp +++ b/velox/exec/HashTable.cpp @@ -73,6 +73,7 @@ HashTable::HashTable( isJoinBuild, hasProbedFlag, hashMode_ != HashMode::kHash, + isJoinBuild, pool); nextOffset_ = rows_->nextOffset(); } @@ -1826,10 +1827,9 @@ int32_t HashTable::listJoinResults( uint64_t maxBytes) { VELOX_CHECK_LE(inputRows.size(), hits.size()); - if (iter.varSizeListColumns.empty() && !hasDuplicates_) { - // When there is no duplicates, and no variable length columns are selected - // to be projected, we are able to calculate fixed length columns total size - // directly and go through fast path. + if (iter.estimatedRowSize.has_value() && !hasDuplicates_) { + // When there is no duplicates, and row size is estimable, we are able to + // go through fast path. return listJoinResultsFastPath( iter, includeMisses, inputRows, hits, maxBytes); } @@ -1859,9 +1859,10 @@ int32_t HashTable::listJoinResults( hits[numOut] = hit; numOut++; iter.lastRowIndex++; - totalBytes += - (joinProjectedVarColumnsSize(iter.varSizeListColumns, hit) + - iter.fixedSizeListColumnsSizeSum); + totalBytes += iter.estimatedRowSize.has_value() + ? iter.estimatedRowSize.value() + : (joinProjectedVarColumnsSize(iter.varSizeListColumns, hit) + + iter.fixedSizeListColumnsSizeSum); } else { const auto numRows = rows->size(); auto num = @@ -1873,11 +1874,16 @@ int32_t HashTable::listJoinResults( num * sizeof(char*)); iter.lastDuplicateRowIndex += num; numOut += num; - for (const auto* dupRow : *rows) { - totalBytes += - joinProjectedVarColumnsSize(iter.varSizeListColumns, dupRow); + if (iter.estimatedRowSize.has_value()) { + totalBytes += iter.estimatedRowSize.value() * numRows; + } else { + for (const auto* dupRow : *rows) { + totalBytes += + joinProjectedVarColumnsSize(iter.varSizeListColumns, dupRow) + + iter.fixedSizeListColumnsSizeSum; + } + totalBytes += (iter.fixedSizeListColumnsSizeSum * numRows); } - totalBytes += (iter.fixedSizeListColumnsSizeSum * numRows); if (iter.lastDuplicateRowIndex >= numRows) { iter.lastDuplicateRowIndex = 0; iter.lastRowIndex++; @@ -1900,8 +1906,8 @@ int32_t HashTable::listJoinResultsFastPath( int32_t numOut = 0; const auto maxOut = std::min( static_cast(inputRows.size()), - (iter.fixedSizeListColumnsSizeSum != 0 - ? maxBytes / iter.fixedSizeListColumnsSizeSum + (iter.estimatedRowSize.value() != 0 + ? maxBytes / iter.estimatedRowSize.value() : std::numeric_limits::max())); int32_t i = iter.lastRowIndex; const auto numRows = iter.rows->size(); @@ -1912,8 +1918,8 @@ int32_t HashTable::listJoinResultsFastPath( // We pass the pointers as int64_t's in 'hitWords'. auto resultHits = reinterpret_cast(hits.data()); auto resultRows = inputRows.data(); - const auto outLimit = maxOut - kWidth; - for (; i + kWidth <= numRows && numOut < outLimit; i += kWidth) { + const int64_t simdOutLimit = maxOut - kWidth; + for (; i + kWidth <= numRows && numOut < simdOutLimit; i += kWidth) { auto indices = simd::loadGatherIndices(sourceRows + i); auto hitWords = simd::gather(sourceHits, indices); auto misses = includeMisses ? 0 : simd::toBitMask(hitWords == 0); diff --git a/velox/exec/HashTable.h b/velox/exec/HashTable.h index 434ca9570976f..56f045c52afc4 100644 --- a/velox/exec/HashTable.h +++ b/velox/exec/HashTable.h @@ -142,8 +142,10 @@ class BaseHashTable { struct JoinResultIterator { JoinResultIterator( std::vector&& _varSizeListColumns, - uint64_t _fixedSizeListColumnsSizeSum) - : varSizeListColumns(std::move(_varSizeListColumns)), + uint64_t _fixedSizeListColumnsSizeSum, + std::optional _estimatedRowSize) + : estimatedRowSize(_estimatedRowSize), + varSizeListColumns(std::move(_varSizeListColumns)), fixedSizeListColumnsSizeSum(_fixedSizeListColumnsSizeSum) {} void reset(const HashLookup& lookup) { @@ -157,6 +159,8 @@ class BaseHashTable { return !rows || lastRowIndex == rows->size(); } + /// The row size estimation of the projected output columns, if applicable. + const std::optional estimatedRowSize; /// The indexes of the build side projected columns that are variable sized. const std::vector varSizeListColumns; /// The per row total bytes of the build side projected columns that are @@ -635,18 +639,6 @@ class HashTable : public BaseHashTable { /// purpose. void checkConsistency() const; - auto& testingOtherTables() const { - return otherTables_; - } - - uint64_t testingRehashSize() const { - return rehashSize(); - } - - char** testingTable() const { - return table_; - } - void extractColumn( folly::Range rows, int32_t columnIndex, @@ -659,6 +651,18 @@ class HashTable : public BaseHashTable { result); } + auto& testingOtherTables() const { + return otherTables_; + } + + uint64_t testingRehashSize() const { + return rehashSize(); + } + + char** testingTable() const { + return table_; + } + private: // Enables debug stats for collisions for debug build. #ifdef NDEBUG diff --git a/velox/exec/RowContainer.cpp b/velox/exec/RowContainer.cpp index 2fafa805e3d25..f763b5d98fafb 100644 --- a/velox/exec/RowContainer.cpp +++ b/velox/exec/RowContainer.cpp @@ -139,11 +139,13 @@ RowContainer::RowContainer( bool isJoinBuild, bool hasProbedFlag, bool hasNormalizedKeys, + bool collectColumnStats, memory::MemoryPool* pool) : keyTypes_(keyTypes), nullableKeys_(nullableKeys), isJoinBuild_(isJoinBuild), hasNormalizedKeys_(hasNormalizedKeys), + collectColumnStats_(collectColumnStats), stringAllocator_(std::make_unique(pool)), accumulators_(accumulators), rows_(pool) { @@ -283,6 +285,9 @@ RowContainer::RowContainer( ++nullOffsetsPos; } } + if (collectColumnStats_) { + rowColumnsStats_.resize(types_.size()); + } } RowContainer::~RowContainer() { @@ -344,6 +349,7 @@ void RowContainer::eraseRows(folly::Range rows) { firstFreeRow_ = row; } numFreeRows_ += rows.size(); + invalidateColumnStats(); } int32_t RowContainer::findRows(folly::Range rows, char** result) const { @@ -424,33 +430,6 @@ void RowContainer::freeVariableWidthFields(folly::Range rows) { } } -void RowContainer::checkConsistency() const { - constexpr int32_t kBatch = 1000; - std::vector rows(kBatch); - - RowContainerIterator iter; - int64_t allocatedRows = 0; - for (;;) { - int64_t numRows = listRows(&iter, kBatch, rows.data()); - if (!numRows) { - break; - } - for (auto i = 0; i < numRows; ++i) { - auto row = rows[i]; - VELOX_CHECK(!bits::isBitSet(row, freeFlagOffset_)); - ++allocatedRows; - } - } - - size_t numFree = 0; - for (auto free = firstFreeRow_; free; free = nextFree(free)) { - ++numFree; - VELOX_CHECK(bits::isBitSet(free, freeFlagOffset_)); - } - VELOX_CHECK_EQ(numFree, numFreeRows_); - VELOX_CHECK_EQ(allocatedRows, numRows_); -} - void RowContainer::freeAggregates(folly::Range rows) { for (auto& accumulator : accumulators_) { accumulator.destroy(rows); @@ -491,37 +470,93 @@ void RowContainer::freeRowsExtraMemory( numRows_ -= rows.size(); } +void RowContainer::invalidateColumnStats() { + if (!collectColumnStats_ || rowColumnsStats_.empty()) { + return; + } + rowColumnsStats_.clear(); +} + +// static +RowColumn::Stats RowColumn::Stats::merge( + const std::vector& statsList) { + RowColumn::Stats mergedStats; + for (const auto& stats : statsList) { + if (mergedStats.numCells() == 0) { + mergedStats.minBytes_ = stats.minBytes_; + mergedStats.maxBytes_ = stats.maxBytes_; + } else { + mergedStats.minBytes_ = std::min(mergedStats.minBytes_, stats.minBytes_); + mergedStats.maxBytes_ = std::max(mergedStats.maxBytes_, stats.maxBytes_); + } + mergedStats.nullCount_ += stats.nullCount_; + mergedStats.nonNullCount_ += stats.nonNullCount_; + mergedStats.sumBytes_ += stats.sumBytes_; + } + return mergedStats; +} + +std::optional RowContainer::columnStats( + int32_t columnIndex) const { + VELOX_CHECK(collectColumnStats_); + if (rowColumnsStats_.empty()) { + return std::nullopt; + } + return rowColumnsStats_[columnIndex]; +} + +void RowContainer::updateColumnStats( + const DecodedVector& decoded, + vector_size_t rowIndex, + char* row, + int32_t columnIndex) { + if (!collectColumnStats_ || rowColumnsStats_.empty()) { + // Column stats not enabled, or has been invalidated. + return; + } + + auto& columnStats = rowColumnsStats_[columnIndex]; + if (decoded.isNullAt(rowIndex)) { + columnStats.addNullCell(); + } else if (types_[columnIndex]->isFixedWidth()) { + columnStats.addCellSize(fixedSizeAt(columnIndex)); + } else { + columnStats.addCellSize(variableSizeAt(row, columnIndex)); + } +} + void RowContainer::store( const DecodedVector& decoded, - vector_size_t index, + vector_size_t rowIndex, char* row, - int32_t column) { + int32_t columnIndex) { auto numKeys = keyTypes_.size(); - bool isKey = column < numKeys; + bool isKey = columnIndex < numKeys; if (isKey && !nullableKeys_) { VELOX_DYNAMIC_TYPE_DISPATCH( storeNoNulls, - typeKinds_[column], + typeKinds_[columnIndex], decoded, - index, + rowIndex, isKey, row, - offsets_[column]); + offsets_[columnIndex]); } else { VELOX_DCHECK(isKey || accumulators_.empty()); - auto rowColumn = rowColumns_[column]; + auto rowColumn = rowColumns_[columnIndex]; VELOX_DYNAMIC_TYPE_DISPATCH_ALL( storeWithNulls, - typeKinds_[column], + typeKinds_[columnIndex], decoded, - index, + rowIndex, isKey, row, rowColumn.offset(), rowColumn.nullByte(), rowColumn.nullMask(), - column); + columnIndex); } + updateColumnStats(decoded, rowIndex, row, columnIndex); } void RowContainer::store( diff --git a/velox/exec/RowContainer.h b/velox/exec/RowContainer.h index f05b66e0c735d..25486cf82985e 100644 --- a/velox/exec/RowContainer.h +++ b/velox/exec/RowContainer.h @@ -24,6 +24,9 @@ #include "velox/vector/VectorTypeUtils.h" namespace facebook::velox::exec { +namespace testutil { +class RowContainerTestHelper; +} using NextRowVector = std::vector>; @@ -163,6 +166,71 @@ class RowColumn { return nullMask() << 1; } + /// Aggregated stats of a column in the 'RowContainer'. + class Stats { + public: + Stats() = default; + + void addCellSize(int32_t bytes) { + if (UNLIKELY(nonNullCount_ == 0)) { + minBytes_ = bytes; + maxBytes_ = bytes; + } else { + minBytes_ = std::min(minBytes_, bytes); + maxBytes_ = std::max(maxBytes_, bytes); + } + sumBytes_ += bytes; + ++nonNullCount_; + } + + void addNullCell() { + ++nullCount_; + } + + int32_t maxBytes() const { + return maxBytes_; + } + + int32_t minBytes() const { + return minBytes_; + } + + uint64_t sumBytes() const { + return sumBytes_; + } + + uint64_t avgBytes() const { + if (nonNullCount_ == 0) { + return 0; + } + return sumBytes_ / nonNullCount_; + } + + uint32_t nonNullCount() const { + return nonNullCount_; + } + + uint32_t nullCount() const { + return nullCount_; + } + + uint32_t numCells() const { + return nullCount_ + nonNullCount_; + } + + /// Merges multiple aggregated stats of the same column into a single one. + static Stats merge(const std::vector& statsList); + + private: + // Aggregated stats for non-null rows of the column. + int32_t minBytes_{0}; + int32_t maxBytes_{0}; + uint64_t sumBytes_{0}; + + uint32_t nonNullCount_{0}; + uint32_t nullCount_{0}; + }; + private: static uint64_t PackOffsets(int32_t offset, int32_t nullOffset) { if (nullOffset == kNotNullOffset) { @@ -205,6 +273,7 @@ class RowContainer { false, // isJoinBuild false, // hasProbedFlag false, // hasNormalizedKey + false, // collectColumnStats pool) {} ~RowContainer(); @@ -236,6 +305,7 @@ class RowContainer { bool isJoinBuild, bool hasProbedFlag, bool hasNormalizedKey, + bool collectColumnStats, memory::MemoryPool* pool); /// Allocates a new row and initializes possible aggregates to null. @@ -256,6 +326,7 @@ class RowContainer { memset(row + nullByte(nullOffsets_[0]), 0xff, initialNulls_.size()); bits::clearBit(row, freeFlagOffset_); } + invalidateColumnStats(); } /// The row size excluding any out-of-line stored variable length values. @@ -294,7 +365,7 @@ class RowContainer { /// Stores the 'index'th value in 'decoded' into 'row' at 'columnIndex'. void store( const DecodedVector& decoded, - vector_size_t index, + vector_size_t rowIndex, char* row, int32_t columnIndex); @@ -743,6 +814,11 @@ class RowContainer { return types_; } + /// Returns the aggregated column stats of the column with given + /// 'columnIndex'. nullopt will be returned if the column stats was previous + /// invalidated. Any row erase operations will invalidate column stats. + std::optional columnStats(int32_t columnIndex) const; + const auto& keyTypes() const { return keyTypes_; } @@ -760,10 +836,6 @@ class RowContainer { return *stringAllocator_; } - /// Checks that row and free row counts match and that free list membership is - /// consistent with free flag. - void checkConsistency() const; - static inline bool isNullAt(const char* row, int32_t nullByte, uint8_t nullMask) { return (row[nullByte] & nullMask) != 0; @@ -932,43 +1004,45 @@ class RowContainer { template inline void storeWithNulls( const DecodedVector& decoded, - vector_size_t index, + vector_size_t rowIndex, bool isKey, char* row, int32_t offset, int32_t nullByte, uint8_t nullMask, - int32_t column) { + int32_t columnIndex) { using T = typename TypeTraits::NativeType; - if (decoded.isNullAt(index)) { + if (decoded.isNullAt(rowIndex)) { row[nullByte] |= nullMask; // Do not leave an uninitialized value in the case of a // null. This is an error with valgrind/asan. *reinterpret_cast(row + offset) = T(); - updateColumnHasNulls(column, true); + updateColumnHasNulls(columnIndex, true); return; } if constexpr (std::is_same_v) { RowSizeTracker tracker(row[rowSizeOffset_], *stringAllocator_); - stringAllocator_->copyMultipart(decoded.valueAt(index), row, offset); + stringAllocator_->copyMultipart( + decoded.valueAt(rowIndex), row, offset); } else { - *reinterpret_cast(row + offset) = decoded.valueAt(index); + *reinterpret_cast(row + offset) = decoded.valueAt(rowIndex); } } template inline void storeNoNulls( const DecodedVector& decoded, - vector_size_t index, + vector_size_t rowIndex, bool isKey, - char* group, + char* row, int32_t offset) { using T = typename TypeTraits::NativeType; if constexpr (std::is_same_v) { - RowSizeTracker tracker(group[rowSizeOffset_], *stringAllocator_); - stringAllocator_->copyMultipart(decoded.valueAt(index), group, offset); + RowSizeTracker tracker(row[rowSizeOffset_], *stringAllocator_); + stringAllocator_->copyMultipart( + decoded.valueAt(rowIndex), row, offset); } else { - *reinterpret_cast(group + offset) = decoded.valueAt(index); + *reinterpret_cast(row + offset) = decoded.valueAt(rowIndex); } } @@ -1385,6 +1459,16 @@ class RowContainer { void freeRowsExtraMemory(folly::Range rows, bool freeNextRowVector); + inline void updateColumnStats( + const DecodedVector& decoded, + vector_size_t rowIndex, + char* row, + int32_t columnIndex); + + // Light weight aggregated column stats does not support row erasures. This + // method is called whenever a row is erased. + void invalidateColumnStats(); + // Updates the specific column's columnHasNulls_ flag, if 'hasNulls' is true. // columnHasNulls_ flag is false by default. inline void updateColumnHasNulls(int32_t columnIndex, bool hasNulls) { @@ -1396,6 +1480,8 @@ class RowContainer { const bool isJoinBuild_; // True if normalized keys are enabled in initial state. const bool hasNormalizedKeys_; + const bool collectColumnStats_; + const std::unique_ptr stringAllocator_; std::vector columnHasNulls_; @@ -1424,6 +1510,10 @@ class RowContainer { // Offset and null indicator offset of non-aggregate fields as a single word. // Corresponds pairwise to 'types_'. std::vector rowColumns_; + // Optional aggregated column stats(e.g. min/max size) for non-aggregate + // fields. Index aligns with 'rowColumns_'. Column stats will only be enabled + // if 'collectColumnStats_' is true. + std::vector rowColumnsStats_; // Bit offset of the probed flag for a full or right outer join payload. 0 if // not applicable. int32_t probedFlagOffset_ = 0; @@ -1455,6 +1545,8 @@ class RowContainer { memory::AllocationPool rows_; int alignment_ = 1; + + friend class testutil::RowContainerTestHelper; }; template <> @@ -1467,102 +1559,102 @@ inline int128_t RowContainer::valueAt( template <> inline void RowContainer::storeWithNulls( const DecodedVector& decoded, - vector_size_t index, + vector_size_t rowIndex, bool isKey, char* row, int32_t offset, int32_t nullByte, uint8_t nullMask, - int32_t column) { + int32_t columnIndex) { storeComplexType( - decoded, index, isKey, row, offset, nullByte, nullMask, column); + decoded, rowIndex, isKey, row, offset, nullByte, nullMask, columnIndex); } template <> inline void RowContainer::storeNoNulls( const DecodedVector& decoded, - vector_size_t index, + vector_size_t rowIndex, bool isKey, char* row, int32_t offset) { - storeComplexType(decoded, index, isKey, row, offset); + storeComplexType(decoded, rowIndex, isKey, row, offset); } template <> inline void RowContainer::storeWithNulls( const DecodedVector& decoded, - vector_size_t index, + vector_size_t rowIndex, bool isKey, char* row, int32_t offset, int32_t nullByte, uint8_t nullMask, - int32_t column) { + int32_t columnIndex) { storeComplexType( - decoded, index, isKey, row, offset, nullByte, nullMask, column); + decoded, rowIndex, isKey, row, offset, nullByte, nullMask, columnIndex); } template <> inline void RowContainer::storeNoNulls( const DecodedVector& decoded, - vector_size_t index, + vector_size_t rowIndex, bool isKey, char* row, int32_t offset) { - storeComplexType(decoded, index, isKey, row, offset); + storeComplexType(decoded, rowIndex, isKey, row, offset); } template <> inline void RowContainer::storeWithNulls( const DecodedVector& decoded, - vector_size_t index, + vector_size_t rowIndex, bool isKey, char* row, int32_t offset, int32_t nullByte, uint8_t nullMask, - int32_t column) { + int32_t columnIndex) { storeComplexType( - decoded, index, isKey, row, offset, nullByte, nullMask, column); + decoded, rowIndex, isKey, row, offset, nullByte, nullMask, columnIndex); } template <> inline void RowContainer::storeNoNulls( const DecodedVector& decoded, - vector_size_t index, + vector_size_t rowIndex, bool isKey, char* row, int32_t offset) { - storeComplexType(decoded, index, isKey, row, offset); + storeComplexType(decoded, rowIndex, isKey, row, offset); } template <> inline void RowContainer::storeWithNulls( const DecodedVector& decoded, - vector_size_t index, + vector_size_t rowIndex, bool /*isKey*/, char* row, int32_t offset, int32_t nullByte, uint8_t nullMask, - int32_t column) { - if (decoded.isNullAt(index)) { + int32_t columnIndex) { + if (decoded.isNullAt(rowIndex)) { row[nullByte] |= nullMask; memset(row + offset, 0, sizeof(int128_t)); - updateColumnHasNulls(column, true); + updateColumnHasNulls(columnIndex, true); return; } - HugeInt::serialize(decoded.valueAt(index), row + offset); + HugeInt::serialize(decoded.valueAt(rowIndex), row + offset); } template <> inline void RowContainer::storeNoNulls( const DecodedVector& decoded, - vector_size_t index, + vector_size_t rowIndex, bool /*isKey*/, char* row, int32_t offset) { - HugeInt::serialize(decoded.valueAt(index), row + offset); + HugeInt::serialize(decoded.valueAt(rowIndex), row + offset); } template <> diff --git a/velox/exec/StreamingAggregation.cpp b/velox/exec/StreamingAggregation.cpp index e4b0aa598c8e0..692432ca461d2 100644 --- a/velox/exec/StreamingAggregation.cpp +++ b/velox/exec/StreamingAggregation.cpp @@ -343,6 +343,7 @@ std::unique_ptr StreamingAggregation::makeRowContainer( false, false, false, + false, pool()); } diff --git a/velox/exec/benchmarks/HashJoinListResultBenchmark.cpp b/velox/exec/benchmarks/HashJoinListResultBenchmark.cpp index 6945900aee890..3cd93cef5be2e 100644 --- a/velox/exec/benchmarks/HashJoinListResultBenchmark.cpp +++ b/velox/exec/benchmarks/HashJoinListResultBenchmark.cpp @@ -435,7 +435,9 @@ class HashTableListJoinResultBenchmark : public VectorTestBase { } } return BaseHashTable::JoinResultIterator( - std::move(varSizeListColumns), fixedSizeListColumnsSizeSum); + std::move(varSizeListColumns), + fixedSizeListColumnsSizeSum, + std::nullopt); } // Hash probe and list join result. diff --git a/velox/exec/benchmarks/SetAccumulatorBenchmark.cpp b/velox/exec/benchmarks/SetAccumulatorBenchmark.cpp index 72211a7ca814d..746be7a93d055 100644 --- a/velox/exec/benchmarks/SetAccumulatorBenchmark.cpp +++ b/velox/exec/benchmarks/SetAccumulatorBenchmark.cpp @@ -21,6 +21,7 @@ #include "velox/vector/fuzzer/VectorFuzzer.h" #include "velox/vector/tests/utils/VectorTestBase.h" +using namespace facebook; using namespace facebook::velox; using namespace facebook::velox::exec; @@ -28,7 +29,7 @@ namespace { // Adds 10M mostly unique values to a single SetAccumulator, then extracts // unique values from it. -class SetAccumulatorBenchmark : public facebook::velox::test::VectorTestBase { +class SetAccumulatorBenchmark : public velox::test::VectorTestBase { public: void setup() { VectorFuzzer::Options opts; diff --git a/velox/exec/tests/AddressableNonNullValueListTest.cpp b/velox/exec/tests/AddressableNonNullValueListTest.cpp index bb4983ea0308a..a5e6eeec2ddfe 100644 --- a/velox/exec/tests/AddressableNonNullValueListTest.cpp +++ b/velox/exec/tests/AddressableNonNullValueListTest.cpp @@ -23,7 +23,7 @@ namespace facebook::velox::aggregate::prestosql { namespace { class AddressableNonNullValueListTest : public testing::Test, - public test::VectorTestBase { + public velox::test::VectorTestBase { protected: static void SetUpTestCase() { memory::MemoryManager::testingSetInstance({}); diff --git a/velox/exec/tests/AggregateSpillBenchmarkBase.cpp b/velox/exec/tests/AggregateSpillBenchmarkBase.cpp index 8464157a7e096..7fd44fc0a13a5 100644 --- a/velox/exec/tests/AggregateSpillBenchmarkBase.cpp +++ b/velox/exec/tests/AggregateSpillBenchmarkBase.cpp @@ -40,6 +40,7 @@ std::unique_ptr makeRowContainer( false, // isJoinBuild false, // hasProbedFlag false, // hasNormalizedKey + false, // collectColumnStats pool.get()); } diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp index 6f55a5876fdd1..1e9dd7147ef0f 100644 --- a/velox/exec/tests/AggregationTest.cpp +++ b/velox/exec/tests/AggregationTest.cpp @@ -377,6 +377,7 @@ class AggregationTest : public OperatorTestBase { false, true, true, + false, pool_.get()); } @@ -3635,6 +3636,7 @@ TEST_F(AggregationTest, destroyAfterPartialInitialization) { false, // isJoinBuild false, // hasProbedFlag false, // hasNormalizedKeys + false, // collectColumnStats pool()); const auto rowColumn = rows.columnAt(0); agg.setOffsets( diff --git a/velox/exec/tests/HashBitRangeTest.cpp b/velox/exec/tests/HashBitRangeTest.cpp index fd67ddd017b45..4658f21fb3c0b 100644 --- a/velox/exec/tests/HashBitRangeTest.cpp +++ b/velox/exec/tests/HashBitRangeTest.cpp @@ -17,10 +17,12 @@ #include "velox/exec/HashBitRange.h" #include "velox/vector/tests/utils/VectorTestBase.h" +using namespace facebook; using namespace facebook::velox; using namespace facebook::velox::exec; -class HashRangeBitTest : public test::VectorTestBase, public testing::Test { +class HashRangeBitTest : public velox::test::VectorTestBase, + public testing::Test { protected: static void SetUpTestCase() { memory::MemoryManager::testingSetInstance({}); diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index 9c6da3a9b437b..c3ddaf3f32836 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -5564,27 +5564,42 @@ TEST_F(HashJoinTest, dynamicFilterOnPartitionKey) { } TEST_F(HashJoinTest, probeMemoryLimitOnBuildProjection) { + const uint64_t numBuildRows = 20; std::vector probeVectors = makeBatches(10, [&](int32_t /*unused*/) { - return makeRowVector( - {makeFlatVector(1'000, [](auto row) { return row % 5; })}); + return makeRowVector({makeFlatVector( + 1'000, [](auto row) { return row % 25; })}); }); - // Build side has 4KB + 4B per row. std::vector buildVectors = makeBatches(1, [&](int32_t /*unused*/) { return makeRowVector( - {"u_c0", "u_c1", {"u_c2"}}, - {makeFlatVector({0, 1, 2}), - makeFlatVector({ - std::string(4096, 'a'), - std::string(4096, 'b'), - std::string(4096, 'c'), - }), - makeFlatVector({ - std::string(4096, 'd'), - std::string(4096, 'e'), - std::string(4096, 'f'), + {"u_c0", "u_c1", "u_c2", "u_c3", "u_c4"}, + {makeFlatVector( + numBuildRows, [](auto row) { return row; }), + makeFlatVector( + numBuildRows, + [](auto /* row */) { return std::string(4096, 'a'); }), + makeFlatVector( + numBuildRows, + [](auto /* row */) { return std::string(4096, 'a'); }), + makeFlatVector( + numBuildRows, + [](auto row) { + // Row that has too large of size variation. + if (row == 0) { + return std::string(4096, 'a'); + } else { + return std::string(1, 'a'); + } + }), + makeFlatVector(numBuildRows, [](auto row) { + // Row that has tolerable size variation. + if (row == 0) { + return std::string(4096, 'a'); + } else { + return std::string(256, 'a'); + } })}); }); @@ -5592,29 +5607,38 @@ TEST_F(HashJoinTest, probeMemoryLimitOnBuildProjection) { createDuckDbTable("u", {buildVectors}); struct TestParam { - int32_t numVarSizeColumn; + std::vector varSizeColumns; int32_t numExpectedBatches; std::string referenceQuery; std::string debugString() const { - return fmt::format( - "numVarSizeColumn {}, numExpectedBatches {}, referenceQuery '{}'", - numVarSizeColumn, - numExpectedBatches, - referenceQuery); + std::stringstream ss; + ss << "varSizeColumns ["; + for (const auto& columnIndex : varSizeColumns) { + ss << columnIndex << ", "; + } + ss << "] "; + ss << "numExpectedBatches " << numExpectedBatches << ", referenceQuery '" + << referenceQuery << "'"; + return ss.str(); } }; std::vector testParams{ - {0, 10, "SELECT t.c0 FROM t JOIN u ON t.c0 = u.u_c0"}, - {1, 3000, "SELECT t.c0, u.u_c1 FROM t JOIN u ON t.c0 = u.u_c0"}, - {2, 6000, "SELECT t.c0, u.u_c1, u.u_c2 FROM t JOIN u ON t.c0 = u.u_c0"}}; + {{}, 10, "SELECT t.c0 FROM t JOIN u ON t.c0 = u.u_c0"}, + {{1}, 4000, "SELECT t.c0, u.u_c1 FROM t JOIN u ON t.c0 = u.u_c0"}, + {{1, 2}, + 8000, + "SELECT t.c0, u.u_c1, u.u_c2 FROM t JOIN u ON t.c0 = u.u_c0"}, + {{3}, 210, "SELECT t.c0, u.u_c3 FROM t JOIN u ON t.c0 = u.u_c0"}, + {{4}, 2670, "SELECT t.c0, u.u_c4 FROM t JOIN u ON t.c0 = u.u_c0"}}; + for (const auto& testParam : testParams) { SCOPED_TRACE(testParam.debugString()); core::PlanNodeId joinNodeId; std::vector outputLayout; outputLayout.push_back("c0"); - for (int32_t i = 0; i < testParam.numVarSizeColumn; i++) { - outputLayout.push_back(fmt::format("u_c{}", i + 1)); + for (int32_t i = 0; i < testParam.varSizeColumns.size(); i++) { + outputLayout.push_back(fmt::format("u_c{}", testParam.varSizeColumns[i])); } auto planNodeIdGenerator = std::make_shared(); auto plan = PlanBuilder(planNodeIdGenerator) diff --git a/velox/exec/tests/HashPartitionFunctionTest.cpp b/velox/exec/tests/HashPartitionFunctionTest.cpp index f30b9687c91a6..2148235f0f23b 100644 --- a/velox/exec/tests/HashPartitionFunctionTest.cpp +++ b/velox/exec/tests/HashPartitionFunctionTest.cpp @@ -18,10 +18,11 @@ #include "velox/exec/OperatorUtils.h" #include "velox/vector/tests/utils/VectorTestBase.h" +using namespace facebook; using namespace facebook::velox; using namespace facebook::velox::exec; -class HashPartitionFunctionTest : public test::VectorTestBase, +class HashPartitionFunctionTest : public velox::test::VectorTestBase, public testing::Test { protected: static void SetUpTestCase() { diff --git a/velox/exec/tests/HashTableTest.cpp b/velox/exec/tests/HashTableTest.cpp index afd51b6d6e71c..5ac75b7ebc309 100644 --- a/velox/exec/tests/HashTableTest.cpp +++ b/velox/exec/tests/HashTableTest.cpp @@ -887,6 +887,7 @@ TEST_P(HashTableTest, listJoinResultsSize) { } struct TestParam { + std::optional estimatedRowSize; std::vector varSizeListColumns; std::vector fixedSizeListColumns; uint64_t maxBytes; @@ -894,6 +895,10 @@ TEST_P(HashTableTest, listJoinResultsSize) { std::string debugString() const { std::stringstream ss; + ss << "estimatedRowSize " + << (estimatedRowSize.has_value() + ? std::to_string(estimatedRowSize.value()) + : "null"); ss << "varSizeListColumns "; ss << "["; for (auto i = 0; i < varSizeListColumns.size(); i++) { @@ -918,14 +923,17 @@ TEST_P(HashTableTest, listJoinResultsSize) { // Key types: BIGINT, VARCHAR, ROW(BIGINT, VARCHAR) // Dependent types: BIGINT, VARCHAR std::vector testParams{ - {{}, {0}, 1024, 128}, - {{1}, {}, 2048, 20}, - {{1}, {}, 1 << 20, 1024}, - {{1}, {}, 1 << 14, 154}, - {{1}, {0}, 2048, 18}, - {{}, {0, 3}, 1024, 64}, - {{2}, {}, 2048, 17}, - {{1, 2, 4}, {0, 3}, 1 << 14, 66}}; + {std::nullopt, {}, {0}, 1024, 128}, + {std::nullopt, {1}, {}, 2048, 20}, + {std::nullopt, {1}, {}, 1 << 20, 1024}, + {std::nullopt, {1}, {}, 1 << 14, 154}, + {std::nullopt, {1}, {0}, 2048, 18}, + {std::nullopt, {}, {0, 3}, 1024, 64}, + {std::nullopt, {2}, {}, 2048, 17}, + {std::nullopt, {1, 2, 4}, {0, 3}, 1 << 14, 66}, + {std::nullopt, {2}, {}, 2048, 17}, + {std::make_optional(128), {}, {0}, 1024, 8}, + {std::make_optional(0), {1}, {0}, 1024, 1024}}; for (const auto& testParam : testParams) { SCOPED_TRACE(testParam.debugString()); uint64_t fixedColumnSizeSum{0}; @@ -934,7 +942,8 @@ TEST_P(HashTableTest, listJoinResultsSize) { } BaseHashTable::JoinResultIterator iter( std::vector(testParam.varSizeListColumns), - fixedColumnSizeSum); + fixedColumnSizeSum, + testParam.estimatedRowSize); iter.reset(lookup); auto numRows = table->listJoinResults( iter, true, inputRows, outputRows, testParam.maxBytes); diff --git a/velox/exec/tests/PlanNodeToStringTest.cpp b/velox/exec/tests/PlanNodeToStringTest.cpp index 280f484798b57..6f292a5ee1115 100644 --- a/velox/exec/tests/PlanNodeToStringTest.cpp +++ b/velox/exec/tests/PlanNodeToStringTest.cpp @@ -23,12 +23,14 @@ #include +using namespace facebook; using namespace facebook::velox; using namespace facebook::velox::common::test; using facebook::velox::exec::test::PlanBuilder; -class PlanNodeToStringTest : public testing::Test, public test::VectorTestBase { +class PlanNodeToStringTest : public testing::Test, + public velox::test::VectorTestBase { public: PlanNodeToStringTest() { functions::prestosql::registerAllScalarFunctions(); diff --git a/velox/exec/tests/PrestoQueryRunnerTest.cpp b/velox/exec/tests/PrestoQueryRunnerTest.cpp index 56a4d32bcac2f..25b231dc6c7c1 100644 --- a/velox/exec/tests/PrestoQueryRunnerTest.cpp +++ b/velox/exec/tests/PrestoQueryRunnerTest.cpp @@ -26,6 +26,7 @@ #include "velox/parse/TypeResolver.h" #include "velox/vector/tests/utils/VectorTestBase.h" +using namespace facebook; using namespace facebook::velox; using namespace facebook::velox::test; diff --git a/velox/exec/tests/RoundRobinPartitionFunctionTest.cpp b/velox/exec/tests/RoundRobinPartitionFunctionTest.cpp index de3c43d087400..4c563fd37f00f 100644 --- a/velox/exec/tests/RoundRobinPartitionFunctionTest.cpp +++ b/velox/exec/tests/RoundRobinPartitionFunctionTest.cpp @@ -18,10 +18,11 @@ #include "velox/vector/tests/utils/VectorMaker.h" #include "velox/vector/tests/utils/VectorTestBase.h" +using namespace facebook; using namespace facebook::velox; using namespace facebook::velox::exec; -class RoundRobinPartitionFunctionTest : public test::VectorTestBase, +class RoundRobinPartitionFunctionTest : public velox::test::VectorTestBase, public testing::Test { protected: static void SetUpTestCase() { diff --git a/velox/exec/tests/RowContainerTest.cpp b/velox/exec/tests/RowContainerTest.cpp index b1ff6dc6a412b..cafb239415df9 100644 --- a/velox/exec/tests/RowContainerTest.cpp +++ b/velox/exec/tests/RowContainerTest.cpp @@ -27,6 +27,86 @@ using namespace facebook::velox::exec; using namespace facebook::velox::test; using namespace facebook::velox::common::testutil; +namespace facebook::velox::exec::testutil { +class RowContainerTestHelper { + public: + explicit RowContainerTestHelper(RowContainer* rowContainer) + : rowContainer_(rowContainer) {} + + void checkConsistency() const { + static constexpr int32_t kBatch = 1000; + std::vector rows(kBatch); + RowContainerIterator iter; + int64_t allocatedRows = 0; + std::vector columnsStats; + columnsStats.reserve(rowContainer_->rowColumnsStats_.size()); + columnsStats.resize(rowContainer_->rowColumnsStats_.size()); + const bool isColumnStatsValid = rowContainer_->collectColumnStats_ && + !rowContainer_->rowColumnsStats_.empty(); + for (;;) { + int64_t numRows = rowContainer_->listRows(&iter, kBatch, rows.data()); + if (!numRows) { + break; + } + for (auto rowIndex = 0; rowIndex < numRows; ++rowIndex) { + auto row = rows[rowIndex]; + VELOX_CHECK(!bits::isBitSet(row, rowContainer_->freeFlagOffset_)); + ++allocatedRows; + + if (isColumnStatsValid) { + for (uint32_t columnIndex = 0; + columnIndex < rowContainer_->rowColumnsStats_.size(); + columnIndex++) { + if (rowContainer_->types_[columnIndex]->isFixedWidth()) { + continue; + } + if (rowContainer_->isNullAt( + row, rowContainer_->columnAt(columnIndex))) { + columnsStats[columnIndex].addNullCell(); + } else { + columnsStats[columnIndex].addCellSize( + rowContainer_->variableSizeAt(row, columnIndex)); + } + } + } + } + } + + size_t numFree = 0; + for (auto free = rowContainer_->firstFreeRow_; free; + free = rowContainer_->nextFree(free)) { + ++numFree; + VELOX_CHECK(bits::isBitSet(free, rowContainer_->freeFlagOffset_)); + } + VELOX_CHECK_EQ(numFree, rowContainer_->numFreeRows_); + VELOX_CHECK_EQ(allocatedRows, rowContainer_->numRows_); + + if (isColumnStatsValid) { + VELOX_CHECK_EQ( + rowContainer_->types_.size(), rowContainer_->rowColumnsStats_.size()); + + for (uint32_t i = 0; i < rowContainer_->rowColumnsStats_.size(); i++) { + const auto& storedStats = rowContainer_->rowColumnsStats_[i]; + const auto& expectedStats = columnsStats[i]; + if (rowContainer_->types_[i]->isFixedWidth()) { + continue; + } + VELOX_CHECK_EQ(expectedStats.maxBytes(), storedStats.maxBytes()); + VELOX_CHECK_EQ(expectedStats.minBytes(), storedStats.minBytes()); + VELOX_CHECK_EQ(expectedStats.sumBytes(), storedStats.sumBytes()); + VELOX_CHECK_EQ(expectedStats.avgBytes(), storedStats.avgBytes()); + VELOX_CHECK_EQ( + expectedStats.nonNullCount(), storedStats.nonNullCount()); + VELOX_CHECK_EQ(expectedStats.nullCount(), storedStats.nullCount()); + } + } + } + + private: + RowContainer* const rowContainer_; +}; +} // namespace facebook::velox::exec::testutil + class RowContainerTest : public exec::test::RowContainerTestBase { protected: static void SetUpTestCase() { @@ -965,7 +1045,7 @@ TEST_F(RowContainerTest, types) { } } checkSizes(rows, *data); - data->checkConsistency(); + testutil::RowContainerTestHelper(data.get()).checkConsistency(); auto copy = BaseVector::create(batch->type(), batch->size(), pool_.get()); for (auto column = 0; column < batch->childrenSize(); ++column) { @@ -1149,7 +1229,7 @@ TEST_F(RowContainerTest, erase) { erased.push_back(rows[i]); } data->eraseRows(folly::Range(erased.data(), erased.size())); - data->checkConsistency(); + testutil::RowContainerTestHelper(data.get()).checkConsistency(); std::vector remaining(data->numRows()); iter.reset(); @@ -1164,13 +1244,13 @@ TEST_F(RowContainerTest, erase) { // The next row will be a new one. auto newRow = data->newRow(); EXPECT_EQ(rowSet.end(), rowSet.find(newRow)); - data->checkConsistency(); + testutil::RowContainerTestHelper(data.get()).checkConsistency(); data->clear(); EXPECT_EQ(0, data->numRows()); auto free = data->freeSpace(); EXPECT_EQ(0, free.first); EXPECT_EQ(0, free.second); - data->checkConsistency(); + testutil::RowContainerTestHelper(data.get()).checkConsistency(); } TEST_F(RowContainerTest, initialNulls) { @@ -1338,6 +1418,7 @@ TEST_F(RowContainerTest, alignment) { false, true, true, + true, pool_.get()); constexpr int kNumRows = 100; char* rows[kNumRows]; @@ -1505,6 +1586,7 @@ TEST_F(RowContainerTest, probedFlag) { true, // isJoinBuild true, // hasProbedFlag false, // hasNormalizedKey + true, // collectColumnStats pool_.get()); auto input = makeRowVector({ @@ -1662,8 +1744,8 @@ TEST_F(RowContainerTest, mixedFree) { data2->eraseRows(folly::Range(result2.data(), kNumRows)); EXPECT_EQ(0, data1->numRows()); EXPECT_EQ(0, data2->numRows()); - data1->checkConsistency(); - data2->checkConsistency(); + testutil::RowContainerTestHelper(data1.get()).checkConsistency(); + testutil::RowContainerTestHelper(data2.get()).checkConsistency(); } TEST_F(RowContainerTest, unknown) { @@ -1999,7 +2081,7 @@ TEST_F(RowContainerTest, nextRowVector) { data->eraseRows( folly::Range(erasingRows.data(), erasingRows.size())); validateNextRowVector(); - data->checkConsistency(); + testutil::RowContainerTestHelper(data.get()).checkConsistency(); }; nextRowVectorAppendValidation(); @@ -2340,3 +2422,182 @@ TEST_F(RowContainerTest, isNanAt) { } } } + +TEST_F(RowContainerTest, invalidatedColumnStats) { + constexpr int32_t kNumRows = 100; + auto batch = makeDataset( + ROW( + {{"bool_val", BOOLEAN()}, + {"int_val", INTEGER()}, + {"string_val", VARCHAR()}, + {"array_val", ARRAY(VARCHAR())}, + {"struct_val2", + ROW({{"s_int", INTEGER()}, {"s_array", ARRAY(REAL())}})}}), + kNumRows, + [](RowVectorPtr rows) { + auto strings = + rows->childAt( + rows->type()->as().getChildIdx("string_val")) + ->as>(); + for (auto i = 0; i < strings->size(); i += 11) { + std::string chars; + makeLargeString(i * 10000, chars); + strings->set(i, StringView(chars)); + } + }); + + const auto& types = batch->type()->as().children(); + std::vector keys; + // We have the same types twice, once as non-null keys, next as + // nullable non-keys. + keys.insert(keys.begin(), types.begin(), types.begin() + types.size() / 2); + std::vector dependents; + dependents.insert( + dependents.begin(), types.begin() + types.size() / 2, types.end()); + + auto createRowContainer = [&](std::vector& rows) { + auto data = makeRowContainer(keys, dependents); + + EXPECT_TRUE(data->columnStats(0).has_value()); + EXPECT_EQ(data->columnStats(0)->nonNullCount(), 0); + EXPECT_EQ(data->columnStats(0)->nullCount(), 0); + EXPECT_EQ(data->columnStats(0)->numCells(), 0); + EXPECT_TRUE(data->columnStats(1).has_value()); + EXPECT_EQ(data->columnStats(1)->nonNullCount(), 0); + EXPECT_EQ(data->columnStats(1)->nullCount(), 0); + EXPECT_EQ(data->columnStats(1)->numCells(), 0); + EXPECT_TRUE(data->columnStats(2).has_value()); + EXPECT_EQ(data->columnStats(2)->nonNullCount(), 0); + EXPECT_EQ(data->columnStats(2)->nullCount(), 0); + EXPECT_EQ(data->columnStats(2)->numCells(), 0); + EXPECT_TRUE(data->columnStats(3).has_value()); + EXPECT_EQ(data->columnStats(3)->nonNullCount(), 0); + EXPECT_EQ(data->columnStats(3)->nullCount(), 0); + EXPECT_EQ(data->columnStats(3)->numCells(), 0); + EXPECT_TRUE(data->columnStats(4).has_value()); + EXPECT_EQ(data->columnStats(4)->nonNullCount(), 0); + EXPECT_EQ(data->columnStats(4)->nullCount(), 0); + EXPECT_EQ(data->columnStats(4)->numCells(), 0); + + for (int i = 0; i < kNumRows; ++i) { + auto row = data->newRow(); + } + EXPECT_EQ(kNumRows, data->numRows()); + RowContainerIterator iter; + + EXPECT_EQ(data->listRows(&iter, kNumRows, rows.data()), kNumRows); + EXPECT_EQ(data->listRows(&iter, kNumRows, rows.data()), 0); + + checkSizes(rows, *data); + SelectivityVector allRows(kNumRows); + for (auto column = 0; column < batch->childrenSize(); ++column) { + if (column < keys.size()) { + makeNonNull(batch, column); + EXPECT_EQ(data->columnAt(column).nullMask(), 0); + } else { + EXPECT_NE(data->columnAt(column).nullMask(), 0); + } + DecodedVector decoded(*batch->childAt(column), allRows); + for (auto index = 0; index < kNumRows; ++index) { + data->store(decoded, index, rows[index], column); + } + } + checkSizes(rows, *data); + testutil::RowContainerTestHelper(data.get()).checkConsistency(); + return data; + }; + + std::vector&)>> + invalidateFuncs{ + [](RowContainer* rowContainer, std::vector& rows) { + rowContainer->setAllNull(rows[0]); + }, + [](RowContainer* rowContainer, std::vector& rows) { + rowContainer->eraseRows(folly::Range(rows.data(), rows.size())); + }}; + for (auto& invalidateFunc : invalidateFuncs) { + std::vector rows(kNumRows); + auto rowContainer = createRowContainer(rows); + + ASSERT_TRUE(rowContainer->columnStats(0).has_value()); + ASSERT_GT(rowContainer->columnStats(0)->numCells(), 0); + ASSERT_TRUE(rowContainer->columnStats(1).has_value()); + ASSERT_GT(rowContainer->columnStats(1)->numCells(), 0); + ASSERT_TRUE(rowContainer->columnStats(2).has_value()); + ASSERT_GT(rowContainer->columnStats(2)->numCells(), 0); + ASSERT_TRUE(rowContainer->columnStats(3).has_value()); + ASSERT_GT(rowContainer->columnStats(3)->numCells(), 0); + ASSERT_TRUE(rowContainer->columnStats(4).has_value()); + ASSERT_GT(rowContainer->columnStats(4)->numCells(), 0); + + invalidateFunc(rowContainer.get(), rows); + testutil::RowContainerTestHelper(rowContainer.get()).checkConsistency(); + + ASSERT_FALSE(rowContainer->columnStats(0).has_value()); + ASSERT_FALSE(rowContainer->columnStats(1).has_value()); + ASSERT_FALSE(rowContainer->columnStats(2).has_value()); + ASSERT_FALSE(rowContainer->columnStats(3).has_value()); + ASSERT_FALSE(rowContainer->columnStats(4).has_value()); + } +} + +TEST_F(RowContainerTest, rowColumnStats) { + RowColumn::Stats stats; + EXPECT_EQ(stats.maxBytes(), 0); + EXPECT_EQ(stats.minBytes(), 0); + EXPECT_EQ(stats.sumBytes(), 0); + EXPECT_EQ(stats.avgBytes(), 0); + EXPECT_EQ(stats.nonNullCount(), 0); + EXPECT_EQ(stats.nullCount(), 0); + EXPECT_EQ(stats.numCells(), 0); + + stats.addCellSize(10); + EXPECT_EQ(stats.maxBytes(), 10); + EXPECT_EQ(stats.minBytes(), 10); + EXPECT_EQ(stats.sumBytes(), 10); + EXPECT_EQ(stats.avgBytes(), 10); + EXPECT_EQ(stats.nonNullCount(), 1); + EXPECT_EQ(stats.nullCount(), 0); + EXPECT_EQ(stats.numCells(), 1); + + stats.addCellSize(20); + EXPECT_EQ(stats.maxBytes(), 20); + EXPECT_EQ(stats.minBytes(), 10); + EXPECT_EQ(stats.sumBytes(), 30); + EXPECT_EQ(stats.avgBytes(), 15); + EXPECT_EQ(stats.nonNullCount(), 2); + EXPECT_EQ(stats.nullCount(), 0); + EXPECT_EQ(stats.numCells(), 2); + + stats.addCellSize(5); + EXPECT_EQ(stats.maxBytes(), 20); + EXPECT_EQ(stats.minBytes(), 5); + EXPECT_EQ(stats.sumBytes(), 35); + EXPECT_EQ(stats.avgBytes(), 11); + EXPECT_EQ(stats.nonNullCount(), 3); + EXPECT_EQ(stats.nullCount(), 0); + EXPECT_EQ(stats.numCells(), 3); + + stats.addNullCell(); + EXPECT_EQ(stats.nullCount(), 1); + EXPECT_EQ(stats.nonNullCount(), 3); + EXPECT_EQ(stats.numCells(), 4); + + stats.addNullCell(); + EXPECT_EQ(stats.nullCount(), 2); + EXPECT_EQ(stats.nonNullCount(), 3); + EXPECT_EQ(stats.numCells(), 5); + + stats.addCellSize(15); + stats.addNullCell(); + stats.addCellSize(25); + stats.addNullCell(); + stats.addCellSize(10); + EXPECT_EQ(stats.maxBytes(), 25); + EXPECT_EQ(stats.minBytes(), 5); + EXPECT_EQ(stats.sumBytes(), 85); + EXPECT_EQ(stats.avgBytes(), 14); + EXPECT_EQ(stats.nonNullCount(), 6); + EXPECT_EQ(stats.nullCount(), 4); + EXPECT_EQ(stats.numCells(), 10); +} diff --git a/velox/exec/tests/SpillTest.cpp b/velox/exec/tests/SpillTest.cpp index 351dc462f4815..cedf31058f4df 100644 --- a/velox/exec/tests/SpillTest.cpp +++ b/velox/exec/tests/SpillTest.cpp @@ -28,6 +28,7 @@ #include "velox/type/Timestamp.h" #include "velox/vector/tests/utils/VectorTestBase.h" +using namespace facebook; using namespace facebook::velox; using namespace facebook::velox::exec; using namespace facebook::velox::filesystems; diff --git a/velox/exec/tests/UnorderedStreamReaderTest.cpp b/velox/exec/tests/UnorderedStreamReaderTest.cpp index c30ad81167766..7c43433b29b0f 100644 --- a/velox/exec/tests/UnorderedStreamReaderTest.cpp +++ b/velox/exec/tests/UnorderedStreamReaderTest.cpp @@ -18,10 +18,11 @@ #include "velox/vector/fuzzer/VectorFuzzer.h" #include "velox/vector/tests/utils/VectorTestBase.h" +using namespace facebook; using namespace facebook::velox; class UnorderedStreamReaderTest : public testing::Test, - public test::VectorTestBase { + public velox::test::VectorTestBase { protected: void SetUp() override { rowType_ = ROW( diff --git a/velox/exec/tests/utils/RowContainerTestBase.h b/velox/exec/tests/utils/RowContainerTestBase.h index c9d126fa869ef..c8ccfe33f06d2 100644 --- a/velox/exec/tests/utils/RowContainerTestBase.h +++ b/velox/exec/tests/utils/RowContainerTestBase.h @@ -86,6 +86,7 @@ class RowContainerTestBase : public testing::Test, isJoinBuild, true, true, + true, pool_.get()); VELOX_CHECK(container->testingMutable()); return container; diff --git a/velox/expression/tests/EvalErrorsTest.cpp b/velox/expression/tests/EvalErrorsTest.cpp index fa012a7160b2a..cb4b9f3f413fb 100644 --- a/velox/expression/tests/EvalErrorsTest.cpp +++ b/velox/expression/tests/EvalErrorsTest.cpp @@ -24,7 +24,8 @@ namespace facebook::velox::exec { namespace { -class EvalErrorsTest : public testing::Test, public test::VectorTestBase { +class EvalErrorsTest : public testing::Test, + public velox::test::VectorTestBase { protected: static void SetUpTestCase() { memory::MemoryManager::testingSetInstance({}); diff --git a/velox/functions/lib/tests/LambdaFunctionUtilTest.cpp b/velox/functions/lib/tests/LambdaFunctionUtilTest.cpp index b5739940b0ec3..f328807be2853 100644 --- a/velox/functions/lib/tests/LambdaFunctionUtilTest.cpp +++ b/velox/functions/lib/tests/LambdaFunctionUtilTest.cpp @@ -21,7 +21,7 @@ namespace facebook::velox::functions { namespace { class LambdaFunctionUtilTest : public testing::Test, - public test::VectorTestBase { + public velox::test::VectorTestBase { protected: static void SetUpTestCase() { memory::MemoryManager::testingSetInstance({}); diff --git a/velox/functions/prestosql/aggregates/tests/MapAccumulatorTest.cpp b/velox/functions/prestosql/aggregates/tests/MapAccumulatorTest.cpp index a4c89eb63d220..8c3e85539daad 100644 --- a/velox/functions/prestosql/aggregates/tests/MapAccumulatorTest.cpp +++ b/velox/functions/prestosql/aggregates/tests/MapAccumulatorTest.cpp @@ -21,7 +21,8 @@ namespace facebook::velox::aggregate::prestosql { namespace { -class MapAccumulatorTest : public testing::Test, public test::VectorTestBase { +class MapAccumulatorTest : public testing::Test, + public velox::test::VectorTestBase { protected: protected: static void SetUpTestCase() { diff --git a/velox/functions/prestosql/tests/SimpleComparisonMatcherTest.cpp b/velox/functions/prestosql/tests/SimpleComparisonMatcherTest.cpp index c4a6e4edf2023..534f2a33d6a2f 100644 --- a/velox/functions/prestosql/tests/SimpleComparisonMatcherTest.cpp +++ b/velox/functions/prestosql/tests/SimpleComparisonMatcherTest.cpp @@ -27,7 +27,7 @@ namespace facebook::velox::functions::prestosql { namespace { class SimpleComparisonMatcherTest : public testing::Test, - public test::VectorTestBase { + public velox::test::VectorTestBase { protected: static void SetUpTestCase() { memory::MemoryManager::testingSetInstance({}); diff --git a/velox/functions/sparksql/fuzzer/tests/SparkQueryRunnerTest.cpp b/velox/functions/sparksql/fuzzer/tests/SparkQueryRunnerTest.cpp index 974c9c3e69ac0..18324aaaa999e 100644 --- a/velox/functions/sparksql/fuzzer/tests/SparkQueryRunnerTest.cpp +++ b/velox/functions/sparksql/fuzzer/tests/SparkQueryRunnerTest.cpp @@ -26,6 +26,7 @@ #include "velox/parse/TypeResolver.h" #include "velox/vector/tests/utils/VectorTestBase.h" +using namespace facebook; using namespace facebook::velox; using namespace facebook::velox::test; diff --git a/velox/serializers/tests/CompactRowSerializerTest.cpp b/velox/serializers/tests/CompactRowSerializerTest.cpp index 277957c323bda..1de38c781165f 100644 --- a/velox/serializers/tests/CompactRowSerializerTest.cpp +++ b/velox/serializers/tests/CompactRowSerializerTest.cpp @@ -24,7 +24,7 @@ namespace facebook::velox::serializer { namespace { class CompactRowSerializerTest : public ::testing::Test, - public test::VectorTestBase, + public velox::test::VectorTestBase, public testing::WithParamInterface { protected: static void SetUpTestCase() { diff --git a/velox/serializers/tests/UnsafeRowSerializerTest.cpp b/velox/serializers/tests/UnsafeRowSerializerTest.cpp index 050c104e33d29..2bcd3f9dae601 100644 --- a/velox/serializers/tests/UnsafeRowSerializerTest.cpp +++ b/velox/serializers/tests/UnsafeRowSerializerTest.cpp @@ -20,10 +20,11 @@ #include "velox/vector/fuzzer/VectorFuzzer.h" #include "velox/vector/tests/utils/VectorTestBase.h" +using namespace facebook; using namespace facebook::velox; class UnsafeRowSerializerTest : public ::testing::Test, - public test::VectorTestBase, + public velox::test::VectorTestBase, public testing::WithParamInterface { protected: static void SetUpTestCase() { diff --git a/velox/vector/tests/EncodingTest.cpp b/velox/vector/tests/EncodingTest.cpp index 3ec1a77e676a4..f4e8f1d320f22 100644 --- a/velox/vector/tests/EncodingTest.cpp +++ b/velox/vector/tests/EncodingTest.cpp @@ -19,10 +19,11 @@ #include "velox/vector/tests/VectorTestUtils.h" #include "velox/vector/tests/utils/VectorTestBase.h" +using namespace facebook; using namespace facebook::velox; using namespace facebook::velox::test; -class EncodingTest : public testing::Test, public test::VectorTestBase { +class EncodingTest : public testing::Test, public velox::test::VectorTestBase { protected: static void SetUpTestCase() { memory::MemoryManager::testingSetInstance({}); diff --git a/velox/vector/tests/VectorEstimateFlatSizeTest.cpp b/velox/vector/tests/VectorEstimateFlatSizeTest.cpp index 4fa79ea5870d8..3260357a41dd6 100644 --- a/velox/vector/tests/VectorEstimateFlatSizeTest.cpp +++ b/velox/vector/tests/VectorEstimateFlatSizeTest.cpp @@ -16,12 +16,13 @@ #include #include "velox/vector/tests/utils/VectorTestBase.h" +using namespace facebook; using namespace facebook::velox; class VectorEstimateFlatSizeTest : public testing::Test, - public test::VectorTestBase { + public velox::test::VectorTestBase { protected: - using test::VectorTestBase::makeArrayVector; + using velox::test::VectorTestBase::makeArrayVector; static void SetUpTestCase() { memory::MemoryManager::testingSetInstance({}); diff --git a/velox/vector/tests/VectorPoolTest.cpp b/velox/vector/tests/VectorPoolTest.cpp index 4f49055ea6123..a1456e7536a72 100644 --- a/velox/vector/tests/VectorPoolTest.cpp +++ b/velox/vector/tests/VectorPoolTest.cpp @@ -20,7 +20,8 @@ namespace facebook::velox::test { -class VectorPoolTest : public testing::Test, public test::VectorTestBase { +class VectorPoolTest : public testing::Test, + public velox::test::VectorTestBase { protected: static void SetUpTestCase() { memory::MemoryManager::testingSetInstance({}); diff --git a/velox/vector/tests/VectorPrepareForReuseTest.cpp b/velox/vector/tests/VectorPrepareForReuseTest.cpp index a1ab89a310700..208f288e63c39 100644 --- a/velox/vector/tests/VectorPrepareForReuseTest.cpp +++ b/velox/vector/tests/VectorPrepareForReuseTest.cpp @@ -17,10 +17,11 @@ #include "velox/vector/tests/VectorTestUtils.h" #include "velox/vector/tests/utils/VectorTestBase.h" +using namespace facebook; using namespace facebook::velox; class VectorPrepareForReuseTest : public testing::Test, - public test::VectorTestBase { + public velox::test::VectorTestBase { protected: static void SetUpTestCase() { memory::MemoryManager::testingSetInstance({}); diff --git a/velox/vector/tests/VectorTest.cpp b/velox/vector/tests/VectorTest.cpp index 44d35c1ca8b31..155eee72ef212 100644 --- a/velox/vector/tests/VectorTest.cpp +++ b/velox/vector/tests/VectorTest.cpp @@ -107,7 +107,7 @@ struct NonPOD { int NonPOD::alive = 0; -class VectorTest : public testing::Test, public test::VectorTestBase { +class VectorTest : public testing::Test, public velox::test::VectorTestBase { protected: static void SetUpTestCase() { memory::MemoryManager::testingSetInstance({});