
Commit 4041723

add ut
1 parent 138bfed commit 4041723


2 files changed: +125 -24 lines changed


mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala

Lines changed: 41 additions & 23 deletions
@@ -227,8 +227,8 @@ class OnlineLDAOptimizer extends LDAOptimizer {
   private var k: Int = 0
   private var corpusSize: Long = 0
   private var vocabSize: Int = 0
-  private var alpha: Double = 0
-  private var eta: Double = 0
+  private[clustering] var alpha: Double = 0
+  private[clustering] var eta: Double = 0
   private var randomGenerator: java.util.Random = null
 
   // Online LDA specific parameters
@@ -238,12 +238,11 @@ class OnlineLDAOptimizer extends LDAOptimizer {
 
   // internal data structure
   private var docs: RDD[(Long, Vector)] = null
-  private var lambda: BDM[Double] = null
-  private var Elogbeta: BDM[Double] = null
-  private var expElogbeta: BDM[Double] = null
+  private[clustering] var lambda: BDM[Double] = null
 
   // count of invocation to next, which helps deciding the weight for each iteration
   private var iteration: Int = 0
+  private var gammaShape: Double = 100
 
   /**
    * A (positive) learning parameter that downweights early iterations. Larger values make early
@@ -295,7 +294,24 @@ class OnlineLDAOptimizer extends LDAOptimizer {
     this
   }
 
-  override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
+  /**
+   * This function is test-only for now. In the future, it can help support training stop/resume.
+   */
+  private[clustering] def setLambda(lambda: BDM[Double]): this.type = {
+    this.lambda = lambda
+    this
+  }
+
+  /**
+   * Used to control the gamma distribution. Larger value produces values closer to 1.0.
+   */
+  private[clustering] def setGammaShape(shape: Double): this.type = {
+    this.gammaShape = shape
+    this
+  }
+
+  override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA):
+    OnlineLDAOptimizer = {
     this.k = lda.getK
     this.corpusSize = docs.count()
     this.vocabSize = docs.first()._2.size
@@ -307,26 +323,30 @@ class OnlineLDAOptimizer extends LDAOptimizer {
 
     // Initialize the variational distribution q(beta|lambda)
     this.lambda = getGammaMatrix(k, vocabSize)
-    this.Elogbeta = dirichletExpectation(lambda)
-    this.expElogbeta = exp(Elogbeta)
     this.iteration = 0
     this
   }
 
+  override private[clustering] def next(): OnlineLDAOptimizer = {
+    val batch = docs.sample(withReplacement = true, miniBatchFraction, randomGenerator.nextLong())
+    if (batch.isEmpty()) return this
+    submitMiniBatch(batch)
+  }
+
+
   /**
    * Submit a subset (like 1%, decide by the miniBatchFraction) of the corpus to the Online LDA
    * model, and it will update the topic distribution adaptively for the terms appearing in the
    * subset.
    */
-  override private[clustering] def next(): OnlineLDAOptimizer = {
+  private[clustering] def submitMiniBatch(batch: RDD[(Long, Vector)]): OnlineLDAOptimizer = {
     iteration += 1
-    val batch = docs.sample(withReplacement = true, miniBatchFraction, randomGenerator.nextLong())
-    if (batch.isEmpty()) return this
-
     val k = this.k
     val vocabSize = this.vocabSize
-    val expElogbeta = this.expElogbeta
+    val Elogbeta = dirichletExpectation(lambda)
+    val expElogbeta = exp(Elogbeta)
     val alpha = this.alpha
+    val gammaShape = this.gammaShape
 
     val stats: RDD[BDM[Double]] = batch.mapPartitions { docs =>
       val stat = BDM.zeros[Double](k, vocabSize)
@@ -340,7 +360,7 @@ class OnlineLDAOptimizer extends LDAOptimizer {
         }
 
         // Initialize the variational distribution q(theta|gamma) for the mini-batch
-        var gammad = new Gamma(100, 1.0 / 100.0).samplesVector(k).t // 1 * K
+        var gammad = new Gamma(gammaShape, 1.0 / gammaShape).samplesVector(k).t // 1 * K
         var Elogthetad = digamma(gammad) - digamma(sum(gammad)) // 1 * K
         var expElogthetad = exp(Elogthetad) // 1 * K
         val expElogbetad = expElogbeta(::, ids).toDenseMatrix // K * ids
@@ -350,7 +370,7 @@ class OnlineLDAOptimizer extends LDAOptimizer {
         val ctsVector = new BDV[Double](cts).t // 1 * ids
 
         // Iterate between gamma and phi until convergence
-        while (meanchange > 1e-5) {
+        while (meanchange > 1e-3) {
           val lastgamma = gammad
           //        1*K                  1 * ids               ids * k
           gammad = (expElogthetad :* ((ctsVector / phinorm) * expElogbetad.t)) + alpha
@@ -372,7 +392,10 @@ class OnlineLDAOptimizer extends LDAOptimizer {
       Iterator(stat)
     }
 
-    val batchResult: BDM[Double] = stats.reduce(_ += _)
+    val statsSum: BDM[Double] = stats.reduce(_ += _)
+    val batchResult = statsSum :* expElogbeta
+
+    // Note that this is an optimization to avoid batch.count
     update(batchResult, iteration, (miniBatchFraction * corpusSize).toInt)
     this
   }
@@ -384,28 +407,23 @@ class OnlineLDAOptimizer extends LDAOptimizer {
   /**
    * Update lambda based on the batch submitted. batchSize can be different for each iteration.
    */
-  private def update(raw: BDM[Double], iter: Int, batchSize: Int): Unit = {
+  private[clustering] def update(stat: BDM[Double], iter: Int, batchSize: Int): Unit = {
     val tau_0 = this.getTau_0
     val kappa = this.getKappa
 
     // weight of the mini-batch.
     val weight = math.pow(tau_0 + iter, -kappa)
 
-    // This step finishes computing the sufficient statistics for the M step
-    val stat = raw :* expElogbeta
-
     // Update lambda based on documents.
     lambda = lambda * (1 - weight) +
       (stat * (corpusSize.toDouble / batchSize.toDouble) + eta) * weight
-    Elogbeta = dirichletExpectation(lambda)
-    expElogbeta = exp(Elogbeta)
   }
 
   /**
    * Get a random matrix to initialize lambda
   */
   private def getGammaMatrix(row: Int, col: Int): BDM[Double] = {
-    val gammaRandomGenerator = new Gamma(100, 1.0 / 100.0)
+    val gammaRandomGenerator = new Gamma(gammaShape, 1.0 / gammaShape)
     val temp = gammaRandomGenerator.sample(row * col).toArray
     new BDM[Double](col, row, temp).t
  }
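
For context, `submitMiniBatch` plus `update` implement the online variational Bayes step of Hoffman et al.: each mini-batch yields sufficient statistics `stat`, and lambda is blended with the batch estimate using a decaying learning rate rho_t = (tau_0 + t)^(-kappa). Below is a minimal standalone Breeze sketch of that blending step; the object and parameter names are illustrative only (not part of the Spark API), and only the formula mirrors the patched `update`.

import breeze.linalg.{DenseMatrix => BDM}

// Illustrative sketch (not part of the patch): the weighted lambda update
// that `update` performs after `submitMiniBatch` computes the statistics.
object LambdaUpdateSketch {
  def blend(
      lambda: BDM[Double],  // k x vocabSize variational topic-word parameters
      stat: BDM[Double],    // batch statistics, already multiplied by expElogbeta
      iter: Int,            // how many times next() has been invoked
      batchSize: Int,       // estimated as (miniBatchFraction * corpusSize).toInt
      corpusSize: Long,
      tau0: Double,
      kappa: Double,
      eta: Double): BDM[Double] = {
    // Decaying learning rate rho_t = (tau_0 + t)^(-kappa): later batches get less weight
    val weight = math.pow(tau0 + iter, -kappa)
    // Convex combination of the old lambda and the batch estimate scaled to corpus size
    lambda * (1 - weight) + (stat * (corpusSize.toDouble / batchSize.toDouble) + eta) * weight
  }
}

Estimating batchSize as (miniBatchFraction * corpusSize).toInt, as the patch notes, avoids triggering an extra Spark action via batch.count.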

mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala

Lines changed: 84 additions & 1 deletion
@@ -17,6 +17,8 @@
 
 package org.apache.spark.mllib.clustering
 
+import breeze.linalg.{DenseMatrix => BDM}
+
 import org.scalatest.FunSuite
 
 import org.apache.spark.mllib.linalg.{Vector, DenseMatrix, Matrix, Vectors}
@@ -54,7 +56,7 @@ class LDASuite extends FunSuite with MLlibTestSparkContext {
     }
   }
 
-  test("running and DistributedLDAModel") {
+  test("running and DistributedLDAModel with default Optimizer (EM)") {
     val k = 3
     val topicSmoothing = 1.2
     val termSmoothing = 1.2
@@ -131,6 +133,87 @@ class LDASuite extends FunSuite with MLlibTestSparkContext {
     assert(lda.getBeta === 3.0)
     assert(lda.getTopicConcentration === 3.0)
   }
+
+  test("OnlineLDAOptimizer initialization") {
+    val lda = new LDA().setK(2)
+    val corpus = sc.parallelize(tinyCorpus, 2)
+    val op = new OnlineLDAOptimizer().initialize(corpus, lda)
+    op.setKappa(0.9876).setMiniBatchFraction(0.123).setTau_0(567)
+    assert(op.alpha == 0.5) // default 1.0 / k
+    assert(op.eta == 0.5) // default 1.0 / k
+    assert(op.getKappa == 0.9876)
+    assert(op.getMiniBatchFraction == 0.123)
+    assert(op.getTau_0 == 567)
+  }
+
+  test("OnlineLDAOptimizer one iteration") {
+    // run OnlineLDAOptimizer for 1 iteration to verify its consistency with Blei-lab,
+    // [[https://github.com/Blei-Lab/onlineldavb]]
+    val k = 2
+    val vocabSize = 6
+
+    def docs: Array[(Long, Vector)] = Array(
+      Vectors.sparse(vocabSize, Array(0, 1, 2), Array(1, 1, 1)), // apple, orange, banana
+      Vectors.sparse(vocabSize, Array(3, 4, 5), Array(1, 1, 1))) // tiger, cat, dog
+      .zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) }
+    val corpus = sc.parallelize(docs, 2)
+
+    // set gammaShape large to avoid the stochastic impact
+    val op = new OnlineLDAOptimizer().setTau_0(1024).setKappa(0.51).setGammaShape(1e40)
+      .setMiniBatchFraction(1)
+    val lda = new LDA().setK(k).setMaxIterations(1).setOptimizer(op)
+
+    val state = op.initialize(corpus, lda)
+    // override lambda to simulate an intermediate state
+    // [[ 1.1  1.2  1.3  0.9  0.8  0.7]
+    //  [ 0.9  0.8  0.7  1.1  1.2  1.3]]
+    op.setLambda(new BDM[Double](k, vocabSize,
+      Array(1.1, 0.9, 1.2, 0.8, 1.3, 0.7, 0.9, 1.1, 0.8, 1.2, 0.7, 1.3)))
+
+    // run for one iteration
+    state.submitMiniBatch(corpus)
+
+    // verify the result; note this generates the identical result as
+    // [[https://github.com/Blei-Lab/onlineldavb]]
+    val topic1 = op.lambda(0, ::).inner.toArray.map("%.4f".format(_)).mkString(", ")
+    val topic2 = op.lambda(1, ::).inner.toArray.map("%.4f".format(_)).mkString(", ")
+    assert("1.1101, 1.2076, 1.3050, 0.8899, 0.7924, 0.6950" == topic1)
+    assert("0.8899, 0.7924, 0.6950, 1.1101, 1.2076, 1.3050" == topic2)
+  }
+
+  test("OnlineLDAOptimizer with toy data") {
+    def toydata: Array[(Long, Vector)] = Array(
+      Vectors.sparse(6, Array(0, 1), Array(1, 1)),
+      Vectors.sparse(6, Array(1, 2), Array(1, 1)),
+      Vectors.sparse(6, Array(0, 2), Array(1, 1)),
+
+      Vectors.sparse(6, Array(3, 4), Array(1, 1)),
+      Vectors.sparse(6, Array(3, 5), Array(1, 1)),
+      Vectors.sparse(6, Array(4, 5), Array(1, 1))
+    ).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) }
+
+    val docs = sc.parallelize(toydata)
+    val op = new OnlineLDAOptimizer().setMiniBatchFraction(1).setTau_0(1024).setKappa(0.51)
+      .setGammaShape(1e10)
+    val lda = new LDA().setK(2)
+      .setDocConcentration(0.01)
+      .setTopicConcentration(0.01)
+      .setMaxIterations(100)
+      .setOptimizer(op)
+
+    val ldaModel = lda.run(docs)
+    val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10)
+    val topics = topicIndices.map { case (terms, termWeights) =>
+      terms.zip(termWeights)
+    }
+
+    // check distribution for each topic; typical distribution is (0.3, 0.3, 0.3, 0.02, 0.02, 0.02)
+    topics.foreach { topic =>
+      val smalls = topic.filter(t => t._2 < 0.1).map(_._2)
+      assert(smalls.size == 3 && smalls.sum < 0.2)
+    }
+  }
 }
 
 private[clustering] object LDASuite {
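
A note on the setGammaShape(1e40) and setGammaShape(1e10) calls in these tests: Gamma(shape, scale = 1/shape) has mean shape * (1/shape) = 1 and variance shape * (1/shape)^2 = 1/shape, so a very large shape collapses the random gamma initialization onto 1.0 and makes the tests effectively deterministic. A small standalone Breeze sketch, illustrative only (the object name is hypothetical, and a newer Breeze may require an explicit RandBasis):

import breeze.stats.distributions.Gamma

// Gamma(shape, 1/shape) keeps the mean at 1.0 while the standard deviation
// shrinks as 1/sqrt(shape), so huge shapes remove the stochastic impact.
object GammaShapeSketch extends App {
  val loose = new Gamma(100.0, 1.0 / 100.0).sample(5) // sd = 0.1, scattered around 1.0
  val tight = new Gamma(1e10, 1.0 / 1e10).sample(5)   // sd = 1e-5, effectively all 1.0
  println(s"shape = 100:  $loose")
  println(s"shape = 1e10: $tight")
}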
