Skip to content

Commit 20d80a3

Browse files
committed
[SPARK-2062][GraphX] VertexRDD.apply does not use the mergeFunc
create verticesDeduplicate with reduceByKey, using mergeFunc then proceed with verticesDedup
1 parent b715aa0 commit 20d80a3

File tree

1 file changed

+4
-3
lines changed

1 file changed

+4
-3
lines changed

graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -410,9 +410,10 @@ object VertexRDD {
410410
def apply[VD: ClassTag](
411411
vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD, mergeFunc: (VD, VD) => VD
412412
): VertexRDD[VD] = {
413-
val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match {
414-
case Some(p) => vertices
415-
case None => vertices.copartitionWithVertices(new HashPartitioner(vertices.partitions.size))
413+
val verticesDedup = vertices.reduceByKey((VD1, VD2) => mergeFunc(VD1, VD2))
414+
val vPartitioned: RDD[(VertexId, VD)] = verticesDedup.partitioner match {
415+
case Some(p) => verticesDedup
416+
case None => verticesDedup.copartitionWithVertices(new HashPartitioner(verticesDedup.partitions.size))
416417
}
417418
val routingTables = createRoutingTables(edges, vPartitioned.partitioner.get)
418419
val vertexPartitions = vPartitioned.zipPartitions(routingTables, preservesPartitioning = true) {

0 commit comments

Comments
 (0)