@@ -103,7 +103,7 @@ class PrefixSpan private (
103
103
// Convert min support to a min number of transactions for this dataset
104
104
val minCount = if (minSupport == 0 ) 0L else math.ceil(sequences.count() * minSupport).toLong
105
105
106
- // Frequent items -> number of occurrences, all items here satisfy the `minSupport` threshold
106
+ // ( Frequent items -> number of occurrences, all items here satisfy the `minSupport` threshold
107
107
val freqItemCounts = sequences
108
108
.flatMap(seq => seq.distinct.map(item => (item, 1L )))
109
109
.reduceByKey(_ + _)
@@ -113,8 +113,9 @@ class PrefixSpan private (
113
113
val itemSuffixPairs = {
114
114
val freqItems = freqItemCounts.keys.collect().toSet
115
115
sequences.flatMap { seq =>
116
+ val filteredSeq = seq.filter(freqItems.contains(_))
116
117
freqItems.flatMap { item =>
117
- val candidateSuffix = LocalPrefixSpan .getSuffix(item, seq.filter(freqItems.contains(_)) )
118
+ val candidateSuffix = LocalPrefixSpan .getSuffix(item, filteredSeq )
118
119
candidateSuffix match {
119
120
case suffix if ! suffix.isEmpty => Some ((List (item), suffix))
120
121
case _ => None
@@ -123,7 +124,8 @@ class PrefixSpan private (
123
124
}
124
125
}
125
126
126
- // Accumulator for the computed results to be returned
127
+ // Accumulator for the computed results to be returned, initialized to the frequent items (i.e.
128
+ // frequent length-one prefixes)
127
129
var resultsAccumulator = freqItemCounts.map(x => (List (x._1), x._2))
128
130
129
131
// Remaining work to be locally and distributively processed respectfully
@@ -133,7 +135,7 @@ class PrefixSpan private (
133
135
// projected database sizes <= `maxLocalProjDBSize`)
134
136
while (pairsForDistributed.count() != 0 ) {
135
137
val (nextPatternAndCounts, nextPrefixSuffixPairs) =
136
- getPatternCountsAndPrefixSuffixPairs (minCount, pairsForDistributed)
138
+ extendPrefixes (minCount, pairsForDistributed)
137
139
pairsForDistributed.unpersist()
138
140
val (smallerPairsPart, largerPairsPart) = partitionByProjDBSize(nextPrefixSuffixPairs)
139
141
pairsForDistributed = largerPairsPart
@@ -151,7 +153,6 @@ class PrefixSpan private (
151
153
152
154
/**
153
155
* Partitions the prefix-suffix pairs by projected database size.
154
- *
155
156
* @param prefixSuffixPairs prefix (length n) and suffix pairs,
156
157
* @return prefix-suffix pairs partitioned by whether their projected database size is <= or
157
158
* greater than [[maxLocalProjDBSize ]]
@@ -173,44 +174,57 @@ class PrefixSpan private (
173
174
}
174
175
175
176
/**
176
- * Get the pattern and counts, and prefix suffix pairs
177
+ * Extends all prefixes by one item from their suffix and computes the resulting frequent prefixes
178
+ * and remaining work.
177
179
* @param minCount minimum count
178
- * @param prefixSuffixPairs prefix (length n ) and suffix pairs,
179
- * @return pattern ( length n+1) and counts, and prefix ( length n+1) and suffix pairs
180
- * (RDD[pattern, count], RDD[ prefix, suffix ])
180
+ * @param prefixSuffixPairs prefix (length N ) and suffix pairs,
181
+ * @return (frequent length N+1 extended prefix, count) pairs and (frequent length N+1 extended
182
+ * prefix, corresponding suffix) pairs.
181
183
*/
182
- private def getPatternCountsAndPrefixSuffixPairs (
184
+ private def extendPrefixes (
183
185
minCount : Long ,
184
186
prefixSuffixPairs : RDD [(List [Int ], Array [Int ])])
185
187
: (RDD [(List [Int ], Long )], RDD [(List [Int ], Array [Int ])]) = {
186
- val prefixAndFrequentItemAndCounts = prefixSuffixPairs
188
+
189
+ // (length N prefix, item from suffix) pairs and their corresponding number of occurrences
190
+ // Every (prefix :+ suffix) is guaranteed to have support exceeding `minSupport`
191
+ val prefixItemPairAndCounts = prefixSuffixPairs
187
192
.flatMap { case (prefix, suffix) => suffix.distinct.map(y => ((prefix, y), 1L )) }
188
193
.reduceByKey(_ + _)
189
194
.filter(_._2 >= minCount)
190
- val patternAndCounts = prefixAndFrequentItemAndCounts
191
- .map { case (( prefix, item), count) => (item :: prefix, count) }
192
- val prefixToFrequentNextItemsMap = prefixAndFrequentItemAndCounts
195
+
196
+ // Map from prefix to set of possible next items from suffix
197
+ val prefixToNextItems = prefixItemPairAndCounts
193
198
.keys
194
199
.groupByKey()
195
200
.mapValues(_.toSet)
196
201
.collect()
197
202
.toMap
198
- val nextPrefixSuffixPairs = prefixSuffixPairs
199
- .filter(x => prefixToFrequentNextItemsMap.contains(x._1))
203
+
204
+
205
+ // Frequent patterns with length N+1 and their corresponding counts
206
+ val extendedPrefixAndCounts = prefixItemPairAndCounts
207
+ .map { case ((prefix, item), count) => (item :: prefix, count) }
208
+
209
+ // Remaining work, all prefixes will have length N+1
210
+ val extendedPrefixAndSuffix = prefixSuffixPairs
211
+ .filter(x => prefixToNextItems.contains(x._1))
200
212
.flatMap { case (prefix, suffix) =>
201
- val frequentNextItems = prefixToFrequentNextItemsMap(prefix)
202
- val filteredSuffix = suffix.filter(frequentNextItems.contains(_))
203
- frequentNextItems.flatMap { item =>
204
- val suffix = LocalPrefixSpan .getSuffix(item, filteredSuffix)
205
- if (suffix.isEmpty) None
206
- else Some (item :: prefix, suffix)
213
+ val frequentNextItems = prefixToNextItems(prefix)
214
+ val filteredSuffix = suffix.filter(frequentNextItems.contains(_))
215
+ frequentNextItems.flatMap { item =>
216
+ LocalPrefixSpan .getSuffix(item, filteredSuffix) match {
217
+ case suffix if ! suffix.isEmpty => Some (item :: prefix, suffix)
218
+ case _ => None
219
+ }
220
+ }
207
221
}
208
- }
209
- (patternAndCounts, nextPrefixSuffixPairs )
222
+
223
+ (extendedPrefixAndCounts, extendedPrefixAndSuffix )
210
224
}
211
225
212
226
/**
213
- * calculate the patterns in local.
227
+ * Calculate the patterns in local.
214
228
* @param minCount the absolute minimum count
215
229
* @param data prefixes and projected sequences data data
216
230
* @return patterns
0 commit comments