Skip to content

Commit

Permalink
Consolidate SpillStats (#9211)
Browse files Browse the repository at this point in the history
Summary:
Decouple spill stats from the spiller as row number and hash probe spilling might
use more than one and different spillers. Consolidate to use one spill stats to collect
the spill stats to streamline implementation. This PR introduces a synchronized spill
stats within the operator to gather these stats and later on we could separate them
for different types of spiller if offline analysis needs.

Pull Request resolved: #9211

Reviewed By: tanjialiang

Differential Revision: D55287100

Pulled By: xiaoxmeng

fbshipit-source-id: ffde57d4f3425e3f3f679252504f7690e8dfce68
  • Loading branch information
duanmeng authored and facebook-github-bot committed Mar 24, 2024
1 parent 3d30bf5 commit 8f0adb1
Show file tree
Hide file tree
Showing 34 changed files with 196 additions and 203 deletions.
11 changes: 6 additions & 5 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -510,8 +510,9 @@ DataSink::Stats HiveDataSink::stats() const {
for (int i = 0; i < writerInfo_.size(); ++i) {
const auto& info = writerInfo_.at(i);
VELOX_CHECK_NOT_NULL(info);
if (!info->spillStats->empty()) {
stats.spillStats += *info->spillStats;
const auto spillStats = info->spillStats->rlock();
if (!spillStats->empty()) {
stats.spillStats += *spillStats;
}
}
return stats;
Expand Down Expand Up @@ -719,15 +720,15 @@ HiveDataSink::maybeCreateBucketSortWriter(
sortCompareFlags_,
sortPool,
writerInfo_.back()->nonReclaimableSectionHolder.get(),
spillConfig_);
spillConfig_,
writerInfo_.back()->spillStats.get());
return std::make_unique<dwio::common::SortingWriter>(
std::move(writer),
std::move(sortBuffer),
hiveConfig_->sortWriterMaxOutputRows(
connectorQueryCtx_->sessionProperties()),
hiveConfig_->sortWriterMaxOutputBytes(
connectorQueryCtx_->sessionProperties()),
writerInfo_.back()->spillStats.get());
connectorQueryCtx_->sessionProperties()));
}

