adding partition option to SQL workload
ecurtin authored and Emily Curtin committed Feb 19, 2018
1 parent 40f60dc commit 11ca5f5
Showing 2 changed files with 24 additions and 3 deletions.
@@ -39,7 +39,8 @@ object SQLWorkload extends WorkloadDefaults {
       output = m.get("output").map(_.asInstanceOf[String]),
       saveMode = getOrDefault[String](m, "save-mode", SaveModes.error),
       queryStr = getOrThrow(m, "query").asInstanceOf[String],
-      cache = getOrDefault[Boolean](m, "cache", false)
+      cache = getOrDefault[Boolean](m, "cache", false),
+      numPartitions = m.get("partitions").map(_.asInstanceOf[Int])
     )

   }
@@ -48,7 +49,9 @@ case class SQLWorkload (input: Option[String],
                         output: Option[String] = None,
                         saveMode: String,
                         queryStr: String,
-                        cache: Boolean) extends Workload {
+                        cache: Boolean,
+                        numPartitions: Option[Int] = None
+                       ) extends Workload {

   def loadFromDisk(spark: SparkSession): (Long, DataFrame) = time {
     val df = load(spark, input.get)
@@ -62,7 +65,12 @@ case class SQLWorkload (input: Option[String],
   }

   def save(res: DataFrame, where: String, spark: SparkSession): (Long, Unit) = time {
-    writeToDisk(where, saveMode, res, spark)
+    if(numPartitions.nonEmpty){
+      writeToDisk(where, saveMode, res.repartition(numPartitions.get), spark)
+    }
+    else {
+      writeToDisk(where, saveMode, res, spark)
+    }
   }

   override def doWorkload(df: Option[DataFrame] = None, spark: SparkSession): DataFrame = {
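For context, here is a minimal standalone sketch (not part of this commit; the object name, SparkSession setup, and output paths are illustrative assumptions) of why the new `numPartitions` branch matters: calling `repartition(n)` on a DataFrame before writing shuffles it into `n` partitions, so the write produces `n` part files instead of following the DataFrame's natural partitioning.

```scala
// Illustrative sketch only, not part of this commit: demonstrates the effect
// that SQLWorkload.save() now exposes through the "partitions" option.
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object RepartitionWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("repartition-write-sketch")
      .master("local[*]") // assumption: local run, just for illustration
      .getOrCreate()
    import spark.implicits._

    val df: DataFrame = (1 to 100).toDF("value")

    // Natural partitioning: the number of part files follows however the
    // DataFrame happens to be partitioned when it reaches the write.
    df.write.mode(SaveMode.Overwrite).csv("/tmp/natural-partitioning")

    // With repartition(3), the data is shuffled into exactly three
    // partitions, so exactly three part files are written.
    df.repartition(3).write.mode(SaveMode.Overwrite).csv("/tmp/three-partitions")

    spark.stop()
  }
}
```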
docs/_workloads/sql.md: 13 additions & 0 deletions
@@ -26,6 +26,8 @@ select `0` from input where `0` < -0.9
 | save-mode | no | errorifexists | Options are "errorifexists", "ignore" (no-op if exists), and "overwrite" |
 | query | yes | -- | the sql query to perform. The table name must be "input" as shown in the examples above. |
 | cache | no | false | whether the dataset should be cached after being read from disk |
+| partitions | no | Natural partitioning | If users specify `output` for this workload, they can optionally repartition the dataset using this option. |
+

 #### Examples

@@ -45,4 +47,15 @@ select `0` from input where `0` < -0.9
   query = "select `0` from input where `0` < -0.9"
   cache = true
 }
 ```
+
+```hocon
+{
+  name = "sql"
+  input = "/tmp/generated-kmeans-data.parquet"
+  query = "select `0` from input where `0` < -0.9"
+  cache = true
+  output = "hdfs:///query-output-in-three-partitions.csv"
+  partitions = 3 // will repartition the dataset before writing out
+}
+```
