Skip to content

Commit 1d6abe3

Browse files
ankurdaverxin
authored andcommitted
Mark all fields of EdgePartition, Graph, and GraphOps transient
These classes are only serializable to work around closure capture, so their fields should all be marked `@transient` to avoid wasteful serialization. This PR supersedes apache#519 and fixes the same bug. Author: Ankur Dave <[email protected]> Closes apache#520 from ankurdave/graphx-transient and squashes the following commits: 6431760 [Ankur Dave] Mark all fields of EdgePartition, Graph, and GraphOps `@transient`
1 parent d485eec commit 1d6abe3

File tree

3 files changed

+12
-12
lines changed

3 files changed

+12
-12
lines changed

graphx/src/main/scala/org/apache/spark/graphx/Graph.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
4646
* @note vertex ids are unique.
4747
* @return an RDD containing the vertices in this graph
4848
*/
49-
val vertices: VertexRDD[VD]
49+
@transient val vertices: VertexRDD[VD]
5050

5151
/**
5252
* An RDD containing the edges and their associated attributes. The entries in the RDD contain
@@ -59,7 +59,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
5959
* along with their vertex data.
6060
*
6161
*/
62-
val edges: EdgeRDD[ED]
62+
@transient val edges: EdgeRDD[ED]
6363

6464
/**
6565
* An RDD containing the edge triplets, which are edges along with the vertex data associated with
@@ -77,7 +77,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
7777
* val numInvalid = graph.triplets.map(e => if (e.src.data == e.dst.data) 1 else 0).sum
7878
* }}}
7979
*/
80-
val triplets: RDD[EdgeTriplet[VD, ED]]
80+
@transient val triplets: RDD[EdgeTriplet[VD, ED]]
8181

8282
/**
8383
* Caches the vertices and edges associated with this graph at the specified storage level.

graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,28 +34,28 @@ import scala.util.Random
3434
class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Serializable {
3535

3636
/** The number of edges in the graph. */
37-
lazy val numEdges: Long = graph.edges.count()
37+
@transient lazy val numEdges: Long = graph.edges.count()
3838

3939
/** The number of vertices in the graph. */
40-
lazy val numVertices: Long = graph.vertices.count()
40+
@transient lazy val numVertices: Long = graph.vertices.count()
4141

4242
/**
4343
* The in-degree of each vertex in the graph.
4444
* @note Vertices with no in-edges are not returned in the resulting RDD.
4545
*/
46-
lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In)
46+
@transient lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In)
4747

4848
/**
4949
* The out-degree of each vertex in the graph.
5050
* @note Vertices with no out-edges are not returned in the resulting RDD.
5151
*/
52-
lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out)
52+
@transient lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out)
5353

5454
/**
5555
* The degree of each vertex in the graph.
5656
* @note Vertices with no edges are not returned in the resulting RDD.
5757
*/
58-
lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Either)
58+
@transient lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Either)
5959

6060
/**
6161
* Computes the neighboring vertex degrees.

graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
3434
*/
3535
private[graphx]
3636
class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag](
37-
val srcIds: Array[VertexId],
38-
val dstIds: Array[VertexId],
39-
val data: Array[ED],
40-
val index: PrimitiveKeyOpenHashMap[VertexId, Int]) extends Serializable {
37+
@transient val srcIds: Array[VertexId],
38+
@transient val dstIds: Array[VertexId],
39+
@transient val data: Array[ED],
40+
@transient val index: PrimitiveKeyOpenHashMap[VertexId, Int]) extends Serializable {
4141

4242
/**
4343
* Reverse all the edges in this partition.

0 commit comments

Comments
 (0)