From 85d90c9d2d371a2285bb74b565d7b36921d66dab Mon Sep 17 00:00:00 2001
From: LiuNeng <1398775315@qq.com>
Date: Wed, 11 Sep 2024 15:54:06 +0800
Subject: [PATCH] [CH] Fix load cache missing columns #7192

What changes were proposed in this pull request?

Fix MergeTree cache loading failing when column names are upper case.

How was this patch tested?

Unit tests.

---
 .../spark/rpc/GlutenExecutorEndpoint.scala    |   7 +-
 .../commands/GlutenCHCacheDataCommand.scala   |   4 +-
 ...enClickHouseMergeTreeCacheDataSuite.scala} | 195 +++++++++++++++++-
 .../substrait/rel/ExtensionTableBuilder.java  |  14 +-
 4 files changed, 210 insertions(+), 10 deletions(-)
 rename backends-clickhouse/src/test/scala/org/apache/gluten/execution/{GlutenClickHouseMergeTreeCacheDataSSuite.scala => GlutenClickHouseMergeTreeCacheDataSuite.scala} (67%)

diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
index 7f2b94eea314..559a22cb12c2 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
@@ -75,9 +75,12 @@ class GlutenExecutorEndpoint(val executorId: String, val conf: SparkConf)
         val jobId = CHNativeCacheManager.cacheParts(mergeTreeTable, columns)
         context.reply(CacheJobInfo(status = true, jobId))
       } catch {
-        case _: Exception =>
+        case e: Exception =>
           context.reply(
-            CacheJobInfo(status = false, "", s"executor: $executorId cache data failed."))
+            CacheJobInfo(
+              status = false,
+              "",
+              s"executor: $executorId cache data failed: ${e.getMessage}."))
       }
     case GlutenCacheLoadStatus(jobId) =>
       val status = CHNativeCacheManager.getCacheStatus(jobId)
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
index 43e3b4b7ab98..7aca290b1691 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference,
 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
 import org.apache.spark.sql.execution.commands.GlutenCacheBase._
+import org.apache.spark.sql.execution.datasources.utils.MergeTreeDeltaUtil
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
 import org.apache.spark.sql.types.{BooleanType, StringType}
 
@@ -180,7 +181,8 @@ case class GlutenCHCacheDataCommand(
           ClickhouseSnapshot.genSnapshotId(snapshot),
           onePart.tablePath,
           pathToCache.toString,
-          snapshot.metadata.configuration.getOrElse("orderByKey", ""),
+          snapshot.metadata.configuration
+            .getOrElse("orderByKey", MergeTreeDeltaUtil.DEFAULT_ORDER_BY_KEY),
           snapshot.metadata.configuration.getOrElse("lowCardKey", ""),
           snapshot.metadata.configuration.getOrElse("minmaxIndexKey", ""),
           snapshot.metadata.configuration.getOrElse("bloomfilterIndexKey", ""),
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSuite.scala similarity index 67% rename from backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala rename to backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSuite.scala index a55067185e68..88bb00faced3 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSuite.scala @@ -32,7 +32,7 @@ import scala.concurrent.duration.DurationInt // Some sqls' line length exceeds 100 // scalastyle:off line.size.limit -class GlutenClickHouseMergeTreeCacheDataSSuite +class GlutenClickHouseMergeTreeCacheDataSuite extends GlutenClickHouseTPCHAbstractSuite with AdaptiveSparkPlanHelper { @@ -398,5 +398,198 @@ class GlutenClickHouseMergeTreeCacheDataSSuite }) spark.sql("drop table lineitem_mergetree_hdfs purge") } + + test("test cache mergetree data no partition columns") { + + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_mergetree_hdfs; + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS lineitem_mergetree_hdfs + |( + | L_ORDERKEY bigint, + | L_PARTKEY bigint, + | L_SUPPKEY bigint, + | L_LINENUMBER bigint, + | L_QUANTITY double, + | L_EXTENDEDPRICE double, + | L_DISCOUNT double, + | L_TAX double, + | L_RETURNFLAG string, + | L_LINESTATUS string, + | L_SHIPDATE date, + | L_COMMITDATE date, + | L_RECEIPTDATE date, + | L_SHIPINSTRUCT string, + | L_SHIPMODE string, + | L_COMMENT string + |) + |USING clickhouse + |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs' + |TBLPROPERTIES (storage_policy='__hdfs_main') + |""".stripMargin) + + spark.sql(s""" + | insert into table lineitem_mergetree_hdfs + | select * from lineitem a + | where a.l_shipdate between date'1995-01-01' and date'1995-01-31' + |""".stripMargin) + FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH)) + FileUtils.forceMkdir(new File(HDFS_METADATA_PATH)) + val dataPath = new File(HDFS_CACHE_PATH) + val initial_cache_files = countFiles(dataPath) + + val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs") + val res1 = spark.sql(s"cache data select * from lineitem_mergetree_hdfs").collect() + assertResult(true)(res1(0).getBoolean(0)) + assertResult(1)(metaPath.list().length) + assert(countFiles(dataPath) > initial_cache_files) + + val sqlStr = + s""" + |SELECT + | l_returnflag, + | l_linestatus, + | sum(l_quantity) AS sum_qty, + | sum(l_extendedprice) AS sum_base_price, + | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price, + | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge, + | avg(l_quantity) AS avg_qty, + | avg(l_extendedprice) AS avg_price, + | avg(l_discount) AS avg_disc, + | count(*) AS count_order + |FROM + | lineitem_mergetree_hdfs + |WHERE + | l_shipdate >= date'1995-01-10' + |GROUP BY + | l_returnflag, + | l_linestatus + |ORDER BY + | l_returnflag, + | l_linestatus; + | + |""".stripMargin + runSql(sqlStr)( + df => { + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assertResult(1)(scanExec.size) + + val mergetreeScan = scanExec.head + assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) + + val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] + val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => 
f.asInstanceOf[AddMergeTreeParts]) + assertResult(7898)(addFiles.map(_.rows).sum) + }) + spark.sql("drop table lineitem_mergetree_hdfs purge") + } + + test("test cache mergetree data with upper case column name") { + + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_mergetree_hdfs; + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS lineitem_mergetree_hdfs + |( + | L_ORDERKEY bigint, + | L_PARTKEY bigint, + | L_SUPPKEY bigint, + | L_LINENUMBER bigint, + | L_QUANTITY double, + | L_EXTENDEDPRICE double, + | L_DISCOUNT double, + | L_TAX double, + | L_RETURNFLAG string, + | L_LINESTATUS string, + | L_SHIPDATE date, + | L_COMMITDATE date, + | L_RECEIPTDATE date, + | L_SHIPINSTRUCT string, + | L_SHIPMODE string, + | L_COMMENT string + |) + |USING clickhouse + |PARTITIONED BY (L_SHIPDATE) + |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs' + |TBLPROPERTIES (storage_policy='__hdfs_main', + | orderByKey='L_LINENUMBER,L_ORDERKEY') + |""".stripMargin) + + spark.sql(s""" + | insert into table lineitem_mergetree_hdfs + | select * from lineitem a + | where a.l_shipdate between date'1995-01-01' and date'1995-01-31' + |""".stripMargin) + FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH)) + FileUtils.forceMkdir(new File(HDFS_METADATA_PATH)) + val dataPath = new File(HDFS_CACHE_PATH) + val initial_cache_files = countFiles(dataPath) + + val res = spark + .sql(s""" + |cache data + | select * from '$HDFS_URL/test/lineitem_mergetree_hdfs' + | after L_SHIPDATE AS OF '1995-01-10' + | CACHEPROPERTIES(storage_policy='__hdfs_main', + | aaa='ccc')""".stripMargin) + .collect() + assertResult(true)(res(0).getBoolean(0)) + val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs") + assertResult(true)(metaPath.exists() && metaPath.isDirectory) + assertResult(22)(metaPath.list().length) + assert(countFiles(dataPath) > initial_cache_files) + val first_cache_files = countFiles(dataPath) + val res1 = spark.sql(s"cache data select * from lineitem_mergetree_hdfs").collect() + assertResult(true)(res1(0).getBoolean(0)) + assertResult(31)(metaPath.list().length) + assert(countFiles(dataPath) > first_cache_files) + + val sqlStr = + s""" + |SELECT + | l_returnflag, + | l_linestatus, + | sum(l_quantity) AS sum_qty, + | sum(l_extendedprice) AS sum_base_price, + | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price, + | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge, + | avg(l_quantity) AS avg_qty, + | avg(l_extendedprice) AS avg_price, + | avg(l_discount) AS avg_disc, + | count(*) AS count_order + |FROM + | lineitem_mergetree_hdfs + |WHERE + | l_shipdate >= date'1995-01-10' + |GROUP BY + | l_returnflag, + | l_linestatus + |ORDER BY + | l_returnflag, + | l_linestatus; + | + |""".stripMargin + runSql(sqlStr)( + df => { + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assertResult(1)(scanExec.size) + + val mergetreeScan = scanExec.head + assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) + + val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] + val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) + assertResult(7898)(addFiles.map(_.rows).sum) + }) + spark.sql("drop table lineitem_mergetree_hdfs purge") + } } // scalastyle:off line.size.limit diff --git a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/ExtensionTableBuilder.java 
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/ExtensionTableBuilder.java
index 34c017d80cd1..b2b813dd9f0e 100644
--- a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/ExtensionTableBuilder.java
+++ b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/ExtensionTableBuilder.java
@@ -16,6 +16,8 @@
  */
 package org.apache.gluten.substrait.rel;
 
+import org.apache.gluten.expression.ConverterUtils;
+
 import java.util.List;
 import java.util.Map;
 
@@ -50,12 +52,12 @@ public static ExtensionTableNode makeExtensionTable(
         snapshotId,
         relativeTablePath,
         absoluteTablePath,
-        orderByKey,
-        lowCardKey,
-        minmaxIndexKey,
-        bfIndexKey,
-        setIndexKey,
-        primaryKey,
+        ConverterUtils.normalizeColName(orderByKey),
+        ConverterUtils.normalizeColName(lowCardKey),
+        ConverterUtils.normalizeColName(minmaxIndexKey),
+        ConverterUtils.normalizeColName(bfIndexKey),
+        ConverterUtils.normalizeColName(setIndexKey),
+        ConverterUtils.normalizeColName(primaryKey),
         partList,
         starts,
         lengths,
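
Note (illustrative sketch, not part of the patch): the fix routes every MergeTree index-key string through ConverterUtils.normalizeColName before it reaches the native cache loader, so keys declared with upper-case column names (e.g. orderByKey='L_LINENUMBER,L_ORDERKEY') line up with the part metadata that the cache loader reads. The Scala snippet below only illustrates that idea; it assumes normalization amounts to lower-casing each name in a comma-separated key list, which may differ from the real ConverterUtils implementation.

// Hypothetical sketch, not the actual ConverterUtils code: assume normalization is a
// simple lower-casing of each column name in a comma-separated key list.
object NormalizeColNameSketch {
  def normalizeColName(keys: String): String =
    keys.split(",").map(_.trim.toLowerCase).filter(_.nonEmpty).mkString(",")

  def main(args: Array[String]): Unit = {
    // Before the fix, "L_LINENUMBER,L_ORDERKEY" was passed through unchanged and the
    // cache load could not find the columns; normalized it becomes "l_linenumber,l_orderkey".
    println(normalizeColName("L_LINENUMBER,L_ORDERKEY"))
  }
}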