Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-843][BUILD] sbt support flink-related module build/test #1764

Closed
wants to merge 10 commits into from
210 changes: 209 additions & 1 deletion project/CelebornBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ object CelebornBuild extends sbt.internal.BuildDef {
CelebornClient.client,
CelebornService.service,
CelebornWorker.worker,
CelebornMaster.master) ++ maybeSparkClientModules
CelebornMaster.master) ++ maybeSparkClientModules ++ maybeFlinkClientModules
}

// ThisBuild / parallelExecution := false
Expand Down Expand Up @@ -180,6 +180,17 @@ object Utils {

// Modules of the selected Spark client projects, or no modules when none is selected.
lazy val maybeSparkClientModules: Seq[Project] =
  sparkClientProjects.fold(Seq.empty[Project])(_.modules)

// First profile whose name starts with "flink", if any. `find` stops at the first match
// instead of building a filtered copy of all profiles just to take its head.
val FLINK_VERSION = profiles.find(_.startsWith("flink"))

// Maps the selected flink profile to its project definitions; any other (or no) flink
// profile yields None.
lazy val flinkClientProjects: Option[FlinkClientProjects] = FLINK_VERSION.collect {
  case "flink-1.14" => Flink114
  case "flink-1.15" => Flink115
  case "flink-1.17" => Flink117
}

// Modules of the selected Flink client projects, or no modules when no flink profile matched.
lazy val maybeFlinkClientModules: Seq[Project] =
  flinkClientProjects.fold(Seq.empty[Project])(_.modules)

def defaultScalaVersion(): String = {
// 1. Inherit the Scala version of the Spark project.
// 2. If the Spark profile is not specified, use DEFAULT_SCALA_VERSION.
Expand Down Expand Up @@ -593,3 +604,200 @@ trait SparkClientProjects {
)
}
}

////////////////////////////////////////////////////////
// Flink Client //
////////////////////////////////////////////////////////

/** Project definitions for the Flink 1.14 client modules (Flink 1.14.6). */
object Flink114 extends FlinkClientProjects {
  val flinkVersion: String = "1.14.6"

  // sbt does not allow '.' in project names, so the names use "1_14" while the
  // directory paths keep "1.14".
  val flinkClientProjectPath: String = "client-flink/flink-1.14"
  val flinkClientProjectName: String = "celeborn-client-flink-1_14"
  val flinkClientShadedProjectPath: String = "client-flink/flink-1.14-shaded"
  val flinkClientShadedProjectName: String = "celeborn-client-flink-1_14-shaded"

  // Test-scoped Flink dependency declared with `%%`, i.e. sbt appends the Scala binary
  // version to the artifact name.
  // NOTE(review): presumably needed because these Flink 1.14 artifacts are published with
  // a Scala suffix -- confirm against the Flink 1.14 artifacts.
  private def scalaSuffixedTestDependency(artifactId: String): ModuleID =
    "org.apache.flink" %% artifactId % flinkVersion % "test"

  override lazy val flinkStreamingDependency: ModuleID =
    scalaSuffixedTestDependency("flink-streaming-java")
  override lazy val flinkClientsDependency: ModuleID =
    scalaSuffixedTestDependency("flink-clients")
  override lazy val flinkRuntimeWebDependency: ModuleID =
    scalaSuffixedTestDependency("flink-runtime-web")
}

/** Project definitions for the Flink 1.15 client modules (Flink 1.15.4). */
object Flink115 extends FlinkClientProjects {
  val flinkVersion: String = "1.15.4"

  // sbt does not allow '.' in project names, so the names use "1_15" while the
  // directory paths keep "1.15".
  val flinkClientProjectPath: String = "client-flink/flink-1.15"
  val flinkClientProjectName: String = "celeborn-client-flink-1_15"
  val flinkClientShadedProjectPath: String = "client-flink/flink-1.15-shaded"
  val flinkClientShadedProjectName: String = "celeborn-client-flink-1_15-shaded"
}

/** Project definitions for the Flink 1.17 client modules (Flink 1.17.0). */
object Flink117 extends FlinkClientProjects {
  val flinkVersion: String = "1.17.0"

  // sbt does not allow '.' in project names, so the names use "1_17" while the
  // directory paths keep "1.17".
  val flinkClientProjectPath: String = "client-flink/flink-1.17"
  val flinkClientProjectName: String = "celeborn-client-flink-1_17"
  val flinkClientShadedProjectPath: String = "client-flink/flink-1.17-shaded"
  val flinkClientShadedProjectName: String = "celeborn-client-flink-1_17-shaded"
}

