diff --git a/src/main/scala/sbtsparksubmit/SparkSubmitPlugin.scala b/src/main/scala/sbtsparksubmit/SparkSubmitPlugin.scala
index baf8791..239b119 100644
--- a/src/main/scala/sbtsparksubmit/SparkSubmitPlugin.scala
+++ b/src/main/scala/sbtsparksubmit/SparkSubmitPlugin.scala
@@ -22,6 +22,7 @@ object SparkSubmitPlugin extends AutoPlugin {
     lazy val sparkSubmitMaster = settingKey[(Seq[String], Seq[String]) => String]("(SparkArgs, AppArgs) => Default Spark Master")
     lazy val sparkSubmitPropertiesFile = settingKey[Option[String]]("The default configuration file used by Spark")
     lazy val sparkSubmitClasspath = taskKey[Seq[File]]("Classpath used in SparkSubmit. For example, this can include the HADOOP_CONF_DIR for yarn deployment.")
+    def sparkSubmit = defaultSparkSubmitKey

     class SparkSubmitSetting(name: String) {
       lazy val sparkSubmit = InputKey[Unit](name,
@@ -48,6 +49,13 @@ object SparkSubmitPlugin extends AutoPlugin {

     lazy val defaultSettings = Seq(
+      fork in sparkSubmit := (fork in defaultSparkSubmitKey).value,
+      javaHome in sparkSubmit := (javaHome in defaultSparkSubmitKey).value,
+      connectInput in sparkSubmit := (connectInput in defaultSparkSubmitKey).value,
+      outputStrategy in sparkSubmit := (outputStrategy in defaultSparkSubmitKey).value,
+      javaOptions in sparkSubmit := (javaOptions in defaultSparkSubmitKey).value,
+      baseDirectory in sparkSubmit := (baseDirectory in defaultSparkSubmitKey).value,
+      envVars in sparkSubmit := (envVars in defaultSparkSubmitKey).value,
       sparkSubmit := {
         val jar = (sparkSubmitJar in sparkSubmit).value
@@ -80,8 +88,23 @@ object SparkSubmitPlugin extends AutoPlugin {
           }
         }

+        val runner =
+          if((fork in sparkSubmit).value) {
+            val forkOptions = ForkOptions(
+              bootJars = Nil,
+              javaHome = (javaHome in sparkSubmit).value,
+              connectInput = (connectInput in sparkSubmit).value,
+              outputStrategy = (outputStrategy in sparkSubmit).value,
+              runJVMOptions = (javaOptions in sparkSubmit).value,
+              workingDirectory = Some((baseDirectory in sparkSubmit).value),
+              envVars = (envVars in sparkSubmit).value
+            )
+            new sbt.ForkRun(forkOptions)
+          } else {
+            new sbt.Run(scalaInstance.value, trapExit.value, taskTemporaryDirectory.value)
+          }
-        runner.value.run(
+        runner.run(
           "org.apache.spark.deploy.SparkSubmit",
           sparkSubmitClasspath.value,
           options,
@@ -121,11 +144,36 @@ object SparkSubmitPlugin extends AutoPlugin {
     sparkSubmitClasspath := data((fullClasspath in Compile).value)
   )

-  def defaultSparkSubmitSetting: SparkSubmitSetting = SparkSubmitSetting("sparkSubmit")
+  lazy val defaultSparkSubmitSetting: SparkSubmitSetting = SparkSubmitSetting("sparkSubmit")
+  def defaultSparkSubmitKey = defaultSparkSubmitSetting.sparkSubmit

   override def trigger = allRequirements
 }

+// requirement: libraryDependencies += spark-repl
+object SparkShellPlugin extends AutoPlugin {
+  override def requires = SparkSubmitPlugin
+
+  import SparkSubmitPlugin.autoImport._
+
+  lazy val sparkShellSetting: SparkSubmitSetting = SparkSubmitSetting(
+    "sparkShell",
+    Seq(
+      "--class", "org.apache.spark.repl.Main",
+      "--deploy-mode", "client"
+    )
+  )
+  def sparkShellKey = sparkShellSetting.sparkSubmit
+
+  override def projectSettings = sparkShellSetting ++
+    Seq(
+      fork in sparkShellKey := true,
+      javaOptions in sparkShellKey += "-Dscala.usejavacp=true",
+      outputStrategy in sparkShellKey := Some(StdoutOutput),
+      connectInput in sparkShellKey := true
+    )
+}
+
 object SparkSubmitYARN extends AutoPlugin {
   override def requires = SparkSubmitPlugin
@@ -145,4 +193,4 @@
         data((fullClasspath in Compile).value)
     }
   )
-}
\ No newline at end of file
+}
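
Usage sketch (not part of the patch): a consumer build.sbt might wire up the new SparkShellPlugin as below. The sparkVersion value and the -Xmx2g option are illustrative assumptions; the spark-repl dependency itself is required, since sparkShell launches org.apache.spark.repl.Main inside the forked JVM.

// build.sbt (hypothetical consumer project)
val sparkVersion = "1.4.0"  // assumed version; use the one matching your cluster

lazy val root = (project in file("."))
  // SparkShellPlugin does not override trigger, so it must be enabled explicitly
  .enablePlugins(SparkShellPlugin)
  .settings(
    // spark-repl provides org.apache.spark.repl.Main, the class sparkShell submits
    libraryDependencies += "org.apache.spark" %% "spark-repl" % sparkVersion,
    // the forked-JVM keys delegated in defaultSettings can still be overridden per project
    javaOptions in SparkShellPlugin.sparkShellKey += "-Xmx2g"
  )

Running `sbt sparkShell` then forks a JVM with -Dscala.usejavacp=true, stdout passed through, and stdin connected, dropping the user into a Spark REPL in client mode.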