@@ -77,32 +77,20 @@ class StreamingKMeansModel(
    */
   def update(data: RDD[Vector], decayFactor: Double, timeUnit: String): StreamingKMeansModel = {

-    /**
-     * find nearest cluster to each point
-     * @since 1.2.0
-     */
+    // find nearest cluster to each point
     val closest = data.map(point => (this.predict(point), (point, 1L)))

-    /**
-     * get sums and counts for updating each cluster
-     * @since 1.2.0
-     */
+    // get sums and counts for updating each cluster
     val mergeContribs: ((Vector, Long), (Vector, Long)) => (Vector, Long) = (p1, p2) => {
       BLAS.axpy(1.0, p2._1, p1._1)
       (p1._1, p1._2 + p2._2)
     }
     val dim = clusterCenters(0).size

-    /**
-     * @since 1.2.0
-     */
     val pointStats: Array[(Int, (Vector, Long))] = closest
       .aggregateByKey((Vectors.zeros(dim), 0L))(mergeContribs, mergeContribs)
       .collect()

-    /**
-     * @since 1.2.0
-     */
     val discount = timeUnit match {
       case StreamingKMeans.BATCHES => decayFactor
       case StreamingKMeans.POINTS =>
@@ -112,16 +100,10 @@ class StreamingKMeansModel(
         math.pow(decayFactor, numNewPoints)
     }

-    /**
-     * apply discount to weights
-     * @since 1.2.0
-     */
+    // apply discount to weights
     BLAS.scal(discount, Vectors.dense(clusterWeights))

-    /**
-     * implement update rule
-     * @since 1.2.0
-     */
+    // implement update rule
     pointStats.foreach { case (label, (sum, count)) =>
       val centroid = clusterCenters(label)

@@ -141,10 +123,7 @@ class StreamingKMeansModel(
       logInfo(s"Cluster $label updated with weight $updatedWeight and centroid: $display")
     }

-    /**
-     * Check whether the smallest cluster is dying. If so, split the largest cluster.
-     * @since 1.2.0
-     */
+    // Check whether the smallest cluster is dying. If so, split the largest cluster.
     val weightsWithIndex = clusterWeights.view.zipWithIndex
     val (maxWeight, largest) = weightsWithIndex.maxBy(_._1)
     val (minWeight, smallest) = weightsWithIndex.minBy(_._1)
@@ -197,7 +176,7 @@ class StreamingKMeans(

   /** @since 1.2.0 */
   def this() = this(2, 1.0, StreamingKMeans.BATCHES)
-  /** @since 1.2.0 */
+
   protected var model: StreamingKMeansModel = new StreamingKMeansModel(null, null)

   /**
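For reference, the update these hunks walk through is a discounted weighted average: each batch, every cluster weight is scaled by the discount (`decayFactor` per batch, or `decayFactor` raised to the number of new points when the time unit is points), and each centroid that received data is pulled toward the batch mean of its assigned points in proportion to the new mass. Below is a minimal sketch of that arithmetic on plain arrays, not the patched Spark code; the names `centers`, `weights`, `batchSums`, and `batchCounts` are illustrative.

```scala
// Illustrative sketch of the discounted update rule; not the patched Spark code.
object StreamingUpdateSketch {
  def update(
      centers: Array[Array[Double]],    // current cluster centers
      weights: Array[Double],           // current cluster weights
      batchSums: Array[Array[Double]],  // per-cluster sums of this batch's points
      batchCounts: Array[Long],         // per-cluster counts of this batch's points
      discount: Double): Unit = {
    // apply discount to all existing weights
    for (i <- weights.indices) weights(i) *= discount
    // pull each centroid toward its batch mean, weighted by the new mass
    for (i <- centers.indices if batchCounts(i) > 0) {
      val count = batchCounts(i).toDouble
      val updatedWeight = weights(i) + count
      val lambda = count / math.max(updatedWeight, 1e-16)
      for (d <- centers(i).indices) {
        val batchMean = batchSums(i)(d) / count
        centers(i)(d) = (1.0 - lambda) * centers(i)(d) + lambda * batchMean
      }
      weights(i) = updatedWeight
    }
  }
}
```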
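The `StreamingKMeans` class cleaned up in the last hunk is normally driven from a `StreamingContext`. A hedged usage sketch, closely following the MLlib streaming k-means example: the paths, k, and dimension are placeholders, and `conf` is assumed to be an existing SparkConf.

```scala
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(conf, Seconds(1))

// vectors arrive as text, e.g. "[0.0,1.1,0.2]"
val trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
val testData = ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse)

val model = new StreamingKMeans()
  .setK(3)                    // number of clusters
  .setDecayFactor(1.0)        // 1.0 keeps all history; < 1.0 forgets old batches
  .setRandomCenters(2, 0.0)   // dimension and initial weight

model.trainOn(trainingData)   // update() runs once per training batch
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

ssc.start()
ssc.awaitTermination()
```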