Skip to content

Commit

Permalink
delete IcebergSparkPlanHelper
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaohehuhu committed Dec 20, 2023
1 parent 1955542 commit d5a1db3
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 41 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,22 @@

package org.apache.kyuubi.sql.watchdog

import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`

import org.apache.hadoop.fs.Path
import org.apache.iceberg.spark.source.IcebergSparkPlanHelper.numPartitions
import org.apache.spark.sql.{PruneFileSourcePartitionHelper, SparkSession, Strategy}
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
import org.apache.spark.sql.catalyst.planning.ScanOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.read.SupportsReportPartitioning
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
import org.apache.spark.sql.types.StructType

import org.apache.kyuubi.sql.KyuubiSQLConf
import org.apache.kyuubi.util.reflect.DynMethods
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation

import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`


/**
* Add MaxScanStrategy to avoid scanning excessive partitions or files
Expand Down Expand Up @@ -242,8 +242,7 @@ case class MaxScanStrategy(session: SparkSession)
logicalRelation.catalogTable)
}
}
case ScanOperation(_, _, _, relation: DataSourceV2ScanRelation)
=>
case ScanOperation(_, _, _, relation: DataSourceV2ScanRelation) =>
val isIcebergRelation = DynMethods
.builder("isIcebergRelation")
.impl(Class.forName("org.apache.spark.sql.catalyst.utils.PlanUtils"))
Expand All @@ -256,7 +255,9 @@ case class MaxScanStrategy(session: SparkSession)
val partitionColumnNames = icebergTable.partitioning().map(_.describe())
val stats = relation.computeStats()
lazy val scanFileSize = stats.sizeInBytes
lazy val scanPartitions = numPartitions(relation.scan)
lazy val scanPartitions = relation.scan.asInstanceOf[SupportsReportPartitioning]
.outputPartitioning()
.numPartitions()
if (maxFileSizeOpt.exists(_ < scanFileSize)) {
throw new MaxFileSizeExceedException(
s"""
Expand Down

0 comments on commit d5a1db3

Please sign in to comment.