From 8631d333195bdac70b1760470fd69309c566131a Mon Sep 17 00:00:00 2001 From: mokunhua Date: Fri, 25 Aug 2023 16:00:45 +0800 Subject: [PATCH 1/4] [Gluten-core][VL] Supports Delta 2.2 Read --- gluten-core/pom.xml | 5 + .../FileSourceScanExecTransformer.scala | 1 + .../extension/ColumnarOverrides.scala | 98 ++++++++++++++++++- 3 files changed, 102 insertions(+), 2 deletions(-) diff --git a/gluten-core/pom.xml b/gluten-core/pom.xml index c37a41629d01..96cfab1d1a2f 100644 --- a/gluten-core/pom.xml +++ b/gluten-core/pom.xml @@ -24,6 +24,11 @@ + + io.delta + delta-core_${scala.binary.version} + provided + io.glutenproject gluten-ui diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala index ceb79b5b79a3..7d03182f86b0 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala @@ -285,6 +285,7 @@ class FileSourceScanExecTransformer( case "DwrfFileFormat" => ReadFileFormat.DwrfReadFormat case "DeltaMergeTreeFileFormat" => ReadFileFormat.MergeTreeReadFormat case "CSVFileFormat" => ReadFileFormat.TextReadFormat + case "DeltaParquetFileFormat" => ReadFileFormat.ParquetReadFormat case _ => ReadFileFormat.UnknownFormat } } diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala index adb50084d14c..764bb7012cf1 100644 --- a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala +++ b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala @@ -23,7 +23,6 @@ import io.glutenproject.expression.ExpressionConverter import io.glutenproject.extension.columnar._ import io.glutenproject.metrics.GlutenTimeMetric import io.glutenproject.utils.{ColumnarShuffleUtil, LogLevelUtil, PhysicalPlanSelector} - import org.apache.spark.api.python.EvalPythonExecTransformer import org.apache.spark.internal.Logging import org.apache.spark.sql.{SparkSession, SparkSessionExtensions} @@ -32,19 +31,113 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide import org.apache.spark.sql.catalyst.plans.{LeftOuter, LeftSemi, RightOuter} import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning} import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule} +import org.apache.spark.sql.delta.DeltaParquetFileFormat import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec -import org.apache.spark.sql.execution.datasources.{GlutenWriterColumnarRules, MATERIALIZE_TAG} +import org.apache.spark.sql.execution.datasources.{FileFormat, GlutenWriterColumnarRules, HadoopFsRelation, MATERIALIZE_TAG} import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan} import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.execution.python.EvalPythonExec import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.hive.HiveTableScanExecTransformer +import org.apache.spark.sql.types.{StructField, StructType} import 
org.apache.spark.util.SparkRuleUtil +/** + * This Rule used for the necessary transformation for SparkPlan, like: + * 1) Lake format related transformation which can't be done in Spark, e.g. Delta lake + * 2) Needed to be applied before any other Rules, to avoid information lack like Plan Hint tag + * + */ +case class RewritePlanIfNeeded(session: SparkSession) extends Rule[SparkPlan] { + def apply(plan: SparkPlan): SparkPlan = { + val np = plan transformUp { + // If is Delta Column Mapping read(e.g. nameMapping and idMapping) + // change the metadata of Delta into Parquet one, + // so that gluten can read Delta File using Parquet Reader + // currently supports Delta 2.2 + case p: FileSourceScanExec if isDeltaColumnMappingFileFormat(p.relation.fileFormat) => + transformColumnMappingPlan(p) + } + np + } + + /** + * check if is Delta ColumnMapping FileFormat(e.g. nameMapping and idMapping) + * + * @param fileFormat + * @return + */ + private def isDeltaColumnMappingFileFormat(fileFormat: FileFormat): Boolean = fileFormat match { + case d: DeltaParquetFileFormat if d.columnMappingMode.name != "none" => + true + case _ => + false + } + + /** + * This method only used for Delta ColumnMapping FileFormat(e.g. nameMapping and idMapping) + * transform the metadata of Delta into Parquet one, each plan should only be transformed once + * Only supports Delta 2.2 ver + * + * @param plan : FileSourceScanExec + * @return + */ + private def transformColumnMappingPlan(splan: SparkPlan): SparkPlan = splan match { + case plan: FileSourceScanExec => + val fmt = plan.relation.fileFormat.asInstanceOf[DeltaParquetFileFormat] + val p_names = scala.collection.mutable.Map[String, String]() + // get physicalName from file + fmt.referenceSchema.foreach { e => + val p_name = e.metadata.getString("delta.columnMapping.physicalName") + val l_name = e.name + p_names += (l_name -> p_name) + } + val attr = plan.output.map { o => + AttributeReference(p_names(o.name), o.dataType, + o.nullable, o.metadata)(o.exprId, o.qualifier) + } + // alias physicalName into tableName + val expr = (attr, plan.requiredSchema.names).zipped.map { (a, p) => + Alias(a, p)(exprId = a.exprId) + } + // replace tableName in schema with physicalName + val new_req_field = plan.requiredSchema.map { e => + StructField(p_names(e.name), e.dataType, e.nullable, e.metadata) + } + val new_data_field = plan.relation.dataSchema.map { e => + StructField(p_names(e.name), e.dataType, e.nullable, e.metadata) + } + val new_partition_field = plan.relation.partitionSchema.map { e => + StructField(p_names(e.name), e.dataType, e.nullable, e.metadata) + } + val rel = HadoopFsRelation( + plan.relation.location, + StructType(new_partition_field), + StructType(new_data_field), + plan.relation.bucketSpec, + plan.relation.fileFormat, + plan.relation.options + )(session) + val newPlan = FileSourceScanExec( + rel, + attr, + StructType(new_req_field), + plan.partitionFilters, + plan.optionalBucketSet, + plan.optionalNumCoalescedBuckets, + plan.dataFilters, + plan.tableIdentifier, + plan.disableBucketedScan + ) + ProjectExec(expr, newPlan) + case _ => splan + } +} + // This rule will conduct the conversion from Spark plan to the plan transformer. 
case class TransformPreOverrides(isAdaptiveContext: Boolean) extends Rule[SparkPlan] @@ -756,6 +849,7 @@ case class ColumnarOverrideRules(session: SparkSession) } tagBeforeTransformHitsRules ::: List( + (spark: SparkSession) => RewritePlanIfNeeded(spark), (spark: SparkSession) => PlanOneRowRelation(spark), (_: SparkSession) => FallbackEmptySchemaRelation(), (_: SparkSession) => AddTransformHintRule(), From 8a2b0d646db20479d73c254e70453b164add5545 Mon Sep 17 00:00:00 2001 From: mokunhua Date: Fri, 1 Sep 2023 14:31:42 +0800 Subject: [PATCH 2/4] [Gluten-core] Support Delta scan --- .../extension/ColumnarOverrides.scala | 88 +++++++++++++------ 1 file changed, 61 insertions(+), 27 deletions(-) diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala index 764bb7012cf1..cd676ddf3cd9 100644 --- a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala +++ b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala @@ -46,15 +46,16 @@ import org.apache.spark.sql.hive.HiveTableScanExecTransformer import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.util.SparkRuleUtil +import scala.collection._ + /** - * This Rule used for the necessary transformation for SparkPlan, like: - * 1) Lake format related transformation which can't be done in Spark, e.g. Delta lake - * 2) Needed to be applied before any other Rules, to avoid information lack like Plan Hint tag - * + * This Rule used for the necessary transformation for SparkPlan, like: 1) Lake format related + * transformation which can't be done in Spark, e.g. Delta lake; 2) Needed to be applied before any + * other Rules, to avoid information lack like Plan Hint tag */ case class RewritePlanIfNeeded(session: SparkSession) extends Rule[SparkPlan] { def apply(plan: SparkPlan): SparkPlan = { - val np = plan transformUp { + val newPlan = plan.transformUpWithSubqueries { // If is Delta Column Mapping read(e.g. 
nameMapping and idMapping) // change the metadata of Delta into Parquet one, // so that gluten can read Delta File using Parquet Reader @@ -62,7 +63,7 @@ case class RewritePlanIfNeeded(session: SparkSession) extends Rule[SparkPlan] { case p: FileSourceScanExec if isDeltaColumnMappingFileFormat(p.relation.fileFormat) => transformColumnMappingPlan(p) } - np + newPlan } /** @@ -83,41 +84,74 @@ case class RewritePlanIfNeeded(session: SparkSession) extends Rule[SparkPlan] { * transform the metadata of Delta into Parquet one, each plan should only be transformed once * Only supports Delta 2.2 ver * - * @param plan : FileSourceScanExec + * @param plan + * FileSourceScanExec * @return */ - private def transformColumnMappingPlan(splan: SparkPlan): SparkPlan = splan match { + private def transformColumnMappingPlan(sPlan: SparkPlan): SparkPlan = sPlan match { case plan: FileSourceScanExec => val fmt = plan.relation.fileFormat.asInstanceOf[DeltaParquetFileFormat] - val p_names = scala.collection.mutable.Map[String, String]() + val pNames: mutable.Map[String, String] = mutable.Map.empty + var aliasNames: Seq[String] = Seq.empty + var aliasAttr: Seq[Attribute] = Seq.empty // get physicalName from file fmt.referenceSchema.foreach { e => - val p_name = e.metadata.getString("delta.columnMapping.physicalName") - val l_name = e.name - p_names += (l_name -> p_name) + val pName = e.metadata.getString("delta.columnMapping.physicalName") + val lName = e.name + pNames += (lName -> pName) } + // transform output's name into physical name so Reader can read data correctly + // should keep the columns order the same as the origin output val attr = plan.output.map { o => - AttributeReference(p_names(o.name), o.dataType, - o.nullable, o.metadata)(o.exprId, o.qualifier) + val new_attr = o.withName(pNames(o.name)) + if (!aliasNames.contains(o.name)) { + aliasAttr = aliasAttr :+ new_attr + aliasNames = aliasNames :+ o.name + } + new_attr + } + // transform dataFilter's name into physical name + val newDataFilters = plan.dataFilters.map { e => + e.transformDown { + case attr: AttributeReference => + val new_attr = attr.withName(pNames(attr.name)).toAttribute + if (!aliasNames.contains(attr.name)) { + aliasAttr = aliasAttr :+ new_attr + aliasNames = aliasNames :+ attr.name + } + new_attr + } + } + val newPartitionFilters = plan.partitionFilters.map { e => + e.transformDown { + case attr: AttributeReference => + val new_attr = attr.withName(pNames(attr.name)).toAttribute + if (!aliasNames.contains(attr.name)) { + aliasAttr = aliasAttr :+ new_attr + aliasNames = aliasNames :+ attr.name + } + new_attr + } } // alias physicalName into tableName - val expr = (attr, plan.requiredSchema.names).zipped.map { (a, p) => + // should keep the columns order the same as the origin output + val expr = (aliasAttr, aliasNames).zipped.map { (a, p) => Alias(a, p)(exprId = a.exprId) } // replace tableName in schema with physicalName - val new_req_field = plan.requiredSchema.map { e => - StructField(p_names(e.name), e.dataType, e.nullable, e.metadata) + val newReqField = plan.requiredSchema.map { e => + StructField(pNames(e.name), e.dataType, e.nullable, e.metadata) } - val new_data_field = plan.relation.dataSchema.map { e => - StructField(p_names(e.name), e.dataType, e.nullable, e.metadata) + val newDataField = plan.relation.dataSchema.map { e => + StructField(pNames(e.name), e.dataType, e.nullable, e.metadata) } - val new_partition_field = plan.relation.partitionSchema.map { e => - StructField(p_names(e.name), e.dataType, e.nullable, 
e.metadata) + val newPartitionField = plan.relation.partitionSchema.map { e => + StructField(pNames(e.name), e.dataType, e.nullable, e.metadata) } val rel = HadoopFsRelation( plan.relation.location, - StructType(new_partition_field), - StructType(new_data_field), + StructType(newPartitionField), + StructType(newDataField), plan.relation.bucketSpec, plan.relation.fileFormat, plan.relation.options @@ -125,16 +159,16 @@ case class RewritePlanIfNeeded(session: SparkSession) extends Rule[SparkPlan] { val newPlan = FileSourceScanExec( rel, attr, - StructType(new_req_field), - plan.partitionFilters, + StructType(newReqField), + newPartitionFilters, plan.optionalBucketSet, plan.optionalNumCoalescedBuckets, - plan.dataFilters, + newDataFilters, plan.tableIdentifier, plan.disableBucketedScan ) ProjectExec(expr, newPlan) - case _ => splan + case _ => sPlan } } From 748f5f6959dff36263943a2c21f88ed740f1475a Mon Sep 17 00:00:00 2001 From: mokunhua Date: Thu, 7 Sep 2023 11:55:12 +0800 Subject: [PATCH 3/4] fix code style --- .../extension/ColumnarOverrides.scala | 160 +++++++++--------- 1 file changed, 76 insertions(+), 84 deletions(-) diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala index 2431f4d50463..10419c09db0d 100644 --- a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala +++ b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala @@ -23,6 +23,7 @@ import io.glutenproject.expression.ExpressionConverter import io.glutenproject.extension.columnar._ import io.glutenproject.metrics.GlutenTimeMetric import io.glutenproject.utils.{ColumnarShuffleUtil, LogLevelUtil, PhysicalPlanSelector} + import org.apache.spark.api.python.EvalPythonExecTransformer import org.apache.spark.internal.Logging import org.apache.spark.sql.{SparkSession, SparkSessionExtensions} @@ -31,12 +32,12 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide import org.apache.spark.sql.catalyst.plans.{LeftOuter, LeftSemi, RightOuter} import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning} import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule} -import org.apache.spark.sql.delta.DeltaParquetFileFormat +import org.apache.spark.sql.delta.{DeltaParquetFileFormat, NoMapping} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec -import org.apache.spark.sql.execution.datasources.{FileFormat, GlutenWriterColumnarRules, HadoopFsRelation, MATERIALIZE_TAG} +import org.apache.spark.sql.execution.datasources.{FileFormat, GlutenWriterColumnarRules, MATERIALIZE_TAG} import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan} import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins._ @@ -49,117 +50,103 @@ import org.apache.spark.util.SparkRuleUtil import scala.collection._ /** - * This Rule used for the necessary transformation for SparkPlan, like: 1) Lake format related + * This Rule is used for the necessary transformation for SparkPlan, like: 1) Lake format related * transformation which can't be done in Spark, e.g. 
Delta lake; 2) Needed to be applied before any - * other Rules, to avoid information lack like Plan Hint tag + * other Rules, to avoid information lack like Plan Hint tag; */ case class RewritePlanIfNeeded(session: SparkSession) extends Rule[SparkPlan] { def apply(plan: SparkPlan): SparkPlan = { - val newPlan = plan.transformUpWithSubqueries { - // If is Delta Column Mapping read(e.g. nameMapping and idMapping) - // change the metadata of Delta into Parquet one, - // so that gluten can read Delta File using Parquet Reader - // currently supports Delta 2.2 + plan.transformUpWithSubqueries { + // If it enables Delta Column Mapping(e.g. nameMapping and idMapping), + // transform the metadata of Delta into Parquet's, + // so that gluten can read Delta File using Parquet Reader. case p: FileSourceScanExec if isDeltaColumnMappingFileFormat(p.relation.fileFormat) => transformColumnMappingPlan(p) } - newPlan } - /** - * check if is Delta ColumnMapping FileFormat(e.g. nameMapping and idMapping) - * - * @param fileFormat - * @return - */ + /** Check if `FileFormat` is in Delta ColumnMapping Mode(e.g. nameMapping and idMapping). */ private def isDeltaColumnMappingFileFormat(fileFormat: FileFormat): Boolean = fileFormat match { - case d: DeltaParquetFileFormat if d.columnMappingMode.name != "none" => + case d: DeltaParquetFileFormat if d.columnMappingMode != NoMapping => true case _ => false } /** - * This method only used for Delta ColumnMapping FileFormat(e.g. nameMapping and idMapping) - * transform the metadata of Delta into Parquet one, each plan should only be transformed once - * Only supports Delta 2.2 ver - * - * @param plan - * FileSourceScanExec - * @return + * This method is only used for Delta ColumnMapping FileFormat(e.g. nameMapping and idMapping) + * transform the metadata of Delta into Parquet's, each plan should only be transformed once. */ private def transformColumnMappingPlan(sPlan: SparkPlan): SparkPlan = sPlan match { case plan: FileSourceScanExec => val fmt = plan.relation.fileFormat.asInstanceOf[DeltaParquetFileFormat] - val pNames: mutable.Map[String, String] = mutable.Map.empty - var aliasNames: Seq[String] = Seq.empty - var aliasAttr: Seq[Attribute] = Seq.empty - // get physicalName from file - fmt.referenceSchema.foreach { e => - val pName = e.metadata.getString("delta.columnMapping.physicalName") - val lName = e.name - pNames += (lName -> pName) + // a mapping between the table schemas name to parquet schemas. 
+ val columnNameMapping = mutable.Map.empty[String, String] + fmt.referenceSchema.foreach { + f => + val pName = f.metadata.getString("delta.columnMapping.physicalName") + val lName = f.name + columnNameMapping += (lName -> pName) } + + // transform HadoopFsRelation + val relation = plan.relation + val newDataFields = relation.dataSchema.map(e => e.copy(columnNameMapping(e.name))) + val newPartitionFields = relation.partitionSchema.map { + e => e.copy(columnNameMapping(e.name)) + } + val newFsRelation = relation.copy( + partitionSchema = StructType(newPartitionFields), + dataSchema = StructType(newDataFields) + )(session) + // transform output's name into physical name so Reader can read data correctly // should keep the columns order the same as the origin output - val attr = plan.output.map { o => - val new_attr = o.withName(pNames(o.name)) - if (!aliasNames.contains(o.name)) { - aliasAttr = aliasAttr :+ new_attr - aliasNames = aliasNames :+ o.name - } - new_attr - } - // transform dataFilter's name into physical name - val newDataFilters = plan.dataFilters.map { e => - e.transformDown { - case attr: AttributeReference => - val new_attr = attr.withName(pNames(attr.name)).toAttribute - if (!aliasNames.contains(attr.name)) { - aliasAttr = aliasAttr :+ new_attr - aliasNames = aliasNames :+ attr.name - } - new_attr - } + val originColumnNames = mutable.ListBuffer.empty[String] + val transformedAttrs = mutable.ListBuffer.empty[Attribute] + val newOutput = plan.output.map { + o => + val newAttr = o.withName(columnNameMapping(o.name)) + if (!originColumnNames.contains(o.name)) { + transformedAttrs += newAttr + originColumnNames += o.name + } + newAttr } - val newPartitionFilters = plan.partitionFilters.map { e => - e.transformDown { - case attr: AttributeReference => - val new_attr = attr.withName(pNames(attr.name)).toAttribute - if (!aliasNames.contains(attr.name)) { - aliasAttr = aliasAttr :+ new_attr - aliasNames = aliasNames :+ attr.name - } - new_attr - } + // transform dataFilters + val newDataFilters = plan.dataFilters.map { + e => + e.transformDown { + case attr: AttributeReference => + val newAttr = attr.withName(columnNameMapping(attr.name)).toAttribute + if (!originColumnNames.contains(attr.name)) { + transformedAttrs += newAttr + originColumnNames += attr.name + } + newAttr + } } - // alias physicalName into tableName - // should keep the columns order the same as the origin output - val expr = (aliasAttr, aliasNames).zipped.map { (a, p) => - Alias(a, p)(exprId = a.exprId) + // transform partitionFilters + val newPartitionFilters = plan.partitionFilters.map { + e => + e.transformDown { + case attr: AttributeReference => + val newAttr = attr.withName(columnNameMapping(attr.name)).toAttribute + if (!originColumnNames.contains(attr.name)) { + transformedAttrs += newAttr + originColumnNames += attr.name + } + newAttr + } } // replace tableName in schema with physicalName - val newReqField = plan.requiredSchema.map { e => - StructField(pNames(e.name), e.dataType, e.nullable, e.metadata) - } - val newDataField = plan.relation.dataSchema.map { e => - StructField(pNames(e.name), e.dataType, e.nullable, e.metadata) + val newRequiredFields = plan.requiredSchema.map { + e => StructField(columnNameMapping(e.name), e.dataType, e.nullable, e.metadata) } - val newPartitionField = plan.relation.partitionSchema.map { e => - StructField(pNames(e.name), e.dataType, e.nullable, e.metadata) - } - val rel = HadoopFsRelation( - plan.relation.location, - StructType(newPartitionField), - 
StructType(newDataField), - plan.relation.bucketSpec, - plan.relation.fileFormat, - plan.relation.options - )(session) val newPlan = FileSourceScanExec( - rel, - attr, - StructType(newReqField), + newFsRelation, + newOutput, + StructType(newRequiredFields), newPartitionFilters, plan.optionalBucketSet, plan.optionalNumCoalescedBuckets, @@ -167,6 +154,11 @@ case class RewritePlanIfNeeded(session: SparkSession) extends Rule[SparkPlan] { plan.tableIdentifier, plan.disableBucketedScan ) + + // alias physicalName into tableName + val expr = (transformedAttrs, originColumnNames).zipped.map { + (attr, columnName) => Alias(attr, columnName)(exprId = attr.exprId) + } ProjectExec(expr, newPlan) case _ => sPlan } From 42f950334bb819cc140a7b74b9a3ee5b116a8f28 Mon Sep 17 00:00:00 2001 From: mokunhua Date: Thu, 14 Sep 2023 12:50:03 +0800 Subject: [PATCH 4/4] remove unused import --- .../scala/io/glutenproject/extension/ColumnarOverrides.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala index 10419c09db0d..1be3ce35df5f 100644 --- a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala +++ b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec -import org.apache.spark.sql.execution.datasources.{FileFormat, GlutenWriterColumnarRules, MATERIALIZE_TAG} +import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan} import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins._
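
Note on the column-mapping rewrite introduced by this series: the RewritePlanIfNeeded rule reads each field's physical name from the Delta reference schema metadata (key "delta.columnMapping.physicalName"), renames the scan output and schemas to those physical names so the Parquet reader resolves columns correctly, and then aliases the physical names back to the logical ones with a ProjectExec. Below is a minimal standalone sketch of just the name-mapping step, not the patch's implementation; the schema, the logical column names ("id", "name") and the physical names ("col-1a2b", "col-3c4d") are invented for illustration.

import org.apache.spark.sql.types.{LongType, Metadata, MetadataBuilder, StringType, StructField, StructType}

object ColumnMappingSketch {
  // Build field metadata carrying a Delta physical name (metadata key as used in the patch).
  private def physicalNameMeta(physicalName: String): Metadata =
    new MetadataBuilder().putString("delta.columnMapping.physicalName", physicalName).build()

  // Hypothetical reference schema: logical "id"/"name" map to generated physical names.
  val referenceSchema: StructType = StructType(Seq(
    StructField("id", LongType, nullable = false, physicalNameMeta("col-1a2b")),
    StructField("name", StringType, nullable = true, physicalNameMeta("col-3c4d"))))

  // Logical-name -> physical-name map, analogous to what transformColumnMappingPlan builds.
  val columnNameMapping: Map[String, String] =
    referenceSchema.fields
      .map(f => f.name -> f.metadata.getString("delta.columnMapping.physicalName"))
      .toMap

  // Rewrite a schema so the underlying Parquet reader sees physical column names.
  def toPhysical(schema: StructType): StructType =
    StructType(schema.map(f => f.copy(name = columnNameMapping(f.name))))

  def main(args: Array[String]): Unit = {
    val required = StructType(Seq(StructField("id", LongType, nullable = false)))
    // Prints the physical-name schema, e.g. StructType(StructField(col-1a2b,LongType,false))
    println(toPhysical(required))
  }
}

In the actual rule the same mapping is also applied to dataFilters and partitionFilters (added in patches 2 and 3), so that pushed-down predicates reference the physical names as well.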