From 83ffefa5601a4237c8722b8074d7b995c337883c Mon Sep 17 00:00:00 2001
From: Kazantsev Maksim
Date: Sat, 27 Sep 2025 20:33:31 +0400
Subject: [PATCH 1/3] Add sha1 function impl

---
 native/core/src/execution/jni_api.rs              |  2 ++
 .../org/apache/comet/serde/QueryPlanSerde.scala   |  3 ++-
 .../main/scala/org/apache/comet/serde/hash.scala  | 15 ++++++++++++++-
 .../org/apache/comet/CometExpressionSuite.scala   |  6 ++++--
 4 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index 1f9a4263f2..e6a447bf08 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -65,6 +65,7 @@ use std::collections::HashMap;
 use std::path::PathBuf;
 use std::time::{Duration, Instant};
 use std::{sync::Arc, task::Poll};
+use datafusion_spark::function::hash::sha1::SparkSha1;
 use tokio::runtime::Runtime;
 
 use crate::execution::memory_pools::{
@@ -303,6 +304,7 @@ fn prepare_datafusion_session_context(
     session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha2::default()));
     session_ctx.register_udf(ScalarUDF::new_from_impl(CharFunc::default()));
     session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitGet::default()));
+    session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default()));
 
     // Must be the last one to override existing functions with the same name
     datafusion_comet_spark_expr::register_all_comet_functions(&mut session_ctx)?;
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 892d8bca63..8a69957e42 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -154,7 +154,8 @@ object QueryPlanSerde extends Logging with CometExprShim {
     classOf[Md5] -> CometScalarFunction("md5"),
     classOf[Murmur3Hash] -> CometMurmur3Hash,
     classOf[Sha2] -> CometSha2,
-    classOf[XxHash64] -> CometXxHash64)
+    classOf[XxHash64] -> CometXxHash64,
+    classOf[Sha1] -> CometSha1)
 
   private val stringExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
     classOf[Ascii] -> CometScalarFunction("ascii"),
diff --git a/spark/src/main/scala/org/apache/comet/serde/hash.scala b/spark/src/main/scala/org/apache/comet/serde/hash.scala
index 5c45a25936..523095011f 100644
--- a/spark/src/main/scala/org/apache/comet/serde/hash.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/hash.scala
@@ -19,7 +19,7 @@
 
 package org.apache.comet.serde
 
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Murmur3Hash, Sha2, XxHash64}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Murmur3Hash, Sha1, Sha2, XxHash64}
 import org.apache.spark.sql.types.{DecimalType, IntegerType, LongType, StringType}
 
 import org.apache.comet.CometSparkSessionExtensions.withInfo
@@ -85,6 +85,19 @@ object CometSha2 extends CometExpressionSerde[Sha2] {
   }
 }
 
