Set filter to preloading data source for partition pruning #7382
Thanks for the fix, is it possible to add a unit test?
@@ -92,6 +92,7 @@ RowVectorPtr TableScan::getOutput() {

      if (!split.hasConnectorSplit()) {
        noMoreSplits_ = true;
        pendingDynamicFilters_.clear();
I think it's easier to follow if we clear the pending filters after passing them to the data source.
`pendingDynamicFilters_` is used in `preload` to initialize a different data source object for each preloading split, so I think it should be cleared only after there are no more splits. Thanks.
Hmm, I see what you want to do. But merging filters on every split could cause a performance regression. I think we need to do it a little differently. Maybe when we do the preload, we should pass in the scan spec from the current `dataSource_` to use in the preloading data source as well.
Hello @Yuhta,
I tried this proposal, but `scanSpec_` can't be used by multiple threads at the same time, not even just the `filter_` field in `ScanSpec`.
To address your concern about merging filters on every split, I added two APIs to `DataSource` to get and set the filter for a specific `outputChannel`, so we only need to set the already merged filters on the preloading data sources. This should avoid the problem.
Please help to check again, thanks.
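For readers following the thread, the get/set idea described above might look roughly like the sketch below. It is only an illustration under stated assumptions: `RangeFilter`, `SketchDataSource`, and `shareFilter` are hypothetical stand-ins, not the actual Velox `common::Filter` or `DataSource` classes, and the real signatures in the PR changed during review.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for common::Filter: an inclusive range check.
struct RangeFilter {
  int64_t lower;
  int64_t upper;
  bool testInt64(int64_t value) const {
    return value >= lower && value <= upper;
  }
};

using FilterPtr = std::shared_ptr<const RangeFilter>;

// Hypothetical data source that keeps one filter per output channel.
class SketchDataSource {
 public:
  // Returns the filter installed for 'channel', or nullptr if there is none.
  FilterPtr getColumnFilter(uint32_t channel) const {
    auto it = filters_.find(channel);
    return it == filters_.end() ? nullptr : it->second;
  }

  // Installs an already merged filter as is; no merge is performed here.
  void setColumnFilter(uint32_t channel, FilterPtr filter) {
    filters_[channel] = std::move(filter);
  }

 private:
  std::unordered_map<uint32_t, FilterPtr> filters_;
};

// Hands the merged filter for 'channel' from the active data source to a
// preloading one. Returns true if a filter was handed over.
inline bool shareFilter(
    const SketchDataSource& current,
    SketchDataSource& preloading,
    uint32_t channel) {
  auto filter = current.getColumnFilter(channel);
  if (!filter) {
    return false;
  }
  preloading.setColumnFilter(channel, std::move(filter));
  return true;
}

// Usage: both sources end up pointing at the same filter object.
inline void shareFilterExample() {
  SketchDataSource current;
  SketchDataSource preloading;
  current.setColumnFilter(2, std::make_shared<RangeFilter>(RangeFilter{0, 10}));
  assert(shareFilter(current, preloading, 2));
  assert(preloading.getColumnFilter(2) == current.getColumnFilter(2));
}
```

The point of the design is that the merge happens once, on the active data source, and each preloading data source only receives a pointer to the result.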
Added a UT to assert that split skipping happens for preloaded splits. Thanks.
Hello @Yuhta, could you please check again?
31e1c06 to 3c81cf6
velox/exec/TableScan.cpp
Outdated
@@ -256,6 +259,7 @@ void TableScan::preload(std::shared_ptr<connector::ConnectorSplit> split) {
       ctx = operatorCtx_->createConnectorQueryCtx(
           split->connectorId, planNodeId(), connectorPool_),
       task = operatorCtx_->task(),
       &mergedDynamicFilters = mergedDynamicFilters_,
Can we make this a `shared_ptr` to be safe?
Changed, thanks.
  std::unique_ptr<common::Filter> getFilter(
      column_index_t outputChannel) override {
    auto& fieldSpec = scanSpec_->getChildByChannel(outputChannel);
    return fieldSpec.filter()->clone();
The filter should be MT-safe as far as I can see; all the methods should be read-only. Can you try using a `shared_ptr<const Filter>` to avoid the deep copy?
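To make the trade-off in this suggestion concrete, here is a small sketch contrasting `clone()` with sharing a `shared_ptr<const ...>`. `InListFilter` is a hypothetical class invented for illustration; it is not Velox's filter hierarchy, and the claim that sharing is safe rests on the assumption stated above that all methods are read-only.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical filter with a potentially large value list.
class InListFilter {
 public:
  explicit InListFilter(std::vector<int64_t> values)
      : values_(std::move(values)) {}

  // Deep copy: duplicates the whole value list.
  std::unique_ptr<InListFilter> clone() const {
    return std::make_unique<InListFilter>(values_);
  }

  // Read-only test; it does not modify any state.
  bool testInt64(int64_t value) const {
    for (auto v : values_) {
      if (v == value) {
        return true;
      }
    }
    return false;
  }

  // Exposes the storage address so the example can tell copies apart.
  const int64_t* data() const {
    return values_.data();
  }

 private:
  const std::vector<int64_t> values_;
};

inline void shareVersusCloneExample() {
  std::shared_ptr<const InListFilter> original =
      std::make_shared<InListFilter>(std::vector<int64_t>{1, 2, 3});

  // Sharing: only the reference count changes, the values are not copied.
  std::shared_ptr<const InListFilter> shared = original;
  assert(shared->data() == original->data());
  assert(original.use_count() == 2);

  // Cloning: a second value list is allocated.
  auto copy = original->clone();
  assert(copy->data() != original->data());
  assert(copy->testInt64(2));
}
```

Declaring the pointee `const` documents that holders can only call read-only methods, which is what makes handing the same object to several preloading threads reasonable.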
Yes, you are right, changed.
Hello @Yuhta,
@Yuhta has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
02ae0ca to e2560ed
The fuzzer failure looks related to #7856.
velox/connectors/Connector.h
Outdated
@@ -226,6 +226,17 @@ class DataSource {
  virtual int64_t estimatedRowSize() {
    return kUnknownRowSize;
  }

  virtual std::shared_ptr<common::Filter> getFilter(
This PR changes the Connector API. Please update the PR description to describe the changes and add comments for the newly introduced APIs.
Updated, thanks.
velox/exec/tests/HashJoinTest.cpp
Outdated
        }
      })
      .run();
  FLAGS_split_preload_per_driver = oldSplitPreload;
This is an anti-pattern. Use RAII.
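As an illustration of the RAII approach being asked for, a minimal guard could look like the sketch below. `ScopedValue` is a hypothetical name chosen for this example; it is not the class that was eventually added in the PR, and it works on any assignable variable rather than on gflags specifically.

```cpp
#include <cassert>
#include <utility>

// Hypothetical RAII guard: sets 'target' to a new value and restores the
// previous value when the guard goes out of scope, including on early
// return or when an exception propagates.
template <typename T>
class ScopedValue {
 public:
  ScopedValue(T& target, T newValue) : target_(target), oldValue_(target) {
    target_ = std::move(newValue);
  }

  ~ScopedValue() {
    target_ = std::move(oldValue_);
  }

  ScopedValue(const ScopedValue&) = delete;
  ScopedValue& operator=(const ScopedValue&) = delete;

 private:
  T& target_;
  T oldValue_;
};

inline void scopedValueExample() {
  int preloadPerDriver = 1; // Stand-in for a global flag.
  {
    ScopedValue<int> guard(preloadPerDriver, 3);
    assert(preloadPerDriver == 3);
  }
  // The old value is back once the guard is destroyed.
  assert(preloadPerDriver == 1);
}
```

Note that the guard has to be a named local variable. A temporary such as `ScopedValue<int>(flag, 3);` is destroyed at the end of the statement, so the old value would be restored immediately.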
Updated, thanks.
velox/connectors/Connector.h
Outdated
  // Sets the filter to the column identified by @param outputChannel.
  // This can be called when passing the column filters to a preloading data
  // source. The purpose of this is to avoid the overhead of filters merging.
  virtual void setColumnFilter(
How is this different from `addDynamicFilter`? Having these two APIs is confusing. Please update the PR description to provide overall context for this change.
The problem this PR solves is passing the dynamic filters, which are used for partition pruning, to the preloading data sources.
Initially I used `addDynamicFilter` to pass the filters from `dataSource_` to the preloading data sources. @Yuhta mentioned that merging the filters for every split would cause a perf regression (#7382 (comment)), so I added these two methods to get and set filters directly, without merging. Please let me know if you have any suggestions.
@zhli1142015 Thank you for this additional context. Is there a GitHub issue about this? Sounds like we need to iterate on the design a bit and it will be easier to do in a GitHub issue.
@zhli1142015 For example, it is still not clear to me why we need a new `setColumnFilter` when we already have `addDynamicFilter`. These two APIs seem to provide exactly the same information.
#7381, here is the issue I opened.
@zhli1142015 Please open a GitHub issue and describe the use case you are looking to enable and the details of the design you are proposing. Thanks.
velox/exec/tests/utils/FlagUpdater.h
Outdated

// Generic RAII class to update GFlag.
template <typename T>
class FlagUpdater {
Maybe ScopedGFlag?
velox/exec/tests/HashJoinTest.cpp
Outdated
  const int32_t numSplits = 5;
  std::vector<RowVectorPtr> probeVectors;
  probeVectors.reserve(numSplits);
  FlagUpdater(FLAGS_split_preload_per_driver, 3);
Are there other tests that use the `split_preload_per_driver` flag and therefore need to be updated to use the new RAII class? If so, would you extract the RAII class into a separate PR and update the existing tests?
Does this need to be a GFlag? Can it be a configuration property instead? CC: @xiaoxmeng
This flag is also used in `PrintPlanWithStatsTest` and `TableScanTest`. I think it should be converted to a configuration property, but I don't think we can remove the flag, as it's already used in Gluten: https://github.com/oap-project/gluten/blob/main/cpp/velox/compute/VeloxBackend.cc#L53C15-L53C39
Got it. Let's make a new PR to replace this gflag with a configuration property. Then come back to this PR.
Sure, got it.
#7931, thanks.
775bad0 to fd58569
The failure should not be related to this change, thanks.
@mbasmanova and @Yuhta, could you please help to review this PR? I made the change according to the comment: #7381 (comment)
fd58569 to f43a912
velox/connectors/Connector.h
Outdated
  void addDynamicFilter(
      column_index_t outputChannel,
      const std::shared_ptr<common::Filter>& filter,
      const bool isMergedFilter) {
nit: No need to `const` qualify this
velox/dwio/common/ScanSpec.h
Outdated
  ScanSpec& getChildByChannel(column_index_t channel);
  ScanSpec& getChildByChannel(column_index_t channel) const;

  std::shared_ptr<common::Filter> getFilterByChannel(
const std::shared_ptr<common::Filter>&
@@ -476,6 +476,18 @@ HiveDataSource::HiveDataSource(

  readerOpts_.setFileSchema(hiveTableHandle_->dataColumns());
  ioStats_ = std::make_shared<io::IoStatistics>();
  for (const auto& entry : tableHandle->dynamicFilters()) {
We need to be careful about resetting the filters on `ScanSpec`. This should be done very rarely, as it requires resetting the stats and the order of the filters. We should keep the filters in the scan spec as the golden source and only change them when really needed; the table handle should just keep references/`shared_ptr`s to them, not the reverse.
7c025fc to 18d392a
velox/dwio/common/ScanSpec.h
Outdated
    getChildByChannel(channel).addFilter(filter);
  }

  void setChildFilterByChannel(column_index_t channel, common::Filter& filter) {
Take `shared_ptr` to avoid deep copy
Updated, thanks.
@@ -71,6 +71,11 @@ class HiveDataSource : public DataSource {

  int64_t estimatedRowSize() override;

  std::shared_ptr<common::Filter> getColumnFilter(
const std::shared_ptr<common::Filter>&
Updated, thanks.
18d392a to 393334f
80d272c to 2181731
Hello @mbasmanova,
velox/exec/TableScan.cpp
Outdated
@@ -256,6 +256,7 @@ void TableScan::preload(std::shared_ptr<connector::ConnectorSplit> split) {
       ctx = operatorCtx_->createConnectorQueryCtx(
           split->connectorId, planNodeId(), connectorPool_),
       task = operatorCtx_->task(),
       &pendingDynamicFilters = pendingDynamicFilters_,
Do we need a reference here? We pass everything else by value.
Thanks for catching this, we should pass by value here
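The difference matters because the preload lambda runs later, possibly on another thread, while the operator may clear or change its own map in the meantime. The sketch below illustrates the by-value behavior with hypothetical names (`FilterMap`, `makePreloadTask`); it does not reproduce the actual `TableScan::preload` code.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>

// Hypothetical filter map: output channel -> shared, immutable filter text.
using FilterMap = std::unordered_map<int, std::shared_ptr<const std::string>>;

// Returns a deferred task that reports how many filters it can see.
// 'filters' is captured by value, so the task owns its own copy of the map.
// Only the shared_ptr handles are copied, not the pointed-to objects.
inline std::function<std::size_t()> makePreloadTask(const FilterMap& filters) {
  return [filters]() { return filters.size(); };
}

inline void captureByValueExample() {
  std::function<std::size_t()> task;
  {
    FilterMap pending;
    pending[0] = std::make_shared<std::string>("k = 1");
    task = makePreloadTask(pending);
    // Clearing (or destroying) the original map does not affect the task.
    pending.clear();
  }
  assert(task() == 1);
}
```

Had the lambda captured the map by reference, the call to `task()` after the inner scope ends would read a destroyed object, which is undefined behavior.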
velox/exec/tests/HashJoinTest.cpp
Outdated
  ColumnHandleMap assignments = {
      {"n1_0", regularColumn("c0", BIGINT())},
      {"n1_1", regularColumn("c1", BIGINT())},
      {"n1_2", partitionKey("k", BIGINT())}};
Is it important to use partition key columns in this test? Would you explain why?
Yes, we expect dynamic filters on the partition column to be generated. This way, we can verify whether the dynamic filter is passed to the preloaded data source by checking if the split is skipped during processing.
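The check described here can be pictured with the following sketch. It assumes, for illustration only, that the dynamic filter is the set of join keys seen on the build side and that each split carries its partition value as a string; `SketchSplit`, `BuildKeys`, `canSkipSplit`, and `countSkipped` are hypothetical names, not Velox APIs.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical split carrying one partition key value as a string.
struct SketchSplit {
  std::string partitionValue;
};

// Hypothetical dynamic filter: the join key values seen on the build side.
using BuildKeys = std::unordered_set<int64_t>;

// A split can be skipped without reading any data when its partition value
// cannot match any build-side key.
inline bool canSkipSplit(const SketchSplit& split, const BuildKeys& keys) {
  return keys.count(std::stoll(split.partitionValue)) == 0;
}

// Counts how many of 'splits' would be pruned by the filter.
inline int countSkipped(
    const std::vector<SketchSplit>& splits,
    const BuildKeys& keys) {
  int skipped = 0;
  for (const auto& split : splits) {
    if (canSkipSplit(split, keys)) {
      ++skipped;
    }
  }
  return skipped;
}

inline void partitionPruningExample() {
  // Five splits with partition values 0..4; the build side only has key 0.
  std::vector<SketchSplit> splits;
  for (int i = 0; i < 5; ++i) {
    splits.push_back({std::to_string(i)});
  }
  assert(countSkipped(splits, BuildKeys{0}) == 4);
}
```

If a preloading data source never receives the filter, it has nothing to test the partition value against, so none of its splits are skipped; that is the behavior the unit test is meant to detect.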
velox/exec/tests/HashJoinTest.cpp
Outdated
      .verifier([&](const std::shared_ptr<Task>& task, bool hasSpill) {
        SCOPED_TRACE(fmt::format("hasSpill:{}", hasSpill));
        if (!hasSpill) {
          ASSERT_EQ(1, getFiltersProduced(task, 1).sum);
It is error-prone to use hard-coded operator indices here. Can we use plan node IDs instead? We have `probeScanId` and we can also capture the join node ID. We can then get stats via `toPlanStats`.
std::unordered_map<core::PlanNodeId, PlanNodeStats> toPlanStats(
const TaskStats& taskStats);
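The benefit of keying stats by plan node ID can be shown with a simplified analogue. The sketch below is not `toPlanStats`; `SketchOperatorStats` and `sumFiltersByPlanNode` are hypothetical and only mimic the idea of aggregating per-operator numbers under the ID of the plan node that produced them.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical per-operator stats record.
struct SketchOperatorStats {
  std::string planNodeId;
  int64_t filtersProduced;
};

// Sums 'filtersProduced' per plan node ID, so a test can look up a node by
// the ID it captured instead of by operator position in the pipeline.
inline std::unordered_map<std::string, int64_t> sumFiltersByPlanNode(
    const std::vector<SketchOperatorStats>& operators) {
  std::unordered_map<std::string, int64_t> totals;
  for (const auto& op : operators) {
    totals[op.planNodeId] += op.filtersProduced;
  }
  return totals;
}

inline void planNodeLookupExample() {
  std::vector<SketchOperatorStats> operators = {
      {"scan", 0}, {"join", 1}, {"project", 0}};
  auto totals = sumFiltersByPlanNode(operators);
  // The lookup keeps working if operators are reordered or new ones added.
  assert(totals.at("join") == 1);
}
```

A hard-coded index such as `1` silently points at a different operator as soon as the plan shape changes, whereas a lookup by captured ID either finds the intended node or fails loudly.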
Updated.
velox/exec/tests/HashJoinTest.cpp
Outdated
    };
  };
  auto outputType =
      ROW({"n1_0", "n1_1", "n1_2"}, {BIGINT(), BIGINT(), BIGINT()});
These column names are hard to read. By convention, we use t0, t1, ... and u0, u1, ... You can specify column names as the first argument of `makeRowVector`.
Updated, thanks.
Looks good % some questions about the test.
Looks good % some minor comments.
velox/exec/tests/HashJoinTest.cpp
Outdated
  std::vector<exec::Split> probeSplits;
  for (int32_t i = 0; i < numSplits; ++i) {
    auto value = std::to_string(i);
    auto split = facebook::velox::exec::test::HiveConnectorSplitBuilder(
drop facebook::velox::exec::test::
Updated.
velox/exec/tests/HashJoinTest.cpp
Outdated
  HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
      .planNode(std::move(op))
      .config(core::QueryConfig::kMaxSplitPreloadPerDriver, "3")
      .makeInputSplits(makeInputSplits(probeScanId))
This code is confusing as `makeInputSplits` is repeated twice; would you use `.inputSplits(probeSplits)` instead and remove the `makeInputSplits` lambda from this test?
Updated, thanks.
Thanks.
  // Executing the join with p1=b0, we expect a dynamic filter for p1 to prune
  // the entire file/split. There are total of five splits, and all except the
  // first one are expected to be pruned. The result 'preloadedSplits' > 1
  // confirms the successful push of dynamic filters to the preloading data
Nice comment.
velox/exec/tests/HashJoinTest.cpp
Outdated
  // first one are expected to be pruned. The result 'preloadedSplits' > 1
  // confirms the successful push of dynamic filters to the preloading data
  // source.
  core::PlanNodeId probeScanId, joinNodeId;
define each variable separately
Updated, thanks.
velox/exec/tests/HashJoinTest.cpp
Outdated
          .capturePlanNodeId(joinNodeId)
          .project({"p0"})
          .planNode();
  SplitInput splits = {{probeScanId, probeSplits}};
no need for this variable
.splits({{probeScanId, probeSplits}})
@mbasmanova has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
Hello @mbasmanova,
@zhli1142015 Would you rebase?
Add dynamic filter for preloading. Velox's dynamic filters are evaluated in the `addSplit` method to skip the whole split: https://msdata.visualstudio.com/A365/_git/Velox?path=/velox/connectors/hive/HiveConnector.cpp&version=GBmain&line=550&lineEnd=551&lineStartColumn=1&lineEndColumn=1&lineStyle=plain&_a=contents, but this was missed in the preloading method. This change impacts data loading in query59. Related work items: #2529082
1a18867 to 2e7e7d6
Done, thanks.
@mbasmanova has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
@mbasmanova merged this pull request in ef97b5f.
Conbench analyzed the 1 benchmark run on commit There were no benchmark performance regressions. 🎉 The full Conbench report has more details.
…ncubator#7382) Summary: Fixes facebookincubator#7381 Pull Request resolved: facebookincubator#7382 Reviewed By: xiaoxmeng Differential Revision: D51857062 Pulled By: mbasmanova fbshipit-source-id: bc97ee22fcc5b2fbde37666929fdc718fd876ef9
Fixes #7381