Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[prototype] Introduce KeyGrouping to make it possible to enable OrderedSerialization without code changes #1857

Open
wants to merge 3 commits into
base: 0.17.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ val sharedSettings = assemblySettings ++ scalariformSettings ++ Seq(
Opts.resolver.sonatypeSnapshots,
Opts.resolver.sonatypeReleases,
"Concurrent Maven Repo" at "http://conjars.org/repo",
"Twitter Maven" at "http://maven.twttr.com",
"Twitter Maven" at "https://maven.twttr.com",
"Cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ import com.twitter.scalding._
import com.twitter.scalding.commons.scheme.KeyValueByteScheme
import com.twitter.scalding.commons.tap.VersionedTap
import com.twitter.scalding.commons.tap.VersionedTap.TapMode
import com.twitter.scalding.source.{ CheckedInversion, MaxFailuresCheck }
import com.twitter.scalding.typed.KeyedListLike
import com.twitter.scalding.typed.TypedSink
import com.twitter.scalding.source.{CheckedInversion, MaxFailuresCheck}
import com.twitter.scalding.typed.{KeyGrouping, KeyedListLike, TypedSink}
import org.apache.hadoop.mapred.JobConf
import scala.collection.JavaConverters._

Expand Down Expand Up @@ -211,13 +210,13 @@ class VersionedKeyValSource[K, V](val path: String, val sourceVersion: Option[Lo

object RichPipeEx extends java.io.Serializable {
implicit def pipeToRichPipeEx(pipe: Pipe): RichPipeEx = new RichPipeEx(pipe)
implicit def typedPipeToRichPipeEx[K: Ordering, V: Monoid](pipe: TypedPipe[(K, V)]): TypedRichPipeEx[K, V] =
implicit def typedPipeToRichPipeEx[K: KeyGrouping, V: Monoid](pipe: TypedPipe[(K, V)]): TypedRichPipeEx[K, V] =
new TypedRichPipeEx(pipe)
implicit def keyedListLikeToRichPipeEx[K: Ordering, V: Monoid, T[K, +V] <: KeyedListLike[K, V, T]](
implicit def keyedListLikeToRichPipeEx[K: KeyGrouping, V: Monoid, T[K, +V] <: KeyedListLike[K, V, T]](
kll: KeyedListLike[K, V, T]): TypedRichPipeEx[K, V] = typedPipeToRichPipeEx(kll.toTypedPipe)
}

class TypedRichPipeEx[K: Ordering, V: Monoid](pipe: TypedPipe[(K, V)]) extends java.io.Serializable {
class TypedRichPipeEx[K: KeyGrouping, V: Monoid](pipe: TypedPipe[(K, V)]) extends java.io.Serializable {
import Dsl._
import TDsl._

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.twitter.scalding.typed

import com.twitter.algebird._
import com.twitter.scalding.serialization.macros.impl.BinaryOrdering.ordSer

/**
* Extension for TypedPipe to add a cumulativeSum method.
Expand Down Expand Up @@ -39,7 +40,7 @@ object CumulativeSum {
def cumulativeSum(
implicit sg: Semigroup[V],
ordU: Ordering[U],
ordK: Ordering[K]): SortedGrouped[K, (U, V)] = {
ordK: KeyGrouping[K]): SortedGrouped[K, (U, V)] = {
pipe.group
.sortBy { case (u, _) => u }
.scanLeft(Nil: List[(U, V)]) {
Expand All @@ -59,17 +60,17 @@ object CumulativeSum {
* partitions for a single key to go through a single scan.
*/
def cumulativeSum[S](partition: U => S)(
implicit ordS: Ordering[S],
implicit ordS: KeyGrouping[S],
sg: Semigroup[V],
ordU: Ordering[U],
ordK: Ordering[K]): TypedPipe[(K, (U, V))] = {
ordK: KeyGrouping[K]): TypedPipe[(K, (U, V))] = {

val sumPerS = pipe
.map { case (k, (u, v)) => (k, partition(u)) -> v }
.sumByKey
.map { case ((k, s), v) => (k, (s, v)) }
.group
.sortBy { case (s, v) => s }
.sortBy { case (s, v) => s }(ordS.ord)
.scanLeft(None: Option[(Option[V], V, S)]) {
case (acc, (s, v)) =>
acc match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,18 @@ limitations under the License.
*/
package com.twitter.scalding.mathematics

import com.twitter.algebird.{ Monoid, Group, Ring, Field }
import com.twitter.algebird.field._ // backwards compatiblity support
import com.twitter.algebird.{Field, Group, Monoid, Ring}
import com.twitter.algebird.field._
import com.twitter.scalding._

import cascading.pipe.assembly._
import cascading.pipe.joiner._
import cascading.pipe.Pipe
import cascading.tuple.Fields
import cascading.tuple._
import cascading.flow._
import cascading.tap._

import com.twitter.scalding.Dsl._
import com.twitter.scalding.typed.KeyGrouping
import scala.math.max
import scala.annotation.tailrec

Expand Down Expand Up @@ -137,11 +136,11 @@ class MatrixMappableExtensions[T](mappable: Mappable[T])(implicit fd: FlowDef, m
new Matrix[Row, Col, Val]('row, 'col, 'val, matPipe)
}

def toBlockMatrix[Group, Row, Col, Val](implicit ev: <:<[T, (Group, Row, Col, Val)], ord: Ordering[(Group, Row)],
def toBlockMatrix[Group, Row, Col, Val](implicit ev: <:<[T, (Group, Row, Col, Val)], ord: KeyGrouping[(Group, Row)],
setter: TupleSetter[(Group, Row, Col, Val)]): BlockMatrix[Group, Row, Col, Val] =
mapToBlockMatrix { _.asInstanceOf[(Group, Row, Col, Val)] }

def mapToBlockMatrix[Group, Row, Col, Val](fn: (T) => (Group, Row, Col, Val))(implicit ord: Ordering[(Group, Row)]): BlockMatrix[Group, Row, Col, Val] = {
def mapToBlockMatrix[Group, Row, Col, Val](fn: (T) => (Group, Row, Col, Val))(implicit ord: KeyGrouping[(Group, Row)]): BlockMatrix[Group, Row, Col, Val] = {
val matPipe = TypedPipe
.from(mappable)
.map(fn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ package com.twitter.scalding.mathematics
import cascading.flow.FlowDef
import cascading.pipe.Pipe
import cascading.tuple.Fields
import com.twitter.scalding.serialization.{ OrderedSerialization, OrderedSerialization2 }
import com.twitter.scalding._
import com.twitter.scalding.typed.{ ValuePipe, EmptyValue, LiteralValue, ComputedValue }
import com.twitter.algebird.{ Semigroup, Monoid, Ring, Group, Field }
import com.twitter.scalding.typed.{ ComputedValue, EmptyValue, KeyGrouping, LiteralValue, ValuePipe }
import com.twitter.algebird.{ Field, Group, Monoid, Ring, Semigroup }
import com.twitter.scalding.serialization.macros.impl.BinaryOrdering
import scala.collection.mutable.Map
import scala.collection.mutable.HashMap

import java.io.Serializable

/**
Expand All @@ -42,8 +41,8 @@ import java.io.Serializable
* a TypedPipe (call toTypedPipe) or the result may not be correct.
*/
sealed trait Matrix2[R, C, V] extends Serializable {
implicit def rowOrd: Ordering[R]
implicit def colOrd: Ordering[C]
implicit def rowOrd: KeyGrouping[R]
implicit def colOrd: KeyGrouping[C]
val sizeHint: SizeHint = NoClue
def +(that: Matrix2[R, C, V])(implicit mon: Monoid[V]): Matrix2[R, C, V] = Sum(this, that, mon)
def -(that: Matrix2[R, C, V])(implicit g: Group[V]): Matrix2[R, C, V] = Sum(this, that.negate, g)
Expand Down Expand Up @@ -104,7 +103,7 @@ sealed trait Matrix2[R, C, V] extends Serializable {

//This cast will always succeed:
lazy val joinedBool = mj.join(this.asInstanceOf[Matrix2[R, C, Boolean]], vec)
implicit val ord2: Ordering[C2] = vec.colOrd
implicit val ord2: KeyGrouping[(R, C2)] = KeyGrouping.tuple2(rowOrd, vec.colOrd)
lazy val resultPipe = joinedBool.flatMap {
case (key, ((row, bool), (col2, v))) =>
if (bool) Some((row, col2), v) else None // filter early
Expand All @@ -113,7 +112,7 @@ sealed trait Matrix2[R, C, V] extends Serializable {
.sum
.filter { kv => mon.isNonZero(kv._2) }
.map { case ((r, c2), v) => (r, c2, v) }
MatrixLiteral(resultPipe, this.sizeHint)
MatrixLiteral(resultPipe, this.sizeHint)(rowOrd, vec.colOrd)
}

def propagateRow[C2](mat: Matrix2[C, C2, Boolean])(implicit ev: =:=[R, Unit], mon: Monoid[V], mj: MatrixJoiner2): Matrix2[Unit, C2, V] =
Expand Down Expand Up @@ -158,30 +157,30 @@ sealed trait Matrix2[R, C, V] extends Serializable {
def getRow(index: R): Matrix2[Unit, C, V] =
MatrixLiteral(
toTypedPipe
.filter { case (r, c, v) => Ordering[R].equiv(r, index) }
.map { case (r, c, v) => ((), c, v) }, this.sizeHint.setRows(1L))
.filter { case (r, c, v) => rowOrd.ord.equiv(r, index) }
.map { case (r, c, v) => ((), c, v) }, this.sizeHint.setRows(1L))(KeyGrouping(BinaryOrdering.ordSer[Unit]), colOrd)

def getColumn(index: C): Matrix2[R, Unit, V] =
MatrixLiteral(
toTypedPipe
.filter { case (r, c, v) => Ordering[C].equiv(c, index) }
.map { case (r, c, v) => (r, (), v) }, this.sizeHint.setCols(1L))
.filter { case (r, c, v) => colOrd.ord.equiv(c, index) }
.map { case (r, c, v) => (r, (), v) }, this.sizeHint.setCols(1L))(rowOrd, KeyGrouping(BinaryOrdering.ordSer[Unit]))

/**
* Consider this Matrix as the r2 row of a matrix. The current matrix must be a row,
* which is to say, its row type must be Unit.
*/
def asRow[R2](r2: R2)(implicit ev: R =:= Unit, rowOrd: Ordering[R2]): Matrix2[R2, C, V] =
def asRow[R2](r2: R2)(implicit ev: R =:= Unit, rowOrd: KeyGrouping[R2]): Matrix2[R2, C, V] =
MatrixLiteral(toTypedPipe.map { case (r, c, v) => (r2, c, v) }, this.sizeHint)

def asCol[C2](c2: C2)(implicit ev: C =:= Unit, colOrd: Ordering[C2]): Matrix2[R, C2, V] =
def asCol[C2](c2: C2)(implicit ev: C =:= Unit, colOrd: KeyGrouping[C2]): Matrix2[R, C2, V] =
MatrixLiteral(toTypedPipe.map { case (r, c, v) => (r, c2, v) }, this.sizeHint)

// Compute the sum of the main diagonal. Only makes sense in cases where the row and col types
// are equal
def trace(implicit mon: Monoid[V], ev: =:=[R, C]): Scalar2[V] =
Scalar2(toTypedPipe.asInstanceOf[TypedPipe[(R, R, V)]]
.filter{ case (r1, r2, _) => Ordering[R].equiv(r1, r2) }
.filter{ case (r1, r2, _) => rowOrd.ord.equiv(r1, r2) }
.map{ case (_, _, x) => x }
.sum(mon))

Expand Down Expand Up @@ -216,7 +215,7 @@ object MatrixJoiner2 {
class DefaultMatrixJoiner(sizeRatioThreshold: Long) extends MatrixJoiner2 {
def join[R, C, V, C2, V2](left: Matrix2[R, C, V],
right: Matrix2[C, C2, V2]): TypedPipe[(C, ((R, V), (C2, V2)))] = {
implicit val cOrd: Ordering[C] = left.colOrd
implicit val cOrd: KeyGrouping[C] = left.colOrd
val one = left.toTypedPipe.map { case (r, c, v) => (c, (r, v)) }.group
val two = right.toTypedPipe.map { case (c, c2, v2) => (c, (c2, v2)) }.group
val sizeOne = left.sizeHint.total.getOrElse(BigInt(1L))
Expand All @@ -242,9 +241,9 @@ class DefaultMatrixJoiner(sizeRatioThreshold: Long) extends MatrixJoiner2 {
/**
* Infinite column vector - only for intermediate computations
*/
case class OneC[R, V](implicit override val rowOrd: Ordering[R]) extends Matrix2[R, Unit, V] {
case class OneC[R, V](implicit override val rowOrd: KeyGrouping[R]) extends Matrix2[R, Unit, V] {
override val sizeHint: SizeHint = FiniteHint(Long.MaxValue, 1)
override def colOrd = Ordering[Unit]
override def colOrd = KeyGrouping(BinaryOrdering.ordSer[Unit])
def transpose = OneR()
override def negate(implicit g: Group[V]) = sys.error("Only used in intermediate computations, try (-1 * OneC)")
def toTypedPipe = sys.error("Only used in intermediate computations")
Expand All @@ -253,9 +252,9 @@ case class OneC[R, V](implicit override val rowOrd: Ordering[R]) extends Matrix2
/**
* Infinite row vector - only for intermediate computations
*/
case class OneR[C, V](implicit override val colOrd: Ordering[C]) extends Matrix2[Unit, C, V] {
case class OneR[C, V](implicit override val colOrd: KeyGrouping[C]) extends Matrix2[Unit, C, V] {
override val sizeHint: SizeHint = FiniteHint(1, Long.MaxValue)
override def rowOrd = Ordering[Unit]
override def rowOrd = KeyGrouping(BinaryOrdering.ordSer[Unit])
def transpose = OneC()
override def negate(implicit g: Group[V]) = sys.error("Only used in intermediate computations, try (-1 * OneR)")
def toTypedPipe = sys.error("Only used in intermediate computations")
Expand Down Expand Up @@ -294,10 +293,10 @@ case class Product[R, C, C2, V](left: Matrix2[R, C, V],
val localRing = ring

val joined = (if (leftMatrix) {
val ord: Ordering[R] = left.rowOrd
val ord: KeyGrouping[R] = left.rowOrd
left.toTypedPipe.groupBy(x => x._1)(ord)
} else {
val ord: Ordering[C] = right.rowOrd
val ord: KeyGrouping[C] = right.rowOrd
right.toTypedPipe.groupBy(x => x._1)(ord)
}).mapValues { _._3 }
.sum(localRing)
Expand All @@ -316,7 +315,7 @@ case class Product[R, C, C2, V](left: Matrix2[R, C, V],
if (isSpecialCase) {
specialCase
} else {
implicit val ord: Ordering[C] = right.rowOrd
implicit val ord: KeyGrouping[C] = right.rowOrd
val localRing = ring
joiner.join(left, right)
.map { case (key, ((l1, lv), (r2, rv))) => (l1, r2, localRing.times(lv, rv)) }
Expand Down Expand Up @@ -356,9 +355,10 @@ case class Product[R, C, C2, V](left: Matrix2[R, C, V],

override val sizeHint = left.sizeHint * right.sizeHint

implicit override val rowOrd: Ordering[R] = left.rowOrd
implicit override val colOrd: Ordering[C2] = right.colOrd
implicit def withOrderedSerialization: Ordering[(R, C2)] = OrderedSerialization2.maybeOrderedSerialization2(rowOrd, colOrd)
implicit override val rowOrd: KeyGrouping[R] = left.rowOrd
implicit override val colOrd: KeyGrouping[C2] = right.colOrd
implicit def withOrderedSerialization: KeyGrouping[(R, C2)] =
KeyGrouping.tuple2(rowOrd, colOrd)

override lazy val transpose: Product[C2, C, R, V] = Product(right.transpose, left.transpose, ring)
override def negate(implicit g: Group[V]): Product[R, C, C2, V] = {
Expand All @@ -381,12 +381,12 @@ case class Product[R, C, C2, V](left: Matrix2[R, C, V],
if (cost1 > cost2) {
val product2 = plan2.asInstanceOf[Product[C, R, C, V]]
val ord = left.colOrd
val filtered = product2.toOuterSum.filter{ case (c1, c2, _) => ord.equiv(c1, c2) }
val filtered = product2.toOuterSum.filter{ case (c1, c2, _) => ord.ord.equiv(c1, c2) }
Scalar2(product2.computePipe(filtered).map{ case (_, _, x) => x }.sum(mon))
} else {
val product1 = plan1.asInstanceOf[Product[R, C, R, V]]
val ord = left.rowOrd
val filtered = product1.toOuterSum.filter{ case (r1, r2, _) => ord.equiv(r1, r2) }
val filtered = product1.toOuterSum.filter{ case (r1, r2, _) => ord.ord.equiv(r1, r2) }
Scalar2(product1.computePipe(filtered).map{ case (_, _, x) => x }.sum(mon))
}

Expand Down Expand Up @@ -435,9 +435,10 @@ case class Sum[R, C, V](left: Matrix2[R, C, V], right: Matrix2[R, C, V], mon: Mo

override val sizeHint = left.sizeHint + right.sizeHint

implicit override val rowOrd: Ordering[R] = left.rowOrd
implicit override val colOrd: Ordering[C] = left.colOrd
implicit def withOrderedSerialization: Ordering[(R, C)] = OrderedSerialization2.maybeOrderedSerialization2(rowOrd, colOrd)
implicit override val rowOrd: KeyGrouping[R] = left.rowOrd
implicit override val colOrd: KeyGrouping[C] = left.colOrd
implicit def withOrderedSerialization: KeyGrouping[(R, C)] =
KeyGrouping.tuple2(rowOrd, colOrd)

override lazy val transpose: Sum[C, R, V] = Sum(left.transpose, right.transpose, mon)
override def negate(implicit g: Group[V]): Sum[R, C, V] = Sum(left.negate, right.negate, mon)
Expand All @@ -447,7 +448,7 @@ case class Sum[R, C, V](left: Matrix2[R, C, V], right: Matrix2[R, C, V], mon: Mo
override def trace(implicit mon: Monoid[V], ev: =:=[R, C]): Scalar2[V] =
Scalar2(collectAddends(this).map { pipe =>
pipe.asInstanceOf[TypedPipe[(R, R, V)]]
.filter { case (r, c, v) => Ordering[R].equiv(r, c) }
.filter { case (r, c, v) => rowOrd.ord.equiv(r, c) }
.map { _._3 }
}.reduce(_ ++ _).sum)
}
Expand Down Expand Up @@ -480,13 +481,14 @@ case class HadamardProduct[R, C, V](left: Matrix2[R, C, V],
else
HadamardProduct(left.negate, right, ring)

implicit override val rowOrd: Ordering[R] = left.rowOrd
implicit override val colOrd: Ordering[C] = left.colOrd
implicit def withOrderedSerialization: Ordering[(R, C)] = OrderedSerialization2.maybeOrderedSerialization2(rowOrd, colOrd)
implicit override val rowOrd: KeyGrouping[R] = left.rowOrd
implicit override val colOrd: KeyGrouping[C] = left.colOrd
implicit def withOrderedSerialization: KeyGrouping[(R, C)] =
KeyGrouping.tuple2(rowOrd, colOrd)
}

case class MatrixLiteral[R, C, V](override val toTypedPipe: TypedPipe[(R, C, V)],
override val sizeHint: SizeHint)(implicit override val rowOrd: Ordering[R], override val colOrd: Ordering[C])
override val sizeHint: SizeHint)(implicit override val rowOrd: KeyGrouping[R], override val colOrd: KeyGrouping[C])
extends Matrix2[R, C, V] {

override lazy val transpose: MatrixLiteral[C, R, V] =
Expand Down Expand Up @@ -535,10 +537,10 @@ trait Scalar2[V] extends Serializable {
case s @ Sum(left, right, mon) => Sum(this * left, this * right, mon)
case m @ MatrixLiteral(_, _) => timesLiteral(m) // handle literals here
case x @ OneC() =>
Product(OneC[Unit, V](), toMatrix, ring)
Product(OneC[Unit, V]()(KeyGrouping(BinaryOrdering.ordSer[Unit])), toMatrix, ring)
.asInstanceOf[Matrix2[R, C, V]]
case x @ OneR() =>
Product(toMatrix, OneR[Unit, V](), ring)
Product(toMatrix, OneR[Unit, V]()(KeyGrouping(BinaryOrdering.ordSer[Unit])), ring)
.asInstanceOf[Matrix2[R, C, V]]
}

Expand All @@ -562,7 +564,7 @@ trait Scalar2[V] extends Serializable {

def map[U](fn: V => U): Scalar2[U] = Scalar2(value.map(fn))
def toMatrix: Matrix2[Unit, Unit, V] =
MatrixLiteral(value.toTypedPipe.map(v => ((), (), v)), FiniteHint(1, 1))
MatrixLiteral(value.toTypedPipe.map(v => ((), (), v)), FiniteHint(1, 1))(KeyGrouping(BinaryOrdering.ordSer[Unit]), KeyGrouping(BinaryOrdering.ordSer[Unit]))
// TODO: FunctionMatrix[R,C,V](fn: (R,C) => V) and a Literal scalar is just: FunctionMatrix[Unit, Unit, V]({ (_, _) => v })
}

Expand All @@ -582,14 +584,14 @@ object Scalar2 {
}

object Matrix2 {
def apply[R: Ordering, C: Ordering, V](t: TypedPipe[(R, C, V)], hint: SizeHint): Matrix2[R, C, V] =
def apply[R: KeyGrouping, C: KeyGrouping, V](t: TypedPipe[(R, C, V)], hint: SizeHint): Matrix2[R, C, V] =
MatrixLiteral(t, hint)

def read[R, C, V](t: TypedSource[(R, C, V)],
hint: SizeHint)(implicit ordr: Ordering[R], ordc: Ordering[C]): Matrix2[R, C, V] =
hint: SizeHint)(implicit ordr: KeyGrouping[R], ordc: KeyGrouping[C]): Matrix2[R, C, V] =
MatrixLiteral(TypedPipe.from(t), hint)

def J[R, C, V](implicit ordR: Ordering[R], ordC: Ordering[C], ring: Ring[V], mj: MatrixJoiner2) =
def J[R, C, V](implicit ordR: KeyGrouping[R], ordC: KeyGrouping[C], ring: Ring[V], mj: MatrixJoiner2) =
Product(OneC[R, V]()(ordR), OneR[C, V]()(ordC), ring)

/**
Expand Down
Loading