diff --git a/backends-clickhouse/src/main/java/io/glutenproject/metrics/MetricsStep.java b/backends-clickhouse/src/main/java/io/glutenproject/metrics/MetricsStep.java index c569cd2ee08e..d1714d825c84 100644 --- a/backends-clickhouse/src/main/java/io/glutenproject/metrics/MetricsStep.java +++ b/backends-clickhouse/src/main/java/io/glutenproject/metrics/MetricsStep.java @@ -32,6 +32,9 @@ public class MetricsStep { @JsonProperty("selected_marks_pk") protected long selectedMarksPk; + @JsonProperty("selected_marks") + protected long selectedMarks; + public String getName() { return name; } @@ -64,6 +67,14 @@ public void setSelectedMarksPk(long selectedMarksPk) { this.selectedMarksPk = selectedMarksPk; } + public long getSelectedMarks() { + return selectedMarks; + } + + public void setSelectedMarks(long selectedMarks) { + this.selectedMarks = selectedMarks; + } + public long getTotalMarksPk() { return totalMarksPk; } diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala index 3012d5371dcb..0157b370fb6d 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala @@ -127,6 +127,7 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil { "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"), "selectedMarksPk" -> SQLMetrics.createMetric(sparkContext, "selected marks primary"), + "selectedMarks" -> SQLMetrics.createMetric(sparkContext, "selected marks"), "totalMarksPk" -> SQLMetrics.createMetric(sparkContext, "total marks primary") ) diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala 
b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala index 8c79536bdc5b..497e8b780810 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala @@ -36,6 +36,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric val inputWaitTime: SQLMetric = metrics("inputWaitTime") val outputWaitTime: SQLMetric = metrics("outputWaitTime") val selected_marks_pk: SQLMetric = metrics("selectedMarksPk") + val selected_marks: SQLMetric = metrics("selectedMarks") val total_marks_pk: SQLMetric = metrics("totalMarksPk") override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = { @@ -56,6 +57,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric metricsData.getSteps.forEach( step => { selected_marks_pk += step.selectedMarksPk + selected_marks += step.selectedMarks total_marks_pk += step.totalMarksPk }) diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHNotNullSkipIndexSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHNotNullSkipIndexSuite.scala index 5c8c50b99122..73462780cf6d 100644 --- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHNotNullSkipIndexSuite.scala +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHNotNullSkipIndexSuite.scala @@ -72,12 +72,20 @@ class GlutenClickHouseTPCHNotNullSkipIndexSuite extends GlutenClickHouseTPCHAbst | select * from lineitem |""".stripMargin) - val ret = spark + val df = spark .sql(s""" |select count(*) from lineitem_mergetree_minmax where l_receiptdate = '1998-12-27' |""".stripMargin) - .collect() + + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + 
assert(scanExec.size == 1) + val mergetreeScan = scanExec(0) + val ret = df.collect() assert(ret.apply(0).get(0) == 1) + val marks = mergetreeScan.metrics("selectedMarks").value + assert(marks == 1) val directory = new File(s"$basePath/lineitem_mergetree_minmax") // find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006 @@ -122,12 +130,20 @@ class GlutenClickHouseTPCHNotNullSkipIndexSuite extends GlutenClickHouseTPCHAbst | select * from lineitem |""".stripMargin) - val ret = spark + val df = spark .sql(s""" - |select count(*) from lineitem_mergetree_bf where l_orderkey = '600000' + |select count(*) from lineitem_mergetree_bf where l_orderkey = '600000' |""".stripMargin) - .collect() + + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assert(scanExec.size == 1) + val mergetreeScan = scanExec(0) + val ret = df.collect() assert(ret.apply(0).get(0) == 2) + val marks = mergetreeScan.metrics("selectedMarks").value + assert(marks == 1) val directory = new File(s"$basePath/lineitem_mergetree_bf") // find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006 @@ -172,12 +188,20 @@ class GlutenClickHouseTPCHNotNullSkipIndexSuite extends GlutenClickHouseTPCHAbst | select * from lineitem |""".stripMargin) - val ret = spark + val df = spark .sql(s""" - |select count(*) from lineitem_mergetree_set where l_orderkey = '600000' + |select count(*) from lineitem_mergetree_set where l_orderkey = '600000' |""".stripMargin) - .collect() + + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assert(scanExec.size == 1) + val mergetreeScan = scanExec(0) + val ret = df.collect() assert(ret.apply(0).get(0) == 2) + val marks = mergetreeScan.metrics("selectedMarks").value + assert(marks == 1) val directory = new File(s"$basePath/lineitem_mergetree_set") // find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006 @@ -222,12 
+246,20 @@ class GlutenClickHouseTPCHNotNullSkipIndexSuite extends GlutenClickHouseTPCHAbst | select * from lineitem |""".stripMargin) - val ret = spark + val df = spark .sql(s""" |select count(*) from lineitem_mergetree_minmax2 where l_receiptdate = '1998-12-27' |""".stripMargin) - .collect() + + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assert(scanExec.size == 1) + val mergetreeScan = scanExec(0) + val ret = df.collect() assert(ret.apply(0).get(0) == 1) + val marks = mergetreeScan.metrics("selectedMarks").value + assert(marks == 1) val directory = new File(s"$basePath/lineitem_mergetree_minmax2") // find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006 diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHNullableSkipIndexSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHNullableSkipIndexSuite.scala index 967f14bbc633..27fa79018d5f 100644 --- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHNullableSkipIndexSuite.scala +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHNullableSkipIndexSuite.scala @@ -73,12 +73,20 @@ class GlutenClickHouseTPCHNullableSkipIndexSuite extends GlutenClickHouseTPCHAbs | select * from lineitem |""".stripMargin) - val ret = spark + val df = spark .sql(s""" |select count(*) from lineitem_mergetree_minmax where l_receiptdate = '1998-12-27' |""".stripMargin) - .collect() + + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assert(scanExec.size == 1) + val mergetreeScan = scanExec(0) + val ret = df.collect() assert(ret.apply(0).get(0) == 1) + val marks = mergetreeScan.metrics("selectedMarks").value + assert(marks == 1) val directory = new File(s"$basePath/lineitem_mergetree_minmax") // find a folder whose name is like 
48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006 @@ -123,12 +131,20 @@ class GlutenClickHouseTPCHNullableSkipIndexSuite extends GlutenClickHouseTPCHAbs | select * from lineitem |""".stripMargin) - val ret = spark + val df = spark .sql(s""" |select count(*) from lineitem_mergetree_bf where l_orderkey = '600000' |""".stripMargin) - .collect() + + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assert(scanExec.size == 1) + val mergetreeScan = scanExec(0) + val ret = df.collect() assert(ret.apply(0).get(0) == 2) + val marks = mergetreeScan.metrics("selectedMarks").value + assert(marks == 1) val directory = new File(s"$basePath/lineitem_mergetree_bf") // find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006 @@ -173,12 +189,20 @@ class GlutenClickHouseTPCHNullableSkipIndexSuite extends GlutenClickHouseTPCHAbs | select * from lineitem |""".stripMargin) - val ret = spark + val df = spark .sql(s""" |select count(*) from lineitem_mergetree_set where l_orderkey = '600000' |""".stripMargin) - .collect() + + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assert(scanExec.size == 1) + val mergetreeScan = scanExec(0) + val ret = df.collect() assert(ret.apply(0).get(0) == 2) + val marks = mergetreeScan.metrics("selectedMarks").value + assert(marks == 1) val directory = new File(s"$basePath/lineitem_mergetree_set") // find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006 @@ -222,12 +246,20 @@ class GlutenClickHouseTPCHNullableSkipIndexSuite extends GlutenClickHouseTPCHAbs | select * from lineitem |""".stripMargin) - val ret = spark + val df = spark .sql(s""" |select count(*) from lineitem_mergetree_minmax2 where l_receiptdate = '1998-12-27' |""".stripMargin) - .collect() + + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assert(scanExec.size == 1) + val mergetreeScan 
= scanExec(0) + val ret = df.collect() assert(ret.apply(0).get(0) == 1) + val marks = mergetreeScan.metrics("selectedMarks").value + assert(marks == 1) val directory = new File(s"$basePath/lineitem_mergetree_minmax2") // find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006 diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp index 243bcc2c4a33..f1381a1f75a9 100644 --- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp +++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp @@ -179,6 +179,7 @@ MergeTreeRelParser::parseReadRel( source_step_with_filter->addFilter(storage_prewhere_info->prewhere_actions, storage_prewhere_info->prewhere_column_name); source_step_with_filter->applyFilters(); } + query_context.custom_storage_merge_tree->wrapRangesInDataParts(*reinterpret_cast<DB::ReadFromMergeTree *>(read_step.get()), ranges); steps.emplace_back(read_step.get()); query_plan->addStep(std::move(read_step)); diff --git a/cpp-ch/local-engine/Parser/RelMetric.cpp b/cpp-ch/local-engine/Parser/RelMetric.cpp index 2449fa4e5729..eec31213a69e 100644 --- a/cpp-ch/local-engine/Parser/RelMetric.cpp +++ b/cpp-ch/local-engine/Parser/RelMetric.cpp @@ -120,9 +120,12 @@ void RelMetric::serialize(Writer & writer, bool) const if (auto read_mergetree = dynamic_cast<DB::ReadFromMergeTree *>(step)) { auto selected_marks_pk = read_mergetree->getAnalysisResult().selected_marks_pk; + auto selected_marks = read_mergetree->getAnalysisResult().selected_marks; auto total_marks_pk = read_mergetree->getAnalysisResult().total_marks_pk; writer.Key("selected_marks_pk"); writer.Uint64(selected_marks_pk); + writer.Key("selected_marks"); + writer.Uint64(selected_marks); writer.Key("total_marks_pk"); writer.Uint64(total_marks_pk); }