Add dynamic filter operator stats for Prestissimo HBO (facebookincubator#9652)

Summary:
Add dynamic filter operator stats so that the Presto coordinator can account for dynamic filtering
when it generates query plans with HBO (history-based optimization) for Prestissimo. Velox might
optimize local query execution by adding dynamic filters at runtime. For instance, after the hash
build operator builds the join table, it might generate dynamic filters based on the join key
distribution and push them down to the table scan operator on the probe side. This might filter
out a lot of raw input rows. Currently, Presto HBO is not aware of the dynamic filtering done on
the native worker side running with the Velox runtime. It might conclude that the probe side is
much smaller than the build side and then generate the optimized plan for the next run by flipping
the build and probe sides. This is not a good optimization: it slows down query execution by
triggering unnecessary spilling, because the raw input of the original probe side could be very
large. We found this in Meta internal shadowing tests.

This PR exposes the dynamic filter stats in the operator stats to Prestissimo, which reports them
to the Presto coordinator. The coordinator might then choose to skip such an HBO optimization if
dynamic filter pushdown has been triggered on the worker side. The dynamic filter stats include the
set of plan node ids of the dynamic filter producers. The number of filtered rows can be estimated
from the inputPositions and rawInputPositions stats in the operator stats. Note that we can't get
the exact number of rows removed by dynamic filters: a table scan can also have scan filter
pushdown, and the filters could be merged together. The driver framework needs to extend the
addDynamicFilter API to include the producer plan node id for recording.
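
The estimate described in the summary can be sketched roughly as follows. This is an illustration only, not code from Presto or Velox: `ScanStats`, `estimateFilteredRows`, and `shouldSkipJoinFlip` are hypothetical names, and the skip rule is an assumption drawn from the summary. Only `rawInputPositions`, `inputPositions`, and the producer node id set mirror names used in this commit.

```cpp
#include <cstdint>
#include <string>
#include <unordered_set>

// Hypothetical stand-in for the table scan stats reported to the coordinator.
struct ScanStats {
  uint64_t rawInputPositions = 0; // rows read before any filtering
  uint64_t inputPositions = 0; // rows left after filtering
  std::unordered_set<std::string> producerNodeIds; // dynamic filter producers
};

// Rows dropped at the scan. This is only an upper bound on the rows removed
// by dynamic filters, because pushed-down scan filters are counted as well.
uint64_t estimateFilteredRows(const ScanStats& stats) {
  return stats.rawInputPositions > stats.inputPositions
      ? stats.rawInputPositions - stats.inputPositions
      : 0;
}

// Assumed policy: do not flip the build and probe sides when the probe-side
// scan received at least one dynamic filter.
bool shouldSkipJoinFlip(const ScanStats& probeScan) {
  return !probeScan.producerNodeIds.empty();
}
```

A scan that read 1000 raw rows and kept 10 would report 990 filtered rows here, but the flip is only skipped once a producer node id has been recorded.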

Pull Request resolved: facebookincubator#9652

Reviewed By: mbasmanova

Differential Revision: D56709420

Pulled By: xiaoxmeng

fbshipit-source-id: d80127076b4df13445d4c6e3fdec21e5315db1ab
xiaoxmeng authored and facebook-github-bot committed May 1, 2024
1 parent 714b747 commit 3e90a7e
Showing 15 changed files with 262 additions and 9 deletions.
7 changes: 4 additions & 3 deletions velox/exec/Driver.cpp
@@ -293,11 +293,12 @@ void Driver::initializeOperators() {
}

void Driver::pushdownFilters(int operatorIndex) {
- auto op = operators_[operatorIndex].get();
+ auto* op = operators_[operatorIndex].get();
const auto& filters = op->getDynamicFilters();
if (filters.empty()) {
return;
}
+ const auto& planNodeId = op->planNodeId();

op->addRuntimeStat("dynamicFiltersProduced", RuntimeCounter(filters.size()));

@@ -313,7 +314,7 @@ void Driver::pushdownFilters(int operatorIndex) {
prevOp->canAddDynamicFilter(),
"Cannot push down dynamic filters produced by {}",
op->toString());
- prevOp->addDynamicFilter(channel, entry.second);
+ prevOp->addDynamicFilter(planNodeId, channel, entry.second);
prevOp->addRuntimeStat("dynamicFiltersAccepted", RuntimeCounter(1));
break;
}
@@ -327,7 +328,7 @@ void Driver::pushdownFilters(int operatorIndex) {
prevOp->canAddDynamicFilter(),
"Cannot push down dynamic filters produced by {}",
op->toString());
- prevOp->addDynamicFilter(channel, entry.second);
+ prevOp->addDynamicFilter(planNodeId, channel, entry.second);
prevOp->addRuntimeStat("dynamicFiltersAccepted", RuntimeCounter(1));
break;
}
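
The shape of the extended call can be illustrated with a small standalone sketch. It assumes nothing about Velox beyond what the hunks show: `Op`, `ScanOp`, `Filter`, and `pushdown` are hypothetical stand-ins, plan node ids are modelled as `std::string`, and the default implementation throws a standard exception in place of the Velox error macro.

```cpp
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <unordered_set>
#include <utility>

struct Filter {}; // placeholder for common::Filter

// Hypothetical minimal operator: by default it rejects dynamic filters.
class Op {
 public:
  explicit Op(std::string planNodeId) : planNodeId_(std::move(planNodeId)) {}
  virtual ~Op() = default;

  const std::string& planNodeId() const {
    return planNodeId_;
  }

  virtual bool canAddDynamicFilter() const {
    return false;
  }

  virtual void addDynamicFilter(
      const std::string& /*producer*/,
      int /*outputChannel*/,
      const std::shared_ptr<Filter>& /*filter*/) {
    throw std::logic_error("Dynamic filter pushdown is not supported");
  }

 private:
  std::string planNodeId_;
};

// Hypothetical scan-like operator that accepts filters and remembers who
// produced them.
class ScanOp : public Op {
 public:
  using Op::Op;

  bool canAddDynamicFilter() const override {
    return true;
  }

  void addDynamicFilter(
      const std::string& producer,
      int outputChannel,
      const std::shared_ptr<Filter>& filter) override {
    filters_[outputChannel] = filter;
    producers_.insert(producer);
  }

  const std::unordered_set<std::string>& producers() const {
    return producers_;
  }

 private:
  std::map<int, std::shared_ptr<Filter>> filters_;
  std::unordered_set<std::string> producers_;
};

// The caller passes the producing operator's plan node id with the filter.
void pushdown(
    const Op& producer,
    Op& target,
    int channel,
    const std::shared_ptr<Filter>& filter) {
  if (!target.canAddDynamicFilter()) {
    throw std::logic_error("Cannot push down dynamic filters");
  }
  target.addDynamicFilter(producer.planNodeId(), channel, filter);
}
```

Passing the id as an extra argument keeps the filter object itself unchanged, which matches how the diff threads `planNodeId` through the existing call sites.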
4 changes: 4 additions & 0 deletions velox/exec/Operator.cpp
@@ -504,6 +504,8 @@ void OperatorStats::add(const OperatorStats& other) {
spilledFiles += other.spilledFiles;

numNullKeys += other.numNullKeys;

+ dynamicFilterStats.add(other.dynamicFilterStats);
}

void OperatorStats::clear() {
@@ -537,6 +539,8 @@ void OperatorStats::clear() {
spilledRows = 0;
spilledPartitions = 0;
spilledFiles = 0;

+ dynamicFilterStats.clear();
}

std::unique_ptr<memory::MemoryReclaimer> Operator::MemoryReclaimer::create(
26 changes: 25 additions & 1 deletion velox/exec/Operator.h
@@ -82,6 +82,26 @@ struct MemoryStats {
}
};

+ /// Records the dynamic filter stats of an operator.
+ struct DynamicFilterStats {
+ /// The set of plan node ids that produce the dynamic filter added to an
+ /// operator. If it is empty, then there is no dynamic filter added.
+ std::unordered_set<core::PlanNodeId> producerNodeIds;
+
+ void clear() {
+ producerNodeIds.clear();
+ }
+
+ void add(const DynamicFilterStats& other) {
+ producerNodeIds.insert(
+ other.producerNodeIds.begin(), other.producerNodeIds.end());
+ }
+
+ bool empty() const {
+ return producerNodeIds.empty();
+ }
+ };

struct OperatorStats {
/// Initial ordinal position in the operator's pipeline.
int32_t operatorId = 0;
@@ -106,6 +126,9 @@ struct OperatorStats {
uint64_t inputBytes = 0;
uint64_t inputPositions = 0;

+ /// Contains the dynamic filters stats if applied.
+ DynamicFilterStats dynamicFilterStats;

/// Number of input batches / vectors. Allows to compute an average batch
/// size.
uint64_t inputVectors = 0;
@@ -161,7 +184,7 @@ struct OperatorStats {

int numDrivers = 0;

- OperatorStats() {}
+ OperatorStats() = default;

OperatorStats(
int32_t _operatorId,
@@ -422,6 +445,7 @@ class Operator : public BaseRuntimeStatWriter {
/// Adds a filter dynamically generated by a downstream operator. Called only
/// if canAddFilter() returns true.
virtual void addDynamicFilter(
+ const core::PlanNodeId& /*producer*/,
column_index_t /*outputChannel*/,
const std::shared_ptr<common::Filter>& /*filter*/) {
VELOX_UNSUPPORTED(
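
To show how `DynamicFilterStats` behaves when stats from several drivers are merged, here is a standalone sketch. It copies the struct from the hunk above but replaces `core::PlanNodeId` with `std::string`, an assumption made so that the snippet compiles without Velox headers. `mergeAll` is a hypothetical helper, not part of this commit.

```cpp
#include <string>
#include <unordered_set>
#include <vector>

using PlanNodeId = std::string; // stand-in for core::PlanNodeId (assumption)

struct DynamicFilterStats {
  std::unordered_set<PlanNodeId> producerNodeIds;

  void clear() {
    producerNodeIds.clear();
  }

  // Set union: a producer reported by several operators is kept once.
  void add(const DynamicFilterStats& other) {
    producerNodeIds.insert(
        other.producerNodeIds.begin(), other.producerNodeIds.end());
  }

  bool empty() const {
    return producerNodeIds.empty();
  }
};

// Hypothetical helper: folds the stats of several drivers into one.
DynamicFilterStats mergeAll(const std::vector<DynamicFilterStats>& perDriver) {
  DynamicFilterStats total;
  for (const auto& stats : perDriver) {
    total.add(stats);
  }
  return total;
}
```

Because `add` is a set union, merging is idempotent: aggregating the same operator's stats twice does not change the result, unlike the counter fields in `OperatorStats`, which are summed.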
7 changes: 7 additions & 0 deletions velox/exec/PlanNodeStats.cpp
@@ -39,6 +39,8 @@ void PlanNodeStats::addTotals(const OperatorStats& stats) {
rawInputRows += stats.rawInputPositions;
rawInputBytes += stats.rawInputBytes;

+ dynamicFilterStats.add(stats.dynamicFilterStats);

outputRows += stats.outputPositions;
outputBytes += stats.outputBytes;
outputVectors += stats.outputVectors;
@@ -112,6 +114,11 @@ std::string PlanNodeStats::toString(bool includeInputStats) const {
<< succinctBytes(spilledBytes) << ", " << spilledFiles << " files)";
}

+ if (!dynamicFilterStats.empty()) {
+ out << ", DynamicFilter producer plan nodes: "
+ << folly::join(',', dynamicFilterStats.producerNodeIds);
+ }
+
return out.str();
}
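
One detail worth keeping in mind: the iteration order of `std::unordered_set` is unspecified, so the joined producer list may come out in a different order from run to run. If stable output matters (for example in test expectations), the ids could be sorted first. The sketch below is not part of this commit; `formatProducers` is a hypothetical helper that uses only the standard library instead of `folly::join`.

```cpp
#include <algorithm>
#include <cstddef>
#include <sstream>
#include <string>
#include <unordered_set>
#include <vector>

// Returns the suffix appended to the stats string, or an empty string when no
// dynamic filter was added. Ids are sorted as strings, so "10" sorts before
// "2".
std::string formatProducers(
    const std::unordered_set<std::string>& producerNodeIds) {
  if (producerNodeIds.empty()) {
    return "";
  }
  std::vector<std::string> ids(producerNodeIds.begin(), producerNodeIds.end());
  std::sort(ids.begin(), ids.end());

  std::ostringstream out;
  out << ", DynamicFilter producer plan nodes: ";
  for (std::size_t i = 0; i < ids.size(); ++i) {
    if (i > 0) {
      out << ',';
    }
    out << ids[i];
  }
  return out.str();
}
```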

3 changes: 3 additions & 0 deletions velox/exec/PlanNodeStats.h
@@ -62,6 +62,9 @@ struct PlanNodeStats {
/// Sum of raw input bytes for all corresponding operators.
uint64_t rawInputBytes{0};

+ /// Contains the dynamic filters stats if applied.
+ DynamicFilterStats dynamicFilterStats;

/// Sum of output rows for all corresponding operators. When
/// plan node corresponds to multiple operator types, operators of only one of
/// these types report non-zero output rows.
2 changes: 2 additions & 0 deletions velox/exec/TableScan.cpp
@@ -356,12 +356,14 @@ bool TableScan::isFinished() {
}

void TableScan::addDynamicFilter(
+ const core::PlanNodeId& producer,
column_index_t outputChannel,
const std::shared_ptr<common::Filter>& filter) {
if (dataSource_) {
dataSource_->addDynamicFilter(outputChannel, filter);
}
dynamicFilters_.emplace(outputChannel, filter);
+ stats_.wlock()->dynamicFilterStats.producerNodeIds.emplace(producer);
}

} // namespace facebook::velox::exec
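
The `stats_.wlock()` call suggests that the operator stats sit behind a lock, so that a reader collecting stats does not race with the driver thread adding a filter. The declaration of `stats_` is not shown in this diff, so that reading is an assumption. The same idea can be sketched with the standard library alone; `ScanLike`, `recordProducer`, and `producers` are hypothetical names.

```cpp
#include <mutex>
#include <string>
#include <unordered_set>

// Hypothetical scan-like class that records filter producers under a mutex.
class ScanLike {
 public:
  // Called when a dynamic filter is added; duplicates are ignored by the set.
  void recordProducer(const std::string& producer) {
    std::lock_guard<std::mutex> guard(mutex_);
    producerNodeIds_.emplace(producer);
  }

  // Returns a copy so that callers never touch the set without the lock.
  std::unordered_set<std::string> producers() const {
    std::lock_guard<std::mutex> guard(mutex_);
    return producerNodeIds_;
  }

 private:
  mutable std::mutex mutex_;
  std::unordered_set<std::string> producerNodeIds_;
};
```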
1 change: 1 addition & 0 deletions velox/exec/TableScan.h
@@ -46,6 +46,7 @@ class TableScan : public SourceOperator {
}

void addDynamicFilter(
+ const core::PlanNodeId& producer,
column_index_t outputChannel,
const std::shared_ptr<common::Filter>& filter) override;
