Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding second moment of values per key for Typed-API reduce operations #1279

Open
wants to merge 10 commits into
base: develop
Choose a base branch
from
30 changes: 26 additions & 4 deletions scalding-core/src/main/scala/com/twitter/scalding/Operations.scala
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,21 @@ package com.twitter.scalding {
}
}

/** In the typed API every reduce operation is handled by this Buffer */
private[scalding] object SkewMonitorCounters {
// Strangely, if group name and key name are different, then the counter would be zero
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is bug by itself. Can you confirm this is still true and not due to another issue? We don't want to make a bunch of groups.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be a bug. if you simply put

flowProcess.increment("TestGroup", "TestKey", 1)

in def operate(flowProcess: FlowProcess[_], call: BufferCall[Any]) {

and this should print 3 in the test

ReduceValueCounterTest. However it prints

PRINTING KEY AND GROUP! 0

val KeyCount = "scalding.debug.reduce.key.count"
val ValuesCountSum = "scalding.debug.reduce.value.count.sum"
val ValuesCountSquareSum = "scalding.debug.reduce.value.count.sum.square"
}

class CountingIterator[T](wraps: Iterator[T]) extends Iterator[T] {
// This Wrapper lets us know how many values have been iterated in an Iterator
private[this] var nextCalls = 0L
def hasNext = wraps.hasNext
def next = { nextCalls += 1; wraps.next }
def seen: Long = nextCalls
}

class TypedBufferOp[K, V, U](
conv: TupleConverter[K],
@transient reduceFn: (K, Iterator[V]) => Iterator[U],
Expand All @@ -487,17 +501,25 @@ package com.twitter.scalding {
def operate(flowProcess: FlowProcess[_], call: BufferCall[Any]) {
val oc = call.getOutputCollector
val key = conv(call.getGroup)
val values = call.getArgumentsIterator
val values = new CountingIterator(call.getArgumentsIterator
.asScala
.map(_.getObject(0).asInstanceOf[V])
.map(_.getObject(0).asInstanceOf[V]))

// Avoiding a lambda here
val resIter = reduceFnSer.get(key, values)
while (resIter.hasNext) {
val tup = Tuple.size(1)
tup.set(0, resIter.next)
val t2 = resIter.next
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we remove this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

tup.set(0, t2)
oc.add(tup)
}

val numValuesPerKey = values.seen

flowProcess.increment(SkewMonitorCounters.KeyCount, SkewMonitorCounters.KeyCount, 1L)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably better to use a group like "scalding.debug" since I think we want to group them up so they are easy to see next to each other in the job tracker UI.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want the group and counter to be the same string. We want the group to be something like "scalding debug" and the counter value is fine.

flowProcess.increment(SkewMonitorCounters.ValuesCountSum, SkewMonitorCounters.ValuesCountSum, numValuesPerKey)
flowProcess.increment(SkewMonitorCounters.ValuesCountSquareSum, SkewMonitorCounters.ValuesCountSquareSum, numValuesPerKey * numValuesPerKey)
}

}
}
40 changes: 40 additions & 0 deletions scalding-core/src/test/scala/com/twitter/scalding/CoreTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1817,6 +1817,46 @@ class CounterJobTest extends WordSpec with Matchers {
}
}

class ReduceValueCountJob(args: Args) extends Job(args) {
TypedPipe.from(List((1, (1, 1)), (2, (2, 2)), (1, (3, 3)), (3, (3, 3))))
.group
.foldLeft((0, 0)){ (a, b) => (a._1 + b._1, a._2 + b._2) }
.toTypedPipe
.map{
case (a: Int, (b: Int, c: Int)) =>
(a, b, c)
}
.write(TypedTsv[(Int, Int, Int)](args("output")))
}

class ReduceValueCounterTest extends WordSpec with Matchers {
"Reduce Values Count" should {
JobTest(new com.twitter.scalding.ReduceValueCountJob(_))
.arg("output", "output0")
.counter(SkewMonitorCounters.KeyCount, SkewMonitorCounters.KeyCount){
x =>
x should be(3)
}
.counter(SkewMonitorCounters.ValuesCountSquareSum, SkewMonitorCounters.ValuesCountSquareSum) {
x =>
// key 1 has two values, thus 2^2 = 4. key 2 and 3 has only one respectively
x should be(4 + 1 + 1)
}
.counter(SkewMonitorCounters.ValuesCountSum, SkewMonitorCounters.ValuesCountSum) {
x =>
// sum of keys = 2 (for key 1) + 1 (for key 2) + 1 (for key 3)
x should be (2 + 1 + 1)
}
.sink[(Int, Int, Int)](TypedTsv[(Int, Int, Int)]("output0")){
tuples =>

}
.runHadoop
.finish
}

}

object DailySuffixTsvJob {
val strd1 = "2014-05-01"
val strd2 = "2014-05-02"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,4 +163,33 @@ class ReduceOperationsTest extends WordSpec with Matchers {
.runHadoop
.finish
}

"Std for number of values aassocated with each key " should {
class SDForReduceOperation(args: Args) extends Job(args) {
TypedPipe.from(List((1, (1, 1)), (2, (2, 2)), (1, (3, 3)), (3, (3, 3))))
.group
.foldLeft((0, 0)){ (a, b) => (a._1 + b._1, a._2 + b._2) }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could do .forceToReducers.sum and it should get the same effect.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. FIxed.

.toTypedPipe
.map{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.map { note the space.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

case (a: Int, (b: Int, c: Int)) =>
(a, b, c)
}
.write(TypedTsv[(Int, Int, Int)](args("output")))
}

class ExperimentTest extends WordSpec with Matchers {
"A PageRank2 job" should {
JobTest(new com.twitter.scalding.ReduceValueCountJob(_))
.arg("output", "blah")
.sink[(Int, Int, Int)](TypedTsv[(Int, Int, Int)]("blah")){
tuples =>
println("RES = " + tuples)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's remove println statements.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Dup-test. Thanks for the catch

}
.run
.finish
}

}

}
}