Skip to content

Commit 1694c9e

Browse files
committed
almost finished addressing comments
1 parent f9d664b commit 1694c9e

File tree

1 file changed

+42
-73
lines changed

1 file changed

+42
-73
lines changed

mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala

Lines changed: 42 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,25 @@ import org.apache.spark.storage.StorageLevel
3030
*
3131
* @param numRowBlocks Number of blocks that form the rows of the matrix.
3232
* @param numColBlocks Number of blocks that form the columns of the matrix.
33+
* @param suggestedNumPartitions Number of partitions to partition the rdd into. The final number
34+
* of partitions will be set to `min(suggestedNumPartitions,
35+
* numRowBlocks * numColBlocks)`, because setting the number of
36+
* partitions greater than the number of sub-matrices is not useful.
3337
*/
3438
private[mllib] class GridPartitioner(
3539
val numRowBlocks: Int,
3640
val numColBlocks: Int,
37-
val numParts: Int) extends Partitioner {
41+
suggestedNumPartitions: Int) extends Partitioner {
3842
// Having the number of partitions greater than the number of sub-matrices does not help
39-
override val numPartitions = math.min(numParts, numRowBlocks * numColBlocks)
43+
override val numPartitions = math.min(suggestedNumPartitions, numRowBlocks * numColBlocks)
44+
45+
val totalBlocks = numRowBlocks.toLong * numColBlocks
46+
// Gives the number of blocks that need to be in each partition
47+
val targetNumBlocksPerPartition = math.ceil(totalBlocks * 1.0 / numPartitions).toInt
48+
// Number of neighboring blocks to take in each row
49+
val numRowBlocksPerPartition = math.ceil(numRowBlocks * 1.0 / targetNumBlocksPerPartition).toInt
50+
// Number of neighboring blocks to take in each column
51+
val numColBlocksPerPartition = math.ceil(numColBlocks * 1.0 / targetNumBlocksPerPartition).toInt
4052

4153
/**
4254
* Returns the index of the partition the SubMatrix belongs to. Tries to achieve block wise
@@ -51,27 +63,20 @@ private[mllib] class GridPartitioner(
5163
override def getPartition(key: Any): Int = {
5264
key match {
5365
case (blockRowIndex: Int, blockColIndex: Int) =>
54-
getBlockId(blockRowIndex, blockColIndex)
66+
getPartitionId(blockRowIndex, blockColIndex)
5567
case (blockRowIndex: Int, innerIndex: Int, blockColIndex: Int) =>
56-
getBlockId(blockRowIndex, blockColIndex)
68+
getPartitionId(blockRowIndex, blockColIndex)
5769
case _ =>
5870
throw new IllegalArgumentException(s"Unrecognized key. key: $key")
5971
}
6072
}
6173

6274
/** Partitions sub-matrices as blocks with neighboring sub-matrices. */
63-
private def getBlockId(blockRowIndex: Int, blockColIndex: Int): Int = {
64-
val totalBlocks = numRowBlocks * numColBlocks
65-
// Gives the number of blocks that need to be in each partition
66-
val partitionRatio = math.ceil(totalBlocks * 1.0 / numPartitions).toInt
67-
// Number of neighboring blocks to take in each row
68-
val subBlocksPerRow = math.ceil(numRowBlocks * 1.0 / partitionRatio).toInt
69-
// Number of neighboring blocks to take in each column
70-
val subBlocksPerCol = math.ceil(numColBlocks * 1.0 / partitionRatio).toInt
75+
private def getPartitionId(blockRowIndex: Int, blockColIndex: Int): Int = {
7176
// Coordinates of the block
72-
val i = blockRowIndex / subBlocksPerRow
73-
val j = blockColIndex / subBlocksPerCol
74-
val blocksPerRow = math.ceil(numRowBlocks * 1.0 / subBlocksPerRow).toInt
77+
val i = blockRowIndex / numRowBlocksPerPartition
78+
val j = blockColIndex / numColBlocksPerPartition
79+
val blocksPerRow = math.ceil(numRowBlocks * 1.0 / numRowBlocksPerPartition).toInt
7580
j * blocksPerRow + i
7681
}
7782

@@ -91,10 +96,10 @@ private[mllib] class GridPartitioner(
9196
* Represents a distributed matrix in blocks of local matrices.
9297
*
9398
* @param rdd The RDD of SubMatrices (local matrices) that form this matrix
94-
* @param nRows Number of rows of this matrix
95-
* @param nCols Number of columns of this matrix
96-
* @param numRowBlocks Number of blocks that form the rows of this matrix
97-
* @param numColBlocks Number of blocks that form the columns of this matrix
99+
* @param nRows Number of rows of this matrix. If the supplied value is less than or equal to zero,
100+
* the number of rows will be calculated when `numRows` is invoked.
101+
* @param nCols Number of columns of this matrix. If the supplied value is less than or equal to
102+
* zero, the number of columns will be calculated when `numCols` is invoked.
98103
* @param rowsPerBlock Number of rows that make up each block. The blocks forming the final
99104
* rows are not required to have the given number of rows
100105
* @param colsPerBlock Number of columns that make up each block. The blocks forming the final
@@ -104,8 +109,6 @@ class BlockMatrix(
104109
val rdd: RDD[((Int, Int), Matrix)],
105110
private var nRows: Long,
106111
private var nCols: Long,
107-
val numRowBlocks: Int,
108-
val numColBlocks: Int,
109112
val rowsPerBlock: Int,
110113
val colsPerBlock: Int) extends DistributedMatrix with Logging {
111114

@@ -115,25 +118,18 @@ class BlockMatrix(
115118
* Alternate constructor for BlockMatrix without the input of the number of rows and columns.
116119
*
117120
* @param rdd The RDD of SubMatrices (local matrices) that form this matrix
118-
* @param numRowBlocks Number of blocks that form the rows of this matrix
119-
* @param numColBlocks Number of blocks that form the columns of this matrix
120121
* @param rowsPerBlock Number of rows that make up each block. The blocks forming the final
121122
* rows are not required to have the given number of rows
122123
* @param colsPerBlock Number of columns that make up each block. The blocks forming the final
123124
* columns are not required to have the given number of columns
124125
*/
125126
def this(
126127
rdd: RDD[((Int, Int), Matrix)],
127-
numRowBlocks: Int,
128-
numColBlocks: Int,
129128
rowsPerBlock: Int,
130129
colsPerBlock: Int) = {
131-
this(rdd, 0L, 0L, numRowBlocks, numColBlocks, rowsPerBlock, colsPerBlock)
130+
this(rdd, 0L, 0L, rowsPerBlock, colsPerBlock)
132131
}
133132

134-
private[mllib] var partitioner: GridPartitioner =
135-
new GridPartitioner(numRowBlocks, numColBlocks, rdd.partitions.length)
136-
137133
private lazy val dims: (Long, Long) = getDim
138134

139135
override def numRows(): Long = {
@@ -146,48 +142,21 @@ class BlockMatrix(
146142
nCols
147143
}
148144

145+
val numRowBlocks = math.ceil(numRows() * 1.0 / rowsPerBlock).toInt
146+
val numColBlocks = math.ceil(numCols() * 1.0 / colsPerBlock).toInt
147+
148+
private[mllib] var partitioner: GridPartitioner =
149+
new GridPartitioner(numRowBlocks, numColBlocks, rdd.partitions.length)
150+
151+
152+
149153
/** Returns the dimensions of the matrix. */
150154
private def getDim: (Long, Long) = {
151-
case class MatrixMetaData(var rowIndex: Int, var colIndex: Int,
152-
var numRows: Int, var numCols: Int)
153-
// picks the sizes of the matrix with the maximum indices
154-
def pickSizeByGreaterIndex(example: MatrixMetaData, base: MatrixMetaData): MatrixMetaData = {
155-
if (example.rowIndex > base.rowIndex) {
156-
base.rowIndex = example.rowIndex
157-
base.numRows = example.numRows
158-
}
159-
if (example.colIndex > base.colIndex) {
160-
base.colIndex = example.colIndex
161-
base.numCols = example.numCols
162-
}
163-
base
164-
}
165-
166-
// Aggregate will return an error if the rdd is empty
167-
val lastRowCol = rdd.treeAggregate(new MatrixMetaData(0, 0, 0, 0))(
168-
seqOp = (c, v) => (c, v) match { case (base, ((blockXInd, blockYInd), mat)) =>
169-
pickSizeByGreaterIndex(
170-
new MatrixMetaData(blockXInd, blockYInd, mat.numRows, mat.numCols), base)
171-
},
172-
combOp = (c1, c2) => (c1, c2) match {
173-
case (res1, res2) =>
174-
pickSizeByGreaterIndex(res1, res2)
175-
})
176-
// We add the size of the edge matrices, because they can be less than the specified
177-
// rowsPerBlock or colsPerBlock.
178-
(lastRowCol.rowIndex.toLong * rowsPerBlock + lastRowCol.numRows,
179-
lastRowCol.colIndex.toLong * colsPerBlock + lastRowCol.numCols)
180-
}
155+
val (rows, cols) = rdd.map { case ((blockRowIndex, blockColIndex), mat) =>
156+
(blockRowIndex * rowsPerBlock + mat.numRows, blockColIndex * colsPerBlock + mat.numCols)
157+
}.reduce((x0, x1) => (math.max(x0._1, x1._1), math.max(x0._2, x1._2)))
181158

182-
/** Returns the Frobenius Norm of the matrix */
183-
def normFro(): Double = {
184-
math.sqrt(rdd.map { mat => mat._2 match {
185-
case sparse: SparseMatrix =>
186-
sparse.values.map(x => math.pow(x, 2)).sum
187-
case dense: DenseMatrix =>
188-
dense.values.map(x => math.pow(x, 2)).sum
189-
}
190-
}.reduce(_ + _))
159+
(math.max(rows, nRows), math.max(cols, nCols))
191160
}
192161

193162
/** Cache the underlying RDD. */
@@ -210,14 +179,14 @@ class BlockMatrix(
210179
s"Int.MaxValue. Currently numCols: ${numCols()}")
211180
val nRows = numRows().toInt
212181
val nCols = numCols().toInt
213-
val mem = nRows * nCols * 8 / 1000000
182+
val mem = nRows.toLong * nCols / 125000
214183
if (mem > 500) logWarning(s"Storing this matrix will require $mem MB of memory!")
215184

216-
val parts = rdd.collect().sortBy(x => (x._1._2, x._1._1))
185+
val parts = rdd.collect()
217186
val values = new Array[Double](nRows * nCols)
218-
parts.foreach { case ((rowIndex, colIndex), block) =>
219-
val rowOffset = rowIndex * rowsPerBlock
220-
val colOffset = colIndex * colsPerBlock
187+
parts.foreach { case ((blockRowIndex, blockColIndex), block) =>
188+
val rowOffset = blockRowIndex * rowsPerBlock
189+
val colOffset = blockColIndex * colsPerBlock
221190
var j = 0
222191
val mat = block.toArray
223192
while (j < block.numCols) {

0 commit comments

Comments
 (0)