From 3e90a7ed6d92f8b3f12b1a19002d7c3ba89be744 Mon Sep 17 00:00:00 2001 From: xiaoxmeng Date: Wed, 1 May 2024 12:42:00 -0700 Subject: [PATCH] Add dynamic filter operator stats for Prestissimo HBO (#9652) Summary: Add dynamic filter operator stats for Presto coordinator to generate query plan with HBO optimization for Prestissimo. Velox might optimize the local query execution by adding dynamic filters at the runtime. For instance, after hash build operator builds the join table, it might generate dynamic filters based on the join key distribution and pushes down the dynamic produced filter to the table scan operator of probe side. This might filter out a lot of raw input rows. Currently, Presto HBO is not aware of the dynamic filtering at the native worker side running with Velox runtime. It might think the probe side is much smaller than the build side and the generate the optimized plan for the next run by flipping the build and probe side. This is not a good optimization and will slow down the query execution by triggering the unnecessary spilling as raw inputs for the original probe side could be very large. We have found this in Meta internal shadowing test. This PR exposes the dynamic filter stats in operator stats to Prestissimo, and the latter reports this to the Presto coordinator which might choose to skip such HBO optimization if the dynamic filter pushdown has been triggered at worker side. The dynamic filter stats include the plan node id set of the dynamic filter producers. The actual filtered rows could be calculated by inputPositions and rawInputPositions stats in operator stats. Also note we can't have exact the number of filtered rows as for table scan, it can have scan filter pushdown as well, and filters could be merged together. The driver framework needs extend addDynamicFilter API to include the producer plan node id for recording. Pull Request resolved: https://github.com/facebookincubator/velox/pull/9652 Reviewed By: mbasmanova Differential Revision: D56709420 Pulled By: xiaoxmeng fbshipit-source-id: d80127076b4df13445d4c6e3fdec21e5315db1ab --- velox/exec/Driver.cpp | 7 +- velox/exec/Operator.cpp | 4 + velox/exec/Operator.h | 26 ++- velox/exec/PlanNodeStats.cpp | 7 + velox/exec/PlanNodeStats.h | 3 + velox/exec/TableScan.cpp | 2 + velox/exec/TableScan.h | 1 + velox/exec/tests/HashJoinTest.cpp | 177 ++++++++++++++++++++ velox/exec/tests/OperatorUtilsTest.cpp | 27 +++ velox/exec/tests/PrintPlanWithStatsTest.cpp | 4 +- velox/exec/tests/TableScanTest.cpp | 2 + velox/experimental/wave/exec/TableScan.cpp | 1 + velox/experimental/wave/exec/TableScan.h | 1 + velox/experimental/wave/exec/WaveDriver.h | 4 +- velox/experimental/wave/exec/WaveOperator.h | 5 +- 15 files changed, 262 insertions(+), 9 deletions(-) diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index 29826d38eb8a..b64d69bac7d0 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -293,11 +293,12 @@ void Driver::initializeOperators() { } void Driver::pushdownFilters(int operatorIndex) { - auto op = operators_[operatorIndex].get(); + auto* op = operators_[operatorIndex].get(); const auto& filters = op->getDynamicFilters(); if (filters.empty()) { return; } + const auto& planNodeId = op->planNodeId(); op->addRuntimeStat("dynamicFiltersProduced", RuntimeCounter(filters.size())); @@ -313,7 +314,7 @@ void Driver::pushdownFilters(int operatorIndex) { prevOp->canAddDynamicFilter(), "Cannot push down dynamic filters produced by {}", op->toString()); - prevOp->addDynamicFilter(channel, entry.second); + prevOp->addDynamicFilter(planNodeId, channel, entry.second); prevOp->addRuntimeStat("dynamicFiltersAccepted", RuntimeCounter(1)); break; } @@ -327,7 +328,7 @@ void Driver::pushdownFilters(int operatorIndex) { prevOp->canAddDynamicFilter(), "Cannot push down dynamic filters produced by {}", op->toString()); - prevOp->addDynamicFilter(channel, entry.second); + prevOp->addDynamicFilter(planNodeId, channel, entry.second); prevOp->addRuntimeStat("dynamicFiltersAccepted", RuntimeCounter(1)); break; } diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 45c83098b561..d96926164d09 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -504,6 +504,8 @@ void OperatorStats::add(const OperatorStats& other) { spilledFiles += other.spilledFiles; numNullKeys += other.numNullKeys; + + dynamicFilterStats.add(other.dynamicFilterStats); } void OperatorStats::clear() { @@ -537,6 +539,8 @@ void OperatorStats::clear() { spilledRows = 0; spilledPartitions = 0; spilledFiles = 0; + + dynamicFilterStats.clear(); } std::unique_ptr Operator::MemoryReclaimer::create( diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index c48862c60b63..3519b2f2d6fb 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -82,6 +82,26 @@ struct MemoryStats { } }; +/// Records the dynamic filter stats of an operator. +struct DynamicFilterStats { + /// The set of plan node ids that produce the dynamic filter added to an + /// operator. If it is empty, then there is no dynamic filter added. + std::unordered_set producerNodeIds; + + void clear() { + producerNodeIds.clear(); + } + + void add(const DynamicFilterStats& other) { + producerNodeIds.insert( + other.producerNodeIds.begin(), other.producerNodeIds.end()); + } + + bool empty() const { + return producerNodeIds.empty(); + } +}; + struct OperatorStats { /// Initial ordinal position in the operator's pipeline. int32_t operatorId = 0; @@ -106,6 +126,9 @@ struct OperatorStats { uint64_t inputBytes = 0; uint64_t inputPositions = 0; + /// Contains the dynamic filters stats if applied. + DynamicFilterStats dynamicFilterStats; + /// Number of input batches / vectors. Allows to compute an average batch /// size. uint64_t inputVectors = 0; @@ -161,7 +184,7 @@ struct OperatorStats { int numDrivers = 0; - OperatorStats() {} + OperatorStats() = default; OperatorStats( int32_t _operatorId, @@ -422,6 +445,7 @@ class Operator : public BaseRuntimeStatWriter { /// Adds a filter dynamically generated by a downstream operator. Called only /// if canAddFilter() returns true. virtual void addDynamicFilter( + const core::PlanNodeId& /*producer*/, column_index_t /*outputChannel*/, const std::shared_ptr& /*filter*/) { VELOX_UNSUPPORTED( diff --git a/velox/exec/PlanNodeStats.cpp b/velox/exec/PlanNodeStats.cpp index c536f31b00b3..caab8f764872 100644 --- a/velox/exec/PlanNodeStats.cpp +++ b/velox/exec/PlanNodeStats.cpp @@ -39,6 +39,8 @@ void PlanNodeStats::addTotals(const OperatorStats& stats) { rawInputRows += stats.rawInputPositions; rawInputBytes += stats.rawInputBytes; + dynamicFilterStats.add(stats.dynamicFilterStats); + outputRows += stats.outputPositions; outputBytes += stats.outputBytes; outputVectors += stats.outputVectors; @@ -112,6 +114,11 @@ std::string PlanNodeStats::toString(bool includeInputStats) const { << succinctBytes(spilledBytes) << ", " << spilledFiles << " files)"; } + if (!dynamicFilterStats.empty()) { + out << ", DynamicFilter producer plan nodes: " + << folly::join(',', dynamicFilterStats.producerNodeIds); + } + return out.str(); } diff --git a/velox/exec/PlanNodeStats.h b/velox/exec/PlanNodeStats.h index a53a7fa3ea00..4d10b9d60875 100644 --- a/velox/exec/PlanNodeStats.h +++ b/velox/exec/PlanNodeStats.h @@ -62,6 +62,9 @@ struct PlanNodeStats { /// Sum of raw input bytes for all corresponding operators. uint64_t rawInputBytes{0}; + /// Contains the dynamic filters stats if applied. + DynamicFilterStats dynamicFilterStats; + /// Sum of output rows for all corresponding operators. When /// plan node corresponds to multiple operator types, operators of only one of /// these types report non-zero output rows. diff --git a/velox/exec/TableScan.cpp b/velox/exec/TableScan.cpp index 1b4674cf5001..f1c24d97e6c4 100644 --- a/velox/exec/TableScan.cpp +++ b/velox/exec/TableScan.cpp @@ -356,12 +356,14 @@ bool TableScan::isFinished() { } void TableScan::addDynamicFilter( + const core::PlanNodeId& producer, column_index_t outputChannel, const std::shared_ptr& filter) { if (dataSource_) { dataSource_->addDynamicFilter(outputChannel, filter); } dynamicFilters_.emplace(outputChannel, filter); + stats_.wlock()->dynamicFilterStats.producerNodeIds.emplace(producer); } } // namespace facebook::velox::exec diff --git a/velox/exec/TableScan.h b/velox/exec/TableScan.h index 973821a3a45c..516e377516ba 100644 --- a/velox/exec/TableScan.h +++ b/velox/exec/TableScan.h @@ -46,6 +46,7 @@ class TableScan : public SourceOperator { } void addDynamicFilter( + const core::PlanNodeId& producer, column_index_t outputChannel, const std::shared_ptr& filter) override; diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index 928b339c2520..9eb9b2354ea6 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -4111,6 +4111,7 @@ TEST_F(HashJoinTest, dynamicFilters) { { // Inner join. core::PlanNodeId probeScanId; + core::PlanNodeId joinId; auto op = PlanBuilder(planNodeIdGenerator, pool_.get()) .tableScan(probeType) .capturePlanNodeId(probeScanId) @@ -4121,6 +4122,7 @@ TEST_F(HashJoinTest, dynamicFilters) { "", {"c0", "c1", "u_c1"}, core::JoinType::kInner) + .capturePlanNodeId(joinId) .project({"c0", "c1 + 1", "c1 + u_c1"}) .planNode(); { @@ -4131,16 +4133,21 @@ TEST_F(HashJoinTest, dynamicFilters) { "SELECT t.c0, t.c1 + 1, t.c1 + u.c1 FROM t, u WHERE t.c0 = u.c0") .verifier([&](const std::shared_ptr& task, bool hasSpill) { SCOPED_TRACE(fmt::format("hasSpill:{}", hasSpill)); + auto planStats = toPlanStats(task->taskStats()); if (hasSpill) { // Dynamic filtering should be disabled with spilling triggered. ASSERT_EQ(0, getFiltersProduced(task, 1).sum); ASSERT_EQ(0, getFiltersAccepted(task, 0).sum); ASSERT_EQ(getInputPositions(task, 1), numRowsProbe * numSplits); + ASSERT_TRUE(planStats.at(probeScanId).dynamicFilterStats.empty()); } else { ASSERT_EQ(1, getFiltersProduced(task, 1).sum); ASSERT_EQ(1, getFiltersAccepted(task, 0).sum); ASSERT_EQ(0, getReplacedWithFilterRows(task, 1).sum); ASSERT_LT(getInputPositions(task, 1), numRowsProbe * numSplits); + ASSERT_EQ( + planStats.at(probeScanId).dynamicFilterStats.producerNodeIds, + std::unordered_set({joinId})); } }) .run(); @@ -4157,6 +4164,7 @@ TEST_F(HashJoinTest, dynamicFilters) { "", {"c0", "c1"}, core::JoinType::kLeftSemiFilter) + .capturePlanNodeId(joinId) .project({"c0", "c1 + 1"}) .planNode(); @@ -4168,17 +4176,22 @@ TEST_F(HashJoinTest, dynamicFilters) { "SELECT t.c0, t.c1 + 1 FROM t WHERE t.c0 IN (SELECT c0 FROM u)") .verifier([&](const std::shared_ptr& task, bool hasSpill) { SCOPED_TRACE(fmt::format("hasSpill:{}", hasSpill)); + auto planStats = toPlanStats(task->taskStats()); if (hasSpill) { // Dynamic filtering should be disabled with spilling triggered. ASSERT_EQ(0, getFiltersProduced(task, 1).sum); ASSERT_EQ(0, getFiltersAccepted(task, 0).sum); ASSERT_EQ(0, getReplacedWithFilterRows(task, 1).sum); ASSERT_EQ(getInputPositions(task, 1), numRowsProbe * numSplits); + ASSERT_TRUE(planStats.at(probeScanId).dynamicFilterStats.empty()); } else { ASSERT_EQ(1, getFiltersProduced(task, 1).sum); ASSERT_EQ(1, getFiltersAccepted(task, 0).sum); ASSERT_GT(getReplacedWithFilterRows(task, 1).sum, 0); ASSERT_LT(getInputPositions(task, 1), numRowsProbe * numSplits); + ASSERT_EQ( + planStats.at(probeScanId).dynamicFilterStats.producerNodeIds, + std::unordered_set({joinId})); } }) .run(); @@ -4195,6 +4208,7 @@ TEST_F(HashJoinTest, dynamicFilters) { "", {"u_c0", "u_c1"}, core::JoinType::kRightSemiFilter) + .capturePlanNodeId(joinId) .project({"u_c0", "u_c1 + 1"}) .planNode(); @@ -4206,17 +4220,22 @@ TEST_F(HashJoinTest, dynamicFilters) { "SELECT u.c0, u.c1 + 1 FROM u WHERE u.c0 IN (SELECT c0 FROM t)") .verifier([&](const std::shared_ptr& task, bool hasSpill) { SCOPED_TRACE(fmt::format("hasSpill:{}", hasSpill)); + auto planStats = toPlanStats(task->taskStats()); if (hasSpill) { // Dynamic filtering should be disabled with spilling triggered. ASSERT_EQ(0, getFiltersProduced(task, 1).sum); ASSERT_EQ(0, getFiltersAccepted(task, 0).sum); ASSERT_EQ(getReplacedWithFilterRows(task, 1).sum, 0); ASSERT_EQ(getInputPositions(task, 1), numRowsProbe * numSplits); + ASSERT_TRUE(planStats.at(probeScanId).dynamicFilterStats.empty()); } else { ASSERT_EQ(1, getFiltersProduced(task, 1).sum); ASSERT_EQ(1, getFiltersAccepted(task, 0).sum); ASSERT_EQ(getReplacedWithFilterRows(task, 1).sum, 0); ASSERT_LT(getInputPositions(task, 1), numRowsProbe * numSplits); + ASSERT_EQ( + planStats.at(probeScanId).dynamicFilterStats.producerNodeIds, + std::unordered_set({joinId})); } }) .run(); @@ -4232,6 +4251,7 @@ TEST_F(HashJoinTest, dynamicFilters) { assignments["b"] = regularColumn("c1", BIGINT()); core::PlanNodeId probeScanId; + core::PlanNodeId joinId; auto op = PlanBuilder(planNodeIdGenerator, pool_.get()) .startTableScan() .outputType(scanOutputType) @@ -4239,6 +4259,7 @@ TEST_F(HashJoinTest, dynamicFilters) { .endTableScan() .capturePlanNodeId(probeScanId) .hashJoin({"a"}, {"u_c0"}, buildSide, "", {"a", "b", "u_c1"}) + .capturePlanNodeId(joinId) .project({"a", "b + 1", "b + u_c1"}) .planNode(); @@ -4249,17 +4270,22 @@ TEST_F(HashJoinTest, dynamicFilters) { "SELECT t.c0, t.c1 + 1, t.c1 + u.c1 FROM t, u WHERE t.c0 = u.c0") .verifier([&](const std::shared_ptr& task, bool hasSpill) { SCOPED_TRACE(fmt::format("hasSpill:{}", hasSpill)); + auto planStats = toPlanStats(task->taskStats()); if (hasSpill) { // Dynamic filtering should be disabled with spilling triggered. ASSERT_EQ(0, getFiltersProduced(task, 1).sum); ASSERT_EQ(0, getFiltersAccepted(task, 0).sum); ASSERT_EQ(0, getReplacedWithFilterRows(task, 1).sum); ASSERT_EQ(getInputPositions(task, 1), numRowsProbe * numSplits); + ASSERT_TRUE(planStats.at(probeScanId).dynamicFilterStats.empty()); } else { ASSERT_EQ(1, getFiltersProduced(task, 1).sum); ASSERT_EQ(1, getFiltersAccepted(task, 0).sum); ASSERT_EQ(0, getReplacedWithFilterRows(task, 1).sum); ASSERT_LT(getInputPositions(task, 1), numRowsProbe * numSplits); + ASSERT_EQ( + planStats.at(probeScanId).dynamicFilterStats.producerNodeIds, + std::unordered_set({joinId})); } }) .run(); @@ -4268,10 +4294,12 @@ TEST_F(HashJoinTest, dynamicFilters) { // Push-down that requires merging filters. { core::PlanNodeId probeScanId; + core::PlanNodeId joinId; auto op = PlanBuilder(planNodeIdGenerator, pool_.get()) .tableScan(probeType, {"c0 < 500::INTEGER"}) .capturePlanNodeId(probeScanId) .hashJoin({"c0"}, {"u_c0"}, buildSide, "", {"c1", "u_c1"}) + .capturePlanNodeId(joinId) .project({"c1 + u_c1"}) .planNode(); @@ -4282,17 +4310,22 @@ TEST_F(HashJoinTest, dynamicFilters) { "SELECT t.c1 + u.c1 FROM t, u WHERE t.c0 = u.c0 AND t.c0 < 500") .verifier([&](const std::shared_ptr& task, bool hasSpill) { SCOPED_TRACE(fmt::format("hasSpill:{}", hasSpill)); + auto planStats = toPlanStats(task->taskStats()); if (hasSpill) { // Dynamic filtering should be disabled with spilling triggered. ASSERT_EQ(0, getFiltersProduced(task, 1).sum); ASSERT_EQ(0, getFiltersAccepted(task, 0).sum); ASSERT_EQ(0, getReplacedWithFilterRows(task, 1).sum); ASSERT_EQ(getInputPositions(task, 1), numRowsProbe * numSplits); + ASSERT_TRUE(planStats.at(probeScanId).dynamicFilterStats.empty()); } else { ASSERT_EQ(1, getFiltersProduced(task, 1).sum); ASSERT_EQ(1, getFiltersAccepted(task, 0).sum); ASSERT_EQ(0, getReplacedWithFilterRows(task, 1).sum); ASSERT_LT(getInputPositions(task, 1), numRowsProbe * numSplits); + ASSERT_EQ( + planStats.at(probeScanId).dynamicFilterStats.producerNodeIds, + std::unordered_set({joinId})); } }) .run(); @@ -4301,11 +4334,13 @@ TEST_F(HashJoinTest, dynamicFilters) { // Push-down that turns join into a no-op. { core::PlanNodeId probeScanId; + core::PlanNodeId joinId; auto op = PlanBuilder(planNodeIdGenerator, pool_.get()) .tableScan(probeType) .capturePlanNodeId(probeScanId) .hashJoin({"c0"}, {"u_c0"}, keyOnlyBuildSide, "", {"c0", "c1"}) + .capturePlanNodeId(joinId) .project({"c0", "c1 + 1"}) .planNode(); @@ -4315,12 +4350,14 @@ TEST_F(HashJoinTest, dynamicFilters) { .referenceQuery("SELECT t.c0, t.c1 + 1 FROM t, u WHERE t.c0 = u.c0") .verifier([&](const std::shared_ptr& task, bool hasSpill) { SCOPED_TRACE(fmt::format("hasSpill:{}", hasSpill)); + auto planStats = toPlanStats(task->taskStats()); if (hasSpill) { // Dynamic filtering should be disabled with spilling triggered. ASSERT_EQ(0, getFiltersProduced(task, 1).sum); ASSERT_EQ(0, getFiltersAccepted(task, 0).sum); ASSERT_EQ(0, getReplacedWithFilterRows(task, 1).sum); ASSERT_EQ(getInputPositions(task, 1), numRowsProbe * numSplits); + ASSERT_TRUE(planStats.at(probeScanId).dynamicFilterStats.empty()); } else { ASSERT_EQ(1, getFiltersProduced(task, 1).sum); ASSERT_EQ(1, getFiltersAccepted(task, 0).sum); @@ -4328,6 +4365,9 @@ TEST_F(HashJoinTest, dynamicFilters) { getReplacedWithFilterRows(task, 1).sum, numRowsBuild * numSplits); ASSERT_LT(getInputPositions(task, 1), numRowsProbe * numSplits); + ASSERT_EQ( + planStats.at(probeScanId).dynamicFilterStats.producerNodeIds, + std::unordered_set({joinId})); } }) .run(); @@ -4337,10 +4377,12 @@ TEST_F(HashJoinTest, dynamicFilters) { // number of columns than the input. { core::PlanNodeId probeScanId; + core::PlanNodeId joinId; auto op = PlanBuilder(planNodeIdGenerator, pool_.get()) .tableScan(probeType) .capturePlanNodeId(probeScanId) .hashJoin({"c0"}, {"u_c0"}, keyOnlyBuildSide, "", {"c0"}) + .capturePlanNodeId(joinId) .planNode(); HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get()) @@ -4349,12 +4391,14 @@ TEST_F(HashJoinTest, dynamicFilters) { .referenceQuery("SELECT t.c0 FROM t JOIN u ON (t.c0 = u.c0)") .verifier([&](const std::shared_ptr& task, bool hasSpill) { SCOPED_TRACE(fmt::format("hasSpill:{}", hasSpill)); + auto planStats = toPlanStats(task->taskStats()); if (hasSpill) { // Dynamic filtering should be disabled with spilling triggered. ASSERT_EQ(0, getFiltersProduced(task, 1).sum); ASSERT_EQ(0, getFiltersAccepted(task, 0).sum); ASSERT_EQ(0, getReplacedWithFilterRows(task, 1).sum); ASSERT_EQ(getInputPositions(task, 1), numRowsProbe * numSplits); + ASSERT_TRUE(planStats.at(probeScanId).dynamicFilterStats.empty()); } else { ASSERT_EQ(1, getFiltersProduced(task, 1).sum); ASSERT_EQ(1, getFiltersAccepted(task, 0).sum); @@ -4362,6 +4406,9 @@ TEST_F(HashJoinTest, dynamicFilters) { getReplacedWithFilterRows(task, 1).sum, numRowsBuild * numSplits); ASSERT_LT(getInputPositions(task, 1), numRowsProbe * numSplits); + ASSERT_EQ( + planStats.at(probeScanId).dynamicFilterStats.producerNodeIds, + std::unordered_set({joinId})); } }) .run(); @@ -4370,10 +4417,12 @@ TEST_F(HashJoinTest, dynamicFilters) { // Push-down that requires merging filters and turns join into a no-op. { core::PlanNodeId probeScanId; + core::PlanNodeId joinId; auto op = PlanBuilder(planNodeIdGenerator, pool_.get()) .tableScan(probeType, {"c0 < 500::INTEGER"}) .capturePlanNodeId(probeScanId) .hashJoin({"c0"}, {"u_c0"}, keyOnlyBuildSide, "", {"c1"}) + .capturePlanNodeId(joinId) .project({"c1 + 1"}) .planNode(); @@ -4384,17 +4433,22 @@ TEST_F(HashJoinTest, dynamicFilters) { "SELECT t.c1 + 1 FROM t, u WHERE t.c0 = u.c0 AND t.c0 < 500") .verifier([&](const std::shared_ptr& task, bool hasSpill) { SCOPED_TRACE(fmt::format("hasSpill:{}", hasSpill)); + auto planStats = toPlanStats(task->taskStats()); if (hasSpill) { // Dynamic filtering should be disabled with spilling triggered. ASSERT_EQ(0, getFiltersProduced(task, 1).sum); ASSERT_EQ(0, getFiltersAccepted(task, 0).sum); ASSERT_EQ(getReplacedWithFilterRows(task, 1).sum, 0); ASSERT_EQ(getInputPositions(task, 1), numRowsProbe * numSplits); + ASSERT_TRUE(planStats.at(probeScanId).dynamicFilterStats.empty()); } else { ASSERT_EQ(1, getFiltersProduced(task, 1).sum); ASSERT_EQ(1, getFiltersAccepted(task, 0).sum); ASSERT_GT(getReplacedWithFilterRows(task, 1).sum, 0); ASSERT_LT(getInputPositions(task, 1), numRowsProbe * numSplits); + ASSERT_EQ( + planStats.at(probeScanId).dynamicFilterStats.producerNodeIds, + std::unordered_set({joinId})); } }) .run(); @@ -4404,12 +4458,14 @@ TEST_F(HashJoinTest, dynamicFilters) { { // Inner join. core::PlanNodeId probeScanId; + core::PlanNodeId joinId; auto op = PlanBuilder(planNodeIdGenerator, pool_.get()) .tableScan(probeType, {"c0 < 200::INTEGER"}) .capturePlanNodeId(probeScanId) .hashJoin( {"c0"}, {"u_c0"}, buildSide, "", {"c1"}, core::JoinType::kInner) + .capturePlanNodeId(joinId) .project({"c1 + 1"}) .planNode(); @@ -4421,17 +4477,22 @@ TEST_F(HashJoinTest, dynamicFilters) { "SELECT t.c1 + 1 FROM t, u WHERE t.c0 = u.c0 AND t.c0 < 200") .verifier([&](const std::shared_ptr& task, bool hasSpill) { SCOPED_TRACE(fmt::format("hasSpill:{}", hasSpill)); + auto planStats = toPlanStats(task->taskStats()); if (hasSpill) { // Dynamic filtering should be disabled with spilling triggered. ASSERT_EQ(0, getFiltersProduced(task, 1).sum); ASSERT_EQ(0, getFiltersAccepted(task, 0).sum); ASSERT_EQ(getReplacedWithFilterRows(task, 1).sum, 0); ASSERT_LT(getInputPositions(task, 1), numRowsProbe * numSplits); + ASSERT_TRUE(planStats.at(probeScanId).dynamicFilterStats.empty()); } else { ASSERT_EQ(1, getFiltersProduced(task, 1).sum); ASSERT_EQ(1, getFiltersAccepted(task, 0).sum); ASSERT_GT(getReplacedWithFilterRows(task, 1).sum, 0); ASSERT_LT(getInputPositions(task, 1), numRowsProbe * numSplits); + ASSERT_EQ( + planStats.at(probeScanId).dynamicFilterStats.producerNodeIds, + std::unordered_set({joinId})); } }) .run(); @@ -4448,6 +4509,7 @@ TEST_F(HashJoinTest, dynamicFilters) { "", {"c1"}, core::JoinType::kLeftSemiFilter) + .capturePlanNodeId(joinId) .project({"c1 + 1"}) .planNode(); @@ -4459,17 +4521,22 @@ TEST_F(HashJoinTest, dynamicFilters) { "SELECT t.c1 + 1 FROM t WHERE t.c0 IN (SELECT c0 FROM u) AND t.c0 < 200") .verifier([&](const std::shared_ptr& task, bool hasSpill) { SCOPED_TRACE(fmt::format("hasSpill:{}", hasSpill)); + auto planStats = toPlanStats(task->taskStats()); if (hasSpill) { // Dynamic filtering should be disabled with spilling triggered. ASSERT_EQ(0, getFiltersProduced(task, 1).sum); ASSERT_EQ(0, getFiltersAccepted(task, 0).sum); ASSERT_EQ(getReplacedWithFilterRows(task, 1).sum, 0); ASSERT_LT(getInputPositions(task, 1), numRowsProbe * numSplits); + ASSERT_TRUE(planStats.at(probeScanId).dynamicFilterStats.empty()); } else { ASSERT_EQ(1, getFiltersProduced(task, 1).sum); ASSERT_EQ(1, getFiltersAccepted(task, 0).sum); ASSERT_GT(getReplacedWithFilterRows(task, 1).sum, 0); ASSERT_LT(getInputPositions(task, 1), numRowsProbe * numSplits); + ASSERT_EQ( + planStats.at(probeScanId).dynamicFilterStats.producerNodeIds, + std::unordered_set({joinId})); } }) .run(); @@ -4486,6 +4553,7 @@ TEST_F(HashJoinTest, dynamicFilters) { "", {"u_c1"}, core::JoinType::kRightSemiFilter) + .capturePlanNodeId(joinId) .project({"u_c1 + 1"}) .planNode(); @@ -4497,17 +4565,22 @@ TEST_F(HashJoinTest, dynamicFilters) { "SELECT u.c1 + 1 FROM u WHERE u.c0 IN (SELECT c0 FROM t) AND u.c0 < 200") .verifier([&](const std::shared_ptr& task, bool hasSpill) { SCOPED_TRACE(fmt::format("hasSpill:{}", hasSpill)); + auto planStats = toPlanStats(task->taskStats()); if (hasSpill) { // Dynamic filtering should be disabled with spilling triggered. ASSERT_EQ(0, getFiltersProduced(task, 1).sum); ASSERT_EQ(0, getFiltersAccepted(task, 0).sum); ASSERT_EQ(getReplacedWithFilterRows(task, 1).sum, 0); ASSERT_LT(getInputPositions(task, 1), numRowsProbe * numSplits); + ASSERT_TRUE(planStats.at(probeScanId).dynamicFilterStats.empty()); } else { ASSERT_EQ(1, getFiltersProduced(task, 1).sum); ASSERT_EQ(1, getFiltersAccepted(task, 0).sum); ASSERT_EQ(getReplacedWithFilterRows(task, 1).sum, 0); ASSERT_LT(getInputPositions(task, 1), numRowsProbe * numSplits); + ASSERT_EQ( + planStats.at(probeScanId).dynamicFilterStats.producerNodeIds, + std::unordered_set({joinId})); } }) .run(); @@ -4516,9 +4589,11 @@ TEST_F(HashJoinTest, dynamicFilters) { // Disable filter push-down by using values in place of scan. { + core::PlanNodeId joinId; auto op = PlanBuilder(planNodeIdGenerator, pool_.get()) .values(probeVectors) .hashJoin({"c0"}, {"u_c0"}, buildSide, "", {"c1"}) + .capturePlanNodeId(joinId) .project({"c1 + 1"}) .planNode(); @@ -4526,6 +4601,7 @@ TEST_F(HashJoinTest, dynamicFilters) { .planNode(std::move(op)) .referenceQuery("SELECT t.c1 + 1 FROM t, u WHERE t.c0 = u.c0") .verifier([&](const std::shared_ptr& task, bool hasSpill) { + auto planStats = toPlanStats(task->taskStats()); ASSERT_EQ(0, getFiltersProduced(task, 1).sum); ASSERT_EQ(0, getFiltersAccepted(task, 0).sum); ASSERT_EQ(numRowsProbe * numSplits, getInputPositions(task, 1)); @@ -4537,11 +4613,13 @@ TEST_F(HashJoinTest, dynamicFilters) { // probe side. { core::PlanNodeId probeScanId; + core::PlanNodeId joinId; auto op = PlanBuilder(planNodeIdGenerator, pool_.get()) .tableScan(probeType) .capturePlanNodeId(probeScanId) .project({"cast(c0 + 1 as integer) AS t_key", "c1"}) .hashJoin({"t_key"}, {"u_c0"}, buildSide, "", {"c1"}) + .capturePlanNodeId(joinId) .project({"c1 + 1"}) .planNode(); @@ -4550,14 +4628,113 @@ TEST_F(HashJoinTest, dynamicFilters) { .makeInputSplits(makeInputSplits(probeScanId)) .referenceQuery("SELECT t.c1 + 1 FROM t, u WHERE (t.c0 + 1) = u.c0") .verifier([&](const std::shared_ptr& task, bool hasSpill) { + auto planStats = toPlanStats(task->taskStats()); ASSERT_EQ(0, getFiltersProduced(task, 1).sum); ASSERT_EQ(0, getFiltersAccepted(task, 0).sum); ASSERT_EQ(numRowsProbe * numSplits, getInputPositions(task, 1)); + ASSERT_TRUE(planStats.at(probeScanId).dynamicFilterStats.empty()); }) .run(); } } +TEST_F(HashJoinTest, dynamicFiltersStatsWithChainedJoins) { + const int32_t numSplits = 10; + const int32_t numProbeRows = 333; + const int32_t numBuildRows = 100; + + std::vector probeVectors; + probeVectors.reserve(numSplits); + std::vector> tempFiles; + for (int32_t i = 0; i < numSplits; ++i) { + auto rowVector = makeRowVector({ + makeFlatVector( + numProbeRows, [&](auto row) { return row - i * 10; }), + makeFlatVector(numProbeRows, [](auto row) { return row; }), + }); + probeVectors.push_back(rowVector); + tempFiles.push_back(TempFilePath::create()); + writeToFile(tempFiles.back()->getPath(), rowVector); + } + auto makeInputSplits = [&](const core::PlanNodeId& nodeId) { + return [&] { + std::vector probeSplits; + for (auto& file : tempFiles) { + probeSplits.push_back( + exec::Split(makeHiveConnectorSplit(file->getPath()))); + } + SplitInput splits; + splits.emplace(nodeId, probeSplits); + return splits; + }; + }; + + // 100 key values in [35, 233] range. + std::vector buildVectors; + for (int i = 0; i < 5; ++i) { + buildVectors.push_back(makeRowVector({ + makeFlatVector( + numBuildRows / 5, + [i](auto row) { return 35 + 2 * (row + i * numBuildRows / 5); }), + makeFlatVector(numBuildRows / 5, [](auto row) { return row; }), + })); + } + + createDuckDbTable("t", probeVectors); + createDuckDbTable("u", buildVectors); + + auto probeType = ROW({"c0", "c1"}, {INTEGER(), BIGINT()}); + + auto planNodeIdGenerator = std::make_shared(); + + auto buildSide1 = PlanBuilder(planNodeIdGenerator, pool_.get()) + .values(buildVectors) + .project({"c0 AS u_c0", "c1 AS u_c1"}) + .planNode(); + auto buildSide2 = PlanBuilder(planNodeIdGenerator, pool_.get()) + .values(buildVectors) + .project({"c0 AS u_c0", "c1 AS u_c1"}) + .planNode(); + // Inner join pushdown. + core::PlanNodeId probeScanId; + core::PlanNodeId joinId1; + core::PlanNodeId joinId2; + auto op = PlanBuilder(planNodeIdGenerator, pool_.get()) + .tableScan(probeType) + .capturePlanNodeId(probeScanId) + .hashJoin( + {"c0"}, + {"u_c0"}, + buildSide1, + "", + {"c0", "c1"}, + core::JoinType::kInner) + .capturePlanNodeId(joinId1) + .hashJoin( + {"c0"}, + {"u_c0"}, + buildSide2, + "", + {"c0", "c1", "u_c1"}, + core::JoinType::kInner) + .capturePlanNodeId(joinId2) + .project({"c0", "c1 + 1", "c1 + u_c1"}) + .planNode(); + HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get()) + .planNode(std::move(op)) + .makeInputSplits(makeInputSplits(probeScanId)) + .injectSpill(false) + .referenceQuery( + "SELECT t.c0, t.c1 + 1, t.c1 + u.c1 FROM t, u WHERE t.c0 = u.c0") + .verifier([&](const std::shared_ptr& task, bool /*unused*/) { + auto planStats = toPlanStats(task->taskStats()); + ASSERT_EQ( + planStats.at(probeScanId).dynamicFilterStats.producerNodeIds, + std::unordered_set({joinId1, joinId2})); + }) + .run(); +} + TEST_F(HashJoinTest, dynamicFiltersWithSkippedSplits) { const int32_t numSplits = 20; const int32_t numNonSkippedSplits = 10; diff --git a/velox/exec/tests/OperatorUtilsTest.cpp b/velox/exec/tests/OperatorUtilsTest.cpp index cc192928e1e4..543b619dd90e 100644 --- a/velox/exec/tests/OperatorUtilsTest.cpp +++ b/velox/exec/tests/OperatorUtilsTest.cpp @@ -464,3 +464,30 @@ TEST_F(OperatorUtilsTest, memStatsFromPool) { ASSERT_EQ(stats.peakSystemMemoryReservation, 0); ASSERT_EQ(stats.numMemoryAllocations, 1); } + +TEST_F(OperatorUtilsTest, dynamicFilterStats) { + DynamicFilterStats dynamicFilterStats; + ASSERT_TRUE(dynamicFilterStats.empty()); + const std::string nodeId1{"node1"}; + const std::string nodeId2{"node2"}; + dynamicFilterStats.producerNodeIds.emplace(nodeId1); + ASSERT_FALSE(dynamicFilterStats.empty()); + DynamicFilterStats dynamicFilterStatsToMerge; + dynamicFilterStatsToMerge.producerNodeIds.emplace(nodeId1); + ASSERT_FALSE(dynamicFilterStatsToMerge.empty()); + dynamicFilterStats.add(dynamicFilterStatsToMerge); + ASSERT_EQ(dynamicFilterStats.producerNodeIds.size(), 1); + ASSERT_EQ( + dynamicFilterStats.producerNodeIds, + std::unordered_set({nodeId1})); + + dynamicFilterStatsToMerge.producerNodeIds.emplace(nodeId2); + dynamicFilterStats.add(dynamicFilterStatsToMerge); + ASSERT_EQ(dynamicFilterStats.producerNodeIds.size(), 2); + ASSERT_EQ( + dynamicFilterStats.producerNodeIds, + std::unordered_set({nodeId1, nodeId2})); + + dynamicFilterStats.clear(); + ASSERT_TRUE(dynamicFilterStats.empty()); +} diff --git a/velox/exec/tests/PrintPlanWithStatsTest.cpp b/velox/exec/tests/PrintPlanWithStatsTest.cpp index 93f7c6aa2ecc..8c8fef0b7b46 100644 --- a/velox/exec/tests/PrintPlanWithStatsTest.cpp +++ b/velox/exec/tests/PrintPlanWithStatsTest.cpp @@ -138,7 +138,7 @@ TEST_F(PrintPlanWithStatsTest, innerJoinWithTableScan) { {" HashBuild: Input: 100 rows \\(.+\\), Output: 0 rows \\(.+\\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+ Memory allocations: .+, Threads: 1"}, {" HashProbe: Input: 2000 rows \\(.+\\), Output: 2000 rows \\(.+\\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1"}, {" -- TableScan\\[table: hive_table\\] -> c0:INTEGER, c1:BIGINT"}, - {" Input: 2000 rows \\(.+\\), Raw Input: 20480 rows \\(.+\\), Output: 2000 rows \\(.+\\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1, Splits: 20"}, + {" Input: 2000 rows \\(.+\\), Raw Input: 20480 rows \\(.+\\), Output: 2000 rows \\(.+\\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1, Splits: 20, DynamicFilter producer plan nodes: 3"}, {" -- Project\\[expressions: \\(u_c0:INTEGER, ROW\\[\"c0\"\\]\\), \\(u_c1:BIGINT, ROW\\[\"c1\"\\]\\)\\] -> u_c0:INTEGER, u_c1:BIGINT"}, {" Output: 100 rows \\(.+\\), Cpu time: .+, Blocked wall time: .+, Peak memory: 0B, Memory allocations: .+, Threads: 1"}, {" -- Values\\[100 rows in 1 vectors\\] -> c0:INTEGER, c1:BIGINT"}, @@ -184,7 +184,7 @@ TEST_F(PrintPlanWithStatsTest, innerJoinWithTableScan) { {" runningFinishWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningGetOutputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" -- TableScan\\[table: hive_table\\] -> c0:INTEGER, c1:BIGINT"}, - {" Input: 2000 rows \\(.+\\), Raw Input: 20480 rows \\(.+\\), Output: 2000 rows \\(.+\\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1, Splits: 20"}, + {" Input: 2000 rows \\(.+\\), Raw Input: 20480 rows \\(.+\\), Output: 2000 rows \\(.+\\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1, Splits: 20, DynamicFilter producer plan nodes: 3"}, {" dataSourceAddSplitWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" dataSourceReadWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" dynamicFiltersAccepted[ ]* sum: 1, count: 1, min: 1, max: 1"}, diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index 400e55c76758..954d3c980482 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -267,6 +267,8 @@ TEST_F(TableScanTest, allColumns) { ASSERT_TRUE(it != planStats.end()); ASSERT_TRUE(it->second.peakMemoryBytes > 0); ASSERT_LT(0, it->second.customStats.at("ioWaitNanos").sum); + // Verifies there is no dynamic filter stats. + ASSERT_TRUE(it->second.dynamicFilterStats.empty()); } TEST_F(TableScanTest, connectorStats) { diff --git a/velox/experimental/wave/exec/TableScan.cpp b/velox/experimental/wave/exec/TableScan.cpp index 3ae6c04509ac..0c1d78b3e342 100644 --- a/velox/experimental/wave/exec/TableScan.cpp +++ b/velox/experimental/wave/exec/TableScan.cpp @@ -186,6 +186,7 @@ bool TableScan::isFinished() const { } void TableScan::addDynamicFilter( + const core::PlanNodeId& producer, column_index_t outputChannel, const std::shared_ptr& filter) { if (dataSource_) { diff --git a/velox/experimental/wave/exec/TableScan.h b/velox/experimental/wave/exec/TableScan.h index 81b925ea8449..9ab3e7ea85c9 100644 --- a/velox/experimental/wave/exec/TableScan.h +++ b/velox/experimental/wave/exec/TableScan.h @@ -75,6 +75,7 @@ class TableScan : public WaveOperator { } void addDynamicFilter( + const core::PlanNodeId& producer, column_index_t outputChannel, const std::shared_ptr& filter) override; diff --git a/velox/experimental/wave/exec/WaveDriver.h b/velox/experimental/wave/exec/WaveDriver.h index ea77b336f24e..00db6e7ce9fa 100644 --- a/velox/experimental/wave/exec/WaveDriver.h +++ b/velox/experimental/wave/exec/WaveDriver.h @@ -73,9 +73,11 @@ class WaveDriver : public exec::SourceOperator { std::string toString() const override; void addDynamicFilter( + const core::PlanNodeId& producer, column_index_t outputChannel, const std::shared_ptr& filter) override { - pipelines_[0].operators[0]->addDynamicFilter(outputChannel, filter); + pipelines_[0].operators[0]->addDynamicFilter( + producer, outputChannel, filter); } exec::OperatorCtx* operatorCtx() const { diff --git a/velox/experimental/wave/exec/WaveOperator.h b/velox/experimental/wave/exec/WaveOperator.h index 70f98e357246..18e89301d4a7 100644 --- a/velox/experimental/wave/exec/WaveOperator.h +++ b/velox/experimental/wave/exec/WaveOperator.h @@ -144,8 +144,9 @@ class WaveOperator { } virtual void addDynamicFilter( - column_index_t outputChannel, - const std::shared_ptr& filter) { + const core::PlanNodeId& /*producer*/, + column_index_t /*outputChannel*/, + const std::shared_ptr& /*filter*/) { VELOX_UNSUPPORTED(); }