Skip to content

Commit

Permalink
temp, by default all cols minmax index
Browse files Browse the repository at this point in the history
basically works, dealing with nullable

nullable/not-null ok

remove unnecessary change

fix compile
  • Loading branch information
binmahone committed Mar 25, 2024
1 parent e9034ff commit ede04b9
Show file tree
Hide file tree
Showing 20 changed files with 761 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
protected var dataSchemas = Seq.empty[Attribute]
protected var orderByKeyOption: Option[Seq[String]] = None
protected var lowCardKeyOption: Option[Seq[String]] = None
protected var minmaxIndexKeyOption: Option[Seq[String]] = None
protected var bfIndexKeyOption: Option[Seq[String]] = None
protected var setIndexKeyOption: Option[Seq[String]] = None
protected var primaryKeyOption: Option[Seq[String]] = None
protected var partitionColumns: Seq[String] = Seq.empty[String]
protected var clickhouseTableConfigs: Map[String, String] = Map.empty
Expand All @@ -45,6 +48,9 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
schemas: Seq[Attribute],
orderByKeyOption: Option[Seq[String]],
lowCardKeyOption: Option[Seq[String]],
minmaxIndexKeyOption: Option[Seq[String]],
bfIndexKeyOption: Option[Seq[String]],
setIndexKeyOption: Option[Seq[String]],
primaryKeyOption: Option[Seq[String]],
clickhouseTableConfigs: Map[String, String],
partitionColumns: Seq[String]) {
Expand All @@ -54,6 +60,9 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
this.dataSchemas = schemas
this.orderByKeyOption = orderByKeyOption
this.lowCardKeyOption = lowCardKeyOption
this.minmaxIndexKeyOption = minmaxIndexKeyOption
this.bfIndexKeyOption = bfIndexKeyOption
this.setIndexKeyOption = setIndexKeyOption
this.primaryKeyOption = primaryKeyOption
this.clickhouseTableConfigs = clickhouseTableConfigs
this.partitionColumns = partitionColumns
Expand Down Expand Up @@ -102,6 +111,9 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
tableName,
orderByKeyOption,
lowCardKeyOption,
// Same-typed positional arguments: keep the minmax, bf, set order so each
// list reaches its matching parameter (a swap would compile silently).
minmaxIndexKeyOption,
bfIndexKeyOption,
setIndexKeyOption,
primaryKeyOption,
partitionColumns,
metadata.schema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma
protected var dataSchemas = Seq.empty[Attribute]
protected var orderByKeyOption: Option[Seq[String]] = None
protected var lowCardKeyOption: Option[Seq[String]] = None
protected var minmaxIndexKeyOption: Option[Seq[String]] = None
protected var bfIndexKeyOption: Option[Seq[String]] = None
protected var setIndexKeyOption: Option[Seq[String]] = None
protected var primaryKeyOption: Option[Seq[String]] = None
protected var partitionColumns: Seq[String] = Seq.empty[String]
protected var clickhouseTableConfigs: Map[String, String] = Map.empty
Expand All @@ -44,6 +47,9 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma
schemas: Seq[Attribute],
orderByKeyOption: Option[Seq[String]],
lowCardKeyOption: Option[Seq[String]],
minmaxIndexKeyOption: Option[Seq[String]],
bfIndexKeyOption: Option[Seq[String]],
setIndexKeyOption: Option[Seq[String]],
primaryKeyOption: Option[Seq[String]],
clickhouseTableConfigs: Map[String, String],
partitionColumns: Seq[String]) {
Expand All @@ -53,6 +59,9 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma
this.dataSchemas = schemas
this.orderByKeyOption = orderByKeyOption
this.lowCardKeyOption = lowCardKeyOption
this.minmaxIndexKeyOption = minmaxIndexKeyOption
this.bfIndexKeyOption = bfIndexKeyOption
this.setIndexKeyOption = setIndexKeyOption
this.primaryKeyOption = primaryKeyOption
this.clickhouseTableConfigs = clickhouseTableConfigs
this.partitionColumns = partitionColumns
Expand Down Expand Up @@ -101,6 +110,9 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma
tableName,
orderByKeyOption,
lowCardKeyOption,
minmaxIndexKeyOption,
bfIndexKeyOption,
setIndexKeyOption,
primaryKeyOption,
partitionColumns,
metadata.schema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
p.absoluteTablePath,
p.orderByKey,
p.lowCardKey,
p.minmaxIndexKey,
p.bfIndexKey,
p.setIndexKey,
p.primaryKey,
partLists,
starts,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ case class GlutenMergeTreePartition(
absoluteTablePath: String,
orderByKey: String,
lowCardKey: String,
minmaxIndexKey: String,
bfIndexKey: String,
setIndexKey: String,
primaryKey: String,
partList: Array[MergeTreePartSplit],
tableSchemaJson: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ class ClickhouseOptimisticTransaction(
output,
tableV2.orderByKeyOption,
tableV2.lowCardKeyOption,
tableV2.minmaxIndexKeyOption,
tableV2.bfIndexKeyOption,
tableV2.setIndexKeyOption,
tableV2.primaryKeyOption,
tableV2.clickhouseTableConfigs,
tableV2.partitionColumns
Expand All @@ -144,6 +147,9 @@ class ClickhouseOptimisticTransaction(
// scalastyle:on deltahadoopconfiguration
orderByKeyOption = tableV2.orderByKeyOption,
lowCardKeyOption = tableV2.lowCardKeyOption,
minmaxIndexKeyOption = tableV2.minmaxIndexKeyOption,
bfIndexKeyOption = tableV2.bfIndexKeyOption,
setIndexKeyOption = tableV2.setIndexKeyOption,
primaryKeyOption = tableV2.primaryKeyOption,
partitionColumns = partitioningColumns,
bucketSpec = tableV2.bucketOption,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,18 +114,34 @@ class ClickHouseTableV2(
}

/** Column names parsed from the comma-separated `lowCardKey` table property. */
lazy val lowCardKeyOption: Option[Seq[String]] =
  getCommaSeparatedColumns("lowCardKey")

/** Column names parsed from the comma-separated `minmaxIndexKey` table property. */
lazy val minmaxIndexKeyOption: Option[Seq[String]] =
  getCommaSeparatedColumns("minmaxIndexKey")

/**
 * Column names parsed from the comma-separated `bloomfilterIndexKey` table property.
 * Note that the property name is spelled out in full, while this field uses the `bf` prefix.
 */
lazy val bfIndexKeyOption: Option[Seq[String]] =
  getCommaSeparatedColumns("bloomfilterIndexKey")

/** Column names parsed from the comma-separated `setIndexKey` table property. */
lazy val setIndexKeyOption: Option[Seq[String]] =
  getCommaSeparatedColumns("setIndexKey")

private def getCommaSeparatedColumns(keyName: String) = {
val tableProperties = properties()
if (tableProperties.containsKey("lowCardKey")) {
if (tableProperties.get("lowCardKey").nonEmpty) {
val lowCardKeys = tableProperties.get("lowCardKey").split(",").map(_.trim).toSeq
lowCardKeys.foreach(
if (tableProperties.containsKey(keyName)) {
if (tableProperties.get(keyName).nonEmpty) {
val keys = tableProperties.get(keyName).split(",").map(_.trim).toSeq
keys.foreach(
s => {
if (s.contains(".")) {
throw new IllegalStateException(
s"lowCardKey $s can not contain '.' (not support nested column yet)")
s"$keyName $s can not contain '.' (not support nested column yet)")
}
})
Some(lowCardKeys.map(s => s.toLowerCase()))
Some(keys.map(s => s.toLowerCase()))
} else {
None
}
Expand Down Expand Up @@ -262,9 +278,13 @@ class ClickHouseTableV2(
Seq.empty[Attribute],
orderByKeyOption,
lowCardKeyOption,
minmaxIndexKeyOption,
bfIndexKeyOption,
setIndexKeyOption,
primaryKeyOption,
clickhouseTableConfigs,
partitionColumns)
partitionColumns
)
}
def cacheThis(): Unit = {
deltaLog2Table.put(deltaLog, this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,21 @@ object MergeTreePartsPartitionsUtil extends Logging {
case None => ""
}

// Flatten each optional index-key list into a single comma-joined string;
// an absent list becomes the empty string.
val minmaxIndexKey = table.minmaxIndexKeyOption.fold("")(_.mkString(","))

val bfIndexKey = table.bfIndexKeyOption.fold("")(_.mkString(","))

val setIndexKey = table.setIndexKeyOption.fold("")(_.mkString(","))

val tableSchemaJson = ConverterUtils.convertNamedStructJson(table.schema())

// bucket table
Expand All @@ -92,6 +107,9 @@ object MergeTreePartsPartitionsUtil extends Logging {
partitions,
orderByKey,
lowCardKey,
minmaxIndexKey,
bfIndexKey,
setIndexKey,
primaryKey,
table.clickhouseTableConfigs,
sparkSession
Expand All @@ -109,6 +127,9 @@ object MergeTreePartsPartitionsUtil extends Logging {
partitions,
orderByKey,
lowCardKey,
minmaxIndexKey,
bfIndexKey,
setIndexKey,
primaryKey,
table.clickhouseTableConfigs,
sparkSession
Expand All @@ -129,6 +150,9 @@ object MergeTreePartsPartitionsUtil extends Logging {
partitions: ArrayBuffer[InputPartition],
orderByKey: String,
lowCardKey: String,
minmaxIndexKey: String,
bfIndexKey: String,
setIndexKey: String,
primaryKey: String,
clickhouseTableConfigs: Map[String, String],
sparkSession: SparkSession): Unit = {
Expand Down Expand Up @@ -214,6 +238,9 @@ object MergeTreePartsPartitionsUtil extends Logging {
absoluteTablePath,
orderByKey,
lowCardKey,
minmaxIndexKey,
bfIndexKey,
setIndexKey,
primaryKey,
currentFiles.toArray,
tableSchemaJson,
Expand Down Expand Up @@ -256,6 +283,9 @@ object MergeTreePartsPartitionsUtil extends Logging {
partitions: ArrayBuffer[InputPartition],
orderByKey: String,
lowCardKey: String,
minmaxIndexKey: String,
bfIndexKey: String,
setIndexKey: String,
primaryKey: String,
clickhouseTableConfigs: Map[String, String],
sparkSession: SparkSession): Unit = {
Expand Down Expand Up @@ -318,6 +348,9 @@ object MergeTreePartsPartitionsUtil extends Logging {
absoluteTablePath,
orderByKey,
lowCardKey,
minmaxIndexKey,
bfIndexKey,
setIndexKey,
primaryKey,
currentFiles.toArray,
tableSchemaJson,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ class CHMergeTreeWriterInjects extends GlutenFormatWriterInjectsBase {
tableName: String,
orderByKeyOption: Option[Seq[String]],
lowCardKeyOption: Option[Seq[String]],
minmaxIndexKeyOption: Option[Seq[String]],
bfIndexKeyOption: Option[Seq[String]],
setIndexKeyOption: Option[Seq[String]],
primaryKeyOption: Option[Seq[String]],
partitionColumns: Seq[String],
tableSchema: StructType,
Expand All @@ -83,11 +86,14 @@ class CHMergeTreeWriterInjects extends GlutenFormatWriterInjectsBase {
tableName,
orderByKeyOption,
lowCardKeyOption,
minmaxIndexKeyOption,
bfIndexKeyOption,
setIndexKeyOption,
primaryKeyOption,
partitionColumns,
ConverterUtils.convertNamedStructJson(tableSchema),
clickhouseTableConfigs,
dataSchema
tableSchema.toAttributes // use table schema instead of data schema
)

val datasourceJniWrapper = new CHDatasourceJniWrapper()
Expand Down Expand Up @@ -119,17 +125,22 @@ class CHMergeTreeWriterInjects extends GlutenFormatWriterInjectsBase {

object CHMergeTreeWriterInjects {

// scalastyle:off argcount
def genMergeTreeWriteRel(
path: String,
database: String,
tableName: String,
orderByKeyOption: Option[Seq[String]],
lowCardKeyOption: Option[Seq[String]],
minmaxIndexKeyOption: Option[Seq[String]],
bfIndexKeyOption: Option[Seq[String]],
setIndexKeyOption: Option[Seq[String]],
primaryKeyOption: Option[Seq[String]],
partitionColumns: Seq[String],
tableSchemaJson: String,
clickhouseTableConfigs: Map[String, String],
output: Seq[Attribute]): PlanWithSplitInfo = {
// scalastyle:on argcount
val typeNodes = ConverterUtils.collectAttributeTypeNodes(output)
val nameList = ConverterUtils.collectAttributeNamesWithoutExprId(output)
val columnTypeNodes = output.map {
Expand All @@ -150,6 +161,18 @@ object CHMergeTreeWriterInjects {
case Some(keys) => keys.mkString(",")
case None => ""
}
// Flatten each optional index-key list into a single comma-joined string;
// an absent list becomes the empty string.
val minmaxIndexKey = minmaxIndexKeyOption.fold("")(_.mkString(","))
val bfIndexKey = bfIndexKeyOption.fold("")(_.mkString(","))
val setIndexKey = setIndexKeyOption.fold("")(_.mkString(","))

val substraitContext = new SubstraitContext
val extensionTableNode = ExtensionTableBuilder.makeExtensionTable(
Expand All @@ -161,6 +184,9 @@ object CHMergeTreeWriterInjects {
"",
orderByKey,
lowCardKey,
minmaxIndexKey,
bfIndexKey,
setIndexKey,
primaryKey,
new JList[String](),
new JList[JLong](),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ object MergeTreeFileFormatWriter extends Logging {
hadoopConf: Configuration,
orderByKeyOption: Option[Seq[String]],
lowCardKeyOption: Option[Seq[String]],
minmaxIndexKeyOption: Option[Seq[String]],
bfIndexKeyOption: Option[Seq[String]],
setIndexKeyOption: Option[Seq[String]],
primaryKeyOption: Option[Seq[String]],
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
Expand All @@ -71,6 +74,9 @@ object MergeTreeFileFormatWriter extends Logging {
hadoopConf = hadoopConf,
orderByKeyOption = orderByKeyOption,
lowCardKeyOption = lowCardKeyOption,
minmaxIndexKeyOption = minmaxIndexKeyOption,
bfIndexKeyOption = bfIndexKeyOption,
setIndexKeyOption = setIndexKeyOption,
primaryKeyOption = primaryKeyOption,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
Expand All @@ -89,6 +95,9 @@ object MergeTreeFileFormatWriter extends Logging {
hadoopConf: Configuration,
orderByKeyOption: Option[Seq[String]],
lowCardKeyOption: Option[Seq[String]],
minmaxIndexKeyOption: Option[Seq[String]],
bfIndexKeyOption: Option[Seq[String]],
setIndexKeyOption: Option[Seq[String]],
primaryKeyOption: Option[Seq[String]],
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
Expand Down
Loading

0 comments on commit ede04b9

Please sign in to comment.