[SPARK-20484][MLLIB] Add documentation to ALS code #17793
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -774,6 +774,28 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { | |
/** | ||
* :: DeveloperApi :: | ||
* Implementation of the ALS algorithm. | ||
* | ||
* This implementation of the ALS factorization algorithm partitions the two sets of factors among | ||
* Spark workers so as to reduce network communication by only sending one copy of each factor | ||
* vector to each Spark worker on each iteration, and only if needed. This is achieved by | ||
* precomputing some information about the ratings matrix to determine which users require which | ||
* item factors and vice versa. See the Scaladoc for `InBlock` for a detailed explanation of how | ||
* the precomputation is done. | ||
* | ||
* In addition, since each iteration of calculating the factor matrices depends on the known | ||
* ratings, which are spread across Spark partitions, a naive implementation would incur | ||
* significant network communication overhead between Spark workers, as the ratings RDD would be | ||
* repeatedly shuffled during each iteration. This implementation reduces that overhead by | ||
* performing the shuffling operation up front, precomputing each partition's ratings dependencies | ||
* and duplicating those values to the appropriate workers before starting iterations to solve for | ||
* the factor matrices. See the Scaladoc for `OutBlock` for a detailed explanation of how the | ||
* precomputation is done. | ||
* | ||
* Note that the term "rating block" is a bit of a misnomer, as the ratings are not partitioned by | ||
* contiguous blocks from the ratings matrix but by a hash function on the rating's location in | ||
* the matrix. If it helps you to visualize the partitions, it is easier to think of the term | ||
* "block" as referring to a subset of an RDD containing the ratings rather than a contiguous | ||
* submatrix of the ratings matrix. | ||
*/ | ||
@DeveloperApi | ||
def train[ID: ClassTag]( // scalastyle:ignore | ||
|
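To illustrate the "rating block" note in the Scaladoc above, here is a minimal sketch of hash-based block assignment on plain integers. It assumes integer IDs and a non-negative modulo of the ID's hash code; `partitionOf` and `blockOf` are hypothetical helper names used only for illustration, not part of the ALS code.

```scala
object RatingBlockSketch {
  // Hypothetical stand-in for a hash partitioner: non-negative modulo of the
  // ID's hash code. For an Int, hashCode is the value itself.
  def partitionOf(id: Int, numPartitions: Int): Int = {
    val raw = id.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // A rating's block is keyed by (user partition, item partition), so rows and
  // columns that are far apart in the ratings matrix can share a block.
  def blockOf(user: Int, item: Int, numUserBlocks: Int, numItemBlocks: Int): (Int, Int) =
    (partitionOf(user, numUserBlocks), partitionOf(item, numItemBlocks))
}
```

With two user blocks, users 0 and 6 map to the same partition while user 1, which sits between them in the matrix, does not. That is why a "block" here is better read as a subset of the ratings than as a contiguous submatrix.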
@@ -791,32 +813,43 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { | |
checkpointInterval: Int = 10, | ||
seed: Long = 0L)( | ||
implicit ord: Ordering[ID]): (RDD[(ID, Array[Float])], RDD[(ID, Array[Float])]) = { | ||
|
||
require(!ratings.isEmpty(), s"No ratings available from $ratings") | ||
require(intermediateRDDStorageLevel != StorageLevel.NONE, | ||
"ALS is not designed to run without persisting intermediate RDDs.") | ||
|
||
val sc = ratings.sparkContext | ||
|
||
// Precompute the rating dependencies of each partition | ||
val userPart = new ALSPartitioner(numUserBlocks) | ||
val itemPart = new ALSPartitioner(numItemBlocks) | ||
val userLocalIndexEncoder = new LocalIndexEncoder(userPart.numPartitions) | ||
val itemLocalIndexEncoder = new LocalIndexEncoder(itemPart.numPartitions) | ||
val solver = if (nonnegative) new NNLSSolver else new CholeskySolver | ||
val blockRatings = partitionRatings(ratings, userPart, itemPart) | ||
.persist(intermediateRDDStorageLevel) | ||
val (userInBlocks, userOutBlocks) = | ||
makeBlocks("user", blockRatings, userPart, itemPart, intermediateRDDStorageLevel) | ||
// materialize blockRatings and user blocks | ||
userOutBlocks.count() | ||
userOutBlocks.count() // materialize blockRatings and user blocks | ||
val swappedBlockRatings = blockRatings.map { | ||
case ((userBlockId, itemBlockId), RatingBlock(userIds, itemIds, localRatings)) => | ||
((itemBlockId, userBlockId), RatingBlock(itemIds, userIds, localRatings)) | ||
} | ||
val (itemInBlocks, itemOutBlocks) = | ||
makeBlocks("item", swappedBlockRatings, itemPart, userPart, intermediateRDDStorageLevel) | ||
// materialize item blocks | ||
itemOutBlocks.count() | ||
itemOutBlocks.count() // materialize item blocks | ||
|
||
// Encoders for storing each user/item's partition ID and index within its partition using a | ||
This is probably fine but I tend to avoid moving code around unless it really helps -- this minimizes things like back-port merge conflict problems.
I moved the code because otherwise the comment on L823 (
Why not add the comment before the encoder
||
// single integer; used as an optimization | ||
val userLocalIndexEncoder = new LocalIndexEncoder(userPart.numPartitions) | ||
val itemLocalIndexEncoder = new LocalIndexEncoder(itemPart.numPartitions) | ||
|
||
// These are the user and item factor matrices that, once trained, are multiplied together to | ||
// estimate the rating matrix. The two matrices are stored in RDDs, partitioned by column such | ||
// that each factor column resides on the same Spark worker as its corresponding user or item. | ||
val seedGen = new XORShiftRandom(seed) | ||
var userFactors = initialize(userInBlocks, rank, seedGen.nextLong()) | ||
var itemFactors = initialize(itemInBlocks, rank, seedGen.nextLong()) | ||
|
||
val solver = if (nonnegative) new NNLSSolver else new CholeskySolver | ||
|
||
var previousCheckpointFile: Option[String] = None | ||
val shouldCheckpoint: Int => Boolean = (iter) => | ||
sc.checkpointDir.isDefined && checkpointInterval != -1 && (iter % checkpointInterval == 0) | ||
|
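The encoder comment in the hunk above describes storing a partition ID and an index within that partition in a single integer. A minimal sketch of one way to do that follows. It assumes the block ID goes in the high bits and the local index in the low bits; the class name `Encoder` and its bit layout are assumptions for illustration and may differ from the actual `LocalIndexEncoder`.

```scala
object LocalIndexEncoderSketch {
  final class Encoder(numBlocks: Int) {
    require(numBlocks > 0, "numBlocks must be positive")

    // Bits left over for the local index once the block ID has its share.
    private val numLocalIndexBits: Int =
      math.min(java.lang.Integer.numberOfLeadingZeros(numBlocks - 1), 31)
    private val localIndexMask: Int = (1 << numLocalIndexBits) - 1

    // Packs (blockId, localIndex) into one Int.
    def encode(blockId: Int, localIndex: Int): Int = {
      require(blockId >= 0 && blockId < numBlocks, "blockId out of range")
      require((localIndex & ~localIndexMask) == 0, "localIndex does not fit")
      (blockId << numLocalIndexBits) | localIndex
    }

    // Unsigned shift, since the packed value may be negative.
    def blockId(encoded: Int): Int = encoded >>> numLocalIndexBits

    def localIndex(encoded: Int): Int = encoded & localIndexMask
  }
}
```

Packing both values into one `Int` lets the destination indices be kept in a primitive array instead of an array of pairs, which is the kind of saving the "used as an optimization" comment hints at.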
@@ -830,6 +863,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { | |
logWarning(s"Cannot delete checkpoint file $file:", e) | ||
} | ||
} | ||
|
||
if (implicitPrefs) { | ||
for (iter <- 1 to maxIter) { | ||
userFactors.setName(s"userFactors-$iter").persist(intermediateRDDStorageLevel) | ||
|
@@ -910,26 +944,154 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { | |
private type FactorBlock = Array[Array[Float]] | ||
|
||
/** | ||
* Out-link block that stores, for each dst (item/user) block, which src (user/item) factors to | ||
* send. For example, outLinkBlock(0) contains the local indices (not the original src IDs) of the | ||
* src factors in this block to send to dst block 0. | ||
* A mapping of the columns of the items factor matrix that are needed when calculating each row | ||
* of the users factor matrix, and vice versa. | ||
* | ||
* Specifically, when calculating a user factor vector, since only those columns of the items | ||
* factor matrix that correspond to the items that that user has rated are needed, we can avoid | ||
* having to repeatedly copy the entire items factor matrix to each worker later in the algorithm | ||
* by precomputing these dependencies for all users, storing them in an RDD of `OutBlock`s. The | ||
* items' dependencies on the columns of the users factor matrix are computed similarly. | ||
* | ||
* =Example= | ||
* | ||
* Using the example provided in the `InBlock` Scaladoc, `userOutBlocks` would look like the | ||
* following: | ||
* | ||
* {{{ | ||
* userOutBlocks.collect() == Seq( | ||
* 0 -> Array(Array(0, 1), Array(0, 1)), | ||
* 1 -> Array(Array(0), Array(0)) | ||
* ) | ||
* }}} | ||
* | ||
* Each value in this map-like sequence is of type `Array[Array[Int]]`. The values in the | ||
* inner array are the ranks of the sorted user IDs in that partition; so in the example above, | ||
* `Array(0, 1)` in partition 0 refers to user IDs 0 and 6, since when all unique user IDs in | ||
* partition 0 are sorted, 0 is the first ID and 6 is the second. The position of each inner | ||
* array in its enclosing outer array denotes the partition number to which item IDs map; in the | ||
* example, the first `Array(0, 1)` is in position 0 of its outer array, denoting item IDs that | ||
* map to partition 0. | ||
* | ||
* In summary, the data structure encodes the following information: | ||
* | ||
* * There are ratings with user IDs 0 and 6 (encoded in `Array(0, 1)`, where 0 and 1 are the | ||
* indices of the user IDs 0 and 6 on partition 0) whose item IDs map to partitions 0 and 1 | ||
* (represented by the fact that `Array(0, 1)` appears in both the 0th and 1st positions). | ||
* | ||
* * There are ratings with user ID 3 (encoded in `Array(0)`, where 0 is the index of the user | ||
* ID 3 on partition 1) whose item IDs map to partitions 0 and 1 (represented by the fact that | ||
* `Array(0)` appears in both the 0th and 1st positions). | ||
*/ | ||
private type OutBlock = Array[Array[Int]] | ||
|
||
/** | ||
* In-link block for computing src (user/item) factors. This includes the original src IDs | ||
* of the elements within this block as well as encoded dst (item/user) indices and corresponding | ||
* ratings. The dst indices are in the form of (blockId, localIndex), which are not the original | ||
* dst IDs. To compute src factors, we expect receiving dst factors that match the dst indices. | ||
* For example, if we have an in-link record | ||
* In-link block for computing user and item factor matrices. | ||
* | ||
* The ALS algorithm partitions the columns of the users factor matrix evenly among Spark workers. | ||
* Since each column of the factor matrix is calculated using the known ratings of the | ||
* corresponding user, and since the ratings don't change across iterations, the ALS algorithm | ||
* preshuffles the ratings to the appropriate partitions, storing them in `InBlock` objects. | ||
* | ||
* The ratings shuffled by item ID are computed similarly and also stored in `InBlock` objects. | ||
* Note that this means every rating is stored twice, once as shuffled by user ID and once by item | ||
* ID. This is a necessary tradeoff, since in general a rating will not be on the same worker | ||
* when partitioned by user as by item. | ||
* | ||
* =Example= | ||
* | ||
* Say we have a small collection of eight items to offer the seven users in our application. We | ||
* have some known ratings given by the users, as seen in the matrix below: | ||
* | ||
* {{{ | ||
* Items | ||
* 0 1 2 3 4 5 6 7 | ||
* +---+---+---+---+---+---+---+---+ | ||
* 0 | |0.1| | |0.4| | |0.7| | ||
* +---+---+---+---+---+---+---+---+ | ||
* 1 | | | | | | | | | | ||
* +---+---+---+---+---+---+---+---+ | ||
* U 2 | | | | | | | | | | ||
* s +---+---+---+---+---+---+---+---+ | ||
* e 3 | |3.1| | |3.4| | |3.7| | ||
* r +---+---+---+---+---+---+---+---+ | ||
* s 4 | | | | | | | | | | ||
* +---+---+---+---+---+---+---+---+ | ||
* 5 | | | | | | | | | | ||
* +---+---+---+---+---+---+---+---+ | ||
* 6 | |6.1| | |6.4| | |6.7| | ||
* +---+---+---+---+---+---+---+---+ | ||
* }}} | ||
* | ||
* The ratings are represented as an RDD, passed to the `partitionRatings` method as the `ratings` | ||
* parameter: | ||
* | ||
* {{{ | ||
* ratings.collect() == Seq( | ||
* Rating(0, 1, 0.1f), | ||
* Rating(0, 4, 0.4f), | ||
* Rating(0, 7, 0.7f), | ||
* Rating(3, 1, 3.1f), | ||
* Rating(3, 4, 3.4f), | ||
* Rating(3, 7, 3.7f), | ||
* Rating(6, 1, 6.1f), | ||
* Rating(6, 4, 6.4f), | ||
* Rating(6, 7, 6.7f) | ||
* ) | ||
* }}} | ||
* | ||
* {srcId: 0, dstBlockId: 2, dstLocalIndex: 3, rating: 5.0}, | ||
* Say that we are using two partitions to calculate each factor matrix: | ||
* | ||
* and assume that the dst factors are stored as dstFactors: Map[Int, Array[Array[Float]]], which | ||
* is a blockId to dst factors map, the corresponding dst factor of the record is dstFactor(2)(3). | ||
* {{{ | ||
* val userPart = new ALSPartitioner(2) | ||
* val itemPart = new ALSPartitioner(2) | ||
* val blockRatings = partitionRatings(ratings, userPart, itemPart) | ||
* }}} | ||
* | ||
* We use a CSC-like (compressed sparse column) format to store the in-link information. So we can | ||
* compute src factors one after another using only one normal equation instance. | ||
* Ratings are mapped to partitions using the user/item IDs modulo the number of partitions. With | ||
* two partitions, ratings with even-valued user IDs are shuffled to partition 0 while those with | ||
* odd-valued user IDs are shuffled to partition 1: | ||
* | ||
* {{{ | ||
* userInBlocks.collect() == Seq( | ||
* 0 -> Seq( | ||
* // Internally, the class stores the ratings in a more optimized format than | ||
* // a sequence of `Rating`s, but for clarity we show it as such here. | ||
* Rating(0, 1, 0.1f), | ||
* Rating(0, 4, 0.4f), | ||
* Rating(0, 7, 0.7f), | ||
* Rating(6, 1, 6.1f), | ||
* Rating(6, 4, 6.4f), | ||
* Rating(6, 7, 6.7f) | ||
* ), | ||
* 1 -> Seq( | ||
* Rating(3, 1, 3.1f), | ||
* Rating(3, 4, 3.4f), | ||
* Rating(3, 7, 3.7f) | ||
* ) | ||
* ) | ||
* }}} | ||
* | ||
* Similarly, ratings with even-valued item IDs are shuffled to partition 0 while those with | ||
* odd-valued item IDs are shuffled to partition 1: | ||
* | ||
* {{{ | ||
* itemInBlocks.collect() == Seq( | ||
* 0 -> Seq( | ||
* Rating(0, 4, 0.4f), | ||
* Rating(3, 4, 3.4f), | ||
* Rating(6, 4, 6.4f) | ||
* ), | ||
* 1 -> Seq( | ||
* Rating(0, 1, 0.1f), | ||
* Rating(0, 7, 0.7f), | ||
* Rating(3, 1, 3.1f), | ||
* Rating(3, 7, 3.7f), | ||
* Rating(6, 1, 6.1f), | ||
* Rating(6, 7, 6.7f) | ||
* ) | ||
* ) | ||
* }}} | ||
* | ||
* @param srcIds src ids (ordered) | ||
* @param dstPtrs dst pointers. Elements in range [dstPtrs(i), dstPtrs(i+1)) of dst indices and | ||
|
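The `OutBlock` example in the Scaladoc above can be reproduced on local collections. The sketch below assumes non-negative integer IDs, modulo partitioning, and two partitions on each side; `Rating` is redefined locally and `outBlocks` is a hypothetical helper, not the method the ALS code uses to build its blocks.

```scala
object OutBlockSketch {
  final case class Rating(user: Int, item: Int, rating: Float)

  // The nine ratings from the InBlock example.
  val ratings: Seq[Rating] = Seq(
    Rating(0, 1, 0.1f), Rating(0, 4, 0.4f), Rating(0, 7, 0.7f),
    Rating(3, 1, 3.1f), Rating(3, 4, 3.4f), Rating(3, 7, 3.7f),
    Rating(6, 1, 6.1f), Rating(6, 4, 6.4f), Rating(6, 7, 6.7f)
  )

  // For each user partition, lists which local user indices must be sent to
  // each item partition. Assumes non-negative IDs so that % is a valid hash.
  def outBlocks(
      ratings: Seq[Rating],
      numUserBlocks: Int,
      numItemBlocks: Int): Map[Int, Array[Array[Int]]] =
    ratings.groupBy(_.user % numUserBlocks).map { case (userBlock, rs) =>
      // Local index = rank of the user ID among the sorted unique IDs.
      val localIndex: Map[Int, Int] = rs.map(_.user).distinct.sorted.zipWithIndex.toMap
      val perItemBlock = Array.tabulate(numItemBlocks) { itemBlock =>
        rs.filter(_.item % numItemBlocks == itemBlock)
          .map(r => localIndex(r.user))
          .distinct
          .sorted
          .toArray
      }
      userBlock -> perItemBlock
    }
}
```

Calling `OutBlockSketch.outBlocks(OutBlockSketch.ratings, 2, 2)` yields the same nested arrays as the `userOutBlocks` example: `Array(0, 1)` twice for partition 0 and `Array(0)` twice for partition 1.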
@@ -1026,7 +1188,24 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { | |
} | ||
|
||
/** | ||
* Partitions raw ratings into blocks. | ||
* Groups an RDD of [[Rating]]s by the user partition and item partition to which each `Rating` | ||
* maps according to the given partitioners. The returned pair RDD holds the ratings, encoded in | ||
* a memory-efficient format but otherwise unchanged, keyed by the (user partition ID, item | ||
* partition ID) pair. | ||
* | ||
* Performance note: This is an expensive operation that performs an RDD shuffle. | ||
* | ||
* Implementation note: This implementation produces the same result as the following but | ||
* generates fewer intermediate objects: | ||
* | ||
* {{{ | ||
* ratings.map { r => | ||
* ((srcPart.getPartition(r.user), dstPart.getPartition(r.item)), r) | ||
* }.aggregateByKey(new RatingBlockBuilder)( | ||
* seqOp = (b, r) => b.add(r), | ||
* combOp = (b0, b1) => b0.merge(b1.build())) | ||
* .mapValues(_.build()) | ||
* }}} | ||
* | ||
* @param ratings raw ratings | ||
* @param srcPart partitioner for src IDs | ||
|
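The grouping that the `partitionRatings` Scaladoc describes can be sketched on a local collection, without Spark. This assumes non-negative integer IDs and modulo partitioning; `Rating` is redefined locally and the function takes block counts instead of `Partitioner` objects, so it is only an approximation of the real signature.

```scala
object PartitionRatingsSketch {
  final case class Rating(user: Int, item: Int, rating: Float)

  // Keys every rating by (user partition ID, item partition ID). The ratings
  // themselves are left unchanged; only the grouping is illustrated here.
  def partitionRatings(
      ratings: Seq[Rating],
      numUserBlocks: Int,
      numItemBlocks: Int): Map[(Int, Int), Seq[Rating]] =
    ratings.groupBy(r => (r.user % numUserBlocks, r.item % numItemBlocks))
}
```

In the real method the values are `RatingBlock`s built through `RatingBlockBuilder` and the grouping requires a shuffle, which is the cost the performance note refers to.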
@@ -1037,17 +1216,6 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { | |
ratings: RDD[Rating[ID]], | ||
srcPart: Partitioner, | ||
dstPart: Partitioner): RDD[((Int, Int), RatingBlock[ID])] = { | ||
|
||
/* The implementation produces the same result as the following but generates less objects. | ||
|
||
ratings.map { r => | ||
((srcPart.getPartition(r.user), dstPart.getPartition(r.item)), r) | ||
}.aggregateByKey(new RatingBlockBuilder)( | ||
seqOp = (b, r) => b.add(r), | ||
combOp = (b0, b1) => b0.merge(b1.build())) | ||
.mapValues(_.build()) | ||
*/ | ||
|
||
val numPartitions = srcPart.numPartitions * dstPart.numPartitions | ||
ratings.mapPartitions { iter => | ||
val builders = Array.fill(numPartitions)(new RatingBlockBuilder[ID]) | ||
|
@@ -1135,8 +1303,8 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { | |
def length: Int = srcIds.length | ||
|
||
/** | ||
* Compresses the block into an [[InBlock]]. The algorithm is the same as converting a | ||
* sparse matrix from coordinate list (COO) format into compressed sparse column (CSC) format. | ||
* Compresses the block into an `InBlock`. The algorithm is the same as converting a sparse | ||
* matrix from coordinate list (COO) format into compressed sparse column (CSC) format. | ||
* Sorting is done using Spark's built-in Timsort to avoid generating too many objects. | ||
*/ | ||
def compress(): InBlock[ID] = { | ||
|
It's a nit, but I wouldn't make changes like this. It doesn't add anything
I moved the comment because the only other comment that has its own line, `// Precompute the rating dependencies of each partition`, is serving as the heading for this entire block of code, and having other whole-line comments in this block is a bit of a mismatch. If you still feel reversion is necessary though, just let me know.
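For readers unfamiliar with the COO-to-CSC conversion that the `compress()` Scaladoc in the last hunk mentions, here is a minimal sketch on plain arrays. It assumes integer source IDs and uses an ordinary stable `sortBy` instead of the in-place Timsort the Scaladoc refers to; the function and the tuple it returns are illustrative, not the actual `InBlock` layout.

```scala
object CooToCscSketch {
  // Input: parallel arrays of (srcId, dstIndex, value) triples in any order.
  // Output: (uniqueSrcIds, dstPtrs, dstIndices, values), where the entries for
  // uniqueSrcIds(i) occupy the range [dstPtrs(i), dstPtrs(i + 1)).
  def compress(
      srcIds: Array[Int],
      dstIndices: Array[Int],
      values: Array[Float]): (Array[Int], Array[Int], Array[Int], Array[Float]) = {
    require(srcIds.length == dstIndices.length && srcIds.length == values.length)

    // Stable sort of positions by source ID groups each column's entries.
    val order: Array[Int] = srcIds.indices.sortBy(i => srcIds(i)).toArray
    val sortedSrc = order.map(i => srcIds(i))
    val uniqueSrcIds = sortedSrc.distinct

    // dstPtrs(col + 1) ends up as the exclusive end offset of column col.
    val dstPtrs = new Array[Int](uniqueSrcIds.length + 1)
    var col = -1
    var k = 0
    while (k < sortedSrc.length) {
      if (k == 0 || sortedSrc(k) != sortedSrc(k - 1)) col += 1
      dstPtrs(col + 1) = k + 1
      k += 1
    }

    (uniqueSrcIds, dstPtrs, order.map(i => dstIndices(i)), order.map(i => values(i)))
  }
}
```

The payoff of this layout is that each source ID is stored once and its entries are contiguous, so one pass over `dstPtrs` visits every column in turn.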