From dc8b6bfa18488f882469917e9c943367ef762888 Mon Sep 17 00:00:00 2001 From: weixi Date: Mon, 23 Oct 2023 13:04:34 +0800 Subject: [PATCH 1/6] add 2 test cases for pyspark batch job submit. --- .../org/apache/kyuubi/BatchTestHelper.scala | 16 ++++++ .../rest/client/BatchRestApiSuite.scala | 49 +++++++++++++++++++ 2 files changed, 65 insertions(+) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/BatchTestHelper.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/BatchTestHelper.scala index 27298dbf16d..469323a066c 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/BatchTestHelper.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/BatchTestHelper.scala @@ -36,6 +36,22 @@ trait BatchTestHelper { _.getName.startsWith("spark-examples")) map (_.getCanonicalPath) } + object PySparkJobPI { + val batchType = "PYSPARK" + val className: String = null // For PySpark, mainClass isn't needed. + val name = "PythonPi" // the app name is hard coded in spark example code + lazy val resource: Option[String] = { + val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", KyuubiConf()) + Paths.get( + sparkProcessBuilder.sparkHome, + "examples", + "src", + "main", + "python").toFile.listFiles().find( + _.getName.equalsIgnoreCase("pi.py")) map (_.getCanonicalPath) + } + } + def newBatchRequest( batchType: String, resource: String, diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala index d04826a9d20..1549cc2292c 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala @@ -238,4 +238,53 @@ class BatchRestApiSuite extends RestClientTestHelper with BatchTestHelper { assert( batchSession.conf.get(KyuubiReservedKeys.KYUUBI_CLIENT_VERSION_KEY) == Some(KYUUBI_VERSION)) } + + 
test("pyspark submit - basic batch rest client with existing resource file") { + val basicKyuubiRestClient: KyuubiRestClient = + KyuubiRestClient.builder(baseUri.toString) + .authHeaderMethod(KyuubiRestClient.AuthHeaderMethod.BASIC) + .username(ldapUser) + .password(ldapUserPasswd) + .socketTimeout(30000) + .build() + val batchRestApi: BatchRestApi = new BatchRestApi(basicKyuubiRestClient) + + val requestObj = newBatchRequest( + batchType = PySparkJobPI.batchType, + resource = PySparkJobPI.resource.get, + className = null, + name = PySparkJobPI.name, + conf = Map("spark.master" -> "local"), + args = Seq("10")) + val batch: Batch = batchRestApi.createBatch(requestObj) + + assert(batch.getKyuubiInstance === fe.connectionUrl) + assert(batch.getBatchType === "PYSPARK") + basicKyuubiRestClient.close() + } + + test("pyspark submit - basic batch rest client with uploading resource file") { + val basicKyuubiRestClient: KyuubiRestClient = + KyuubiRestClient.builder(baseUri.toString) + .authHeaderMethod(KyuubiRestClient.AuthHeaderMethod.BASIC) + .username(ldapUser) + .password(ldapUserPasswd) + .socketTimeout(30000) + .build() + val batchRestApi: BatchRestApi = new BatchRestApi(basicKyuubiRestClient) + + val requestObj = newBatchRequest( + batchType = PySparkJobPI.batchType, + resource = null, + className = null, + name = PySparkJobPI.name, + conf = Map("spark.master" -> "local"), + args = Seq("10")) + val resourceFile = Paths.get(PySparkJobPI.resource.get).toFile + val batch: Batch = batchRestApi.createBatch(requestObj, resourceFile) + + assert(batch.getKyuubiInstance === fe.connectionUrl) + assert(batch.getBatchType === "PYSPARK") + basicKyuubiRestClient.close() + } } From e680e604a295ca55198a62649890f414851e64cf Mon Sep 17 00:00:00 2001 From: weixi Date: Tue, 24 Oct 2023 10:21:50 +0800 Subject: [PATCH 2/6] Create a dedicated batch API suite for PySpark jobs. 
Signed-off-by: weixi --- .../org/apache/kyuubi/BatchTestHelper.scala | 16 ---- .../client/BatchRestApiPySparkSuite.scala | 93 +++++++++++++++++++ .../rest/client/BatchRestApiSuite.scala | 48 ---------- 3 files changed, 93 insertions(+), 64 deletions(-) create mode 100644 kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiPySparkSuite.scala diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/BatchTestHelper.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/BatchTestHelper.scala index 469323a066c..27298dbf16d 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/BatchTestHelper.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/BatchTestHelper.scala @@ -36,22 +36,6 @@ trait BatchTestHelper { _.getName.startsWith("spark-examples")) map (_.getCanonicalPath) } - object PySparkJobPI { - val batchType = "PYSPARK" - val className: String = null // For PySpark, mainClass isn't needed. - val name = "PythonPi" // the app name is hard coded in spark example code - lazy val resource: Option[String] = { - val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", KyuubiConf()) - Paths.get( - sparkProcessBuilder.sparkHome, - "examples", - "src", - "main", - "python").toFile.listFiles().find( - _.getName.equalsIgnoreCase("pi.py")) map (_.getCanonicalPath) - } - } - def newBatchRequest( batchType: String, resource: String, diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiPySparkSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiPySparkSuite.scala new file mode 100644 index 00000000000..b54f5bd6702 --- /dev/null +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiPySparkSuite.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.server.rest.client + +import java.nio.file.Paths + +import org.apache.kyuubi.{BatchTestHelper, RestClientTestHelper} +import org.apache.kyuubi.client.{BatchRestApi, KyuubiRestClient} +import org.apache.kyuubi.client.api.v1.dto.Batch +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.engine.spark.SparkProcessBuilder + +class BatchRestApiPySparkSuite extends RestClientTestHelper with BatchTestHelper { + object PySparkJobPI { + val batchType = "PYSPARK" + val className: String = null // For PySpark, mainClass isn't needed. 
+ val name = "PythonPi" // the app name is hard coded in spark example code + lazy val resource: Option[String] = { + val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", KyuubiConf()) + Paths.get( + sparkProcessBuilder.sparkHome, + "examples", + "src", + "main", + "python").toFile.listFiles().find( + _.getName.equalsIgnoreCase("pi.py")) map (_.getCanonicalPath) + } + } + + test("pyspark submit - basic batch rest client with existing resource file") { + val basicKyuubiRestClient: KyuubiRestClient = + KyuubiRestClient.builder(baseUri.toString) + .authHeaderMethod(KyuubiRestClient.AuthHeaderMethod.BASIC) + .username(ldapUser) + .password(ldapUserPasswd) + .socketTimeout(30000) + .build() + val batchRestApi: BatchRestApi = new BatchRestApi(basicKyuubiRestClient) + + val requestObj = newBatchRequest( + batchType = PySparkJobPI.batchType, + resource = PySparkJobPI.resource.get, + className = null, + name = PySparkJobPI.name, + conf = Map("spark.master" -> "local"), + args = Seq("10")) + val batch: Batch = batchRestApi.createBatch(requestObj) + + assert(batch.getKyuubiInstance === fe.connectionUrl) + assert(batch.getBatchType === "PYSPARK") + basicKyuubiRestClient.close() + } + + test("pyspark submit - basic batch rest client with uploading resource file") { + val basicKyuubiRestClient: KyuubiRestClient = + KyuubiRestClient.builder(baseUri.toString) + .authHeaderMethod(KyuubiRestClient.AuthHeaderMethod.BASIC) + .username(ldapUser) + .password(ldapUserPasswd) + .socketTimeout(30000) + .build() + val batchRestApi: BatchRestApi = new BatchRestApi(basicKyuubiRestClient) + + val requestObj = newBatchRequest( + batchType = PySparkJobPI.batchType, + resource = null, + className = null, + name = PySparkJobPI.name, + conf = Map("spark.master" -> "local"), + args = Seq("10")) + val resourceFile = Paths.get(PySparkJobPI.resource.get).toFile + val batch: Batch = batchRestApi.createBatch(requestObj, resourceFile) + + assert(batch.getKyuubiInstance === fe.connectionUrl) + 
assert(batch.getBatchType === "PYSPARK") + basicKyuubiRestClient.close() + } +} diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala index 1549cc2292c..fd07bb950bb 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala @@ -239,52 +239,4 @@ class BatchRestApiSuite extends RestClientTestHelper with BatchTestHelper { batchSession.conf.get(KyuubiReservedKeys.KYUUBI_CLIENT_VERSION_KEY) == Some(KYUUBI_VERSION)) } - test("pyspark submit - basic batch rest client with existing resource file") { - val basicKyuubiRestClient: KyuubiRestClient = - KyuubiRestClient.builder(baseUri.toString) - .authHeaderMethod(KyuubiRestClient.AuthHeaderMethod.BASIC) - .username(ldapUser) - .password(ldapUserPasswd) - .socketTimeout(30000) - .build() - val batchRestApi: BatchRestApi = new BatchRestApi(basicKyuubiRestClient) - - val requestObj = newBatchRequest( - batchType = PySparkJobPI.batchType, - resource = PySparkJobPI.resource.get, - className = null, - name = PySparkJobPI.name, - conf = Map("spark.master" -> "local"), - args = Seq("10")) - val batch: Batch = batchRestApi.createBatch(requestObj) - - assert(batch.getKyuubiInstance === fe.connectionUrl) - assert(batch.getBatchType === "PYSPARK") - basicKyuubiRestClient.close() - } - - test("pyspark submit - basic batch rest client with uploading resource file") { - val basicKyuubiRestClient: KyuubiRestClient = - KyuubiRestClient.builder(baseUri.toString) - .authHeaderMethod(KyuubiRestClient.AuthHeaderMethod.BASIC) - .username(ldapUser) - .password(ldapUserPasswd) - .socketTimeout(30000) - .build() - val batchRestApi: BatchRestApi = new BatchRestApi(basicKyuubiRestClient) - - val requestObj = newBatchRequest( - batchType = PySparkJobPI.batchType, - resource = 
null, - className = null, - name = PySparkJobPI.name, - conf = Map("spark.master" -> "local"), - args = Seq("10")) - val resourceFile = Paths.get(PySparkJobPI.resource.get).toFile - val batch: Batch = batchRestApi.createBatch(requestObj, resourceFile) - - assert(batch.getKyuubiInstance === fe.connectionUrl) - assert(batch.getBatchType === "PYSPARK") - basicKyuubiRestClient.close() - } } From 27d12e8bca23fc688239b0d218dfc5e87e5ae3a6 Mon Sep 17 00:00:00 2001 From: weixi Date: Tue, 24 Oct 2023 10:31:35 +0800 Subject: [PATCH 3/6] rename from BatchRestApiPySparkSuite to PySparkBatchRestApiSuite Signed-off-by: weixi --- ...RestApiPySparkSuite.scala => PySparkBatchRestApiSuite.scala} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/{BatchRestApiPySparkSuite.scala => PySparkBatchRestApiSuite.scala} (98%) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiPySparkSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/PySparkBatchRestApiSuite.scala similarity index 98% rename from kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiPySparkSuite.scala rename to kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/PySparkBatchRestApiSuite.scala index b54f5bd6702..881bcaca030 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiPySparkSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/PySparkBatchRestApiSuite.scala @@ -25,7 +25,7 @@ import org.apache.kyuubi.client.api.v1.dto.Batch import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.engine.spark.SparkProcessBuilder -class BatchRestApiPySparkSuite extends RestClientTestHelper with BatchTestHelper { +class PySparkBatchRestApiSuite extends RestClientTestHelper with BatchTestHelper { object PySparkJobPI { val batchType = "PYSPARK" val className: String = null // For 
PySpark, mainClass isn't needed. From b2035a3b257511170a4fe4afa387b8ed70e6fbfa Mon Sep 17 00:00:00 2001 From: weixi Date: Tue, 24 Oct 2023 14:07:02 +0800 Subject: [PATCH 4/6] remove unnecessary wrapper object "PySparkJobPI" Signed-off-by: weixi --- .../org/apache/kyuubi/BatchTestHelper.scala | 4 +- .../rest/client/BatchRestApiSuite.scala | 1 - .../client/PySparkBatchRestApiSuite.scala | 41 ++++++++----------- 3 files changed, 19 insertions(+), 27 deletions(-) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/BatchTestHelper.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/BatchTestHelper.scala index 27298dbf16d..f49ebbeb13f 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/BatchTestHelper.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/BatchTestHelper.scala @@ -26,6 +26,8 @@ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.engine.spark.SparkProcessBuilder trait BatchTestHelper { + val sparkBatchTestBatchType = "SPARK" + val sparkBatchTestMainClass = "org.apache.spark.examples.SparkPi" val sparkBatchTestAppName = "Spark Pi" // the app name is hard coded in spark example code @@ -56,7 +58,7 @@ trait BatchTestHelper { conf: Map[String, String] = Map.empty, args: Seq[String] = Seq.empty): BatchRequest = { newBatchRequest( - "SPARK", + sparkBatchTestBatchType, sparkBatchTestResource.get, sparkBatchTestMainClass, sparkBatchTestAppName, diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala index fd07bb950bb..d04826a9d20 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala @@ -238,5 +238,4 @@ class BatchRestApiSuite extends RestClientTestHelper with BatchTestHelper { assert( 
batchSession.conf.get(KyuubiReservedKeys.KYUUBI_CLIENT_VERSION_KEY) == Some(KYUUBI_VERSION)) } - } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/PySparkBatchRestApiSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/PySparkBatchRestApiSuite.scala index 881bcaca030..7fd971ea292 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/PySparkBatchRestApiSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/PySparkBatchRestApiSuite.scala @@ -26,20 +26,19 @@ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.engine.spark.SparkProcessBuilder class PySparkBatchRestApiSuite extends RestClientTestHelper with BatchTestHelper { - object PySparkJobPI { - val batchType = "PYSPARK" - val className: String = null // For PySpark, mainClass isn't needed. - val name = "PythonPi" // the app name is hard coded in spark example code - lazy val resource: Option[String] = { - val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", KyuubiConf()) - Paths.get( - sparkProcessBuilder.sparkHome, - "examples", - "src", - "main", - "python").toFile.listFiles().find( - _.getName.equalsIgnoreCase("pi.py")) map (_.getCanonicalPath) - } + override val sparkBatchTestBatchType: String = "PYSPARK" + override val sparkBatchTestMainClass: String = null // For PySpark, mainClass isn't needed. 
+ override val sparkBatchTestAppName: String = + "PythonPi" // the app name is hard coded in spark example code + override val sparkBatchTestResource: Option[String] = { + val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", KyuubiConf()) + Paths.get( + sparkProcessBuilder.sparkHome, + "examples", + "src", + "main", + "python").toFile.listFiles().find( + _.getName.equalsIgnoreCase("pi.py")) map (_.getCanonicalPath) } test("pyspark submit - basic batch rest client with existing resource file") { @@ -52,11 +51,7 @@ class PySparkBatchRestApiSuite extends RestClientTestHelper with BatchTestHelper .build() val batchRestApi: BatchRestApi = new BatchRestApi(basicKyuubiRestClient) - val requestObj = newBatchRequest( - batchType = PySparkJobPI.batchType, - resource = PySparkJobPI.resource.get, - className = null, - name = PySparkJobPI.name, + val requestObj = newSparkBatchRequest( conf = Map("spark.master" -> "local"), args = Seq("10")) val batch: Batch = batchRestApi.createBatch(requestObj) @@ -76,14 +71,10 @@ class PySparkBatchRestApiSuite extends RestClientTestHelper with BatchTestHelper .build() val batchRestApi: BatchRestApi = new BatchRestApi(basicKyuubiRestClient) - val requestObj = newBatchRequest( - batchType = PySparkJobPI.batchType, - resource = null, - className = null, - name = PySparkJobPI.name, + val requestObj = newSparkBatchRequest( conf = Map("spark.master" -> "local"), args = Seq("10")) - val resourceFile = Paths.get(PySparkJobPI.resource.get).toFile + val resourceFile = Paths.get(sparkBatchTestResource.get).toFile val batch: Batch = batchRestApi.createBatch(requestObj, resourceFile) assert(batch.getKyuubiInstance === fe.connectionUrl) From 72a92b5eee91773a158739f78b6735576d0683ab Mon Sep 17 00:00:00 2001 From: Bowen Liang Date: Tue, 24 Oct 2023 14:11:09 +0800 Subject: [PATCH 5/6] Update kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/PySparkBatchRestApiSuite.scala --- .../kyuubi/server/rest/client/PySparkBatchRestApiSuite.scala | 
3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/PySparkBatchRestApiSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/PySparkBatchRestApiSuite.scala index 7fd971ea292..41898672aed 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/PySparkBatchRestApiSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/PySparkBatchRestApiSuite.scala @@ -28,8 +28,7 @@ import org.apache.kyuubi.engine.spark.SparkProcessBuilder class PySparkBatchRestApiSuite extends RestClientTestHelper with BatchTestHelper { override val sparkBatchTestBatchType: String = "PYSPARK" override val sparkBatchTestMainClass: String = null // For PySpark, mainClass isn't needed. - override val sparkBatchTestAppName: String = - "PythonPi" // the app name is hard coded in spark example code + override val sparkBatchTestAppName: String = "PythonPi" override val sparkBatchTestResource: Option[String] = { val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", KyuubiConf()) Paths.get( From b693efc1b971773f34464808ccf445680085fcb4 Mon Sep 17 00:00:00 2001 From: Bowen Liang Date: Tue, 24 Oct 2023 15:43:50 +0800 Subject: [PATCH 6/6] simplify sparkBatchTestResource --- .../server/rest/client/PySparkBatchRestApiSuite.scala | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/PySparkBatchRestApiSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/PySparkBatchRestApiSuite.scala index 41898672aed..8e33eb38229 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/PySparkBatchRestApiSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/PySparkBatchRestApiSuite.scala @@ -31,13 +31,9 @@ class PySparkBatchRestApiSuite extends RestClientTestHelper with BatchTestHelper override val 
sparkBatchTestAppName: String = "PythonPi" override val sparkBatchTestResource: Option[String] = { val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", KyuubiConf()) - Paths.get( - sparkProcessBuilder.sparkHome, - "examples", - "src", - "main", - "python").toFile.listFiles().find( - _.getName.equalsIgnoreCase("pi.py")) map (_.getCanonicalPath) + val piScript = + Paths.get(sparkProcessBuilder.sparkHome, "examples/src/main/python/pi.py") + Some(piScript.toAbsolutePath.toString) } test("pyspark submit - basic batch rest client with existing resource file") {