|
| 1 | +/* |
| 2 | + * Licensed to the Apache Software Foundation (ASF) under one or more |
| 3 | + * contributor license agreements. See the NOTICE file distributed with |
| 4 | + * this work for additional information regarding copyright ownership. |
| 5 | + * The ASF licenses this file to You under the Apache License, Version 2.0 |
| 6 | + * (the "License"); you may not use this file except in compliance with |
| 7 | + * the License. You may obtain a copy of the License at |
| 8 | + * |
| 9 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | + * |
| 11 | + * Unless required by applicable law or agreed to in writing, software |
| 12 | + * distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | + * See the License for the specific language governing permissions and |
| 15 | + * limitations under the License. |
| 16 | + */ |
| 17 | + |
| 18 | +package org.apache.spark.mllib.regression |
| 19 | + |
| 20 | +import java.io.Serializable |
| 21 | +import java.lang.{Double => JDouble} |
| 22 | +import java.util.Arrays.binarySearch |
| 23 | + |
| 24 | +import scala.collection.mutable.ArrayBuffer |
| 25 | + |
| 26 | +import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD} |
| 27 | +import org.apache.spark.rdd.RDD |
| 28 | + |
/**
 * Regression model for isotonic regression.
 *
 * @param boundaries Array of boundaries for which predictions are known.
 *                   Boundaries must be sorted in increasing order.
 * @param predictions Array of predictions associated to the boundaries at the same index.
 *                    Results of isotonic regression and therefore monotone.
 * @param isotonic Indicates whether this is isotonic (increasing) or antitonic (decreasing).
 */
class IsotonicRegressionModel (
    val boundaries: Array[Double],
    val predictions: Array[Double],
    val isotonic: Boolean) extends Serializable {

  // Ordering the predictions array must respect: non-decreasing for isotonic,
  // non-increasing for antitonic.
  private val predictionOrd = if (isotonic) Ordering[Double] else Ordering[Double].reverse

  // Fail fast with a descriptive message rather than the bare "requirement failed",
  // consistent with the messaged require in assertOrdered below.
  require(boundaries.length == predictions.length,
    s"Number of boundaries (${boundaries.length}) must be equal to the number of " +
      s"predictions (${predictions.length}).")
  assertOrdered(boundaries)
  assertOrdered(predictions)(predictionOrd)

  /** Asserts the input array is monotone with the given ordering. */
  private def assertOrdered(xs: Array[Double])(implicit ord: Ordering[Double]): Unit = {
    var i = 1
    while (i < xs.length) {
      require(ord.compare(xs(i - 1), xs(i)) <= 0,
        s"Elements (${xs(i - 1)}, ${xs(i)}) are not ordered.")
      i += 1
    }
  }

  /**
   * Predict labels for provided features.
   * Using a piecewise linear function.
   *
   * @param testData Features to be labeled.
   * @return Predicted labels.
   */
  def predict(testData: RDD[Double]): RDD[Double] = {
    testData.map(predict)
  }

  /**
   * Predict labels for provided features.
   * Using a piecewise linear function.
   *
   * @param testData Features to be labeled.
   * @return Predicted labels.
   */
  def predict(testData: JavaDoubleRDD): JavaDoubleRDD = {
    JavaDoubleRDD.fromRDD(predict(testData.rdd.retag.asInstanceOf[RDD[Double]]))
  }

  /**
   * Predict a single label.
   * Using a piecewise linear function.
   *
   * @param testData Feature to be labeled.
   * @return Predicted label.
   *         1) If testData exactly matches a boundary then associated prediction is returned.
   *            In case there are multiple predictions with the same boundary then one of them
   *            is returned. Which one is undefined (same as java.util.Arrays.binarySearch).
   *         2) If testData is lower or higher than all boundaries then first or last prediction
   *            is returned respectively. In case there are multiple predictions with the same
   *            boundary then the lowest or highest is returned respectively.
   *         3) If testData falls between two values in boundary array then prediction is treated
   *            as piecewise linear function and interpolated value is returned. In case there are
   *            multiple values with the same boundary then the same rules as in 2) are used.
   */
  def predict(testData: Double): Double = {

    // Linear interpolation between (x1, y1) and (x2, y2), evaluated at x.
    def linearInterpolation(x1: Double, y1: Double, x2: Double, y2: Double, x: Double): Double = {
      y1 + (y2 - y1) * (x - x1) / (x2 - x1)
    }

    // java.util.Arrays.binarySearch returns the index of the match if found,
    // or (-(insertion point) - 1) if not found.
    val foundIndex = binarySearch(boundaries, testData)
    val insertIndex = -foundIndex - 1

    // Find if the index was lower than all values,
    // higher than all values, in between two values or exact match.
    if (insertIndex == 0) {
      predictions.head
    } else if (insertIndex == boundaries.length) {
      predictions.last
    } else if (foundIndex < 0) {
      linearInterpolation(
        boundaries(insertIndex - 1),
        predictions(insertIndex - 1),
        boundaries(insertIndex),
        predictions(insertIndex),
        testData)
    } else {
      predictions(foundIndex)
    }
  }
}
| 124 | + |
/**
 * Isotonic regression.
 * Currently implemented using parallelized pool adjacent violators algorithm.
 * Only univariate (single feature) algorithm supported.
 *
 * Sequential PAV implementation based on:
 * Tibshirani, Ryan J., Holger Hoefling, and Robert Tibshirani.
 * "Nearly-isotonic regression." Technometrics 53.1 (2011): 54-61.
 * Available from http://www.stat.cmu.edu/~ryantibs/papers/neariso.pdf
 *
 * Sequential PAV parallelization based on:
 * Kearsley, Anthony J., Richard A. Tapia, and Michael W. Trosset.
 * "An approach to parallelizing isotonic regression."
 * Applied Mathematics and Parallel Computing. Physica-Verlag HD, 1996. 141-147.
 * Available from http://softlib.rice.edu/pub/CRPC-TRs/reports/CRPC-TR96640.pdf
 */
