[CH] Issue 5018 #5019

Merged · 4 commits · Mar 27, 2024
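
This PR wires three new ClickHouse data-skipping index options (minmax, bloom filter, and set) from MergeTree table properties through Spark's scan and write paths down to the native engine, and adds a selectedMarks metric next to the existing selectedMarksPk. As a rough sketch of how a table might opt in (the DDL shape and all table/column names are assumptions; only the four property keys are taken from this diff):

```scala
// Hypothetical DDL sketch: the property keys (lowCardKey, minmaxIndexKey,
// bloomfilterIndexKey, setIndexKey) match getCommaSeparatedColumns() in
// ClickHouseTableV2 below; the USING clause, table, and column names are
// illustrative assumptions.
spark.sql("""
  CREATE TABLE lineitem_mergetree (
    l_orderkey    BIGINT,
    l_receiptdate DATE,
    l_returnflag  STRING
  ) USING clickhouse
  TBLPROPERTIES (
    lowCardKey          = 'l_returnflag',
    minmaxIndexKey      = 'l_receiptdate',
    bloomfilterIndexKey = 'l_orderkey',
    setIndexKey         = 'l_returnflag'
  )
""")
```

Per getCommaSeparatedColumns below, each property is a comma-separated column list; nested columns (names containing '.') are rejected and the names are lowercased.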

@@ -17,7 +17,6 @@
 package org.apache.spark.sql.execution.datasources.v2.clickhouse.source

 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.delta.DeltaParquetFileFormat
 import org.apache.spark.sql.delta.actions.Metadata
 import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
@@ -31,9 +30,11 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)

   protected var database = ""
   protected var tableName = ""
-  protected var dataSchemas = Seq.empty[Attribute]
   protected var orderByKeyOption: Option[Seq[String]] = None
   protected var lowCardKeyOption: Option[Seq[String]] = None
+  protected var minmaxIndexKeyOption: Option[Seq[String]] = None
+  protected var bfIndexKeyOption: Option[Seq[String]] = None
+  protected var setIndexKeyOption: Option[Seq[String]] = None
   protected var primaryKeyOption: Option[Seq[String]] = None
   protected var partitionColumns: Seq[String] = Seq.empty[String]
   protected var clickhouseTableConfigs: Map[String, String] = Map.empty
@@ -42,18 +43,22 @@
       metadata: Metadata,
       database: String,
       tableName: String,
-      schemas: Seq[Attribute],
       orderByKeyOption: Option[Seq[String]],
       lowCardKeyOption: Option[Seq[String]],
+      minmaxIndexKeyOption: Option[Seq[String]],
+      bfIndexKeyOption: Option[Seq[String]],
+      setIndexKeyOption: Option[Seq[String]],
       primaryKeyOption: Option[Seq[String]],
       clickhouseTableConfigs: Map[String, String],
       partitionColumns: Seq[String]) {
     this(metadata)
     this.database = database
     this.tableName = tableName
-    this.dataSchemas = schemas
     this.orderByKeyOption = orderByKeyOption
     this.lowCardKeyOption = lowCardKeyOption
+    this.minmaxIndexKeyOption = minmaxIndexKeyOption
+    this.bfIndexKeyOption = bfIndexKeyOption
+    this.setIndexKeyOption = setIndexKeyOption
     this.primaryKeyOption = primaryKeyOption
     this.clickhouseTableConfigs = clickhouseTableConfigs
     this.partitionColumns = partitionColumns
@@ -102,10 +107,12 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
           tableName,
           orderByKeyOption,
           lowCardKeyOption,
+          minmaxIndexKeyOption,
+          bfIndexKeyOption,
+          setIndexKeyOption,
           primaryKeyOption,
           partitionColumns,
           metadata.schema,
-          dataSchemas,
           clickhouseTableConfigs,
           context,
           nativeConf

@@ -17,7 +17,6 @@
 package org.apache.spark.sql.execution.datasources.v2.clickhouse.source

 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.delta.DeltaParquetFileFormat
 import org.apache.spark.sql.delta.actions.Metadata
 import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
@@ -30,9 +29,11 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma

   protected var database = ""
   protected var tableName = ""
-  protected var dataSchemas = Seq.empty[Attribute]
   protected var orderByKeyOption: Option[Seq[String]] = None
   protected var lowCardKeyOption: Option[Seq[String]] = None
+  protected var minmaxIndexKeyOption: Option[Seq[String]] = None
+  protected var bfIndexKeyOption: Option[Seq[String]] = None
+  protected var setIndexKeyOption: Option[Seq[String]] = None
   protected var primaryKeyOption: Option[Seq[String]] = None
   protected var partitionColumns: Seq[String] = Seq.empty[String]
   protected var clickhouseTableConfigs: Map[String, String] = Map.empty
@@ -41,18 +42,22 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma
       metadata: Metadata,
       database: String,
       tableName: String,
-      schemas: Seq[Attribute],
       orderByKeyOption: Option[Seq[String]],
       lowCardKeyOption: Option[Seq[String]],
+      minmaxIndexKeyOption: Option[Seq[String]],
+      bfIndexKeyOption: Option[Seq[String]],
+      setIndexKeyOption: Option[Seq[String]],
       primaryKeyOption: Option[Seq[String]],
       clickhouseTableConfigs: Map[String, String],
       partitionColumns: Seq[String]) {
     this(metadata)
     this.database = database
     this.tableName = tableName
-    this.dataSchemas = schemas
     this.orderByKeyOption = orderByKeyOption
     this.lowCardKeyOption = lowCardKeyOption
+    this.minmaxIndexKeyOption = minmaxIndexKeyOption
+    this.bfIndexKeyOption = bfIndexKeyOption
+    this.setIndexKeyOption = setIndexKeyOption
     this.primaryKeyOption = primaryKeyOption
     this.clickhouseTableConfigs = clickhouseTableConfigs
     this.partitionColumns = partitionColumns
@@ -101,10 +106,12 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma
           tableName,
           orderByKeyOption,
           lowCardKeyOption,
+          minmaxIndexKeyOption,
+          bfIndexKeyOption,
+          setIndexKeyOption,
           primaryKeyOption,
           partitionColumns,
           metadata.schema,
-          dataSchemas,
           clickhouseTableConfigs,
           context,
           nativeConf
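
The change lands in two copies of DeltaMergeTreeFileFormat (the diff above repeats for each). Both rely on the same pattern: the primary constructor keeps the DeltaParquetFileFormat-compatible signature, and a secondary constructor fills protected vars with the MergeTree options. A minimal, self-contained sketch of that pattern, with simplified types and illustrative names:

```scala
// Sketch only: stands in for DeltaMergeTreeFileFormat's constructor chaining.
class MergeTreeFormatSketch(metadata: String) {
  protected var database = ""
  protected var minmaxIndexKeyOption: Option[Seq[String]] = None

  // Secondary constructor: delegates to the primary one, then mutates the
  // protected state, mirroring the pattern in the diff above.
  def this(
      metadata: String,
      database: String,
      minmaxIndexKeyOption: Option[Seq[String]]) = {
    this(metadata)
    this.database = database
    this.minmaxIndexKeyOption = minmaxIndexKeyOption
  }
}

val fmt = new MergeTreeFormatSketch("meta", "default", Some(Seq("l_receiptdate")))
```

Keeping the primary signature untouched avoids changing the parent contract; the cost is mutable state on the format instance.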

@@ -32,6 +32,9 @@ public class MetricsStep {
   @JsonProperty("selected_marks_pk")
   protected long selectedMarksPk;

+  @JsonProperty("selected_marks")
+  protected long selectedMarks;
+
   public String getName() {
     return name;
   }
@@ -64,6 +67,14 @@ public void setSelectedMarksPk(long selectedMarksPk) {
     this.selectedMarksPk = selectedMarksPk;
   }

+  public long getSelectedMarks() {
+    return selectedMarks;
+  }
+
+  public void setSelectedMarks(long selectedMarks) {
+    this.selectedMarks = selectedMarks;
+  }
+
   public long getTotalMarksPk() {
     return totalMarksPk;
   }
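
MetricsStep's @JsonProperty annotations map the snake_case names of the native metrics payload onto Java fields, so the new counter arrives as selected_marks. A small Scala sketch of pulling that field out of such a payload with Jackson (the JSON sample itself is hypothetical):

```scala
import com.fasterxml.jackson.databind.ObjectMapper

// Hypothetical payload; the field names follow the @JsonProperty annotations
// on MetricsStep ("selected_marks_pk", "selected_marks").
val json = """{"name": "ReadMergeTree", "selected_marks_pk": 12, "selected_marks": 8}"""

val mapper = new ObjectMapper()
val selectedMarks = mapper.readTree(json).get("selected_marks").asLong() // 8
```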

@@ -99,6 +99,9 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
           p.absoluteTablePath,
           p.orderByKey,
           p.lowCardKey,
+          p.minmaxIndexKey,
+          p.bfIndexKey,
+          p.setIndexKey,
           p.primaryKey,
           partLists,
           starts,

@@ -127,6 +127,7 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
       "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
       "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
       "selectedMarksPk" -> SQLMetrics.createMetric(sparkContext, "selected marks primary"),
+      "selectedMarks" -> SQLMetrics.createMetric(sparkContext, "selected marks"),
       "totalMarksPk" -> SQLMetrics.createMetric(sparkContext, "total marks primary")
     )

@@ -39,6 +39,9 @@ case class GlutenMergeTreePartition(
     absoluteTablePath: String,
     orderByKey: String,
     lowCardKey: String,
+    minmaxIndexKey: String,
+    bfIndexKey: String,
+    setIndexKey: String,
     primaryKey: String,
     partList: Array[MergeTreePartSplit],
     tableSchemaJson: String,

@@ -36,6 +36,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
   val inputWaitTime: SQLMetric = metrics("inputWaitTime")
   val outputWaitTime: SQLMetric = metrics("outputWaitTime")
   val selected_marks_pk: SQLMetric = metrics("selectedMarksPk")
+  val selected_marks: SQLMetric = metrics("selectedMarks")
   val total_marks_pk: SQLMetric = metrics("totalMarksPk")

   override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
@@ -56,6 +57,7 @@
     metricsData.getSteps.forEach(
       step => {
         selected_marks_pk += step.selectedMarksPk
+        selected_marks += step.selectedMarks
         total_marks_pk += step.totalMarksPk
       })
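
Note the naming hop across the three layers: the native payload carries selected_marks (snake_case, per MetricsStep), CHMetricsApi registers the Spark SQL metric under selectedMarks, and FileSourceScanMetricsUpdater sums each step's value into it. A minimal sketch of that register-then-accumulate flow, using only the Spark APIs already visible in the diff:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

val spark = SparkSession.builder().master("local[1]").appName("metric-sketch").getOrCreate()

// Register the metric the way CHMetricsApi does.
val metrics: Map[String, SQLMetric] = Map(
  "selectedMarks" -> SQLMetrics.createMetric(spark.sparkContext, "selected marks"))

// Hypothetical per-step values standing in for step.getSelectedMarks.
Seq(3L, 5L).foreach(v => metrics("selectedMarks") += v)

println(metrics("selectedMarks").value) // 8
spark.stop()
```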

@@ -128,9 +128,11 @@ class ClickhouseOptimisticTransaction(
           metadata,
           tableV2.dataBaseName,
           tableV2.tableName,
-          output,
           tableV2.orderByKeyOption,
           tableV2.lowCardKeyOption,
+          tableV2.minmaxIndexKeyOption,
+          tableV2.bfIndexKeyOption,
+          tableV2.setIndexKeyOption,
           tableV2.primaryKeyOption,
           tableV2.clickhouseTableConfigs,
           tableV2.partitionColumns
@@ -144,6 +146,9 @@
         // scalastyle:on deltahadoopconfiguration
         orderByKeyOption = tableV2.orderByKeyOption,
         lowCardKeyOption = tableV2.lowCardKeyOption,
+        minmaxIndexKeyOption = tableV2.minmaxIndexKeyOption,
+        bfIndexKeyOption = tableV2.bfIndexKeyOption,
+        setIndexKeyOption = tableV2.setIndexKeyOption,
         primaryKeyOption = tableV2.primaryKeyOption,
         partitionColumns = partitioningColumns,
         bucketSpec = tableV2.bucketOption,

@@ -114,18 +114,34 @@ class ClickHouseTableV2(
   }

   lazy val lowCardKeyOption: Option[Seq[String]] = {
+    getCommaSeparatedColumns("lowCardKey")
+  }
+
+  lazy val minmaxIndexKeyOption: Option[Seq[String]] = {
+    getCommaSeparatedColumns("minmaxIndexKey")
+  }
+
+  lazy val bfIndexKeyOption: Option[Seq[String]] = {
+    getCommaSeparatedColumns("bloomfilterIndexKey")
+  }
+
+  lazy val setIndexKeyOption: Option[Seq[String]] = {
+    getCommaSeparatedColumns("setIndexKey")
+  }
+
+  private def getCommaSeparatedColumns(keyName: String) = {
     val tableProperties = properties()
-    if (tableProperties.containsKey("lowCardKey")) {
-      if (tableProperties.get("lowCardKey").nonEmpty) {
-        val lowCardKeys = tableProperties.get("lowCardKey").split(",").map(_.trim).toSeq
-        lowCardKeys.foreach(
+    if (tableProperties.containsKey(keyName)) {
+      if (tableProperties.get(keyName).nonEmpty) {
+        val keys = tableProperties.get(keyName).split(",").map(_.trim).toSeq
+        keys.foreach(
           s => {
             if (s.contains(".")) {
               throw new IllegalStateException(
-                s"lowCardKey $s can not contain '.' (not support nested column yet)")
+                s"$keyName $s can not contain '.' (not support nested column yet)")
             }
           })
-        Some(lowCardKeys.map(s => s.toLowerCase()))
+        Some(keys.map(s => s.toLowerCase()))
       } else {
         None
       }
@@ -259,12 +275,15 @@
       meta,
       dataBaseName,
       tableName,
-      Seq.empty[Attribute],
       orderByKeyOption,
       lowCardKeyOption,
+      minmaxIndexKeyOption,
+      bfIndexKeyOption,
+      setIndexKeyOption,
       primaryKeyOption,
       clickhouseTableConfigs,
-      partitionColumns)
+      partitionColumns
+    )
   }
   def cacheThis(): Unit = {
     deltaLog2Table.put(deltaLog, this)
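
getCommaSeparatedColumns generalizes what was previously lowCardKey-only parsing: split the property on commas, trim each entry, reject nested columns (anything containing '.'), and lowercase the result. The same rules restated as a standalone function over a plain Map, for illustration (the column names in the asserts are hypothetical):

```scala
// Standalone restatement of the rules in getCommaSeparatedColumns above;
// the real method reads the table's properties() rather than a Map.
def parseColumns(props: Map[String, String], keyName: String): Option[Seq[String]] =
  props.get(keyName).filter(_.nonEmpty).map { value =>
    val keys = value.split(",").map(_.trim).toSeq
    keys.foreach { s =>
      if (s.contains(".")) {
        throw new IllegalStateException(
          s"$keyName $s can not contain '.' (not support nested column yet)")
      }
    }
    keys.map(_.toLowerCase)
  }

assert(
  parseColumns(Map("minmaxIndexKey" -> "L_RECEIPTDATE, l_shipdate"), "minmaxIndexKey")
    == Some(Seq("l_receiptdate", "l_shipdate")))
assert(parseColumns(Map("setIndexKey" -> ""), "setIndexKey").isEmpty)
```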

@@ -74,6 +74,21 @@ object MergeTreePartsPartitionsUtil extends Logging {
       case None => ""
     }

+    val minmaxIndexKey = table.minmaxIndexKeyOption match {
+      case Some(keys) => keys.mkString(",")
+      case None => ""
+    }
+
+    val bfIndexKey = table.bfIndexKeyOption match {
+      case Some(keys) => keys.mkString(",")
+      case None => ""
+    }
+
+    val setIndexKey = table.setIndexKeyOption match {
+      case Some(keys) => keys.mkString(",")
+      case None => ""
+    }
+
     val tableSchemaJson = ConverterUtils.convertNamedStructJson(table.schema())

     // bucket table
@@ -92,6 +107,9 @@
         partitions,
         orderByKey,
         lowCardKey,
+        minmaxIndexKey,
+        bfIndexKey,
+        setIndexKey,
         primaryKey,
         table.clickhouseTableConfigs,
         sparkSession
@@ -109,6 +127,9 @@
         partitions,
         orderByKey,
         lowCardKey,
+        minmaxIndexKey,
+        bfIndexKey,
+        setIndexKey,
         primaryKey,
         table.clickhouseTableConfigs,
         sparkSession
@@ -129,6 +150,9 @@
       partitions: ArrayBuffer[InputPartition],
       orderByKey: String,
       lowCardKey: String,
+      minmaxIndexKey: String,
+      bfIndexKey: String,
+      setIndexKey: String,
       primaryKey: String,
       clickhouseTableConfigs: Map[String, String],
       sparkSession: SparkSession): Unit = {
@@ -214,6 +238,9 @@
           absoluteTablePath,
           orderByKey,
           lowCardKey,
+          minmaxIndexKey,
+          bfIndexKey,
+          setIndexKey,
           primaryKey,
           currentFiles.toArray,
           tableSchemaJson,
@@ -256,6 +283,9 @@
       partitions: ArrayBuffer[InputPartition],
       orderByKey: String,
       lowCardKey: String,
+      minmaxIndexKey: String,
+      bfIndexKey: String,
+      setIndexKey: String,
       primaryKey: String,
       clickhouseTableConfigs: Map[String, String],
       sparkSession: SparkSession): Unit = {
@@ -318,6 +348,9 @@
           absoluteTablePath,
           orderByKey,
           lowCardKey,
+          minmaxIndexKey,
+          bfIndexKey,
+          setIndexKey,
           primaryKey,
           currentFiles.toArray,
           tableSchemaJson,
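
The three match blocks added at the top of MergeTreePartsPartitionsUtil flatten each Option[Seq[String]] into the comma-separated string that GlutenMergeTreePartition hands to the native reader (None becomes the empty string). An equivalent one-line helper, sketched here; the name is illustrative, not part of the PR:

```scala
// keysToCsv(Some(Seq("a", "b"))) == "a,b"; keysToCsv(None) == ""
def keysToCsv(keys: Option[Seq[String]]): String =
  keys.map(_.mkString(",")).getOrElse("")
```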