Commit 9aae3cd

databricks fat jar scala 2.13 compatible
db-scnakandala committed Jul 10, 2023
1 parent fad50bf commit 9aae3cd
Showing 19 changed files with 37 additions and 32 deletions.
@@ -30,7 +30,7 @@ object HadoopBundleFileSystem {
   }

   def createSchemes(config: Config): Seq[String] = if (config.hasPath("schemes")) {
-    config.getStringList("schemes").asScala
+    config.getStringList("schemes").asScala.toSeq
   } else { Seq("hdfs") }
 }
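
On Scala 2.13, scala.Seq is an alias for collection.immutable.Seq, while asScala on a java.util.List yields a mutable.Buffer; the explicit .toSeq copy is what restores conformance. A minimal sketch of the pattern (values are illustrative):

    import java.util
    import scala.collection.JavaConverters._

    val javaList: util.List[String] = util.Arrays.asList("hdfs", "s3")

    // 2.12: asScala alone satisfied Seq[String].
    // 2.13: Seq means immutable.Seq, so the mutable.Buffer returned by
    // asScala needs an explicit .toSeq copy.
    val schemes: Seq[String] = javaList.asScala.toSeq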

@@ -133,7 +133,7 @@ trait JsonSupport {
     obj.ds.foreach(ds => fb += ("data_shape" -> ds.toJson))
     obj.m.foreach(m => fb += ("model" -> m.toJson))

-    JsObject(fb.result(): _*)
+    JsObject(fb.result().toSeq: _*)
   }

   override def read(json: JsValue): Scalar = json match {
@@ -220,7 +220,7 @@ trait JsonSupport {
     if(obj.ds.nonEmpty) { fb += ("data_shape" -> obj.ds.toJson) }
     if(obj.m.nonEmpty) { fb += ("model" -> obj.m.toJson) }

-    JsObject(fb.result(): _*)
+    JsObject(fb.result().toSeq: _*)
   }

   override def read(json: JsValue): List = json match {
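
fb here is a mutable ArrayBuilder whose result() returns an Array. Expanding that Array into the (String, JsValue)* varargs of JsObject leaned on an implicit Array-to-Seq conversion that 2.13 deprecates (it copies the array), so the commit spells the copy out with .toSeq. A sketch of the idea with spray-json:

    import scala.collection.mutable
    import spray.json._

    val fb = mutable.ArrayBuilder.make[(String, JsValue)]
    fb += ("model" -> JsString("my-model"))

    // result() returns Array[(String, JsValue)]; make the Seq conversion
    // explicit before splicing into JsObject's varargs.
    val obj = JsObject(fb.result().toSeq: _*)
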
@@ -43,7 +43,7 @@ object FormatModelSerializer {

 /** Object for serializing/deserializing model definitions with JSON.
   */
-case class JsonFormatModelSerializer(implicit hr: HasBundleRegistry) extends FormatModelSerializer {
+case class JsonFormatModelSerializer()(implicit hr: HasBundleRegistry) extends FormatModelSerializer {
   override def write(path: Path, model: Model): Unit = {
     Files.write(path, model.asBundle.toJson.prettyPrint.getBytes("UTF-8"))
   }
@@ -55,7 +55,7 @@ case class JsonFormatModelSerializer(implicit hr: HasBundleRegistry) extends FormatModelSerializer {

 /** Object for serializing/deserializing model definitions with Protobuf.
   */
-case class ProtoFormatModelSerializer(implicit hr: HasBundleRegistry) extends FormatModelSerializer {
+case class ProtoFormatModelSerializer()(implicit hr: HasBundleRegistry) extends FormatModelSerializer {
   override def write(path: Path, model: Model): Unit = {
     Files.write(path, model.asBundle.toByteArray)
   }
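
Scala 2.13 rejects a case class whose only parameter list is implicit (the error is along the lines of "case classes must have a non-implicit parameter list"), so both serializers gain an explicit empty first list; call sites change from JsonFormatModelSerializer to JsonFormatModelSerializer(). A minimal sketch, with Registry standing in for HasBundleRegistry:

    trait Registry // stands in for HasBundleRegistry in this sketch

    // 2.12 accepted: case class JsonSerializer(implicit r: Registry)
    // 2.13 rejects it, hence the explicit empty first parameter list:
    case class JsonSerializer()(implicit r: Registry)

    implicit object DefaultRegistry extends Registry
    val serializer = JsonSerializer() // call sites now need the ()
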
@@ -29,11 +29,11 @@ class DefaultFrameReader extends FrameReader {
     var rows = mutable.Seq[Row]()
     while(Try(reader.hasNext).getOrElse(false)) {
       record = reader.next(record)
-      val row = ArrayRow(new Array[Any](schema.fields.length))
+      val row = ArrayRow((new Array[Any](schema.fields.length)).toSeq)
       for(i <- schema.fields.indices) { row.set(i, readers(i)(record.get(i))) }
       rows :+= row
     }

-    DefaultLeapFrame(schema, rows)
+    DefaultLeapFrame(schema, rows.toSeq)
   }
 }
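
Two 2.13 changes meet in this hunk: mutable.Seq no longer conforms to plain Seq (now immutable.Seq), so rows.toSeq copies into an immutable sequence before DefaultLeapFrame sees it, and the Array[Any] passed to ArrayRow likewise gets an explicit conversion. A sketch of the first point:

    import scala.collection.mutable

    def makeFrame(rows: Seq[String]): Seq[String] = rows // expects immutable Seq on 2.13

    var rows = mutable.Seq[String]()
    rows :+= "row1"
    rows :+= "row2"

    // 2.12: Seq meant collection.Seq, so a mutable.Seq conformed as-is.
    // 2.13: Seq is immutable.Seq, so the explicit .toSeq copy is required.
    makeFrame(rows.toSeq)
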
@@ -26,7 +26,7 @@ class DefaultRowReader(override val schema: StructType) extends RowReader {
   override def fromBytes(bytes: Array[Byte], charset: Charset = BuiltinFormats.charset): Try[Row] = Try {
     decoder = DecoderFactory.get().binaryDecoder(bytes, decoder)
     record = datumReader.read(record, decoder)
-    val row = ArrayRow(new Array[Any](schema.fields.length))
+    val row = ArrayRow((new Array[Any](schema.fields.length)).toSeq)
     for(i <- schema.fields.indices) { row.set(i, readers(i)(record.get(i))) }
     row
   }

@@ -8,15 +8,15 @@ import scala.util.control.NonFatal
 object ClassLoaderUtil {
   def findClassLoader(baseName: String): ClassLoader = {
     def findCaller(get: Int => Class[_]): ClassLoader =
-      Iterator.from(2 /*is the magic number, promise*/ ).map(get) dropWhile { c =>
+      Iterator.from(2 /*is the magic number, promise*/ ).map(get).dropWhile { c =>
         c != null &&
           (c.getName.startsWith(baseName) ||
             c.getName.startsWith("scala.Option") ||
             c.getName.startsWith("scala.collection.Iterator") ||
             c.getName.startsWith("ml.combust.bundle.util.ClassLoaderUtil"))
-      } next () match {
-        case null => getClass.getClassLoader
-        case c => c.getClassLoader
+      }.nextOption() match {
+        case Some(c) => c.getClassLoader
+        case None => getClass.getClassLoader
       }

     Option(Thread.currentThread.getContextClassLoader) orElse
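
Iterator.next() throws on an exhausted iterator, which the old code papered over with a null match (and postfix method syntax); 2.13 adds Iterator.nextOption(), which makes the empty case explicit. A small sketch of the new API:

    val callers = Iterator("scala.Option", "app.Main").dropWhile(_.startsWith("scala."))

    // 2.13: nextOption() returns None on an exhausted iterator instead of
    // throwing NoSuchElementException.
    val caller = callers.nextOption() match {
      case Some(name) => name
      case None       => "unknown"
    }
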
@@ -13,7 +13,7 @@ import org.apache.spark.ml.linalg.mleap.VectorUtil._
 case class VectorSlicerModel(indices: Array[Int],
                              namedIndices: Array[(String, Int)] = Array(),
                              inputSize: Int) extends Model {
-  val allIndices: Array[Int] = indices.union(namedIndices.map(_._2))
+  val allIndices: Array[Int] = indices.union(namedIndices.map(_._2)).toArray

   def apply(features: Vector): Vector = features match {
     case features: DenseVector => Vectors.dense(allIndices.map(features.apply))
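
On 2.13, union is not defined on Array itself; the call goes through a wrapped sequence and comes back as a Seq rather than an Array, so the Array-typed val needs a closing .toArray. A sketch, assuming that reading of the change:

    val indices = Array(1, 2)
    val named = Array(4)

    // 2.12: union came back as Array[Int]; 2.13: it comes back as a
    // sequence, so the Array-typed result needs an explicit .toArray.
    val allIndices: Array[Int] = indices.union(named).toArray
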
@@ -68,7 +68,7 @@ case class WordToVectorModel(wordIndex: Map[String, Int],
       wordIndex.map { case (word, ind) =>
         (word, wordVectors.slice(vectorSize * ind, vectorSize * ind + vectorSize))
       }
-  }.mapValues(Vectors.dense).map(identity)
+  }.mapValues(Vectors.dense).map(identity).toMap

   def apply(sentence: Seq[String]): Vector = {
     if (sentence.isEmpty) {
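
In 2.13, mapValues is deprecated in favour of .view.mapValues, which returns a lazy MapView that recomputes on access and is not itself a Map; the appended .toMap materializes it once (the old .map(identity) was the 2.12 trick for forcing a strict, serializable map). A sketch:

    val wordIndex = Map("cat" -> 0, "dog" -> 1)

    // 2.13: mapValues through a view is lazy and not a strict Map.
    val view = wordIndex.view.mapValues(_ * 100)

    // Materialize once so downstream code gets a strict immutable Map.
    val strict: Map[String, Int] = view.toMap
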
@@ -35,7 +35,7 @@ class LeapFrameBuilderSupport {
   def createByteString(): BasicType = BasicType.ByteString

   def createTensorDimensions(dims : java.util.List[Integer]): Option[Seq[Int]] = {
-    Some(dims.asScala.map(_.intValue()))
+    Some(dims.asScala.toSeq.map(_.intValue()))
   }

   def createSchema(json: String): StructType = json.parseJson.convertTo[StructType]

@@ -12,11 +12,11 @@ class LeapFrameSupport {
   }

   def select(frame: DefaultLeapFrame, fieldNames: java.util.List[String]): DefaultLeapFrame = {
-    frame.select(fieldNames.asScala: _*).get
+    frame.select(fieldNames.asScala.toSeq: _*).get
   }

   def drop(frame: DefaultLeapFrame, names: java.util.List[String]): DefaultLeapFrame = {
-    frame.drop(names.asScala: _*).get
+    frame.drop(names.asScala.toSeq: _*).get
   }

   def getFields(schema: StructType): java.util.List[StructField] = {

@@ -41,7 +41,7 @@ class DefaultFrameReader extends FrameReader {
       rows(i) = row
     }

-    DefaultLeapFrame(schema, rows)
+    DefaultLeapFrame(schema, rows.toSeq)
   }).tried
   }
 }

@@ -19,7 +19,7 @@ class DefaultRowReader(override val schema: StructType) extends RowReader {
   override def fromBytes(bytes: Array[Byte], charset: Charset = BuiltinFormats.charset): Try[Row] = {
     (for(in <- managed(new ByteArrayInputStream(bytes))) yield {
       val din = new DataInputStream(in)
-      val row = ArrayRow(new Array[Any](schema.fields.length))
+      val row = ArrayRow((new Array[Any](schema.fields.length)).toSeq)
       var i = 0
       for(s <- serializers) {
         row.set(i, s.read(din))

@@ -24,7 +24,7 @@ case class ArrayRow(values: mutable.WrappedArray[Any]) extends Row {
   override def withValue(value: Any): ArrayRow = ArrayRow(values :+ value)
   override def withValues(values: Seq[Any]): ArrayRow = ArrayRow(this.values ++ values)

-  override def selectIndices(indices: Int *): ArrayRow = ArrayRow(indices.toArray.map(values))
+  override def selectIndices(indices: Int *): ArrayRow = ArrayRow(indices.toArray.map(values).toSeq)

   override def dropIndices(indices: Int *): ArrayRow = {
     val drops = Set(indices: _*)

@@ -16,7 +16,7 @@ object Row {
     * @param values values in the row
     * @return default row implementation with values
     */
-  def apply(values: Any *): Row = ArrayRow(values.toArray)
+  def apply(values: Any *): Row = ArrayRow(values.toArray.toSeq)
 }

 /** Base trait for row data.

@@ -169,7 +169,7 @@ case class RowTransformer private (inputSchema: StructType,
   def transformOption(row: Row): Option[ArrayRow] = {
     val arr = new Array[Any](maxSize)
     row.toArray.copyToArray(arr)
-    val arrRow = ArrayRow(arr)
+    val arrRow = ArrayRow(arr.toSeq)

     val r = transforms.foldLeft(Option(arrRow)) {
       (r, transform) => r.flatMap(transform)

@@ -23,6 +23,7 @@ import java.util.UUID
 import ml.combust.mleap.core.annotation.SparkCode

 import scala.collection.JavaConverters._
+import scala.collection.parallel.CollectionConverters._
 import scala.language.existentials
 import org.apache.hadoop.fs.Path
 import org.json4s.{DefaultFormats, JObject, _}
@@ -355,7 +356,7 @@ final class OneVsRest @Since("1.4.0") (
     }

     // create k columns, one for each binary classifier.
-    val models = Range(0, numClasses).par.map { index =>
+    val models = (0 until numClasses).par.map { index =>
       // generate new label metadata for the binary problem.
       val newLabelMeta = BinaryAttribute.defaultAttr.withName("label").toMetadata()
       val labelColName = "mc2b$" + index
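
Parallel collections moved out of the standard library in 2.13; .par now comes from the scala-parallel-collections module through the scala.collection.parallel.CollectionConverters._ import added above. A sketch, assuming the module is on the classpath:

    // Assumed build.sbt entry (artifact version is illustrative):
    //   libraryDependencies +=
    //     "org.scala-lang.modules" %% "scala-parallel-collections" % "1.0.4"
    import scala.collection.parallel.CollectionConverters._

    // .par is an extension enabled by the CollectionConverters import on 2.13.
    val models = (0 until 4).par.map { index =>
      s"binary-classifier-$index" // one binary problem per class
    }.seq
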
@@ -13,6 +13,7 @@ import java.nio.file.Files
 import java.util.zip.ZipInputStream
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.reflect.ClassTag
 import scala.util.Try
/**
@@ -28,7 +29,7 @@ case class TensorflowModel( @transient var graph: Option[tensorflow.Graph] = None,
 ) extends Model with AutoCloseable {

   def apply(values: Tensor[_] *): Seq[Any] = {
-    val garbage: mutable.ArrayBuilder[tensorflow.Tensor] = mutable.ArrayBuilder.make[tensorflow.Tensor]()
+    val garbage: mutable.ArrayBuilder[tensorflow.Tensor] = mutable.ArrayBuilder.make[tensorflow.Tensor](ClassTag(classOf[Tensor[_]]))

     val result = Try {
       val tensors = values.zip(inputs).map {
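
In 2.13, ArrayBuilder.make[T] lost its empty-parens form and takes its ClassTag as the sole (implicit) argument, which the commit fills in explicitly. A sketch of the same builder driven by a context bound instead, the more common 2.13 spelling:

    import scala.collection.mutable
    import scala.reflect.ClassTag

    // make[T] needs a ClassTag so the builder can allocate a T array.
    def collect[T: ClassTag](xs: Iterator[T]): Array[T] = {
      val builder = mutable.ArrayBuilder.make[T]
      xs.foreach(builder += _)
      builder.result()
    }

    val tensors = collect(Iterator("t1", "t2"))
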
6 changes: 4 additions & 2 deletions project/Common.scala
@@ -16,7 +16,8 @@ object Common {
   lazy val defaultSettings = buildSettings ++ sonatypeSettings

   lazy val buildSettings: Seq[Def.Setting[_]] = Seq(
-    scalaVersion := "2.12.13",
+    scalaVersion := "2.13.11",
+    crossScalaVersions := Seq("2.12.13", "2.13.11"),
     scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature"),
     fork in Test := true,
     javaOptions in test += sys.env.getOrElse("JVM_OPTS", ""),
@@ -32,7 +33,8 @@
       } else {
         Seq()
       }
-    }
+    },
+    resolvers += "XGBoost4J Snapshot Repo" at "https://s3-us-west-2.amazonaws.com/xgboost-maven-repo/snapshot/"
   )

   lazy val mleapSettings: Seq[Def.Setting[_]] = Seq(organization := "ml.combust.mleap")
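
crossScalaVersions lets one build compile and publish for both binary versions; sbt's + prefix runs a task across every listed Scala version. A usage sketch (project name illustrative):

    sbt:mleap> +compile        // compile for each crossScalaVersions entry in turn
    sbt:mleap> ++2.13.11 test  // pin 2.13.11 for this session, then run tests
    sbt:mleap> +publishLocal   // publish both _2.12 and _2.13 artifacts
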
13 changes: 7 additions & 6 deletions project/Dependencies.scala
@@ -17,7 +17,7 @@ object Dependencies {
   lazy val slf4jVersion = "2.0.6"
   lazy val awsSdkVersion = "1.11.1033"
   val tensorflowJavaVersion = "0.5.0" // Match Tensorflow 2.10.1 https://github.com/tensorflow/java/#tensorflow-version-support
-  val xgboostVersion = "1.7.3"
+  val xgboostVersion = "2.0.0-SNAPSHOT"
   val breezeVersion = "2.1.0"
   val hadoopVersion = "3.3.4" // matches spark version
   val platforms = "windows-x86_64,linux-x86_64,macosx-x86_64"
@@ -33,8 +33,9 @@
     "org.apache.spark" %% "spark-avro" % sparkVersion
   )
   val avroDep = "org.apache.avro" % "avro" % "1.11.1"
-  val sprayJson = "io.spray" %% "spray-json" % "1.3.2"
-  val arm = "com.jsuereth" %% "scala-arm" % "2.0"
+  val sprayJson = "io.spray" %% "spray-json" % "1.3.6"
+  // https://github.com/jsuereth/scala-arm/issues/79
+  val arm = "com.michaelpollmeier" %% "scala-arm" % "2.1"
   val config = "com.typesafe" % "config" % "1.3.0"
   val scalaReflect = ScalaVersionDependentModuleID.versioned("org.scala-lang" % "scala-reflect" % _)
   val scalaTest = "org.scalatest" %% "scalatest" % scalaTestVersion
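
The original com.jsuereth scala-arm was never published for Scala 2.13 (hence the linked issue), so the build moves to the com.michaelpollmeier fork, which keeps the same resource.managed API that the row readers above rely on. A usage sketch of that API:

    import java.io.{ByteArrayInputStream, DataInputStream}
    import resource._

    val bytes = Array[Byte](1, 2, 3)

    // managed() closes the stream whether the body succeeds or throws;
    // .tried converts the outcome into a scala.util.Try.
    val firstByte = (for (in <- managed(new ByteArrayInputStream(bytes))) yield {
      new DataInputStream(in).readByte()
    }).tried
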
@@ -72,8 +73,8 @@

   val breeze = "org.scalanlp" %% "breeze" % breezeVersion

-  val xgboostDep = "ml.dmlc" %% "xgboost4j" % xgboostVersion
-  val xgboostSparkDep = "ml.dmlc" %% "xgboost4j-spark" % xgboostVersion
+  val xgboostDep = "ml.dmlc" %% "xgboost4j" % xgboostVersion exclude("org.scala-lang.modules", "scala-collection-compat_2.12")
+  val xgboostSparkDep = "ml.dmlc" %% "xgboost4j-spark" % xgboostVersion exclude("org.scala-lang.modules", "scala-collection-compat_2.12") exclude("ml.dmlc", "xgboost4j_2.12")
   val xgboostPredictorDep = "ai.h2o" % "xgboost-predictor" % "0.3.18" exclude("com.esotericsoftware.kryo", "kryo")

   val hadoop = "org.apache.hadoop" % "hadoop-client" % hadoopVersion
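
The snapshot xgboost4j POMs apparently still reference _2.12 artifacts; letting one of those leak into a _2.13 build would put two binary-incompatible copies of the same classes into the fat jar, so they are excluded and the right artifact is added back with %% (which appends the current binary suffix). A sketch of the pattern, versions as in this diff:

    val xgboostVersion = "2.0.0-SNAPSHOT"

    // Drop the wrong-suffix transitive jar, then depend on the
    // cross-built artifact explicitly so %% picks _2.13 on 2.13.
    val xgboostSparkDep = ("ml.dmlc" %% "xgboost4j-spark" % xgboostVersion)
      .exclude("ml.dmlc", "xgboost4j_2.12")
    val xgboostDep = "ml.dmlc" %% "xgboost4j" % xgboostVersion
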
@@ -127,7 +128,7 @@

   val xgboostRuntime = l ++= Seq(xgboostDep) ++ Seq(xgboostPredictorDep) ++ Test.spark ++ Test.sparkTest ++ Seq(Test.scalaTest)

-  val xgboostSpark = l ++= Seq(xgboostSparkDep) ++ Provided.spark ++ Test.spark ++ Test.sparkTest
+  val xgboostSpark = l ++= Seq(xgboostDep) ++ Seq(xgboostSparkDep) ++ Provided.spark ++ Test.spark ++ Test.sparkTest

   val serving = l ++= Seq(akkaHttp, akkaHttpSprayJson, config, Test.scalaTest, Test.akkaHttpTestkit)
