Skip to content

Commit ded071c

Browse files
Merge pull request #1 from mengxr/SPARK-3278
Update isotonic regression
2 parents 3da56e5 + 4dfe136 commit ded071c

File tree

2 files changed

+142
-57
lines changed

2 files changed

+142
-57
lines changed

mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala

Lines changed: 56 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import java.io.Serializable
2121
import java.lang.{Double => JDouble}
2222
import java.util.Arrays.binarySearch
2323

24+
import scala.collection.mutable.ArrayBuffer
25+
2426
import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD}
2527
import org.apache.spark.rdd.RDD
2628

@@ -31,31 +33,29 @@ import org.apache.spark.rdd.RDD
3133
* Boundaries must be sorted in increasing order.
3234
* @param predictions Array of predictions associated to the boundaries at the same index.
3335
* Results of isotonic regression and therefore monotone.
36+
* @param isotonic indicates whether this is isotonic or antitonic.
3437
*/
3538
class IsotonicRegressionModel (
36-
boundaries: Array[Double],
39+
val boundaries: Array[Double],
3740
val predictions: Array[Double],
38-
isotonic: Boolean)
39-
extends Serializable {
41+
val isotonic: Boolean) extends Serializable {
42+
43+
private val predictionOrd = if (isotonic) Ordering[Double] else Ordering[Double].reverse
44+
45+
require(boundaries.length == predictions.length)
46+
assertOrdered(boundaries)
47+
assertOrdered(predictions)(predictionOrd)
4048

41-
private def isSorted(xs: Array[Double]): Boolean = {
49+
/** Asserts the input array is monotone with the given ordering. */
50+
private def assertOrdered(xs: Array[Double])(implicit ord: Ordering[Double]): Unit = {
4251
var i = 1
4352
while (i < xs.length) {
44-
if (xs(i) < xs(i - 1)) false
53+
require(ord.compare(xs(i - 1), xs(i)) <= 0,
54+
s"Elements (${xs(i - 1)}, ${xs(i)}) are not ordered.")
4555
i += 1
4656
}
47-
true
48-
}
49-
50-
if (isotonic) {
51-
assert(isSorted(predictions))
52-
} else {
53-
assert(isSorted(predictions.map(-_)))
5457
}
5558

56-
assert(isSorted(boundaries))
57-
assert(boundaries.length == predictions.length)
58-
5959
/**
6060
* Predict labels for provided features.
6161
* Using a piecewise linear function.
@@ -175,10 +175,10 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
175175
input.map(x => (-x._1, x._2, x._3))
176176
}
177177

178-
val isotonicRegression = parallelPoolAdjacentViolators(preprocessedInput)
178+
val pooled = parallelPoolAdjacentViolators(preprocessedInput)
179179

180-
val predictions = if (isotonic) isotonicRegression.map(_._1) else isotonicRegression.map(-_._1)
181-
val boundaries = isotonicRegression.map(_._2)
180+
val predictions = if (isotonic) pooled.map(_._1) else pooled.map(-_._1)
181+
val boundaries = pooled.map(_._2)
182182

183183
new IsotonicRegressionModel(boundaries, predictions, isotonic)
184184
}
@@ -210,6 +210,10 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
210210
private def poolAdjacentViolators(
211211
input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
212212

213+
if (input.isEmpty) {
214+
return Array.empty
215+
}
216+
213217
// Pools sub array within given bounds assigning weighted average value to all elements.
214218
def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = {
215219
val poolSubArray = input.slice(start, end + 1)
@@ -248,7 +252,35 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
248252
}
249253
}
250254

251-
input
255+
// For points having the same prediction, we only keep two boundary points.
256+
val compressed = ArrayBuffer.empty[(Double, Double, Double)]
257+
258+
var (curLabel, curFeature, curWeight) = input.head
259+
var rightBound = curFeature
260+
def merge(): Unit = {
261+
compressed += ((curLabel, curFeature, curWeight))
262+
if (rightBound > curFeature) {
263+
compressed += ((curLabel, rightBound, 0.0))
264+
}
265+
}
266+
i = 1
267+
while (i < input.length) {
268+
val (label, feature, weight) = input(i)
269+
if (label == curLabel) {
270+
curWeight += weight
271+
rightBound = feature
272+
} else {
273+
merge()
274+
curLabel = label
275+
curFeature = feature
276+
curWeight = weight
277+
rightBound = curFeature
278+
}
279+
i += 1
280+
}
281+
merge()
282+
283+
compressed.toArray
252284
}
253285

254286
/**
@@ -261,11 +293,12 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
261293
*/
262294
private def parallelPoolAdjacentViolators(
263295
input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
264-
265296
val parallelStepResult = input
266297
.sortBy(x => (x._2, x._1))
267-
.mapPartitions(it => poolAdjacentViolators(it.toArray).toIterator)
268-
269-
poolAdjacentViolators(parallelStepResult.collect())
298+
.glom()
299+
.flatMap(poolAdjacentViolators)
300+
.collect()
301+
.sortBy(x => (x._2, x._1)) // Sort again because collect() doesn't promise ordering.
302+
poolAdjacentViolators(parallelStepResult)
270303
}
271304
}

mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala

Lines changed: 86 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.spark.mllib.regression
2020
import org.scalatest.{Matchers, FunSuite}
2121

2222
import org.apache.spark.mllib.util.MLlibTestSparkContext
23+
import org.apache.spark.mllib.util.TestingUtils._
2324

2425
class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with Matchers {
2526

@@ -28,15 +29,13 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M
2829
}
2930

3031
private def generateIsotonicInput(labels: Seq[Double]): Seq[(Double, Double, Double)] = {
31-
labels.zip(1 to labels.size).map(point => (point._1, point._2.toDouble, 1d))
32+
Seq.tabulate(labels.size)(i => (labels(i), i.toDouble, 1d))
3233
}
3334

3435
private def generateIsotonicInput(
3536
labels: Seq[Double],
3637
weights: Seq[Double]): Seq[(Double, Double, Double)] = {
37-
labels.zip(1 to labels.size)
38-
.zip(weights)
39-
.map(point => (point._1._1, point._1._2.toDouble, point._2))
38+
Seq.tabulate(labels.size)(i => (labels(i), i.toDouble, weights(i)))
4039
}
4140

4241
private def runIsotonicRegression(
@@ -54,9 +53,24 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M
5453
}
5554

5655
test("increasing isotonic regression") {
57-
val model = runIsotonicRegression(Seq(1, 2, 3, 3, 1, 6, 17, 16, 17, 18), true)
56+
/*
57+
The following result could be re-produced with sklearn.
5858
59-
assert(model.predictions === Array(1, 2, 7d/3, 7d/3, 7d/3, 6, 16.5, 16.5, 17, 18))
59+
> from sklearn.isotonic import IsotonicRegression
60+
> x = range(9)
61+
> y = [1, 2, 3, 1, 6, 17, 16, 17, 18]
62+
> ir = IsotonicRegression(x, y)
63+
> print ir.predict(x)
64+
65+
array([ 1. , 2. , 2. , 2. , 6. , 16.5, 16.5, 17. , 18. ])
66+
*/
67+
val model = runIsotonicRegression(Seq(1, 2, 3, 1, 6, 17, 16, 17, 18), true)
68+
69+
assert(Array.tabulate(9)(x => model.predict(x)) === Array(1, 2, 2, 2, 6, 16.5, 16.5, 17, 18))
70+
71+
assert(model.boundaries === Array(0, 1, 3, 4, 5, 6, 7, 8))
72+
assert(model.predictions === Array(1, 2, 2, 6, 16.5, 16.5, 17.0, 18.0))
73+
assert(model.isotonic)
6074
}
6175

6276
test("isotonic regression with size 0") {
@@ -80,74 +94,82 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M
8094
test("isotonic regression strictly decreasing sequence") {
8195
val model = runIsotonicRegression(Seq(5, 4, 3, 2, 1), true)
8296

83-
assert(model.predictions === Array(3, 3, 3, 3, 3))
97+
assert(model.boundaries === Array(0, 4))
98+
assert(model.predictions === Array(3, 3))
8499
}
85100

86101
test("isotonic regression with last element violating monotonicity") {
87102
val model = runIsotonicRegression(Seq(1, 2, 3, 4, 2), true)
88103

89-
assert(model.predictions === Array(1, 2, 3, 3, 3))
104+
assert(model.boundaries === Array(0, 1, 2, 4))
105+
assert(model.predictions === Array(1, 2, 3, 3))
90106
}
91107

92108
test("isotonic regression with first element violating monotonicity") {
93109
val model = runIsotonicRegression(Seq(4, 2, 3, 4, 5), true)
94110

95-
assert(model.predictions === Array(3, 3, 3, 4, 5))
111+
assert(model.boundaries === Array(0, 2, 3, 4))
112+
assert(model.predictions === Array(3, 3, 4, 5))
96113
}
97114

98115
test("isotonic regression with negative labels") {
99116
val model = runIsotonicRegression(Seq(-1, -2, 0, 1, -1), true)
100117

101-
assert(model.predictions === Array(-1.5, -1.5, 0, 0, 0))
118+
assert(model.boundaries === Array(0, 1, 2, 4))
119+
assert(model.predictions === Array(-1.5, -1.5, 0, 0))
102120
}
103121

104122
test("isotonic regression with unordered input") {
105-
val trainRDD = sc.parallelize(generateIsotonicInput(Seq(1, 2, 3, 4, 5)).reverse).cache()
106-
val model = new IsotonicRegression().run(trainRDD)
123+
val trainRDD = sc.parallelize(generateIsotonicInput(Seq(1, 2, 3, 4, 5)).reverse, 2).cache()
107124

125+
val model = new IsotonicRegression().run(trainRDD)
108126
assert(model.predictions === Array(1, 2, 3, 4, 5))
109127
}
110128

111129
test("weighted isotonic regression") {
112130
val model = runIsotonicRegression(Seq(1, 2, 3, 4, 2), Seq(1, 1, 1, 1, 2), true)
113131

114-
assert(model.predictions === Array(1, 2, 2.75, 2.75,2.75))
132+
assert(model.boundaries === Array(0, 1, 2, 4))
133+
assert(model.predictions === Array(1, 2, 2.75, 2.75))
115134
}
116135

117136
test("weighted isotonic regression with weights lower than 1") {
118137
val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(1, 1, 1, 0.1, 0.1), true)
119138

120-
assert(model.predictions.map(round) === Array(1, 2, 3.3/1.2, 3.3/1.2, 3.3/1.2))
139+
assert(model.boundaries === Array(0, 1, 2, 4))
140+
assert(model.predictions.map(round) === Array(1, 2, 3.3/1.2, 3.3/1.2))
121141
}
122142

123143
test("weighted isotonic regression with negative weights") {
124144
val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5), true)
125145

