This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit d18c9f9

Author: Andrew Or
Merge branch 'master' of github.com:apache/spark into clean-moar
2 parents: 65ef07b + 60336e3

17 files changed: +739 −185 lines


core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
Lines changed: 9 additions & 2 deletions

@@ -50,8 +50,15 @@ private[spark] object PythonUtils {
   /**
    * Convert list of T into seq of T (for calling API with varargs)
    */
-  def toSeq[T](cols: JList[T]): Seq[T] = {
-    cols.toList.toSeq
+  def toSeq[T](vs: JList[T]): Seq[T] = {
+    vs.toList.toSeq
+  }
+
+  /**
+   * Convert list of T into array of T (for calling API with array)
+   */
+  def toArray[T](vs: JList[T]): Array[T] = {
+    vs.toArray().asInstanceOf[Array[T]]
   }
 
   /**
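A minimal standalone Scala sketch (not part of the diff) of what these helpers do: a java.util.List, e.g. one arriving over Py4J from the Python API, is converted into a Scala collection so it can be fed to a Scala varargs method. The helper name and values below are illustrative only.

import java.util.{Arrays => JArrays, List => JList}
import scala.collection.JavaConverters._

// Illustrative re-implementation of the toSeq helper above.
def toSeqSketch[T](vs: JList[T]): Seq[T] = vs.asScala.toList

val cols: JList[String] = JArrays.asList("name", "age")  // e.g. column names sent from Python
val scalaCols: Seq[String] = toSeqSketch(cols)
println(scalaCols)  // List(name, age)
// A varargs call would then look like: someApi(scalaCols.head, scalaCols.tail: _*)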

docs/ml-features.md
Lines changed: 89 additions & 0 deletions

@@ -106,6 +106,95 @@ for features_label in featurized.select("features", "label").take(3):
 </div>
 </div>
 
+## Word2Vec
+
+`Word2Vec` is an `Estimator` which takes sequences of words representing documents and trains a `Word2VecModel`. The model is essentially a `Map(String, Vector)` that maps each word to a unique fixed-size vector. The `Word2VecModel` transforms each document into a vector by averaging the vectors of all the words in the document; this vector can then be used in further computations on documents, such as similarity calculations. Please refer to the [MLlib user guide on Word2Vec](mllib-feature-extraction.html#Word2Vec) for more details on Word2Vec.
+
+Word2Vec is implemented in [Word2Vec](api/scala/index.html#org.apache.spark.ml.feature.Word2Vec). In the following code segment, we start with a set of documents, each represented as a sequence of words. We transform each document into a feature vector, which could then be passed to a learning algorithm.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.spark.ml.feature.Word2Vec
+
+// Input data: Each row is a bag of words from a sentence or document.
+val documentDF = sqlContext.createDataFrame(Seq(
+  "Hi I heard about Spark".split(" "),
+  "I wish Java could use case classes".split(" "),
+  "Logistic regression models are neat".split(" ")
+).map(Tuple1.apply)).toDF("text")
+
+// Learn a mapping from words to Vectors.
+val word2Vec = new Word2Vec()
+  .setInputCol("text")
+  .setOutputCol("result")
+  .setVectorSize(3)
+  .setMinCount(0)
+val model = word2Vec.fit(documentDF)
+val result = model.transform(documentDF)
+result.select("result").take(3).foreach(println)
+{% endhighlight %}
+</div>
+
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import com.google.common.collect.Lists;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.ml.feature.Word2Vec;
+import org.apache.spark.ml.feature.Word2VecModel;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.*;
+
+JavaSparkContext jsc = ...
+SQLContext sqlContext = ...
+
+// Input data: Each row is a bag of words from a sentence or document.
+JavaRDD<Row> jrdd = jsc.parallelize(Lists.newArrayList(
+  RowFactory.create(Lists.newArrayList("Hi I heard about Spark".split(" "))),
+  RowFactory.create(Lists.newArrayList("I wish Java could use case classes".split(" "))),
+  RowFactory.create(Lists.newArrayList("Logistic regression models are neat".split(" ")))
+));
+StructType schema = new StructType(new StructField[]{
+  new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
+});
+DataFrame documentDF = sqlContext.createDataFrame(jrdd, schema);
+
+// Learn a mapping from words to Vectors.
+Word2Vec word2Vec = new Word2Vec()
+  .setInputCol("text")
+  .setOutputCol("result")
+  .setVectorSize(3)
+  .setMinCount(0);
+Word2VecModel model = word2Vec.fit(documentDF);
+DataFrame result = model.transform(documentDF);
+for (Row r: result.select("result").take(3)) {
+  System.out.println(r);
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+from pyspark.ml.feature import Word2Vec
+
+# Input data: Each row is a bag of words from a sentence or document.
+documentDF = sqlContext.createDataFrame([
+  ("Hi I heard about Spark".split(" "), ),
+  ("I wish Java could use case classes".split(" "), ),
+  ("Logistic regression models are neat".split(" "), )
+], ["text"])
+# Learn a mapping from words to Vectors.
+word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
+model = word2Vec.fit(documentDF)
+result = model.transform(documentDF)
+for feature in result.select("result").take(3):
+  print(feature)
+{% endhighlight %}
+</div>
+</div>
 
 # Feature Transformers
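A minimal standalone Scala sketch (not part of the diff above) of the averaging that the new Word2Vec section describes: the feature vector for a document is the element-wise mean of the vectors of its words. The numeric values here are made up for illustration.

// Two hypothetical 3-dimensional word vectors for a 2-word document.
val wordVectors: Seq[Array[Double]] = Seq(
  Array(1.0, 2.0, 3.0),
  Array(3.0, 0.0, 1.0))

// Element-wise sum, then divide by the number of words.
val docVector: Array[Double] = wordVectors
  .reduce((a, b) => a.zip(b).map { case (x, y) => x + y })
  .map(_ / wordVectors.size)

println(docVector.mkString("[", ", ", "]"))  // [2.0, 1.0, 2.0]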

docs/mllib-data-types.md
Lines changed: 64 additions & 64 deletions

@@ -296,70 +296,6 @@ backed by an RDD of its entries.
 The underlying RDDs of a distributed matrix must be deterministic, because we cache the matrix size.
 In general the use of non-deterministic RDDs can lead to errors.
 
-### BlockMatrix
-
-A `BlockMatrix` is a distributed matrix backed by an RDD of `MatrixBlock`s, where a `MatrixBlock` is
-a tuple of `((Int, Int), Matrix)`, where the `(Int, Int)` is the index of the block, and `Matrix` is
-the sub-matrix at the given index with size `rowsPerBlock` x `colsPerBlock`.
-`BlockMatrix` supports methods such as `add` and `multiply` with another `BlockMatrix`.
-`BlockMatrix` also has a helper function `validate` which can be used to check whether the
-`BlockMatrix` is set up properly.
-
-<div class="codetabs">
-<div data-lang="scala" markdown="1">
-
-A [`BlockMatrix`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) can be
-most easily created from an `IndexedRowMatrix` or `CoordinateMatrix` by calling `toBlockMatrix`.
-`toBlockMatrix` creates blocks of size 1024 x 1024 by default.
-Users may change the block size by supplying the values through `toBlockMatrix(rowsPerBlock, colsPerBlock)`.
-
-{% highlight scala %}
-import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}
-
-val entries: RDD[MatrixEntry] = ... // an RDD of (i, j, v) matrix entries
-// Create a CoordinateMatrix from an RDD[MatrixEntry].
-val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
-// Transform the CoordinateMatrix to a BlockMatrix
-val matA: BlockMatrix = coordMat.toBlockMatrix().cache()
-
-// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
-// Nothing happens if it is valid.
-matA.validate()
-
-// Calculate A^T A.
-val ata = matA.transpose.multiply(matA)
-{% endhighlight %}
-</div>
-
-<div data-lang="java" markdown="1">
-
-A [`BlockMatrix`](api/java/org/apache/spark/mllib/linalg/distributed/BlockMatrix.html) can be
-most easily created from an `IndexedRowMatrix` or `CoordinateMatrix` by calling `toBlockMatrix`.
-`toBlockMatrix` creates blocks of size 1024 x 1024 by default.
-Users may change the block size by supplying the values through `toBlockMatrix(rowsPerBlock, colsPerBlock)`.
-
-{% highlight java %}
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.mllib.linalg.distributed.BlockMatrix;
-import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
-import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
-
-JavaRDD<MatrixEntry> entries = ... // a JavaRDD of (i, j, v) Matrix Entries
-// Create a CoordinateMatrix from a JavaRDD<MatrixEntry>.
-CoordinateMatrix coordMat = new CoordinateMatrix(entries.rdd());
-// Transform the CoordinateMatrix to a BlockMatrix
-BlockMatrix matA = coordMat.toBlockMatrix().cache();
-
-// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
-// Nothing happens if it is valid.
-matA.validate();
-
-// Calculate A^T A.
-BlockMatrix ata = matA.transpose().multiply(matA);
-{% endhighlight %}
-</div>
-</div>
-
 ### RowMatrix
 
 A `RowMatrix` is a row-oriented distributed matrix without meaningful row indices, backed by an RDD
@@ -530,3 +466,67 @@ IndexedRowMatrix indexedRowMatrix = mat.toIndexedRowMatrix();
 {% endhighlight %}
 </div>
 </div>
+
+### BlockMatrix
+
+A `BlockMatrix` is a distributed matrix backed by an RDD of `MatrixBlock`s, where a `MatrixBlock` is
+a tuple of `((Int, Int), Matrix)`, where the `(Int, Int)` is the index of the block, and `Matrix` is
+the sub-matrix at the given index with size `rowsPerBlock` x `colsPerBlock`.
+`BlockMatrix` supports methods such as `add` and `multiply` with another `BlockMatrix`.
+`BlockMatrix` also has a helper function `validate` which can be used to check whether the
+`BlockMatrix` is set up properly.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+
+A [`BlockMatrix`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) can be
+most easily created from an `IndexedRowMatrix` or `CoordinateMatrix` by calling `toBlockMatrix`.
+`toBlockMatrix` creates blocks of size 1024 x 1024 by default.
+Users may change the block size by supplying the values through `toBlockMatrix(rowsPerBlock, colsPerBlock)`.
+
+{% highlight scala %}
+import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}
+
+val entries: RDD[MatrixEntry] = ... // an RDD of (i, j, v) matrix entries
+// Create a CoordinateMatrix from an RDD[MatrixEntry].
+val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
+// Transform the CoordinateMatrix to a BlockMatrix
+val matA: BlockMatrix = coordMat.toBlockMatrix().cache()
+
+// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
+// Nothing happens if it is valid.
+matA.validate()
+
+// Calculate A^T A.
+val ata = matA.transpose.multiply(matA)
+{% endhighlight %}
+</div>
+
+<div data-lang="java" markdown="1">
+
+A [`BlockMatrix`](api/java/org/apache/spark/mllib/linalg/distributed/BlockMatrix.html) can be
+most easily created from an `IndexedRowMatrix` or `CoordinateMatrix` by calling `toBlockMatrix`.
+`toBlockMatrix` creates blocks of size 1024 x 1024 by default.
+Users may change the block size by supplying the values through `toBlockMatrix(rowsPerBlock, colsPerBlock)`.
+
+{% highlight java %}
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.mllib.linalg.distributed.BlockMatrix;
+import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
+import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
+import org.apache.spark.mllib.linalg.distributed.MatrixEntry;
+
+JavaRDD<MatrixEntry> entries = ... // a JavaRDD of (i, j, v) Matrix Entries
+// Create a CoordinateMatrix from a JavaRDD<MatrixEntry>.
+CoordinateMatrix coordMat = new CoordinateMatrix(entries.rdd());
+// Transform the CoordinateMatrix to a BlockMatrix
+BlockMatrix matA = coordMat.toBlockMatrix().cache();
+
+// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
+// Nothing happens if it is valid.
+matA.validate();
+
+// Calculate A^T A.
+BlockMatrix ata = matA.transpose().multiply(matA);
+{% endhighlight %}
+</div>
+</div>
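As an aside (not part of the diff), a short Scala sketch of the `MatrixBlock` structure the section above describes: a `BlockMatrix` can also be built directly from `((Int, Int), Matrix)` tuples. This assumes an existing SparkContext `sc`; the block values are made up.

// Two 2x2 blocks laid out side by side: block index (0, 0) and block index (0, 1).
import org.apache.spark.mllib.linalg.Matrices
import org.apache.spark.mllib.linalg.distributed.BlockMatrix

val blocks = sc.parallelize(Seq(
  ((0, 0), Matrices.dense(2, 2, Array(1.0, 2.0, 3.0, 4.0))),
  ((0, 1), Matrices.dense(2, 2, Array(5.0, 6.0, 7.0, 8.0)))))
val mat = new BlockMatrix(blocks, 2, 2)  // rowsPerBlock = 2, colsPerBlock = 2
println(s"${mat.numRows()} x ${mat.numCols()}")  // 2 x 4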

ec2/spark_ec2.py
Lines changed: 5 additions & 1 deletion

@@ -864,7 +864,11 @@ def wait_for_cluster_state(conn, opts, cluster_instances, cluster_state):
         for i in cluster_instances:
             i.update()
 
-        statuses = conn.get_all_instance_status(instance_ids=[i.id for i in cluster_instances])
+        max_batch = 100
+        statuses = []
+        for j in xrange(0, len(cluster_instances), max_batch):
+            batch = [i.id for i in cluster_instances[j:j + max_batch]]
+            statuses.extend(conn.get_all_instance_status(instance_ids=batch))
 
         if cluster_state == 'ssh-ready':
             if all(i.state == 'running' for i in cluster_instances) and \
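The same batching idea, sketched in Scala purely for illustration: the diff caps each status request at 100 instance ids (presumably to stay within the EC2 API's per-request limits), so the id list is processed in chunks. The `describeStatus` function below is a stand-in for `conn.get_all_instance_status`, not a real API.

// Stand-in for the real status call; returns one status string per id.
def describeStatus(ids: Seq[String]): Seq[String] =
  ids.map(id => s"$id: running")

val instanceIds = (1 to 250).map(i => s"i-$i")  // hypothetical instance ids
val maxBatch = 100
// Query statuses in batches so no single request carries more than maxBatch ids.
val statuses = instanceIds.grouped(maxBatch).flatMap(describeStatus).toSeq
println(statuses.size)  // 250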

mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
Lines changed: 24 additions & 17 deletions

@@ -21,13 +21,11 @@ import java.lang.{Iterable => JIterable}
 
 import scala.collection.JavaConverters._
 
-import breeze.linalg.{Axis, DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum}
-import breeze.numerics.{exp => brzExp, log => brzLog}
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.{Logging, SparkContext, SparkException}
-import org.apache.spark.mllib.linalg.{BLAS, DenseVector, SparseVector, Vector}
+import org.apache.spark.mllib.linalg.{BLAS, DenseMatrix, DenseVector, SparseVector, Vector, Vectors}
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
@@ -50,6 +48,9 @@ class NaiveBayesModel private[mllib] (
     val modelType: String)
   extends ClassificationModel with Serializable with Saveable {
 
+  private val piVector = new DenseVector(pi)
+  private val thetaMatrix = new DenseMatrix(labels.size, theta(0).size, theta.flatten, true)
+
   private[mllib] def this(labels: Array[Double], pi: Array[Double], theta: Array[Array[Double]]) =
     this(labels, pi, theta, "Multinomial")
 
@@ -60,17 +61,18 @@ class NaiveBayesModel private[mllib] (
       theta: JIterable[JIterable[Double]]) =
     this(labels.asScala.toArray, pi.asScala.toArray, theta.asScala.toArray.map(_.asScala.toArray))
 
-  private val brzPi = new BDV[Double](pi)
-  private val brzTheta = new BDM(theta(0).length, theta.length, theta.flatten).t
-
   // Bernoulli scoring requires log(condprob) if 1, log(1-condprob) if 0.
-  // This precomputes log(1.0 - exp(theta)) and its sum which are used for the linear algebra
+  // This precomputes log(1.0 - exp(theta)) and its sum which are used for the linear algebra
   // application of this condition (in predict function).
-  private val (brzNegTheta, brzNegThetaSum) = modelType match {
+  private val (thetaMinusNegTheta, negThetaSum) = modelType match {
     case "Multinomial" => (None, None)
    case "Bernoulli" =>
-      val negTheta = brzLog((brzExp(brzTheta.copy) :*= (-1.0)) :+= 1.0) // log(1.0 - exp(x))
-      (Option(negTheta), Option(brzSum(negTheta, Axis._1)))
+      val negTheta = thetaMatrix.map(value => math.log(1.0 - math.exp(value)))
+      val ones = new DenseVector(Array.fill(thetaMatrix.numCols){1.0})
+      val thetaMinusNegTheta = thetaMatrix.map { value =>
+        value - math.log(1.0 - math.exp(value))
+      }
+      (Option(thetaMinusNegTheta), Option(negTheta.multiply(ones)))
    case _ =>
      // This should never happen.
      throw new UnknownError(s"NaiveBayesModel was created with an unknown ModelType: $modelType")
@@ -85,17 +87,22 @@ class NaiveBayesModel private[mllib] (
   }
 
   override def predict(testData: Vector): Double = {
-    val brzData = testData.toBreeze
     modelType match {
       case "Multinomial" =>
-        labels(brzArgmax(brzPi + brzTheta * brzData))
+        val prob = thetaMatrix.multiply(testData)
+        BLAS.axpy(1.0, piVector, prob)
+        labels(prob.argmax)
      case "Bernoulli" =>
-        if (!brzData.forall(v => v == 0.0 || v == 1.0)) {
-          throw new SparkException(
-            s"Bernoulli Naive Bayes requires 0 or 1 feature values but found $testData.")
+        testData.foreachActive { (index, value) =>
+          if (value != 0.0 && value != 1.0) {
+            throw new SparkException(
+              s"Bernoulli Naive Bayes requires 0 or 1 feature values but found $testData.")
+          }
         }
-        labels(brzArgmax(brzPi +
-          (brzTheta - brzNegTheta.get) * brzData + brzNegThetaSum.get))
+        val prob = thetaMinusNegTheta.get.multiply(testData)
+        BLAS.axpy(1.0, piVector, prob)
+        BLAS.axpy(1.0, negThetaSum.get, prob)
+        labels(prob.argmax)
      case _ =>
        // This should never happen.
        throw new UnknownError(s"NaiveBayesModel was created with an unknown ModelType: $modelType")
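For reference, the algebra behind the Bernoulli branch above (this derivation is not part of the diff; it assumes, as the code comments indicate, that `theta` stores log conditional probabilities log θ_ci and `pi` stores log class priors log π_c). For a 0/1 feature vector x,

\[
\log p(y = c \mid x) \;\propto\; \log\pi_c + \sum_i \bigl[ x_i \log\theta_{ci} + (1 - x_i)\log(1 - \theta_{ci}) \bigr]
= \log\pi_c + \sum_i x_i \bigl(\log\theta_{ci} - \log(1 - \theta_{ci})\bigr) + \sum_i \log(1 - \theta_{ci}).
\]

The first sum is what `thetaMinusNegTheta.get.multiply(testData)` computes, and the remaining two terms are added via the `BLAS.axpy` calls on `piVector` and `negThetaSum.get` before taking the argmax over classes.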
