Skip to content

Commit 35d044e

Browse files
committed
update paraPAVA
1 parent 077606b commit 35d044e

File tree

1 file changed

+6
-5
lines changed

1 file changed

+6
-5
lines changed

mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
210210

211211
// Pools sub array within given bounds assigning weighted average value to all elements.
212212
def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = {
213-
val poolSubArray = input.slice(start, end + 1)
213+
val poolSubArray = input.view.slice(start, end + 1)
214214

215215
val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum
216216
val weight = poolSubArray.map(_._3).sum
@@ -259,11 +259,12 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
259259
*/
260260
private def parallelPoolAdjacentViolators(
261261
input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
262-
263262
val parallelStepResult = input
264263
.sortBy(x => (x._2, x._1))
265-
.mapPartitions(it => poolAdjacentViolators(it.toArray).toIterator)
266-
267-
poolAdjacentViolators(parallelStepResult.collect())
264+
.glom()
265+
.flatMap(poolAdjacentViolators)
266+
.collect()
267+
.sortBy(x => (x._2, x._1)) // Sort again because collect() doesn't promise ordering.
268+
poolAdjacentViolators(parallelStepResult)
268269
}
269270
}

0 commit comments

Comments
 (0)