Skip to content

Commit 941fd1f

Browse files
SPARK-3278 Isotonic regression java api
1 parent a24e29f commit 941fd1f

File tree

4 files changed

+71
-72
lines changed

4 files changed

+71
-72
lines changed

mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.mllib.regression
1919

20+
import org.apache.spark.api.java.{JavaRDD, JavaPairRDD}
2021
import org.apache.spark.rdd.RDD
2122

2223
/**
@@ -30,9 +31,30 @@ class IsotonicRegressionModel (
3031
val isotonic: Boolean)
3132
extends Serializable {
3233

34+
/**
35+
* Predict labels for provided features
36+
*
37+
* @param testData features to be labeled
38+
* @return predicted labels
39+
*/
3340
def predict(testData: RDD[Double]): RDD[Double] =
3441
testData.map(predict)
3542

43+
/**
44+
* Predict labels for provided features
45+
*
46+
* @param testData features to be labeled
47+
* @return predicted labels
48+
*/
49+
def predict(testData: JavaRDD[java.lang.Double]): RDD[java.lang.Double] =
50+
testData.rdd.map(x => x.doubleValue()).map(predict)
51+
52+
/**
53+
* Predict a single label
54+
*
55+
* @param testData feature to be labeled
56+
* @return predicted label
57+
*/
3658
def predict(testData: Double): Double =
3759
// Use the label of the last prediction whose feature is <= testData, or of the first prediction if none qualifies
3860
(predictions.head +: predictions.filter(y => y._2 <= testData)).last._1
@@ -59,7 +81,7 @@ trait IsotonicRegressionAlgorithm
5981
/**
6082
* Run algorithm to obtain isotonic regression model
6183
*
62-
* @param input data
84+
* @param input (label, feature, weight)
6385
* @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
6486
* @return isotonic regression model
6587
*/
@@ -115,12 +137,11 @@ class PoolAdjacentViolators private [mllib]
115137
}
116138
}
117139

118-
def monotonicityConstraint(isotonic: Boolean): (Double, Double) => Boolean =
119-
(x, y) => if(isotonic) {
120-
x <= y
121-
} else {
122-
x >= y
123-
}
140+
val isotonicConstraint: (Double, Double) => Boolean = (x, y) => x <= y
141+
val antitonicConstraint: (Double, Double) => Boolean = (x, y) => x >= y
142+
143+
def monotonicityConstraint(isotonic: Boolean) =
144+
if(isotonic) isotonicConstraint else antitonicConstraint
124145

125146
val monotonicityConstraintHolds = monotonicityConstraint(isotonic)
126147

@@ -179,12 +200,11 @@ class PoolAdjacentViolators private [mllib]
179200
object IsotonicRegression {
180201

181202
/**
182-
* Train a monotone regression model given an RDD of (label, features, weight).
183-
* Currently only one dimensional algorithm is supported (features.length is one)
203+
* Train a monotone regression model given an RDD of (label, feature, weight).
184204
* Label is the dependent y value
185205
* Weight of the data point is the number of measurements. Default is 1
186206
*
187-
* @param input RDD of (label, array of features, weight).
207+
* @param input RDD of (label, feature, weight).
188208
* Each point describes a row of the data
189209
* matrix A as well as the corresponding right hand side label y
190210
* and weight as number of measurements
@@ -195,4 +215,20 @@ object IsotonicRegression {
195215
isotonic: Boolean = true): IsotonicRegressionModel = {
196216
new PoolAdjacentViolators().run(input, isotonic)
197217
}
218+
219+
/**
220+
* Train a monotone regression model given an RDD of (label, feature).
221+
* Label is the dependent y value
222+
* Weight defaults to 1
223+
*
224+
* @param input RDD of (label, feature).
225+
* @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
226+
* @return isotonic regression model
227+
*/
228+
def train(
229+
input: JavaPairRDD[java.lang.Double, java.lang.Double],
230+
isotonic: Boolean): IsotonicRegressionModel = {
231+
new PoolAdjacentViolators()
232+
.run(input.rdd.map(x => (x._1.doubleValue(), x._2.doubleValue(), 1d)), isotonic)
233+
}
198234
}