126-
assert(model.predictions === Array(1.0, 10.0/6, 10.0/6, 10.0/6, 10.0/6))
146+
assert(model.boundaries === Array(0.0, 1.0, 4.0))
147+
assert(model.predictions === Array(1.0, 10.0/6, 10.0/6))
127148
}
128149

129150
test("weighted isotonic regression with zero weights") {
130151
val model = runIsotonicRegression(Seq[Double](1, 2, 3, 2, 1), Seq[Double](0, 0, 0, 1, 0), true)
131152

132-
assert(model.predictions === Array(1, 2, 2, 2, 2))
153+
assert(model.boundaries === Array(0.0, 1.0, 4.0))
154+
assert(model.predictions === Array(1, 2, 2))
133155
}
134156

135157
test("isotonic regression prediction") {
136158
val model = runIsotonicRegression(Seq(1, 2, 7, 1, 2), true)
137159

160+
assert(model.predict(-2) === 1)
138161
assert(model.predict(-1) === 1)
139-
assert(model.predict(0) === 1)
140-
assert(model.predict(1.5) === 1.5)
141-
assert(model.predict(1.75) === 1.75)
142-
assert(model.predict(2) === 2)
143-
assert(model.predict(3) === 10d/3)
144-
assert(model.predict(10) === 10d/3)
162+
assert(model.predict(0.5) === 1.5)
163+
assert(model.predict(0.75) === 1.75)
164+
assert(model.predict(1) === 2)
165+
assert(model.predict(2) === 10d/3)
166+
assert(model.predict(9) === 10d/3)
145167
}
146168

