@@ -17,9 +17,10 @@
 
 package org.apache.spark.ml.ann
 
-import breeze.linalg.{*, DenseMatrix => BDM, DenseVector => BDV, Vector => BV, axpy => brzAxpy,
-  sum => Bsum}
+import breeze.linalg.{*, DenseMatrix => BDM, DenseVector => BDV, Vector => BV, axpy => Baxpy,
+  sum => Bsum}
 import breeze.numerics.{log => Blog, sigmoid => Bsigmoid}
+
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.optimization._
 import org.apache.spark.rdd.RDD
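For reference, Breeze's `axpy` (re-aliased above from `brzAxpy` to `Baxpy`) is an in-place scaled addition, `y := y + a * x`. A minimal sketch of the aliased import on its own:

```scala
import breeze.linalg.{DenseVector => BDV, axpy => Baxpy}

val x = BDV(1.0, 2.0, 3.0)
val y = BDV(10.0, 10.0, 10.0)
Baxpy(0.5, x, y) // in place: y is now DenseVector(10.5, 11.0, 11.5)
```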
@@ -177,8 +178,11 @@ private[ann] object AffineLayerModel {
    * @param numOut number of layer outputs
    * @return matrix A and vector b
    */
-  def unroll(weights: Vector, position: Int,
-    numIn: Int, numOut: Int): (BDM[Double], BDV[Double]) = {
+  def unroll(
+      weights: Vector,
+      position: Int,
+      numIn: Int,
+      numOut: Int): (BDM[Double], BDV[Double]) = {
     val weightsCopy = weights.toArray
     // TODO: the array is not copied to BDMs, make sure this is OK!
     val a = new BDM[Double](numOut, numIn, weightsCopy, position)
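A hedged sketch of what `unroll` builds (values are illustrative, not from the file): the flat weights array is viewed, without copying, as a column-major `numOut x numIn` matrix followed by a bias vector of length `numOut`:

```scala
import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}

// A layer with numIn = 3, numOut = 2 consumes numOut * numIn + numOut = 8
// doubles starting at `position` in the flat weights array.
val weights = Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0)
val (numIn, numOut, position) = (3, 2, 0)
val a = new BDM[Double](numOut, numIn, weights, position) // 2x3 weight matrix
val b = new BDV[Double](weights, position + numOut * numIn, 1, numOut) // bias, length 2
```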
@@ -272,8 +276,11 @@ private[ann] object ActivationFunction {
     }
   }
 
-  def apply(x1: BDM[Double], x2: BDM[Double], y: BDM[Double],
-    func: (Double, Double) => Double): Unit = {
+  def apply(
+      x1: BDM[Double],
+      x2: BDM[Double],
+      y: BDM[Double],
+      func: (Double, Double) => Double): Unit = {
     var i = 0
     while (i < x1.rows) {
       var j = 0
@@ -284,7 +291,6 @@ private[ann] object ActivationFunction {
       i += 1
     }
   }
-
 }
 
 /**
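The `apply` overloads above are plain while-loop element-wise maps over matrices, writing results into a preallocated output to avoid intermediate allocations. A self-contained sketch of the same pattern (hypothetical helper name, not part of the file):

```scala
import breeze.linalg.{DenseMatrix => BDM}

def elementwise(x: BDM[Double], y: BDM[Double], f: Double => Double): Unit = {
  var i = 0
  while (i < x.rows) {
    var j = 0
    while (j < x.cols) {
      y(i, j) = f(x(i, j)) // write into y in place
      j += 1
    }
    i += 1
  }
}
```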
@@ -320,8 +326,10 @@ private[ann] class SoftmaxFunction extends ActivationFunction {
     }
   }
 
-  override def crossEntropy(output: BDM[Double], target: BDM[Double],
-    result: BDM[Double]): Double = {
+  override def crossEntropy(
+      output: BDM[Double],
+      target: BDM[Double],
+      result: BDM[Double]): Double = {
     def m(o: Double, t: Double): Double = o - t
     ActivationFunction(output, target, result, m)
     -Bsum( target :* Blog(output)) / output.cols
@@ -346,11 +354,13 @@ private[ann] class SigmoidFunction extends ActivationFunction {
     ActivationFunction(x, y, s)
   }
 
-  override def crossEntropy(output: BDM[Double], target: BDM[Double],
-    result: BDM[Double]): Double = {
+  override def crossEntropy(
+      output: BDM[Double],
+      target: BDM[Double],
+      result: BDM[Double]): Double = {
     def m(o: Double, t: Double): Double = o - t
     ActivationFunction(output, target, result, m)
-    -Bsum( target :* Blog(output)) / output.cols
+    -Bsum(target :* Blog(output)) / output.cols
   }
 
   override def derivative(x: BDM[Double], y: BDM[Double]): Unit = {
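Both `crossEntropy` implementations evaluate the same Breeze expression, `-Bsum(target :* Blog(output)) / output.cols`: mean cross-entropy, with one stacked example per column. A standalone check of that expression:

```scala
import breeze.linalg.{DenseMatrix => BDM, sum => Bsum}
import breeze.numerics.{log => Blog}

val output = new BDM(2, 1, Array(0.8, 0.2)) // predicted probabilities, one example
val target = new BDM(2, 1, Array(1.0, 0.0)) // one-hot label
val ce = -Bsum(target :* Blog(output)) / output.cols // -ln(0.8) ≈ 0.223
```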
@@ -384,13 +394,17 @@ private[ann] class FunctionalLayer (val activationFunction: ActivationFunction)
  * Functional layer model. Holds no weights.
  * @param activationFunction activation function
  */
-private[ann] class FunctionalLayerModel private (val activationFunction: ActivationFunction
-  ) extends LayerModel {
+private[ann] class FunctionalLayerModel private (val activationFunction: ActivationFunction)
+  extends LayerModel {
   val size = 0
-
+  // matrices for in-place computations
+  // outputs
   private var f: BDM[Double] = null
+  // delta
   private var d: BDM[Double] = null
+  // matrix for error computation
   private var e: BDM[Double] = null
+  // delta gradient
   private lazy val dg = new Array[Double](0)
 
   override def eval(data: BDM[Double]): BDM[Double] = {
@@ -487,7 +501,7 @@ private[ann] trait TopologyModel extends Serializable{
  * Feed forward ANN
  * @param layers
  */
-class FeedForwardTopology private(val layers: Array[Layer]) extends Topology {
+private[ann] class FeedForwardTopology private(val layers: Array[Layer]) extends Topology {
   override def getInstance(weights: Vector): TopologyModel = FeedForwardModel(this, weights)
 
   override def getInstance(seed: Long): TopologyModel = FeedForwardModel(this, seed)
@@ -496,7 +510,7 @@ class FeedForwardTopology private(val layers: Array[Layer]) extends Topology {
 /**
  * Factory for some of the frequently-used topologies
  */
-object FeedForwardTopology {
+private[ml] object FeedForwardTopology {
   /**
    * Creates a feed forward topology from the array of layers
    * @param layers array of layers
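A hedged usage sketch of this factory, assuming the `multiLayerPerceptron` helper defined later in this same object (layer sizes, with a softmax output layer):

```scala
// Sketch only: multiLayerPerceptron and getInstance are defined elsewhere in Layer.scala.
val topology = FeedForwardTopology.multiLayerPerceptron(Array(4, 5, 3), softmax = true)
val model = topology.getInstance(seed = 11L) // randomly initialized TopologyModel
```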
@@ -534,19 +548,23 @@ object FeedForwardTopology {
  * @param layerModels models of layers
  * @param topology topology of the network
  */
-private[spark] class FeedForwardModel private(val layerModels: Array[LayerModel],
-  val topology: FeedForwardTopology) extends TopologyModel {
+private[ml] class FeedForwardModel private(
+    val layerModels: Array[LayerModel],
+    val topology: FeedForwardTopology) extends TopologyModel {
   override def forward(data: BDM[Double]): Array[BDM[Double]] = {
     val outputs = new Array[BDM[Double]](layerModels.length)
     outputs(0) = layerModels(0).eval(data)
-    for (i <- 1 until layerModels.length){
+    for (i <- 1 until layerModels.length) {
       outputs(i) = layerModels(i).eval(outputs(i-1))
     }
     outputs
   }
 
-  override def computeGradient(data: BDM[Double], target: BDM[Double], cumGradient: Vector,
-    realBatchSize: Int): Double = {
+  override def computeGradient(
+      data: BDM[Double],
+      target: BDM[Double],
+      cumGradient: Vector,
+      realBatchSize: Int): Double = {
     val outputs = forward(data)
     val deltas = new Array[BDM[Double]](layerModels.length)
     val L = layerModels.length - 1
@@ -585,12 +603,12 @@ private[spark] class FeedForwardModel private(val layerModels: Array[LayerModel]
   override def weights(): Vector = {
     // TODO: extract roll
     var size = 0
-    for(i <- 0 until layerModels.length) {
+    for (i <- 0 until layerModels.length) {
       size += layerModels(i).size
     }
     val array = new Array[Double](size)
     var offset = 0
-    for(i <- 0 until layerModels.length) {
+    for (i <- 0 until layerModels.length) {
       val layerWeights = layerModels(i).weights().toArray
       System.arraycopy(layerWeights, 0, array, offset, layerWeights.length)
       offset += layerWeights.length
@@ -620,7 +638,7 @@ private[ann] object FeedForwardModel {
     val layers = topology.layers
     val layerModels = new Array[LayerModel](layers.length)
     var offset = 0
-    for (i <- 0 until layers.length){
+    for (i <- 0 until layers.length) {
       layerModels(i) = layers(i).getInstance(weights, offset)
       offset += layerModels(i).size
     }
@@ -658,8 +676,11 @@ private[ann] class ANNGradient(topology: Topology, dataStacker: DataStacker) ext
     (gradient, loss)
   }
 
-  override def compute(data: Vector, label: Double, weights: Vector,
-    cumGradient: Vector): Double = {
+  override def compute(
+      data: Vector,
+      label: Double,
+      weights: Vector,
+      cumGradient: Vector): Double = {
     val (input, target, realBatchSize) = dataStacker.unstack(data)
     val model = topology.getInstance(weights)
     model.computeGradient(input, target, cumGradient, realBatchSize)
@@ -684,12 +705,12 @@ private[ann] class DataStacker(stackSize: Int, inputSize: Int, outputSize: Int)
    */
   def stack(data: RDD[(Vector, Vector)]): RDD[(Double, Vector)] = {
     val stackedData = if (stackSize == 1) {
-      data.map( v =>
+      data.map { v =>
         (0.0,
           Vectors.fromBreeze(BDV.vertcat(
             v._1.toBreeze.toDenseVector,
             v._2.toBreeze.toDenseVector))
-        ))
+        ) }
     } else {
       data.mapPartitions { it =>
         it.grouped(stackSize).map { seq =>
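`stack` flattens each `(input, output)` pair into a single vector via `BDV.vertcat`, so a group of `stackSize` examples can ride through the optimizer as one sample. The `stackSize == 1` branch, sketched on bare Breeze vectors (the real code wraps these in `mllib.linalg` vectors):

```scala
import breeze.linalg.{DenseVector => BDV}

val input = BDV(0.1, 0.2)  // features
val label = BDV(1.0, 0.0)  // one-hot target
val stacked = BDV.vertcat(input, label) // DenseVector(0.1, 0.2, 1.0, 0.0)
```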
@@ -728,14 +749,15 @@ private[ann] class DataStacker(stackSize: Int, inputSize: Int, outputSize: Int)
  */
 private[ann] class ANNUpdater extends Updater {
 
-  override def compute(weightsOld: Vector,
-    gradient: Vector,
-    stepSize: Double,
-    iter: Int,
-    regParam: Double): (Vector, Double) = {
+  override def compute(
+      weightsOld: Vector,
+      gradient: Vector,
+      stepSize: Double,
+      iter: Int,
+      regParam: Double): (Vector, Double) = {
     val thisIterStepSize = stepSize
     val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
-    brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
+    Baxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
     (Vectors.fromBreeze(brzWeights), 0)
   }
 }
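`ANNUpdater.compute` is a plain gradient step, `weights := weights - stepSize * gradient`, performed in place by `Baxpy`; `iter` and `regParam` are ignored and the returned regularization term is 0. The same arithmetic spelled out on bare Breeze vectors:

```scala
import breeze.linalg.{DenseVector => BDV}

val weights = BDV(0.5, -0.5)
val gradient = BDV(0.2, 0.4)
val stepSize = 0.1
val updated = weights - gradient * stepSize // DenseVector(0.48, -0.54)
```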
@@ -746,8 +768,10 @@ private[ann] class ANNUpdater extends Updater {
  * @param inputSize input size
  * @param outputSize output size
  */
-private[ml] class FeedForwardTrainer(topology: Topology, val inputSize: Int,
-    val outputSize: Int) extends Serializable {
+private[ml] class FeedForwardTrainer(
+    topology: Topology,
+    val inputSize: Int,
+    val outputSize: Int) extends Serializable {
 
   // TODO: what if we need to pass random seed?
   private var _weights = topology.getInstance(11L).weights()
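A hedged end-to-end sketch of how the pieces connect, using this constructor together with the trainer's `train` method and the `FeedForwardTopology` factory from earlier hunks (the data and a SparkContext `sc` are assumed, and `train` is defined later in the file, outside this diff):

```scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

val data: RDD[(Vector, Vector)] = sc.parallelize(Seq(
  (Vectors.dense(0.0, 0.0), Vectors.dense(1.0, 0.0)),
  (Vectors.dense(1.0, 1.0), Vectors.dense(0.0, 1.0))))

val topology = FeedForwardTopology.multiLayerPerceptron(Array(2, 4, 2), softmax = true)
val trainer = new FeedForwardTrainer(topology, inputSize = 2, outputSize = 2)
val model = trainer.train(data) // TopologyModel with learned weights
```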
0 commit comments