From c8e4d28a43683d991356e8312a915b0c4625b6b3 Mon Sep 17 00:00:00 2001 From: Timur Abishev Date: Fri, 14 Jul 2017 14:06:52 -0700 Subject: [PATCH 1/3] Setting version to 0.17.2 --- version.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.sbt b/version.sbt index b2722e8c5f..9da74bfb67 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "0.17.2" +version in ThisBuild := "0.17.2" \ No newline at end of file From c9bac2fa9973fa00d7bf063727bc9cbedaa6a7e1 Mon Sep 17 00:00:00 2001 From: Timur Abishev Date: Wed, 23 May 2018 16:12:42 -0700 Subject: [PATCH 2/3] Fix artifactory --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index a21592c1d5..f86d9c1913 100644 --- a/build.sbt +++ b/build.sbt @@ -71,7 +71,7 @@ val sharedSettings = assemblySettings ++ scalariformSettings ++ Seq( Opts.resolver.sonatypeSnapshots, Opts.resolver.sonatypeReleases, "Concurrent Maven Repo" at "http://conjars.org/repo", - "Twitter Maven" at "http://maven.twttr.com", + "Twitter Maven" at "https://maven.twttr.com", "Cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/" ), From d5c5760a40532357cdc7ecc1f46e5d4f72a8e8d8 Mon Sep 17 00:00:00 2001 From: Timur Abishev Date: Wed, 23 May 2018 16:13:08 -0700 Subject: [PATCH 3/3] Introduce KeyGrouping and print Ordering info to stdout during compilation --- .../source/VersionedKeyValSource.scala | 11 ++- .../com/twitter/scalding/CumulativeSum.scala | 9 +- .../twitter/scalding/mathematics/Matrix.scala | 11 ++- .../scalding/mathematics/Matrix2.scala | 86 ++++++++++--------- .../mathematics/TypedSimilarity.scala | 34 ++++---- .../twitter/scalding/typed/KeyGrouping.scala | 25 ++++++ .../scalding/typed/KeyGroupingMacro.scala | 54 ++++++++++++ .../twitter/scalding/typed/LookupJoin.scala | 9 +- .../com/twitter/scalding/typed/Sketched.scala | 19 ++-- .../twitter/scalding/typed/TypedPipe.scala | 52 ++++++----- .../scalding/typed/TypedPipeDiff.scala | 14 +-- 11 files changed, 196 insertions(+), 128 deletions(-) create mode 100644 scalding-core/src/main/scala/com/twitter/scalding/typed/KeyGrouping.scala create mode 100644 scalding-core/src/main/scala/com/twitter/scalding/typed/KeyGroupingMacro.scala diff --git a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/VersionedKeyValSource.scala b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/VersionedKeyValSource.scala index 0a5878c762..3dc4170c94 100644 --- a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/VersionedKeyValSource.scala +++ b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/VersionedKeyValSource.scala @@ -29,9 +29,8 @@ import com.twitter.scalding._ import com.twitter.scalding.commons.scheme.KeyValueByteScheme import com.twitter.scalding.commons.tap.VersionedTap import com.twitter.scalding.commons.tap.VersionedTap.TapMode -import com.twitter.scalding.source.{ CheckedInversion, MaxFailuresCheck } -import com.twitter.scalding.typed.KeyedListLike -import com.twitter.scalding.typed.TypedSink +import com.twitter.scalding.source.{CheckedInversion, MaxFailuresCheck} +import com.twitter.scalding.typed.{KeyGrouping, KeyedListLike, TypedSink} import org.apache.hadoop.mapred.JobConf import scala.collection.JavaConverters._ @@ -211,13 +210,13 @@ class VersionedKeyValSource[K, V](val path: String, val sourceVersion: Option[Lo object RichPipeEx extends java.io.Serializable { implicit def pipeToRichPipeEx(pipe: Pipe): RichPipeEx = new RichPipeEx(pipe) 
- implicit def typedPipeToRichPipeEx[K: Ordering, V: Monoid](pipe: TypedPipe[(K, V)]): TypedRichPipeEx[K, V] = + implicit def typedPipeToRichPipeEx[K: KeyGrouping, V: Monoid](pipe: TypedPipe[(K, V)]): TypedRichPipeEx[K, V] = new TypedRichPipeEx(pipe) - implicit def keyedListLikeToRichPipeEx[K: Ordering, V: Monoid, T[K, +V] <: KeyedListLike[K, V, T]]( + implicit def keyedListLikeToRichPipeEx[K: KeyGrouping, V: Monoid, T[K, +V] <: KeyedListLike[K, V, T]]( kll: KeyedListLike[K, V, T]): TypedRichPipeEx[K, V] = typedPipeToRichPipeEx(kll.toTypedPipe) } -class TypedRichPipeEx[K: Ordering, V: Monoid](pipe: TypedPipe[(K, V)]) extends java.io.Serializable { +class TypedRichPipeEx[K: KeyGrouping, V: Monoid](pipe: TypedPipe[(K, V)]) extends java.io.Serializable { import Dsl._ import TDsl._ diff --git a/scalding-core/src/main/scala/com/twitter/scalding/CumulativeSum.scala b/scalding-core/src/main/scala/com/twitter/scalding/CumulativeSum.scala index b26882c732..39088e0619 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/CumulativeSum.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/CumulativeSum.scala @@ -1,6 +1,7 @@ package com.twitter.scalding.typed import com.twitter.algebird._ +import com.twitter.scalding.serialization.macros.impl.BinaryOrdering.ordSer /** * Extension for TypedPipe to add a cumulativeSum method. @@ -39,7 +40,7 @@ object CumulativeSum { def cumulativeSum( implicit sg: Semigroup[V], ordU: Ordering[U], - ordK: Ordering[K]): SortedGrouped[K, (U, V)] = { + ordK: KeyGrouping[K]): SortedGrouped[K, (U, V)] = { pipe.group .sortBy { case (u, _) => u } .scanLeft(Nil: List[(U, V)]) { @@ -59,17 +60,17 @@ object CumulativeSum { * partitions for a single key to go through a single scan. */ def cumulativeSum[S](partition: U => S)( - implicit ordS: Ordering[S], + implicit ordS: KeyGrouping[S], sg: Semigroup[V], ordU: Ordering[U], - ordK: Ordering[K]): TypedPipe[(K, (U, V))] = { + ordK: KeyGrouping[K]): TypedPipe[(K, (U, V))] = { val sumPerS = pipe .map { case (k, (u, v)) => (k, partition(u)) -> v } .sumByKey .map { case ((k, s), v) => (k, (s, v)) } .group - .sortBy { case (s, v) => s } + .sortBy { case (s, v) => s }(ordS.ord) .scanLeft(None: Option[(Option[V], V, S)]) { case (acc, (s, v)) => acc match { diff --git a/scalding-core/src/main/scala/com/twitter/scalding/mathematics/Matrix.scala b/scalding-core/src/main/scala/com/twitter/scalding/mathematics/Matrix.scala index be2e0f3d8c..4794f62723 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/mathematics/Matrix.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/mathematics/Matrix.scala @@ -15,10 +15,9 @@ limitations under the License. 
*/ package com.twitter.scalding.mathematics -import com.twitter.algebird.{ Monoid, Group, Ring, Field } -import com.twitter.algebird.field._ // backwards compatiblity support +import com.twitter.algebird.{Field, Group, Monoid, Ring} +import com.twitter.algebird.field._ import com.twitter.scalding._ - import cascading.pipe.assembly._ import cascading.pipe.joiner._ import cascading.pipe.Pipe @@ -26,8 +25,8 @@ import cascading.tuple.Fields import cascading.tuple._ import cascading.flow._ import cascading.tap._ - import com.twitter.scalding.Dsl._ +import com.twitter.scalding.typed.KeyGrouping import scala.math.max import scala.annotation.tailrec @@ -137,11 +136,11 @@ class MatrixMappableExtensions[T](mappable: Mappable[T])(implicit fd: FlowDef, m new Matrix[Row, Col, Val]('row, 'col, 'val, matPipe) } - def toBlockMatrix[Group, Row, Col, Val](implicit ev: <:<[T, (Group, Row, Col, Val)], ord: Ordering[(Group, Row)], + def toBlockMatrix[Group, Row, Col, Val](implicit ev: <:<[T, (Group, Row, Col, Val)], ord: KeyGrouping[(Group, Row)], setter: TupleSetter[(Group, Row, Col, Val)]): BlockMatrix[Group, Row, Col, Val] = mapToBlockMatrix { _.asInstanceOf[(Group, Row, Col, Val)] } - def mapToBlockMatrix[Group, Row, Col, Val](fn: (T) => (Group, Row, Col, Val))(implicit ord: Ordering[(Group, Row)]): BlockMatrix[Group, Row, Col, Val] = { + def mapToBlockMatrix[Group, Row, Col, Val](fn: (T) => (Group, Row, Col, Val))(implicit ord: KeyGrouping[(Group, Row)]): BlockMatrix[Group, Row, Col, Val] = { val matPipe = TypedPipe .from(mappable) .map(fn) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/mathematics/Matrix2.scala b/scalding-core/src/main/scala/com/twitter/scalding/mathematics/Matrix2.scala index afb3aa1d0c..8bbca77244 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/mathematics/Matrix2.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/mathematics/Matrix2.scala @@ -18,13 +18,12 @@ package com.twitter.scalding.mathematics import cascading.flow.FlowDef import cascading.pipe.Pipe import cascading.tuple.Fields -import com.twitter.scalding.serialization.{ OrderedSerialization, OrderedSerialization2 } import com.twitter.scalding._ -import com.twitter.scalding.typed.{ ValuePipe, EmptyValue, LiteralValue, ComputedValue } -import com.twitter.algebird.{ Semigroup, Monoid, Ring, Group, Field } +import com.twitter.scalding.typed.{ ComputedValue, EmptyValue, KeyGrouping, LiteralValue, ValuePipe } +import com.twitter.algebird.{ Field, Group, Monoid, Ring, Semigroup } +import com.twitter.scalding.serialization.macros.impl.BinaryOrdering import scala.collection.mutable.Map import scala.collection.mutable.HashMap - import java.io.Serializable /** @@ -42,8 +41,8 @@ import java.io.Serializable * a TypedPipe (call toTypedPipe) or the result may not be correct. 
*/ sealed trait Matrix2[R, C, V] extends Serializable { - implicit def rowOrd: Ordering[R] - implicit def colOrd: Ordering[C] + implicit def rowOrd: KeyGrouping[R] + implicit def colOrd: KeyGrouping[C] val sizeHint: SizeHint = NoClue def +(that: Matrix2[R, C, V])(implicit mon: Monoid[V]): Matrix2[R, C, V] = Sum(this, that, mon) def -(that: Matrix2[R, C, V])(implicit g: Group[V]): Matrix2[R, C, V] = Sum(this, that.negate, g) @@ -104,7 +103,7 @@ sealed trait Matrix2[R, C, V] extends Serializable { //This cast will always succeed: lazy val joinedBool = mj.join(this.asInstanceOf[Matrix2[R, C, Boolean]], vec) - implicit val ord2: Ordering[C2] = vec.colOrd + implicit val ord2: KeyGrouping[(R, C2)] = KeyGrouping.tuple2(rowOrd, vec.colOrd) lazy val resultPipe = joinedBool.flatMap { case (key, ((row, bool), (col2, v))) => if (bool) Some((row, col2), v) else None // filter early @@ -113,7 +112,7 @@ sealed trait Matrix2[R, C, V] extends Serializable { .sum .filter { kv => mon.isNonZero(kv._2) } .map { case ((r, c2), v) => (r, c2, v) } - MatrixLiteral(resultPipe, this.sizeHint) + MatrixLiteral(resultPipe, this.sizeHint)(rowOrd, vec.colOrd) } def propagateRow[C2](mat: Matrix2[C, C2, Boolean])(implicit ev: =:=[R, Unit], mon: Monoid[V], mj: MatrixJoiner2): Matrix2[Unit, C2, V] = @@ -158,30 +157,30 @@ sealed trait Matrix2[R, C, V] extends Serializable { def getRow(index: R): Matrix2[Unit, C, V] = MatrixLiteral( toTypedPipe - .filter { case (r, c, v) => Ordering[R].equiv(r, index) } - .map { case (r, c, v) => ((), c, v) }, this.sizeHint.setRows(1L)) + .filter { case (r, c, v) => rowOrd.ord.equiv(r, index) } + .map { case (r, c, v) => ((), c, v) }, this.sizeHint.setRows(1L))(KeyGrouping(BinaryOrdering.ordSer[Unit]), colOrd) def getColumn(index: C): Matrix2[R, Unit, V] = MatrixLiteral( toTypedPipe - .filter { case (r, c, v) => Ordering[C].equiv(c, index) } - .map { case (r, c, v) => (r, (), v) }, this.sizeHint.setCols(1L)) + .filter { case (r, c, v) => colOrd.ord.equiv(c, index) } + .map { case (r, c, v) => (r, (), v) }, this.sizeHint.setCols(1L))(rowOrd, KeyGrouping(BinaryOrdering.ordSer[Unit])) /** * Consider this Matrix as the r2 row of a matrix. The current matrix must be a row, * which is to say, its row type must be Unit. */ - def asRow[R2](r2: R2)(implicit ev: R =:= Unit, rowOrd: Ordering[R2]): Matrix2[R2, C, V] = + def asRow[R2](r2: R2)(implicit ev: R =:= Unit, rowOrd: KeyGrouping[R2]): Matrix2[R2, C, V] = MatrixLiteral(toTypedPipe.map { case (r, c, v) => (r2, c, v) }, this.sizeHint) - def asCol[C2](c2: C2)(implicit ev: C =:= Unit, colOrd: Ordering[C2]): Matrix2[R, C2, V] = + def asCol[C2](c2: C2)(implicit ev: C =:= Unit, colOrd: KeyGrouping[C2]): Matrix2[R, C2, V] = MatrixLiteral(toTypedPipe.map { case (r, c, v) => (r, c2, v) }, this.sizeHint) // Compute the sum of the main diagonal. 
Only makes sense cases where the row and col type are // equal def trace(implicit mon: Monoid[V], ev: =:=[R, C]): Scalar2[V] = Scalar2(toTypedPipe.asInstanceOf[TypedPipe[(R, R, V)]] - .filter{ case (r1, r2, _) => Ordering[R].equiv(r1, r2) } + .filter{ case (r1, r2, _) => rowOrd.ord.equiv(r1, r2) } .map{ case (_, _, x) => x } .sum(mon)) @@ -216,7 +215,7 @@ object MatrixJoiner2 { class DefaultMatrixJoiner(sizeRatioThreshold: Long) extends MatrixJoiner2 { def join[R, C, V, C2, V2](left: Matrix2[R, C, V], right: Matrix2[C, C2, V2]): TypedPipe[(C, ((R, V), (C2, V2)))] = { - implicit val cOrd: Ordering[C] = left.colOrd + implicit val cOrd: KeyGrouping[C] = left.colOrd val one = left.toTypedPipe.map { case (r, c, v) => (c, (r, v)) }.group val two = right.toTypedPipe.map { case (c, c2, v2) => (c, (c2, v2)) }.group val sizeOne = left.sizeHint.total.getOrElse(BigInt(1L)) @@ -242,9 +241,9 @@ class DefaultMatrixJoiner(sizeRatioThreshold: Long) extends MatrixJoiner2 { /** * Infinite column vector - only for intermediate computations */ -case class OneC[R, V](implicit override val rowOrd: Ordering[R]) extends Matrix2[R, Unit, V] { +case class OneC[R, V](implicit override val rowOrd: KeyGrouping[R]) extends Matrix2[R, Unit, V] { override val sizeHint: SizeHint = FiniteHint(Long.MaxValue, 1) - override def colOrd = Ordering[Unit] + override def colOrd = KeyGrouping(BinaryOrdering.ordSer[Unit]) def transpose = OneR() override def negate(implicit g: Group[V]) = sys.error("Only used in intermediate computations, try (-1 * OneC)") def toTypedPipe = sys.error("Only used in intermediate computations") @@ -253,9 +252,9 @@ case class OneC[R, V](implicit override val rowOrd: Ordering[R]) extends Matrix2 /** * Infinite row vector - only for intermediate computations */ -case class OneR[C, V](implicit override val colOrd: Ordering[C]) extends Matrix2[Unit, C, V] { +case class OneR[C, V](implicit override val colOrd: KeyGrouping[C]) extends Matrix2[Unit, C, V] { override val sizeHint: SizeHint = FiniteHint(1, Long.MaxValue) - override def rowOrd = Ordering[Unit] + override def rowOrd = KeyGrouping(BinaryOrdering.ordSer[Unit]) def transpose = OneC() override def negate(implicit g: Group[V]) = sys.error("Only used in intermediate computations, try (-1 * OneR)") def toTypedPipe = sys.error("Only used in intermediate computations") @@ -294,10 +293,10 @@ case class Product[R, C, C2, V](left: Matrix2[R, C, V], val localRing = ring val joined = (if (leftMatrix) { - val ord: Ordering[R] = left.rowOrd + val ord: KeyGrouping[R] = left.rowOrd left.toTypedPipe.groupBy(x => x._1)(ord) } else { - val ord: Ordering[C] = right.rowOrd + val ord: KeyGrouping[C] = right.rowOrd right.toTypedPipe.groupBy(x => x._1)(ord) }).mapValues { _._3 } .sum(localRing) @@ -316,7 +315,7 @@ case class Product[R, C, C2, V](left: Matrix2[R, C, V], if (isSpecialCase) { specialCase } else { - implicit val ord: Ordering[C] = right.rowOrd + implicit val ord: KeyGrouping[C] = right.rowOrd val localRing = ring joiner.join(left, right) .map { case (key, ((l1, lv), (r2, rv))) => (l1, r2, localRing.times(lv, rv)) } @@ -356,9 +355,10 @@ case class Product[R, C, C2, V](left: Matrix2[R, C, V], override val sizeHint = left.sizeHint * right.sizeHint - implicit override val rowOrd: Ordering[R] = left.rowOrd - implicit override val colOrd: Ordering[C2] = right.colOrd - implicit def withOrderedSerialization: Ordering[(R, C2)] = OrderedSerialization2.maybeOrderedSerialization2(rowOrd, colOrd) + implicit override val rowOrd: KeyGrouping[R] = left.rowOrd + implicit 
override val colOrd: KeyGrouping[C2] = right.colOrd + implicit def withOrderedSerialization: KeyGrouping[(R, C2)] = + KeyGrouping.tuple2(rowOrd, colOrd) override lazy val transpose: Product[C2, C, R, V] = Product(right.transpose, left.transpose, ring) override def negate(implicit g: Group[V]): Product[R, C, C2, V] = { @@ -381,12 +381,12 @@ case class Product[R, C, C2, V](left: Matrix2[R, C, V], if (cost1 > cost2) { val product2 = plan2.asInstanceOf[Product[C, R, C, V]] val ord = left.colOrd - val filtered = product2.toOuterSum.filter{ case (c1, c2, _) => ord.equiv(c1, c2) } + val filtered = product2.toOuterSum.filter{ case (c1, c2, _) => ord.ord.equiv(c1, c2) } Scalar2(product2.computePipe(filtered).map{ case (_, _, x) => x }.sum(mon)) } else { val product1 = plan1.asInstanceOf[Product[R, C, R, V]] val ord = left.rowOrd - val filtered = product1.toOuterSum.filter{ case (r1, r2, _) => ord.equiv(r1, r2) } + val filtered = product1.toOuterSum.filter{ case (r1, r2, _) => ord.ord.equiv(r1, r2) } Scalar2(product1.computePipe(filtered).map{ case (_, _, x) => x }.sum(mon)) } @@ -435,9 +435,10 @@ case class Sum[R, C, V](left: Matrix2[R, C, V], right: Matrix2[R, C, V], mon: Mo override val sizeHint = left.sizeHint + right.sizeHint - implicit override val rowOrd: Ordering[R] = left.rowOrd - implicit override val colOrd: Ordering[C] = left.colOrd - implicit def withOrderedSerialization: Ordering[(R, C)] = OrderedSerialization2.maybeOrderedSerialization2(rowOrd, colOrd) + implicit override val rowOrd: KeyGrouping[R] = left.rowOrd + implicit override val colOrd: KeyGrouping[C] = left.colOrd + implicit def withOrderedSerialization: KeyGrouping[(R, C)] = + KeyGrouping.tuple2(rowOrd, colOrd) override lazy val transpose: Sum[C, R, V] = Sum(left.transpose, right.transpose, mon) override def negate(implicit g: Group[V]): Sum[R, C, V] = Sum(left.negate, right.negate, mon) @@ -447,7 +448,7 @@ case class Sum[R, C, V](left: Matrix2[R, C, V], right: Matrix2[R, C, V], mon: Mo override def trace(implicit mon: Monoid[V], ev: =:=[R, C]): Scalar2[V] = Scalar2(collectAddends(this).map { pipe => pipe.asInstanceOf[TypedPipe[(R, R, V)]] - .filter { case (r, c, v) => Ordering[R].equiv(r, c) } + .filter { case (r, c, v) => rowOrd.ord.equiv(r, c) } .map { _._3 } }.reduce(_ ++ _).sum) } @@ -480,13 +481,14 @@ case class HadamardProduct[R, C, V](left: Matrix2[R, C, V], else HadamardProduct(left.negate, right, ring) - implicit override val rowOrd: Ordering[R] = left.rowOrd - implicit override val colOrd: Ordering[C] = left.colOrd - implicit def withOrderedSerialization: Ordering[(R, C)] = OrderedSerialization2.maybeOrderedSerialization2(rowOrd, colOrd) + implicit override val rowOrd: KeyGrouping[R] = left.rowOrd + implicit override val colOrd: KeyGrouping[C] = left.colOrd + implicit def withOrderedSerialization: KeyGrouping[(R, C)] = + KeyGrouping.tuple2(rowOrd, colOrd) } case class MatrixLiteral[R, C, V](override val toTypedPipe: TypedPipe[(R, C, V)], - override val sizeHint: SizeHint)(implicit override val rowOrd: Ordering[R], override val colOrd: Ordering[C]) + override val sizeHint: SizeHint)(implicit override val rowOrd: KeyGrouping[R], override val colOrd: KeyGrouping[C]) extends Matrix2[R, C, V] { override lazy val transpose: MatrixLiteral[C, R, V] = @@ -535,10 +537,10 @@ trait Scalar2[V] extends Serializable { case s @ Sum(left, right, mon) => Sum(this * left, this * right, mon) case m @ MatrixLiteral(_, _) => timesLiteral(m) // handle literals here case x @ OneC() => - Product(OneC[Unit, V](), toMatrix, ring) + 
Product(OneC[Unit, V]()(KeyGrouping(BinaryOrdering.ordSer[Unit])), toMatrix, ring) .asInstanceOf[Matrix2[R, C, V]] case x @ OneR() => - Product(toMatrix, OneR[Unit, V](), ring) + Product(toMatrix, OneR[Unit, V]()(KeyGrouping(BinaryOrdering.ordSer[Unit])), ring) .asInstanceOf[Matrix2[R, C, V]] } @@ -562,7 +564,7 @@ trait Scalar2[V] extends Serializable { def map[U](fn: V => U): Scalar2[U] = Scalar2(value.map(fn)) def toMatrix: Matrix2[Unit, Unit, V] = - MatrixLiteral(value.toTypedPipe.map(v => ((), (), v)), FiniteHint(1, 1)) + MatrixLiteral(value.toTypedPipe.map(v => ((), (), v)), FiniteHint(1, 1))(KeyGrouping(BinaryOrdering.ordSer[Unit]), KeyGrouping(BinaryOrdering.ordSer[Unit])) // TODO: FunctionMatrix[R,C,V](fn: (R,C) => V) and a Literal scalar is just: FuctionMatrix[Unit, Unit, V]({ (_, _) => v }) } @@ -582,14 +584,14 @@ object Scalar2 { } object Matrix2 { - def apply[R: Ordering, C: Ordering, V](t: TypedPipe[(R, C, V)], hint: SizeHint): Matrix2[R, C, V] = + def apply[R: KeyGrouping, C: KeyGrouping, V](t: TypedPipe[(R, C, V)], hint: SizeHint): Matrix2[R, C, V] = MatrixLiteral(t, hint) def read[R, C, V](t: TypedSource[(R, C, V)], - hint: SizeHint)(implicit ordr: Ordering[R], ordc: Ordering[C]): Matrix2[R, C, V] = + hint: SizeHint)(implicit ordr: KeyGrouping[R], ordc: KeyGrouping[C]): Matrix2[R, C, V] = MatrixLiteral(TypedPipe.from(t), hint) - def J[R, C, V](implicit ordR: Ordering[R], ordC: Ordering[C], ring: Ring[V], mj: MatrixJoiner2) = + def J[R, C, V](implicit ordR: KeyGrouping[R], ordC: KeyGrouping[C], ring: Ring[V], mj: MatrixJoiner2) = Product(OneC[R, V]()(ordR), OneR[C, V]()(ordC), ring) /** diff --git a/scalding-core/src/main/scala/com/twitter/scalding/mathematics/TypedSimilarity.scala b/scalding-core/src/main/scala/com/twitter/scalding/mathematics/TypedSimilarity.scala index eb3ae6c14e..2e3c4bf1ac 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/mathematics/TypedSimilarity.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/mathematics/TypedSimilarity.scala @@ -15,8 +15,7 @@ limitations under the License. 
*/ package com.twitter.scalding.mathematics -import com.twitter.scalding.typed.{ Grouped, TypedPipe, WithReducers } - +import com.twitter.scalding.typed.{Grouped, KeyGrouping, TypedPipe, WithReducers} import java.io.Serializable /** @@ -52,17 +51,17 @@ object GraphOperations extends Serializable { .values // Returns all Vertices with non-zero in-degree - def withInDegree[N, E](g: TypedPipe[Edge[N, E]])(implicit ord: Ordering[N]): TypedPipe[Edge[N, (E, InDegree)]] = joinAggregate(g.groupBy { _.to }) { it => + def withInDegree[N, E](g: TypedPipe[Edge[N, E]])(implicit ord: KeyGrouping[N]): TypedPipe[Edge[N, (E, InDegree)]] = joinAggregate(g.groupBy { _.to }) { it => InDegree(it.size) } // Returns all Vertices with non-zero out-degree - def withOutDegree[N, E](g: TypedPipe[Edge[N, E]])(implicit ord: Ordering[N]): TypedPipe[Edge[N, (E, OutDegree)]] = joinAggregate(g.groupBy { _.from }) { it => + def withOutDegree[N, E](g: TypedPipe[Edge[N, E]])(implicit ord: KeyGrouping[N]): TypedPipe[Edge[N, (E, OutDegree)]] = joinAggregate(g.groupBy { _.from }) { it => OutDegree(it.size) } // Returns all Vertices with weights and non-zero norms - def withInNorm[N, E](g: TypedPipe[Edge[N, Weight]])(implicit ord: Ordering[N]): TypedPipe[Edge[N, (Weight, L2Norm)]] = joinAggregate(g.groupBy { _.to }) { it => + def withInNorm[N, E](g: TypedPipe[Edge[N, Weight]])(implicit ord: KeyGrouping[N]): TypedPipe[Edge[N, (Weight, L2Norm)]] = joinAggregate(g.groupBy { _.to }) { it => val norm = scala.math.sqrt( it.iterator.map { a => val x = a.data.weight @@ -88,7 +87,7 @@ case class SetSimilarity(intersection: Int, sizeLeft: Int, sizeRight: Int) { } trait TypedSimilarity[N, E, S] extends Serializable { - def nodeOrdering: Ordering[N] + def nodeOrdering: KeyGrouping[N] /** * Given a TypedPipe of edges, and a predicate for a smaller group (smallpred) of nodes * and a bigger group (bigpred), compute the similarity between each item in the two sets @@ -115,8 +114,8 @@ object TypedSimilarity extends Serializable { // key: document, // value: (word, documentsWithWord) // return: Edge of similarity between words measured by documents - def exactSetSimilarity[N: Ordering](g: Grouped[N, (N, Int)], - smallpred: N => Boolean, bigpred: N => Boolean): TypedPipe[Edge[N, SetSimilarity]] = + def exactSetSimilarity[N: KeyGrouping](g: Grouped[N, (N, Int)], + smallpred: N => Boolean, bigpred: N => Boolean): TypedPipe[Edge[N, SetSimilarity]] = { /* E_{ij} = 1 if document -> word exists * (E^T E)_ij = # of shared documents of i,j * = \sum_k E_ki E_kj @@ -131,14 +130,15 @@ object TypedSimilarity extends Serializable { .group, g.reducers) // Use reduceLeft to push to reducers, no benefit in mapside here .reduceLeft { (left, right) => - // The degrees we always take the left: - val (leftCnt, deg1, deg2) = left - (leftCnt + right._1, deg1, deg2) - } + // The degrees we always take the left: + val (leftCnt, deg1, deg2) = left + (leftCnt + right._1, deg1, deg2) + } .map { case ((node1, node2), (cnt, deg1, deg2)) => Edge(node1, node2, SetSimilarity(cnt, deg1, deg2)) } + } /* * key: document, @@ -146,7 +146,7 @@ object TypedSimilarity extends Serializable { * return: Edge of similarity between words measured by documents * See: http://arxiv.org/pdf/1206.2082v2.pdf */ - def discoCosineSimilarity[N: Ordering](smallG: Grouped[N, (N, Int)], + def discoCosineSimilarity[N: KeyGrouping](smallG: Grouped[N, (N, Int)], bigG: Grouped[N, (N, Int)], oversample: Double): TypedPipe[Edge[N, Double]] = { // 1) make rnd lazy due to serialization, // 2) fix seed so that 
map-reduce speculative execution does not give inconsistent results. @@ -183,7 +183,7 @@ object TypedSimilarity extends Serializable { * return: Edge of similarity between words measured by documents * See: http://stanford.edu/~rezab/papers/dimsum.pdf */ - def dimsumCosineSimilarity[N: Ordering](smallG: Grouped[N, (N, Double, Double)], + def dimsumCosineSimilarity[N: KeyGrouping](smallG: Grouped[N, (N, Double, Double)], bigG: Grouped[N, (N, Double, Double)], oversample: Double): TypedPipe[Edge[N, Double]] = { lazy val rnd = new scala.util.Random(1024) maybeWithReducers(smallG.cogroup(bigG) { (n: N, leftit: Iterator[(N, Double, Double)], rightit: Iterable[(N, Double, Double)]) => @@ -217,7 +217,7 @@ object TypedSimilarity extends Serializable { * This algothm is just matrix multiplication done by hand to make it * clearer when we do the sampling implementation */ -class ExactInCosine[N](reducers: Int = -1)(implicit override val nodeOrdering: Ordering[N]) extends TypedSimilarity[N, InDegree, Double] { +class ExactInCosine[N](reducers: Int = -1)(implicit override val nodeOrdering: KeyGrouping[N]) extends TypedSimilarity[N, InDegree, Double] { def apply(graph: TypedPipe[Edge[N, InDegree]], smallpred: N => Boolean, bigpred: N => Boolean): TypedPipe[Edge[N, Double]] = { @@ -238,7 +238,7 @@ class ExactInCosine[N](reducers: Int = -1)(implicit override val nodeOrdering: O * boundedProb: the probability we have larger than delta error * see: http://arxiv.org/pdf/1206.2082v2.pdf for more details */ -class DiscoInCosine[N](minCos: Double, delta: Double, boundedProb: Double, reducers: Int = -1)(implicit override val nodeOrdering: Ordering[N]) extends TypedSimilarity[N, InDegree, Double] { +class DiscoInCosine[N](minCos: Double, delta: Double, boundedProb: Double, reducers: Int = -1)(implicit override val nodeOrdering: KeyGrouping[N]) extends TypedSimilarity[N, InDegree, Double] { // The probability of being more than delta error is approx: // boundedProb ~ exp(-p delta^2 / 2) @@ -262,7 +262,7 @@ class DiscoInCosine[N](minCos: Double, delta: Double, boundedProb: Double, reduc } -class DimsumInCosine[N](minCos: Double, delta: Double, boundedProb: Double, reducers: Int = -1)(implicit override val nodeOrdering: Ordering[N]) extends TypedSimilarity[N, (Weight, L2Norm), Double] { +class DimsumInCosine[N](minCos: Double, delta: Double, boundedProb: Double, reducers: Int = -1)(implicit override val nodeOrdering: KeyGrouping[N]) extends TypedSimilarity[N, (Weight, L2Norm), Double] { // The probability of being more than delta error is approx: // boundedProb ~ exp(-p delta^2 / 2) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/KeyGrouping.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/KeyGrouping.scala new file mode 100644 index 0000000000..3c62c59a74 --- /dev/null +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/KeyGrouping.scala @@ -0,0 +1,25 @@ +package com.twitter.scalding.typed + +import com.twitter.scalding.serialization.{ + OrderedSerialization, + OrderedSerialization2 +} +import scala.math.Ordering + +case class KeyGrouping[T](ord: Ordering[T]) + +object KeyGrouping { + import scala.language.experimental.macros + + implicit def tuple2[T1, T2](implicit ord1: KeyGrouping[T1], + ord2: KeyGrouping[T2]): KeyGrouping[(T1, T2)] = + KeyGrouping[(T1, T2)]( + OrderedSerialization2.maybeOrderedSerialization2(ord1.ord, ord2.ord)) + + implicit def ordSerConvert[T]( + ordSer: OrderedSerialization[T]): KeyGrouping[T] = KeyGrouping(ordSer) + // add unsafe convert as 
well to make everything compatible? add it somehow in a way with macro log? + // implicit def unsafeConvert[T](ord: Ordering[T]): KeyGrouping[T] = KeyGrouping(ord) + + implicit def convert[T]: KeyGrouping[T] = macro KeyGroupingMacro[T] +} diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/KeyGroupingMacro.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/KeyGroupingMacro.scala new file mode 100644 index 0000000000..63eb7a8d21 --- /dev/null +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/KeyGroupingMacro.scala @@ -0,0 +1,54 @@ +package com.twitter.scalding.typed + +import scala.math.Ordering + +object KeyGroupingMacro { + import reflect.macros.Context + + def apply[T](c: Context)( + implicit T: c.WeakTypeTag[T]): c.Expr[KeyGrouping[T]] = { + import c.mirror._ + import c.universe._ + + val ordTypeTag = implicitly[c.WeakTypeTag[Ordering[T]]] + val implicitInScope = c.inferImplicitValue(ordTypeTag.tpe) + + if (implicitInScope.toString() == "") + throw new IllegalArgumentException("Just can't") + + val pos = c.macroApplication.pos + + val desc = ordSerDesc(implicitInScope.toString()) + .getOrElse(implicitInScope.toString()) + val path = + if (pos.source.path.contains("/workspace/")) + pos.source.path.substring(pos.source.path.indexOf("/workspace/")) + else + pos.source.path + + val info = s">>> ordering | ${sanitize(path)} | ${pos.line} | " + + s"${sanitize(T.tpe.toString())} | ${sanitize(desc)} <<<" + println(info) + + val call = + q"""com.twitter.scalding.typed.KeyGrouping[$T]($implicitInScope)""" + c.Expr[KeyGrouping[T]](call) + } + + def sanitize(s: String): String = + s.replace(">>>", "!!!") + .replace("<<<", "!!!") + .replace("|", "!") + .replace('\n', ' ') + + def ordSerDesc(desc: String): Option[String] = { + val str = + "com.twitter.scalding.serialization.macros.impl.ordered_serialization.runtime_helpers.MacroEqualityOrderedSerialization[" + if (desc.contains(str)) { + val tmp = desc.substring(desc.indexOf(str) + str.length) + Some("ordered serialization for: " + tmp.substring(0, tmp.indexOf("]"))) + } else { + None + } + } +} diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/LookupJoin.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/LookupJoin.scala index 4df34601a0..0365dbd90d 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/LookupJoin.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/LookupJoin.scala @@ -17,7 +17,6 @@ package com.twitter.scalding.typed import java.io.Serializable - import com.twitter.algebird.Semigroup /* @@ -79,7 +78,7 @@ object LookupJoin extends Serializable { * much time is between the left and the right */ - def apply[T: Ordering, K: Ordering, V, JoinedV]( + def apply[T: Ordering, K: KeyGrouping, V, JoinedV]( left: TypedPipe[(T, (K, V))], right: TypedPipe[(T, (K, JoinedV))], reducers: Option[Int] = None): TypedPipe[(T, (K, (V, Option[JoinedV])))] = @@ -90,7 +89,7 @@ object LookupJoin extends Serializable { * In this case, the right pipe is fed through a scanLeft doing a Semigroup.plus * before joined to the left */ - def rightSumming[T: Ordering, K: Ordering, V, JoinedV: Semigroup](left: TypedPipe[(T, (K, V))], + def rightSumming[T: Ordering, K: KeyGrouping, V, JoinedV: Semigroup](left: TypedPipe[(T, (K, V))], right: TypedPipe[(T, (K, JoinedV))], reducers: Option[Int] = None): TypedPipe[(T, (K, (V, Option[JoinedV])))] = withWindowRightSumming(left, right, reducers)((_, _) => true) @@ -100,7 +99,7 @@ object LookupJoin extends Serializable { * as 
the joined value. * Useful for bounding the time of the join to a recent window */ - def withWindow[T: Ordering, K: Ordering, V, JoinedV](left: TypedPipe[(T, (K, V))], + def withWindow[T: Ordering, K: KeyGrouping, V, JoinedV](left: TypedPipe[(T, (K, V))], right: TypedPipe[(T, (K, JoinedV))], reducers: Option[Int] = None)(gate: (T, T) => Boolean): TypedPipe[(T, (K, (V, Option[JoinedV])))] = { @@ -113,7 +112,7 @@ object LookupJoin extends Serializable { * as the joined value, and sums are only done as long as they they come * within the gate interval as well */ - def withWindowRightSumming[T: Ordering, K: Ordering, V, JoinedV: Semigroup](left: TypedPipe[(T, (K, V))], + def withWindowRightSumming[T: Ordering, K: KeyGrouping, V, JoinedV: Semigroup](left: TypedPipe[(T, (K, V))], right: TypedPipe[(T, (K, JoinedV))], reducers: Option[Int] = None)(gate: (T, T) => Boolean): TypedPipe[(T, (K, (V, Option[JoinedV])))] = { /** diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/Sketched.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/Sketched.scala index 027c619929..377c7b2883 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/Sketched.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/Sketched.scala @@ -15,11 +15,9 @@ limitations under the License. */ package com.twitter.scalding.typed -import com.twitter.algebird.{ Bytes, CMS, CMSHasherImplicits, Batched } +import com.twitter.algebird.{ Batched, Bytes, CMS, CMSHasherImplicits } import com.twitter.scalding.serialization.macros.impl.BinaryOrdering._ -import com.twitter.scalding.serialization.{ OrderedSerialization, OrderedSerialization2 } import com.twitter.algebird.CMSMonoid - import scala.language.experimental.macros // This was a bad design choice, we should have just put these in the CMSHasher object @@ -34,7 +32,7 @@ case class Sketched[K, V](pipe: TypedPipe[(K, V)], delta: Double, eps: Double, seed: Int)(implicit serialization: K => Array[Byte], - ordering: Ordering[K]) + ordering: KeyGrouping[K]) extends MustHaveReducers { def serialize(k: K): Array[Byte] = serialization(k) @@ -77,7 +75,7 @@ case class Sketched[K, V](pipe: TypedPipe[(K, V)], def leftJoin[V2](right: TypedPipe[(K, V2)]) = cogroup(right)(Joiner.hashLeft2) } -case class SketchJoined[K: Ordering, V, V2, R](left: Sketched[K, V], +case class SketchJoined[K: KeyGrouping, V, V2, R](left: Sketched[K, V], right: TypedPipe[(K, V2)], numReducers: Int)(joiner: (K, V, Iterable[V2]) => Iterator[R]) extends MustHaveReducers { @@ -110,15 +108,8 @@ case class SketchJoined[K: Ordering, V, V2, R](left: Sketched[K, V], .map{ case ((r, k), v) => (k, v) } } - private implicit def intKeyOrd: Ordering[(Int, K)] = { - val kord = implicitly[Ordering[K]] - - kord match { - case kos: OrderedSerialization[_] => new OrderedSerialization2(ordSer[Int], kos.asInstanceOf[OrderedSerialization[K]]) - case _ => Ordering.Tuple2[Int, K] - } - } - + private implicit def intKeyOrd: KeyGrouping[(Int, K)] = + KeyGrouping.tuple2(KeyGrouping(ordSer[Int]), implicitly[KeyGrouping[K]]) } object SketchJoined { diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala index 251021fb7d..a2363792fb 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala @@ -15,22 +15,20 @@ limitations under the License. 
*/ package com.twitter.scalding.typed -import java.io.{ OutputStream, InputStream, Serializable } -import java.util.{ Random, UUID } - +import java.io.{InputStream, OutputStream, Serializable} +import java.util.{Random, UUID} import cascading.flow.FlowDef -import cascading.pipe.{ Each, Pipe } +import cascading.pipe.{Each, Pipe} import cascading.tap.Tap -import cascading.tuple.{ Fields, TupleEntry } -import com.twitter.algebird.{ Aggregator, Batched, Monoid, Semigroup } -import com.twitter.scalding.TupleConverter.{ TupleEntryConverter, singleConverter, tuple2Converter } -import com.twitter.scalding.TupleSetter.{ singleSetter, tup2Setter } +import cascading.tuple.{Fields, TupleEntry} +import com.twitter.algebird.{Aggregator, Batched, Monoid, Semigroup} +import com.twitter.scalding.TupleConverter.{TupleEntryConverter, singleConverter, tuple2Converter} +import com.twitter.scalding.TupleSetter.{singleSetter, tup2Setter} import com.twitter.scalding._ import com.twitter.scalding.serialization.OrderedSerialization import com.twitter.scalding.serialization.OrderedSerialization.Result import com.twitter.scalding.serialization.macros.impl.BinaryOrdering import com.twitter.scalding.serialization.macros.impl.BinaryOrdering._ - import scala.util.Try /** @@ -204,7 +202,7 @@ trait TypedPipe[+T] extends Serializable { * in some sense, this is the dual of groupAll */ @annotation.implicitNotFound(msg = "For asKeys method to work, the type in TypedPipe must have an Ordering.") - def asKeys[U >: T](implicit ord: Ordering[U]): Grouped[U, Unit] = + def asKeys[U >: T](implicit ord: KeyGrouping[U]): Grouped[U, Unit] = map((_, ())).group /** @@ -253,16 +251,16 @@ trait TypedPipe[+T] extends Serializable { * The latter creates 1 map/reduce phase rather than 2 */ @annotation.implicitNotFound(msg = "For distinct method to work, the type in TypedPipe must have an Ordering.") - def distinct(implicit ord: Ordering[_ >: T]): TypedPipe[T] = - asKeys(ord.asInstanceOf[Ordering[T]]).sum.keys + def distinct(implicit ord: KeyGrouping[_ >: T]): TypedPipe[T] = + asKeys(ord.asInstanceOf[KeyGrouping[T]]).sum.keys /** * Returns the set of distinct elements identified by a given lambda extractor in the TypedPipe */ @annotation.implicitNotFound(msg = "For distinctBy method to work, the type to distinct on in the TypedPipe must have an Ordering.") - def distinctBy[U](fn: T => U, numReducers: Option[Int] = None)(implicit ord: Ordering[_ >: U]): TypedPipe[T] = { + def distinctBy[U](fn: T => U, numReducers: Option[Int] = None)(implicit ord: KeyGrouping[_ >: U]): TypedPipe[T] = { // cast because Ordering is not contravariant, but should be (and this cast is safe) - implicit val ordT: Ordering[U] = ord.asInstanceOf[Ordering[U]] + implicit val ordT: KeyGrouping[U] = ord.asInstanceOf[KeyGrouping[U]] // Semigroup to handle duplicates for a given key might have different values. implicit val sg: Semigroup[T] = new Semigroup[T] { @@ -372,24 +370,24 @@ trait TypedPipe[+T] extends Serializable { /** * This is the default means of grouping all pairs with the same key. Generally this triggers 1 Map/Reduce transition */ - def group[K, V](implicit ev: <:<[T, (K, V)], ord: Ordering[K]): Grouped[K, V] = + def group[K, V](implicit ev: <:<[T, (K, V)], ord: KeyGrouping[K]): Grouped[K, V] = //If the type of T is not (K,V), then at compile time, this will fail. It uses implicits to do //a compile time check that one type is equivalent to another. If T is not (K,V), we can't //automatically group. 
We cast because it is safe to do so, and we need to convert to K,V, but //the ev is not needed for the cast. In fact, you can do the cast with ev(t) and it will return //it as (K,V), but the problem is, ev is not serializable. So we do the cast, which due to ev //being present, will always pass. - Grouped(raiseTo[(K, V)]).withDescription(LineNumber.tryNonScaldingCaller.map(_.toString)) + Grouped(raiseTo[(K, V)])(ord.ord).withDescription(LineNumber.tryNonScaldingCaller.map(_.toString)) /** Send all items to a single reducer */ - def groupAll: Grouped[Unit, T] = groupBy(x => ())(ordSer[Unit]).withReducers(1) + def groupAll: Grouped[Unit, T] = groupBy(x => ())(KeyGrouping(ordSer[Unit])).withReducers(1) /** Given a key function, add the key, then call .group */ - def groupBy[K](g: T => K)(implicit ord: Ordering[K]): Grouped[K, T] = + def groupBy[K](g: T => K)(implicit ord: KeyGrouping[K]): Grouped[K, T] = map { t => (g(t), t) }.group /** Group using an explicit Ordering on the key. */ - def groupWith[K, V](ord: Ordering[K])(implicit ev: <:<[T, (K, V)]): Grouped[K, V] = group(ev, ord) + def groupWith[K, V](ord: KeyGrouping[K])(implicit ev: <:<[T, (K, V)]): Grouped[K, V] = group(ev, ord) /** * Forces a shuffle by randomly assigning each item into one @@ -403,7 +401,7 @@ trait TypedPipe[+T] extends Serializable { def groupRandomly(partitions: Int): Grouped[Int, T] = { // Make it lazy so all mappers get their own: lazy val rng = new java.util.Random(123) // seed this so it is repeatable - groupBy { _ => rng.nextInt(partitions) }(TypedPipe.identityOrdering) + groupBy { _ => rng.nextInt(partitions) }(KeyGrouping(TypedPipe.identityOrdering)) .withReducers(partitions) } @@ -487,7 +485,7 @@ trait TypedPipe[+T] extends Serializable { /** * Reasonably common shortcut for cases of associative/commutative reduction by Key */ - def sumByKey[K, V](implicit ev: T <:< (K, V), ord: Ordering[K], plus: Semigroup[V]): UnsortedGrouped[K, V] = + def sumByKey[K, V](implicit ev: T <:< (K, V), ord: KeyGrouping[K], plus: Semigroup[V]): UnsortedGrouped[K, V] = group[K, V].sum[V] /** @@ -734,7 +732,7 @@ trait TypedPipe[+T] extends Serializable { delta: Double = 0.01, //5 rows (= 5 hashes) seed: Int = 12345)(implicit ev: TypedPipe[T] <:< TypedPipe[(K, V)], serialization: K => Array[Byte], - ordering: Ordering[K]): Sketched[K, V] = + ordering: KeyGrouping[K]): Sketched[K, V] = Sketched(ev(this), reducers, delta, eps, seed) /** @@ -760,7 +758,7 @@ final case object EmptyTypedPipe extends TypedPipe[Nothing] { // Cross product with empty is always empty. 
override def cross[U](tiny: TypedPipe[U]): TypedPipe[(Nothing, U)] = this - override def distinct(implicit ord: Ordering[_ >: Nothing]) = this + override def distinct(implicit ord: KeyGrouping[_ >: Nothing]) = this override def flatMap[U](f: Nothing => TraversableOnce[U]) = this @@ -1145,10 +1143,10 @@ case class WithDescriptionTypedPipe[T](typedPipe: TypedPipe[T], description: Str * import Syntax.joinOnMappablePipe */ class MappablePipeJoinEnrichment[T](pipe: TypedPipe[T]) { - def joinBy[K, U](smaller: TypedPipe[U])(g: (T => K), h: (U => K), reducers: Int = -1)(implicit ord: Ordering[K]): CoGrouped[K, (T, U)] = pipe.groupBy(g).withReducers(reducers).join(smaller.groupBy(h)) - def leftJoinBy[K, U](smaller: TypedPipe[U])(g: (T => K), h: (U => K), reducers: Int = -1)(implicit ord: Ordering[K]): CoGrouped[K, (T, Option[U])] = pipe.groupBy(g).withReducers(reducers).leftJoin(smaller.groupBy(h)) - def rightJoinBy[K, U](smaller: TypedPipe[U])(g: (T => K), h: (U => K), reducers: Int = -1)(implicit ord: Ordering[K]): CoGrouped[K, (Option[T], U)] = pipe.groupBy(g).withReducers(reducers).rightJoin(smaller.groupBy(h)) - def outerJoinBy[K, U](smaller: TypedPipe[U])(g: (T => K), h: (U => K), reducers: Int = -1)(implicit ord: Ordering[K]): CoGrouped[K, (Option[T], Option[U])] = pipe.groupBy(g).withReducers(reducers).outerJoin(smaller.groupBy(h)) + def joinBy[K, U](smaller: TypedPipe[U])(g: (T => K), h: (U => K), reducers: Int = -1)(implicit ord: KeyGrouping[K]): CoGrouped[K, (T, U)] = pipe.groupBy(g).withReducers(reducers).join(smaller.groupBy(h)) + def leftJoinBy[K, U](smaller: TypedPipe[U])(g: (T => K), h: (U => K), reducers: Int = -1)(implicit ord: KeyGrouping[K]): CoGrouped[K, (T, Option[U])] = pipe.groupBy(g).withReducers(reducers).leftJoin(smaller.groupBy(h)) + def rightJoinBy[K, U](smaller: TypedPipe[U])(g: (T => K), h: (U => K), reducers: Int = -1)(implicit ord: KeyGrouping[K]): CoGrouped[K, (Option[T], U)] = pipe.groupBy(g).withReducers(reducers).rightJoin(smaller.groupBy(h)) + def outerJoinBy[K, U](smaller: TypedPipe[U])(g: (T => K), h: (U => K), reducers: Int = -1)(implicit ord: KeyGrouping[K]): CoGrouped[K, (Option[T], Option[U])] = pipe.groupBy(g).withReducers(reducers).outerJoin(smaller.groupBy(h)) } /** diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipeDiff.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipeDiff.scala index 39e9f7f2d1..30920e2da3 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipeDiff.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipeDiff.scala @@ -1,9 +1,8 @@ package com.twitter.scalding.typed import java.io.{ BufferedWriter, File, FileWriter } - import com.twitter.scalding.Execution - +import com.twitter.scalding.serialization.macros.impl.BinaryOrdering import scala.reflect.ClassTag /** @@ -27,7 +26,7 @@ object TypedPipeDiff { * Requires that T have an ordering and a hashCode and equals that is stable across JVMs (not reference based). * See diffArrayPipes for diffing pipes of arrays, since arrays do not meet these requirements by default. 
*/ - def diff[T: Ordering](left: TypedPipe[T], right: TypedPipe[T], reducers: Option[Int] = None): UnsortedGrouped[T, (Long, Long)] = { + def diff[T: KeyGrouping](left: TypedPipe[T], right: TypedPipe[T], reducers: Option[Int] = None): UnsortedGrouped[T, (Long, Long)] = { val lefts = left.map { x => (x, (1L, 0L)) } val rights = right.map { x => (x, (0L, 1L)) } val counts = (lefts ++ rights).sumByKey @@ -74,7 +73,7 @@ object TypedPipeDiff { * or maybe x => x.timestamp, if x's hashCode is not stable, assuming there's shouldn't be too * many records with the same timestamp. */ - def diffByGroup[T, K: Ordering]( + def diffByGroup[T, K: KeyGrouping]( left: TypedPipe[T], right: TypedPipe[T], reducers: Option[Int] = None)(groupByFn: T => K): TypedPipe[(T, (Long, Long))] = { @@ -100,16 +99,17 @@ object TypedPipeDiff { def diffByHashCode[T]( left: TypedPipe[T], right: TypedPipe[T], - reducers: Option[Int] = None): TypedPipe[(T, (Long, Long))] = diffByGroup(left, right, reducers)(_.hashCode) + reducers: Option[Int] = None): TypedPipe[(T, (Long, Long))] = diffByGroup(left, right, reducers)(_.hashCode)(KeyGrouping(BinaryOrdering + .ordSer[Int])) object Enrichments { implicit class Diff[T](val left: TypedPipe[T]) extends AnyVal { - def diff(right: TypedPipe[T], reducers: Option[Int] = None)(implicit ev: Ordering[T]): UnsortedGrouped[T, (Long, Long)] = + def diff(right: TypedPipe[T], reducers: Option[Int] = None)(implicit ev: KeyGrouping[T]): UnsortedGrouped[T, (Long, Long)] = TypedPipeDiff.diff(left, right, reducers) - def diffByGroup[K: Ordering](right: TypedPipe[T], reducers: Option[Int] = None)(groupByFn: T => K): TypedPipe[(T, (Long, Long))] = + def diffByGroup[K: KeyGrouping](right: TypedPipe[T], reducers: Option[Int] = None)(groupByFn: T => K): TypedPipe[(T, (Long, Long))] = TypedPipeDiff.diffByGroup(left, right, reducers)(groupByFn) def diffByHashCode(right: TypedPipe[T], reducers: Option[Int] = None): TypedPipe[(T, (Long, Long))] = TypedPipeDiff.diffByHashCode(left, right, reducers)