databricks fat jar scala 2.13 compatible
db-scnakandala committed Jul 10, 2023
1 parent fad50bf commit 3061cc5
Showing 20 changed files with 38 additions and 33 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -22,7 +22,7 @@ services:
 
 language: scala
 scala:
-- 2.12.13
+- 2.13.11
 jdk:
 - openjdk11
@@ -30,7 +30,7 @@ object HadoopBundleFileSystem {
   }
 
   def createSchemes(config: Config): Seq[String] = if (config.hasPath("schemes")) {
-    config.getStringList("schemes").asScala
+    config.getStringList("schemes").asScala.toSeq
   } else { Seq("hdfs") }
 }
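Note: this is the most common migration fix in this commit. On Scala 2.13, Seq is an alias for scala.collection.immutable.Seq, while asScala on a java.util.List yields a mutable.Buffer, so an explicit copy is needed. A minimal sketch of the mismatch (values are illustrative):

import scala.collection.JavaConverters._

val javaList: java.util.List[String] = java.util.Arrays.asList("hdfs", "s3a")

// 2.12: compiled as-is, because Seq meant scala.collection.Seq.
// 2.13: Seq means immutable.Seq and asScala returns a mutable.Buffer,
// so the result has to be copied with .toSeq.
val schemes: Seq[String] = javaList.asScala.toSeq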
@@ -133,7 +133,7 @@ trait JsonSupport {
     obj.ds.foreach(ds => fb += ("data_shape" -> ds.toJson))
     obj.m.foreach(m => fb += ("model" -> m.toJson))
 
-    JsObject(fb.result(): _*)
+    JsObject(fb.result().toSeq: _*)
   }
 
   override def read(json: JsValue): Scalar = json match {
@@ -220,7 +220,7 @@ trait JsonSupport {
     if(obj.ds.nonEmpty) { fb += ("data_shape" -> obj.ds.toJson) }
     if(obj.m.nonEmpty) { fb += ("model" -> obj.m.toJson) }
 
-    JsObject(fb.result(): _*)
+    JsObject(fb.result().toSeq: _*)
   }
 
   override def read(json: JsValue): List = json match {
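Note: the same root cause, surfacing through varargs. On 2.13, the `: _*` expansion requires an immutable.Seq; an Array (what an array builder's result() returns) only wraps to a mutable ArraySeq, hence the .toSeq. A sketch, with fb standing in for the builder in the diff:

import scala.collection.mutable

def takesVarargs(fields: (String, Int)*): Int = fields.size

val fb = mutable.ArrayBuilder.make[(String, Int)] // illustrative field builder
fb += ("a" -> 1)
fb += ("b" -> 2)

// 2.12 unwrapped the Array into varargs directly; 2.13 needs the copy.
takesVarargs(fb.result().toSeq: _*)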
@@ -43,7 +43,7 @@ object FormatModelSerializer {
 
 /** Object for serializing/deserializing model definitions with JSON.
   */
-case class JsonFormatModelSerializer(implicit hr: HasBundleRegistry) extends FormatModelSerializer {
+case class JsonFormatModelSerializer()(implicit hr: HasBundleRegistry) extends FormatModelSerializer {
   override def write(path: Path, model: Model): Unit = {
     Files.write(path, model.asBundle.toJson.prettyPrint.getBytes("UTF-8"))
   }
@@ -55,7 +55,7 @@ case class JsonFormatModelSerializer(implicit hr: HasBundleRegistry) extends For
 
 /** Object for serializing/deserializing model definitions with Protobuf.
   */
-case class ProtoFormatModelSerializer(implicit hr: HasBundleRegistry) extends FormatModelSerializer {
+case class ProtoFormatModelSerializer()(implicit hr: HasBundleRegistry) extends FormatModelSerializer {
   override def write(path: Path, model: Model): Unit = {
     Files.write(path, model.asBundle.toByteArray)
   }
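Note: Scala 2.13 rejects a case class whose only parameter list is implicit ("case classes must have a non-implicit parameter list"), so both serializers gain an explicit empty first list. A sketch with an illustrative Registry standing in for HasBundleRegistry:

trait Registry // stand-in for HasBundleRegistry

// 2.12 accepted `case class Serializer(implicit r: Registry)`; 2.13 requires
// a non-implicit first parameter list, and call sites gain a pair of parens.
case class Serializer()(implicit r: Registry)

implicit val registry: Registry = new Registry {}
val s = Serializer()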
@@ -29,11 +29,11 @@ class DefaultFrameReader extends FrameReader {
     var rows = mutable.Seq[Row]()
     while(Try(reader.hasNext).getOrElse(false)) {
       record = reader.next(record)
-      val row = ArrayRow(new Array[Any](schema.fields.length))
+      val row = ArrayRow((new Array[Any](schema.fields.length)).toSeq)
       for(i <- schema.fields.indices) { row.set(i, readers(i)(record.get(i))) }
       rows :+= row
     }
 
-    DefaultLeapFrame(schema, rows)
+    DefaultLeapFrame(schema, rows.toSeq)
   }
 }
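Note: DefaultLeapFrame takes a Seq[Row], which on 2.13 means immutable.Seq, so the locally accumulated mutable.Seq must be copied. A sketch of the pattern, with Frame as an illustrative stand-in:

import scala.collection.mutable

case class Frame(rows: Seq[Int]) // stand-in for DefaultLeapFrame

var rows = mutable.Seq[Int]()
rows :+= 1

// 2.12: a mutable.Seq satisfied Seq; 2.13: Seq is immutable.Seq, copy needed.
val frame = Frame(rows.toSeq)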
@@ -26,7 +26,7 @@ class DefaultRowReader(override val schema: StructType) extends RowReader {
   override def fromBytes(bytes: Array[Byte], charset: Charset = BuiltinFormats.charset): Try[Row] = Try {
     decoder = DecoderFactory.get().binaryDecoder(bytes, decoder)
     record = datumReader.read(record, decoder)
-    val row = ArrayRow(new Array[Any](schema.fields.length))
+    val row = ArrayRow((new Array[Any](schema.fields.length)).toSeq)
     for(i <- schema.fields.indices) { row.set(i, readers(i)(record.get(i))) }
     row
   }
@@ -8,15 +8,15 @@ import scala.util.control.NonFatal
 object ClassLoaderUtil {
   def findClassLoader(baseName: String): ClassLoader = {
     def findCaller(get: Int ⇒ Class[_]): ClassLoader =
-      Iterator.from(2 /*is the magic number, promise*/ ).map(get) dropWhile { c ⇒
+      Iterator.from(2 /*is the magic number, promise*/ ).map(get).dropWhile { c ⇒
         c != null &&
           (c.getName.startsWith(baseName) ||
            c.getName.startsWith("scala.Option") ||
            c.getName.startsWith("scala.collection.Iterator") ||
            c.getName.startsWith("ml.combust.bundle.util.ClassLoaderUtil"))
-      } next () match {
-        case null => getClass.getClassLoader
-        case c => c.getClassLoader
+      }.nextOption() match {
+        case Some(c) ⇒ c.getClassLoader
+        case None ⇒ getClass.getClassLoader
       }
 
     Option(Thread.currentThread.getContextClassLoader) orElse
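Note: two 2.13-isms here. Iterator.next() throws on an exhausted iterator, and the old code leaned on a null sentinel instead; nextOption(), new in 2.13, folds "exhausted" and "found" into one Option. The rewrite also replaces postfix-style calls (`map(get) dropWhile {...}`, `} next ()`) with explicit dots. A runnable sketch:

object LoaderExample {
  // nextOption() (new in 2.13) replaces matching next()'s result against null.
  def firstLoader(classes: Iterator[Class[_]]): ClassLoader =
    classes.nextOption() match {
      case Some(c) => c.getClassLoader
      case None    => getClass.getClassLoader // iterator exhausted: fall back
    }

  def main(args: Array[String]): Unit = {
    println(firstLoader(Iterator(getClass))) // this class's loader
    println(firstLoader(Iterator.empty))     // fallback
  }
}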
@@ -13,7 +13,7 @@ import org.apache.spark.ml.linalg.mleap.VectorUtil._
 case class VectorSlicerModel(indices: Array[Int],
                              namedIndices: Array[(String, Int)] = Array(),
                              inputSize: Int) extends Model {
-  val allIndices: Array[Int] = indices.union(namedIndices.map(_._2))
+  val allIndices: Array[Int] = indices.union(namedIndices.map(_._2)).toArray
 
   def apply(features: Vector): Vector = features match {
     case features: DenseVector => Vectors.dense(allIndices.map(features.apply))
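Note: on 2.12 the array union came back as an Array[Int] via CanBuildFrom; on 2.13 it resolves through the Seq wrapper and no longer conforms to the declared Array[Int], so .toArray pins the type back. Sketch (values illustrative):

val indices = Array(0, 2)
val named   = Array(("x", 4))

// 2.13: union on an Array yields a Seq-typed result, so copy back to Array.
val all: Array[Int] = indices.union(named.map(_._2)).toArray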
@@ -68,7 +68,7 @@ case class WordToVectorModel(wordIndex: Map[String, Int],
     wordIndex.map { case (word, ind) =>
       (word, wordVectors.slice(vectorSize * ind, vectorSize * ind + vectorSize))
     }
-  }.mapValues(Vectors.dense).map(identity)
+  }.mapValues(Vectors.dense).map(identity).toMap
 
   def apply(sentence: Seq[String]): Vector = {
     if (sentence.isEmpty) {
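Note: on 2.12, .map(identity) was the idiom for forcing mapValues (which returned a lazy, non-serializable wrapper) into a strict Map. On 2.13, mapValues returns a MapView and mapping a view still yields a view, so an explicit .toMap is required; 2.13 also deprecates mapValues in favor of .view.mapValues. Sketch:

val m = Map("a" -> 1, "b" -> 2)

// 2.13: mapValues gives a lazy MapView; .toMap materializes a strict,
// serializable immutable.Map. (.map(identity) alone no longer suffices.)
val doubled: Map[String, Int] = m.mapValues(_ * 2).map(identity).toMap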
@@ -35,7 +35,7 @@ class LeapFrameBuilderSupport {
   def createByteString(): BasicType = BasicType.ByteString
 
   def createTensorDimensions(dims : java.util.List[Integer]): Option[Seq[Int]] = {
-    Some(dims.asScala.map(_.intValue()))
+    Some(dims.asScala.toSeq.map(_.intValue()))
   }
 
   def createSchema(json: String): StructType = json.parseJson.convertTo[StructType]
@@ -12,11 +12,11 @@ class LeapFrameSupport {
   }
 
   def select(frame: DefaultLeapFrame, fieldNames: java.util.List[String]): DefaultLeapFrame = {
-    frame.select(fieldNames.asScala: _*).get
+    frame.select(fieldNames.asScala.toSeq: _*).get
   }
 
   def drop(frame: DefaultLeapFrame, names: java.util.List[String]): DefaultLeapFrame = {
-    frame.drop(names.asScala: _*).get
+    frame.drop(names.asScala.toSeq: _*).get
   }
 
   def getFields(schema: StructType): java.util.List[StructField] = {
@@ -41,7 +41,7 @@ class DefaultFrameReader extends FrameReader {
       rows(i) = row
     }
 
-    DefaultLeapFrame(schema, rows)
+    DefaultLeapFrame(schema, rows.toSeq)
     }).tried
   }
 }
@@ -19,7 +19,7 @@ class DefaultRowReader(override val schema: StructType) extends RowReader {
   override def fromBytes(bytes: Array[Byte], charset: Charset = BuiltinFormats.charset): Try[Row] = {
     (for(in <- managed(new ByteArrayInputStream(bytes))) yield {
       val din = new DataInputStream(in)
-      val row = ArrayRow(new Array[Any](schema.fields.length))
+      val row = ArrayRow((new Array[Any](schema.fields.length)).toSeq)
       var i = 0
       for(s <- serializers) {
         row.set(i, s.read(din))
@@ -24,7 +24,7 @@ case class ArrayRow(values: mutable.WrappedArray[Any]) extends Row {
   override def withValue(value: Any): ArrayRow = ArrayRow(values :+ value)
   override def withValues(values: Seq[Any]): ArrayRow = ArrayRow(this.values ++ values)
 
-  override def selectIndices(indices: Int *): ArrayRow = ArrayRow(indices.toArray.map(values))
+  override def selectIndices(indices: Int *): ArrayRow = ArrayRow(indices.toArray.map(values).toSeq)
 
   override def dropIndices(indices: Int *): ArrayRow = {
     val drops = Set(indices: _*)
@@ -16,7 +16,7 @@ object Row {
     * @param values values in the row
     * @return default row implementation with values
     */
-  def apply(values: Any *): Row = ArrayRow(values.toArray)
+  def apply(values: Any *): Row = ArrayRow(values.toArray.toSeq)
 }
 
 /** Base trait for row data.
@@ -169,7 +169,7 @@ case class RowTransformer private (inputSchema: StructType,
   def transformOption(row: Row): Option[ArrayRow] = {
     val arr = new Array[Any](maxSize)
     row.toArray.copyToArray(arr)
-    val arrRow = ArrayRow(arr)
+    val arrRow = ArrayRow(arr.toSeq)
 
     val r = transforms.foldLeft(Option(arrRow)) {
       (r, transform) => r.flatMap(transform)
@@ -23,6 +23,7 @@ import java.util.UUID
 import ml.combust.mleap.core.annotation.SparkCode
 
 import scala.collection.JavaConverters._
+import scala.collection.parallel.CollectionConverters._
 import scala.language.existentials
 import org.apache.hadoop.fs.Path
 import org.json4s.{DefaultFormats, JObject, _}
@@ -355,7 +356,7 @@ final class OneVsRest @Since("1.4.0") (
   }
 
   // create k columns, one for each binary classifier.
-  val models = Range(0, numClasses).par.map { index =>
+  val models = (0 until numClasses).par.map { index =>
     // generate new label metadata for the binary problem.
     val newLabelMeta = BinaryAttribute.defaultAttr.withName("label").toMetadata()
     val labelColName = "mc2b$" + index
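Note: parallel collections left the standard library in 2.13. The .par call now comes from the separate scala-parallel-collections module and requires the CollectionConverters import added above. Sketch (the dependency coordinates are the module's usual ones, not part of this diff):

// build.sbt (assumed coordinates for the stand-alone module):
// libraryDependencies += "org.scala-lang.modules" %% "scala-parallel-collections" % "1.0.4"

import scala.collection.parallel.CollectionConverters._

// On 2.13 this import supplies .par; on 2.12 .par was built in.
val squares = (0 until 8).par.map(i => i * i).seq.toList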
@@ -13,6 +13,7 @@ import java.nio.file.Files
 import java.util.zip.ZipInputStream
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.reflect.ClassTag
 import scala.util.Try
 
 /**
@@ -28,7 +29,7 @@ case class TensorflowModel( @transient var graph: Option[tensorflow.Graph] = Non
   ) extends Model with AutoCloseable {
 
   def apply(values: Tensor[_] *): Seq[Any] = {
-    val garbage: mutable.ArrayBuilder[tensorflow.Tensor] = mutable.ArrayBuilder.make[tensorflow.Tensor]()
+    val garbage: mutable.ArrayBuilder[tensorflow.Tensor] = mutable.ArrayBuilder.make[tensorflow.Tensor](ClassTag(classOf[Tensor[_]]))
 
     val result = Try {
       val tensors = values.zip(inputs).map {
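Note: ArrayBuilder.make lost its empty parameter list in 2.13 (it is now `def make[T: ClassTag]: ArrayBuilder[T]`), so the 2.12-era make[T]() now parses as applying () to the returned builder. Dropping the parens, or passing the ClassTag explicitly as the diff does, both work. Sketch:

import scala.collection.mutable
import scala.reflect.ClassTag

// 2.12: def make[T: ClassTag](): ArrayBuilder[T], so a trailing () was fine.
// 2.13: no empty parameter list; either rely on the implicit tag...
val b1 = mutable.ArrayBuilder.make[String]
// ...or hand one over explicitly, mirroring the diff above:
val b2 = mutable.ArrayBuilder.make[String](ClassTag(classOf[String]))

b1 += "x"
assert(b1.result().length == 1)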
6 changes: 4 additions & 2 deletions project/Common.scala
@@ -16,7 +16,8 @@ object Common {
   lazy val defaultSettings = buildSettings ++ sonatypeSettings
 
   lazy val buildSettings: Seq[Def.Setting[_]] = Seq(
-    scalaVersion := "2.12.13",
+    scalaVersion := "2.13.11",
+    crossScalaVersions := Seq("2.12.13", "2.13.11"),
     scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature"),
     fork in Test := true,
     javaOptions in test += sys.env.getOrElse("JVM_OPTS", ""),
@@ -32,7 +33,8 @@ object Common {
     } else {
       Seq()
     }
-  }
+  },
+  resolvers += "XGBoost4J Snapshot Repo" at "https://s3-us-west-2.amazonaws.com/xgboost-maven-repo/snapshot/"
 )
 
 lazy val mleapSettings: Seq[Def.Setting[_]] = Seq(organization := "ml.combust.mleap")
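Note: scalaVersion sets the default build, while crossScalaVersions keeps 2.12 alive. Prefixing an sbt task with + runs it once per listed version, and ++2.12.13 pins a single version for one session. The setting in isolation:

// sbt cross-building: `sbt +test` runs tests on 2.12.13 and 2.13.11;
// version-specific sources can live in src/main/scala-2.12 / scala-2.13.
crossScalaVersions := Seq("2.12.13", "2.13.11")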
13 changes: 7 additions & 6 deletions project/Dependencies.scala
@@ -17,7 +17,7 @@ object Dependencies {
   lazy val slf4jVersion = "2.0.6"
   lazy val awsSdkVersion = "1.11.1033"
   val tensorflowJavaVersion = "0.5.0" // Match Tensorflow 2.10.1 https://github.com/tensorflow/java/#tensorflow-version-support
-  val xgboostVersion = "1.7.3"
+  val xgboostVersion = "2.0.0-SNAPSHOT"
   val breezeVersion = "2.1.0"
   val hadoopVersion = "3.3.4" // matches spark version
   val platforms = "windows-x86_64,linux-x86_64,macosx-x86_64"
@@ -33,8 +33,9 @@ object Dependencies {
     "org.apache.spark" %% "spark-avro" % sparkVersion
   )
   val avroDep = "org.apache.avro" % "avro" % "1.11.1"
-  val sprayJson = "io.spray" %% "spray-json" % "1.3.2"
-  val arm = "com.jsuereth" %% "scala-arm" % "2.0"
+  val sprayJson = "io.spray" %% "spray-json" % "1.3.6"
+  // https://github.com/jsuereth/scala-arm/issues/79
+  val arm = "com.michaelpollmeier" %% "scala-arm" % "2.1"
   val config = "com.typesafe" % "config" % "1.3.0"
   val scalaReflect = ScalaVersionDependentModuleID.versioned("org.scala-lang" % "scala-reflect" % _)
   val scalaTest = "org.scalatest" %% "scalatest" % scalaTestVersion
@@ -72,8 +73,8 @@ object Dependencies {
 
   val breeze = "org.scalanlp" %% "breeze" % breezeVersion
 
-  val xgboostDep = "ml.dmlc" %% "xgboost4j" % xgboostVersion
-  val xgboostSparkDep = "ml.dmlc" %% "xgboost4j-spark" % xgboostVersion
+  val xgboostDep = "ml.dmlc" %% "xgboost4j" % xgboostVersion exclude("org.scala-lang.modules", "scala-collection-compat_2.12")
+  val xgboostSparkDep = "ml.dmlc" %% "xgboost4j-spark" % xgboostVersion exclude("org.scala-lang.modules", "scala-collection-compat_2.12") exclude("ml.dmlc", "xgboost4j_2.12")
   val xgboostPredictorDep = "ai.h2o" % "xgboost-predictor" % "0.3.18" exclude("com.esotericsoftware.kryo", "kryo")
 
   val hadoop = "org.apache.hadoop" % "hadoop-client" % hadoopVersion
@@ -127,7 +128,7 @@ object Dependencies {
 
   val xgboostRuntime = l ++= Seq(xgboostDep) ++ Seq(xgboostPredictorDep) ++ Test.spark ++ Test.sparkTest ++ Seq(Test.scalaTest)
 
-  val xgboostSpark = l ++= Seq(xgboostSparkDep) ++ Provided.spark ++ Test.spark ++ Test.sparkTest
+  val xgboostSpark = l ++= Seq(xgboostDep) ++ Seq(xgboostSparkDep) ++ Provided.spark ++ Test.spark ++ Test.sparkTest
 
   val serving = l ++= Seq(akkaHttp, akkaHttpSprayJson, config, Test.scalaTest, Test.akkaHttpTestkit)
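Note: the snapshot xgboost4j-spark build still drags in _2.12-suffixed artifacts transitively, and mixing Scala binary versions on one classpath fails (sbt normally aborts with a conflicting cross-version suffix error). Because exclude() matches exact module names and cross-published artifacts carry their binary suffix, the _2.12 leaks have to be named literally. Sketch of the pattern, coordinates as in the diff above:

// Excluding wrong-binary-version transitive dependencies by their full,
// suffixed module names:
val dep = ("ml.dmlc" %% "xgboost4j-spark" % "2.0.0-SNAPSHOT")
  .exclude("org.scala-lang.modules", "scala-collection-compat_2.12")
  .exclude("ml.dmlc", "xgboost4j_2.12")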
