@@ -103,45 +103,49 @@ class PrefixSpan private (
103
103
// Convert min support to a min number of transactions for this dataset
104
104
val minCount = if (minSupport == 0 ) 0L else math.ceil(sequences.count() * minSupport).toLong
105
105
106
- val itemCounts = sequences
106
+ // Frequent items -> number of occurrences, all items here satisfy the `minSupport` threshold
107
+ val freqItemCounts = sequences
107
108
.flatMap(seq => seq.distinct.map(item => (item, 1L )))
108
109
.reduceByKey(_ + _)
109
110
.filter(_._2 >= minCount)
110
- var allPatternAndCounts = itemCounts.map(x => (List (x._1), x._2))
111
111
112
- val prefixSuffixPairs = {
113
- val frequentItems = itemCounts.map(_._1).collect()
114
- val candidates = sequences.map { p =>
115
- p.filter (frequentItems.contains(_) )
116
- }
117
- candidates.flatMap { x =>
118
- frequentItems.map { y =>
119
- val sub = LocalPrefixSpan .getSuffix(y, x)
120
- (List (y), sub)
121
- }.filter(_._2.nonEmpty)
112
+ // Pairs of (length 1 prefix, suffix consisting of frequent items)
113
+ val itemSuffixPairs = {
114
+ val freqItems = freqItemCounts.keys.collect().toSet
115
+ sequences.flatMap { seq =>
116
+ freqItems.flatMap { item =>
117
+ val candidateSuffix = LocalPrefixSpan .getSuffix(item, seq.filter(freqItems.contains(_)))
118
+ candidateSuffix match {
119
+ case suffix if ! suffix.isEmpty => Some ((List (item), suffix))
120
+ case _ => None
121
+ }
122
+ }
122
123
}
123
124
}
124
- var (smallPrefixSuffixPairs, largePrefixSuffixPairs) = partitionByProjDBSize(prefixSuffixPairs)
125
125
126
- while (largePrefixSuffixPairs.count() != 0 ) {
126
+ // Accumulator for the computed results to be returned
127
+ var resultsAccumulator = freqItemCounts.map(x => (List (x._1), x._2))
128
+
129
+ // Remaining work to be locally and distributively processed respectively
130
+ var (pairsForLocal, pairsForDistributed) = partitionByProjDBSize(itemSuffixPairs)
131
+
132
+ // Continue processing until no pairs for distributed processing remain (i.e. all prefixes have
133
+ // projected database sizes <= `maxLocalProjDBSize`)
134
+ while (pairsForDistributed.count() != 0 ) {
127
135
val (nextPatternAndCounts, nextPrefixSuffixPairs) =
128
- getPatternCountsAndPrefixSuffixPairs(minCount, largePrefixSuffixPairs )
129
- largePrefixSuffixPairs .unpersist()
136
+ getPatternCountsAndPrefixSuffixPairs(minCount, pairsForDistributed )
137
+ pairsForDistributed .unpersist()
130
138
val (smallerPairsPart, largerPairsPart) = partitionByProjDBSize(nextPrefixSuffixPairs)
131
- largePrefixSuffixPairs = largerPairsPart
132
- largePrefixSuffixPairs .persist(StorageLevel .MEMORY_AND_DISK )
133
- smallPrefixSuffixPairs ++= smallerPairsPart
134
- allPatternAndCounts ++= nextPatternAndCounts
139
+ pairsForDistributed = largerPairsPart
140
+ pairsForDistributed .persist(StorageLevel .MEMORY_AND_DISK )
141
+ pairsForLocal ++= smallerPairsPart
142
+ resultsAccumulator ++= nextPatternAndCounts
135
143
}
136
144
137
- if (smallPrefixSuffixPairs.count() > 0 ) {
138
- val projectedDatabase = smallPrefixSuffixPairs
139
- // TODO aggregateByKey
140
- .groupByKey()
141
- val nextPatternAndCounts = getPatternsInLocal(minCount, projectedDatabase)
142
- allPatternAndCounts ++= nextPatternAndCounts
143
- }
144
- allPatternAndCounts.map { case (pattern, count) => (pattern.toArray, count) }
145
+ // Process the small projected databases locally
146
+ resultsAccumulator ++= getPatternsInLocal(minCount, pairsForLocal.groupByKey())
147
+
148
+ resultsAccumulator.map { case (pattern, count) => (pattern.toArray, count) }
145
149
}
146
150
147
151
@@ -177,8 +181,8 @@ class PrefixSpan private (
177
181
*/
178
182
private def getPatternCountsAndPrefixSuffixPairs (
179
183
minCount : Long ,
180
- prefixSuffixPairs : RDD [(List [Int ], Array [Int ])]):
181
- (RDD [(List [Int ], Long )], RDD [(List [Int ], Array [Int ])]) = {
184
+ prefixSuffixPairs : RDD [(List [Int ], Array [Int ])])
185
+ : (RDD [(List [Int ], Long )], RDD [(List [Int ], Array [Int ])]) = {
182
186
val prefixAndFrequentItemAndCounts = prefixSuffixPairs
183
187
.flatMap { case (prefix, suffix) => suffix.distinct.map(y => ((prefix, y), 1L )) }
184
188
.reduceByKey(_ + _)
0 commit comments