From fe620ca9268bbc37743d572435bf24dcd3f14167 Mon Sep 17 00:00:00 2001 From: hezhao2 Date: Wed, 13 Dec 2023 14:19:25 +0800 Subject: [PATCH 01/17] enable MaxScanStrategy when accessing iceberg datasource --- .../spark/kyuubi-extension-spark-3-4/pom.xml | 6 ++ .../spark/source/IcebergSparkPlanHelper.scala | 33 ++++++++++ .../kyuubi/sql/watchdog/MaxScanStrategy.scala | 63 +++++++++++++++++-- 3 files changed, 97 insertions(+), 5 deletions(-) create mode 100644 extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/iceberg/spark/source/IcebergSparkPlanHelper.scala diff --git a/extensions/spark/kyuubi-extension-spark-3-4/pom.xml b/extensions/spark/kyuubi-extension-spark-3-4/pom.xml index d64d3cdee9b..c3b2efa71f3 100644 --- a/extensions/spark/kyuubi-extension-spark-3-4/pom.xml +++ b/extensions/spark/kyuubi-extension-spark-3-4/pom.xml @@ -110,6 +110,12 @@ log4j-slf4j-impl test + + + org.apache.iceberg + iceberg-spark-runtime-${spark.binary.version}_${scala.binary.version} + ${iceberg.version} + diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/iceberg/spark/source/IcebergSparkPlanHelper.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/iceberg/spark/source/IcebergSparkPlanHelper.scala new file mode 100644 index 00000000000..fd8509b8120 --- /dev/null +++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/iceberg/spark/source/IcebergSparkPlanHelper.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.iceberg.spark.source + +import org.apache.spark.sql.connector.read.Scan + +object IcebergSparkPlanHelper { + + type SparkBatchQueryScan = org.apache.iceberg.spark.source.SparkBatchQueryScan + + def numPartitions(scan: Scan): Long = { + if (scan.isInstanceOf[SparkBatchQueryScan]) { + val sparkBatchQueryScan = scan.asInstanceOf[SparkBatchQueryScan] + sparkBatchQueryScan.outputPartitioning().numPartitions() + } else { + 0L + } + } +} diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala index 1ed55ebc2fd..7d7e25dd15b 100644 --- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala +++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala @@ -18,6 +18,7 @@ package org.apache.kyuubi.sql.watchdog import org.apache.hadoop.fs.Path +import org.apache.iceberg.spark.source.IcebergSparkPlanHelper.{numPartitions} import org.apache.spark.sql.{PruneFileSourcePartitionHelper, SparkSession, Strategy} import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation} @@ -26,8 +27,12 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation} import org.apache.spark.sql.types.StructType - import org.apache.kyuubi.sql.KyuubiSQLConf +import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation + +import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable` + /** * Add MaxScanStrategy to avoid scan excessive partitions or files @@ -118,10 +123,15 @@ case class MaxScanStrategy(session: SparkSession) } else { lazy val scanFileSize = relation.tableMeta.stats.map(_.sizeInBytes).sum if (maxFileSizeOpt.exists(_ < scanFileSize)) { - throw nonPartTableMaxFileExceedError( - scanFileSize, - maxFileSizeOpt.get, - Some(relation.tableMeta)) + throw new MaxFileSizeExceedException( + s""" + |Your SQL job scan a whole huge table without any partition filter, + |You should optimize your SQL logical according partition structure + |or shorten query scope such as p_date, detail as below: + |Table: ${relation.tableMeta.qualifiedName} + |Owner: ${relation.tableMeta.owner} + |Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")} + |""".stripMargin) } } case ScanOperation( @@ -232,6 +242,49 @@ case class MaxScanStrategy(session: SparkSession) logicalRelation.catalogTable) } } + case ScanOperation(_, _, _, relation: DataSourceV2ScanRelation) + if isIcebergRelation(relation.relation) => + val icebergTable = relation.relation. 
+ table.asInstanceOf[org.apache.iceberg.spark.source.SparkTable] + if (icebergTable.partitioning().nonEmpty) { + val partitionColumnNames = icebergTable.table().spec().fields().map(_.name()).seq + val stats = relation.computeStats() + lazy val scanFileSize = stats.sizeInBytes + lazy val scanPartitions = numPartitions(relation.scan) + if (maxFileSizeOpt.exists(_ < scanFileSize)) { + throw new MaxFileSizeExceedException( + s""" + |SQL job scan file size in bytes: $scanFileSize + |exceed restrict of table scan maxFileSize ${maxFileSizeOpt.get} + |You should optimize your SQL logical according partition structure + |or shorten query scope such as p_date, detail as below: + |Table: ${icebergTable.name()} + |Partition Structure: ${partitionColumnNames.mkString(",")} + |""".stripMargin) + } + if (maxScanPartitionsOpt.exists(_ < scanPartitions)) { + throw new MaxPartitionExceedException( + s""" + |Your SQL job scan a whole huge table without any partition filter, + |You should optimize your SQL logical according partition structure + |or shorten query scope such as p_date, detail as below: + |Table: ${icebergTable.name()} + |Partition Structure: ${partitionColumnNames.mkString(",")} + |""".stripMargin) + } + } else { + val stats = relation.computeStats() + lazy val scanFileSize = stats.sizeInBytes + if (maxFileSizeOpt.exists(_ < scanFileSize)) { + new MaxFileSizeExceedException( + s""" + |SQL job scan file size in bytes: $scanFileSize + |exceed restrict of table scan maxFileSize ${maxFileSizeOpt.get} + |detail as below: + |Table: ${icebergTable.name()} + |""".stripMargin) + } + } case _ => } } From b15652f051ad262f1bdd61f690337a04eff05042 Mon Sep 17 00:00:00 2001 From: hezhao2 Date: Fri, 15 Dec 2023 11:36:07 +0800 Subject: [PATCH 02/17] remove iceberg dependency --- .../spark/kyuubi-extension-spark-3-4/pom.xml | 8 -------- .../kyuubi/sql/watchdog/MaxScanStrategy.scala | 18 ++++++++++++------ 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/extensions/spark/kyuubi-extension-spark-3-4/pom.xml b/extensions/spark/kyuubi-extension-spark-3-4/pom.xml index c3b2efa71f3..633c2db7661 100644 --- a/extensions/spark/kyuubi-extension-spark-3-4/pom.xml +++ b/extensions/spark/kyuubi-extension-spark-3-4/pom.xml @@ -67,8 +67,6 @@ org.apache.kyuubi kyuubi-util-scala_${scala.binary.version} ${project.version} - test-jar - test @@ -110,12 +108,6 @@ log4j-slf4j-impl test - - - org.apache.iceberg - iceberg-spark-runtime-${spark.binary.version}_${scala.binary.version} - ${iceberg.version} - diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala index 7d7e25dd15b..42e9a3fbea0 100644 --- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala +++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala @@ -18,7 +18,7 @@ package org.apache.kyuubi.sql.watchdog import org.apache.hadoop.fs.Path -import org.apache.iceberg.spark.source.IcebergSparkPlanHelper.{numPartitions} +import org.apache.iceberg.spark.source.IcebergSparkPlanHelper.numPartitions import org.apache.spark.sql.{PruneFileSourcePartitionHelper, SparkSession, Strategy} import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation} @@ -28,7 +28,7 @@ import 
org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation} import org.apache.spark.sql.types.StructType import org.apache.kyuubi.sql.KyuubiSQLConf -import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation +import org.apache.kyuubi.util.reflect.DynMethods import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable` @@ -243,11 +243,17 @@ case class MaxScanStrategy(session: SparkSession) } } case ScanOperation(_, _, _, relation: DataSourceV2ScanRelation) - if isIcebergRelation(relation.relation) => - val icebergTable = relation.relation. - table.asInstanceOf[org.apache.iceberg.spark.source.SparkTable] + => + val isIcebergRelation = DynMethods + .builder("isIcebergRelation") + .impl(Class.forName("org.apache.spark.sql.catalyst.utils.PlanUtils")) + .buildChecked.invoke[Boolean](relation.relation) + if (isIcebergRelation) { + return + } + val icebergTable = relation.relation.table if (icebergTable.partitioning().nonEmpty) { - val partitionColumnNames = icebergTable.table().spec().fields().map(_.name()).seq + val partitionColumnNames = icebergTable.partitioning().map(_.describe()) val stats = relation.computeStats() lazy val scanFileSize = stats.sizeInBytes lazy val scanPartitions = numPartitions(relation.scan) From 5f1c3c082f1ead26a86136dd0c6605a632889b05 Mon Sep 17 00:00:00 2001 From: hezhao2 Date: Fri, 15 Dec 2023 13:33:14 +0800 Subject: [PATCH 03/17] fix --- .../scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala index 42e9a3fbea0..34b3b3405ae 100644 --- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala +++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala @@ -248,7 +248,7 @@ case class MaxScanStrategy(session: SparkSession) .builder("isIcebergRelation") .impl(Class.forName("org.apache.spark.sql.catalyst.utils.PlanUtils")) .buildChecked.invoke[Boolean](relation.relation) - if (isIcebergRelation) { + if (!isIcebergRelation) { return } val icebergTable = relation.relation.table From 6061f42ab9b4bfdbc0ab7b6d44fafae716637691 Mon Sep 17 00:00:00 2001 From: hezhao2 Date: Wed, 20 Dec 2023 17:58:13 +0800 Subject: [PATCH 04/17] delete IcebergSparkPlanHelper --- .../spark/source/IcebergSparkPlanHelper.scala | 33 ------------------- .../kyuubi/sql/watchdog/MaxScanStrategy.scala | 17 +++++----- 2 files changed, 9 insertions(+), 41 deletions(-) delete mode 100644 extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/iceberg/spark/source/IcebergSparkPlanHelper.scala diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/iceberg/spark/source/IcebergSparkPlanHelper.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/iceberg/spark/source/IcebergSparkPlanHelper.scala deleted file mode 100644 index fd8509b8120..00000000000 --- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/iceberg/spark/source/IcebergSparkPlanHelper.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache 
Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.iceberg.spark.source - -import org.apache.spark.sql.connector.read.Scan - -object IcebergSparkPlanHelper { - - type SparkBatchQueryScan = org.apache.iceberg.spark.source.SparkBatchQueryScan - - def numPartitions(scan: Scan): Long = { - if (scan.isInstanceOf[SparkBatchQueryScan]) { - val sparkBatchQueryScan = scan.asInstanceOf[SparkBatchQueryScan] - sparkBatchQueryScan.outputPartitioning().numPartitions() - } else { - 0L - } - } -} diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala index 34b3b3405ae..fa24c8d5cd9 100644 --- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala +++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala @@ -17,22 +17,22 @@ package org.apache.kyuubi.sql.watchdog +import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable` + import org.apache.hadoop.fs.Path -import org.apache.iceberg.spark.source.IcebergSparkPlanHelper.numPartitions import org.apache.spark.sql.{PruneFileSourcePartitionHelper, SparkSession, Strategy} import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation} import org.apache.spark.sql.catalyst.planning.ScanOperation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.connector.read.SupportsReportPartitioning import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation import org.apache.spark.sql.types.StructType + import org.apache.kyuubi.sql.KyuubiSQLConf import org.apache.kyuubi.util.reflect.DynMethods -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation - -import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable` - /** * Add MaxScanStrategy to avoid scan excessive partitions or files @@ -242,8 +242,7 @@ case class MaxScanStrategy(session: SparkSession) logicalRelation.catalogTable) } } - case ScanOperation(_, _, _, relation: DataSourceV2ScanRelation) - => + case ScanOperation(_, _, _, relation: DataSourceV2ScanRelation) => val isIcebergRelation = DynMethods .builder("isIcebergRelation") .impl(Class.forName("org.apache.spark.sql.catalyst.utils.PlanUtils")) @@ -256,7 +255,9 @@ case class MaxScanStrategy(session: SparkSession) val partitionColumnNames = 
icebergTable.partitioning().map(_.describe()) val stats = relation.computeStats() lazy val scanFileSize = stats.sizeInBytes - lazy val scanPartitions = numPartitions(relation.scan) + lazy val scanPartitions = relation.scan.asInstanceOf[SupportsReportPartitioning] + .outputPartitioning() + .numPartitions() if (maxFileSizeOpt.exists(_ < scanFileSize)) { throw new MaxFileSizeExceedException( s""" From 661834cce242e5f40bb1d0687f10871c333082cb Mon Sep 17 00:00:00 2001 From: hezhao2 Date: Thu, 21 Dec 2023 13:59:42 +0800 Subject: [PATCH 05/17] revert code --- .../kyuubi/sql/watchdog/MaxScanStrategy.scala | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala index fa24c8d5cd9..856e95290a5 100644 --- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala +++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala @@ -123,15 +123,10 @@ case class MaxScanStrategy(session: SparkSession) } else { lazy val scanFileSize = relation.tableMeta.stats.map(_.sizeInBytes).sum if (maxFileSizeOpt.exists(_ < scanFileSize)) { - throw new MaxFileSizeExceedException( - s""" - |Your SQL job scan a whole huge table without any partition filter, - |You should optimize your SQL logical according partition structure - |or shorten query scope such as p_date, detail as below: - |Table: ${relation.tableMeta.qualifiedName} - |Owner: ${relation.tableMeta.owner} - |Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")} - |""".stripMargin) + throw nonPartTableMaxFileExceedError( + scanFileSize, + maxFileSizeOpt.get, + Some(relation.tableMeta)) } } case ScanOperation( From dc128bc8e87b50dca6ab47a754baee5940cecfc0 Mon Sep 17 00:00:00 2001 From: hezhao2 Date: Fri, 22 Dec 2023 10:19:05 +0800 Subject: [PATCH 06/17] refactor code --- .../scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala index 856e95290a5..0834afabb21 100644 --- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala +++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala @@ -17,8 +17,6 @@ package org.apache.kyuubi.sql.watchdog -import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable` - import org.apache.hadoop.fs.Path import org.apache.spark.sql.{PruneFileSourcePartitionHelper, SparkSession, Strategy} import org.apache.spark.sql.catalyst.SQLConfHelper From cf893a0e1af770429bc516ad979ede03dd67be88 Mon Sep 17 00:00:00 2001 From: hezhao2 Date: Fri, 23 Feb 2024 14:48:45 +0800 Subject: [PATCH 07/17] drop reflection for loading iceberg class --- .../kyuubi/sql/watchdog/MaxScanStrategy.scala | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala 
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala index 0834afabb21..127db9eb931 100644 --- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala +++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala @@ -236,16 +236,10 @@ case class MaxScanStrategy(session: SparkSession) } } case ScanOperation(_, _, _, relation: DataSourceV2ScanRelation) => - val isIcebergRelation = DynMethods - .builder("isIcebergRelation") - .impl(Class.forName("org.apache.spark.sql.catalyst.utils.PlanUtils")) - .buildChecked.invoke[Boolean](relation.relation) - if (!isIcebergRelation) { - return - } - val icebergTable = relation.relation.table - if (icebergTable.partitioning().nonEmpty) { - val partitionColumnNames = icebergTable.partitioning().map(_.describe()) + val table = relation.relation.table + if (table.partitioning().nonEmpty && + relation.scan.isInstanceOf[SupportsReportPartitioning]) { + val partitionColumnNames = table.partitioning().map(_.describe()) val stats = relation.computeStats() lazy val scanFileSize = stats.sizeInBytes lazy val scanPartitions = relation.scan.asInstanceOf[SupportsReportPartitioning] @@ -258,7 +252,7 @@ case class MaxScanStrategy(session: SparkSession) |exceed restrict of table scan maxFileSize ${maxFileSizeOpt.get} |You should optimize your SQL logical according partition structure |or shorten query scope such as p_date, detail as below: - |Table: ${icebergTable.name()} + |Table: ${table.name()} |Partition Structure: ${partitionColumnNames.mkString(",")} |""".stripMargin) } @@ -268,7 +262,7 @@ case class MaxScanStrategy(session: SparkSession) |Your SQL job scan a whole huge table without any partition filter, |You should optimize your SQL logical according partition structure |or shorten query scope such as p_date, detail as below: - |Table: ${icebergTable.name()} + |Table: ${table.name()} |Partition Structure: ${partitionColumnNames.mkString(",")} |""".stripMargin) } @@ -281,7 +275,7 @@ case class MaxScanStrategy(session: SparkSession) |SQL job scan file size in bytes: $scanFileSize |exceed restrict of table scan maxFileSize ${maxFileSizeOpt.get} |detail as below: - |Table: ${icebergTable.name()} + |Table: ${table.name()} |""".stripMargin) } } From 73258c2aea2c38c93ac0a778b3701023d423306a Mon Sep 17 00:00:00 2001 From: hezhao2 Date: Thu, 14 Mar 2024 18:00:30 +0800 Subject: [PATCH 08/17] fix unused import --- .../scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala index 127db9eb931..4a941f2fe86 100644 --- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala +++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala @@ -30,7 +30,6 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation import org.apache.spark.sql.types.StructType import org.apache.kyuubi.sql.KyuubiSQLConf -import org.apache.kyuubi.util.reflect.DynMethods /** * Add MaxScanStrategy to avoid scan excessive partitions or files From b307022b819ce147ccf3e0b82915cbcc556f3c93 Mon Sep 17 00:00:00 2001 From: hezhao2 Date: 
Fri, 15 Mar 2024 12:09:49 +0800 Subject: [PATCH 09/17] move code to Spark 3.5 --- .../spark/kyuubi-extension-spark-3-4/pom.xml | 2 + .../kyuubi/sql/watchdog/MaxScanStrategy.scala | 44 ------------------ .../kyuubi/sql/watchdog/MaxScanStrategy.scala | 46 +++++++++++++++++++ 3 files changed, 48 insertions(+), 44 deletions(-) diff --git a/extensions/spark/kyuubi-extension-spark-3-4/pom.xml b/extensions/spark/kyuubi-extension-spark-3-4/pom.xml index 633c2db7661..d64d3cdee9b 100644 --- a/extensions/spark/kyuubi-extension-spark-3-4/pom.xml +++ b/extensions/spark/kyuubi-extension-spark-3-4/pom.xml @@ -67,6 +67,8 @@ org.apache.kyuubi kyuubi-util-scala_${scala.binary.version} ${project.version} + test-jar + test diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala index 4a941f2fe86..4ae971392e6 100644 --- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala +++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala @@ -234,50 +234,6 @@ case class MaxScanStrategy(session: SparkSession) logicalRelation.catalogTable) } } - case ScanOperation(_, _, _, relation: DataSourceV2ScanRelation) => - val table = relation.relation.table - if (table.partitioning().nonEmpty && - relation.scan.isInstanceOf[SupportsReportPartitioning]) { - val partitionColumnNames = table.partitioning().map(_.describe()) - val stats = relation.computeStats() - lazy val scanFileSize = stats.sizeInBytes - lazy val scanPartitions = relation.scan.asInstanceOf[SupportsReportPartitioning] - .outputPartitioning() - .numPartitions() - if (maxFileSizeOpt.exists(_ < scanFileSize)) { - throw new MaxFileSizeExceedException( - s""" - |SQL job scan file size in bytes: $scanFileSize - |exceed restrict of table scan maxFileSize ${maxFileSizeOpt.get} - |You should optimize your SQL logical according partition structure - |or shorten query scope such as p_date, detail as below: - |Table: ${table.name()} - |Partition Structure: ${partitionColumnNames.mkString(",")} - |""".stripMargin) - } - if (maxScanPartitionsOpt.exists(_ < scanPartitions)) { - throw new MaxPartitionExceedException( - s""" - |Your SQL job scan a whole huge table without any partition filter, - |You should optimize your SQL logical according partition structure - |or shorten query scope such as p_date, detail as below: - |Table: ${table.name()} - |Partition Structure: ${partitionColumnNames.mkString(",")} - |""".stripMargin) - } - } else { - val stats = relation.computeStats() - lazy val scanFileSize = stats.sizeInBytes - if (maxFileSizeOpt.exists(_ < scanFileSize)) { - new MaxFileSizeExceedException( - s""" - |SQL job scan file size in bytes: $scanFileSize - |exceed restrict of table scan maxFileSize ${maxFileSizeOpt.get} - |detail as below: - |Table: ${table.name()} - |""".stripMargin) - } - } case _ => } } diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala index 1ed55ebc2fd..4a941f2fe86 100644 --- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala +++ 
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala @@ -23,8 +23,10 @@ import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation} import org.apache.spark.sql.catalyst.planning.ScanOperation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.connector.read.SupportsReportPartitioning import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation import org.apache.spark.sql.types.StructType import org.apache.kyuubi.sql.KyuubiSQLConf @@ -232,6 +234,50 @@ case class MaxScanStrategy(session: SparkSession) logicalRelation.catalogTable) } } + case ScanOperation(_, _, _, relation: DataSourceV2ScanRelation) => + val table = relation.relation.table + if (table.partitioning().nonEmpty && + relation.scan.isInstanceOf[SupportsReportPartitioning]) { + val partitionColumnNames = table.partitioning().map(_.describe()) + val stats = relation.computeStats() + lazy val scanFileSize = stats.sizeInBytes + lazy val scanPartitions = relation.scan.asInstanceOf[SupportsReportPartitioning] + .outputPartitioning() + .numPartitions() + if (maxFileSizeOpt.exists(_ < scanFileSize)) { + throw new MaxFileSizeExceedException( + s""" + |SQL job scan file size in bytes: $scanFileSize + |exceed restrict of table scan maxFileSize ${maxFileSizeOpt.get} + |You should optimize your SQL logical according partition structure + |or shorten query scope such as p_date, detail as below: + |Table: ${table.name()} + |Partition Structure: ${partitionColumnNames.mkString(",")} + |""".stripMargin) + } + if (maxScanPartitionsOpt.exists(_ < scanPartitions)) { + throw new MaxPartitionExceedException( + s""" + |Your SQL job scan a whole huge table without any partition filter, + |You should optimize your SQL logical according partition structure + |or shorten query scope such as p_date, detail as below: + |Table: ${table.name()} + |Partition Structure: ${partitionColumnNames.mkString(",")} + |""".stripMargin) + } + } else { + val stats = relation.computeStats() + lazy val scanFileSize = stats.sizeInBytes + if (maxFileSizeOpt.exists(_ < scanFileSize)) { + new MaxFileSizeExceedException( + s""" + |SQL job scan file size in bytes: $scanFileSize + |exceed restrict of table scan maxFileSize ${maxFileSizeOpt.get} + |detail as below: + |Table: ${table.name()} + |""".stripMargin) + } + } case _ => } } From f87cb072c4804c1f69866612a7df84d19883f66c Mon Sep 17 00:00:00 2001 From: hezhao2 Date: Fri, 15 Mar 2024 14:45:26 +0800 Subject: [PATCH 10/17] reformat --- .../scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala index 4ae971392e6..1ed55ebc2fd 100644 --- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala +++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala @@ -23,10 +23,8 @@ import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
HiveTableRelation} import org.apache.spark.sql.catalyst.planning.ScanOperation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.connector.read.SupportsReportPartitioning import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation} -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation import org.apache.spark.sql.types.StructType import org.apache.kyuubi.sql.KyuubiSQLConf From 4d26ce1314fe0ec55833a92088d83e528977f26b Mon Sep 17 00:00:00 2001 From: hezhao2 Date: Fri, 15 Mar 2024 15:06:51 +0800 Subject: [PATCH 11/17] reformat --- .../apache/kyuubi/sql/watchdog/MaxScanStrategy.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala index 4a941f2fe86..5020fdfc050 100644 --- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala +++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala @@ -234,14 +234,17 @@ case class MaxScanStrategy(session: SparkSession) logicalRelation.catalogTable) } } - case ScanOperation(_, _, _, relation: DataSourceV2ScanRelation) => + case ScanOperation( + _, + _, + _, + relation @ DataSourceV2ScanRelation(_, scan: SupportsReportPartitioning, _, _, _)) => val table = relation.relation.table - if (table.partitioning().nonEmpty && - relation.scan.isInstanceOf[SupportsReportPartitioning]) { + if (table.partitioning().nonEmpty) { val partitionColumnNames = table.partitioning().map(_.describe()) val stats = relation.computeStats() lazy val scanFileSize = stats.sizeInBytes - lazy val scanPartitions = relation.scan.asInstanceOf[SupportsReportPartitioning] + lazy val scanPartitions = scan .outputPartitioning() .numPartitions() if (maxFileSizeOpt.exists(_ < scanFileSize)) { From 3a0739686f684829180319205d7f0c7fc45ecd5d Mon Sep 17 00:00:00 2001 From: hezhao2 Date: Tue, 19 Mar 2024 16:03:35 +0800 Subject: [PATCH 12/17] add ut --- .../kyuubi/sql/watchdog/MaxScanStrategy.scala | 14 ++--- .../sql/ReportStatisticsDataSource.scala | 54 ++++++++++++++++++ ...rtStatisticsPartitionAwareDataSource.scala | 57 +++++++++++++++++++ .../apache/spark/sql/WatchDogSuiteBase.scala | 31 +++++++++- 4 files changed, 146 insertions(+), 10 deletions(-) create mode 100644 extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala create mode 100644 extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsPartitionAwareDataSource.scala diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala index 5020fdfc050..c9007470144 100644 --- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala +++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala @@ -247,21 +247,21 @@ case class MaxScanStrategy(session: SparkSession) lazy val scanPartitions = scan .outputPartitioning() 
.numPartitions() - if (maxFileSizeOpt.exists(_ < scanFileSize)) { - throw new MaxFileSizeExceedException( + if (maxScanPartitionsOpt.exists(_ < scanPartitions)) { + throw new MaxPartitionExceedException( s""" - |SQL job scan file size in bytes: $scanFileSize - |exceed restrict of table scan maxFileSize ${maxFileSizeOpt.get} + |Your SQL job scan a whole huge table without any partition filter, |You should optimize your SQL logical according partition structure |or shorten query scope such as p_date, detail as below: |Table: ${table.name()} |Partition Structure: ${partitionColumnNames.mkString(",")} |""".stripMargin) } - if (maxScanPartitionsOpt.exists(_ < scanPartitions)) { - throw new MaxPartitionExceedException( + if (maxFileSizeOpt.exists(_ < scanFileSize)) { + throw new MaxFileSizeExceedException( s""" - |Your SQL job scan a whole huge table without any partition filter, + |SQL job scan file size in bytes: $scanFileSize + |exceed restrict of table scan maxFileSize ${maxFileSizeOpt.get} |You should optimize your SQL logical according partition structure |or shorten query scope such as p_date, detail as below: |Table: ${table.name()} diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala new file mode 100644 index 00000000000..e1c5cd846ea --- /dev/null +++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.plans.logical.Statistics +import org.apache.spark.sql.connector.catalog.Table +import org.apache.spark.sql.connector.expressions.FieldReference +import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning} +import org.apache.spark.sql.connector.read._ +import org.apache.spark.sql.connector._ +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class ReportStatisticsDataSource extends SimpleWritableDataSource { + + class MyScanBuilder extends SimpleScanBuilder + with SupportsReportStatistics { + + override def estimateStatistics(): Statistics = { + Statistics(sizeInBytes = 80, rowCount = 10) + } + + override def planInputPartitions(): Array[InputPartition] = { + Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10)) + } + + override def createReaderFactory(): PartitionReaderFactory = { + SpecificReaderFactory + } + } + + override def getTable(options: CaseInsensitiveStringMap): Table = { + new SimpleBatchTable { + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + new MyScanBuilder + } + } + } +} diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsPartitionAwareDataSource.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsPartitionAwareDataSource.scala new file mode 100644 index 00000000000..960ff42a9b6 --- /dev/null +++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsPartitionAwareDataSource.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.plans.logical.Statistics +import org.apache.spark.sql.connector.catalog.Table +import org.apache.spark.sql.connector.expressions.FieldReference +import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning} +import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, ScanBuilder, SupportsReportPartitioning, SupportsReportStatistics} +import org.apache.spark.sql.connector.{RangeInputPartition, SimpleBatchTable, SimpleScanBuilder, SimpleWritableDataSource, SpecificReaderFactory} +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class ReportStatisticsPartitionAwareDataSource extends SimpleWritableDataSource { + + class MyScanBuilder extends SimpleScanBuilder + with SupportsReportStatistics with SupportsReportPartitioning { + + override def estimateStatistics(): Statistics = { + Statistics(sizeInBytes = 80, rowCount = 10) + } + + override def planInputPartitions(): Array[InputPartition] = { + Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10)) + } + + override def createReaderFactory(): PartitionReaderFactory = { + SpecificReaderFactory + } + + override def outputPartitioning(): Partitioning = + new KeyGroupedPartitioning(Array(FieldReference("p")), 10) + } + + override def getTable(options: CaseInsensitiveStringMap): Table = { + new SimpleBatchTable { + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + new MyScanBuilder + } + } + } +} diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala index 139efd9ca06..1d725148b8c 100644 --- a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala +++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala @@ -18,14 +18,12 @@ package org.apache.spark.sql import java.io.File - import scala.collection.JavaConverters._ - import org.apache.commons.io.FileUtils import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan} - import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException} import org.apache.kyuubi.sql.watchdog.{MaxFileSizeExceedException, MaxPartitionExceedException} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation trait WatchDogSuiteBase extends KyuubiSparkSQLExtensionTest { override protected def beforeAll(): Unit = { @@ -607,4 +605,31 @@ trait WatchDogSuiteBase extends KyuubiSparkSQLExtensionTest { assert(e.getMessage == "Script transformation is not allowed") } } + + test("watchdog with scan maxPartitions -- data source v2") { + withTempView("test") { + val df = spark.read.format(classOf[ReportStatisticsPartitionAwareDataSource].getName).load() + df.createOrReplaceTempView("test") + checkMaxPartition + } + } + + test("watchdog with scan maxFileSize -- data source v2") { + withTempView("test", "test_non_part") { + val df = spark.read.format(classOf[ReportStatisticsPartitionAwareDataSource].getName).load() + df.createOrReplaceTempView("test") + val logical = df.queryExecution.optimizedPlan.collect { + case d: DataSourceV2ScanRelation => d + }.head + val tableSize = logical.computeStats().sizeInBytes.toLong + + val nonPartDf = spark.read.format(classOf[ReportStatisticsDataSource].getName).load() + 
nonPartDf.createOrReplaceTempView("test_non_part") + val nonPartLogical = nonPartDf.queryExecution.optimizedPlan.collect { + case d: DataSourceV2ScanRelation => d + }.head + val nonPartTableSize = nonPartLogical.computeStats().sizeInBytes.toLong + checkMaxFileSize(tableSize, nonPartTableSize) + } + } } From 70c845bee21128fa0e8200dfc019530a19354451 Mon Sep 17 00:00:00 2001 From: hezhao2 Date: Thu, 21 Mar 2024 15:39:16 +0800 Subject: [PATCH 13/17] add UTs --- .../kyuubi/sql/watchdog/MaxScanStrategy.scala | 9 +-- ...atisticsAndPartitionAwareDataSource.scala} | 34 +++++++---- .../sql/ReportStatisticsDataSource.scala | 16 ++--- .../apache/spark/sql/WatchDogSuiteBase.scala | 61 +++++++++++++------ 4 files changed, 75 insertions(+), 45 deletions(-) rename extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/{ReportStatisticsPartitionAwareDataSource.scala => ReportStatisticsAndPartitionAwareDataSource.scala} (62%) diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala index c9007470144..b6d8c85d49e 100644 --- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala +++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala @@ -238,13 +238,14 @@ case class MaxScanStrategy(session: SparkSession) _, _, _, - relation @ DataSourceV2ScanRelation(_, scan: SupportsReportPartitioning, _, _, _)) => + relation @ DataSourceV2ScanRelation(_, _, _, _, _)) => val table = relation.relation.table - if (table.partitioning().nonEmpty) { + if (table.partitioning().nonEmpty && + relation.scan.isInstanceOf[SupportsReportPartitioning]) { val partitionColumnNames = table.partitioning().map(_.describe()) val stats = relation.computeStats() lazy val scanFileSize = stats.sizeInBytes - lazy val scanPartitions = scan + lazy val scanPartitions = relation.scan.asInstanceOf[SupportsReportPartitioning] .outputPartitioning() .numPartitions() if (maxScanPartitionsOpt.exists(_ < scanPartitions)) { @@ -272,7 +273,7 @@ case class MaxScanStrategy(session: SparkSession) val stats = relation.computeStats() lazy val scanFileSize = stats.sizeInBytes if (maxFileSizeOpt.exists(_ < scanFileSize)) { - new MaxFileSizeExceedException( + throw new MaxFileSizeExceedException( s""" |SQL job scan file size in bytes: $scanFileSize |exceed restrict of table scan maxFileSize ${maxFileSizeOpt.get} diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsPartitionAwareDataSource.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsAndPartitionAwareDataSource.scala similarity index 62% rename from extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsPartitionAwareDataSource.scala rename to extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsAndPartitionAwareDataSource.scala index 960ff42a9b6..831ff18f2bf 100644 --- a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsPartitionAwareDataSource.scala +++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsAndPartitionAwareDataSource.scala @@ -18,39 +18,47 @@ package org.apache.spark.sql -import 
org.apache.spark.sql.catalyst.plans.logical.Statistics +import java.util.OptionalLong + +import org.apache.spark.sql.connector.{RangeInputPartition, SimpleBatchTable, SimpleScanBuilder, SimpleWritableDataSource} import org.apache.spark.sql.connector.catalog.Table -import org.apache.spark.sql.connector.expressions.FieldReference +import org.apache.spark.sql.connector.expressions.{Expressions, FieldReference, Transform} +import org.apache.spark.sql.connector.read.{InputPartition, ScanBuilder, Statistics, SupportsReportPartitioning, SupportsReportStatistics} import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning} -import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, ScanBuilder, SupportsReportPartitioning, SupportsReportStatistics} -import org.apache.spark.sql.connector.{RangeInputPartition, SimpleBatchTable, SimpleScanBuilder, SimpleWritableDataSource, SpecificReaderFactory} import org.apache.spark.sql.util.CaseInsensitiveStringMap -class ReportStatisticsPartitionAwareDataSource extends SimpleWritableDataSource { +class ReportStatisticsAndPartitionAwareDataSource extends SimpleWritableDataSource { - class MyScanBuilder extends SimpleScanBuilder + class MyScanBuilder( + val partitionKeys: Seq[String]) extends SimpleScanBuilder with SupportsReportStatistics with SupportsReportPartitioning { override def estimateStatistics(): Statistics = { - Statistics(sizeInBytes = 80, rowCount = 10) + new Statistics { + override def sizeInBytes(): OptionalLong = OptionalLong.of(80) + + override def numRows(): OptionalLong = OptionalLong.of(10) + + } } override def planInputPartitions(): Array[InputPartition] = { Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10)) } - override def createReaderFactory(): PartitionReaderFactory = { - SpecificReaderFactory + override def outputPartitioning(): Partitioning = { + new KeyGroupedPartitioning(partitionKeys.map(FieldReference(_)).toArray, 10) } - - override def outputPartitioning(): Partitioning = - new KeyGroupedPartitioning(Array(FieldReference("p")), 10) } override def getTable(options: CaseInsensitiveStringMap): Table = { new SimpleBatchTable { override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { - new MyScanBuilder + new MyScanBuilder(Seq("i")) + } + + override def partitioning(): Array[Transform] = { + Array(Expressions.identity("i")) } } } diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala index e1c5cd846ea..48a0fb88658 100644 --- a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala +++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala @@ -18,12 +18,11 @@ package org.apache.spark.sql -import org.apache.spark.sql.catalyst.plans.logical.Statistics +import java.util.OptionalLong + +import org.apache.spark.sql.connector._ import org.apache.spark.sql.connector.catalog.Table -import org.apache.spark.sql.connector.expressions.FieldReference -import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning} import org.apache.spark.sql.connector.read._ -import org.apache.spark.sql.connector._ import org.apache.spark.sql.util.CaseInsensitiveStringMap class ReportStatisticsDataSource extends SimpleWritableDataSource { @@ -32,16 
+31,17 @@ class ReportStatisticsDataSource extends SimpleWritableDataSource { with SupportsReportStatistics { override def estimateStatistics(): Statistics = { - Statistics(sizeInBytes = 80, rowCount = 10) + new Statistics { + override def sizeInBytes(): OptionalLong = OptionalLong.of(80) + + override def numRows(): OptionalLong = OptionalLong.of(10) + } } override def planInputPartitions(): Array[InputPartition] = { Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10)) } - override def createReaderFactory(): PartitionReaderFactory = { - SpecificReaderFactory - } } override def getTable(options: CaseInsensitiveStringMap): Table = { diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala index 1d725148b8c..87b4fa37dc4 100644 --- a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala +++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala @@ -18,12 +18,15 @@ package org.apache.spark.sql import java.io.File + import scala.collection.JavaConverters._ + import org.apache.commons.io.FileUtils import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation + import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException} import org.apache.kyuubi.sql.watchdog.{MaxFileSizeExceedException, MaxPartitionExceedException} -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation trait WatchDogSuiteBase extends KyuubiSparkSQLExtensionTest { override protected def beforeAll(): Unit = { @@ -607,29 +610,47 @@ trait WatchDogSuiteBase extends KyuubiSparkSQLExtensionTest { } test("watchdog with scan maxPartitions -- data source v2") { - withTempView("test") { - val df = spark.read.format(classOf[ReportStatisticsPartitionAwareDataSource].getName).load() - df.createOrReplaceTempView("test") - checkMaxPartition + val df = spark.read.format(classOf[ReportStatisticsAndPartitionAwareDataSource].getName).load() + df.createOrReplaceTempView("test") + withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_PARTITIONS.key -> "10") { + checkAnswer(sql("SELECT count(distinct(i)) FROM test"), Row(10) :: Nil) + } + withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_PARTITIONS.key -> "5") { + intercept[MaxPartitionExceedException]( + sql("SELECT * FROM test").queryExecution.sparkPlan) } } test("watchdog with scan maxFileSize -- data source v2") { - withTempView("test", "test_non_part") { - val df = spark.read.format(classOf[ReportStatisticsPartitionAwareDataSource].getName).load() - df.createOrReplaceTempView("test") - val logical = df.queryExecution.optimizedPlan.collect { - case d: DataSourceV2ScanRelation => d - }.head - val tableSize = logical.computeStats().sizeInBytes.toLong - - val nonPartDf = spark.read.format(classOf[ReportStatisticsDataSource].getName).load() - nonPartDf.createOrReplaceTempView("test_non_part") - val nonPartLogical = nonPartDf.queryExecution.optimizedPlan.collect { - case d: DataSourceV2ScanRelation => d - }.head - val nonPartTableSize = nonPartLogical.computeStats().sizeInBytes.toLong - checkMaxFileSize(tableSize, nonPartTableSize) + val df = spark.read.format(classOf[ReportStatisticsAndPartitionAwareDataSource].getName).load() + df.createOrReplaceTempView("test") + val logical = df.queryExecution.optimizedPlan.collect { + case d: 
DataSourceV2ScanRelation => d + }.head + val tableSize = logical.computeStats().sizeInBytes.toLong + withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> tableSize.toString) { + sql("SELECT * FROM test").queryExecution.sparkPlan } + withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> (tableSize / 2).toString) { + intercept[MaxFileSizeExceedException]( + sql("SELECT * FROM test").queryExecution.sparkPlan) + } + + val nonPartDf = spark.read.format(classOf[ReportStatisticsDataSource].getName).load() + nonPartDf.createOrReplaceTempView("test_non_part") + val nonPartLogical = nonPartDf.queryExecution.optimizedPlan.collect { + case d: DataSourceV2ScanRelation => d + }.head + val nonPartTableSize = nonPartLogical.computeStats().sizeInBytes.toLong + + withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> nonPartTableSize.toString) { + sql("SELECT * FROM test_non_part").queryExecution.sparkPlan + } + + withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> (nonPartTableSize / 2).toString) { + intercept[MaxFileSizeExceedException]( + sql("SELECT * FROM test_non_part").queryExecution.sparkPlan) + } + } } From c8399a021e4812e447a7f14bf12be46f992ae91a Mon Sep 17 00:00:00 2001 From: hezhao2 Date: Fri, 22 Mar 2024 13:37:36 +0800 Subject: [PATCH 14/17] fix header --- ...ortStatisticsAndPartitionAwareDataSource.scala | 15 +++++++-------- .../spark/sql/ReportStatisticsDataSource.scala | 15 +++++++-------- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsAndPartitionAwareDataSource.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsAndPartitionAwareDataSource.scala index 831ff18f2bf..136ced538e1 100644 --- a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsAndPartitionAwareDataSource.scala +++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsAndPartitionAwareDataSource.scala @@ -1,13 +1,12 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
 You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala
index 48a0fb88658..2035d352562 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala
@@ -1,13 +1,12 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

From acc358732ba4a98920b6705fe68a64b8b2fc415e Mon Sep 17 00:00:00 2001
From: hezhao2
Date: Sun, 7 Apr 2024 12:15:59 +0800
Subject: [PATCH 15/17] disable the rule that checks the maxPartitions for
 dsv2

---
 .../kyuubi/sql/watchdog/MaxScanStrategy.scala     | 13 -------------
 .../org/apache/spark/sql/WatchDogSuiteBase.scala  | 14 +-------------
 2 files changed, 1 insertion(+), 26 deletions(-)

diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala
index b6d8c85d49e..871c21fba58 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala
@@ -245,19 +245,6 @@ case class MaxScanStrategy(session: SparkSession)
             val partitionColumnNames = table.partitioning().map(_.describe())
             val stats = relation.computeStats()
             lazy val scanFileSize = stats.sizeInBytes
-            lazy val scanPartitions = relation.scan.asInstanceOf[SupportsReportPartitioning]
-              .outputPartitioning()
-              .numPartitions()
-            if (maxScanPartitionsOpt.exists(_ < scanPartitions)) {
-              throw new MaxPartitionExceedException(
-                s"""
-                   |Your SQL job scan a whole huge table without any partition filter,
-                   |You should optimize your SQL logical according partition structure
-                   |or shorten query scope such as p_date, detail as below:
-                   |Table: ${table.name()}
-                   |Partition Structure: ${partitionColumnNames.mkString(",")}
-                   |""".stripMargin)
-            }
             if (maxFileSizeOpt.exists(_ < scanFileSize)) {
               throw new MaxFileSizeExceedException(
                 s"""
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
index 87b4fa37dc4..dc009caf32c 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
@@ -608,19 +608,7 @@ trait WatchDogSuiteBase extends KyuubiSparkSQLExtensionTest {
       assert(e.getMessage == "Script transformation is not allowed")
     }
   }
-
-  test("watchdog with scan maxPartitions -- data source v2") {
-    val df = spark.read.format(classOf[ReportStatisticsAndPartitionAwareDataSource].getName).load()
-    df.createOrReplaceTempView("test")
-    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_PARTITIONS.key -> "10") {
-      checkAnswer(sql("SELECT count(distinct(i)) FROM test"), Row(10) :: Nil)
-    }
-    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_PARTITIONS.key -> "5") {
-      intercept[MaxPartitionExceedException](
-        sql("SELECT * FROM test").queryExecution.sparkPlan)
-    }
-  }
-
+ 
   test("watchdog with scan maxFileSize -- data source v2") {
     val df = spark.read.format(classOf[ReportStatisticsAndPartitionAwareDataSource].getName).load()
     df.createOrReplaceTempView("test")

From fb113d6258d40aac2886993a3e0711919ad31779 Mon Sep 17 00:00:00 2001
From: hezhao2
Date: Sun, 7 Apr 2024 12:18:39 +0800
Subject: [PATCH 16/17] disable the rule that checks the maxPartitions for
 dsv2

---
 .../scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala  | 3 +--
 .../src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala | 2 +-
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala
index 871c21fba58..14e0eb08c0f 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala
@@ -240,8 +240,7 @@ case class MaxScanStrategy(session: SparkSession)
             _,
             relation @ DataSourceV2ScanRelation(_, _, _, _, _)) =>
           val table = relation.relation.table
-          if (table.partitioning().nonEmpty &&
-            relation.scan.isInstanceOf[SupportsReportPartitioning]) {
+          if (table.partitioning().nonEmpty) {
             val partitionColumnNames = table.partitioning().map(_.describe())
             val stats = relation.computeStats()
             lazy val scanFileSize = stats.sizeInBytes
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
index dc009caf32c..954a2e4296a 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
@@ -608,7 +608,7 @@ trait WatchDogSuiteBase extends KyuubiSparkSQLExtensionTest {
       assert(e.getMessage == "Script transformation is not allowed")
     }
   }
- 
+
   test("watchdog with scan maxFileSize -- data source v2") {
     val df = spark.read.format(classOf[ReportStatisticsAndPartitionAwareDataSource].getName).load()
     df.createOrReplaceTempView("test")

From 3c5b0c276390ac9a825ca0962894c8623301527e Mon Sep 17 00:00:00 2001
From: hezhao2
Date: Sun, 7 Apr 2024 14:38:00 +0800
Subject: [PATCH 17/17] reformat

---
 .../scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala   | 1 -
 .../src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala  | 1 -
 2 files changed, 2 deletions(-)

diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala
index 14e0eb08c0f..e647ad3250e 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala
@@ -23,7 +23,6 @@ import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
 import org.apache.spark.sql.catalyst.planning.ScanOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.connector.read.SupportsReportPartitioning
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
index 954a2e4296a..22c998d3b0c 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
@@ -639,6 +639,5 @@ trait WatchDogSuiteBase extends KyuubiSparkSQLExtensionTest {
       intercept[MaxFileSizeExceedException](
         sql("SELECT * FROM test_non_part").queryExecution.sparkPlan)
     }
-
   }
 }
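Net effect of patches 15-17 on the DSv2 path: partitioned V2 relations are still rejected when their reported statistics exceed the configured file-size limit, but the partition-count guard (and its test) is gone. The condensed sketch below shows the surviving logic as a standalone helper; the object and method names (DataSourceV2ScanGuard, checkDataSourceV2Scan) and the shortened exception message are illustrative only (the real message is truncated by the hunk in patch 15), while the control flow mirrors the patched MaxScanStrategy.scala.

import org.apache.kyuubi.sql.watchdog.MaxFileSizeExceedException
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation

object DataSourceV2ScanGuard {

  // Sketch of the DSv2 branch of MaxScanStrategy after this series: only the
  // file-size guard remains; the maxPartitions guard based on
  // SupportsReportPartitioning.outputPartitioning().numPartitions() is removed.
  def checkDataSourceV2Scan(
      relation: DataSourceV2ScanRelation,
      maxFileSizeOpt: Option[Long]): Unit = {
    val table = relation.relation.table
    if (table.partitioning().nonEmpty) {
      val partitionColumnNames = table.partitioning().map(_.describe())
      // DSv2 relations surface their own statistics; sizeInBytes drives the guard.
      val scanFileSize = relation.computeStats().sizeInBytes
      if (maxFileSizeOpt.exists(_ < scanFileSize)) {
        // Simplified message; the real one also advises adding partition filters.
        throw new MaxFileSizeExceedException(
          s"Scan of table ${table.name()} " +
            s"(partitioned by ${partitionColumnNames.mkString(",")}) " +
            s"reads $scanFileSize bytes, over the configured limit")
      }
    }
  }
}

The retained "watchdog with scan maxFileSize -- data source v2" test exercises exactly this path, while MaxPartitionExceedException can no longer be raised for DSv2 scans.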