[SPARK-3936] Remove Bytecode Inspection for Join Elimination #2815
…explicit join elimination flags.
@ankurdave and @rxin I have not updated the applications to use the new explicit flags. I will do that in this PR pending approval for the API changes.
* @param mapUsesSrcAttr indicates whether the source vertex attribute should be included in
* the triplet. Setting this to false can improve performance if the source vertex attribute
* is not needed.
* @param mapUsesSrcAttr indicates whether the destination vertex attribute should be included in
mapUsesSrcAttr -> mapUsesDstAttr
QA tests have started for PR 2815 at commit
QA tests have finished for PR 2815 at commit
Jenkins, test this please.
What is the status of this patch? I would like to merge it soon so that the Python GraphX API can support these additional flags.
def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
mapTriplets((pid, iter) => iter.map(map))
def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2,
mapUsesSrcAttr: Boolean = true, mapUsesDstAttr: Boolean = true): Graph[VD, ED2] = {
I'd create an enum for that.
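One way to read the enum suggestion is to replace the two Boolean flags with a small, closed set of named field selections. The sketch below is only an illustration of that idea: `FieldSelection`, its members, and `attributesToShip` are hypothetical names made up for this example, not the API that was eventually merged.

```scala
// Hypothetical sketch: a closed set of named selections instead of two Booleans.
// None of these names come from the PR; they exist only for illustration.
object FieldSelectionSketch {
  sealed abstract class FieldSelection(val useSrc: Boolean, val useDst: Boolean)
  case object EdgeOnly extends FieldSelection(false, false)
  case object SrcOnly extends FieldSelection(true, false)
  case object DstOnly extends FieldSelection(false, true)
  case object All extends FieldSelection(true, true)

  // Stand-in for the replication step: reports which vertex attributes
  // would have to be shipped to the edge partitions for a given selection.
  def attributesToShip(fields: FieldSelection): Set[String] =
    (if (fields.useSrc) Set("src") else Set.empty[String]) ++
      (if (fields.useDst) Set("dst") else Set.empty[String])

  def main(args: Array[String]): Unit = {
    println(attributesToShip(SrcOnly)) // only the source attribute is needed
    println(attributesToShip(All))     // both attributes are needed
  }
}
```

Compared with two positional Booleans, a named selection is harder to pass in the wrong order at the call site, and a single value can be forwarded through other APIs.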
I added the |
At this point I could also imagine actually having a separate function closure for each version:
mapTriplets(f: Edge => ED2)
mapTriplets(f: SrcEdge => ED2)
mapTriplets(f: DstEdge => ED2)
mapTriplets(f: Triplet => ED2)
Though to do this would require users to annotate their functions:
g.mapTriplets( (t: SrcEdge) => t.src )
What do you all think?
Test build #22667 has finished for PR 2815 at commit
How about mapSourceTriplets, mapDst ...
vertices.cache()
// Filter the vertices, reusing the partitioner and the index from this graph
val newVerts = vertices.mapVertexPartitions(_.filter(vpred))
// Filter the triplets. We must always upgrade the triplet view fully because vpred always runs
// on both src and dst vertices
replicatedVertexView.upgrade(vertices, true, true)
replicatedVertexView.upgrade(vertices, tripletFields.useSrc, tripletFields.useDst)
I don't think this change is correct - see the comment on the line above.
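The concern can be illustrated with a toy model. This is not GraphX code: `Triplet`, `view`, and `keepEdge` are hypothetical stand-ins, and the sketch assumes that an attribute that was not requested is simply absent from the triplet view.

```scala
// Toy model, not GraphX code: an edge survives vertex filtering only if the
// vertex predicate holds for BOTH endpoints, so both attributes must be present.
object SubgraphSketch {
  final case class Triplet(srcAttr: Option[Int], dstAttr: Option[Int])

  // Hypothetical stand-in for the upgrade step: attributes that were not
  // requested are absent from the resulting triplet view.
  def view(src: Int, dst: Int, useSrc: Boolean, useDst: Boolean): Triplet =
    Triplet(if (useSrc) Some(src) else None, if (useDst) Some(dst) else None)

  // Throws NoSuchElementException if the predicate needs a missing attribute.
  def keepEdge(t: Triplet, vpred: Int => Boolean): Boolean =
    vpred(t.srcAttr.get) && vpred(t.dstAttr.get)

  def main(args: Array[String]): Unit = {
    val positive: Int => Boolean = _ > 0
    // Full upgrade: the predicate can be evaluated on both endpoints.
    println(keepEdge(view(1, 2, useSrc = true, useDst = true), positive))
    // Partial upgrade: the destination attribute is missing, so this fails.
    println(scala.util.Try(keepEdge(view(1, 2, useSrc = true, useDst = false), positive)).isFailure)
  }
}
```

Under these assumptions, the vertex predicate cannot be evaluated safely unless both flags are true, whatever the caller's edge function happens to read.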
This design looks good to me. The alternatives are all worse:
@jegonzal It won't work to overload on the function type like that because of type erasure:
@rxin That will prevent us from passing triplet field information along, as we do in Pregel.
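A minimal sketch of the type-erasure point raised with @jegonzal above. `SrcEdge` and `DstEdge` are made-up types for illustration and are not part of GraphX. Because the overloaded form does not compile, it appears only in a comment; the runnable part uses reflection to show that both function parameters share the same erased JVM type.

```scala
// Hypothetical sketch of the erasure problem; SrcEdge and DstEdge are
// made-up types for illustration and are not part of GraphX.
object ErasureSketch {
  final case class SrcEdge(src: Int)
  final case class DstEdge(dst: Int)

  // Overloading on the function's parameter type is rejected by the compiler,
  // because both parameters erase to scala.Function1:
  //
  //   def mapTriplets(f: SrcEdge => Int): Int = f(SrcEdge(1))
  //   def mapTriplets(f: DstEdge => Int): Int = f(DstEdge(2)) // double definition
  //
  // Distinct method names compile, yet the erased parameter type is identical.
  def mapSrc(f: SrcEdge => Int): Int = f(SrcEdge(1))
  def mapDst(f: DstEdge => Int): Int = f(DstEdge(2))

  // Looks up the erased JVM parameter types of a method defined on this object.
  def erasedParams(name: String): List[String] =
    getClass.getMethods.filter(_.getName == name).head.getParameterTypes.map(_.getName).toList

  def main(args: Array[String]): Unit = {
    println(erasedParams("mapSrc"))
    println(erasedParams("mapDst"))
  }
}
```

This is why the overload-by-closure-type alternative would need either distinct method names or some extra disambiguating parameter.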
* is not needed.
* @param mapUsesDstAttr indicates whether the destination vertex attribute should be included in
* the triplet. Setting this to false can improve performance if the destination vertex attribute
* is not needed.
These comments are out of date now that we have tripletFields
Let's not change the old API. Just deprecate it, and then add this tripletFields thing to the new API Ankur is going to submit.
aggregateMessages enables neighborhood computation similarly to mapReduceTriplets, but it introduces two API improvements:
1. Messages are sent using an imperative interface based on EdgeContext rather than by returning an iterator of messages. This is more efficient, providing a 20.2% speedup on PageRank over apache#3054 (uk-2007-05 graph, 10 iterations, 16 r3.2xlarge machines, sped up from 403 s to 322 s).
2. Rather than attempting bytecode inspection, the required triplet fields must be explicitly specified by the user by passing a TripletFields object. This fixes SPARK-3936.
Subsumes apache#2815.
aggregateMessages enables neighborhood computation similarly to mapReduceTriplets, but it introduces two API improvements:
1. Messages are sent using an imperative interface based on EdgeContext rather than by returning an iterator of messages.
2. Rather than attempting bytecode inspection, the required triplet fields must be explicitly specified by the user by passing a TripletFields object. This fixes SPARK-3936.
Additionally, this PR includes the following optimizations for aggregateMessages and EdgePartition:
1. EdgePartition now stores local vertex ids instead of global ids. This avoids hash lookups when looking up vertex attributes and aggregating messages.
2. Internal iterators in aggregateMessages are inlined into a while loop.
In total, these optimizations were tested to provide a 37% speedup on PageRank (uk-2007-05 graph, 10 iterations, 16 r3.2xlarge machines, sped up from 513 s to 322 s).
Subsumes apache#2815. Also fixes SPARK-4173.
Author: Ankur Dave <[email protected]>
Closes apache#3100 from ankurdave/aggregateMessages and squashes the following commits:
f5b65d0 [Ankur Dave] Address @rxin comments on apache#3054 and apache#3100
1e80aca [Ankur Dave] Add aggregateMessages, which supersedes mapReduceTriplets
194a2df [Ankur Dave] Test triplet iterator in EdgePartition serialization test
e0f8ecc [Ankur Dave] Take activeSet in ExistingEdgePartitionBuilder
c85076d [Ankur Dave] Readability improvements
b567be2 [Ankur Dave] iter.foreach -> while loop
4a566dc [Ankur Dave] Optimizations for mapReduceTriplets and EdgePartition
aggregateMessages enables neighborhood computation similarly to mapReduceTriplets, but it introduces two API improvements:
1. Messages are sent using an imperative interface based on EdgeContext rather than by returning an iterator of messages.
2. Rather than attempting bytecode inspection, the required triplet fields must be explicitly specified by the user by passing a TripletFields object. This fixes SPARK-3936.
Additionally, this PR includes the following optimizations for aggregateMessages and EdgePartition:
1. EdgePartition now stores local vertex ids instead of global ids. This avoids hash lookups when looking up vertex attributes and aggregating messages.
2. Internal iterators in aggregateMessages are inlined into a while loop.
In total, these optimizations were tested to provide a 37% speedup on PageRank (uk-2007-05 graph, 10 iterations, 16 r3.2xlarge machines, sped up from 513 s to 322 s).
Subsumes #2815. Also fixes SPARK-4173.
Author: Ankur Dave <[email protected]>
Closes #3100 from ankurdave/aggregateMessages and squashes the following commits:
f5b65d0 [Ankur Dave] Address @rxin comments on #3054 and #3100
1e80aca [Ankur Dave] Add aggregateMessages, which supersedes mapReduceTriplets
194a2df [Ankur Dave] Test triplet iterator in EdgePartition serialization test
e0f8ecc [Ankur Dave] Take activeSet in ExistingEdgePartitionBuilder
c85076d [Ankur Dave] Readability improvements
b567be2 [Ankur Dave] iter.foreach -> while loop
4a566dc [Ankur Dave] Optimizations for mapReduceTriplets and EdgePartition
(cherry picked from commit faeb41d)
Signed-off-by: Reynold Xin <[email protected]>
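To make the imperative interface in the commit message concrete, here is a toy, single-machine model in plain Scala. It mirrors the shape of sending messages through a context and merging them as they arrive, but it is not Spark's implementation: `Edge`, `Context`, and this `aggregateMessages` are simplified stand-ins written for this example, with no partitioning, vertex attributes, or triplet-field selection.

```scala
// Toy, single-machine model of an imperative message-sending interface.
// It is NOT Spark's implementation; all types here are simplified stand-ins.
object AggregateMessagesSketch {
  final case class Edge(srcId: Long, dstId: Long)

  // Minimal stand-in for an edge context: messages are merged as they are sent,
  // so no per-edge iterator of messages is allocated.
  final class Context[A](merge: (A, A) => A, acc: scala.collection.mutable.Map[Long, A]) {
    var srcId: Long = 0L
    var dstId: Long = 0L
    private def send(id: Long, msg: A): Unit =
      acc.update(id, acc.get(id).map(merge(_, msg)).getOrElse(msg))
    def sendToSrc(msg: A): Unit = send(srcId, msg)
    def sendToDst(msg: A): Unit = send(dstId, msg)
  }

  def aggregateMessages[A](edges: IndexedSeq[Edge])(
      sendMsg: Context[A] => Unit,
      merge: (A, A) => A): Map[Long, A] = {
    val acc = scala.collection.mutable.Map.empty[Long, A]
    val ctx = new Context[A](merge, acc)
    var i = 0
    while (i < edges.length) { // plain while loop that reuses one context object
      ctx.srcId = edges(i).srcId
      ctx.dstId = edges(i).dstId
      sendMsg(ctx)
      i += 1
    }
    acc.toMap
  }

  def main(args: Array[String]): Unit = {
    val edges = Vector(Edge(1L, 2L), Edge(3L, 2L), Edge(2L, 3L))
    // In-degree: every edge sends 1 to its destination; messages are summed.
    val inDegrees = aggregateMessages[Int](edges)(ctx => ctx.sendToDst(1), _ + _)
    println(inDegrees) // vertex 2 receives 2 messages, vertex 3 receives 1
  }
}
```

In GraphX itself, the analogous in-degree computation would be written along the lines of `graph.aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _, TripletFields.None)`, where the last argument states that neither vertex attribute is read.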
Is the Pregel API going to be updated so that it uses |
We should update Pregel to use aggregateMessages instead. Would you like to submit a patch for that?
@rxin @balduz We can update Pregel to use aggregateMessages without changing the API, but it won't provide any performance benefit since we'll still be handling iterators. Ultimately, the plan is to add a new version of Pregel that uses an aggregateMessages-style interface and has some other changes to expose activeness more explicitly.
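The point about still handling iterators can be sketched with a small adapter. Everything here is hypothetical: `Triplet`, `Sink`, and `adapt` are illustrative names, not Pregel or GraphX code. The adapter lets an iterator-returning send function drive an imperative sink, but it still allocates and walks one iterator per triplet.

```scala
// Hypothetical adapter sketch; Triplet, Sink, and adapt are illustrative names.
object IteratorAdapterSketch {
  final case class Triplet(srcId: Long, dstId: Long)

  // Imperative sink standing in for an aggregateMessages-style context.
  trait Sink[A] { def sendTo(id: Long, msg: A): Unit }

  // Wraps an iterator-returning send function so it can drive the sink.
  // The iterator is still created for every triplet, which is why an adapter
  // like this would not be expected to improve performance by itself.
  def adapt[A](sendMsg: Triplet => Iterator[(Long, A)]): (Triplet, Sink[A]) => Unit =
    (t, sink) => sendMsg(t).foreach { case (id, msg) => sink.sendTo(id, msg) }

  def main(args: Array[String]): Unit = {
    val received = scala.collection.mutable.ListBuffer.empty[(Long, Int)]
    val sink = new Sink[Int] {
      def sendTo(id: Long, msg: Int): Unit = { received += ((id, msg)); () }
    }
    // Old-style send function: returns an iterator of (target id, message).
    val oldStyle: Triplet => Iterator[(Long, Int)] = t => Iterator((t.dstId, 1))
    adapt(oldStyle)(Triplet(1L, 2L), sink)
    println(received.toList)
  }
}
```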
Removing bytecode inspection from triplet operations and introducing explicit join elimination flags. The explicit flags make the join elimination more robust.