Addressing code review comments
Moves format check to front.
ecurtin authored and Emily Curtin committed Feb 2, 2018
1 parent a40468c commit 992a278
Showing 5 changed files with 58 additions and 27 deletions.
SuiteKickoff.scala
@@ -58,7 +58,7 @@ import scala.collection.parallel.ForkJoinTaskSupport
object SuiteKickoff {

def run(s: Suite, spark: SparkSession): Unit = {
-verifyCanWriteOrThrow(s.benchmarkOutput, s.saveMode, spark)
+verifyOutput(s.benchmarkOutput, s.saveMode, spark)

// Translate the maps into runnable workloads
val workloads: Seq[Workload] = s.workloadConfigs.map(ConfigCreator.mapToConf)
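With this change the format check happens at suite kickoff, before any workload consumes cluster time. A minimal sketch of the fail-fast behavior this buys (the path and session setup are illustrative, and SparkFuncs is assumed to live in com.ibm.sparktc.sparkbench.utils):

```scala
import org.apache.spark.sql.SparkSession
import com.ibm.sparktc.sparkbench.utils.SparkFuncs.verifyOutput // assumed package

val spark = SparkSession.builder.appName("verify-example").master("local[*]").getOrCreate()

// ".xyz" is not a recognized output format, so verifyOutput should throw
// here, at kickoff, rather than after a long benchmark has already finished.
verifyOutput(Some("/tmp/results.xyz"), "errorifexists", spark)
```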
Workload.scala
@@ -46,7 +46,7 @@ trait Workload {

def run(spark: SparkSession): DataFrame = {

-verifyCanWriteOrThrow(output, saveMode, spark)
+verifyOutput(output, saveMode, spark)
if(saveMode == SaveModes.append){
throw SparkBenchException("Save-mode \"append\" not available for workload results. " +
"Please use \"errorifexists\", \"ignore\", or \"overwrite\" instead.")
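The same verifyOutput check now runs at the top of every workload, and the existing guard still rejects append mode for workload results. Roughly how that surfaces to a user (the config key names here are illustrative, not confirmed against the real schema):

```scala
// Hypothetical workload config; key names are illustrative.
val workload = ConfigCreator.mapToConf(Map(
  "name" -> "sparkpi",
  "save-mode" -> "append",
  "output" -> "/tmp/pi-results.csv"
))

// Expected to throw SparkBenchException("Save-mode \"append\" not available
// for workload results ...") before the workload body ever executes.
workload.run(spark)
```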
SparkPi.scala
@@ -34,7 +34,7 @@ case class SparkPiResult(

object SparkPi extends WorkloadDefaults {
val name = "sparkpi"
-def apply(m: Map[String, Any]) =
+def apply(m: Map[String, Any]) =
new SparkPi(input = m.get("input").map(_.asInstanceOf[String]),
output = None,
slices = getOrDefault[Int](m, "slices", 2)
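Since the factory above reads everything from the config map, the defaults are easy to see in action (values grounded in the apply method shown; the maps themselves are just examples):

```scala
val defaults = SparkPi(Map.empty[String, Any]) // input = None, output = None, slices = 2
val custom   = SparkPi(Map("slices" -> 8))     // slices = 8; input and output stay None
```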
HelpersTest.scala
@@ -17,6 +17,7 @@

package com.ibm.sparktc.sparkbench.cli

+import com.ibm.sparktc.sparkbench.utils.SaveModes
import com.ibm.sparktc.sparkbench.workload.Suite
import org.scalatest.{FlatSpec, Matchers}

@@ -41,7 +42,7 @@ class HelpersTest extends FlatSpec with Matchers {
Map("z" -> 20),
Map("z" -> 30)
)
-val suite = Suite.build(conf, Some("description"), 1, false, "error", Some("output"))
+val suite = Suite.build(conf, Some("description"), 1, false, SaveModes.error, Some("output"))
suite.description shouldBe Some("description")
suite.repeat shouldBe 1
suite.parallel shouldBe false
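Passing SaveModes.error instead of the bare string "error" keeps mode names in one place. The SaveModes object itself is not part of this diff; a rough sketch of what such a constants holder could look like, with member values assumed from the modes mentioned elsewhere in this commit:

```scala
// Sketch only; the real object in com.ibm.sparktc.sparkbench.utils may differ.
object SaveModes {
  val error = "errorifexists" // assumption: mirrors Spark's SaveMode.ErrorIfExists
  val ignore = "ignore"
  val append = "append"
  val overwrite = "overwrite"
}
```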
SparkFuncs.scala
@@ -29,6 +29,11 @@ object SparkFuncs {
case (_, Some(s)) => s
}

+def verifyOutput(outputDir: Option[String], saveMode: String, spark: SparkSession, fileFormat: Option[String] = None): Unit = {
+  verifyCanWriteOrThrow(outputDir, saveMode, spark, fileFormat)
+  verifyFormatOrThrow(outputDir, fileFormat)
+}

def verifyCanWrite(outputDir: String, saveMode: String, spark: SparkSession, fileFormat: Option[String] = None): Boolean = {
if(outputDir == Formats.console) true
else {
@@ -43,6 +48,53 @@ object SparkFuncs {
}
}

+def verifyFormat(outputDir: String, fileFormat: Option[String] = None): Boolean = {
+  val format = parseFormat(outputDir, fileFormat)
+
+  format match {
+    case Formats.parquet => true
+    case Formats.csv => true
+    case Formats.orc => true
+    case Formats.avro => true
+    case Formats.json => true
+    case Formats.console => true
+    case _ => false
+  }
+}

+def pathExists(path: String, spark: SparkSession): Boolean = {
+  val conf = spark.sparkContext.hadoopConfiguration
+  val fs = org.apache.hadoop.fs.FileSystem.get(conf)
+  fs.exists(new org.apache.hadoop.fs.Path(path))
+}
+
+def verifyPathExistsOrThrow(path: String, errorMessage: String, spark: SparkSession): String = {
+  if (pathExists(path, spark)) { path } // "It is true that this file exists"
+  else { throw SparkBenchException(errorMessage) }
+}
+
+def verifyPathNotExistsOrThrow(path: String, errorMessage: String, spark: SparkSession): String = {
+  if (!pathExists(path, spark)) { path } // "It is true that this file does not exist"
+  else { throw SparkBenchException(errorMessage) }
+}
+
+def verifyCanWriteOrThrow(outputDir: Option[String], saveMode: String, spark: SparkSession, fileFormat: Option[String] = None): Unit = {
+  if(outputDir.nonEmpty) {
+    if(!verifyCanWrite(outputDir.get, saveMode, spark, fileFormat)){
+      throw SparkBenchException(s"File ${outputDir.get} already exists and save-mode $saveMode prevents further action")
+    }
+  }
+}
+
+def verifyFormatOrThrow(outputDir: Option[String], fileFormat: Option[String] = None): Unit = {
+  if(outputDir.nonEmpty) {
+    if(!verifyFormat(outputDir.get, fileFormat)) {
+      throw new Exception(s"Unrecognized or unspecified save format. " +
+        s"Please check the file extension or add a file format to your arguments: $outputDir")
+    }
+  }
+}

def writeToDisk(outputDir: String, saveMode: String, data: DataFrame, spark: SparkSession, fileFormat: Option[String] = None): Unit = {

val format = parseFormat(outputDir, fileFormat)
@@ -54,7 +106,7 @@ object SparkFuncs {
case Formats.avro => data.write.mode(saveMode).avro(outputDir)
case Formats.json => data.write.mode(saveMode).json(outputDir)
case Formats.console => data.show()
-case _ => throw new Exception(s"Unrecognized or unspecified save format. " +
+case _ => throw new Exception(s"Unrecognized or unspecified save format: $format. " +
s"Please check the file extension or add a file format to your arguments: $outputDir")
}
}
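The improved message above now echoes the parsed format. For context, writeToDisk infers that format from the file extension when no explicit fileFormat is passed; two illustrative calls, assuming resultsDf is an existing DataFrame and Formats.console is the literal string "console":

```scala
// Extension-based: parseFormat reads ".parquet" from the path.
writeToDisk("/tmp/results.parquet", "overwrite", resultsDf, spark)

// Console sink: prints via data.show() instead of touching the filesystem.
writeToDisk("console", "ignore", resultsDf, spark)
```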
@@ -77,29 +129,7 @@ object SparkFuncs {
}
}

-def pathExists(path: String, spark: SparkSession): Boolean = {
-  val conf = spark.sparkContext.hadoopConfiguration
-  val fs = org.apache.hadoop.fs.FileSystem.get(conf)
-  fs.exists(new org.apache.hadoop.fs.Path(path))
-}
-
-def verifyPathExistsOrThrow(path: String, errorMessage: String, spark: SparkSession): String = {
-  if (pathExists(path, spark)) { path } // "It is true that this file exists"
-  else { throw SparkBenchException(errorMessage) }
-}
-
-def verifyPathNotExistsOrThrow(path: String, errorMessage: String, spark: SparkSession): String = {
-  if (!pathExists(path, spark)) { path } // "It is true that this file does not exist"
-  else { throw SparkBenchException(errorMessage) }
-}
-
-def verifyCanWriteOrThrow(outputDir: Option[String], saveMode: String, spark: SparkSession, fileFormat: Option[String] = None): Unit = {
-  if(outputDir.nonEmpty) {
-    if(!verifyCanWrite(outputDir.get, saveMode, spark, fileFormat)){
-      throw SparkBenchException(s"File ${outputDir.get} already exists and save-mode $saveMode prevents further action")
-    }
-  }
-}

def addConfToResults(df: DataFrame, m: Map[String, Any]): DataFrame = {
def dealWithNones(a: Any): Any = a match {
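One subtlety in the new verifyFormat: per the case (_, Some(s)) => s branch of parseFormat near the top of this file, an explicit fileFormat argument overrides whatever the extension says. Illustrative calls, assuming the Formats constants are the plain lowercase strings:

```scala
verifyFormat("/tmp/out.csv")                  // true: ".csv" parses to Formats.csv
verifyFormat("/tmp/out.dat")                  // false: unrecognized extension, no explicit format
verifyFormat("/tmp/out.dat", Some("parquet")) // true: the explicit format wins over the extension
```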
