
Commit

Merge branch 'main' of https://github.com/gaoyangxiaozhu/gluten into gayangya/url_decode_spark_function
gaoyangxiaozhu committed Mar 21, 2024
2 parents 31dd2af + e317e4d commit e8619af
Showing 136 changed files with 5,579 additions and 970 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/code_style.yml
@@ -36,9 +36,9 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Run clang-format style check for C/C++ programs.
uses: jidicula/clang-format-action@v3.5.1
uses: jidicula/clang-format-action@v4.11.0
with:
clang-format-version: '12'
clang-format-version: '15'
check-path: ${{ matrix.path['check'] }}
fallback-style: 'Google' # optional

13 changes: 12 additions & 1 deletion README.md
@@ -112,7 +112,18 @@ Welcome to contribute to Gluten project! See [contributing guide](CONTRIBUTING.m

## 4.1 Community

You can join a Wechat group (Chinese) or a Spark channel in Velox Slack group (English) for community communication. Contact us if you want.
Gluten successfully joined the Apache Incubator in March 2024. We welcome developers and users who are interested in the Gluten project. Here are several ways to contact us:
### Gluten website
https://gluten.apache.org/

### Mailing lists
For any technical questions, please subscribe and send email to dev@gluten.apache.org.

### Wechat group
We also have a WeChat group (in Chinese), which may be more friendly for PRC developers/users. Due to WeChat group limitations, please contact weitingchen at apache.org or zhangzc at apache.org to be invited to the group.

### Slack channel
There's also a Spark channel in the Velox Slack workspace (in English) for community communication about the Velox backend. Please see the Velox documentation here: https://github.com/facebookincubator/velox?tab=readme-ov-file#community

## 4.2 Issue Report

@@ -16,6 +16,8 @@
*/
package io.glutenproject.metrics;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;

public class MetricsStep {
@@ -24,6 +26,12 @@ public class MetricsStep {
protected String description;
protected List<MetricsProcessor> processors;

@JsonProperty("total_marks_pk")
protected long totalMarksPk;

@JsonProperty("selected_marks_pk")
protected long selectedMarksPk;

public String getName() {
return name;
}
@@ -47,4 +55,20 @@ public List<MetricsProcessor> getProcessors() {
public void setProcessors(List<MetricsProcessor> processors) {
this.processors = processors;
}

public void setTotalMarksPk(long totalMarksPk) {
this.totalMarksPk = totalMarksPk;
}

public void setSelectedMarksPk(long selectedMarksPk) {
this.selectedMarksPk = selectedMarksPk;
}

public long getTotalMarksPk() {
return totalMarksPk;
}

public long getSelectedMarksPk() {
return selectedMarksPk;
}
}
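
A minimal sketch of how these two fields might be populated, assuming the native ClickHouse backend reports per-step metrics as JSON using the snake_case keys declared above; the JSON fragment and the object name below are made up for illustration.

import com.fasterxml.jackson.databind.ObjectMapper
import io.glutenproject.metrics.MetricsStep

object MetricsStepJsonSketch {
  def main(args: Array[String]): Unit = {
    // Made-up fragment; a real payload comes from the native backend and carries more keys.
    val json = """{"total_marks_pk": 128, "selected_marks_pk": 9}"""
    val step = new ObjectMapper().readValue(json, classOf[MetricsStep])
    // The snake_case keys land on the camelCase fields through the @JsonProperty annotations.
    println(s"selected ${step.getSelectedMarksPk} of ${step.getTotalMarksPk} primary-key marks")
  }
}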
@@ -125,7 +125,9 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
"pruningTime" ->
SQLMetrics.createTimingMetric(sparkContext, "dynamic partition pruning time"),
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time")
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
"selectedMarksPk" -> SQLMetrics.createMetric(sparkContext, "selected marks primary"),
"totalMarksPk" -> SQLMetrics.createMetric(sparkContext, "total marks primary")
)

override def genFileSourceScanTransformerMetricsUpdater(
@@ -18,6 +18,7 @@ package io.glutenproject.backendsapi.clickhouse

import io.glutenproject.GlutenConfig
import io.glutenproject.backendsapi.{BackendsApiManager, SparkPlanExecApi}
import io.glutenproject.exception.GlutenNotSupportException
import io.glutenproject.execution._
import io.glutenproject.expression._
import io.glutenproject.expression.ConverterUtils.FunctionConfig
@@ -321,7 +322,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
left: SparkPlan,
right: SparkPlan,
condition: Option[Expression]): CartesianProductExecTransformer =
throw new UnsupportedOperationException(
throw new GlutenNotSupportException(
"CartesianProductExecTransformer is not supported in ch backend.")

override def genBroadcastNestedLoopJoinExecTransformer(
@@ -330,7 +331,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
buildSide: BuildSide,
joinType: JoinType,
condition: Option[Expression]): BroadcastNestedLoopJoinExecTransformer =
throw new UnsupportedOperationException(
throw new GlutenNotSupportException(
"BroadcastNestedLoopJoinExecTransformer is not supported in ch backend.")

/** Generate an expression transformer to transform GetMapValue to Substrait. */
@@ -457,7 +458,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
case union: ColumnarUnionExec =>
wrapChild(union)
case other =>
throw new UnsupportedOperationException(
throw new GlutenNotSupportException(
s"Not supported operator ${other.nodeName} for BroadcastRelation")
}
(newChild, (child.output ++ appendedProjections).map(_.toAttribute), preProjectionBuildKeys)
@@ -580,14 +581,21 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
CHPosExplodeTransformer(substraitExprName, child, original, attributeSeq)
}

override def genRegexpReplaceTransformer(
substraitExprName: String,
children: Seq[ExpressionTransformer],
expr: RegExpReplace): ExpressionTransformer = {
CHRegExpReplaceTransformer(substraitExprName, children, expr)
}

override def createColumnarWriteFilesExec(
child: SparkPlan,
fileFormat: FileFormat,
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
options: Map[String, String],
staticPartitions: TablePartitionSpec): WriteFilesExec = {
throw new UnsupportedOperationException("ColumnarWriteFilesExec is not support in ch backend.")
throw new GlutenNotSupportException("ColumnarWriteFilesExec is not support in ch backend.")
}

/**
@@ -633,7 +641,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
val aggregateFunc = aggExpression.aggregateFunction
val substraitAggFuncName = ExpressionMappings.expressionsMap.get(aggregateFunc.getClass)
if (substraitAggFuncName.isEmpty) {
throw new UnsupportedOperationException(s"Not currently supported: $aggregateFunc.")
throw new GlutenNotSupportException(s"Not currently supported: $aggregateFunc.")
}

val childrenNodeList = new JArrayList[ExpressionNode]()
@@ -704,7 +712,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
)
windowExpressionNodes.add(windowFunctionNode)
case _ =>
throw new UnsupportedOperationException(
throw new GlutenNotSupportException(
"unsupported window function type: " +
wExpression.windowFunction)
}
@@ -17,6 +17,7 @@
package io.glutenproject.execution

import io.glutenproject.backendsapi.BackendsApiManager
import io.glutenproject.exception.GlutenNotSupportException
import io.glutenproject.execution.CHHashAggregateExecTransformer.getAggregateResultAttributes
import io.glutenproject.expression._
import io.glutenproject.substrait.`type`.{TypeBuilder, TypeNode}
@@ -224,7 +225,7 @@ case class CHHashAggregateExecTransformer(
// so far, it only happens in a three-stage count distinct case
// e.g. select sum(a), count(distinct b) from f
if (!child.isInstanceOf[BaseAggregateExec]) {
throw new UnsupportedOperationException(
throw new GlutenNotSupportException(
"PartialMerge's child not being HashAggregateExecBaseTransformer" +
" is unsupported yet")
}
@@ -242,7 +243,7 @@
.replaceWithExpressionTransformer(aggExpr.resultAttribute, originalInputAttributes)
.doTransform(args))
case other =>
throw new UnsupportedOperationException(s"$other not supported.")
throw new GlutenNotSupportException(s"$other not supported.")
}
for (node <- childrenNodes) {
childrenNodeList.add(node)
@@ -457,7 +458,7 @@ case class CHHashAggregateExecPullOutHelper(
resIndex += 1
resIndex
case other =>
throw new UnsupportedOperationException(s"Unsupported aggregate mode: $other.")
throw new GlutenNotSupportException(s"Unsupported aggregate mode: $other.")
}
}
}
@@ -17,6 +17,7 @@
package io.glutenproject.expression

import io.glutenproject.backendsapi.clickhouse.CHBackendSettings
import io.glutenproject.exception.GlutenNotSupportException
import io.glutenproject.expression.ConverterUtils.FunctionConfig
import io.glutenproject.substrait.expression._

@@ -55,13 +56,12 @@ case class CHTruncTimestampTransformer(
override def doTransform(args: java.lang.Object): ExpressionNode = {
// The format must be constant string in the function date_trunc of ch.
if (!original.format.foldable) {
throw new UnsupportedOperationException(
s"The format ${original.format} must be constant string.")
throw new GlutenNotSupportException(s"The format ${original.format} must be constant string.")
}

val formatStr = original.format.eval().asInstanceOf[UTF8String]
if (formatStr == null) {
throw new UnsupportedOperationException("The format is null.")
throw new GlutenNotSupportException("The format is null.")
}

val (newFormatStr, timeZoneIgnore) = formatStr.toString.toLowerCase(Locale.ROOT) match {
@@ -76,7 +76,7 @@
// Can not support now.
// case "microsecond" => "microsecond"
// case "millisecond" => "millisecond"
case _ => throw new UnsupportedOperationException(s"The format $formatStr is invalid.")
case _ => throw new GlutenNotSupportException(s"The format $formatStr is invalid.")
}

// Currently, data_trunc function can not support to set the specified timezone,
@@ -88,7 +88,7 @@
s"${CHBackendSettings.getBackendConfigPrefix}.runtime_config.timezone")
)
) {
throw new UnsupportedOperationException(
throw new GlutenNotSupportException(
s"It doesn't support trunc the format $newFormatStr with the specified timezone " +
s"${timeZoneId.get}.")
}
@@ -136,13 +136,13 @@ case class CHStringTranslateTransformer(
!matchingNode.isInstanceOf[StringLiteralNode] ||
!replaceNode.isInstanceOf[StringLiteralNode]
) {
throw new UnsupportedOperationException(s"$original not supported yet.")
throw new GlutenNotSupportException(s"$original not supported yet.")
}

val matchingLiteral = matchingNode.asInstanceOf[StringLiteralNode].getValue
val replaceLiteral = replaceNode.asInstanceOf[StringLiteralNode].getValue
if (matchingLiteral.length() != replaceLiteral.length()) {
throw new UnsupportedOperationException(s"$original not supported yet.")
throw new GlutenNotSupportException(s"$original not supported yet.")
}

GenericExpressionTransformer(
@@ -193,7 +193,32 @@ case class CHPosExplodeTransformer(
Lists.newArrayList(childNode),
ConverterUtils.getTypeNode(structType, false))
case _ =>
throw new UnsupportedOperationException(s"posexplode($childType) not supported yet.")
throw new GlutenNotSupportException(s"posexplode($childType) not supported yet.")
}
}
}

case class CHRegExpReplaceTransformer(
substraitExprName: String,
children: Seq[ExpressionTransformer],
original: RegExpReplace)
extends ExpressionTransformer {

override def doTransform(args: java.lang.Object): ExpressionNode = {
// CH's replaceRegexpAll(subject, regexp, rep) is equivalent to
// Spark's regexp_replace(subject, regexp, rep, pos) only when pos is the default 1.
val posNode = children(3).doTransform(args)
if (
!posNode.isInstanceOf[IntLiteralNode] ||
posNode.asInstanceOf[IntLiteralNode].getValue != 1
) {
throw new UnsupportedOperationException(s"$original not supported yet.")
}

GenericExpressionTransformer(
substraitExprName,
Seq(children(0), children(1), children(2)),
original)
.doTransform(args)
}
}
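
The transformer above offloads regexp_replace only when the position argument is the literal 1 (Spark's default), because ClickHouse's replaceRegexpAll takes no position. Below is a minimal sketch of the two cases, assuming a Spark session already configured with Gluten's ClickHouse backend and a Spark version whose SQL regexp_replace accepts the optional position argument; that the second query falls back to vanilla Spark is an expectation, not something shown in this diff.

import org.apache.spark.sql.SparkSession

object RegexpReplaceOffloadSketch {
  def main(args: Array[String]): Unit = {
    // Assumes Gluten and the ClickHouse backend are already enabled on this session.
    val spark = SparkSession.builder().appName("regexp-replace-offload").getOrCreate()
    spark.range(3).selectExpr("concat('a  b ', id) AS s").createOrReplaceTempView("t")

    // Three-argument form: Spark supplies pos = 1, so CHRegExpReplaceTransformer can map it
    // onto ClickHouse's replaceRegexpAll.
    spark.sql("SELECT regexp_replace(s, '\\\\s+', ' ') FROM t").show()

    // An explicit position other than 1 fails the IntLiteralNode check above, so this
    // expression is expected to stay on vanilla Spark instead of being offloaded.
    spark.sql("SELECT regexp_replace(s, '\\\\s+', ' ', 3) FROM t").show()

    spark.stop()
  }
}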
@@ -35,6 +35,8 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
val extraTime: SQLMetric = metrics("extraTime")
val inputWaitTime: SQLMetric = metrics("inputWaitTime")
val outputWaitTime: SQLMetric = metrics("outputWaitTime")
val selected_marks_pk: SQLMetric = metrics("selectedMarksPk")
val total_marks_pk: SQLMetric = metrics("totalMarksPk")

override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
// inputMetrics.bridgeIncBytesRead(metrics("inputBytes").value)
@@ -51,6 +53,12 @@
outputWaitTime += (metricsData.outputWaitTime / 1000L).toLong
outputVectors += metricsData.outputVectors

metricsData.getSteps.forEach(
step => {
selected_marks_pk += step.selectedMarksPk
total_marks_pk += step.totalMarksPk
})

MetricsUtil.updateExtraTimeMetric(
metricsData,
extraTime,
@@ -108,7 +108,7 @@ class GlutenClickHouseHiveTableSuite
.set("spark.sql.files.minPartitionNum", "1")
.set("spark.gluten.sql.columnar.columnartorow", "true")
.set("spark.gluten.sql.columnar.backend.ch.worker.id", "1")
.set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.getClickHouseLibPath())
.set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.clickHouseLibPath)
.set("spark.gluten.sql.columnar.iterator", "true")
.set("spark.gluten.sql.columnar.hashagg.enablefinal", "true")
.set("spark.gluten.sql.enable.native.validation", "false")
@@ -1237,4 +1237,19 @@ class GlutenClickHouseHiveTableSuite
}
}
}

test("GLUTEN-3452: Bug fix decimal divide") {
withSQLConf((SQLConf.DECIMAL_OPERATIONS_ALLOW_PREC_LOSS.key, "false")) {
val table_create_sql =
"""
| create table test_tbl_3452(d1 decimal(12,2), d2 decimal(15,3)) stored as parquet;
|""".stripMargin
val data_insert_sql = "insert into test_tbl_3452 values(13.0, 0),(11, NULL), (12.3, 200)"
val select_sql = "select d1/d2, d1/0, d1/cast(0 as decimal) from test_tbl_3452"
spark.sql(table_create_sql);
spark.sql(data_insert_sql)
compareResultsAgainstVanillaSpark(select_sql, true, { _ => })
spark.sql("drop table test_tbl_3452")
}
}
}