From 05df02bc9d9507bfe4d2dcd380682b40f73cefc5 Mon Sep 17 00:00:00 2001 From: Eddie Xie Date: Thu, 7 May 2015 15:40:39 -0700 Subject: [PATCH 1/9] Proof of concept --- .../com/twitter/scalding/Operations.scala | 24 +++++++++++++++++- .../com/twitter/scalding/PageRankTest.scala | 25 +++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala b/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala index 7e5a1a9507..b0688b4a9c 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala @@ -476,6 +476,16 @@ package com.twitter.scalding { } } + class KeyValueCounter(val nKey: Long, val valuesSquaredSum: Double, val valuesSum: Double) { + def add(v: Long) = { + new KeyValueCounter(nKey + 1, valuesSquaredSum + v * v, valuesSum + v) + } + + override def toString = { + "KeyValueCounter(nKeys =" + nKey + ", valueSquaredSum = " + valuesSquaredSum + ", valuesSum = " + valuesSum + ")" + } + } + /** In the typed API every reduce operation is handled by this Buffer */ class TypedBufferOp[K, V, U]( conv: TupleConverter[K], @@ -483,6 +493,7 @@ package com.twitter.scalding { valueField: Fields) extends BaseOperation[Any](valueField) with Buffer[Any] with ScaldingPrepare[Any] { val reduceFnSer = Externalizer(reduceFn) + var keyValueCounter = new KeyValueCounter(0L, 0L, 0L) def operate(flowProcess: FlowProcess[_], call: BufferCall[Any]) { val oc = call.getOutputCollector @@ -491,11 +502,22 @@ package com.twitter.scalding { .asScala .map(_.getObject(0).asInstanceOf[V]) + /* + val tmp = values.toList + + keyValueCounter = keyValueCounter.add(tmp.size) + println("AFTER UPDATE = " + keyValueCounter) + println("values.size = " + tmp.toList.size) + println("values = " + tmp.toList) + */ + // Avoiding a lambda here val resIter = reduceFnSer.get(key, values) while (resIter.hasNext) { val tup = Tuple.size(1) - tup.set(0, resIter.next) + val t2 = resIter.next + println("t2 = " + t2) + tup.set(0, t2) oc.add(tup) } } diff --git a/scalding-core/src/test/scala/com/twitter/scalding/PageRankTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/PageRankTest.scala index abb61e8de6..2df88d42bb 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/PageRankTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/PageRankTest.scala @@ -17,6 +17,31 @@ package com.twitter.scalding import org.scalatest.{ Matchers, WordSpec } +class SimplySimplyClass(args: Args) extends Job(args) { + val t = TypedPipe.from(List((1, 2), (1, 3), (2, 3), (2, 5), (3, 1000))) + .group + .reduce{ + (a, b) => + a + b + } + .toTypedPipe + .write(TypedTsv[(Int, Int)](args("output"))) +} + +class ExperimentTest extends WordSpec with Matchers { + "A PageRank2 job" should { + JobTest(new com.twitter.scalding.SimplySimplyClass(_)) + .arg("output", "blah") + .sink[(Int, Int)](TypedTsv[(Int, Int)]("blah")){ + tuples => + println("RES = " + tuples) + } + .run + .finish + } + +} + class PageRankTest extends WordSpec with Matchers { "A PageRank job" should { JobTest(new com.twitter.scalding.examples.PageRank(_)) From d4aff6b1a2f9f8e1a364dad88f3398b68b3130c0 Mon Sep 17 00:00:00 2001 From: Eddie Xie Date: Tue, 12 May 2015 16:06:01 -0700 Subject: [PATCH 2/9] WIP. Working example --- .../com/twitter/scalding/Operations.scala | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala b/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala index b0688b4a9c..0602f52738 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala @@ -481,6 +481,8 @@ package com.twitter.scalding { new KeyValueCounter(nKey + 1, valuesSquaredSum + v * v, valuesSum + v) } + def getStd = math.sqrt(valuesSquaredSum*1.0/nKey - valuesSum*valuesSum) + override def toString = { "KeyValueCounter(nKeys =" + nKey + ", valueSquaredSum = " + valuesSquaredSum + ", valuesSum = " + valuesSum + ")" } @@ -502,24 +504,20 @@ package com.twitter.scalding { .asScala .map(_.getObject(0).asInstanceOf[V]) - /* - val tmp = values.toList - - keyValueCounter = keyValueCounter.add(tmp.size) - println("AFTER UPDATE = " + keyValueCounter) - println("values.size = " + tmp.toList.size) - println("values = " + tmp.toList) - */ + // We compute standard-deviation for number of values associated with keys here + val cache = values.toList + keyValueCounter = keyValueCounter.add(cache.size) // Avoiding a lambda here - val resIter = reduceFnSer.get(key, values) + val resIter = reduceFnSer.get(key, cache.toIterator) while (resIter.hasNext) { val tup = Tuple.size(1) val t2 = resIter.next - println("t2 = " + t2) tup.set(0, t2) oc.add(tup) } } + + def stdForValueNumbers = keyValueCounter.getStd } } From c8eebf850c17d72303998dc4b9a5134569458737 Mon Sep 17 00:00:00 2001 From: Eddie Xie Date: Tue, 12 May 2015 16:56:33 -0700 Subject: [PATCH 3/9] WIP --- .../com/twitter/scalding/PageRankTest.scala | 14 ++++++--- .../scalding/ReduceOperationsTest.scala | 29 +++++++++++++++++++ 2 files changed, 39 insertions(+), 4 deletions(-) diff --git a/scalding-core/src/test/scala/com/twitter/scalding/PageRankTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/PageRankTest.scala index 2df88d42bb..2c6be97113 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/PageRankTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/PageRankTest.scala @@ -18,21 +18,27 @@ package com.twitter.scalding import org.scalatest.{ Matchers, WordSpec } class SimplySimplyClass(args: Args) extends Job(args) { - val t = TypedPipe.from(List((1, 2), (1, 3), (2, 3), (2, 5), (3, 1000))) + TypedPipe.from(List((1, (1, 1)), (2, (2, 2)), (1, (3, 3)), (3, (3, 3)))) .group - .reduce{ + .foldLeft((0, 0)){ (a, b) => (a._1 + b._1, a._2 + b._2) } + /*.reduce{ (a, b) => a + b } + */ .toTypedPipe - .write(TypedTsv[(Int, Int)](args("output"))) + .map{ + case (a: Int, (b: Int, c: Int)) => + (a, b, c) + } + .write(TypedTsv[(Int, Int, Int)](args("output"))) } class ExperimentTest extends WordSpec with Matchers { "A PageRank2 job" should { JobTest(new com.twitter.scalding.SimplySimplyClass(_)) .arg("output", "blah") - .sink[(Int, Int)](TypedTsv[(Int, Int)]("blah")){ + .sink[(Int, Int, Int)](TypedTsv[(Int, Int, Int)]("blah")){ tuples => println("RES = " + tuples) } diff --git a/scalding-core/src/test/scala/com/twitter/scalding/ReduceOperationsTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/ReduceOperationsTest.scala index 6dc773df65..c1c96a94f4 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/ReduceOperationsTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/ReduceOperationsTest.scala @@ -160,4 +160,33 @@ class ReduceOperationsTest extends WordSpec with Matchers { .runHadoop .finish } + + "Std for number of values aassocated with each key " should { + class SDForReduceOperation(args: Args) extends Job(args) { + TypedPipe.from(List((1, (1, 1)), (2, (2, 2)), (1, (3, 3)), (3, (3, 3)))) + .group + .foldLeft((0, 0)){ (a, b) => (a._1 + b._1, a._2 + b._2) } + .toTypedPipe + .map{ + case (a: Int, (b: Int, c: Int)) => + (a, b, c) + } + .write(TypedTsv[(Int, Int, Int)](args("output"))) + } + + class ExperimentTest extends WordSpec with Matchers { + "A PageRank2 job" should { + JobTest(new com.twitter.scalding.SimplySimplyClass(_)) + .arg("output", "blah") + .sink[(Int, Int, Int)](TypedTsv[(Int, Int, Int)]("blah")){ + tuples => + println("RES = " + tuples) + } + .run + .finish + } + + } + + } } From b48da0b369da8e8a80188a08afd84b91a67fb4da Mon Sep 17 00:00:00 2001 From: Eddie Xie Date: Fri, 15 May 2015 21:39:30 -0700 Subject: [PATCH 4/9] Change proper name for test --- .../twitter/scalding/ReduceOperationsTest.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/scalding-core/src/test/scala/com/twitter/scalding/ReduceOperationsTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/ReduceOperationsTest.scala index c1c96a94f4..5bca58fcae 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/ReduceOperationsTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/ReduceOperationsTest.scala @@ -168,20 +168,20 @@ class ReduceOperationsTest extends WordSpec with Matchers { .foldLeft((0, 0)){ (a, b) => (a._1 + b._1, a._2 + b._2) } .toTypedPipe .map{ - case (a: Int, (b: Int, c: Int)) => - (a, b, c) - } + case (a: Int, (b: Int, c: Int)) => + (a, b, c) + } .write(TypedTsv[(Int, Int, Int)](args("output"))) } class ExperimentTest extends WordSpec with Matchers { "A PageRank2 job" should { - JobTest(new com.twitter.scalding.SimplySimplyClass(_)) + JobTest(new com.twitter.scalding.ReduceValueCountJob(_)) .arg("output", "blah") .sink[(Int, Int, Int)](TypedTsv[(Int, Int, Int)]("blah")){ - tuples => - println("RES = " + tuples) - } + tuples => + println("RES = " + tuples) + } .run .finish } From 0f495b1eb41b40e6d0dd1683d37ea9c16a2c2562 Mon Sep 17 00:00:00 2001 From: Eddie Xie Date: Fri, 15 May 2015 21:39:53 -0700 Subject: [PATCH 5/9] Add counter for number of values in each key for reduce operation --- .../com/twitter/scalding/Operations.scala | 59 +++++++++++++------ 1 file changed, 42 insertions(+), 17 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala b/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala index 0602f52738..914c913932 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala @@ -476,26 +476,19 @@ package com.twitter.scalding { } } - class KeyValueCounter(val nKey: Long, val valuesSquaredSum: Double, val valuesSum: Double) { - def add(v: Long) = { - new KeyValueCounter(nKey + 1, valuesSquaredSum + v * v, valuesSum + v) - } - - def getStd = math.sqrt(valuesSquaredSum*1.0/nKey - valuesSum*valuesSum) - - override def toString = { - "KeyValueCounter(nKeys =" + nKey + ", valueSquaredSum = " + valuesSquaredSum + ", valuesSum = " + valuesSum + ")" - } + private[scalding] object SkewMonitorCounters { + // Strangely, if group name and key name are different, then the counter would be zero + val KeyCount = "scalding.skew.monitor.key.count" + val ValuesCountSum = "scalding.skew.monitor.value.count.sum" + val ValuesCountSquareSum = "scalding.skew.monitor.value.sum.square" } - /** In the typed API every reduce operation is handled by this Buffer */ class TypedBufferOp[K, V, U]( conv: TupleConverter[K], @transient reduceFn: (K, Iterator[V]) => Iterator[U], valueField: Fields) extends BaseOperation[Any](valueField) with Buffer[Any] with ScaldingPrepare[Any] { val reduceFnSer = Externalizer(reduceFn) - var keyValueCounter = new KeyValueCounter(0L, 0L, 0L) def operate(flowProcess: FlowProcess[_], call: BufferCall[Any]) { val oc = call.getOutputCollector @@ -504,20 +497,52 @@ package com.twitter.scalding { .asScala .map(_.getObject(0).asInstanceOf[V]) - // We compute standard-deviation for number of values associated with keys here - val cache = values.toList - keyValueCounter = keyValueCounter.add(cache.size) + /* + This prints out key = 1, 1, 1. un comment it and comment the block blow to see print + value count = 1 + value count = 1 + value count = 1 + + which is wrong. one of the value count should be 2 in the test + */ + + /* + var numValuesPerKey = 0L + + val resIter = reduceFnSer.get(key, values) + while (resIter.hasNext) { + val tup = Tuple.size(1) + val t2 = resIter.next + + numValuesPerKey += 1L + + tup.set(0, t2) + oc.add(tup) + } + val valueCountSum = numValuesPerKey + println("value count = " + numValuesPerKey) + */ + + val caches = values.toList + var numValuesPerKey = caches.size // Avoiding a lambda here - val resIter = reduceFnSer.get(key, cache.toIterator) + val resIter = reduceFnSer.get(key, caches.toIterator) while (resIter.hasNext) { val tup = Tuple.size(1) val t2 = resIter.next + tup.set(0, t2) oc.add(tup) } + val valueCountSum = numValuesPerKey + + println("value count = " + numValuesPerKey) + + flowProcess.increment(SkewMonitorCounters.KeyCount, SkewMonitorCounters.KeyCount, 1L) + flowProcess.increment(SkewMonitorCounters.ValuesCountSum, SkewMonitorCounters.ValuesCountSum, numValuesPerKey) + flowProcess.increment(SkewMonitorCounters.ValuesCountSquareSum, SkewMonitorCounters.ValuesCountSquareSum, numValuesPerKey * numValuesPerKey) } - def stdForValueNumbers = keyValueCounter.getStd } } From 58c3f121e0ed79754eb31991b766abbb844c0d42 Mon Sep 17 00:00:00 2001 From: Eddie Xie Date: Fri, 15 May 2015 21:40:43 -0700 Subject: [PATCH 6/9] Add test --- .../scala/com/twitter/scalding/CoreTest.scala | 40 +++++++++++++++++++ .../com/twitter/scalding/PageRankTest.scala | 31 -------------- 2 files changed, 40 insertions(+), 31 deletions(-) diff --git a/scalding-core/src/test/scala/com/twitter/scalding/CoreTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/CoreTest.scala index 1244e42771..21454f602f 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/CoreTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/CoreTest.scala @@ -1844,6 +1844,46 @@ class CounterJobTest extends WordSpec with Matchers { } } +class ReduceValueCountJob(args: Args) extends Job(args) { + TypedPipe.from(List((1, (1, 1)), (2, (2, 2)), (1, (3, 3)), (3, (3, 3)))) + .group + .foldLeft((0, 0)){ (a, b) => (a._1 + b._1, a._2 + b._2) } + .toTypedPipe + .map{ + case (a: Int, (b: Int, c: Int)) => + (a, b, c) + } + .write(TypedTsv[(Int, Int, Int)](args("output"))) +} + +class ReduceValueCounterTest extends WordSpec with Matchers { + "Reduce Values Count" should { + JobTest(new com.twitter.scalding.ReduceValueCountJob(_)) + .arg("output", "output0") + .counter(SkewMonitorCounters.KeyCount, SkewMonitorCounters.KeyCount){ + x => + x should be(3) + } + .counter(SkewMonitorCounters.ValuesCountSquareSum, SkewMonitorCounters.ValuesCountSquareSum) { + x => + // key 1 has two values, thus 2^2 = 4. key 2 and 3 has only one respectively + x should be(4 + 1 + 1) + } + .counter(SkewMonitorCounters.ValuesCountSum, SkewMonitorCounters.ValuesCountSum) { + x => + // key 1 has two values, thus 2^2 = 4. key 2 and 3 has only one respectively + x should be (2 + 1 + 1) + } + .sink[(Int, Int, Int)](TypedTsv[(Int, Int, Int)]("output0")){ + tuples => + + } + .runHadoop + .finish + } + +} + object DailySuffixTsvJob { val strd1 = "2014-05-01" val strd2 = "2014-05-02" diff --git a/scalding-core/src/test/scala/com/twitter/scalding/PageRankTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/PageRankTest.scala index 2c6be97113..abb61e8de6 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/PageRankTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/PageRankTest.scala @@ -17,37 +17,6 @@ package com.twitter.scalding import org.scalatest.{ Matchers, WordSpec } -class SimplySimplyClass(args: Args) extends Job(args) { - TypedPipe.from(List((1, (1, 1)), (2, (2, 2)), (1, (3, 3)), (3, (3, 3)))) - .group - .foldLeft((0, 0)){ (a, b) => (a._1 + b._1, a._2 + b._2) } - /*.reduce{ - (a, b) => - a + b - } - */ - .toTypedPipe - .map{ - case (a: Int, (b: Int, c: Int)) => - (a, b, c) - } - .write(TypedTsv[(Int, Int, Int)](args("output"))) -} - -class ExperimentTest extends WordSpec with Matchers { - "A PageRank2 job" should { - JobTest(new com.twitter.scalding.SimplySimplyClass(_)) - .arg("output", "blah") - .sink[(Int, Int, Int)](TypedTsv[(Int, Int, Int)]("blah")){ - tuples => - println("RES = " + tuples) - } - .run - .finish - } - -} - class PageRankTest extends WordSpec with Matchers { "A PageRank job" should { JobTest(new com.twitter.scalding.examples.PageRank(_)) From 3f5cb5e860d9bafbb73bfcdbf90b8bf3f8d8f28b Mon Sep 17 00:00:00 2001 From: Eddie Xie Date: Mon, 18 May 2015 15:47:12 -0700 Subject: [PATCH 7/9] Remove comment --- .../com/twitter/scalding/Operations.scala | 33 ++----------------- 1 file changed, 2 insertions(+), 31 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala b/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala index 914c913932..2ca2ed0f71 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala @@ -497,34 +497,8 @@ package com.twitter.scalding { .asScala .map(_.getObject(0).asInstanceOf[V]) - /* - This prints out key = 1, 1, 1. un comment it and comment the block blow to see print - value count = 1 - value count = 1 - value count = 1 - - which is wrong. one of the value count should be 2 in the test - */ - - /* - var numValuesPerKey = 0L - - val resIter = reduceFnSer.get(key, values) - while (resIter.hasNext) { - val tup = Tuple.size(1) - val t2 = resIter.next - - numValuesPerKey += 1L - - tup.set(0, t2) - oc.add(tup) - } - val valueCountSum = numValuesPerKey - println("value count = " + numValuesPerKey) - */ - val caches = values.toList - var numValuesPerKey = caches.size + val numValuesPerKey = caches.size.toLong // Avoiding a lambda here val resIter = reduceFnSer.get(key, caches.toIterator) @@ -535,10 +509,7 @@ package com.twitter.scalding { tup.set(0, t2) oc.add(tup) } - val valueCountSum = numValuesPerKey - - println("value count = " + numValuesPerKey) - + flowProcess.increment(SkewMonitorCounters.KeyCount, SkewMonitorCounters.KeyCount, 1L) flowProcess.increment(SkewMonitorCounters.ValuesCountSum, SkewMonitorCounters.ValuesCountSum, numValuesPerKey) flowProcess.increment(SkewMonitorCounters.ValuesCountSquareSum, SkewMonitorCounters.ValuesCountSquareSum, numValuesPerKey * numValuesPerKey) From 2b767c05bae0aedcd825a99c3de7b04c44a63620 Mon Sep 17 00:00:00 2001 From: Eddie Xie Date: Mon, 18 May 2015 16:14:40 -0700 Subject: [PATCH 8/9] Add iterator wrapper; fix comment; change group names --- .../com/twitter/scalding/Operations.scala | 28 +++++++++++-------- .../scala/com/twitter/scalding/CoreTest.scala | 2 +- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala b/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala index 2ca2ed0f71..c18d4079c5 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala @@ -478,9 +478,17 @@ package com.twitter.scalding { private[scalding] object SkewMonitorCounters { // Strangely, if group name and key name are different, then the counter would be zero - val KeyCount = "scalding.skew.monitor.key.count" - val ValuesCountSum = "scalding.skew.monitor.value.count.sum" - val ValuesCountSquareSum = "scalding.skew.monitor.value.sum.square" + val KeyCount = "scalding.debug.reduce.key.count" + val ValuesCountSum = "scalding.debug.reduce.value.count.sum" + val ValuesCountSquareSum = "scalding.debug.reduce.value.count.sum.square" + } + + class CountingIterator[T](wraps: Iterator[T]) extends Iterator[T] { + // This Wrapper lets us know how many values have been iterated in an Iterator + private[this] var nextCalls = 0L + def hasNext = wraps.hasNext + def next = { nextCalls += 1; wraps.next } + def seen: Long = nextCalls } class TypedBufferOp[K, V, U]( @@ -493,23 +501,21 @@ package com.twitter.scalding { def operate(flowProcess: FlowProcess[_], call: BufferCall[Any]) { val oc = call.getOutputCollector val key = conv(call.getGroup) - val values = call.getArgumentsIterator + val values = new CountingIterator(call.getArgumentsIterator .asScala - .map(_.getObject(0).asInstanceOf[V]) - - val caches = values.toList - val numValuesPerKey = caches.size.toLong + .map(_.getObject(0).asInstanceOf[V])) // Avoiding a lambda here - val resIter = reduceFnSer.get(key, caches.toIterator) + val resIter = reduceFnSer.get(key, values) while (resIter.hasNext) { val tup = Tuple.size(1) val t2 = resIter.next - tup.set(0, t2) oc.add(tup) } - + + val numValuesPerKey = values.seen + flowProcess.increment(SkewMonitorCounters.KeyCount, SkewMonitorCounters.KeyCount, 1L) flowProcess.increment(SkewMonitorCounters.ValuesCountSum, SkewMonitorCounters.ValuesCountSum, numValuesPerKey) flowProcess.increment(SkewMonitorCounters.ValuesCountSquareSum, SkewMonitorCounters.ValuesCountSquareSum, numValuesPerKey * numValuesPerKey) diff --git a/scalding-core/src/test/scala/com/twitter/scalding/CoreTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/CoreTest.scala index b351b69afd..ae49b340c6 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/CoreTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/CoreTest.scala @@ -1844,7 +1844,7 @@ class ReduceValueCounterTest extends WordSpec with Matchers { } .counter(SkewMonitorCounters.ValuesCountSum, SkewMonitorCounters.ValuesCountSum) { x => - // key 1 has two values, thus 2^2 = 4. key 2 and 3 has only one respectively + // sum of keys = 2 (for key 1) + 1 (for key 2) + 1 (for key 3) x should be (2 + 1 + 1) } .sink[(Int, Int, Int)](TypedTsv[(Int, Int, Int)]("output0")){ From 1a8d6e21f3ce2d5effef82c31cbb11d23db52802 Mon Sep 17 00:00:00 2001 From: Eddie Xie Date: Thu, 21 May 2015 14:04:25 -0700 Subject: [PATCH 9/9] Address Oscar's review --- .../com/twitter/scalding/Operations.scala | 7 +++-- .../scala/com/twitter/scalding/CoreTest.scala | 6 +++- .../scalding/ReduceOperationsTest.scala | 29 ------------------- 3 files changed, 10 insertions(+), 32 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala b/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala index c18d4079c5..0e9c8cdd9f 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala @@ -509,8 +509,7 @@ package com.twitter.scalding { val resIter = reduceFnSer.get(key, values) while (resIter.hasNext) { val tup = Tuple.size(1) - val t2 = resIter.next - tup.set(0, t2) + tup.set(0, resIter.next) oc.add(tup) } @@ -519,6 +518,10 @@ package com.twitter.scalding { flowProcess.increment(SkewMonitorCounters.KeyCount, SkewMonitorCounters.KeyCount, 1L) flowProcess.increment(SkewMonitorCounters.ValuesCountSum, SkewMonitorCounters.ValuesCountSum, numValuesPerKey) flowProcess.increment(SkewMonitorCounters.ValuesCountSquareSum, SkewMonitorCounters.ValuesCountSquareSum, numValuesPerKey * numValuesPerKey) + + // Uncomment the following to trigger the bug + //flowProcess.increment("TestGroup", "TestKey", 1) + } } diff --git a/scalding-core/src/test/scala/com/twitter/scalding/CoreTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/CoreTest.scala index ae49b340c6..f78868f693 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/CoreTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/CoreTest.scala @@ -1820,7 +1820,8 @@ class CounterJobTest extends WordSpec with Matchers { class ReduceValueCountJob(args: Args) extends Job(args) { TypedPipe.from(List((1, (1, 1)), (2, (2, 2)), (1, (3, 3)), (3, (3, 3)))) .group - .foldLeft((0, 0)){ (a, b) => (a._1 + b._1, a._2 + b._2) } + .forceToReducers + .sum .toTypedPipe .map{ case (a: Int, (b: Int, c: Int)) => @@ -1833,6 +1834,9 @@ class ReduceValueCounterTest extends WordSpec with Matchers { "Reduce Values Count" should { JobTest(new com.twitter.scalding.ReduceValueCountJob(_)) .arg("output", "output0") + .counter("TestGroup", "TestKey"){ + x => println("PRINTING KEY AND GROUP! " + x) + } .counter(SkewMonitorCounters.KeyCount, SkewMonitorCounters.KeyCount){ x => x should be(3) diff --git a/scalding-core/src/test/scala/com/twitter/scalding/ReduceOperationsTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/ReduceOperationsTest.scala index 89494c7e9d..bdd29eebad 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/ReduceOperationsTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/ReduceOperationsTest.scala @@ -163,33 +163,4 @@ class ReduceOperationsTest extends WordSpec with Matchers { .runHadoop .finish } - - "Std for number of values aassocated with each key " should { - class SDForReduceOperation(args: Args) extends Job(args) { - TypedPipe.from(List((1, (1, 1)), (2, (2, 2)), (1, (3, 3)), (3, (3, 3)))) - .group - .foldLeft((0, 0)){ (a, b) => (a._1 + b._1, a._2 + b._2) } - .toTypedPipe - .map{ - case (a: Int, (b: Int, c: Int)) => - (a, b, c) - } - .write(TypedTsv[(Int, Int, Int)](args("output"))) - } - - class ExperimentTest extends WordSpec with Matchers { - "A PageRank2 job" should { - JobTest(new com.twitter.scalding.ReduceValueCountJob(_)) - .arg("output", "blah") - .sink[(Int, Int, Int)](TypedTsv[(Int, Int, Int)]("blah")){ - tuples => - println("RES = " + tuples) - } - .run - .finish - } - - } - - } }