Skip to content

Commit 5c8ebe5

Browse files
committed
Python API for IsotonicRegression
1 parent 343d3bf commit 5c8ebe5

File tree

3 files changed

+98
-1
lines changed

3 files changed

+98
-1
lines changed

mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,22 @@ private[python] class PythonMLLibAPI extends Serializable {
282282
map(_.asInstanceOf[Object]).asJava
283283
}
284284

285+
/**
286+
* Java stub for Python mllib IsotonicRegression.run()
287+
*/
288+
def trainIsotonicRegressionModel(
289+
data: JavaRDD[Vector],
290+
isotonic: Boolean): JList[Object] = {
291+
val isotonicRegressionAlg = new IsotonicRegression().setIsotonic(isotonic)
292+
try {
293+
val model = isotonicRegressionAlg.run(data.rdd.map(_.toArray).map {
294+
x => (x(0), x(1), x(2)) }.persist(StorageLevel.MEMORY_AND_DISK))
295+
List(model.boundaries, model.predictions).map(_.asInstanceOf[Object]).asJava
296+
} finally {
297+
data.rdd.unpersist(blocking = false)
298+
}
299+
}
300+
285301
/**
286302
* Java stub for Python mllib KMeans.run()
287303
*/

mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import java.io.Serializable
2121
import java.lang.{Double => JDouble}
2222
import java.util.Arrays.binarySearch
2323

24+
import scala.collection.JavaConverters._
2425
import scala.collection.mutable.ArrayBuffer
2526

2627
import org.json4s._
@@ -57,6 +58,13 @@ class IsotonicRegressionModel (
5758
assertOrdered(boundaries)
5859
assertOrdered(predictions)(predictionOrd)
5960

61+
/** A Java-friendly constructor that takes two Iterable parameters and one Boolean parameter. */
62+
def this(boundaries: java.lang.Iterable[Double],
63+
predictions: java.lang.Iterable[Double],
64+
isotonic: java.lang.Boolean) = {
65+
this(boundaries.asScala.toArray, predictions.asScala.toArray, isotonic)
66+
}
67+
6068
/** Asserts the input array is monotone with the given ordering. */
6169
private def assertOrdered(xs: Array[Double])(implicit ord: Ordering[Double]): Unit = {
6270
var i = 1

python/pyspark/mllib/regression.py

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818
import numpy as np
1919
from numpy import array
2020

21+
from pyspark import RDD
2122
from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py, inherit_doc
22-
from pyspark.mllib.linalg import SparseVector, _convert_to_vector
23+
from pyspark.mllib.linalg import SparseVector, Vectors, _convert_to_vector
2324
from pyspark.mllib.util import Saveable, Loader
2425

2526
__all__ = ['LabeledPoint', 'LinearModel',
@@ -396,6 +397,78 @@ def train(rdd, i):
396397
return _regression_train_wrapper(train, RidgeRegressionModel, data, initialWeights)
397398

398399

400+
class IsotonicRegressionModel(Saveable, Loader):
401+
402+
"""Regression model for isotonic regression.
403+
404+
>>> data = [(2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1)]
405+
>>> irm = IsotonicRegression.train(sc.parallelize(data))
406+
>>> irm.predict(1.5)
407+
2.0
408+
>>> irm.predict(2.5)
409+
4.5
410+
>>> irm.predict(4)
411+
6.0
412+
>>> irm.predict(sc.parallelize([1.5, 2.5, 4])).collect()
413+
[2.0, 4.5, 6.0]
414+
>>> import os, tempfile
415+
>>> path = tempfile.mkdtemp()
416+
>>> irm.save(sc, path)
417+
>>> sameModel = IsotonicRegressionModel.load(sc, path)
418+
>>> sameModel.predict(1.5)
419+
2.0
420+
>>> sameModel.predict(2.5)
421+
4.5
422+
>>> sameModel.predict(4)
423+
6.0
424+
>>> try:
425+
... os.removedirs(path)
426+
... except OSError:
427+
... pass
428+
"""
429+
430+
def __init__(self, boundaries, predictions, isotonic):
431+
self.boundaries = boundaries
432+
self.predictions = predictions
433+
self.isotonic = isotonic
434+
435+
def predict(self, x):
436+
if isinstance(x, RDD):
437+
return x.map(lambda v: self.predict(v))
438+
return np.interp(x, self.boundaries, self.predictions)
439+
440+
def save(self, sc, path):
441+
java_boundaries = _py2java(sc, self.boundaries.tolist())
442+
java_predictions = _py2java(sc, self.predictions.tolist())
443+
java_model = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel(
444+
java_boundaries, java_predictions, self.isotonic)
445+
java_model.save(sc._jsc.sc(), path)
446+
447+
@classmethod
448+
def load(cls, sc, path):
449+
java_model = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel.load(
450+
sc._jsc.sc(), path)
451+
py_boundaries = _java2py(sc, java_model.boundaries())
452+
py_predictions = _java2py(sc, java_model.predictions())
453+
return IsotonicRegressionModel(np.array(py_boundaries),
454+
np.array(py_predictions), java_model.isotonic)
455+
456+
457+
class IsotonicRegression(object):
458+
"""
459+
Run IsotonicRegression algorithm to obtain isotonic regression model.
460+
461+
:param data: RDD of data points
462+
:param isotonic: Whether this is isotonic or antitonic.
463+
"""
464+
@classmethod
465+
def train(cls, data, isotonic=True):
466+
"""Train a isotonic regression model on the given data."""
467+
boundaries, predictions = callMLlibFunc("trainIsotonicRegressionModel",
468+
data.map(_convert_to_vector), bool(isotonic))
469+
return IsotonicRegressionModel(np.array(boundaries), np.array(predictions), isotonic)
470+
471+
399472
def _test():
400473
import doctest
401474
from pyspark import SparkContext

0 commit comments

Comments
 (0)