Skip to content

Commit 0d14bd3

Browse files
SPARK-3278: changed the Java API to match the Scala API's (Double, Double, Double)
1 parent 3c2954b commit 0d14bd3

File tree

4 files changed

+25
-25
lines changed

4 files changed

+25
-25
lines changed

mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ class PoolAdjacentViolators private [mllib]
9898

9999
override def run(
100100
input: RDD[(Double, Double, Double)],
101-
isotonic: Boolean): IsotonicRegressionModel = {
101+
isotonic: Boolean = true): IsotonicRegressionModel = {
102102
createModel(
103103
parallelPoolAdjacentViolators(input, isotonic),
104104
isotonic)
@@ -217,18 +217,20 @@ object IsotonicRegression {
217217
}
218218

219219
/**
220-
* Train a monotone regression model given an RDD of (label, feature).
220+
* Train a monotone regression model given an RDD of (label, feature, weight).
221221
* Label is the dependent y value
222-
* Weight defaults to 1
222+
* Weight of the data point is the number of measurements. Default is 1
223223
*
224-
* @param input RDD of (label, feature).
224+
* @param input RDD of (label, feature, weight).
225225
* @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
226226
* @return
227227
*/
228228
def train(
229-
input: JavaPairRDD[java.lang.Double, java.lang.Double],
229+
input: JavaRDD[(java.lang.Double, java.lang.Double, java.lang.Double)],
230230
isotonic: Boolean): IsotonicRegressionModel = {
231231
new PoolAdjacentViolators()
232-
.run(input.rdd.map(x => (x._1.doubleValue(), x._2.doubleValue(), 1d)), isotonic)
232+
.run(
233+
input.rdd.map(x => (x._1.doubleValue(), x._2.doubleValue(), x._3.doubleValue())),
234+
isotonic)
233235
}
234236
}

mllib/src/main/scala/org/apache/spark/mllib/util/IsotonicDataGenerator.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,11 @@ object IsotonicDataGenerator {
3434
* @param labels list of labels for the data points
3535
* @return Java List of input.
3636
*/
37-
def generateIsotonicInputAsList(labels: Array[Double]): java.util.List[(JDouble, JDouble)] = {
37+
def generateIsotonicInputAsList(
38+
labels: Array[Double]):java.util.List[(JDouble, JDouble, JDouble)] = {
3839
seqAsJavaList(
39-
generateIsotonicInput(
40-
wrapDoubleArray(labels):_*).map(x => (new JDouble(x._1), new JDouble(x._2))))
40+
generateIsotonicInput(wrapDoubleArray(labels):_*)
41+
.map(x => (new JDouble(x._1), new JDouble(x._2), new JDouble(1))))
4142
}
4243

4344
/**

mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.spark.mllib.regression;
1919

20-
import org.apache.spark.api.java.JavaPairRDD;
2120
import org.apache.spark.api.java.JavaRDD;
2221
import org.apache.spark.api.java.JavaSparkContext;
2322
import org.apache.spark.api.java.function.Function;
@@ -26,7 +25,7 @@
2625
import org.junit.Assert;
2726
import org.junit.Before;
2827
import org.junit.Test;
29-
import scala.Tuple2;
28+
import scala.Tuple3;
3029

3130
import java.io.Serializable;
3231
import java.util.List;
@@ -45,11 +44,11 @@ public void tearDown() {
4544
sc = null;
4645
}
4746

48-
double difference(List<Tuple2<Double, Double>> expected, IsotonicRegressionModel model) {
47+
double difference(List<Tuple3<Double, Double, Double>> expected, IsotonicRegressionModel model) {
4948
double diff = 0;
5049

5150
for(int i = 0; i < model.predictions().length(); i++) {
52-
Tuple2<Double, Double> exp = expected.get(i);
51+
Tuple3<Double, Double, Double> exp = expected.get(i);
5352
diff += Math.abs(model.predict(exp._2()) - exp._1());
5453
}
5554

@@ -58,13 +57,13 @@ public void tearDown() {
5857

5958
@Test
6059
public void runIsotonicRegressionUsingStaticMethod() {
61-
JavaPairRDD<Double, Double> trainRDD = sc.parallelizePairs(
60+
JavaRDD<Tuple3<Double, Double, Double>> trainRDD = sc.parallelize(
6261
IsotonicDataGenerator.generateIsotonicInputAsList(
6362
new double[]{1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();
6463

6564
IsotonicRegressionModel model = IsotonicRegression.train(trainRDD, true);
6665

67-
List<Tuple2<Double, Double>> expected = IsotonicDataGenerator
66+
List<Tuple3<Double, Double, Double>> expected = IsotonicDataGenerator
6867
.generateIsotonicInputAsList(
6968
new double[] {1, 2, 7d/3, 7d/3, 7d/3, 6, 7, 8, 10, 10, 10, 12});
7069

@@ -73,15 +72,15 @@ public void runIsotonicRegressionUsingStaticMethod() {
7372

7473
@Test
7574
public void testPredictJavaRDD() {
76-
JavaPairRDD<Double, Double> trainRDD = sc.parallelizePairs(
75+
JavaRDD<Tuple3<Double, Double, Double>> trainRDD = sc.parallelize(
7776
IsotonicDataGenerator.generateIsotonicInputAsList(
7877
new double[]{1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12})).cache();
7978

8079
IsotonicRegressionModel model = IsotonicRegression.train(trainRDD, true);
8180

82-
JavaRDD<Double> testRDD = trainRDD.map(new Function<Tuple2<Double, Double>, Double>() {
81+
JavaRDD<Double> testRDD = trainRDD.map(new Function<Tuple3<Double, Double, Double>, Double>() {
8382
@Override
84-
public Double call(Tuple2<Double, Double> v) throws Exception {
83+
public Double call(Tuple3<Double, Double, Double> v) throws Exception {
8584
return v._2();
8685
}
8786
});
@@ -91,5 +90,4 @@ public Double call(Tuple2<Double, Double> v) throws Exception {
9190
Assert.assertTrue(predictions.get(0) == 1d);
9291
Assert.assertTrue(predictions.get(11) == 12d);
9392
}
94-
}
95-
93+
}

mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -197,16 +197,15 @@ class IsotonicRegressionClusterSuite
197197
extends FunSuite
198198
with LocalClusterSparkContext {
199199

200-
//TODO: FIX
201200
test("task size should be small in both training and prediction") {
202-
val n = 135000
201+
val n = 1000
203202

204203
val trainData = (0 to n).map(i => (i.toDouble, i.toDouble, 1d))
205-
val points = sc.parallelize(trainData, 1)
204+
val points = sc.parallelize(trainData, 2)
206205

207206
// If we serialize data directly in the task closure, the size of the serialized task would be
208207
// greater than 1MB and hence Spark would throw an error.
209-
val model = IsotonicRegression.train(points, true)
208+
val model = IsotonicRegression.train(points)
210209
val predictions = model.predict(points.map(_._2))
211210
}
212-
}
211+
}

0 commit comments

Comments (0)