[KYUUBI #6315] Spark 3.5: MaxScanStrategy supports DSv2 #5852
Conversation
Please make sure that the Kyuubi Spark extension also works well on an iceberg-free Spark runtime.
Good point. Thanks!
Fixed. Please review again.
Codecov Report: All modified and coverable lines are covered by tests ✅

```diff
@@ Coverage Diff @@
##             master    #5852      +/-   ##
============================================
- Coverage     58.58%   58.40%   -0.19%
  Complexity       24       24
============================================
  Files           649      651       +2
  Lines         39379    39513     +134
  Branches       5415     5441      +26
============================================
+ Hits          23070    23076       +6
- Misses        13841    13955     +114
- Partials       2468     2482      +14
```
@zhaohehuhu Could you add a unit test?
Sure. I will add it. Thanks!
...uubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala
```scala
import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class ReportStatisticsAndPartitionAwareDataSource extends SimpleWritableDataSource {
```
Do we need to add a new data source? Is it better to use iceberg datasource directly? @pan3793 WDYT?
Prefer to use a dummy DS like Spark does.
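To make the "dummy DS" suggestion concrete, here is a minimal sketch of a test-only scan that reports fixed statistics. The `Statistics` and `SupportsReportStatistics` traits below are simplified stand-ins for Spark's real interfaces in `org.apache.spark.sql.connector.read`, defined here only to keep the sketch self-contained; the class name `ReportStatisticsScan` is hypothetical.

```scala
import java.util.OptionalLong

// Simplified stand-ins for Spark's DSv2 interfaces (the real ones live in
// org.apache.spark.sql.connector.read); redeclared here so the sketch
// compiles without a Spark dependency.
trait Statistics {
  def sizeInBytes: OptionalLong
  def numRows: OptionalLong
}

trait SupportsReportStatistics {
  def estimateStatistics(): Statistics
}

// A dummy scan that reports fixed statistics, in the spirit of the
// test-only data sources Spark uses in its own DSv2 suites.
class ReportStatisticsScan(bytes: Long, rows: Long) extends SupportsReportStatistics {
  override def estimateStatistics(): Statistics = new Statistics {
    override def sizeInBytes: OptionalLong = OptionalLong.of(bytes)
    override def numRows: OptionalLong = OptionalLong.of(rows)
  }
}
```

A suite can then instantiate the dummy scan with a known size and assert that the watchdog rule trips (or not) at the expected threshold, without depending on Iceberg.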
```scala
lazy val scanFileSize = stats.sizeInBytes
lazy val scanPartitions = relation.scan.asInstanceOf[SupportsReportPartitioning]
  .outputPartitioning()
  .numPartitions()
```
I think this is the task number of the RDD/stage, not the table's partition number. Does taskGroups in Iceberg mean the same thing?
It's the input RDD partition number for the Iceberg datasource. Its value may happen to equal the table's partition number, but they are not the same thing. It seems hard to get the number of scanned table partitions here.
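Given that only the scan size is reliably available for DSv2 relations, the watchdog check reduces to comparing the reported `sizeInBytes` against a configured limit. The following is a minimal sketch of that shape; the names `checkScanSize`, `maxFileSizeInBytes`, and `MaxFileSizeExceededException` are hypothetical, and the real logic lives in `org.apache.kyuubi.sql.watchdog.MaxScanStrategy`.

```scala
// Hypothetical exception standing in for Kyuubi's watchdog error type.
final case class MaxFileSizeExceededException(msg: String) extends Exception(msg)

// Fail the query when the DSv2-reported scan size exceeds the configured
// limit; a limit of None means the rule is disabled.
def checkScanSize(scanFileSize: BigInt, maxFileSizeInBytes: Option[Long]): Unit = {
  maxFileSizeInBytes.foreach { limit =>
    if (scanFileSize > BigInt(limit)) {
      throw MaxFileSizeExceededException(
        s"Scan size $scanFileSize bytes exceeds the watchdog limit of $limit bytes")
    }
  }
}
```

Keeping the limit an `Option` mirrors how the rule can be skipped entirely when no threshold is configured, which matches the decision above to disable the maxPartitions check for DSv2 rather than guess at a partition count.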
Disabled the rule that checks maxPartitions for DSv2. @wForget
...uubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala
...spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
Thanks, LGTM
# 🔍 Description

## Issue References 🔗

Now, MaxScanStrategy can be adopted to limit the max scan file size in some datasources, such as Hive. Hopefully we can enhance MaxScanStrategy to include support for DataSource V2.

## Describe Your Solution 🔧

Get the statistics about files scanned through the DataSource V2 API.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklists

## 📝 Author Self Checklist

- [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project
- [x] I have performed a self-review
- [x] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my feature works
- [x] New and existing unit tests pass locally with my changes
- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

## 📝 Committer Pre-Merge Checklist

- [x] Pull request title is okay.
- [x] No license issues.
- [x] Milestone correctly set?
- [x] Test coverage is ok
- [x] Assignees are selected.
- [ ] Minimum number of approvals
- [ ] No changes are requested

**Be nice. Be informative.**

Closes #5852 from zhaohehuhu/dev-1213.
Closes #6315

3c5b0c2 [hezhao2] reformat
fb113d6 [hezhao2] disable the rule that checks the maxPartitions for dsv2
acc3587 [hezhao2] disable the rule that checks the maxPartitions for dsv2
c8399a0 [hezhao2] fix header
70c845b [hezhao2] add UTs
3a07396 [hezhao2] add ut
4d26ce1 [hezhao2] reformat
f87cb07 [hezhao2] reformat
b307022 [hezhao2] move code to Spark 3.5
73258c2 [hezhao2] fix unused import
cf893a0 [hezhao2] drop reflection for loading iceberg class
dc128bc [hezhao2] refactor code
661834c [hezhao2] revert code
6061f42 [hezhao2] delete IcebergSparkPlanHelper
5f1c3c0 [hezhao2] fix
b15652f [hezhao2] remove iceberg dependency
fe620ca [hezhao2] enable MaxScanStrategy when accessing iceberg datasource

Authored-by: hezhao2 <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit 8edcb00)
Signed-off-by: Cheng Pan <[email protected]>
Thanks, merged to master/1.9