147169
test("isotonic regression prediction with duplicate features") {
148170
val trainRDD = sc.parallelize(
149171
Seq[(Double, Double, Double)](
150-
(2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1))).cache()
172+
(2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1)), 2).cache()
151173
val model = new IsotonicRegression().run(trainRDD)
152174

153175
assert(model.predict(0) === 1)
@@ -159,7 +181,7 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M
159181
test("antitonic regression prediction with duplicate features") {
160182
val trainRDD = sc.parallelize(
161183
Seq[(Double, Double, Double)](
162-
(5, 1, 1), (6, 1, 1), (2, 2, 1), (4, 2, 1), (1, 3, 1), (2, 3, 1))).cache()
184+
(5, 1, 1), (6, 1, 1), (2, 2, 1), (4, 2, 1), (1, 3, 1), (2, 3, 1)), 2).cache()
163185
val model = new IsotonicRegression().setIsotonic(false).run(trainRDD)
164186

165187
assert(model.predict(0) === 6)
@@ -170,20 +192,50 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M
170192

171193
test("isotonic regression RDD prediction") {
172194
val model = runIsotonicRegression(Seq(1, 2, 7, 1, 2), true)
173-
val testRDD = sc.parallelize(List(-1.0, 0.0, 1.5, 1.75, 2.0, 3.0, 10.0)).cache()
174195

175-
assert(model.predict(testRDD).collect() === Array(1, 1, 1.5, 1.75, 2, 10.0/3, 10.0/3))
196+
val testRDD = sc.parallelize(List(-2.0, -1.0, 0.5, 0.75, 1.0, 2.0, 9.0), 2).cache()
197+
val predictions = testRDD.map(x => (x, model.predict(x))).collect().sortBy(_._1).map(_._2)
198+
assert(predictions === Array(1, 1, 1.5, 1.75, 2, 10.0/3, 10.0/3))
176199
}
177200

