Skip to content

Commit 819824b

Browse files
author
Li Pu
committed
add flag for dense svd or sparse svd
1 parent eb15100 commit 819824b

File tree

3 files changed

+67
-47
lines changed

3 files changed

+67
-47
lines changed

mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,14 @@ object EigenValueDecomposition {
3939
*
4040
* @param mul a function that multiplies the symmetric matrix with a DenseVector.
4141
* @param n dimension of the square matrix (maximum Int.MaxValue).
42-
* @param k number of leading eigenvalues required.
42+
* @param k number of leading eigenvalues required, 0 < k < n.
4343
* @param tol tolerance of the eigs computation.
4444
* @return a dense vector of eigenvalues in descending order and a dense matrix of eigenvectors
45-
* (columns of the matrix). The number of computed eigenvalues might be smaller than k.
45+
* (columns of the matrix).
46+
* @note The number of computed eigenvalues might be smaller than k when some Ritz values do not
47+
* satisfy the convergence criterion specified by tol (see ARPACK Users Guide, Chapter 4.6
48+
* for more details). The maximum number of Arnoldi update iterations is set to 300 in this
49+
* function.
4650
*/
4751
private[mllib] def symmetricEigs(mul: DenseVector => DenseVector, n: Int, k: Int, tol: Double)
4852
: (BDV[Double], BDM[Double]) = {
@@ -55,10 +59,10 @@ object EigenValueDecomposition {
5559
val tolW = new doubleW(tol)
5660
// number of desired eigenvalues, 0 < nev < n
5761
val nev = new intW(k)
58-
// nev Lanczos vectors are generated are generated in the first iteration
59-
// ncv-nev Lanczos vectors are generated are generated in each subsequent iteration
62+
// nev Lanczos vectors are generated in the first iteration
63+
// ncv-nev Lanczos vectors are generated in each subsequent iteration
6064
// ncv must be smaller than n
61-
val ncv = scala.math.min(2 * k, n)
65+
val ncv = math.min(2 * k, n)
6266

6367
// "I" for standard eigenvalue problem, "G" for generalized eigenvalue problem
6468
val bmat = "I"
@@ -75,7 +79,7 @@ object EigenValueDecomposition {
7579

7680
var ido = new intW(0)
7781
var info = new intW(0)
78-
var resid:Array[Double] = new Array[Double](n)
82+
var resid = new Array[Double](n)
7983
var v = new Array[Double](n * ncv)
8084
var workd = new Array[Double](n * 3)
8185
var workl = new Array[Double](ncv * (ncv + 8))
@@ -128,19 +132,20 @@ object EigenValueDecomposition {
128132
// number of computed eigenvalues, might be smaller than k
129133
val computed = iparam(4)
130134

131-
val eigenPairs = java.util.Arrays.copyOfRange(d, 0, computed).zipWithIndex.map{
135+
val eigenPairs = java.util.Arrays.copyOfRange(d, 0, computed).zipWithIndex.map {
132136
r => (r._1, java.util.Arrays.copyOfRange(z, r._2 * n, r._2 * n + n))
133137
}
134138

135139
// sort the eigen-pairs in descending order
136-
val sortedEigenPairs = eigenPairs.sortBy(-1 * _._1)
140+
val sortedEigenPairs = eigenPairs.sortBy(- _._1)
137141

138142
// copy eigenvectors in descending order of eigenvalues
139143
val sortedU = BDM.zeros[Double](n, computed)
140-
sortedEigenPairs.zipWithIndex.map{
144+
sortedEigenPairs.zipWithIndex.foreach {
141145
r => {
146+
val b = r._2 * n
142147
for (i <- 0 until n) {
143-
sortedU.data(r._2 * n + i) = r._1._2(i)
148+
sortedU.data(b + i) = r._1._2(i)
144149
}
145150
}
146151
}

mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -207,13 +207,14 @@ class RowMatrix(
207207
* @param v a local DenseVector whose length must match the number of columns of this matrix.
208208
* @return a local DenseVector representing the product.
209209
*/
210-
private[mllib] def multiplyGramianMatrix(v: DenseVector): DenseVector = {
210+
private[mllib] def multiplyGramianMatrixBy(v: DenseVector): DenseVector = {
211211
val n = numCols().toInt
212+
val vbr = rows.context.broadcast(v.toBreeze)
212213

213214
val bv = rows.aggregate(BDV.zeros[Double](n))(
214215
seqOp = (U, r) => {
215216
val rBrz = r.toBreeze
216-
val a = rBrz.dot(v.toBreeze)
217+
val a = rBrz.dot(vbr.value)
217218
rBrz match {
218219
case _: BDV[_] => brzAxpy(a, rBrz.asInstanceOf[BDV[Double]], U)
219220
case _: BSV[_] => brzAxpy(a, rBrz.asInstanceOf[BSV[Double]], U)
@@ -223,7 +224,7 @@ class RowMatrix(
223224
combOp = (U1, U2) => U1 += U2
224225
)
225226

226-
new DenseVector(bv.data)
227+
Vectors.fromBreeze(bv).asInstanceOf[DenseVector]
227228
}
228229

229230
/**
@@ -259,7 +260,11 @@ class RowMatrix(
259260
k: Int,
260261
computeU: Boolean = false,
261262
rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix] = {
262-
computeSVD(k, computeU, rCond, 1e-9)
263+
if (numCols() < 100) {
264+
computeSVD(k, computeU, rCond, 1e-9, true)
265+
} else {
266+
computeSVD(k, computeU, rCond, 1e-9, false)
267+
}
263268
}
264269

265270
/**
@@ -274,7 +279,7 @@ class RowMatrix(
274279
* The decomposition is computed by providing a function that multiples a vector with A'A to
275280
* ARPACK, and iteratively invoking ARPACK-dsaupd on master node, from which we recover S and V.
276281
* Then we compute U via easy matrix multiplication as U = A * (V * S^{-1}).
277-
* Note that this approach requires `O(nnz(A))` time.
282+
* Note that this approach requires approximately `O(k * nnz(A))` time.
278283
*
279284
* ARPACK requires k to be strictly less than n. Thus when the requested eigenvalues k = n, a
280285
* non-sparse implementation will be used, which requires `n^2` doubles to fit in memory and
@@ -294,21 +299,27 @@ class RowMatrix(
294299
* are treated as zero, where sigma(0) is the largest singular value.
295300
* @param tol the numerical tolerance of svd computation. Larger tolerance means fewer iterations,
296301
* but less accurate result.
302+
* @param isDenseSVD invoke dense SVD implementation when isDenseSVD = true. This requires
303+
* `O(n^2)` memory and `O(n^3)` time. For a skinny matrix (m >> n) with small n,
304+
* dense implementation might be faster.
297305
* @return SingularValueDecomposition(U, s, V)
298306
*/
299307
def computeSVD(
300308
k: Int,
301309
computeU: Boolean,
302310
rCond: Double,
303-
tol: Double): SingularValueDecomposition[RowMatrix, Matrix] = {
311+
tol: Double,
312+
isDenseSVD: Boolean): SingularValueDecomposition[RowMatrix, Matrix] = {
304313
val n = numCols().toInt
305314
require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.")
306315

307-
val (sigmaSquares: BDV[Double], u: BDM[Double]) = if (k < n) {
308-
EigenValueDecomposition.symmetricEigs(multiplyGramianMatrix, n, k, tol)
316+
val (sigmaSquares: BDV[Double], u: BDM[Double]) = if (!isDenseSVD && k < n) {
317+
EigenValueDecomposition.symmetricEigs(multiplyGramianMatrixBy, n, k, tol)
309318
} else {
310-
logWarning(s"Request full SVD (k = n = $k), while ARPACK requires k strictly less than n. " +
311-
s"Using non-sparse implementation.")
319+
if (!isDenseSVD && k == n) {
320+
logWarning(s"Request full SVD (k = n = $k), while ARPACK requires k strictly less than " +
321+
s"n. Using non-sparse implementation.")
322+
}
312323
val G = computeGramianMatrix()
313324
val (uFull: BDM[Double], sigmaSquaresFull: BDV[Double], vFull: BDM[Double]) =
314325
brzSvd(G.toBreeze.asInstanceOf[BDM[Double]])

mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -95,38 +95,42 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext {
9595
}
9696

9797
test("svd of a full-rank matrix") {
98-
for (mat <- Seq(denseMat, sparseMat)) {
99-
val localMat = mat.toBreeze()
100-
val (localU, localSigma, localVt) = brzSvd(localMat)
101-
val localV: BDM[Double] = localVt.t.toDenseMatrix
102-
for (k <- 1 to n) {
103-
val svd = mat.computeSVD(k, computeU = true)
104-
val U = svd.U
105-
val s = svd.s
106-
val V = svd.V
107-
assert(U.numRows() === m)
108-
assert(U.numCols() === k)
109-
assert(s.size === k)
110-
assert(V.numRows === n)
111-
assert(V.numCols === k)
112-
assertColumnEqualUpToSign(U.toBreeze(), localU, k)
113-
assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k)
114-
assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k)))
98+
for (denseSVD <- Seq(true, false)) {
99+
for (mat <- Seq(denseMat, sparseMat)) {
100+
val localMat = mat.toBreeze()
101+
val (localU, localSigma, localVt) = brzSvd(localMat)
102+
val localV: BDM[Double] = localVt.t.toDenseMatrix
103+
for (k <- 1 to n) {
104+
val svd = mat.computeSVD(k, true, 1e-9, 1e-9, denseSVD)
105+
val U = svd.U
106+
val s = svd.s
107+
val V = svd.V
108+
assert(U.numRows() === m)
109+
assert(U.numCols() === k)
110+
assert(s.size === k)
111+
assert(V.numRows === n)
112+
assert(V.numCols === k)
113+
assertColumnEqualUpToSign(U.toBreeze(), localU, k)
114+
assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k)
115+
assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k)))
116+
}
117+
val svdWithoutU = mat.computeSVD(n - 1, false, 1e-9, 1e-9, denseSVD)
118+
assert(svdWithoutU.U === null)
115119
}
116-
val svdWithoutU = mat.computeSVD(n - 1)
117-
assert(svdWithoutU.U === null)
118120
}
119121
}
120122

121123
test("svd of a low-rank matrix") {
122-
val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0, 1.0)), 2)
123-
val mat = new RowMatrix(rows, 4, 3)
124-
val svd = mat.computeSVD(2, computeU = true)
125-
assert(svd.s.size === 1, "should not return zero singular values")
126-
assert(svd.U.numRows() === 4)
127-
assert(svd.U.numCols() === 1)
128-
assert(svd.V.numRows === 3)
129-
assert(svd.V.numCols === 1)
124+
for (denseSVD <- Seq(true, false)) {
125+
val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0, 1.0)), 2)
126+
val mat = new RowMatrix(rows, 4, 3)
127+
val svd = mat.computeSVD(2, true, 1e-9, 1e-9, denseSVD)
128+
assert(svd.s.size === 1, "should not return zero singular values")
129+
assert(svd.U.numRows() === 4)
130+
assert(svd.U.numCols() === 1)
131+
assert(svd.V.numRows === 3)
132+
assert(svd.V.numCols === 1)
133+
}
130134
}
131135

132136
def closeToZero(G: BDM[Double]): Boolean = {

0 commit comments

Comments
 (0)