Skip to content

Commit 52dc7f7

Browse files
committed
pass mergeFunc to VertexPartitionBase, where merge is handled
1 parent 20d80a3 commit 52dc7f7

File tree

2 files changed

+7
-7
lines changed

2 files changed

+7
-7
lines changed

graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -410,10 +410,9 @@ object VertexRDD {
410410
def apply[VD: ClassTag](
411411
vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD, mergeFunc: (VD, VD) => VD
412412
): VertexRDD[VD] = {
413-
val verticesDedup = vertices.reduceByKey((VD1, VD2) => mergeFunc(VD1, VD2))
414-
val vPartitioned: RDD[(VertexId, VD)] = verticesDedup.partitioner match {
415-
case Some(p) => verticesDedup
416-
case None => verticesDedup.copartitionWithVertices(new HashPartitioner(verticesDedup.partitions.size))
413+
val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match {
414+
case Some(p) => vertices
415+
case None => vertices.copartitionWithVertices(new HashPartitioner(vertices.partitions.size))
417416
}
418417
val routingTables = createRoutingTables(edges, vPartitioned.partitioner.get)
419418
val vertexPartitions = vPartitioned.zipPartitions(routingTables, preservesPartitioning = true) {

graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,14 @@ object ShippableVertexPartition {
4040

4141
/**
4242
* Construct a `ShippableVertexPartition` from the given vertices with the specified routing
43-
* table, filling in missing vertices mentioned in the routing table using `defaultVal`.
43+
* table, filling in missing vertices mentioned in the routing table using `defaultVal`,
44+
* and merging duplicate vertex attributes with `mergeFunc`.
4445
*/
4546
def apply[VD: ClassTag](
46-
iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD)
47+
iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD, mergeFunc: (VD, VD) => VD)
4748
: ShippableVertexPartition[VD] = {
4849
val fullIter = iter ++ routingTable.iterator.map(vid => (vid, defaultVal))
49-
val (index, values, mask) = VertexPartitionBase.initFrom(fullIter, (a: VD, b: VD) => a)
50+
val (index, values, mask) = VertexPartitionBase.initFrom(fullIter, mergeFunc)
5051
new ShippableVertexPartition(index, values, mask, routingTable)
5152
}
5253

0 commit comments

Comments
 (0)