From 93056f09035f53fd0b760481add44eb4ec3aded2 Mon Sep 17 00:00:00 2001 From: Chang chen Date: Mon, 30 Sep 2024 13:35:24 +0800 Subject: [PATCH] [GLUTEN-7028][CH][Part-4] Refactor `DeltaMergeTreeFileFormat` to read table configuration from deltalog's metadata (#7170) *Call ClickhouseMetaSerializer.forWrite at driver side and Broadcast ReadRel.ExtensionTable. --- backends-clickhouse/pom.xml | 2 + .../sql/delta/catalog/ClickHouseTableV2.scala | 30 +-- .../source/DeltaMergeTreeFileFormat.scala | 71 ++---- .../sql/delta/catalog/ClickHouseTableV2.scala | 30 +-- .../source/DeltaMergeTreeFileFormat.scala | 71 ++---- .../org/apache/spark/sql/delta/DeltaLog.scala | 3 - .../sql/delta/catalog/ClickHouseTableV2.scala | 29 +-- .../source/DeltaMergeTreeFileFormat.scala | 72 ++---- .../clickhouse/ExtensionTableBuilder.java | 45 ++-- .../clickhouse/ExtensionTableNode.java | 174 ++----------- .../clickhouse/CHIteratorApi.scala | 32 +-- .../backendsapi/clickhouse/CHRuleApi.scala | 11 +- .../spark/sql/delta/ClickhouseSnapshot.scala | 7 +- .../delta/catalog/ClickHouseTableV2Base.scala | 132 ++-------- .../commands/GlutenCHCacheDataCommand.scala | 14 +- .../clickhouse/ClickhouseMetaSerializer.scala | 235 ++++++++++++++++++ .../utils/MergeTreeDeltaUtil.scala | 23 +- .../utils/MergeTreePartsPartitionsUtil.scala | 34 +-- .../mergetree/DeltaMetaReader.scala | 36 +++ .../datasources/mergetree/StorageMeta.scala | 180 ++++++++++++++ .../v1/CHFormatWriterInjects.scala | 20 -- .../v1/CHMergeTreeWriterInjects.scala | 96 ++----- .../v1/clickhouse/MergeTreeOutputWriter.scala | 2 +- .../backendsapi/clickhouse/CHExtendRule.scala | 23 ++ .../backendsapi/clickhouse/CHExtendRule.scala | 23 ++ .../backendsapi/clickhouse/CHExtendRule.scala | 27 ++ .../spark/sql/catalyst/AddStorageInfo.scala | 50 ++++ ...ickHouseMergeTreePathBasedWriteSuite.scala | 63 ++--- ...ClickhouseMergetreeSoftAffinitySuite.scala | 2 +- 29 files changed, 816 insertions(+), 721 deletions(-) create mode 100644 backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/ClickhouseMetaSerializer.scala rename backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/{ => clickhouse}/utils/MergeTreeDeltaUtil.scala (75%) rename backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/{ => clickhouse}/utils/MergeTreePartsPartitionsUtil.scala (96%) create mode 100644 backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/DeltaMetaReader.scala create mode 100644 backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala create mode 100644 backends-clickhouse/src/main/spark32/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala create mode 100644 backends-clickhouse/src/main/spark33/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala create mode 100644 backends-clickhouse/src/main/spark35/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala create mode 100644 backends-clickhouse/src/main/spark35/org/apache/spark/sql/catalyst/AddStorageInfo.scala diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml index 130fe88552e7..241cdd84519a 100644 --- a/backends-clickhouse/pom.xml +++ b/backends-clickhouse/pom.xml @@ -338,6 +338,7 @@ src/test/scala/**/*.scala src/main/delta-${delta.binary.version}/**/*.scala src/test/delta-${delta.binary.version}/**/*.scala + src/main/${sparkshim.module.name}/**/*.scala src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/commands/*.scala @@ -397,6 +398,7 @@ src/main/delta-${delta.binary.version} + src/main/${sparkshim.module.name} diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala index 90370f0b1d99..ae8ec32bd0a4 100644 --- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala +++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala @@ -26,7 +26,8 @@ import org.apache.spark.sql.delta.actions.Metadata import org.apache.spark.sql.delta.catalog.ClickHouseTableV2.deltaLog2Table import org.apache.spark.sql.delta.sources.DeltaDataSource import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory} -import org.apache.spark.sql.execution.datasources.utils.MergeTreePartsPartitionsUtil +import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil +import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.collection.BitSet @@ -95,28 +96,21 @@ class ClickHouseTableV2( def getFileFormat(meta: Metadata): DeltaMergeTreeFileFormat = { new DeltaMergeTreeFileFormat( - meta, - dataBaseName, - tableName, - ClickhouseSnapshot.genSnapshotId(snapshot), - orderByKeyOption, - lowCardKeyOption, - minmaxIndexKeyOption, - bfIndexKeyOption, - setIndexKeyOption, - primaryKeyOption, - clickhouseTableConfigs, - partitionColumns - ) + StorageMeta.withMoreStorageInfo( + meta, + ClickhouseSnapshot.genSnapshotId(snapshot), + deltaLog.dataPath, + dataBaseName, + tableName)) } - override def deltaProperties(): ju.Map[String, String] = properties() + override def deltaProperties: Map[String, String] = properties().asScala.toMap - override def deltaCatalog(): Option[CatalogTable] = catalogTable + override def deltaCatalog: Option[CatalogTable] = catalogTable - override def deltaPath(): Path = path + override def deltaPath: Path = path - override def deltaSnapshot(): Snapshot = snapshot + override def deltaSnapshot: Snapshot = snapshot def cacheThis(): Unit = { deltaLog2Table.put(deltaLog, this) diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala index 994329ccf17c..19b3d396bd6f 100644 --- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala +++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala @@ -20,6 +20,8 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.delta.DeltaParquetFileFormat import org.apache.spark.sql.delta.actions.Metadata import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory} +import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer +import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects} import org.apache.spark.sql.types.StructType @@ -28,47 +30,6 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileFormat(metadata.columnMappingMode, metadata.schema) { - protected var database = "" - protected var tableName = "" - protected var snapshotId = "" - protected var orderByKeyOption: Option[Seq[String]] = None - protected var lowCardKeyOption: Option[Seq[String]] = None - protected var minmaxIndexKeyOption: Option[Seq[String]] = None - protected var bfIndexKeyOption: Option[Seq[String]] = None - protected var setIndexKeyOption: Option[Seq[String]] = None - protected var primaryKeyOption: Option[Seq[String]] = None - protected var partitionColumns: Seq[String] = Seq.empty[String] - protected var clickhouseTableConfigs: Map[String, String] = Map.empty - - // scalastyle:off argcount - def this( - metadata: Metadata, - database: String, - tableName: String, - snapshotId: String, - orderByKeyOption: Option[Seq[String]], - lowCardKeyOption: Option[Seq[String]], - minmaxIndexKeyOption: Option[Seq[String]], - bfIndexKeyOption: Option[Seq[String]], - setIndexKeyOption: Option[Seq[String]], - primaryKeyOption: Option[Seq[String]], - clickhouseTableConfigs: Map[String, String], - partitionColumns: Seq[String]) { - this(metadata) - this.database = database - this.tableName = tableName - this.snapshotId = snapshotId - this.orderByKeyOption = orderByKeyOption - this.lowCardKeyOption = lowCardKeyOption - this.minmaxIndexKeyOption = minmaxIndexKeyOption - this.bfIndexKeyOption = bfIndexKeyOption - this.setIndexKeyOption = setIndexKeyOption - this.primaryKeyOption = primaryKeyOption - this.clickhouseTableConfigs = clickhouseTableConfigs - this.partitionColumns = partitionColumns - } - // scalastyle:on argcount - override def shortName(): String = "mergetree" override def toString(): String = "MergeTree" @@ -95,6 +56,17 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) .getInstance() .nativeConf(options, "") + @transient val deltaMetaReader = DeltaMetaReader(metadata) + + val database = deltaMetaReader.storageDB + val tableName = deltaMetaReader.storageTable + val deltaPath = deltaMetaReader.storagePath + + val extensionTableBC = sparkSession.sparkContext.broadcast( + ClickhouseMetaSerializer + .forWrite(deltaMetaReader, metadata.schema) + .toByteArray) + new OutputWriterFactory { override def getFileExtension(context: TaskAttemptContext): String = { ".mergetree" @@ -104,25 +76,18 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { + require(path == deltaPath) GlutenMergeTreeWriterInjects .getInstance() .asInstanceOf[CHMergeTreeWriterInjects] .createOutputWriter( path, - database, - tableName, - snapshotId, - orderByKeyOption, - lowCardKeyOption, - minmaxIndexKeyOption, - bfIndexKeyOption, - setIndexKeyOption, - primaryKeyOption, - partitionColumns, metadata.schema, - clickhouseTableConfigs, context, - nativeConf + nativeConf, + database, + tableName, + extensionTableBC.value ) } } diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala index 90370f0b1d99..ae8ec32bd0a4 100644 --- a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala +++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala @@ -26,7 +26,8 @@ import org.apache.spark.sql.delta.actions.Metadata import org.apache.spark.sql.delta.catalog.ClickHouseTableV2.deltaLog2Table import org.apache.spark.sql.delta.sources.DeltaDataSource import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory} -import org.apache.spark.sql.execution.datasources.utils.MergeTreePartsPartitionsUtil +import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil +import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.collection.BitSet @@ -95,28 +96,21 @@ class ClickHouseTableV2( def getFileFormat(meta: Metadata): DeltaMergeTreeFileFormat = { new DeltaMergeTreeFileFormat( - meta, - dataBaseName, - tableName, - ClickhouseSnapshot.genSnapshotId(snapshot), - orderByKeyOption, - lowCardKeyOption, - minmaxIndexKeyOption, - bfIndexKeyOption, - setIndexKeyOption, - primaryKeyOption, - clickhouseTableConfigs, - partitionColumns - ) + StorageMeta.withMoreStorageInfo( + meta, + ClickhouseSnapshot.genSnapshotId(snapshot), + deltaLog.dataPath, + dataBaseName, + tableName)) } - override def deltaProperties(): ju.Map[String, String] = properties() + override def deltaProperties: Map[String, String] = properties().asScala.toMap - override def deltaCatalog(): Option[CatalogTable] = catalogTable + override def deltaCatalog: Option[CatalogTable] = catalogTable - override def deltaPath(): Path = path + override def deltaPath: Path = path - override def deltaSnapshot(): Snapshot = snapshot + override def deltaSnapshot: Snapshot = snapshot def cacheThis(): Unit = { deltaLog2Table.put(deltaLog, this) diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala index d2c7cf5dfe32..c2d1ec47b9ca 100644 --- a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala +++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala @@ -20,6 +20,8 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.delta.DeltaParquetFileFormat import org.apache.spark.sql.delta.actions.Metadata import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory} +import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer +import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects} import org.apache.spark.sql.types.StructType @@ -28,47 +30,6 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} @SuppressWarnings(Array("io.github.zhztheplayer.scalawarts.InheritFromCaseClass")) class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileFormat(metadata) { - protected var database = "" - protected var tableName = "" - protected var snapshotId = "" - protected var orderByKeyOption: Option[Seq[String]] = None - protected var lowCardKeyOption: Option[Seq[String]] = None - protected var minmaxIndexKeyOption: Option[Seq[String]] = None - protected var bfIndexKeyOption: Option[Seq[String]] = None - protected var setIndexKeyOption: Option[Seq[String]] = None - protected var primaryKeyOption: Option[Seq[String]] = None - protected var partitionColumns: Seq[String] = Seq.empty[String] - protected var clickhouseTableConfigs: Map[String, String] = Map.empty - - // scalastyle:off argcount - def this( - metadata: Metadata, - database: String, - tableName: String, - snapshotId: String, - orderByKeyOption: Option[Seq[String]], - lowCardKeyOption: Option[Seq[String]], - minmaxIndexKeyOption: Option[Seq[String]], - bfIndexKeyOption: Option[Seq[String]], - setIndexKeyOption: Option[Seq[String]], - primaryKeyOption: Option[Seq[String]], - clickhouseTableConfigs: Map[String, String], - partitionColumns: Seq[String]) { - this(metadata) - this.database = database - this.tableName = tableName - this.snapshotId = snapshotId - this.orderByKeyOption = orderByKeyOption - this.lowCardKeyOption = lowCardKeyOption - this.minmaxIndexKeyOption = minmaxIndexKeyOption - this.bfIndexKeyOption = bfIndexKeyOption - this.setIndexKeyOption = setIndexKeyOption - this.primaryKeyOption = primaryKeyOption - this.clickhouseTableConfigs = clickhouseTableConfigs - this.partitionColumns = partitionColumns - } - // scalastyle:on argcount - override def shortName(): String = "mergetree" override def toString(): String = "MergeTree" @@ -98,6 +59,17 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma .getInstance() .nativeConf(options, "") + @transient val deltaMetaReader = DeltaMetaReader(metadata) + + val database = deltaMetaReader.storageDB + val tableName = deltaMetaReader.storageTable + val deltaPath = deltaMetaReader.storagePath + + val extensionTableBC = sparkSession.sparkContext.broadcast( + ClickhouseMetaSerializer + .forWrite(deltaMetaReader, metadata.schema) + .toByteArray) + new OutputWriterFactory { override def getFileExtension(context: TaskAttemptContext): String = { ".mergetree" @@ -107,25 +79,18 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { + require(path == deltaPath) GlutenMergeTreeWriterInjects .getInstance() .asInstanceOf[CHMergeTreeWriterInjects] .createOutputWriter( path, - database, - tableName, - snapshotId, - orderByKeyOption, - lowCardKeyOption, - minmaxIndexKeyOption, - bfIndexKeyOption, - setIndexKeyOption, - primaryKeyOption, - partitionColumns, metadata.schema, - clickhouseTableConfigs, context, - nativeConf + nativeConf, + database, + tableName, + extensionTableBC.value ) } } diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaLog.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaLog.scala index dca14d7fb1fb..bac5231309b8 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaLog.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaLog.scala @@ -784,9 +784,6 @@ object DeltaLog extends DeltaLogging { FileSourceOptions.IGNORE_CORRUPT_FILES -> "false", FileSourceOptions.IGNORE_MISSING_FILES -> "false" ) - // --- modified start - // Don't need to add the bucketOption here, it handles the delta log meta json file - // --- modified end val fsRelation = HadoopFsRelation( index, index.partitionSchema, schema, None, index.format, allOptions)(spark) LogicalRelation(fsRelation) diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala index 8b4a13a30a69..f5f1668f60b5 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala @@ -30,7 +30,8 @@ import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaErrors, DeltaLog, De import org.apache.spark.sql.delta.actions.{Metadata, Protocol} import org.apache.spark.sql.delta.sources.DeltaDataSource import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory} -import org.apache.spark.sql.execution.datasources.utils.MergeTreePartsPartitionsUtil +import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil +import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat import org.apache.spark.sql.execution.datasources.v2.clickhouse.utils.CHDataSourceUtils import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -98,28 +99,19 @@ class ClickHouseTableV2( def getFileFormat(protocol: Protocol, meta: Metadata): DeltaMergeTreeFileFormat = { new DeltaMergeTreeFileFormat( protocol, - meta, - dataBaseName, - tableName, - ClickhouseSnapshot.genSnapshotId(initialSnapshot), - orderByKeyOption, - lowCardKeyOption, - minmaxIndexKeyOption, - bfIndexKeyOption, - setIndexKeyOption, - primaryKeyOption, - clickhouseTableConfigs, - partitionColumns - ) + StorageMeta.withMoreStorageInfo( + meta, + ClickhouseSnapshot.genSnapshotId(initialSnapshot), + deltaLog.dataPath)) } - override def deltaProperties(): ju.Map[String, String] = properties() + override def deltaProperties: Map[String, String] = properties().asScala.toMap - override def deltaCatalog(): Option[CatalogTable] = catalogTable + override def deltaCatalog: Option[CatalogTable] = catalogTable - override def deltaPath(): Path = path + override def deltaPath: Path = path - override def deltaSnapshot(): Snapshot = initialSnapshot + override def deltaSnapshot: Snapshot = initialSnapshot def cacheThis(): Unit = { ClickHouseTableV2.deltaLog2Table.put(deltaLog, this) @@ -133,7 +125,6 @@ class TempClickHouseTableV2( override val spark: SparkSession, override val catalogTable: Option[CatalogTable] = None) extends ClickHouseTableV2(spark, null, catalogTable) { - import collection.JavaConverters._ override def properties(): ju.Map[String, String] = catalogTable.get.properties.asJava override lazy val partitionColumns: Seq[String] = catalogTable.get.partitionColumnNames override def cacheThis(): Unit = {} diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala index 36ab3fb6c138..1489ae4dbf49 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala @@ -20,6 +20,8 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.delta.DeltaParquetFileFormat import org.apache.spark.sql.delta.actions.{Metadata, Protocol} import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory} +import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer +import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects} import org.apache.spark.sql.types.StructType @@ -29,48 +31,6 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} class DeltaMergeTreeFileFormat(protocol: Protocol, metadata: Metadata) extends DeltaParquetFileFormat(protocol, metadata) { - protected var database = "" - protected var tableName = "" - protected var snapshotId = "" - protected var orderByKeyOption: Option[Seq[String]] = None - protected var lowCardKeyOption: Option[Seq[String]] = None - protected var minmaxIndexKeyOption: Option[Seq[String]] = None - protected var bfIndexKeyOption: Option[Seq[String]] = None - protected var setIndexKeyOption: Option[Seq[String]] = None - protected var primaryKeyOption: Option[Seq[String]] = None - protected var partitionColumns: Seq[String] = Seq.empty[String] - protected var clickhouseTableConfigs: Map[String, String] = Map.empty - - // scalastyle:off argcount - def this( - protocol: Protocol, - metadata: Metadata, - database: String, - tableName: String, - snapshotId: String, - orderByKeyOption: Option[Seq[String]], - lowCardKeyOption: Option[Seq[String]], - minmaxIndexKeyOption: Option[Seq[String]], - bfIndexKeyOption: Option[Seq[String]], - setIndexKeyOption: Option[Seq[String]], - primaryKeyOption: Option[Seq[String]], - clickhouseTableConfigs: Map[String, String], - partitionColumns: Seq[String]) = { - this(protocol, metadata) - this.database = database - this.tableName = tableName - this.snapshotId = snapshotId - this.orderByKeyOption = orderByKeyOption - this.lowCardKeyOption = lowCardKeyOption - this.minmaxIndexKeyOption = minmaxIndexKeyOption - this.bfIndexKeyOption = bfIndexKeyOption - this.setIndexKeyOption = setIndexKeyOption - this.primaryKeyOption = primaryKeyOption - this.clickhouseTableConfigs = clickhouseTableConfigs - this.partitionColumns = partitionColumns - } - // scalastyle:on argcount - override def shortName(): String = "mergetree" override def toString(): String = "MergeTree" @@ -99,6 +59,17 @@ class DeltaMergeTreeFileFormat(protocol: Protocol, metadata: Metadata) .getInstance() .nativeConf(options, "") + @transient val deltaMetaReader = DeltaMetaReader(metadata) + + val database = deltaMetaReader.storageDB + val tableName = deltaMetaReader.storageTable + val deltaPath = deltaMetaReader.storagePath + + val extensionTableBC = sparkSession.sparkContext.broadcast( + ClickhouseMetaSerializer + .forWrite(deltaMetaReader, metadata.schema) + .toByteArray) + new OutputWriterFactory { override def getFileExtension(context: TaskAttemptContext): String = { ".mergetree" @@ -108,25 +79,18 @@ class DeltaMergeTreeFileFormat(protocol: Protocol, metadata: Metadata) path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { + require(path == deltaPath) GlutenMergeTreeWriterInjects .getInstance() .asInstanceOf[CHMergeTreeWriterInjects] .createOutputWriter( path, - database, - tableName, - snapshotId, - orderByKeyOption, - lowCardKeyOption, - minmaxIndexKeyOption, - bfIndexKeyOption, - setIndexKeyOption, - primaryKeyOption, - partitionColumns, metadata.schema, - clickhouseTableConfigs, context, - nativeConf + nativeConf, + database, + tableName, + extensionTableBC.value ) } } diff --git a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableBuilder.java b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableBuilder.java index c43775d85cbd..9d6ed6868ec1 100644 --- a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableBuilder.java +++ b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableBuilder.java @@ -16,8 +16,6 @@ */ package org.apache.spark.sql.execution.datasources.clickhouse; -import org.apache.gluten.expression.ConverterUtils; - import java.util.List; import java.util.Map; @@ -25,8 +23,6 @@ public class ExtensionTableBuilder { private ExtensionTableBuilder() {} public static ExtensionTableNode makeExtensionTable( - Long minPartsNum, - Long maxPartsNum, String database, String tableName, String snapshotId, @@ -38,31 +34,28 @@ public static ExtensionTableNode makeExtensionTable( String bfIndexKey, String setIndexKey, String primaryKey, - List partList, - List starts, - List lengths, + ClickhousePartSerializer partSerializer, String tableSchemaJson, Map clickhouseTableConfigs, List preferredLocations) { + + String result = + ClickhouseMetaSerializer.apply( + database, + tableName, + snapshotId, + relativeTablePath, + absoluteTablePath, + orderByKey, + lowCardKey, + minmaxIndexKey, + bfIndexKey, + setIndexKey, + primaryKey, + partSerializer, + tableSchemaJson, + clickhouseTableConfigs); return new ExtensionTableNode( - minPartsNum, - maxPartsNum, - database, - tableName, - snapshotId, - relativeTablePath, - absoluteTablePath, - ConverterUtils.normalizeColName(orderByKey), - ConverterUtils.normalizeColName(lowCardKey), - ConverterUtils.normalizeColName(minmaxIndexKey), - ConverterUtils.normalizeColName(bfIndexKey), - ConverterUtils.normalizeColName(setIndexKey), - ConverterUtils.normalizeColName(primaryKey), - partList, - starts, - lengths, - tableSchemaJson, - clickhouseTableConfigs, - preferredLocations); + preferredLocations, result, partSerializer.pathList(absoluteTablePath)); } } diff --git a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableNode.java b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableNode.java index 69629c8a09af..bb04652be440 100644 --- a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableNode.java +++ b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableNode.java @@ -19,158 +19,25 @@ import org.apache.gluten.backendsapi.BackendsApiManager; import org.apache.gluten.substrait.rel.SplitInfo; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.protobuf.StringValue; import io.substrait.proto.ReadRel; -import java.net.URI; import java.util.ArrayList; import java.util.List; -import java.util.Map; public class ExtensionTableNode implements SplitInfo { - private static final String MERGE_TREE = "MergeTree;"; - private Long minPartsNum; - private Long maxPartsNum; - private String database; - private String tableName; - private String snapshotId; - private String relativePath; - private String absolutePath; - private String tableSchemaJson; - private StringBuffer extensionTableStr = new StringBuffer(MERGE_TREE); - private StringBuffer partPathList = new StringBuffer(""); private final List preferredLocations = new ArrayList<>(); - - private String orderByKey; - - private String primaryKey; - - private String lowCardKey; - private String minmaxIndexKey; - private String bfIndexKey; - private String setIndexKey; - - private List partList; - private List starts; - private List lengths; - - private Map clickhouseTableConfigs; + private final String serializerResult; + private final scala.collection.Seq pathList; ExtensionTableNode( - Long minPartsNum, - Long maxPartsNum, - String database, - String tableName, - String snapshotId, - String relativePath, - String absolutePath, - String orderByKey, - String lowCardKey, - String minmaxIndexKey, - String bfIndexKey, - String setIndexKey, - String primaryKey, - List partList, - List starts, - List lengths, - String tableSchemaJson, - Map clickhouseTableConfigs, - List preferredLocations) { - this.minPartsNum = minPartsNum; - this.maxPartsNum = maxPartsNum; - this.database = database; - this.tableName = tableName; - this.snapshotId = snapshotId; - URI table_uri = URI.create(relativePath); - if (table_uri.getPath().startsWith("/")) { // file:///tmp/xxx => tmp/xxx - this.relativePath = table_uri.getPath().substring(1); - } else { - this.relativePath = table_uri.getPath(); - } - this.absolutePath = absolutePath; - this.tableSchemaJson = tableSchemaJson; - this.orderByKey = orderByKey; - this.lowCardKey = lowCardKey; - this.minmaxIndexKey = minmaxIndexKey; - this.bfIndexKey = bfIndexKey; - this.setIndexKey = setIndexKey; - this.primaryKey = primaryKey; - this.partList = partList; - this.starts = starts; - this.lengths = lengths; - this.clickhouseTableConfigs = clickhouseTableConfigs; + List preferredLocations, + String serializerResult, + scala.collection.Seq pathList) { + this.pathList = pathList; this.preferredLocations.addAll(preferredLocations); - - // New: MergeTree;{database}\n{table}\n{orderByKey}\n{primaryKey}\n{relative_path}\n - // {part_path1}\n{part_path2}\n... - long end = 0; - for (int i = 0; i < this.partList.size(); i++) { - end = this.starts.get(i) + this.lengths.get(i); - partPathList - .append(this.partList.get(i)) - .append("\n") - .append(this.starts.get(i)) - .append("\n") - .append(end) - .append("\n"); - } - - extensionTableStr - .append(this.database) - .append("\n") - .append(this.tableName) - .append("\n") - .append(this.snapshotId) - .append("\n") - .append(this.tableSchemaJson) - .append("\n") - .append(this.orderByKey) - .append("\n"); - - if (!this.orderByKey.isEmpty() && !this.orderByKey.equals("tuple()")) { - extensionTableStr.append(this.primaryKey).append("\n"); - } - extensionTableStr.append(this.lowCardKey).append("\n"); - extensionTableStr.append(this.minmaxIndexKey).append("\n"); - extensionTableStr.append(this.bfIndexKey).append("\n"); - extensionTableStr.append(this.setIndexKey).append("\n"); - extensionTableStr.append(this.relativePath).append("\n"); - extensionTableStr.append(this.absolutePath).append("\n"); - - if (this.clickhouseTableConfigs != null && !this.clickhouseTableConfigs.isEmpty()) { - ObjectMapper objectMapper = new ObjectMapper(); - try { - String clickhouseTableConfigsJson = - objectMapper - .writeValueAsString(this.clickhouseTableConfigs) - .replaceAll("\\\n", "") - .replaceAll(" ", ""); - extensionTableStr.append(clickhouseTableConfigsJson).append("\n"); - } catch (Exception e) { - extensionTableStr.append("").append("\n"); - } - } else { - extensionTableStr.append("").append("\n"); - } - extensionTableStr.append(partPathList); - /* old format - if (!this.partList.isEmpty()) { - } else { - // Old: MergeTree;{database}\n{table}\n{relative_path}\n{min_part}\n{max_part}\n - extensionTableStr - .append(database) - .append("\n") - .append(tableName) - .append("\n") - .append(relativePath) - .append("\n") - .append(this.minPartsNum) - .append("\n") - .append(this.maxPartsNum) - .append("\n"); - } */ + this.serializerResult = serializerResult; } @Override @@ -180,27 +47,22 @@ public List preferredLocations() { @Override public ReadRel.ExtensionTable toProtobuf() { - ReadRel.ExtensionTable.Builder extensionTableBuilder = ReadRel.ExtensionTable.newBuilder(); - StringValue extensionTable = - StringValue.newBuilder().setValue(extensionTableStr.toString()).build(); - extensionTableBuilder.setDetail( - BackendsApiManager.getTransformerApiInstance().packPBMessage(extensionTable)); - return extensionTableBuilder.build(); + return toProtobuf(serializerResult); } - public String getRelativePath() { - return relativePath; + public scala.collection.Seq getPartList() { + return pathList; } - public String getAbsolutePath() { - return absolutePath; - } - - public List getPartList() { - return partList; + public String getExtensionTableStr() { + return serializerResult; } - public String getExtensionTableStr() { - return extensionTableStr.toString(); + public static ReadRel.ExtensionTable toProtobuf(String result) { + ReadRel.ExtensionTable.Builder extensionTableBuilder = ReadRel.ExtensionTable.newBuilder(); + StringValue extensionTable = StringValue.newBuilder().setValue(result).build(); + extensionTableBuilder.setDetail( + BackendsApiManager.getTransformerApiInstance().packPBMessage(extensionTable)); + return extensionTableBuilder.build(); } } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala index 9585d9d5f235..0a3dbc3f5a37 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala @@ -36,7 +36,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.shuffle.CHColumnarShuffleWriter import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.datasources.FilePartition -import org.apache.spark.sql.execution.datasources.clickhouse.{ExtensionTableBuilder, ExtensionTableNode} +import org.apache.spark.sql.execution.datasources.clickhouse.{ClickhousePartSerializer, ExtensionTableBuilder, ExtensionTableNode} import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper @@ -131,20 +131,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { properties: Map[String, String]): SplitInfo = { partition match { case p: GlutenMergeTreePartition => - val partLists = new JArrayList[String]() - val starts = new JArrayList[JLong]() - val lengths = new JArrayList[JLong]() - p.partList - .foreach( - parts => { - partLists.add(parts.name) - starts.add(parts.start) - lengths.add(parts.length) - }) ExtensionTableBuilder .makeExtensionTable( - -1L, - -1L, p.database, p.table, p.snapshotId, @@ -156,9 +144,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { p.bfIndexKey, p.setIndexKey, p.primaryKey, - partLists, - starts, - lengths, + ClickhousePartSerializer.fromMergeTreePartSplits(p.partList.toSeq), p.tableSchemaJson, p.clickhouseTableConfigs.asJava, CHAffinity.getNativeMergeTreePartitionLocations(p).toList.asJava @@ -222,27 +208,23 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { val planByteArray = wsCtx.root.toProtobuf.toByteArray splitInfos.zipWithIndex.map { case (splits, index) => - val files = ArrayBuffer[String]() - val splitInfosByteArray = splits.zipWithIndex.map { + val (splitInfosByteArray, files) = splits.zipWithIndex.map { case (split, i) => split match { case filesNode: LocalFilesNode => setFileSchemaForLocalFiles(filesNode, scans(i)) - filesNode.getPaths.forEach(f => files += f) - filesNode.toProtobuf.toByteArray + (filesNode.toProtobuf.toByteArray, filesNode.getPaths.asScala.toSeq) case extensionTableNode: ExtensionTableNode => - extensionTableNode.getPartList.forEach( - name => files += extensionTableNode.getAbsolutePath + "/" + name) - extensionTableNode.toProtobuf.toByteArray + (extensionTableNode.toProtobuf.toByteArray, extensionTableNode.getPartList) } - } + }.unzip GlutenPartition( index, planByteArray, splitInfosByteArray.toArray, locations = splits.flatMap(_.preferredLocations().asScala).toArray, - files.toArray + files.flatten.toArray ) } } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala index 2cf1f4fcc45b..ba9d859bc9cd 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala @@ -19,7 +19,7 @@ package org.apache.gluten.backendsapi.clickhouse import org.apache.gluten.backendsapi.RuleApi import org.apache.gluten.extension._ import org.apache.gluten.extension.columnar._ -import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast, TransformPreOverrides} +import org.apache.gluten.extension.columnar.MiscColumnarRules._ import org.apache.gluten.extension.columnar.rewrite.RewriteSparkPlanRulesManager import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions} import org.apache.gluten.extension.injector.{RuleInjector, SparkInjector} @@ -28,7 +28,7 @@ import org.apache.gluten.parser.{GlutenCacheFilesSqlParser, GlutenClickhouseSqlP import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.utils.PhysicalPlanSelector -import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule, EqualToRewrite} +import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, GlutenFallbackReporter} import org.apache.spark.util.SparkPlanRules @@ -44,7 +44,7 @@ class CHRuleApi extends RuleApi { } private object CHRuleApi { - def injectSpark(injector: SparkInjector): Unit = { + private def injectSpark(injector: SparkInjector): Unit = { // Inject the regular Spark rules directly. injector.injectQueryStagePrepRule(FallbackBroadcastHashJoinPrepQueryStage.apply) injector.injectQueryStagePrepRule(spark => CHAQEPropagateEmptyRelation(spark)) @@ -61,9 +61,10 @@ private object CHRuleApi { injector.injectOptimizerRule(spark => CHAggregateFunctionRewriteRule(spark)) injector.injectOptimizerRule(_ => CountDistinctWithoutExpand) injector.injectOptimizerRule(_ => EqualToRewrite) + CHExtendRule.injectSpark(injector) } - def injectLegacy(injector: LegacyInjector): Unit = { + private def injectLegacy(injector: LegacyInjector): Unit = { // Gluten columnar: Transform rules. injector.injectTransform(_ => RemoveTransitions) injector.injectTransform(_ => PushDownInputFileExpression.PreOffload) @@ -107,7 +108,7 @@ private object CHRuleApi { injector.injectFinal(_ => RemoveFallbackTagRule()) } - def injectRas(injector: RasInjector): Unit = { + private def injectRas(injector: RasInjector): Unit = { // CH backend doesn't work with RAS at the moment. Inject a rule that aborts any // execution calls. injector.inject( diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseSnapshot.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseSnapshot.scala index f28e9e8fe468..e3f643046671 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseSnapshot.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseSnapshot.scala @@ -125,11 +125,6 @@ object ClickhouseSnapshot { // use timestamp + version as the snapshot id for ch backend def genSnapshotId(snapshot: Snapshot): String = { // When CTAS, there is no latest timestamp in the Snapshot - val ts = if (snapshot.metadata.createdTime.isDefined) { - snapshot.metadata.createdTime.get - } else { - System.currentTimeMillis() - } - ts.toString + "_" + snapshot.version.toString + s"${snapshot.metadata.createdTime.getOrElse(System.currentTimeMillis())}_${snapshot.version}" } } diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala index 4d01c3798d51..062e96962297 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala @@ -16,140 +16,40 @@ */ package org.apache.spark.sql.delta.catalog -import org.apache.gluten.expression.ConverterUtils import org.apache.gluten.expression.ConverterUtils.normalizeColName -import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable} +import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.delta.Snapshot -import org.apache.spark.sql.execution.datasources.utils.MergeTreeDeltaUtil +import org.apache.spark.sql.delta.actions.Metadata +import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil +import org.apache.spark.sql.execution.datasources.mergetree.{StorageMeta, TablePropertiesReader} import org.apache.hadoop.fs.Path -import java.{util => ju} +trait ClickHouseTableV2Base extends TablePropertiesReader { -trait ClickHouseTableV2Base { + def deltaProperties: Map[String, String] - val DEFAULT_DATABASE = "clickhouse_db" + def deltaCatalog: Option[CatalogTable] - def deltaProperties(): ju.Map[String, String] + def deltaPath: Path - def deltaCatalog(): Option[CatalogTable] + def deltaSnapshot: Snapshot - def deltaPath(): Path + def configuration: Map[String, String] = deltaProperties - def deltaSnapshot(): Snapshot + def metadata: Metadata = deltaSnapshot.metadata - lazy val dataBaseName = deltaCatalog - .map(_.identifier.database.getOrElse("default")) - .getOrElse(DEFAULT_DATABASE) + lazy val dataBaseName: String = deltaCatalog + .map(_.identifier.database.getOrElse(StorageMeta.DEFAULT_CREATE_TABLE_DATABASE)) + .getOrElse(StorageMeta.DEFAULT_PATH_BASED_DATABASE) - lazy val tableName = deltaCatalog + lazy val tableName: String = deltaCatalog .map(_.identifier.table) .getOrElse(deltaPath.toUri.getPath) - lazy val bucketOption: Option[BucketSpec] = { - val tableProperties = deltaProperties - if (tableProperties.containsKey("numBuckets")) { - val numBuckets = tableProperties.get("numBuckets").trim.toInt - val bucketColumnNames: Seq[String] = - getCommaSeparatedColumns("bucketColumnNames").getOrElse(Seq.empty[String]) - val sortColumnNames: Seq[String] = - getCommaSeparatedColumns("orderByKey").getOrElse(Seq.empty[String]) - Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) - } else { - None - } - } - - lazy val lowCardKeyOption: Option[Seq[String]] = { - getCommaSeparatedColumns("lowCardKey") - } - - lazy val minmaxIndexKeyOption: Option[Seq[String]] = { - getCommaSeparatedColumns("minmaxIndexKey") - } - - lazy val bfIndexKeyOption: Option[Seq[String]] = { - getCommaSeparatedColumns("bloomfilterIndexKey") - } - - lazy val setIndexKeyOption: Option[Seq[String]] = { - getCommaSeparatedColumns("setIndexKey") - } - - private def getCommaSeparatedColumns(keyName: String) = { - val tableProperties = deltaProperties - if (tableProperties.containsKey(keyName)) { - if (tableProperties.get(keyName).nonEmpty) { - val keys = tableProperties - .get(keyName) - .split(",") - .map(n => ConverterUtils.normalizeColName(n.trim)) - .toSeq - keys.foreach( - s => { - if (s.contains(".")) { - throw new IllegalStateException( - s"$keyName $s can not contain '.' (not support nested column yet)") - } - }) - Some(keys) - } else { - None - } - } else { - None - } - } - - lazy val orderByKeyOption: Option[Seq[String]] = { - if (bucketOption.isDefined && bucketOption.get.sortColumnNames.nonEmpty) { - val orderByKeys = bucketOption.get.sortColumnNames.map(normalizeColName).toSeq - val invalidKeys = orderByKeys.intersect(partitionColumns) - if (invalidKeys.nonEmpty) { - throw new IllegalStateException( - s"partition cols $invalidKeys can not be in the order by keys.") - } - Some(orderByKeys) - } else { - val orderByKeys = getCommaSeparatedColumns("orderByKey") - if (orderByKeys.isDefined) { - val invalidKeys = orderByKeys.get.intersect(partitionColumns) - if (invalidKeys.nonEmpty) { - throw new IllegalStateException( - s"partition cols $invalidKeys can not be in the order by keys.") - } - orderByKeys - } else { - None - } - } - } - - lazy val primaryKeyOption: Option[Seq[String]] = { - if (orderByKeyOption.isDefined) { - val primaryKeys = getCommaSeparatedColumns("primaryKey") - if ( - primaryKeys.isDefined && !orderByKeyOption.get - .mkString(",") - .startsWith(primaryKeys.get.mkString(",")) - ) { - throw new IllegalStateException( - s"Primary key $primaryKeys must be a prefix of the sorting key") - } - primaryKeys - } else { - None - } - } - - lazy val partitionColumns = deltaSnapshot.metadata.partitionColumns.map(normalizeColName).toSeq - lazy val clickhouseTableConfigs: Map[String, String] = { - val tableProperties = deltaProperties() - val configs = scala.collection.mutable.Map[String, String]() - configs += ("storage_policy" -> tableProperties.getOrDefault("storage_policy", "default")) - configs.toMap + Map("storage_policy" -> deltaProperties.getOrElse("storage_policy", "default")) } def primaryKey(): String = MergeTreeDeltaUtil.columnsToStr(primaryKeyOption) diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala index 5337e4d31388..341018fb590b 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala @@ -27,8 +27,8 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, import org.apache.spark.sql.delta._ import org.apache.spark.sql.execution.command.LeafRunnableCommand import org.apache.spark.sql.execution.commands.GlutenCacheBase._ -import org.apache.spark.sql.execution.datasources.clickhouse.ExtensionTableBuilder -import org.apache.spark.sql.execution.datasources.utils.MergeTreeDeltaUtil +import org.apache.spark.sql.execution.datasources.clickhouse.{ClickhousePartSerializer, ExtensionTableBuilder} +import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts import org.apache.spark.sql.types.{BooleanType, StringType} @@ -169,13 +169,7 @@ case class GlutenCHCacheDataCommand( val executorId = value._1 if (parts.nonEmpty) { val onePart = parts(0) - val partNameList = parts.map(_.name).toSeq - // starts and lengths is useless for write - val partRanges = Seq.range(0L, partNameList.length).map(_ => long2Long(0L)).asJava - val extensionTableNode = ExtensionTableBuilder.makeExtensionTable( - -1, - -1, onePart.database, onePart.table, ClickhouseSnapshot.genSnapshotId(snapshot), @@ -188,9 +182,7 @@ case class GlutenCHCacheDataCommand( snapshot.metadata.configuration.getOrElse("bloomfilterIndexKey", ""), snapshot.metadata.configuration.getOrElse("setIndexKey", ""), snapshot.metadata.configuration.getOrElse("primaryKey", ""), - partNameList.asJava, - partRanges, - partRanges, + ClickhousePartSerializer.fromPartNames(parts.map(_.name).toSeq), ConverterUtils.convertNamedStructJson(snapshot.metadata.schema), snapshot.metadata.configuration.asJava, new JList[String]() diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/ClickhouseMetaSerializer.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/ClickhouseMetaSerializer.scala new file mode 100644 index 000000000000..5c863d76c947 --- /dev/null +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/ClickhouseMetaSerializer.scala @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.clickhouse + +import org.apache.gluten.execution.MergeTreePartSplit +import org.apache.gluten.expression.ConverterUtils + +import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil +import org.apache.spark.sql.execution.datasources.mergetree.{DeltaMetaReader, StorageMeta} +import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts +import org.apache.spark.sql.types.StructType + +import com.fasterxml.jackson.databind.ObjectMapper +import io.substrait.proto.ReadRel + +import java.net.URI +import java.util.{Map => jMap} + +import scala.collection.JavaConverters._ + +case class ClickhousePartSerializer( + partList: Seq[String], + starts: Seq[Long], + lengths: Seq[Long] +) { + def apply(): StringBuilder = { + val partPathList = new StringBuilder + for (i <- partList.indices) { + val end = starts(i) + lengths(i) + partPathList + .append(partList(i)) + .append("\n") + .append(starts(i)) + .append("\n") + .append(end) + .append("\n") + } + partPathList + } + + // TODO: remove pathList + def pathList(absolutePath: String): Seq[String] = { + partList.map(name => absolutePath + "/" + name) + } +} + +object ClickhousePartSerializer { + def fromMergeTreePartSplits(partLists: Seq[MergeTreePartSplit]): ClickhousePartSerializer = { + val partList = partLists.map(_.name) + val starts = partLists.map(_.start) + val lengths = partLists.map(_.length) + ClickhousePartSerializer(partList, starts, lengths) + } + + def fromAddMergeTreeParts(parts: Seq[AddMergeTreeParts]): ClickhousePartSerializer = { + val partList = parts.map(_.name) + val starts = parts.map(_ => 0L) + val lengths = parts.map(_.marks) + ClickhousePartSerializer(partList, starts, lengths) + } + + def fromPartNames(partNames: Seq[String]): ClickhousePartSerializer = { + // starts and lengths is useless for writing + val partRanges = Seq.range(0L, partNames.length) + ClickhousePartSerializer(partNames, partRanges, partRanges) + } +} + +object ClickhouseMetaSerializer { + + def forWrite(deltaMetaReader: DeltaMetaReader, dataSchema: StructType): ReadRel.ExtensionTable = { + val clickhouseTableConfigs = deltaMetaReader.writeConfiguration + + val orderByKey = clickhouseTableConfigs("storage_orderByKey") + val lowCardKey = clickhouseTableConfigs("storage_lowCardKey") + val minmaxIndexKey = clickhouseTableConfigs("storage_minmaxIndexKey") + val bfIndexKey = clickhouseTableConfigs("storage_bfIndexKey") + val setIndexKey = clickhouseTableConfigs("storage_setIndexKey") + val primaryKey = clickhouseTableConfigs("storage_primaryKey") + + val result = apply( + deltaMetaReader.storageDB, + deltaMetaReader.storageTable, + deltaMetaReader.storageSnapshotId, + deltaMetaReader.storagePath, + "", // absolutePath + orderByKey, + lowCardKey, + minmaxIndexKey, + bfIndexKey, + setIndexKey, + primaryKey, + ClickhousePartSerializer.fromPartNames(Seq()), + ConverterUtils.convertNamedStructJson(dataSchema), + clickhouseTableConfigs.filter(_._1 == "storage_policy").asJava + ) + ExtensionTableNode.toProtobuf(result) + + } + // scalastyle:off argcount + def apply1( + database: String, + tableName: String, + snapshotId: String, + relativePath: String, + absolutePath: String, + orderByKeyOption: Option[Seq[String]], + lowCardKeyOption: Option[Seq[String]], + minmaxIndexKeyOption: Option[Seq[String]], + bfIndexKeyOption: Option[Seq[String]], + setIndexKeyOption: Option[Seq[String]], + primaryKeyOption: Option[Seq[String]], + partSerializer: ClickhousePartSerializer, + tableSchemaJson: String, + clickhouseTableConfigs: jMap[String, String]): ReadRel.ExtensionTable = { + + val (orderByKey0, primaryKey0) = MergeTreeDeltaUtil.genOrderByAndPrimaryKeyStr( + orderByKeyOption, + primaryKeyOption + ) + + val result = apply( + database, + tableName, + snapshotId, + relativePath, + absolutePath, + orderByKey0, + lowCardKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""), + minmaxIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""), + bfIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""), + setIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""), + primaryKey0, + partSerializer, + tableSchemaJson, + clickhouseTableConfigs + ) + ExtensionTableNode.toProtobuf(result) + } + + def apply( + database: String, + tableName: String, + snapshotId: String, + relativePath: String, + absolutePath: String, + orderByKey0: String, + lowCardKey0: String, + minmaxIndexKey0: String, + bfIndexKey0: String, + setIndexKey0: String, + primaryKey0: String, + partSerializer: ClickhousePartSerializer, + tableSchemaJson: String, + clickhouseTableConfigs: jMap[String, String]): String = { + // scalastyle:on argcount + + // New: MergeTree;{database}\n{table}\n{orderByKey}\n{primaryKey}\n{relative_path}\n + // {part_path1}\n{part_path2}\n... + val extensionTableStr = new StringBuilder(StorageMeta.SERIALIZER_HEADER) + + val orderByKey = ConverterUtils.normalizeColName(orderByKey0) + val lowCardKey = ConverterUtils.normalizeColName(lowCardKey0) + val minmaxIndexKey = ConverterUtils.normalizeColName(minmaxIndexKey0) + val bfIndexKey = ConverterUtils.normalizeColName(bfIndexKey0) + val setIndexKey = ConverterUtils.normalizeColName(setIndexKey0) + val primaryKey = ConverterUtils.normalizeColName(primaryKey0) + + extensionTableStr + .append(database) + .append("\n") + .append(tableName) + .append("\n") + .append(snapshotId) + .append("\n") + .append(tableSchemaJson) + .append("\n") + .append(orderByKey) + .append("\n") + + if (orderByKey.nonEmpty && !(orderByKey == "tuple()")) { + extensionTableStr.append(primaryKey).append("\n") + } + + extensionTableStr.append(lowCardKey).append("\n") + extensionTableStr.append(minmaxIndexKey).append("\n") + extensionTableStr.append(bfIndexKey).append("\n") + extensionTableStr.append(setIndexKey).append("\n") + extensionTableStr.append(normalizeRelativePath(relativePath)).append("\n") + extensionTableStr.append(absolutePath).append("\n") + appendConfigs(extensionTableStr, clickhouseTableConfigs) + extensionTableStr.append(partSerializer()) + + extensionTableStr.toString() + } + + private def normalizeRelativePath(relativePath: String): String = { + val table_uri = URI.create(relativePath) + if (table_uri.getPath.startsWith("/")) { + table_uri.getPath.substring(1) + } else table_uri.getPath + } + + private def appendConfigs( + extensionTableStr: StringBuilder, + clickhouseTableConfigs: jMap[String, String]): Unit = { + if (clickhouseTableConfigs != null && !clickhouseTableConfigs.isEmpty) { + val objectMapper: ObjectMapper = new ObjectMapper + try { + val clickhouseTableConfigsJson: String = objectMapper + .writeValueAsString(clickhouseTableConfigs) + .replaceAll("\n", "") + .replaceAll(" ", "") + extensionTableStr.append(clickhouseTableConfigsJson).append("\n") + } catch { + case e: Exception => + extensionTableStr.append("\n") + } + } else extensionTableStr.append("\n") + } +} diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreeDeltaUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreeDeltaUtil.scala similarity index 75% rename from backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreeDeltaUtil.scala rename to backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreeDeltaUtil.scala index 6b2af0953f00..854c6f91c917 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreeDeltaUtil.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreeDeltaUtil.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.sql.execution.datasources.utils +package org.apache.spark.sql.execution.datasources.clickhouse.utils import org.apache.gluten.expression.ConverterUtils.normalizeColName @@ -25,18 +25,13 @@ object MergeTreeDeltaUtil { def genOrderByAndPrimaryKeyStr( orderByKeyOption: Option[Seq[String]], primaryKeyOption: Option[Seq[String]]): (String, String) = { + val orderByKey = - if (orderByKeyOption.isDefined && orderByKeyOption.get.nonEmpty) { - columnsToStr(orderByKeyOption) - } else DEFAULT_ORDER_BY_KEY - - val primaryKey = - if ( - !orderByKey.equals(DEFAULT_ORDER_BY_KEY) && primaryKeyOption.isDefined && - primaryKeyOption.get.nonEmpty - ) { - columnsToStr(primaryKeyOption) - } else "" + orderByKeyOption.filter(_.nonEmpty).map(columnsToStr).getOrElse(DEFAULT_ORDER_BY_KEY) + val primaryKey = primaryKeyOption + .filter(p => orderByKey != DEFAULT_ORDER_BY_KEY && p.nonEmpty) + .map(columnsToStr) + .getOrElse("") (orderByKey, primaryKey) } @@ -45,4 +40,8 @@ object MergeTreeDeltaUtil { case Some(keys) => keys.map(normalizeColName).mkString(",") case None => "" } + + def columnsToStr(keys: Seq[String]): String = { + keys.map(normalizeColName).mkString(",") + } } diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala similarity index 96% rename from backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala rename to backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala index c2eb338261fc..dc31822fd73e 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.sql.execution.datasources.utils +package org.apache.spark.sql.execution.datasources.clickhouse.utils import org.apache.gluten.backendsapi.clickhouse.{CHBackendSettings, CHConf} import org.apache.gluten.execution.{GlutenMergeTreePartition, MergeTreePartRange, MergeTreePartSplit} @@ -35,7 +35,8 @@ import org.apache.spark.sql.delta.ClickhouseSnapshot import org.apache.spark.sql.delta.catalog.ClickHouseTableV2 import org.apache.spark.sql.delta.files.TahoeFileIndex import org.apache.spark.sql.execution.datasources.{CHDatasourceJniWrapper, HadoopFsRelation, PartitionDirectory} -import org.apache.spark.sql.execution.datasources.clickhouse.{ExtensionTableBuilder, MergeTreePartFilterReturnedRange} +import org.apache.spark.sql.execution.datasources.clickhouse.{ClickhousePartSerializer, ExtensionTableBuilder, MergeTreePartFilterReturnedRange} +import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat import org.apache.spark.sql.types.BooleanType @@ -48,7 +49,6 @@ import com.google.protobuf.{Any, StringValue} import io.substrait.proto.NamedStruct import io.substrait.proto.Plan -import java.lang.{Long => JLong} import java.util import java.util.{ArrayList => JArrayList} @@ -78,14 +78,15 @@ object MergeTreePartsPartitionsUtil extends Logging { val fileIndex = relation.location.asInstanceOf[TahoeFileIndex] // when querying, use deltaLog.update(true) to get the staleness acceptable snapshot - val snapshotId = ClickhouseSnapshot.genSnapshotId(table.deltaLog.update(true)) + val snapshotId = + ClickhouseSnapshot.genSnapshotId(table.deltaLog.update(stalenessAcceptable = true)) val partitions = new ArrayBuffer[InputPartition] val (database, tableName) = if (table.catalogTable.isDefined) { (table.catalogTable.get.identifier.database.get, table.catalogTable.get.identifier.table) } else { // for file_format.`file_path` - (table.DEFAULT_DATABASE, table.deltaPath.toUri.getPath) + (StorageMeta.DEFAULT_PATH_BASED_DATABASE, table.deltaPath.toUri.getPath) } val engine = "MergeTree" val relativeTablePath = fileIndex.deltaLog.dataPath.toUri.getPath.substring(1) @@ -457,9 +458,10 @@ object MergeTreePartsPartitionsUtil extends Logging { val ret = ClickhouseSnapshot.pathToAddMTPCache.getIfPresent(path) if (ret == null) { val keys = ClickhouseSnapshot.pathToAddMTPCache.asMap().keySet() - val keySample = keys.isEmpty() match { - case true => "" - case false => keys.iterator().next() + val keySample = if (keys.isEmpty) { + "" + } else { + keys.iterator().next() } throw new IllegalStateException( "Can't find AddMergeTreeParts from cache pathToAddMTPCache for key: " + @@ -574,20 +576,8 @@ object MergeTreePartsPartitionsUtil extends Logging { case (l1, l2) => l1.sum / l2.sum } - val partLists = new JArrayList[String]() - val starts = new JArrayList[JLong]() - val lengths = new JArrayList[JLong]() - selectPartsFiles.foreach( - part => { - partLists.add(part.name) - starts.add(0) - lengths.add(part.marks) - }) - val extensionTableNode = ExtensionTableBuilder .makeExtensionTable( - -1L, - -1L, database, tableName, snapshotId, @@ -599,9 +589,7 @@ object MergeTreePartsPartitionsUtil extends Logging { table.bfIndexKey(), table.setIndexKey(), table.primaryKey(), - partLists, - starts, - lengths, + ClickhousePartSerializer.fromAddMergeTreeParts(selectPartsFiles), tableSchemaJson, clickhouseTableConfigs.asJava, new JArrayList[String]() diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/DeltaMetaReader.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/DeltaMetaReader.scala new file mode 100644 index 000000000000..de322b65dd8e --- /dev/null +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/DeltaMetaReader.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.mergetree + +import org.apache.spark.sql.delta.actions.Metadata + +class DeltaMetaReader( + override val metadata: Metadata, + override val configuration: Map[String, String]) + extends TablePropertiesReader { + + def storageDB: String = configuration(StorageMeta.STORAGE_DB) + def storageTable: String = configuration(StorageMeta.STORAGE_TABLE) + def storageSnapshotId: String = configuration(StorageMeta.STORAGE_SNAPSHOT_ID) + def storagePath: String = configuration(StorageMeta.STORAGE_PATH) +} + +object DeltaMetaReader { + def apply(metadata: Metadata): DeltaMetaReader = { + new DeltaMetaReader(metadata, metadata.configuration) + } +} diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala new file mode 100644 index 000000000000..e08f91450ec2 --- /dev/null +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.mergetree + +import org.apache.gluten.expression.ConverterUtils.normalizeColName + +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.delta.actions.Metadata +import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil + +import org.apache.hadoop.fs.Path + +import scala.collection.mutable.ListBuffer + +/** Reserved table property for MergeTree table. */ +object StorageMeta { + val Provider: String = "clickhouse" + val DEFAULT_FILE_FORMAT: String = "write.format.default" + val DEFAULT_FILE_FORMAT_DEFAULT: String = "mergetree" + + // Storage properties + val DEFAULT_PATH_BASED_DATABASE: String = "clickhouse_db" + val DEFAULT_CREATE_TABLE_DATABASE: String = "default" + val STORAGE_DB: String = "storage_db" + val STORAGE_TABLE: String = "storage_table" + val STORAGE_SNAPSHOT_ID: String = "storage_snapshot_id" + val STORAGE_PATH: String = "storage_path" + val SERIALIZER_HEADER: String = "MergeTree;" + + def withMoreStorageInfo( + metadata: Metadata, + snapshotId: String, + deltaPath: Path, + database: String, + tableName: String): Metadata = { + val moreOptions = Seq( + STORAGE_DB -> database, + STORAGE_SNAPSHOT_ID -> snapshotId, + STORAGE_TABLE -> tableName, + STORAGE_PATH -> deltaPath.toString) + withMoreOptions(metadata, moreOptions) + } + def withMoreStorageInfo(metadata: Metadata, snapshotId: String, deltaPath: Path): Metadata = { + val moreOptions = + ListBuffer(STORAGE_SNAPSHOT_ID -> snapshotId, STORAGE_PATH -> deltaPath.toString) + // Path-based create table statement does not have storage_db and storage_table + if (!metadata.configuration.contains(STORAGE_DB)) { + moreOptions += STORAGE_DB -> DEFAULT_PATH_BASED_DATABASE + } + if (!metadata.configuration.contains(STORAGE_TABLE)) { + moreOptions += STORAGE_TABLE -> deltaPath.toUri.getPath + } + withMoreOptions(metadata, moreOptions.toSeq) + } + + private def withMoreOptions(metadata: Metadata, newOptions: Seq[(String, String)]): Metadata = { + metadata.copy(configuration = metadata.configuration ++ newOptions) + } +} + +trait TablePropertiesReader { + + def configuration: Map[String, String] + + /** delta */ + def metadata: Metadata + + private def getCommaSeparatedColumns(keyName: String): Option[Seq[String]] = { + configuration.get(keyName).map { + v => + val keys = v.split(",").map(n => normalizeColName(n.trim)).toSeq + keys.foreach { + s => + if (s.contains(".")) { + throw new IllegalStateException( + s"$keyName $s can not contain '.' (not support nested column yet)") + } + } + keys + } + } + + lazy val bucketOption: Option[BucketSpec] = { + val tableProperties = configuration + if (tableProperties.contains("numBuckets")) { + val numBuckets = tableProperties("numBuckets").trim.toInt + val bucketColumnNames: Seq[String] = + getCommaSeparatedColumns("bucketColumnNames").getOrElse(Seq.empty[String]) + val sortColumnNames: Seq[String] = + getCommaSeparatedColumns("orderByKey").getOrElse(Seq.empty[String]) + Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) + } else { + None + } + } + + lazy val lowCardKeyOption: Option[Seq[String]] = { + getCommaSeparatedColumns("lowCardKey") + } + + lazy val minmaxIndexKeyOption: Option[Seq[String]] = { + getCommaSeparatedColumns("minmaxIndexKey") + } + + lazy val bfIndexKeyOption: Option[Seq[String]] = { + getCommaSeparatedColumns("bloomfilterIndexKey") + } + + lazy val setIndexKeyOption: Option[Seq[String]] = { + getCommaSeparatedColumns("setIndexKey") + } + + lazy val partitionColumns: Seq[String] = + metadata.partitionColumns.map(normalizeColName) + + lazy val orderByKeyOption: Option[Seq[String]] = { + val orderByKeys = + if (bucketOption.exists(_.sortColumnNames.nonEmpty)) { + bucketOption.map(_.sortColumnNames.map(normalizeColName)) + } else { + getCommaSeparatedColumns("orderByKey") + } + orderByKeys + .map(_.intersect(partitionColumns)) + .filter(_.nonEmpty) + .foreach { + invalidKeys => + throw new IllegalStateException( + s"partition cols $invalidKeys can not be in the order by keys.") + } + orderByKeys + } + + lazy val primaryKeyOption: Option[Seq[String]] = { + orderByKeyOption.map(_.mkString(",")).flatMap { + orderBy => + val primaryKeys = getCommaSeparatedColumns("primaryKey") + primaryKeys + .map(_.mkString(",")) + .filterNot(orderBy.startsWith) + .foreach( + primaryKey => + throw new IllegalStateException( + s"Primary key $primaryKey must be a prefix of the sorting key $orderBy")) + primaryKeys + } + } + + lazy val writeConfiguration: Map[String, String] = { + val (orderByKey0, primaryKey0) = MergeTreeDeltaUtil.genOrderByAndPrimaryKeyStr( + orderByKeyOption, + primaryKeyOption + ) + Map( + "storage_policy" -> configuration.getOrElse("storage_policy", "default"), + "storage_orderByKey" -> orderByKey0, + "storage_lowCardKey" -> lowCardKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""), + "storage_minmaxIndexKey" -> minmaxIndexKeyOption + .map(MergeTreeDeltaUtil.columnsToStr) + .getOrElse(""), + "storage_bfIndexKey" -> bfIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""), + "storage_setIndexKey" -> setIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""), + "storage_primaryKey" -> primaryKey0 + ) + } +} diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHFormatWriterInjects.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHFormatWriterInjects.scala index 36b2a44d13b4..69c001e461d8 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHFormatWriterInjects.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHFormatWriterInjects.scala @@ -83,26 +83,6 @@ trait CHFormatWriterInjects extends GlutenFormatWriterInjectsBase { // TODO: parquet and mergetree OrcUtils.inferSchema(sparkSession, files, options) } - - // scalastyle:off argcount - /** For CH MergeTree format */ - def createOutputWriter( - path: String, - database: String, - tableName: String, - snapshotId: String, - orderByKeyOption: Option[Seq[String]], - lowCardKeyOption: Option[Seq[String]], - minmaxIndexKeyOption: Option[Seq[String]], - bfIndexKeyOption: Option[Seq[String]], - setIndexKeyOption: Option[Seq[String]], - primaryKeyOption: Option[Seq[String]], - partitionColumns: Seq[String], - tableSchema: StructType, - clickhouseTableConfigs: Map[String, String], - context: TaskAttemptContext, - nativeConf: java.util.Map[String, String]): OutputWriter = null - // scalastyle:on argcount } class CHRowSplitter extends GlutenRowSplitter { diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala index cb7dfa8f3835..521b59d60e29 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution.datasources.v1 import org.apache.gluten.expression.ConverterUtils -import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.`type`.ColumnTypeNode import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.extensions.ExtensionBuilder @@ -27,8 +26,7 @@ import org.apache.gluten.utils.ConfigUtil import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.datasources.{CHDatasourceJniWrapper, OutputWriter} -import org.apache.spark.sql.execution.datasources.clickhouse.ExtensionTableBuilder -import org.apache.spark.sql.execution.datasources.utils.MergeTreeDeltaUtil +import org.apache.spark.sql.execution.datasources.clickhouse.{ClickhouseMetaSerializer, ClickhousePartSerializer} import org.apache.spark.sql.execution.datasources.v1.clickhouse.MergeTreeOutputWriter import org.apache.spark.sql.types.StructType @@ -37,7 +35,7 @@ import com.google.protobuf.{Any, StringValue} import io.substrait.proto.NamedStruct import org.apache.hadoop.mapreduce.TaskAttemptContext -import java.util.{ArrayList => JList, Map => JMap, UUID} +import java.util.{Map => JMap, UUID} import scala.collection.JavaConverters._ @@ -57,50 +55,22 @@ class CHMergeTreeWriterInjects extends CHFormatWriterInjects { context: TaskAttemptContext, nativeConf: JMap[String, String]): OutputWriter = null - // scalastyle:off argcount - override def createOutputWriter( + override val formatName: String = "mergetree" + + def createOutputWriter( path: String, + dataSchema: StructType, + context: TaskAttemptContext, + nativeConf: JMap[String, String], database: String, tableName: String, - snapshotId: String, - orderByKeyOption: Option[Seq[String]], - lowCardKeyOption: Option[Seq[String]], - minmaxIndexKeyOption: Option[Seq[String]], - bfIndexKeyOption: Option[Seq[String]], - setIndexKeyOption: Option[Seq[String]], - primaryKeyOption: Option[Seq[String]], - partitionColumns: Seq[String], - tableSchema: StructType, - clickhouseTableConfigs: Map[String, String], - context: TaskAttemptContext, - nativeConf: JMap[String, String]): OutputWriter = { - - val uuid = UUID.randomUUID.toString - - val planWithSplitInfo = CHMergeTreeWriterInjects.genMergeTreeWriteRel( - path, - database, - tableName, - snapshotId, - orderByKeyOption, - lowCardKeyOption, - minmaxIndexKeyOption, - bfIndexKeyOption, - setIndexKeyOption, - primaryKeyOption, - partitionColumns, - Seq(), - ConverterUtils.convertNamedStructJson(tableSchema), - clickhouseTableConfigs, - // use table schema instead of data schema - SparkShimLoader.getSparkShims.attributesFromStruct(tableSchema) - ) + splitInfo: Array[Byte]): OutputWriter = { val datasourceJniWrapper = new CHDatasourceJniWrapper() val instance = datasourceJniWrapper.nativeInitMergeTreeWriterWrapper( - planWithSplitInfo.plan, - planWithSplitInfo.splitInfo, - uuid, + null, + splitInfo, + UUID.randomUUID.toString, context.getTaskAttemptID.getTaskID.getId.toString, context.getConfiguration.get("mapreduce.task.gluten.mergetree.partition.dir"), context.getConfiguration.get("mapreduce.task.gluten.mergetree.bucketid.str"), @@ -109,10 +79,6 @@ class CHMergeTreeWriterInjects extends CHFormatWriterInjects { new MergeTreeOutputWriter(database, tableName, datasourceJniWrapper, instance, path) } - // scalastyle:on argcount - - override val formatName: String = "mergetree" - } object CHMergeTreeWriterInjects { @@ -146,39 +112,23 @@ object CHMergeTreeWriterInjects { } }.asJava - val (orderByKey, primaryKey) = MergeTreeDeltaUtil.genOrderByAndPrimaryKeyStr( - orderByKeyOption, - primaryKeyOption - ) - - val lowCardKey = MergeTreeDeltaUtil.columnsToStr(lowCardKeyOption) - val minmaxIndexKey = MergeTreeDeltaUtil.columnsToStr(minmaxIndexKeyOption) - val bfIndexKey = MergeTreeDeltaUtil.columnsToStr(bfIndexKeyOption) - val setIndexKey = MergeTreeDeltaUtil.columnsToStr(setIndexKeyOption) - val substraitContext = new SubstraitContext - val extensionTableNode = ExtensionTableBuilder.makeExtensionTable( - -1, - -1, + + val extensionTable = ClickhouseMetaSerializer.apply1( database, tableName, snapshotId, path, "", - orderByKey, - lowCardKey, - minmaxIndexKey, - bfIndexKey, - setIndexKey, - primaryKey, - scala.collection.JavaConverters.seqAsJavaList(partList), - scala.collection.JavaConverters.seqAsJavaList( - Seq.range(0L, partList.length).map(long2Long) - ), // starts and lengths is useless for write - scala.collection.JavaConverters.seqAsJavaList(Seq.range(0L, partList.length).map(long2Long)), + orderByKeyOption, + lowCardKeyOption, + minmaxIndexKeyOption, + bfIndexKeyOption, + setIndexKeyOption, + primaryKeyOption, + ClickhousePartSerializer.fromPartNames(partList), tableSchemaJson, - clickhouseTableConfigs.asJava, - new JList[String]() + clickhouseTableConfigs.asJava ) val optimizationContent = "isMergeTree=1\n" @@ -197,6 +147,6 @@ object CHMergeTreeWriterInjects { val plan = PlanBuilder.makePlan(substraitContext, Lists.newArrayList(relNode), nameList).toProtobuf - PlanWithSplitInfo(plan.toByteArray, extensionTableNode.toProtobuf.toByteArray) + PlanWithSplitInfo(plan.toByteArray, extensionTable.toByteArray) } } diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeOutputWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeOutputWriter.scala index 06e2b98bc8b2..52593d7c1795 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeOutputWriter.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeOutputWriter.scala @@ -43,7 +43,7 @@ class MergeTreeOutputWriter( if (nextBatch.numRows > 0) { val col = nextBatch.column(0).asInstanceOf[CHColumnVector] datasourceJniWrapper.writeToMergeTree(instance, col.getBlockAddress) - } // else just ignore this empty block + } // else ignore this empty block } override def close(): Unit = { diff --git a/backends-clickhouse/src/main/spark32/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala b/backends-clickhouse/src/main/spark32/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala new file mode 100644 index 000000000000..234954386adb --- /dev/null +++ b/backends-clickhouse/src/main/spark32/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.backendsapi.clickhouse + +import org.apache.gluten.extension.injector.SparkInjector + +object CHExtendRule { + def injectSpark(injector: SparkInjector): Unit = {} +} diff --git a/backends-clickhouse/src/main/spark33/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala b/backends-clickhouse/src/main/spark33/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala new file mode 100644 index 000000000000..234954386adb --- /dev/null +++ b/backends-clickhouse/src/main/spark33/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.backendsapi.clickhouse + +import org.apache.gluten.extension.injector.SparkInjector + +object CHExtendRule { + def injectSpark(injector: SparkInjector): Unit = {} +} diff --git a/backends-clickhouse/src/main/spark35/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala b/backends-clickhouse/src/main/spark35/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala new file mode 100644 index 000000000000..fb3a854ef98c --- /dev/null +++ b/backends-clickhouse/src/main/spark35/org/apache/gluten/backendsapi/clickhouse/CHExtendRule.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.backendsapi.clickhouse + +import org.apache.gluten.extension.injector.SparkInjector + +import org.apache.spark.sql.catalyst.AddStorageInfo + +object CHExtendRule { + def injectSpark(injector: SparkInjector): Unit = { + injector.injectOptimizerRule(_ => AddStorageInfo) + } +} diff --git a/backends-clickhouse/src/main/spark35/org/apache/spark/sql/catalyst/AddStorageInfo.scala b/backends-clickhouse/src/main/spark35/org/apache/spark/sql/catalyst/AddStorageInfo.scala new file mode 100644 index 000000000000..760241f840f2 --- /dev/null +++ b/backends-clickhouse/src/main/spark35/org/apache/spark/sql/catalyst/AddStorageInfo.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst + +import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier +import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LogicalPlan, TableSpec} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND +import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta + +/** This object is responsible for adding storage information to the CreateTable. */ + +object AddStorageInfo extends Rule[LogicalPlan] { + + private def createMergeTreeTable(tableSpec: TableSpec): Boolean = { + tableSpec.provider.contains(StorageMeta.Provider) || + tableSpec.properties + .get(StorageMeta.DEFAULT_FILE_FORMAT) + .contains(StorageMeta.DEFAULT_FILE_FORMAT_DEFAULT) + } + + override def apply(plan: LogicalPlan): LogicalPlan = + plan.transformWithPruning(_.containsAnyPattern(COMMAND)) { + case create @ CreateTable(ResolvedIdentifier(_, ident), _, _, tableSpec: TableSpec, _) + if createMergeTreeTable(tableSpec) => + val newTableSpec = tableSpec.copy( + properties = tableSpec.properties ++ Seq( + StorageMeta.STORAGE_DB -> ident + .namespace() + .lastOption + .getOrElse(StorageMeta.DEFAULT_CREATE_TABLE_DATABASE), + StorageMeta.STORAGE_TABLE -> ident.name()) + ) + create.copy(tableSpec = newTableSpec) + } +} diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala index a1dd5d8687a0..62b9ee3bcb31 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala @@ -17,6 +17,7 @@ package org.apache.gluten.execution.mergetree import org.apache.gluten.execution._ +import org.apache.gluten.utils.Arm import org.apache.spark.SparkConf import org.apache.spark.sql.SaveMode @@ -24,6 +25,7 @@ import org.apache.spark.sql.delta.catalog.ClickHouseTableV2 import org.apache.spark.sql.delta.files.TahoeFileIndex import org.apache.spark.sql.execution.LocalTableScanExec import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts import org.apache.spark.sql.functions._ @@ -162,7 +164,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite val planNodeJson = wholeStageTransformer.substraitPlanJson assert( !planNodeJson - .replaceAll("\\\n", "") + .replaceAll("\n", "") .replaceAll(" ", "") .contains("\"input\":{\"filter\":{")) } @@ -269,7 +271,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite val planNodeJson = wholeStageTransformer.substraitPlanJson assert( !planNodeJson - .replaceAll("\\\n", "") + .replaceAll("\n", "") .replaceAll(" ", "") .contains("\"input\":{\"filter\":{")) } @@ -1006,7 +1008,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite // find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006 val partDir = directory.listFiles().filter(f => f.getName.length > 20).head val columnsFile = new File(partDir, "columns.txt") - val columns = Source.fromFile(columnsFile).getLines().mkString + val columns = Arm.withResource(Source.fromFile(columnsFile))(_.getLines().mkString) assert(columns.contains("`l_returnflag` LowCardinality(Nullable(String))")) assert(columns.contains("`l_linestatus` LowCardinality(Nullable(String))")) @@ -1366,34 +1368,39 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite .option("clickhouse.lowCardKey", "l_returnflag,l_linestatus") .save(dataPath1) - val df = spark.read - .format("clickhouse") - .load(dataPath) - val result = df.collect() - assertResult(600572)(result.size) + { + val df = spark.read + .format("clickhouse") + .load(dataPath) + val result = df.collect() + assertResult(600572)(result.length) - val plans = collect(df.queryExecution.executedPlan) { - case f: FileSourceScanExecTransformer => f + val plans = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + val partitions = plans.head.getPartitions + assert(partitions.nonEmpty) + assert(partitions.head.isInstanceOf[GlutenMergeTreePartition]) + val mergeTreePartition = partitions.head.asInstanceOf[GlutenMergeTreePartition] + assertResult(mergeTreePartition.database)(StorageMeta.DEFAULT_PATH_BASED_DATABASE) + assertResult(mergeTreePartition.table)(dataPath) } - val partitions = plans(0).getPartitions - assert(partitions.nonEmpty) - assert(partitions(0).isInstanceOf[GlutenMergeTreePartition]) - assert(partitions(0).asInstanceOf[GlutenMergeTreePartition].database.equals("clickhouse_db")) - assert(partitions(0).asInstanceOf[GlutenMergeTreePartition].table.equals(dataPath)) - - val df1 = spark.read - .format("clickhouse") - .load(dataPath1) - val result1 = df1.collect() - assertResult(600572)(result.size) + { + val df1 = spark.read + .format("clickhouse") + .load(dataPath1) + val result1 = df1.collect() + assertResult(600572)(result1.length) - val plans1 = collect(df1.queryExecution.executedPlan) { - case f: FileSourceScanExecTransformer => f + val plans1 = collect(df1.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + val partitions1 = plans1.head.getPartitions + assert(partitions1.nonEmpty) + assert(partitions1.head.isInstanceOf[GlutenMergeTreePartition]) + val mergeTreePartition1 = partitions1.head.asInstanceOf[GlutenMergeTreePartition] + assertResult(mergeTreePartition1.database)(StorageMeta.DEFAULT_PATH_BASED_DATABASE) + assertResult(mergeTreePartition1.table)(dataPath1) } - val partitions1 = plans1(0).getPartitions - assert(partitions1.nonEmpty) - assert(partitions1(0).isInstanceOf[GlutenMergeTreePartition]) - assert(partitions1(0).asInstanceOf[GlutenMergeTreePartition].database.equals("clickhouse_db")) - assert(partitions1(0).asInstanceOf[GlutenMergeTreePartition].table.equals(dataPath1)) } } diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickhouseMergetreeSoftAffinitySuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickhouseMergetreeSoftAffinitySuite.scala index 97418c670f71..aead0bf47fa3 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickhouseMergetreeSoftAffinitySuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickhouseMergetreeSoftAffinitySuite.scala @@ -22,7 +22,7 @@ import org.apache.gluten.execution.{GlutenClickHouseTPCHAbstractSuite, GlutenMer import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.delta.catalog.ClickHouseTableV2 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper -import org.apache.spark.sql.execution.datasources.utils.MergeTreePartsPartitionsUtil +import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil import org.apache.hadoop.fs.Path