void HiveDataSink::splitInputRowsAndEnsureWriters() {
Expand Down
4 changes: 2 additions & 2 deletions velox/connectors/hive/HiveDataSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ struct HiveWriterInfo {
std::shared_ptr<memory::MemoryPool> _sortPool)
: writerParameters(std::move(parameters)),
nonReclaimableSectionHolder(new tsan_atomic<bool>(false)),
spillStats(new common::SpillStats()),
spillStats(std::make_unique<folly::Synchronized<common::SpillStats>>()),
writerPool(std::move(_writerPool)),
sinkPool(std::move(_sinkPool)),
sortPool(std::move(_sortPool)) {}
Expand All @@ -364,7 +364,7 @@ struct HiveWriterInfo {
const std::unique_ptr<tsan_atomic<bool>> nonReclaimableSectionHolder;
/// Collects the spill stats from sort writer if the spilling has been
/// triggered.
const std::unique_ptr<common::SpillStats> spillStats;
const std::unique_ptr<folly::Synchronized<common::SpillStats>> spillStats;
const std::shared_ptr<memory::MemoryPool> writerPool;
const std::shared_ptr<memory::MemoryPool> sinkPool;
const std::shared_ptr<memory::MemoryPool> sortPool;
Expand Down
11 changes: 2 additions & 9 deletions velox/dwio/common/SortingWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,15 @@ SortingWriter::SortingWriter(
std::unique_ptr<Writer> writer,
std::unique_ptr<exec::SortBuffer> sortBuffer,
uint32_t maxOutputRowsConfig,
uint64_t maxOutputBytesConfig,
velox::common::SpillStats* spillStats)
uint64_t maxOutputBytesConfig)
: outputWriter_(std::move(writer)),
maxOutputRowsConfig_(maxOutputRowsConfig),
maxOutputBytesConfig_(maxOutputBytesConfig),
sortPool_(sortBuffer->pool()),
canReclaim_(sortBuffer->canSpill()),
spillStats_(spillStats),
sortBuffer_(std::move(sortBuffer)) {
VELOX_CHECK_GT(maxOutputRowsConfig_, 0);
VELOX_CHECK_GT(maxOutputBytesConfig_, 0);
VELOX_CHECK_NOT_NULL(spillStats_);
if (sortPool_->parent()->reclaimer() != nullptr) {
sortPool_->setReclaimer(MemoryReclaimer::create(this));
}
Expand Down Expand Up @@ -64,11 +61,7 @@ void SortingWriter::close() {
outputWriter_->write(output);
output = sortBuffer_->getOutput(maxOutputBatchRows);
}
auto spillStatsOr = sortBuffer_->spilledStats();
if (spillStatsOr.has_value()) {
VELOX_CHECK(canReclaim_);
*spillStats_ = spillStatsOr.value();
}

sortBuffer_.reset();
sortPool_->release();
outputWriter_->close();
Expand Down
4 changes: 1 addition & 3 deletions velox/dwio/common/SortingWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ class SortingWriter : public Writer {
std::unique_ptr<Writer> writer,
std::unique_ptr<exec::SortBuffer> sortBuffer,
uint32_t maxOutputRowsConfig,
uint64_t maxOutputBytesConfig,
velox::common::SpillStats* spillStats);
uint64_t maxOutputBytesConfig);

~SortingWriter() override;

Expand Down Expand Up @@ -81,7 +80,6 @@ class SortingWriter : public Writer {
const uint64_t maxOutputBytesConfig_;
memory::MemoryPool* const sortPool_;
const bool canReclaim_;
velox::common::SpillStats* const spillStats_;

std::unique_ptr<exec::SortBuffer> sortBuffer_;
};
Expand Down
18 changes: 13 additions & 5 deletions velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ GroupingSet::GroupingSet(
const std::optional<column_index_t>& groupIdChannel,
const common::SpillConfig* spillConfig,
tsan_atomic<bool>* nonReclaimableSection,
OperatorCtx* operatorCtx)
OperatorCtx* operatorCtx,
folly::Synchronized<common::SpillStats>* spillStats)
: preGroupedKeyChannels_(std::move(preGroupedKeys)),
hashers_(std::move(hashers)),
isGlobal_(hashers_.empty()),
Expand All @@ -69,7 +70,8 @@ GroupingSet::GroupingSet(
stringAllocator_(operatorCtx->pool()),
rows_(operatorCtx->pool()),
isAdaptive_(queryConfig_.hashAdaptivityEnabled()),
pool_(*operatorCtx->pool()) {
pool_(*operatorCtx->pool()),
spillStats_(spillStats) {
VELOX_CHECK_NOT_NULL(nonReclaimableSection_);
VELOX_CHECK(pool_.trackUsage());
for (auto& hasher : hashers_) {
Expand Down Expand Up @@ -131,7 +133,8 @@ std::unique_ptr<GroupingSet> GroupingSet::createForMarkDistinct(
/*groupIdColumn*/ std::nullopt,
/*spillConfig*/ nullptr,
nonReclaimableSection,
operatorCtx);
operatorCtx,
/*spillStats_*/ nullptr);
};

namespace {
Expand Down Expand Up @@ -939,7 +942,8 @@ void GroupingSet::spill() {
makeSpillType(),
rows->keyTypes().size(),
std::vector<CompareFlags>(),
spillConfig_);
spillConfig_,
spillStats_);
}
spiller_->spill();
if (sortedAggregations_) {
Expand All @@ -958,7 +962,11 @@ void GroupingSet::spill(const RowContainerIterator& rowIterator) {
auto* rows = table_->rows();
VELOX_CHECK(pool_.trackUsage());
spiller_ = std::make_unique<Spiller>(
Spiller::Type::kAggregateOutput, rows, makeSpillType(), spillConfig_);
Spiller::Type::kAggregateOutput,
rows,
makeSpillType(),
spillConfig_,
spillStats_);

spiller_->spill(rowIterator);
table_->clear();
Expand Down
5 changes: 4 additions & 1 deletion velox/exec/GroupingSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ class GroupingSet {
const std::optional<column_index_t>& groupIdChannel,
const common::SpillConfig* spillConfig,
tsan_atomic<bool>* nonReclaimableSection,
OperatorCtx* operatorCtx);
OperatorCtx* operatorCtx,
folly::Synchronized<common::SpillStats>* spillStats);

~GroupingSet();

Expand Down Expand Up @@ -359,6 +360,8 @@ class GroupingSet {
// Temporary for case where an aggregate in toIntermediate() outputs post-init
// state of aggregate for all rows.
std::vector<char*> firstGroup_;

folly::Synchronized<common::SpillStats>* const spillStats_;
};

} // namespace facebook::velox::exec
14 changes: 2 additions & 12 deletions velox/exec/HashAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ void HashAggregation::initialize() {
groupIdChannel,
spillConfig_.has_value() ? &spillConfig_.value() : nullptr,
&nonReclaimableSection_,
operatorCtx_.get());
operatorCtx_.get(),
&spillStats_);

