Commit fc0a147

JerryLead authored and ankurdave committed
[SPARK-4672][GraphX]Perform checkpoint() on PartitionsRDD to shorten the lineage
The related JIRA is https://issues.apache.org/jira/browse/SPARK-4672

Iterative GraphX applications always have long lineage, and calling checkpoint() on EdgeRDD and VertexRDD themselves cannot shorten it. In contrast, if we perform checkpoint() on their PartitionsRDD, the long lineage can be cut off. Moreover, existing operations such as cache() in this code are performed on the PartitionsRDD, so checkpoint() should work the same way. More details and explanation can be found in the JIRA.

Author: JerryLead <[email protected]>
Author: Lijie Xu <[email protected]>

Closes apache#3549 from JerryLead/my_graphX_checkpoint and squashes the following commits:

d1aa8d8 [JerryLead] Perform checkpoint() on PartitionsRDD not VertexRDD and EdgeRDD themselves
ff08ed4 [JerryLead] Merge branch 'master' of https://github.com/apache/spark
c0169da [JerryLead] Merge branch 'master' of https://github.com/apache/spark
52799e3 [Lijie Xu] Merge pull request #1 from apache/master
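
The lineage argument in the commit message can be illustrated with a toy model. This is only a sketch: `ToyRDD`, `ToyWrapper`, and `LineageToy` are hypothetical stand-ins and are not part of Spark. The model assumes, as the commit message describes, that each iteration builds its new data on the wrapper's inner partitions RDD rather than on the wrapper itself.

```scala
// Toy model only: hypothetical classes, not Spark's RDD API.
final class ToyRDD(val name: String, private var parent: Option[ToyRDD]) {
  /** Number of ancestors reachable from this node. */
  def lineageLength: Int = parent.map(_.lineageLength + 1).getOrElse(0)

  /** Models a checkpoint by forgetting the parent. */
  def checkpoint(): Unit = { parent = None }
}

/** Stand-in for VertexRDD/EdgeRDD: a wrapper whose operations build on `partitions`. */
final class ToyWrapper(val partitions: ToyRDD) {
  private val self = new ToyRDD("wrapper", Some(partitions))

  /** Checkpoints only the wrapper node; `partitions` keeps its parents. */
  def checkpointSelf(): Unit = self.checkpoint()

  /** Delegates to the inner node, as the overrides in this commit do. */
  def checkpointPartitions(): Unit = partitions.checkpoint()

  /** One iteration: the next wrapper derives from `partitions`, not from `self`. */
  def step(i: Int): ToyWrapper =
    new ToyWrapper(new ToyRDD(s"iter-$i", Some(partitions)))
}

object LineageToy {
  /** Runs 12 iterations, checkpointing every 5th, and returns the final lineage length. */
  def run(delegate: Boolean): Int = {
    var w = new ToyWrapper(new ToyRDD("source", None))
    for (i <- 1 to 12) {
      w = w.step(i)
      if (i % 5 == 0) {
        if (delegate) w.checkpointPartitions() else w.checkpointSelf()
      }
    }
    w.partitions.lineageLength
  }

  def main(args: Array[String]): Unit = {
    println(run(delegate = false)) // prints 12: the chain is never cut
    println(run(delegate = true))  // prints 2: cut at iterations 5 and 10
  }
}
```

In this model, checkpointing the wrapper leaves the chain through `partitions` intact, while delegating to `partitions` bounds the chain by the checkpoint interval.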
1 parent 5da21f0 commit fc0a147

2 files changed, +8 -0 lines changed
graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala

Lines changed: 4 additions & 0 deletions
@@ -70,6 +70,10 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
     this
   }
 
+  override def checkpoint() = {
+    partitionsRDD.checkpoint()
+  }
+
   /** The number of edges in the RDD. */
   override def count(): Long = {
     partitionsRDD.map(_._2.size.toLong).reduce(_ + _)

graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala

Lines changed: 4 additions & 0 deletions
@@ -71,6 +71,10 @@ class VertexRDDImpl[VD] private[graphx] (
     this
   }
 
+  override def checkpoint() = {
+    partitionsRDD.checkpoint()
+  }
+
   /** The number of vertices in the RDD. */
   override def count(): Long = {
     partitionsRDD.map(_.size).reduce(_ + _)
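
For context, an iterative GraphX job might use the two overrides above roughly as follows. This is a minimal sketch, not code from the commit: the local master, the checkpoint directory path, the three-edge graph, and the interval of 5 are all assumptions chosen for illustration, and it needs Spark with GraphX on the classpath.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

object CheckpointSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("checkpoint-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Hypothetical path; checkpoint() fails if no checkpoint directory is set.
    sc.setCheckpointDir("/tmp/graphx-checkpoints")

    // Tiny illustrative graph (assumption): a three-vertex cycle.
    val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 1L, 1)))
    var graph: Graph[Int, Int] = Graph.fromEdges(edges, 0).cache()

    val checkpointInterval = 5 // assumption: tune per workload
    for (i <- 1 to 20) {
      // Each iteration extends the lineage of the vertex data.
      graph = graph.mapVertices((_, attr) => attr + 1).cache()

      if (i % checkpointInterval == 0) {
        // Mark for checkpointing before any action has run on these RDDs.
        graph.vertices.checkpoint()
        graph.edges.checkpoint()
        // Actions materialize the RDDs and trigger the checkpoint writes.
        graph.vertices.count()
        graph.edges.count()
      }
    }

    // Inspect the remaining lineage of the vertex RDD.
    println(graph.vertices.toDebugString)
    sc.stop()
  }
}
```

Calling `cache()` before `checkpoint()` avoids recomputing the RDD when the checkpoint is written, since checkpointing runs as a separate job after the first action.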
