Skip to content

Commit c85076d

Browse files
committed
Readability improvements
1 parent b567be2 commit c85076d

File tree

1 file changed

+15
-4
lines changed

1 file changed

+15
-4
lines changed

graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,9 @@ class EdgePartition[
9898
activeSet)
9999
}
100100

101-
def srcIds(i: Int): VertexId = local2global(localSrcIds(i))
101+
private def srcIds(pos: Int): VertexId = local2global(localSrcIds(pos))
102102

103-
def dstIds(i: Int): VertexId = local2global(localDstIds(i))
103+
private def dstIds(pos: Int): VertexId = local2global(localDstIds(pos))
104104

105105
/** Look up vid in activeSet, throwing an exception if it is None. */
106106
def isActive(vid: VertexId): Boolean = {
@@ -231,23 +231,34 @@ class EdgePartition[
231231
global2local, local2global, vertexAttrs)
232232
var currSrcId: VertexId = null.asInstanceOf[VertexId]
233233
var currDstId: VertexId = null.asInstanceOf[VertexId]
234+
var currLocalSrcId = -1
235+
var currLocalDstId = -1
234236
var currAttr: ED = null.asInstanceOf[ED]
237+
// Iterate through the edges, accumulating runs of identical edges using the curr* variables and
238+
// releasing them to the builder when we see the beginning of the next run
235239
var i = 0
236240
while (i < size) {
237241
if (i > 0 && currSrcId == srcIds(i) && currDstId == dstIds(i)) {
242+
// This edge should be accumulated into the existing run
238243
currAttr = merge(currAttr, data(i))
239244
} else {
245+
// This edge starts a new run of edges
240246
if (i > 0) {
241-
builder.add(currSrcId, currDstId, localSrcIds(i - 1), localDstIds(i - 1), currAttr)
247+
// First release the existing run to the builder
248+
builder.add(currSrcId, currDstId, currLocalSrcId, currLocalDstId, currAttr)
242249
}
250+
// Then start accumulating for a new run
243251
currSrcId = srcIds(i)
244252
currDstId = dstIds(i)
253+
currLocalSrcId = localSrcIds(i)
254+
currLocalDstId = localDstIds(i)
245255
currAttr = data(i)
246256
}
247257
i += 1
248258
}
259+
// Finally, release the last accumulated run
249260
if (size > 0) {
250-
builder.add(currSrcId, currDstId, localSrcIds(i - 1), localDstIds(i - 1), currAttr)
261+
builder.add(currSrcId, currDstId, currLocalSrcId, currLocalDstId, currAttr)
251262
}
252263
builder.toEdgePartition.withActiveSet(activeSet)
253264
}

0 commit comments

Comments
 (0)