diff --git a/src/main/scala/edu/berkeley/cs/amplab/spark/indexedrdd/IndexableRDD.scala b/src/main/scala/edu/berkeley/cs/amplab/spark/indexedrdd/IndexableRDD.scala
new file mode 100644
index 0000000..42c03bd
--- /dev/null
+++ b/src/main/scala/edu/berkeley/cs/amplab/spark/indexedrdd/IndexableRDD.scala
@@ -0,0 +1,123 @@
+package edu.berkeley.cs.amplab.spark.indexedrdd
+
+import edu.berkeley.cs.amplab.spark.indexedrdd.IndexableRDD.IndexableKey
+import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD._
+import org.apache.spark.SparkContext._
+import org.apache.spark._
+import org.apache.spark.rdd.RDD
+
+import scala.reflect.ClassTag
+
+/**
+ * An extension of IndexedRDD that supports arbitrary key types via a user-supplied key-to-Id mapping
+ * Created by mader on 2/23/15.
+ *
+ * @param ik the converter between keys of type `K` and `Id` values
+ * @param baseRDD the underlying IndexedRDD keyed by `Id`, to which map, filter, etc. are delegated
+ * @tparam K the key type of the dataset
+ * @tparam V the value type (can be anything)
+ */
+class IndexableRDD[K: ClassTag, V: ClassTag](ik: IndexableKey[K],
+                                             baseRDD: IndexedRDD[V]) extends Serializable {
+
+  /**
+   * The underlying data exposed as an ordinary `RDD[(K, V)]`, with `Id`s translated back to keys
+   */
+  lazy val rawRDD = IndexableRDD.indexToKeys(ik, baseRDD)
+
+  def get(key: K): Option[V] = multiget(Array(key)).get(key)
+
+  def multiget(keys: Array[K]): Map[K, V] =
+    baseRDD.multiget(keys.map(ik.toId)).map(kv => (ik.fromId(kv._1), kv._2))
+
+  /**
+   * Unconditionally updates the specified key to have the specified value. Returns a new
+   * IndexableRDD that reflects the modification.
+   */
+  def put(k: K, v: V): IndexableRDD[K, V] = multiput(Map(k -> v))
+
+  /**
+   * Unconditionally updates the keys in `kvs` to their corresponding values. Returns a new
+   * IndexableRDD that reflects the modification.
+   */
+  def multiput(kvs: Map[K, V]): IndexableRDD[K, V] =
+    new IndexableRDD(ik, baseRDD.multiput(kvs.map(kv => (ik.toId(kv._1), kv._2))))
+
+  /**
+   * Updates the keys in `kvs` to their corresponding values, running `merge` on old and new values
+   * if necessary. Returns a new IndexableRDD that reflects the modification.
+   */
+  def multiput(kvs: Map[K, V], merge: (K, V, V) => V): IndexableRDD[K, V] = {
+    val idMerge = (a: Id, b: V, c: V) => merge(ik.fromId(a), b, c)
+    new IndexableRDD(ik, baseRDD.multiput(kvs.map(kv => (ik.toId(kv._1), kv._2)), idMerge))
+  }
+
+  /**
+   * Restricts the entries to those satisfying the given predicate. This operation preserves the
+   * index for efficient joins with the original IndexedRDD and is implemented using soft deletions.
+   *
+   * @param pred the user-defined predicate, which takes a `(K, V)` tuple to conform to the
+   *             `RDD[(K, V)]` interface
+   */
+  def filter(pred: Tuple2[K, V] => Boolean): IndexableRDD[K, V] = {
+    val idPred = (nt: Tuple2[Id, V]) => pred((ik.fromId(nt._1), nt._2))
+    new IndexableRDD(ik, baseRDD.filter(idPred))
+  }
+
+
+  def cache() = new IndexableRDD(ik, baseRDD.cache)
+
+  def collect() = rawRDD.collect()
+
+  def count() = rawRDD.count()
+}
+
+
+object IndexableRDD extends Serializable {
+  type Id = Long
+
+
+  /**
+   * The mapping between the key type `K` and an id (Long)
+   * @note it is essential that `fromId` invert `toId`, i.e. `fromId(toId(key)) == key` for every key used in the program
+   * @tparam K the type of the key (anything at all)
+   *
+   */
+  trait IndexableKey[K] extends Serializable {
+    def toId(key: K): Id
+    def fromId(id: Id): K
+  }
+
+  def keysToIndex[K: ClassTag, V: ClassTag](ik: IndexableKey[K], elems: RDD[(K, V)]) =
+    elems.mapPartitions(iter =>
+      iter.map(kv => (ik.toId(kv._1), kv._2)),
+      preservesPartitioning = true)
+  def indexToKeys[K: ClassTag, V: ClassTag](ik: IndexableKey[K], elems: RDD[(Id, V)]) =
+    elems.mapPartitions(iter =>
+      iter.map(kv => (ik.fromId(kv._1), kv._2)),
+      preservesPartitioning = true)
+  /**
+   * Constructs an IndexableRDD from an RDD of pairs, partitioning keys using a hash partitioner,
+   * preserving the number of partitions of `elems`, and merging duplicate keys arbitrarily.
+   */
+  def apply[K: ClassTag, V: ClassTag](ik: IndexableKey[K], elems: RDD[(K, V)]): IndexableRDD[K, V] = {
+    new IndexableRDD(ik, IndexedRDD(keysToIndex(ik, elems)))
+  }
+
+  /** Constructs an IndexableRDD from an RDD of pairs using `partitioner`, merging duplicate keys arbitrarily. */
+  def apply[K: ClassTag, V: ClassTag](ik: IndexableKey[K], elems: RDD[(K, V)],
+                                      partitioner: Partitioner):
+  IndexableRDD[K, V] = {
+    val partitioned: RDD[(K, V)] = elems.partitionBy(partitioner)
+    new IndexableRDD(ik, IndexedRDD(keysToIndex(ik, partitioned)))
+  }
+
+  /** Constructs an IndexableRDD from an RDD of pairs, merging duplicate keys with `mergeValues`. */
+  def apply[K: ClassTag, V: ClassTag](ik: IndexableKey[K],
+                                      elems: RDD[(K, V)], partitioner: Partitioner,
+                                      mergeValues: (V, V) => V): IndexableRDD[K, V] = {
+
+    val partitioned: RDD[(K, V)] = elems.partitionBy(partitioner)
+    new IndexableRDD(ik, IndexedRDD(keysToIndex(ik, partitioned), partitioner, mergeValues))
+  }
+}
diff --git a/src/test/scala/edu/berkeley/cs/amplab/spark/indexedrdd/IndexableRDDSuite.scala b/src/test/scala/edu/berkeley/cs/amplab/spark/indexedrdd/IndexableRDDSuite.scala
new file mode 100644
index 0000000..072116d
--- /dev/null
+++ b/src/test/scala/edu/berkeley/cs/amplab/spark/indexedrdd/IndexableRDDSuite.scala
@@ -0,0 +1,98 @@
+package edu.berkeley.cs.amplab.spark.indexedrdd
+
+
+import edu.berkeley.cs.amplab.spark.indexedrdd.IndexableRDD.{Id, IndexableKey}
+import org.apache.spark.SparkContext
+import org.scalatest.FunSuite
+
+/**
+ * Created by mader on 2/23/15.
+ */
+class IndexableRDDSuite extends FunSuite with SharedSparkContext {
+  import edu.berkeley.cs.amplab.spark.indexedrdd.IndexableRDDSuite._
+
+  val n = 10
+
+  test("get, multiget") {
+
+    val ps = pairsP3D(sc, n).cache
+    assert(ps.get(Point3D(0, 0, 0)) === Some(0), "Get the first point")
+    assert(ps.get(Point3D(1, 1, 1)) === Some(1), "Get the second point")
+    assert(ps.get(Point3D(9, 9, 9)) === Some(9), "Get one of the points at the end")
+  }
+
+  test("put, multiput") {
+    val ps = pairsIds(sc, n).cache
+    val plusOne = ps.put(IdString("-1"), -1)
+    assert(plusOne.count === n + 1 + 1, "Putting a key that was not present should add one element")
+    assert(plusOne.get(IdString("-1")) === Some(-1), "The newly put element should be retrievable")
+
+    val plusMany = ps.multiput(Map(IdString("0") -> 10, IdString("2") -> 10),
+      (_, a: Int, b: Int) => a + b)
+    assert(plusMany.count === n + 1, "No new elements should have been added")
+    assert(plusMany.get(IdString("0")) === Some(10), "Key 0 should hold the merged value 0 + 10")
+    assert(plusMany.get(IdString("1")) === Some(1), "Untouched key 1 should keep its original value")
+    assert(plusMany.get(IdString("2")) === Some(12), "Key 2 should hold the merged value 2 + 10")
+  }
+
+  test("filter on value") {
+    val ps = IndexableRDDSuite.pairsP3D(sc, n)
+    val evens = ps.filter(q => (q._2 % 2) == 0).cache()
+
+    assert(evens.get(Point3D(2, 2, 2)) === Some(2), "Contains an even point")
+    assert(evens.get(Point3D(1, 1, 1)) === None, "Contains no odd points")
+    assert(evens.count === Math.ceil((n + 1.0) / 2).toInt, "Check the length")
+  }
+  test("filter on key") {
+    val ps = IndexableRDDSuite.pairsP3D(sc, n)
+    val limitVal = 5
+    val lessThan5 = ps.filter(q => q._1.x <= limitVal).cache()
+
+    assert(lessThan5.get(Point3D(1, 1, 1)) === Some(1), "Contains a point at or below " + limitVal)
+    assert(lessThan5.get(Point3D(6, 6, 6)) === None, "Contains no point above " + limitVal)
+    assert(lessThan5.count === limitVal + 1, "Check the length")
+  }
+
+
+}
+
+
+/**
+ * Declared outside of the test suite class to avoid closure capture (just like IndexedRDDSuite)
+ */
+object IndexableRDDSuite {
+
+  case class IdString(x: String)
+
+  val idsKey = new IndexableKey[IdString] {
+    override def toId(key: IdString): Id = key.x.toLong
+    override def fromId(id: Id): IdString = IdString(id.toString)
+  }
+
+  case class Point3D(x: Int, y: Int, z: Int)
+
+
+  /**
+   * Translates a positive Point3D with coordinates between (0,0,0) and (99,99,99) to and from an Id
+   */
+  val p3dKey = new IndexableKey[Point3D] {
+    override def toId(key: Point3D): Id = (100 * key.z + key.y) * 100 + key.x
+
+    override def fromId(id: Id): Point3D =
+      Point3D(
+        (id % 100).toInt,
+        ((id / 100) % 100).toInt,
+        (id / 10000).toInt
+      )
+  }
+  def pairsP3D(sc: SparkContext, n: Int) = {
+    IndexableRDD(p3dKey, sc.parallelize((0 to n).map(x => (Point3D(x, x, x), x)), 5))
+  }
+  def pairsIds(sc: SparkContext, n: Int) = {
+    IndexableRDD(idsKey, sc.parallelize((0 to n).map(x => (IdString(x.toString()), x)), 5))
+  }
+
+  class SumFunction[K] extends Function3[K, Int, Int, Int] with Serializable {
+    def apply(junk: K, a: Int, b: Int) = a + b
+  }
+}
\ No newline at end of file
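
A minimal usage sketch (not part of the diff) showing how the new API fits together, assuming a live SparkContext named `sc`; the `stringKey` converter and the sample data below are hypothetical illustrations:

// Hypothetical example; only numeric strings are valid keys for this converter.
import edu.berkeley.cs.amplab.spark.indexedrdd.IndexableRDD
import edu.berkeley.cs.amplab.spark.indexedrdd.IndexableRDD.{Id, IndexableKey}

// The converter must round-trip: fromId(toId(key)) == key for every key used.
val stringKey = new IndexableKey[String] {
  def toId(key: String): Id = key.toLong
  def fromId(id: Id): String = id.toString
}

val users = IndexableRDD(stringKey, sc.parallelize(Seq(("1", "alice"), ("2", "bob"))))
users.get("1")                                   // Some("alice")
val updated = users.put("3", "carol")            // returns a new IndexableRDD with the extra entry
updated.filter { case (k, v) => v.startsWith("a") }.collect()   // Array(("1", "alice"))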