diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index 89436ce81640..549c4edbcd42 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -510,8 +510,9 @@ DataSink::Stats HiveDataSink::stats() const { for (int i = 0; i < writerInfo_.size(); ++i) { const auto& info = writerInfo_.at(i); VELOX_CHECK_NOT_NULL(info); - if (!info->spillStats->empty()) { - stats.spillStats += *info->spillStats; + const auto spillStats = info->spillStats->rlock(); + if (!spillStats->empty()) { + stats.spillStats += *spillStats; } } return stats; @@ -719,15 +720,15 @@ HiveDataSink::maybeCreateBucketSortWriter( sortCompareFlags_, sortPool, writerInfo_.back()->nonReclaimableSectionHolder.get(), - spillConfig_); + spillConfig_, + writerInfo_.back()->spillStats.get()); return std::make_unique( std::move(writer), std::move(sortBuffer), hiveConfig_->sortWriterMaxOutputRows( connectorQueryCtx_->sessionProperties()), hiveConfig_->sortWriterMaxOutputBytes( - connectorQueryCtx_->sessionProperties()), - writerInfo_.back()->spillStats.get()); + connectorQueryCtx_->sessionProperties())); } void HiveDataSink::splitInputRowsAndEnsureWriters() { diff --git a/velox/connectors/hive/HiveDataSink.h b/velox/connectors/hive/HiveDataSink.h index 7990bb8acb3a..ad22ad2b109a 100644 --- a/velox/connectors/hive/HiveDataSink.h +++ b/velox/connectors/hive/HiveDataSink.h @@ -355,7 +355,7 @@ struct HiveWriterInfo { std::shared_ptr _sortPool) : writerParameters(std::move(parameters)), nonReclaimableSectionHolder(new tsan_atomic(false)), - spillStats(new common::SpillStats()), + spillStats(std::make_unique>()), writerPool(std::move(_writerPool)), sinkPool(std::move(_sinkPool)), sortPool(std::move(_sortPool)) {} @@ -364,7 +364,7 @@ struct HiveWriterInfo { const std::unique_ptr> nonReclaimableSectionHolder; /// Collects the spill stats from sort writer if the spilling has been /// triggered. - const std::unique_ptr spillStats; + const std::unique_ptr> spillStats; const std::shared_ptr writerPool; const std::shared_ptr sinkPool; const std::shared_ptr sortPool; diff --git a/velox/dwio/common/SortingWriter.cpp b/velox/dwio/common/SortingWriter.cpp index 1409ad810c8f..83f81d67a72c 100644 --- a/velox/dwio/common/SortingWriter.cpp +++ b/velox/dwio/common/SortingWriter.cpp @@ -22,18 +22,15 @@ SortingWriter::SortingWriter( std::unique_ptr writer, std::unique_ptr sortBuffer, uint32_t maxOutputRowsConfig, - uint64_t maxOutputBytesConfig, - velox::common::SpillStats* spillStats) + uint64_t maxOutputBytesConfig) : outputWriter_(std::move(writer)), maxOutputRowsConfig_(maxOutputRowsConfig), maxOutputBytesConfig_(maxOutputBytesConfig), sortPool_(sortBuffer->pool()), canReclaim_(sortBuffer->canSpill()), - spillStats_(spillStats), sortBuffer_(std::move(sortBuffer)) { VELOX_CHECK_GT(maxOutputRowsConfig_, 0); VELOX_CHECK_GT(maxOutputBytesConfig_, 0); - VELOX_CHECK_NOT_NULL(spillStats_); if (sortPool_->parent()->reclaimer() != nullptr) { sortPool_->setReclaimer(MemoryReclaimer::create(this)); } @@ -64,11 +61,7 @@ void SortingWriter::close() { outputWriter_->write(output); output = sortBuffer_->getOutput(maxOutputBatchRows); } - auto spillStatsOr = sortBuffer_->spilledStats(); - if (spillStatsOr.has_value()) { - VELOX_CHECK(canReclaim_); - *spillStats_ = spillStatsOr.value(); - } + sortBuffer_.reset(); sortPool_->release(); outputWriter_->close(); diff --git a/velox/dwio/common/SortingWriter.h b/velox/dwio/common/SortingWriter.h index 8dcab763b092..c73574b334f9 100644 --- a/velox/dwio/common/SortingWriter.h +++ b/velox/dwio/common/SortingWriter.h @@ -29,8 +29,7 @@ class SortingWriter : public Writer { std::unique_ptr writer, std::unique_ptr sortBuffer, uint32_t maxOutputRowsConfig, - uint64_t maxOutputBytesConfig, - velox::common::SpillStats* spillStats); + uint64_t maxOutputBytesConfig); ~SortingWriter() override; @@ -81,7 +80,6 @@ class SortingWriter : public Writer { const uint64_t maxOutputBytesConfig_; memory::MemoryPool* const sortPool_; const bool canReclaim_; - velox::common::SpillStats* const spillStats_; std::unique_ptr sortBuffer_; }; diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp index b1e54eed605f..4c0ef7843cea 100644 --- a/velox/exec/GroupingSet.cpp +++ b/velox/exec/GroupingSet.cpp @@ -52,7 +52,8 @@ GroupingSet::GroupingSet( const std::optional& groupIdChannel, const common::SpillConfig* spillConfig, tsan_atomic* nonReclaimableSection, - OperatorCtx* operatorCtx) + OperatorCtx* operatorCtx, + folly::Synchronized* spillStats) : preGroupedKeyChannels_(std::move(preGroupedKeys)), hashers_(std::move(hashers)), isGlobal_(hashers_.empty()), @@ -69,7 +70,8 @@ GroupingSet::GroupingSet( stringAllocator_(operatorCtx->pool()), rows_(operatorCtx->pool()), isAdaptive_(queryConfig_.hashAdaptivityEnabled()), - pool_(*operatorCtx->pool()) { + pool_(*operatorCtx->pool()), + spillStats_(spillStats) { VELOX_CHECK_NOT_NULL(nonReclaimableSection_); VELOX_CHECK(pool_.trackUsage()); for (auto& hasher : hashers_) { @@ -131,7 +133,8 @@ std::unique_ptr GroupingSet::createForMarkDistinct( /*groupIdColumn*/ std::nullopt, /*spillConfig*/ nullptr, nonReclaimableSection, - operatorCtx); + operatorCtx, + /*spillStats_*/ nullptr); }; namespace { @@ -939,7 +942,8 @@ void GroupingSet::spill() { makeSpillType(), rows->keyTypes().size(), std::vector(), - spillConfig_); + spillConfig_, + spillStats_); } spiller_->spill(); if (sortedAggregations_) { @@ -958,7 +962,11 @@ void GroupingSet::spill(const RowContainerIterator& rowIterator) { auto* rows = table_->rows(); VELOX_CHECK(pool_.trackUsage()); spiller_ = std::make_unique( - Spiller::Type::kAggregateOutput, rows, makeSpillType(), spillConfig_); + Spiller::Type::kAggregateOutput, + rows, + makeSpillType(), + spillConfig_, + spillStats_); spiller_->spill(rowIterator); table_->clear(); diff --git a/velox/exec/GroupingSet.h b/velox/exec/GroupingSet.h index bf489c4621af..8cbbe8ff99bc 100644 --- a/velox/exec/GroupingSet.h +++ b/velox/exec/GroupingSet.h @@ -40,7 +40,8 @@ class GroupingSet { const std::optional& groupIdChannel, const common::SpillConfig* spillConfig, tsan_atomic* nonReclaimableSection, - OperatorCtx* operatorCtx); + OperatorCtx* operatorCtx, + folly::Synchronized* spillStats); ~GroupingSet(); @@ -359,6 +360,8 @@ class GroupingSet { // Temporary for case where an aggregate in toIntermediate() outputs post-init // state of aggregate for all rows. std::vector firstGroup_; + + folly::Synchronized* const spillStats_; }; } // namespace facebook::velox::exec diff --git a/velox/exec/HashAggregation.cpp b/velox/exec/HashAggregation.cpp index 0e9c12690c9b..cdc6f5238e21 100644 --- a/velox/exec/HashAggregation.cpp +++ b/velox/exec/HashAggregation.cpp @@ -106,7 +106,8 @@ void HashAggregation::initialize() { groupIdChannel, spillConfig_.has_value() ? &spillConfig_.value() : nullptr, &nonReclaimableSection_, - operatorCtx_.get()); + operatorCtx_.get(), + &spillStats_); aggregationNode_.reset(); } @@ -188,13 +189,6 @@ void HashAggregation::updateRuntimeStats() { RuntimeMetric(hashTableStats.numTombstones); } -void HashAggregation::recordSpillStats() { - auto spillStatsOr = groupingSet_->spilledStats(); - if (spillStatsOr.has_value()) { - Operator::recordSpillStats(spillStatsOr.value()); - } -} - void HashAggregation::prepareOutput(vector_size_t size) { if (output_) { VectorPtr output = std::move(output_); @@ -388,7 +382,6 @@ void HashAggregation::noMoreInput() { updateEstimatedOutputRowSize(); groupingSet_->noMoreInput(); Operator::noMoreInput(); - recordSpillStats(); // Release the extra reserved memory right after processing all the inputs. pool()->release(); } @@ -429,9 +422,6 @@ void HashAggregation::reclaim( // Spill all the rows starting from the next output row pointed by // 'resultIterator_'. groupingSet_->spill(resultIterator_); - // NOTE: we will only spill once during the output processing stage so - // record stats here. - recordSpillStats(); } else { // TODO: support fine-grain disk spilling based on 'targetBytes' after // having row container memory compaction support later. diff --git a/velox/exec/HashAggregation.h b/velox/exec/HashAggregation.h index 771dd2ca809d..1bf28c43428a 100644 --- a/velox/exec/HashAggregation.h +++ b/velox/exec/HashAggregation.h @@ -72,10 +72,6 @@ class HashAggregation : public Operator { RowVectorPtr getDistinctOutput(); - // Invoked to record the spilling stats in operator stats after processing all - // the inputs. - void recordSpillStats(); - void updateEstimatedOutputRowSize(); std::shared_ptr aggregationNode_; diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp index a4c8dfc5c8b4..fff744ed7528 100644 --- a/velox/exec/HashBuild.cpp +++ b/velox/exec/HashBuild.cpp @@ -231,9 +231,11 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) { << spillConfig.maxSpillLevel << ", and disable spilling for memory pool: " << pool()->name(); + ++spillStats_.wlock()->spillMaxLevelExceededCount; exceededMaxSpillLevelLimit_ = true; return; } + exceededMaxSpillLevelLimit_ = false; hashBits = HashBitRange(startBit, startBit + spillConfig.numPartitionBits); } @@ -243,7 +245,8 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) { table_->rows(), spillType_, std::move(hashBits), - &spillConfig); + &spillConfig, + &spillStats_); const int32_t numPartitions = spiller_->hashBits().numPartitions(); spillInputIndicesBuffers_.resize(numPartitions); @@ -732,7 +735,6 @@ bool HashBuild::finishHashBuild() { } if (spiller != nullptr) { spiller->finishSpill(spillPartitions); - build->recordSpillStats(spiller.get()); } } @@ -740,7 +742,6 @@ bool HashBuild::finishHashBuild() { spiller_->finishSpill(spillPartitions); removeEmptyPartitions(spillPartitions); } - recordSpillStats(); // TODO: re-enable parallel join build with spilling triggered after // https://github.com/facebookincubator/velox/issues/3567 is fixed. @@ -765,23 +766,6 @@ bool HashBuild::finishHashBuild() { return true; } -void HashBuild::recordSpillStats() { - recordSpillStats(spiller_.get()); -} - -void HashBuild::recordSpillStats(Spiller* spiller) { - if (spiller != nullptr) { - const auto spillStats = spiller->stats(); - VELOX_CHECK_EQ(spillStats.spillSortTimeUs, 0); - Operator::recordSpillStats(spillStats); - } else if (exceededMaxSpillLevelLimit_) { - exceededMaxSpillLevelLimit_ = false; - common::SpillStats spillStats; - spillStats.spillMaxLevelExceededCount = 1; - Operator::recordSpillStats(spillStats); - } -} - void HashBuild::ensureTableFits(uint64_t numRows) { // NOTE: we don't need memory reservation if all the partitions have been // spilled as nothing need to be built. diff --git a/velox/exec/HashBuild.h b/velox/exec/HashBuild.h index 42bf6633bb15..11691597278e 100644 --- a/velox/exec/HashBuild.h +++ b/velox/exec/HashBuild.h @@ -117,9 +117,6 @@ class HashBuild final : public Operator { return canReclaim(); } - void recordSpillStats(); - void recordSpillStats(Spiller* spiller); - // Indicates if the input is read from spill data or not. bool isInputFromSpill() const; diff --git a/velox/exec/HashProbe.cpp b/velox/exec/HashProbe.cpp index 023f0dd4db56..1f2f8d91ed9d 100644 --- a/velox/exec/HashProbe.cpp +++ b/velox/exec/HashProbe.cpp @@ -253,7 +253,8 @@ void HashProbe::maybeSetupSpillInput( spillInputPartitionIds_.begin()->partitionBitOffset(), spillInputPartitionIds_.begin()->partitionBitOffset() + spillConfig.numPartitionBits), - &spillConfig); + &spillConfig, + &spillStats_); // Set the spill partitions to the corresponding ones at the build side. The // hash probe operator itself won't trigger any spilling. spiller_->setPartitionsSpilled(toPartitionNumSet(spillInputPartitionIds_)); @@ -1382,7 +1383,8 @@ void HashProbe::noMoreInputInternal() { VELOX_CHECK_EQ( spillInputPartitionIds_.size(), spiller_->spilledPartitionSet().size()); spiller_->finishSpill(spillPartitionSet_); - recordSpillStats(); + VELOX_CHECK_EQ(spillStats_.rlock()->spillSortTimeUs, 0); + VELOX_CHECK_EQ(spillStats_.rlock()->spillFillTimeUs, 0); } const bool hasSpillData = hasMoreSpillData(); @@ -1412,14 +1414,6 @@ void HashProbe::noMoreInputInternal() { lastProber_ = true; } -void HashProbe::recordSpillStats() { - VELOX_CHECK_NOT_NULL(spiller_); - const auto spillStats = spiller_->stats(); - VELOX_CHECK_EQ(spillStats.spillSortTimeUs, 0); - VELOX_CHECK_EQ(spillStats.spillFillTimeUs, 0); - Operator::recordSpillStats(spillStats); -} - bool HashProbe::isFinished() { return state_ == ProbeOperatorState::kFinish; } diff --git a/velox/exec/HashProbe.h b/velox/exec/HashProbe.h index 3f54f84e77a6..3c35a8365e36 100644 --- a/velox/exec/HashProbe.h +++ b/velox/exec/HashProbe.h @@ -220,8 +220,6 @@ class HashProbe : public Operator { // next hash table from the spilled data. void noMoreInputInternal(); - void recordSpillStats(); - // Returns the index of the 'match' column in the output for semi project // joins. VectorPtr& matchColumn() const { diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index e37404421f4c..e74049d43230 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -295,78 +295,80 @@ void Operator::recordBlockingTime(uint64_t start, BlockingReason reason) { fmt::format("blocked{}Times", blockReason), RuntimeCounter(1)); } -void Operator::recordSpillStats(const common::SpillStats& spillStats) { +void Operator::recordSpillStats() { + const auto lockedSpillStats = spillStats_.wlock(); auto lockedStats = stats_.wlock(); - lockedStats->spilledInputBytes += spillStats.spilledInputBytes; - lockedStats->spilledBytes += spillStats.spilledBytes; - lockedStats->spilledRows += spillStats.spilledRows; - lockedStats->spilledPartitions += spillStats.spilledPartitions; - lockedStats->spilledFiles += spillStats.spilledFiles; - if (spillStats.spillFillTimeUs != 0) { + lockedStats->spilledInputBytes += lockedSpillStats->spilledInputBytes; + lockedStats->spilledBytes += lockedSpillStats->spilledBytes; + lockedStats->spilledRows += lockedSpillStats->spilledRows; + lockedStats->spilledPartitions += lockedSpillStats->spilledPartitions; + lockedStats->spilledFiles += lockedSpillStats->spilledFiles; + if (lockedSpillStats->spillFillTimeUs != 0) { lockedStats->addRuntimeStat( "spillFillTime", RuntimeCounter{ static_cast( - spillStats.spillFillTimeUs * + lockedSpillStats->spillFillTimeUs * Timestamp::kNanosecondsInMicrosecond), RuntimeCounter::Unit::kNanos}); } - if (spillStats.spillSortTimeUs != 0) { + if (lockedSpillStats->spillSortTimeUs != 0) { lockedStats->addRuntimeStat( "spillSortTime", RuntimeCounter{ static_cast( - spillStats.spillSortTimeUs * + lockedSpillStats->spillSortTimeUs * Timestamp::kNanosecondsInMicrosecond), RuntimeCounter::Unit::kNanos}); } - if (spillStats.spillSerializationTimeUs != 0) { + if (lockedSpillStats->spillSerializationTimeUs != 0) { lockedStats->addRuntimeStat( "spillSerializationTime", RuntimeCounter{ static_cast( - spillStats.spillSerializationTimeUs * + lockedSpillStats->spillSerializationTimeUs * Timestamp::kNanosecondsInMicrosecond), RuntimeCounter::Unit::kNanos}); } - if (spillStats.spillFlushTimeUs != 0) { + if (lockedSpillStats->spillFlushTimeUs != 0) { lockedStats->addRuntimeStat( "spillFlushTime", RuntimeCounter{ static_cast( - spillStats.spillFlushTimeUs * + lockedSpillStats->spillFlushTimeUs * Timestamp::kNanosecondsInMicrosecond), RuntimeCounter::Unit::kNanos}); } - if (spillStats.spillWrites != 0) { + if (lockedSpillStats->spillWrites != 0) { lockedStats->addRuntimeStat( "spillWrites", - RuntimeCounter{static_cast(spillStats.spillWrites)}); + RuntimeCounter{static_cast(lockedSpillStats->spillWrites)}); } - if (spillStats.spillWriteTimeUs != 0) { + if (lockedSpillStats->spillWriteTimeUs != 0) { lockedStats->addRuntimeStat( "spillWriteTime", RuntimeCounter{ static_cast( - spillStats.spillWriteTimeUs * + lockedSpillStats->spillWriteTimeUs * Timestamp::kNanosecondsInMicrosecond), RuntimeCounter::Unit::kNanos}); } - if (spillStats.spillRuns != 0) { + if (lockedSpillStats->spillRuns != 0) { lockedStats->addRuntimeStat( "spillRuns", - RuntimeCounter{static_cast(spillStats.spillRuns)}); - common::updateGlobalSpillRunStats(spillStats.spillRuns); + RuntimeCounter{static_cast(lockedSpillStats->spillRuns)}); + common::updateGlobalSpillRunStats(lockedSpillStats->spillRuns); } - if (spillStats.spillMaxLevelExceededCount != 0) { + if (lockedSpillStats->spillMaxLevelExceededCount != 0) { lockedStats->addRuntimeStat( "exceededMaxSpillLevel", - RuntimeCounter{ - static_cast(spillStats.spillMaxLevelExceededCount)}); + RuntimeCounter{static_cast( + lockedSpillStats->spillMaxLevelExceededCount)}); common::updateGlobalMaxSpillLevelExceededCount( - spillStats.spillMaxLevelExceededCount); + lockedSpillStats->spillMaxLevelExceededCount); } + lockedSpillStats->reset(); } std::string Operator::toString() const { diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index 2b40df4d00b4..c98e1a58deef 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -420,6 +420,7 @@ class Operator : public BaseRuntimeStatWriter { virtual void close() { input_ = nullptr; results_.clear(); + recordSpillStats(); // Release the unused memory reservation on close. operatorCtx_->pool()->release(); } @@ -694,7 +695,7 @@ class Operator : public BaseRuntimeStatWriter { std::optional averageRowSize = std::nullopt) const; /// Invoked to record spill stats in operator stats. - void recordSpillStats(const common::SpillStats& spillStats); + virtual void recordSpillStats(); const std::unique_ptr operatorCtx_; const RowTypePtr outputType_; @@ -705,6 +706,7 @@ class Operator : public BaseRuntimeStatWriter { bool initialized_{false}; folly::Synchronized stats_; + folly::Synchronized spillStats_; /// Indicates if an operator is under a non-reclaimable execution section. /// This prevents the memory arbitrator from reclaiming memory from this diff --git a/velox/exec/OrderBy.cpp b/velox/exec/OrderBy.cpp index 4d9a96955359..59b1bc43a0bc 100644 --- a/velox/exec/OrderBy.cpp +++ b/velox/exec/OrderBy.cpp @@ -65,7 +65,8 @@ OrderBy::OrderBy( sortCompareFlags, pool(), &nonReclaimableSection_, - spillConfig_.has_value() ? &(spillConfig_.value()) : nullptr); + spillConfig_.has_value() ? &(spillConfig_.value()) : nullptr, + &spillStats_); } void OrderBy::addInput(RowVectorPtr input) { @@ -90,7 +91,6 @@ void OrderBy::noMoreInput() { Operator::noMoreInput(); sortBuffer_->noMoreInput(); maxOutputRows_ = outputBatchRows(sortBuffer_->estimateOutputRowSize()); - recordSpillStats(); } RowVectorPtr OrderBy::getOutput() { @@ -107,12 +107,4 @@ void OrderBy::close() { Operator::close(); sortBuffer_.reset(); } - -void OrderBy::recordSpillStats() { - VELOX_CHECK_NOT_NULL(sortBuffer_); - auto spillStats = sortBuffer_->spilledStats(); - if (spillStats.has_value()) { - Operator::recordSpillStats(spillStats.value()); - } -} } // namespace facebook::velox::exec diff --git a/velox/exec/OrderBy.h b/velox/exec/OrderBy.h index 850eda574207..d75315094d9a 100644 --- a/velox/exec/OrderBy.h +++ b/velox/exec/OrderBy.h @@ -63,10 +63,6 @@ class OrderBy : public Operator { void close() override; private: - // Invoked to record the spilling stats in operator stats after processing all - // the inputs. - void recordSpillStats(); - std::unique_ptr sortBuffer_; bool finished_ = false; uint32_t maxOutputRows_; diff --git a/velox/exec/RowNumber.cpp b/velox/exec/RowNumber.cpp index 7e12b26c220c..e3bb28d00f31 100644 --- a/velox/exec/RowNumber.cpp +++ b/velox/exec/RowNumber.cpp @@ -116,7 +116,6 @@ void RowNumber::noMoreInput() { if (inputSpiller_ != nullptr) { inputSpiller_->finishSpill(spillInputPartitionSet_); - recordSpillStats(inputSpiller_->stats()); removeEmptyPartitions(spillInputPartitionSet_); restoreNextSpillPartition(); } @@ -390,11 +389,11 @@ SpillPartitionNumSet RowNumber::spillHashTable() { table_->rows(), tableType, spillPartitionBits_, - &spillConfig); + &spillConfig, + &spillStats_); hashTableSpiller->spill(); hashTableSpiller->finishSpill(spillHashTablePartitionSet_); - recordSpillStats(hashTableSpiller->stats()); table_->clear(); pool()->release(); @@ -412,7 +411,8 @@ void RowNumber::setupInputSpiller( Spiller::Type::kHashJoinProbe, inputType_, spillPartitionBits_, - &spillConfig); + &spillConfig, + &spillStats_); inputSpiller_->setPartitionsSpilled(spillPartitionSet); const auto& hashers = table_->hashers(); diff --git a/velox/exec/SortBuffer.cpp b/velox/exec/SortBuffer.cpp index f9358f99f3b7..5379a600d9f7 100644 --- a/velox/exec/SortBuffer.cpp +++ b/velox/exec/SortBuffer.cpp @@ -25,12 +25,14 @@ SortBuffer::SortBuffer( const std::vector& sortCompareFlags, velox::memory::MemoryPool* pool, tsan_atomic* nonReclaimableSection, - const common::SpillConfig* spillConfig) + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats) : input_(input), sortCompareFlags_(sortCompareFlags), pool_(pool), nonReclaimableSection_(nonReclaimableSection), - spillConfig_(spillConfig) { + spillConfig_(spillConfig), + spillStats_(spillStats) { VELOX_CHECK_GE(input_->size(), sortCompareFlags_.size()); VELOX_CHECK_GT(sortCompareFlags_.size(), 0); VELOX_CHECK_EQ(sortColumnIndices.size(), sortCompareFlags_.size()); @@ -258,7 +260,8 @@ void SortBuffer::spillInput() { spillerStoreType_, data_->keyTypes().size(), sortCompareFlags_, - spillConfig_); + spillConfig_, + spillStats_); } spiller_->spill(); data_->clear(); @@ -278,7 +281,8 @@ void SortBuffer::spillOutput() { Spiller::Type::kOrderByOutput, data_.get(), spillerStoreType_, - spillConfig_); + spillConfig_, + spillStats_); auto spillRows = std::vector( sortedRows_.begin() + numOutputRows_, sortedRows_.end()); spiller_->spill(spillRows); diff --git a/velox/exec/SortBuffer.h b/velox/exec/SortBuffer.h index ca9930e3e95e..473fe0e7e9b0 100644 --- a/velox/exec/SortBuffer.h +++ b/velox/exec/SortBuffer.h @@ -36,7 +36,8 @@ class SortBuffer { const std::vector& sortCompareFlags, velox::memory::MemoryPool* pool, tsan_atomic* nonReclaimableSection, - const common::SpillConfig* spillConfig = nullptr); + const common::SpillConfig* spillConfig = nullptr, + folly::Synchronized* spillStats = nullptr); void addInput(const VectorPtr& input); @@ -61,14 +62,6 @@ class SortBuffer { return pool_; } - /// Returns the spiller stats including total bytes and rows spilled so far. - std::optional spilledStats() const { - if (spiller_ == nullptr) { - return std::nullopt; - } - return spiller_->stats(); - } - std::optional estimateOutputRowSize() const; private: @@ -95,6 +88,7 @@ class SortBuffer { // execution section or not. tsan_atomic* const nonReclaimableSection_; const common::SpillConfig* const spillConfig_; + folly::Synchronized* const spillStats_; // The column projection map between 'input_' and 'spillerStoreType_' as sort // buffer stores the sort columns first in 'data_'. diff --git a/velox/exec/SortWindowBuild.cpp b/velox/exec/SortWindowBuild.cpp index 2c86ae062575..86f3ee458edb 100644 --- a/velox/exec/SortWindowBuild.cpp +++ b/velox/exec/SortWindowBuild.cpp @@ -43,12 +43,14 @@ SortWindowBuild::SortWindowBuild( const std::shared_ptr& node, velox::memory::MemoryPool* pool, const common::SpillConfig* spillConfig, - tsan_atomic* nonReclaimableSection) + tsan_atomic* nonReclaimableSection, + folly::Synchronized* spillStats) : WindowBuild(node, pool, spillConfig, nonReclaimableSection), numPartitionKeys_{node->partitionKeys().size()}, spillCompareFlags_{ makeSpillCompareFlags(numPartitionKeys_, node->sortingOrders())}, - pool_(pool) { + pool_(pool), + spillStats_(spillStats) { VELOX_CHECK_NOT_NULL(pool_); allKeyInfo_.reserve(partitionKeyInfo_.size() + sortKeyInfo_.size()); allKeyInfo_.insert( @@ -145,7 +147,8 @@ void SortWindowBuild::setupSpiller() { inputType_, spillCompareFlags_.size(), spillCompareFlags_, - spillConfig_); + spillConfig_, + spillStats_); } void SortWindowBuild::spill() { diff --git a/velox/exec/SortWindowBuild.h b/velox/exec/SortWindowBuild.h index bc73de0117b8..645949ddb7e0 100644 --- a/velox/exec/SortWindowBuild.h +++ b/velox/exec/SortWindowBuild.h @@ -30,7 +30,8 @@ class SortWindowBuild : public WindowBuild { const std::shared_ptr& node, velox::memory::MemoryPool* pool, const common::SpillConfig* spillConfig, - tsan_atomic* nonReclaimableSection); + tsan_atomic* nonReclaimableSection, + folly::Synchronized* spillStats); bool needsInput() override { // No partitions are available yet, so can consume input rows. @@ -85,6 +86,7 @@ class SortWindowBuild : public WindowBuild { const std::vector spillCompareFlags_; memory::MemoryPool* const pool_; + folly::Synchronized* const spillStats_; // allKeyInfo_ is a combination of (partitionKeyInfo_ and sortKeyInfo_). // It is used to perform a full sorting of the input rows to be able to diff --git a/velox/exec/Spiller.cpp b/velox/exec/Spiller.cpp index d2dc3fdc9e47..cd63c5545a2a 100644 --- a/velox/exec/Spiller.cpp +++ b/velox/exec/Spiller.cpp @@ -39,7 +39,8 @@ Spiller::Spiller( RowTypePtr rowType, int32_t numSortingKeys, const std::vector& sortCompareFlags, - const common::SpillConfig* spillConfig) + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats) : Spiller( type, container, @@ -56,7 +57,8 @@ Spiller::Spiller( spillConfig->compressionKind, spillConfig->executor, spillConfig->maxSpillRunRows, - spillConfig->fileCreateConfig) { + spillConfig->fileCreateConfig, + spillStats) { VELOX_CHECK( type_ == Type::kOrderByInput || type_ == Type::kAggregateInput, "Unexpected spiller type: {}", @@ -69,7 +71,8 @@ Spiller::Spiller( Type type, RowContainer* container, RowTypePtr rowType, - const common::SpillConfig* spillConfig) + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats) : Spiller( type, container, @@ -86,7 +89,8 @@ Spiller::Spiller( spillConfig->compressionKind, spillConfig->executor, spillConfig->maxSpillRunRows, - spillConfig->fileCreateConfig) { + spillConfig->fileCreateConfig, + spillStats) { VELOX_CHECK( type_ == Type::kAggregateOutput || type_ == Type::kOrderByOutput, "Unexpected spiller type: {}", @@ -99,7 +103,8 @@ Spiller::Spiller( Type type, RowTypePtr rowType, HashBitRange bits, - const common::SpillConfig* spillConfig) + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats) : Spiller( type, nullptr, @@ -116,7 +121,8 @@ Spiller::Spiller( spillConfig->compressionKind, spillConfig->executor, 0, - spillConfig->fileCreateConfig) { + spillConfig->fileCreateConfig, + spillStats) { VELOX_CHECK_EQ( type_, Type::kHashJoinProbe, @@ -130,7 +136,8 @@ Spiller::Spiller( RowContainer* container, RowTypePtr rowType, HashBitRange bits, - const common::SpillConfig* spillConfig) + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats) : Spiller( type, container, @@ -147,7 +154,8 @@ Spiller::Spiller( spillConfig->compressionKind, spillConfig->executor, spillConfig->maxSpillRunRows, - spillConfig->fileCreateConfig) { + spillConfig->fileCreateConfig, + spillStats) { VELOX_CHECK_EQ(type_, Type::kHashJoinBuild); VELOX_CHECK(isHashJoinTableSpillType(rowType_, joinType)); } @@ -157,7 +165,8 @@ Spiller::Spiller( RowContainer* container, RowTypePtr rowType, HashBitRange bits, - const common::SpillConfig* spillConfig) + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats) : Spiller( type, container, @@ -174,7 +183,8 @@ Spiller::Spiller( spillConfig->compressionKind, spillConfig->executor, spillConfig->maxSpillRunRows, - spillConfig->fileCreateConfig) { + spillConfig->fileCreateConfig, + spillStats) { VELOX_CHECK_EQ(type_, Type::kRowNumber); } @@ -194,7 +204,8 @@ Spiller::Spiller( common::CompressionKind compressionKind, folly::Executor* executor, uint64_t maxSpillRunRows, - const std::string& fileCreateConfig) + const std::string& fileCreateConfig, + folly::Synchronized* spillStats) : type_(type), container_(container), executor_(executor), @@ -202,6 +213,7 @@ Spiller::Spiller( rowType_(std::move(rowType)), spillProbedFlag_(recordProbedFlag), maxSpillRunRows_(maxSpillRunRows), + spillStats_(spillStats), state_( getSpillDirPathCb, updateAndCheckSpillLimitCb, @@ -213,7 +225,7 @@ Spiller::Spiller( writeBufferSize, compressionKind, memory::spillMemoryPool(), - &stats_, + spillStats, fileCreateConfig) { TestValue::adjust( "facebook::velox::exec::Spiller", const_cast(&bits_)); @@ -419,7 +431,7 @@ std::unique_ptr Spiller::writeSpill(int32_t partition) { } void Spiller::runSpill(bool lastRun) { - ++stats_.wlock()->spillRuns; + ++spillStats_->wlock()->spillRuns; VELOX_CHECK(type_ != Spiller::Type::kOrderByOutput || lastRun); std::vector>> writes; @@ -481,12 +493,12 @@ void Spiller::runSpill(bool lastRun) { } void Spiller::updateSpillFillTime(uint64_t timeUs) { - stats_.wlock()->spillFillTimeUs += timeUs; + spillStats_->wlock()->spillFillTimeUs += timeUs; common::updateGlobalSpillFillTime(timeUs); } void Spiller::updateSpillSortTime(uint64_t timeUs) { - stats_.wlock()->spillSortTimeUs += timeUs; + spillStats_->wlock()->spillSortTimeUs += timeUs; common::updateGlobalSpillSortTime(timeUs); } @@ -705,6 +717,6 @@ std::string Spiller::typeName(Type type) { } common::SpillStats Spiller::stats() const { - return stats_.copy(); + return spillStats_->copy(); } } // namespace facebook::velox::exec diff --git a/velox/exec/Spiller.h b/velox/exec/Spiller.h index f78c080ffc1c..aecce215dc70 100644 --- a/velox/exec/Spiller.h +++ b/velox/exec/Spiller.h @@ -59,21 +59,24 @@ class Spiller { RowTypePtr rowType, int32_t numSortingKeys, const std::vector& sortCompareFlags, - const common::SpillConfig* spillConfig); + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats); /// type == Type::kAggregateOutput || type == Type::kOrderByOutput Spiller( Type type, RowContainer* container, RowTypePtr rowType, - const common::SpillConfig* spillConfig); + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats); /// type == Type::kHashJoinProbe Spiller( Type type, RowTypePtr rowType, HashBitRange bits, - const common::SpillConfig* spillConfig); + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats); /// type == Type::kHashJoinBuild Spiller( @@ -82,7 +85,8 @@ class Spiller { RowContainer* container, RowTypePtr rowType, HashBitRange bits, - const common::SpillConfig* spillConfig); + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats); /// type == Type::kRowNumber Spiller( @@ -90,7 +94,8 @@ class Spiller { RowContainer* container, RowTypePtr rowType, HashBitRange bits, - const common::SpillConfig* spillConfig); + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats); Type type() const { return type_; @@ -204,7 +209,8 @@ class Spiller { common::CompressionKind compressionKind, folly::Executor* executor, uint64_t maxSpillRunRows, - const std::string& fileCreateConfig); + const std::string& fileCreateConfig, + folly::Synchronized* spillStats); // Invoked to spill. If 'startRowIter' is not null, then we only spill rows // from row container starting at the offset pointed by 'startRowIter'. @@ -316,13 +322,14 @@ class Spiller { const bool spillProbedFlag_; const uint64_t maxSpillRunRows_; + folly::Synchronized* const spillStats_; + // True if all rows of spilling partitions are in 'spillRuns_', so // that one can start reading these back. This means that the rows // that are not written out and deleted will be captured by // spillMergeStreamOverRows(). bool finalized_{false}; - folly::Synchronized stats_; SpillState state_; // Collects the rows to spill for each partition. diff --git a/velox/exec/TableWriter.cpp b/velox/exec/TableWriter.cpp index a40611c37112..c8f923af1276 100644 --- a/velox/exec/TableWriter.cpp +++ b/velox/exec/TableWriter.cpp @@ -255,11 +255,12 @@ void TableWriter::updateStats(const connector::DataSink::Stats& stats) { "numWrittenFiles", RuntimeCounter(stats.numWrittenFiles)); } if (!stats.spillStats.empty()) { - recordSpillStats(stats.spillStats); + *spillStats_.wlock() += stats.spillStats; } } void TableWriter::close() { + Operator::close(); if (!closed_) { // Abort the data sink if the query has already failed and no need for // regular close. diff --git a/velox/exec/TopNRowNumber.cpp b/velox/exec/TopNRowNumber.cpp index 2b7b1cbfd884..5b35a11476ce 100644 --- a/velox/exec/TopNRowNumber.cpp +++ b/velox/exec/TopNRowNumber.cpp @@ -288,7 +288,6 @@ void TopNRowNumber::noMoreInput() { VELOX_CHECK_NULL(merge_); auto spillPartition = spiller_->finishSpill(); merge_ = spillPartition.createOrderedReader(pool()); - recordSpillStats(spiller_->stats()); } else { outputRows_.resize(outputBatchSize_); } @@ -748,6 +747,7 @@ void TopNRowNumber::setupSpiller() { inputType_, spillCompareFlags_.size(), spillCompareFlags_, - &spillConfig_.value()); + &spillConfig_.value(), + &spillStats_); } } // namespace facebook::velox::exec diff --git a/velox/exec/Window.cpp b/velox/exec/Window.cpp index dd8707e7c1cd..6e09973edf60 100644 --- a/velox/exec/Window.cpp +++ b/velox/exec/Window.cpp @@ -45,7 +45,7 @@ Window::Window( windowNode, pool(), spillConfig, &nonReclaimableSection_); } else { windowBuild_ = std::make_unique( - windowNode, pool(), spillConfig, &nonReclaimableSection_); + windowNode, pool(), spillConfig, &nonReclaimableSection_, &spillStats_); } } @@ -241,10 +241,6 @@ void Window::createPeerAndFrameBuffers() { void Window::noMoreInput() { Operator::noMoreInput(); windowBuild_->noMoreInput(); - - if (auto spillStats = windowBuild_->spilledStats()) { - recordSpillStats(spillStats.value()); - } } void Window::callResetPartition() { diff --git a/velox/exec/tests/AggregateSpillBenchmarkBase.cpp b/velox/exec/tests/AggregateSpillBenchmarkBase.cpp index 4c8fbcc82793..3218cda4431c 100644 --- a/velox/exec/tests/AggregateSpillBenchmarkBase.cpp +++ b/velox/exec/tests/AggregateSpillBenchmarkBase.cpp @@ -124,7 +124,7 @@ void AggregateSpillBenchmarkBase::writeSpillData() { } } -std::unique_ptr AggregateSpillBenchmarkBase::makeSpiller() const { +std::unique_ptr AggregateSpillBenchmarkBase::makeSpiller() { common::SpillConfig spillConfig; spillConfig.getSpillDirPathCb = [&]() -> const std::string& { return spillDir_; @@ -145,11 +145,16 @@ std::unique_ptr AggregateSpillBenchmarkBase::makeSpiller() const { rowType_, rowContainer_->keyTypes().size(), std::vector{}, - &spillConfig); + &spillConfig, + &spillStats_); } else { // TODO: Add config flag to control the max spill rows. return std::make_unique( - spillerType_, rowContainer_.get(), rowType_, &spillConfig); + spillerType_, + rowContainer_.get(), + rowType_, + &spillConfig, + &spillStats_); } } } // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/AggregateSpillBenchmarkBase.h b/velox/exec/tests/AggregateSpillBenchmarkBase.h index fe1769dc5579..15b3bb853d66 100644 --- a/velox/exec/tests/AggregateSpillBenchmarkBase.h +++ b/velox/exec/tests/AggregateSpillBenchmarkBase.h @@ -32,7 +32,7 @@ class AggregateSpillBenchmarkBase : public SpillerBenchmarkBase { private: void writeSpillData(); - std::unique_ptr makeSpiller() const; + std::unique_ptr makeSpiller(); const Spiller::Type spillerType_; std::unique_ptr rowContainer_; diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index abca47ad7f22..93e60c178440 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -6299,7 +6299,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, exceededMaxSpillLevel) { .operatorStats.back() .runtimeStats; ASSERT_EQ(joinStats["exceededMaxSpillLevel"].sum, 8); - ASSERT_EQ(joinStats["exceededMaxSpillLevel"].count, 8); + ASSERT_EQ(joinStats["exceededMaxSpillLevel"].count, 1); }) .run(); ASSERT_EQ( diff --git a/velox/exec/tests/JoinSpillInputBenchmarkBase.cpp b/velox/exec/tests/JoinSpillInputBenchmarkBase.cpp index 66627e145771..3488a95b6f30 100644 --- a/velox/exec/tests/JoinSpillInputBenchmarkBase.cpp +++ b/velox/exec/tests/JoinSpillInputBenchmarkBase.cpp @@ -49,7 +49,8 @@ void JoinSpillInputBenchmarkBase::setUp() { exec::Spiller::Type::kHashJoinProbe, rowType_, HashBitRange{29, 29}, - &spillConfig); + &spillConfig, + &spillStats_); spiller_->setPartitionsSpilled({0}); } diff --git a/velox/exec/tests/OrderByTest.cpp b/velox/exec/tests/OrderByTest.cpp index 992fa8abb0d3..6cc77ec538cd 100644 --- a/velox/exec/tests/OrderByTest.cpp +++ b/velox/exec/tests/OrderByTest.cpp @@ -1034,8 +1034,9 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringOutputProcessing) { taskThread.join(); auto stats = task->taskStats().pipelineStats; - ASSERT_EQ(stats[0].operatorStats[1].spilledBytes, 0); - ASSERT_EQ(stats[0].operatorStats[1].spilledPartitions, 0); + ASSERT_TRUE(!enableSpilling || stats[0].operatorStats[1].spilledBytes > 0); + ASSERT_TRUE( + !enableSpilling || stats[0].operatorStats[1].spilledPartitions > 0); OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); } ASSERT_EQ(reclaimerStats_.numNonReclaimableAttempts, 0); diff --git a/velox/exec/tests/SortBufferTest.cpp b/velox/exec/tests/SortBufferTest.cpp index f965652651ac..9b6a285755a9 100644 --- a/velox/exec/tests/SortBufferTest.cpp +++ b/velox/exec/tests/SortBufferTest.cpp @@ -301,13 +301,15 @@ TEST_F(SortBufferTest, batchOutput) { 0, 0, "none"); + folly::Synchronized spillStats; auto sortBuffer = std::make_unique( inputType_, sortColumnIndices_, sortCompareFlags_, pool_.get(), &nonReclaimableSection_, - testData.triggerSpill ? &spillConfig : nullptr); + testData.triggerSpill ? &spillConfig : nullptr, + &spillStats); ASSERT_EQ(sortBuffer->canSpill(), testData.triggerSpill); const std::shared_ptr fuzzerPool = @@ -324,8 +326,6 @@ TEST_F(SortBufferTest, batchOutput) { totalNumInput += inputRows; } sortBuffer->noMoreInput(); - auto spillStats = sortBuffer->spilledStats(); - int expectedOutputBufferIndex = 0; RowVectorPtr output = sortBuffer->getOutput(testData.maxOutputRows); while (output != nullptr) { @@ -336,14 +336,14 @@ TEST_F(SortBufferTest, batchOutput) { } if (!testData.triggerSpill) { - ASSERT_FALSE(spillStats.has_value()); + ASSERT_TRUE(spillStats.rlock()->empty()); } else { - ASSERT_TRUE(spillStats.has_value()); - ASSERT_GT(spillStats->spilledRows, 0); - ASSERT_LE(spillStats->spilledRows, totalNumInput); - ASSERT_GT(spillStats->spilledBytes, 0); - ASSERT_EQ(spillStats->spilledPartitions, 1); - ASSERT_GT(spillStats->spilledFiles, 0); + ASSERT_FALSE(spillStats.rlock()->empty()); + ASSERT_GT(spillStats.rlock()->spilledRows, 0); + ASSERT_LE(spillStats.rlock()->spilledRows, totalNumInput); + ASSERT_GT(spillStats.rlock()->spilledBytes, 0); + ASSERT_EQ(spillStats.rlock()->spilledPartitions, 1); + ASSERT_GT(spillStats.rlock()->spilledFiles, 0); } } } @@ -396,13 +396,15 @@ TEST_F(SortBufferTest, spill) { 0, 0, "none"); + folly::Synchronized spillStats; auto sortBuffer = std::make_unique( inputType_, sortColumnIndices_, sortCompareFlags_, pool_.get(), &nonReclaimableSection_, - testData.spillEnabled ? &spillConfig : nullptr); + testData.spillEnabled ? &spillConfig : nullptr, + &spillStats); const std::shared_ptr fuzzerPool = memory::memoryManager()->addLeafPool("spillSource"); @@ -420,23 +422,22 @@ TEST_F(SortBufferTest, spill) { totalNumInput += 1024; } sortBuffer->noMoreInput(); - const auto spillStats = sortBuffer->spilledStats(); if (!testData.spillTriggered) { - ASSERT_FALSE(spillStats.has_value()); + ASSERT_TRUE(spillStats.rlock()->empty()); if (!testData.spillEnabled) { VELOX_ASSERT_THROW(sortBuffer->spill(), "spill config is null"); } } else { - ASSERT_TRUE(spillStats.has_value()); - ASSERT_GT(spillStats->spilledRows, 0); - ASSERT_LE(spillStats->spilledRows, totalNumInput); - ASSERT_GT(spillStats->spilledBytes, 0); - ASSERT_EQ(spillStats->spilledPartitions, 1); + ASSERT_FALSE(spillStats.rlock()->empty()); + ASSERT_GT(spillStats.rlock()->spilledRows, 0); + ASSERT_LE(spillStats.rlock()->spilledRows, totalNumInput); + ASSERT_GT(spillStats.rlock()->spilledBytes, 0); + ASSERT_EQ(spillStats.rlock()->spilledPartitions, 1); // SortBuffer shall not respect maxFileSize. Total files should be num // addInput() calls minus one which is the first one that has nothing to // spill. - ASSERT_EQ(spillStats->spilledFiles, 3); + ASSERT_EQ(spillStats.rlock()->spilledFiles, 3); sortBuffer.reset(); ASSERT_EQ(memory::spillMemoryPool()->stats().currentBytes, 0); if (memory::spillMemoryPool()->trackUsage()) { @@ -456,13 +457,15 @@ TEST_F(SortBufferTest, emptySpill) { SCOPED_TRACE(fmt::format("hasPostSpillData {}", hasPostSpillData)); auto spillDirectory = exec::test::TempDirectoryPath::create(); auto spillConfig = getSpillConfig(spillDirectory->path); + folly::Synchronized spillStats; auto sortBuffer = std::make_unique( inputType_, sortColumnIndices_, sortCompareFlags_, pool_.get(), &nonReclaimableSection_, - &spillConfig); + &spillConfig, + &spillStats); sortBuffer->spill(); if (hasPostSpillData) { @@ -470,7 +473,7 @@ TEST_F(SortBufferTest, emptySpill) { sortBuffer->addInput(fuzzer.fuzzRow(inputType_)); } sortBuffer->noMoreInput(); - ASSERT_FALSE(sortBuffer->spilledStats()); + ASSERT_TRUE(spillStats.rlock()->empty()); } } } // namespace facebook::velox::functions::test diff --git a/velox/exec/tests/SpillerBenchmarkBase.h b/velox/exec/tests/SpillerBenchmarkBase.h index 28e08cfe792e..2fd181cc015a 100644 --- a/velox/exec/tests/SpillerBenchmarkBase.h +++ b/velox/exec/tests/SpillerBenchmarkBase.h @@ -71,5 +71,6 @@ class SpillerBenchmarkBase { std::unique_ptr spiller_; // Stats. uint64_t executionTimeUs_{0}; + folly::Synchronized spillStats_; }; } // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/SpillerTest.cpp b/velox/exec/tests/SpillerTest.cpp index d3f0568b5d23..c6ed8dc48c4d 100644 --- a/velox/exec/tests/SpillerTest.cpp +++ b/velox/exec/tests/SpillerTest.cpp @@ -513,6 +513,7 @@ class SpillerTest : public exec::test::RowContainerTestBase { common::GetSpillDirectoryPathCB tempSpillDirCb = [&]() -> const std::string& { return tempDirPath_->path; }; stats_.clear(); + spillStats_ = folly::Synchronized(); common::SpillConfig spillConfig; spillConfig.getSpillDirPathCb = makeError ? badSpillDirCb : tempSpillDirCb; @@ -527,8 +528,8 @@ class SpillerTest : public exec::test::RowContainerTestBase { if (type_ == Spiller::Type::kHashJoinProbe) { // kHashJoinProbe doesn't have associated row container. - spiller_ = - std::make_unique(type_, rowType_, hashBits_, &spillConfig); + spiller_ = std::make_unique( + type_, rowType_, hashBits_, &spillConfig, &spillStats_); } else if ( type_ == Spiller::Type::kOrderByInput || type_ == Spiller::Type::kAggregateInput) { @@ -540,15 +541,21 @@ class SpillerTest : public exec::test::RowContainerTestBase { rowType_, rowContainer_->keyTypes().size(), compareFlags_, - &spillConfig); + &spillConfig, + &spillStats_); } else if ( type_ == Spiller::Type::kAggregateOutput || type_ == Spiller::Type::kOrderByOutput) { spiller_ = std::make_unique( - type_, rowContainer_.get(), rowType_, &spillConfig); + type_, rowContainer_.get(), rowType_, &spillConfig, &spillStats_); } else if (type_ == Spiller::Type::kRowNumber) { spiller_ = std::make_unique( - type_, rowContainer_.get(), rowType_, hashBits_, &spillConfig); + type_, + rowContainer_.get(), + rowType_, + hashBits_, + &spillConfig, + &spillStats_); } else { VELOX_CHECK_EQ(type_, Spiller::Type::kHashJoinBuild); spiller_ = std::make_unique( @@ -557,7 +564,8 @@ class SpillerTest : public exec::test::RowContainerTestBase { rowContainer_.get(), rowType_, hashBits_, - &spillConfig); + &spillConfig, + &spillStats_); } ASSERT_EQ(spiller_->state().maxPartitions(), numPartitions_); ASSERT_FALSE(spiller_->isAllSpilled()); @@ -1057,6 +1065,7 @@ class SpillerTest : public exec::test::RowContainerTestBase { std::vector> partitions_; std::vector compareFlags_; std::unique_ptr spiller_; + folly::Synchronized spillStats_; }; struct AllTypesTestParam {