diff --git a/gluten-core/pom.xml b/gluten-core/pom.xml
index 14fa2bdfabc2..c6e329d2f7d1 100644
--- a/gluten-core/pom.xml
+++ b/gluten-core/pom.xml
@@ -19,6 +19,11 @@
+    <dependency>
+      <groupId>io.delta</groupId>
+      <artifactId>delta-core_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>io.glutenproject</groupId>
       <artifactId>gluten-ui</artifactId>
diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala
index ceb79b5b79a3..7d03182f86b0 100644
--- a/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala
@@ -285,6 +285,7 @@ class FileSourceScanExecTransformer(
       case "DwrfFileFormat" => ReadFileFormat.DwrfReadFormat
       case "DeltaMergeTreeFileFormat" => ReadFileFormat.MergeTreeReadFormat
       case "CSVFileFormat" => ReadFileFormat.TextReadFormat
+      case "DeltaParquetFileFormat" => ReadFileFormat.ParquetReadFormat
       case _ => ReadFileFormat.UnknownFormat
     }
   }
diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala
index a9b33e79106f..1be3ce35df5f 100644
--- a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala
@@ -32,18 +32,138 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
 import org.apache.spark.sql.catalyst.plans.{LeftOuter, LeftSemi, RightOuter}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning}
 import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
+import org.apache.spark.sql.delta.{DeltaParquetFileFormat, NoMapping}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive._
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
 import org.apache.spark.sql.execution.exchange._
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.python.EvalPythonExec
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.hive.HiveTableScanExecTransformer
+import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.util.SparkRuleUtil
 
+import scala.collection._
+
+/**
+ * This rule applies transformations to the SparkPlan that must happen before any other rule:
+ * 1) lake-format-specific rewrites that cannot be done inside Spark itself, e.g. for Delta Lake;
+ * 2) rewrites that have to run first so that no information (such as plan hint tags) is lost.
+ */
+case class RewritePlanIfNeeded(session: SparkSession) extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    plan.transformUpWithSubqueries {
+      // If Delta column mapping (name mapping or id mapping) is enabled,
+      // rewrite the Delta metadata into Parquet metadata,
+      // so that Gluten can read the Delta files with its Parquet reader.
+      case p: FileSourceScanExec if isDeltaColumnMappingFileFormat(p.relation.fileFormat) =>
+        transformColumnMappingPlan(p)
+    }
+  }
+
+  /** Checks whether a `FileFormat` uses a Delta column mapping mode (name or id mapping). */
+  private def isDeltaColumnMappingFileFormat(fileFormat: FileFormat): Boolean = fileFormat match {
+    case d: DeltaParquetFileFormat if d.columnMappingMode != NoMapping =>
+      true
+    case _ =>
+      false
+  }
+
+  /**
+   * Transforms the Delta metadata of a scan into Parquet metadata. Only used for Delta file
+   * formats with column mapping enabled (name or id mapping); each plan should be transformed
+   * at most once.
+   */
+  private def transformColumnMappingPlan(sPlan: SparkPlan): SparkPlan = sPlan match {
+    case plan: FileSourceScanExec =>
+      val fmt = plan.relation.fileFormat.asInstanceOf[DeltaParquetFileFormat]
+      // A mapping from logical (table schema) column names to physical (Parquet schema) names.
+      val columnNameMapping = mutable.Map.empty[String, String]
+      fmt.referenceSchema.foreach {
+        f =>
+          val pName = f.metadata.getString("delta.columnMapping.physicalName")
+          val lName = f.name
+          columnNameMapping += (lName -> pName)
+      }
+
+      // Transform the HadoopFsRelation.
+      val relation = plan.relation
+      val newDataFields = relation.dataSchema.map(e => e.copy(columnNameMapping(e.name)))
+      val newPartitionFields = relation.partitionSchema.map {
+        e => e.copy(columnNameMapping(e.name))
+      }
+      val newFsRelation = relation.copy(
+        partitionSchema = StructType(newPartitionFields),
+        dataSchema = StructType(newDataFields)
+      )(session)
+
+      // Rename the output attributes to their physical names so the reader can read the data
+      // correctly, keeping the column order the same as in the original output.
+      val originColumnNames = mutable.ListBuffer.empty[String]
+      val transformedAttrs = mutable.ListBuffer.empty[Attribute]
+      val newOutput = plan.output.map {
+        o =>
+          val newAttr = o.withName(columnNameMapping(o.name))
+          if (!originColumnNames.contains(o.name)) {
+            transformedAttrs += newAttr
+            originColumnNames += o.name
+          }
+          newAttr
+      }
+      // Transform dataFilters.
+      val newDataFilters = plan.dataFilters.map {
+        e =>
+          e.transformDown {
+            case attr: AttributeReference =>
+              val newAttr = attr.withName(columnNameMapping(attr.name)).toAttribute
+              if (!originColumnNames.contains(attr.name)) {
+                transformedAttrs += newAttr
+                originColumnNames += attr.name
+              }
+              newAttr
+          }
+      }
+      // Transform partitionFilters.
+      val newPartitionFilters = plan.partitionFilters.map {
+        e =>
+          e.transformDown {
+            case attr: AttributeReference =>
+              val newAttr = attr.withName(columnNameMapping(attr.name)).toAttribute
+              if (!originColumnNames.contains(attr.name)) {
+                transformedAttrs += newAttr
+                originColumnNames += attr.name
+              }
+              newAttr
+          }
+      }
+      // Replace each logical name in the required schema with its physical name.
+      val newRequiredFields = plan.requiredSchema.map {
+        e => StructField(columnNameMapping(e.name), e.dataType, e.nullable, e.metadata)
+      }
+      val newPlan = FileSourceScanExec(
+        newFsRelation,
+        newOutput,
+        StructType(newRequiredFields),
+        newPartitionFilters,
+        plan.optionalBucketSet,
+        plan.optionalNumCoalescedBuckets,
+        newDataFilters,
+        plan.tableIdentifier,
+        plan.disableBucketedScan
+      )
+
+      // Alias each physical name back to its original logical name.
+      val expr = (transformedAttrs, originColumnNames).zipped.map {
+        (attr, columnName) => Alias(attr, columnName)(exprId = attr.exprId)
+      }
+      ProjectExec(expr, newPlan)
+    case _ => sPlan
+  }
+}
+
 // This rule will conduct the conversion from Spark plan to the plan transformer.
 case class TransformPreOverrides(isAdaptiveContext: Boolean)
   extends Rule[SparkPlan]
@@ -750,6 +870,7 @@ case class ColumnarOverrideRules(session: SparkSession)
     }
     tagBeforeTransformHitsRules ::: List(
+      (spark: SparkSession) => RewritePlanIfNeeded(spark),
       (spark: SparkSession) => PlanOneRowRelation(spark),
       (_: SparkSession) => FallbackEmptySchemaRelation(),
       (_: SparkSession) => AddTransformHintRule(),
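For reference, the core of `transformColumnMappingPlan` is the lookup from logical column names to the physical Parquet names that Delta stores in each field's `delta.columnMapping.physicalName` metadata. The standalone sketch below illustrates that lookup and the schema rename outside of the planner; the table columns and physical names are made-up examples, not taken from the patch.

    import org.apache.spark.sql.types.{Metadata, StringType, StructField, StructType}

    object ColumnMappingSketch {
      // Builds a field whose metadata carries a Delta-style physical name,
      // mirroring what DeltaParquetFileFormat.referenceSchema contains.
      private def withPhysicalName(logical: String, physical: String): StructField =
        StructField(
          logical,
          StringType,
          nullable = true,
          Metadata.fromJson(s"""{"delta.columnMapping.physicalName":"$physical"}"""))

      def main(args: Array[String]): Unit = {
        // Hypothetical reference schema of a column-mapped Delta table.
        val referenceSchema = StructType(Seq(
          withPhysicalName("user_id", "col-7f3a"),
          withPhysicalName("country", "col-91bc")))

        // The same logical -> physical lookup the rule builds from referenceSchema.
        val columnNameMapping = referenceSchema.map {
          f => f.name -> f.metadata.getString("delta.columnMapping.physicalName")
        }.toMap

        // Rename the schema to physical names, as done for dataSchema and requiredSchema.
        val physicalSchema =
          StructType(referenceSchema.map(f => f.copy(name = columnNameMapping(f.name))))

        println(columnNameMapping)                        // Map(user_id -> col-7f3a, ...)
        println(physicalSchema.fieldNames.mkString(", ")) // col-7f3a, col-91bc
      }
    }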
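Downstream operators still resolve columns by their logical names, which is why the rule wraps the rewritten scan in a `ProjectExec` that aliases every renamed attribute back. A minimal sketch of that step, assuming a single string column whose physical name is `col-7f3a` (both names are illustrative):

    import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
    import org.apache.spark.sql.types.StringType

    // The attribute as the rewritten scan outputs it, under its physical name.
    val physical = AttributeReference("col-7f3a", StringType)()

    // Alias it back to the logical name. Reusing the exprId, as the rule does,
    // keeps references from parent operators bound to the same attribute.
    val backToLogical = Alias(physical, "user_id")(exprId = physical.exprId)

Reusing the scan attribute's exprId for the alias is what lets the projection be inserted transparently: parents that referenced the original output attribute by id still resolve against the alias.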