
[SPARK-2174][MLLIB] treeReduce and treeAggregate #1110


Closed
wants to merge 11 commits into apache:master from mengxr:tree

Conversation

mengxr
Contributor

@mengxr mengxr commented Jun 17, 2014

In `reduce` and `aggregate`, the driver node spends time linear in the number of partitions. This becomes a bottleneck when there are many partitions and the data from each partition is large.

SPARK-1485 (#506) tracks the progress of implementing AllReduce on Spark. I tried several implementations, including butterfly, reduce + broadcast, and treeReduce + broadcast. treeReduce + BT broadcast seems to be the right way to go for Spark. Using a binary tree may introduce some communication overhead, because the driver still needs to coordinate the data shuffling. In my experiments, reducing n -> sqrt(n) -> 1 gives the best performance in general, which is why I set `depth = 2` in the MLlib algorithms. But it certainly needs more testing.
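To make the idea concrete, here is a minimal sketch of tree aggregation under the scheme described above (my illustration, not the exact code in this patch): aggregate within each partition first, then repeatedly shrink the number of partitions by a fixed scale, and only reduce the last handful of partial results on the driver.

~~~
import scala.reflect.ClassTag
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

def treeAggregateSketch[T: ClassTag, U: ClassTag](rdd: RDD[T], zeroValue: U)(
    seqOp: (U, T) => U,
    combOp: (U, U) => U,
    depth: Int = 2): U = {
  require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
  // Aggregate within each partition first, so every partition contributes one value.
  var partiallyAggregated = rdd.mapPartitions(it => Iterator(it.foldLeft(zeroValue)(seqOp)))
  var numPartitions = partiallyAggregated.partitions.length
  // Shrink by roughly the same factor per level: 1024 partitions with depth = 2
  // go 1024 -> 32 -> driver, i.e. n -> sqrt(n) -> 1.
  val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
  while (numPartitions > scale) {
    numPartitions = math.max(numPartitions / scale, 1)
    val curNumPartitions = numPartitions
    partiallyAggregated = partiallyAggregated
      .mapPartitionsWithIndex { (i, iter) => iter.map((i % curNumPartitions, _)) }
      .reduceByKey(new HashPartitioner(curNumPartitions), combOp)
      .values
  }
  // The final step is a plain reduce, so the driver merges only a few partial results.
  partiallyAggregated.reduce(combOp)
}
~~~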

I left `treeReduce` and `treeAggregate` public for easy testing. Some numbers from a test on a 32-node m3.2xlarge cluster:

code:

import breeze.linalg._
import org.apache.log4j._

Logger.getRootLogger.setLevel(Level.OFF)

for (n <- Seq(1, 10, 100, 1000, 10000, 100000, 1000000)) {
  val vv = sc.parallelize(0 until 1024, 1024).map(i => DenseVector.zeros[Double](n))
  var start = System.nanoTime(); vv.treeReduce(_ + _, 2); println((System.nanoTime() - start) / 1e9)
  start = System.nanoTime(); vv.reduce(_ + _); println((System.nanoTime() - start) / 1e9)
}

out:

| n | treeReduce (depth = 2), seconds | reduce, seconds |
|---|---|---|
| 10 | 0.215538731 | 0.204206899 |
| 100 | 0.278405907 | 0.205732582 |
| 1000 | 0.208972182 | 0.214298272 |
| 10000 | 0.194792071 | 0.349353687 |
| 100000 | 0.347683285 | 6.086671892 |
| 1000000 | 2.589350682 | 66.572906702 |

CC: @pwendell

This is clearly more scalable than the default implementation. My question is whether we should use this implementation in `reduce` and `aggregate` or keep them as separate methods. The concern is that users may use `reduce` and `aggregate` the way they would use `collect`, where having multiple stages doesn't reduce the data size; in that case, however, `collect` is the more appropriate method.

@dbtsai
Member

dbtsai commented Jul 1, 2014

We benchmarked treeReduce in our random forest implementation, and since the trees generated from each partition are fairly large (more than 100MB), we found that treeReduce significantly reduces the shuffle time, from 6 minutes to 2 minutes. Nice work!

(P.S. We tested on 10 r3.4xlarge Amazon machines.)

@mengxr
Contributor Author

mengxr commented Jul 1, 2014

@dbtsai Thanks for testing it! I'm going to move `treeReduce` and `treeAggregate` to `mllib.rdd.RDDFunctions`. For normal data processing, people generally use more partitions than the number of cores; in those cases, the driver can collect task results while other tasks are still running. That is not the typical situation for machine learning algorithms, so I think we can keep `treeReduce` and `treeAggregate` in MLlib for now.
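Once they live in `mllib.rdd.RDDFunctions`, usage would presumably look like the sketch below (the import path comes from this discussion; the exact signatures are an assumption):

~~~
// Hypothetical usage after the move to mllib.rdd.RDDFunctions.
import org.apache.spark.mllib.rdd.RDDFunctions._

val data = sc.parallelize(0 until 1024, 1024).map(_.toDouble)

// Same semantics as reduce/aggregate, but partial results are combined in
// intermediate stages instead of all at once on the driver.
val sum = data.treeReduce(_ + _, 2)
val sumOfSquares = data.treeAggregate(0.0)((acc, x) => acc + x * x, _ + _, 2)
~~~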

@mengxr mengxr changed the title [WIP][SPARK-2174][MLLIB] treeReduce and treeAggregate [SPARK-2174][MLLIB] treeReduce and treeAggregate Jul 1, 2014
@SparkQA

SparkQA commented Jul 28, 2014

QA tests have started for PR 1110. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17264/consoleFull

@SparkQA

SparkQA commented Jul 28, 2014

QA results for PR 1110:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17264/consoleFull

@mengxr
Contributor Author

mengxr commented Jul 28, 2014

Jenkins, retest this please.

@SparkQA

SparkQA commented Jul 28, 2014

QA tests have started for PR 1110. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17275/consoleFull

@SparkQA

SparkQA commented Jul 28, 2014

QA results for PR 1110:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17275/consoleFull

* @see [[org.apache.spark.rdd.RDD#reduce]]
*/
def treeReduce(f: (T, T) => T, depth: Int): T = {
require(depth >= 1, s"Depth must be greater than 1 but got $depth.")
Contributor

greater or equal to?

Contributor Author

done.
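(For reference, the fix presumably just aligns the message with the check, along the lines of:)

~~~
require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
~~~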

@SparkQA

SparkQA commented Jul 29, 2014

QA tests have started for PR 1110. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17345/consoleFull

@SparkQA

SparkQA commented Jul 29, 2014

QA tests have started for PR 1110. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17346/consoleFull

@rxin
Contributor

rxin commented Jul 29, 2014

LGTM.

@SparkQA

SparkQA commented Jul 29, 2014

QA results for PR 1110:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17345/consoleFull

@rxin
Contributor

rxin commented Jul 29, 2014

Jenkins, retest this please.

@SparkQA

SparkQA commented Jul 29, 2014

QA results for PR 1110:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17346/consoleFull

@rxin
Contributor

rxin commented Jul 29, 2014

Merging this in master. Thanks!

@SparkQA

SparkQA commented Jul 29, 2014

QA tests have started for PR 1110. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17347/consoleFull

@SparkQA

SparkQA commented Jul 29, 2014

QA results for PR 1110:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17347/consoleFull

@asfgit asfgit closed this in 20424da Jul 29, 2014
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014

Author: Xiangrui Meng <[email protected]>

Closes apache#1110 from mengxr/tree and squashes the following commits:

c6cd267 [Xiangrui Meng] make depth default to 2
b04b96a [Xiangrui Meng] address comments
9bcc5d3 [Xiangrui Meng] add depth for readability
7495681 [Xiangrui Meng] fix compile error
142a857 [Xiangrui Meng] merge master
d58a087 [Xiangrui Meng] move treeReduce and treeAggregate to mllib
8a2a59c [Xiangrui Meng] Merge branch 'master' into tree
be6a88a [Xiangrui Meng] use treeAggregate in mllib
0f94490 [Xiangrui Meng] add docs
eb71c33 [Xiangrui Meng] add treeReduce
fe42a5e [Xiangrui Meng] add treeAggregate
@debasish83

debasish83 commented Jun 5, 2016

@mengxr Say I have 20 nodes with 16 cores each. Do you recommend running treeReduce with 320 partitions and OpenBLAS with numThreads=1 per partition for the seqOp, or treeReduce with 20 partitions and OpenBLAS with numThreads=16 per partition for the seqOp? Do you have plans for further improvements that decrease network shuffle in treeReduce/treeAggregate, or is there an open JIRA where we can move this discussion? It looks like shuffle output is already compressed by default in Spark using Snappy...do you recommend also compressing the vector logically?

SparkContext: 20 nodes, 16 cores, sc.defaultParallelism 320

def gramSize(n: Int) = (n * n + 1) / 2

val combOp = (v1: Array[Float], v2: Array[Float]) => {
  var i = 0
  while (i < v1.length) {
    v1(i) += v2(i)
    i += 1
  }
  v1
}

val n = gramSize(4096)
// one gram-sized vector per partition
val vv = sc.parallelize(0 until sc.defaultParallelism).map(i => Array.fill[Float](n)(0f))

Option 1: 320 partitions, 1 thread on combOp per partition

val start = System.nanoTime()
vv.treeReduce(combOp, 2)
val reduceTime = (System.nanoTime() - start) * 1e-9
reduceTime: Double = 5.6390302430000006

Option 2: 20 partitions, 1 thread on combOp per partition

val coalescedvv = vv.coalesce(20)
coalescedvv.count

val start = System.nanoTime()
coalescedvv.treeReduce(combOp, 2)
val reduceTime = (System.nanoTime() - start) * 1e-9
reduceTime: Double = 3.9140685640000004

Option 3: 20 partitions, OpenBLAS numThread=16 per partition

I am setting up OpenBLAS on the cluster and will update soon.

Let me know your thoughts. I think that if the underlying operations are dense BLAS level 1, level 2, or level 3, running with more OpenBLAS threads and fewer partitions should help decrease cross-partition shuffle.
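For Option 3, a rough sketch of how the thread count might be pinned (OPENBLAS_NUM_THREADS is the standard OpenBLAS environment variable; whether spark.executorEnv.* reaches the executors in this cluster's launch mode is an assumption):

~~~
// Sketch for Option 3, reusing vv and combOp from the snippet above.
// OpenBLAS reads OPENBLAS_NUM_THREADS when its native library loads, so it must be
// set before the executor JVMs start, e.g. via
//   --conf spark.executorEnv.OPENBLAS_NUM_THREADS=16
// Then keep roughly one task per node so 16 BLAS threads don't oversubscribe cores.
val coalescedvv = vv.coalesce(20)
val start = System.nanoTime()
coalescedvv.treeReduce(combOp, 2)
val reduceTime = (System.nanoTime() - start) * 1e-9
~~~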

wangyum pushed a commit that referenced this pull request May 26, 2023
…ntial cost (#1110)

* [CARMEL-6352] Adjust scan partition size dynamically considering potential cost

* fix ut

* Add optimize tag

* minor

* Add limit for partition number