From 11ca5f546d993a00038e01e3552012d0d1af20ce Mon Sep 17 00:00:00 2001
From: Emily Curtin
Date: Mon, 19 Feb 2018 15:48:03 -0500
Subject: [PATCH] adding partition option to SQL workload

---
 .../sparkbench/workload/sql/SQLWorkload.scala | 14 +++++++++++---
 docs/_workloads/sql.md                        | 13 +++++++++++++
 2 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/cli/src/main/scala/com/ibm/sparktc/sparkbench/workload/sql/SQLWorkload.scala b/cli/src/main/scala/com/ibm/sparktc/sparkbench/workload/sql/SQLWorkload.scala
index f9fd040d..85e4f17c 100644
--- a/cli/src/main/scala/com/ibm/sparktc/sparkbench/workload/sql/SQLWorkload.scala
+++ b/cli/src/main/scala/com/ibm/sparktc/sparkbench/workload/sql/SQLWorkload.scala
@@ -39,7 +39,8 @@ object SQLWorkload extends WorkloadDefaults {
       output = m.get("output").map(_.asInstanceOf[String]),
       saveMode = getOrDefault[String](m, "save-mode", SaveModes.error),
       queryStr = getOrThrow(m, "query").asInstanceOf[String],
-      cache = getOrDefault[Boolean](m, "cache", false)
+      cache = getOrDefault[Boolean](m, "cache", false),
+      numPartitions = m.get("partitions").map(_.asInstanceOf[Int])
     )
 }
 
@@ -48,7 +49,9 @@ case class SQLWorkload (input: Option[String],
                         output: Option[String] = None,
                         saveMode: String,
                         queryStr: String,
-                        cache: Boolean) extends Workload {
+                        cache: Boolean,
+                        numPartitions: Option[Int] = None
+                       ) extends Workload {
 
   def loadFromDisk(spark: SparkSession): (Long, DataFrame) = time {
     val df = load(spark, input.get)
@@ -62,7 +65,12 @@ case class SQLWorkload (input: Option[String],
   }
 
   def save(res: DataFrame, where: String, spark: SparkSession): (Long, Unit) = time {
-    writeToDisk(where, saveMode, res, spark)
+    if(numPartitions.nonEmpty){
+      writeToDisk(where, saveMode, res.repartition(numPartitions.get), spark)
+    }
+    else {
+      writeToDisk(where, saveMode, res, spark)
+    }
   }
 
   override def doWorkload(df: Option[DataFrame] = None, spark: SparkSession): DataFrame = {
diff --git a/docs/_workloads/sql.md b/docs/_workloads/sql.md
index fccc6a7c..1e13905e 100644
--- a/docs/_workloads/sql.md
+++ b/docs/_workloads/sql.md
@@ -26,6 +26,8 @@ select `0` from input where `0` < -0.9
 | save-mode | no | errorifexists | Options are "errorifexists", "ignore" (no-op if exists), and "overwrite" |
 | query | yes | -- | the sql query to perform. The table name must be "input" as shown in the examples above. |
 | cache | no | false | whether the dataset should be cached after being read from disk |
+| partitions | no | Natural partitioning | If users specify `output` for this workload, they can optionally repartition the dataset using this option. |
+
 
 #### Examples
 
@@ -45,4 +47,15 @@ select `0` from input where `0` < -0.9
   query = "select `0` from input where `0` < -0.9"
   cache = true
 }
+```
+
+```hocon
+{
+  name = "sql"
+  input = "/tmp/generated-kmeans-data.parquet"
+  query = "select `0` from input where `0` < -0.9"
+  cache = true
+  output = "hdfs:///query-output-in-three-partitions.csv"
+  partitions = 3 // will repartition the dataset before writing out
+}
 ```
\ No newline at end of file
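
As a rough illustration of what the new `partitions` option does at write time, the standalone sketch below reproduces the optional-repartition branching outside of spark-bench. It is not part of the patch: the local `SparkSession`, the throwaway DataFrame, and the output path are all hypothetical, and `Option.fold` is simply an alternative spelling of the patched if/else in `save`.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object RepartitionSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical local session; the real workload receives its SparkSession from spark-bench.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("repartition-sketch")
      .getOrCreate()
    import spark.implicits._

    // Throwaway stand-in for the result of the SQL workload's query.
    val res: DataFrame = (1 to 1000).toDF("0")

    // What the new `partitions` config option would carry (None = natural partitioning).
    val numPartitions: Option[Int] = Some(3)

    // Same behavior as the patched save(): repartition only when the option is set.
    val toWrite = numPartitions.fold(res)(n => res.repartition(n))

    // Hypothetical output path; each partition becomes one part-file.
    toWrite.write.mode(SaveMode.Overwrite).csv("/tmp/query-output-in-three-partitions.csv")
    println(s"wrote ${toWrite.rdd.getNumPartitions} partition(s)")

    spark.stop()
  }
}
```

With `numPartitions = Some(3)` the CSV output directory ends up with three part-files; with `None` the write keeps whatever partitioning the query result already had.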