Commit 9c65fa7

mengxr authored and mateiz committed
[SPARK-1212, Part II] Support sparse data in MLlib
In PR #117, we added the dense/sparse vector data model and updated KMeans to support sparse input. This PR replaces all other `Array[Double]` usage with `Vector` in generalized linear models (GLMs) and Naive Bayes. Major changes:

1. `LabeledPoint` becomes `LabeledPoint(Double, Vector)`.
2. Methods that accepted `RDD[Array[Double]]` now accept `RDD[Vector]`. We cannot support both in an elegant way because of type erasure.
3. Mark `createModel` and `predictPoint` protected because they are not for end users.
4. Add libSVMFile to MLContext.
5. NaiveBayes can accept arbitrary labels (introducing a breaking change to Python's `NaiveBayesModel`).
6. Gradient computation no longer creates temp vectors.
7. Column normalization and centering are removed from Lasso and Ridge because the operation would densify the data. Simple feature transformations can be done before training.

TODO:

1. ~~Use axpy when possible.~~
2. ~~Optimize Naive Bayes.~~

Author: Xiangrui Meng <[email protected]>

Closes #245 from mengxr/vector and squashes the following commits:

eb6e793 [Xiangrui Meng] move libSVMFile to MLUtils and rename to loadLibSVMData
c26c4fc [Xiangrui Meng] update DecisionTree to use RDD[Vector]
11999c7 [Xiangrui Meng] Merge branch 'master' into vector
f7da54b [Xiangrui Meng] add minSplits to libSVMFile
da25e24 [Xiangrui Meng] revert the change to default addIntercept because it might change the behavior of existing code without warning
493f26f [Xiangrui Meng] Merge branch 'master' into vector
7c1bc01 [Xiangrui Meng] add a TODO to NB
b9b7ef7 [Xiangrui Meng] change default value of addIntercept to false
b01df54 [Xiangrui Meng] allow to change or clear threshold in LR and SVM
4addc50 [Xiangrui Meng] merge master
4ca5b1b [Xiangrui Meng] remove normalization from Lasso and update tests
f04fe8a [Xiangrui Meng] remove normalization from RidgeRegression and update tests
d088552 [Xiangrui Meng] use static constructor for MLContext
6f59eed [Xiangrui Meng] update libSVMFile to determine number of features automatically
3432e84 [Xiangrui Meng] update NaiveBayes to support sparse data
0f8759b [Xiangrui Meng] minor updates to NB
b11659c [Xiangrui Meng] style update
78c4671 [Xiangrui Meng] add libSVMFile to MLContext
f0fe616 [Xiangrui Meng] add a test for sparse linear regression
44733e1 [Xiangrui Meng] use in-place gradient computation
e981396 [Xiangrui Meng] use axpy in Updater
db808a1 [Xiangrui Meng] update JavaLR example
befa592 [Xiangrui Meng] passed scala/java tests
75c83a4 [Xiangrui Meng] passed test compile
1859701 [Xiangrui Meng] passed compile
834ada2 [Xiangrui Meng] optimized MLUtils.computeStats update some ml algorithms to use Vector (cont.)
135ab72 [Xiangrui Meng] merge glm
0e57aa4 [Xiangrui Meng] update Lasso and RidgeRegression to parse the weights correctly from GLM mark createModel protected mark predictPoint protected
d7f629f [Xiangrui Meng] fix a bug in GLM when intercept is not used
3f346ba [Xiangrui Meng] update some ml algorithms to use Vector
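The commit message mentions adding a libSVMFile reader (later moved to MLUtils as loadLibSVMData). As background, the LIBSVM text format stores one labeled example per line as `label index:value index:value ...`, with 1-based feature indices, which maps naturally onto the sparse vectors this PR introduces. The sketch below is a hypothetical, self-contained parser for that format (the class and method names are illustrative, not Spark's implementation), converting indices to 0-based as a sparse vector would store them:

```java
import java.util.Arrays;

// Hypothetical helper (not Spark's loadLibSVMData): parses one LIBSVM-format
// line, "label index:value index:value ...", where indices are 1-based.
public class LibSVMLineParser {
  public static double parseLabel(String line) {
    return Double.parseDouble(line.trim().split("\\s+")[0]);
  }

  // Returns {int[] indices, double[] values}; indices are converted to
  // 0-based, matching what a sparse vector representation would hold.
  public static Object[] parseFeatures(String line) {
    String[] tok = line.trim().split("\\s+");
    int[] indices = new int[tok.length - 1];
    double[] values = new double[tok.length - 1];
    for (int i = 1; i < tok.length; ++i) {
      String[] kv = tok[i].split(":");
      indices[i - 1] = Integer.parseInt(kv[0]) - 1; // 1-based -> 0-based
      values[i - 1] = Double.parseDouble(kv[1]);
    }
    return new Object[] {indices, values};
  }

  public static void main(String[] args) {
    String line = "1.0 1:0.5 3:2.0";
    System.out.println(parseLabel(line));                                   // 1.0
    System.out.println(Arrays.toString((int[]) parseFeatures(line)[0]));    // [0, 2]
    System.out.println(Arrays.toString((double[]) parseFeatures(line)[1])); // [0.5, 2.0]
  }
}
```

Only the nonzero entries appear in the file, which is why the reader can also scan the data to determine the number of features automatically, as one of the commits notes.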
1 parent ed730c9 commit 9c65fa7

40 files changed: +926 −591 lines changed

examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java

Lines changed: 4 additions & 10 deletions

@@ -17,18 +17,17 @@
 
 package org.apache.spark.mllib.examples;
 
+import java.util.regex.Pattern;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 
 import org.apache.spark.mllib.classification.LogisticRegressionWithSGD;
 import org.apache.spark.mllib.classification.LogisticRegressionModel;
+import org.apache.spark.mllib.linalg.Vectors;
 import org.apache.spark.mllib.regression.LabeledPoint;
 
-import java.util.Arrays;
-import java.util.regex.Pattern;
-
 /**
  * Logistic regression based classification using ML Lib.
  */
@@ -47,14 +46,10 @@ public LabeledPoint call(String line) {
       for (int i = 0; i < tok.length; ++i) {
         x[i] = Double.parseDouble(tok[i]);
       }
-      return new LabeledPoint(y, x);
+      return new LabeledPoint(y, Vectors.dense(x));
     }
   }
 
-  public static void printWeights(double[] a) {
-    System.out.println(Arrays.toString(a));
-  }
-
  public static void main(String[] args) {
    if (args.length != 4) {
      System.err.println("Usage: JavaLR <master> <input_dir> <step_size> <niters>");
@@ -80,8 +75,7 @@ public static void main(String[] args) {
    LogisticRegressionModel model = LogisticRegressionWithSGD.train(points.rdd(),
      iterations, stepSize);
 
-    System.out.print("Final w: ");
-    printWeights(model.weights());
+    System.out.print("Final w: " + model.weights());
 
    System.exit(0);
  }
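Two items in the commit message, "use in-place gradient computation" and "use axpy in Updater", refer to accumulating each example's gradient contribution directly into a running buffer instead of materializing temporary vectors. The sketch below (illustrative names, not Spark's actual Gradient/Updater code) shows the idea for logistic loss: the per-example gradient is multiplier * x, and when x is sparse an axpy-style update touches only its nonzero entries:

```java
// Sketch of in-place sparse gradient accumulation, assuming the standard
// logistic-loss gradient multiplier sigmoid(margin) - label. Names are
// hypothetical; this is not Spark's internal implementation.
public class SparseAxpyGradient {
  // cumGradient += multiplier * x, where x is sparse (parallel indices/values).
  public static void axpy(double multiplier, int[] indices, double[] values,
                          double[] cumGradient) {
    for (int k = 0; k < indices.length; ++k) {
      cumGradient[indices[k]] += multiplier * values[k];
    }
  }

  // Logistic-loss multiplier for one example with the given margin (w . x).
  public static double logisticMultiplier(double margin, double label) {
    return 1.0 / (1.0 + Math.exp(-margin)) - label;
  }

  public static void main(String[] args) {
    double[] grad = new double[4];                   // running gradient buffer
    int[] idx = {0, 2};                              // nonzero feature indices
    double[] vals = {1.0, 2.0};                      // nonzero feature values
    double m = logisticMultiplier(0.0, 1.0);         // 0.5 - 1.0 = -0.5
    axpy(m, idx, vals, grad);                        // only touches grad[0], grad[2]
    System.out.println(java.util.Arrays.toString(grad)); // [-0.5, 0.0, -1.0, 0.0]
  }
}
```

Because the update cost is proportional to the number of nonzeros rather than the full dimension, this is also why the PR removes densifying operations such as column normalization from Lasso and Ridge.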

mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala

Lines changed: 109 additions & 45 deletions

@@ -110,92 +110,144 @@ class PythonMLLibAPI extends Serializable {
 
   private def trainRegressionModel(
       trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
-      dataBytesJRDD: JavaRDD[Array[Byte]], initialWeightsBA: Array[Byte]):
-      java.util.LinkedList[java.lang.Object] = {
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      initialWeightsBA: Array[Byte]): java.util.LinkedList[java.lang.Object] = {
     val data = dataBytesJRDD.rdd.map(xBytes => {
       val x = deserializeDoubleVector(xBytes)
-      LabeledPoint(x(0), x.slice(1, x.length))
+      LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
     })
     val initialWeights = deserializeDoubleVector(initialWeightsBA)
     val model = trainFunc(data, initialWeights)
     val ret = new java.util.LinkedList[java.lang.Object]()
-    ret.add(serializeDoubleVector(model.weights))
+    ret.add(serializeDoubleVector(model.weights.toArray))
     ret.add(model.intercept: java.lang.Double)
     ret
   }
 
   /**
    * Java stub for Python mllib LinearRegressionWithSGD.train()
    */
-  def trainLinearRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]],
-      numIterations: Int, stepSize: Double, miniBatchFraction: Double,
+  def trainLinearRegressionModelWithSGD(
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      numIterations: Int,
+      stepSize: Double,
+      miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    trainRegressionModel((data, initialWeights) =>
-      LinearRegressionWithSGD.train(data, numIterations, stepSize,
-        miniBatchFraction, initialWeights),
-      dataBytesJRDD, initialWeightsBA)
+    trainRegressionModel(
+      (data, initialWeights) =>
+        LinearRegressionWithSGD.train(
+          data,
+          numIterations,
+          stepSize,
+          miniBatchFraction,
+          Vectors.dense(initialWeights)),
+      dataBytesJRDD,
+      initialWeightsBA)
   }
 
   /**
    * Java stub for Python mllib LassoWithSGD.train()
   */
-  def trainLassoModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
-      stepSize: Double, regParam: Double, miniBatchFraction: Double,
+  def trainLassoModelWithSGD(
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      numIterations: Int,
+      stepSize: Double,
+      regParam: Double,
+      miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    trainRegressionModel((data, initialWeights) =>
-      LassoWithSGD.train(data, numIterations, stepSize, regParam,
-        miniBatchFraction, initialWeights),
-      dataBytesJRDD, initialWeightsBA)
+    trainRegressionModel(
+      (data, initialWeights) =>
+        LassoWithSGD.train(
+          data,
+          numIterations,
+          stepSize,
+          regParam,
+          miniBatchFraction,
+          Vectors.dense(initialWeights)),
+      dataBytesJRDD,
+      initialWeightsBA)
   }
 
   /**
    * Java stub for Python mllib RidgeRegressionWithSGD.train()
   */
-  def trainRidgeModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
-      stepSize: Double, regParam: Double, miniBatchFraction: Double,
+  def trainRidgeModelWithSGD(
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      numIterations: Int,
+      stepSize: Double,
+      regParam: Double,
+      miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    trainRegressionModel((data, initialWeights) =>
-      RidgeRegressionWithSGD.train(data, numIterations, stepSize, regParam,
-        miniBatchFraction, initialWeights),
-      dataBytesJRDD, initialWeightsBA)
+    trainRegressionModel(
+      (data, initialWeights) =>
+        RidgeRegressionWithSGD.train(
+          data,
+          numIterations,
+          stepSize,
+          regParam,
+          miniBatchFraction,
+          Vectors.dense(initialWeights)),
+      dataBytesJRDD,
+      initialWeightsBA)
  }
 
   /**
    * Java stub for Python mllib SVMWithSGD.train()
   */
-  def trainSVMModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
-      stepSize: Double, regParam: Double, miniBatchFraction: Double,
+  def trainSVMModelWithSGD(
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      numIterations: Int,
+      stepSize: Double,
+      regParam: Double,
+      miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    trainRegressionModel((data, initialWeights) =>
-      SVMWithSGD.train(data, numIterations, stepSize, regParam,
-        miniBatchFraction, initialWeights),
-      dataBytesJRDD, initialWeightsBA)
+    trainRegressionModel(
+      (data, initialWeights) =>
+        SVMWithSGD.train(
+          data,
+          numIterations,
+          stepSize,
+          regParam,
+          miniBatchFraction,
+          Vectors.dense(initialWeights)),
+      dataBytesJRDD,
+      initialWeightsBA)
  }
 
   /**
    * Java stub for Python mllib LogisticRegressionWithSGD.train()
   */
-  def trainLogisticRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]],
-      numIterations: Int, stepSize: Double, miniBatchFraction: Double,
+  def trainLogisticRegressionModelWithSGD(
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      numIterations: Int,
+      stepSize: Double,
+      miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    trainRegressionModel((data, initialWeights) =>
-      LogisticRegressionWithSGD.train(data, numIterations, stepSize,
-        miniBatchFraction, initialWeights),
-      dataBytesJRDD, initialWeightsBA)
+    trainRegressionModel(
+      (data, initialWeights) =>
+        LogisticRegressionWithSGD.train(
+          data,
+          numIterations,
+          stepSize,
+          miniBatchFraction,
+          Vectors.dense(initialWeights)),
+      dataBytesJRDD,
+      initialWeightsBA)
  }
 
   /**
    * Java stub for NaiveBayes.train()
   */
-  def trainNaiveBayes(dataBytesJRDD: JavaRDD[Array[Byte]], lambda: Double)
-    : java.util.List[java.lang.Object] =
-  {
+  def trainNaiveBayes(
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      lambda: Double): java.util.List[java.lang.Object] = {
     val data = dataBytesJRDD.rdd.map(xBytes => {
       val x = deserializeDoubleVector(xBytes)
-      LabeledPoint(x(0), x.slice(1, x.length))
+      LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
     })
     val model = NaiveBayes.train(data, lambda)
     val ret = new java.util.LinkedList[java.lang.Object]()
+    ret.add(serializeDoubleVector(model.labels))
     ret.add(serializeDoubleVector(model.pi))
     ret.add(serializeDoubleMatrix(model.theta))
     ret
@@ -204,9 +256,12 @@ class PythonMLLibAPI extends Serializable {
   /**
    * Java stub for Python mllib KMeans.train()
   */
-  def trainKMeansModel(dataBytesJRDD: JavaRDD[Array[Byte]], k: Int,
-      maxIterations: Int, runs: Int, initializationMode: String):
-      java.util.List[java.lang.Object] = {
+  def trainKMeansModel(
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      k: Int,
+      maxIterations: Int,
+      runs: Int,
+      initializationMode: String): java.util.List[java.lang.Object] = {
     val data = dataBytesJRDD.rdd.map(xBytes => Vectors.dense(deserializeDoubleVector(xBytes)))
     val model = KMeans.train(data, k, maxIterations, runs, initializationMode)
     val ret = new java.util.LinkedList[java.lang.Object]()
@@ -259,8 +314,12 @@ class PythonMLLibAPI extends Serializable {
    * needs to be taken in the Python code to ensure it gets freed on exit; see
    * the Py4J documentation.
    */
-  def trainALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
-      iterations: Int, lambda: Double, blocks: Int): MatrixFactorizationModel = {
+  def trainALSModel(
+      ratingsBytesJRDD: JavaRDD[Array[Byte]],
+      rank: Int,
+      iterations: Int,
+      lambda: Double,
+      blocks: Int): MatrixFactorizationModel = {
     val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
     ALS.train(ratings, rank, iterations, lambda, blocks)
   }
@@ -271,8 +330,13 @@ class PythonMLLibAPI extends Serializable {
    * Extra care needs to be taken in the Python code to ensure it gets freed on
    * exit; see the Py4J documentation.
    */
-  def trainImplicitALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
-      iterations: Int, lambda: Double, blocks: Int, alpha: Double): MatrixFactorizationModel = {
+  def trainImplicitALSModel(
+      ratingsBytesJRDD: JavaRDD[Array[Byte]],
+      rank: Int,
+      iterations: Int,
+      lambda: Double,
+      blocks: Int,
+      alpha: Double): MatrixFactorizationModel = {
     val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
     ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)
   }
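The stubs above shuttle weight vectors between the JVM and Python as `Array[Byte]` via serializeDoubleVector/deserializeDoubleVector. The sketch below illustrates the general idea of such a round trip with a simple length-prefixed layout; the actual byte layout used by PythonMLLibAPI is not shown in this diff, so the format here (and the class name) is an assumption for illustration only:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Illustrative only: a length-prefixed little-endian layout for a vector of
// doubles. This is NOT Spark's actual serializeDoubleVector wire format.
public class DoubleVectorBytes {
  public static byte[] serialize(double[] v) {
    ByteBuffer bb = ByteBuffer.allocate(8 + 8 * v.length).order(ByteOrder.LITTLE_ENDIAN);
    bb.putLong(v.length);              // length header
    for (double d : v) bb.putDouble(d);
    return bb.array();
  }

  public static double[] deserialize(byte[] bytes) {
    ByteBuffer bb = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
    double[] v = new double[(int) bb.getLong()];
    for (int i = 0; i < v.length; ++i) v[i] = bb.getDouble();
    return v;
  }

  public static void main(String[] args) {
    double[] v = {1.5, -2.0, 0.0};
    double[] back = deserialize(serialize(v));
    System.out.println(java.util.Arrays.toString(back)); // [1.5, -2.0, 0.0]
  }
}
```

This also shows why `model.weights.toArray` is needed in the stubs after this PR: the model now holds a `Vector`, while the Python bridge still serializes a flat `Array[Double]`.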

mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala

Lines changed: 9 additions & 4 deletions

@@ -17,22 +17,27 @@
 
 package org.apache.spark.mllib.classification
 
+import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.rdd.RDD
 
+/**
+ * Represents a classification model that predicts to which of a set of categories an example
+ * belongs. The categories are represented by double values: 0.0, 1.0, 2.0, etc.
+ */
 trait ClassificationModel extends Serializable {
   /**
    * Predict values for the given data set using the model trained.
    *
    * @param testData RDD representing data points to be predicted
-   * @return RDD[Int] where each entry contains the corresponding prediction
+   * @return an RDD[Double] where each entry contains the corresponding prediction
    */
-  def predict(testData: RDD[Array[Double]]): RDD[Double]
+  def predict(testData: RDD[Vector]): RDD[Double]
 
   /**
    * Predict values for a single data point using the model trained.
    *
    * @param testData array representing a single data point
-   * @return Int prediction from the trained model
+   * @return predicted category from the trained model
   */
-  def predict(testData: Array[Double]): Double
+  def predict(testData: Vector): Double
 }
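One commit in this PR ("allow to change or clear threshold in LR and SVM") relates directly to the single-point `predict` above: a linear classifier scores a point as w . x + intercept and, if a threshold is set, maps the score to a category; clearing the threshold returns the raw score instead. The sketch below illustrates that behavior with hypothetical names; it is not Spark's LogisticRegressionModel or SVMModel:

```java
// Illustrative linear classifier showing threshold-based prediction.
// Class and method names are hypothetical, not Spark's internals.
public class LinearClassifierSketch {
  private final double[] weights;
  private final double intercept;
  private Double threshold = 0.0;  // null means "return the raw score"

  public LinearClassifierSketch(double[] weights, double intercept) {
    this.weights = weights;
    this.intercept = intercept;
  }

  public void setThreshold(double t) { threshold = t; }
  public void clearThreshold()      { threshold = null; }

  // Single-point predict: score = w . x + intercept, then apply threshold.
  public double predict(double[] x) {
    double score = intercept;
    for (int i = 0; i < weights.length; ++i) score += weights[i] * x[i];
    return threshold == null ? score : (score > threshold ? 1.0 : 0.0);
  }

  public static void main(String[] args) {
    LinearClassifierSketch m = new LinearClassifierSketch(new double[]{1.0, -1.0}, 0.0);
    System.out.println(m.predict(new double[]{2.0, 0.5})); // 1.0 (score 1.5 > 0)
    m.clearThreshold();
    System.out.println(m.predict(new double[]{2.0, 0.5})); // 1.5 (raw score)
  }
}
```

Returning categories as doubles (0.0, 1.0, 2.0, ...) rather than ints matches the new ClassificationModel documentation and is what lets NaiveBayes accept arbitrary labels.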
