@@ -31,6 +31,7 @@ import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel
import org.apache.spark.mllib.rdd.RDDFunctions._
+
/**
 * Entry in vocabulary
 */
@@ -61,18 +62,15 @@ private case class VocabWord(
 * Distributed Representations of Words and Phrases and their Compositionality.
 * @param size vector dimension
 * @param startingAlpha initial learning rate
- * @param window context words from [-window, window]
- * @param minCount minimum frequncy to consider a vocabulary word
- * @param parallelisum number of partitions to run Word2Vec
+ * @param parallelism number of partitions to run Word2Vec (use a small number for accuracy)
+ * @param numIterations number of iterations to run; should be smaller than or equal to parallelism
 */
@Experimental
class Word2Vec(
    val size: Int,
    val startingAlpha: Double,
-    val window: Int,
-    val minCount: Int,
-    val parallelism: Int = 1,
-    val numIterations: Int = 1)
+    val parallelism: Int = 1,
+    val numIterations: Int = 1)
  extends Serializable with Logging {

  private val EXP_TABLE_SIZE = 1000
@@ -81,7 +79,13 @@ class Word2Vec(
  private val MAX_SENTENCE_LENGTH = 1000
  private val layer1Size = size
  private val modelPartitionNum = 100
-
+
+  /** context words from [-window, window] */
+  private val window = 5
+
+  /** minimum frequency to consider a vocabulary word */
+  private val minCount = 5
+
  private var trainWordsCount = 0
  private var vocabSize = 0
  private var vocab: Array[VocabWord] = null
@@ -99,7 +103,7 @@ class Word2Vec(
        0))
      .filter(_.cn >= minCount)
      .collect()
-      .sortWith((a, b)=> a.cn > b.cn)
+      .sortWith((a, b) => a.cn > b.cn)

    vocabSize = vocab.length
    var a = 0
@@ -111,16 +115,12 @@ class Word2Vec(
    logInfo("trainWordsCount = " + trainWordsCount)
  }

-  private def learnVocabPerPartition(words: RDD[String]) {
-
-  }
-
-  private def createExpTable(): Array[Double] = {
-    val expTable = new Array[Double](EXP_TABLE_SIZE)
+  private def createExpTable(): Array[Float] = {
+    val expTable = new Array[Float](EXP_TABLE_SIZE)
    var i = 0
    while (i < EXP_TABLE_SIZE) {
      val tmp = math.exp((2.0 * i / EXP_TABLE_SIZE - 1.0) * MAX_EXP)
-      expTable(i) = tmp / (tmp + 1)
+      expTable(i) = (tmp / (tmp + 1.0)).toFloat
      i += 1
    }
    expTable
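Note on this hunk: createExpTable precomputes the logistic function so the hot training loop can replace per-step math.exp calls with an array lookup. A minimal standalone sketch of the idea, with the constants copied from the patch (ExpTableSketch and sigmoid are illustrative names, not part of the PR):

object ExpTableSketch {
  private val EXP_TABLE_SIZE = 1000
  private val MAX_EXP = 6

  // Precompute sigma(x) = e^x / (e^x + 1) at EXP_TABLE_SIZE points spanning [-MAX_EXP, MAX_EXP).
  private val expTable: Array[Float] = Array.tabulate(EXP_TABLE_SIZE) { i =>
    val tmp = math.exp((2.0 * i / EXP_TABLE_SIZE - 1.0) * MAX_EXP)
    (tmp / (tmp + 1.0)).toFloat
  }

  // Same index arithmetic as the training loop below, including the integer
  // division EXP_TABLE_SIZE / MAX_EXP; valid for f inside (-MAX_EXP, MAX_EXP).
  def sigmoid(f: Double): Float =
    expTable(((f + MAX_EXP) * (EXP_TABLE_SIZE / MAX_EXP / 2.0)).toInt)

  def main(args: Array[String]): Unit = {
    println(sigmoid(0.0))  // ~0.494 (table-quantized; exact value is 0.5)
    println(sigmoid(2.0))  // ~0.877 (exact value is ~0.881)
  }
}

Storing Float rather than Double halves the table and, more importantly, matches the single-precision BLAS routines the rest of this patch switches to.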
@@ -209,7 +209,7 @@ class Word2Vec(
   * @return a Word2VecModel
   */

-  def fit[S <: Iterable[String]](dataset:RDD[S]): Word2VecModel = {
+  def fit[S <: Iterable[String]](dataset: RDD[S]): Word2VecModel = {

    val words = dataset.flatMap(x => x)

@@ -223,39 +223,37 @@ class Word2Vec(
    val bcVocab = sc.broadcast(vocab)
    val bcVocabHash = sc.broadcast(vocabHash)

-    val sentences: RDD[Array[Int]] = words.mapPartitions {
-      iter => { new Iterator[Array[Int]] {
-          def hasNext = iter.hasNext
-
-          def next = {
-            var sentence = new ArrayBuffer[Int]
-            var sentenceLength = 0
-            while (iter.hasNext && sentenceLength < MAX_SENTENCE_LENGTH) {
-              val word = bcVocabHash.value.get(iter.next)
-              word match {
-                case Some(w) => {
-                  sentence += w
-                  sentenceLength += 1
-                }
-                case None =>
-              }
+    val sentences: RDD[Array[Int]] = words.mapPartitions { iter =>
+      new Iterator[Array[Int]] {
+        def hasNext: Boolean = iter.hasNext
+
+        def next(): Array[Int] = {
+          var sentence = new ArrayBuffer[Int]
+          var sentenceLength = 0
+          while (iter.hasNext && sentenceLength < MAX_SENTENCE_LENGTH) {
+            val word = bcVocabHash.value.get(iter.next())
+            word match {
+              case Some(w) =>
+                sentence += w
+                sentenceLength += 1
+              case None =>
            }
-            sentence.toArray
          }
+          sentence.toArray
        }
      }
    }

    val newSentences = sentences.repartition(parallelism).cache()
-    var syn0Global
-      = Array.fill[Double](vocabSize * layer1Size)((Random.nextDouble - 0.5) / layer1Size)
-    var syn1Global = new Array[Double](vocabSize * layer1Size)
+    var syn0Global =
+      Array.fill[Float](vocabSize * layer1Size)((Random.nextFloat() - 0.5f) / layer1Size)
+    var syn1Global = new Array[Float](vocabSize * layer1Size)

    for (iter <- 1 to numIterations) {
      val (aggSyn0, aggSyn1, _, _) =
-      // TODO: broadcast temp instead of serializing it directly
+        // TODO: broadcast temp instead of serializing it directly
        // or initialize the model in each executor
-      newSentences.treeAggregate((syn0Global.clone(), syn1Global.clone(), 0, 0))(
+        newSentences.treeAggregate((syn0Global, syn1Global, 0, 0))(
          seqOp = (c, v) => (c, v) match {
            case ((syn0, syn1, lastWordCount, wordCount), sentence) =>
              var lwc = lastWordCount
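Note on the refactored iterator above: each partition streams its words, drops tokens missing from the broadcast vocabulary hash, and cuts a sentence after MAX_SENTENCE_LENGTH kept words. A rough functional equivalent of that behavior, using nothing beyond the Scala standard library (toSentences and its parameter names are illustrative, not from the patch):

// Keep only in-vocabulary tokens, then chunk the survivors into fixed-length sentences.
def toSentences(
    tokens: Iterator[String],
    vocabHash: Map[String, Int],
    maxLen: Int = 1000): Iterator[Array[Int]] = {
  tokens
    .flatMap(vocabHash.get) // Some(wordIndex) is kept, None (unknown word) is dropped
    .grouped(maxLen)        // a "sentence" ends every maxLen surviving words
    .map(_.toArray)
}

As in the patch, only in-vocabulary words count toward the length cap, and sentence boundaries in the raw text are ignored.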
@@ -280,23 +278,23 @@ class Word2Vec(
                    if (c >= 0 && c < sentence.size) {
                      val lastWord = sentence(c)
                      val l1 = lastWord * layer1Size
-                      val neu1e = new Array[Double](layer1Size)
+                      val neu1e = new Array[Float](layer1Size)
                      // Hierarchical softmax
                      var d = 0
                      while (d < bcVocab.value(word).codeLen) {
                        val l2 = bcVocab.value(word).point(d) * layer1Size
                        // Propagate hidden -> output
-                        var f = blas.ddot(layer1Size, syn0, l1, 1, syn1, l2, 1)
+                        var f = blas.sdot(layer1Size, syn0, l1, 1, syn1, l2, 1)
                        if (f > -MAX_EXP && f < MAX_EXP) {
                          val ind = ((f + MAX_EXP) * (EXP_TABLE_SIZE / MAX_EXP / 2.0)).toInt
                          f = expTable.value(ind)
-                          val g = (1 - bcVocab.value(word).code(d) - f) * alpha
-                          blas.daxpy(layer1Size, g, syn1, l2, 1, neu1e, 0, 1)
-                          blas.daxpy(layer1Size, g, syn0, l1, 1, syn1, l2, 1)
+                          val g = ((1 - bcVocab.value(word).code(d) - f) * alpha).toFloat
+                          blas.saxpy(layer1Size, g, syn1, l2, 1, neu1e, 0, 1)
+                          blas.saxpy(layer1Size, g, syn0, l1, 1, syn1, l2, 1)
                        }
                        d += 1
                      }
-                      blas.daxpy(layer1Size, 1.0, neu1e, 0, 1, syn0, l1, 1)
+                      blas.saxpy(layer1Size, 1.0f, neu1e, 0, 1, syn0, l1, 1)
                    }
                  }
                  a += 1
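For reference, the three saxpy calls above implement the standard hierarchical-softmax gradient step from Mikolov et al., written here in the patch's own variable names (sigma is the logistic function looked up in expTable):

\[
f = \sigma\bigl(\mathrm{syn0}[l_1] \cdot \mathrm{syn1}[l_2]\bigr), \qquad
g = \bigl(1 - \mathrm{code}_d - f\bigr)\,\alpha
\]
\[
\mathrm{neu1e} \mathrel{+}= g\,\mathrm{syn1}[l_2], \qquad
\mathrm{syn1}[l_2] \mathrel{+}= g\,\mathrm{syn0}[l_1]
\]

and after the walk down the Huffman path, syn0[l1] += neu1e applies the accumulated gradient to the context word's input vector. Casting g to Float is what keeps the whole update in single precision.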
@@ -308,24 +306,24 @@ class Word2Vec(
          combOp = (c1, c2) => (c1, c2) match {
            case ((syn0_1, syn1_1, lwc_1, wc_1), (syn0_2, syn1_2, lwc_2, wc_2)) =>
              val n = syn0_1.length
-              val weight1 = 1.0 * wc_1 / (wc_1 + wc_2)
-              val weight2 = 1.0 * wc_2 / (wc_1 + wc_2)
-              blas.dscal(n, weight1, syn0_1, 1)
-              blas.dscal(n, weight1, syn1_1, 1)
-              blas.daxpy(n, weight2, syn0_2, 1, syn0_1, 1)
-              blas.daxpy(n, weight2, syn1_2, 1, syn1_1, 1)
+              val weight1 = 1.0f * wc_1 / (wc_1 + wc_2)
+              val weight2 = 1.0f * wc_2 / (wc_1 + wc_2)
+              blas.sscal(n, weight1, syn0_1, 1)
+              blas.sscal(n, weight1, syn1_1, 1)
+              blas.saxpy(n, weight2, syn0_2, 1, syn0_1, 1)
+              blas.saxpy(n, weight2, syn1_2, 1, syn1_1, 1)
              (syn0_1, syn1_1, lwc_1 + lwc_2, wc_1 + wc_2)
        })
      syn0Global = aggSyn0
      syn1Global = aggSyn1
    }
    newSentences.unpersist()

-    val wordMap = new Array[(String, Array[Double])](vocabSize)
+    val wordMap = new Array[(String, Array[Float])](vocabSize)
    var i = 0
    while (i < vocabSize) {
      val word = bcVocab.value(i).word
-      val vector = new Array[Double](layer1Size)
+      val vector = new Array[Float](layer1Size)
    Array.copy(syn0Global, i * layer1Size, vector, 0, layer1Size)
      wordMap(i) = (word, vector)
      i += 1
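The combOp above merges two partial models by a word-count-weighted average: sscal scales the left arrays in place, then saxpy adds in the scaled right arrays. The same arithmetic without BLAS, as a standalone sketch (mergeWeighted is an illustrative name, not from the patch):

// Word-count-weighted average of two partial weight arrays, matching the combOp arithmetic.
def mergeWeighted(a: Array[Float], wcA: Int, b: Array[Float], wcB: Int): Array[Float] = {
  require(a.length == b.length && wcA + wcB > 0)
  val weightA = 1.0f * wcA / (wcA + wcB)
  val weightB = 1.0f * wcB / (wcA + wcB)
  Array.tabulate(a.length)(i => weightA * a(i) + weightB * b(i))
}

Weighting by processed word count lets a partition that trained on more data pull the merged syn0/syn1 proportionally harder.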
@@ -341,15 +339,15 @@
/**
 * Word2Vec model
 */
-class Word2VecModel(private val model: RDD[(String, Array[Double])]) extends Serializable {
+class Word2VecModel(private val model: RDD[(String, Array[Float])]) extends Serializable {

-  private def cosineSimilarity(v1: Array[Double], v2: Array[Double]): Double = {
+  private def cosineSimilarity(v1: Array[Float], v2: Array[Float]): Double = {
    require(v1.length == v2.length, "Vectors should have the same length")
    val n = v1.length
-    val norm1 = blas.dnrm2(n, v1, 1)
-    val norm2 = blas.dnrm2(n, v2, 1)
+    val norm1 = blas.snrm2(n, v1, 1)
+    val norm2 = blas.snrm2(n, v2, 1)
    if (norm1 == 0 || norm2 == 0) return 0.0
-    blas.ddot(n, v1, 1, v2, 1) / norm1 / norm2
+    blas.sdot(n, v1, 1, v2, 1) / norm1 / norm2
  }

  /**
@@ -362,7 +360,7 @@ class Word2VecModel(private val model: RDD[(String, Array[Double])]) extends Ser
    if (result.isEmpty) {
      throw new IllegalStateException(s"${word} not in vocabulary")
    }
-    else Vectors.dense(result(0))
+    else Vectors.dense(result(0).map(_.toDouble))
  }

  /**
/**
@@ -394,7 +392,7 @@ class Word2VecModel (private val model:RDD[(String, Array[Double])]) extends Ser
394
392
def findSynonyms (vector : Vector , num : Int ): Array [(String , Double )] = {
395
393
require(num > 0 , " Number of similar words should > 0" )
396
394
val topK = model.map { case (w, vec) =>
397
- (cosineSimilarity(vector.toArray, vec), w) }
395
+ (cosineSimilarity(vector.toArray.map(_.toFloat) , vec), w) }
398
396
.sortByKey(ascending = false )
399
397
.take(num + 1 )
400
398
.map(_.swap)
@@ -410,18 +408,16 @@ object Word2Vec{
   * @param input RDD of words
   * @param size vector dimension
   * @param startingAlpha initial learning rate
-   * @param window context words from [-window, window]
-   * @param minCount minimum frequncy to consider a vocabulary word
-   * @return Word2Vec model
-   */
+   * @param parallelism number of partitions to run Word2Vec (use a small number for accuracy)
+   * @param numIterations number of iterations; should be smaller than or equal to parallelism
+   * @return Word2Vec model
+   */
  def train[S <: Iterable[String]](
      input: RDD[S],
      size: Int,
      startingAlpha: Double,
-      window: Int,
-      minCount: Int,
      parallelism: Int = 1,
      numIterations: Int = 1): Word2VecModel = {
-    new Word2Vec(size, startingAlpha, window, minCount, parallelism, numIterations).fit[S](input)
+    new Word2Vec(size, startingAlpha, parallelism, numIterations).fit[S](input)
  }
}
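End-to-end, the signature change means callers no longer pass window or minCount. A hypothetical driver against the new API; the package (org.apache.spark.mllib.feature), the input path, and the transform(word) lookup are assumptions based on parts of the file this diff does not show:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.feature.Word2Vec

object Word2VecExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("word2vec").setMaster("local[4]"))

    // One whitespace-tokenized sentence per input line (placeholder path).
    val input = sc.textFile("data/text8").map(_.split(" ").toSeq)

    // window and minCount are now fixed inside Word2Vec (both 5).
    val model = Word2Vec.train(input, size = 100, startingAlpha = 0.025,
      parallelism = 4, numIterations = 2)

    // findSynonyms(vector, num) is the overload shown in this diff;
    // transform(word) is assumed to exist elsewhere in the file.
    model.findSynonyms(model.transform("day"), 10).foreach { case (w, sim) =>
      println(s"$w $sim")
    }
    sc.stop()
  }
}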