/**
 * Shared sbt project definitions for the Celeborn Flink client modules.
 *
 * An implementation supplies the Flink version together with the directory and sbt project
 * name of the client and shaded-client modules; the projects themselves are built here.
 */
trait FlinkClientProjects {

  val flinkVersion: String

  // note that SBT does not allow using the period symbol (.) in project names.
  val flinkClientProjectPath: String
  val flinkClientProjectName: String
  val flinkClientShadedProjectPath: String
  val flinkClientShadedProjectName: String

  // Test-scoped Flink artifacts consumed by the integration-test project. They are declared
  // with `%` (no Scala binary suffix) here and can be overridden by implementations.
  lazy val flinkStreamingDependency: ModuleID = flinkModule("flink-streaming-java", "test")
  lazy val flinkClientsDependency: ModuleID = flinkModule("flink-clients", "test")
  lazy val flinkRuntimeWebDependency: ModuleID = flinkModule("flink-runtime-web", "test")

  /** All Flink-related projects: common, client, integration tests and the shaded client. */
  def modules: Seq[Project] = List(flinkCommon, flinkClient, flinkIt, flinkClientShade)

  // get flink major version. e.g:
  // 1.17.0 -> 1.17
  // 1.15.4 -> 1.15
  // 1.14.6 -> 1.14
  lazy val flinkMajorVersion: String =
    flinkVersion.split("\\.").take(2).reduceLeft((major, minor) => s"${major}.${minor}")

  /**
   * Jar file name of the client module.
   *
   * The output would be something like: celeborn-client-flink-1.17_2.12-0.4.0-SNAPSHOT.jar
   */
  def flinkClientJarName(
      module: ModuleID,
      artifact: Artifact,
      scalaBinaryVersionString: String): String =
    s"celeborn-client-flink-${flinkMajorVersion}_${scalaBinaryVersionString}-${module.revision}.${artifact.extension}"

  /**
   * Jar file name of the shaded client assembly.
   *
   * The output would be something like:
   * celeborn-client-flink-1.17-shaded_2.12-0.4.0-SNAPSHOT.jar
   */
  def flinkClientShadeJarName(
      revision: String,
      artifact: Artifact,
      scalaBinaryVersionString: String): String =
    s"celeborn-client-flink-${flinkMajorVersion}-shaded_${scalaBinaryVersionString}-${revision}.${artifact.extension}"

  // A Flink artifact without Scala binary suffix, at `flinkVersion`, in the given scope.
  private def flinkModule(artifactId: String, scope: String): ModuleID =
    "org.apache.flink" % artifactId % flinkVersion % scope

  // Compiler plugins
  // -- Bump up the genjavadoc version explicitly to 0.18 to work with Scala 2.12
  private def genjavadocPlugin: ModuleID =
    compilerPlugin(
      "com.typesafe.genjavadoc" %% "genjavadoc-plugin" % "0.18" cross CrossVersion.full)

  /** The `client-flink/common` project, depending on the Celeborn common and client projects. */
  def flinkCommon: Project = {
    val dependencies: Seq[ModuleID] =
      Seq(flinkModule("flink-runtime", "provided"), genjavadocPlugin) ++ commonUnitTestDependencies

    Project("celeborn-flink-common", file("client-flink/common"))
      .dependsOn(CelebornCommon.common, CelebornClient.client)
      .settings(
        commonSettings,
        libraryDependencies ++= dependencies
      )
  }

  /** The version-specific Flink client project located at `flinkClientProjectPath`. */
  def flinkClient: Project = {
    val dependencies: Seq[ModuleID] = Seq(
      flinkModule("flink-runtime", "provided"),
      "org.apache.logging.log4j" % "log4j-slf4j-impl" % log4j2Version % "test",
      "org.apache.logging.log4j" % "log4j-1.2-api" % log4j2Version % "test",
      genjavadocPlugin
    ) ++ commonUnitTestDependencies

    Project(flinkClientProjectName, file(flinkClientProjectPath))
      .dependsOn(CelebornCommon.common, CelebornClient.client, flinkCommon)
      .settings(
        commonSettings,

        // 1. reference for modifying the jar name.
        //    https://stackoverflow.com/questions/52771831/how-to-modify-jar-name-generate-by-cmd-sbt-package
        // 2. since SBT doesn't allow using `.` in the project name, explicitly setting the artifact Name
        artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) =>
          flinkClientJarName(module, artifact, scalaBinaryVersion.value)
        },

