[NSE-1075] Dynamically adjust input partition size #1076
base: main
Conversation
Thanks for opening a pull request! Could you open an issue for this pull request on GitHub Issues? https://github.com/oap-project/native-sql-engine/issues Then could you also rename the commit message and pull request title in the following format?
See also:
Force-pushed from 3005a4e to de19cbc
```scala
val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
  .getOrElse(SparkShimLoader.getSparkShims.leafNodeDefaultParallelism(sparkSession))
val PREFERRED_PARTITION_SIZE_LOWER_BOUND: Long = 128 * 1024 * 1024
val PREFERRED_PARTITION_SIZE_UPPER_BOUND: Long = 512 * 1024 * 1024
```
Maybe add a new config for these two values?
Thanks for your advice. PREFERRED_PARTITION_SIZE_UPPER_BOUND may impose the same limit as Spark's max partition size configuration (spark.sql.files.maxPartitionBytes), so those two could be unified. Maybe we can make PREFERRED_PARTITION_SIZE_LOWER_BOUND configurable.
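A minimal sketch of what such a config could look like, assuming the code lives in an org.apache.spark package (ConfigBuilder is private[spark]); the key name and default shown here are hypothetical, not part of this PR:

```scala
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit

// Hypothetical config entry for the lower bound; key name and default are
// illustrative only.
val PREFERRED_PARTITION_SIZE_LOWER_BOUND =
  ConfigBuilder("spark.oap.sql.files.preferredPartitionSizeLowerBound")
    .doc("The minimum preferred size of a single input partition when " +
      "dynamically adjusting input partition sizes.")
    .bytesConf(ByteUnit.BYTE)
    .createWithDefaultString("128MB")
```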
```scala
// This implementation is ported from spark FilePartition.scala with changes for
// adjusting openCost.
def getFilePartitions(sparkSession: SparkSession,
```
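For context, a minimal sketch of how the two bounds above might feed into the split-size computation; the variable names and the exact formula are assumptions for illustration, not necessarily this PR's code:

```scala
// Sketch: aim for roughly totalBytes / minPartitionNum bytes per partition,
// then clamp into [LOWER_BOUND, UPPER_BOUND] so partitions are neither tiny
// nor huge. `partitionedFiles` and `openCostInBytes` are assumed in scope.
val totalBytes = partitionedFiles.map(_.length + openCostInBytes).sum
val bytesPerPartition = totalBytes / math.max(minPartitionNum, 1)
val maxSplitBytes = math.min(
  math.max(bytesPerPartition, PREFERRED_PARTITION_SIZE_LOWER_BOUND),
  PREFERRED_PARTITION_SIZE_UPPER_BOUND)
```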
@jackylee-ch, please put your code changes for open cost here. It should be workable.
Okay
```scala
// val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
// // val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
// //   .getOrElse(sparkSession.leafNodeDefaultParallelism)
// val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
```
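For reference, openCostInBytes in upstream Spark's FilePartition.getFilePartitions drives a greedy bin-packing loop, paraphrased below; this sketches the upstream behavior, not this PR's modified version, and `splitFiles`, `maxSplitBytes`, and `openCostInBytes` are assumed in scope:

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}

// Files (already split and sorted by length, descending) are packed greedily;
// every file charges an extra openCostInBytes to model the overhead of
// opening one more file.
val partitions = new ArrayBuffer[FilePartition]
val currentFiles = new ArrayBuffer[PartitionedFile]
var currentSize = 0L

def closePartition(): Unit = {
  if (currentFiles.nonEmpty) {
    partitions += FilePartition(partitions.size, currentFiles.toArray)
  }
  currentFiles.clear()
  currentSize = 0
}

splitFiles.foreach { file =>
  // Start a new partition if adding this file would exceed the split size.
  if (currentSize + file.length > maxSplitBytes) {
    closePartition()
  }
  currentSize += file.length + openCostInBytes
  currentFiles += file
}
closePartition()
```

Raising or lowering openCostInBytes therefore shifts how aggressively small files are coalesced into one partition, which is why this PR adjusts it.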
@jackylee-ch, I note you have introduced a computation for taskParallelismNum. Is it the same as minPartitionNum? This piece of code is ported from the Spark source code.
No, they are not the same. taskParallelismNum is actually spark.sql.files.expectedPartitionNum, which can be configured by the user; its default value is the maximum number of tasks that can run in parallel in the current application.
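A minimal sketch of that fallback logic, using the config key named in the reply above; the helper name and the use of defaultParallelism as the "max parallel tasks" stand-in are assumptions, not this PR's exact code:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper: read the user-facing config if set, otherwise fall
// back to the application's current task parallelism. defaultParallelism
// approximates the number of concurrently runnable tasks (e.g. total
// executor cores on most cluster managers).
def expectedPartitionNum(spark: SparkSession): Int = {
  spark.conf.getOption("spark.sql.files.expectedPartitionNum")
    .map(_.toInt)
    .getOrElse(spark.sparkContext.defaultParallelism)
}
```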