diff --git a/README.md b/README.md index 5fd98d42..7c573fe0 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ Notes: extends the range of supported versions of ClickHouse Server. 2. Since 0.6.0, HTTP becomes the default protocol. 3. Since 0.7.0, gRPC is deprecated and not recommended, it may be removed in the future. +4. Since 0.8.0, gRPC is removed. ## Compatible Matrix @@ -62,16 +63,6 @@ Run single test `./gradlew test --tests=ConvertDistToLocalWriteSuite` -### ARM Platform +Test against custom ClickHouse image -For developers/users who use ARM platform, e.g. [Apple Silicon](https://developer.apple.com/documentation/apple-silicon) -chips, [Kunpeng](https://www.hikunpeng.com/) chips, you may not able to run TPC-DS integrations test using gRPC in local directly, -because [ClickHouse does not provide gRPC support in official ARM image](https://github.com/ClickHouse/ClickHouse/pull/36754). - -As a workaround, you can set the environment variable `CLICKHOUSE_IMAGE` to use a custom image which supports gRPC -on ARM platform for testing. - -``` -export CLICKHOUSE_IMAGE=pan3793/clickhouse-server:22.5.1-alpine-arm-grpc -./gradlew clean test -``` +`CLICKHOUSE_IMAGE=custom-org/clickhouse-server:custom-tag ./gradlew test` diff --git a/clickhouse-core/src/main/scala/xenon/clickhouse/spec/NodeSpec.scala b/clickhouse-core/src/main/scala/xenon/clickhouse/spec/NodeSpec.scala index e98b3161..454312df 100644 --- a/clickhouse-core/src/main/scala/xenon/clickhouse/spec/NodeSpec.scala +++ b/clickhouse-core/src/main/scala/xenon/clickhouse/spec/NodeSpec.scala @@ -31,8 +31,7 @@ case class NodeSpec( @JsonIgnore private val _host: String, @JsonIgnore private val _http_port: Option[Int] = None, @JsonIgnore private val _tcp_port: Option[Int] = None, - @JsonIgnore private val _grpc_port: Option[Int] = None, - @JsonProperty("protocol") protocol: ClickHouseProtocol = GRPC, + @JsonProperty("protocol") protocol: ClickHouseProtocol = HTTP, @JsonProperty("username") username: String = "default", @JsonProperty("password") password: String = "", @JsonProperty("database") database: String = "default", @@ -41,7 +40,6 @@ case class NodeSpec( @JsonProperty("host") def host: String = findHost(_host) @JsonProperty("http_port") def http_port: Option[Int] = findPort(_http_port) @JsonProperty("tcp_port") def tcp_port: Option[Int] = findPort(_tcp_port) - @JsonProperty("grpc_port") def grpc_port: Option[Int] = findPort(_grpc_port) private def findHost(source: String): String = if (isTesting) { @@ -56,7 +54,6 @@ case class NodeSpec( } else source def port: Int = protocol match { - case GRPC => grpc_port.get case HTTP => http_port.get case TCP => tcp_port.get case unsupported => throw new IllegalArgumentException(s"Unsupported protocol: $unsupported") diff --git a/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/config.xml b/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/config.xml index 2f253d94..555bbb30 100644 --- a/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/config.xml +++ b/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/config.xml @@ -43,7 +43,6 @@ :: 8123 - 9100 9000 9009 @@ -51,22 +50,6 @@ 3600 60 - - false - - - gzip - - - medium - - - -1 - -1 - - false - - 128 100 diff --git a/clickhouse-core/src/testFixtures/conf/clickhouse-single/grpc_config.xml b/clickhouse-core/src/testFixtures/conf/clickhouse-single/grpc_config.xml deleted file mode 100644 index 9d1eac62..00000000 --- a/clickhouse-core/src/testFixtures/conf/clickhouse-single/grpc_config.xml +++ /dev/null @@ -1,45 +0,0 @@ - - - 
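For context on the `NodeSpec` change above: with the `_grpc_port` field and the `GRPC` default removed, port resolution now dispatches only on `HTTP` and `TCP`. A minimal self-contained sketch of the post-change behavior (field names simplified; the real `xenon.clickhouse.spec.NodeSpec` additionally remaps host and ports through system properties when running under tests):

```scala
// Simplified sketch of NodeSpec port resolution after the gRPC removal.
// The real class also resolves testcontainers host/port mappings.
sealed trait ClickHouseProtocol
case object HTTP extends ClickHouseProtocol
case object TCP extends ClickHouseProtocol

case class NodeSpec(
  host: String,
  httpPort: Option[Int] = Some(8123), // ClickHouse HTTP default
  tcpPort: Option[Int] = Some(9000),  // ClickHouse native TCP default
  protocol: ClickHouseProtocol = HTTP // HTTP has been the default since 0.6.0
) {
  def port: Int = protocol match {
    case HTTP => httpPort.get
    case TCP  => tcpPort.get
  }
}
```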
- 9100 - - false - - - /path/to/ssl_cert_file - /path/to/ssl_key_file - - - false - - - /path/to/ssl_ca_cert_file - - - deflate - - - medium - - - -1 - -1 - - - true - - diff --git a/clickhouse-core/src/testFixtures/scala/xenon/clickhouse/base/ClickHouseClusterMixIn.scala b/clickhouse-core/src/testFixtures/scala/xenon/clickhouse/base/ClickHouseClusterMixIn.scala index 49c06a98..32425bd9 100644 --- a/clickhouse-core/src/testFixtures/scala/xenon/clickhouse/base/ClickHouseClusterMixIn.scala +++ b/clickhouse-core/src/testFixtures/scala/xenon/clickhouse/base/ClickHouseClusterMixIn.scala @@ -26,7 +26,6 @@ trait ClickHouseClusterMixIn extends AnyFunSuite with ForAllTestContainer { protected val ZOOKEEPER_CLIENT_PORT = 2181 protected val CLICKHOUSE_HTTP_PORT = 8123 - protected val CLICKHOUSE_GRPC_PORT = 9100 protected val CLICKHOUSE_TCP_PORT = 9000 protected val CLICKHOUSE_IMAGE: String = Utils.load( @@ -35,7 +34,6 @@ trait ClickHouseClusterMixIn extends AnyFunSuite with ForAllTestContainer { ) protected val clickhouseVersion: ClickHouseVersion = ClickHouseVersion.of(CLICKHOUSE_IMAGE.split(":").last) - protected val grpcEnabled: Boolean = clickhouseVersion.isNewerOrEqualTo("21.1.2.15") test("clickhouse cluster up") { assert(sys.props.get(s"${PREFIX}_HOST_clickhouse-s1r1").isDefined) @@ -53,23 +51,8 @@ trait ClickHouseClusterMixIn extends AnyFunSuite with ForAllTestContainer { assert(sys.props.get(s"${PREFIX}_HOST_clickhouse-s2r2").isDefined) assert(sys.props.get(s"${PREFIX}_HOST_clickhouse-s2r2_PORT_$CLICKHOUSE_HTTP_PORT").isDefined) assert(sys.props.get(s"${PREFIX}_HOST_clickhouse-s2r2_PORT_$CLICKHOUSE_TCP_PORT").isDefined) - - if (grpcEnabled) { - assert(sys.props.get(s"${PREFIX}_HOST_clickhouse-s1r1_PORT_$CLICKHOUSE_GRPC_PORT").isDefined) - assert(sys.props.get(s"${PREFIX}_HOST_clickhouse-s1r2_PORT_$CLICKHOUSE_GRPC_PORT").isDefined) - assert(sys.props.get(s"${PREFIX}_HOST_clickhouse-s2r1_PORT_$CLICKHOUSE_GRPC_PORT").isDefined) - assert(sys.props.get(s"${PREFIX}_HOST_clickhouse-s2r2_PORT_$CLICKHOUSE_GRPC_PORT").isDefined) - } } - // format: off - val grpcExposedServices: List[ExposedService] = if (grpcEnabled) - ExposedService("clickhouse-s1r2", CLICKHOUSE_GRPC_PORT) :: - ExposedService("clickhouse-s1r1", CLICKHOUSE_GRPC_PORT) :: - ExposedService("clickhouse-s2r1", CLICKHOUSE_GRPC_PORT) :: - ExposedService("clickhouse-s2r2", CLICKHOUSE_GRPC_PORT) :: Nil else Nil - // format: on - override val container: DockerComposeContainer = DockerComposeContainer.Def( composeFiles = new File(Utils.classpathResource("clickhouse-cluster/clickhouse-s2r2-compose.yml")), exposedServices = ExposedService("zookeeper", ZOOKEEPER_CLIENT_PORT) :: @@ -84,7 +67,7 @@ trait ClickHouseClusterMixIn extends AnyFunSuite with ForAllTestContainer { ExposedService("clickhouse-s2r1", CLICKHOUSE_TCP_PORT) :: // s2r2 ExposedService("clickhouse-s2r2", CLICKHOUSE_HTTP_PORT) :: - ExposedService("clickhouse-s2r2", CLICKHOUSE_TCP_PORT) :: grpcExposedServices, + ExposedService("clickhouse-s2r2", CLICKHOUSE_TCP_PORT) :: Nil, env = Map("CLICKHOUSE_IMAGE" -> CLICKHOUSE_IMAGE) ).createContainer() @@ -92,22 +75,18 @@ trait ClickHouseClusterMixIn extends AnyFunSuite with ForAllTestContainer { // s1r1 def clickhouse_s1r1_host: String = container.getServiceHost("clickhouse-s1r1", CLICKHOUSE_HTTP_PORT) def clickhouse_s1r1_http_port: Int = container.getServicePort("clickhouse-s1r1", CLICKHOUSE_HTTP_PORT) - def clickhouse_s1r1_grpc_port: Int = container.getServicePort("clickhouse-s1r1", CLICKHOUSE_GRPC_PORT) def clickhouse_s1r1_tcp_port: Int = 
container.getServicePort("clickhouse-s1r1", CLICKHOUSE_TCP_PORT) // s1r2 def clickhouse_s1r2_host: String = container.getServiceHost("clickhouse-s1r2", CLICKHOUSE_HTTP_PORT) def clickhouse_s1r2_http_port: Int = container.getServicePort("clickhouse-s1r2", CLICKHOUSE_HTTP_PORT) - def clickhouse_s1r2_grpc_port: Int = container.getServicePort("clickhouse-s1r2", CLICKHOUSE_GRPC_PORT) def clickhouse_s1r2_tcp_port: Int = container.getServicePort("clickhouse-s1r2", CLICKHOUSE_TCP_PORT) // s2r1 def clickhouse_s2r1_host: String = container.getServiceHost("clickhouse-s2r1", CLICKHOUSE_HTTP_PORT) def clickhouse_s2r1_http_port: Int = container.getServicePort("clickhouse-s2r1", CLICKHOUSE_HTTP_PORT) - def clickhouse_s2r1_grpc_port: Int = container.getServicePort("clickhouse-s2r1", CLICKHOUSE_GRPC_PORT) def clickhouse_s2r1_tcp_port: Int = container.getServicePort("clickhouse-s2r1", CLICKHOUSE_TCP_PORT) // s2r2 def clickhouse_s2r2_host: String = container.getServiceHost("clickhouse-s2r2", CLICKHOUSE_HTTP_PORT) def clickhouse_s2r2_http_port: Int = container.getServicePort("clickhouse-s2r2", CLICKHOUSE_HTTP_PORT) - def clickhouse_s2r2_grpc_port: Int = container.getServicePort("clickhouse-s2r2", CLICKHOUSE_GRPC_PORT) def clickhouse_s2r2_tcp_port: Int = container.getServicePort("clickhouse-s2r2", CLICKHOUSE_TCP_PORT) // format: on @@ -129,12 +108,5 @@ trait ClickHouseClusterMixIn extends AnyFunSuite with ForAllTestContainer { sys.props += ((s"${PREFIX}_HOST_clickhouse-s2r2", clickhouse_s2r2_host)) sys.props += ((s"${PREFIX}_HOST_clickhouse-s2r2_PORT_$CLICKHOUSE_HTTP_PORT", clickhouse_s2r2_http_port.toString)) sys.props += ((s"${PREFIX}_HOST_clickhouse-s2r2_PORT_$CLICKHOUSE_TCP_PORT", clickhouse_s2r2_tcp_port.toString)) - // all grpc - if (grpcEnabled) { - sys.props += ((s"${PREFIX}_HOST_clickhouse-s1r1_PORT_$CLICKHOUSE_GRPC_PORT", clickhouse_s1r1_grpc_port.toString)) - sys.props += ((s"${PREFIX}_HOST_clickhouse-s1r2_PORT_$CLICKHOUSE_GRPC_PORT", clickhouse_s1r2_grpc_port.toString)) - sys.props += ((s"${PREFIX}_HOST_clickhouse-s2r1_PORT_$CLICKHOUSE_GRPC_PORT", clickhouse_s2r1_grpc_port.toString)) - sys.props += ((s"${PREFIX}_HOST_clickhouse-s2r2_PORT_$CLICKHOUSE_GRPC_PORT", clickhouse_s2r2_grpc_port.toString)) - } } } diff --git a/clickhouse-core/src/testFixtures/scala/xenon/clickhouse/base/ClickHouseSingleMixIn.scala b/clickhouse-core/src/testFixtures/scala/xenon/clickhouse/base/ClickHouseSingleMixIn.scala index 929e8932..38900864 100644 --- a/clickhouse-core/src/testFixtures/scala/xenon/clickhouse/base/ClickHouseSingleMixIn.scala +++ b/clickhouse-core/src/testFixtures/scala/xenon/clickhouse/base/ClickHouseSingleMixIn.scala @@ -31,12 +31,10 @@ trait ClickHouseSingleMixIn extends AnyFunSuite with ForAllTestContainer { val CLICKHOUSE_DB: String = Utils.load("CLICKHOUSE_DB", "") private val CLICKHOUSE_HTTP_PORT = 8123 - private val CLICKHOUSE_GRPC_PORT = 9100 private val CLICKHOUSE_TPC_PORT = 9000 // format: on protected val clickhouseVersion: ClickHouseVersion = ClickHouseVersion.of(CLICKHOUSE_IMAGE.split(":").last) - protected val grpcEnabled: Boolean = clickhouseVersion.isNewerOrEqualTo("21.1.2.15") protected val rootProjectDir: Path = { val thisClassURI = this.getClass.getProtectionDomain.getCodeSource.getLocation.toURI @@ -62,11 +60,7 @@ trait ClickHouseSingleMixIn extends AnyFunSuite with ForAllTestContainer { .withEnv("CLICKHOUSE_USER", CLICKHOUSE_USER) .withEnv("CLICKHOUSE_PASSWORD", CLICKHOUSE_PASSWORD) .withEnv("CLICKHOUSE_DB", CLICKHOUSE_DB) - .withExposedPorts(CLICKHOUSE_HTTP_PORT, 
CLICKHOUSE_GRPC_PORT, CLICKHOUSE_TPC_PORT) - .withCopyFileToContainer( - MountableFile.forClasspathResource("clickhouse-single/grpc_config.xml"), - "/etc/clickhouse-server/config.d/grpc_config.xml" - ) + .withExposedPorts(CLICKHOUSE_HTTP_PORT, CLICKHOUSE_TPC_PORT) .withFileSystemBind(s"${sys.env("ROOT_PROJECT_DIR")}/log/clickhouse-server", "/var/log/clickhouse-server") .withCopyFileToContainer( MountableFile.forClasspathResource("clickhouse-single/users.xml"), @@ -77,7 +71,6 @@ // format: off def clickhouseHost: String = container.host def clickhouseHttpPort: Int = container.mappedPort(CLICKHOUSE_HTTP_PORT) - def clickhouseGrpcPort: Int = container.mappedPort(CLICKHOUSE_GRPC_PORT) def clickhouseTcpPort: Int = container.mappedPort(CLICKHOUSE_TPC_PORT) // format: on } diff --git a/deploy.gradle b/deploy.gradle index fc807ad1..a1f96b8a 100644 --- a/deploy.gradle +++ b/deploy.gradle @@ -45,7 +45,7 @@ subprojects { pom { name = "Spark ClickHouse Connector" url = "https://github.com/housepower/spark-clickhouse-connector" - description = "Spark ClickHouse Connector build on DataSourceV2 API and gRPC protocol." + description = "Spark ClickHouse Connector built on the Apache Spark DataSourceV2 API." developers { developer { id = 'pan3793' diff --git a/docs/developers/01_build_and_test.md b/docs/developers/01_build_and_test.md index d431c70a..95e315d9 100644 --- a/docs/developers/01_build_and_test.md +++ b/docs/developers/01_build_and_test.md @@ -48,16 +48,6 @@ Run single test `./gradlew test --tests=ConvertDistToLocalWriteSuite` -### ARM Platform +Test against custom ClickHouse image -For developers/users who use ARM platform, e.g. [Apple Silicon](https://developer.apple.com/documentation/apple-silicon) -chips, [Kunpeng](https://www.hikunpeng.com/) chips, you may not able to run TPC-DS integrations test using gRPC in local directly, -because [ClickHouse does not provide gRPC support in official ARM image](https://github.com/ClickHouse/ClickHouse/pull/36754). - -As a workaround, you can set the environment variable `CLICKHOUSE_IMAGE` to use a custom image which supports gRPC -on ARM platform for testing.
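The doc change above drops the ARM-only workaround: since the connector no longer speaks gRPC, the stock ClickHouse images work on any platform, and `CLICKHOUSE_IMAGE` remains only as a general override for testing against a custom image. A hedged sketch of the env-then-default lookup the fixtures perform (the default tag below is illustrative; the real fixtures use a `Utils.load` helper):

```scala
// Illustrative sketch: pick the ClickHouse image for Testcontainers from the
// CLICKHOUSE_IMAGE environment variable, falling back to an assumed default tag.
val clickhouseImage: String =
  sys.env.getOrElse("CLICKHOUSE_IMAGE", "clickhouse/clickhouse-server:23.3")
```

With that in place, overriding at test time stays a one-liner, e.g. `CLICKHOUSE_IMAGE=custom-org/clickhouse-server:custom-tag ./gradlew test`.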
- -``` -export CLICKHOUSE_IMAGE=pan3793/clickhouse-server:22.5.1-alpine-arm-grpc -./gradlew clean test -``` +`CLICKHOUSE_IMAGE=custom-org/clickhouse-server:custom-tag ./gradlew test` diff --git a/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala b/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala index df11b03e..c13cebd2 100644 --- a/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala +++ b/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala @@ -23,77 +23,65 @@ trait SparkClickHouseClusterTest extends SparkTest with ClickHouseClusterMixIn { import testImplicits._ - override protected def sparkConf: SparkConf = { - val _conf = super.sparkConf - .setMaster("local[4]") - .setAppName("spark-clickhouse-cluster-ut") - .set("spark.sql.shuffle.partitions", "4") - // catalog - .set("spark.sql.defaultCatalog", "clickhouse_s1r1") - .set("spark.sql.catalog.clickhouse_s1r1", "xenon.clickhouse.ClickHouseCatalog") - .set("spark.sql.catalog.clickhouse_s1r1.host", clickhouse_s1r1_host) - .set("spark.sql.catalog.clickhouse_s1r1.http_port", clickhouse_s1r1_http_port.toString) - .set("spark.sql.catalog.clickhouse_s1r1.protocol", "http") - .set("spark.sql.catalog.clickhouse_s1r1.user", "default") - .set("spark.sql.catalog.clickhouse_s1r1.password", "") - .set("spark.sql.catalog.clickhouse_s1r1.database", "default") - .set("spark.sql.catalog.clickhouse_s1r1.option.async", "false") - .set("spark.sql.catalog.clickhouse_s1r2", "xenon.clickhouse.ClickHouseCatalog") - .set("spark.sql.catalog.clickhouse_s1r2.host", clickhouse_s1r2_host) - .set("spark.sql.catalog.clickhouse_s1r2.http_port", clickhouse_s1r2_http_port.toString) - .set("spark.sql.catalog.clickhouse_s1r2.protocol", "http") - .set("spark.sql.catalog.clickhouse_s1r2.user", "default") - .set("spark.sql.catalog.clickhouse_s1r2.password", "") - .set("spark.sql.catalog.clickhouse_s1r2.database", "default") - .set("spark.sql.catalog.clickhouse_s1r2.option.async", "false") - .set("spark.sql.catalog.clickhouse_s2r1", "xenon.clickhouse.ClickHouseCatalog") - .set("spark.sql.catalog.clickhouse_s2r1.host", clickhouse_s2r1_host) - .set("spark.sql.catalog.clickhouse_s2r1.http_port", clickhouse_s2r1_http_port.toString) - .set("spark.sql.catalog.clickhouse_s2r1.protocol", "http") - .set("spark.sql.catalog.clickhouse_s2r1.user", "default") - .set("spark.sql.catalog.clickhouse_s2r1.password", "") - .set("spark.sql.catalog.clickhouse_s2r1.database", "default") - .set("spark.sql.catalog.clickhouse_s2r1.option.async", "false") - .set("spark.sql.catalog.clickhouse_s2r2", "xenon.clickhouse.ClickHouseCatalog") - .set("spark.sql.catalog.clickhouse_s2r2.host", clickhouse_s2r2_host) - .set("spark.sql.catalog.clickhouse_s2r2.http_port", clickhouse_s2r2_http_port.toString) - .set("spark.sql.catalog.clickhouse_s2r2.protocol", "http") - .set("spark.sql.catalog.clickhouse_s2r2.user", "default") - .set("spark.sql.catalog.clickhouse_s2r2.password", "") - .set("spark.sql.catalog.clickhouse_s2r2.database", "default") - .set("spark.sql.catalog.clickhouse_s2r2.option.async", "false") - // extended configurations - .set("spark.clickhouse.write.batchSize", "2") - .set("spark.clickhouse.write.maxRetry", "2") - .set("spark.clickhouse.write.retryInterval", "1") - .set("spark.clickhouse.write.retryableErrorCodes", "241") - 
.set("spark.clickhouse.write.write.repartitionNum", "0") - .set("spark.clickhouse.write.distributed.useClusterNodes", "true") - .set("spark.clickhouse.read.distributed.useClusterNodes", "false") - .set("spark.clickhouse.write.distributed.convertLocal", "false") - .set("spark.clickhouse.read.distributed.convertLocal", "true") - .set("spark.clickhouse.read.format", "binary") - .set("spark.clickhouse.write.format", "arrow") - if (grpcEnabled) { - _conf.set("spark.sql.catalog.clickhouse_s1r1.grpc_port", clickhouse_s1r1_grpc_port.toString) - .set("spark.sql.catalog.clickhouse_s1r2.grpc_port", clickhouse_s1r2_grpc_port.toString) - .set("spark.sql.catalog.clickhouse_s2r1.grpc_port", clickhouse_s2r1_grpc_port.toString) - .set("spark.sql.catalog.clickhouse_s2r2.grpc_port", clickhouse_s2r2_grpc_port.toString) - } - _conf - } + override protected def sparkConf: SparkConf = super.sparkConf + .setMaster("local[4]") + .setAppName("spark-clickhouse-cluster-ut") + .set("spark.sql.shuffle.partitions", "4") + // catalog + .set("spark.sql.defaultCatalog", "clickhouse_s1r1") + .set("spark.sql.catalog.clickhouse_s1r1", "xenon.clickhouse.ClickHouseCatalog") + .set("spark.sql.catalog.clickhouse_s1r1.host", clickhouse_s1r1_host) + .set("spark.sql.catalog.clickhouse_s1r1.http_port", clickhouse_s1r1_http_port.toString) + .set("spark.sql.catalog.clickhouse_s1r1.protocol", "http") + .set("spark.sql.catalog.clickhouse_s1r1.user", "default") + .set("spark.sql.catalog.clickhouse_s1r1.password", "") + .set("spark.sql.catalog.clickhouse_s1r1.database", "default") + .set("spark.sql.catalog.clickhouse_s1r1.option.async", "false") + .set("spark.sql.catalog.clickhouse_s1r2", "xenon.clickhouse.ClickHouseCatalog") + .set("spark.sql.catalog.clickhouse_s1r2.host", clickhouse_s1r2_host) + .set("spark.sql.catalog.clickhouse_s1r2.http_port", clickhouse_s1r2_http_port.toString) + .set("spark.sql.catalog.clickhouse_s1r2.protocol", "http") + .set("spark.sql.catalog.clickhouse_s1r2.user", "default") + .set("spark.sql.catalog.clickhouse_s1r2.password", "") + .set("spark.sql.catalog.clickhouse_s1r2.database", "default") + .set("spark.sql.catalog.clickhouse_s1r2.option.async", "false") + .set("spark.sql.catalog.clickhouse_s2r1", "xenon.clickhouse.ClickHouseCatalog") + .set("spark.sql.catalog.clickhouse_s2r1.host", clickhouse_s2r1_host) + .set("spark.sql.catalog.clickhouse_s2r1.http_port", clickhouse_s2r1_http_port.toString) + .set("spark.sql.catalog.clickhouse_s2r1.protocol", "http") + .set("spark.sql.catalog.clickhouse_s2r1.user", "default") + .set("spark.sql.catalog.clickhouse_s2r1.password", "") + .set("spark.sql.catalog.clickhouse_s2r1.database", "default") + .set("spark.sql.catalog.clickhouse_s2r1.option.async", "false") + .set("spark.sql.catalog.clickhouse_s2r2", "xenon.clickhouse.ClickHouseCatalog") + .set("spark.sql.catalog.clickhouse_s2r2.host", clickhouse_s2r2_host) + .set("spark.sql.catalog.clickhouse_s2r2.http_port", clickhouse_s2r2_http_port.toString) + .set("spark.sql.catalog.clickhouse_s2r2.protocol", "http") + .set("spark.sql.catalog.clickhouse_s2r2.user", "default") + .set("spark.sql.catalog.clickhouse_s2r2.password", "") + .set("spark.sql.catalog.clickhouse_s2r2.database", "default") + .set("spark.sql.catalog.clickhouse_s2r2.option.async", "false") + // extended configurations + .set("spark.clickhouse.write.batchSize", "2") + .set("spark.clickhouse.write.maxRetry", "2") + .set("spark.clickhouse.write.retryInterval", "1") + .set("spark.clickhouse.write.retryableErrorCodes", "241") + 
.set("spark.clickhouse.write.write.repartitionNum", "0") + .set("spark.clickhouse.write.distributed.useClusterNodes", "true") + .set("spark.clickhouse.read.distributed.useClusterNodes", "false") + .set("spark.clickhouse.write.distributed.convertLocal", "false") + .set("spark.clickhouse.read.distributed.convertLocal", "true") + .set("spark.clickhouse.read.format", "binary") + .set("spark.clickhouse.write.format", "arrow") - override def cmdRunnerOptions: Map[String, String] = { - val _options = Map( - "host" -> clickhouse_s1r1_host, - "http_port" -> clickhouse_s1r1_http_port.toString, - "protocol" -> "http", - "user" -> "default", - "password" -> "", - "database" -> "default" - ) - if (grpcEnabled) _options + ("grpc_port" -> clickhouse_s1r1_grpc_port.toString) else _options - } + override def cmdRunnerOptions: Map[String, String] = Map( + "host" -> clickhouse_s1r1_host, + "http_port" -> clickhouse_s1r1_http_port.toString, + "protocol" -> "http", + "user" -> "default", + "password" -> "", + "database" -> "default" + ) def autoCleanupDistTable( cluster: String, diff --git a/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/TPCDSClusterSuite.scala b/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/TPCDSClusterSuite.scala index 2f730bca..6f5686fe 100644 --- a/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/TPCDSClusterSuite.scala +++ b/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/TPCDSClusterSuite.scala @@ -23,14 +23,15 @@ class TPCDSClusterSuite extends SparkClickHouseClusterTest { override protected def sparkConf: SparkConf = super.sparkConf .set("spark.sql.catalog.tpcds", "org.apache.kyuubi.spark.connector.tpcds.TPCDSCatalog") - .set("spark.sql.catalog.clickhouse_s1r1.protocol", if (grpcEnabled) "grpc" else "http") - .set("spark.sql.catalog.clickhouse_s1r2.protocol", if (grpcEnabled) "grpc" else "http") - .set("spark.sql.catalog.clickhouse_s2r1.protocol", if (grpcEnabled) "grpc" else "http") - .set("spark.sql.catalog.clickhouse_s2r2.protocol", if (grpcEnabled) "grpc" else "http") - .set("spark.clickhouse.read.compression.codec", if (grpcEnabled) "none" else "lz4") + .set("spark.sql.catalog.clickhouse_s1r1.protocol", "http") + .set("spark.sql.catalog.clickhouse_s1r2.protocol", "http") + .set("spark.sql.catalog.clickhouse_s2r1.protocol", "http") + .set("spark.sql.catalog.clickhouse_s2r2.protocol", "http") + .set("spark.clickhouse.read.compression.codec", "lz4") .set("spark.clickhouse.write.batchSize", "100000") - .set("spark.clickhouse.write.compression.codec", if (grpcEnabled) "none" else "lz4") + .set("spark.clickhouse.write.compression.codec", "lz4") .set("spark.clickhouse.write.distributed.convertLocal", "true") + .set("spark.clickhouse.write.format", "json") test("Cluster: TPC-DS sf1 write and count(*)") { withDatabase("tpcds_sf1_cluster") { diff --git a/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/SparkClickHouseSingleTest.scala b/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/SparkClickHouseSingleTest.scala index 6264c21b..713c5797 100644 --- a/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/SparkClickHouseSingleTest.scala +++ b/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/SparkClickHouseSingleTest.scala @@ -24,46 +24,37 @@ trait SparkClickHouseSingleTest extends SparkTest with 
ClickHouseSingleMixIn { import testImplicits._ - override protected def sparkConf: SparkConf = { - val _conf = super.sparkConf - .setMaster("local[2]") - .setAppName("spark-clickhouse-single-ut") - .set("spark.sql.shuffle.partitions", "2") - // catalog - .set("spark.sql.defaultCatalog", "clickhouse") - .set("spark.sql.catalog.clickhouse", "xenon.clickhouse.ClickHouseCatalog") - .set("spark.sql.catalog.clickhouse.host", clickhouseHost) - .set("spark.sql.catalog.clickhouse.http_port", clickhouseHttpPort.toString) - .set("spark.sql.catalog.clickhouse.protocol", "http") - .set("spark.sql.catalog.clickhouse.user", CLICKHOUSE_USER) - .set("spark.sql.catalog.clickhouse.password", CLICKHOUSE_PASSWORD) - .set("spark.sql.catalog.clickhouse.database", CLICKHOUSE_DB) - .set("spark.sql.catalog.clickhouse.option.async", "false") - // extended configurations - .set("spark.clickhouse.write.batchSize", "2") - .set("spark.clickhouse.write.maxRetry", "2") - .set("spark.clickhouse.write.retryInterval", "1") - .set("spark.clickhouse.write.retryableErrorCodes", "241") - .set("spark.clickhouse.write.write.repartitionNum", "0") - .set("spark.clickhouse.read.format", "json") - .set("spark.clickhouse.write.format", "json") - if (grpcEnabled) { - _conf.set("spark.sql.catalog.clickhouse.grpc_port", clickhouseGrpcPort.toString) - } - _conf - } + override protected def sparkConf: SparkConf = super.sparkConf + .setMaster("local[2]") + .setAppName("spark-clickhouse-single-ut") + .set("spark.sql.shuffle.partitions", "2") + // catalog + .set("spark.sql.defaultCatalog", "clickhouse") + .set("spark.sql.catalog.clickhouse", "xenon.clickhouse.ClickHouseCatalog") + .set("spark.sql.catalog.clickhouse.host", clickhouseHost) + .set("spark.sql.catalog.clickhouse.http_port", clickhouseHttpPort.toString) + .set("spark.sql.catalog.clickhouse.protocol", "http") + .set("spark.sql.catalog.clickhouse.user", CLICKHOUSE_USER) + .set("spark.sql.catalog.clickhouse.password", CLICKHOUSE_PASSWORD) + .set("spark.sql.catalog.clickhouse.database", CLICKHOUSE_DB) + .set("spark.sql.catalog.clickhouse.option.async", "false") + // extended configurations + .set("spark.clickhouse.write.batchSize", "2") + .set("spark.clickhouse.write.maxRetry", "2") + .set("spark.clickhouse.write.retryInterval", "1") + .set("spark.clickhouse.write.retryableErrorCodes", "241") + .set("spark.clickhouse.write.write.repartitionNum", "0") + .set("spark.clickhouse.read.format", "json") + .set("spark.clickhouse.write.format", "json") - override def cmdRunnerOptions: Map[String, String] = { - val _options = Map( - "host" -> clickhouseHost, - "http_port" -> clickhouseHttpPort.toString, - "protocol" -> "http", - "user" -> CLICKHOUSE_USER, - "password" -> CLICKHOUSE_PASSWORD, - "database" -> CLICKHOUSE_DB - ) - if (grpcEnabled) _options + ("grpc_port" -> clickhouseGrpcPort.toString) else _options - } + override def cmdRunnerOptions: Map[String, String] = Map( + "host" -> clickhouseHost, + "http_port" -> clickhouseHttpPort.toString, + "protocol" -> "http", + "user" -> CLICKHOUSE_USER, + "password" -> CLICKHOUSE_PASSWORD, + "database" -> CLICKHOUSE_DB + ) def withTable( db: String, diff --git a/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/TPCDSSuite.scala b/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/TPCDSSuite.scala index 094683c9..cda9793e 100644 --- a/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/TPCDSSuite.scala +++ 
b/spark-3.3/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/TPCDSSuite.scala @@ -23,7 +23,7 @@ class TPCDSSuite extends SparkClickHouseSingleTest { override protected def sparkConf: SparkConf = super.sparkConf .set("spark.sql.catalog.tpcds", "org.apache.kyuubi.spark.connector.tpcds.TPCDSCatalog") - .set("spark.sql.catalog.clickhouse.protocol", if (grpcEnabled) "grpc" else "http") + .set("spark.sql.catalog.clickhouse.protocol", "http") .set("spark.clickhouse.read.compression.codec", "none") .set("spark.clickhouse.write.batchSize", "100000") .set("spark.clickhouse.write.compression.codec", "none") diff --git a/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseCatalog.scala b/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseCatalog.scala index f95dd083..5ff1bb68 100644 --- a/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseCatalog.scala +++ b/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseCatalog.scala @@ -65,9 +65,6 @@ class ClickHouseCatalog extends TableCatalog override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = { this.catalogName = name this.nodeSpec = buildNodeSpec(options) - if (nodeSpec.protocol == ClickHouseProtocol.GRPC) { - log.warn("gPRC is deprecated and not recommended since v0.7.0, it may be removed in the future.") - } this.currentDb = nodeSpec.database this.nodeClient = NodeClient(nodeSpec) diff --git a/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseHelper.scala b/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseHelper.scala index 7815a158..c83c4591 100644 --- a/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseHelper.scala +++ b/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseHelper.scala @@ -59,7 +59,6 @@ trait ClickHouseHelper extends Logging { }.toMap NodeSpec( _host = options.getOrDefault(CATALOG_PROP_HOST, "localhost"), - _grpc_port = Some(options.getInt(CATALOG_PROP_GRPC_PORT, 9100)), _tcp_port = Some(options.getInt(CATALOG_PROP_TCP_PORT, 9000)), _http_port = Some(options.getInt(CATALOG_PROP_HTTP_PORT, 8123)), protocol = ClickHouseProtocol.fromUriScheme(options.getOrDefault(CATALOG_PROP_PROTOCOL, "http")), @@ -102,7 +101,6 @@ trait ClickHouseHelper extends Logging { // host_address is not works for testcontainers _host = row.get("host_name").asText, _tcp_port = Some(row.get("port").asInt), - _grpc_port = if (Utils.isTesting) Some(9100) else nodeSpec.grpc_port, _http_port = if (Utils.isTesting) Some(8123) else nodeSpec.http_port ) ReplicaSpec(replicaNum, clickhouseNode) diff --git a/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/Constants.scala b/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/Constants.scala index 0ccf8a66..384fba2c 100644 --- a/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/Constants.scala +++ b/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/Constants.scala @@ -22,7 +22,6 @@ object Constants { //////// clickhouse datasource catalog properties //////// ////////////////////////////////////////////////////////// final val CATALOG_PROP_HOST = "host" - final val CATALOG_PROP_GRPC_PORT = "grpc_port" final val CATALOG_PROP_TCP_PORT = "tcp_port" final val CATALOG_PROP_HTTP_PORT = "http_port" final val CATALOG_PROP_PROTOCOL = "protocol" diff --git a/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/write/ClickHouseWriter.scala 
b/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/write/ClickHouseWriter.scala index d0fc7d4f..d18319e5 100644 --- a/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/write/ClickHouseWriter.scala +++ b/spark-3.3/clickhouse-spark/src/main/scala/xenon/clickhouse/write/ClickHouseWriter.scala @@ -16,8 +16,6 @@ package xenon.clickhouse.write import com.clickhouse.client.ClickHouseProtocol import com.clickhouse.data.ClickHouseCompression -import net.jpountz.lz4.LZ4FrameOutputStream -import net.jpountz.lz4.LZ4FrameOutputStream.BLOCKSIZE import org.apache.commons.io.IOUtils import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, SafeProjection} import org.apache.spark.sql.catalyst.{expressions, InternalRow} @@ -156,8 +154,6 @@ abstract class ClickHouseWriter(writeJob: WriteJobDescription) private def renewCompressedOutput(): Unit = { val compressedOutput = (codec, protocol) match { case (ClickHouseCompression.NONE, _) => observableSerializedOutput - case (ClickHouseCompression.LZ4, ClickHouseProtocol.GRPC) => - new LZ4FrameOutputStream(observableSerializedOutput, BLOCKSIZE.SIZE_4MB) case (ClickHouseCompression.LZ4, ClickHouseProtocol.HTTP) => // clickhouse http client forces compressed output stream // new Lz4OutputStream(observableSerializedOutput, 4 * 1024 * 1024, null) diff --git a/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala b/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala index df11b03e..c13cebd2 100644 --- a/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala +++ b/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala @@ -23,77 +23,65 @@ trait SparkClickHouseClusterTest extends SparkTest with ClickHouseClusterMixIn { import testImplicits._ - override protected def sparkConf: SparkConf = { - val _conf = super.sparkConf - .setMaster("local[4]") - .setAppName("spark-clickhouse-cluster-ut") - .set("spark.sql.shuffle.partitions", "4") - // catalog - .set("spark.sql.defaultCatalog", "clickhouse_s1r1") - .set("spark.sql.catalog.clickhouse_s1r1", "xenon.clickhouse.ClickHouseCatalog") - .set("spark.sql.catalog.clickhouse_s1r1.host", clickhouse_s1r1_host) - .set("spark.sql.catalog.clickhouse_s1r1.http_port", clickhouse_s1r1_http_port.toString) - .set("spark.sql.catalog.clickhouse_s1r1.protocol", "http") - .set("spark.sql.catalog.clickhouse_s1r1.user", "default") - .set("spark.sql.catalog.clickhouse_s1r1.password", "") - .set("spark.sql.catalog.clickhouse_s1r1.database", "default") - .set("spark.sql.catalog.clickhouse_s1r1.option.async", "false") - .set("spark.sql.catalog.clickhouse_s1r2", "xenon.clickhouse.ClickHouseCatalog") - .set("spark.sql.catalog.clickhouse_s1r2.host", clickhouse_s1r2_host) - .set("spark.sql.catalog.clickhouse_s1r2.http_port", clickhouse_s1r2_http_port.toString) - .set("spark.sql.catalog.clickhouse_s1r2.protocol", "http") - .set("spark.sql.catalog.clickhouse_s1r2.user", "default") - .set("spark.sql.catalog.clickhouse_s1r2.password", "") - .set("spark.sql.catalog.clickhouse_s1r2.database", "default") - .set("spark.sql.catalog.clickhouse_s1r2.option.async", "false") - .set("spark.sql.catalog.clickhouse_s2r1", "xenon.clickhouse.ClickHouseCatalog") - .set("spark.sql.catalog.clickhouse_s2r1.host", clickhouse_s2r1_host) - .set("spark.sql.catalog.clickhouse_s2r1.http_port", 
clickhouse_s2r1_http_port.toString) - .set("spark.sql.catalog.clickhouse_s2r1.protocol", "http") - .set("spark.sql.catalog.clickhouse_s2r1.user", "default") - .set("spark.sql.catalog.clickhouse_s2r1.password", "") - .set("spark.sql.catalog.clickhouse_s2r1.database", "default") - .set("spark.sql.catalog.clickhouse_s2r1.option.async", "false") - .set("spark.sql.catalog.clickhouse_s2r2", "xenon.clickhouse.ClickHouseCatalog") - .set("spark.sql.catalog.clickhouse_s2r2.host", clickhouse_s2r2_host) - .set("spark.sql.catalog.clickhouse_s2r2.http_port", clickhouse_s2r2_http_port.toString) - .set("spark.sql.catalog.clickhouse_s2r2.protocol", "http") - .set("spark.sql.catalog.clickhouse_s2r2.user", "default") - .set("spark.sql.catalog.clickhouse_s2r2.password", "") - .set("spark.sql.catalog.clickhouse_s2r2.database", "default") - .set("spark.sql.catalog.clickhouse_s2r2.option.async", "false") - // extended configurations - .set("spark.clickhouse.write.batchSize", "2") - .set("spark.clickhouse.write.maxRetry", "2") - .set("spark.clickhouse.write.retryInterval", "1") - .set("spark.clickhouse.write.retryableErrorCodes", "241") - .set("spark.clickhouse.write.write.repartitionNum", "0") - .set("spark.clickhouse.write.distributed.useClusterNodes", "true") - .set("spark.clickhouse.read.distributed.useClusterNodes", "false") - .set("spark.clickhouse.write.distributed.convertLocal", "false") - .set("spark.clickhouse.read.distributed.convertLocal", "true") - .set("spark.clickhouse.read.format", "binary") - .set("spark.clickhouse.write.format", "arrow") - if (grpcEnabled) { - _conf.set("spark.sql.catalog.clickhouse_s1r1.grpc_port", clickhouse_s1r1_grpc_port.toString) - .set("spark.sql.catalog.clickhouse_s1r2.grpc_port", clickhouse_s1r2_grpc_port.toString) - .set("spark.sql.catalog.clickhouse_s2r1.grpc_port", clickhouse_s2r1_grpc_port.toString) - .set("spark.sql.catalog.clickhouse_s2r2.grpc_port", clickhouse_s2r2_grpc_port.toString) - } - _conf - } + override protected def sparkConf: SparkConf = super.sparkConf + .setMaster("local[4]") + .setAppName("spark-clickhouse-cluster-ut") + .set("spark.sql.shuffle.partitions", "4") + // catalog + .set("spark.sql.defaultCatalog", "clickhouse_s1r1") + .set("spark.sql.catalog.clickhouse_s1r1", "xenon.clickhouse.ClickHouseCatalog") + .set("spark.sql.catalog.clickhouse_s1r1.host", clickhouse_s1r1_host) + .set("spark.sql.catalog.clickhouse_s1r1.http_port", clickhouse_s1r1_http_port.toString) + .set("spark.sql.catalog.clickhouse_s1r1.protocol", "http") + .set("spark.sql.catalog.clickhouse_s1r1.user", "default") + .set("spark.sql.catalog.clickhouse_s1r1.password", "") + .set("spark.sql.catalog.clickhouse_s1r1.database", "default") + .set("spark.sql.catalog.clickhouse_s1r1.option.async", "false") + .set("spark.sql.catalog.clickhouse_s1r2", "xenon.clickhouse.ClickHouseCatalog") + .set("spark.sql.catalog.clickhouse_s1r2.host", clickhouse_s1r2_host) + .set("spark.sql.catalog.clickhouse_s1r2.http_port", clickhouse_s1r2_http_port.toString) + .set("spark.sql.catalog.clickhouse_s1r2.protocol", "http") + .set("spark.sql.catalog.clickhouse_s1r2.user", "default") + .set("spark.sql.catalog.clickhouse_s1r2.password", "") + .set("spark.sql.catalog.clickhouse_s1r2.database", "default") + .set("spark.sql.catalog.clickhouse_s1r2.option.async", "false") + .set("spark.sql.catalog.clickhouse_s2r1", "xenon.clickhouse.ClickHouseCatalog") + .set("spark.sql.catalog.clickhouse_s2r1.host", clickhouse_s2r1_host) + .set("spark.sql.catalog.clickhouse_s2r1.http_port", clickhouse_s2r1_http_port.toString) + 
.set("spark.sql.catalog.clickhouse_s2r1.protocol", "http") + .set("spark.sql.catalog.clickhouse_s2r1.user", "default") + .set("spark.sql.catalog.clickhouse_s2r1.password", "") + .set("spark.sql.catalog.clickhouse_s2r1.database", "default") + .set("spark.sql.catalog.clickhouse_s2r1.option.async", "false") + .set("spark.sql.catalog.clickhouse_s2r2", "xenon.clickhouse.ClickHouseCatalog") + .set("spark.sql.catalog.clickhouse_s2r2.host", clickhouse_s2r2_host) + .set("spark.sql.catalog.clickhouse_s2r2.http_port", clickhouse_s2r2_http_port.toString) + .set("spark.sql.catalog.clickhouse_s2r2.protocol", "http") + .set("spark.sql.catalog.clickhouse_s2r2.user", "default") + .set("spark.sql.catalog.clickhouse_s2r2.password", "") + .set("spark.sql.catalog.clickhouse_s2r2.database", "default") + .set("spark.sql.catalog.clickhouse_s2r2.option.async", "false") + // extended configurations + .set("spark.clickhouse.write.batchSize", "2") + .set("spark.clickhouse.write.maxRetry", "2") + .set("spark.clickhouse.write.retryInterval", "1") + .set("spark.clickhouse.write.retryableErrorCodes", "241") + .set("spark.clickhouse.write.write.repartitionNum", "0") + .set("spark.clickhouse.write.distributed.useClusterNodes", "true") + .set("spark.clickhouse.read.distributed.useClusterNodes", "false") + .set("spark.clickhouse.write.distributed.convertLocal", "false") + .set("spark.clickhouse.read.distributed.convertLocal", "true") + .set("spark.clickhouse.read.format", "binary") + .set("spark.clickhouse.write.format", "arrow") - override def cmdRunnerOptions: Map[String, String] = { - val _options = Map( - "host" -> clickhouse_s1r1_host, - "http_port" -> clickhouse_s1r1_http_port.toString, - "protocol" -> "http", - "user" -> "default", - "password" -> "", - "database" -> "default" - ) - if (grpcEnabled) _options + ("grpc_port" -> clickhouse_s1r1_grpc_port.toString) else _options - } + override def cmdRunnerOptions: Map[String, String] = Map( + "host" -> clickhouse_s1r1_host, + "http_port" -> clickhouse_s1r1_http_port.toString, + "protocol" -> "http", + "user" -> "default", + "password" -> "", + "database" -> "default" + ) def autoCleanupDistTable( cluster: String, diff --git a/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/TPCDSClusterSuite.scala b/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/TPCDSClusterSuite.scala index 2f730bca..6f5686fe 100644 --- a/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/TPCDSClusterSuite.scala +++ b/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/TPCDSClusterSuite.scala @@ -23,14 +23,15 @@ class TPCDSClusterSuite extends SparkClickHouseClusterTest { override protected def sparkConf: SparkConf = super.sparkConf .set("spark.sql.catalog.tpcds", "org.apache.kyuubi.spark.connector.tpcds.TPCDSCatalog") - .set("spark.sql.catalog.clickhouse_s1r1.protocol", if (grpcEnabled) "grpc" else "http") - .set("spark.sql.catalog.clickhouse_s1r2.protocol", if (grpcEnabled) "grpc" else "http") - .set("spark.sql.catalog.clickhouse_s2r1.protocol", if (grpcEnabled) "grpc" else "http") - .set("spark.sql.catalog.clickhouse_s2r2.protocol", if (grpcEnabled) "grpc" else "http") - .set("spark.clickhouse.read.compression.codec", if (grpcEnabled) "none" else "lz4") + .set("spark.sql.catalog.clickhouse_s1r1.protocol", "http") + .set("spark.sql.catalog.clickhouse_s1r2.protocol", "http") + .set("spark.sql.catalog.clickhouse_s2r1.protocol", "http") + 
.set("spark.sql.catalog.clickhouse_s2r2.protocol", "http") + .set("spark.clickhouse.read.compression.codec", "lz4") .set("spark.clickhouse.write.batchSize", "100000") - .set("spark.clickhouse.write.compression.codec", if (grpcEnabled) "none" else "lz4") + .set("spark.clickhouse.write.compression.codec", "lz4") .set("spark.clickhouse.write.distributed.convertLocal", "true") + .set("spark.clickhouse.write.format", "json") test("Cluster: TPC-DS sf1 write and count(*)") { withDatabase("tpcds_sf1_cluster") { diff --git a/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/SparkClickHouseSingleTest.scala b/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/SparkClickHouseSingleTest.scala index 6264c21b..713c5797 100644 --- a/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/SparkClickHouseSingleTest.scala +++ b/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/SparkClickHouseSingleTest.scala @@ -24,46 +24,37 @@ trait SparkClickHouseSingleTest extends SparkTest with ClickHouseSingleMixIn { import testImplicits._ - override protected def sparkConf: SparkConf = { - val _conf = super.sparkConf - .setMaster("local[2]") - .setAppName("spark-clickhouse-single-ut") - .set("spark.sql.shuffle.partitions", "2") - // catalog - .set("spark.sql.defaultCatalog", "clickhouse") - .set("spark.sql.catalog.clickhouse", "xenon.clickhouse.ClickHouseCatalog") - .set("spark.sql.catalog.clickhouse.host", clickhouseHost) - .set("spark.sql.catalog.clickhouse.http_port", clickhouseHttpPort.toString) - .set("spark.sql.catalog.clickhouse.protocol", "http") - .set("spark.sql.catalog.clickhouse.user", CLICKHOUSE_USER) - .set("spark.sql.catalog.clickhouse.password", CLICKHOUSE_PASSWORD) - .set("spark.sql.catalog.clickhouse.database", CLICKHOUSE_DB) - .set("spark.sql.catalog.clickhouse.option.async", "false") - // extended configurations - .set("spark.clickhouse.write.batchSize", "2") - .set("spark.clickhouse.write.maxRetry", "2") - .set("spark.clickhouse.write.retryInterval", "1") - .set("spark.clickhouse.write.retryableErrorCodes", "241") - .set("spark.clickhouse.write.write.repartitionNum", "0") - .set("spark.clickhouse.read.format", "json") - .set("spark.clickhouse.write.format", "json") - if (grpcEnabled) { - _conf.set("spark.sql.catalog.clickhouse.grpc_port", clickhouseGrpcPort.toString) - } - _conf - } + override protected def sparkConf: SparkConf = super.sparkConf + .setMaster("local[2]") + .setAppName("spark-clickhouse-single-ut") + .set("spark.sql.shuffle.partitions", "2") + // catalog + .set("spark.sql.defaultCatalog", "clickhouse") + .set("spark.sql.catalog.clickhouse", "xenon.clickhouse.ClickHouseCatalog") + .set("spark.sql.catalog.clickhouse.host", clickhouseHost) + .set("spark.sql.catalog.clickhouse.http_port", clickhouseHttpPort.toString) + .set("spark.sql.catalog.clickhouse.protocol", "http") + .set("spark.sql.catalog.clickhouse.user", CLICKHOUSE_USER) + .set("spark.sql.catalog.clickhouse.password", CLICKHOUSE_PASSWORD) + .set("spark.sql.catalog.clickhouse.database", CLICKHOUSE_DB) + .set("spark.sql.catalog.clickhouse.option.async", "false") + // extended configurations + .set("spark.clickhouse.write.batchSize", "2") + .set("spark.clickhouse.write.maxRetry", "2") + .set("spark.clickhouse.write.retryInterval", "1") + .set("spark.clickhouse.write.retryableErrorCodes", "241") + .set("spark.clickhouse.write.write.repartitionNum", "0") + .set("spark.clickhouse.read.format", "json") + 
.set("spark.clickhouse.write.format", "json") - override def cmdRunnerOptions: Map[String, String] = { - val _options = Map( - "host" -> clickhouseHost, - "http_port" -> clickhouseHttpPort.toString, - "protocol" -> "http", - "user" -> CLICKHOUSE_USER, - "password" -> CLICKHOUSE_PASSWORD, - "database" -> CLICKHOUSE_DB - ) - if (grpcEnabled) _options + ("grpc_port" -> clickhouseGrpcPort.toString) else _options - } + override def cmdRunnerOptions: Map[String, String] = Map( + "host" -> clickhouseHost, + "http_port" -> clickhouseHttpPort.toString, + "protocol" -> "http", + "user" -> CLICKHOUSE_USER, + "password" -> CLICKHOUSE_PASSWORD, + "database" -> CLICKHOUSE_DB + ) def withTable( db: String, diff --git a/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/TPCDSSuite.scala b/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/TPCDSSuite.scala index 094683c9..cda9793e 100644 --- a/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/TPCDSSuite.scala +++ b/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/single/TPCDSSuite.scala @@ -23,7 +23,7 @@ class TPCDSSuite extends SparkClickHouseSingleTest { override protected def sparkConf: SparkConf = super.sparkConf .set("spark.sql.catalog.tpcds", "org.apache.kyuubi.spark.connector.tpcds.TPCDSCatalog") - .set("spark.sql.catalog.clickhouse.protocol", if (grpcEnabled) "grpc" else "http") + .set("spark.sql.catalog.clickhouse.protocol", "http") .set("spark.clickhouse.read.compression.codec", "none") .set("spark.clickhouse.write.batchSize", "100000") .set("spark.clickhouse.write.compression.codec", "none") diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseCatalog.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseCatalog.scala index 17a8202c..02862392 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseCatalog.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseCatalog.scala @@ -65,9 +65,6 @@ class ClickHouseCatalog extends TableCatalog override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = { this.catalogName = name this.nodeSpec = buildNodeSpec(options) - if (nodeSpec.protocol == ClickHouseProtocol.GRPC) { - log.warn("gPRC is deprecated and not recommended since v0.7.0, it may be removed in the future.") - } this.currentDb = nodeSpec.database this.nodeClient = NodeClient(nodeSpec) diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseHelper.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseHelper.scala index 95fbeddb..9e2f9da9 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseHelper.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseHelper.scala @@ -60,7 +60,6 @@ trait ClickHouseHelper extends Logging { .toMap NodeSpec( _host = options.getOrDefault(CATALOG_PROP_HOST, "localhost"), - _grpc_port = Some(options.getInt(CATALOG_PROP_GRPC_PORT, 9100)), _tcp_port = Some(options.getInt(CATALOG_PROP_TCP_PORT, 9000)), _http_port = Some(options.getInt(CATALOG_PROP_HTTP_PORT, 8123)), protocol = ClickHouseProtocol.fromUriScheme(options.getOrDefault(CATALOG_PROP_PROTOCOL, "http")), @@ -103,7 +102,6 @@ trait ClickHouseHelper extends Logging { // host_address is not works for testcontainers _host = row.get("host_name").asText, _tcp_port = Some(row.get("port").asInt), - _grpc_port = if 
(Utils.isTesting) Some(9100) else nodeSpec.grpc_port, _http_port = if (Utils.isTesting) Some(8123) else nodeSpec.http_port ) ReplicaSpec(replicaNum, clickhouseNode) diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/Constants.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/Constants.scala index 0ccf8a66..384fba2c 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/Constants.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/Constants.scala @@ -22,7 +22,6 @@ object Constants { //////// clickhouse datasource catalog properties //////// ////////////////////////////////////////////////////////// final val CATALOG_PROP_HOST = "host" - final val CATALOG_PROP_GRPC_PORT = "grpc_port" final val CATALOG_PROP_TCP_PORT = "tcp_port" final val CATALOG_PROP_HTTP_PORT = "http_port" final val CATALOG_PROP_PROTOCOL = "protocol" diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/write/ClickHouseWriter.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/write/ClickHouseWriter.scala index d0fc7d4f..d18319e5 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/write/ClickHouseWriter.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/write/ClickHouseWriter.scala @@ -16,8 +16,6 @@ package xenon.clickhouse.write import com.clickhouse.client.ClickHouseProtocol import com.clickhouse.data.ClickHouseCompression -import net.jpountz.lz4.LZ4FrameOutputStream -import net.jpountz.lz4.LZ4FrameOutputStream.BLOCKSIZE import org.apache.commons.io.IOUtils import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, SafeProjection} import org.apache.spark.sql.catalyst.{expressions, InternalRow} @@ -156,8 +154,6 @@ abstract class ClickHouseWriter(writeJob: WriteJobDescription) private def renewCompressedOutput(): Unit = { val compressedOutput = (codec, protocol) match { case (ClickHouseCompression.NONE, _) => observableSerializedOutput - case (ClickHouseCompression.LZ4, ClickHouseProtocol.GRPC) => - new LZ4FrameOutputStream(observableSerializedOutput, BLOCKSIZE.SIZE_4MB) case (ClickHouseCompression.LZ4, ClickHouseProtocol.HTTP) => // clickhouse http client forces compressed output stream // new Lz4OutputStream(observableSerializedOutput, 4 * 1024 * 1024, null)
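With the `(LZ4, GRPC)` branch and the `LZ4FrameOutputStream` import gone, the writer's compression dispatch reduces to the two cases kept above. A simplified sketch of what remains (`serializedOutput` stands in for the writer's `observableSerializedOutput` field; the catch-all for unsupported combinations is an assumption mirroring the connector's style):

```scala
import java.io.OutputStream
import com.clickhouse.client.ClickHouseProtocol
import com.clickhouse.data.ClickHouseCompression

// Simplified sketch of the compression dispatch left in ClickHouseWriter.
def compressedOutput(
  codec: ClickHouseCompression,
  protocol: ClickHouseProtocol,
  serializedOutput: OutputStream
): OutputStream = (codec, protocol) match {
  // No compression requested: write the serialized payload as-is.
  case (ClickHouseCompression.NONE, _) => serializedOutput
  // The ClickHouse HTTP client forces a compressed output stream itself,
  // so the writer hands over the raw stream instead of wrapping it again.
  case (ClickHouseCompression.LZ4, ClickHouseProtocol.HTTP) => serializedOutput
  case (c, p) =>
    throw new UnsupportedOperationException(s"Unsupported codec $c over protocol $p")
}
```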