Skip to content

Commit e1db950

Browse files
author
Li Pu
committed
SPARK-1782: svd for sparse matrix using ARPACK
copy ARPACK dsaupd/dseupd code from latest breeze change RowMatrix to use sparse SVD change tests for sparse SVD
1 parent 60b89fe commit e1db950

File tree

3 files changed

+153
-19
lines changed

3 files changed

+153
-19
lines changed
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.mllib.linalg
19+
20+
import org.apache.spark.annotation.Experimental
21+
import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}
22+
import org.netlib.util.{intW, doubleW}
23+
import com.github.fommil.netlib.ARPACK
24+
25+
/**
26+
* :: Experimental ::
27+
* Represents eigenvalue decomposition factors.
28+
*/
29+
@Experimental
30+
case class EigenValueDecomposition[VType](s: Vector, V: VType)
31+
32+
object EigenValueDecomposition {
33+
/**
34+
* Compute the leading k eigenvalues and eigenvectors on a symmetric square matrix using ARPACK.
35+
* The caller needs to ensure that the input matrix is real symmetric. This function requires
36+
* memory for `n*(4*k+4)` doubles.
37+
*
38+
* @param mul a function that multiplies the symmetric matrix with a Vector.
39+
* @param n dimension of the square matrix (maximum Int.MaxValue).
40+
* @param k number of leading eigenvalues required.
41+
* @param tol tolerance of the eigs computation.
42+
* @return a dense vector of eigenvalues in descending order and a dense matrix of eigenvectors
43+
* (columns of the matrix). The number of computed eigenvalues might be smaller than k.
44+
*/
45+
private[mllib] def symmetricEigs(mul: Vector => Vector, n: Int, k: Int, tol: Double)
46+
: (BDV[Double], BDM[Double]) = {
47+
require(n > k, s"Number of required eigenvalues $k must be smaller than matrix dimension $n")
48+
49+
val arpack = ARPACK.getInstance()
50+
51+
val tolW = new doubleW(tol)
52+
val nev = new intW(k)
53+
val ncv = scala.math.min(2*k,n)
54+
55+
val bmat = "I"
56+
val which = "LM"
57+
58+
var iparam = new Array[Int](11)
59+
iparam(0) = 1
60+
iparam(2) = 300
61+
iparam(6) = 1
62+
63+
var ido = new intW(0)
64+
var info = new intW(0)
65+
var resid:Array[Double] = new Array[Double](n)
66+
var v = new Array[Double](n*ncv)
67+
var workd = new Array[Double](3*n)
68+
var workl = new Array[Double](ncv*(ncv+8))
69+
var ipntr = new Array[Int](11)
70+
71+
// first call to ARPACK
72+
arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr, workd,
73+
workl, workl.length, info)
74+
75+
val w = BDV(workd)
76+
77+
while(ido.`val` !=99) {
78+
if (ido.`val` != -1 && ido.`val` != 1)
79+
throw new IllegalStateException("ARPACK returns ido = " + ido.`val`)
80+
// multiply working vector with the matrix
81+
val inputOffset = ipntr(0) - 1
82+
val outputOffset = ipntr(1) - 1
83+
val x = w(inputOffset until inputOffset + n)
84+
val y = w(outputOffset until outputOffset + n)
85+
y := BDV(mul(Vectors.fromBreeze(x)).toArray)
86+
// call ARPACK
87+
arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr,
88+
workd, workl, workl.length, info)
89+
}
90+
91+
if (info.`val` != 0)
92+
throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val`)
93+
94+
val d = new Array[Double](nev.`val`)
95+
val select = new Array[Boolean](ncv)
96+
val z = java.util.Arrays.copyOfRange(v, 0, nev.`val` * n)
97+
98+
arpack.dseupd(true, "A", select, d, z, n, 0.0, bmat, n, which, nev, tol, resid, ncv, v, n,
99+
iparam, ipntr, workd, workl, workl.length, info)
100+
101+
val computed = iparam(4)
102+
103+
val s = BDV(d)(0 until computed)
104+
val U = new BDM(n, computed, z)
105+
106+
val sortedEigenValuesWithIndex = s.toArray.zipWithIndex.sortBy(-1 * _._1).zipWithIndex
107+
108+
val sorteds = BDV(sortedEigenValuesWithIndex.map(_._1._1))
109+
val sortedU = BDM.zeros[Double](n, computed)
110+
111+
// copy eigenvectors in descending order of eigenvalues
112+
sortedEigenValuesWithIndex.map{
113+
r => {
114+
sortedU(::, r._2) := U(::, r._1._2)
115+
}
116+
}
117+
118+
(sorteds, sortedU)
119+
}
120+
}

mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,19 @@ class RowMatrix(
200200
nRows
201201
}
202202

203+
/**
204+
* Multiply the Gramian matrix `A^T A` by a Vector on the right.
205+
*
206+
* @param v a local vector whose length must match the number of columns of this matrix
207+
* @return a local DenseVector representing the product
208+
*/
209+
private[mllib] def multiplyGramianMatrix(v: Vector): Vector = {
210+
val bv = rows.map{
211+
row => row.toBreeze * row.toBreeze.dot(v.toBreeze)
212+
}.reduce( (x: BV[Double], y: BV[Double]) => x + y )
213+
Vectors.fromBreeze(bv)
214+
}
215+
203216
/**
204217
* Computes the Gramian matrix `A^T A`.
205218
*/
@@ -221,15 +234,17 @@ class RowMatrix(
221234

222235
/**
223236
* Computes the singular value decomposition of this matrix.
224-
* Denote this matrix by A (m x n), this will compute matrices U, S, V such that A = U * S * V'.
237+
* Denote this matrix by A (m x n), this will compute matrices U, S, V such that A ~= U * S * V',
238+
* where S contains the leading singular values, U and V contain the corresponding singular
239+
* vectors.
225240
*
226-
* There is no restriction on m, but we require `n^2` doubles to fit in memory.
227-
* Further, n should be less than m.
228-
229-
* The decomposition is computed by first computing A'A = V S^2 V',
230-
* computing svd locally on that (since n x n is small), from which we recover S and V.
231-
* Then we compute U via easy matrix multiplication as U = A * (V * S^-1).
232-
* Note that this approach requires `O(n^3)` time on the master node.
241+
* There is no restriction on m, but we require `n*(6*k+4)` doubles to fit in memory on the master
242+
* node. Further, n should be less than m.
243+
*
244+
* The decomposition is computed by providing a function that multiples a vector with A'A to
245+
* ARPACK, and iteratively invoking ARPACK-dsaupd on master node, from which we recover S and V.
246+
* Then we compute U via easy matrix multiplication as U = A * (V * S-1).
247+
* Note that this approach requires `O(nnz(A))` time.
233248
*
234249
* At most k largest non-zero singular values and associated vectors are returned.
235250
* If there are k such values, then the dimensions of the return will be:
@@ -243,20 +258,19 @@ class RowMatrix(
243258
* @param computeU whether to compute U
244259
* @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0)
245260
* are treated as zero, where sigma(0) is the largest singular value.
261+
* @param tol the tolerance of the svd computation.
246262
* @return SingularValueDecomposition(U, s, V)
247263
*/
248264
def computeSVD(
249265
k: Int,
250266
computeU: Boolean = false,
251-
rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix] = {
267+
rCond: Double = 1e-9,
268+
tol: Double = 1e-6): SingularValueDecomposition[RowMatrix, Matrix] = {
252269
val n = numCols().toInt
253270
require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.")
254271

255-
val G = computeGramianMatrix()
256-
257-
// TODO: Use sparse SVD instead.
258-
val (u: BDM[Double], sigmaSquares: BDV[Double], v: BDM[Double]) =
259-
brzSvd(G.toBreeze.asInstanceOf[BDM[Double]])
272+
val (sigmaSquares: BDV[Double], u: BDM[Double]) =
273+
EigenValueDecomposition.symmetricEigs(multiplyGramianMatrix, n, k, tol)
260274
val sigmas: BDV[Double] = brzSqrt(sigmaSquares)
261275

262276
// Determine effective rank.

mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext {
9999
val localMat = mat.toBreeze()
100100
val (localU, localSigma, localVt) = brzSvd(localMat)
101101
val localV: BDM[Double] = localVt.t.toDenseMatrix
102-
for (k <- 1 to n) {
102+
for (k <- 1 to (n - 1)) {
103103
val svd = mat.computeSVD(k, computeU = true)
104104
val U = svd.U
105105
val s = svd.s
@@ -113,19 +113,19 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext {
113113
assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k)
114114
assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k)))
115115
}
116-
val svdWithoutU = mat.computeSVD(n)
116+
val svdWithoutU = mat.computeSVD(n - 1)
117117
assert(svdWithoutU.U === null)
118118
}
119119
}
120120

121121
test("svd of a low-rank matrix") {
122-
val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0)), 2)
123-
val mat = new RowMatrix(rows, 4, 2)
122+
val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0, 1.0)), 2)
123+
val mat = new RowMatrix(rows, 4, 3)
124124
val svd = mat.computeSVD(2, computeU = true)
125125
assert(svd.s.size === 1, "should not return zero singular values")
126126
assert(svd.U.numRows() === 4)
127127
assert(svd.U.numCols() === 1)
128-
assert(svd.V.numRows === 2)
128+
assert(svd.V.numRows === 3)
129129
assert(svd.V.numCols === 1)
130130
}
131131

0 commit comments

Comments
 (0)