Passing config string instead of temp file
When launching spark-bench through assembled spark-submit scripts, the
child argument used to be a path to a temporary file created by the
launch process. This was causing issues when running in cluster mode
because the temporary files lived in the launcher's local storage, where
a driver running inside the cluster could not read them.

Now the configuration is passed as one long string rather than as a
path to a temporary file.
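
In other words, a minimal sketch of the idea (the val names and the "spark-bench.conf" path are illustrative, not taken from the diff):

import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}

// Launcher side: render the parsed config to a single HOCON/JSON string
// instead of writing it to a temp file on the launcher's disk.
val conf: Config = ConfigFactory.parseFile(new java.io.File("spark-bench.conf"))
val childArg: String = conf.root().render(ConfigRenderOptions.concise())

// Driver side (CLIKickoff): parse the string straight back into a Config.
// The string travels inside the spark-submit child arguments, so it is
// visible to a driver running on any node.
val roundTripped: Config = ConfigFactory.parseString(childArg)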
ecurtin authored and Emily Curtin committed Oct 10, 2017
1 parent 481bee2 commit 0e642b4
Showing 11 changed files with 85 additions and 75 deletions.
@@ -27,9 +27,9 @@ object CLIKickoff extends App {
override def main(args: Array[String]): Unit = {
args.length match {
case 1 => {
- val file = new File(args.head)
- if(!file.exists()) throw SparkBenchException(s"Cannot find configuration file: ${file.getPath}")
- val worksuites = Configurator(new File(args.head))
+ // val file = new File(args.head)
+ // if(!file.exists()) throw SparkBenchException(s"Cannot find configuration file: ${file.getPath}")
+ val worksuites = Configurator(args.head)
MultipleSuiteKickoff.run(worksuites)
}
case _ => throw new IllegalArgumentException("Requires exactly one option: config file path")
@@ -31,8 +31,8 @@ import scala.util.Try

object Configurator {

- def apply(file: File): Seq[MultiSuiteRunConfig] = {
-   val config: Config = ConfigFactory.parseFile(file)
+ def apply(str: String): Seq[MultiSuiteRunConfig] = {
+   val config: Config = ConfigFactory.parseString(str)
val sparkBenchConfig = config.getObject("spark-bench").toConfig
val sparkContextConfs = parseSparkBenchRunConfig(sparkBenchConfig)
sparkContextConfs
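For reference, ConfigFactory.parseString behaves like parseFile but takes the HOCON text directly; a quick sketch with an illustrative config body:

import com.typesafe.config.ConfigFactory

val config = ConfigFactory.parseString("""spark-bench { foo = "bar" }""")
config.getObject("spark-bench").toConfig.getString("foo") // "bar"

// Malformed input makes parseString throw a ConfigException
// (a RuntimeException), e.g.:
// ConfigFactory.parseString("this is totally not valid HOCON {{}")

That last property is why HelpersTest below now expects a plain Exception for garbage input rather than the old SparkBenchException for a missing file.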
@@ -49,7 +49,8 @@ class ConfigFileTest extends FlatSpec with Matchers with BeforeAndAfterAll with
val relativePath = "/etc/testConfFile1.conf"
val resource = getClass.getResource(relativePath)
val path = resource.getPath
- CLIKickoff.main(Array(path))
+ val text = Source.fromFile(path).mkString
+ CLIKickoff.main(Array(text))

kmeansData.exists() shouldBe true
output1.exists() shouldBe true
@@ -73,6 +74,7 @@
val relativePath = "/etc/testConfFile2.conf"
val resource = getClass.getResource(relativePath)
val path = resource.getPath
- CLIKickoff.main(Array(path))
+ val text = Source.fromFile(path).mkString
+ CLIKickoff.main(Array(text))
}
}
@@ -21,6 +21,8 @@ import com.ibm.sparktc.sparkbench.cli.CLIKickoff
import com.ibm.sparktc.sparkbench.testfixtures.BuildAndTeardownData
import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}

+ import scala.io.Source
+
class NotebookSimTest extends FlatSpec with Matchers with BeforeAndAfterEach with Capturing {
val dataMaker = new BuildAndTeardownData("notebook-sim-test")

@@ -43,7 +45,8 @@ class NotebookSimTest extends FlatSpec with Matchers with BeforeAndAfterEach wit
val relativePath = "/etc/notebook-sim.conf"
val resource = getClass.getResource(relativePath)
val path = resource.getPath
- CLIKickoff.main(Array(path))
+ val text = Source.fromFile(path).mkString
+ CLIKickoff.main(Array(text))
}


cli/src/test/scala/com/ibm/sparktc/sparkbench/OutputTest.scala: 16 changes (14 additions, 2 deletions)
@@ -21,6 +21,8 @@ import com.ibm.sparktc.sparkbench.cli.CLIKickoff
import com.ibm.sparktc.sparkbench.testfixtures.BuildAndTeardownData
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}

+ import scala.io.Source
+
class OutputTest extends FlatSpec with Matchers with BeforeAndAfterAll with Capturing {
val dataStuff = new BuildAndTeardownData("output-test")

@@ -36,15 +38,25 @@ class OutputTest extends FlatSpec with Matchers with BeforeAndAfterAll with Capt
}

"Specifying Console output" should "work" in {
- val (out, _) = captureOutput(CLIKickoff.main(Array(getClass.getResource("/etc/testConfFile3.conf").getPath)))
+ val relativePath = "/etc/testConfFile3.conf"
+ val resource = getClass.getResource(relativePath)
+ val path = resource.getPath
+ val text = Source.fromFile(path).mkString
+
+ val (out, _) = captureOutput(CLIKickoff.main(Array(text)))
out should not be ""
out.split("\n").length shouldBe 10
println(out)
}


"Want to see configuration added to results when there's crazy stuff" should "work" in {
- val (out, _) = captureOutput(CLIKickoff.main(Array(getClass.getResource("/etc/testConfFile4.conf").getPath)))
+ val relativePath = "/etc/testConfFile4.conf"
+ val resource = getClass.getResource(relativePath)
+ val path = resource.getPath
+ val text = Source.fromFile(path).mkString
+
+ val (out, _) = captureOutput(CLIKickoff.main(Array(text)))
out should not be ""
out.split("\n").length shouldBe 1
println(out)
@@ -17,15 +17,13 @@

package com.ibm.sparktc.sparkbench.cli

- import com.ibm.sparktc.sparkbench.utils.SparkBenchException
import com.ibm.sparktc.sparkbench.workload.Suite
- import com.typesafe.config.ConfigException
import org.scalatest.{FlatSpec, Matchers}

class HelpersTest extends FlatSpec with Matchers {
"CLIKickoff" should "reject invalid argument strings" in {
an [IllegalArgumentException] should be thrownBy CLIKickoff.main(Array())
- a [SparkBenchException] should be thrownBy CLIKickoff.main(Array("/dev/null/this/file/does/not/exist"))
+ an [Exception] should be thrownBy CLIKickoff.main(Array("this is totally not valid HOCON {{}"))
}
"Suite" should "split workload configs properly" in {
val conf = Seq(
@@ -47,25 +47,30 @@ object ConfigWrangler {
* Takes in the path to the config file, splits that file to a bunch of files in a folder in /tmp, returns
* the paths to those files.
* @param path
- * @return a Seq of paths to the created files.
+ * @return a Seq of the new config files as Strings
*/
- def apply(path: File): Seq[(SparkLaunchConf, String)] = {
+ def apply(path: File): Seq[SparkSubmitScriptConf] = {
val config: Config = ConfigFactory.parseFile(path)
val sparkBenchConfig = config.getObject(SLD.topLevelConfObject).toConfig
val sparkContextConfs = getListOfSparkSubmits(sparkBenchConfig)

- val processedConfs = sparkContextConfs.flatMap(processConfig)
- val confsAndPaths: Seq[(Config, String)] = processedConfs.map{ oneConf =>
-   (oneConf, writePartialConfToDisk(sparkBenchConfig, oneConf))
+ val processedConfs: Seq[Config] = sparkContextConfs.flatMap(processConfig)
+ val scriptsReadyToGo: Seq[SparkSubmitScriptConf] = processedConfs.map{ oneConf =>
+   val wrapped = wrapConf(oneConf)
+   SparkSubmitScriptConf(oneConf, wrapped.root().render(ConfigRenderOptions.concise()))
}

- val ret = confsAndPaths.map{ tuple => {
-   val conf = tuple._1
-   val tmpFilePath = tuple._2
-   (SparkLaunchConf(conf, tmpFilePath), tmpFilePath)
-   }
- }
- ret
+ scriptsReadyToGo
}


+ private def wrapConf(oneConf: Config): Config = {
+   val wrapWithSparkSubmit = ConfigFactory.empty.withValue(
+     SLD.sparkSubmitObject,
+     ConfigValueFactory.fromIterable(Iterable(oneConf.root).asJava)
+   )
+   val sbConf = ConfigFactory.empty.withValue(SLD.topLevelConfObject, wrapWithSparkSubmit.root)
+   sbConf
+ }

/**
@@ -74,10 +79,10 @@ object ConfigWrangler {
* @return
*/
private[sparklaunch] def processConfig(config: Config): Seq[Config] = {
- val a: SparkSubmitDeconstructedWithSeqs = SparkSubmitDeconstructedWithSeqs(config)
+ val a: LaunchConfigDeconstructedWithSeqs = LaunchConfigDeconstructedWithSeqs(config)
if(a.sparkSubmitOptions.isEmpty) Seq(config)
else {
- val b: Seq[SparkSubmitDeconstructed] = a.split()
+ val b: Seq[LaunchConfigDeconstructed] = a.split()
val c: Seq[SparkSubmitPieces] = b.map(_.splitPieces)
val d: Seq[Config] = c.map(_.reconstruct)
d
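The net effect of wrapConf is to rebuild the usual top-level shape around each split-out submit config before rendering it. A sketch, assuming SLD.topLevelConfObject is "spark-bench" and SLD.sparkSubmitObject is "spark-submit-config" (both constant values are assumptions; they are not visible in this diff):

import com.typesafe.config.{ConfigFactory, ConfigRenderOptions, ConfigValueFactory}
import scala.collection.JavaConverters._

val oneConf = ConfigFactory.parseString("suites = []")
// Wrap the single config in a one-element list under the assumed keys.
val submitList = ConfigValueFactory.fromIterable(Iterable(oneConf.root).asJava)
val wrapped = ConfigFactory.empty
  .withValue("spark-bench", ConfigFactory.empty.withValue("spark-submit-config", submitList).root)

wrapped.root().render(ConfigRenderOptions.concise())
// {"spark-bench":{"spark-submit-config":[{"suites":[]}]}}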
@@ -24,35 +24,32 @@ import com.typesafe.config.{Config, ConfigValueFactory}
import scala.collection.JavaConverters._
import com.ibm.sparktc.sparkbench.sparklaunch.{SparkLaunchDefaults => SLD}
import com.ibm.sparktc.sparkbench.utils.TypesafeAccessories.{configToMapStringAny, splitGroupedConfigToIndividualConfigs}
- import com.ibm.sparktc.sparkbench.utils.GeneralFunctions.optionallyGet

import scala.util.Try



- case class SparkSubmitDeconstructedWithSeqs (
+ case class LaunchConfigDeconstructedWithSeqs(
sparkSubmitOptions: Map[String, Seq[Any]],
suitesConfig: Config
) {

- def split(): Seq[SparkSubmitDeconstructed] = {
+ def split(): Seq[LaunchConfigDeconstructed] = {
val splitMaps: Seq[Map[String, Any]] = splitGroupedConfigToIndividualConfigs(sparkSubmitOptions)
val asJava: Seq[util.Map[String, Any]] = splitMaps.map(_.asJava)

- asJava.map(SparkSubmitDeconstructed(_, suitesConfig))
+ asJava.map(LaunchConfigDeconstructed(_, suitesConfig))
}
}

- object SparkSubmitDeconstructedWithSeqs {
+ object LaunchConfigDeconstructedWithSeqs {

- def apply(oneSparkSubmitConfig: Config): SparkSubmitDeconstructedWithSeqs = {
+ def apply(oneSparkSubmitConfig: Config): LaunchConfigDeconstructedWithSeqs = {
val suites = oneSparkSubmitConfig.withOnlyPath(SLD.suites)
val workingConf = oneSparkSubmitConfig.withoutPath(SLD.suites)
val map: Map[String, Seq[Any]] = configToMapStringAny(workingConf)

val newMap = extractSparkArgsToHigherLevel(map, workingConf)

- SparkSubmitDeconstructedWithSeqs(
+ LaunchConfigDeconstructedWithSeqs(
newMap,
suites
)
@@ -71,7 +68,7 @@ object SparkSubmitDeconstructedWithSeqs {
}
}

- case class SparkSubmitDeconstructed (
+ case class LaunchConfigDeconstructed(
sparkSubmitOptions: util.Map[String, Any],
suitesConfig: Config
) {
@@ -23,52 +23,47 @@ import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.JavaConverters._
import scala.sys.process._
import scala.util.Try
+ import com.ibm.sparktc.sparkbench.sparklaunch.{SparkLaunchDefaults => SLD}

object SparkLaunch extends App {

override def main(args: Array[String]): Unit = {
assert(args.nonEmpty)
val path = args.head
- val (confSeq: Seq[(SparkLaunchConf, String)], parallel: Boolean) = mkConfs(new File(path))
- run(confSeq.map(_._1), parallel)
- rmTmpFiles(confSeq.map(_._2))
+ val (confSeq: Seq[SparkSubmitScriptConf], parallel: Boolean) = mkConfs(new File(path))
+ launchSparkSubmitScripts(confSeq, parallel)
+ // rmTmpFiles(confSeq.map(_._2))
}

- def mkConfs(file: File): (Seq[(SparkLaunchConf, String)], Boolean) = {
+ def mkConfs(file: File): (Seq[SparkSubmitScriptConf], Boolean) = {
val config: Config = ConfigFactory.parseFile(file)
val sparkBenchConfig = config.getObject("spark-bench").toConfig
- val confs: Seq[(SparkLaunchConf, String)] = ConfigWrangler(file)
+ val confs: Seq[SparkSubmitScriptConf] = ConfigWrangler(file)
val parallel = Try(sparkBenchConfig.getBoolean("spark-submit-parallel")).getOrElse(false)
(confs, parallel)
}


private def getConfigListByName(name: String, config: Config): List[Config] = {
val workloadObjs: Iterable[ConfigObject] = config.getObjectList(name).asScala
workloadObjs.map(_.toConfig).toList
}

- def run(confSeq: Seq[SparkLaunchConf], parallel: Boolean): Unit = {
+ def launchSparkSubmitScripts(confSeq: Seq[SparkSubmitScriptConf], parallel: Boolean): Unit = {
if (parallel) {
val confSeqPar = confSeq.par
//TODO address the concern that this could be confSeqPar.size threads for EACH member of ParSeq
confSeqPar.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(confSeqPar.size))
confSeqPar.foreach(launch)
} else confSeq.foreach(launch)
}

- def launch(conf: SparkLaunchConf): Unit = {
-   val argz: Array[String] = conf.toSparkArgs
+ def launch(conf: SparkSubmitScriptConf): Unit = {
+   val argz: Array[String] = conf.toSparkSubmitArgs
val submitProc = Process(Seq(s"${conf.sparkHome}/bin/spark-submit") ++ argz, None, "SPARK_HOME" -> conf.sparkHome)
println(" *** SPARK-SUBMIT: " + submitProc.toString)
if (submitProc.! != 0) {
throw new Exception(s"spark-submit failed to complete properly given these arguments: \n\t${argz.mkString(" ")}")
}
}

- private[sparklaunch] def rmTmpFiles(fns: Seq[String]): Unit = fns.foreach { fn =>
-   try {
-     val f = new File(fn)
-     if (f.exists) f.delete
-   } catch { case e: Throwable => println(s"failed to delete $fn", e) }
- }
}
@@ -26,7 +26,7 @@ import com.typesafe.config.{Config, ConfigObject}
import scala.collection.JavaConverters._
import scala.util.Try

- case class SparkLaunchConf(
+ case class SparkSubmitScriptConf(
`class`: String,
sparkHome: String,
sparkBenchJar: String,
@@ -35,21 +35,21 @@
childArgs: Array[String]
){

- def toSparkArgs: Array[String] =
+ def toSparkSubmitArgs: Array[String] =
Array("--class", `class`) ++ sparkArgs ++ sparkConfs ++ Array(sparkBenchJar) ++ childArgs

}

- object SparkLaunchConf {
+ object SparkSubmitScriptConf {

- def apply(sparkContextConf: Config, path: String): SparkLaunchConf = {
-   SparkLaunchConf(
+ def apply(sparkContextConf: Config, childArg: String): SparkSubmitScriptConf = {
+   SparkSubmitScriptConf(
`class` = getSparkBenchClass(sparkContextConf),
sparkHome = getSparkHome(sparkContextConf),
sparkBenchJar = getSparkBenchJar(sparkContextConf),
sparkArgs = getSparkArgs(sparkContextConf),
sparkConfs = getSparkConfs(sparkContextConf),
- childArgs = Array(path)
+ childArgs = Array(childArg)
)
}

@@ -78,7 +78,6 @@ object SparkLaunchConf {
def getSparkBenchJar(sparkContextConf: Config): String = {

val whereIAm = this.getClass.getProtectionDomain.getCodeSource.getLocation.getFile
println(s"I'M HERE::: $whereIAm")

if(whereIAm.endsWith(".jar")) {
/*
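Putting it together, the rendered config string now rides along as the single child argument to spark-submit. An illustrative sketch (all field values are made up, and the two fields elided above are assumed to be Array[String]):

val conf = SparkSubmitScriptConf(
  `class`       = "com.ibm.sparktc.sparkbench.cli.CLIKickoff",
  sparkHome     = "/opt/spark",
  sparkBenchJar = "/opt/spark-bench/spark-bench.jar",
  sparkArgs     = Array.empty[String],
  sparkConfs    = Array.empty[String],
  childArgs     = Array("""{"spark-bench":{"spark-submit-config":[{}]}}""")
)

conf.toSparkSubmitArgs
// Array("--class", "com.ibm.sparktc.sparkbench.cli.CLIKickoff",
//       "/opt/spark-bench/spark-bench.jar",
//       "{\"spark-bench\":{\"spark-submit-config\":[{}]}}")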