Commit 3bbbdd8

larryxiao authored and ankurdave committed
[SPARK-2062][GraphX] VertexRDD.apply does not use the mergeFunc
VertexRDD.apply had a bug where it ignored the merge function for duplicate vertices and instead used whichever vertex attribute occurred first. This commit fixes the bug by passing the merge function through to ShippableVertexPartition.apply, which merges any duplicates using the merge function and then fills in missing vertices using the specified default vertex attribute. This commit also adds a unit test for VertexRDD.apply.

Author: Larry Xiao <[email protected]>
Author: Blie Arkansol <[email protected]>
Author: Ankur Dave <[email protected]>

Closes #1903 from larryxiao/2062 and squashes the following commits:

625aa9d [Blie Arkansol] Merge pull request #1 from ankurdave/SPARK-2062
476770b [Ankur Dave] ShippableVertexPartition.initFrom: Don't run mergeFunc on default values
614059f [Larry Xiao] doc update: note about the default null value vertices construction
dfdb3c9 [Larry Xiao] minor fix
1c70366 [Larry Xiao] scalastyle check: wrap line, parameter list indent 4 spaces
e4ca697 [Larry Xiao] [TEST] VertexRDD.apply mergeFunc
6a35ea8 [Larry Xiao] [TEST] VertexRDD.apply mergeFunc
4fbc29c [Blie Arkansol] undo unnecessary change
efae765 [Larry Xiao] fix mistakes: should be able to call with or without mergeFunc
b2422f9 [Larry Xiao] Merge branch '2062' of github.com:larryxiao/spark into 2062
52dc7f7 [Larry Xiao] pass mergeFunc to VertexPartitionBase, where merge is handled
581e9ee [Larry Xiao] TODO: VertexRDDSuite
20d80a3 [Larry Xiao] [SPARK-2062][GraphX] VertexRDD.apply does not use the mergeFunc
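The user-visible effect matches the unit test added below; a minimal sketch of the fixed behavior, assuming a live SparkContext `sc` (for example inside GraphX's `withSpark` test helper):

import org.apache.spark.graphx._

// Vertex 1L appears twice and 2L three times. Before this fix, the supplied
// mergeFunc was silently ignored and the first attribute encountered won.
val verts = sc.parallelize(List((0L, 0), (1L, 1), (1L, 2), (2L, 3), (2L, 3), (2L, 3)))
val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
val rdd = VertexRDD(verts, edges, 0, (a: Int, b: Int) => a + b)
// Duplicates are now combined with mergeFunc: 1 + 2 = 3 and 3 + 3 + 3 = 9.
assert(rdd.collect.toSet == Set((0L, 0), (1L, 3), (2L, 9)))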
1 parent e76ef5c commit 3bbbdd8

File tree: 3 files changed, +36 / -7 lines

graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala

Lines changed: 2 additions & 2 deletions
@@ -392,7 +392,7 @@ object VertexRDD {
    */
   def apply[VD: ClassTag](
       vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD): VertexRDD[VD] = {
-    VertexRDD(vertices, edges, defaultVal, (a, b) => b)
+    VertexRDD(vertices, edges, defaultVal, (a, b) => a)
   }
 
   /**
@@ -419,7 +419,7 @@ object VertexRDD {
       (vertexIter, routingTableIter) =>
         val routingTable =
           if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
-        Iterator(ShippableVertexPartition(vertexIter, routingTable, defaultVal))
+        Iterator(ShippableVertexPartition(vertexIter, routingTable, defaultVal, mergeFunc))
     }
     new VertexRDD(vertexPartitions)
   }

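Note the changed default in the three-argument overload: duplicates now keep the first attribute via `(a, b) => a`, which matches what `ShippableVertexPartition` actually did before the fix. A hypothetical sketch of that default, again assuming a live SparkContext `sc` and single-partition input so the encounter order is predictable:

// No explicit mergeFunc: the default (a, b) => a keeps the first attribute
// encountered for a duplicate vertex ID.
val verts = sc.parallelize(List((1L, "first"), (1L, "second")), 1)
val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
val rdd = VertexRDD(verts, edges, "default")
assert(rdd.collect.toSet == Set((1L, "first")))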
graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala

Lines changed: 23 additions & 5 deletions
@@ -36,18 +36,36 @@ private[graphx]
 object ShippableVertexPartition {
   /** Construct a `ShippableVertexPartition` from the given vertices without any routing table. */
   def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)]): ShippableVertexPartition[VD] =
-    apply(iter, RoutingTablePartition.empty, null.asInstanceOf[VD])
+    apply(iter, RoutingTablePartition.empty, null.asInstanceOf[VD], (a, b) => a)
 
   /**
    * Construct a `ShippableVertexPartition` from the given vertices with the specified routing
    * table, filling in missing vertices mentioned in the routing table using `defaultVal`.
    */
   def apply[VD: ClassTag](
       iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD)
-    : ShippableVertexPartition[VD] = {
-    val fullIter = iter ++ routingTable.iterator.map(vid => (vid, defaultVal))
-    val (index, values, mask) = VertexPartitionBase.initFrom(fullIter, (a: VD, b: VD) => a)
-    new ShippableVertexPartition(index, values, mask, routingTable)
+    : ShippableVertexPartition[VD] =
+    apply(iter, routingTable, defaultVal, (a, b) => a)
+
+  /**
+   * Construct a `ShippableVertexPartition` from the given vertices with the specified routing
+   * table, filling in missing vertices mentioned in the routing table using `defaultVal`,
+   * and merging duplicate vertex attributes with `mergeFunc`.
+   */
+  def apply[VD: ClassTag](
+      iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD,
+      mergeFunc: (VD, VD) => VD): ShippableVertexPartition[VD] = {
+    val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
+    // Merge the given vertices using mergeFunc
+    iter.foreach { pair =>
+      map.setMerge(pair._1, pair._2, mergeFunc)
+    }
+    // Fill in missing vertices mentioned in the routing table
+    routingTable.iterator.foreach { vid =>
+      map.changeValue(vid, defaultVal, identity)
+    }
+
+    new ShippableVertexPartition(map.keySet, map._values, map.keySet.getBitSet, routingTable)
   }
 
   import scala.language.implicitConversions
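The merge-then-fill logic of the new four-argument overload, sketched with a plain `mutable.HashMap` standing in for the internal `GraphXPrimitiveKeyOpenHashMap` (an illustration of the semantics only; `mergeThenFill` is a hypothetical helper, not Spark API):

import scala.collection.mutable

object MergeThenFillSketch {
  type VertexId = Long

  def mergeThenFill[VD](
      iter: Iterator[(VertexId, VD)],
      routingTableIds: Iterator[VertexId],
      defaultVal: VD,
      mergeFunc: (VD, VD) => VD): mutable.Map[VertexId, VD] = {
    val map = mutable.HashMap.empty[VertexId, VD]
    // Merge duplicate vertex attributes with mergeFunc (setMerge semantics).
    iter.foreach { case (vid, vd) =>
      map(vid) = map.get(vid).map(mergeFunc(_, vd)).getOrElse(vd)
    }
    // Fill in vertices mentioned only in the routing table; existing entries
    // are kept as-is (changeValue with identity), so mergeFunc never runs
    // against defaultVal.
    routingTableIds.foreach { vid =>
      if (!map.contains(vid)) map(vid) = defaultVal
    }
    map
  }

  def main(args: Array[String]): Unit = {
    // Duplicates of 1L merge by sum; 3L appears only in the routing table.
    val merged = mergeThenFill(
      Iterator((1L, 1), (1L, 2), (2L, 3)),
      Iterator(1L, 3L),
      0,
      (a: Int, b: Int) => a + b)
    assert(merged == mutable.Map(1L -> 3, 2L -> 3, 3L -> 0))
  }
}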

graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala

Lines changed: 11 additions & 0 deletions
@@ -99,4 +99,15 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test("mergeFunc") {
+    // test to see if the mergeFunc is working correctly
+    withSpark { sc =>
+      val verts = sc.parallelize(List((0L, 0), (1L, 1), (1L, 2), (2L, 3), (2L, 3), (2L, 3)))
+      val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
+      val rdd = VertexRDD(verts, edges, 0, (a: Int, b: Int) => a + b)
+      // test merge function
+      assert(rdd.collect.toSet == Set((0L, 0), (1L, 3), (2L, 9)))
+    }
+  }
+
 }