        libraryDependencies ++= dependencies
      )
  }

  /** The `tests/flink-it` integration-test project. */
  def flinkIt: Project = {
    // Depend on both the compile and the test classes of each upstream project.
    // ref: https://www.scala-sbt.org/1.x/docs/Multi-Project.html#Classpath+dependencies
    val compileAndTest = "test->test;compile->compile"

    val dependencies: Seq[ModuleID] = Seq(
      flinkModule("flink-runtime", "test"),
      "org.apache.flink" %% "flink-scala" % flinkVersion % "test",
      flinkStreamingDependency,
      flinkClientsDependency,
      flinkRuntimeWebDependency,
      genjavadocPlugin
    ) ++ commonUnitTestDependencies

    Project("celeborn-flink-it", file("tests/flink-it"))
      .dependsOn(
        CelebornCommon.common % compileAndTest,
        CelebornClient.client % compileAndTest,
        CelebornMaster.master % compileAndTest,
        CelebornWorker.worker % compileAndTest,
        flinkClient % compileAndTest)
      .settings(
        commonSettings,
        libraryDependencies ++= dependencies
      )
  }

  /** The shaded (assembly) variant of the Flink client project. */
  def flinkClientShade: Project = {
    Project(flinkClientShadedProjectName, file(flinkClientShadedProjectPath))
      .dependsOn(flinkClient)
      .settings(
        commonSettings,

        // No tests are run as part of `assembly`.
        (assembly / test) := {},

        (assembly / assemblyJarName) :=
          flinkClientShadeJarName(version.value, artifact.value, scalaBinaryVersion.value),

        (assembly / logLevel) := Level.Info,

        // Exclude `scala-library` from assembly.
        (assembly / assemblyPackageScala / assembleArtifact) := false,

        // Every classpath entry is excluded from the assembly unless its file name starts
        // with one of these prefixes.
        (assembly / assemblyExcludedJars) := {
          val bundledPrefixes = Seq(
            "celeborn-",
            "protobuf-java-",
            "guava-",
            "netty-",
            "commons-lang3-",
            "RoaringBitmap-")
          (assembly / fullClasspath).value.filterNot { entry =>
            val jarName = entry.data.getName
            bundledPrefixes.exists(prefix => jarName.startsWith(prefix))
          }
        },

        // Relocate the bundled third-party packages under org.apache.celeborn.shaded.
        (assembly / assemblyShadeRules) := Seq(
          "com.google.protobuf.**" -> "org.apache.celeborn.shaded.com.google.protobuf.@1",
          "com.google.common.**" -> "org.apache.celeborn.shaded.com.google.common.@1",
          "io.netty.**" -> "org.apache.celeborn.shaded.io.netty.@1",
          "org.apache.commons.**" -> "org.apache.celeborn.shaded.org.apache.commons.@1",
          "org.roaringbitmap.**" -> "org.apache.celeborn.shaded.org.roaringbitmap.@1"
        ).map(relocation => ShadeRule.rename(relocation).inAll),

        (assembly / assemblyMergeStrategy) := { (entry: String) =>
          val lowered = entry.toLowerCase(Locale.ROOT)
          val dropped =
            lowered.endsWith("manifest.mf") ||
              // Drop all proto files that are not needed as artifacts of the build.
              lowered.endsWith(".proto") ||
              lowered.startsWith("meta-inf/native-image") ||
              // Drop netty jnilib
              lowered.endsWith(".jnilib")

          if (dropped) {
            MergeStrategy.discard
          } else {
            entry match {
              // rename netty native lib
              case "META-INF/native/libnetty_transport_native_epoll_x86_64.so" =>
                CustomMergeStrategy.rename(_ =>
                  "META-INF/native/liborg_apache_celeborn_shaded_netty_transport_native_epoll_x86_64.so")
              case "META-INF/native/libnetty_transport_native_epoll_aarch_64.so" =>
                CustomMergeStrategy.rename(_ =>
                  "META-INF/native/liborg_apache_celeborn_shaded_netty_transport_native_epoll_aarch_64.so")
              case _ => MergeStrategy.first
            }
          }
        }
      )
  }
}
Loading