Skip to content

Commit d93c8f9

Browse files
SPARK-3278 changes after PR comments apache#3519. Change to IsotonicRegression api. Isotonic parameter now follows api of other mllib algorithms
1 parent 1fff77d commit d93c8f9

File tree

3 files changed

+46
-30
lines changed

3 files changed

+46
-30
lines changed

mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala

Lines changed: 35 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.mllib.regression
1919

2020
import java.io.Serializable
21+
import java.lang.{Double => JDouble}
2122
import java.util.Arrays.binarySearch
2223

2324
import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD}
@@ -53,8 +54,9 @@ class IsotonicRegressionModel (
5354
* @param testData Features to be labeled.
5455
* @return Predicted labels.
5556
*/
56-
def predict(testData: RDD[Double]): RDD[Double] =
57+
def predict(testData: RDD[Double]): RDD[Double] = {
5758
testData.map(predict)
59+
}
5860

5961
/**
6062
* Predict labels for provided features.
@@ -63,8 +65,9 @@ class IsotonicRegressionModel (
6365
* @param testData Features to be labeled.
6466
* @return Predicted labels.
6567
*/
66-
def predict(testData: JavaDoubleRDD): JavaDoubleRDD =
68+
def predict(testData: JavaDoubleRDD): JavaDoubleRDD = {
6769
JavaDoubleRDD.fromRDD(predict(testData.rdd.asInstanceOf[RDD[Double]]))
70+
}
6871

6972
/**
7073
* Predict a single label.
@@ -75,8 +78,8 @@ class IsotonicRegressionModel (
7578
* If testData exactly matches a boundary then associated prediction is directly returned
7679
* If testData is lower or higher than all boundaries
7780
* then first or last prediction is returned respectively
78-
* If testData falls between two values in boundary then predictions is treated as piecewise
79-
* linear function and interpolated value is returned
81+
* If testData falls between two values in boundary then predictions is treated
82+
* as piecewise linear function and interpolated value is returned
8083
*/
8184
def predict(testData: Double): Double = {
8285

@@ -88,8 +91,8 @@ class IsotonicRegressionModel (
8891

8992
val normalisedInsertIndex = -insertIndex - 1
9093

91-
//Find if the index was lower than all values,
92-
//higher than all values, inbetween two values or exact match.
94+
// Find if the index was lower than all values,
95+
// higher than all values, inbetween two values or exact match.
9396
if (insertIndex == -1) {
9497
predictions.head
9598
} else if (normalisedInsertIndex == boundaries.length){
@@ -121,37 +124,50 @@ class IsotonicRegressionModel (
121124
* "An approach to parallelizing isotonic regression."
122125
* Applied Mathematics and Parallel Computing. Physica-Verlag HD, 1996. 141-147.
123126
*/
124-
class IsotonicRegression extends Serializable {
127+
class IsotonicRegression private (private var isotonic: Boolean) extends Serializable {
125128

126129
/**
127-
* Run pool adjacent violators algorithm to obtain isotonic regression model.
130+
* Constructs IsotonicRegression instance with default parameter isotonic = true
131+
* @return New instance of IsotonicRegression
132+
*/
133+
def this() = this(true)
134+
135+
/**
136+
* Sets the isotonic parameter
137+
* @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
138+
* @return The instance of IsotonicRegression
139+
*/
140+
def setIsotonic(isotonic: Boolean): this.type = {
141+
this.isotonic = isotonic
142+
this
143+
}
144+
145+
/**
146+
* Run IsotonicRegression algorithm to obtain isotonic regression model.
128147
*
129148
* @param input RDD of tuples (label, feature, weight) where label is dependent variable
130149
* for which we calculate isotonic regression, feature is independent variable
131150
* and weight represents number of measures with default 1.
132151
*
133-
* @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
134152
* @return Isotonic regression model.
135153
*/
136-
def run(
137-
input: RDD[(Double, Double, Double)],
138-
isotonic: Boolean): IsotonicRegressionModel =
154+
def run(input: RDD[(Double, Double, Double)]): IsotonicRegressionModel = {
139155
createModel(parallelPoolAdjacentViolators(input, isotonic), isotonic)
156+
}
140157

141-
/**
158+
/**
142159
* Run pool adjacent violators algorithm to obtain isotonic regression model.
143160
*
144161
* @param input JavaRDD of tuples (label, feature, weight) where label is dependent variable
145162
* for which we calculate isotonic regression, feature is independent variable
146163
* and weight represents number of measures with default 1.
147164
*
148-
* @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
149165
* @return Isotonic regression model.
150166
*/
151167
def run(
152-
input: JavaRDD[(java.lang.Double, java.lang.Double, java.lang.Double)],
153-
isotonic: Boolean): IsotonicRegressionModel =
154-
run(input.rdd.asInstanceOf[RDD[(Double, Double, Double)]], isotonic)
168+
input: JavaRDD[(JDouble, JDouble, JDouble)]): IsotonicRegressionModel = {
169+
run(input.rdd.asInstanceOf[RDD[(Double, Double, Double)]])
170+
}
155171

156172
/**
157173
* Creates isotonic regression model with given parameters.
@@ -164,11 +180,7 @@ class IsotonicRegression extends Serializable {
164180
protected def createModel(
165181
predictions: Array[(Double, Double, Double)],
166182
isotonic: Boolean): IsotonicRegressionModel = {
167-
168-
val labels = predictions.map(_._1)
169-
val features = predictions.map(_._2)
170-
171-
new IsotonicRegressionModel(features, labels)
183+
new IsotonicRegressionModel(predictions.map(_._2), predictions.map(_._1))
172184
}
173185

174186
/**
@@ -249,4 +261,4 @@ class IsotonicRegression extends Serializable {
249261

250262
poolAdjacentViolators(parallelStepResult.collect(), isotonic)
251263
}
252-
}
264+
}

mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ private IsotonicRegressionModel runIsotonicRegression(double[] labels) {
6161
JavaRDD<Tuple3<Double, Double, Double>> trainRDD =
6262
sc.parallelize(generateIsotonicInput(labels)).cache();
6363

64-
return new IsotonicRegression().run(trainRDD, true);
64+
return new IsotonicRegression().run(trainRDD);
6565
}
6666

6767
@Before

mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,31 +23,35 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext
2323

2424
class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with Matchers {
2525

26-
private def round(d: Double) =
26+
private def round(d: Double) = {
2727
Math.round(d * 100).toDouble / 100
28+
}
2829

29-
private def generateIsotonicInput(labels: Seq[Double]): Seq[(Double, Double, Double)] =
30+
private def generateIsotonicInput(labels: Seq[Double]): Seq[(Double, Double, Double)] = {
3031
labels.zip(1 to labels.size).map(point => (point._1, point._2.toDouble, 1d))
32+
}
3133

3234
private def generateIsotonicInput(
3335
labels: Seq[Double],
34-
weights: Seq[Double]): Seq[(Double, Double, Double)] =
36+
weights: Seq[Double]): Seq[(Double, Double, Double)] = {
3537
labels.zip(1 to labels.size)
3638
.zip(weights)
3739
.map(point => (point._1._1, point._1._2.toDouble, point._2))
40+
}
3841

3942
private def runIsotonicRegression(
4043
labels: Seq[Double],
4144
weights: Seq[Double],
4245
isotonic: Boolean): IsotonicRegressionModel = {
4346
val trainRDD = sc.parallelize(generateIsotonicInput(labels, weights)).cache()
44-
new IsotonicRegression().run(trainRDD, isotonic)
47+
new IsotonicRegression().setIsotonic(isotonic).run(trainRDD)
4548
}
4649

4750
private def runIsotonicRegression(
4851
labels: Seq[Double],
49-
isotonic: Boolean): IsotonicRegressionModel =
52+
isotonic: Boolean): IsotonicRegressionModel = {
5053
runIsotonicRegression(labels, Array.fill(labels.size)(1d), isotonic)
54+
}
5155

5256
test("increasing isotonic regression") {
5357
val model = runIsotonicRegression(Seq(1, 2, 3, 3, 1, 6, 17, 16, 17, 18), true)
@@ -99,7 +103,7 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M
99103

100104
test("isotonic regression with unordered input") {
101105
val trainRDD = sc.parallelize(generateIsotonicInput(Seq(1, 2, 3, 4, 5)).reverse).cache()
102-
val model = new IsotonicRegression().run(trainRDD, true)
106+
val model = new IsotonicRegression().run(trainRDD)
103107

104108
assert(model.predictions === Array(1, 2, 3, 4, 5))
105109
}

0 commit comments

Comments
 (0)