[KYUUBI #6315] Spark 3.5: MaxScanStrategy supports DSv2
# 🔍 Description
## Issue References 🔗

Currently, MaxScanStrategy can be used to limit the maximum scan file size for certain data sources, such as Hive. This PR enhances MaxScanStrategy to also cover DataSource V2 (DSv2) relations.
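For context, here is a minimal sketch of how the watchdog scan limit is typically enabled on the Spark side. The extension class name is the Kyuubi Spark SQL extension; the conf key string and the byte-size value format are assumptions here — the canonical definition is `KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE` in the diff below.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: the conf key string and value format are assumptions; the
// authoritative definition is KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.
val spark = SparkSession.builder()
  .appName("watchdog-maxFileSize-demo")
  .config("spark.sql.extensions", "org.apache.kyuubi.sql.KyuubiSparkSQLExtension")
  .config("spark.sql.watchdog.maxFileSize", "10g")
  .getOrCreate()

// If the files a query would scan exceed the configured limit, planning fails
// with MaxFileSizeExceedException instead of launching the job.
spark.sql("SELECT * FROM some_large_table WHERE p_date = '2024-04-17'").show()
```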
## Describe Your Solution 🔧

Obtain the size of the files scanned through the DataSource V2 API (the scan's reported statistics) and apply the same max-scan-file-size check that MaxScanStrategy already performs for V1 relations, as sketched below.
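A minimal sketch of the idea, not the actual Kyuubi code: a DSv2 scan that implements `SupportsReportStatistics` exposes `sizeInBytes`, and the strategy compares the size surfaced by `DataSourceV2ScanRelation.computeStats()` against the configured limit. The class and helper names below are illustrative assumptions.

```scala
import java.util.OptionalLong

import org.apache.spark.sql.connector.read.{Scan, Statistics, SupportsReportStatistics}
import org.apache.spark.sql.types.StructType

// A DSv2 scan that reports its estimated size through the standard connector API.
class SizeReportingScan(schema: StructType, bytes: Long) extends Scan with SupportsReportStatistics {
  override def readSchema(): StructType = schema

  override def estimateStatistics(): Statistics = new Statistics {
    override def sizeInBytes(): OptionalLong = OptionalLong.of(bytes)
    override def numRows(): OptionalLong = OptionalLong.empty()
  }
}

// Illustrative check: DataSourceV2ScanRelation.computeStats() surfaces the size
// reported above, and the watchdog rejects the plan when it exceeds the limit.
def assertWithinScanLimit(scanSizeInBytes: BigInt, maxFileSizeOpt: Option[Long]): Unit = {
  if (maxFileSizeOpt.exists(_ < scanSizeInBytes)) {
    throw new IllegalStateException(
      s"scan size $scanSizeInBytes exceeds configured max file size ${maxFileSizeOpt.get}")
  }
}
```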

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

MaxScanStrategy only checks V1 relations (e.g. Hive tables and `HadoopFsRelation`), so scans of DataSource V2 tables are not limited by `KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE`.

#### Behavior With This Pull Request 🎉

DataSource V2 scans whose reported statistics exceed the configured limit fail with `MaxFileSizeExceedException` before execution.

#### Related Unit Tests

- `watchdog with scan maxFileSize -- data source v2` in `WatchDogSuiteBase`

---

# Checklists
## 📝 Author Self Checklist

- [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project
- [x] I have performed a self-review
- [x] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my feature works
- [x] New and existing unit tests pass locally with my changes
- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

## 📝 Committer Pre-Merge Checklist

- [x] Pull request title is okay.
- [x] No license issues.
- [x] Milestone correctly set?
- [x] Test coverage is ok
- [x] Assignees are selected.
- [ ] Minimum number of approvals
- [ ] No changes are requested

**Be nice. Be informative.**

Closes #5852 from zhaohehuhu/dev-1213.

Closes #6315

3c5b0c2 [hezhao2] reformat
fb113d6 [hezhao2] disable the rule that checks the maxPartitions for dsv2
acc3587 [hezhao2] disable the rule that checks the maxPartitions for dsv2
c8399a0 [hezhao2] fix header
70c845b [hezhao2] add UTs
3a07396 [hezhao2] add ut
4d26ce1 [hezhao2] reformat
f87cb07 [hezhao2] reformat
b307022 [hezhao2] move code to Spark 3.5
73258c2 [hezhao2] fix unused import
cf893a0 [hezhao2] drop reflection for loading iceberg class
dc128bc [hezhao2] refactor code
661834c [hezhao2] revert code
6061f42 [hezhao2] delete IcebergSparkPlanHelper
5f1c3c0 [hezhao2] fix
b15652f [hezhao2] remove iceberg dependency
fe620ca [hezhao2] enable MaxScanStrategy when accessing iceberg datasource

Authored-by: hezhao2 <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
zhaohehuhu authored and pan3793 committed Apr 17, 2024
1 parent 962de72 commit 8edcb00
Showing 4 changed files with 185 additions and 0 deletions.
MaxScanStrategy.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.planning.ScanOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
import org.apache.spark.sql.types.StructType

import org.apache.kyuubi.sql.KyuubiSQLConf
@@ -232,6 +233,40 @@ case class MaxScanStrategy(session: SparkSession)
              logicalRelation.catalogTable)
          }
        }
      case ScanOperation(
          _,
          _,
          _,
          relation @ DataSourceV2ScanRelation(_, _, _, _, _)) =>
        val table = relation.relation.table
        if (table.partitioning().nonEmpty) {
          val partitionColumnNames = table.partitioning().map(_.describe())
          val stats = relation.computeStats()
          lazy val scanFileSize = stats.sizeInBytes
          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
            throw new MaxFileSizeExceedException(
              s"""
                 |SQL job scan file size in bytes: $scanFileSize
                 |exceed restrict of table scan maxFileSize ${maxFileSizeOpt.get}
                 |You should optimize your SQL logical according partition structure
                 |or shorten query scope such as p_date, detail as below:
                 |Table: ${table.name()}
                 |Partition Structure: ${partitionColumnNames.mkString(",")}
                 |""".stripMargin)
          }
        } else {
          val stats = relation.computeStats()
          lazy val scanFileSize = stats.sizeInBytes
          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
            throw new MaxFileSizeExceedException(
              s"""
                 |SQL job scan file size in bytes: $scanFileSize
                 |exceed restrict of table scan maxFileSize ${maxFileSizeOpt.get}
                 |detail as below:
                 |Table: ${table.name()}
                 |""".stripMargin)
          }
        }
      case _ =>
    }
  }
ReportStatisticsAndPartitionAwareDataSource.scala
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import java.util.OptionalLong

import org.apache.spark.sql.connector.{RangeInputPartition, SimpleBatchTable, SimpleScanBuilder, SimpleWritableDataSource}
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.{Expressions, FieldReference, Transform}
import org.apache.spark.sql.connector.read.{InputPartition, ScanBuilder, Statistics, SupportsReportPartitioning, SupportsReportStatistics}
import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class ReportStatisticsAndPartitionAwareDataSource extends SimpleWritableDataSource {

  class MyScanBuilder(
      val partitionKeys: Seq[String]) extends SimpleScanBuilder
    with SupportsReportStatistics with SupportsReportPartitioning {

    override def estimateStatistics(): Statistics = {
      new Statistics {
        override def sizeInBytes(): OptionalLong = OptionalLong.of(80)

        override def numRows(): OptionalLong = OptionalLong.of(10)

      }
    }

    override def planInputPartitions(): Array[InputPartition] = {
      Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10))
    }

    override def outputPartitioning(): Partitioning = {
      new KeyGroupedPartitioning(partitionKeys.map(FieldReference(_)).toArray, 10)
    }
  }

  override def getTable(options: CaseInsensitiveStringMap): Table = {
    new SimpleBatchTable {
      override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
        new MyScanBuilder(Seq("i"))
      }

      override def partitioning(): Array[Transform] = {
        Array(Expressions.identity("i"))
      }
    }
  }
}
ReportStatisticsDataSource.scala
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import java.util.OptionalLong

import org.apache.spark.sql.connector._
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class ReportStatisticsDataSource extends SimpleWritableDataSource {

  class MyScanBuilder extends SimpleScanBuilder
    with SupportsReportStatistics {

    override def estimateStatistics(): Statistics = {
      new Statistics {
        override def sizeInBytes(): OptionalLong = OptionalLong.of(80)

        override def numRows(): OptionalLong = OptionalLong.of(10)
      }
    }

    override def planInputPartitions(): Array[InputPartition] = {
      Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10))
    }

  }

  override def getTable(options: CaseInsensitiveStringMap): Table = {
    new SimpleBatchTable {
      override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
        new MyScanBuilder
      }
    }
  }
}
WatchDogSuite.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._

import org.apache.commons.io.FileUtils
import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation

import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
import org.apache.kyuubi.sql.watchdog.{MaxFileSizeExceedException, MaxPartitionExceedException}
@@ -607,4 +608,36 @@ trait WatchDogSuiteBase extends KyuubiSparkSQLExtensionTest {
      assert(e.getMessage == "Script transformation is not allowed")
    }
  }

  test("watchdog with scan maxFileSize -- data source v2") {
    val df = spark.read.format(classOf[ReportStatisticsAndPartitionAwareDataSource].getName).load()
    df.createOrReplaceTempView("test")
    val logical = df.queryExecution.optimizedPlan.collect {
      case d: DataSourceV2ScanRelation => d
    }.head
    val tableSize = logical.computeStats().sizeInBytes.toLong
    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> tableSize.toString) {
      sql("SELECT * FROM test").queryExecution.sparkPlan
    }
    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> (tableSize / 2).toString) {
      intercept[MaxFileSizeExceedException](
        sql("SELECT * FROM test").queryExecution.sparkPlan)
    }

    val nonPartDf = spark.read.format(classOf[ReportStatisticsDataSource].getName).load()
    nonPartDf.createOrReplaceTempView("test_non_part")
    val nonPartLogical = nonPartDf.queryExecution.optimizedPlan.collect {
      case d: DataSourceV2ScanRelation => d
    }.head
    val nonPartTableSize = nonPartLogical.computeStats().sizeInBytes.toLong

    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> nonPartTableSize.toString) {
      sql("SELECT * FROM test_non_part").queryExecution.sparkPlan
    }

    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> (nonPartTableSize / 2).toString) {
      intercept[MaxFileSizeExceedException](
        sql("SELECT * FROM test_non_part").queryExecution.sparkPlan)
    }
  }
}
