@@ -97,7 +97,8 @@ object PageRank extends Logging {
97
97
// Compute the outgoing rank contributions of each vertex, perform local preaggregation, and
98
98
// do the final aggregation at the receiving vertices. Requires a shuffle for aggregation.
99
99
val rankUpdates = rankGraph.mapReduceTriplets[Double ](
100
- e => Iterator ((e.dstId, e.srcAttr * e.attr)), _ + _, mapUsesSrcAttr = true , mapUsesDstAttr = false )
100
+ e => Iterator ((e.dstId, e.srcAttr * e.attr)), _ + _,
101
+ mapUsesSrcAttr = true , mapUsesDstAttr = false )
101
102
102
103
// Apply the final rank updates to get the new ranks, using join to preserve ranks of vertices
103
104
// that didn't receive a message. Requires a shuffle for broadcasting updated ranks to the
@@ -171,7 +172,8 @@ object PageRank extends Logging {
171
172
172
173
// Execute a dynamic version of Pregel.
173
174
Pregel (pagerankGraph, initialMessage, activeDirection = EdgeDirection .Out )(
174
- vertexProgram, sendMessage, messageCombiner, sendMsgUsesSrcAttr = true , sendMsgUsesDstAttr = false )
175
+ vertexProgram, sendMessage, messageCombiner,
176
+ sendMsgUsesSrcAttr = true , sendMsgUsesDstAttr = false )
175
177
.mapVertices((vid, attr) => attr._1)
176
178
} // end of deltaPageRank
177
179
}
0 commit comments