Updating custom workload docs
The old document referenced an old API. This rewrite focuses on the
example from ecurtin/spark-bench-custom-workload.
ecurtin committed Oct 24, 2017
1 parent b644e9b commit ea1ff10
1 changed file: docs/_developers-guide/custom-workloads.md (122 additions, 52 deletions)

---
layout: page
title: Using Custom Workloads
---

This guide will walk you through making custom workloads for Spark-Bench.

The WordGenerator example project shows how to compile against local Spark-Bench jars
and create a custom workload: <https://github.com/ecurtin/spark-bench-custom-workload>

Workloads implement the `Workload` abstract class and override the `doWorkload` method, which accepts an optional dataframe and
returns a results dataframe.

Custom workloads must also have companion objects implementing `WorkloadDefaults`, which store constants and construct the workload.
The custom workload must then be packaged in a JAR, which is supplied to Spark just like any other Spark job dependency.
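
Before diving into the full example, here is a rough sketch of how those two pieces fit together. The sketch is inferred from the example below rather than copied from the spark-bench source, so treat the exact signatures as approximate:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Approximate shape of the two spark-bench abstractions (a sketch, not the real source).
abstract class Workload {
  // Runs the benchmark and returns a one-row DataFrame of results.
  def doWorkload(df: Option[DataFrame] = None, spark: SparkSession): DataFrame
}

trait WorkloadDefaults {
  // Name used to identify the workload.
  val name: String
  // Builds the workload from the (lower-cased) keys of its config block.
  def apply(m: Map[String, Any]): Workload
}
```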

Let's build an example that generates a dataset with just one word repeated over and over.

When we're done, we'll be able to use a configuration block like this in our Spark-Bench config file:
```hocon
{
name = "custom"
class = "com.example.WordGenerator"
output = "console"
rows = 10
cols = 3
word = "Cool stuff!!"
}
```
which will produce output like this:
```text
+------------+------------+------------+
| 0| 1| 2|
+------------+------------+------------+
|Cool stuff!!|Cool stuff!!|Cool stuff!!|
|Cool stuff!!|Cool stuff!!|Cool stuff!!|
|Cool stuff!!|Cool stuff!!|Cool stuff!!|
|Cool stuff!!|Cool stuff!!|Cool stuff!!|
|Cool stuff!!|Cool stuff!!|Cool stuff!!|
|Cool stuff!!|Cool stuff!!|Cool stuff!!|
|Cool stuff!!|Cool stuff!!|Cool stuff!!|
|Cool stuff!!|Cool stuff!!|Cool stuff!!|
|Cool stuff!!|Cool stuff!!|Cool stuff!!|
|Cool stuff!!|Cool stuff!!|Cool stuff!!|
+------------+------------+------------+
```

Here's the code:

```scala
package com.example

import com.ibm.sparktc.sparkbench.workload.{Workload, WorkloadDefaults}
import com.ibm.sparktc.sparkbench.utils.GeneralFunctions._
import com.ibm.sparktc.sparkbench.utils.SparkFuncs.writeToDisk
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

// Create a quick case class with a member for each field we want to return in the results.
case class WordGeneratorResult(
name: String,
timestamp: Long,
generate_time: Long,
convert_time: Long,
save_time: Long,
total_runtime: Long,
word: String
)


/*
Each workload must have a companion object extending WorkloadDefaults. Here you define required
constant attributes of the workload, such as its name, along with any default values or constants
you want to use, and a constructor for your workload.
*/
object WordGenerator extends WorkloadDefaults {
val name = "word-generator"

/*
Give the WorkloadDefaults an apply method that constructs your workload from a
Map[String, Any]. This will be the form you receive your parameters in from the spark-bench
infrastructure. Example:
Map(
"name" -> "hellostring",
"workloadresultsoutputdir" -> None,
"str" -> "Hi I'm an Example"
"name" -> "word-generator",
"output" -> "/tmp/one-word-over-and-over.csv",
"word" -> "Cool"
)
Keep in mind that the keys in your map have been toLowerCase()'d for consistency.
*/

override def apply(m: Map[String, Any]) = WordGenerator(
numRows = getOrThrow(m, "rows").asInstanceOf[Int],
numCols = getOrThrow(m, "cols").asInstanceOf[Int],
output = Some(getOrThrow(m, "output").asInstanceOf[String]),
word = getOrThrow(m, "word").asInstanceOf[String]
)
}

/*
We structure the main workload as a case class that extends the abstract class Workload.
`input` and `output` are members of our case class; anything else depends on the workload.
Here we also take in the dimensions of the dataset to generate and the word to fill it with.
*/

case class WordGenerator(
numRows: Int,
numCols: Int,
input: Option[String] = None,
output: Option[String],
word: String
) extends Workload {
/*
doWorkload() is an abstract method from Workload. It optionally takes input data, and it will
output a one-row DataFrame made from the results case class we defined above.
*/
override def doWorkload(df: Option[DataFrame] = None, spark: SparkSession): DataFrame = {
// Every workload returns a timestamp from the start of its doWorkload() method
val timestamp = System.currentTimeMillis()

/*
The time {} function returns a tuple of the start-to-finish time of whatever function
or block you are measuring, and the results of that code. Here, it's going to return a tuple
of the elapsed time and the result of the block; in this case, the RDD of generated strings.
If we don't care about the result, we can assign it to _.
*/
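// For reference, time has roughly the shape `def time[R](block: => R): (Long, R)`:
// it runs the block once and pairs the elapsed time with the block's result.
// (Inferred from its use here; see GeneralFunctions in spark-bench for the exact
// definition and units.)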
val (generateTime, strrdd): (Long, RDD[String]) = time {
val oneRow = Seq.fill(numCols)(word).mkString(",")
val dataset: Seq[String] = for (i <- 0 until numRows) yield oneRow
spark.sparkContext.parallelize(dataset)
}

val (convertTime, dataDF) = time {
val rdd = strrdd.map(str => str.split(","))
val schemaString = rdd.first().indices.map(_.toString).mkString(" ")
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = false))
val schema = StructType(fields)
val rowRDD: RDD[Row] = rdd.map(arr => Row(arr: _*))
spark.createDataFrame(rowRDD, schema)
}

val (saveTime, _) = time { writeToDisk(output.get, dataDF, spark) }

val totalTime = generateTime + convertTime + saveTime

/*
 * NOTE: The dataframe we're returning is NOT the generated dataset. It's the one-row
 * benchmark result recording how long it took to generate and save the dataset.
 */
spark.createDataFrame(Seq(WordGeneratorResult("word-generator", timestamp, generateTime, convertTime, saveTime, totalTime, word)))

}
}
```

Packaging this single file with `sbt package` is sufficient to produce a JAR that spark-bench can use. To configure spark-bench to use the new custom workload, add a workload block with `name = "custom"` and the key `class` set to the fully qualified name of the workload class; any other parameters are passed through as usual. You can compile your custom workload against your local spark-bench jars using an SBT build file like this:
```scala
lazy val sparkBenchPath = "/ABSOLUTE/PATH/TO/YOUR/SPARK/BENCH/INSTALLATION/lib/"
lazy val sparkVersion = "2.1.1"
// scalaTest is referenced below; defining it here keeps this build file self-contained.
// (The example project keeps this in project/Dependencies.scala; the version is illustrative.)
lazy val scalaTest = "org.scalatest" %% "scalatest" % "3.0.1"

lazy val root = (project in file(".")).
settings(
inThisBuild(List(
organization := "com.example",
scalaVersion := "2.11.8",
version := "0.1.0-SNAPSHOT"
)),
name := "WordGenerator",
libraryDependencies += scalaTest % Test,
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided"
),
unmanagedBase := new java.io.File(sparkBenchPath)
)

```

Running `sbt package` in that project produces a JAR under `target/scala-2.11/` that you can hand to spark-bench. The workload then plugs into the config file like any other:

```hocon
workloads = [
  {
    name = "custom"
    class = "com.example.WordGenerator"
    output = "console"
    rows = 10
    cols = 3
    word = "Cool stuff!!"
  }
]
```

The most complicated part of the process may be getting Spark to properly handle the new JAR. This must be done with the `spark-args` key in the configuration file, which assembles the arguments and passes them as-is to `spark-submit`. For runs on a single machine, simply setting `driver-class-path` may be sufficient. For example, if my custom workload is located at `/home/drwho/code/spark-bench/spark-launch/src/test/resources/jars/HelloWorld.jar`, `spark-args` may look something like:
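(The block below is only a sketch of the `spark-args` section; the keys inside it are handed to `spark-submit` as flags, so adjust the path to wherever your custom JAR actually lives.)

```hocon
spark-args = {
  // passed through to spark-submit as --driver-class-path
  driver-class-path = "/home/drwho/code/spark-bench/spark-launch/src/test/resources/jars/HelloWorld.jar"
}
```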