Skip to content

Commit

Permalink
add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
binmahone committed Mar 25, 2024
1 parent ede04b9 commit 7717d58
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public class MetricsStep {
// Marks selected via the primary-key index; deserialized from the native
// engine's metrics JSON under the "selected_marks_pk" key (emitted by
// RelMetric::serialize on the C++ side).
@JsonProperty("selected_marks_pk")
protected long selectedMarksPk;

// Total marks selected after all skipping indexes are applied (not just the
// primary key); deserialized from the "selected_marks" key of the native
// engine's metrics JSON.
@JsonProperty("selected_marks")
protected long selectedMarks;

/**
 * Returns the name of this metrics step.
 *
 * @return the step name as reported by the native engine
 */
public String getName() {
    return this.name;
}
Expand Down Expand Up @@ -64,6 +67,14 @@ public void setSelectedMarksPk(long selectedMarksPk) {
this.selectedMarksPk = selectedMarksPk;
}

/**
 * Returns the number of marks selected by this step after all skipping
 * indexes have been applied.
 *
 * @return the selected-marks count
 */
public long getSelectedMarks() {
    return this.selectedMarks;
}

/**
 * Sets the number of selected marks for this step (populated during JSON
 * deserialization of the native engine's metrics).
 *
 * @param selectedMarks the selected-marks count to record
 */
public void setSelectedMarks(long selectedMarks) {
    this.selectedMarks = selectedMarks;
}

/**
 * Returns the total number of marks considered by the primary-key index
 * before any skipping.
 *
 * @return the total primary-key marks count
 */
public long getTotalMarksPk() {
    return this.totalMarksPk;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
"selectedMarksPk" -> SQLMetrics.createMetric(sparkContext, "selected marks primary"),
"selectedMarks" -> SQLMetrics.createMetric(sparkContext, "selected marks"),
"totalMarksPk" -> SQLMetrics.createMetric(sparkContext, "total marks primary")
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
val inputWaitTime: SQLMetric = metrics("inputWaitTime")
val outputWaitTime: SQLMetric = metrics("outputWaitTime")
val selected_marks_pk: SQLMetric = metrics("selectedMarksPk")
val selected_marks: SQLMetric = metrics("selectedMarks")
val total_marks_pk: SQLMetric = metrics("totalMarksPk")

override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
Expand All @@ -56,6 +57,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
metricsData.getSteps.forEach(
step => {
selected_marks_pk += step.selectedMarksPk
selected_marks += step.selectedMarks
total_marks_pk += step.totalMarksPk
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,20 @@ class GlutenClickHouseTPCHNotNullSkipIndexSuite extends GlutenClickHouseTPCHAbst
| select * from lineitem
|""".stripMargin)

val ret = spark
val df = spark
.sql(s"""
|select count(*) from lineitem_mergetree_minmax where l_receiptdate = '1998-12-27'
|""".stripMargin)
.collect()

val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
assert(scanExec.size == 1)
val mergetreeScan = scanExec(0)
val ret = df.collect()
assert(ret.apply(0).get(0) == 1)
val marks = mergetreeScan.metrics("selectedMarks").value
assert(marks == 1)

val directory = new File(s"$basePath/lineitem_mergetree_minmax")
// find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006
Expand Down Expand Up @@ -122,12 +130,20 @@ class GlutenClickHouseTPCHNotNullSkipIndexSuite extends GlutenClickHouseTPCHAbst
| select * from lineitem
|""".stripMargin)

val ret = spark
val df = spark
.sql(s"""
|select count(*) from lineitem_mergetree_bf where l_orderkey = '600000'
select count(*) from lineitem_mergetree_bf where l_orderkey = '600000'
|""".stripMargin)
.collect()

val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
assert(scanExec.size == 1)
val mergetreeScan = scanExec(0)
val ret = df.collect()
assert(ret.apply(0).get(0) == 2)
val marks = mergetreeScan.metrics("selectedMarks").value
assert(marks == 1)

val directory = new File(s"$basePath/lineitem_mergetree_bf")
// find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006
Expand Down Expand Up @@ -172,12 +188,20 @@ class GlutenClickHouseTPCHNotNullSkipIndexSuite extends GlutenClickHouseTPCHAbst
| select * from lineitem
|""".stripMargin)

val ret = spark
val df = spark
.sql(s"""
|select count(*) from lineitem_mergetree_set where l_orderkey = '600000'
select count(*) from lineitem_mergetree_set where l_orderkey = '600000'
|""".stripMargin)
.collect()

val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
assert(scanExec.size == 1)
val mergetreeScan = scanExec(0)
val ret = df.collect()
assert(ret.apply(0).get(0) == 2)
val marks = mergetreeScan.metrics("selectedMarks").value
assert(marks == 1)

val directory = new File(s"$basePath/lineitem_mergetree_set")
// find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006
Expand Down Expand Up @@ -222,12 +246,20 @@ class GlutenClickHouseTPCHNotNullSkipIndexSuite extends GlutenClickHouseTPCHAbst
| select * from lineitem
|""".stripMargin)

val ret = spark
val df = spark
.sql(s"""
|select count(*) from lineitem_mergetree_minmax2 where l_receiptdate = '1998-12-27'
|""".stripMargin)
.collect()

val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
assert(scanExec.size == 1)
val mergetreeScan = scanExec(0)
val ret = df.collect()
assert(ret.apply(0).get(0) == 1)
val marks = mergetreeScan.metrics("selectedMarks").value
assert(marks == 1)

val directory = new File(s"$basePath/lineitem_mergetree_minmax2")
// find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,20 @@ class GlutenClickHouseTPCHNullableSkipIndexSuite extends GlutenClickHouseTPCHAbs
| select * from lineitem
|""".stripMargin)

val ret = spark
val df = spark
.sql(s"""
|select count(*) from lineitem_mergetree_minmax where l_receiptdate = '1998-12-27'
|""".stripMargin)
.collect()

val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
assert(scanExec.size == 1)
val mergetreeScan = scanExec(0)
val ret = df.collect()
assert(ret.apply(0).get(0) == 1)
val marks = mergetreeScan.metrics("selectedMarks").value
assert(marks == 1)

val directory = new File(s"$basePath/lineitem_mergetree_minmax")
// find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006
Expand Down Expand Up @@ -123,12 +131,20 @@ class GlutenClickHouseTPCHNullableSkipIndexSuite extends GlutenClickHouseTPCHAbs
| select * from lineitem
|""".stripMargin)

val ret = spark
val df = spark
.sql(s"""
|select count(*) from lineitem_mergetree_bf where l_orderkey = '600000'
|""".stripMargin)
.collect()

val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
assert(scanExec.size == 1)
val mergetreeScan = scanExec(0)
val ret = df.collect()
assert(ret.apply(0).get(0) == 2)
val marks = mergetreeScan.metrics("selectedMarks").value
assert(marks == 1)

val directory = new File(s"$basePath/lineitem_mergetree_bf")
// find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006
Expand Down Expand Up @@ -173,12 +189,20 @@ class GlutenClickHouseTPCHNullableSkipIndexSuite extends GlutenClickHouseTPCHAbs
| select * from lineitem
|""".stripMargin)

val ret = spark
val df = spark
.sql(s"""
|select count(*) from lineitem_mergetree_set where l_orderkey = '600000'
|""".stripMargin)
.collect()

val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
assert(scanExec.size == 1)
val mergetreeScan = scanExec(0)
val ret = df.collect()
assert(ret.apply(0).get(0) == 2)
val marks = mergetreeScan.metrics("selectedMarks").value
assert(marks == 1)

val directory = new File(s"$basePath/lineitem_mergetree_set")
// find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006
Expand Down Expand Up @@ -222,12 +246,20 @@ class GlutenClickHouseTPCHNullableSkipIndexSuite extends GlutenClickHouseTPCHAbs
| select * from lineitem
|""".stripMargin)

