
Commit 98c556e

freeman-lab authored and mengxr committed
Streaming KMeans [MLLIB][SPARK-3254]
This adds a Streaming KMeans algorithm to MLlib. It uses an update rule that generalizes the mini-batch KMeans update to incorporate a decay factor, which allows past data to be forgotten. The decay factor can be specified explicitly, or via a more intuitive "fractional decay" setting, in units of either data points or batches.

The PR includes:
- StreamingKMeans algorithm with decay factor settings
- Usage example
- Additions to documentation clustering page
- Unit tests of basic behavior and decay behaviors

tdas mengxr rezazadeh

Author: freeman <[email protected]>
Author: Jeremy Freeman <[email protected]>
Author: Xiangrui Meng <[email protected]>

Closes apache#2942 from freeman-lab/streaming-kmeans and squashes the following commits:

b2e5b4a [freeman] Fixes to docs / examples
078617c [Jeremy Freeman] Merge pull request #1 from mengxr/SPARK-3254
2e682c0 [Xiangrui Meng] take discount on previous weights; use BLAS; detect dying clusters
0411bf5 [freeman] Change decay parameterization
9f7aea9 [freeman] Style fixes
374a706 [freeman] Formatting
ad9bdc2 [freeman] Use labeled points and predictOnValues in examples
77dbd3f [freeman] Make initialization check an assertion
9cfc301 [freeman] Make random seed an argument
44050a9 [freeman] Simpler constructor
c7050d5 [freeman] Fix spacing
2899623 [freeman] Use pattern matching for clarity
a4a316b [freeman] Use collect
1472ec5 [freeman] Doc formatting
ea22ec8 [freeman] Fix imports
2086bdc [freeman] Log cluster center updates
ea9877c [freeman] More documentation
9facbe3 [freeman] Bug fix
5db7074 [freeman] Example usage for StreamingKMeans
f33684b [freeman] Add explanation and example to docs
b5b5f8d [freeman] Add better documentation
a0fd790 [freeman] Merge remote-tracking branch 'upstream/master' into streaming-kmeans
9fd9c15 [freeman] Merge remote-tracking branch 'upstream/master' into streaming-kmeans
b93350f [freeman] Streaming KMeans with decay
1 parent 8602195 commit 98c556e

File tree

4 files changed (+597 −1 lines changed)

docs/mllib-clustering.md

Lines changed: 95 additions & 1 deletion
@@ -34,7 +34,7 @@ a given dataset, the algorithm returns the best clustering result).
* *initializationSteps* determines the number of steps in the k-means\|\| algorithm.
* *epsilon* determines the distance threshold within which we consider k-means to have converged.

-## Examples
+### Examples

<div class="codetabs">
<div data-lang="scala" markdown="1">
@@ -153,3 +153,97 @@ provided in the [Self-Contained Applications](quick-start.html#self-contained-ap
section of the Spark
Quick Start guide. Be sure to also include *spark-mllib* to your build file as
a dependency.

## Streaming clustering

When data arrive in a stream, we may want to estimate clusters dynamically,
updating them as new data arrive. MLlib provides support for streaming k-means clustering,
with parameters to control the decay (or "forgetfulness") of the estimates. The algorithm
uses a generalization of the mini-batch k-means update rule. For each batch of data, we assign
all points to their nearest cluster, compute new cluster centers, then update each cluster using:

`\begin{equation}
c_{t+1} = \frac{c_tn_t\alpha + x_tm_t}{n_t\alpha+m_t}
\end{equation}`
`\begin{equation}
n_{t+1} = n_t + m_t
\end{equation}`

Where `$c_t$` is the previous center for the cluster, `$n_t$` is the number of points assigned
to the cluster thus far, `$x_t$` is the new cluster center from the current batch, and `$m_t$`
is the number of points added to the cluster in the current batch. The decay factor `$\alpha$`
can be used to ignore the past: with `$\alpha$=1` all data will be used from the beginning;
with `$\alpha$=0` only the most recent data will be used. This is analogous to an
exponentially-weighted moving average.
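To make the rule concrete, here is a minimal sketch of a single decayed center update in plain Scala. It is illustrative only, not the MLlib implementation; the function and parameter names (`updateCluster`, `cT`, `nT`, `xT`, `mT`, `alpha`) are ours and simply mirror the symbols above.

{% highlight scala %}

// Illustrative sketch of the decayed update above (not MLlib's internal code).
// cT: previous center, nT: points assigned so far, xT: center of the new batch,
// mT: points in the new batch, alpha: decay factor in [0, 1].
def updateCluster(
    cT: Array[Double], nT: Double,
    xT: Array[Double], mT: Double,
    alpha: Double): (Array[Double], Double) = {
  val denom = nT * alpha + mT
  val cNext = cT.zip(xT).map { case (c, x) => (c * nT * alpha + x * mT) / denom }
  (cNext, nT + mT)  // new center and updated point count
}

{% endhighlight %}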
The decay can be specified using a `halfLife` parameter, which determines the
correct decay factor `a` such that, for data acquired
at time `t`, its contribution by time `t + halfLife` will have dropped to 0.5.
The unit of time can be specified either as `batches` or `points` and the update rule
will be adjusted accordingly.
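Equivalently, a half-life of `h` time units corresponds to a decay factor of `a = 0.5^(1/h)`, since after `h` units the accumulated discount is `a^h = 0.5`. A tiny sketch of that conversion (illustrative only; the helper name is ours):

{% highlight scala %}

// Convert a half-life (in batches or points) into the equivalent decay factor:
// after halfLife units, math.pow(decay, halfLife) == 0.5.
def decayFromHalfLife(halfLife: Double): Double = math.pow(0.5, 1.0 / halfLife)

decayFromHalfLife(10.0)  // ≈ 0.933, so past data loses half its weight every 10 units

{% endhighlight %}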
### Examples

This example shows how to estimate clusters on streaming data.

<div class="codetabs">

<div data-lang="scala" markdown="1">

First we import the necessary classes.

{% highlight scala %}

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.clustering.StreamingKMeans

{% endhighlight %}

Then we make an input stream of vectors for training, as well as a stream of labeled data
points for testing. We assume a StreamingContext `ssc` has been created; see the
[Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for more info.

{% highlight scala %}

val trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
val testData = ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse)

{% endhighlight %}

We create a model with random clusters and specify the number of clusters to find.

{% highlight scala %}

val numDimensions = 3
val numClusters = 2
val model = new StreamingKMeans()
  .setK(numClusters)
  .setDecayFactor(1.0)
  .setRandomCenters(numDimensions, 0.0)

{% endhighlight %}

Now register the streams for training and testing and start the job, printing
the predicted cluster assignments on new data points as they arrive.

{% highlight scala %}

model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

ssc.start()
ssc.awaitTermination()

{% endhighlight %}

As you add new text files with data, the cluster centers will update. Each training
point should be formatted as `[x1, x2, x3]`, and each test data point
should be formatted as `(y, [x1, x2, x3])`, where `y` is some useful label or identifier
(e.g. a true category assignment). Anytime a text file is placed in `/training/data/dir`
the model will update. Anytime a text file is placed in `/testing/data/dir`
you will see predictions. With new data, the cluster centers will change!
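As an aside, if it is more natural to reason about forgetting in terms of the half-life described above rather than a raw decay factor, the model can be configured that way instead. This is a sketch under the assumption that the half-life setter takes the form `setHalfLife(halfLife, timeUnit)`:

{% highlight scala %}

// Assumed API: configure forgetting via half-life instead of an explicit decay factor.
// Here past data loses half its weight every 5 batches.
val decayingModel = new StreamingKMeans()
  .setK(numClusters)
  .setHalfLife(5, "batches")
  .setRandomCenters(numDimensions, 0.0)

{% endhighlight %}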
</div>

</div>
Lines changed: 77 additions & 0 deletions
@@ -0,0 +1,77 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.examples.mllib

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Estimate clusters on one stream of data and make predictions
 * on another stream, where the data streams arrive as text files
 * into two different directories.
 *
 * The rows of the training text files must be vector data in the form
 * `[x1,x2,x3,...,xn]`
 * where n is the number of dimensions.
 *
 * The rows of the test text files must be labeled data in the form
 * `(y,[x1,x2,x3,...,xn])`
 * where y is some identifier. n must be the same for train and test.
 *
 * Usage: StreamingKMeans <trainingDir> <testDir> <batchDuration> <numClusters> <numDimensions>
 *
 * To run on your local machine using the two directories `trainingDir` and `testDir`,
 * with updates every 5 seconds, 2 dimensions per data point, and 3 clusters, call:
 *    $ bin/run-example \
 *        org.apache.spark.examples.mllib.StreamingKMeans trainingDir testDir 5 3 2
 *
 * As you add text files to `trainingDir` the clusters will continuously update.
 * Anytime you add text files to `testDir`, you'll see predicted labels using the current model.
 */
object StreamingKMeans {

  def main(args: Array[String]) {
    if (args.length != 5) {
      System.err.println(
        "Usage: StreamingKMeans " +
          "<trainingDir> <testDir> <batchDuration> <numClusters> <numDimensions>")
      System.exit(1)
    }

    val conf = new SparkConf().setMaster("local").setAppName("StreamingKMeans")
    val ssc = new StreamingContext(conf, Seconds(args(2).toLong))

    // Training data are plain vectors; test data are labeled points.
    val trainingData = ssc.textFileStream(args(0)).map(Vectors.parse)
    val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)

    // Start from random centers and keep all history (decay factor 1.0).
    val model = new StreamingKMeans()
      .setK(args(3).toInt)
      .setDecayFactor(1.0)
      .setRandomCenters(args(4).toInt, 0.0)

    // Update the model on the training stream and print predictions for the test stream.
    model.trainOn(trainingData)
    model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
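To try the example locally, you need text files dropped into the training and test directories in the formats described in the scaladoc. Below is a small sketch for generating one batch of synthetic input; the object name, directory names, and values are ours, chosen only for illustration and matching the 2-dimensional run-example invocation above.

{% highlight scala %}

import java.io.{File, PrintWriter}
import scala.util.Random

// Sketch: write one batch of synthetic input for the StreamingKMeans example above.
// Training rows look like "[x1,x2]"; test rows look like "(label,[x1,x2])".
object GenerateStreamingKMeansInput {
  def main(args: Array[String]): Unit = {
    val rng = new Random(42)
    def point(): String = Seq.fill(2)(rng.nextGaussian()).mkString(",")

    new File("trainingDir").mkdirs()
    new File("testDir").mkdirs()

    val train = new PrintWriter(new File("trainingDir/batch-1.txt"))
    (1 to 100).foreach(_ => train.println(s"[${point()}]"))
    train.close()

    val test = new PrintWriter(new File("testDir/batch-1.txt"))
    (1 to 10).foreach(i => test.println(s"($i,[${point()}])"))
    test.close()
  }
}

{% endhighlight %}

Running the generator again with new file names while the example is running simulates fresh batches arriving on the stream.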

0 commit comments
