diff --git a/cli/src/main/scala/com/ibm/sparktc/sparkbench/workload/SuiteKickoff.scala b/cli/src/main/scala/com/ibm/sparktc/sparkbench/workload/SuiteKickoff.scala
index fe442f4d..8813265d 100644
--- a/cli/src/main/scala/com/ibm/sparktc/sparkbench/workload/SuiteKickoff.scala
+++ b/cli/src/main/scala/com/ibm/sparktc/sparkbench/workload/SuiteKickoff.scala
@@ -58,7 +58,7 @@ import scala.collection.parallel.ForkJoinTaskSupport
 object SuiteKickoff {
 
   def run(s: Suite, spark: SparkSession): Unit = {
-    verifyCanWriteOrThrow(s.benchmarkOutput, s.saveMode, spark)
+    verifyOutput(s.benchmarkOutput, s.saveMode, spark)
 
     // Translate the maps into runnable workloads
     val workloads: Seq[Workload] = s.workloadConfigs.map(ConfigCreator.mapToConf)
diff --git a/cli/src/main/scala/com/ibm/sparktc/sparkbench/workload/Workload.scala b/cli/src/main/scala/com/ibm/sparktc/sparkbench/workload/Workload.scala
index 08321628..e8c8fd0c 100644
--- a/cli/src/main/scala/com/ibm/sparktc/sparkbench/workload/Workload.scala
+++ b/cli/src/main/scala/com/ibm/sparktc/sparkbench/workload/Workload.scala
@@ -46,7 +46,7 @@ trait Workload {
 
   def run(spark: SparkSession): DataFrame = {
 
-    verifyCanWriteOrThrow(output, saveMode, spark)
+    verifyOutput(output, saveMode, spark)
     if(saveMode == SaveModes.append){
       throw SparkBenchException("Save-mode \"append\" not available for workload results. " +
         "Please use \"errorifexists\", \"ignore\", or \"overwrite\" instead.")
diff --git a/cli/src/main/scala/com/ibm/sparktc/sparkbench/workload/exercise/SparkPi.scala b/cli/src/main/scala/com/ibm/sparktc/sparkbench/workload/exercise/SparkPi.scala
index 8c8da50f..fdad80b7 100644
--- a/cli/src/main/scala/com/ibm/sparktc/sparkbench/workload/exercise/SparkPi.scala
+++ b/cli/src/main/scala/com/ibm/sparktc/sparkbench/workload/exercise/SparkPi.scala
@@ -34,7 +34,7 @@ case class SparkPiResult(
 
 object SparkPi extends WorkloadDefaults {
   val name = "sparkpi"
-  def apply(m: Map[String, Any]) = 
+  def apply(m: Map[String, Any]) =
     new SparkPi(input = m.get("input").map(_.asInstanceOf[String]),
       output = None,
       slices = getOrDefault[Int](m, "slices", 2)
diff --git a/cli/src/test/scala/com/ibm/sparktc/sparkbench/cli/HelpersTest.scala b/cli/src/test/scala/com/ibm/sparktc/sparkbench/cli/HelpersTest.scala
index 66e4f5ce..2867e2bc 100644
--- a/cli/src/test/scala/com/ibm/sparktc/sparkbench/cli/HelpersTest.scala
+++ b/cli/src/test/scala/com/ibm/sparktc/sparkbench/cli/HelpersTest.scala
@@ -17,6 +17,7 @@
 
 package com.ibm.sparktc.sparkbench.cli
 
+import com.ibm.sparktc.sparkbench.utils.SaveModes
 import com.ibm.sparktc.sparkbench.workload.Suite
 import org.scalatest.{FlatSpec, Matchers}
 
@@ -41,7 +42,7 @@
       Map("z" -> 20),
       Map("z" -> 30)
     )
-    val suite = Suite.build(conf, Some("description"), 1, false, "error", Some("output"))
+    val suite = Suite.build(conf, Some("description"), 1, false, SaveModes.error, Some("output"))
     suite.description shouldBe Some("description")
     suite.repeat shouldBe 1
     suite.parallel shouldBe false
diff --git a/utils/src/main/scala/com/ibm/sparktc/sparkbench/utils/SparkFuncs.scala b/utils/src/main/scala/com/ibm/sparktc/sparkbench/utils/SparkFuncs.scala
index 1ec408db..43d224cf 100644
--- a/utils/src/main/scala/com/ibm/sparktc/sparkbench/utils/SparkFuncs.scala
+++ b/utils/src/main/scala/com/ibm/sparktc/sparkbench/utils/SparkFuncs.scala
@@ -29,6 +29,11 @@ object SparkFuncs {
     case (_, Some(s)) => s
   }
 
+  def verifyOutput(outputDir: Option[String], saveMode: String, spark: SparkSession, fileFormat: Option[String] = None): Unit = {
+    verifyCanWriteOrThrow(outputDir, saveMode, spark, fileFormat)
+    verifyFormatOrThrow(outputDir, fileFormat)
+  }
+
   def verifyCanWrite(outputDir: String, saveMode: String, spark: SparkSession, fileFormat: Option[String] = None): Boolean = {
     if(outputDir == Formats.console) true
     else {
@@ -43,6 +48,53 @@
     }
   }
 
+  def verifyFormat(outputDir: String, fileFormat: Option[String] = None): Boolean = {
+    val format = parseFormat(outputDir, fileFormat)
+
+    format match {
+      case Formats.parquet => true
+      case Formats.csv => true
+      case Formats.orc => true
+      case Formats.avro => true
+      case Formats.json => true
+      case Formats.console => true
+      case _ => false
+    }
+  }
+
+  def pathExists(path: String, spark: SparkSession): Boolean = {
+    val conf = spark.sparkContext.hadoopConfiguration
+    val fs = org.apache.hadoop.fs.FileSystem.get(conf)
+    fs.exists(new org.apache.hadoop.fs.Path(path))
+  }
+
+  def verifyPathExistsOrThrow(path: String, errorMessage: String, spark: SparkSession): String = {
+    if (pathExists(path, spark)) { path } // "It is true that this file exists"
+    else { throw SparkBenchException(errorMessage) }
+  }
+
+  def verifyPathNotExistsOrThrow(path: String, errorMessage: String, spark: SparkSession): String = {
+    if (!pathExists(path, spark)) { path } // "It is true that this file does not exist"
+    else { throw SparkBenchException(errorMessage) }
+  }
+
+  def verifyCanWriteOrThrow(outputDir: Option[String], saveMode: String, spark: SparkSession, fileFormat: Option[String] = None): Unit = {
+    if(outputDir.nonEmpty) {
+      if(!verifyCanWrite(outputDir.get, saveMode, spark, fileFormat)){
+        throw SparkBenchException(s"File ${outputDir.get} already exists and save-mode $saveMode prevents further action")
+      }
+    }
+  }
+
+  def verifyFormatOrThrow(outputDir: Option[String], fileFormat: Option[String] = None): Unit = {
+    if(outputDir.nonEmpty) {
+      if(!verifyFormat(outputDir.get, fileFormat)) {
+        throw new Exception(s"Unrecognized or unspecified save format. " +
+          s"Please check the file extension or add a file format to your arguments: $outputDir")
+      }
+    }
+  }
+
   def writeToDisk(outputDir: String, saveMode: String, data: DataFrame, spark: SparkSession, fileFormat: Option[String] = None): Unit = {
     val format = parseFormat(outputDir, fileFormat)
 
@@ -54,7 +106,7 @@
       case Formats.avro => data.write.mode(saveMode).avro(outputDir)
       case Formats.json => data.write.mode(saveMode).json(outputDir)
       case Formats.console => data.show()
-      case _ => throw new Exception(s"Unrecognized or unspecified save format. " +
+      case _ => throw new Exception(s"Unrecognized or unspecified save format: $format. " +
         s"Please check the file extension or add a file format to your arguments: $outputDir")
     }
   }
@@ -77,29 +129,7 @@
     }
   }
 
-  def pathExists(path: String, spark: SparkSession): Boolean = {
-    val conf = spark.sparkContext.hadoopConfiguration
-    val fs = org.apache.hadoop.fs.FileSystem.get(conf)
-    fs.exists(new org.apache.hadoop.fs.Path(path))
-  }
-
-  def verifyPathExistsOrThrow(path: String, errorMessage: String, spark: SparkSession): String = {
-    if (pathExists(path, spark)) { path } // "It is true that this file exists"
-    else { throw SparkBenchException(errorMessage) }
-  }
-
-  def verifyPathNotExistsOrThrow(path: String, errorMessage: String, spark: SparkSession): String = {
-    if (!pathExists(path, spark)) { path } // "It is true that this file does not exist"
-    else { throw SparkBenchException(errorMessage) }
-  }
-
-  def verifyCanWriteOrThrow(outputDir: Option[String], saveMode: String, spark: SparkSession, fileFormat: Option[String] = None): Unit = {
-    if(outputDir.nonEmpty) {
-      if(!verifyCanWrite(outputDir.get, saveMode, spark, fileFormat)){
-        throw SparkBenchException(s"File ${outputDir.get} already exists and save-mode $saveMode prevents further action")
-      }
-    }
-  }
-
   def addConfToResults(df: DataFrame, m: Map[String, Any]): DataFrame = {
     def dealWithNones(a: Any): Any = a match {