[MLLIB][SPARK-3278] Monotone (Isotonic) regression using parallel pool adjacent violators algorithm #3519


Closed
wants to merge 44 commits into from
Commits
3de71d0
SPARK-3278 added initial version of Isotonic regression algorithm inc…
zapletal-martin Nov 19, 2014
961aa05
Merge remote-tracking branch 'upstream/master' into SPARK-3278
zapletal-martin Nov 22, 2014
05d9048
SPARK-3278 isotonic regression refactoring and api changes
zapletal-martin Nov 26, 2014
629a1ce
SPARK-3278 added isotonic regression for weighted data. Added tests f…
zapletal-martin Nov 29, 2014
8f5daf9
SPARK-3278 added comments and cleaned up api to consistently handle w…
zapletal-martin Nov 30, 2014
6046550
SPARK-3278 scalastyle errors resolved
zapletal-martin Dec 1, 2014
c06f88c
Merge remote-tracking branch 'upstream/master' into SPARK-3278
zapletal-martin Dec 27, 2014
089bf86
Removed MonotonicityConstraint, Isotonic and Antitonic constraints. R…
zapletal-martin Dec 27, 2014
34760d5
Removed WeightedLabeledPoint. Replaced by tuple of doubles
zapletal-martin Dec 27, 2014
b8b1620
Removed WeightedLabeledPoint. Replaced by tuple of doubles
zapletal-martin Dec 27, 2014
cab5a46
SPARK-3278 PR 3519 refactoring WeightedLabeledPoint to tuple as per c…
zapletal-martin Dec 30, 2014
8cefd18
Merge remote-tracking branch 'upstream/master' into SPARK-3278-weight…
zapletal-martin Jan 10, 2015
deb0f17
SPARK-3278 refactored weightedlabeledpoint to (double, double, double…
zapletal-martin Jan 10, 2015
a24e29f
SPARK-3278 refactored weightedlabeledpoint to (double, double, double…
zapletal-martin Jan 10, 2015
941fd1f
SPARK-3278 Isotonic regression java api
zapletal-martin Jan 11, 2015
823d803
Merge remote-tracking branch 'upstream/master' into SPARK-3278
zapletal-martin Jan 11, 2015
e9b3323
Merge branch 'SPARK-3278-weightedLabeledPoint' into SPARK-3278
zapletal-martin Jan 11, 2015
45aa7e8
SPARK-3278 Isotonic regression java api
zapletal-martin Jan 12, 2015
3c2954b
SPARK-3278 Isotonic regression java api
zapletal-martin Jan 12, 2015
0d14bd3
SPARK-3278 changed Java api to match Scala api's (Double, Double, Dou…
zapletal-martin Jan 13, 2015
f90c8c7
Merge remote-tracking branch 'upstream/master' into SPARK-3278
zapletal-martin Jan 17, 2015
ce0e30c
SPARK-3278 readability refactoring
zapletal-martin Jan 18, 2015
fad4bf9
SPARK-3278 changes after PR comments https://github.com/apache/spark/…
zapletal-martin Jan 21, 2015
9ae9d53
SPARK-3278 changes after PR feedback https://github.com/apache/spark/…
zapletal-martin Jan 23, 2015
7aca4cc
SPARK-3278 comment spelling
zapletal-martin Jan 23, 2015
12151e6
Merge remote-tracking branch 'upstream/master' into SPARK-3278
zapletal-martin Jan 23, 2015
1fff77d
SPARK-3278 changes after PR comments https://github.com/apache/spark/…
zapletal-martin Jan 29, 2015
d93c8f9
SPARK-3278 changes after PR comments https://github.com/apache/spark/…
zapletal-martin Jan 29, 2015
e60a34f
SPARK-3278 changes after PR comments https://github.com/apache/spark/…
zapletal-martin Jan 30, 2015
88eb4e2
SPARK-3278 changes after PR comments https://github.com/apache/spark/…
zapletal-martin Jan 30, 2015
75eac55
Merge remote-tracking branch 'upstream/master' into SPARK-3278
zapletal-martin Jan 30, 2015
3da56e5
SPARK-3278 fixed indentation error
zapletal-martin Jan 30, 2015
80c6681
update IRModel
mengxr Jan 30, 2015
5925113
Merge remote-tracking branch 'zapletal-martin/SPARK-3278' into SPARK-…
mengxr Jan 30, 2015
05422a8
add unit test for model construction
mengxr Jan 30, 2015
077606b
minor
mengxr Jan 30, 2015
35d044e
update paraPAVA
mengxr Jan 30, 2015
0b35c15
compress pools and update tests
mengxr Jan 30, 2015
4dfe136
add cache back
mengxr Jan 30, 2015
ded071c
Merge pull request #1 from mengxr/SPARK-3278
zapletal-martin Jan 30, 2015
d8feb82
Merge remote-tracking branch 'upstream/master' into SPARK-3278
zapletal-martin Jan 30, 2015
e3c0e44
Merge remote-tracking branch 'origin/SPARK-3278' into SPARK-3278
zapletal-martin Jan 30, 2015
37ba24e
fix java tests
mengxr Jan 30, 2015
5a54ea4
Merge pull request #2 from mengxr/isotonic-fix-java
zapletal-martin Jan 31, 2015
@@ -0,0 +1,304 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.regression

import java.io.Serializable
import java.lang.{Double => JDouble}
import java.util.Arrays.binarySearch

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD}
import org.apache.spark.rdd.RDD

/**
* Regression model for isotonic regression.
*
* @param boundaries Array of boundaries for which predictions are known.
* Boundaries must be sorted in increasing order.
* @param predictions Array of predictions associated with the boundaries at the same index.
* Results of isotonic regression and therefore monotone.
* @param isotonic Indicates whether this is isotonic (increasing) or antitonic (decreasing).
*/
class IsotonicRegressionModel (
val boundaries: Array[Double],
val predictions: Array[Double],
val isotonic: Boolean) extends Serializable {

private val predictionOrd = if (isotonic) Ordering[Double] else Ordering[Double].reverse

require(boundaries.length == predictions.length)
assertOrdered(boundaries)
assertOrdered(predictions)(predictionOrd)

/** Asserts the input array is monotone with the given ordering. */
private def assertOrdered(xs: Array[Double])(implicit ord: Ordering[Double]): Unit = {
var i = 1
while (i < xs.length) {
require(ord.compare(xs(i - 1), xs(i)) <= 0,
s"Elements (${xs(i - 1)}, ${xs(i)}) are not ordered.")
i += 1
}
}

[Contributor] It may be worth validating that features is ordered.

[Contributor Author] I have added validations for features (now boundaries) and labels (now predictions) to be ordered, and for boundaries and predictions to have the same length. I could not find an existing solution for order validation, so I used the fastest available approach (see https://groups.google.com/forum/#!topic/scala-user/oo4nuA_3U7Q), since genericity was not required.

/**
* Predict labels for the provided features using a piecewise linear function.
*
* @param testData Features to be labeled.
* @return Predicted labels.
*/
def predict(testData: RDD[Double]): RDD[Double] = {
testData.map(predict)
}

/**
* Predict labels for the provided features using a piecewise linear function.
*
* @param testData Features to be labeled.
* @return Predicted labels.
*/
def predict(testData: JavaDoubleRDD): JavaDoubleRDD = {
JavaDoubleRDD.fromRDD(predict(testData.rdd.retag.asInstanceOf[RDD[Double]]))
}

/**
* Predict a single label using a piecewise linear function.
*
* @param testData Feature to be labeled.
* @return Predicted label.
* 1) If testData exactly matches a boundary then associated prediction is returned.
* In case there are multiple predictions with the same boundary then one of them
* is returned. Which one is undefined (same as java.util.Arrays.binarySearch).
* 2) If testData is lower or higher than all boundaries then first or last prediction
* is returned respectively. In case there are multiple predictions with the same
* boundary then the lowest or highest is returned respectively.
* 3) If testData falls between two boundary values then the prediction is treated
* as a piecewise linear function and the interpolated value is returned. In case
* there are multiple values with the same boundary then the same rules as in 2) are used.
*/
def predict(testData: Double): Double = {

def linearInterpolation(x1: Double, y1: Double, x2: Double, y2: Double, x: Double): Double = {
y1 + (y2 - y1) * (x - x1) / (x2 - x1)
}

val foundIndex = binarySearch(boundaries, testData)
val insertIndex = -foundIndex - 1

// Determine whether testData is lower than all values,
// higher than all values, between two values, or an exact match.
if (insertIndex == 0) {
predictions.head
} else if (insertIndex == boundaries.length) {
predictions.last
} else if (foundIndex < 0) {
linearInterpolation(
boundaries(insertIndex - 1),
predictions(insertIndex - 1),
boundaries(insertIndex),
predictions(insertIndex),
testData)
} else {
predictions(foundIndex)
}
}
}
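The three prediction rules documented above can be condensed into a standalone sketch. `predictSketch` is a hypothetical helper name, not part of the PR; it mirrors the binary-search-plus-interpolation logic of `predict`, assuming `boundaries` is sorted in increasing order:

```scala
import java.util.Arrays.binarySearch

// Illustrative sketch of the prediction rules; not the PR's own code.
def predictSketch(boundaries: Array[Double], predictions: Array[Double], x: Double): Double = {
  def interpolate(x1: Double, y1: Double, x2: Double, y2: Double, x: Double): Double =
    y1 + (y2 - y1) * (x - x1) / (x2 - x1)

  val found = binarySearch(boundaries, x)
  val insert = -found - 1
  if (found >= 0) {
    predictions(found)                    // rule 1: exact boundary match
  } else if (insert == 0) {
    predictions.head                      // rule 2: below all boundaries
  } else if (insert == boundaries.length) {
    predictions.last                      // rule 2: above all boundaries
  } else {
    interpolate(boundaries(insert - 1), predictions(insert - 1),
      boundaries(insert), predictions(insert), x)  // rule 3: interpolate
  }
}
```

For example, with boundaries (1, 3) and predictions (2, 4), a query at 2.0 interpolates to 3.0, while queries at 0.0 and 5.0 clamp to the first and last predictions.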

/**
* Isotonic regression.
* Currently implemented using the parallelized pool adjacent violators algorithm.
* Only the univariate (single feature) algorithm is supported.
*
* Sequential PAV implementation based on:
* Tibshirani, Ryan J., Holger Hoefling, and Robert Tibshirani.
* "Nearly-isotonic regression." Technometrics 53.1 (2011): 54-61.
* Available from http://www.stat.cmu.edu/~ryantibs/papers/neariso.pdf
*
* Sequential PAV parallelization based on:
* Kearsley, Anthony J., Richard A. Tapia, and Michael W. Trosset.
* "An approach to parallelizing isotonic regression."
* Applied Mathematics and Parallel Computing. Physica-Verlag HD, 1996. 141-147.
* Available from http://softlib.rice.edu/pub/CRPC-TRs/reports/CRPC-TR96640.pdf
*/
class IsotonicRegression private (private var isotonic: Boolean) extends Serializable {

/**
* Constructs IsotonicRegression instance with default parameter isotonic = true.
*
* @return New instance of IsotonicRegression.
*/
def this() = this(true)

/**
* Sets the isotonic parameter.
*
* @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
* @return This instance of IsotonicRegression.
*/
def setIsotonic(isotonic: Boolean): this.type = {
this.isotonic = isotonic
this
}

/**
* Run IsotonicRegression algorithm to obtain isotonic regression model.
*
* @param input RDD of tuples (label, feature, weight) where label is the dependent variable
* for which we calculate isotonic regression, feature is the independent variable,
* and weight represents the number of measures (default 1).
* If multiple labels share the same feature value then they are ordered before
* the algorithm is executed.
* @return Isotonic regression model.
*/
def run(input: RDD[(Double, Double, Double)]): IsotonicRegressionModel = {
val preprocessedInput = if (isotonic) {
input
} else {
input.map(x => (-x._1, x._2, x._3))
}

val pooled = parallelPoolAdjacentViolators(preprocessedInput)

val predictions = if (isotonic) pooled.map(_._1) else pooled.map(-_._1)
val boundaries = pooled.map(_._2)

new IsotonicRegressionModel(boundaries, predictions, isotonic)
}

/**
* Run pool adjacent violators algorithm to obtain isotonic regression model.
*
* @param input JavaRDD of tuples (label, feature, weight) where label is the dependent variable
* for which we calculate isotonic regression, feature is the independent variable,
* and weight represents the number of measures (default 1).
* If multiple labels share the same feature value then they are ordered before
* the algorithm is executed.
* @return Isotonic regression model.
*/
def run(input: JavaRDD[(JDouble, JDouble, JDouble)]): IsotonicRegressionModel = {
run(input.rdd.retag.asInstanceOf[RDD[(Double, Double, Double)]])
}

/**
* Performs a pool adjacent violators algorithm (PAV).
* Uses a single pass over the data, in which monotonicity violations introduced
* by pooling into previously processed points are fixed immediately.
* Uses an optimization that discovers monotonicity-violating sequences (blocks).
*
* @param input Input data of tuples (label, feature, weight).
* @return Result tuples (label, feature, weight) where labels were updated
* to form a monotone sequence as per isotonic regression definition.
*/
private def poolAdjacentViolators(
input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {

if (input.isEmpty) {
return Array.empty
}

// Pools the sub-array within the given bounds, assigning the weighted average value to all elements.
def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = {
val poolSubArray = input.slice(start, end + 1)

val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum
val weight = poolSubArray.map(_._3).sum

var i = start
while (i <= end) {
input(i) = (weightedSum / weight, input(i)._2, input(i)._3)
i = i + 1
}
}

var i = 0
while (i < input.length) {
var j = i

// Find monotonicity violating sequence, if any.
while (j < input.length - 1 && input(j)._1 > input(j + 1)._1) {
j = j + 1
}

// If monotonicity was not violated, move to next data point.
if (i == j) {
i = i + 1
} else {
// Otherwise pool the violating sequence
// and check if pooling caused monotonicity violation in previously processed points.
while (i >= 0 && input(i)._1 > input(i + 1)._1) {
pool(input, i, j)
i = i - 1
}

i = j
}
}

// For points having the same prediction, we only keep two boundary points.
val compressed = ArrayBuffer.empty[(Double, Double, Double)]

var (curLabel, curFeature, curWeight) = input.head
var rightBound = curFeature
def merge(): Unit = {
compressed += ((curLabel, curFeature, curWeight))
if (rightBound > curFeature) {
compressed += ((curLabel, rightBound, 0.0))
}
}
i = 1
while (i < input.length) {
val (label, feature, weight) = input(i)
if (label == curLabel) {
curWeight += weight
rightBound = feature
} else {
merge()
curLabel = label
curFeature = feature
curWeight = weight
rightBound = curFeature
}
i += 1
}
merge()

compressed.toArray
}
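The violation-search and backtracking-pool loop above can be condensed into a minimal sequential PAV sketch on (label, weight) pairs. This is illustrative only: `pavSketch` is a hypothetical name, the input is assumed to be presorted by feature, and the boundary-compression step is omitted:

```scala
// Minimal sequential PAV sketch; mirrors the loop structure above but
// returns only the fitted labels. Not the PR's own implementation.
def pavSketch(input: Array[(Double, Double)]): Array[Double] = {
  val y = input.map(_._1)  // labels, mutated in place
  val w = input.map(_._2)  // weights
  var i = 0
  while (i < y.length) {
    var j = i
    // Find a monotonicity-violating sequence, if any.
    while (j < y.length - 1 && y(j) > y(j + 1)) j += 1
    if (i == j) {
      i += 1
    } else {
      // Pool the violating block, backtracking while earlier points now violate.
      while (i >= 0 && y(i) > y(i + 1)) {
        val totalWeight = (i to j).map(k => w(k)).sum
        val avg = (i to j).map(k => y(k) * w(k)).sum / totalWeight
        (i to j).foreach(k => y(k) = avg)
        i -= 1
      }
      i = j
    }
  }
  y
}
```

On the labels (1, 2, 3, 3, 1) with unit weights, the trailing block (3, 3, 1) is pooled to 7/3, matching the expectation in the Java test below.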

/**
* Performs the parallel pool adjacent violators algorithm:
* runs PAV on each partition and then once more on the combined result.
*
* @param input Input data of tuples (label, feature, weight).
* @return Result tuples (label, feature, weight) where labels were updated
* to form a monotone sequence as per isotonic regression definition.
*/
private def parallelPoolAdjacentViolators(
input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
val parallelStepResult = input
.sortBy(x => (x._2, x._1))
.glom()
.flatMap(poolAdjacentViolators)
.collect()
.sortBy(x => (x._2, x._1)) // Sort again because collect() doesn't promise ordering.
poolAdjacentViolators(parallelStepResult)
}
}
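The parallel strategy (fit each partition, concatenate in feature order, fit once more) can be simulated with plain collections. This is an unweighted, illustrative sketch assuming presorted partitions; `isoFit` is a hypothetical helper standing in for the PR's weighted per-partition PAV:

```scala
// Tiny unweighted PAV used only to illustrate the two-step parallel scheme.
def isoFit(labels: Array[Double]): Array[Double] = {
  val v = labels.clone()
  var i = 0
  while (i < v.length) {
    var j = i
    while (j < v.length - 1 && v(j) > v(j + 1)) j += 1
    if (i == j) {
      i += 1
    } else {
      while (i >= 0 && v(i) > v(i + 1)) {
        val avg = v.slice(i, j + 1).sum / (j - i + 1)
        var k = i
        while (k <= j) { v(k) = avg; k += 1 }
        i -= 1
      }
      i = j
    }
  }
  v
}

// Two "partitions", each already sorted by feature value (standing in for glom()).
val partitions = Seq(Array(1.0, 3.0, 2.0), Array(2.0, 5.0, 4.0))
// Parallel step: PAV within each partition, then concatenate in feature order.
val merged = partitions.map(isoFit).reduce(_ ++ _)
// Sequential step: one more PAV pass over the combined result.
val result = isoFit(merged)
```

The second pass is needed because per-partition fits can still violate monotonicity across partition boundaries, exactly as in `parallelPoolAdjacentViolators` above.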
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.regression;

import java.io.Serializable;
import java.util.List;

import scala.Tuple3;

import com.google.common.collect.Lists;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class JavaIsotonicRegressionSuite implements Serializable {
private transient JavaSparkContext sc;

private List<Tuple3<Double, Double, Double>> generateIsotonicInput(double[] labels) {
List<Tuple3<Double, Double, Double>> input = Lists.newArrayList();

for (int i = 1; i <= labels.length; i++) {
input.add(new Tuple3<Double, Double, Double>(labels[i-1], (double) i, 1d));
}

return input;
}

private IsotonicRegressionModel runIsotonicRegression(double[] labels) {
JavaRDD<Tuple3<Double, Double, Double>> trainRDD =
sc.parallelize(generateIsotonicInput(labels), 2).cache();

return new IsotonicRegression().run(trainRDD);
}

@Before
public void setUp() {
sc = new JavaSparkContext("local", "JavaIsotonicRegressionSuite");
}

@After
public void tearDown() {
sc.stop();
sc = null;
}

@Test
public void testIsotonicRegressionJavaRDD() {
IsotonicRegressionModel model =
runIsotonicRegression(new double[]{1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12});

Assert.assertArrayEquals(
new double[] {1, 2, 7d/3, 7d/3, 6, 7, 8, 10, 10, 12}, model.predictions(), 1e-14);
}

@Test
public void testIsotonicRegressionPredictionsJavaRDD() {
IsotonicRegressionModel model =
runIsotonicRegression(new double[]{1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12});

JavaDoubleRDD testRDD = sc.parallelizeDoubles(Lists.newArrayList(0.0, 1.0, 9.5, 12.0, 13.0));
List<Double> predictions = model.predict(testRDD).collect();

Assert.assertTrue(predictions.get(0) == 1d);
Assert.assertTrue(predictions.get(1) == 1d);
Assert.assertTrue(predictions.get(2) == 10d);
Assert.assertTrue(predictions.get(3) == 12d);
Assert.assertTrue(predictions.get(4) == 12d);
}
}