mllib/src/main/scala/org/apache/spark/mllib/util/IsotonicDataGenerator.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.mllib.util
1919

2020
import scala.collection.JavaConversions._
21+
import java.lang.{Double => JDouble}
2122

2223
object IsotonicDataGenerator {
2324

@@ -26,13 +27,11 @@ object IsotonicDataGenerator {
2627
* @param labels list of labels for the data points
2728
* @return Java List of (label, feature) pairs.
2829
*/
29-
def generateIsotonicInputAsList(labels: Array[Double]): java.util.List[(Double, Double, Double)] = {
30-
seqAsJavaList(generateIsotonicInput(wrapDoubleArray(labels):_*))
30+
def generateIsotonicInputAsList(labels: Array[Double]): java.util.List[(JDouble, JDouble)] = {
31+
seqAsJavaList(generateIsotonicInput(wrapDoubleArray(labels):_*).map(x => (new JDouble(x._1), new JDouble(x._2))))
3132
//.map(d => new Tuple3(new java.lang.Double(d._1), new java.lang.Double(d._2), new java.lang.Double(d._3))))
3233
}
3334

34-
def bam(d: Option[Double]): Double = d.get
35-
3635
/**
3736
* Return an ordered sequence of labeled data points with default weights
3837
* @param labels list of labels for the data points

mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java

Lines changed: 19 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -13,27 +13,22 @@
1313
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
16-
*//*
17-
16+
*/
1817

1918
package org.apache.spark.mllib.regression;
2019

2120
import org.apache.spark.api.java.JavaPairRDD;
2221
import org.apache.spark.api.java.JavaRDD;
2322
import org.apache.spark.api.java.JavaSparkContext;
2423
import org.apache.spark.api.java.function.Function;
25-
import org.apache.spark.mllib.linalg.Vector;
26-
import org.apache.spark.mllib.linalg.Vectors;
2724
import org.apache.spark.mllib.util.IsotonicDataGenerator;
2825
import org.junit.After;
2926
import org.junit.Assert;
3027
import org.junit.Before;
3128
import org.junit.Test;
3229
import scala.Tuple2;
33-
import scala.Tuple3;
3430

3531
import java.io.Serializable;
36-
import java.util.Arrays;
3732
import java.util.List;
3833

3934
public class JavaIsotonicRegressionSuite implements Serializable {
@@ -50,52 +45,26 @@ public void tearDown() {
5045
sc = null;
5146
}
5247

53-
double difference(List<Tuple3<Double, Double, Double>> expected, IsotonicRegressionModel model) {
48+
double difference(List<Tuple2<Double, Double>> expected, IsotonicRegressionModel model) {
5449
double diff = 0;
5550

5651
for(int i = 0; i < model.predictions().length(); i++) {
57-
Tuple3<Double, Double, Double> exp = expected.get(i);
52+
Tuple2<Double, Double> exp = expected.get(i);
5853
diff += Math.abs(model.predict(exp._2()) - exp._1());
5954
}
6055

6156
return diff;
6257
}
6358

64-
*/
65-
/*@Test
66-
public void runIsotonicRegressionUsingConstructor() {
67-
JavaRDD<Tuple3<Double, Double, Double>> testRDD = sc.parallelize(IsotonicDataGenerator
68-
.generateIsotonicInputAsList(
69-
new double[]{1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();
70-
71-
IsotonicRegressionAlgorithm isotonicRegressionAlgorithm = new PoolAdjacentViolators();
72-
IsotonicRegressionModel model = isotonicRegressionAlgorithm.run(testRDD.rdd(), true);
73-
74-
List<Tuple3<Double, Double, Double>> expected = IsotonicDataGenerator
75-
.generateIsotonicInputAsList(
76-
new double[] {1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12});
77-
78-
Assert.assertTrue(difference(expected, model) == 0);
79-
}*//*
80-
81-
8259
@Test
8360
public void runIsotonicRegressionUsingStaticMethod() {
84-
*/
85-
/*JavaRDD<Tuple3<Double, Double, Double>> testRDD = sc.parallelize(IsotonicDataGenerator
86-
.generateIsotonicInputAsList(
87-
new double[] {1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();*//*
88-
89-
90-
*/
91-
/*JavaRDD<Tuple3<Double, Double, Double>> testRDD = sc.parallelize(Arrays.asList(new Tuple3(1.0, 1.0, 1.0)));*//*
61+
JavaPairRDD<Double, Double> trainRDD = sc.parallelizePairs(
62+
IsotonicDataGenerator.generateIsotonicInputAsList(
63+
new double[]{1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();
9264

65+
IsotonicRegressionModel model = IsotonicRegression.train(trainRDD, true);
9366

94-
JavaPairRDD<Double, Double> testRDD = sc.parallelizePairs(Arrays.asList(new Tuple2<Double, Double>(1.0, 1.0)));
95-
96-
IsotonicRegressionModel model = IsotonicRegression.train(testRDD.rdd(), true);
97-
98-
List<Tuple3<Double, Double, Double>> expected = IsotonicDataGenerator
67+
List<Tuple2<Double, Double>> expected = IsotonicDataGenerator
9968
.generateIsotonicInputAsList(
10069
new double[] {1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12});
10170

@@ -104,23 +73,23 @@ public void runIsotonicRegressionUsingStaticMethod() {
10473

10574
@Test
10675
public void testPredictJavaRDD() {
107-
JavaRDD<Tuple3<Double, Double, Double>> testRDD = sc.parallelize(IsotonicDataGenerator
108-
.generateIsotonicInputAsList(
109-
new double[] {1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();
76+
JavaPairRDD<Double, Double> trainRDD = sc.parallelizePairs(
77+
IsotonicDataGenerator.generateIsotonicInputAsList(
78+
new double[]{1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();
11079

111-
IsotonicRegressionModel model = IsotonicRegression.train(testRDD.rdd(), true);
80+
IsotonicRegressionModel model = IsotonicRegression.train(trainRDD, true);
11281

113-
JavaRDD<Vector> vectors = testRDD.map(new Function<Tuple3<Double, Double, Double>, Vector>() {
82+
JavaRDD<Double> testRDD = trainRDD.map(new Function<Tuple2<Double, Double>, Double>() {
11483
@Override
115-
public Vector call(Tuple3<Double, Double, Double> v) throws Exception {
116-
return Vectors.dense(v._2());
84+
public Double call(Tuple2<Double, Double> v) throws Exception {
85+
return v._2();
11786
}
11887
});
11988

120-
List<Double> predictions = model.predict(vectors).collect();
89+
Double[] predictions = model.predict(testRDD).collect();
12190

122-
Assert.assertTrue(predictions.get(0) == 1d);
123-
Assert.assertTrue(predictions.get(11) == 12d);
91+
Assert.assertTrue(predictions[0] == 1d);
92+
Assert.assertTrue(predictions[11] == 12d);
12493
}
12594
}
126-
*/
95+

mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -197,18 +197,13 @@ class IsotonicRegressionClusterSuite
197197
extends FunSuite
198198
with LocalClusterSparkContext {
199199

200+
//TODO: FIX
200201
test("task size should be small in both training and prediction") {
201-
val n = 5
202-
203-
val trainData = (0 to n).map(i => (i.toDouble, i.toDouble, 1.toDouble))
202+
val n = 135000
204203

204+
val trainData = (0 to n).map(i => (i.toDouble, i.toDouble, 1d))
205205
val points = sc.parallelize(trainData, 1)
206206

207-
/*val points = sc.parallelize(0 until n, 2).mapPartitionsWithIndex { (idx, iter) =>
208-
val random = new Random(idx)
209-
iter.map(i => (random.nextDouble(), random.nextDouble(), 1))
210-
}.cache()*/
211-
212207
// If we serialize data directly in the task closure, the size of the serialized task would be
213208
// greater than 1MB and hence Spark would throw an error.
214209
val model = IsotonicRegression.train(points, true)

0 commit comments

Comments
 (0)