Skip to content

Commit a61943d

Browse files
author
Feynman Liang
committed
Collect small patterns to local
1 parent 4ddf479 commit a61943d

File tree

1 file changed

+4
-3
lines changed

1 file changed

+4
-3
lines changed

mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -149,7 +149,8 @@ class PrefixSpan private (
149149
}
150150

151151
// Process the small projected databases locally
152-
val remainingResults = getPatternsInLocal(minCount, pairsForLocal.groupByKey())
152+
val remainingResults = getPatternsInLocal(
153+
minCount, sc.parallelize(pairsForLocal, 1).groupByKey())
153154

154155
(sc.parallelize(resultsAccumulator, 1) ++ remainingResults)
155156
.map { case (pattern, count) => (pattern.toArray, count) }
@@ -163,7 +164,7 @@ class PrefixSpan private (
163164
* greater than [[maxLocalProjDBSize]]
164165
*/
165166
private def partitionByProjDBSize(prefixSuffixPairs: RDD[(List[Int], Array[Int])])
166-
: (RDD[(List[Int], Array[Int])], RDD[(List[Int], Array[Int])]) = {
167+
: (Array[(List[Int], Array[Int])], RDD[(List[Int], Array[Int])]) = {
167168
val prefixToSuffixSize = prefixSuffixPairs
168169
.aggregateByKey(0)(
169170
seqOp = { case (count, suffix) => count + suffix.length },
@@ -175,7 +176,7 @@ class PrefixSpan private (
175176
.toSet
176177
val small = prefixSuffixPairs.filter { case (prefix, _) => smallPrefixes.contains(prefix) }
177178
val large = prefixSuffixPairs.filter { case (prefix, _) => !smallPrefixes.contains(prefix) }
178-
(small, large)
179+
(small.collect(), large)
179180
}
180181

181182
/**

0 commit comments

Comments
 (0)