
Commit 61d60df

Minor cleanups:

* Update *Concentration parameter documentation
* EM Optimizer: createVertices() does not need to be a function
* OnlineLDAOptimizer: typos in doc
* Clean up the core code for online LDA (Scala style)
1 parent a996a82

2 files changed: +63 -63 lines changed

mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala

Lines changed: 19 additions & 18 deletions
@@ -90,14 +90,15 @@ class LDA private (
    * If set to -1, then docConcentration is set automatically.
    * (default = -1 = automatic)
    *
-   * Automatic setting of parameter:
-   *  - For EM: default = (50 / k) + 1.
-   *     - The 50/k is common in LDA libraries.
-   *     - The +1 follows Asuncion et al. (2009), who recommend a +1 adjustment for EM.
-   *  - For Online: default = (1.0 / k).
-   *     - follows the implementation from: [[https://github.com/Blei-Lab/onlineldavb]].
-   *
-   * Note: For EM optimizer, This value should be > 1.0.
+   * Optimizer-specific parameter settings:
+   *  - EM
+   *     - Value should be > 1.0
+   *     - default = (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows
+   *       Asuncion et al. (2009), who recommend a +1 adjustment for EM.
+   *  - Online
+   *     - Value should be >= 0
+   *     - default = (1.0 / k), following the implementation from
+   *       [[https://github.com/Blei-Lab/onlineldavb]].
    */
   def setDocConcentration(docConcentration: Double): this.type = {
     this.docConcentration = docConcentration
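For reference, the automatic setting documented above amounts to the following resolution logic; a minimal sketch, where `resolveDocConcentration` and the string-keyed optimizer switch are illustrative and not part of this commit:

```scala
// Hypothetical helper mirroring the documented defaults (not code from this patch).
def resolveDocConcentration(docConcentration: Double, k: Int, optimizer: String): Double = {
  if (docConcentration != -1.0) {
    docConcentration                 // an explicit user setting wins
  } else optimizer match {
    case "em" => (50.0 / k) + 1.0    // 50/k is common in LDA libraries; +1 per Asuncion et al. (2009)
    case "online" => 1.0 / k         // follows Blei-Lab/onlineldavb
  }
}
```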
@@ -117,8 +118,7 @@ class LDA private (
    * This is the parameter to a symmetric Dirichlet distribution.
    *
    * Note: The topics' distributions over terms are called "beta" in the original LDA paper
-   * by Blei et al., but are ca
-   * lled "phi" in many later papers such as Asuncion et al., 2009.
+   * by Blei et al., but are called "phi" in many later papers such as Asuncion et al., 2009.
    */
   def getTopicConcentration: Double = this.topicConcentration

@@ -134,14 +134,15 @@ class LDA private (
    * If set to -1, then topicConcentration is set automatically.
    * (default = -1 = automatic)
    *
-   * Automatic setting of parameter:
-   *  - For EM: default = 0.1 + 1.
-   *     - The 0.1 gives a small amount of smoothing.
-   *     - The +1 follows Asuncion et al. (2009), who recommend a +1 adjustment for EM.
-   *  - For Online: default = (1.0 / k).
-   *     - follows the implementation from: [[https://github.com/Blei-Lab/onlineldavb]].
-   *
-   * Note: For EM optimizer, This value should be > 1.0.
+   * Optimizer-specific parameter settings:
+   *  - EM
+   *     - Value should be > 1.0
+   *     - default = 0.1 + 1, where 0.1 gives a small amount of smoothing and +1 follows
+   *       Asuncion et al. (2009), who recommend a +1 adjustment for EM.
+   *  - Online
+   *     - Value should be >= 0
+   *     - default = (1.0 / k), following the implementation from
+   *       [[https://github.com/Blei-Lab/onlineldavb]].
    */
   def setTopicConcentration(topicConcentration: Double): this.type = {
     this.topicConcentration = topicConcentration
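The two concentration docs now read symmetrically. A hedged usage sketch (assuming the surrounding LDA builder API; nothing below is prescribed by this commit):

```scala
import org.apache.spark.mllib.clustering.LDA

// -1 requests the automatic, optimizer-specific defaults documented above.
val lda = new LDA()
  .setK(10)
  .setDocConcentration(-1)    // EM: (50 / k) + 1; Online: 1.0 / k
  .setTopicConcentration(-1)  // EM: 0.1 + 1;     Online: 1.0 / k
```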

mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala

Lines changed: 44 additions & 45 deletions
@@ -81,7 +81,7 @@ class EMLDAOptimizer extends LDAOptimizer {
   import LDA._

   /**
-   * Following fields will only be initialized through initialize method
+   * The following fields will only be initialized through the initialize() method
    */
   private[clustering] var graph: Graph[TopicCounts, TokenCount] = null
   private[clustering] var k: Int = 0
@@ -94,7 +94,7 @@ class EMLDAOptimizer extends LDAOptimizer {
   /**
    * Compute bipartite term/doc graph.
    */
-  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer={
+  override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {

     val docConcentration = lda.getDocConcentration
     val topicConcentration = lda.getTopicConcentration
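For context, the bipartite term/doc graph mentioned in the scaladoc links one vertex per document to one vertex per term, with token counts on the edges. Roughly (a sketch; `term2index`, which encodes term indices as negative vertex IDs so they cannot collide with nonnegative document IDs, is an assumption based on the surrounding LDA code):

```scala
import org.apache.spark.graphx.Edge
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Assumed helper: map a term index to a negative vertex ID.
def term2index(term: Int): Long = -(1 + term.toLong)

// One edge per (document, term) pair with a nonzero count.
def buildEdges(docs: RDD[(Long, Vector)]): RDD[Edge[Double]] =
  docs.flatMap { case (docId, termCounts) =>
    termCounts.toArray.zipWithIndex
      .filter(_._1 != 0.0)
      .map { case (cnt, term) => Edge(docId, term2index(term), cnt) }
  }
```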
@@ -121,7 +121,7 @@ class EMLDAOptimizer extends LDAOptimizer {

     // Create vertices.
     // Initially, we use random soft assignments of tokens to topics (random gamma).
-    def createVertices(): RDD[(VertexId, TopicCounts)] = {
+    val docTermVertices: RDD[(VertexId, TopicCounts)] = {
       val verticesTMP: RDD[(VertexId, TopicCounts)] =
         edges.mapPartitionsWithIndex { case (partIndex, partEdges) =>
           val random = new Random(partIndex + randomSeed)
@@ -134,8 +134,6 @@ class EMLDAOptimizer extends LDAOptimizer {
       verticesTMP.reduceByKey(_ + _)
     }

-    val docTermVertices = createVertices()
-
     // Partition such that edges are grouped by document
     this.graph = Graph(docTermVertices, edges).partitionBy(PartitionStrategy.EdgePartition1D)
     this.k = k
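The def-to-val change mirrors the commit message: a helper invoked exactly once can simply be a value initialized by a block, keeping intermediates local without a spurious function. A minimal standalone illustration of the idiom (not code from this patch):

```scala
// Before: def makeSquares() = { ... }; val squares = makeSquares()
// After: compute the value once in an initializer block.
val squares: Seq[Int] = {
  val xs = 1 to 5            // intermediate values stay local to the block
  xs.map(x => x * x)
}
```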
@@ -216,10 +214,10 @@ class EMLDAOptimizer extends LDAOptimizer {
  * :: Experimental ::
  *
  * An online optimizer for LDA. The Optimizer implements the Online variational Bayes LDA
- * algorithm, which processes a subset of the corpus on each iteration, and update the term-topic
+ * algorithm, which processes a subset of the corpus on each iteration, and updates the term-topic
  * distribution adaptively.
  *
- * References:
+ * Original Online LDA paper:
  *   Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
  */
 @Experimental
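In the Hoffman et al. (2010) scheme, each mini-batch enters through a step size that decays with the iteration count; this is where the tau_0 and kappa parameters below come in. A sketch of that schedule (the `rho` helper is illustrative, not part of this class's API):

```scala
import scala.math.pow

// rho(t) = (tau_0 + t)^(-kappa): larger tau_0 downweights early iterations;
// kappa in (0.5, 1.0] guarantees asymptotic convergence.
def rho(iteration: Int, tau_0: Double = 1024, kappa: Double = 0.51): Double =
  pow(tau_0 + iteration, -kappa)

// The term-topic parameter is then blended per iteration, roughly:
//   lambda = (1 - rho(t)) * lambda + rho(t) * lambdaEstimatedFromBatch
```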
@@ -236,31 +234,30 @@ class OnlineLDAOptimizer extends LDAOptimizer {
   // Online LDA specific parameters
   private var tau_0: Double = 1024
   private var kappa: Double = 0.51
-  private var minibatchFraction: Double = 0.01
+  private var miniBatchFraction: Double = 0.01

   // internal data structure
   private var docs: RDD[(Long, Vector)] = null
   private var lambda: BDM[Double] = null
-  private var Elogbeta: BDM[Double]= null
+  private var Elogbeta: BDM[Double] = null
   private var expElogbeta: BDM[Double] = null

   // count of invocation to next, which helps deciding the weight for each iteration
-  private var iteration = 0
+  private var iteration: Int = 0

   /**
    * A (positive) learning parameter that downweights early iterations. Larger values make early
-   * iterations count less
+   * iterations count less.
    */
   def getTau_0: Double = this.tau_0

   /**
    * A (positive) learning parameter that downweights early iterations. Larger values make early
-   * iterations count less
-   * Automatic setting of parameter:
-   *  - default = 1024, which follows the recommendation from OnlineLDA paper.
+   * iterations count less.
+   * Default: 1024, following the original Online LDA paper.
    */
   def setTau_0(tau_0: Double): this.type = {
-    require(tau_0 > 0 || tau_0 == -1.0, s"LDA tau_0 must be positive, but was set to $tau_0")
+    require(tau_0 > 0, s"LDA tau_0 must be positive, but was set to $tau_0")
     this.tau_0 = tau_0
     this
   }
@@ -273,31 +270,32 @@ class OnlineLDAOptimizer extends LDAOptimizer {
   /**
    * Learning rate: exponential decay rate---should be between
    * (0.5, 1.0] to guarantee asymptotic convergence.
-   *  - default = 0.51, which follows the recommendation from OnlineLDA paper.
+   * Default: 0.51, based on the original Online LDA paper.
    */
   def setKappa(kappa: Double): this.type = {
-    require(kappa >= 0 || kappa == -1.0,
-      s"Online LDA kappa must be nonnegative (or -1 for auto), but was set to $kappa")
+    require(kappa >= 0, s"Online LDA kappa must be nonnegative, but was set to $kappa")
     this.kappa = kappa
     this
   }

   /**
-   * Mini-batch size, which controls how many documents are used in each iteration
+   * Mini-batch fraction, which sets the fraction of document sampled and used in each iteration
    */
-  def getMiniBatchFraction: Double = this.minibatchFraction
+  def getMiniBatchFraction: Double = this.miniBatchFraction

   /**
-   * Mini-batch size, which controls how many documents are used in each iteration
-   * default = 1% from total documents.
+   * Mini-batch fraction in (0, 1], which sets the fraction of document sampled and used in
+   * each iteration.
+   * Default: 0.01, i.e., 1% of total documents
    */
   def setMiniBatchFraction(miniBatchFraction: Double): this.type = {
-    this.minibatchFraction = miniBatchFraction
+    require(miniBatchFraction > 0.0 && miniBatchFraction <= 1.0,
+      s"Online LDA miniBatchFraction must be in range (0,1], but was set to $miniBatchFraction")
+    this.miniBatchFraction = miniBatchFraction
     this
   }

-  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer={
-
+  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
     this.k = lda.getK
     this.corpusSize = docs.count()
     this.vocabSize = docs.first()._2.size
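An end-to-end configuration sketch for the knobs above (assumes the surrounding `LDA.setOptimizer` API and a `corpus: RDD[(Long, Vector)]` of (document ID, term-count vector) pairs; method names follow this commit):

```scala
val optimizer = new OnlineLDAOptimizer()
  .setTau_0(1024)              // downweight early iterations
  .setKappa(0.51)              // decay rate in (0.5, 1.0]
  .setMiniBatchFraction(0.05)  // sample 5% of the documents per iteration

val ldaModel = new LDA()
  .setK(10)
  .setOptimizer(optimizer)
  .run(corpus)
```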
@@ -322,22 +320,23 @@ class OnlineLDAOptimizer extends LDAOptimizer {
    */
   private[clustering] override def next(): OnlineLDAOptimizer = {
     iteration += 1
-    val batch = docs.sample(true, minibatchFraction, randomGenerator.nextLong())
-    if(batch.isEmpty()) return this
+    val batch = docs.sample(withReplacement = true, miniBatchFraction, randomGenerator.nextLong())
+    if (batch.isEmpty()) return this

     val k = this.k
     val vocabSize = this.vocabSize
     val expElogbeta = this.expElogbeta
     val alpha = this.alpha

-    val stats = batch.mapPartitions(docs =>{
+    val stats: RDD[BDM[Double]] = batch.mapPartitions { docs =>
       val stat = BDM.zeros[Double](k, vocabSize)
-      docs.foreach(doc =>{
+      docs.foreach { doc =>
         val termCounts = doc._2
-        val (ids, cts) = termCounts match {
-          case v: DenseVector => (((0 until v.size).toList), v.values)
+        val (ids: List[Int], cts: Array[Double]) = termCounts match {
+          case v: DenseVector => ((0 until v.size).toList, v.values)
           case v: SparseVector => (v.indices.toList, v.values)
-          case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
+          case v => throw new IllegalArgumentException("Online LDA does not support vector type "
+            + v.getClass)
         }

         // Initialize the variational distribution q(theta|gamma) for the mini-batch
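The dense/sparse match above normalizes both vector types into parallel (indices, counts) collections; the same pattern as a standalone sketch:

```scala
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}

// Uniform view of a document's active terms, as in next() above.
def termIdsAndCounts(termCounts: Vector): (List[Int], Array[Double]) = termCounts match {
  case v: DenseVector => ((0 until v.size).toList, v.values)
  case v: SparseVector => (v.indices.toList, v.values)
  case v => throw new IllegalArgumentException("Unsupported vector type " + v.getClass)
}

// termIdsAndCounts(Vectors.sparse(5, Array(1, 3), Array(2.0, 1.0)))
//   == (List(1, 3), Array(2.0, 1.0))
```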
@@ -354,7 +353,7 @@ class OnlineLDAOptimizer extends LDAOptimizer {
         while (meanchange > 1e-5) {
           val lastgamma = gammad
           //        1*K               1 * ids             ids * k
-          gammad = (expElogthetad :* ((ctsVector / phinorm) * (expElogbetad.t))) + alpha
+          gammad = (expElogthetad :* ((ctsVector / phinorm) * expElogbetad.t)) + alpha
           Elogthetad = digamma(gammad) - digamma(sum(gammad))
           expElogthetad = exp(Elogthetad)
           phinorm = expElogthetad * expElogbetad + 1e-100
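For orientation, the reparenthesized line is the per-document coordinate-ascent update from Hoffman et al. (2010); transcribed into math (no change in behavior; phinorm is the per-token normalizer):

```latex
\gamma_{dk} \leftarrow \alpha + e^{\,\mathbb{E}[\log \theta_{dk}]} \sum_{w} \frac{n_{dw}\, e^{\,\mathbb{E}[\log \beta_{kw}]}}{\phi^{\mathrm{norm}}_{dw}}
```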
@@ -364,28 +363,28 @@ class OnlineLDAOptimizer extends LDAOptimizer {
         val m1 = expElogthetad.t.toDenseMatrix.t
         val m2 = (ctsVector / phinorm).t.toDenseMatrix
         val outerResult = kron(m1, m2) // K * ids
-        for (i <- 0 until ids.size) {
+        var i = 0
+        while (i < ids.size) {
           stat(::, ids(i)) := (stat(::, ids(i)) + outerResult(::, i))
+          i += 1
         }
-        stat
-      })
+      }
       Iterator(stat)
-    })
+    }

-    val batchResult = stats.reduce(_ += _)
-    update(batchResult, iteration, (minibatchFraction * corpusSize).toInt)
+    val batchResult: BDM[Double] = stats.reduce(_ += _)
+    update(batchResult, iteration, (miniBatchFraction * corpusSize).toInt)
     this
   }

-  private[clustering] override def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
+  override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
     new LocalLDAModel(Matrices.fromBreeze(lambda).transpose)
   }

   /**
    * Update lambda based on the batch submitted. batchSize can be different for each iteration.
    */
-  private def update(raw: BDM[Double], iter:Int, batchSize: Int): Unit ={
-
+  private def update(raw: BDM[Double], iter: Int, batchSize: Int): Unit = {
     val tau_0 = this.getTau_0
     val kappa = this.getKappa

@@ -405,17 +404,17 @@ class OnlineLDAOptimizer extends LDAOptimizer {
   /**
    * Get a random matrix to initialize lambda
    */
-  private def getGammaMatrix(row:Int, col:Int): BDM[Double] ={
+  private def getGammaMatrix(row: Int, col: Int): BDM[Double] = {
     val gammaRandomGenerator = new Gamma(100, 1.0 / 100.0)
     val temp = gammaRandomGenerator.sample(row * col).toArray
-    (new BDM[Double](col, row, temp)).t
+    new BDM[Double](col, row, temp).t
   }

   /**
    * For theta ~ Dir(alpha), computes E[log(theta)] given alpha. Currently the implementation
    * uses digamma which is accurate but expensive.
    */
-  private def dirichletExpectation(alpha : BDM[Double]): BDM[Double] = {
+  private def dirichletExpectation(alpha: BDM[Double]): BDM[Double] = {
     val rowSum = sum(alpha(breeze.linalg.*, ::))
     val digAlpha = digamma(alpha)
     val digRowSum = digamma(rowSum)
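The method's final step (cut off in this hunk) subtracts each row's digamma(rowSum) from that row, since for theta ~ Dir(alpha), E[log theta_k] = digamma(alpha_k) - digamma(sum_j alpha_j). The whole computation as a self-contained sketch matching the lines above:

```scala
import breeze.linalg.{sum, DenseMatrix => BDM}
import breeze.numerics.digamma

// Row-wise Dirichlet expectation: each row of alpha parameterizes one Dirichlet.
def dirichletExpectation(alpha: BDM[Double]): BDM[Double] = {
  val rowSum = sum(alpha(breeze.linalg.*, ::))  // per-row sums
  val digAlpha = digamma(alpha)                 // elementwise digamma
  val digRowSum = digamma(rowSum)
  digAlpha(::, breeze.linalg.*) - digRowSum     // broadcast subtract per row
}
```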
