Skip to content

Commit d8feb82

Browse files
Merge remote-tracking branch 'upstream/master' into SPARK-3278
2 parents 3da56e5 + f377431 commit d8feb82

File tree

7 files changed

+418
-5
lines changed

7 files changed

+418
-5
lines changed
243 KB
Loading

docs/mllib-clustering.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,26 @@ a given dataset, the algorithm returns the best clustering result).
3434
* *initializationSteps* determines the number of steps in the k-means\|\| algorithm.
3535
* *epsilon* determines the distance threshold within which we consider k-means to have converged.
3636

37+
### Power Iteration Clustering
38+
39+
Power iteration clustering is a scalable and efficient algorithm for clustering points given pointwise mutual affinity values. Internally the algorithm:
40+
41+
* accepts a [Graph](https://spark.apache.org/docs/0.9.2/api/graphx/index.html#org.apache.spark.graphx.Graph) that represents a normalized pairwise affinity between all input points.
42+
* calculates the principal eigenvalue and eigenvector
43+
* clusters each of the input points according to their principal eigenvector component value
44+
45+
Details of this algorithm are found within [Power Iteration Clustering, Lin and Cohen](http://www.icml2010.org/papers/387.pdf)
46+
47+
Example: a dataset inspired by the paper — but with five clusters instead of three — produces the following output from our implementation:
48+
49+
<p style="text-align: center;">
50+
<img src="img/PIClusteringFiveCirclesInputsAndOutputs.png"
51+
title="The Property Graph"
52+
alt="The Property Graph"
53+
width="50%" />
54+
<!-- Images are downsized intentionally to improve quality on retina displays -->
55+
</p>
56+
3757
### Examples
3858

3959
<div class="codetabs">

mllib/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@
5050
<artifactId>spark-sql_${scala.binary.version}</artifactId>
5151
<version>${project.version}</version>
5252
</dependency>
53+
<dependency>
54+
<groupId>org.apache.spark</groupId>
55+
<artifactId>spark-graphx_${scala.binary.version}</artifactId>
56+
<version>${project.version}</version>
57+
</dependency>
5358
<dependency>
5459
<groupId>org.jblas</groupId>
5560
<artifactId>jblas</artifactId>
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.mllib.clustering
19+
20+
import org.apache.spark.{Logging, SparkException}
21+
import org.apache.spark.graphx._
22+
import org.apache.spark.graphx.impl.GraphImpl
23+
import org.apache.spark.mllib.linalg.Vectors
24+
import org.apache.spark.mllib.util.MLUtils
25+
import org.apache.spark.rdd.RDD
26+
import org.apache.spark.util.random.XORShiftRandom
27+
28+
/**
 * Model produced by [[PowerIterationClustering]].
 *
 * @param k number of clusters
 * @param assignments an RDD of (vertexID, clusterID) pairs giving the cluster assigned
 *                    to each input vertex
 */
class PowerIterationClusteringModel(
    val k: Int,
    val assignments: RDD[(Long, Int)]) extends Serializable
37+
38+
/**
 * Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by Lin and
 * Cohen (see http://www.icml2010.org/papers/387.pdf). From the abstract: PIC finds a very
 * low-dimensional embedding of a dataset using truncated power iteration on a normalized pair-wise
 * similarity matrix of the data.
 *
 * @param k Number of clusters.
 * @param maxIterations Maximum number of iterations of the PIC algorithm.
 */
class PowerIterationClustering private[clustering] (
    private var k: Int,
    private var maxIterations: Int) extends Serializable {

  import org.apache.spark.mllib.clustering.PowerIterationClustering._

  /** Constructs a PIC instance with default parameters: {k: 2, maxIterations: 100}. */
  def this() = this(k = 2, maxIterations = 100)

  /**
   * Set the number of clusters.
   */
  def setK(k: Int): this.type = {
    this.k = k
    this
  }

  /**
   * Set maximum number of iterations of the power iteration loop
   */
  def setMaxIterations(maxIterations: Int): this.type = {
    this.maxIterations = maxIterations
    this
  }

  /**
   * Run the PIC algorithm.
   *
   * @param similarities an RDD of (i, j, s_ij_) tuples representing the affinity matrix, which is
   *                     the matrix A in the PIC paper. The similarity s_ij_ must be nonnegative.
   *                     This is a symmetric matrix and hence s_ij_ = s_ji_. For any (i, j) with
   *                     nonzero similarity, there should be either (i, j, s_ij_) or (j, i, s_ji_)
   *                     in the input. Tuples with i = j are ignored, because we assume s_ij_ = 0.0.
   *
   * @return a [[PowerIterationClusteringModel]] that contains the clustering result
   */
  def run(similarities: RDD[(Long, Long, Double)]): PowerIterationClusteringModel = {
    // Normalize the affinities into W, seed a random starting vector, then iterate.
    val normalized = normalize(similarities)
    pic(randomInit(normalized))
  }

  /**
   * Runs the core PIC loop: power iteration on W followed by k-means on the resulting
   * pseudo-eigenvector.
   *
   * @param w The normalized affinity matrix, which is the matrix W in the PIC paper with
   *          w_ij_ = a_ij_ / d_ii_ as its edge properties and the initial vector of the power
   *          iteration as its vertex properties.
   */
  private def pic(w: Graph[Double, Double]): PowerIterationClusteringModel = {
    val pseudoEigenVector = powerIter(w, maxIterations)
    new PowerIterationClusteringModel(k, kMeans(pseudoEigenVector, k))
  }
}
102+
103+
private[clustering] object PowerIterationClustering extends Logging {

  /**
   * Normalizes the affinity matrix (A) by row sums and returns the normalized affinity matrix (W),
   * as a graph whose edge (i, j) carries w_ij_ = a_ij_ / d_ii_.
   *
   * @param similarities an RDD of (i, j, s_ij_) tuples; similarities must be nonnegative, and
   *                     entries with i == j are dropped (self-similarity is assumed to be 0.0)
   * @throws SparkException if a negative similarity is encountered
   */
  def normalize(similarities: RDD[(Long, Long, Double)]): Graph[Double, Double] = {
    val edges = similarities.flatMap { case (i, j, s) =>
      if (s < 0.0) {
        // Fix: the message was missing the `s` interpolator, so "$i", "$j", and "$s" were
        // printed literally instead of the offending values.
        throw new SparkException(s"Similarity must be nonnegative but found s($i, $j) = $s.")
      }
      if (i != j) {
        // The input stores only one direction of each symmetric pair; materialize both.
        Seq(Edge(i, j, s), Edge(j, i, s))
      } else {
        None
      }
    }
    val gA = Graph.fromEdges(edges, 0.0)
    // d_ii_: row sums of A, accumulated per source vertex from its outgoing edge attributes.
    val vD = gA.aggregateMessages[Double](
      sendMsg = ctx => {
        ctx.sendToSrc(ctx.attr)
      },
      mergeMsg = _ + _,
      TripletFields.EdgeOnly)
    // Divide each edge by its source's row sum; EPSILON guards against division by zero
    // for isolated rows.
    GraphImpl.fromExistingRDDs(vD, gA.edges)
      .mapTriplets(
        e => e.attr / math.max(e.srcAttr, MLUtils.EPSILON),
        TripletFields.Src)
  }

  /**
   * Generates random vertex properties (v0) to start power iteration.
   *
   * @param g a graph representing the normalized affinity matrix (W)
   * @return a graph with edges representing W and vertices representing a random vector
   *         with unit 1-norm
   */
  def randomInit(g: Graph[Double, Double]): Graph[Double, Double] = {
    // Seed the RNG with the partition index so the starting vector is deterministic
    // even if the RDD is re-evaluated.
    val r = g.vertices.mapPartitionsWithIndex(
      (part, iter) => {
        val random = new XORShiftRandom(part)
        iter.map { case (id, _) =>
          (id, random.nextGaussian())
        }
      }, preservesPartitioning = true).cache()
    // Scale to unit 1-norm, matching the normalization used inside the power iterations.
    val sum = r.values.map(math.abs).sum()
    val v0 = r.mapValues(x => x / sum)
    GraphImpl.fromExistingRDDs(VertexRDD(v0), g.edges)
  }

  /**
   * Runs power iteration.
   * @param g input graph with edges representing the normalized affinity matrix (W) and vertices
   *          representing the initial vector of the power iterations.
   * @param maxIterations maximum number of iterations
   * @return a [[VertexRDD]] representing the pseudo-eigenvector
   */
  def powerIter(
      g: Graph[Double, Double],
      maxIterations: Int): VertexRDD[Double] = {
    // the default tolerance used in the PIC paper, with a lower bound 1e-8
    val tol = math.max(1e-5 / g.vertices.count(), 1e-8)
    var prevDelta = Double.MaxValue
    var diffDelta = Double.MaxValue
    var curG = g
    // Stop early once the change in delta (the "acceleration") falls below tol.
    for (iter <- 0 until maxIterations if math.abs(diffDelta) > tol) {
      val msgPrefix = s"Iteration $iter"
      // multiply W by vt: each source vertex accumulates sum_j(w_ij * v_j).
      val v = curG.aggregateMessages[Double](
        sendMsg = ctx => ctx.sendToSrc(ctx.attr * ctx.dstAttr),
        mergeMsg = _ + _,
        TripletFields.Dst).cache()
      // normalize v to unit 1-norm
      val norm = v.values.map(math.abs).sum()
      logInfo(s"$msgPrefix: norm(v) = $norm.")
      val v1 = v.mapValues(x => x / norm)
      // compare difference against the previous vector (L1 distance)
      val delta = curG.joinVertices(v1) { case (_, x, y) =>
        math.abs(x - y)
      }.vertices.values.sum()
      logInfo(s"$msgPrefix: delta = $delta.")
      diffDelta = math.abs(delta - prevDelta)
      logInfo(s"$msgPrefix: diff(delta) = $diffDelta.")
      // update v for the next iteration, reusing the original edge RDD
      curG = GraphImpl.fromExistingRDDs(VertexRDD(v1), g.edges)
      prevDelta = delta
    }
    curG.vertices
  }

  /**
   * Runs k-means clustering on the one-dimensional embedding.
   * @param v a [[VertexRDD]] representing the pseudo-eigenvector
   * @param k number of clusters
   * @return a [[VertexRDD]] representing the clustering assignments
   */
  def kMeans(v: VertexRDD[Double], k: Int): VertexRDD[Int] = {
    val points = v.mapValues(x => Vectors.dense(x)).cache()
    // Fixed seed and multiple runs keep the final assignment deterministic.
    val model = new KMeans()
      .setK(k)
      .setRuns(5)
      .setSeed(0L)
      .run(points.values)
    points.mapValues(p => model.predict(p)).cache()
  }
}

mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ import scala.collection.mutable.ArrayBuffer
2121

2222
import breeze.linalg.{DenseMatrix => BDM}
2323

24-
import org.apache.spark.{Logging, Partitioner}
25-
import org.apache.spark.mllib.linalg.{SparseMatrix, DenseMatrix, Matrix}
24+
import org.apache.spark.{SparkException, Logging, Partitioner}
25+
import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
2626
import org.apache.spark.rdd.RDD
2727
import org.apache.spark.storage.StorageLevel
2828

@@ -158,11 +158,13 @@ class BlockMatrix(
158158
private[mllib] var partitioner: GridPartitioner =
159159
GridPartitioner(numRowBlocks, numColBlocks, suggestedNumPartitions = blocks.partitions.size)
160160

161+
private lazy val blockInfo = blocks.mapValues(block => (block.numRows, block.numCols)).cache()
162+
161163
/** Estimates the dimensions of the matrix. */
162164
private def estimateDim(): Unit = {
163-
val (rows, cols) = blocks.map { case ((blockRowIndex, blockColIndex), mat) =>
164-
(blockRowIndex.toLong * rowsPerBlock + mat.numRows,
165-
blockColIndex.toLong * colsPerBlock + mat.numCols)
165+
val (rows, cols) = blockInfo.map { case ((blockRowIndex, blockColIndex), (m, n)) =>
166+
(blockRowIndex.toLong * rowsPerBlock + m,
167+
blockColIndex.toLong * colsPerBlock + n)
166168
}.reduce { (x0, x1) =>
167169
(math.max(x0._1, x1._1), math.max(x0._2, x1._2))
168170
}
@@ -172,6 +174,41 @@ class BlockMatrix(
172174
assert(cols <= nCols, s"The number of columns $cols is more than claimed $nCols.")
173175
}
174176

177+
def validate(): Unit = {
  logDebug("Validating BlockMatrix...")
  // First make sure no block extends past the claimed matrix dimensions.
  estimateDim()
  logDebug("BlockMatrix dimensions are okay...")

  // A well-formed BlockMatrix has exactly one MatrixBlock per (row, col) index.
  blockInfo.countByKey().foreach { case (key, cnt) =>
    if (cnt > 1) {
      throw new SparkException(s"Found multiple MatrixBlocks with the indices $key. Please " +
        "remove blocks with duplicate indices.")
    }
  }
  logDebug("MatrixBlock indices are okay...")
  // Interior blocks must be exactly rowsPerBlock x colsPerBlock; blocks on the last
  // block-row/column may be smaller, but never empty or oversized.
  val dimensionMsg = s"dimensions different than rowsPerBlock: $rowsPerBlock, and " +
    s"colsPerBlock: $colsPerBlock. Blocks on the right and bottom edges can have smaller " +
    s"dimensions. You may use the repartition method to fix this issue."
  blockInfo.foreach { case ((blockRowIndex, blockColIndex), (m, n)) =>
    // True when `dim` is illegal for the block at position `idx` along an axis that has
    // `numBlocks` blocks of nominal size `perBlock`.
    def badDim(idx: Int, dim: Int, numBlocks: Int, perBlock: Int): Boolean =
      if (idx < numBlocks - 1) dim != perBlock else dim <= 0 || dim > perBlock
    if (badDim(blockRowIndex, m, numRowBlocks, rowsPerBlock)) {
      throw new SparkException(s"The MatrixBlock at ($blockRowIndex, $blockColIndex) has " +
        dimensionMsg)
    }
    if (badDim(blockColIndex, n, numColBlocks, colsPerBlock)) {
      throw new SparkException(s"The MatrixBlock at ($blockRowIndex, $blockColIndex) has " +
        dimensionMsg)
    }
  }
  logDebug("MatrixBlock dimensions are okay...")
  logDebug("BlockMatrix is valid!")
}
211+
175212
/** Caches the underlying RDD. */
176213
def cache(): this.type = {
177214
blocks.cache()

0 commit comments

Comments
 (0)