178201
test("antitonic regression prediction") {
179202
val model = runIsotonicRegression(Seq(7, 5, 3, 5, 1), false)
180203

204+
assert(model.predict(-2) === 7)
181205
assert(model.predict(-1) === 7)
182-
assert(model.predict(0) === 7)
183-
assert(model.predict(1.5) === 6)
184-
assert(model.predict(1.75) === 5.5)
185-
assert(model.predict(2) === 5)
186-
assert(model.predict(3) === 4)
187-
assert(model.predict(10) === 1)
188-
}
189-
}
206+
assert(model.predict(0.5) === 6)
207+
assert(model.predict(0.75) === 5.5)
208+
assert(model.predict(1) === 5)
209+
assert(model.predict(2) === 4)
210+
assert(model.predict(9) === 1)
211+
}
212+
213+
test("model construction") {
214+
val model = new IsotonicRegressionModel(Array(0.0, 1.0), Array(1.0, 2.0), isotonic = true)
215+
assert(model.predict(-0.5) === 1.0)
216+
assert(model.predict(0.0) === 1.0)
217+
assert(model.predict(0.5) ~== 1.5 absTol 1e-14)
218+
assert(model.predict(1.0) === 2.0)
219+
assert(model.predict(1.5) === 2.0)
220+
221+
intercept[IllegalArgumentException] {
222+
// different array sizes.
223+
new IsotonicRegressionModel(Array(0.0, 1.0), Array(1.0), isotonic = true)
224+
}
225+
226+
intercept[IllegalArgumentException] {
227+
// unordered boundaries
228+
new IsotonicRegressionModel(Array(1.0, 0.0), Array(1.0, 2.0), isotonic = true)
229+
}
230+
231+
intercept[IllegalArgumentException] {
232+
// unordered predictions (isotonic)
233+
new IsotonicRegressionModel(Array(0.0, 1.0), Array(2.0, 1.0), isotonic = true)
234+
}
235+
236+
intercept[IllegalArgumentException] {
237+
// unordered predictions (antitonic)
238+
new IsotonicRegressionModel(Array(0.0, 1.0), Array(1.0, 2.0), isotonic = false)
239+
}
240+
}
241+
}

0 commit comments

Comments
 (0)