@@ -81,7 +81,7 @@ class EMLDAOptimizer extends LDAOptimizer {
   import LDA._

   /**
-   * Following fields will only be initialized through initialize method
+   * The following fields will only be initialized through the initialize() method
    */
   private[clustering] var graph: Graph[TopicCounts, TokenCount] = null
   private[clustering] var k: Int = 0
@@ -94,7 +94,7 @@ class EMLDAOptimizer extends LDAOptimizer {
   /**
    * Compute bipartite term/doc graph.
    */
-  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
+  override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {

    val docConcentration = lda.getDocConcentration
    val topicConcentration = lda.getTopicConcentration
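To make the graph construction concrete, a hedged illustration of the bipartite structure this method builds (the toy corpus and helper names below are invustrative assumptions, not taken from the patch):

    // Illustration only (assumed toy corpus, hypothetical docVertexId/termVertexId helpers):
    // for a corpus { doc 0 -> "apple apple banana" }, the term/doc graph has one document
    // vertex, two term vertices, and one edge per distinct (doc, term) pair, weighted by
    // the token count:
    //   Edge(docVertexId(0), termVertexId("apple"),  2.0)  // TokenCount = 2
    //   Edge(docVertexId(0), termVertexId("banana"), 1.0)  // TokenCount = 1
    // Every vertex then carries a TopicCounts vector of length k that EM updates.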
@@ -121,7 +121,7 @@ class EMLDAOptimizer extends LDAOptimizer {

    // Create vertices.
    // Initially, we use random soft assignments of tokens to topics (random gamma).
-    def createVertices(): RDD[(VertexId, TopicCounts)] = {
+    val docTermVertices: RDD[(VertexId, TopicCounts)] = {
      val verticesTMP: RDD[(VertexId, TopicCounts)] =
        edges.mapPartitionsWithIndex { case (partIndex, partEdges) =>
          val random = new Random(partIndex + randomSeed)
@@ -134,8 +134,6 @@ class EMLDAOptimizer extends LDAOptimizer {
134
134
verticesTMP.reduceByKey(_ + _)
135
135
}
136
136
137
- val docTermVertices = createVertices()
138
-
139
137
// Partition such that edges are grouped by document
140
138
this .graph = Graph (docTermVertices, edges).partitionBy(PartitionStrategy .EdgePartition1D )
141
139
this .k = k
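On the partitioning line above: GraphX's PartitionStrategy.EdgePartition1D assigns each edge to a partition using only its source vertex ID, and the edges built earlier use document vertices as sources, so all of a document's token edges are colocated in one partition, which is exactly the grouping the code comment asks for.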
@@ -147,7 +145,7 @@ class EMLDAOptimizer extends LDAOptimizer {
    this
  }

-  private[clustering] override def next(): EMLDAOptimizer = {
+  override private[clustering] def next(): EMLDAOptimizer = {
    require(graph != null, "graph is null, EMLDAOptimizer not initialized.")

    val eta = topicConcentration
@@ -204,7 +202,7 @@ class EMLDAOptimizer extends LDAOptimizer {
    graph.vertices.filter(isTermVertex).values.fold(BDV.zeros[Double](numTopics))(_ += _)
  }

-  private[clustering] override def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
+  override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
    require(graph != null, "graph is null, EMLDAOptimizer not initialized.")
    this.graphCheckpointer.deleteAllCheckpoints()
    new DistributedLDAModel(this, iterationTimes)
@@ -216,10 +214,10 @@ class EMLDAOptimizer extends LDAOptimizer {
 * :: Experimental ::
 *
 * An online optimizer for LDA. The Optimizer implements the Online variational Bayes LDA
- * algorithm, which processes a subset of the corpus on each iteration, and update the term-topic
+ * algorithm, which processes a subset of the corpus on each iteration, and updates the term-topic
 * distribution adaptively.
 *
- * References:
+ * Original Online LDA paper:
 *   Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
 */
@Experimental
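The algorithm described above weights each mini-batch by a decaying step size; a minimal sketch of that schedule from the cited Hoffman et al. (2010) paper, controlled by the tau_0 and kappa parameters declared in the next hunk (the function name is illustrative, not part of this patch):

    // Sketch only: the online weight rho_t = (tau_0 + t)^(-kappa) from Hoffman et al. (2010).
    // Larger tau_0 downweights early iterations; kappa in (0.5, 1] sets the decay rate.
    def onlineWeight(iteration: Int, tau_0: Double, kappa: Double): Double =
      math.pow(tau_0 + iteration, -kappa)

    onlineWeight(1, 1024, 0.51)    // ~0.029: with a large tau_0, early batches count little
    onlineWeight(1000, 1024, 0.51) // ~0.021: the weight decays slowly toward zero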
@@ -236,31 +234,30 @@ class OnlineLDAOptimizer extends LDAOptimizer {
  // Online LDA specific parameters
  private var tau_0: Double = 1024
  private var kappa: Double = 0.51
-  private var minibatchFraction: Double = 0.01
+  private var miniBatchFraction: Double = 0.01

  // internal data structure
  private var docs: RDD[(Long, Vector)] = null
  private var lambda: BDM[Double] = null
-  private var Elogbeta: BDM[Double]= null
+  private var Elogbeta: BDM[Double] = null
  private var expElogbeta: BDM[Double] = null

  // count of invocation to next, which helps deciding the weight for each iteration
-  private var iteration = 0
+  private var iteration: Int = 0

  /**
   * A (positive) learning parameter that downweights early iterations. Larger values make early
-   * iterations count less
+   * iterations count less.
   */
  def getTau_0: Double = this.tau_0

  /**
   * A (positive) learning parameter that downweights early iterations. Larger values make early
-   * iterations count less
-   * Automatic setting of parameter:
-   *  - default = 1024, which follows the recommendation from OnlineLDA paper.
+   * iterations count less.
+   * Default: 1024, following the original Online LDA paper.
   */
  def setTau_0(tau_0: Double): this.type = {
-    require(tau_0 > 0 || tau_0 == -1.0, s"LDA tau_0 must be positive, but was set to $tau_0")
+    require(tau_0 > 0, s"LDA tau_0 must be positive, but was set to $tau_0")
    this.tau_0 = tau_0
    this
  }
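A quick usage note on the tightened precondition above: Scala's require throws IllegalArgumentException on failure, so the old -1.0 sentinel now fails fast instead of being silently accepted:

    new OnlineLDAOptimizer().setTau_0(512.0)  // ok
    new OnlineLDAOptimizer().setTau_0(-1.0)   // throws IllegalArgumentException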
@@ -273,31 +270,32 @@ class OnlineLDAOptimizer extends LDAOptimizer {
  /**
   * Learning rate: exponential decay rate---should be between
   * (0.5, 1.0] to guarantee asymptotic convergence.
-   *  - default = 0.51, which follows the recommendation from OnlineLDA paper.
+   * Default: 0.51, based on the original Online LDA paper.
   */
  def setKappa(kappa: Double): this.type = {
-    require(kappa >= 0 || kappa == -1.0,
-      s"Online LDA kappa must be nonnegative (or -1 for auto), but was set to $kappa")
+    require(kappa >= 0, s"Online LDA kappa must be nonnegative, but was set to $kappa")
    this.kappa = kappa
    this
  }

  /**
-   * Mini-batch size, which controls how many documents are used in each iteration
+   * Mini-batch fraction, which sets the fraction of documents sampled and used in each iteration
   */
-  def getMiniBatchFraction: Double = this.minibatchFraction
+  def getMiniBatchFraction: Double = this.miniBatchFraction

  /**
-   * Mini-batch size, which controls how many documents are used in each iteration
-   * default = 1% from total documents.
+   * Mini-batch fraction in (0, 1], which sets the fraction of documents sampled and used in
+   * each iteration.
+   * Default: 0.01, i.e., 1% of total documents.
   */
  def setMiniBatchFraction(miniBatchFraction: Double): this.type = {
-    this.minibatchFraction = miniBatchFraction
+    require(miniBatchFraction > 0.0 && miniBatchFraction <= 1.0,
+      s"Online LDA miniBatchFraction must be in range (0,1], but was set to $miniBatchFraction")
+    this.miniBatchFraction = miniBatchFraction
    this
  }

-  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
-
+  override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
    this.k = lda.getK
    this.corpusSize = docs.count()
    this.vocabSize = docs.first()._2.size
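Since every setter above returns this.type, the parameters chain; a hypothetical configuration (the values are examples only):

    val optimizer = new OnlineLDAOptimizer()
      .setTau_0(1024)
      .setKappa(0.51)
      .setMiniBatchFraction(0.05)  // each iteration samples ~5% of the corpus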
@@ -320,24 +318,25 @@ class OnlineLDAOptimizer extends LDAOptimizer {
   * model, and it will update the topic distribution adaptively for the terms appearing in the
   * subset.
   */
-  private[clustering] override def next(): OnlineLDAOptimizer = {
+  override private[clustering] def next(): OnlineLDAOptimizer = {
    iteration += 1
-    val batch = docs.sample(true, minibatchFraction, randomGenerator.nextLong())
-    if (batch.isEmpty()) return this
+    val batch = docs.sample(withReplacement = true, miniBatchFraction, randomGenerator.nextLong())
+    if (batch.isEmpty()) return this

    val k = this.k
    val vocabSize = this.vocabSize
    val expElogbeta = this.expElogbeta
    val alpha = this.alpha

-    val stats = batch.mapPartitions( docs => {
+    val stats: RDD[BDM[Double]] = batch.mapPartitions { docs =>
      val stat = BDM.zeros[Double](k, vocabSize)
-      docs.foreach( doc => {
+      docs.foreach { doc =>
        val termCounts = doc._2
-        val (ids, cts) = termCounts match {
-          case v: DenseVector => (((0 until v.size).toList), v.values)
+        val (ids: List[Int], cts: Array[Double]) = termCounts match {
+          case v: DenseVector => ((0 until v.size).toList, v.values)
          case v: SparseVector => (v.indices.toList, v.values)
-          case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
+          case v => throw new IllegalArgumentException("Online LDA does not support vector type "
+            + v.getClass)
        }

        // Initialize the variational distribution q(theta|gamma) for the mini-batch
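To illustrate the (ids, cts) destructuring in the hunk above, here is what each mllib vector type yields (example values invented):

    // Vectors.dense(0.0, 2.0, 1.0)                    => ids = List(0, 1, 2), cts = Array(0.0, 2.0, 1.0)
    // Vectors.sparse(3, Array(1, 2), Array(2.0, 1.0)) => ids = List(1, 2),    cts = Array(2.0, 1.0)
    // any other Vector subtype                        => IllegalArgumentException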
@@ -354,7 +353,7 @@ class OnlineLDAOptimizer extends LDAOptimizer {
        while (meanchange > 1e-5) {
          val lastgamma = gammad
          // 1*K                  1 * ids               ids * k
-          gammad = (expElogthetad :* ((ctsVector / phinorm) * (expElogbetad.t))) + alpha
+          gammad = (expElogthetad :* ((ctsVector / phinorm) * expElogbetad.t)) + alpha
          Elogthetad = digamma(gammad) - digamma(sum(gammad))
          expElogthetad = exp(Elogthetad)
          phinorm = expElogthetad * expElogbetad + 1e-100
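Decoding the shape comment in the fixed-point step above:

    // expElogthetad         : 1 x K
    // ctsVector / phinorm   : 1 x ids
    // expElogbetad.t        : ids x K
    // so (1 x ids) * (ids x K) = 1 x K, and both the elementwise ":*"
    // and the "+ alpha" stay 1 x K, matching gammad.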
@@ -364,28 +363,28 @@ class OnlineLDAOptimizer extends LDAOptimizer {
        val m1 = expElogthetad.t.toDenseMatrix.t
        val m2 = (ctsVector / phinorm).t.toDenseMatrix
        val outerResult = kron(m1, m2) // K * ids
-        for (i <- 0 until ids.size) {
+        var i = 0
+        while (i < ids.size) {
          stat(::, ids(i)) := (stat(::, ids(i)) + outerResult(::, i))
+          i += 1
        }
-        stat
-      })
+      }
      Iterator(stat)
-    })
+    }

-    val batchResult = stats.reduce(_ += _)
-    update(batchResult, iteration, (minibatchFraction * corpusSize).toInt)
+    val batchResult: BDM[Double] = stats.reduce(_ += _)
+    update(batchResult, iteration, (miniBatchFraction * corpusSize).toInt)
    this
  }

-  private[clustering] override def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
+  override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
    new LocalLDAModel(Matrices.fromBreeze(lambda).transpose)
  }

  /**
   * Update lambda based on the batch submitted. batchSize can be different for each iteration.
   */
-  private def update(raw: BDM[Double], iter:Int, batchSize: Int): Unit = {
-
+  private def update(raw: BDM[Double], iter: Int, batchSize: Int): Unit = {
    val tau_0 = this.getTau_0
    val kappa = this.getKappa

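For reference, a hedged sketch of what update computes per the cited Hoffman et al. (2010) paper, since the full method body is not shown in this hunk (eta here denotes the topic-concentration prior; names are illustrative):

    // Sketch only, following Hoffman et al. (2010):
    //   rho       = math.pow(tau_0 + iter, -kappa)                 // decaying step size
    //   lambdaHat = eta + (corpusSize.toDouble / batchSize) * raw  // rescale batch stats to corpus
    //   lambda    = lambda * (1 - rho) + lambdaHat * rho           // weighted online average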
@@ -405,17 +404,17 @@ class OnlineLDAOptimizer extends LDAOptimizer {
  /**
   * Get a random matrix to initialize lambda
   */
-  private def getGammaMatrix(row:Int, col:Int): BDM[Double] = {
+  private def getGammaMatrix(row: Int, col: Int): BDM[Double] = {
    val gammaRandomGenerator = new Gamma(100, 1.0 / 100.0)
    val temp = gammaRandomGenerator.sample(row * col).toArray
-    (new BDM[Double](col, row, temp)).t
+    new BDM[Double](col, row, temp).t
  }

  /**
   * For theta ~ Dir(alpha), computes E[log(theta)] given alpha. Currently the implementation
   * uses digamma which is accurate but expensive.
   */
-  private def dirichletExpectation(alpha : BDM[Double]): BDM[Double] = {
+  private def dirichletExpectation(alpha: BDM[Double]): BDM[Double] = {
    val rowSum = sum(alpha(breeze.linalg.*, ::))
    val digAlpha = digamma(alpha)
    val digRowSum = digamma(rowSum)
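A small sanity check of the identity implemented here, where each entry is digamma(alpha_ij) minus digamma of its row sum:

    // Illustrative: for a row alpha = (1.0, 1.0),
    // E[log theta_i] = digamma(1.0) - digamma(2.0) = -0.5772... - 0.4227... = -1.0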