Skip to content

Commit 88eb4e2

Browse files
SPARK-3278 changes after PR comments apache#3519. Isotonic parameter removed from algorithm, defined behaviour for multiple data points with the same feature value, added tests to verify it
1 parent e60a34f commit 88eb4e2

File tree

2 files changed

+70
-41
lines changed

2 files changed

+70
-41
lines changed

mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala

Lines changed: 46 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,12 @@ import org.apache.spark.rdd.RDD
3030
* @param boundaries Array of boundaries for which predictions are known.
3131
* Boundaries must be sorted in increasing order.
3232
* @param predictions Array of predictions associated to the boundaries at the same index.
33-
* Result of isotonic regression and therefore is monotone.
33+
* Results of isotonic regression and therefore monotone.
3434
*/
3535
class IsotonicRegressionModel (
3636
boundaries: Array[Double],
37-
val predictions: Array[Double])
37+
val predictions: Array[Double],
38+
isotonic: Boolean)
3839
extends Serializable {
3940

4041
private def isSorted(xs: Array[Double]): Boolean = {
@@ -46,6 +47,12 @@ class IsotonicRegressionModel (
4647
true
4748
}
4849

50+
if (isotonic) {
51+
assert(isSorted(predictions))
52+
} else {
53+
assert(isSorted(predictions.map(-_)))
54+
}
55+
4956
assert(isSorted(boundaries))
5057
assert(boundaries.length == predictions.length)
5158

@@ -77,11 +84,15 @@ class IsotonicRegressionModel (
7784
*
7885
* @param testData Feature to be labeled.
7986
* @return Predicted label.
80-
* If testData exactly matches a boundary then associated prediction is directly returned.
81-
* If testData is lower or higher than all boundaries.
82-
* then first or last prediction is returned respectively.
83-
* If testData falls between two values in boundary array then predictions is treated
84-
* as piecewise linear function and interpolated value is returned.
87+
* 1) If testData exactly matches a boundary then associated prediction is returned.
88+
* In case there are multiple predictions with the same boundary then one of them
89+
* is returned. Which one is undefined (same as java.util.Arrays.binarySearch).
90+
* 2) If testData is lower or higher than all boundaries then first or last prediction
91+
* is returned respectively. In case there are multiple predictions with the same
92+
* boundary then the lowest or highest is returned respectively.
93+
* 3) If testData falls between two values in boundary array then prediction is treated
94+
* as piecewise linear function and interpolated value is returned. In case there are
95+
* multiple values with the same boundary then the same rules as in 2) are used.
8596
*/
8697
def predict(testData: Double): Double = {
8798

@@ -131,12 +142,14 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
131142

132143
/**
133144
* Constructs IsotonicRegression instance with default parameter isotonic = true.
145+
*
134146
* @return New instance of IsotonicRegression.
135147
*/
136148
def this() = this(true)
137149

138150
/**
139151
* Sets the isotonic parameter.
152+
*
140153
* @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
141154
* @return This instance of IsotonicRegression.
142155
*/
@@ -151,10 +164,23 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
151164
* @param input RDD of tuples (label, feature, weight) where label is dependent variable
152165
* for which we calculate isotonic regression, feature is independent variable
153166
* and weight represents number of measures with default 1.
167+
* If multiple labels share the same feature value then they are ordered before
168+
* the algorithm is executed.
154169
* @return Isotonic regression model.
155170
*/
156171
def run(input: RDD[(Double, Double, Double)]): IsotonicRegressionModel = {
157-
createModel(parallelPoolAdjacentViolators(input, isotonic), isotonic)
172+
val preprocessedInput = if (isotonic) {
173+
input
174+
} else {
175+
input.map(x => (-x._1, x._2, x._3))
176+
}
177+
178+
val isotonicRegression = parallelPoolAdjacentViolators(preprocessedInput)
179+
180+
val predictions = if (isotonic) isotonicRegression.map(_._1) else isotonicRegression.map(-_._1)
181+
val boundaries = isotonicRegression.map(_._2)
182+
183+
new IsotonicRegressionModel(boundaries, predictions, isotonic)
158184
}
159185

160186
/**
@@ -163,42 +189,26 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
163189
* @param input JavaRDD of tuples (label, feature, weight) where label is dependent variable
164190
* for which we calculate isotonic regression, feature is independent variable
165191
* and weight represents number of measures with default 1.
166-
*
192+
* If multiple labels share the same feature value then they are ordered before
193+
* the algorithm is executed.
167194
* @return Isotonic regression model.
168195
*/
169-
def run(
170-
input: JavaRDD[(JDouble, JDouble, JDouble)]): IsotonicRegressionModel = {
196+
def run(input: JavaRDD[(JDouble, JDouble, JDouble)]): IsotonicRegressionModel = {
171197
run(input.rdd.asInstanceOf[RDD[(Double, Double, Double)]])
172198
}
173199

174-
/**
175-
* Creates isotonic regression model with given parameters.
176-
*
177-
* @param predictions Predictions calculated using pool adjacent violators algorithm.
178-
* Used for predictions on new data points.
179-
* @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
180-
* @return Isotonic regression model.
181-
*/
182-
protected def createModel(
183-
predictions: Array[(Double, Double, Double)],
184-
isotonic: Boolean): IsotonicRegressionModel = {
185-
new IsotonicRegressionModel(predictions.map(_._2), predictions.map(_._1))
186-
}
187-
188200
/**
189201
* Performs a pool adjacent violators algorithm (PAV).
190202
* Uses approach with single processing of data where violators
191203
* in previously processed data created by pooling are fixed immediately.
192204
* Uses optimization of discovering monotonicity violating sequences (blocks).
193205
*
194206
* @param input Input data of tuples (label, feature, weight).
195-
* @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
196207
* @return Result tuples (label, feature, weight) where labels were updated
197208
* to form a monotone sequence as per isotonic regression definition.
198209
*/
199210
private def poolAdjacentViolators(
200-
input: Array[(Double, Double, Double)],
201-
isotonic: Boolean): Array[(Double, Double, Double)] = {
211+
input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
202212

203213
// Pools sub array within given bounds assigning weighted average value to all elements.
204214
def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = {
@@ -214,15 +224,12 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
214224
}
215225
}
216226

217-
val monotonicityConstraintHolds: (Double, Double) => Boolean =
218-
(x, y) => if (isotonic) x <= y else x >= y
219-
220227
var i = 0
221228
while (i < input.length) {
222229
var j = i
223230

224231
// Find monotonicity violating sequence, if any.
225-
while (j < input.length - 1 && !monotonicityConstraintHolds(input(j)._1, input(j + 1)._1)) {
232+
while (j < input.length - 1 && input(j)._1 > input(j + 1)._1) {
226233
j = j + 1
227234
}
228235

@@ -232,7 +239,7 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
232239
} else {
233240
// Otherwise pool the violating sequence
234241
// and check if pooling caused monotonicity violation in previously processed points.
235-
while (i >= 0 && !monotonicityConstraintHolds(input(i)._1, input(i + 1)._1)) {
242+
while (i >= 0 && input(i)._1 > input(i + 1)._1) {
236243
pool(input, i, j)
237244
i = i - 1
238245
}
@@ -248,19 +255,17 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
248255
* Performs parallel pool adjacent violators algorithm.
249256
* Performs Pool adjacent violators algorithm on each partition and then again on the result.
250257
*
251-
* @param testData Input data of tuples (label, feature, weight).
252-
* @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
258+
* @param input Input data of tuples (label, feature, weight).
253259
* @return Result tuples (label, feature, weight) where labels were updated
254260
* to form a monotone sequence as per isotonic regression definition.
255261
*/
256262
private def parallelPoolAdjacentViolators(
257-
testData: RDD[(Double, Double, Double)],
258-
isotonic: Boolean): Array[(Double, Double, Double)] = {
263+
input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
259264

260-
val parallelStepResult = testData
261-
.sortBy(_._2)
262-
.mapPartitions(it => poolAdjacentViolators(it.toArray, isotonic).toIterator)
265+
val parallelStepResult = input
266+
.sortBy(x => (x._2, x._1))
267+
.mapPartitions(it => poolAdjacentViolators(it.toArray).toIterator)
263268

264-
poolAdjacentViolators(parallelStepResult.collect(), isotonic)
269+
poolAdjacentViolators(parallelStepResult.collect())
265270
}
266271
}

mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,30 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M
144144
assert(model.predict(10) === 10d/3)
145145
}
146146

147+
test("isotonic regression prediction with duplicate features") {
148+
val trainRDD = sc.parallelize(
149+
Seq[(Double, Double, Double)](
150+
(2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1))).cache()
151+
val model = new IsotonicRegression().run(trainRDD)
152+
153+
assert(model.predict(0) === 1)
154+
assert(model.predict(1.5) === 2)
155+
assert(model.predict(2.5) === 4.5)
156+
assert(model.predict(4) === 6)
157+
}
158+
159+
test("antitonic regression prediction with duplicate features") {
160+
val trainRDD = sc.parallelize(
161+
Seq[(Double, Double, Double)](
162+
(5, 1, 1), (6, 1, 1), (2, 2, 1), (4, 2, 1), (1, 3, 1), (2, 3, 1))).cache()
163+
val model = new IsotonicRegression().setIsotonic(false).run(trainRDD)
164+
165+
assert(model.predict(0) === 6)
166+
assert(model.predict(1.5) === 4.5)
167+
assert(model.predict(2.5) === 2)
168+
assert(model.predict(4) === 1)
169+
}
170+
147171
test("isotonic regression RDD prediction") {
148172
val model = runIsotonicRegression(Seq(1, 2, 7, 1, 2), true)
149173
val testRDD = sc.parallelize(List(-1.0, 0.0, 1.5, 1.75, 2.0, 3.0, 10.0)).cache()

0 commit comments

Comments
 (0)