Skip to content

Commit 1fec3ce

Browse files
Yves RaimondDB Tsai
authored andcommitted
[SPARK-11496][GRAPHX] Parallel implementation of personalized pagerank
(Updated version of [PR-9457](apache#9457), rebased on latest Spark master, and using mllib-local). This implements a parallel version of personalized pagerank, which runs all propagations for a list of source vertices in parallel. I ran a few benchmarks on the full [DBpedia](http://dbpedia.org/) graph. When running personalized pagerank for only one source node, the existing implementation is twice as fast as the parallel one (because of the SparseVector overhead). However for 10 source nodes, the parallel implementation is four times as fast. When increasing the number of source nodes, this difference becomes even greater. ![image](https://cloud.githubusercontent.com/assets/2491/10927702/dd82e4fa-8256-11e5-89a8-4799b407f502.png) Author: Yves Raimond <[email protected]> Closes apache#14998 from moustaki/parallel-ppr.
1 parent 3354917 commit 1fec3ce

File tree

4 files changed

+121
-1
lines changed

4 files changed

+121
-1
lines changed

graphx/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@
4646
<type>test-jar</type>
4747
<scope>test</scope>
4848
</dependency>
49+
<dependency>
50+
<groupId>org.apache.spark</groupId>
51+
<artifactId>spark-mllib-local_${scala.binary.version}</artifactId>
52+
<version>${project.version}</version>
53+
</dependency>
4954
<dependency>
5055
<groupId>org.apache.xbean</groupId>
5156
<artifactId>xbean-asm5-shaded</artifactId>

graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ package org.apache.spark.graphx
2020
import scala.reflect.ClassTag
2121
import scala.util.Random
2222

23-
import org.apache.spark.SparkException
2423
import org.apache.spark.graphx.lib._
24+
import org.apache.spark.ml.linalg.Vector
2525
import org.apache.spark.rdd.RDD
26+
import org.apache.spark.SparkException
2627

2728
/**
2829
* Contains additional functionality for [[Graph]]. All operations are expressed in terms of the
@@ -391,6 +392,15 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
391392
PageRank.runUntilConvergenceWithOptions(graph, tol, resetProb, Some(src))
392393
}
393394

395+
/**
396+
* Run parallel personalized PageRank for a given array of source vertices, such
397+
* that all random walks are started relative to the source vertices
398+
*/
399+
def staticParallelPersonalizedPageRank(sources: Array[VertexId], numIter: Int,
400+
resetProb: Double = 0.15) : Graph[Vector, Double] = {
401+
PageRank.runParallelPersonalizedPageRank(graph, numIter, resetProb, sources)
402+
}
403+
394404
/**
395405
* Run Personalized PageRank for a fixed number of iterations with
396406
* with all iterations originating at the source node

graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@ package org.apache.spark.graphx.lib
1919

2020
import scala.reflect.ClassTag
2121

22+
import breeze.linalg.{Vector => BV}
23+
2224
import org.apache.spark.graphx._
2325
import org.apache.spark.internal.Logging
26+
import org.apache.spark.ml.linalg.{Vector, Vectors}
2427

2528
/**
2629
* PageRank algorithm implementation. There are two implementations of PageRank implemented.
@@ -162,6 +165,84 @@ object PageRank extends Logging {
162165
rankGraph
163166
}
164167

168+
/**
169+
* Run Personalized PageRank for a fixed number of iterations, for a
170+
* set of starting nodes in parallel. Returns a graph with vertex attributes
171+
* containing the pagerank relative to all starting nodes (as a sparse vector) and
172+
* edge attributes the normalized edge weight
173+
*
174+
* @tparam VD The original vertex attribute (not used)
175+
* @tparam ED The original edge attribute (not used)
176+
*
177+
* @param graph The graph on which to compute personalized pagerank
178+
* @param numIter The number of iterations to run
179+
* @param resetProb The random reset probability
180+
* @param sources The list of sources to compute personalized pagerank from
181+
* @return the graph with vertex attributes
182+
* containing the pagerank relative to all starting nodes (as a sparse vector) and
183+
* edge attributes the normalized edge weight
184+
*/
185+
def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
186+
numIter: Int, resetProb: Double = 0.15,
187+
sources: Array[VertexId]): Graph[Vector, Double] = {
188+
// TODO if one sources vertex id is outside of the int range
189+
// we won't be able to store its activations in a sparse vector
190+
val zero = Vectors.sparse(sources.size, List()).asBreeze
191+
val sourcesInitMap = sources.zipWithIndex.map { case (vid, i) =>
192+
val v = Vectors.sparse(sources.size, Array(i), Array(resetProb)).asBreeze
193+
(vid, v)
194+
}.toMap
195+
val sc = graph.vertices.sparkContext
196+
val sourcesInitMapBC = sc.broadcast(sourcesInitMap)
197+
// Initialize the PageRank graph with each edge attribute having
198+
// weight 1/outDegree and each source vertex with attribute 1.0.
199+
var rankGraph = graph
200+
// Associate the degree with each vertex
201+
.outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
202+
// Set the weight on the edges based on the degree
203+
.mapTriplets(e => 1.0 / e.srcAttr, TripletFields.Src)
204+
.mapVertices { (vid, attr) =>
205+
if (sourcesInitMapBC.value contains vid) {
206+
sourcesInitMapBC.value(vid)
207+
} else {
208+
zero
209+
}
210+
}
211+
212+
var i = 0
213+
while (i < numIter) {
214+
val prevRankGraph = rankGraph
215+
// Propagates the message along outbound edges
216+
// and adding start nodes back in with activation resetProb
217+
val rankUpdates = rankGraph.aggregateMessages[BV[Double]](
218+
ctx => ctx.sendToDst(ctx.srcAttr :* ctx.attr),
219+
(a : BV[Double], b : BV[Double]) => a :+ b, TripletFields.Src)
220+
221+
rankGraph = rankGraph.joinVertices(rankUpdates) {
222+
(vid, oldRank, msgSum) =>
223+
val popActivations: BV[Double] = msgSum :* (1.0 - resetProb)
224+
val resetActivations = if (sourcesInitMapBC.value contains vid) {
225+
sourcesInitMapBC.value(vid)
226+
} else {
227+
zero
228+
}
229+
popActivations :+ resetActivations
230+
}.cache()
231+
232+
rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
233+
prevRankGraph.vertices.unpersist(false)
234+
prevRankGraph.edges.unpersist(false)
235+
236+
logInfo(s"Parallel Personalized PageRank finished iteration $i.")
237+
238+
i += 1
239+
}
240+
241+
rankGraph.mapVertices { (vid, attr) =>
242+
Vectors.fromBreeze(attr)
243+
}
244+
}
245+
165246
/**
166247
* Run a dynamic version of PageRank returning a graph with vertex attributes containing the
167248
* PageRank and edge attributes containing the normalized edge weight.

graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,11 +118,29 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
118118
val dynamicRanks = starGraph.personalizedPageRank(0, 0, resetProb).vertices.cache()
119119
assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)
120120

121+
val parallelStaticRanks1 = starGraph
122+
.staticParallelPersonalizedPageRank(Array(0), 1, resetProb).mapVertices {
123+
case (vertexId, vector) => vector(0)
124+
}.vertices.cache()
125+
assert(compareRanks(staticRanks1, parallelStaticRanks1) < errorTol)
126+
127+
val parallelStaticRanks2 = starGraph
128+
.staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices {
129+
case (vertexId, vector) => vector(0)
130+
}.vertices.cache()
131+
assert(compareRanks(staticRanks2, parallelStaticRanks2) < errorTol)
132+
121133
// We have one outbound edge from 1 to 0
122134
val otherStaticRanks2 = starGraph.staticPersonalizedPageRank(1, numIter = 2, resetProb)
123135
.vertices.cache()
124136
val otherDynamicRanks = starGraph.personalizedPageRank(1, 0, resetProb).vertices.cache()
137+
val otherParallelStaticRanks2 = starGraph
138+
.staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices {
139+
case (vertexId, vector) => vector(1)
140+
}.vertices.cache()
125141
assert(compareRanks(otherDynamicRanks, otherStaticRanks2) < errorTol)
142+
assert(compareRanks(otherStaticRanks2, otherParallelStaticRanks2) < errorTol)
143+
assert(compareRanks(otherDynamicRanks, otherParallelStaticRanks2) < errorTol)
126144
}
127145
} // end of test Star PersonalPageRank
128146

@@ -177,6 +195,12 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
177195
val dynamicRanks = chain.personalizedPageRank(4, tol, resetProb).vertices
178196

179197
assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
198+
199+
val parallelStaticRanks = chain
200+
.staticParallelPersonalizedPageRank(Array(4), numIter, resetProb).mapVertices {
201+
case (vertexId, vector) => vector(0)
202+
}.vertices.cache()
203+
assert(compareRanks(staticRanks, parallelStaticRanks) < errorTol)
180204
}
181205
}
182206
}

0 commit comments

Comments
 (0)