Support writing with functions in distribute/partition expressions #253

Open · wants to merge 20 commits into main

Changes from all commits (20 commits):
fcdf0f0  Spark 3.4: Support distribute by any predefined transform (Yxang, May 17, 2023)
e52b714  Spark 3.4: add udf: years, days, hours, murmurHash2 and murmurHash3. … (Yxang, May 18, 2023)
ff243b5  Spark 3.4: Fixup sharding key needs to be mod by cluster weight on lo… (Yxang, May 19, 2023)
a1d4dce  Scala 2.13: Fix Spark 3.4 compile issue (Yxang, May 19, 2023)
5ddb98f  Spark 3.4: Optimize sharding key handling when shuffle and sort (Yxang, May 22, 2023)
000638e  Spark 3.4: Optimize sharding key handling when shuffle and sort, appr… (Yxang, May 22, 2023)
59f3bed  Spark 3.4: Support variable length arguments for murmurHash (up to 5 … (Yxang, May 23, 2023)
af14b3a  Spark 3.4: add CityHash64 (Yxang, May 24, 2023)
22f191a  Spark 3.4: Optimize sharding key handling when shuffle and sort, appr… (Yxang, May 26, 2023)
ea5ed0e  Spark 3.4 UDF: Amend input type, Make clickhouse function nullable, b… (Yxang, May 26, 2023)
a8bdcbf  Spark 3.4: Optimize sharding key handling when shuffle and sort, amen… (Yxang, May 30, 2023)
3dcdd81  Spark 3.4: Change ExprUtils to implicit (Yxang, Jun 2, 2023)
386ddb0  Spark 3.4 UDF: clickhouse code reference using tag from commit hash (Yxang, Jun 25, 2023)
286c21f  Spark 3.4 UDF: support varargs for Hash UDFs (Yxang, Jun 26, 2023)
e5809f7  Spark 3.4: refactor implicit into normal arg in ExprUtils (Yxang, Jun 27, 2023)
5ae4f3d  Spark 3.4: Cast type when calling projection, support recursive resolve (Yxang, Jun 27, 2023)
088bf3d  Spark 3.4 UDF: change pmod to mod because positiveModulo does not exi… (Yxang, Jul 14, 2023)
85a025f  Docs: add comment for modulo UDF (Yxang, Jul 14, 2023)
4e201d6  Spark 3.4: Adapt to hash function under clickhouse-core (Yxang, Jul 25, 2023)
085b3ad  fix style (Yxang, Jul 26, 2023)
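Taken together, these commits register ClickHouse's hash functions as Spark UDFs (clickhouse_xxHash64, clickhouse_cityHash64, and so on) and map ClickHouse sharding transforms such as toYYYYMM onto Spark expressions, so a write can be repartitioned by a Distributed table's sharding expression. A minimal sketch of the Spark-side usage, assuming a session configured as in the test suites in this diff; this is an illustration, not the connector's documented API:

// The hash UDFs mirror their ClickHouse counterparts, so the value below should equal
// ClickHouse's own `SELECT cityHash64('ClickHouse')` (see ClickHouseClusterHashUDFSuite).
spark.sql("SELECT clickhouse_cityHash64('ClickHouse') AS hash_value").show()

// With spark.clickhouse.write.distributed.convertLocal=true (see
// ClusterShardByTransformSuite), an INSERT into a Distributed table sharded by,
// e.g., cityHash64(value) is repartitioned with the same function, so each row is
// written to the shard ClickHouse itself would have chosen.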
@@ -97,4 +97,6 @@ case class ClusterSpec(
override def toString: String = s"cluster: $name, shards: [${shards.mkString(", ")}]"

@JsonIgnore @transient override lazy val nodes: Array[NodeSpec] = shards.sorted.flatMap(_.nodes)

def totalWeight: Int = shards.map(_.weight).sum
}
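The new totalWeight field sums the per-shard weights; per the "sharding key needs to be mod by cluster weight" commit, the computed sharding value is reduced modulo this sum before being mapped onto a shard, matching how ClickHouse's Distributed engine assigns weighted slots. A rough sketch of that mapping; names other than totalWeight, such as ShardSpec and shardFor, are illustrative stand-ins rather than the connector's actual API:

// Illustrative only: route a hash value to a shard by weighted ranges.
case class ShardSpec(num: Int, weight: Int)

def shardFor(hashValue: Long, shards: Seq[ShardSpec]): Int = {
  val totalWeight = shards.map(_.weight).sum
  // Reduce the (unsigned) 64-bit hash into [0, totalWeight).
  val slot = java.lang.Long.remainderUnsigned(hashValue, totalWeight.toLong).toInt
  // Walk the weighted ranges until the slot falls inside one of them.
  var acc = 0
  shards.find { s => acc += s.weight; slot < acc }.map(_.num).get
}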
@@ -0,0 +1,109 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.clickhouse.cluster

import org.apache.spark.sql.clickhouse.TestUtils.om
import xenon.clickhouse.func.{
ClickHouseXxHash64Shard,
CompositeFunctionRegistry,
DynamicFunctionRegistry,
StaticFunctionRegistry
}

import java.lang.{Long => JLong}

class ClickHouseClusterHashUDFSuite extends SparkClickHouseClusterTest {
// used only to look up the ClickHouse names of the registered Spark UDFs
val dummyRegistry: CompositeFunctionRegistry = {
val dynamicFunctionRegistry = new DynamicFunctionRegistry
val xxHash64ShardFunc = new ClickHouseXxHash64Shard(Seq.empty)
dynamicFunctionRegistry.register("ck_xx_hash64_shard", xxHash64ShardFunc) // for compatible
dynamicFunctionRegistry.register("clickhouse_shard_xxHash64", xxHash64ShardFunc)
new CompositeFunctionRegistry(Array(StaticFunctionRegistry, dynamicFunctionRegistry))
}

def runTest(funcSparkName: String, funcCkName: String, stringVal: String): Unit = {
val sparkResult = spark.sql(
s"""SELECT
| $funcSparkName($stringVal) AS hash_value
|""".stripMargin
).collect
assert(sparkResult.length == 1)
val sparkHashVal = sparkResult.head.getAs[Long]("hash_value")

val clickhouseResultJsonStr = runClickHouseSQL(
s"""SELECT
| $funcCkName($stringVal) AS hash_value
|""".stripMargin
).head.getString(0)
val clickhouseResultJson = om.readTree(clickhouseResultJsonStr)
val clickhouseHashVal = JLong.parseUnsignedLong(clickhouseResultJson.get("hash_value").asText)
assert(
sparkHashVal == clickhouseHashVal,
s"ck_function: $funcCkName, spark_function: $funcSparkName, args: ($stringVal)"
)
}

Seq(
"clickhouse_xxHash64",
"clickhouse_murmurHash3_64",
"clickhouse_murmurHash3_32",
"clickhouse_murmurHash2_64",
"clickhouse_murmurHash2_32",
"clickhouse_cityHash64"
).foreach { funcSparkName =>
val funcCkName = dummyRegistry.getFuncMappingBySpark(funcSparkName)
test(s"UDF $funcSparkName") {
Seq(
"spark-clickhouse-connector",
"Apache Spark",
"ClickHouse",
"Yandex",
"热爱",
"在传统的行式数据库系统中,数据按如下顺序存储:",
"🇨🇳"
).foreach { rawStringVal =>
val stringVal = s"\'$rawStringVal\'"
runTest(funcSparkName, funcCkName, stringVal)
}
}
}

Seq(
"clickhouse_murmurHash3_64",
"clickhouse_murmurHash3_32",
"clickhouse_murmurHash2_64",
"clickhouse_murmurHash2_32",
"clickhouse_cityHash64"
).foreach { funcSparkName =>
val funcCkName = dummyRegistry.getFuncMappingBySpark(funcSparkName)
test(s"UDF $funcSparkName multiple args") {
val strings = Seq(
"\'spark-clickhouse-connector\'",
"\'Apache Spark\'",
"\'ClickHouse\'",
"\'Yandex\'",
"\'热爱\'",
"\'在传统的行式数据库系统中,数据按如下顺序存储:\'",
"\'🇨🇳\'"
)
val test_5 = strings.combinations(5)
test_5.foreach { seq =>
val stringVal = seq.mkString(", ")
runTest(funcSparkName, funcCkName, stringVal)
}
}
}
}
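One detail worth noting in runTest above: ClickHouse reports these hashes as UInt64, while Spark's LongType is signed, so the JSON value is parsed with JLong.parseUnsignedLong and compared directly. The comparison is sound because both sides then hold the same 64-bit pattern; a small standalone illustration:

import java.lang.{Long => JLong}

// ClickHouse may return a value above Long.MaxValue, e.g. "18446744073709551615".
val fromClickHouse: Long = JLong.parseUnsignedLong("18446744073709551615")
// The same bit pattern on the Spark side is simply the signed value -1L,
// so a plain == on the two Longs compares the underlying 64 bits.
assert(fromClickHouse == -1L)
assert(JLong.toUnsignedString(fromClickHouse) == "18446744073709551615")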

This file was deleted.

@@ -0,0 +1,117 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.clickhouse.cluster

import org.apache.spark.SparkConf
import org.apache.spark.sql.Row

class ClusterShardByTransformSuite extends SparkClickHouseClusterTest {
override protected def sparkConf: SparkConf = {
val _conf = super.sparkConf
.set("spark.clickhouse.write.distributed.convertLocal", "true")
_conf
}

def runTest(func_name: String, func_args: Array[String]): Unit = {
val func_expr = s"$func_name(${func_args.mkString(",")})"
val cluster = "single_replica"
val db = s"db_${func_name}_shard_transform"
val tbl_dist = s"tbl_${func_name}_shard"
val tbl_local = s"${tbl_dist}_local"

try {
runClickHouseSQL(s"CREATE DATABASE IF NOT EXISTS $db ON CLUSTER $cluster")

spark.sql(
s"""CREATE TABLE $db.$tbl_local (
| create_time TIMESTAMP NOT NULL,
| create_date DATE NOT NULL,
| value STRING NOT NULL
|) USING ClickHouse
|TBLPROPERTIES (
| cluster = '$cluster',
| engine = 'MergeTree()',
| order_by = 'create_time'
|)
|""".stripMargin
)

runClickHouseSQL(
s"""CREATE TABLE $db.$tbl_dist ON CLUSTER $cluster
|AS $db.$tbl_local
|ENGINE = Distributed($cluster, '$db', '$tbl_local', $func_expr)
|""".stripMargin
)
spark.sql(
s"""INSERT INTO `$db`.`$tbl_dist`
|VALUES
| (timestamp'2021-01-01 10:10:10', date'2021-01-01', '1'),
| (timestamp'2022-02-02 11:10:10', date'2022-02-02', '2'),
| (timestamp'2023-03-03 12:10:10', date'2023-03-03', '3'),
| (timestamp'2024-04-04 13:10:10', date'2024-04-04', '4')
| AS tab(create_time, create_date, value)
|""".stripMargin
)
// check that data is indeed written
checkAnswer(
spark.table(s"$db.$tbl_dist").select("value").orderBy("create_time"),
Seq(Row("1"), Row("2"), Row("3"), Row("4"))
)

// check that the same data is routed to the same shard as ClickHouse's native sharding
runClickHouseSQL(
s"""INSERT INTO `$db`.`$tbl_dist`
|VALUES
| (timestamp'2021-01-01 10:10:10', date'2021-01-01', '1'),
| (timestamp'2022-02-02 11:10:10', date'2022-02-02', '2'),
| (timestamp'2023-03-03 12:10:10', date'2023-03-03', '3'),
| (timestamp'2024-04-04 13:10:10', date'2024-04-04', '4')
|""".stripMargin
)
checkAnswer(
spark.table(s"$db.$tbl_local")
.groupBy("value").count().filter("count != 2"),
Seq.empty
)

} finally {
runClickHouseSQL(s"DROP TABLE IF EXISTS $db.$tbl_dist ON CLUSTER $cluster")
runClickHouseSQL(s"DROP TABLE IF EXISTS $db.$tbl_local ON CLUSTER $cluster")
runClickHouseSQL(s"DROP DATABASE IF EXISTS $db ON CLUSTER $cluster")
}
}

Seq(
// wait for SPARK-44180 to be fixed, then add implicit cast test cases
("toYear", Array("create_date")),
// ("toYear", Array("create_time")),
("toYYYYMM", Array("create_date")),
// ("toYYYYMM", Array("create_time")),
("toYYYYMMDD", Array("create_date")),
// ("toYYYYMMDD", Array("create_time")),
("toHour", Array("create_time")),
("xxHash64", Array("value")),
("murmurHash2_64", Array("value")),
("murmurHash2_32", Array("value")),
("murmurHash3_64", Array("value")),
("murmurHash3_32", Array("value")),
("cityHash64", Array("value")),
("modulo", Array("toYYYYMM(create_date)", "10"))
).foreach {
case (func_name: String, func_args: Array[String]) =>
test(s"shard by $func_name(${func_args.mkString(",")})")(runTest(func_name, func_args))
}

}
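The modulo case above relates to commit 088bf3d, which switched the UDF from pmod to plain mod: Spark's pmod always yields a non-negative result, while ClickHouse's modulo (like % in Java and C) keeps the sign of the dividend, so pmod-based routing could disagree with ClickHouse for negative sharding keys. A plain-Scala illustration of the divergence, not connector code:

// The two semantics only differ for negative dividends.
val a = -7
val n = 10
val truncated = a % n              // -7, matching ClickHouse's modulo
val positive = ((a % n) + n) % n   // 3, what Spark's pmod(a, n) yields
assert(truncated == -7 && positive == 3)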
@@ -78,12 +78,8 @@ class WriteDistributionAndOrderingSuite extends SparkClickHouseSingleTest {
WRITE_REPARTITION_BY_PARTITION.key -> repartitionByPartition.toString,
WRITE_LOCAL_SORT_BY_KEY.key -> localSortByKey.toString
) {
if (!ignoreUnsupportedTransform && repartitionByPartition) {
intercept[AnalysisException](write())
} else {
write()
check()
}
write()
check()
}
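With functions in distribution expressions now supported, the branch that expected an AnalysisException when repartitionByPartition was enabled is removed; the suite simply writes and verifies under every configuration.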

Seq(true, false).foreach { ignoreUnsupportedTransform =>