diff --git a/project/plugins.sbt b/project/plugins.sbt
index 02935d8..25c0d4d 100755
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -16,3 +16,5 @@ resolvers ++= Seq(
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.4")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.5")
+
+addSbtPlugin("com.typesafe.sbt" % "sbt-scalariform" % "1.0.1")
\ No newline at end of file
diff --git a/scalariform.sbt b/scalariform.sbt
new file mode 100644
index 0000000..d4cc6c5
--- /dev/null
+++ b/scalariform.sbt
@@ -0,0 +1,9 @@
+import scalariform.formatter.preferences._
+
+scalariformSettings
+
+ScalariformKeys.preferences := FormattingPreferences()
+ .setPreference(RewriteArrowSymbols, true)
+ .setPreference(AlignParameters, true)
+ .setPreference(AlignSingleLineCaseStatements, true)
+ .setPreference(DoubleIndentClassDeclaration, true)
\ No newline at end of file
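The two build files above are what produce most of the mechanical churn in the Scala sources that follow. Roughly (a sketch of the preferences, not a full description of scalariform 1.0.1): RewriteArrowSymbols replaces the ASCII arrows => and <- with the Unicode ⇒ and ←, AlignParameters and AlignSingleLineCaseStatements line up multi-line parameter lists and single-line case arms where the formatter decides it fits, and DoubleIndentClassDeclaration double-indents wrapped class headers. The arrow rewrite alone accounts for hunks like this one from WordCountBuffering:

    // Before scalariform
    case Some(count) => words.put(wordString, count+1)
    case None => words.put(wordString, 1)

    // After scalariform with the preferences above (arrow rewritten, operator spacing normalised)
    case Some(count) ⇒ words.put(wordString, count + 1)
    case None ⇒ words.put(wordString, 1)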
diff --git a/src/main/scala/driver/Driver.scala b/src/main/scala/driver/Driver.scala
index 7b55fcd..eda094b 100644
--- a/src/main/scala/driver/Driver.scala
+++ b/src/main/scala/driver/Driver.scala
@@ -10,6 +10,6 @@ import wordcount._
object Driver {
def main(args: Array[String]): Unit = {
- ToolRunner.run(new Configuration, WordCount, args);
+ ToolRunner.run(new Configuration, WordCount, args)
}
}
diff --git a/src/main/scala/secondarysort/GroupComparator.scala b/src/main/scala/secondarysort/GroupComparator.scala
index 2226c38..97dc185 100644
--- a/src/main/scala/secondarysort/GroupComparator.scala
+++ b/src/main/scala/secondarysort/GroupComparator.scala
@@ -4,14 +4,14 @@ import org.apache.hadoop.io.{ WritableComparable, WritableComparator }
/**
* We just consider the full date when grouping in the reducer.
- * If you just consider the year, you end up with just one key-value pair per year
+ * If you just consider the year, you end up with just one key-value pair per year
* in the final output and probably not the maximum!!
*/
class GroupComparator extends WritableComparator(classOf[YearYMDClose], true) {
-
+
override def compare(w1: WritableComparable[_], w2: WritableComparable[_]): Int = {
- val t1 = w1.asInstanceOf[YearYMDClose];
- val t2 = w2.asInstanceOf[YearYMDClose];
- t1.ymd.compareTo(t2.ymd); // compare the 2nd field.
- }
+ val t1 = w1.asInstanceOf[YearYMDClose]
+ val t2 = w2.asInstanceOf[YearYMDClose]
+ t1.ymd.compareTo(t2.ymd) // compare the 2nd field.
+ }
}
\ No newline at end of file
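The comment above is the heart of the secondary-sort trick: reduce groups are formed from the full date, so each distinct trading day becomes its own group instead of the whole year collapsing into a single output pair. A minimal, illustrative check, using the convenience constructor defined in YearYMDClose.scala below:

    val gc = new GroupComparator
    val feb02 = new YearYMDClose(2011, "2011-02-02", 302.0f)
    val feb03 = new YearYMDClose(2011, "2011-02-03", 304.0f)
    assert(gc.compare(feb02, feb03) != 0)  // same year, different dates: separate reduce groups
    assert(gc.compare(feb02, feb02) == 0)  // same date: grouped together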
diff --git a/src/main/scala/secondarysort/KeyComparator.scala b/src/main/scala/secondarysort/KeyComparator.scala
index c876c6b..e0ffcbb 100644
--- a/src/main/scala/secondarysort/KeyComparator.scala
+++ b/src/main/scala/secondarysort/KeyComparator.scala
@@ -15,13 +15,13 @@ class KeyComparator extends WritableComparator(classOf[YearYMDClose], true) {
* year (1st element) ascending, then the price (3rd element) descending.
*/
override def compare(w1: WritableComparable[_], w2: WritableComparable[_]): Int = {
- val t1 = w1.asInstanceOf[YearYMDClose];
- val t2 = w2.asInstanceOf[YearYMDClose];
- val cmp = t1.year.compareTo(t2.year);
+ val t1 = w1.asInstanceOf[YearYMDClose]
+ val t2 = w2.asInstanceOf[YearYMDClose]
+ val cmp = t1.year.compareTo(t2.year)
if (cmp != 0) {
cmp
} else {
- -t1.closingPrice.compareTo(t2.closingPrice); // REVERSE!
+ -t1.closingPrice.compareTo(t2.closingPrice) // REVERSE!
}
}
}
\ No newline at end of file
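KeyComparator is what puts the highest closing price first within each year: the year comparison runs ascending and the price comparison is negated, hence descending. Illustratively, with the same convenience constructor:

    val kc = new KeyComparator
    val lower  = new YearYMDClose(2011, "2011-02-02", 302.0f)
    val higher = new YearYMDClose(2011, "2011-02-03", 304.0f)
    val later  = new YearYMDClose(2012, "2012-01-02", 402.0f)
    assert(kc.compare(higher, lower) < 0)  // same year: larger closing price sorts first
    assert(kc.compare(lower, later) < 0)   // different years: 2011 before 2012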
diff --git a/src/main/scala/secondarysort/PartitionByYear.scala b/src/main/scala/secondarysort/PartitionByYear.scala
index 4aaa0a3..69ec864 100644
--- a/src/main/scala/secondarysort/PartitionByYear.scala
+++ b/src/main/scala/secondarysort/PartitionByYear.scala
@@ -7,11 +7,11 @@ import org.apache.hadoop.mapred.{ JobConf, Partitioner }
* When determining how to send key-value pairs to reducers, consider ONLY the year!
*/
class PartitionByYear extends Partitioner[YearYMDClose, NullWritable] {
-
- override def configure(job: JobConf): Unit = {}
-
- override def getPartition(
- key: YearYMDClose,
- value: NullWritable,
- numPartitions: Int): Int = math.abs(key.year) % numPartitions;
+
+ override def configure(job: JobConf): Unit = {}
+
+ override def getPartition(
+ key: YearYMDClose,
+ value: NullWritable,
+ numPartitions: Int): Int = math.abs(key.year) % numPartitions
}
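Because getPartition looks only at the year, every record for a given year is routed to the same reduce task, whatever its date or price. With the two reducers that SecondarySort configures below, the arithmetic works out as follows (illustrative values only):

    import org.apache.hadoop.io.NullWritable

    val p = new PartitionByYear
    p.getPartition(new YearYMDClose(2011, "2011-02-02", 302.0f), NullWritable.get(), 2)  // 2011 % 2 == 1
    p.getPartition(new YearYMDClose(2012, "2012-01-03", 404.0f), NullWritable.get(), 2)  // 2012 % 2 == 0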
diff --git a/src/main/scala/secondarysort/SecondarySort.scala b/src/main/scala/secondarysort/SecondarySort.scala
index 2195269..c54267e 100644
--- a/src/main/scala/secondarysort/SecondarySort.scala
+++ b/src/main/scala/secondarysort/SecondarySort.scala
@@ -2,7 +2,7 @@ package secondarysort
import org.apache.hadoop.conf.Configured
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{ FloatWritable, NullWritable, Text}
+import org.apache.hadoop.io.{ FloatWritable, NullWritable, Text }
import org.apache.hadoop.mapred.{ FileInputFormat, FileOutputFormat, JobClient, JobConf }
import org.apache.hadoop.util.{ GenericOptionsParser, Tool, ToolRunner }
@@ -11,8 +11,8 @@ import org.apache.hadoop.util.{ GenericOptionsParser, Tool, ToolRunner }
* user-specified stock, but sort by year ascending, followed by closing price
* descending. Hence, the first record for a given year will be the maximum price
* for that year.
- * See also the secondary sort example that comes with the Hadoop distribution, and
- * discussions in "Hadoop: The Definitive Guide"
+ * See also the secondary sort example that comes with the Hadoop distribution, and
+ * discussions in "Hadoop: The Definitive Guide"
*/
class SecondarySort extends Configured with Tool {
import SecondarySort._
@@ -21,32 +21,32 @@ class SecondarySort extends Configured with Tool {
val conf = new JobConf(classOf[SecondarySort])
conf.setJobName("Stock Analysis")
val optionsParser = new GenericOptionsParser(conf, args)
-
+
val remainingArgs = optionsParser.getRemainingArgs()
if (remainingArgs.length < 4) {
- usage("Must specify --symbol symbol input_path output_path.")
- return 1
- }
+ usage("Must specify --symbol symbol input_path output_path.")
+ return 1
+ }
var symbol = ""
var inputPath = ""
var outputPath = ""
- var nextIsSymbol = false;
- for (arg <- remainingArgs) {
+ var nextIsSymbol = false
+ for (arg ← remainingArgs) {
if (arg.startsWith("--s")) {
- nextIsSymbol = true;
+ nextIsSymbol = true
} else {
if (nextIsSymbol) {
- conf.set("symbol", arg);
- symbol = arg;
- nextIsSymbol = false;
+ conf.set("symbol", arg)
+ symbol = arg
+ nextIsSymbol = false
} else if (inputPath.isEmpty()) {
- inputPath = arg;
+ inputPath = arg
} else if (outputPath.isEmpty()) {
- outputPath = arg;
+ outputPath = arg
} else {
- usage("Too many arguments specified.");
- return 1;
+ usage("Too many arguments specified.")
+ return 1
}
}
}
@@ -54,7 +54,7 @@ class SecondarySort extends Configured with Tool {
usage("Must specify '--symbol symbol' argument!")
return 1
}
- println("Using Stock Symbol: "+symbol);
+ println("Using Stock Symbol: " + symbol)
if (inputPath.isEmpty()) {
usage("Must specify an input path argument!")
return 1
@@ -63,7 +63,7 @@ class SecondarySort extends Configured with Tool {
usage("Must specify an output path argument!")
return 1
}
-
+
// Because of type erasure, the intermediate Map output K-V types and
// final K-V types can't be inferred from the types of the mapper and
// reducer.
@@ -76,7 +76,7 @@ class SecondarySort extends Configured with Tool {
FileOutputFormat.setOutputPath(conf, new Path(outputPath))
conf.setMapperClass(classOf[StockMapper])
-
+
// Experiment with not setting the reducer class, which means the default
// "Identity Reducer" is used. Then try setting the number of reducers to zero.
// Next try setting the number of reducers to a number between 2 and 5, say.
@@ -87,10 +87,10 @@ class SecondarySort extends Configured with Tool {
conf.setNumReduceTasks(2)
// Would a combiner help? Not likely, because we won't have many identical key-value pairs!
-
+
// Specify our custom partitioner, etc.
conf.setPartitionerClass(classOf[PartitionByYear])
- conf.setOutputKeyComparatorClass(classOf[KeyComparator])
+ conf.setOutputKeyComparatorClass(classOf[KeyComparator])
conf.setOutputValueGroupingComparator(classOf[GroupComparator])
// This code was copied from the TwitterIndexer exercise we'll do later.
@@ -100,7 +100,7 @@ class SecondarySort extends Configured with Tool {
// to import the correct classes! However, Eclipse can handle that for you...
// The compression settings are optional and can be omitted.
// conf.setOutputFormat(classOf[SequenceFileOutputFormat])
- // SequenceFileOutputFormat.setCompressOutput(conf, true);
+ // SequenceFileOutputFormat.setCompressOutput(conf, true)
// SequenceFileOutputFormat.setOutputCompressorClass(conf, classOf[BZip2Codec])
// SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK)
@@ -112,10 +112,10 @@ class SecondarySort extends Configured with Tool {
object SecondarySort extends Configured {
def usage(message: String): Unit = {
- Console.err.println(message);
- Console.err.println("usage: java ...SecondarySort [generic_options] " +
- "--symbol stock_symbol in_path out_path");
- ToolRunner.printGenericCommandUsage(Console.err);
+ Console.err.println(message)
+ Console.err.println("usage: java ...SecondarySort [generic_options] " +
+ "--symbol stock_symbol in_path out_path")
+ ToolRunner.printGenericCommandUsage(Console.err)
}
def main(args: Array[String]): Unit = {
diff --git a/src/main/scala/secondarysort/StockMapper.scala b/src/main/scala/secondarysort/StockMapper.scala
index c58092c..2812b04 100644
--- a/src/main/scala/secondarysort/StockMapper.scala
+++ b/src/main/scala/secondarysort/StockMapper.scala
@@ -4,70 +4,69 @@ import org.apache.hadoop.io.{ LongWritable, NullWritable, Text }
import org.apache.hadoop.mapred.{ JobConf, InvalidJobConfException, MapReduceBase, Mapper, OutputCollector, Reporter }
/**
- * The mapper parses the record, extracting the date, the year from the date, and the closing price.
+ * The mapper parses the record, extracting the date, the year from the date, and the closing price.
* The input records have this format:
* exchange,symbol,yyyy-mm-dd,opening_price,high_price,low_price,closing_price,volume,adjusted_closing_price.
* The mapper extracts and uses the symbol (for filtering), the yyyy-mm-dd (from which it also extracts the year),
* and the adjusted closing price.
*/
-class StockMapper extends MapReduceBase with
- Mapper[LongWritable, Text, YearYMDClose, NullWritable] {
+class StockMapper extends MapReduceBase with Mapper[LongWritable, Text, YearYMDClose, NullWritable] {
var symbol: Option[String] = None
def setSymbol(newSymbol: String): Unit = {
symbol = newSymbol match {
- case null => None
- case _ =>
+ case null ⇒ None
+ case _ ⇒
println(s"Using Stock Symbol: $newSymbol")
Some(newSymbol)
}
}
-
+
override def configure(jobConf: JobConf): Unit = {
jobConf.get("symbol") match {
- case null => /* do nothing */
- case sym => setSymbol(sym)
+ case null ⇒ /* do nothing */
+ case sym ⇒ setSymbol(sym)
}
}
override def map(
- key: LongWritable, // offset in the file
- line: Text, // record on a line
- collector: OutputCollector[YearYMDClose, NullWritable],
+ key: LongWritable, // offset in the file
+ line: Text, // record on a line
+ collector: OutputCollector[YearYMDClose, NullWritable],
reporter: Reporter): Unit = {
- val stockSymbol: String = symbol match {
- case None =>
- reporter.incrCounter(RecordFormatRecords.NO_SYMBOL_SPECIFIED, 1)
- throw new InvalidJobConfException("No stock symbol was specified!")
- case Some(s) => s
- }
-
- try {
- reporter.incrCounter(RecordFormatRecords.MAP_RECORDS_SEEN, 1);
+ val stockSymbol: String = symbol match {
+ case None ⇒
+ reporter.incrCounter(RecordFormatRecords.NO_SYMBOL_SPECIFIED, 1)
+ throw new InvalidJobConfException("No stock symbol was specified!")
+ case Some(s) ⇒ s
+ }
- val fields = line.toString().split(","); // CSV
- val sym = fields(1)
- if (sym == stockSymbol) { // filter!
- val date = fields(2)
- val ymd = date.split("-")
- val year = ymd(0).toInt
- // Actually use the "adjusted" close, which is the last field in the record.
- val closing = fields(fields.length-1).toFloat
- val outputTuple = new YearYMDClose(year, date, closing)
- collector.collect(outputTuple, NullWritable.get())
- }
- } catch {
- case nfe: NumberFormatException =>
- // Gets directed to the task's user syserr log.
- // You can more carefully control how messages are logged, e.g., the severity level,
- // by using Apache Commons Logging. Log4J is actually used. The relevant log4j
- // appender is called TLA, for "Task Log Appender", in $HADOOP_HOME/conf/log4j.properties.
- Console.err.println(nfe+": ("+key.get()+", "+line+")")
+ try {
+ reporter.incrCounter(RecordFormatRecords.MAP_RECORDS_SEEN, 1)
- // Example of a Counter.
- reporter.incrCounter(RecordFormatRecords.RECORD_FORMAT_ERRORS, 1)
+ val fields = line.toString().split(",") // CSV
+ val sym = fields(1)
+ if (sym == stockSymbol) { // filter!
+ val date = fields(2)
+ val ymd = date.split("-")
+ val year = ymd(0).toInt
+ // Actually use the "adjusted" close, which is the last field in the record.
+ val closing = fields(fields.length - 1).toFloat
+ val outputTuple = new YearYMDClose(year, date, closing)
+ collector.collect(outputTuple, NullWritable.get())
}
+ } catch {
+ case nfe: NumberFormatException ⇒
+ // Gets directed to the task's user syserr log.
+ // You can more carefully control how messages are logged, e.g., the severity level,
+ // by using Apache Commons Logging. Log4J is actually used. The relevant log4j
+ // appender is called TLA, for "Task Log Appender", in $HADOOP_HOME/conf/log4j.properties.
+ Console.err.println(nfe + ": (" + key.get() + ", " + line + ")")
+
+ // Example of a Counter.
+ reporter.incrCounter(RecordFormatRecords.RECORD_FORMAT_ERRORS, 1)
+ }
}
}
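The mapper's comment describes the CSV layout; one sample record from SecondarySortSuite shows the fields map actually uses: index 1 is the symbol (the filter), index 2 the yyyy-mm-dd date (whose first component becomes the year), and the last field is the adjusted close that gets emitted. A standalone sketch of just that parsing step, mirroring map above:

    val record  = "NASDAQ,AAPL,2011-02-02,300.0,301.0,302.0,303.0,10000,302.0"
    val fields  = record.split(",")
    val sym     = fields(1)                          // "AAPL", compared against the configured symbol
    val date    = fields(2)                          // "2011-02-02"
    val year    = date.split("-")(0).toInt           // 2011
    val closing = fields(fields.length - 1).toFloat  // 302.0f, the adjusted close
    // map() then collects new YearYMDClose(year, date, closing) with a NullWritable value.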
diff --git a/src/main/scala/secondarysort/StockReducer.scala b/src/main/scala/secondarysort/StockReducer.scala
index cc2b91c..335d8dc 100644
--- a/src/main/scala/secondarysort/StockReducer.scala
+++ b/src/main/scala/secondarysort/StockReducer.scala
@@ -5,18 +5,16 @@ import org.apache.hadoop.io.{ FloatWritable, NullWritable, Text }
import org.apache.hadoop.mapred.{ MapReduceBase, OutputCollector, Reducer, Reporter }
import RecordFormatRecords._
-
/**
- * Reducer that converts the keys into tab-delimited date, closing-price values.
+ * Reducer that converts the keys into tab-delimited date, closing-price values.
* If we didn't care about the output format, we could use the default Identity reducer!
*/
-class StockReducer extends MapReduceBase with
- Reducer[YearYMDClose, NullWritable, Text, FloatWritable] {
+class StockReducer extends MapReduceBase with Reducer[YearYMDClose, NullWritable, Text, FloatWritable] {
- override def reduce(key: YearYMDClose,
- ignore: Iterator[NullWritable],
- output: OutputCollector[Text, FloatWritable],
- reporter: Reporter) = {
+ override def reduce(key: YearYMDClose,
+ ignore: Iterator[NullWritable],
+ output: OutputCollector[Text, FloatWritable],
+ reporter: Reporter) = {
reporter.incrCounter(REDUCE_RECORDS_SEEN, 1)
output.collect(new Text(key.ymd), new FloatWritable(key.closingPrice))
}
diff --git a/src/main/scala/secondarysort/YearYMDClose.scala b/src/main/scala/secondarysort/YearYMDClose.scala
index efd85e7..6aec16b 100644
--- a/src/main/scala/secondarysort/YearYMDClose.scala
+++ b/src/main/scala/secondarysort/YearYMDClose.scala
@@ -12,25 +12,25 @@ case class YearYMDClose(yearW: IntWritable, ymdW: Text, closingPriceW: FloatWrit
/**
* Convenience constructor.
*/
- def this(year: Int, ymd: String, closingPrice: Float) =
+ def this(year: Int, ymd: String, closingPrice: Float) =
this(new IntWritable(year), new Text(ymd), new FloatWritable(closingPrice))
- /**
+ /**
* You should not create an object in an invalid, e.g., uninitialized
* state. Unfortunately, Hadoop requires a default constructor here.
*/
def this() = this(0, "", 0.0f)
override def write(out: DataOutput): Unit = {
- yearW.write(out);
- ymdW.write(out);
- closingPriceW.write(out);
+ yearW.write(out)
+ ymdW.write(out)
+ closingPriceW.write(out)
}
override def readFields(in: DataInput): Unit = {
- yearW.readFields(in);
- ymdW.readFields(in);
- closingPriceW.readFields(in);
+ yearW.readFields(in)
+ ymdW.readFields(in)
+ closingPriceW.readFields(in)
}
def compareTo(other: YearYMDClose): Int = {
diff --git a/src/main/scala/wordcount/WordCount.scala b/src/main/scala/wordcount/WordCount.scala
index 0a8ce8f..63e5003 100644
--- a/src/main/scala/wordcount/WordCount.scala
+++ b/src/main/scala/wordcount/WordCount.scala
@@ -1,18 +1,18 @@
package wordcount
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{IntWritable, Text}
-import org.apache.hadoop.mapred.{FileInputFormat, FileOutputFormat, JobConf, JobClient}
+import org.apache.hadoop.io.{ IntWritable, Text }
+import org.apache.hadoop.mapred.{ FileInputFormat, FileOutputFormat, JobConf, JobClient }
import org.apache.hadoop.conf.Configured
-import org.apache.hadoop.util.{GenericOptionsParser, Tool, ToolRunner}
+import org.apache.hadoop.util.{ GenericOptionsParser, Tool, ToolRunner }
// Enable existential types, which we use below in several places:
import scala.language.existentials
object WordCount extends Configured with Tool {
- val HELP =
-"""Usage: WordCount *which_mapper* [--use-combiner] input_directory output_directory
+ val HELP =
+ """Usage: WordCount *which_mapper* [--use-combiner] input_directory output_directory
where *which_mapper* is one of the following options:
1 | no | no-buffer Simplest algorithm, but least efficient.
2 | not | no-buffer-use-tokenizer Like 'no', but uses a less efficient StringTokenizer, which yields more accurate results.
@@ -21,96 +21,96 @@ where *which_mapper* is one of the following options:
and
--use-combiner Use the reducer as a combiner."""
- def help(message: String = "") = {
- message match {
- case "" =>
- case _ => println(message)
- }
- println(HELP)
- ToolRunner.printGenericCommandUsage(Console.out)
- }
-
- override def run(args: Array[String]): Int = {
- val conf = new JobConf(this.getClass)
- conf.setJobName("Word Count")
-
- conf.setJarByClass(this.getClass)
-
- val optionsParser = new GenericOptionsParser(conf, args);
-
- val (mapper, useCombiner, inputPath, outputPath) =
- parseArgs(optionsParser.getRemainingArgs.toList) match {
- case Right((m, useC, in, out)) => (m, useC, in, out)
- case Left(0) => sys.exit(0)
- case Left(_) => sys.error("Invalid settings returned by parseArgs for input args: "+args)
- }
-
- FileInputFormat.addInputPath(conf, new Path(inputPath))
- FileOutputFormat.setOutputPath(conf, new Path(outputPath))
-
- conf.setMapperClass(mapper)
- conf.setReducerClass(classOf[WordCountReducer])
- if (useCombiner)
- conf.setCombinerClass(classOf[WordCountReducer])
-
- conf.setOutputKeyClass(classOf[Text])
- conf.setOutputValueClass(classOf[IntWritable])
-
- JobClient.runJob(conf)
- 0
- }
-
- private type MapperClass = Class[_ <: org.apache.hadoop.mapred.Mapper[_, _, _, _]]
-
- private case class Settings(
- mapperClass: Option[MapperClass],
- useCombiner: Boolean,
- inputPath: Option[String],
- outputPath: Option[String])
-
- private def parseArgs(args: List[String]): Either[Int,(MapperClass,Boolean,String,String)] = {
- args match {
- case ("-h" | "--help") :: tail =>
- help()
- Left(0)
- case _ if (args.length < 3) =>
- help(s"Insufficient number of input arguments: $args")
- Left(1)
- case _ => // continue
- }
-
- def parse(a: List[String], settings: Settings): Either[Int,Settings] = a match {
- case Nil => Right(settings)
- case head :: tail => head match {
- case "WordCount" => // should be first arg; this class name!
- parse(tail, settings)
- case "1" | "no" | "no-buffer" =>
- parse(tail, settings.copy(mapperClass = Some(classOf[WordCountNoBuffering.Map])))
- case "2" | "not" | "no-buffer-use-tokenizer" =>
- parse(tail, settings.copy(mapperClass = Some(classOf[WordCountNoBufferingTokenization.Map])))
- case "3" | "buffer" =>
- parse(tail, settings.copy(mapperClass = Some(classOf[WordCountBuffering.Map])))
- case "4" | "buffer-flush" =>
- parse(tail, settings.copy(mapperClass = Some(classOf[WordCountBufferingFlushing.Map])))
- case "--use-combiner" =>
- parse(tail, settings.copy(useCombiner = true))
- case s =>
- if (settings.inputPath == None)
- parse(tail, settings.copy(inputPath = Some(s)))
- else if (settings.outputPath == None)
- parse(tail, settings.copy(outputPath = Some(s)))
- else {
- help(s"Unrecognized argument '$s' in input arguments: $args")
- Left(1)
- }
- }
- }
- parse(args, Settings(None, false, None, None)) match {
- case Right(Settings(None, _, _, _)) => help("Must specify a mapper."); Left(1)
- case Right(Settings(_, _, None, _)) => help("Must specify an input path."); Left(1)
- case Right(Settings(_, _, _, None)) => help("Must specify an output path."); Left(1)
- case Right(Settings(Some(m), useC, Some(in), Some(out))) => Right((m, useC, in, out))
- case Left(x) => Left(x)
- }
+ def help(message: String = "") = {
+ message match {
+ case "" ⇒
+ case _ ⇒ println(message)
+ }
+ println(HELP)
+ ToolRunner.printGenericCommandUsage(Console.out)
+ }
+
+ override def run(args: Array[String]): Int = {
+ val conf = new JobConf(this.getClass)
+ conf.setJobName("Word Count")
+
+ conf.setJarByClass(this.getClass)
+
+ val optionsParser = new GenericOptionsParser(conf, args)
+
+ val (mapper, useCombiner, inputPath, outputPath) =
+ parseArgs(optionsParser.getRemainingArgs.toList) match {
+ case Right((m, useC, in, out)) ⇒ (m, useC, in, out)
+ case Left(0) ⇒ sys.exit(0)
+ case Left(_) ⇒ sys.error("Invalid settings returned by parseArgs for input args: " + args)
+ }
+
+ FileInputFormat.addInputPath(conf, new Path(inputPath))
+ FileOutputFormat.setOutputPath(conf, new Path(outputPath))
+
+ conf.setMapperClass(mapper)
+ conf.setReducerClass(classOf[WordCountReducer])
+ if (useCombiner)
+ conf.setCombinerClass(classOf[WordCountReducer])
+
+ conf.setOutputKeyClass(classOf[Text])
+ conf.setOutputValueClass(classOf[IntWritable])
+
+ JobClient.runJob(conf)
+ 0
+ }
+
+ private type MapperClass = Class[_ <: org.apache.hadoop.mapred.Mapper[_, _, _, _]]
+
+ private case class Settings(
+ mapperClass: Option[MapperClass],
+ useCombiner: Boolean,
+ inputPath: Option[String],
+ outputPath: Option[String])
+
+ private def parseArgs(args: List[String]): Either[Int, (MapperClass, Boolean, String, String)] = {
+ args match {
+ case ("-h" | "--help") :: tail ⇒
+ help()
+ Left(0)
+ case _ if (args.length < 3) ⇒
+ help(s"Insufficient number of input arguments: $args")
+ Left(1)
+ case _ ⇒ // continue
+ }
+
+ def parse(a: List[String], settings: Settings): Either[Int, Settings] = a match {
+ case Nil ⇒ Right(settings)
+ case head :: tail ⇒ head match {
+ case "WordCount" ⇒ // should be first arg; this class name!
+ parse(tail, settings)
+ case "1" | "no" | "no-buffer" ⇒
+ parse(tail, settings.copy(mapperClass = Some(classOf[WordCountNoBuffering.Map])))
+ case "2" | "not" | "no-buffer-use-tokenizer" ⇒
+ parse(tail, settings.copy(mapperClass = Some(classOf[WordCountNoBufferingTokenization.Map])))
+ case "3" | "buffer" ⇒
+ parse(tail, settings.copy(mapperClass = Some(classOf[WordCountBuffering.Map])))
+ case "4" | "buffer-flush" ⇒
+ parse(tail, settings.copy(mapperClass = Some(classOf[WordCountBufferingFlushing.Map])))
+ case "--use-combiner" ⇒
+ parse(tail, settings.copy(useCombiner = true))
+ case s ⇒
+ if (settings.inputPath == None)
+ parse(tail, settings.copy(inputPath = Some(s)))
+ else if (settings.outputPath == None)
+ parse(tail, settings.copy(outputPath = Some(s)))
+ else {
+ help(s"Unrecognized argument '$s' in input arguments: $args")
+ Left(1)
+ }
+ }
+ }
+ parse(args, Settings(None, false, None, None)) match {
+ case Right(Settings(None, _, _, _)) ⇒ help("Must specify a mapper."); Left(1)
+ case Right(Settings(_, _, None, _)) ⇒ help("Must specify an input path."); Left(1)
+ case Right(Settings(_, _, _, None)) ⇒ help("Must specify an output path."); Left(1)
+ case Right(Settings(Some(m), useC, Some(in), Some(out))) ⇒ Right((m, useC, in, out))
+ case Left(x) ⇒ Left(x)
+ }
}
}
diff --git a/src/main/scala/wordcount/WordCountBuffering.scala b/src/main/scala/wordcount/WordCountBuffering.scala
index 13c33ff..43d0843 100644
--- a/src/main/scala/wordcount/WordCountBuffering.scala
+++ b/src/main/scala/wordcount/WordCountBuffering.scala
@@ -1,22 +1,22 @@
package wordcount
-import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
-import org.apache.hadoop.mapred.{MapReduceBase, Mapper, Reducer, OutputCollector, Reporter}
+import org.apache.hadoop.io.{ IntWritable, LongWritable, Text }
+import org.apache.hadoop.mapred.{ MapReduceBase, Mapper, Reducer, OutputCollector, Reporter }
import java.util.StringTokenizer
/**
* Buffer the counts and then emit them at the end, reducing the pairs emitted, and hence
- * the sort and shuffle overhead.
+ * the sort and shuffle overhead.
*/
object WordCountBuffering {
class Map extends MapReduceBase with Mapper[LongWritable, Text, Text, IntWritable] {
-
- val words = new scala.collection.mutable.HashMap[String,Int]
+
+ val words = new scala.collection.mutable.HashMap[String, Int]
// Save the output collector so we can use it in close. Is this safe??
var outputCollector: OutputCollector[Text, IntWritable] = _
- def map(key: LongWritable, valueDocContents: Text, output: OutputCollector[Text, IntWritable], reporter: Reporter):Unit = {
+ def map(key: LongWritable, valueDocContents: Text, output: OutputCollector[Text, IntWritable], reporter: Reporter): Unit = {
outputCollector = output
val tokenizer = new StringTokenizer(valueDocContents.toString, " \t\n\r\f.,:;?!-@()[]&'\"")
while (tokenizer.hasMoreTokens) {
@@ -26,11 +26,11 @@ object WordCountBuffering {
}
}
}
-
+
override def close() = {
- val word = new Text()
+ val word = new Text()
val count = new IntWritable(1)
- words foreach { kv =>
+ words foreach { kv ⇒
word.set(kv._1)
count.set(kv._2)
outputCollector.collect(word, count)
@@ -38,8 +38,8 @@ object WordCountBuffering {
}
protected def increment(wordString: String) = words.get(wordString) match {
- case Some(count) => words.put(wordString, count+1)
- case None => words.put(wordString, 1)
+ case Some(count) ⇒ words.put(wordString, count + 1)
+ case None ⇒ words.put(wordString, 1)
}
}
}
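The buffering here is a hand-rolled in-mapper combiner: rather than emitting a (word, 1) pair per token, the HashMap accumulates counts and close() emits each word once with its total, which is exactly what WordCountSuite asserts further down (buffalo comes out as 3, not three 1s). A tiny Hadoop-free model of the effect, for illustration only:

    val tokens     = List("did", "the", "buffalo", "buffalo", "buffalo", "ny")     // tokenized and lowercased
    val unbuffered = tokens.map(t ⇒ (t, 1))                                        // six pairs cross the shuffle
    val buffered   = tokens.groupBy(identity).map { case (w, ts) ⇒ (w, ts.size) }  // four pairs, ("buffalo", 3)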
diff --git a/src/main/scala/wordcount/WordCountBufferingFlushing.scala b/src/main/scala/wordcount/WordCountBufferingFlushing.scala
index a58bb88..6657081 100644
--- a/src/main/scala/wordcount/WordCountBufferingFlushing.scala
+++ b/src/main/scala/wordcount/WordCountBufferingFlushing.scala
@@ -1,7 +1,7 @@
package wordcount
-import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
-import org.apache.hadoop.mapred.{MapReduceBase, Mapper, OutputCollector, Reporter}
+import org.apache.hadoop.io.{ IntWritable, LongWritable, Text }
+import org.apache.hadoop.mapred.{ MapReduceBase, Mapper, OutputCollector, Reporter }
import java.util.StringTokenizer
/**
@@ -14,14 +14,14 @@ import java.util.StringTokenizer
object WordCountBufferingFlushing {
class Map extends MapReduceBase with Mapper[LongWritable, Text, Text, IntWritable] {
-
+
val MAX_SIZE = 1000
var count = 0
- val words = new scala.collection.mutable.HashMap[String,Int]
+ val words = new scala.collection.mutable.HashMap[String, Int]
// Save the output collector so we can use it in close. Is this safe??
- var outputCollector: OutputCollector[Text, IntWritable] = _;
+ var outputCollector: OutputCollector[Text, IntWritable] = _
- def map(key: LongWritable, valueDocContents: Text, output: OutputCollector[Text, IntWritable], reporter: Reporter):Unit = {
+ def map(key: LongWritable, valueDocContents: Text, output: OutputCollector[Text, IntWritable], reporter: Reporter): Unit = {
outputCollector = output
val tokenizer = new StringTokenizer(valueDocContents.toString, " \t\n\r\f.,:;?!-@()[]&'\"")
while (tokenizer.hasMoreTokens) {
@@ -32,20 +32,20 @@ object WordCountBufferingFlushing {
}
}
}
-
- override def close() = flushIfLargerThan(1,0)
-
+
+ override def close() = flushIfLargerThan(1, 0)
+
protected def increment(wordString: String) = words.get(wordString) match {
- case Some(count) => words.put(wordString, count+1)
- case None => words.put(wordString, 1)
+ case Some(count) ⇒ words.put(wordString, count + 1)
+ case None ⇒ words.put(wordString, 1)
}
protected def flushIfLargerThan(count: Int, threshold: Int): Int = if (count < threshold) {
count + 1
} else {
- val word = new Text()
+ val word = new Text()
val count = new IntWritable(1)
- words foreach { kv =>
+ words foreach { kv ⇒
word.set(kv._1)
count.set(kv._2)
outputCollector.collect(word, count)
diff --git a/src/main/scala/wordcount/WordCountNoBuffering.scala b/src/main/scala/wordcount/WordCountNoBuffering.scala
index 0eb7706..790abe8 100644
--- a/src/main/scala/wordcount/WordCountNoBuffering.scala
+++ b/src/main/scala/wordcount/WordCountNoBuffering.scala
@@ -1,7 +1,7 @@
package wordcount
-import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
-import org.apache.hadoop.mapred.{MapReduceBase, Mapper, Reducer, OutputCollector, Reporter}
+import org.apache.hadoop.io.{ IntWritable, LongWritable, Text }
+import org.apache.hadoop.mapred.{ MapReduceBase, Mapper, Reducer, OutputCollector, Reporter }
import java.util.StringTokenizer
/**
@@ -11,32 +11,32 @@ import java.util.StringTokenizer
*/
object WordCountNoBuffering {
- val one = new IntWritable(1)
- val word = new Text // Value will be set in a non-thread-safe way!
+ val one = new IntWritable(1)
+ val word = new Text // Value will be set in a non-thread-safe way!
class Map extends MapReduceBase with Mapper[LongWritable, Text, Text, IntWritable] {
-
- def map(key: LongWritable, valueDocContents: Text, output: OutputCollector[Text, IntWritable], reporter: Reporter):Unit = {
+
+ def map(key: LongWritable, valueDocContents: Text, output: OutputCollector[Text, IntWritable], reporter: Reporter): Unit = {
val tokens = valueDocContents.toString.split("\\s+")
- for (wordString <- tokens) {
+ for (wordString ← tokens) {
if (wordString.length > 0) {
word.set(wordString.toLowerCase)
output.collect(word, one)
}
}
}
-
+
/**
* This method was used temporarily as map for a one-time measurement of
* the performance with the Regex splitting option.
*/
- def mapWithRegex(key: LongWritable, valueDocContents: Text, output: OutputCollector[Text, IntWritable], reporter: Reporter):Unit = {
+ def mapWithRegex(key: LongWritable, valueDocContents: Text, output: OutputCollector[Text, IntWritable], reporter: Reporter): Unit = {
for {
// In the Shakespeare text, there are also expressions like
// As both of you--God pardon it!--have done.
// So we also use "--" as a separator.
- wordString1 <- valueDocContents.toString.split("(\\s+|--)")
- wordString = wordString1.replaceAll("[.,:;?!'\"]+", "") // also strip out punctuation, etc.
+ wordString1 ← valueDocContents.toString.split("(\\s+|--)")
+ wordString = wordString1.replaceAll("[.,:;?!'\"]+", "") // also strip out punctuation, etc.
} {
word.set(wordString)
output.collect(word, one)
diff --git a/src/main/scala/wordcount/WordCountNoBufferingTokenization.scala b/src/main/scala/wordcount/WordCountNoBufferingTokenization.scala
index bb21f59..9259cbc 100644
--- a/src/main/scala/wordcount/WordCountNoBufferingTokenization.scala
+++ b/src/main/scala/wordcount/WordCountNoBufferingTokenization.scala
@@ -1,7 +1,7 @@
package wordcount
-import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
-import org.apache.hadoop.mapred.{MapReduceBase, Mapper, Reducer, OutputCollector, Reporter}
+import org.apache.hadoop.io.{ IntWritable, LongWritable, Text }
+import org.apache.hadoop.mapred.{ MapReduceBase, Mapper, Reducer, OutputCollector, Reporter }
import java.util.StringTokenizer
/**
@@ -10,12 +10,12 @@ import java.util.StringTokenizer
*/
object WordCountNoBufferingTokenization {
- val one = new IntWritable(1)
- val word = new Text // Value will be set in a non-thread-safe way!
+ val one = new IntWritable(1)
+ val word = new Text // Value will be set in a non-thread-safe way!
class Map extends MapReduceBase with Mapper[LongWritable, Text, Text, IntWritable] {
-
- def map(key: LongWritable, valueDocContents: Text, output: OutputCollector[Text, IntWritable], reporter: Reporter):Unit = {
+
+ def map(key: LongWritable, valueDocContents: Text, output: OutputCollector[Text, IntWritable], reporter: Reporter): Unit = {
val tokenizer = new StringTokenizer(valueDocContents.toString, " \t\n\r\f.,:;?!-@()[]&'\"")
while (tokenizer.hasMoreTokens) {
val wordString = tokenizer.nextToken
diff --git a/src/main/scala/wordcount/WordCountReducer.scala b/src/main/scala/wordcount/WordCountReducer.scala
index 92433d5..1db7d5e 100644
--- a/src/main/scala/wordcount/WordCountReducer.scala
+++ b/src/main/scala/wordcount/WordCountReducer.scala
@@ -1,11 +1,11 @@
package wordcount
-import org.apache.hadoop.io.{IntWritable, Text}
-import org.apache.hadoop.mapred.{MapReduceBase, Reducer, OutputCollector, Reporter}
+import org.apache.hadoop.io.{ IntWritable, Text }
+import org.apache.hadoop.mapred.{ MapReduceBase, Reducer, OutputCollector, Reporter }
class WordCountReducer extends MapReduceBase with Reducer[Text, IntWritable, Text, IntWritable] {
- def reduce(keyWord: Text, valuesCounts: java.util.Iterator[IntWritable], output: OutputCollector[Text, IntWritable], reporter: Reporter):Unit = {
+ def reduce(keyWord: Text, valuesCounts: java.util.Iterator[IntWritable], output: OutputCollector[Text, IntWritable], reporter: Reporter): Unit = {
var totalCount = 0
while (valuesCounts.hasNext) {
totalCount += valuesCounts.next.get
diff --git a/src/test/scala/secondarysort/SecondarySortSuite.scala b/src/test/scala/secondarysort/SecondarySortSuite.scala
index f9f4757..4a55ed6 100644
--- a/src/test/scala/secondarysort/SecondarySortSuite.scala
+++ b/src/test/scala/secondarysort/SecondarySortSuite.scala
@@ -3,80 +3,80 @@ package secondarysort
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import org.junit.Assert.assertEquals
-import java.util.{ List => JList, ArrayList => JArrayList }
+import java.util.{ List ⇒ JList, ArrayList ⇒ JArrayList }
import org.apache.hadoop.io.{ FloatWritable, LongWritable, NullWritable, Text }
import org.apache.hadoop.mrunit.{ MapDriver, ReduceDriver }
-import org.apache.hadoop.mrunit.types.{ Pair => MRPair }
+import org.apache.hadoop.mrunit.types.{ Pair ⇒ MRPair }
-/**
+/**
* Even though the full program sorts the tuples using a custom KeyComparator,
- * we can't configure it's use in the test, so the output will be sorted according
- * the YearYMDClose's own comparison implementation.
+ * we can't configure its use in the test, so the output will be sorted according
+ * to the YearYMDClose's own comparison implementation.
*/
class SecondarySortSuite extends JUnitSuite {
- @Test def `StockMapper parses records, filters by symbol, and outputs (YearYMDClose,NullWritable) pairs`() {
- val inputs1 = List(
- "NASDAQ,ABXA,2011-02-02,5.0,5.1,5.2,5.3,2000,5.25",
- "NASDAQ,AAPL,2011-02-02,300.0,301.0,302.0,303.0,10000,302.0",
- "NASDAQ,ABXA,2011-02-03,5.25,5.3,5.4,5.5,1000,5.5",
- "NASDAQ,AAPL,2011-02-03,302.0,303.0,304.0,305.0,20000,304.0",
- "NASDAQ,ABXA,2012-01-02,1.0,1.1,1.2,1.3,1000,1.25",
- "NASDAQ,AAPL,2012-01-02,400.0,401.0,402.0,403.0,10000,402.0",
- "NASDAQ,ABXA,2012-01-03,1.25,1.3,1.4,1.5,1000,1.5",
- "NASDAQ,AAPL,2012-01-03,402.0,403.0,404.0,405.0,20000,404.0")
- .zipWithIndex
- .map {
- case (line, offset) => new MRPair(new LongWritable(offset), new Text(line))
- }
+ @Test def `StockMapper parses records, filters by symbol, and outputs (YearYMDClose,NullWritable) pairs`() {
+ val inputs1 = List(
+ "NASDAQ,ABXA,2011-02-02,5.0,5.1,5.2,5.3,2000,5.25",
+ "NASDAQ,AAPL,2011-02-02,300.0,301.0,302.0,303.0,10000,302.0",
+ "NASDAQ,ABXA,2011-02-03,5.25,5.3,5.4,5.5,1000,5.5",
+ "NASDAQ,AAPL,2011-02-03,302.0,303.0,304.0,305.0,20000,304.0",
+ "NASDAQ,ABXA,2012-01-02,1.0,1.1,1.2,1.3,1000,1.25",
+ "NASDAQ,AAPL,2012-01-02,400.0,401.0,402.0,403.0,10000,402.0",
+ "NASDAQ,ABXA,2012-01-03,1.25,1.3,1.4,1.5,1000,1.5",
+ "NASDAQ,AAPL,2012-01-03,402.0,403.0,404.0,405.0,20000,404.0")
+ .zipWithIndex
+ .map {
+ case (line, offset) ⇒ new MRPair(new LongWritable(offset), new Text(line))
+ }
- val outputs1 = List(
- new YearYMDClose(2011, "2011-02-02", 302.0f),
- new YearYMDClose(2011, "2011-02-03", 304.0f),
- new YearYMDClose(2012, "2012-01-02", 402.0f),
- new YearYMDClose(2012, "2012-01-03", 404.0f))
- .map(y => new MRPair(y, NullWritable.get()))
-
- import scala.collection.JavaConversions._
+ val outputs1 = List(
+ new YearYMDClose(2011, "2011-02-02", 302.0f),
+ new YearYMDClose(2011, "2011-02-03", 304.0f),
+ new YearYMDClose(2012, "2012-01-02", 402.0f),
+ new YearYMDClose(2012, "2012-01-03", 404.0f))
+ .map(y ⇒ new MRPair(y, NullWritable.get()))
- val inputs: JList[MRPair[LongWritable, Text]] = inputs1
- val outputs: JList[MRPair[YearYMDClose, NullWritable]] = outputs1
+ import scala.collection.JavaConversions._
- // The mapper keys are actually wrong, but they are discarded.
- val mapper = new StockMapper()
- val mapDriver = new MapDriver[LongWritable, Text, YearYMDClose, NullWritable](mapper)
- mapper.setSymbol("AAPL")
- mapDriver.withAll(inputs).withAllOutput(outputs).runTest()
- }
+ val inputs: JList[MRPair[LongWritable, Text]] = inputs1
+ val outputs: JList[MRPair[YearYMDClose, NullWritable]] = outputs1
- @Test def `StockReducer converts presorted (YearYMDClose,NullWritable) pairs into (ymd, closingPrice)`() {
+ // The mapper keys are actually wrong, but they are discarded.
+ val mapper = new StockMapper()
+ val mapDriver = new MapDriver[LongWritable, Text, YearYMDClose, NullWritable](mapper)
+ mapper.setSymbol("AAPL")
+ mapDriver.withAll(inputs).withAllOutput(outputs).runTest()
+ }
- val nullList: JList[NullWritable] = new JArrayList[NullWritable]()
- nullList.add(NullWritable.get())
-
- val inputs1 = List(
- new YearYMDClose(2011, "2011-02-02", 302.0f),
- new YearYMDClose(2011, "2011-02-03", 304.0f),
- new YearYMDClose(2012, "2012-01-02", 402.0f),
- new YearYMDClose(2012, "2012-01-03", 404.0f))
- .map(y => new MRPair(y, nullList))
-
- val outputs1 = List(
- ("2011-02-02", 302.0f),
- ("2011-02-03", 304.0f),
- ("2012-01-02", 402.0f),
- ("2012-01-03", 404.0f))
- .map {
- case (ymd, price) => new MRPair(new Text(ymd), new FloatWritable(price))
- }
-
- import scala.collection.JavaConversions._
+ @Test def `StockReducer converts presorted (YearYMDClose,NullWritable) pairs into (ymd, closingPrice)`() {
- val inputs: JList[MRPair[YearYMDClose, JList[NullWritable]]] = inputs1
- val outputs: JList[MRPair[Text, FloatWritable]] = outputs1
- val reducer = new StockReducer()
- val reduceDriver = new ReduceDriver[YearYMDClose, NullWritable, Text, FloatWritable](reducer)
- reduceDriver.withAll(inputs).withAllOutput(outputs).runTest()
- }
+ val nullList: JList[NullWritable] = new JArrayList[NullWritable]()
+ nullList.add(NullWritable.get())
+
+ val inputs1 = List(
+ new YearYMDClose(2011, "2011-02-02", 302.0f),
+ new YearYMDClose(2011, "2011-02-03", 304.0f),
+ new YearYMDClose(2012, "2012-01-02", 402.0f),
+ new YearYMDClose(2012, "2012-01-03", 404.0f))
+ .map(y ⇒ new MRPair(y, nullList))
+
+ val outputs1 = List(
+ ("2011-02-02", 302.0f),
+ ("2011-02-03", 304.0f),
+ ("2012-01-02", 402.0f),
+ ("2012-01-03", 404.0f))
+ .map {
+ case (ymd, price) ⇒ new MRPair(new Text(ymd), new FloatWritable(price))
+ }
+
+ import scala.collection.JavaConversions._
+
+ val inputs: JList[MRPair[YearYMDClose, JList[NullWritable]]] = inputs1
+ val outputs: JList[MRPair[Text, FloatWritable]] = outputs1
+ val reducer = new StockReducer()
+ val reduceDriver = new ReduceDriver[YearYMDClose, NullWritable, Text, FloatWritable](reducer)
+ reduceDriver.withAll(inputs).withAllOutput(outputs).runTest()
+ }
}
diff --git a/src/test/scala/secondarysort/YearYMDCloseSpec.scala b/src/test/scala/secondarysort/YearYMDCloseSpec.scala
index 422bf3b..31fa942 100644
--- a/src/test/scala/secondarysort/YearYMDCloseSpec.scala
+++ b/src/test/scala/secondarysort/YearYMDCloseSpec.scala
@@ -23,7 +23,7 @@ class YearYMDCloseSpec extends FunSpec {
assert("2011-02-02" === y.ymd)
assert(302.0f === y.closingPrice)
}
- }
+ }
describe("default constructor") {
it("uses 0 and \"\", as appropriate, for the values") {
@@ -32,27 +32,27 @@ class YearYMDCloseSpec extends FunSpec {
assert("" === y.ymd)
assert(0.0f === y.closingPrice)
}
- }
+ }
describe("compareTo sorts lexicographically, ascending") {
- val year1 = new YearYMDClose(2010, "2011-02-02", 302.0f)
- val year2 = new YearYMDClose(2011, "2011-02-02", 302.0f)
- val ymd1 = new YearYMDClose(2011, "2011-02-01", 302.0f)
- val ymd2 = new YearYMDClose(2011, "2011-02-02", 302.0f)
- val cp1 = new YearYMDClose(2011, "2011-02-02", 301.0f)
- val cp2 = new YearYMDClose(2011, "2011-02-02", 302.0f)
-
- assert(year1.compareTo(year1) == 0)
- assert(year1.compareTo(year2) < 0)
- assert(year2.compareTo(year1) > 0)
-
- assert(ymd1.compareTo(ymd1) == 0)
- assert(ymd1.compareTo(ymd2) < 0)
- assert(ymd2.compareTo(ymd1) > 0)
-
- assert(cp1.compareTo(cp1) == 0)
- assert(cp1.compareTo(cp2) < 0)
- assert(cp2.compareTo(cp1) > 0)
+ val year1 = new YearYMDClose(2010, "2011-02-02", 302.0f)
+ val year2 = new YearYMDClose(2011, "2011-02-02", 302.0f)
+ val ymd1 = new YearYMDClose(2011, "2011-02-01", 302.0f)
+ val ymd2 = new YearYMDClose(2011, "2011-02-02", 302.0f)
+ val cp1 = new YearYMDClose(2011, "2011-02-02", 301.0f)
+ val cp2 = new YearYMDClose(2011, "2011-02-02", 302.0f)
+
+ assert(year1.compareTo(year1) == 0)
+ assert(year1.compareTo(year2) < 0)
+ assert(year2.compareTo(year1) > 0)
+
+ assert(ymd1.compareTo(ymd1) == 0)
+ assert(ymd1.compareTo(ymd2) < 0)
+ assert(ymd2.compareTo(ymd1) > 0)
+
+ assert(cp1.compareTo(cp1) == 0)
+ assert(cp1.compareTo(cp2) < 0)
+ assert(cp2.compareTo(cp1) > 0)
}
}
}
diff --git a/src/test/scala/wordcount/WordCountSuite.scala b/src/test/scala/wordcount/WordCountSuite.scala
index f93cc5f..8ab6fa9 100755
--- a/src/test/scala/wordcount/WordCountSuite.scala
+++ b/src/test/scala/wordcount/WordCountSuite.scala
@@ -3,13 +3,13 @@ package wordcount
import org.scalatest.junit.JUnitSuite
import org.junit.Assert._
import org.junit.Test
-import java.util.{ List => JList, ArrayList => JArrayList }
+import java.util.{ List ⇒ JList, ArrayList ⇒ JArrayList }
import org.apache.hadoop.io.{ IntWritable, LongWritable, Text }
import org.apache.hadoop.mapred.{ Mapper, Reducer }
import org.apache.hadoop.mrunit.{ MapDriver, ReduceDriver }
-import org.apache.hadoop.mrunit.types.{ Pair => MRPair }
+import org.apache.hadoop.mrunit.types.{ Pair ⇒ MRPair }
-/**
+/**
* Tests the WordCount examples.
* MRUnit is used, which is a JUnit plugin. Hence, we use ScalaTest's JUnit integration.
*/
@@ -20,20 +20,20 @@ class WordCountSuite extends JUnitSuite {
*/
@Test def `WordCountNoBuffering.Map outputs unbuffered (word,1) pairs`() {
val mapper: Mapper[LongWritable, Text, Text, IntWritable] =
- new WordCountNoBuffering.Map();
- val mapDriver: MapDriver[LongWritable, Text, Text, IntWritable] =
- new MapDriver[LongWritable, Text, Text, IntWritable](mapper);
+ new WordCountNoBuffering.Map()
+ val mapDriver: MapDriver[LongWritable, Text, Text, IntWritable] =
+ new MapDriver[LongWritable, Text, Text, IntWritable](mapper)
- val one = new IntWritable(1);
- val zero = new LongWritable(0L);
+ val one = new IntWritable(1)
+ val zero = new LongWritable(0L)
mapDriver.withInput(zero, new Text("Did the Buffalo buffalo Buffalo, NY?"))
- .withOutput(new Text("did"), one)
- .withOutput(new Text("the"), one)
- .withOutput(new Text("buffalo"), one)
- .withOutput(new Text("buffalo"), one)
+ .withOutput(new Text("did"), one)
+ .withOutput(new Text("the"), one)
+ .withOutput(new Text("buffalo"), one)
+ .withOutput(new Text("buffalo"), one)
.withOutput(new Text("buffalo,"), one)
- .withOutput(new Text("ny?"), one)
- .runTest();
+ .withOutput(new Text("ny?"), one)
+ .runTest()
}
/**
@@ -42,23 +42,23 @@ class WordCountSuite extends JUnitSuite {
*/
@Test def `WordCountNoBufferingTokenization.Map outputs unbuffered (word,1) pairs`() {
val mapper: Mapper[LongWritable, Text, Text, IntWritable] =
- new WordCountNoBufferingTokenization.Map();
- val mapDriver: MapDriver[LongWritable, Text, Text, IntWritable] =
- new MapDriver[LongWritable, Text, Text, IntWritable](mapper);
+ new WordCountNoBufferingTokenization.Map()
+ val mapDriver: MapDriver[LongWritable, Text, Text, IntWritable] =
+ new MapDriver[LongWritable, Text, Text, IntWritable](mapper)
- val one = new IntWritable(1);
- val zero = new LongWritable(0L);
+ val one = new IntWritable(1)
+ val zero = new LongWritable(0L)
mapDriver.withInput(zero, new Text("Did the Buffalo buffalo Buffalo, NY?"))
- .withOutput(new Text("did"), one)
- .withOutput(new Text("the"), one)
- .withOutput(new Text("buffalo"), one)
- .withOutput(new Text("buffalo"), one)
- .withOutput(new Text("buffalo"), one)
- .withOutput(new Text("ny"), one)
- .runTest();
+ .withOutput(new Text("did"), one)
+ .withOutput(new Text("the"), one)
+ .withOutput(new Text("buffalo"), one)
+ .withOutput(new Text("buffalo"), one)
+ .withOutput(new Text("buffalo"), one)
+ .withOutput(new Text("ny"), one)
+ .runTest()
}
- /**
+ /**
 * Tests the buffered WordCount example, which holds the key-value pairs,
* aggregating together the pairs for a given word. The close method outputs
* the final (word,N) pairs. Note that this version also used the improved
@@ -69,75 +69,75 @@ class WordCountSuite extends JUnitSuite {
*/
@Test def `WordCountBuffering.Map outputs buffered (word,N) pairs`() {
val mapper: Mapper[LongWritable, Text, Text, IntWritable] =
- new WordCountBuffering.Map();
- val mapDriver: MapDriver[LongWritable, Text, Text, IntWritable] =
- new MapDriver[LongWritable, Text, Text, IntWritable](mapper);
+ new WordCountBuffering.Map()
+ val mapDriver: MapDriver[LongWritable, Text, Text, IntWritable] =
+ new MapDriver[LongWritable, Text, Text, IntWritable](mapper)
- val zero = new LongWritable(0L);
- val one = new IntWritable(1);
- val two = new IntWritable(2);
- val results: JList[MRPair[Text, IntWritable]] =
+ val zero = new LongWritable(0L)
+ val one = new IntWritable(1)
+ val two = new IntWritable(2)
+ val results: JList[MRPair[Text, IntWritable]] =
mapDriver.withInput(zero, new Text("Did the Buffalo buffalo Buffalo, NY?")).run()
mapper.close()
// Note the actual order received!!
- assert("ny" === results.get(0).getFirst().toString())
- assert("did" === results.get(1).getFirst().toString())
- assert("the" === results.get(2).getFirst().toString())
- assert("buffalo" === results.get(3).getFirst().toString())
- assert(1 === results.get(0).getSecond().get())
- assert(1 === results.get(1).getSecond().get())
- assert(1 === results.get(2).getSecond().get())
- assert(3 === results.get(3).getSecond().get())
+ assert("ny" === results.get(0).getFirst().toString())
+ assert("did" === results.get(1).getFirst().toString())
+ assert("the" === results.get(2).getFirst().toString())
+ assert("buffalo" === results.get(3).getFirst().toString())
+ assert(1 === results.get(0).getSecond().get())
+ assert(1 === results.get(1).getSecond().get())
+ assert(1 === results.get(2).getSecond().get())
+ assert(3 === results.get(3).getSecond().get())
}
- /**
+ /**
* Tests the buffered WordCount example that also flushes its buffers periodically,
* a technique to avoid exhausting the Heap. As for the WordCountBuffering.Map test,
* we have to use MRUnit carefully...
*/
@Test def `WordCountBufferingFlushing.Map outputs buffered (word,N) pairs that are occasionally flushed`() {
val mapper: Mapper[LongWritable, Text, Text, IntWritable] =
- new WordCountBufferingFlushing.Map();
- val mapDriver: MapDriver[LongWritable, Text, Text, IntWritable] =
- new MapDriver[LongWritable, Text, Text, IntWritable](mapper);
+ new WordCountBufferingFlushing.Map()
+ val mapDriver: MapDriver[LongWritable, Text, Text, IntWritable] =
+ new MapDriver[LongWritable, Text, Text, IntWritable](mapper)
- val zero = new LongWritable(0L);
- val one = new IntWritable(1);
- val two = new IntWritable(2);
- val results: JList[MRPair[Text, IntWritable]] =
+ val zero = new LongWritable(0L)
+ val one = new IntWritable(1)
+ val two = new IntWritable(2)
+ val results: JList[MRPair[Text, IntWritable]] =
mapDriver.withInput(zero, new Text("Did the Buffalo buffalo Buffalo, NY?")).run()
mapper.close()
// Note the actual order received!!
- assert("ny" === results.get(0).getFirst().toString())
- assert("did" === results.get(1).getFirst().toString())
- assert("the" === results.get(2).getFirst().toString())
- assert("buffalo" === results.get(3).getFirst().toString())
- assert(1 === results.get(0).getSecond().get())
- assert(1 === results.get(1).getSecond().get())
- assert(1 === results.get(2).getSecond().get())
- assert(3 === results.get(3).getSecond().get())
+ assert("ny" === results.get(0).getFirst().toString())
+ assert("did" === results.get(1).getFirst().toString())
+ assert("the" === results.get(2).getFirst().toString())
+ assert("buffalo" === results.get(3).getFirst().toString())
+ assert(1 === results.get(0).getSecond().get())
+ assert(1 === results.get(1).getSecond().get())
+ assert(1 === results.get(2).getSecond().get())
+ assert(3 === results.get(3).getSecond().get())
}
/**
* Tests the Reducer used by all the examples.
*/
@Test def `WordCountReducer aggregates word counts`() {
- val reducer: Reducer[Text, IntWritable, Text, IntWritable] =
- new WordCountReducer();
- val reduceDriver: ReduceDriver[Text, IntWritable, Text, IntWritable] =
- new ReduceDriver[Text, IntWritable, Text, IntWritable](reducer);
-
- val buffalo = new JArrayList[IntWritable]();
- val one = new IntWritable(1);
- buffalo.add(one);
- buffalo.add(one);
- buffalo.add(one);
-
+ val reducer: Reducer[Text, IntWritable, Text, IntWritable] =
+ new WordCountReducer()
+ val reduceDriver: ReduceDriver[Text, IntWritable, Text, IntWritable] =
+ new ReduceDriver[Text, IntWritable, Text, IntWritable](reducer)
+
+ val buffalo = new JArrayList[IntWritable]()
+ val one = new IntWritable(1)
+ buffalo.add(one)
+ buffalo.add(one)
+ buffalo.add(one)
+
reduceDriver
.withInput(new Text("buffalo"), buffalo)
.withOutput(new Text("buffalo"), new IntWritable(3))
- .runTest();
+ .runTest()
}
}