test on scala-2.13
bowenliang123 committed Sep 21, 2023
1 parent 83bbefe commit f1cb15e
Showing 13 changed files with 114 additions and 41 deletions.
30 changes: 22 additions & 8 deletions .github/workflows/master.yml
@@ -169,16 +169,18 @@ jobs:
**/target/unit-tests.log
**/kyuubi-spark-sql-engine.log*
scala213:
name: Scala Compilation Test
scala-test:
name: Scala Test
runs-on: ubuntu-22.04
strategy:
fail-fast: false
matrix:
java:
- '8'
scala:
- '2.13'
java:
- '8'
spark:
- '3.4'
steps:
- uses: actions/checkout@v3
- name: Tune Runner VM
@@ -192,14 +194,26 @@ jobs:
check-latest: false
- name: Setup Maven
uses: ./.github/actions/setup-maven
- name: Cache Engine Archives
uses: ./.github/actions/cache-engine-archives
- name: Build on Scala ${{ matrix.scala }}
run: |
MODULES='!externals/kyuubi-flink-sql-engine'
./build/mvn clean install -pl ${MODULES} -am \
-DskipTests -Pflink-provided,hive-provided,spark-provided \
TEST_MODULES="!externals/kyuubi-flink-sql-engine,!integration-tests/kyuubi-flink-it"
./build/mvn clean install ${MVN_OPT} -pl ${TEST_MODULES} -am \
-Pjava-${{ matrix.java }} \
-Pscala-${{ matrix.scala }} \
-Pspark-3.3
-Pspark-${{ matrix.spark }}
- name: Upload test logs
if: failure()
uses: actions/upload-artifact@v3
with:
name: unit-tests-log-java-${{ matrix.java }}-spark-${{ matrix.spark }}-scala-${{ matrix.scala }}
path: |
**/target/unit-tests.log
**/kyuubi-spark-sql-engine.log*
**/kyuubi-spark-batch-submit.log*
**/kyuubi-jdbc-engine.log*
**/kyuubi-hive-sql-engine.log*
flink-it:
name: Flink Test
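For the single matrix combination above (Java 8, Scala 2.13, Spark 3.4), the build step resolves to ./build/mvn clean install ${MVN_OPT} -pl '!externals/kyuubi-flink-sql-engine,!integration-tests/kyuubi-flink-it' -am -Pjava-8 -Pscala-2.13 -Pspark-3.4. In other words, the former compile-only job (note the dropped -DskipTests) now runs the full test suite minus the Flink engine and its integration tests, and uploads engine logs on failure.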
Lineage.scala
@@ -32,8 +32,9 @@ class Lineage(

override def equals(other: Any): Boolean = other match {
case otherLineage: Lineage =>
otherLineage.inputTables == inputTables && otherLineage.outputTables == outputTables &&
otherLineage.columnLineage == columnLineage
otherLineage.inputTables.toSet == inputTables.toSet &&
otherLineage.outputTables.toSet == outputTables.toSet &&
otherLineage.columnLineage.toSet == columnLineage.toSet
case _ => false
}

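The equals change above makes lineage comparison order-insensitive. A minimal standalone sketch of the difference (plain Scala collections, not the Kyuubi classes):

```scala
object SetEqualityDemo extends App {
  val a = Seq("db.t1", "db.t2")
  val b = Seq("db.t2", "db.t1")

  // Seq equality is order-sensitive: the same tables listed in a
  // different order compare unequal.
  println(a == b) // false

  // Set equality ignores ordering (and collapses duplicates), which is
  // what the rewritten equals relies on.
  println(a.toSet == b.toSet) // true
}
```

Note that toSet also drops duplicates, so two lineages repeating an entry a different number of times now compare equal as well.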
5 changes: 5 additions & 0 deletions integration-tests/kyuubi-flink-it/pom.xml
@@ -112,4 +112,9 @@

</dependencies>

<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>

</project>
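The added build block (repeated below for the hive, jdbc and trino integration-test modules) redirects compiled classes to target/scala-${scala.binary.version}/classes, presumably so that Scala 2.12 and 2.13 builds keep their outputs in separate directories, matching the layout of the main modules.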
5 changes: 5 additions & 0 deletions integration-tests/kyuubi-hive-it/pom.xml
@@ -69,4 +69,9 @@
<scope>test</scope>
</dependency>
</dependencies>

<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>
</project>
2 changes: 2 additions & 0 deletions integration-tests/kyuubi-jdbc-it/pom.xml
@@ -114,5 +114,7 @@
</executions>
</plugin>
</plugins>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>
</project>
5 changes: 5 additions & 0 deletions integration-tests/kyuubi-trino-it/pom.xml
@@ -88,4 +88,9 @@
</dependency>

</dependencies>

<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>
</project>
IcebergMetadataTests.scala
@@ -17,8 +17,11 @@

package org.apache.kyuubi.operation

import scala.collection.mutable.ListBuffer

import org.apache.kyuubi.{IcebergSuiteMixin, SPARK_COMPILE_VERSION}
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.util.AssertionUtils._
import org.apache.kyuubi.util.SparkVersionUtil

trait IcebergMetadataTests extends HiveJDBCTestHelper with IcebergSuiteMixin with SparkVersionUtil {
@@ -27,10 +30,11 @@ trait IcebergMetadataTests extends HiveJDBCTestHelper with IcebergSuiteMixin with SparkVersionUtil {
withJdbcStatement() { statement =>
val metaData = statement.getConnection.getMetaData
val catalogs = metaData.getCatalogs
catalogs.next()
assert(catalogs.getString(TABLE_CAT) === "spark_catalog")
catalogs.next()
assert(catalogs.getString(TABLE_CAT) === catalog)
val results = ListBuffer[String]()
while (catalogs.next()) {
results += catalogs.getString(TABLE_CAT)
}
assertContains(results, "spark_catalog", catalog)
}
}

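The rewritten test drains the whole ResultSet before asserting, so it no longer assumes which catalog row comes back first. A self-contained sketch of that pattern, assuming a Kyuubi server on localhost:10009 and the Hive JDBC driver on the classpath (URL and expected name are placeholders):

```scala
import java.sql.DriverManager

import scala.collection.mutable.ListBuffer

object CatalogListingSketch extends App {
  val conn = DriverManager.getConnection("jdbc:hive2://localhost:10009/")
  try {
    val rs = conn.getMetaData.getCatalogs
    val names = ListBuffer[String]()
    while (rs.next()) {
      names += rs.getString("TABLE_CAT")
    }
    // Membership assertions hold regardless of the server's row order.
    assert(names.contains("spark_catalog"))
  } finally {
    conn.close()
  }
}
```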
SparkQueryTests.scala
@@ -270,7 +270,7 @@ trait SparkQueryTests extends SparkDataTypeTests with HiveJDBCTestHelper {
|""".stripMargin
val rs1 = statement.executeQuery(code)
rs1.next()
assert(rs1.getString(1) startsWith "df: org.apache.spark.sql.DataFrame")
assert(rs1.getString(1) contains "df: org.apache.spark.sql.DataFrame")

// continue
val rs2 = statement.executeQuery("df.count()")
@@ -311,7 +311,7 @@ trait SparkQueryTests extends SparkDataTypeTests with HiveJDBCTestHelper {
|""".stripMargin
val rs5 = statement.executeQuery(code2)
rs5.next()
assert(rs5.getString(1) startsWith "df: org.apache.spark.sql.DataFrame")
assert(rs5.getString(1) contains "df: org.apache.spark.sql.DataFrame")

// re-assign
val rs6 = statement.executeQuery("result.set(df)")
@@ -420,7 +420,7 @@ trait SparkQueryTests extends SparkDataTypeTests with HiveJDBCTestHelper {
statement.execute(code1)
val rs = statement.executeQuery(code2)
rs.next()
assert(rs.getString(1) == "x: Int = 3")
assert(rs.getString(1) contains "x: Int = 3")
}
}

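All three assertions above loosen an exact prefix or equality match to contains. The likely motivation: the Scala 2.13 REPL prefixes bound values with val (val df: ... instead of df: ...), so a fixed prefix only matches 2.12 output. A quick illustration with representative strings (not captured from a real session):

```scala
object ReplOutputMatching extends App {
  val scala212Style = "df: org.apache.spark.sql.DataFrame = [key: int]"
  val scala213Style = "val df: org.apache.spark.sql.DataFrame = [key: int]"

  for (line <- Seq(scala212Style, scala213Style)) {
    // startsWith only matches the 2.12 form; contains matches both.
    println((line startsWith "df: org.apache.spark.sql.DataFrame",
      line contains "df: org.apache.spark.sql.DataFrame"))
  }
}
```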
ProcBuilder.scala
@@ -56,13 +56,14 @@ trait ProcBuilder {
}
}

protected def engineScalaBinaryVersion: String = SCALA_COMPILE_VERSION

/**
* The engine jar or other runnable jar containing the main method
*/
def mainResource: Option[String] = {
// 1. get the main resource jar for user specified config first
// TODO use SPARK_SCALA_VERSION instead of SCALA_COMPILE_VERSION
val jarName = s"${module}_$SCALA_COMPILE_VERSION-$KYUUBI_VERSION.jar"
val jarName: String = s"${module}_$engineScalaBinaryVersion-$KYUUBI_VERSION.jar"
conf.getOption(s"kyuubi.session.engine.$shortName.main.resource").filter { userSpecified =>
// skip check exist if not local file.
val uri = new URI(userSpecified)
@@ -295,6 +296,9 @@
}
}

protected lazy val engineHomeDirFilter: FilenameFilter = (_: File, name: String) =>
name.contains(s"$shortName-") && !name.contains("-engine")

/**
* Get the home directory that contains binary distributions of engines.
*
@@ -311,24 +315,21 @@
* @return SPARK_HOME, HIVE_HOME, etc.
*/
protected def getEngineHome(shortName: String): String = {
val homeDirFilter: FilenameFilter = (dir: File, name: String) =>
dir.isDirectory && name.contains(s"$shortName-") && !name.contains("-engine")

val homeKey = s"${shortName.toUpperCase}_HOME"
// 1. get from env, e.g. SPARK_HOME, FLINK_HOME
env.get(homeKey)
.orElse {
// 2. get from $KYUUBI_HOME/externals/kyuubi-download/target
env.get(KYUUBI_HOME).flatMap { p =>
val candidates = Paths.get(p, "externals", "kyuubi-download", "target")
.toFile.listFiles(homeDirFilter)
.toFile.listFiles(engineHomeDirFilter)
if (candidates == null) None else candidates.map(_.toPath).headOption
}.filter(Files.exists(_)).map(_.toAbsolutePath.toFile.getCanonicalPath)
}.orElse {
// 3. get from kyuubi-server/../externals/kyuubi-download/target
Utils.getCodeSourceLocation(getClass).split("kyuubi-server").flatMap { cwd =>
val candidates = Paths.get(cwd, "externals", "kyuubi-download", "target")
.toFile.listFiles(homeDirFilter)
.toFile.listFiles(engineHomeDirFilter)
if (candidates == null) None else candidates.map(_.toPath).headOption
}.find(Files.exists(_)).map(_.toAbsolutePath.toFile.getCanonicalPath)
} match {
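engineHomeDirFilter is now an overridable member instead of a local value, so subclasses such as SparkProcessBuilder can swap in their own matching rule. Since FilenameFilter has a single abstract method, a Scala lambda satisfies it directly. A standalone check of what the default filter accepts (directory names are illustrative):

```scala
import java.io.{File, FilenameFilter}

object EngineHomeFilterSketch extends App {
  val shortName = "spark"
  val filter: FilenameFilter = (_: File, name: String) =>
    name.contains(s"$shortName-") && !name.contains("-engine")

  Seq(
    "spark-3.4.1-bin-hadoop3",      // accepted: looks like an engine home
    "kyuubi-spark-sql-engine_2.12", // rejected: contains "-engine"
    "flink-1.17.1"                  // rejected: no "spark-" substring
  ).foreach { name =>
    println(s"$name -> ${filter.accept(new File("."), name)}")
  }
}
```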
SparkProcessBuilder.scala
@@ -17,15 +17,17 @@

package org.apache.kyuubi.engine.spark

import java.io.{File, IOException}
import java.io.{File, FilenameFilter, IOException}
import java.nio.file.Paths
import java.util.Locale

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import com.google.common.annotations.VisibleForTesting
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.shaded.org.apache.commons.io.filefilter.RegexFileFilter

import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
@@ -35,8 +37,7 @@ import org.apache.kyuubi.engine.ProcBuilder.KYUUBI_ENGINE_LOG_PATH_KEY
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.AuthTypes
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.util.KubernetesUtils
import org.apache.kyuubi.util.Validator
import org.apache.kyuubi.util.{KubernetesUtils, SemanticVersion, Validator}

class SparkProcessBuilder(
override val proxyUser: String,
@@ -102,6 +103,25 @@
}
}

private lazy val sparkCoreScalaVersion: String = {
Paths.get(sparkHome, "jars").toFile
.list(new RegexFileFilter("^spark-core_.*\\.jar$"))
.map { p => p.substring(p.indexOf("_") + 1, p.lastIndexOf("-")) }
.head
}

override protected def engineScalaBinaryVersion: String =
StringUtils.defaultIfBlank(System.getenv("SPARK_SCALA_VERSION"), sparkCoreScalaVersion)

override protected lazy val engineHomeDirFilter: FilenameFilter = {
val pattern = if (SemanticVersion(SCALA_COMPILE_VERSION) >= "2.13") {
"^spark-\\d+\\.\\d+\\.\\d+-bin-hadoop\\d(\\.\\d+)?+-scala\\d+(\\.\\d+)?$"
} else {
"^spark-\\d+\\.\\d+\\.\\d+-bin-hadoop\\d+(\\.\\d+)?$"
}
new RegexFileFilter(pattern)
}

override protected lazy val commands: Array[String] = {
// complete `spark.master` if absent on kubernetes
completeMasterUrl(conf)
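sparkCoreScalaVersion infers the Scala binary version from the spark-core jar under $SPARK_HOME/jars, and SPARK_SCALA_VERSION, when set, takes precedence. The parsing rule, extracted into a minimal sketch: take the text between the first underscore and the last hyphen of the jar name.

```scala
object ScalaVersionFromJarName extends App {
  // "spark-core_2.13-3.4.1.jar" -> "2.13"
  def scalaBinaryVersion(jarName: String): String =
    jarName.substring(jarName.indexOf("_") + 1, jarName.lastIndexOf("-"))

  println(scalaBinaryVersion("spark-core_2.12-3.4.1.jar")) // 2.12
  println(scalaBinaryVersion("spark-core_2.13-3.4.1.jar")) // 2.13
}
```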
SessionSigningSuite.scala
@@ -86,8 +86,9 @@ class SessionSigningSuite extends WithKyuubiServer with HiveJDBCTestHelper {
assert(rs2.next())

// skipping prefix "res0: String = " of returned scala result
val publicKeyStr = rs1.getString(1).substring(15)
val sessionUserSign = rs2.getString(1).substring(15)
val sep = " = "
val publicKeyStr = StringUtils.substringAfter(rs1.getString(1), sep)
val sessionUserSign = StringUtils.substringAfter(rs2.getString(1), sep)

assert(StringUtils.isNotBlank(publicKeyStr))
assert(StringUtils.isNotBlank(sessionUserSign))
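Keying on the " = " separator is sturdier than the old fixed substring(15), which only fits a one-digit result counter such as res0. A quick comparison (commons-lang3 on the classpath; strings are representative REPL output):

```scala
import org.apache.commons.lang3.StringUtils

object PrefixStripSketch extends App {
  val sep = " = "
  // The fixed offset fits "res0: String = " exactly...
  println("res0: String = abc".substring(15)) // "abc"
  // ...but drifts once the prefix grows by a character.
  println("res12: String = abc".substring(15)) // " abc" (stray leading space)
  // Splitting on the separator is agnostic to the prefix length.
  println(StringUtils.substringAfter("res12: String = abc", sep)) // "abc"
}
```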
AssertionUtils.scala
@@ -24,13 +24,14 @@ import scala.collection.Traversable
import scala.io.Source
import scala.reflect.ClassTag

import org.scalactic.{source, Prettifier}
import org.scalactic.Prettifier
import org.scalactic.source.Position
import org.scalatest.Assertions._

object AssertionUtils {

def assertEqualsIgnoreCase(expected: AnyRef)(actual: AnyRef)(
implicit pos: source.Position): Unit = {
implicit pos: Position): Unit = {
val isEqualsIgnoreCase = (Option(expected), Option(actual)) match {
case (Some(expectedStr: String), Some(actualStr: String)) =>
expectedStr.equalsIgnoreCase(actualStr)
@@ -44,15 +45,15 @@
}
}

def assertStartsWithIgnoreCase(expectedPrefix: String)(actual: String)(
implicit pos: source.Position): Unit = {
def assertStartsWithIgnoreCase(expectedPrefix: String)(actual: String)(implicit
pos: Position): Unit = {
if (!actual.toLowerCase(Locale.ROOT).startsWith(expectedPrefix.toLowerCase(Locale.ROOT))) {
fail(s"Expected starting with '$expectedPrefix' ignoring case, but got [$actual]")(pos)
}
}

def assertExistsIgnoreCase(expected: String)(actual: Iterable[String])(
implicit pos: source.Position): Unit = {
def assertExistsIgnoreCase(expected: String)(actual: Iterable[String])(implicit
pos: Position): Unit = {
if (!actual.exists(_.equalsIgnoreCase(expected))) {
fail(s"Expected containing '$expected' ignoring case, but got [$actual]")(pos)
}
@@ -73,7 +74,7 @@
regenScript: String,
splitFirstExpectedLine: Boolean = false)(implicit
prettifier: Prettifier,
pos: source.Position): Unit = {
pos: Position): Unit = {
val fileSource = Source.fromFile(path.toUri, StandardCharsets.UTF_8.name())
try {
def expectedLinesIter = if (splitFirstExpectedLine) {
@@ -104,13 +105,25 @@
}
}

/**
* Assert the iterable contains all the expected elements
*/
def assertContains(actual: TraversableOnce[AnyRef], expected: AnyRef*)(implicit
prettifier: Prettifier,
pos: Position): Unit = {
val actualSeq = actual.toSeq
withClue(s", expected containing [${expected.mkString(", ")}]") {
expected.foreach { elem => assert(actualSeq.contains(elem))(prettifier, pos) }
}
}

/**
* Asserts that the given function throws an exception of the given type
* and with the exception message equals to expected string
*/
def interceptEquals[T <: Exception](f: => Any)(expected: String)(implicit
classTag: ClassTag[T],
pos: source.Position): Unit = {
pos: Position): Unit = {
assert(expected != null)
val exception = intercept[T](f)(classTag, pos)
assertResult(expected)(exception.getMessage)
Expand All @@ -122,7 +135,7 @@ object AssertionUtils {
*/
def interceptContains[T <: Exception](f: => Any)(contained: String)(implicit
classTag: ClassTag[T],
pos: source.Position): Unit = {
pos: Position): Unit = {
assert(contained != null)
val exception = intercept[T](f)(classTag, pos)
assert(exception.getMessage.contains(contained))
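A hypothetical usage of the new assertContains helper from a ScalaTest suite (suite name and data are illustrative only):

```scala
import org.apache.kyuubi.util.AssertionUtils._
import org.scalatest.funsuite.AnyFunSuite

class CatalogListingSuite extends AnyFunSuite {
  test("listing contains the expected catalogs in any order") {
    val listed = Seq("other_catalog", "spark_catalog", "iceberg_test")
    // Passes when every expected element is present, whatever the order.
    assertContains(listed, "spark_catalog", "iceberg_test")
  }
}
```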
8 changes: 5 additions & 3 deletions pom.xml
@@ -138,7 +138,7 @@
<fb303.version>0.9.3</fb303.version>
<flexmark.version>0.62.2</flexmark.version>
<flink.version>1.17.1</flink.version>
<flink.archive.name>flink-${flink.version}-bin-scala_${scala.binary.version}.tgz</flink.archive.name>
<flink.archive.name>flink-${flink.version}-bin-scala_2.12.tgz</flink.archive.name>
<flink.archive.mirror>${apache.archive.dist}/flink/flink-${flink.version}</flink.archive.mirror>
<flink.archive.download.skip>false</flink.archive.download.skip>
<google.jsr305.version>3.0.2</google.jsr305.version>
@@ -198,7 +198,8 @@
-->
<spark.version>3.4.1</spark.version>
<spark.binary.version>3.4</spark.binary.version>
<spark.archive.name>spark-${spark.version}-bin-hadoop3.tgz</spark.archive.name>
<spark.archive.scala.suffix></spark.archive.scala.suffix>
<spark.archive.name>spark-${spark.version}-bin-hadoop3${spark.archive.scala.suffix}.tgz</spark.archive.name>
<spark.archive.mirror>${apache.archive.dist}/spark/spark-${spark.version}</spark.archive.mirror>
<spark.archive.download.skip>false</spark.archive.download.skip>
<sqlite.version>3.42.0.0</sqlite.version>
@@ -2135,6 +2136,7 @@
<properties>
<scala.binary.version>2.13</scala.binary.version>
<scala.version>2.13.8</scala.version>
<spark.archive.scala.suffix>-scala${scala.binary.version}</spark.archive.scala.suffix>
</properties>
<build>
<pluginManagement>
@@ -2205,7 +2207,7 @@
<spark.version>3.2.4</spark.version>
<spark.binary.version>3.2</spark.binary.version>
<delta.version>2.0.2</delta.version>
<spark.archive.name>spark-${spark.version}-bin-hadoop3.2.tgz</spark.archive.name>
<spark.archive.name>spark-${spark.version}-bin-hadoop3.2${spark.archive.scala.suffix}.tgz</spark.archive.name>
<maven.plugin.scalatest.exclude.tags>org.scalatest.tags.Slow</maven.plugin.scalatest.exclude.tags>
</properties>
</profile>
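For reference: with the scala-2.13 profile active, the new spark.archive.scala.suffix property resolves to -scala2.13, so the Spark 3.4.1 download becomes spark-3.4.1-bin-hadoop3-scala2.13.tgz; under the default Scala 2.12 the suffix stays empty and the archive name is unchanged. The Flink archive is pinned to scala_2.12 because Flink 1.17 publishes only a Scala 2.12 binary distribution.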