aggregationNode_.reset();
}
Expand Down Expand Up @@ -188,13 +189,6 @@ void HashAggregation::updateRuntimeStats() {
RuntimeMetric(hashTableStats.numTombstones);
}

void HashAggregation::recordSpillStats() {
auto spillStatsOr = groupingSet_->spilledStats();
if (spillStatsOr.has_value()) {
Operator::recordSpillStats(spillStatsOr.value());
}
}

void HashAggregation::prepareOutput(vector_size_t size) {
if (output_) {
VectorPtr output = std::move(output_);
Expand Down Expand Up @@ -388,7 +382,6 @@ void HashAggregation::noMoreInput() {
updateEstimatedOutputRowSize();
groupingSet_->noMoreInput();
Operator::noMoreInput();
recordSpillStats();
// Release the extra reserved memory right after processing all the inputs.
pool()->release();
}
Expand Down Expand Up @@ -429,9 +422,6 @@ void HashAggregation::reclaim(
// Spill all the rows starting from the next output row pointed by
// 'resultIterator_'.
groupingSet_->spill(resultIterator_);
// NOTE: we will only spill once during the output processing stage so
// record stats here.
recordSpillStats();
} else {
// TODO: support fine-grain disk spilling based on 'targetBytes' after
// having row container memory compaction support later.
Expand Down
4 changes: 0 additions & 4 deletions velox/exec/HashAggregation.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,6 @@ class HashAggregation : public Operator {

RowVectorPtr getDistinctOutput();

// Invoked to record the spilling stats in operator stats after processing all
// the inputs.
void recordSpillStats();

void updateEstimatedOutputRowSize();

std::shared_ptr<const core::AggregationNode> aggregationNode_;
Expand Down
24 changes: 4 additions & 20 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,11 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) {
<< spillConfig.maxSpillLevel
<< ", and disable spilling for memory pool: "
<< pool()->name();
++spillStats_.wlock()->spillMaxLevelExceededCount;
exceededMaxSpillLevelLimit_ = true;
return;
}
exceededMaxSpillLevelLimit_ = false;
hashBits = HashBitRange(startBit, startBit + spillConfig.numPartitionBits);
}

Expand All @@ -243,7 +245,8 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) {
table_->rows(),
spillType_,
std::move(hashBits),
&spillConfig);
&spillConfig,
&spillStats_);

