@@ -774,6 +774,28 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
774
774
/**
775
775
* :: DeveloperApi ::
776
776
* Implementation of the ALS algorithm.
777
+ *
778
+ * This implementation of the ALS factorization algorithm partitions the two sets of factors among
779
+ * Spark workers so as to reduce network communication by only sending one copy of each factor
780
+ * vector to each Spark worker on each iteration, and only if needed. This is achieved by
781
+ * precomputing some information about the ratings matrix to determine which users require which
782
+ * item factors and vice versa. See the Scaladoc for `InBlock` for a detailed explanation of how
783
+ * the precomputation is done.
784
+ *
785
+ * In addition, since each iteration of calculating the factor matrices depends on the known
786
+ * ratings, which are spread across Spark partitions, a naive implementation would incur
787
+ * significant network communication overhead between Spark workers, as the ratings RDD would be
788
+ * repeatedly shuffled during each iteration. This implementation reduces that overhead by
789
+ * performing the shuffling operation up front, precomputing each partition's ratings dependencies
790
+ * and duplicating those values to the appropriate workers before starting iterations to solve for
791
+ * the factor matrices. See the Scaladoc for `OutBlock` for a detailed explanation of how the
792
+ * precomputation is done.
793
+ *
794
+ * Note that the term "rating block" is a bit of a misnomer, as the ratings are not partitioned by
795
+ * contiguous blocks from the ratings matrix but by a hash function on the rating's location in
796
+ * the matrix. If it helps you to visualize the partitions, it is easier to think of the term
797
+ * "block" as referring to a subset of an RDD containing the ratings rather than a contiguous
798
+ * submatrix of the ratings matrix.
777
799
*/
778
800
@ DeveloperApi
779
801
def train [ID : ClassTag ]( // scalastyle:ignore
@@ -791,32 +813,43 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
791
813
checkpointInterval : Int = 10 ,
792
814
seed : Long = 0L )(
793
815
implicit ord : Ordering [ID ]): (RDD [(ID , Array [Float ])], RDD [(ID , Array [Float ])]) = {
816
+
794
817
require(! ratings.isEmpty(), s " No ratings available from $ratings" )
795
818
require(intermediateRDDStorageLevel != StorageLevel .NONE ,
796
819
" ALS is not designed to run without persisting intermediate RDDs." )
820
+
797
821
val sc = ratings.sparkContext
822
+
823
+ // Precompute the rating dependencies of each partition
798
824
val userPart = new ALSPartitioner (numUserBlocks)
799
825
val itemPart = new ALSPartitioner (numItemBlocks)
800
- val userLocalIndexEncoder = new LocalIndexEncoder (userPart.numPartitions)
801
- val itemLocalIndexEncoder = new LocalIndexEncoder (itemPart.numPartitions)
802
- val solver = if (nonnegative) new NNLSSolver else new CholeskySolver
803
826
val blockRatings = partitionRatings(ratings, userPart, itemPart)
804
827
.persist(intermediateRDDStorageLevel)
805
828
val (userInBlocks, userOutBlocks) =
806
829
makeBlocks(" user" , blockRatings, userPart, itemPart, intermediateRDDStorageLevel)
807
- // materialize blockRatings and user blocks
808
- userOutBlocks.count()
830
+ userOutBlocks.count() // materialize blockRatings and user blocks
809
831
val swappedBlockRatings = blockRatings.map {
810
832
case ((userBlockId, itemBlockId), RatingBlock (userIds, itemIds, localRatings)) =>
811
833
((itemBlockId, userBlockId), RatingBlock (itemIds, userIds, localRatings))
812
834
}
813
835
val (itemInBlocks, itemOutBlocks) =
814
836
makeBlocks(" item" , swappedBlockRatings, itemPart, userPart, intermediateRDDStorageLevel)
815
- // materialize item blocks
816
- itemOutBlocks.count()
837
+ itemOutBlocks.count() // materialize item blocks
838
+
839
+ // Encoders for storing each user/item's partition ID and index within its partition using a
840
+ // single integer; used as an optimization
841
+ val userLocalIndexEncoder = new LocalIndexEncoder (userPart.numPartitions)
842
+ val itemLocalIndexEncoder = new LocalIndexEncoder (itemPart.numPartitions)
843
+
844
+ // These are the user and item factor matrices that, once trained, are multiplied together to
845
+ // estimate the rating matrix. The two matrices are stored in RDDs, partitioned by column such
846
+ // that each factor column resides on the same Spark worker as its corresponding user or item.
817
847
val seedGen = new XORShiftRandom (seed)
818
848
var userFactors = initialize(userInBlocks, rank, seedGen.nextLong())
819
849
var itemFactors = initialize(itemInBlocks, rank, seedGen.nextLong())
850
+
851
+ val solver = if (nonnegative) new NNLSSolver else new CholeskySolver
852
+
820
853
var previousCheckpointFile : Option [String ] = None
821
854
val shouldCheckpoint : Int => Boolean = (iter) =>
822
855
sc.checkpointDir.isDefined && checkpointInterval != - 1 && (iter % checkpointInterval == 0 )
@@ -830,6 +863,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
830
863
logWarning(s " Cannot delete checkpoint file $file: " , e)
831
864
}
832
865
}
866
+
833
867
if (implicitPrefs) {
834
868
for (iter <- 1 to maxIter) {
835
869
userFactors.setName(s " userFactors- $iter" ).persist(intermediateRDDStorageLevel)
@@ -910,26 +944,154 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
910
944
private type FactorBlock = Array [Array [Float ]]
911
945
912
946
/**
913
- * Out-link block that stores, for each dst (item/user) block, which src (user/item) factors to
914
- * send. For example, outLinkBlock(0) contains the local indices (not the original src IDs) of the
915
- * src factors in this block to send to dst block 0.
947
+ * A mapping of the columns of the items factor matrix that are needed when calculating each row
948
+ * of the users factor matrix, and vice versa.
949
+ *
950
+ * Specifically, when calculating a user factor vector, since only those columns of the items
951
+ * factor matrix that correspond to the items that that user has rated are needed, we can avoid
952
+ * having to repeatedly copy the entire items factor matrix to each worker later in the algorithm
953
+ * by precomputing these dependencies for all users, storing them in an RDD of `OutBlock`s. The
954
+ * items' dependencies on the columns of the users factor matrix is computed similarly.
955
+ *
956
+ * =Example=
957
+ *
958
+ * Using the example provided in the `InBlock` Scaladoc, `userOutBlocks` would look like the
959
+ * following:
960
+ *
961
+ * {{{
962
+ * userOutBlocks.collect() == Seq(
963
+ * 0 -> Array(Array(0, 1), Array(0, 1)),
964
+ * 1 -> Array(Array(0), Array(0))
965
+ * )
966
+ * }}}
967
+ *
968
+ * Each value in this map-like sequence is of type `Array[Array[Int]]`. The values in the
969
+ * inner array are the ranks of the sorted user IDs in that partition; so in the example above,
970
+ * `Array(0, 1)` in partition 0 refers to user IDs 0 and 6, since when all unique user IDs in
971
+ * partition 0 are sorted, 0 is the first ID and 6 is the second. The position of each inner
972
+ * array in its enclosing outer array denotes the partition number to which item IDs map; in the
973
+ * example, the first `Array(0, 1)` is in position 0 of its outer array, denoting item IDs that
974
+ * map to partition 0.
975
+ *
976
+ * In summary, the data structure encodes the following information:
977
+ *
978
+ * * There are ratings with user IDs 0 and 6 (encoded in `Array(0, 1)`, where 0 and 1 are the
979
+ * indices of the user IDs 0 and 6 on partition 0) whose item IDs map to partitions 0 and 1
980
+ * (represented by the fact that `Array(0, 1)` appears in both the 0th and 1st positions).
981
+ *
982
+ * * There are ratings with user ID 3 (encoded in `Array(0)`, where 0 is the index of the user
983
+ * ID 3 on partition 1) whose item IDs map to partitions 0 and 1 (represented by the fact that
984
+ * `Array(0)` appears in both the 0th and 1st positions).
916
985
*/
917
986
private type OutBlock = Array [Array [Int ]]
918
987
919
988
/**
920
- * In-link block for computing src (user/item) factors. This includes the original src IDs
921
- * of the elements within this block as well as encoded dst (item/user) indices and corresponding
922
- * ratings. The dst indices are in the form of (blockId, localIndex), which are not the original
923
- * dst IDs. To compute src factors, we expect receiving dst factors that match the dst indices.
924
- * For example, if we have an in-link record
989
+ * In-link block for computing user and item factor matrices.
990
+ *
991
+ * The ALS algorithm partitions the columns of the users factor matrix evenly among Spark workers.
992
+ * Since each column of the factor matrix is calculated using the known ratings of the correspond-
993
+ * ing user, and since the ratings don't change across iterations, the ALS algorithm preshuffles
994
+ * the ratings to the appropriate partitions, storing them in `InBlock` objects.
995
+ *
996
+ * The ratings shuffled by item ID are computed similarly and also stored in `InBlock` objects.
997
+ * Note that this means every rating is stored twice, once as shuffled by user ID and once by item
998
+ * ID. This is a necessary tradeoff, since in general a rating will not be on the same worker
999
+ * when partitioned by user as by item.
1000
+ *
1001
+ * =Example=
1002
+ *
1003
+ * Say we have a small collection of eight items to offer the seven users in our application. We
1004
+ * have some known ratings given by the users, as seen in the matrix below:
1005
+ *
1006
+ * {{{
1007
+ * Items
1008
+ * 0 1 2 3 4 5 6 7
1009
+ * +---+---+---+---+---+---+---+---+
1010
+ * 0 | |0.1| | |0.4| | |0.7|
1011
+ * +---+---+---+---+---+---+---+---+
1012
+ * 1 | | | | | | | | |
1013
+ * +---+---+---+---+---+---+---+---+
1014
+ * U 2 | | | | | | | | |
1015
+ * s +---+---+---+---+---+---+---+---+
1016
+ * e 3 | |3.1| | |3.4| | |3.7|
1017
+ * r +---+---+---+---+---+---+---+---+
1018
+ * s 4 | | | | | | | | |
1019
+ * +---+---+---+---+---+---+---+---+
1020
+ * 5 | | | | | | | | |
1021
+ * +---+---+---+---+---+---+---+---+
1022
+ * 6 | |6.1| | |6.4| | |6.7|
1023
+ * +---+---+---+---+---+---+---+---+
1024
+ * }}}
1025
+ *
1026
+ * The ratings are represented as an RDD, passed to the `partitionRatings` method as the `ratings`
1027
+ * parameter:
1028
+ *
1029
+ * {{{
1030
+ * ratings.collect() == Seq(
1031
+ * Rating(0, 1, 0.1f),
1032
+ * Rating(0, 4, 0.4f),
1033
+ * Rating(0, 7, 0.7f),
1034
+ * Rating(3, 1, 3.1f),
1035
+ * Rating(3, 4, 3.4f),
1036
+ * Rating(3, 7, 3.7f),
1037
+ * Rating(6, 1, 6.1f),
1038
+ * Rating(6, 4, 6.4f),
1039
+ * Rating(6, 7, 6.7f)
1040
+ * )
1041
+ * }}}
925
1042
*
926
- * {srcId: 0, dstBlockId: 2, dstLocalIndex: 3, rating: 5.0},
1043
+ * Say that we are using two partitions to calculate each factor matrix:
927
1044
*
928
- * and assume that the dst factors are stored as dstFactors: Map[Int, Array[Array[Float]]], which
929
- * is a blockId to dst factors map, the corresponding dst factor of the record is dstFactor(2)(3).
1045
+ * {{{
1046
+ * val userPart = new ALSPartitioner(2)
1047
+ * val itemPart = new ALSPartitioner(2)
1048
+ * val blockRatings = partitionRatings(ratings, userPart, itemPart)
1049
+ * }}}
930
1050
*
931
- * We use a CSC-like (compressed sparse column) format to store the in-link information. So we can
932
- * compute src factors one after another using only one normal equation instance.
1051
+ * Ratings are mapped to partitions using the user/item IDs modulo the number of partitions. With
1052
+ * two partitions, ratings with even-valued user IDs are shuffled to partition 0 while those with
1053
+ * odd-valued user IDs are shuffled to partition 1:
1054
+ *
1055
+ * {{{
1056
+ * userInBlocks.collect() == Seq(
1057
+ * 0 -> Seq(
1058
+ * // Internally, the class stores the ratings in a more optimized format than
1059
+ * // a sequence of `Rating`s, but for clarity we show it as such here.
1060
+ * Rating(0, 1, 0.1f),
1061
+ * Rating(0, 4, 0.4f),
1062
+ * Rating(0, 7, 0.7f),
1063
+ * Rating(6, 1, 6.1f),
1064
+ * Rating(6, 4, 6.4f),
1065
+ * Rating(6, 7, 6.7f)
1066
+ * ),
1067
+ * 1 -> Seq(
1068
+ * Rating(3, 1, 3.1f),
1069
+ * Rating(3, 4, 3.4f),
1070
+ * Rating(3, 7, 3.7f)
1071
+ * )
1072
+ * )
1073
+ * }}}
1074
+ *
1075
+ * Similarly, ratings with even-valued item IDs are shuffled to partition 0 while those with
1076
+ * odd-valued item IDs are shuffled to partition 1:
1077
+ *
1078
+ * {{{
1079
+ * itemInBlocks.collect() == Seq(
1080
+ * 0 -> Seq(
1081
+ * Rating(0, 4, 0.4f),
1082
+ * Rating(3, 4, 3.4f),
1083
+ * Rating(6, 4, 6.4f)
1084
+ * ),
1085
+ * 1 -> Seq(
1086
+ * Rating(0, 1, 0.1f),
1087
+ * Rating(0, 7, 0.7f),
1088
+ * Rating(3, 1, 3.1f),
1089
+ * Rating(3, 7, 3.7f),
1090
+ * Rating(6, 1, 6.1f),
1091
+ * Rating(6, 7, 6.7f)
1092
+ * )
1093
+ * )
1094
+ * }}}
933
1095
*
934
1096
* @param srcIds src ids (ordered)
935
1097
* @param dstPtrs dst pointers. Elements in range [dstPtrs(i), dstPtrs(i+1)) of dst indices and
@@ -1026,7 +1188,24 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
1026
1188
}
1027
1189
1028
1190
/**
1029
- * Partitions raw ratings into blocks.
1191
+ * Groups an RDD of [[Rating ]]s by the user partition and item partition to which each `Rating`
1192
+ * maps according to the given partitioners. The returned pair RDD holds the ratings, encoded in
1193
+ * a memory-efficient format but otherwise unchanged, keyed by the (user partition ID, item
1194
+ * partition ID) pair.
1195
+ *
1196
+ * Performance note: This is an expensive operation that performs an RDD shuffle.
1197
+ *
1198
+ * Implementation note: This implementation produces the same result as the following but
1199
+ * generates fewer intermediate objects:
1200
+ *
1201
+ * {{{
1202
+ * ratings.map { r =>
1203
+ * ((srcPart.getPartition(r.user), dstPart.getPartition(r.item)), r)
1204
+ * }.aggregateByKey(new RatingBlockBuilder)(
1205
+ * seqOp = (b, r) => b.add(r),
1206
+ * combOp = (b0, b1) => b0.merge(b1.build()))
1207
+ * .mapValues(_.build())
1208
+ * }}}
1030
1209
*
1031
1210
* @param ratings raw ratings
1032
1211
* @param srcPart partitioner for src IDs
@@ -1037,17 +1216,6 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
1037
1216
ratings : RDD [Rating [ID ]],
1038
1217
srcPart : Partitioner ,
1039
1218
dstPart : Partitioner ): RDD [((Int , Int ), RatingBlock [ID ])] = {
1040
-
1041
- /* The implementation produces the same result as the following but generates less objects.
1042
-
1043
- ratings.map { r =>
1044
- ((srcPart.getPartition(r.user), dstPart.getPartition(r.item)), r)
1045
- }.aggregateByKey(new RatingBlockBuilder)(
1046
- seqOp = (b, r) => b.add(r),
1047
- combOp = (b0, b1) => b0.merge(b1.build()))
1048
- .mapValues(_.build())
1049
- */
1050
-
1051
1219
val numPartitions = srcPart.numPartitions * dstPart.numPartitions
1052
1220
ratings.mapPartitions { iter =>
1053
1221
val builders = Array .fill(numPartitions)(new RatingBlockBuilder [ID ])
@@ -1135,8 +1303,8 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
1135
1303
def length : Int = srcIds.length
1136
1304
1137
1305
/**
1138
- * Compresses the block into an [[ InBlock ]] . The algorithm is the same as converting a
1139
- * sparse matrix from coordinate list (COO) format into compressed sparse column (CSC) format.
1306
+ * Compresses the block into an ` InBlock` . The algorithm is the same as converting a sparse
1307
+ * matrix from coordinate list (COO) format into compressed sparse column (CSC) format.
1140
1308
* Sorting is done using Spark's built-in Timsort to avoid generating too many objects.
1141
1309
*/
1142
1310
def compress (): InBlock [ID ] = {
0 commit comments