Skip to content

Commit 45f4c66

Browse files
Brennon York authored and ankurdave committed
[SPARK-5922][GraphX]: Add diff(other: RDD[(VertexId, VD)]) in VertexRDD
Added a 'diff' overload whose parameter type matches that of 'innerJoin' and 'leftJoin': it accepts RDD[(VertexId, VD)] instead of VertexRDD[VD]. The original diff(other: VertexRDD[VD]) is retained, so this change maintains backwards compatibility and better unifies the VertexRDD methods to match each other.

Author: Brennon York <[email protected]>

Closes #4733 from brennonyork/SPARK-5922 and squashes the following commits:

e800f08 [Brennon York] fixed merge conflicts
b9274af [Brennon York] fixed merge conflicts
f86375c [Brennon York] fixed minor include line
398ddb4 [Brennon York] fixed merge conflicts
aac1810 [Brennon York] updated to aggregateUsingIndex and added test to ensure that method works properly
2af0b88 [Brennon York] removed deprecation line
753c963 [Brennon York] fixed merge conflicts and set preference to use the diff(other: VertexRDD[VD]) method
2c678c6 [Brennon York] added mima exclude to exclude new public diff method from VertexRDD
93186f3 [Brennon York] added back the original diff method to sustain binary compatibility
f18356e [Brennon York] changed method invocation of 'diff' to match that of 'innerJoin' and 'leftJoin' from VertexRDD[VD] to RDD[(VertexId, VD)]
1 parent aa6536f commit 45f4c66

File tree

4 files changed

+29
-0
lines changed

4 files changed

+29
-0
lines changed

graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,15 @@ abstract class VertexRDD[VD](
121121
*/
122122
def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]
123123

124+
/**
125+
* For each vertex present in both `this` and `other`, `diff` returns only those vertices with
126+
* differing values; for values that are different, keeps the values from `other`. This is
127+
* only guaranteed to work if the VertexRDDs share a common ancestor.
128+
*
129+
* @param other the other RDD[(VertexId, VD)] to diff against.
130+
*/
131+
def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD]
132+
124133
/**
125134
* For each vertex present in both `this` and `other`, `diff` returns only those vertices with
126135
* differing values; for values that are different, keeps the values from `other`. This is

graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,10 @@ class VertexRDDImpl[VD] private[graphx] (
103103
override def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2] =
104104
this.mapVertexPartitions(_.map(f))
105105

106+
override def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD] = {
107+
diff(this.aggregateUsingIndex(other, (a: VD, b: VD) => a))
108+
}
109+
106110
override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
107111
val otherPartition = other match {
108112
case other: VertexRDD[_] if this.partitioner == other.partitioner =>

graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.spark.graphx
2020
import org.scalatest.FunSuite
2121

2222
import org.apache.spark.{HashPartitioner, SparkContext}
23+
import org.apache.spark.rdd.RDD
2324
import org.apache.spark.storage.StorageLevel
2425

2526
class VertexRDDSuite extends FunSuite with LocalSparkContext {
@@ -58,6 +59,18 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
5859
}
5960
}
6061

62+
test("diff with RDD[(VertexId, VD)]") {
63+
withSpark { sc =>
64+
val n = 100
65+
val verts = vertices(sc, n).cache()
66+
val flipEvens: RDD[(VertexId, Int)] =
67+
sc.parallelize(0L to 100L)
68+
.map(id => if (id % 2 == 0) (id, -id.toInt) else (id, id.toInt)).cache()
69+
// diff should keep only the changed vertices
70+
assert(verts.diff(flipEvens).map(_._2).collect().toSet === (2 to n by 2).map(-_).toSet)
71+
}
72+
}
73+
6174
test("diff vertices with the non-equal number of partitions") {
6275
withSpark { sc =>
6376
val vertexA = VertexRDD(sc.parallelize(0 until 24, 3).map(i => (i.toLong, 0)))

project/MimaExcludes.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,9 @@ object MimaExcludes {
181181
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.RealClock"),
182182
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Clock"),
183183
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.TestClock")
184+
) ++ Seq(
185+
// SPARK-5922 Adding a generalized diff(other: RDD[(VertexId, VD)]) to VertexRDD
186+
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.VertexRDD.diff")
184187
)
185188

186189
case v if v.startsWith("1.2") =>

0 commit comments

Comments
 (0)