diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
index 76aeebd703d4e..f5da4ce79385b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
@@ -43,6 +43,7 @@ import org.apache.spark.sql.{SQLContext, Row}
  *                the weight for Gaussian i, and weights.sum == 1
  * @param gaussians Array of MultivariateGaussian where gaussians(i) represents
  *                  the Multivariate Gaussian (Normal) Distribution for Gaussian i
+ * @since 1.3.0
  */
 @Experimental
 class GaussianMixtureModel(
@@ -53,32 +54,48 @@ class GaussianMixtureModel(
 
   override protected def formatVersion = "1.0"
 
+  /**
+   * @since 1.4.0
+   */
   override def save(sc: SparkContext, path: String): Unit = {
     GaussianMixtureModel.SaveLoadV1_0.save(sc, path, weights, gaussians)
   }
 
-  /** Number of gaussians in mixture */
+  /**
+   * Number of gaussians in mixture
+   * @since 1.3.0
+   */
   def k: Int = weights.length
 
-  /** Maps given points to their cluster indices. */
+  /**
+   * Maps given points to their cluster indices.
+   * @since 1.3.0
+   */
   def predict(points: RDD[Vector]): RDD[Int] = {
     val responsibilityMatrix = predictSoft(points)
     responsibilityMatrix.map(r => r.indexOf(r.max))
   }
 
-  /** Maps given point to its cluster index. */
+  /**
+   * Maps given point to its cluster index.
+   * @since 1.4.0
+   */
   def predict(point: Vector): Int = {
     val r = computeSoftAssignments(point.toBreeze.toDenseVector, gaussians, weights, k)
     r.indexOf(r.max)
   }
 
-  /** Java-friendly version of [[predict()]] */
+  /**
+   * Java-friendly version of [[predict()]]
+   * @since 1.4.0
+   */
   def predict(points: JavaRDD[Vector]): JavaRDD[java.lang.Integer] =
     predict(points.rdd).toJavaRDD().asInstanceOf[JavaRDD[java.lang.Integer]]
 
   /**
    * Given the input vectors, return the membership value of each vector
    * to all mixture components.
+   * @since 1.3.0
    */
   def predictSoft(points: RDD[Vector]): RDD[Array[Double]] = {
     val sc = points.sparkContext
@@ -91,6 +108,7 @@ class GaussianMixtureModel(
 
   /**
    * Given the input vector, return the membership values to all mixture components.
+   * @since 1.4.0
    */
   def predictSoft(point: Vector): Array[Double] = {
     computeSoftAssignments(point.toBreeze.toDenseVector, gaussians, weights, k)
@@ -115,6 +133,9 @@ class GaussianMixtureModel(
   }
 }
 
+/**
+ * @since 1.4.0
+ */
 @Experimental
 object GaussianMixtureModel extends Loader[GaussianMixtureModel] {
 
@@ -147,6 +168,9 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] {
       sc.parallelize(dataArray, 1).toDF().write.parquet(Loader.dataPath(path))
     }
 
+    /**
+     * @since 1.4.0
+     */
     def load(sc: SparkContext, path: String): GaussianMixtureModel = {
       val dataPath = Loader.dataPath(path)
       val sqlContext = new SQLContext(sc)
@@ -165,6 +189,9 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] {
     }
   }
 
+  /**
+   * @since 1.4.0
+   */
   override def load(sc: SparkContext, path: String) : GaussianMixtureModel = {
     val (loadedClassName, version, metadata) = Loader.loadMetadata(sc, path)
     implicit val formats = DefaultFormats
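// Illustrative usage sketch for the GaussianMixtureModel API annotated above (not part of
// this patch). The sample points, component count, and save path are hypothetical.
import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.GaussianMixture
import org.apache.spark.mllib.linalg.Vectors

def gmmExample(sc: SparkContext): Unit = {
  val data = sc.parallelize(Seq(
    Vectors.dense(0.1, 0.2), Vectors.dense(0.3, 0.1), Vectors.dense(9.0, 8.5)))
  // Fit a 2-component Gaussian mixture, then inspect hard and soft assignments.
  val model = new GaussianMixture().setK(2).run(data)
  val clusters = model.predict(data)          // RDD[Int], @since 1.3.0
  val memberships = model.predictSoft(data)   // RDD[Array[Double]], @since 1.3.0
  model.save(sc, "/tmp/gmm-model")            // persistence added in 1.4.0
}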
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index 0a65403f4ec95..c74de4d139a8c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -430,11 +430,14 @@ class KMeans private (
 
 /**
  * Top-level methods for calling K-means clustering.
+ * @since 0.8.0
  */
 object KMeans {
 
   // Initialization mode names
+  /** @since 0.8.0 */
   val RANDOM = "random"
+  /** @since 0.8.0 */
   val K_MEANS_PARALLEL = "k-means||"
 
   /**
@@ -446,6 +449,7 @@ object KMeans {
    * @param runs number of parallel runs, defaults to 1. The best model is returned.
    * @param initializationMode initialization model, either "random" or "k-means||" (default).
    * @param seed random seed value for cluster initialization
+   * @since 1.3.0
    */
   def train(
       data: RDD[Vector],
@@ -470,6 +474,7 @@ object KMeans {
    * @param maxIterations max number of iterations
    * @param runs number of parallel runs, defaults to 1. The best model is returned.
    * @param initializationMode initialization model, either "random" or "k-means||" (default).
+   * @since 0.8.0
    */
   def train(
       data: RDD[Vector],
@@ -486,6 +491,7 @@ object KMeans {
 
   /**
    * Trains a k-means model using specified parameters and the default values for unspecified.
+   * @since 0.8.0
   */
   def train(
       data: RDD[Vector],
@@ -496,6 +502,7 @@ object KMeans {
 
   /**
    * Trains a k-means model using specified parameters and the default values for unspecified.
+   * @since 0.8.0
   */
   def train(
       data: RDD[Vector],
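// Illustrative sketch of the KMeans.train overloads annotated above (not part of this patch);
// the sample points and parameter values are made up.
import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

def kmeansExample(sc: SparkContext): Unit = {
  val points = sc.parallelize(Seq(
    Vectors.dense(0.0, 0.0), Vectors.dense(0.1, 0.1), Vectors.dense(9.0, 9.0)))
  // Simplest overload (@since 0.8.0): data, k, maxIterations.
  val model = KMeans.train(points, 2, 20)
  // Full overload (@since 1.3.0) adds runs, initializationMode, and a seed.
  val seeded = KMeans.train(points, 2, 20, 1, KMeans.K_MEANS_PARALLEL, seed = 42L)
  val cost = model.computeCost(points)
}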
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
index 8ecb3df11d95e..7187ae6e3b0aa 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
@@ -34,35 +34,52 @@ import org.apache.spark.sql.Row
 
 /**
  * A clustering model for K-means. Each point belongs to the cluster with the closest center.
+ * @since 0.8.0
 */
 class KMeansModel (
     val clusterCenters: Array[Vector]) extends Saveable with Serializable with PMMLExportable {
 
-  /** A Java-friendly constructor that takes an Iterable of Vectors. */
+  /**
+   * A Java-friendly constructor that takes an Iterable of Vectors.
+   * @since 1.4.0
+   */
   def this(centers: java.lang.Iterable[Vector]) = this(centers.asScala.toArray)
 
-  /** Total number of clusters. */
+  /**
+   * Total number of clusters.
+   * @since 0.8.0
+   */
   def k: Int = clusterCenters.length
 
-  /** Returns the cluster index that a given point belongs to. */
+  /**
+   * Returns the cluster index that a given point belongs to.
+   * @since 0.8.0
+   */
   def predict(point: Vector): Int = {
     KMeans.findClosest(clusterCentersWithNorm, new VectorWithNorm(point))._1
   }
 
-  /** Maps given points to their cluster indices. */
+  /**
+   * Maps given points to their cluster indices.
+   * @since 1.0.0
+   */
   def predict(points: RDD[Vector]): RDD[Int] = {
     val centersWithNorm = clusterCentersWithNorm
     val bcCentersWithNorm = points.context.broadcast(centersWithNorm)
     points.map(p => KMeans.findClosest(bcCentersWithNorm.value, new VectorWithNorm(p))._1)
   }
 
-  /** Maps given points to their cluster indices. */
+  /**
+   * Maps given points to their cluster indices.
+   * @since 1.0.0
+   */
   def predict(points: JavaRDD[Vector]): JavaRDD[java.lang.Integer] =
     predict(points.rdd).toJavaRDD().asInstanceOf[JavaRDD[java.lang.Integer]]
 
   /**
    * Return the K-means cost (sum of squared distances of points to their nearest center) for this
    * model on the given data.
+   * @since 0.8.0
   */
   def computeCost(data: RDD[Vector]): Double = {
     val centersWithNorm = clusterCentersWithNorm
@@ -73,13 +90,20 @@ class KMeansModel (
   private def clusterCentersWithNorm: Iterable[VectorWithNorm] =
     clusterCenters.map(new VectorWithNorm(_))
 
+  /**
+   * @since 1.4.0
+   */
   override def save(sc: SparkContext, path: String): Unit = {
     KMeansModel.SaveLoadV1_0.save(sc, this, path)
   }
 
+  /** @since 1.4.0 */
   override protected def formatVersion: String = "1.0"
 }
 
+/**
+ * @since 1.4.0
+ */
 object KMeansModel extends Loader[KMeansModel] {
   override def load(sc: SparkContext, path: String): KMeansModel = {
     KMeansModel.SaveLoadV1_0.load(sc, path)
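// Small sketch of the KMeansModel persistence API tagged above (not part of this patch);
// the centers and the save path are hypothetical.
import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.KMeansModel
import org.apache.spark.mllib.linalg.Vectors

def kmeansModelExample(sc: SparkContext): Unit = {
  // A model can be built directly from an array of cluster centers.
  val model = new KMeansModel(Array(Vectors.dense(0.0, 0.0), Vectors.dense(1.0, 1.0)))
  model.save(sc, "/tmp/kmeans-model")                  // @since 1.4.0
  val reloaded = KMeansModel.load(sc, "/tmp/kmeans-model")
  val cluster = reloaded.predict(Vectors.dense(0.9, 1.1)) // @since 0.8.0
}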
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
index 0fc9b1ac4d716..02b7b881c4613 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
@@ -43,6 +43,7 @@ import org.apache.spark.util.Utils
  *
  * @see [[http://en.wikipedia.org/wiki/Latent_Dirichlet_allocation Latent Dirichlet allocation
  *       (Wikipedia)]]
+ * @since 1.3.0
 */
 @Experimental
 class LDA private (
@@ -60,12 +61,15 @@ class LDA private (
 
   /**
    * Number of topics to infer. I.e., the number of soft cluster centers.
+   *
+   * @since 1.3.0
   */
   def getK: Int = k
 
   /**
    * Number of topics to infer. I.e., the number of soft cluster centers.
    * (default = 10)
+   * @since 1.3.0
   */
   def setK(k: Int): this.type = {
     require(k > 0, s"LDA k (number of clusters) must be > 0, but was set to $k")
@@ -78,6 +82,7 @@ class LDA private (
    * distributions over topics ("theta").
    *
    * This is the parameter to a Dirichlet distribution.
+   * @since 1.5.0
   */
   def getAsymmetricDocConcentration: Vector = this.docConcentration
 
@@ -87,6 +92,7 @@ class LDA private (
    *
    * This method assumes the Dirichlet distribution is symmetric and can be described by a single
    * [[Double]] parameter. It should fail if docConcentration is asymmetric.
+   * @since 1.3.0
   */
   def getDocConcentration: Double = {
     val parameter = docConcentration(0)
@@ -121,6 +127,7 @@ class LDA private (
    *     - Values should be >= 0
    *     - default = uniformly (1.0 / k), following the implementation from
    *       [[https://github.com/Blei-Lab/onlineldavb]].
+   * @since 1.5.0
   */
   def setDocConcentration(docConcentration: Vector): this.type = {
     require(docConcentration.size > 0, "docConcentration must have > 0 elements")
@@ -128,22 +135,37 @@ class LDA private (
     this
   }
 
-  /** Replicates a [[Double]] docConcentration to create a symmetric prior. */
+  /**
+   * Replicates a [[Double]] docConcentration to create a symmetric prior.
+   * @since 1.3.0
+   */
   def setDocConcentration(docConcentration: Double): this.type = {
     this.docConcentration = Vectors.dense(docConcentration)
     this
   }
 
-  /** Alias for [[getAsymmetricDocConcentration]] */
+  /**
+   * Alias for [[getAsymmetricDocConcentration]]
+   * @since 1.5.0
+   */
   def getAsymmetricAlpha: Vector = getAsymmetricDocConcentration
 
-  /** Alias for [[getDocConcentration]] */
+  /**
+   * Alias for [[getDocConcentration]]
+   * @since 1.3.0
+   */
  def getAlpha: Double = getDocConcentration
 
-  /** Alias for [[setDocConcentration()]] */
+  /**
+   * Alias for [[setDocConcentration()]]
+   * @since 1.5.0
+   */
   def setAlpha(alpha: Vector): this.type = setDocConcentration(alpha)
 
-  /** Alias for [[setDocConcentration()]] */
+  /**
+   * Alias for [[setDocConcentration()]]
+   * @since 1.3.0
+   */
   def setAlpha(alpha: Double): this.type = setDocConcentration(alpha)
 
   /**
@@ -154,6 +176,7 @@ class LDA private (
    *
    * Note: The topics' distributions over terms are called "beta" in the original LDA paper
    * by Blei et al., but are called "phi" in many later papers such as Asuncion et al., 2009.
+   * @since 1.3.0
   */
   def getTopicConcentration: Double = this.topicConcentration
 
@@ -178,36 +201,51 @@ class LDA private (
    *     - Value should be >= 0
    *     - default = (1.0 / k), following the implementation from
    *       [[https://github.com/Blei-Lab/onlineldavb]].
+   * @since 1.3.0
   */
   def setTopicConcentration(topicConcentration: Double): this.type = {
     this.topicConcentration = topicConcentration
     this
   }
 
-  /** Alias for [[getTopicConcentration]] */
+  /**
+   * Alias for [[getTopicConcentration]]
+   * @since 1.3.0
+   */
   def getBeta: Double = getTopicConcentration
 
-  /** Alias for [[setTopicConcentration()]] */
+  /**
+   * Alias for [[setTopicConcentration()]]
+   * @since 1.3.0
+   */
   def setBeta(beta: Double): this.type = setTopicConcentration(beta)
 
   /**
    * Maximum number of iterations for learning.
+   * @since 1.3.0
   */
   def getMaxIterations: Int = maxIterations
 
   /**
    * Maximum number of iterations for learning.
    * (default = 20)
+   * @since 1.3.0
   */
   def setMaxIterations(maxIterations: Int): this.type = {
     this.maxIterations = maxIterations
     this
   }
 
-  /** Random seed */
+  /**
+   * Random seed
+   * @since 1.3.0
+   */
   def getSeed: Long = seed
 
-  /** Random seed */
+  /**
+   * Random seed
+   * @since 1.3.0
+   */
   def setSeed(seed: Long): this.type = {
     this.seed = seed
     this
@@ -215,6 +253,7 @@ class LDA private (
 
   /**
    * Period (in iterations) between checkpoints.
+   * @since 1.3.0
   */
   def getCheckpointInterval: Int = checkpointInterval
 
@@ -225,6 +264,7 @@ class LDA private (
    * [[org.apache.spark.SparkContext]], this setting is ignored.
    *
    * @see [[org.apache.spark.SparkContext#setCheckpointDir]]
+   * @since 1.3.0
   */
   def setCheckpointInterval(checkpointInterval: Int): this.type = {
     this.checkpointInterval = checkpointInterval
@@ -236,6 +276,7 @@ class LDA private (
    * :: DeveloperApi ::
    *
    * LDAOptimizer used to perform the actual calculation
+   * @since 1.4.0
   */
   @DeveloperApi
   def getOptimizer: LDAOptimizer = ldaOptimizer
@@ -244,6 +285,7 @@ class LDA private (
    * :: DeveloperApi ::
    *
    * LDAOptimizer used to perform the actual calculation (default = EMLDAOptimizer)
+   * @since 1.4.0
   */
   @DeveloperApi
   def setOptimizer(optimizer: LDAOptimizer): this.type = {
@@ -254,6 +296,7 @@ class LDA private (
   /**
    * Set the LDAOptimizer used to perform the actual calculation by algorithm name.
    * Currently "em", "online" are supported.
+   * @since 1.4.0
   */
   def setOptimizer(optimizerName: String): this.type = {
     this.ldaOptimizer =
@@ -274,6 +317,7 @@ class LDA private (
    *                  (where the vocabulary size is the length of the vector).
    *                  Document IDs must be unique and >= 0.
    * @return  Inferred LDA model
+   * @since 1.3.0
   */
   def run(documents: RDD[(Long, Vector)]): LDAModel = {
     val state = ldaOptimizer.initialize(documents, this)
@@ -289,7 +333,10 @@ class LDA private (
     state.getLDAModel(iterationTimes)
   }
 
-  /** Java-friendly version of [[run()]] */
+  /**
+   * Java-friendly version of [[run()]]
+   * @since 1.3.0
+   */
   def run(documents: JavaPairRDD[java.lang.Long, Vector]): LDAModel = {
     run(documents.rdd.asInstanceOf[RDD[(Long, Vector)]])
   }
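// Hedged usage sketch for the LDA builder methods annotated above (not part of this patch);
// the corpus contents and parameter values are invented for illustration.
import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.LDA
import org.apache.spark.mllib.linalg.Vectors

def ldaExample(sc: SparkContext): Unit = {
  // Documents are (id, term-count vector) pairs with unique, non-negative ids.
  val corpus = sc.parallelize(Seq(
    (0L, Vectors.dense(1.0, 2.0, 0.0)),
    (1L, Vectors.dense(0.0, 3.0, 1.0))))
  val lda = new LDA()
    .setK(2)                    // @since 1.3.0
    .setDocConcentration(1.1)   // symmetric alpha, @since 1.3.0
    .setTopicConcentration(1.1) // beta, @since 1.3.0
    .setMaxIterations(10)
  val ldaModel = lda.run(corpus)     // @since 1.3.0
  val topics = ldaModel.topicsMatrix // vocabSize x k matrix of topic-term weights
}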
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index a0008f9c99ad7..360241c8081ac 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -35,6 +35,7 @@ import org.apache.spark.rdd.RDD
  *
  * An LDAOptimizer specifies which optimization/learning/inference algorithm to use, and it can
  * hold optimizer-specific parameters for users to set.
+ * @since 1.4.0
 */
 @DeveloperApi
 sealed trait LDAOptimizer {
@@ -73,7 +74,7 @@ sealed trait LDAOptimizer {
  *  - Paper which clearly explains several algorithms, including EM:
  *    Asuncion, Welling, Smyth, and Teh.
  *    "On Smoothing and Inference for Topic Models."  UAI, 2009.
- *
+ * @since 1.4.0
 */
 @DeveloperApi
 final class EMLDAOptimizer extends LDAOptimizer {
@@ -225,6 +226,7 @@ final class EMLDAOptimizer extends LDAOptimizer {
  *
  * Original Online LDA paper:
  *   Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
+ * @since 1.4.0
 */
 @DeveloperApi
 final class OnlineLDAOptimizer extends LDAOptimizer {
@@ -274,6 +276,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
   /**
    * A (positive) learning parameter that downweights early iterations. Larger values make early
    * iterations count less.
+   * @since 1.4.0
   */
   def getTau0: Double = this.tau0
 
@@ -281,6 +284,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
    * A (positive) learning parameter that downweights early iterations. Larger values make early
    * iterations count less.
    * Default: 1024, following the original Online LDA paper.
+   * @since 1.4.0
   */
   def setTau0(tau0: Double): this.type = {
     require(tau0 > 0, s"LDA tau0 must be positive, but was set to $tau0")
@@ -290,6 +294,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 
   /**
    * Learning rate: exponential decay rate
+   * @since 1.4.0
   */
   def getKappa: Double = this.kappa
 
@@ -297,6 +302,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
    * Learning rate: exponential decay rate---should be between
    * (0.5, 1.0] to guarantee asymptotic convergence.
    * Default: 0.51, based on the original Online LDA paper.
+   * @since 1.4.0
   */
   def setKappa(kappa: Double): this.type = {
     require(kappa >= 0, s"Online LDA kappa must be nonnegative, but was set to $kappa")
@@ -306,6 +312,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 
   /**
    * Mini-batch fraction, which sets the fraction of document sampled and used in each iteration
+   * @since 1.4.0
   */
   def getMiniBatchFraction: Double = this.miniBatchFraction
 
@@ -318,6 +325,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
    * maxIterations * miniBatchFraction >= 1.
    *
    * Default: 0.05, i.e., 5% of total documents.
+   * @since 1.4.0
   */
   def setMiniBatchFraction(miniBatchFraction: Double): this.type = {
     require(miniBatchFraction > 0.0 && miniBatchFraction <= 1.0,
@@ -329,6 +337,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
   /**
    * Optimize alpha, indicates whether alpha (Dirichlet parameter for document-topic distribution)
    * will be optimized during training.
+   * @since 1.5.0
   */
   def getOptimzeAlpha: Boolean = this.optimizeAlpha
 
@@ -336,6 +345,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
    * Sets whether to optimize alpha parameter during training.
    *
    * Default: false
+   * @since 1.5.0
   */
   def setOptimzeAlpha(optimizeAlpha: Boolean): this.type = {
     this.optimizeAlpha = optimizeAlpha
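// Sketch showing how the OnlineLDAOptimizer knobs annotated above plug into LDA (not part of
// this patch); the values simply mirror the documented defaults, not a recommendation.
import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer}

val onlineLDA = new LDA()
  .setK(2)
  .setOptimizer(new OnlineLDAOptimizer()   // @since 1.4.0
    .setTau0(1024)                         // downweights early iterations
    .setKappa(0.51)                        // exponential decay rate
    .setMiniBatchFraction(0.05))           // 5% of documents per iteration
// Alternatively, select the optimizer by name: new LDA().setOptimizer("online")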
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
index 407e43a024a2e..9ad9643a695e8 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
@@ -39,12 +39,16 @@ import org.apache.spark.{Logging, SparkContext, SparkException}
  *
  * @param k number of clusters
  * @param assignments an RDD of clustering [[PowerIterationClustering#Assignment]]s
+ * @since 1.3.0
 */
 @Experimental
 class PowerIterationClusteringModel(
     val k: Int,
     val assignments: RDD[PowerIterationClustering.Assignment]) extends Saveable with Serializable {
 
+  /**
+   * @since 1.4.0
+   */
   override def save(sc: SparkContext, path: String): Unit = {
     PowerIterationClusteringModel.SaveLoadV1_0.save(sc, this, path)
   }
@@ -52,6 +56,9 @@ class PowerIterationClusteringModel(
   override protected def formatVersion: String = "1.0"
 }
 
+/**
+ * @since 1.4.0
+ */
 object PowerIterationClusteringModel extends Loader[PowerIterationClusteringModel] {
   override def load(sc: SparkContext, path: String): PowerIterationClusteringModel = {
     PowerIterationClusteringModel.SaveLoadV1_0.load(sc, path)
@@ -65,6 +72,9 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode
     private[clustering] val thisClassName =
       "org.apache.spark.mllib.clustering.PowerIterationClusteringModel"
 
+    /**
+     * @since 1.4.0
+     */
     def save(sc: SparkContext, model: PowerIterationClusteringModel, path: String): Unit = {
       val sqlContext = new SQLContext(sc)
       import sqlContext.implicits._
@@ -77,6 +87,9 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode
       dataRDD.write.parquet(Loader.dataPath(path))
     }
 
+    /**
+     * @since 1.4.0
+     */
     def load(sc: SparkContext, path: String): PowerIterationClusteringModel = {
       implicit val formats = DefaultFormats
       val sqlContext = new SQLContext(sc)
@@ -164,6 +177,7 @@ class PowerIterationClustering private[clustering] (
    * assume s,,ij,, = 0.0.
    *
    * @return a [[PowerIterationClusteringModel]] that contains the clustering result
+   * @since 1.5.0
   */
   def run(graph: Graph[Double, Double]): PowerIterationClusteringModel = {
     val w = normalize(graph)
@@ -221,6 +235,9 @@ class PowerIterationClustering private[clustering] (
   }
 }
 
+/**
+ * @since 1.3.0
+ */
 @Experimental
 object PowerIterationClustering extends Logging {
 
@@ -229,6 +246,7 @@ object PowerIterationClustering extends Logging {
    * Cluster assignment.
    * @param id node id
    * @param cluster assigned cluster id
+   * @since 1.3.0
   */
   @Experimental
   case class Assignment(id: Long, cluster: Int)
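// Illustrative sketch for the PowerIterationClustering API annotated above (not part of this
// patch); the similarity triplets are toy values.
import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.PowerIterationClustering

def picExample(sc: SparkContext): Unit = {
  // (srcId, dstId, similarity) triplets with non-negative similarities.
  val similarities = sc.parallelize(Seq(
    (0L, 1L, 1.0), (1L, 2L, 0.9), (2L, 3L, 0.1), (3L, 4L, 1.0)))
  val model = new PowerIterationClustering()
    .setK(2)
    .setMaxIterations(10)
    .run(similarities)
  model.assignments.collect().foreach { a =>
    println(s"${a.id} -> ${a.cluster}")   // Assignment(id, cluster), @since 1.3.0
  }
}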
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
index d9b34cec64894..329a4f2dfe09c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
@@ -63,6 +63,7 @@ import org.apache.spark.util.random.XORShiftRandom
  * such that at time t + h the discount applied to the data from t is 0.5.
  * The definition remains the same whether the time unit is given
  * as batches or points.
+ * @since 1.2.0
  *
 */
 @Experimental
@@ -70,22 +71,25 @@ class StreamingKMeansModel(
     override val clusterCenters: Array[Vector],
     val clusterWeights: Array[Double]) extends KMeansModel(clusterCenters) with Logging {
 
-  /** Perform a k-means update on a batch of data. */
+  /**
+   * Perform a k-means update on a batch of data.
+   * @since 1.2.0
+   */
   def update(data: RDD[Vector], decayFactor: Double, timeUnit: String): StreamingKMeansModel = {
     // find nearest cluster to each point
     val closest = data.map(point => (this.predict(point), (point, 1L)))
 
     // get sums and counts for updating each cluster
     val mergeContribs: ((Vector, Long), (Vector, Long)) => (Vector, Long) = (p1, p2) => {
       BLAS.axpy(1.0, p2._1, p1._1)
       (p1._1, p1._2 + p2._2)
     }
 
     val dim = clusterCenters(0).size
     val pointStats: Array[(Int, (Vector, Long))] = closest
       .aggregateByKey((Vectors.zeros(dim), 0L))(mergeContribs, mergeContribs)
       .collect()
 
     val discount = timeUnit match {
       case StreamingKMeans.BATCHES => decayFactor
       case StreamingKMeans.POINTS =>
@@ -161,6 +165,7 @@ class StreamingKMeansModel(
  *   .setRandomCenters(5, 100.0)
  *   .trainOn(DStream)
  * }}}
+ * @since 1.2.0
 */
 @Experimental
 class StreamingKMeans(
@@ -168,23 +173,33 @@ class StreamingKMeans(
     var decayFactor: Double,
     var timeUnit: String) extends Logging with Serializable {
 
+  /** @since 1.2.0 */
   def this() = this(2, 1.0, StreamingKMeans.BATCHES)
-
+  /** @since 1.2.0 */
   protected var model: StreamingKMeansModel = new StreamingKMeansModel(null, null)
 
-  /** Set the number of clusters. */
+  /**
+   * Set the number of clusters.
+   * @since 1.2.0
+   */
   def setK(k: Int): this.type = {
     this.k = k
     this
   }
 
-  /** Set the decay factor directly (for forgetful algorithms). */
+  /**
+   * Set the decay factor directly (for forgetful algorithms).
+   * @since 1.2.0
+   */
   def setDecayFactor(a: Double): this.type = {
     this.decayFactor = a
     this
   }
 
-  /** Set the half life and time unit ("batches" or "points") for forgetful algorithms. */
+  /**
+   * Set the half life and time unit ("batches" or "points") for forgetful algorithms.
+   * @since 1.2.0
+   */
   def setHalfLife(halfLife: Double, timeUnit: String): this.type = {
     if (timeUnit != StreamingKMeans.BATCHES && timeUnit != StreamingKMeans.POINTS) {
       throw new IllegalArgumentException("Invalid time unit for decay: " + timeUnit)
@@ -195,7 +210,10 @@ class StreamingKMeans(
     this
   }
 
-  /** Specify initial centers directly. */
+  /**
+   * Specify initial centers directly.
+   * @since 1.2.0
+   */
   def setInitialCenters(centers: Array[Vector], weights: Array[Double]): this.type = {
     model = new StreamingKMeansModel(centers, weights)
     this
@@ -207,6 +225,7 @@ class StreamingKMeans(
    * @param dim Number of dimensions
    * @param weight Weight for each center
    * @param seed Random seed
+   * @since 1.2.0
   */
   def setRandomCenters(dim: Int, weight: Double, seed: Long = Utils.random.nextLong): this.type = {
     val random = new XORShiftRandom(seed)
@@ -216,7 +235,10 @@ class StreamingKMeans(
     this
   }
 
-  /** Return the latest model. */
+  /**
+   * Return the latest model.
+   * @since 1.2.0
+   */
   def latestModel(): StreamingKMeansModel = {
     model
   }
@@ -228,6 +250,7 @@ class StreamingKMeans(
    * and updates the model using each batch of data from the stream.
    *
    * @param data DStream containing vector data
+   * @since 1.2.0
   */
   def trainOn(data: DStream[Vector]) {
     assertInitialized()
@@ -236,7 +259,10 @@ class StreamingKMeans(
     }
   }
 
-  /** Java-friendly version of `trainOn`. */
+  /**
+   * Java-friendly version of `trainOn`.
+   * @since 1.4.0
+   */
   def trainOn(data: JavaDStream[Vector]): Unit = trainOn(data.dstream)
 
   /**
@@ -244,13 +270,17 @@ class StreamingKMeans(
    *
    * @param data DStream containing vector data
    * @return DStream containing predictions
+   * @since 1.2.0
   */
   def predictOn(data: DStream[Vector]): DStream[Int] = {
     assertInitialized()
     data.map(model.predict)
   }
 
-  /** Java-friendly version of `predictOn`. */
+  /**
+   * Java-friendly version of `predictOn`.
+   * @since 1.4.0
+   */
   def predictOn(data: JavaDStream[Vector]): JavaDStream[java.lang.Integer] = {
     JavaDStream.fromDStream(predictOn(data.dstream).asInstanceOf[DStream[java.lang.Integer]])
   }
@@ -261,13 +291,17 @@ class StreamingKMeans(
    * @param data DStream containing (key, feature vector) pairs
    * @tparam K key type
    * @return DStream containing the input keys and the predictions as values
+   * @since 1.2.0
   */
   def predictOnValues[K: ClassTag](data: DStream[(K, Vector)]): DStream[(K, Int)] = {
     assertInitialized()
     data.mapValues(model.predict)
   }
 
-  /** Java-friendly version of `predictOnValues`. */
+  /**
+   * Java-friendly version of `predictOnValues`.
+   * @since 1.4.0
+   */
   def predictOnValues[K](
       data: JavaPairDStream[K, Vector]): JavaPairDStream[K, java.lang.Integer] = {
     implicit val tag = fakeClassTag[K]
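// Streaming usage sketch for the StreamingKMeans methods annotated above, following the
// class's own scaladoc example (not part of this patch); the input streams are hypothetical.
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.streaming.dstream.DStream

def streamingKMeansExample(trainingStream: DStream[Vector], queryStream: DStream[Vector]): Unit = {
  val skm = new StreamingKMeans()
    .setK(2)                       // @since 1.2.0
    .setDecayFactor(1.0)           // @since 1.2.0
    .setRandomCenters(3, 0.0)      // dim = 3, weight = 0.0, @since 1.2.0
  skm.trainOn(trainingStream)      // update the model with each incoming batch
  skm.predictOn(queryStream).print()
}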