From 3c72fef4766a3f9a4e3ca3df85e38e4b656853f9 Mon Sep 17 00:00:00 2001
From: zhaohehuhu
Date: Tue, 24 Dec 2024 20:24:03 +0800
Subject: [PATCH 1/2] [KYUUBI #6857] Spark 3.4: MaxScanStrategy supports DSv2

### Why are the changes needed?

Backport https://github.com/apache/kyuubi/pull/5852 to Spark 3.4, to enhance MaxScanStrategy to include support for the datasourcev2 in Spark 3.4

### How was this patch tested?

Add some UTs

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #6857 from zhaohehuhu/dev-1224.

Closes #6857

c72c62984 [zhaohehuhu] remove the import
dfbf2bc2d [zhaohehuhu] MaxScanStrategy supports DSv2 in Spark 3.4

Authored-by: zhaohehuhu
Signed-off-by: Cheng Pan
---
 .../kyuubi/sql/watchdog/MaxScanStrategy.scala |  35 ++++++++++
 ...tatisticsAndPartitionAwareDataSource.scala |  64 +++++++++++++++++++
 .../sql/ReportStatisticsDataSource.scala      |  53 +++++++++++++++
 .../apache/spark/sql/WatchDogSuiteBase.scala  |  30 +++++++++
 4 files changed, 182 insertions(+)
 create mode 100644 extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/ReportStatisticsAndPartitionAwareDataSource.scala
 create mode 100644 extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala

diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala
index 1ed55ebc2fd..e647ad3250e 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.planning.ScanOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.types.StructType
 
 import org.apache.kyuubi.sql.KyuubiSQLConf
@@ -232,6 +233,40 @@ case class MaxScanStrategy(session: SparkSession)
                logicalRelation.catalogTable)
           }
         }
+      case ScanOperation(
+          _,
+          _,
+          _,
+          relation @ DataSourceV2ScanRelation(_, _, _, _, _)) =>
+        val table = relation.relation.table
+        if (table.partitioning().nonEmpty) {
+          val partitionColumnNames = table.partitioning().map(_.describe())
+          val stats = relation.computeStats()
+          lazy val scanFileSize = stats.sizeInBytes
+          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
+            throw new MaxFileSizeExceedException(
+              s"""
+                 |SQL job scan file size in bytes: $scanFileSize
+                 |exceed restrict of table scan maxFileSize ${maxFileSizeOpt.get}
+                 |You should optimize your SQL logical according partition structure
+                 |or shorten query scope such as p_date, detail as below:
+                 |Table: ${table.name()}
+                 |Partition Structure: ${partitionColumnNames.mkString(",")}
+                 |""".stripMargin)
+          }
+        } else {
+          val stats = relation.computeStats()
+          lazy val scanFileSize = stats.sizeInBytes
+          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
+            throw new MaxFileSizeExceedException(
+              s"""
+                 |SQL job scan file size in bytes: $scanFileSize
+                 |exceed restrict of table scan maxFileSize ${maxFileSizeOpt.get}
+                 |detail as below:
+                 |Table: ${table.name()}
+                 |""".stripMargin)
+          }
+        }
       case _ =>
     }
   }
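The test sources added below exercise this new `DataSourceV2ScanRelation` branch. As a quick orientation, here is a minimal sketch of the expected behaviour from a user's perspective — assuming a SparkSession with the Kyuubi extension loaded; the data source class, config key, and exception type are the ones appearing in this patch, while the surrounding driver code is illustrative only:

```scala
import org.apache.kyuubi.sql.KyuubiSQLConf
import org.apache.kyuubi.sql.watchdog.MaxFileSizeExceedException

// Register a DSv2 source that reports sizeInBytes = 80 via SupportsReportStatistics.
val df = spark.read.format(classOf[ReportStatisticsDataSource].getName).load()
df.createOrReplaceTempView("test_non_part")

// With the watchdog limit below the reported scan size, planning the query is
// expected to fail fast in MaxScanStrategy instead of running the scan.
spark.conf.set(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key, "40")
try {
  spark.sql("SELECT * FROM test_non_part").queryExecution.sparkPlan
} catch {
  case e: MaxFileSizeExceedException =>
    println(e.getMessage) // reports the estimated scan size and the table name
}
```
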
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/ReportStatisticsAndPartitionAwareDataSource.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/ReportStatisticsAndPartitionAwareDataSource.scala
new file mode 100644
index 00000000000..670d9ce7e43
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/ReportStatisticsAndPartitionAwareDataSource.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.OptionalLong
+
+import org.apache.spark.sql.connector.{RangeInputPartition, SimpleBatchTable, SimpleScanBuilder, SimpleWritableDataSource}
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.expressions.{Expressions, FieldReference, Transform}
+import org.apache.spark.sql.connector.read._
+import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class ReportStatisticsAndPartitionAwareDataSource extends SimpleWritableDataSource {
+
+  class MyScanBuilder(
+      val partitionKeys: Seq[String]) extends SimpleScanBuilder
+    with SupportsReportStatistics with SupportsReportPartitioning {
+
+    override def estimateStatistics(): Statistics = {
+      new Statistics {
+        override def sizeInBytes(): OptionalLong = OptionalLong.of(80)
+
+        override def numRows(): OptionalLong = OptionalLong.of(10)
+
+      }
+    }
+
+    override def planInputPartitions(): Array[InputPartition] = {
+      Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10))
+    }
+
+    override def outputPartitioning(): Partitioning = {
+      new KeyGroupedPartitioning(partitionKeys.map(FieldReference(_)).toArray, 10)
+    }
+  }
+
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
+    new SimpleBatchTable {
+      override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+        new MyScanBuilder(Seq("i"))
+      }
+
+      override def partitioning(): Array[Transform] = {
+        Array(Expressions.identity("i"))
+      }
+    }
+  }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala
new file mode 100644
index 00000000000..2035d352562
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.OptionalLong
+
+import org.apache.spark.sql.connector._
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.read._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class ReportStatisticsDataSource extends SimpleWritableDataSource {
+
+  class MyScanBuilder extends SimpleScanBuilder
+    with SupportsReportStatistics {
+
+    override def estimateStatistics(): Statistics = {
+      new Statistics {
+        override def sizeInBytes(): OptionalLong = OptionalLong.of(80)
+
+        override def numRows(): OptionalLong = OptionalLong.of(10)
+      }
+    }
+
+    override def planInputPartitions(): Array[InputPartition] = {
+      Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10))
+    }
+
+  }
+
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
+    new SimpleBatchTable {
+      override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+        new MyScanBuilder
+      }
+    }
+  }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
index 139efd9ca06..aec51cbd371 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 
 import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
 import org.apache.kyuubi.sql.watchdog.{MaxFileSizeExceedException, MaxPartitionExceedException}
@@ -607,4 +608,33 @@ trait WatchDogSuiteBase extends KyuubiSparkSQLExtensionTest {
       assert(e.getMessage == "Script transformation is not allowed")
     }
   }
+
+  test("watchdog with scan maxFileSize -- data source v2") {
+    val df = spark.read.format(classOf[ReportStatisticsAndPartitionAwareDataSource].getName).load()
+    df.createOrReplaceTempView("test")
+    val logical = df.queryExecution.optimizedPlan.collect {
+      case d: DataSourceV2ScanRelation => d
+    }.head
+    val tableSize = logical.computeStats().sizeInBytes.toLong
+    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> tableSize.toString) {
+      sql("SELECT * FROM test").queryExecution.sparkPlan
+    }
+    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> (tableSize / 2).toString) {
+      intercept[MaxFileSizeExceedException](
+        sql("SELECT * FROM test").queryExecution.sparkPlan)
+    }
+    val nonPartDf = spark.read.format(classOf[ReportStatisticsDataSource].getName).load()
+    nonPartDf.createOrReplaceTempView("test_non_part")
+    val nonPartLogical = nonPartDf.queryExecution.optimizedPlan.collect {
+      case d: DataSourceV2ScanRelation => d
+    }.head
+    val nonPartTableSize = nonPartLogical.computeStats().sizeInBytes.toLong
+    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> nonPartTableSize.toString) {
+      sql("SELECT * FROM test_non_part").queryExecution.sparkPlan
+    }
+    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> (nonPartTableSize / 2).toString) {
+      intercept[MaxFileSizeExceedException](
+        sql("SELECT * FROM test_non_part").queryExecution.sparkPlan)
+    }
+  }
 }

From d50cf17edeb12e6eb4da22575f82ba7579d4a3be Mon Sep 17 00:00:00 2001
From: zhang_yao
Date: Tue, 24 Dec 2024 21:16:39 +0800
Subject: [PATCH 2/2] [KYUUBI #6615] Make Jetty sending server version in response configurable
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

# :mag: Description

## Issue References 🔗

This pull request fixes #6615

## Describe Your Solution 🔧

Add a config item that controls whether Jetty should send its version in response.

This is an additional patch which enables/disables sending Jetty version for prometheus reporter. Sending Jetty version could be disabled by calling HttpConfiguration::setSendServerVersion(false)

## Types of changes :bookmark:

- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

Compiled and tested manually.

#### Behavior Without This Pull Request :coffin:

#### Behavior With This Pull Request :tada:

#### Related Unit Tests

---

# Checklist 📝

- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6685 from paul8263/KYUUBI-6615-patch.
Closes #6615

0638a5116 [zhang_yao] [KYUUBI #6615] Make Jetty sending server version in response configurable

Authored-by: zhang_yao
Signed-off-by: Cheng Pan
---
 .../metrics/PrometheusReporterService.scala   | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)

diff --git a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/PrometheusReporterService.scala b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/PrometheusReporterService.scala
index ab014caf14e..e62e2190906 100644
--- a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/PrometheusReporterService.scala
+++ b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/PrometheusReporterService.scala
@@ -21,11 +21,12 @@ import com.codahale.metrics.MetricRegistry
 import io.prometheus.client.CollectorRegistry
 import io.prometheus.client.dropwizard.DropwizardExports
 import io.prometheus.client.exporter.MetricsServlet
-import org.eclipse.jetty.server.Server
+import org.eclipse.jetty.server.{HttpConfiguration, HttpConnectionFactory, Server, ServerConnector}
 import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
 
 import org.apache.kyuubi.KyuubiException
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.FRONTEND_JETTY_SEND_VERSION_ENABLED
 import org.apache.kyuubi.service.AbstractService
 
 class PrometheusReporterService(registry: MetricRegistry)
@@ -35,12 +36,21 @@ class PrometheusReporterService(registry: MetricRegistry)
 
   // VisibleForTesting
   private[metrics] var httpServer: Server = _
+  private[metrics] var httpServerConnector: ServerConnector = _
   @volatile protected var isStarted = false
 
   override def initialize(conf: KyuubiConf): Unit = {
     val port = conf.get(MetricsConf.METRICS_PROMETHEUS_PORT)
     val contextPath = conf.get(MetricsConf.METRICS_PROMETHEUS_PATH)
-    httpServer = new Server(port)
+    val jettyVersionEnabled = conf.get(FRONTEND_JETTY_SEND_VERSION_ENABLED)
+
+    val httpConf = new HttpConfiguration()
+    httpConf.setSendServerVersion(jettyVersionEnabled)
+    httpServer = new Server()
+    httpServerConnector = new ServerConnector(httpServer, new HttpConnectionFactory(httpConf))
+    httpServerConnector.setPort(port)
+    httpServer.addConnector(httpServerConnector)
+
     val context = new ServletContextHandler
     context.setContextPath("/")
     httpServer.setHandler(context)
@@ -56,6 +66,7 @@
     if (!isStarted) {
       try {
         httpServer.start()
+        httpServerConnector.start()
         info(s"Prometheus metrics HTTP server has started at ${httpServer.getURI}.")
       } catch {
         case rethrow: Exception =>
@@ -78,12 +89,14 @@ private def stopHttpServer(): Unit = {
     if (httpServer != null) {
       try {
+        httpServerConnector.stop()
         httpServer.stop()
         info("Prometheus metrics HTTP server has stopped.")
       } catch {
         case err: Exception =>
           error("Cannot safely stop prometheus metrics HTTP server", err)
       } finally {
         httpServer = null
+        httpServerConnector = null
       }
     }
   }
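
For reference, the Jetty mechanism this second patch relies on, isolated from the Kyuubi service code: a connector built from an HttpConfiguration with setSendServerVersion(false) makes Jetty stop emitting its version in the Server response header. In PrometheusReporterService the flag comes from KyuubiConf.FRONTEND_JETTY_SEND_VERSION_ENABLED, as shown in the diff above. A minimal standalone sketch — the object name, port, and main method are illustrative placeholders, not part of the patch:

```scala
import org.eclipse.jetty.server.{HttpConfiguration, HttpConnectionFactory, Server, ServerConnector}

object JettyServerVersionSketch {
  def main(args: Array[String]): Unit = {
    val httpConf = new HttpConfiguration()
    // false => Jetty stops adding the "Server: Jetty(...)" header to responses.
    httpConf.setSendServerVersion(false)

    val server = new Server()
    val connector = new ServerConnector(server, new HttpConnectionFactory(httpConf))
    connector.setPort(10090) // arbitrary local port for the sketch
    server.addConnector(connector)

    server.start()
    // Requests to http://localhost:10090/ should now carry no Jetty version header.
    server.stop()
  }
}
```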