const int32_t numPartitions = spiller_->hashBits().numPartitions();
spillInputIndicesBuffers_.resize(numPartitions);
Expand Down Expand Up @@ -732,15 +735,13 @@ bool HashBuild::finishHashBuild() {
}
if (spiller != nullptr) {
spiller->finishSpill(spillPartitions);
build->recordSpillStats(spiller.get());
}
}

if (spiller_ != nullptr) {
spiller_->finishSpill(spillPartitions);
removeEmptyPartitions(spillPartitions);
}
recordSpillStats();

// TODO: re-enable parallel join build with spilling triggered after
// https://github.com/facebookincubator/velox/issues/3567 is fixed.
Expand All @@ -765,23 +766,6 @@ bool HashBuild::finishHashBuild() {
return true;
}

void HashBuild::recordSpillStats() {
recordSpillStats(spiller_.get());
}

void HashBuild::recordSpillStats(Spiller* spiller) {
if (spiller != nullptr) {
const auto spillStats = spiller->stats();
VELOX_CHECK_EQ(spillStats.spillSortTimeUs, 0);
Operator::recordSpillStats(spillStats);
} else if (exceededMaxSpillLevelLimit_) {
exceededMaxSpillLevelLimit_ = false;
common::SpillStats spillStats;
spillStats.spillMaxLevelExceededCount = 1;
Operator::recordSpillStats(spillStats);
}
}

void HashBuild::ensureTableFits(uint64_t numRows) {
// NOTE: we don't need memory reservation if all the partitions have been
// spilled as nothing need to be built.
Expand Down
3 changes: 0 additions & 3 deletions velox/exec/HashBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,6 @@ class HashBuild final : public Operator {
return canReclaim();
}

void recordSpillStats();
void recordSpillStats(Spiller* spiller);

// Indicates if the input is read from spill data or not.
bool isInputFromSpill() const;

Expand Down
14 changes: 4 additions & 10 deletions velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ void HashProbe::maybeSetupSpillInput(
spillInputPartitionIds_.begin()->partitionBitOffset(),
spillInputPartitionIds_.begin()->partitionBitOffset() +
spillConfig.numPartitionBits),
&spillConfig);
&spillConfig,
&spillStats_);
// Set the spill partitions to the corresponding ones at the build side. The
// hash probe operator itself won't trigger any spilling.
spiller_->setPartitionsSpilled(toPartitionNumSet(spillInputPartitionIds_));
Expand Down Expand Up @@ -1382,7 +1383,8 @@ void HashProbe::noMoreInputInternal() {
VELOX_CHECK_EQ(
spillInputPartitionIds_.size(), spiller_->spilledPartitionSet().size());
spiller_->finishSpill(spillPartitionSet_);
recordSpillStats();
VELOX_CHECK_EQ(spillStats_.rlock()->spillSortTimeUs, 0);
VELOX_CHECK_EQ(spillStats_.rlock()->spillFillTimeUs, 0);
}

const bool hasSpillData = hasMoreSpillData();
Expand Down Expand Up @@ -1412,14 +1414,6 @@ void HashProbe::noMoreInputInternal() {
lastProber_ = true;
}

void HashProbe::recordSpillStats() {
VELOX_CHECK_NOT_NULL(spiller_);
const auto spillStats = spiller_->stats();
VELOX_CHECK_EQ(spillStats.spillSortTimeUs, 0);
VELOX_CHECK_EQ(spillStats.spillFillTimeUs, 0);
Operator::recordSpillStats(spillStats);
}

bool HashProbe::isFinished() {
return state_ == ProbeOperatorState::kFinish;
}
Expand Down
2 changes: 0 additions & 2 deletions velox/exec/HashProbe.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,6 @@ class HashProbe : public Operator {
// next hash table from the spilled data.
void noMoreInputInternal();

void recordSpillStats();

// Returns the index of the 'match' column in the output for semi project
// joins.
VectorPtr& matchColumn() const {
Expand Down
Loading

0 comments on commit 8f0adb1

Please sign in to comment.