val ret = spark
val df = spark
.sql(s"""
|select count(*) from lineitem_mergetree_minmax2 where l_receiptdate = '1998-12-27'
|""".stripMargin)
.collect()

val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
assert(scanExec.size == 1)
val mergetreeScan = scanExec(0)
val ret = df.collect()
assert(ret.apply(0).get(0) == 1)
val marks = mergetreeScan.metrics("selectedMarks").value
assert(marks == 1)

val directory = new File(s"$basePath/lineitem_mergetree_minmax2")
// find a folder whose name is like 48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006
Expand Down
1 change: 1 addition & 0 deletions cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ MergeTreeRelParser::parseReadRel(
source_step_with_filter->addFilter(storage_prewhere_info->prewhere_actions, storage_prewhere_info->prewhere_column_name);
source_step_with_filter->applyFilters();
}

query_context.custom_storage_merge_tree->wrapRangesInDataParts(*reinterpret_cast<ReadFromMergeTree *>(read_step.get()), ranges);
steps.emplace_back(read_step.get());
query_plan->addStep(std::move(read_step));
Expand Down
3 changes: 3 additions & 0 deletions cpp-ch/local-engine/Parser/RelMetric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,12 @@ void RelMetric::serialize(Writer<StringBuffer> & writer, bool) const
if (auto read_mergetree = dynamic_cast<DB::ReadFromMergeTree*>(step))
{
auto selected_marks_pk = read_mergetree->getAnalysisResult().selected_marks_pk;
auto selected_marks = read_mergetree->getAnalysisResult().selected_marks;
auto total_marks_pk = read_mergetree->getAnalysisResult().total_marks_pk;
writer.Key("selected_marks_pk");
writer.Uint64(selected_marks_pk);
writer.Key("selected_marks");
writer.Uint64(selected_marks);
writer.Key("total_marks_pk");
writer.Uint64(total_marks_pk);
}
Expand Down

0 comments on commit 7717d58

Please sign in to comment.