
Commit 7ebd149

Incorporate Xiangrui's first set of PR comments except restructure PIC.run to take Graph but do not remove Gaussian

1 parent 121e4d5 commit 7ebd149

File tree

3 files changed: +65 -138 lines changed


mllib/src/main/scala/org/apache/spark/mllib/clustering/PIClustering.scala renamed to mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala

Lines changed: 50 additions & 60 deletions
@@ -35,7 +35,7 @@ import scala.language.existentials
  * representation. The resulting pseudo-eigenvector provides effective clustering - as
  * performed by Parallel KMeans.
  */
-object PIClustering {
+object PowerIterationClustering {
 
   private val logger = Logger.getLogger(getClass.getName())
 
@@ -44,32 +44,32 @@ object PIClustering {
   type DGraph = Graph[Double, Double]
   type IndexedVector[Double] = (Long, BDV[Double])
 
+
   // Terminate iteration when norm changes by less than this value
-  private[mllib] val DefaultMinNormChange: Double = 1e-11
+  private[mllib] val defaultMinNormChange: Double = 1e-11
 
-  // Default σ for Gaussian Distance calculations
-  private[mllib] val DefaultSigma = 1.0
+  // Default sigma for Gaussian Distance calculations
+  private[mllib] val defaultSigma = 1.0
 
   // Default number of iterations for PIC loop
-  private[mllib] val DefaultIterations: Int = 20
+  private[mllib] val defaultIterations: Int = 20
 
   // Default minimum affinity between points - lower than this it is considered
   // zero and no edge will be created
-  private[mllib] val DefaultMinAffinity = 1e-11
+  private[mllib] val defaultMinAffinity = 1e-11
 
   // Do not allow divide by zero: change to this value instead
-  val DefaultDivideByZeroVal: Double = 1e-15
+  val defaultDivideByZeroVal: Double = 1e-15
 
   // Default number of runs by the KMeans.run() method
-  val DefaultKMeansRuns = 10
+  val defaultKMeansRuns = 10
 
   /**
    *
    * Run a Power Iteration Clustering
    *
    * @param sc Spark Context
-   * @param points Input Points in format of [(VertexId,(x,y)]
-   *               where VertexId is a Long
+   * @param G Affinity Matrix in a Sparse Graph structure
    * @param nClusters Number of clusters to create
    * @param nIterations Number of iterations of the PIC algorithm
    *                    that calculates primary PseudoEigenvector and Eigenvalue
@@ -83,30 +83,13 @@ object PIClustering {
    * Seq[(VertexId, ClusterID Membership)]
    */
   def run(sc: SparkContext,
-          points: Points,
+          G: Graph[Double, Double],
           nClusters: Int,
-          nIterations: Int = DefaultIterations,
-          sigma: Double = DefaultSigma,
-          minAffinity: Double = DefaultMinAffinity,
-          nRuns: Int = DefaultKMeansRuns)
+          nIterations: Int = defaultIterations,
+          sigma: Double = defaultSigma,
+          minAffinity: Double = defaultMinAffinity,
+          nRuns: Int = defaultKMeansRuns)
   : (Seq[(Int, Vector)], Seq[((VertexId, Vector), Int)]) = {
-    val vidsRdd = sc.parallelize(points.map(_._1).sorted)
-    val nVertices = points.length
-
-    val (wRdd, rowSums) = createNormalizedAffinityMatrix(sc, points, sigma)
-    val initialVt = createInitialVector(sc, points.map(_._1), rowSums)
-    if (logger.isDebugEnabled) {
-      logger.debug(s"Vt(0)=${
-        printVector(new BDV(initialVt.map {
-          _._2
-        }.toArray))
-      }")
-    }
-    val edgesRdd = createSparseEdgesRdd(sc, wRdd, minAffinity)
-    val G = createGraphFromEdges(sc, edgesRdd, points.size, Some(initialVt))
-    if (logger.isDebugEnabled) {
-      logger.debug(printMatrixFromEdges(G.edges))
-    }
     val (gUpdated, lambda, vt) = getPrincipalEigen(sc, G, nIterations)
     // TODO: avoid local collect and then sc.parallelize.
     val localVt = vt.collect.sortBy(_._1)
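Note: with this restructuring, run no longer derives the affinity graph from raw points; callers build the graph first (for example with the createGaussianAffinityMatrix helper added further down) and pass it in. A minimal usage sketch of the resulting two-step flow, assuming an existing SparkContext sc and the (VertexId, BDV[Double]) point format used by the test suite (the point values below are made up for illustration):

import breeze.linalg.{DenseVector => BDV}
import org.apache.spark.mllib.clustering.PowerIterationClustering

// Hypothetical input: three 2-D points keyed by Long vertex ids.
val points = Seq(
  (1L, new BDV(Array(0.0, 0.0))),
  (2L, new BDV(Array(1.0, 0.5))),
  (3L, new BDV(Array(9.0, 9.0))))

// Step 1: build the sparse Gaussian affinity graph from the raw points.
val g = PowerIterationClustering.createGaussianAffinityMatrix(sc, points)

// Step 2: run PIC on the prebuilt graph; returns the cluster centers and
// the per-vertex cluster assignments.
val (centers, assignments) =
  PowerIterationClustering.run(sc, g, nClusters = 2, nIterations = 20)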
@@ -140,36 +123,43 @@ object PIClustering {
   }
 
   /**
-   * Read Points from an input file in the following format:
-   * Vertex1Id Coord11 Coord12 CoordX13 .. Coord1D
-   * Vertex2Id Coord21 Coord22 CoordX23 .. Coord2D
-   * ..
-   * VertexNId CoordN1 CoordN2 CoordN23 .. CoordND
-   *
-   * Where N is the number of observations, each a D-dimension point
    *
-   * E.g.
+   * Create an affinity matrix
    *
-   * 19 1.8035177495 0.7460582552 0.2361611395 -0.8645567427 -0.8613062
-   * 10 0.5534111111 1.0456386879 1.7045663273 0.7281759816 1.0807487792
-   * 911 1.200749626 1.8962364439 2.5117192131 -0.4034737281 -0.9069696484
-   *
-   * Which represents three 5-dimensional input Points with VertexIds 19,10, and 911
-   * @param verticesFile Local filesystem path to the Points input file
-   * @return Set of Vertices in format appropriate for consumption by the PIC algorithm
+   * @param sc Spark Context
+   * @param points Input Points in format of [(VertexId,(x,y)]
+   *               where VertexId is a Long
+   * @param sigma Sigma for Gaussian distribution calculation according to
+   *              [1/2 *sqrt(pi*sigma)] exp (- (x-y)**2 / 2sigma**2
+   * @param minAffinity Minimum Affinity between two Points in the input dataset: below
+   *                    this threshold the affinity will be considered "close to" zero and
+   *                    no Edge will be created between those Points in the sparse matrix
+   * @return Tuple of (Seq[(Cluster Id,Cluster Center)],
+   *         Seq[(VertexId, ClusterID Membership)]
    */
-  def readVerticesfromFile(verticesFile: String): Points = {
-
-    import scala.io.Source
-    val vertices = Source.fromFile(verticesFile).getLines.map { l =>
-      val toks = l.split("\t")
-      val arr = new BDV(toks.slice(1, toks.length).map(_.toDouble))
-      (toks(0).toLong, arr)
-    }.toSeq
+  def createGaussianAffinityMatrix(sc: SparkContext,
+                                   points: Points,
+                                   sigma: Double = defaultSigma,
+                                   minAffinity: Double = defaultMinAffinity)
+    : Graph[Double, Double] = {
+    val vidsRdd = sc.parallelize(points.map(_._1).sorted)
+    val nVertices = points.length
+
+    val (wRdd, rowSums) = createNormalizedAffinityMatrix(sc, points, sigma)
+    val initialVt = createInitialVector(sc, points.map(_._1), rowSums)
+    if (logger.isDebugEnabled) {
+      logger.debug(s"Vt(0)=${
+        printVector(new BDV(initialVt.map {
+          _._2
+        }.toArray))
+      }")
+    }
+    val edgesRdd = createSparseEdgesRdd(sc, wRdd, minAffinity)
+    val G = createGraphFromEdges(sc, edgesRdd, points.size, Some(initialVt))
     if (logger.isDebugEnabled) {
-      logger.debug(s"Read in ${vertices.length} from $verticesFile")
+      logger.debug(printMatrixFromEdges(G.edges))
     }
-    vertices
+    G
   }
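As a side note on the sigma parameter documented above: the affinity between two points is a Gaussian of their distance, so nearby points get weights near 1 and distant points decay toward zero (and below minAffinity no edge is created at all). A rough illustrative sketch of that per-pair weight, not the createNormalizedAffinityMatrix implementation itself; the leading constant from the scaladoc is left out here since a constant factor would drop out of the subsequent normalization:

import breeze.linalg.{DenseVector => BDV, norm}

// Illustrative Gaussian affinity between two points for a given sigma.
def gaussianAffinity(x: BDV[Double], y: BDV[Double], sigma: Double): Double = {
  val dist = norm(x - y)                           // Euclidean distance
  math.exp(-(dist * dist) / (2.0 * sigma * sigma))
}

gaussianAffinity(new BDV(Array(1.0, 1.0)), new BDV(Array(1.0, 1.2)), sigma = 1.0)  // close to 1
gaussianAffinity(new BDV(Array(0.0, 0.0)), new BDV(Array(9.0, 9.0)), sigma = 1.0)  // effectively 0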
 
   /**
@@ -205,7 +195,7 @@ object PIClustering {
    */
   def getPrincipalEigen(sc: SparkContext,
                         G: DGraph,
-                        nIterations: Int = DefaultIterations,
+                        nIterations: Int = defaultIterations,
                         optMinNormChange: Option[Double] = None
                         ): (DGraph, Double, VertexRDD[Double]) = {
 
@@ -312,7 +302,7 @@ object PIClustering {
    * @return
    */
   private[mllib] def createSparseEdgesRdd(sc: SparkContext, wRdd: RDD[IndexedVector[Double]],
-                                          minAffinity: Double = DefaultMinAffinity) = {
+                                          minAffinity: Double = defaultMinAffinity) = {
     val labels = wRdd.map { case (vid, vect) => vid}.collect
     val edgesRdd = wRdd.flatMap { case (vid, vect) =>
       for ((dval, ix) <- vect.toArray.zipWithIndex
@@ -387,7 +377,7 @@ object PIClustering {
 
   }
 
-  private[mllib] def makeNonZero(dval: Double, tol: Double = DefaultDivideByZeroVal) = {
+  private[mllib] def makeNonZero(dval: Double, tol: Double = defaultDivideByZeroVal) = {
     if (Math.abs(dval) < tol) {
       Math.signum(dval) * tol
     } else {
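The makeNonZero guard above simply clamps near-zero values away from zero before they are used as divisors; a short, self-contained illustration of the same behavior:

// Standalone copy of the guard, for illustration only.
def makeNonZero(dval: Double, tol: Double = 1e-15): Double =
  if (math.abs(dval) < tol) math.signum(dval) * tol else dval

makeNonZero(1e-20)   // 1.0E-15
makeNonZero(-1e-20)  // -1.0E-15
makeNonZero(-3.0)    // -3.0 (unchanged)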

mllib/src/test/resources/log4j.mllib.properties

Lines changed: 0 additions & 41 deletions
This file was deleted.

mllib/src/test/scala/org/apache/spark/mllib/clustering/PIClusteringSuite.scala renamed to mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala

Lines changed: 15 additions & 37 deletions
@@ -19,26 +19,24 @@ package org.apache.spark.mllib.clustering
 
 import breeze.linalg.{DenseVector => BDV}
 import org.apache.log4j.Logger
-import org.apache.spark.graphx._
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.scalatest.FunSuite
 
 import scala.util.Random
 
-class PIClusteringSuite extends FunSuite with LocalSparkContext {
+class PowerIterationClusteringSuite extends FunSuite with MLlibTestSparkContext {
 
   val logger = Logger.getLogger(getClass.getName)
 
-  import org.apache.spark.mllib.clustering.PIClusteringSuite._
+  import org.apache.spark.mllib.clustering.PowerIterationClusteringSuite._
 
-  val PIC = PIClustering
+  val PIC = PowerIterationClustering
   val A = Array
 
   test("concentricCirclesTest") {
     concentricCirclesTest()
   }
 
-
   def concentricCirclesTest() = {
     val sigma = 1.0
     val nIterations = 10
@@ -55,23 +53,22 @@ class PIClusteringSuite extends FunSuite with LocalSparkContext {
 
     val nClusters = circleSpecs.size
     val cdata = createConcentricCirclesData(circleSpecs)
-    withSpark { sc =>
-      val vertices = new Random().shuffle(cdata.map { p =>
-        (p.label, new BDV(Array(p.x, p.y)))
-      })
+    val vertices = new Random().shuffle(cdata.map { p =>
+      (p.label, new BDV(Array(p.x, p.y)))
+    })
 
-      val nVertices = vertices.length
-      val (ccenters, estCollected) = PIC.run(sc, vertices, nClusters, nIterations)
-      logger.info(s"Cluster centers: ${ccenters.mkString(",")} " +
-        s"\nEstimates: ${estCollected.mkString("[", ",", "]")}")
-      assert(ccenters.size == circleSpecs.length, "Did not get correct number of centers")
+    val nVertices = vertices.length
+    val G = PIC.createGaussianAffinityMatrix(sc, vertices)
+    val (ccenters, estCollected) = PIC.run(sc, G, nClusters, nIterations)
+    logger.info(s"Cluster centers: ${ccenters.mkString(",")} " +
+      s"\nEstimates: ${estCollected.mkString("[", ",", "]")}")
+    assert(ccenters.size == circleSpecs.length, "Did not get correct number of centers")
 
-    }
   }
 
 }
 
-object PIClusteringSuite {
+object PowerIterationClusteringSuite {
   val logger = Logger.getLogger(getClass.getName)
   val A = Array
 
@@ -115,26 +112,7 @@ object PIClusteringSuite {
   }
 
   def main(args: Array[String]) {
-    val pictest = new PIClusteringSuite
+    val pictest = new PowerIterationClusteringSuite
     pictest.concentricCirclesTest()
   }
 }
-
-/**
- * Provides a method to run tests against a {@link SparkContext} variable that is correctly stopped
- * after each test.
- * TODO: import this from the graphx test cases package i.e. may need update to pom.xml
- */
-trait LocalSparkContext {
-  /** Runs `f` on a new SparkContext and ensures that it is stopped afterwards. */
-  def withSpark[T](f: SparkContext => T) = {
-    val conf = new SparkConf()
-    GraphXUtils.registerKryoClasses(conf)
-    val sc = new SparkContext("local", "test", conf)
-    try {
-      f(sc)
-    } finally {
-      sc.stop()
-    }
-  }
-}
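With this commit, the suite's hand-rolled LocalSparkContext trait removed above is replaced by the shared MLlibTestSparkContext mixin. As a rough, hypothetical sketch of the shared-context pattern it stands for (not the actual MLlibTestSparkContext source), a mixin like this starts one local SparkContext before the suite runs and stops it afterwards, instead of creating and stopping a context inside every test:

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, Suite}

// Hypothetical shared-SparkContext test mixin, for illustration only.
trait SharedLocalSparkContext extends BeforeAndAfterAll { self: Suite =>
  @transient var sc: SparkContext = _

  override def beforeAll() {
    super.beforeAll()
    sc = new SparkContext("local", "test", new SparkConf())
  }

  override def afterAll() {
    if (sc != null) {
      sc.stop()
    }
    super.afterAll()
  }
}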