diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala index 871e81f8d245c..86834df8cce95 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala @@ -204,7 +204,7 @@ class EdgePartition[ if (size > 0) { builder.add(currSrcId, currDstId, currAttr) } - builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet) + builder.toEdgePartitionWithoutSort.withVertices(vertices).withActiveSet(activeSet) } /** @@ -238,7 +238,7 @@ class EdgePartition[ } i += 1 } - builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet) + builder.toEdgePartitionWithoutSort.withVertices(vertices).withActiveSet(activeSet) } /** diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala index ecb49bef42e45..e2cfc079f748c 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala @@ -69,4 +69,36 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: Cla new EdgePartition(srcIds, dstIds, data, index, vertices) } + + /** Builds an EdgePartition from the accumulated `edges` without sorting them first. Precondition: every edge must already have been added to `edges` in Edge.lexicographicOrdering order, so edgeArray needs no sort. The edges are split into parallel srcIds, dstIds and data arrays, and the index records, for each run of consecutive equal source ids, the array position at which that run starts. The returned partition's VertexPartition holds every source and destination id seen in the edges but no vertex attributes. NOTE(review): nothing here checks the ordering precondition; if a source id appeared in two separate runs, the index would presumably keep only the later start position. Confirm against PrimitiveKeyOpenHashMap.update. 
*/ + def toEdgePartitionWithoutSort: EdgePartition[ED, VD] = { + val edgeArray = edges.trim().array + val srcIds = edgeArray.map(edge => edge.srcId) + val dstIds = edgeArray.map(edge => edge.dstId) + val data = edgeArray.map(edge => edge.attr) + val index = new PrimitiveKeyOpenHashMap[VertexId, Int] + + val edgeNum = edgeArray.length + if (edgeNum > 0) { + index.update(srcIds(0), 0) + var currSrcId: VertexId = srcIds(0) + var i = 0 + while (i < edgeNum) { + if (srcIds(i) != currSrcId) { + currSrcId = srcIds(i) + index.update(currSrcId, i) + } + i += 1 + } + } + + // Create and populate a VertexPartition with vids from the edges, but no attributes + val vidsIter = srcIds.iterator ++ dstIds.iterator + val vertexIds = new OpenHashSet[VertexId] + vidsIter.foreach(vid => vertexIds.add(vid)) + val vertices = new VertexPartition( + vertexIds, new Array[VD](vertexIds.capacity), vertexIds.getBitSet) + + new EdgePartition(srcIds, dstIds, data, index, vertices) + } }