Improved styling #1

Open

wants to merge 1 commit into master
2 changes: 2 additions & 0 deletions project/plugins.sbt
@@ -16,3 +16,5 @@ resolvers ++= Seq(
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.4")

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.5")

addSbtPlugin("com.typesafe.sbt" % "sbt-scalariform" % "1.0.1")
9 changes: 9 additions & 0 deletions scalariform.sbt
@@ -0,0 +1,9 @@
import scalariform.formatter.preferences._

scalariformSettings

ScalariformKeys.preferences := FormattingPreferences()
.setPreference(RewriteArrowSymbols, true)
.setPreference(AlignParameters, true)
.setPreference(AlignSingleLineCaseStatements, true)
.setPreference(DoubleIndentClassDeclaration, true)
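
These preferences account for most of the churn in the Scala sources below: RewriteArrowSymbols replaces => and <- with their Unicode equivalents, and the align options line up arrows and parameters. A small before/after sketch of the effect (illustrative only, not part of this diff):

    // Before formatting
    x match {
      case 1 => "one"
      case other => "many"
    }
    for (n <- 1 to 3) println(n)

    // After RewriteArrowSymbols and AlignSingleLineCaseStatements
    x match {
      case 1     ⇒ "one"
      case other ⇒ "many"
    }
    for (n ← 1 to 3) println(n)

With scalariformSettings included, the plugin is intended to reformat sources as part of the normal sbt compile step, so this style stays enforced on every build.
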
2 changes: 1 addition & 1 deletion src/main/scala/driver/Driver.scala
@@ -10,6 +10,6 @@ import wordcount._
object Driver {

def main(args: Array[String]): Unit = {
ToolRunner.run(new Configuration, WordCount, args);
ToolRunner.run(new Configuration, WordCount, args)
}
}
12 changes: 6 additions & 6 deletions src/main/scala/secondarysort/GroupComparator.scala
@@ -4,14 +4,14 @@ import org.apache.hadoop.io.{ WritableComparable, WritableComparator }

/**
* We just consider the <em>full date</em> when grouping in the reducer.
* If you just consider the year, you end up with just one key-value pair per year
* If you just consider the year, you end up with just one key-value pair per year
* in the final output and probably not the maximum!!
*/
class GroupComparator extends WritableComparator(classOf[YearYMDClose], true) {

override def compare(w1: WritableComparable[_], w2: WritableComparable[_]): Int = {
val t1 = w1.asInstanceOf[YearYMDClose];
val t2 = w2.asInstanceOf[YearYMDClose];
t1.ymd.compareTo(t2.ymd); // compare the 2nd field.
}
val t1 = w1.asInstanceOf[YearYMDClose]
val t2 = w2.asInstanceOf[YearYMDClose]
t1.ymd.compareTo(t2.ymd) // compare the 2nd field.
}
}
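
For intuition, a minimal sketch of how this grouping behaves on two hypothetical keys (the values are made up, not from the exercise data):

    val a = new YearYMDClose(2008, "2008-10-01", 110.0f)
    val b = new YearYMDClose(2008, "2008-10-02", 105.0f)
    // Same year but different dates: the GroupComparator compares only ymd,
    // so compare(a, b) != 0 and each date forms its own reduce group (and its
    // own output record). Grouping on the year alone would merge both into one
    // group and emit a single record for 2008.
    new GroupComparator().compare(a, b)
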
8 changes: 4 additions & 4 deletions src/main/scala/secondarysort/KeyComparator.scala
@@ -15,13 +15,13 @@ class KeyComparator extends WritableComparator(classOf[YearYMDClose], true) {
* year (1st element) ascending, then the price (3rd element) descending.
*/
override def compare(w1: WritableComparable[_], w2: WritableComparable[_]): Int = {
val t1 = w1.asInstanceOf[YearYMDClose];
val t2 = w2.asInstanceOf[YearYMDClose];
val cmp = t1.year.compareTo(t2.year);
val t1 = w1.asInstanceOf[YearYMDClose]
val t2 = w2.asInstanceOf[YearYMDClose]
val cmp = t1.year.compareTo(t2.year)
if (cmp != 0) {
cmp
} else {
-t1.closingPrice.compareTo(t2.closingPrice); // REVERSE!
-t1.closingPrice.compareTo(t2.closingPrice) // REVERSE!
}
}
}
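
A quick illustration of the resulting sort order, using two hypothetical keys from the same year (prices are made up):

    val hi = new YearYMDClose(2008, "2008-10-01", 110.0f)
    val lo = new YearYMDClose(2008, "2008-10-02", 105.0f)
    // The years tie, so the reversed price comparison decides: compare(hi, lo) < 0,
    // i.e. 110.0 sorts ahead of 105.0. That is why the first key the reducer sees
    // for each year carries that year's maximum closing price.
    new KeyComparator().compare(hi, lo)
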
14 changes: 7 additions & 7 deletions src/main/scala/secondarysort/PartitionByYear.scala
@@ -7,11 +7,11 @@ import org.apache.hadoop.mapred.{ JobConf, Partitioner }
* When determining how to send key-value pairs to reducers, consider ONLY the year!
*/
class PartitionByYear extends Partitioner[YearYMDClose, NullWritable] {
override def configure(job: JobConf): Unit = {}
override def getPartition(
key: YearYMDClose,
value: NullWritable,
numPartitions: Int): Int = math.abs(key.year) % numPartitions;

override def configure(job: JobConf): Unit = {}

override def getPartition(
key: YearYMDClose,
value: NullWritable,
numPartitions: Int): Int = math.abs(key.year) % numPartitions
}
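
With the two reduce tasks configured in SecondarySort below, the effect of this partitioner is easy to trace by hand (the keys are hypothetical; imports as at the top of this file):

    val byYear = new PartitionByYear
    byYear.getPartition(new YearYMDClose(2007, "2007-06-15", 120.0f), NullWritable.get(), 2) // 2007 % 2 == 1
    byYear.getPartition(new YearYMDClose(2008, "2008-06-16", 170.0f), NullWritable.get(), 2) // 2008 % 2 == 0
    // Every record for a given year lands on the same reducer, regardless of date or price.
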
54 changes: 27 additions & 27 deletions src/main/scala/secondarysort/SecondarySort.scala
@@ -2,7 +2,7 @@ package secondarysort

import org.apache.hadoop.conf.Configured
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{ FloatWritable, NullWritable, Text}
import org.apache.hadoop.io.{ FloatWritable, NullWritable, Text }
import org.apache.hadoop.mapred.{ FileInputFormat, FileOutputFormat, JobClient, JobConf }
import org.apache.hadoop.util.{ GenericOptionsParser, Tool, ToolRunner }

@@ -11,8 +11,8 @@ import org.apache.hadoop.util.{ GenericOptionsParser, Tool, ToolRunner }
* user-specified stock, but sort by year ascending, followed by closing price
* descending. Hence, the first record for a given year will be the maximum price
* for that year.
* See also the secondary sort example that comes with the Hadoop distribution, and
* discussions in "Hadoop: The Definitive Guide"
* See also the secondary sort example that comes with the Hadoop distribution, and
* discussions in "Hadoop: The Definitive Guide"
*/
class SecondarySort extends Configured with Tool {
import SecondarySort._
@@ -21,40 +21,40 @@ class SecondarySort extends Configured with Tool {
val conf = new JobConf(classOf[SecondarySort])
conf.setJobName("Stock Analysis")
val optionsParser = new GenericOptionsParser(conf, args)

val remainingArgs = optionsParser.getRemainingArgs()
if (remainingArgs.length < 4) {
usage("Must specify --symbol symbol input_path output_path.")
return 1
}
usage("Must specify --symbol symbol input_path output_path.")
return 1
}

var symbol = ""
var inputPath = ""
var outputPath = ""
var nextIsSymbol = false;
for (arg <- remainingArgs) {
var nextIsSymbol = false
for (arg ← remainingArgs) {
if (arg.startsWith("--s")) {
nextIsSymbol = true;
nextIsSymbol = true
} else {
if (nextIsSymbol) {
conf.set("symbol", arg);
symbol = arg;
nextIsSymbol = false;
conf.set("symbol", arg)
symbol = arg
nextIsSymbol = false
} else if (inputPath.isEmpty()) {
inputPath = arg;
inputPath = arg
} else if (outputPath.isEmpty()) {
outputPath = arg;
outputPath = arg
} else {
usage("Too many arguments specified.");
return 1;
usage("Too many arguments specified.")
return 1
}
}
}
if (symbol.isEmpty()) {
usage("Must specify '--symbol symbol' argument!")
return 1
}
println("Using Stock Symbol: "+symbol);
println("Using Stock Symbol: " + symbol)
if (inputPath.isEmpty()) {
usage("Must specify an input path argument!")
return 1
@@ -63,7 +63,7 @@ class SecondarySort extends Configured with Tool {
usage("Must specify an output path argument!")
return 1
}

// Because of type erasure, the intermediate Map output K-V types and
// final K-V types can't be inferred from the types of the mapper and
// reducer.
@@ -76,7 +76,7 @@ class SecondarySort extends Configured with Tool {
FileOutputFormat.setOutputPath(conf, new Path(outputPath))

conf.setMapperClass(classOf[StockMapper])

// Experiment with not setting the reducer class, which means the default
// "Identity Reducer" is used. Then try setting the number of reducers to zero.
// Next try setting the number of reducers to a number between 2 and 5, say.
@@ -87,10 +87,10 @@ class SecondarySort extends Configured with Tool {
conf.setNumReduceTasks(2)

// Would a combiner help? Not likely, because we won't have many identical key-value pairs!

// Specify our custom partitioner, etc.
conf.setPartitionerClass(classOf[PartitionByYear])
conf.setOutputKeyComparatorClass(classOf[KeyComparator])
conf.setOutputKeyComparatorClass(classOf[KeyComparator])
conf.setOutputValueGroupingComparator(classOf[GroupComparator])

// This code was copied from the TwitterIndexer exercise we'll do later.
@@ -100,7 +100,7 @@ class SecondarySort extends Configured with Tool {
// to import the correct classes! However, Eclipse can handle that for you...
// The compression settings are optional and can be omitted.
// conf.setOutputFormat(classOf[SequenceFileOutputFormat])
// SequenceFileOutputFormat.setCompressOutput(conf, true);
// SequenceFileOutputFormat.setCompressOutput(conf, true)
// SequenceFileOutputFormat.setOutputCompressorClass(conf, classOf[BZip2Codec])
// SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK)

@@ -112,10 +112,10 @@ class SecondarySort extends Configured with Tool {
object SecondarySort extends Configured {

def usage(message: String): Unit = {
Console.err.println(message);
Console.err.println("usage: java ...SecondarySort [generic_options] " +
"--symbol stock_symbol in_path out_path");
ToolRunner.printGenericCommandUsage(Console.err);
Console.err.println(message)
Console.err.println("usage: java ...SecondarySort [generic_options] " +
"--symbol stock_symbol in_path out_path")
ToolRunner.printGenericCommandUsage(Console.err)
}

def main(args: Array[String]): Unit = {
Expand Down
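
For reference, run() expects at least four arguments after Hadoop's generic options: the --symbol flag, the symbol itself, an input path, and an output path. A minimal sketch of launching the tool from Scala (the symbol and paths are placeholders, not taken from this PR):

    // Returns the tool's exit code; 0 on success.
    val exitCode = ToolRunner.run(new SecondarySort, Array("--symbol", "AAPL", "stocks-input", "stocks-output"))
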
79 changes: 39 additions & 40 deletions src/main/scala/secondarysort/StockMapper.scala
@@ -4,70 +4,69 @@ import org.apache.hadoop.io.{ LongWritable, NullWritable, Text }
import org.apache.hadoop.mapred.{ JobConf, InvalidJobConfException, MapReduceBase, Mapper, OutputCollector, Reporter }

/**
* The mapper parses the record, extracting the date, the year from the date, and the closing price.
* The mapper parses the record, extracting the date, the year from the date, and the closing price.
* The input records have this format:
* exchange,symbol,yyyy-mm-dd,opening_price,high_price,low_price,closing_price,volume,adjusted_closing_price.
* The mapper extracts and uses the symbol (for filtering), the yyyy-mm-dd (from which it also extracts the year),
* and the adjusted closing price.
*/
class StockMapper extends MapReduceBase with
Mapper[LongWritable, Text, YearYMDClose, NullWritable] {
class StockMapper extends MapReduceBase with Mapper[LongWritable, Text, YearYMDClose, NullWritable] {

var symbol: Option[String] = None

def setSymbol(newSymbol: String): Unit = {
symbol = newSymbol match {
case null => None
case _ =>
case null ⇒ None
case _ ⇒
println(s"Using Stock Symbol: $newSymbol")
Some(newSymbol)
}
}

override def configure(jobConf: JobConf): Unit = {
jobConf.get("symbol") match {
case null => /* do nothing */
case sym => setSymbol(sym)
case null ⇒ /* do nothing */
case sym ⇒ setSymbol(sym)
}
}

override def map(
key: LongWritable, // offset in the file
line: Text, // record on a line
collector: OutputCollector[YearYMDClose, NullWritable],
key: LongWritable, // offset in the file
line: Text, // record on a line
collector: OutputCollector[YearYMDClose, NullWritable],
reporter: Reporter): Unit = {

val stockSymbol: String = symbol match {
case None =>
reporter.incrCounter(RecordFormatRecords.NO_SYMBOL_SPECIFIED, 1)
throw new InvalidJobConfException("No stock symbol was specified!")
case Some(s) => s
}

try {
reporter.incrCounter(RecordFormatRecords.MAP_RECORDS_SEEN, 1);
val stockSymbol: String = symbol match {
case None ⇒
reporter.incrCounter(RecordFormatRecords.NO_SYMBOL_SPECIFIED, 1)
throw new InvalidJobConfException("No stock symbol was specified!")
case Some(s) ⇒ s
}

val fields = line.toString().split(","); // CSV
val sym = fields(1)
if (sym == stockSymbol) { // filter!
val date = fields(2)
val ymd = date.split("-")
val year = ymd(0).toInt
// Actually use the "adjusted" close, which is the last field in the record.
val closing = fields(fields.length-1).toFloat
val outputTuple = new YearYMDClose(year, date, closing)
collector.collect(outputTuple, NullWritable.get())
}
} catch {
case nfe: NumberFormatException =>
// Gets directed to the task's user syserr log.
// You can more carefully control how messages are logged, e.g., the severity level,
// by using Apache Commons Logging. Log4J is actually used. The relevant log4j
// appender is called TLA, for "Task Log Appender", in $HADOOP_HOME/conf/log4j.properties.
Console.err.println(nfe+": ("+key.get()+", "+line+")")
try {
reporter.incrCounter(RecordFormatRecords.MAP_RECORDS_SEEN, 1)

// Example of a Counter.
reporter.incrCounter(RecordFormatRecords.RECORD_FORMAT_ERRORS, 1)
val fields = line.toString().split(",") // CSV
val sym = fields(1)
if (sym == stockSymbol) { // filter!
val date = fields(2)
val ymd = date.split("-")
val year = ymd(0).toInt
// Actually use the "adjusted" close, which is the last field in the record.
val closing = fields(fields.length - 1).toFloat
val outputTuple = new YearYMDClose(year, date, closing)
collector.collect(outputTuple, NullWritable.get())
}
} catch {
case nfe: NumberFormatException ⇒
// Gets directed to the task's user syserr log.
// You can more carefully control how messages are logged, e.g., the severity level,
// by using Apache Commons Logging. Log4J is actually used. The relevant log4j
// appender is called TLA, for "Task Log Appender", in $HADOOP_HOME/conf/log4j.properties.
Console.err.println(nfe + ": (" + key.get() + ", " + line + ")")

// Example of a Counter.
reporter.incrCounter(RecordFormatRecords.RECORD_FORMAT_ERRORS, 1)
}
}
}
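
A worked example of the parsing above, using a made-up record in the documented CSV format:

    val line = "NASDAQ,AAPL,2008-10-01,111.92,112.40,107.39,109.12,46326900,109.12"
    val fields = line.split(",")
    val year = fields(2).split("-")(0).toInt         // 2008
    val closing = fields(fields.length - 1).toFloat  // 109.12f, the adjusted close
    val key = new YearYMDClose(year, fields(2), closing)
    // With --symbol AAPL the mapper emits (key, NullWritable.get()); records for
    // other symbols are skipped, and unparsable prices only bump RECORD_FORMAT_ERRORS.
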
14 changes: 6 additions & 8 deletions src/main/scala/secondarysort/StockReducer.scala
@@ -5,18 +5,16 @@ import org.apache.hadoop.io.{ FloatWritable, NullWritable, Text }
import org.apache.hadoop.mapred.{ MapReduceBase, OutputCollector, Reducer, Reporter }
import RecordFormatRecords._


/**
* Reducer that converts the keys into tab-delimited date, closing-price values.
* Reducer that converts the keys into tab-delimited date, closing-price values.
* If we didn't care about the output format, we could use the default Identity reducer!
*/
class StockReducer extends MapReduceBase with
Reducer[YearYMDClose, NullWritable, Text, FloatWritable] {
class StockReducer extends MapReduceBase with Reducer[YearYMDClose, NullWritable, Text, FloatWritable] {

override def reduce(key: YearYMDClose,
ignore: Iterator[NullWritable],
output: OutputCollector[Text, FloatWritable],
reporter: Reporter) = {
override def reduce(key: YearYMDClose,
ignore: Iterator[NullWritable],
output: OutputCollector[Text, FloatWritable],
reporter: Reporter) = {
reporter.incrCounter(REDUCE_RECORDS_SEEN, 1)
output.collect(new Text(key.ymd), new FloatWritable(key.closingPrice))
}
Expand Down
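
Illustrative output for a single reduce group (values made up): the key YearYMDClose(2008, "2008-10-01", 109.12f) becomes the pair (Text("2008-10-01"), FloatWritable(109.12f)), which the job's default text output format writes as a tab-separated line:

    2008-10-01    109.12
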
16 changes: 8 additions & 8 deletions src/main/scala/secondarysort/YearYMDClose.scala
@@ -12,25 +12,25 @@ case class YearYMDClose(yearW: IntWritable, ymdW: Text, closingPriceW: FloatWrit
/**
* Convenience constructor.
*/
def this(year: Int, ymd: String, closingPrice: Float) =
def this(year: Int, ymd: String, closingPrice: Float) =
this(new IntWritable(year), new Text(ymd), new FloatWritable(closingPrice))

/**
/**
* You should not create an object in an invalid, e.g., uninitialized
* state. Unfortunately, Hadoop requires a default constructor here.
*/
def this() = this(0, "", 0.0f)

override def write(out: DataOutput): Unit = {
yearW.write(out);
ymdW.write(out);
closingPriceW.write(out);
yearW.write(out)
ymdW.write(out)
closingPriceW.write(out)
}

override def readFields(in: DataInput): Unit = {
yearW.readFields(in);
ymdW.readFields(in);
closingPriceW.readFields(in);
yearW.readFields(in)
ymdW.readFields(in)
closingPriceW.readFields(in)
}

def compareTo(other: YearYMDClose): Int = {
Expand Down
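
Since write and readFields are the heart of any Hadoop Writable, here is a small round-trip sketch through plain Java streams (illustrative only, not part of the PR):

    import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream }

    val original = new YearYMDClose(2008, "2008-10-01", 109.12f)
    val buffer = new ByteArrayOutputStream()
    original.write(new DataOutputStream(buffer))   // serializes the three fields in order

    val copy = new YearYMDClose()                  // the default constructor Hadoop requires
    copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray)))
    // copy now holds the same year, date, and closing price as original
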