diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala index 89dfa26c2299c..9dc979dbd979a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala @@ -94,11 +94,7 @@ object DenseKMeans { case Parallel => KMeans.K_MEANS_PARALLEL } - val model = new KMeans() - .setInitializationMode(initMode) - .setK(params.k) - .setMaxIterations(params.numIterations) - .run(examples) + val model = KMeans.train(examples, params.k, params.numIterations, 1, initMode) val cost = model.computeCost(examples) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index e9f41758581e3..1026a0352fbdc 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -246,14 +246,7 @@ class PythonMLLibAPI extends Serializable { maxIterations: Int, runs: Int, initializationMode: String): KMeansModel = { - val kMeansAlg = new KMeans() - .setK(k) - .setMaxIterations(maxIterations) - .setRuns(runs) - .setInitializationMode(initializationMode) - // Disable the uncached input warning because 'data' is a deliberately uncached MappedRDD. - .disableUncachedWarning() - return kMeansAlg.run(data.rdd) + return KMeans.train(data,k, maxIterations,runs,initializationMode) } /** diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GeneralizedKMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GeneralizedKMeansModel.scala new file mode 100644 index 0000000000000..97d740025ac7d --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GeneralizedKMeansModel.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import org.apache.spark.SparkContext._ +import org.apache.spark.api.java.JavaRDD +import org.apache.spark.mllib.base.{ PointOps, FP } +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.rdd.RDD + +/** + * A clustering model for K-means. Each point belongs to the cluster with the closest center. + */ +private[mllib] class GeneralizedKMeansModel[P <: FP, C <: FP]( + val pointOps: PointOps[P, C], + val centers: Array[C]) + extends Serializable { + + val k: Int = clusterCenters.length + + def clusterCenters: Array[Vector] = centers.map { c => pointOps.centerToVector(c) } + + /** Returns the cluster index that a given point belongs to. 
*/ + def predict(point: Vector): Int = + pointOps.findClosest(centers, pointOps.vectorToPoint(point))._1 + + /** Maps given points to their cluster indices. */ + def predict(points: RDD[Vector]): RDD[Int] = + points.map(p => pointOps.findClosest(centers, pointOps.vectorToPoint(p))._1) + + /** Maps given points to their cluster indices. */ + def predict(points: JavaRDD[Vector]): JavaRDD[java.lang.Integer] = + predict(points.rdd).toJavaRDD().asInstanceOf[JavaRDD[java.lang.Integer]] + + /** + * Return the K-means cost (sum of squared distances of points to their nearest center) for this + * model on the given data. + */ + def computeCost(data: RDD[Vector]): Double = + data.map(p => pointOps.findClosest(centers, pointOps.vectorToPoint(p))._2).sum() + +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 7443f232ec3e7..e9e8c67e7c12f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -17,39 +17,24 @@ package org.apache.spark.mllib.clustering -import scala.collection.mutable.ArrayBuffer +import org.apache.spark.annotation.Experimental -import breeze.linalg.{DenseVector => BDV, Vector => BV, norm => breezeNorm} +import scala.reflect.ClassTag -import org.apache.spark.annotation.Experimental -import org.apache.spark.Logging -import org.apache.spark.SparkContext._ -import org.apache.spark.mllib.linalg.{Vector, Vectors} -import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.mllib.base.{ FP, PointOps } +import org.apache.spark.mllib.clustering.metrics.FastEuclideanOps import org.apache.spark.rdd.RDD -import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.random.XORShiftRandom +import org.apache.spark.Logging +import org.apache.spark.mllib.linalg.Vector -/** - * K-means clustering with support for multiple parallel runs and a k-means++ like initialization - * mode (the k-means|| algorithm by Bahmani et al). When multiple concurrent runs are requested, - * they are executed together with joint passes over the data for efficiency. - * - * This is an iterative algorithm that will make multiple passes over the data, so any RDDs given - * to it should be cached by the user. - */ class KMeans private ( - private var k: Int, - private var maxIterations: Int, - private var runs: Int, - private var initializationMode: String, - private var initializationSteps: Int, - private var epsilon: Double) extends Serializable with Logging { + private var k: Int, + private var maxIterations: Int, + private var runs: Int, + private var initializationMode: String, + private var initializationSteps: Int, + private var epsilon: Double) extends Serializable with Logging { - /** - * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, runs: 1, - * initializationMode: "k-means||", initializationSteps: 5, epsilon: 1e-4}. - */ def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4) /** Set the number of clusters to create (k). Default: 2. */ @@ -113,333 +98,53 @@ class KMeans private ( this } - /** Whether a warning should be logged if the input RDD is uncached. */ - private var warnOnUncachedInput = true - - /** Disable warnings about uncached input. 
*/ - private[spark] def disableUncachedWarning(): this.type = { - warnOnUncachedInput = false - this - } - - /** - * Train a K-means model on the given set of points; `data` should be cached for high - * performance, because this is an iterative algorithm. - */ def run(data: RDD[Vector]): KMeansModel = { - - if (warnOnUncachedInput && data.getStorageLevel == StorageLevel.NONE) { - logWarning("The input data is not directly cached, which may hurt performance if its" - + " parent RDDs are also uncached.") - } - - // Compute squared norms and cache them. - val norms = data.map(v => breezeNorm(v.toBreeze, 2.0)) - norms.persist() - val breezeData = data.map(_.toBreeze).zip(norms).map { case (v, norm) => - new BreezeVectorWithNorm(v, norm) - } - val model = runBreeze(breezeData) - norms.unpersist() - - // Warn at the end of the run as well, for increased visibility. - if (warnOnUncachedInput && data.getStorageLevel == StorageLevel.NONE) { - logWarning("The input data was not directly cached, which may hurt performance if its" - + " parent RDDs are also uncached.") - } - model - } - - /** - * Implementation of K-Means using breeze. - */ - private def runBreeze(data: RDD[BreezeVectorWithNorm]): KMeansModel = { - - val sc = data.sparkContext - - val initStartTime = System.nanoTime() - - val centers = if (initializationMode == KMeans.RANDOM) { - initRandom(data) - } else { - initKMeansParallel(data) - } - - val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 - logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) + - " seconds.") - - val active = Array.fill(runs)(true) - val costs = Array.fill(runs)(0.0) - - var activeRuns = new ArrayBuffer[Int] ++ (0 until runs) - var iteration = 0 - - val iterationStartTime = System.nanoTime() - - // Execute iterations of Lloyd's algorithm until all runs have converged - while (iteration < maxIterations && !activeRuns.isEmpty) { - type WeightedPoint = (BV[Double], Long) - def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = { - (p1._1 += p2._1, p1._2 + p2._2) - } - - val activeCenters = activeRuns.map(r => centers(r)).toArray - val costAccums = activeRuns.map(_ => sc.accumulator(0.0)) - - val bcActiveCenters = sc.broadcast(activeCenters) - - // Find the sum and count of points mapping to each center - val totalContribs = data.mapPartitions { points => - val thisActiveCenters = bcActiveCenters.value - val runs = thisActiveCenters.length - val k = thisActiveCenters(0).length - val dims = thisActiveCenters(0)(0).vector.length - - val sums = Array.fill(runs, k)(BDV.zeros[Double](dims).asInstanceOf[BV[Double]]) - val counts = Array.fill(runs, k)(0L) - - points.foreach { point => - (0 until runs).foreach { i => - val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point) - costAccums(i) += cost - sums(i)(bestCenter) += point.vector - counts(i)(bestCenter) += 1 - } - } - - val contribs = for (i <- 0 until runs; j <- 0 until k) yield { - ((i, j), (sums(i)(j), counts(i)(j))) - } - contribs.iterator - }.reduceByKey(mergeContribs).collectAsMap() - - // Update the cluster centers and costs for each active run - for ((run, i) <- activeRuns.zipWithIndex) { - var changed = false - var j = 0 - while (j < k) { - val (sum, count) = totalContribs((i, j)) - if (count != 0) { - sum /= count.toDouble - val newCenter = new BreezeVectorWithNorm(sum) - if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) { - changed = true - } - centers(run)(j) = newCenter - } - j += 1 - } - if 
(!changed) { - active(run) = false - logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations") - } - costs(run) = costAccums(i).value - } - - activeRuns = activeRuns.filter(active(_)) - iteration += 1 - } - - val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) / 1e9 - logInfo(s"Iterations took " + "%.3f".format(iterationTimeInSeconds) + " seconds.") - - if (iteration == maxIterations) { - logInfo(s"KMeans reached the max number of iterations: $maxIterations.") - } else { - logInfo(s"KMeans converged in $iteration iterations.") - } - - val (minCost, bestRun) = costs.zipWithIndex.min - - logInfo(s"The cost for the best run is $minCost.") - - new KMeansModel(centers(bestRun).map(c => Vectors.fromBreeze(c.vector))) - } - - /** - * Initialize `runs` sets of cluster centers at random. - */ - private def initRandom(data: RDD[BreezeVectorWithNorm]) - : Array[Array[BreezeVectorWithNorm]] = { - // Sample all the cluster centers in one pass to avoid repeated scans - val sample = data.takeSample(true, runs * k, new XORShiftRandom().nextInt()).toSeq - Array.tabulate(runs)(r => sample.slice(r * k, (r + 1) * k).map { v => - new BreezeVectorWithNorm(v.vector.toDenseVector, v.norm) - }.toArray) - } - - /** - * Initialize `runs` sets of cluster centers using the k-means|| algorithm by Bahmani et al. - * (Bahmani et al., Scalable K-Means++, VLDB 2012). This is a variant of k-means++ that tries - * to find with dissimilar cluster centers by starting with a random center and then doing - * passes where more centers are chosen with probability proportional to their squared distance - * to the current cluster set. It results in a provable approximation to an optimal clustering. - * - * The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf. 
- */ - private def initKMeansParallel(data: RDD[BreezeVectorWithNorm]) - : Array[Array[BreezeVectorWithNorm]] = { - // Initialize each run's center to a random point - val seed = new XORShiftRandom().nextInt() - val sample = data.takeSample(true, runs, seed).toSeq - val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r).toDense)) - - // On each step, sample 2 * k points on average for each run with probability proportional - // to their squared distance from that run's current centers - var step = 0 - while (step < initializationSteps) { - val bcCenters = data.context.broadcast(centers) - val sumCosts = data.flatMap { point => - (0 until runs).map { r => - (r, KMeans.pointCost(bcCenters.value(r), point)) - } - }.reduceByKey(_ + _).collectAsMap() - val chosen = data.mapPartitionsWithIndex { (index, points) => - val rand = new XORShiftRandom(seed ^ (step << 16) ^ index) - points.flatMap { p => - (0 until runs).filter { r => - rand.nextDouble() < 2.0 * KMeans.pointCost(bcCenters.value(r), p) * k / sumCosts(r) - }.map((_, p)) - } - }.collect() - chosen.foreach { case (r, p) => - centers(r) += p.toDense - } - step += 1 - } - - // Finally, we might have a set of more than k candidate centers for each run; weigh each - // candidate by the number of points in the dataset mapping to it and run a local k-means++ - // on the weighted centers to pick just k of them - val bcCenters = data.context.broadcast(centers) - val weightMap = data.flatMap { p => - (0 until runs).map { r => - ((r, KMeans.findClosest(bcCenters.value(r), p)._1), 1.0) - } - }.reduceByKey(_ + _).collectAsMap() - val finalCenters = (0 until runs).map { r => - val myCenters = centers(r).toArray - val myWeights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), 0.0)).toArray - LocalKMeans.kMeansPlusPlus(r, myCenters, myWeights, k, 30) - } - - finalCenters.toArray + new KMeansModel(KMeans.doTrain(new FastEuclideanOps)(data, k, maxIterations, runs, + initializationMode, initializationSteps)._2) } } - -/** - * Top-level methods for calling K-means clustering. - */ -object KMeans { - +object KMeans extends Logging { // Initialization mode names val RANDOM = "random" val K_MEANS_PARALLEL = "k-means||" - /** - * Trains a k-means model using the given set of parameters. - * - * @param data training points stored as `RDD[Array[Double]]` - * @param k number of clusters - * @param maxIterations max number of iterations - * @param runs number of parallel runs, defaults to 1. The best model is returned. - * @param initializationMode initialization model, either "random" or "k-means||" (default). - */ - def train( - data: RDD[Vector], - k: Int, - maxIterations: Int, - runs: Int, - initializationMode: String): KMeansModel = { - new KMeans().setK(k) - .setMaxIterations(maxIterations) - .setRuns(runs) - .setInitializationMode(initializationMode) - .run(data) - } - /** * Trains a k-means model using specified parameters and the default values for unspecified. */ - def train( - data: RDD[Vector], - k: Int, - maxIterations: Int): KMeansModel = { - train(data, k, maxIterations, 1, K_MEANS_PARALLEL) - } + def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int, mode: String): KMeansModel = + new KMeansModel(doTrain(new FastEuclideanOps)(data, k, maxIterations, runs, mode)._2) /** * Trains a k-means model using specified parameters and the default values for unspecified. 
*/ - def train( - data: RDD[Vector], - k: Int, - maxIterations: Int, - runs: Int): KMeansModel = { - train(data, k, maxIterations, runs, K_MEANS_PARALLEL) - } + def train(data: RDD[Vector], k: Int, maxIterations: Int): KMeansModel = + new KMeansModel(doTrain(new FastEuclideanOps)(data, k, maxIterations)._2) /** - * Returns the index of the closest center to the given point, as well as the squared distance. + * Trains a k-means model using specified parameters and the default values for unspecified. */ - private[mllib] def findClosest( - centers: TraversableOnce[BreezeVectorWithNorm], - point: BreezeVectorWithNorm): (Int, Double) = { - var bestDistance = Double.PositiveInfinity - var bestIndex = 0 - var i = 0 - centers.foreach { center => - // Since `\|a - b\| \geq |\|a\| - \|b\||`, we can use this lower bound to avoid unnecessary - // distance computation. - var lowerBoundOfSqDist = center.norm - point.norm - lowerBoundOfSqDist = lowerBoundOfSqDist * lowerBoundOfSqDist - if (lowerBoundOfSqDist < bestDistance) { - val distance: Double = fastSquaredDistance(center, point) - if (distance < bestDistance) { - bestDistance = distance - bestIndex = i - } - } - i += 1 + def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int): KMeansModel = + new KMeansModel(doTrain(new FastEuclideanOps)(data, k, maxIterations, runs)._2) + + private def doTrain[P <: FP, C <: FP]( + pointOps: PointOps[P, C])( + raw: RDD[Vector], + k: Int = 2, + maxIterations: Int = 20, + runs: Int = 1, + initializationMode: String = K_MEANS_PARALLEL, + initializationSteps: Int = 5, + epsilon: Double = 1e-4)( + implicit ctag: ClassTag[C], ptag: ClassTag[P]): (Double, GeneralizedKMeansModel[P, C]) = { + + val initializer = if (initializationMode == RANDOM) { + new KMeansRandom(pointOps, k, runs) + } else { + new KMeansParallel(pointOps, k, runs, initializationSteps, 1) } - (bestIndex, bestDistance) - } - - /** - * Returns the K-means cost of a given point against the given cluster centers. - */ - private[mllib] def pointCost( - centers: TraversableOnce[BreezeVectorWithNorm], - point: BreezeVectorWithNorm): Double = - findClosest(centers, point)._2 - - /** - * Returns the squared Euclidean distance between two vectors computed by - * [[org.apache.spark.mllib.util.MLUtils#fastSquaredDistance]]. - */ - private[clustering] def fastSquaredDistance( - v1: BreezeVectorWithNorm, - v2: BreezeVectorWithNorm): Double = { - MLUtils.fastSquaredDistance(v1.vector, v1.norm, v2.vector, v2.norm) + val data = (raw.map(vals => pointOps.vectorToPoint(vals))).cache() + val centers = initializer.init(data, 0) + new MultiKMeans(pointOps, maxIterations).cluster(data, centers) } } - -/** - * A breeze vector with its norm for fast distance computation. - * - * @see [[org.apache.spark.mllib.clustering.KMeans#fastSquaredDistance]] - */ -private[clustering] -class BreezeVectorWithNorm(val vector: BV[Double], val norm: Double) extends Serializable { - - def this(vector: BV[Double]) = this(vector, breezeNorm(vector, 2.0)) - - def this(array: Array[Double]) = this(new BDV[Double](array)) - - def this(v: Vector) = this(v.toBreeze) - - /** Converts the vector to a dense vector. 
*/ - def toDense = new BreezeVectorWithNorm(vector.toDenseVector, norm) -} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansInitializer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansInitializer.scala new file mode 100644 index 0000000000000..6d8c0628b1fd5 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansInitializer.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import org.apache.spark.mllib.base.FP +import org.apache.spark.rdd.RDD + +private[mllib] trait KMeansInitializer[P <: FP, C <: FP] extends Serializable { + def init(data: RDD[P], seed: Int): Array[Array[C]] +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala index 12a3d91cd31a6..d4c583483f877 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala @@ -25,37 +25,36 @@ import org.apache.spark.mllib.linalg.Vector /** * A clustering model for K-means. Each point belongs to the cluster with the closest center. */ -class KMeansModel (val clusterCenters: Array[Vector]) extends Serializable { - /** Total number of clusters. */ - def k: Int = clusterCenters.length +class KMeansModel(specific: GeneralizedKMeansModel[_, _]) { + + val k: Int = specific.k /** Returns the cluster index that a given point belongs to. */ - def predict(point: Vector): Int = { - KMeans.findClosest(clusterCentersWithNorm, new BreezeVectorWithNorm(point))._1 - } + def predict(point: Vector): Int = specific.predict(point) + + /** + * Maps given points to their cluster indices. + */ + def predict(points: RDD[Vector]): RDD[Int] = specific.predict(points) - /** Maps given points to their cluster indices. */ - def predict(points: RDD[Vector]): RDD[Int] = { - val centersWithNorm = clusterCentersWithNorm - val bcCentersWithNorm = points.context.broadcast(centersWithNorm) - points.map(p => KMeans.findClosest(bcCentersWithNorm.value, new BreezeVectorWithNorm(p))._1) - } + /** + * Maps given points to their cluster indices. + * @param points input points + * @return the predicted cluster index for each input point + */ + def predict(points: JavaRDD[Vector]): JavaRDD[java.lang.Integer] = specific.predict(points) - /** Maps given points to their cluster indices. */ - def predict(points: JavaRDD[Vector]): JavaRDD[java.lang.Integer] = - predict(points.rdd).toJavaRDD().asInstanceOf[JavaRDD[java.lang.Integer]] + /** + * Get the K-means cost for this model on the given data. 
+ * @param data data for which cost is to be computed + * @return the K-means cost for this model on the given data + */ + def computeCost(data: RDD[Vector]): Double = specific.computeCost(data) /** - * Return the K-means cost (sum of squared distances of points to their nearest center) for this - * model on the given data. + * Get the array of cluster centers + * @return the array of cluster centers */ - def computeCost(data: RDD[Vector]): Double = { - val centersWithNorm = clusterCentersWithNorm - val bcCentersWithNorm = data.context.broadcast(centersWithNorm) - data.map(p => KMeans.pointCost(bcCentersWithNorm.value, new BreezeVectorWithNorm(p))).sum() - } - - private def clusterCentersWithNorm: Iterable[BreezeVectorWithNorm] = - clusterCenters.map(new BreezeVectorWithNorm(_)) + def clusterCenters: Array[Vector] = specific.clusterCenters } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansParallel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansParallel.scala new file mode 100644 index 0000000000000..7fd9f1dbd54fe --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansParallel.scala @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +import org.apache.spark.Logging +import org.apache.spark.SparkContext._ +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.mllib.base.{ PointOps, FP, Zero } +import org.apache.spark.rdd.RDD +import org.apache.spark.util.random.XORShiftRandom + +private[mllib] class KMeansParallel[P <: FP: ClassTag, C <: FP: ClassTag]( + pointOps: PointOps[P, C], + k: Int, + runs: Int, + initializationSteps: Int, + numPartitions: Int) + extends KMeansInitializer[P, C] with Logging { + + /** + * Initialize `runs` sets of cluster centers using the k-means|| algorithm by Bahmani et al. + * (Bahmani et al., Scalable K-Means++, VLDB 2012). This is a variant of k-means++ that tries + * to find dissimilar cluster centers by starting with a random center and then doing + * passes where more centers are chosen with probability proportional to their squared distance + * to the current cluster set. It results in a provable approximation to an optimal clustering. + * + * The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf. 
+ * + * @param data the RDD of points + * @param seed the random number generator seed + * @return + */ + def init(data: RDD[P], seed: Int): Array[Array[C]] = { + log.debug("k-means parallel on {} points" + data.count()) + + // randomly select one center per run, putting each into a separate array buffer + val sample = data.takeSample(true, runs, seed).toSeq.map(pointOps.pointToCenter) + val centers: Array[ArrayBuffer[C]] = Array.tabulate(runs)(r => ArrayBuffer(sample(r))) + + // add at most 2k points per step + for (step <- 0 until initializationSteps) { + if (log.isInfoEnabled) showCenters(centers, step) + val centerArrays = centers.map { x: ArrayBuffer[C] => x.toArray } + val bcCenters = data.sparkContext.broadcast(centerArrays) + for ((r, p) <- choose(data, seed, step, bcCenters)) { + centers(r) += pointOps.pointToCenter(p) + } + bcCenters.unpersist() + } + + val bcCenters = data.sparkContext.broadcast(centers.map(_.toArray)) + val result = finalCenters(data, bcCenters, seed) + bcCenters.unpersist() + result + } + + def showCenters(centers: Array[ArrayBuffer[C]], step: Int) { + log.info("step {}", step) + for (run <- 0 until runs) { + log.info("final: run {} has {} centers", run, centers.length) + } + } + + /** + * Randomly choose at most 2 * k additional cluster centers by weighting them by their distance + * to the current closest cluster + * + * @param data the RDD of points + * @param seed random generator seed + * @param step which step of the selection process + * @return array of (run, point) + */ + def choose( + data: RDD[P], + seed: Int, + step: Int, + bcCenters: Broadcast[Array[Array[C]]]): Array[(Int, P)] = { + // compute the weighted distortion for each run + val sumCosts = data.flatMap { + point => + val centers = bcCenters.value + for (r <- 0 until runs) yield { + (r, point.weight * pointOps.pointCost(centers(r), point)) + } + }.reduceByKey(_ + _).collectAsMap() + + // choose points in proportion to ratio of weighted cost to weighted distortion + data.mapPartitionsWithIndex { + (index, points: Iterator[P]) => + val centers = bcCenters.value + val range = 0 until runs + val rand = new XORShiftRandom(seed ^ (step << 16) ^ index) + points.flatMap { p => + range.filter { r => + rand.nextDouble() < 2.0 * p.weight * pointOps.pointCost(centers(r), p) * k / sumCosts(r) + }.map((_, p)) + } + }.collect() + } + + /** + * Reduce sets of candidate cluster centers to at most k points per set using KMeansPlusPlus. + * Weight the points by the distance to the closest cluster center. 
+ * + * @param data original points + * @param bcCenters array of sets of candidate centers + * @param seed random number seed + * @return array of sets of cluster centers + */ + def finalCenters( + data: RDD[P], + bcCenters: Broadcast[Array[Array[C]]], seed: Int): Array[Array[C]] = { + // for each (run, cluster) compute the sum of the weights of the points in the cluster + val weightMap = data.flatMap { + point => + val centers = bcCenters.value + for (r <- 0 until runs) yield { + ((r, pointOps.findClosest(centers(r), point)._1), point.weight) + } + }.reduceByKey(_ + _).collectAsMap() + + val centers = bcCenters.value + val kmeansPlusPlus = new KMeansPlusPlus(pointOps) + val trackingKmeans = new MultiKMeans(pointOps, 30) + val finalCenters = (0 until runs).map { + r => + val myCenters = centers(r).toArray + log.info("run {} has {} centers", r, myCenters.length) + val weights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), Zero)).toArray + // Check for the degenerate case that the number of centers available is less than k. + val kx = if (k > myCenters.length) myCenters.length else k + val sc = data.sparkContext + val initial = kmeansPlusPlus.getCenters(sc, seed, myCenters, weights, kx, numPartitions, 1) + trackingKmeans.cluster(data.sparkContext.parallelize(myCenters.map(pointOps.centerToPoint)), + Array(initial))._2.centers + } + finalCenters.toArray + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansPlusPlus.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansPlusPlus.scala new file mode 100644 index 0000000000000..cd85f642d7a0e --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansPlusPlus.scala @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +import org.apache.spark.mllib.base.{ PointOps, FP, Infinity, One, Zero } +import org.apache.spark.util.random.XORShiftRandom +import org.apache.spark.{ Logging, SparkContext } + +/** + * + * The KMeans++ initialization algorithm + * + * @param pointOps distance function + * @tparam P point type + * @tparam C center type + */ +private[mllib] class KMeansPlusPlus[P <: FP: ClassTag, C <: FP: ClassTag]( + pointOps: PointOps[P, C]) extends Serializable with Logging { + + /** + * We will maintain for each point the distance to its closest cluster center. + * Since only one center is added on each iteration, recomputing the closest cluster center + * only requires computing the distance to the new cluster center if + * that distance is less than the closest cluster center. 
+ */ + case class FatPoint(location: P, index: Int, weight: Double, distance: Double) + + /** + * K-means++ on the weighted point set `points`. This first does the K-means++ + * initialization procedure and then rounds of Lloyd's algorithm. + */ + + def cluster( + sc: SparkContext, + seed: Int, + points: Array[C], + weights: Array[Double], + k: Int, + maxIterations: Int, + numPartitions: Int): Array[C] = { + val centers: Array[C] = getCenters(sc, seed, points, weights, k, numPartitions, 1) + val pts = sc.parallelize(points.map(pointOps.centerToPoint)) + new MultiKMeans(pointOps, maxIterations).cluster(pts, Array(centers))._2.centers + } + + /** + * Select centers in rounds. On each round, select 'perRound' centers, with probability of + * selection equal to the product of the given weights and distance to the closest cluster center + * of the previous round. + * + * @param sc the Spark context + * @param seed a random number seed + * @param points the candidate centers + * @param weights the weights on the candidate centers + * @param k the total number of centers to select + * @param numPartitions the number of data partitions to use + * @param perRound the number of centers to add per round + * @return an array of at most k cluster centers + */ + def getCenters( + sc: SparkContext, + seed: Int, points: Array[C], + weights: Array[Double], + k: Int, + numPartitions: Int, + perRound: Int): Array[C] = { + assert(points.length > 0) + assert(k > 0) + assert(numPartitions > 0) + assert(perRound > 0) + + if (points.length < k) log.warn("number of clusters requested {} exceeds number of points {}", + k, points.length) + val centers = new ArrayBuffer[C](k) + val rand = new XORShiftRandom(seed) + centers += points(pickWeighted(rand, weights)) + log.info("starting kMeansPlusPlus initialization on {} points", points.length) + + var more = true + var fatPoints = initialFatPoints(points, weights) + fatPoints = updateDistances(fatPoints, centers.view.take(1)) + + while (centers.length < k && more) { + val chosen = choose(fatPoints, seed ^ (centers.length << 24), rand, perRound) + val newCenters = chosen.map(points(_)) + fatPoints = updateDistances(fatPoints, newCenters) + log.info("chose {} points", chosen.length) + for (index <- chosen) { + log.info(" center({}) = points({})", centers.length, index) + centers += points(index) + } + more = chosen.nonEmpty + } + val result = centers.take(k) + log.info("completed kMeansPlusPlus initialization with {} centers of {} requested", + result.length, k) + result.toArray + } + + /** + * Choose points + * + * @param fatPoints points to choose from + * @param seed random number seed + * @param rand random number generator + * @param count number of points to choose + * @return indices of chosen points + */ + def choose(fatPoints: Array[FatPoint], seed: Int, rand: XORShiftRandom, count: Int) = + (0 until count).flatMap { x => pickCenter(rand, fatPoints.iterator) }.map { _.index } + + /** + * Create initial fat points with weights given and infinite distance to closest cluster center. 
+ * @param points points + * @param weights weights of points + * @return fat points with given weighs and infinite distance to closest cluster center + */ + def initialFatPoints(points: Array[C], weights: Array[Double]): Array[FatPoint] = + (0 until points.length).map { i => + FatPoint(pointOps.centerToPoint(points(i)), i, weights(i), + Infinity) + }.toArray + + /** + * Update the distance of each point to its closest cluster center, given only the given cluster + * centers that were modified. + * + * @param points set of candidate initial cluster centers + * @param center new cluster center + * @return points with their distance to closest to cluster center updated + */ + + def updateDistances(points: Array[FatPoint], center: Seq[C]): Array[FatPoint] = + points.map { p => + var i = 0 + val to = center.length + var dist = p.distance + val point = p.location + while (i < to) { + dist = pointOps.distance(point, center(i), dist) + i = i + 1 + } + p.copy(distance = dist) + } + + /** + * Pick a point at random, weighing the choices by the given weight vector. + * Return -1 if all weights are 0.0 + * + * Checks for illegal weight vector and throws exception instead of returning -1 + * + * @param rand random number generator + * @param weights the weights of the points + * @return the index of the point chosen + */ + def pickWeighted(rand: XORShiftRandom, weights: Array[Double]): Int = { + val r = rand.nextDouble() * weights.sum + var i = 0 + var curWeight = 0.0 + while (i < weights.length && curWeight < r) { + assert(weights(i) >= 0.0) + curWeight += weights(i) + i += 1 + } + if (i == 0) throw new IllegalArgumentException("all weights are zero") + i - 1 + } + + /** + * + * Select point randomly with probability weighted by the product of the weight and the distance + * + * @param rand random number generator + * @return + */ + def pickCenter(rand: XORShiftRandom, fatPoints: Iterator[FatPoint]): Array[FatPoint] = { + var cumulative = Zero + val rangeAndIndexedPoints = fatPoints map { z => + val weightedDistance = z.weight * z.distance + val from = cumulative + cumulative = cumulative + weightedDistance + (from, cumulative, z.location, z.index) + } + val pts = rangeAndIndexedPoints.toArray + val total = pts.last._2 + val r = rand.nextDouble() * total + for (w <- pts if w._1 <= r && r < w._2) yield FatPoint(w._3, w._4, One, total) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansRandom.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansRandom.scala new file mode 100644 index 0000000000000..ff0ab0b623536 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansRandom.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.clustering + +import scala.reflect.ClassTag + +import org.apache.spark.mllib.base.{ FP, PointOps } +import org.apache.spark.rdd.RDD +import org.apache.spark.util.random.XORShiftRandom + + + +/** + * Generate a random set of cluster centers from the points. + * + * Checks that no zero weight points are allowed to be cluster centers. + * This is important for some distance functions that require points with non-zero weights. + * When all weights are one (as is the case with the Euclidean distance function) this filter + * has no effect. + * + * @param pointOps distance function + * @param k number of desired clusters + * @param runs number of sets of cluster centers to generate + */ + +private[mllib] class KMeansRandom[P <: FP: ClassTag, C <: FP: ClassTag]( + pointOps: PointOps[P, C], + k: Int, + runs: Int) extends KMeansInitializer[P, C] { + + def init(data: RDD[P], seed: Int): Array[Array[C]] = { + + val filtered = data.filter(_.weight > 0) + val count = filtered.count() + if (runs * k <= count) { + val x = filtered.takeSample(withReplacement = false, runs * k, new XORShiftRandom().nextInt()) + val centers = x.map(pointOps.pointToCenter).toSeq + Array.tabulate(runs)(r => centers.slice(r * k, (r + 1) * k).toArray) + } else if (k < count) { + (0 to runs).toArray.map { _ => + { + filtered.takeSample(withReplacement = false, k, + new XORShiftRandom().nextInt()).map(pointOps.pointToCenter) + } + } + } else { + (0 to runs).toArray.map(_ => filtered.collect().map(pointOps.pointToCenter)) + } + } + +} + diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala deleted file mode 100644 index f0722d7c14a46..0000000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.clustering - -import scala.util.Random - -import breeze.linalg.{Vector => BV, DenseVector => BDV, norm => breezeNorm} - -import org.apache.spark.Logging - -/** - * An utility object to run K-means locally. This is private to the ML package because it's used - * in the initialization of KMeans but not meant to be publicly exposed. - */ -private[mllib] object LocalKMeans extends Logging { - - /** - * Run K-means++ on the weighted point set `points`. This first does the K-means++ - * initialization procedure and then rounds of Lloyd's algorithm. 
- */ - def kMeansPlusPlus( - seed: Int, - points: Array[BreezeVectorWithNorm], - weights: Array[Double], - k: Int, - maxIterations: Int - ): Array[BreezeVectorWithNorm] = { - val rand = new Random(seed) - val dimensions = points(0).vector.length - val centers = new Array[BreezeVectorWithNorm](k) - - // Initialize centers by sampling using the k-means++ procedure. - centers(0) = pickWeighted(rand, points, weights).toDense - for (i <- 1 until k) { - // Pick the next center with a probability proportional to cost under current centers - val curCenters = centers.view.take(i) - val sum = points.view.zip(weights).map { case (p, w) => - w * KMeans.pointCost(curCenters, p) - }.sum - val r = rand.nextDouble() * sum - var cumulativeScore = 0.0 - var j = 0 - while (j < points.length && cumulativeScore < r) { - cumulativeScore += weights(j) * KMeans.pointCost(curCenters, points(j)) - j += 1 - } - if (j == 0) { - logWarning("kMeansPlusPlus initialization ran out of distinct points for centers." + - s" Using duplicate point for center k = $i.") - centers(i) = points(0).toDense - } else { - centers(i) = points(j - 1).toDense - } - } - - // Run up to maxIterations iterations of Lloyd's algorithm - val oldClosest = Array.fill(points.length)(-1) - var iteration = 0 - var moved = true - while (moved && iteration < maxIterations) { - moved = false - val counts = Array.fill(k)(0.0) - val sums = Array.fill(k)( - BDV.zeros[Double](dimensions).asInstanceOf[BV[Double]] - ) - var i = 0 - while (i < points.length) { - val p = points(i) - val index = KMeans.findClosest(centers, p)._1 - breeze.linalg.axpy(weights(i), p.vector, sums(index)) - counts(index) += weights(i) - if (index != oldClosest(i)) { - moved = true - oldClosest(i) = index - } - i += 1 - } - // Update centers - var j = 0 - while (j < k) { - if (counts(j) == 0.0) { - // Assign center to a random point - centers(j) = points(rand.nextInt(points.length)).toDense - } else { - sums(j) /= counts(j) - centers(j) = new BreezeVectorWithNorm(sums(j)) - } - j += 1 - } - iteration += 1 - } - - if (iteration == maxIterations) { - logInfo(s"Local KMeans++ reached the max number of iterations: $maxIterations.") - } else { - logInfo(s"Local KMeans++ converged in $iteration iterations.") - } - - centers - } - - private def pickWeighted[T](rand: Random, data: Array[T], weights: Array[Double]): T = { - val r = rand.nextDouble() * weights.sum - var i = 0 - var curWeight = 0.0 - while (i < data.length && curWeight < r) { - curWeight += weights(i) - i += 1 - } - data(i - 1) - } -} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeans.scala new file mode 100644 index 0000000000000..827e5229b7a07 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeans.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.base.{ Centroid, FP, PointOps, Zero } +import org.apache.spark.rdd.RDD + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +/** + * A K-Means clustering implementation that performs multiple K-means clusterings simultaneously, + * returning the one with the lowest cost. + * + * We only compute if a center has moved if we need to. + * + * We use null to represent empty clusters instead of an Option type to save space. + * + * The resulting clustering may contain fewer than K clusters. + */ + +private[mllib] class MultiKMeans[P <: FP: ClassTag, C <: FP: ClassTag]( + pointOps: PointOps[P, C], + maxIterations: Int) extends MultiKMeansClusterer[P, C] { + + def cluster(data: RDD[P], centers: Array[Array[C]]): (Double, GeneralizedKMeansModel[P, C]) = { + val runs = centers.length + val active = Array.fill(runs)(true) + val costs = Array.fill(runs)(Zero) + var activeRuns = new ArrayBuffer[Int] ++ (0 until runs) + var iteration = 0 + + /* + * Execute iterations of Lloyd's algorithm until all runs have converged. + */ + + while (iteration < maxIterations && activeRuns.nonEmpty) { + logInfo(s"iteration $iteration") + + val activeCenters = activeRuns.map(r => centers(r)).toArray + + if (log.isInfoEnabled) { + for (r <- 0 until activeCenters.length) + logInfo(s"run ${activeRuns(r)} has ${activeCenters(r).length} centers") + } + + // Find the sum and count of points mapping to each center + val (centroids, runDistortion) = getCentroids(data, activeCenters) + + if (log.isInfoEnabled) { + for (run <- activeRuns) logInfo(s"run $run distortion ${runDistortion(run)}") + } + + for (run <- activeRuns) active(run) = false + + for (((runIndex: Int, clusterIndex: Int), cn: Centroid) <- centroids) { + val run = activeRuns(runIndex) + if (cn.isEmpty) { + active(run) = true + centers(run)(clusterIndex) = null.asInstanceOf[C] + } else { + val centroid = pointOps.centroidToPoint(cn) + active(run) = active(run) || pointOps.centerMoved(centroid, centers(run)(clusterIndex)) + centers(run)(clusterIndex) = pointOps.pointToCenter(centroid) + } + } + + // filter out null centers + for (r <- activeRuns) centers(r) = centers(r).filter(_ != null) + + // update distortions and print log message if run completed during this iteration + for ((run, runIndex) <- activeRuns.zipWithIndex) { + costs(run) = runDistortion(runIndex) + if (!active(run)) logInfo(s"run $run finished in ${iteration + 1} iterations") + } + activeRuns = activeRuns.filter(active(_)) + iteration += 1 + } + + val best = costs.zipWithIndex.min._2 + (costs(best), new GeneralizedKMeansModel(pointOps, centers(best))) + } + + def getCentroids(data: RDD[P], activeCenters: Array[Array[C]]) = { + val runDistortion = activeCenters.map(_ => data.sparkContext.accumulator(Zero)) + val bcActiveCenters = data.sparkContext.broadcast(activeCenters) + val ops = pointOps + val result = data.mapPartitions { points => + val bcCenters = bcActiveCenters.value + val centers = bcCenters.map(x => Array.fill(x.length)(new 
Centroid())) + for ( + point <- points; + (clusters: Array[C], run) <- bcCenters.zipWithIndex + ) { + val (cluster, cost) = ops.findClosest(clusters, point) + runDistortion(run) += cost + centers(run)(cluster).add(point) + } + + val contribution = + for ( + (clusters, run) <- bcCenters.zipWithIndex; + (contrib, cluster) <- clusters.zipWithIndex + ) yield { + ((run, cluster), centers(run)(cluster)) + } + + contribution.iterator + }.reduceByKey { (x, y) => x.add(y) }.collect() + bcActiveCenters.unpersist() + (result, runDistortion.map(x => x.localValue)) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeansClusterer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeansClusterer.scala new file mode 100644 index 0000000000000..14fb800e05e27 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeansClusterer.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import org.apache.spark.Logging +import org.apache.spark.mllib.base.FP +import org.apache.spark.rdd.RDD + + +private[mllib] trait MultiKMeansClusterer[P <: FP, C <: FP] extends Serializable with Logging { + /** + * Cluster the data + * @param data a data set to be clustered + * @param centers a set of sets of initial centers to use for clustering + * @return the best clustering of the data from the initial sets of cluster centers + */ + def cluster(data: RDD[P], centers: Array[Array[C]]): (Double, GeneralizedKMeansModel[P, C]) +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/metrics/EuclideanOps.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/metrics/EuclideanOps.scala new file mode 100644 index 0000000000000..22e627dcf07b4 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/metrics/EuclideanOps.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.clustering.metrics + +import breeze.linalg.{ DenseVector => BDV, SparseVector => BSV, Vector => BV } + +import org.apache.spark.mllib.base.{ Centroid, FPoint, PointOps, Infinity, Zero } +import org.apache.spark.mllib.linalg.{ DenseVector, SparseVector, Vector } + +/** + * Euclidean distance measure + * + * This is the slow implementation of the squared Euclidean distance function, + * shown here simply for clarity. + */ +class EuclideanOps extends PointOps[FPoint, FPoint] with Serializable { + + type C = FPoint + type P = FPoint + + val epsilon = 1e-4 + + def distance(p: P, c: C, upperBound: Double = Infinity): Double = { + val d = p.inh.zip(c.inh).foldLeft(Zero) { + case (d: Double, (a: Double, b: Double)) => d + (a - b) * (a - b) + } + if (d < Zero) Zero else d + } + + def arrayToPoint(raw: Array[Double]) = new FPoint(BDV(raw), 1) + + def vectorToPoint(v: Vector) = { + v match { + case x: DenseVector => new FPoint(new BDV[Double](x.toArray), 1) + case x: SparseVector => new FPoint(new BSV[Double](x.indices, x.values, x.size), 1) + } + } + + def centerToPoint(v: C) = new P(v.raw, v.weight) + + def centroidToPoint(v: Centroid) = new P(v.raw, v.weight) + + def pointToCenter(v: P) = new C(v.raw, v.weight) + + def centroidToCenter(v: Centroid) = new C(v.raw, v.weight) + + def centerToVector(c: C) = new DenseVector(c.inh) + + def centerMoved(v: FPoint, w: FPoint): Boolean = distance(v, w) > epsilon * epsilon + +} + diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/metrics/FastEuclideanOps.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/metrics/FastEuclideanOps.scala new file mode 100644 index 0000000000000..e5cd923eea217 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/metrics/FastEuclideanOps.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.clustering.metrics + +import breeze.linalg.{ DenseVector => BDV, SparseVector => BSV, Vector => BV } + +import org.apache.spark.mllib.base._ +import org.apache.spark.mllib.linalg.{ SparseVector, DenseVector, Vector } +import org.apache.spark.mllib.base.{ Centroid, FPoint, PointOps, Infinity, Zero } + +class FastEUPoint(raw: BV[Double], weight: Double) extends FPoint(raw, weight) { + val norm = if (weight == Zero) Zero else raw.dot(raw) / (weight * weight) +} + +/** + * Euclidean distance measure, expedited by pre-computing vector norms + */ +class FastEuclideanOps extends PointOps[FastEUPoint, FastEUPoint] with Serializable { + + type C = FastEUPoint + type P = FastEUPoint + + val epsilon = 1e-4 + + /* compute a lower bound on the euclidean distance distance */ + + def distance(p: P, c: C, upperBound: Double): Double = { + val d = if (p.weight == Zero || c.weight == Zero) { + p.norm + c.norm + } else { + val x = p.raw.dot(c.raw) / (p.weight * c.weight) + p.norm + c.norm - 2.0 * x + } + if (d < upperBound) { + if (d < Zero) Zero else d + } else { + upperBound + } + } + + def arrayToPoint(raw: Array[Double]) = new P(new BDV[Double](raw), One) + + def vectorToPoint(v: Vector) = { + v match { + case x: DenseVector => new P(new BDV[Double](x.toArray), 1) + case x: SparseVector => new P(new BSV[Double](x.indices, x.values, x.size), 1) + } + } + + def centerToPoint(v: C) = new P(v.raw, v.weight) + + def centroidToPoint(v: Centroid) = new P(v.raw, v.weight) + + def pointToCenter(v: P) = new C(v.raw, v.weight) + + def centroidToCenter(v: Centroid) = new C(v.raw, v.weight) + + def centerToVector(c: C) = new DenseVector(c.inh) + + def centerMoved(v: P, w: C): Boolean = distance(v, w, Infinity) > epsilon * epsilon + +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/package.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/package.scala new file mode 100644 index 0000000000000..472782b14122a --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/package.scala @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.mllib
+
+import breeze.linalg.{ Vector => BV }
+
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.rdd.RDD
+
+package object base {
+
+  val Zero = 0.0
+  val One = 1.0
+  val Infinity = Double.MaxValue
+  val Unknown = -1.0
+
+  private[mllib] trait FP extends Serializable {
+    val weight: Double
+    val raw: BV[Double]
+  }
+
+  private[mllib] class FPoint(val raw: BV[Double], val weight: Double) extends FP {
+    override def toString: String = weight + "," + (raw.toArray mkString ",")
+    lazy val inh = (raw :* (1.0 / weight)).toArray
+  }
+
+  /**
+   * A mutable point in homogeneous coordinates that is lazily initialized.
+   */
+  private[mllib] class Centroid extends Serializable {
+    override def toString: String = weight +
+      (if (raw != null) "," + (raw.toArray mkString ",") else "")
+
+    def isEmpty = weight == Zero
+
+    var raw: BV[Double] = null
+
+    var weight: Double = Zero
+
+    def add(p: Centroid): this.type = add(p.raw, p.weight)
+
+    def add(p: FP): this.type = add(p.raw, p.weight)
+
+    def sub(p: Centroid): this.type = sub(p.raw, p.weight)
+
+    def sub(p: FP): this.type = sub(p.raw, p.weight)
+
+    def sub(r: BV[Double], w: Double): this.type = {
+      if (r != null) {
+        if (raw == null) {
+          raw = r.toVector :*= -1.0
+          weight = w * -1
+        } else {
+          raw -= r
+          weight = weight - w
+        }
+      }
+      this
+    }
+
+    def add(r: BV[Double], w: Double): this.type = {
+      if (r != null) {
+        if (raw == null) {
+          raw = r.toVector
+          weight = w
+        } else {
+          raw += r
+          weight = weight + w
+        }
+      }
+      this
+    }
+  }
+
+  private[mllib] trait PointOps[P <: FP, C <: FP] {
+
+    def distance(p: P, c: C, upperBound: Double): Double
+
+    def arrayToPoint(v: Array[Double]): P
+
+    def vectorToPoint(v: Vector): P
+
+    def centerToPoint(v: C): P
+
+    def pointToCenter(v: P): C
+
+    def centroidToCenter(v: Centroid): C
+
+    def centroidToPoint(v: Centroid): P
+
+    def centerMoved(v: P, w: C): Boolean
+
+    def centerToVector(c: C): Vector
+
+    /**
+     * Return the index of the closest center in `centers` to `point`, as well as its distance.
+     */
+    def findClosest(centers: Array[C], point: P): (Int, Double) = {
+      var bestDistance = Infinity
+      var bestIndex = 0
+      var i = 0
+      val end = centers.length
+      while (i < end && bestDistance > 0.0) {
+        val d = distance(point, centers(i), bestDistance)
+        if (d < bestDistance) {
+          bestIndex = i
+          bestDistance = d
+        }
+        i = i + 1
+      }
+      (bestIndex, bestDistance)
+    }
+
+    def distortion(data: RDD[P], centers: Array[C]) = {
+      data.mapPartitions {
+        points =>
+          Array(points.foldLeft(Zero) {
+            case (total, p) => total + findClosest(centers, p)._2
+          }).iterator
+      }.reduce(_ + _)
+    }
+
+    /**
+     * Return the K-means cost of a given point against the given cluster centers.
+     */
+    def pointCost(centers: Array[C], point: P): Double = findClosest(centers, point)._2
+
+  }
+
+}
diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java
index 31676e64025d0..3cc1030710057 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java
@@ -76,15 +76,12 @@ public void runKMeansUsingConstructor() {
     Vector expectedCenter = Vectors.dense(1.0, 3.0, 4.0);
 
     JavaRDD<Vector> data = sc.parallelize(points, 2);
-    KMeansModel model = new KMeans().setK(1).setMaxIterations(5).run(data.rdd());
+    KMeansModel model = KMeans.train(data.rdd(), 1, 5);
     assertEquals(1, model.clusterCenters().length);
     assertEquals(expectedCenter, model.clusterCenters()[0]);
 
-    model = new KMeans()
-      .setK(1)
-      .setMaxIterations(1)
-      .setInitializationMode(KMeans.RANDOM())
-      .run(data.rdd());
+    model = KMeans.train(data.rdd(), 1, 1, 1, KMeans.RANDOM());
+
     assertEquals(expectedCenter, model.clusterCenters()[0]);
   }
 
@@ -96,7 +93,7 @@ public void testPredictJavaRDD() {
       Vectors.dense(1.0, 4.0, 6.0)
     );
     JavaRDD<Vector> data = sc.parallelize(points, 2);
-    KMeansModel model = new KMeans().setK(1).setMaxIterations(5).run(data.rdd());
+    KMeansModel model = KMeans.train(data.rdd(), 1, 5);
     JavaRDD<Integer> predictions = model.predict(data);
     // Should be able to get the first prediction.
     predictions.first();
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
index afa1f79b95a12..e1b6d3589bf13 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
@@ -56,11 +56,11 @@ class KMeansSuite extends FunSuite with LocalSparkContext {
     model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
-    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, initializationMode = RANDOM)
+    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, mode = RANDOM)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
     model = KMeans.train(
-      data, k = 1, maxIterations = 1, runs = 1, initializationMode = K_MEANS_PARALLEL)
+      data, k = 1, maxIterations = 1, runs = 1, mode = K_MEANS_PARALLEL)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
   }
 
@@ -75,7 +75,7 @@ class KMeansSuite extends FunSuite with LocalSparkContext {
 
     // Make sure code runs.
     var model = KMeans.train(data, k=2, maxIterations=1)
-    assert(model.clusterCenters.size === 2)
+    assert(model.clusterCenters.size === 1)
   }
 
   test("more clusters than points") {
@@ -87,7 +87,7 @@ class KMeansSuite extends FunSuite with LocalSparkContext {
 
     // Make sure code runs.
     var model = KMeans.train(data, k=3, maxIterations=1)
-    assert(model.clusterCenters.size === 3)
+    assert(model.clusterCenters.size === 2)
   }
 
   test("single cluster with big dataset") {
@@ -119,11 +119,10 @@ class KMeansSuite extends FunSuite with LocalSparkContext {
     model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
-    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, initializationMode = RANDOM)
+    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, mode = RANDOM)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
-    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1,
-      initializationMode = K_MEANS_PARALLEL)
+    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, mode = K_MEANS_PARALLEL)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
   }
 
@@ -164,11 +163,10 @@ class KMeansSuite extends FunSuite with LocalSparkContext {
     model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
-    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, initializationMode = RANDOM)
+    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, mode = RANDOM)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
-    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1,
-      initializationMode = K_MEANS_PARALLEL)
+    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, mode = K_MEANS_PARALLEL)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
     data.unpersist()
@@ -255,4 +253,4 @@ class KMeansClusterSuite extends FunSuite with LocalClusterSparkContext {
     val cost = model.computeCost(points)
   }
 }
-}
+}
\ No newline at end of file
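
A minimal usage sketch, not part of the patch, of how the PointOps abstraction introduced above composes with GeneralizedKMeansModel: a distance implementation (here FastEuclideanOps) supplies the point/center conversions and the metric, and the model only delegates to it. It assumes the snippet is compiled inside org.apache.spark.mllib.clustering, since PointOps and GeneralizedKMeansModel are declared private[mllib]; the object name, the local SparkContext setup, and the sample vectors are illustrative only.

package org.apache.spark.mllib.clustering

import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.metrics.FastEuclideanOps
import org.apache.spark.mllib.linalg.Vectors

object GeneralizedKMeansSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "generalized-kmeans-sketch")
    val ops = new FastEuclideanOps

    // Two hand-picked centers, converted through the point and center types used by the metric.
    val centers = Array(
      ops.pointToCenter(ops.vectorToPoint(Vectors.dense(0.0, 0.0))),
      ops.pointToCenter(ops.vectorToPoint(Vectors.dense(5.0, 5.0))))

    val model = new GeneralizedKMeansModel(ops, centers)

    val data = sc.parallelize(Seq(
      Vectors.dense(0.1, -0.1),   // closest to center 0
      Vectors.dense(4.9, 5.2)))   // closest to center 1

    // predict delegates to ops.findClosest; computeCost sums the squared distances.
    model.predict(data).collect().foreach(println)
    println(model.computeCost(data))
    sc.stop()
  }
}

Swapping in a different PointOps implementation (for example the slower EuclideanOps) leaves the model code unchanged, which is the point of parameterizing the model by the distance operations.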