[GLUTEN-7261][CORE] Use pushedFilters instead of dataFilters to offload scan #8082
base: main
Conversation
Run Gluten Clickhouse CI on x86
val runtimeFiltersString = s"RuntimeFilters: ${filterExprs().mkString("[", ",", "]")}"
val result = s"$nodeName$truncatedOutputString ${scan.description()} $runtimeFiltersString"
redact(result)
}
filterExprs is the real RuntimeFilters. cc @rui-mo
transform.copy(dataFilters = PushDownUtil.pushFilters(scanExec.dataFilters))
} else {
  transform
}
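For context, here is a minimal sketch of what a PushDownUtil.pushFilters helper along these lines could look like, based on the Spark translation logic quoted later in this thread. The body below is an assumption for illustration, not the actual patch:

import scala.collection.mutable
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources

object PushDownUtil {
  // Keep only the data filters that Spark itself can translate into
  // data-source Filters; the mapping lets us recover the original Catalyst
  // expression for every filter that translated successfully.
  def pushFilters(dataFilters: Seq[Expression]): Seq[Expression] = {
    val translatedFilterToExpr = mutable.HashMap.empty[sources.Filter, Expression]
    dataFilters.flatMap { filterExpr =>
      DataSourceStrategy
        .translateFilterWithMapping(
          filterExpr, Some(translatedFilterToExpr), nestedPredicatePushdownEnabled = true)
        .map(translatedFilterToExpr)
    }
  }
}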
The code in ScanTransformerFactory is used by both the validator and the offload rules. It feels a little weird to do validation in it. Do we have better choices?
How about using only pushedFilters here and relying on PushDownFilterToScan for subsequent pushdown?
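If I understand the suggestion, the factory would then unconditionally restrict the filters to the translatable subset, roughly like this (a sketch reusing the names from the diff above, not the actual change):

// Always restrict dataFilters to the Spark-translatable subset here; any
// remaining backend-specific pushdown happens later in the
// PushDownFilterToScan rule rather than in ScanTransformerFactory.
val transformWithPushedFilters =
  transform.copy(dataFilters = PushDownUtil.pushFilters(scanExec.dataFilters))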
Sounds feasible to me. Thanks.
Run Gluten Clickhouse CI on x86
Test failure seems unrelated.
Thanks. Added some questions.
val translatedFilters = mutable.ArrayBuffer.empty[sources.Filter]
val translatedFilterToExpr = mutable.HashMap.empty[sources.Filter, Expression]
for (filterExpr <- dataFilters) {
  val translated =
    DataSourceStrategy.translateFilterWithMapping(
      filterExpr, Some(translatedFilterToExpr), nestedPredicatePushdownEnabled = true)
  translated.foreach(translatedFilters += _)
}
Would you elaborate on how this translation happens and how the pushed filters differ from Spark's in most cases? If it is based on Spark rules, we cannot control which expressions are pushed down. Would it be more reasonable to adopt specific rules according to the backend's status?
This logic is the same as vanilla Spark's: it generates pushedFilters from dataFilters and then converts the resulting Seq[Filter] back to Seq[Expression]. dataFilters does not contain non-deterministic expressions, but it can contain expensive ones, such as UDFs. pushedFilters only contains cheap expressions, such as a > 1 and a in (1, 2). Refer to:
https://github.com/apache/spark/blob/1eb558c3a6fbdd59e5a305bc3ab12ce748f6511f/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L374
https://github.com/apache/spark/blob/1eb558c3a6fbdd59e5a305bc3ab12ce748f6511f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala#L72
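To make the difference concrete, here is a small self-contained example (the parquet path and the UDF are illustrative only):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("pushed-filters-demo").getOrCreate()
import spark.implicits._

// An expensive predicate that Spark cannot translate into a source Filter.
val expensive = udf((s: String) => s.length > 3)

spark.range(10).selectExpr("id", "cast(id as string) as b")
  .write.mode("overwrite").parquet("/tmp/pushed_filters_demo")

val df = spark.read.parquet("/tmp/pushed_filters_demo")
  .where($"id" > 1 && expensive($"b"))

// dataFilters holds both predicates, but PushedFilters in the printed plan
// shows only the translatable ones, e.g. [IsNotNull(id), GreaterThan(id,1)];
// the UDF stays out.
df.explain()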
Perhaps the above information could be documented. Also, I have an idea: can we check whether the expressions can be converted into Gluten expression transformers? The unsupported ones would then not be included in the pushed filters.
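A rough sketch of that idea. The names below (e.g. ExpressionConverter.replaceWithExpressionTransformer) are meant to illustrate the approach and should be checked against the actual Gluten API:

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}

// Keep a data filter only if it can be converted into a Gluten expression
// transformer; anything the backend does not support is simply dropped
// from the pushed filters.
def convertibleToTransformer(expr: Expression, output: Seq[Attribute]): Boolean =
  scala.util.Try(ExpressionConverter.replaceWithExpressionTransformer(expr, output)).isSuccess

val backendPushedFilters = dataFilters.filter(convertibleToTransformer(_, scanExec.output))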
What changes were proposed in this pull request?
dataFilters may contain complex expressions such as UDFs. Vanilla Spark uses only the cheap expressions from dataFilters as pushedFilters. Currently, Gluten uses dataFilters as the native scan filter. When dataFilters cause a fallback, we can use pushedFilters as the native scan filter instead to improve performance.
(Fixes: #7261)
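In other words, the intended offload decision looks roughly like this (the validation helper name is illustrative, not the exact patch):

// If the full dataFilters fail native validation (e.g. they contain a UDF),
// fall back to the Spark-translatable pushedFilters so that the scan itself
// can still be offloaded instead of falling back to vanilla Spark entirely.
val scanFilters =
  if (validateNatively(scanExec.dataFilters)) scanExec.dataFilters
  else PushDownUtil.pushFilters(scanExec.dataFilters)
val transformer = transform.copy(dataFilters = scanFilters)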
How was this patch tested?
UT