From 8edcb005ee8c863d5fd070e276a05d9c0c30d018 Mon Sep 17 00:00:00 2001
From: hezhao2
Date: Wed, 17 Apr 2024 16:29:50 +0800
Subject: [PATCH] [KYUUBI #6315] Spark 3.5: MaxScanStrategy supports DSv2
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

# :mag: Description

## Issue References 🔗

Currently, MaxScanStrategy can limit the maximum scanned file size for some data sources, such as Hive. This PR enhances MaxScanStrategy to also cover DataSource V2 (DSv2) relations.

## Describe Your Solution 🔧

Obtain the statistics of the files to be scanned through the DataSource V2 API (`DataSourceV2ScanRelation#computeStats`) and apply the existing max scan file size check to them.

## Types of changes :bookmark:

- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request :coffin:

MaxScanStrategy ignores DSv2 relations, so queries over them are never rejected by the scan file size watchdog.

#### Behavior With This Pull Request :tada:

Queries over DSv2 relations whose reported scan size exceeds the configured maxFileSize fail with MaxFileSizeExceedException at planning time.

#### Related Unit Tests

- Added `watchdog with scan maxFileSize -- data source v2` to `WatchDogSuiteBase`.

---

# Checklists

## 📝 Author Self Checklist

- [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project
- [x] I have performed a self-review
- [x] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my feature works
- [x] New and existing unit tests pass locally with my changes
- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

## 📝 Committer Pre-Merge Checklist

- [x] Pull request title is okay.
- [x] No license issues.
- [x] Milestone correctly set?
- [x] Test coverage is ok
- [x] Assignees are selected.
- [ ] Minimum number of approvals
- [ ] No changes are requested

**Be nice. Be informative.**

Closes #5852 from zhaohehuhu/dev-1213.
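For reviewers, a minimal sketch of how the new check is expected to surface to users — not part of this patch, and assuming the Kyuubi Spark SQL extension is enabled and that the threshold is controlled by `KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE` (`spark.sql.watchdog.maxFileSize`); the catalog and table names are placeholders:

```scala
// Usage sketch (illustration only, not part of this patch). Assumes the Kyuubi
// Spark SQL extension is installed and that the watchdog threshold is read from
// KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE ("spark.sql.watchdog.maxFileSize");
// the catalog/table names below are placeholders.
spark.conf.set("spark.sql.watchdog.maxFileSize", "1073741824") // 1 GiB cap

// A DSv2 relation (e.g. an Iceberg table) whose scan reports sizeInBytes above
// the cap via SupportsReportStatistics should now fail at planning time with
// MaxFileSizeExceedException instead of launching the scan.
spark.sql("SELECT * FROM my_catalog.db.big_events").show()
```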
Closes #6315 3c5b0c276 [hezhao2] reformat fb113d625 [hezhao2] disable the rule that checks the maxPartitions for dsv2 acc358732 [hezhao2] disable the rule that checks the maxPartitions for dsv2 c8399a021 [hezhao2] fix header 70c845bee [hezhao2] add UTs 3a0739686 [hezhao2] add ut 4d26ce131 [hezhao2] reformat f87cb072c [hezhao2] reformat b307022b8 [hezhao2] move code to Spark 3.5 73258c2ae [hezhao2] fix unused import cf893a0e1 [hezhao2] drop reflection for loading iceberg class dc128bc8e [hezhao2] refactor code 661834cce [hezhao2] revert code 6061f42ab [hezhao2] delete IcebergSparkPlanHelper 5f1c3c082 [hezhao2] fix b15652f05 [hezhao2] remove iceberg dependency fe620ca92 [hezhao2] enable MaxScanStrategy when accessing iceberg datasource Authored-by: hezhao2 Signed-off-by: Cheng Pan --- .../kyuubi/sql/watchdog/MaxScanStrategy.scala | 35 ++++++++++ ...tatisticsAndPartitionAwareDataSource.scala | 64 +++++++++++++++++++ .../sql/ReportStatisticsDataSource.scala | 53 +++++++++++++++ .../apache/spark/sql/WatchDogSuiteBase.scala | 33 ++++++++++ 4 files changed, 185 insertions(+) create mode 100644 extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsAndPartitionAwareDataSource.scala create mode 100644 extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala index 1ed55ebc2fd..e647ad3250e 100644 --- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala +++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.planning.ScanOperation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation import org.apache.spark.sql.types.StructType import org.apache.kyuubi.sql.KyuubiSQLConf @@ -232,6 +233,40 @@ case class MaxScanStrategy(session: SparkSession) logicalRelation.catalogTable) } } + case ScanOperation( + _, + _, + _, + relation @ DataSourceV2ScanRelation(_, _, _, _, _)) => + val table = relation.relation.table + if (table.partitioning().nonEmpty) { + val partitionColumnNames = table.partitioning().map(_.describe()) + val stats = relation.computeStats() + lazy val scanFileSize = stats.sizeInBytes + if (maxFileSizeOpt.exists(_ < scanFileSize)) { + throw new MaxFileSizeExceedException( + s""" + |SQL job scan file size in bytes: $scanFileSize + |exceed restrict of table scan maxFileSize ${maxFileSizeOpt.get} + |You should optimize your SQL logical according partition structure + |or shorten query scope such as p_date, detail as below: + |Table: ${table.name()} + |Partition Structure: ${partitionColumnNames.mkString(",")} + |""".stripMargin) + } + } else { + val stats = relation.computeStats() + lazy val scanFileSize = stats.sizeInBytes + if (maxFileSizeOpt.exists(_ < scanFileSize)) { + throw new MaxFileSizeExceedException( + s""" + |SQL job scan file size in bytes: $scanFileSize + |exceed restrict of table scan maxFileSize ${maxFileSizeOpt.get} + |detail 
as below: + |Table: ${table.name()} + |""".stripMargin) + } + } case _ => } } diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsAndPartitionAwareDataSource.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsAndPartitionAwareDataSource.scala new file mode 100644 index 00000000000..136ced538e1 --- /dev/null +++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsAndPartitionAwareDataSource.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.util.OptionalLong + +import org.apache.spark.sql.connector.{RangeInputPartition, SimpleBatchTable, SimpleScanBuilder, SimpleWritableDataSource} +import org.apache.spark.sql.connector.catalog.Table +import org.apache.spark.sql.connector.expressions.{Expressions, FieldReference, Transform} +import org.apache.spark.sql.connector.read.{InputPartition, ScanBuilder, Statistics, SupportsReportPartitioning, SupportsReportStatistics} +import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning} +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class ReportStatisticsAndPartitionAwareDataSource extends SimpleWritableDataSource { + + class MyScanBuilder( + val partitionKeys: Seq[String]) extends SimpleScanBuilder + with SupportsReportStatistics with SupportsReportPartitioning { + + override def estimateStatistics(): Statistics = { + new Statistics { + override def sizeInBytes(): OptionalLong = OptionalLong.of(80) + + override def numRows(): OptionalLong = OptionalLong.of(10) + + } + } + + override def planInputPartitions(): Array[InputPartition] = { + Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10)) + } + + override def outputPartitioning(): Partitioning = { + new KeyGroupedPartitioning(partitionKeys.map(FieldReference(_)).toArray, 10) + } + } + + override def getTable(options: CaseInsensitiveStringMap): Table = { + new SimpleBatchTable { + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + new MyScanBuilder(Seq("i")) + } + + override def partitioning(): Array[Transform] = { + Array(Expressions.identity("i")) + } + } + } +} diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala new file mode 100644 index 00000000000..2035d352562 --- /dev/null +++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.util.OptionalLong + +import org.apache.spark.sql.connector._ +import org.apache.spark.sql.connector.catalog.Table +import org.apache.spark.sql.connector.read._ +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class ReportStatisticsDataSource extends SimpleWritableDataSource { + + class MyScanBuilder extends SimpleScanBuilder + with SupportsReportStatistics { + + override def estimateStatistics(): Statistics = { + new Statistics { + override def sizeInBytes(): OptionalLong = OptionalLong.of(80) + + override def numRows(): OptionalLong = OptionalLong.of(10) + } + } + + override def planInputPartitions(): Array[InputPartition] = { + Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10)) + } + + } + + override def getTable(options: CaseInsensitiveStringMap): Table = { + new SimpleBatchTable { + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + new MyScanBuilder + } + } + } +} diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala index 139efd9ca06..22c998d3b0c 100644 --- a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala +++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._ import org.apache.commons.io.FileUtils import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException} import org.apache.kyuubi.sql.watchdog.{MaxFileSizeExceedException, MaxPartitionExceedException} @@ -607,4 +608,36 @@ trait WatchDogSuiteBase extends KyuubiSparkSQLExtensionTest { assert(e.getMessage == "Script transformation is not allowed") } } + + test("watchdog with scan maxFileSize -- data source v2") { + val df = spark.read.format(classOf[ReportStatisticsAndPartitionAwareDataSource].getName).load() + df.createOrReplaceTempView("test") + val logical = df.queryExecution.optimizedPlan.collect { + case d: DataSourceV2ScanRelation => d + }.head + val tableSize = logical.computeStats().sizeInBytes.toLong + withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> tableSize.toString) { + sql("SELECT * FROM test").queryExecution.sparkPlan + } + withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> (tableSize / 2).toString) { + intercept[MaxFileSizeExceedException]( + sql("SELECT * FROM test").queryExecution.sparkPlan) + } + + val nonPartDf = 
spark.read.format(classOf[ReportStatisticsDataSource].getName).load() + nonPartDf.createOrReplaceTempView("test_non_part") + val nonPartLogical = nonPartDf.queryExecution.optimizedPlan.collect { + case d: DataSourceV2ScanRelation => d + }.head + val nonPartTableSize = nonPartLogical.computeStats().sizeInBytes.toLong + + withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> nonPartTableSize.toString) { + sql("SELECT * FROM test_non_part").queryExecution.sparkPlan + } + + withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> (nonPartTableSize / 2).toString) { + intercept[MaxFileSizeExceedException]( + sql("SELECT * FROM test_non_part").queryExecution.sparkPlan) + } + } }