Skip to content

Commit

Permalink
Cherrypick for 0.17.3 (#1730)
Browse files Browse the repository at this point in the history
* protect against null counters

The `getCounter` method of the `Reporter` returned from `HadoopFlowProcess` was
returning null in some cases for a few jobs that we run in production. (It is
unclear why these jobs were seeing null counters.)

From looking at the Hadoop source code, getCounter does return null in some instances,
in particular the Reporter.NULL implementation unconditionally returns null from
its getCounter implementation. Hadoop does this despite not documenting that null
is a valid return value.

Solution: Null check the return value of `Reporter.getCounter` to workaround the issue.

Fixes #1716

* Use a null check rather than foreach

* Prepare for 0.17.3
johnynek authored Sep 30, 2017
1 parent 184ec94 commit db4f268
Showing 4 changed files with 16 additions and 4 deletions.
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Scalding #

## Version 0.17.3 ###
Cherry picks of develop PRs to fix an issue with null counter reporters
* https://github.com/twitter/scalding/pull/1729
* https://github.com/twitter/scalding/pull/1726

### Version 0.17.2 ###
This version is basically the same as 0.17.1 but backward compatible with 0.17.0.
* Revert memory estimator changes on 0.17.x branch: #1704
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -97,7 +97,7 @@ We use [Coveralls](https://coveralls.io/r/twitter/scalding) for code coverage re

Scalding modules are available from maven central.

The current groupid and version for all modules is, respectively, `"com.twitter"` and `0.17.2`.
The current groupid and version for all modules is, respectively, `"com.twitter"` and `0.17.3`.

Current published artifacts are

2 changes: 1 addition & 1 deletion scalding-core/src/main/scala/com/twitter/package.scala
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ package object scalding {
/**
* Make sure this is in sync with version.sbt
*/
val scaldingVersion: String = "0.17.2"
val scaldingVersion: String = "0.17.3"

object RichPathFilter {
implicit def toRichPathFilter(f: PathFilter) = new RichPathFilter(f)
11 changes: 9 additions & 2 deletions scalding-core/src/main/scala/com/twitter/scalding/Stats.scala
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ import cascading.flow.{ Flow, FlowListener, FlowDef, FlowProcess }
import cascading.flow.hadoop.HadoopFlowProcess
import cascading.stats.CascadingStats
import java.util.concurrent.ConcurrentHashMap
import org.apache.hadoop.mapreduce.Counter
import org.slf4j.{ Logger, LoggerFactory }
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -64,8 +65,14 @@ private[scalding] case class GenericFlowPCounterImpl(fp: FlowProcess[_], statKey
}

private[scalding] case class HadoopFlowPCounterImpl(fp: HadoopFlowProcess, statKey: StatKey) extends CounterImpl {
private[this] val cntr = fp.getReporter().getCounter(statKey.group, statKey.counter)
override def increment(amount: Long): Unit = cntr.increment(amount)
// we use a nullable type here for efficiency
private[this] val counter: Counter = (for {
r <- Option(fp.getReporter)
c <- Option(r.getCounter(statKey.group, statKey.counter))
} yield c).orNull

override def increment(amount: Long): Unit =
if (counter != null) counter.increment(amount) else ()
}

object Stat {

0 comments on commit db4f268

Please sign in to comment.