[GLUTEN-7028][CH][Part-4] Refactor DeltaMergeTreeFileFormat to read table configuration from deltalog's metadata (apache#7170)

* Call ClickhouseMetaSerializer.forWrite on the driver side and broadcast ReadRel.ExtensionTable.
baibaichen authored Sep 30, 2024
1 parent b21a0b5 commit 93056f0
Showing 29 changed files with 816 additions and 721 deletions.
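
The heart of the change: the MergeTree table settings now travel inside the Delta log's Metadata. ClickhouseMetaSerializer.forWrite serializes them once on the driver, and the resulting ReadRel.ExtensionTable bytes are broadcast to executors, replacing the twelve-argument DeltaMergeTreeFileFormat constructor deleted below. A minimal, self-contained sketch of that serialize-once-then-broadcast pattern; serializeTableMeta is a hypothetical stand-in for ClickhouseMetaSerializer.forWrite (which really emits protobuf bytes):

```scala
import org.apache.spark.sql.SparkSession

object BroadcastTableMetaSketch {
  // Hypothetical stand-in for ClickhouseMetaSerializer.forWrite: turn the
  // table configuration into bytes once, on the driver.
  def serializeTableMeta(settings: Map[String, String]): Array[Byte] =
    settings.map { case (k, v) => s"$k=$v" }.mkString("\n").getBytes("UTF-8")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()

    // Driver side: serialize once and broadcast, as prepareWrite now does
    // with the ReadRel.ExtensionTable bytes.
    val extensionTableBC = spark.sparkContext.broadcast(
      serializeTableMeta(Map(
        "database" -> "default",
        "table" -> "events",
        "snapshotId" -> "v42")))

    // Executor side: every write task reuses the broadcast bytes instead of
    // rebuilding the metadata from a long argument list.
    val perTask = spark.sparkContext
      .parallelize(1 to 4, numSlices = 4)
      .map(_ => extensionTableBC.value.length)
      .collect()

    println(perTask.mkString(","))
    spark.stop()
  }
}
```
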
2 changes: 2 additions & 0 deletions backends-clickhouse/pom.xml
@@ -338,6 +338,7 @@
<include>src/test/scala/**/*.scala</include>
<include>src/main/delta-${delta.binary.version}/**/*.scala</include>
<include>src/test/delta-${delta.binary.version}/**/*.scala</include>
+<include>src/main/${sparkshim.module.name}/**/*.scala</include>
</includes>
<excludes>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/commands/*.scala</exclude>
@@ -397,6 +398,7 @@
<configuration>
<sources>
<source>src/main/delta-${delta.binary.version}</source>
+<source>src/main/${sparkshim.module.name}</source>
</sources>
</configuration>
</execution>
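Both touched build sections (an <includes> pattern list and an extra <source> root) now also cover src/main/${sparkshim.module.name}, presumably the per-Spark-shim source directory; that would explain why the same ClickHouseTableV2 and DeltaMergeTreeFileFormat sources appear more than once below.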
ClickHouseTableV2.scala
@@ -26,7 +26,8 @@ import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2.deltaLog2Table
import org.apache.spark.sql.delta.sources.DeltaDataSource
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
-import org.apache.spark.sql.execution.datasources.utils.MergeTreePartsPartitionsUtil
+import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil
+import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.collection.BitSet
@@ -95,28 +96,21 @@ class ClickHouseTableV2(

def getFileFormat(meta: Metadata): DeltaMergeTreeFileFormat = {
new DeltaMergeTreeFileFormat(
-meta,
-dataBaseName,
-tableName,
-ClickhouseSnapshot.genSnapshotId(snapshot),
-orderByKeyOption,
-lowCardKeyOption,
-minmaxIndexKeyOption,
-bfIndexKeyOption,
-setIndexKeyOption,
-primaryKeyOption,
-clickhouseTableConfigs,
-partitionColumns
-)
+StorageMeta.withMoreStorageInfo(
+meta,
+ClickhouseSnapshot.genSnapshotId(snapshot),
+deltaLog.dataPath,
+dataBaseName,
+tableName))
}

-override def deltaProperties(): ju.Map[String, String] = properties()
+override def deltaProperties: Map[String, String] = properties().asScala.toMap

-override def deltaCatalog(): Option[CatalogTable] = catalogTable
+override def deltaCatalog: Option[CatalogTable] = catalogTable

-override def deltaPath(): Path = path
+override def deltaPath: Path = path

-override def deltaSnapshot(): Snapshot = snapshot
+override def deltaSnapshot: Snapshot = snapshot

def cacheThis(): Unit = {
deltaLog2Table.put(deltaLog, this)
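StorageMeta.withMoreStorageInfo is introduced by this commit, but its body is not part of the hunks shown here. A minimal sketch of what it plausibly does, assuming it merely stamps the storage coordinates into the Delta Metadata's configuration map (the key names are illustrative, not Gluten's actual ones):

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.actions.Metadata

object StorageMetaSketch {
  // Assumed behavior: return a copy of the Delta Metadata whose configuration
  // also records where the MergeTree data lives, so downstream readers can
  // recover database/table/snapshot/path from the metadata alone.
  def withMoreStorageInfo(
      meta: Metadata,
      snapshotId: String,
      deltaPath: Path,
      database: String,
      tableName: String): Metadata =
    meta.copy(configuration = meta.configuration ++ Map(
      "storage_db" -> database,
      "storage_table" -> tableName,
      "storage_snapshot_id" -> snapshotId,
      "storage_path" -> deltaPath.toString))
}
```
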
DeltaMergeTreeFileFormat.scala
@@ -20,6 +20,8 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer
+import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader
import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects}
import org.apache.spark.sql.types.StructType

@@ -28,47 +30,6 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
class DeltaMergeTreeFileFormat(metadata: Metadata)
extends DeltaParquetFileFormat(metadata.columnMappingMode, metadata.schema) {

-protected var database = ""
-protected var tableName = ""
-protected var snapshotId = ""
-protected var orderByKeyOption: Option[Seq[String]] = None
-protected var lowCardKeyOption: Option[Seq[String]] = None
-protected var minmaxIndexKeyOption: Option[Seq[String]] = None
-protected var bfIndexKeyOption: Option[Seq[String]] = None
-protected var setIndexKeyOption: Option[Seq[String]] = None
-protected var primaryKeyOption: Option[Seq[String]] = None
-protected var partitionColumns: Seq[String] = Seq.empty[String]
-protected var clickhouseTableConfigs: Map[String, String] = Map.empty
-
-// scalastyle:off argcount
-def this(
-metadata: Metadata,
-database: String,
-tableName: String,
-snapshotId: String,
-orderByKeyOption: Option[Seq[String]],
-lowCardKeyOption: Option[Seq[String]],
-minmaxIndexKeyOption: Option[Seq[String]],
-bfIndexKeyOption: Option[Seq[String]],
-setIndexKeyOption: Option[Seq[String]],
-primaryKeyOption: Option[Seq[String]],
-clickhouseTableConfigs: Map[String, String],
-partitionColumns: Seq[String]) {
-this(metadata)
-this.database = database
-this.tableName = tableName
-this.snapshotId = snapshotId
-this.orderByKeyOption = orderByKeyOption
-this.lowCardKeyOption = lowCardKeyOption
-this.minmaxIndexKeyOption = minmaxIndexKeyOption
-this.bfIndexKeyOption = bfIndexKeyOption
-this.setIndexKeyOption = setIndexKeyOption
-this.primaryKeyOption = primaryKeyOption
-this.clickhouseTableConfigs = clickhouseTableConfigs
-this.partitionColumns = partitionColumns
-}
-// scalastyle:on argcount
-
override def shortName(): String = "mergetree"

override def toString(): String = "MergeTree"
@@ -95,6 +56,17 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
.getInstance()
.nativeConf(options, "")

+@transient val deltaMetaReader = DeltaMetaReader(metadata)
+
+val database = deltaMetaReader.storageDB
+val tableName = deltaMetaReader.storageTable
+val deltaPath = deltaMetaReader.storagePath
+
+val extensionTableBC = sparkSession.sparkContext.broadcast(
+ClickhouseMetaSerializer
+.forWrite(deltaMetaReader, metadata.schema)
+.toByteArray)
+
new OutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String = {
".mergetree"
@@ -104,25 +76,18 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
+require(path == deltaPath)
GlutenMergeTreeWriterInjects
.getInstance()
.asInstanceOf[CHMergeTreeWriterInjects]
.createOutputWriter(
path,
-database,
-tableName,
-snapshotId,
-orderByKeyOption,
-lowCardKeyOption,
-minmaxIndexKeyOption,
-bfIndexKeyOption,
-setIndexKeyOption,
-primaryKeyOption,
-partitionColumns,
metadata.schema,
-clickhouseTableConfigs,
context,
-nativeConf
+nativeConf,
+database,
+tableName,
+extensionTableBC.value
)
}
}
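Note that prepareWrite runs on the driver, so deltaMetaReader and the broadcast above are built once per write job; each task only dereferences extensionTableBC.value inside createOutputWriter. DeltaMetaReader itself is not shown in this diff; a guess at the small surface used here, assuming it reads back the keys stamped in by StorageMeta.withMoreStorageInfo (key names again illustrative):

```scala
import org.apache.spark.sql.delta.actions.Metadata

// Assumed shape: a thin reader over Metadata.configuration, mirroring the
// keys written by the withMoreStorageInfo sketch earlier.
case class DeltaMetaReaderSketch(metadata: Metadata) {
  private def conf(key: String): String =
    metadata.configuration.getOrElse(key, sys.error(s"missing $key"))

  def storageDB: String = conf("storage_db")
  def storageTable: String = conf("storage_table")
  def storagePath: String = conf("storage_path")
}
```
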
ClickHouseTableV2.scala
@@ -26,7 +26,8 @@ import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2.deltaLog2Table
import org.apache.spark.sql.delta.sources.DeltaDataSource
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
-import org.apache.spark.sql.execution.datasources.utils.MergeTreePartsPartitionsUtil
+import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil
+import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.collection.BitSet
@@ -95,28 +96,21 @@ class ClickHouseTableV2(

def getFileFormat(meta: Metadata): DeltaMergeTreeFileFormat = {
new DeltaMergeTreeFileFormat(
-meta,
-dataBaseName,
-tableName,
-ClickhouseSnapshot.genSnapshotId(snapshot),
-orderByKeyOption,
-lowCardKeyOption,
-minmaxIndexKeyOption,
-bfIndexKeyOption,
-setIndexKeyOption,
-primaryKeyOption,
-clickhouseTableConfigs,
-partitionColumns
-)
+StorageMeta.withMoreStorageInfo(
+meta,
+ClickhouseSnapshot.genSnapshotId(snapshot),
+deltaLog.dataPath,
+dataBaseName,
+tableName))
}

-override def deltaProperties(): ju.Map[String, String] = properties()
+override def deltaProperties: Map[String, String] = properties().asScala.toMap

-override def deltaCatalog(): Option[CatalogTable] = catalogTable
+override def deltaCatalog: Option[CatalogTable] = catalogTable

-override def deltaPath(): Path = path
+override def deltaPath: Path = path

-override def deltaSnapshot(): Snapshot = snapshot
+override def deltaSnapshot: Snapshot = snapshot

def cacheThis(): Unit = {
deltaLog2Table.put(deltaLog, this)
DeltaMergeTreeFileFormat.scala
@@ -20,6 +20,8 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer
+import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader
import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects}
import org.apache.spark.sql.types.StructType

@@ -28,47 +30,6 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@SuppressWarnings(Array("io.github.zhztheplayer.scalawarts.InheritFromCaseClass"))
class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileFormat(metadata) {

-protected var database = ""
-protected var tableName = ""
-protected var snapshotId = ""
-protected var orderByKeyOption: Option[Seq[String]] = None
-protected var lowCardKeyOption: Option[Seq[String]] = None
-protected var minmaxIndexKeyOption: Option[Seq[String]] = None
-protected var bfIndexKeyOption: Option[Seq[String]] = None
-protected var setIndexKeyOption: Option[Seq[String]] = None
-protected var primaryKeyOption: Option[Seq[String]] = None
-protected var partitionColumns: Seq[String] = Seq.empty[String]
-protected var clickhouseTableConfigs: Map[String, String] = Map.empty
-
-// scalastyle:off argcount
-def this(
-metadata: Metadata,
-database: String,
-tableName: String,
-snapshotId: String,
-orderByKeyOption: Option[Seq[String]],
-lowCardKeyOption: Option[Seq[String]],
-minmaxIndexKeyOption: Option[Seq[String]],
-bfIndexKeyOption: Option[Seq[String]],
-setIndexKeyOption: Option[Seq[String]],
-primaryKeyOption: Option[Seq[String]],
-clickhouseTableConfigs: Map[String, String],
-partitionColumns: Seq[String]) {
-this(metadata)
-this.database = database
-this.tableName = tableName
-this.snapshotId = snapshotId
-this.orderByKeyOption = orderByKeyOption
-this.lowCardKeyOption = lowCardKeyOption
-this.minmaxIndexKeyOption = minmaxIndexKeyOption
-this.bfIndexKeyOption = bfIndexKeyOption
-this.setIndexKeyOption = setIndexKeyOption
-this.primaryKeyOption = primaryKeyOption
-this.clickhouseTableConfigs = clickhouseTableConfigs
-this.partitionColumns = partitionColumns
-}
-// scalastyle:on argcount
-
override def shortName(): String = "mergetree"

override def toString(): String = "MergeTree"
@@ -98,6 +59,17 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma
.getInstance()
.nativeConf(options, "")

+@transient val deltaMetaReader = DeltaMetaReader(metadata)
+
+val database = deltaMetaReader.storageDB
+val tableName = deltaMetaReader.storageTable
+val deltaPath = deltaMetaReader.storagePath
+
+val extensionTableBC = sparkSession.sparkContext.broadcast(
+ClickhouseMetaSerializer
+.forWrite(deltaMetaReader, metadata.schema)
+.toByteArray)
+
new OutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String = {
".mergetree"
@@ -107,25 +79,18 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
+require(path == deltaPath)
GlutenMergeTreeWriterInjects
.getInstance()
.asInstanceOf[CHMergeTreeWriterInjects]
.createOutputWriter(
path,
-database,
-tableName,
-snapshotId,
-orderByKeyOption,
-lowCardKeyOption,
-minmaxIndexKeyOption,
-bfIndexKeyOption,
-setIndexKeyOption,
-primaryKeyOption,
-partitionColumns,
metadata.schema,
-clickhouseTableConfigs,
context,
-nativeConf
+nativeConf,
+database,
+tableName,
+extensionTableBC.value
)
}
}
DeltaLog.scala
@@ -784,9 +784,6 @@ object DeltaLog extends DeltaLogging {
FileSourceOptions.IGNORE_CORRUPT_FILES -> "false",
FileSourceOptions.IGNORE_MISSING_FILES -> "false"
)
-// --- modified start
-// Don't need to add the bucketOption here, it handles the delta log meta json file
-// --- modified end
val fsRelation = HadoopFsRelation(
index, index.partitionSchema, schema, None, index.format, allOptions)(spark)
LogicalRelation(fsRelation)
ClickHouseTableV2.scala
@@ -30,7 +30,8 @@ import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaErrors, DeltaLog, De
import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import org.apache.spark.sql.delta.sources.DeltaDataSource
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
-import org.apache.spark.sql.execution.datasources.utils.MergeTreePartsPartitionsUtil
+import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil
+import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.sql.execution.datasources.v2.clickhouse.utils.CHDataSourceUtils
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -98,28 +99,19 @@ class ClickHouseTableV2(
def getFileFormat(protocol: Protocol, meta: Metadata): DeltaMergeTreeFileFormat = {
new DeltaMergeTreeFileFormat(
protocol,
-meta,
-dataBaseName,
-tableName,
-ClickhouseSnapshot.genSnapshotId(initialSnapshot),
-orderByKeyOption,
-lowCardKeyOption,
-minmaxIndexKeyOption,
-bfIndexKeyOption,
-setIndexKeyOption,
-primaryKeyOption,
-clickhouseTableConfigs,
-partitionColumns
-)
+StorageMeta.withMoreStorageInfo(
+meta,
+ClickhouseSnapshot.genSnapshotId(initialSnapshot),
+deltaLog.dataPath))
}

-override def deltaProperties(): ju.Map[String, String] = properties()
+override def deltaProperties: Map[String, String] = properties().asScala.toMap

-override def deltaCatalog(): Option[CatalogTable] = catalogTable
+override def deltaCatalog: Option[CatalogTable] = catalogTable

-override def deltaPath(): Path = path
+override def deltaPath: Path = path

-override def deltaSnapshot(): Snapshot = initialSnapshot
+override def deltaSnapshot: Snapshot = initialSnapshot

def cacheThis(): Unit = {
ClickHouseTableV2.deltaLog2Table.put(deltaLog, this)
@@ -133,7 +125,6 @@ class TempClickHouseTableV2(
override val spark: SparkSession,
override val catalogTable: Option[CatalogTable] = None)
extends ClickHouseTableV2(spark, null, catalogTable) {
-import collection.JavaConverters._
override def properties(): ju.Map[String, String] = catalogTable.get.properties.asJava
override lazy val partitionColumns: Seq[String] = catalogTable.get.partitionColumnNames
override def cacheThis(): Unit = {}