+object CometSha1 extends CometExpressionSerde[Sha1] {
+  override def convert(
+      expr: Sha1,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    if (!HashUtils.isSupportedType(expr)) {
+      return None
+    }
+    val childExpr = exprToProtoInternal(expr.child, inputs, binding)
+    scalarFunctionExprToProtoWithReturnType("sha1", StringType, childExpr)
+  }
+}
+
 private object HashUtils {
   def isSupportedType(expr: Expression): Boolean = {
     for (child <- expr.children) {
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index daf0e45cc8..586a581073 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -2004,7 +2004,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
           |md5(col), md5(cast(a as string)), md5(cast(b as string)),
           |hash(col), hash(col, 1), hash(col, 0), hash(col, a, b), hash(b, a, col),
           |xxhash64(col), xxhash64(col, 1), xxhash64(col, 0), xxhash64(col, a, b), xxhash64(b, a, col),
-          |sha2(col, 0), sha2(col, 256), sha2(col, 224), sha2(col, 384), sha2(col, 512), sha2(col, 128), sha2(col, -1)
+          |sha2(col, 0), sha2(col, 256), sha2(col, 224), sha2(col, 384), sha2(col, 512), sha2(col, 128), sha2(col, -1),
+          |sha1(col), sha1(cast(a as string)), sha1(cast(b as string)),
           |from test
           |""".stripMargin)
     }
@@ -2116,7 +2117,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
          |md5(col), md5(cast(a as string)), --md5(cast(b as string)),
          |hash(col), hash(col, 1), hash(col, 0), hash(col, a, b), hash(b, a, col),
          |xxhash64(col), xxhash64(col, 1), xxhash64(col, 0), xxhash64(col, a, b), xxhash64(b, a, col),
-         |sha2(col, 0), sha2(col, 256), sha2(col, 224), sha2(col, 384), sha2(col, 512), sha2(col, 128), sha2(col, -1)
+         |sha2(col, 0), sha2(col, 256), sha2(col, 224), sha2(col, 384), sha2(col, 512), sha2(col, 128), sha2(col, -1),
+         |sha1(col), sha1(cast(a as string)), sha1(cast(b as string))
          |from test
          |""".stripMargin)
     }

From 46325a45f5b6d5a2fe6695f77523a9afe4460928 Mon Sep 17 00:00:00 2001
From: Kazantsev Maksim
Date: Tue, 30 Sep 2025 14:24:41 +0400
Subject: [PATCH 2/3] Fix format and tests

---
 native/core/src/execution/jni_api.rs                       | 2 +-
 .../test/scala/org/apache/comet/CometExpressionSuite.scala | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index e6a447bf08..4c59cef6f6 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -41,6 +41,7 @@ use datafusion::{
 };
 use datafusion_comet_proto::spark_operator::Operator;
 use datafusion_spark::function::bitwise::bit_get::SparkBitGet;
+use datafusion_spark::function::hash::sha1::SparkSha1;
 use datafusion_spark::function::hash::sha2::SparkSha2;
 use datafusion_spark::function::math::expm1::SparkExpm1;
 use datafusion_spark::function::string::char::CharFunc;
@@ -65,7 +66,6 @@ use std::collections::HashMap;
 use std::path::PathBuf;
 use std::time::{Duration, Instant};
 use std::{sync::Arc, task::Poll};
-use datafusion_spark::function::hash::sha1::SparkSha1;
 use tokio::runtime::Runtime;
 
 use crate::execution::memory_pools::{
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 586a581073..2098e126d8 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -2005,7 +2005,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
           |hash(col), hash(col, 1), hash(col, 0), hash(col, a, b), hash(b, a, col),
           |xxhash64(col), xxhash64(col, 1), xxhash64(col, 0), xxhash64(col, a, b), xxhash64(b, a, col),
           |sha2(col, 0), sha2(col, 256), sha2(col, 224), sha2(col, 384), sha2(col, 512), sha2(col, 128), sha2(col, -1),
-          |sha1(col), sha1(cast(a as string)), sha1(cast(b as string)),
+          |sha1(col), sha1(cast(a as string)), sha1(cast(b as string))
           |from test
           |""".stripMargin)
     }
@@ -2118,7 +2118,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
          |hash(col), hash(col, 1), hash(col, 0), hash(col, a, b), hash(b, a, col),
          |xxhash64(col), xxhash64(col, 1), xxhash64(col, 0), xxhash64(col, a, b), xxhash64(b, a, col),
          |sha2(col, 0), sha2(col, 256), sha2(col, 224), sha2(col, 384), sha2(col, 512), sha2(col, 128), sha2(col, -1),
-         |sha1(col), sha1(cast(a as string)), sha1(cast(b as string))
+         |sha1(col), sha1(cast(a as string))
          |from test
          |""".stripMargin)
     }

From 0d747d2c84874dbe384f99afd42e35bb06253058 Mon Sep 17 00:00:00 2001
From: Kazantsev Maksim
Date: Mon, 6 Oct 2025 19:48:09 +0400
Subject: [PATCH 3/3] resolve conflicts

---
 native/core/src/execution/jni_api.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index 967d6e7fcf..2f8ba01b32 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -41,9 +41,9 @@ use datafusion::{
 };
 use datafusion_comet_proto::spark_operator::Operator;
 use datafusion_spark::function::bitwise::bit_get::SparkBitGet;
-use datafusion_spark::function::hash::sha1::SparkSha1;
 use datafusion_spark::function::datetime::date_add::SparkDateAdd;
 use datafusion_spark::function::datetime::date_sub::SparkDateSub;
+use datafusion_spark::function::hash::sha1::SparkSha1;
 use datafusion_spark::function::hash::sha2::SparkSha2;
 use datafusion_spark::function::math::expm1::SparkExpm1;
 use datafusion_spark::function::string::char::CharFunc;