Skip to content

Commit fad4bf9

Browse files
SPARK-3278 changes after PR comments apache#3519
1 parent ce0e30c commit fad4bf9

File tree

4 files changed

+82
-84
lines changed

4 files changed

+82
-84
lines changed

mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala

Lines changed: 27 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
package org.apache.spark.mllib.regression
1919

20-
import org.apache.spark.api.java.{JavaRDD, JavaPairRDD}
20+
import java.io.Serializable
21+
22+
import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD}
2123
import org.apache.spark.rdd.RDD
2224

2325
/**
@@ -46,8 +48,8 @@ class IsotonicRegressionModel (
4648
* @param testData features to be labeled
4749
* @return predicted labels
4850
*/
49-
def predict(testData: JavaRDD[java.lang.Double]): JavaRDD[java.lang.Double] =
50-
testData.rdd.map(_.doubleValue()).map(predict).map(new java.lang.Double(_))
51+
def predict(testData: JavaRDD[java.lang.Double]): JavaDoubleRDD =
52+
JavaDoubleRDD.fromRDD(predict(testData.rdd.asInstanceOf[RDD[Double]]))
5153

5254
/**
5355
* Predict a single label
@@ -61,23 +63,12 @@ class IsotonicRegressionModel (
6163
}
6264

6365
/**
64-
* Base representing algorithm for isotonic regression
66+
* Isotonic regression
67+
* Currently implemented using oarallel pool adjacent violators algorithm for monotone regression
6568
*/
66-
trait IsotonicRegressionAlgorithm
69+
class IsotonicRegression
6770
extends Serializable {
6871

69-
/**
70-
* Creates isotonic regression model with given parameters
71-
*
72-
* @param predictions labels estimated using isotonic regression algorithm.
73-
* Used for predictions on new data points.
74-
* @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
75-
* @return isotonic regression model
76-
*/
77-
protected def createModel(
78-
predictions: Seq[(Double, Double, Double)],
79-
isotonic: Boolean): IsotonicRegressionModel
80-
8172
/**
8273
* Run algorithm to obtain isotonic regression model
8374
*
@@ -86,25 +77,22 @@ trait IsotonicRegressionAlgorithm
8677
* @return isotonic regression model
8778
*/
8879
def run(
89-
input: RDD[(Double, Double, Double)],
90-
isotonic: Boolean): IsotonicRegressionModel
91-
}
92-
93-
/**
94-
* Parallel pool adjacent violators algorithm for monotone regression
95-
*/
96-
class PoolAdjacentViolators private [mllib]
97-
extends IsotonicRegressionAlgorithm {
98-
99-
override def run(
10080
input: RDD[(Double, Double, Double)],
10181
isotonic: Boolean = true): IsotonicRegressionModel = {
10282
createModel(
10383
parallelPoolAdjacentViolators(input, isotonic),
10484
isotonic)
10585
}
10686

107-
override protected def createModel(
87+
/**
88+
* Creates isotonic regression model with given parameters
89+
*
90+
* @param predictions labels estimated using isotonic regression algorithm.
91+
* Used for predictions on new data points.
92+
* @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
93+
* @return isotonic regression model
94+
*/
95+
protected def createModel(
10896
predictions: Seq[(Double, Double, Double)],
10997
isotonic: Boolean): IsotonicRegressionModel = {
11098
new IsotonicRegressionModel(predictions, isotonic)
@@ -132,31 +120,27 @@ class PoolAdjacentViolators private [mllib]
132120
val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum
133121
val weight = poolSubArray.map(_._3).sum
134122

135-
for(i <- start to end) {
123+
var i = start
124+
while (i <= end) {
136125
in(i) = (weightedSum / weight, in(i)._2, in(i)._3)
126+
i = i + 1
137127
}
138128
}
139129

140-
val isotonicConstraint: (Double, Double) => Boolean = (x, y) => x <= y
141-
val antitonicConstraint: (Double, Double) => Boolean = (x, y) => x >= y
142-
143-
def monotonicityConstraint(isotonic: Boolean) =
144-
if(isotonic) isotonicConstraint else antitonicConstraint
145-
146-
val monotonicityConstraintHolds = monotonicityConstraint(isotonic)
130+
val monotonicityConstraintHolds: (Double, Double) => Boolean =
131+
(x, y) => if (isotonic) x <= y else x >= y
147132

148133
var i = 0
149-
150-
while(i < in.length) {
134+
while (i < in.length) {
151135
var j = i
152136

153137
// Find monotonicity violating sequence, if any
154-
while(j < in.length - 1 && !monotonicityConstraintHolds(in(j)._1, in(j + 1)._1)) {
138+
while (j < in.length - 1 && !monotonicityConstraintHolds(in(j)._1, in(j + 1)._1)) {
155139
j = j + 1
156140
}
157141

158142
// If monotonicity was not violated, move to next data point
159-
if(i == j) {
143+
if (i == j) {
160144
i = i + 1
161145
} else {
162146
// Otherwise pool the violating sequence
@@ -212,7 +196,7 @@ object IsotonicRegression {
212196
def train(
213197
input: RDD[(Double, Double, Double)],
214198
isotonic: Boolean = true): IsotonicRegressionModel = {
215-
new PoolAdjacentViolators().run(input, isotonic)
199+
new IsotonicRegression().run(input, isotonic)
216200
}
217201

218202
/**
@@ -227,7 +211,7 @@ object IsotonicRegression {
227211
def train(
228212
input: JavaRDD[(java.lang.Double, java.lang.Double, java.lang.Double)],
229213
isotonic: Boolean): IsotonicRegressionModel = {
230-
new PoolAdjacentViolators()
214+
new IsotonicRegression()
231215
.run(
232216
input.rdd.map(x => (x._1.doubleValue(), x._2.doubleValue(), x._3.doubleValue())),
233217
isotonic)

mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,20 @@
1717

1818
package org.apache.spark.mllib.regression;
1919

20-
import org.apache.spark.api.java.JavaRDD;
21-
import org.apache.spark.api.java.JavaSparkContext;
22-
import org.apache.spark.api.java.function.Function;
23-
import org.apache.spark.mllib.util.IsotonicDataGenerator;
20+
import java.io.Serializable;
21+
import java.util.List;
22+
23+
import scala.Tuple3;
24+
2425
import org.junit.After;
2526
import org.junit.Assert;
2627
import org.junit.Before;
2728
import org.junit.Test;
28-
import scala.Tuple3;
2929

30-
import java.io.Serializable;
31-
import java.util.List;
30+
import org.apache.spark.api.java.function.Function;
31+
import org.apache.spark.api.java.JavaRDD;
32+
import org.apache.spark.api.java.JavaSparkContext;
33+
import org.apache.spark.mllib.util.IsotonicDataGenerator;
3234

3335
public class JavaIsotonicRegressionSuite implements Serializable {
3436
private transient JavaSparkContext sc;

mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala

Lines changed: 42 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@
1717

1818
package org.apache.spark.mllib.regression
1919

20-
import org.apache.spark.mllib.linalg.Vectors
21-
import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext}
2220
import org.scalatest.{Matchers, FunSuite}
23-
import scala.util.Random
21+
22+
import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext}
2423
import org.apache.spark.mllib.util.IsotonicDataGenerator._
2524

2625
class IsotonicRegressionSuite
@@ -32,26 +31,34 @@ class IsotonicRegressionSuite
3231
Math.round(d * 100).toDouble / 100
3332

3433
test("increasing isotonic regression") {
35-
val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12, 14, 15, 17, 16, 17, 18, 19, 20)).cache()
34+
val trainRDD = sc.parallelize(
35+
generateIsotonicInput(
36+
1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12, 14, 15, 17, 16, 17, 18, 19, 20)).cache()
3637

37-
val alg = new PoolAdjacentViolators
38+
val alg = new IsotonicRegression
3839
val model = alg.run(trainRDD, true)
3940

40-
model.predictions should be(generateIsotonicInput(1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20))
41+
model.predictions should be(
42+
generateIsotonicInput(
43+
1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20))
4144
}
4245

4346
test("increasing isotonic regression using api") {
44-
val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12, 14, 15, 17, 16, 17, 18, 19, 20)).cache()
47+
val trainRDD = sc.parallelize(
48+
generateIsotonicInput(
49+
1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12, 14, 15, 17, 16, 17, 18, 19, 20)).cache()
4550

4651
val model = IsotonicRegression.train(trainRDD, true)
4752

48-
model.predictions should be(generateIsotonicInput(1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20))
53+
model.predictions should be(
54+
generateIsotonicInput(
55+
1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12, 14, 15, 16.5, 16.5, 17, 18, 19, 20))
4956
}
5057

5158
test("isotonic regression with size 0") {
5259
val trainRDD = sc.parallelize(List[(Double, Double, Double)]()).cache()
5360

54-
val alg = new PoolAdjacentViolators
61+
val alg = new IsotonicRegression
5562
val model = alg.run(trainRDD, true)
5663

5764
model.predictions should be(List())
@@ -60,7 +67,7 @@ class IsotonicRegressionSuite
6067
test("isotonic regression with size 1") {
6168
val trainRDD = sc.parallelize(generateIsotonicInput(1)).cache()
6269

63-
val alg = new PoolAdjacentViolators
70+
val alg = new IsotonicRegression
6471
val model = alg.run(trainRDD, true)
6572

6673
model.predictions should be(generateIsotonicInput(1))
@@ -69,7 +76,7 @@ class IsotonicRegressionSuite
6976
test("isotonic regression strictly increasing sequence") {
7077
val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 4, 5)).cache()
7178

72-
val alg = new PoolAdjacentViolators
79+
val alg = new IsotonicRegression
7380
val model = alg.run(trainRDD, true)
7481

7582
model.predictions should be(generateIsotonicInput(1, 2, 3, 4, 5))
@@ -78,7 +85,7 @@ class IsotonicRegressionSuite
7885
test("isotonic regression strictly decreasing sequence") {
7986
val trainRDD = sc.parallelize(generateIsotonicInput(5, 4, 3, 2, 1)).cache()
8087

81-
val alg = new PoolAdjacentViolators
88+
val alg = new IsotonicRegression
8289
val model = alg.run(trainRDD, true)
8390

8491
model.predictions should be(generateIsotonicInput(3, 3, 3, 3, 3))
@@ -87,7 +94,7 @@ class IsotonicRegressionSuite
8794
test("isotonic regression with last element violating monotonicity") {
8895
val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 4, 2)).cache()
8996

90-
val alg = new PoolAdjacentViolators
97+
val alg = new IsotonicRegression
9198
val model = alg.run(trainRDD, true)
9299

93100
model.predictions should be(generateIsotonicInput(1, 2, 3, 3, 3))
@@ -96,7 +103,7 @@ class IsotonicRegressionSuite
96103
test("isotonic regression with first element violating monotonicity") {
97104
val trainRDD = sc.parallelize(generateIsotonicInput(4, 2, 3, 4, 5)).cache()
98105

99-
val alg = new PoolAdjacentViolators
106+
val alg = new IsotonicRegression
100107
val model = alg.run(trainRDD, true)
101108

102109
model.predictions should be(generateIsotonicInput(3, 3, 3, 4, 5))
@@ -105,7 +112,7 @@ class IsotonicRegressionSuite
105112
test("isotonic regression with negative labels") {
106113
val trainRDD = sc.parallelize(generateIsotonicInput(-1, -2, 0, 1, -1)).cache()
107114

108-
val alg = new PoolAdjacentViolators
115+
val alg = new IsotonicRegression
109116
val model = alg.run(trainRDD, true)
110117

111118
model.predictions should be(generateIsotonicInput(-1.5, -1.5, 0, 0, 0))
@@ -114,45 +121,48 @@ class IsotonicRegressionSuite
114121
test("isotonic regression with unordered input") {
115122
val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 3, 4, 5).reverse).cache()
116123

117-
val alg = new PoolAdjacentViolators
124+
val alg = new IsotonicRegression
118125
val model = alg.run(trainRDD, true)
119126

120127
model.predictions should be(generateIsotonicInput(1, 2, 3, 4, 5))
121128
}
122129

123130
test("weighted isotonic regression") {
124-
val trainRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 4, 2), Seq(1, 1, 1, 1, 2))).cache()
131+
val trainRDD = sc.parallelize(
132+
generateWeightedIsotonicInput(Seq(1, 2, 3, 4, 2), Seq(1, 1, 1, 1, 2))).cache()
125133

126-
val alg = new PoolAdjacentViolators
134+
val alg = new IsotonicRegression
127135
val model = alg.run(trainRDD, true)
128136

129-
model.predictions should be(generateWeightedIsotonicInput(Seq(1, 2, 2.75, 2.75,2.75), Seq(1, 1, 1, 1, 2)))
137+
model.predictions should be(
138+
generateWeightedIsotonicInput(Seq(1, 2, 2.75, 2.75,2.75), Seq(1, 1, 1, 1, 2)))
130139
}
131140

132141
test("weighted isotonic regression with weights lower than 1") {
133-
val trainRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(1, 1, 1, 0.1, 0.1))).cache()
142+
val trainRDD = sc.parallelize(
143+
generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(1, 1, 1, 0.1, 0.1))).cache()
134144

135-
val alg = new PoolAdjacentViolators
145+
val alg = new IsotonicRegression
136146
val model = alg.run(trainRDD, true)
137147

138-
model.predictions.map(p => p.copy(_1 = round(p._1))) should be
139-
(generateWeightedIsotonicInput(Seq(1, 2, 3.3/1.2, 3.3/1.2, 3.3/1.2), Seq(1, 1, 1, 0.1, 0.1)))
148+
model.predictions.map(p => p.copy(_1 = round(p._1))) should be(
149+
generateWeightedIsotonicInput(Seq(1, 2, 3.3/1.2, 3.3/1.2, 3.3/1.2), Seq(1, 1, 1, 0.1, 0.1)))
140150
}
141151

142152
test("weighted isotonic regression with negative weights") {
143153
val trainRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5))).cache()
144154

145-
val alg = new PoolAdjacentViolators
155+
val alg = new IsotonicRegression
146156
val model = alg.run(trainRDD, true)
147157

148-
model.predictions.map(p => p.copy(_1 = round(p._1))) should be
149-
(generateWeightedIsotonicInput(Seq(1, 10/6, 10/6, 10/6, 10/6), Seq(-1, 1, -3, 1, -5)))
158+
model.predictions should be(
159+
generateWeightedIsotonicInput(Seq(1.0, 10.0/6, 10.0/6, 10.0/6, 10.0/6), Seq(-1, 1, -3, 1, -5)))
150160
}
151161

152162
test("weighted isotonic regression with zero weights") {
153163
val trainRDD = sc.parallelize(generateWeightedIsotonicInput(Seq(1, 2, 3, 2, 1), Seq(0, 0, 0, 1, 0))).cache()
154164

155-
val alg = new PoolAdjacentViolators
165+
val alg = new IsotonicRegression
156166
val model = alg.run(trainRDD, true)
157167

158168
model.predictions should be(generateWeightedIsotonicInput(Seq(1, 2, 2, 2, 2), Seq(0, 0, 0, 1, 0)))
@@ -161,7 +171,7 @@ class IsotonicRegressionSuite
161171
test("isotonic regression prediction") {
162172
val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 7, 1, 2)).cache()
163173

164-
val alg = new PoolAdjacentViolators
174+
val alg = new IsotonicRegression
165175
val model = alg.run(trainRDD, true)
166176

167177
model.predict(0) should be(1)
@@ -172,18 +182,18 @@ class IsotonicRegressionSuite
172182

173183
test("isotonic regression RDD prediction") {
174184
val trainRDD = sc.parallelize(generateIsotonicInput(1, 2, 7, 1, 2)).cache()
175-
val testRDD = sc.parallelize(List(0d, 2d, 3d, 10d)).cache()
185+
val testRDD = sc.parallelize(List(0.0, 2.0, 3.0, 10.0)).cache()
176186

177-
val alg = new PoolAdjacentViolators
187+
val alg = new IsotonicRegression
178188
val model = alg.run(trainRDD, true)
179189

180-
model.predict(testRDD).collect() should be(Array(1, 2, 10d/3, 10d/3))
190+
model.predict(testRDD).collect() should be(Array(1, 2, 10.0/3, 10.0/3))
181191
}
182192

183193
test("antitonic regression prediction") {
184194
val trainRDD = sc.parallelize(generateIsotonicInput(7, 5, 3, 5, 1)).cache()
185195

186-
val alg = new PoolAdjacentViolators
196+
val alg = new IsotonicRegression
187197
val model = alg.run(trainRDD, false)
188198

189199
model.predict(0) should be(7)

mllib/src/main/scala/org/apache/spark/mllib/util/IsotonicDataGenerator.scala renamed to mllib/src/test/scala/org/apache/spark/mllib/util/IsotonicDataGenerator.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717

1818
package org.apache.spark.mllib.util
1919

20-
import org.apache.spark.annotation.DeveloperApi
21-
import scala.collection.JavaConversions._
2220
import java.lang.{Double => JDouble}
2321

22+
import scala.collection.JavaConversions._
23+
24+
import org.apache.spark.annotation.DeveloperApi
25+
2426
/**
2527
* :: DeveloperApi ::
2628
* Generate test data for Isotonic regresision.

0 commit comments

Comments
 (0)