class IsotonicRegression private (private var isotonic: Boolean) extends Serializable {

  /**
   * Constructs IsotonicRegression instance with default parameter isotonic = true.
   *
   * @return New instance of IsotonicRegression.
   */
  def this() = this(true)

  /**
   * Sets the isotonic parameter.
   *
   * @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
   * @return This instance of IsotonicRegression.
   */
  def setIsotonic(isotonic: Boolean): this.type = {
    this.isotonic = isotonic
    this
  }

  /**
   * Run IsotonicRegression algorithm to obtain isotonic regression model.
   *
   * @param input RDD of tuples (label, feature, weight) where label is dependent variable
   *              for which we calculate isotonic regression, feature is independent variable
   *              and weight represents number of measures with default 1.
   *              If multiple labels share the same feature value then they are ordered before
   *              the algorithm is executed.
   * @return Isotonic regression model.
   */
  def run(input: RDD[(Double, Double, Double)]): IsotonicRegressionModel = {
    // Antitonic regression is reduced to isotonic regression by negating the labels,
    // running isotonic PAV, and negating the resulting predictions back below.
    val preprocessedInput = if (isotonic) {
      input
    } else {
      input.map(x => (-x._1, x._2, x._3))
    }

    val pooled = parallelPoolAdjacentViolators(preprocessedInput)

    // Undo the label negation for the antitonic case.
    val predictions = if (isotonic) pooled.map(_._1) else pooled.map(-_._1)
    val boundaries = pooled.map(_._2)

    new IsotonicRegressionModel(boundaries, predictions, isotonic)
  }

  /**
   * Run pool adjacent violators algorithm to obtain isotonic regression model.
   * Java-friendly variant of [[run]] accepting boxed doubles.
   *
   * @param input JavaRDD of tuples (label, feature, weight) where label is dependent variable
   *              for which we calculate isotonic regression, feature is independent variable
   *              and weight represents number of measures with default 1.
   *              If multiple labels share the same feature value then they are ordered before
   *              the algorithm is executed.
   * @return Isotonic regression model.
   */
  def run(input: JavaRDD[(JDouble, JDouble, JDouble)]): IsotonicRegressionModel = {
    run(input.rdd.retag.asInstanceOf[RDD[(Double, Double, Double)]])
  }

