WordCountExample.scala
package spark.jobserver

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.{SparkConf, SparkContext}
import org.scalactic._
import scala.util.Try
import spark.jobserver.api.{SparkJob => NewSparkJob, _}

/**
* A super-simple Spark job example that implements the SparkJob trait and can be submitted to the job server.
*
* Set the config with the sentence to split or count:
* input.string = "adsfasdf asdkf safksf a sdfa"
*
* validate() returns SparkJobInvalid if there is no input.string
*/
object WordCountExample extends SparkJob {
  def main(args: Array[String]): Unit = {
    // Stand-alone entry point for running the job locally, outside the job server.
    val conf = new SparkConf().setMaster("local[4]").setAppName("WordCountExample")
    val sc = new SparkContext(conf)
    // Supply the job input here; with an empty config, runJob would throw on the
    // missing input.string key.
    val config = ConfigFactory.parseString("input.string = a b c a b see")
    val results = runJob(sc, config)
    println("Result is " + results)
    sc.stop()
  }

  override def validate(sc: SparkContext, config: Config): SparkJobValidation = {
    Try(config.getString("input.string"))
      .map(x => SparkJobValid)
      .getOrElse(SparkJobInvalid("No input.string config param"))
  }

  override def runJob(sc: SparkContext, config: Config): Any = {
    sc.parallelize(config.getString("input.string").split(" ").toSeq).countByValue
  }
}
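
/**
 * Hedged usage sketch (not part of the original example): the job server calls
 * validate() before runJob(); this hypothetical driver wires the two together
 * locally so the SparkJobValidation contract is visible. The object name and
 * the input sentence below are illustrative assumptions, not job server API.
 */
object WordCountExampleValidatedRun {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("WordCountExampleValidatedRun")
    val sc = new SparkContext(conf)
    // HOCON value concatenation keeps the spaces, so this is one whole sentence.
    val config = ConfigFactory.parseString("input.string = a b c a b see")
    WordCountExample.validate(sc, config) match {
      case SparkJobValid           => println("Result is " + WordCountExample.runJob(sc, config))
      case SparkJobInvalid(reason) => println("Rejected: " + reason)
    }
    sc.stop()
  }
}
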
/**
 * This is the same WordCountExample as above, but implemented against the new SparkJob API.
 * A few things to notice:
 * - runJob no longer needs to re-parse the input: the split words are passed straight to runJob
 * - the output of runJob is now typed, making the job more type-safe
 * - the config input is no longer mixed with context settings; it contains only the job input
 * - the job can read the jobId and other environment values from JobEnvironment
 */
object WordCountExampleNewApi extends NewSparkJob {
  type JobData = Seq[String]
  type JobOutput = collection.Map[String, Long]

  def runJob(sc: SparkContext, runtime: JobEnvironment, data: JobData): JobOutput =
    sc.parallelize(data).countByValue

  def validate(sc: SparkContext, runtime: JobEnvironment, config: Config):
      JobData Or Every[ValidationProblem] = {
    Try(config.getString("input.string").split(" ").toSeq)
      .map(words => Good(words))
      .getOrElse(Bad(One(SingleProblem("No input.string param"))))
  }
}
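
/**
 * Hedged sketch (not part of the original example): how a caller consumes the
 * scalactic Or returned by the new validate(). Good carries the parsed JobData,
 * Bad carries an Every of ValidationProblem; the job server does this handling
 * internally, and the object below is illustrative only.
 */
object ValidationResultSketch {
  def describe(result: Seq[String] Or Every[ValidationProblem]): String =
    result match {
      case Good(words)   => "valid input with " + words.size + " words"
      case Bad(problems) => "invalid input: " + problems.mkString("; ")
    }

  def main(args: Array[String]): Unit = {
    // Or is covariant, so a bare Good or Bad conforms to the full result type.
    println(describe(Good(Seq("a", "b", "see"))))
    println(describe(Bad(One(SingleProblem("No input.string param")))))
  }
}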