Enable CI tests on Scala 2.13 and support a custom or spark-core-extracted Scala version for the Spark engine #5196

32 changes: 22 additions & 10 deletions .github/workflows/master.yml
@@ -170,16 +170,18 @@ jobs:
             **/target/unit-tests.log
             **/kyuubi-spark-sql-engine.log*

-  scala213:
-    name: Scala Compilation Test
+  scala-test:
+    name: Scala Test
     runs-on: ubuntu-22.04
     strategy:
       fail-fast: false
       matrix:
-        java:
-          - '8'
         scala:
           - '2.13'
+        java:
+          - '8'
+        spark:
+          - '3.4'
     steps:
       - uses: actions/checkout@v3
       - name: Tune Runner VM
@@ -193,14 +195,24 @@ jobs:
           check-latest: false
       - name: Setup Maven
         uses: ./.github/actions/setup-maven
+      - name: Cache Engine Archives
+        uses: ./.github/actions/cache-engine-archives
       - name: Build on Scala ${{ matrix.scala }}
         run: |
-          MODULES='!externals/kyuubi-flink-sql-engine'
-          ./build/mvn clean install -pl ${MODULES} -am \
-            -DskipTests -Pflink-provided,hive-provided,spark-provided \
-            -Pjava-${{ matrix.java }} \
-            -Pscala-${{ matrix.scala }} \
-            -Pspark-3.3
+          TEST_MODULES="!externals/kyuubi-flink-sql-engine,!integration-tests/kyuubi-flink-it"
+          ./build/mvn clean install ${MVN_OPT} -pl ${TEST_MODULES} -am \
+            -Pscala-${{ matrix.scala }} -Pjava-${{ matrix.java }} -Pspark-${{ matrix.spark }}
+      - name: Upload test logs
+        if: failure()
+        uses: actions/upload-artifact@v3
+        with:
+          name: unit-tests-log-scala-${{ matrix.scala }}-java-${{ matrix.java }}-spark-${{ matrix.spark }}
+          path: |
+            **/target/unit-tests.log
+            **/kyuubi-spark-sql-engine.log*
+            **/kyuubi-spark-batch-submit.log*
+            **/kyuubi-jdbc-engine.log*
+            **/kyuubi-hive-sql-engine.log*

   flink-it:
     name: Flink Test
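In short, this commit turns the Scala 2.13 compile-only job (scala213) into a full test job (scala-test): it runs the test suites (excluding the Flink engine and its integration tests) against a Spark version taken from the build matrix (3.4 here), reuses the engine-archive cache action, and uploads unit-test and engine logs when a run fails.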
@@ -32,8 +32,9 @@ class Lineage(

   override def equals(other: Any): Boolean = other match {
     case otherLineage: Lineage =>
-      otherLineage.inputTables == inputTables && otherLineage.outputTables == outputTables &&
-        otherLineage.columnLineage == columnLineage
+      otherLineage.inputTables.toSet == inputTables.toSet &&
+        otherLineage.outputTables.toSet == outputTables.toSet &&
+        otherLineage.columnLineage.toSet == columnLineage.toSet
     case _ => false
   }

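The switch from direct list equality to a toSet comparison makes Lineage.equals insensitive to element order (and to duplicates). A minimal illustration of the difference:

  // List equality is positional; Set equality is membership-based.
  val a = List("db.t1", "db.t2")
  val b = List("db.t2", "db.t1")
  a == b             // false: same elements, different order
  a.toSet == b.toSet // true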
4 changes: 4 additions & 0 deletions integration-tests/kyuubi-flink-it/pom.xml
@@ -112,4 +112,8 @@

   </dependencies>

+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
 </project>
5 changes: 5 additions & 0 deletions integration-tests/kyuubi-hive-it/pom.xml
@@ -69,4 +69,9 @@
       <scope>test</scope>
     </dependency>
   </dependencies>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
 </project>
2 changes: 2 additions & 0 deletions integration-tests/kyuubi-jdbc-it/pom.xml
@@ -114,5 +114,7 @@
         </executions>
       </plugin>
     </plugins>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
   </build>
 </project>
5 changes: 5 additions & 0 deletions integration-tests/kyuubi-trino-it/pom.xml
@@ -88,4 +88,9 @@
     </dependency>

   </dependencies>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
 </project>
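All four integration-test modules get the same build tweak: class output is pinned to target/scala-${scala.binary.version}/classes (and test-classes), so artifacts built with -Pscala-2.12 and -Pscala-2.13 land in separate directories (e.g. target/scala-2.12/classes vs target/scala-2.13/classes) and the two profiles cannot mix incompatible class files.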
@@ -17,8 +17,11 @@

 package org.apache.kyuubi.operation

+import scala.collection.mutable.ListBuffer
+
 import org.apache.kyuubi.{IcebergSuiteMixin, SPARK_COMPILE_VERSION}
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
+import org.apache.kyuubi.util.AssertionUtils._
 import org.apache.kyuubi.util.SparkVersionUtil

 trait IcebergMetadataTests extends HiveJDBCTestHelper with IcebergSuiteMixin with SparkVersionUtil {
@@ -27,10 +30,11 @@ trait IcebergMetadataTests extends HiveJDBCTestHelper with IcebergSuiteMixin wit
     withJdbcStatement() { statement =>
       val metaData = statement.getConnection.getMetaData
       val catalogs = metaData.getCatalogs
-      catalogs.next()
-      assert(catalogs.getString(TABLE_CAT) === "spark_catalog")
-      catalogs.next()
-      assert(catalogs.getString(TABLE_CAT) === catalog)
+      val results = ListBuffer[String]()
+      while (catalogs.next()) {
+        results += catalogs.getString(TABLE_CAT)
+      }
+      assertContains(results, "spark_catalog", catalog)
     }
   }

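The rewritten assertion no longer assumes the driver returns catalogs in a fixed order (previously positions 1 and 2 had to be spark_catalog and the Iceberg catalog, respectively). It now drains the ResultSet into a buffer and asserts membership with Kyuubi's assertContains helper, so extra catalogs or a different ordering no longer break the test.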
@@ -270,7 +270,7 @@ trait SparkQueryTests extends SparkDataTypeTests with HiveJDBCTestHelper {
           |""".stripMargin
       val rs1 = statement.executeQuery(code)
       rs1.next()
-      assert(rs1.getString(1) startsWith "df: org.apache.spark.sql.DataFrame")
+      assert(rs1.getString(1) contains "df: org.apache.spark.sql.DataFrame")

       // continue
       val rs2 = statement.executeQuery("df.count()")
@@ -311,7 +311,7 @@ trait SparkQueryTests extends SparkDataTypeTests with HiveJDBCTestHelper {
           |""".stripMargin
       val rs5 = statement.executeQuery(code2)
       rs5.next()
-      assert(rs5.getString(1) startsWith "df: org.apache.spark.sql.DataFrame")
+      assert(rs5.getString(1) contains "df: org.apache.spark.sql.DataFrame")

       // re-assign
       val rs6 = statement.executeQuery("result.set(df)")
@@ -420,7 +420,7 @@ trait SparkQueryTests extends SparkDataTypeTests with HiveJDBCTestHelper {
       statement.execute(code1)
       val rs = statement.executeQuery(code2)
       rs.next()
-      assert(rs.getString(1) == "x: Int = 3")
+      assert(rs.getString(1) contains "x: Int = 3")
     }
   }

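Relaxing startsWith and == to contains keeps these REPL assertions portable across Scala versions: the Scala 2.13 REPL prefixes bindings with val (printing, e.g., val x: Int = 3 where 2.12 prints x: Int = 3), so exact-prefix checks written against 2.12 output would fail on 2.13.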
@@ -17,7 +17,7 @@

 package org.apache.kyuubi.engine

-import java.io.{File, FilenameFilter, IOException}
+import java.io.{File, FileFilter, IOException}
 import java.net.URI
 import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, Path, Paths}
@@ -56,13 +56,14 @@ trait ProcBuilder {
     }
   }

+  protected val engineScalaBinaryVersion: String = SCALA_COMPILE_VERSION
+
   /**
    * The engine jar or other runnable jar containing the main method
    */
   def mainResource: Option[String] = {
     // 1. get the main resource jar for user specified config first
-    // TODO use SPARK_SCALA_VERSION instead of SCALA_COMPILE_VERSION
-    val jarName = s"${module}_$SCALA_COMPILE_VERSION-$KYUUBI_VERSION.jar"
+    val jarName: String = s"${module}_$engineScalaBinaryVersion-$KYUUBI_VERSION.jar"
     conf.getOption(s"kyuubi.session.engine.$shortName.main.resource").filter { userSpecified =>
       // skip check exist if not local file.
       val uri = new URI(userSpecified)
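With engineScalaBinaryVersion in place, the engine jar name tracks the Scala binary version of the target engine rather than the server's compile-time constant. An illustrative expansion (the values below are examples, not taken from this PR):

  // Hypothetical values, for illustration only:
  val module = "kyuubi-spark-sql-engine"
  val engineScalaBinaryVersion = "2.13" // e.g. extracted from spark-core_2.13-*.jar
  val KYUUBI_VERSION = "1.8.0"
  s"${module}_$engineScalaBinaryVersion-$KYUUBI_VERSION.jar"
  // => kyuubi-spark-sql-engine_2.13-1.8.0.jar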
@@ -295,6 +296,11 @@
     }
   }

+  protected lazy val engineHomeDirFilter: FileFilter = file => {
+    val fileName = file.getName
+    file.isDirectory && fileName.contains(s"$shortName-") && !fileName.contains("-engine")
+  }
+
   /**
    * Get the home directly that contains binary distributions of engines.
    *
@@ -311,24 +317,21 @@
    * @return SPARK_HOME, HIVE_HOME, etc.
    */
   protected def getEngineHome(shortName: String): String = {
-    val homeDirFilter: FilenameFilter = (dir: File, name: String) =>
-      dir.isDirectory && name.contains(s"$shortName-") && !name.contains("-engine")
-
     val homeKey = s"${shortName.toUpperCase}_HOME"
     // 1. get from env, e.g. SPARK_HOME, FLINK_HOME
     env.get(homeKey)
       .orElse {
         // 2. get from $KYUUBI_HOME/externals/kyuubi-download/target
         env.get(KYUUBI_HOME).flatMap { p =>
           val candidates = Paths.get(p, "externals", "kyuubi-download", "target")
-            .toFile.listFiles(homeDirFilter)
+            .toFile.listFiles(engineHomeDirFilter)
           if (candidates == null) None else candidates.map(_.toPath).headOption
         }.filter(Files.exists(_)).map(_.toAbsolutePath.toFile.getCanonicalPath)
       }.orElse {
         // 3. get from kyuubi-server/../externals/kyuubi-download/target
         Utils.getCodeSourceLocation(getClass).split("kyuubi-server").flatMap { cwd =>
           val candidates = Paths.get(cwd, "externals", "kyuubi-download", "target")
-            .toFile.listFiles(homeDirFilter)
+            .toFile.listFiles(engineHomeDirFilter)
           if (candidates == null) None else candidates.map(_.toPath).headOption
         }.find(Files.exists(_)).map(_.toAbsolutePath.toFile.getCanonicalPath)
       } match {
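Hoisting the filter out of getEngineHome into an overridable engineHomeDirFilter member (and widening FilenameFilter to FileFilter) lets subclasses customize how a distribution directory is recognized; SparkProcessBuilder below overrides it to pick a SPARK_HOME whose name matches the server's Scala binary version.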
Review comment (Member): maybe we can port this change to 1.8 to make the basic engine bootstrap work.
@@ -17,14 +17,15 @@

 package org.apache.kyuubi.engine.spark

-import java.io.{File, IOException}
+import java.io.{File, FileFilter, IOException}
 import java.nio.file.Paths
 import java.util.Locale

 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer

 import com.google.common.annotations.VisibleForTesting
+import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.security.UserGroupInformation

 import org.apache.kyuubi._
import org.apache.kyuubi._
Expand All @@ -35,8 +36,7 @@ import org.apache.kyuubi.engine.ProcBuilder.KYUUBI_ENGINE_LOG_PATH_KEY
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.AuthTypes
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.util.KubernetesUtils
import org.apache.kyuubi.util.Validator
import org.apache.kyuubi.util.{KubernetesUtils, Validator}

class SparkProcessBuilder(
override val proxyUser: String,
@@ -102,6 +102,25 @@ class SparkProcessBuilder(
     }
   }

+  private[kyuubi] def extractSparkCoreScalaVersion(fileNames: Iterable[String]): String = {
+    fileNames.collectFirst { case SPARK_CORE_SCALA_VERSION_REGEX(scalaVersion) => scalaVersion }
+      .getOrElse(throw new KyuubiException("Failed to extract Scala version from spark-core jar"))
+  }
+
+  override protected val engineScalaBinaryVersion: String = {
+    val sparkCoreScalaVersion =
+      extractSparkCoreScalaVersion(Paths.get(sparkHome, "jars").toFile.list())
+    StringUtils.defaultIfBlank(System.getenv("SPARK_SCALA_VERSION"), sparkCoreScalaVersion)
+  }
+
+  override protected lazy val engineHomeDirFilter: FileFilter = file => {
+    val r = SCALA_COMPILE_VERSION match {
+      case "2.12" => SPARK_HOME_REGEX_SCALA_212
+      case "2.13" => SPARK_HOME_REGEX_SCALA_213
+    }
+    file.isDirectory && file.getName.matches(r.regex)
+  }
+
   override protected lazy val commands: Array[String] = {
     // complete `spark.master` if absent on kubernetes
     completeMasterUrl(conf)
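The resolution order for the Spark engine's Scala version is therefore: an explicit SPARK_SCALA_VERSION environment variable wins, otherwise the version extracted from the spark-core jar under $SPARK_HOME/jars is used. A sketch of the same logic in isolation (values are illustrative):

  import org.apache.commons.lang3.StringUtils

  val detected = "2.12" // e.g. parsed from spark-core_2.12-3.4.1.jar
  val fromEnv = System.getenv("SPARK_SCALA_VERSION") // null or blank when unset
  // defaultIfBlank returns the second argument when the first is null, empty, or whitespace
  val resolved = StringUtils.defaultIfBlank(fromEnv, detected)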
@@ -314,4 +333,13 @@
   final private val SPARK_SUBMIT_FILE = if (Utils.isWindows) "spark-submit.cmd" else "spark-submit"
   final private val SPARK_CONF_DIR = "SPARK_CONF_DIR"
   final private val SPARK_CONF_FILE_NAME = "spark-defaults.conf"
+
+  final private[kyuubi] val SPARK_CORE_SCALA_VERSION_REGEX =
+    """^spark-core_(\d\.\d+).*.jar$""".r
+
+  final private[kyuubi] val SPARK_HOME_REGEX_SCALA_212 =
+    """^spark-\d+\.\d+\.\d+-bin-hadoop\d+(\.\d+)?$""".r
+
+  final private[kyuubi] val SPARK_HOME_REGEX_SCALA_213 =
+    """^spark-\d+\.\d+\.\d+-bin-hadoop\d(\.\d+)?+-scala\d+(\.\d+)?$""".r
 }
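A quick REPL-style check of how SPARK_CORE_SCALA_VERSION_REGEX behaves (illustrative; it mirrors the test suite below):

  val SPARK_CORE_SCALA_VERSION_REGEX = """^spark-core_(\d\.\d+).*.jar$""".r

  "spark-core_2.13-3.4.1.jar" match {
    case SPARK_CORE_SCALA_VERSION_REGEX(v) => v // "2.13": the group captures digits around the first dot
  }
  // Names such as "spark-dummy_2.13-3.5.0.jar" do not match, which is why
  // extractSparkCoreScalaVersion throws when collectFirst finds nothing.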
@@ -26,14 +26,15 @@ import java.util.concurrent.{Executors, TimeUnit}
 import org.scalatest.time.SpanSugar._
 import org.scalatestplus.mockito.MockitoSugar

-import org.apache.kyuubi.{KerberizedTestHelper, KyuubiSQLException, Utils}
+import org.apache.kyuubi._
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.{ENGINE_LOG_TIMEOUT, ENGINE_SPARK_MAIN_RESOURCE}
 import org.apache.kyuubi.engine.ProcBuilder.KYUUBI_ENGINE_LOG_PATH_KEY
 import org.apache.kyuubi.engine.spark.SparkProcessBuilder._
 import org.apache.kyuubi.ha.HighAvailabilityConf
 import org.apache.kyuubi.ha.client.AuthTypes
 import org.apache.kyuubi.service.ServiceUtils
+import org.apache.kyuubi.util.AssertionUtils._

 class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
   private def conf = KyuubiConf().set("kyuubi.on", "off")
@@ -363,6 +364,46 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
       .appendPodNameConf(conf3).get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
     assert(execPodNamePrefix3 === Some(s"kyuubi-$engineRefId"))
   }
+
+  test("extract spark core scala version") {
+    val builder = new SparkProcessBuilder("kentyao", KyuubiConf(false))
+    Seq(
+      "spark-core_2.13-3.4.1.jar",
+      "spark-core_2.13-3.5.0-abc-20230921.jar",
+      "spark-core_2.13-3.5.0-xyz-1.2.3.jar",
+      "spark-core_2.13-3.5.0.1.jar",
+      "spark-core_2.13.2-3.5.0.jar").foreach { f =>
+      assertResult("2.13")(builder.extractSparkCoreScalaVersion(Seq(f)))
+    }
+
+    Seq(
+      "spark-dummy_2.13-3.5.0.jar",
+      "spark-core_2.13-3.5.0.1.zip",
+      "yummy-spark-core_2.13-3.5.0.jar").foreach { f =>
+      assertThrows[KyuubiException](builder.extractSparkCoreScalaVersion(Seq(f)))
+    }
+  }
+
+  test("match scala version of spark home") {
+    SCALA_COMPILE_VERSION match {
+      case "2.12" => Seq(
+          "spark-3.2.4-bin-hadoop3.2",
+          "spark-3.2.4-bin-hadoop2.7",
+          "spark-3.4.1-bin-hadoop3")
+          .foreach { sparkHome =>
+            assertMatches(sparkHome, SPARK_HOME_REGEX_SCALA_212)
+            assertNotMatches(sparkHome, SPARK_HOME_REGEX_SCALA_213)
+          }
+      case "2.13" => Seq(
+          "spark-3.2.4-bin-hadoop3.2-scala2.13",
+          "spark-3.4.1-bin-hadoop3-scala2.13",
+          "spark-3.5.0-bin-hadoop3-scala2.13")
+          .foreach { sparkHome =>
+            assertMatches(sparkHome, SPARK_HOME_REGEX_SCALA_213)
+            assertNotMatches(sparkHome, SPARK_HOME_REGEX_SCALA_212)
+          }
+    }
+  }
 }

 class FakeSparkProcessBuilder(config: KyuubiConf)
@@ -86,8 +86,9 @@ class SessionSigningSuite extends WithKyuubiServer with HiveJDBCTestHelper {
     assert(rs2.next())

     // skipping prefix "res0: String = " of returned scala result
-    val publicKeyStr = rs1.getString(1).substring(15)
-    val sessionUserSign = rs2.getString(1).substring(15)
+    val sep = " = "
+    val publicKeyStr = StringUtils.substringAfter(rs1.getString(1), sep)
+    val sessionUserSign = StringUtils.substringAfter(rs2.getString(1), sep)

     assert(StringUtils.isNotBlank(publicKeyStr))
     assert(StringUtils.isNotBlank(sessionUserSign))
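commons-lang3's substringAfter makes the prefix-stripping robust: instead of assuming the REPL banner is exactly 15 characters ("res0: String = "), it cuts at the first " = " separator regardless of the variable name or res counter. For instance (key text illustrative):

  import org.apache.commons.lang3.StringUtils

  StringUtils.substringAfter("res0: String = MIIBIjANBg...", " = ")  // "MIIBIjANBg..."
  StringUtils.substringAfter("res12: String = MIIBIjANBg...", " = ") // still correct
  "res12: String = MIIBIjANBg...".substring(15)                      // " MIIBIjANBg..." - broken by the longer prefix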