  /**
   * Performs a pool adjacent violators algorithm (PAV).
   * Uses approach with single processing of data where violators
   * in previously processed data created by pooling are fixed immediately.
   * Uses optimization of discovering monotonicity violating sequences (blocks).
   *
   * @param input Input data of tuples (label, feature, weight). Must already be sorted by
   *              (feature, label) — callers sort before invoking this method.
   * @return Result tuples (label, feature, weight) where labels were updated
   *         to form a monotone sequence as per isotonic regression definition.
   */
  private def poolAdjacentViolators(
      input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {

    // Guard the empty partition case (e.g. from glom() on an empty partition);
    // input.head below would otherwise throw.
    if (input.isEmpty) {
      return Array.empty
    }

    // Pools sub array within given bounds assigning weighted average value to all elements.
    // NOTE: bounds are inclusive on both ends, hence slice(start, end + 1) and i <= end.
    // Mutates `input` in place.
    def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = {
      val poolSubArray = input.slice(start, end + 1)

      val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum
      val weight = poolSubArray.map(_._3).sum

      var i = start
      while (i <= end) {
        // Replace each label with the weighted average; feature and weight are kept.
        input(i) = (weightedSum / weight, input(i)._2, input(i)._3)
        i = i + 1
      }
    }

    var i = 0
    while (i < input.length) {
      var j = i

      // Find monotonicity violating sequence, if any.
      while (j < input.length - 1 && input(j)._1 > input(j + 1)._1) {
        j = j + 1
      }

      // If monotonicity was not violated, move to next data point.
      if (i == j) {
        i = i + 1
      } else {
        // Otherwise pool the violating sequence
        // and check if pooling caused monotonicity violation in previously processed points.
        // Walking i backwards re-pools [i, j] until the left neighbour no longer violates
        // monotonicity (or the start of the array is reached).
        while (i >= 0 && input(i)._1 > input(i + 1)._1) {
          pool(input, i, j)
          i = i - 1
        }

        // Resume scanning from the end of the pooled block.
        i = j
      }
    }

    // For points having the same prediction, we only keep two boundary points.
    val compressed = ArrayBuffer.empty[(Double, Double, Double)]

    var (curLabel, curFeature, curWeight) = input.head
    // Rightmost feature value seen so far for the current run of equal labels.
    var rightBound = curFeature
    // Emits the current run: its left boundary carries the accumulated weight, and,
    // if the run spans more than one feature value, a zero-weight right boundary
    // is added so interpolation at predict time covers the whole run.
    def merge(): Unit = {
      compressed += ((curLabel, curFeature, curWeight))
      if (rightBound > curFeature) {
        compressed += ((curLabel, rightBound, 0.0))
      }
    }
    i = 1
    while (i < input.length) {
      val (label, feature, weight) = input(i)
      if (label == curLabel) {
        // Same prediction as the current run: extend it.
        curWeight += weight
        rightBound = feature
      } else {
        // Prediction changed: flush the finished run and start a new one.
        merge()
        curLabel = label
        curFeature = feature
        curWeight = weight
        rightBound = curFeature
      }
      i += 1
    }
    // Flush the final run.
    merge()

    compressed.toArray
  }

  /**
   * Performs parallel pool adjacent violators algorithm.
   * Performs Pool adjacent violators algorithm on each partition and then again on the result.
   *
   * @param input Input data of tuples (label, feature, weight).
   * @return Result tuples (label, feature, weight) where labels were updated
   *         to form a monotone sequence as per isotonic regression definition.
   */
  private def parallelPoolAdjacentViolators(
      input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
    val parallelStepResult = input
      .sortBy(x => (x._2, x._1)) // Sort by feature, breaking ties by label.
      .glom() // One array per partition, so each partition is solved sequentially.
      .flatMap(poolAdjacentViolators)
      .collect()
      .sortBy(x => (x._2, x._1)) // Sort again because collect() doesn't promise ordering.
    poolAdjacentViolators(parallelStepResult)
  }
}
0 commit comments