Skip to content

Commit 64271b3

Browse files
committed
Modified codes according to comments.
1 parent d2250b7 commit 64271b3

File tree

1 file changed

+11
-16
lines changed

1 file changed

+11
-16
lines changed

mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class PrefixSpan private (
4545
private var minSupport: Double,
4646
private var maxPatternLength: Int) extends Logging with Serializable {
4747

48-
private val maxSuffixesBeforeLocalProcessing: Long = 10000
48+
private val maxProjectedDBSizeBeforeLocalProcessing: Long = 10000
4949

5050
/**
5151
* Constructs a default instance with default parameters
@@ -89,41 +89,36 @@ class PrefixSpan private (
8989
val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, sequences)
9090
val prefixSuffixPairs = getPrefixSuffixPairs(
9191
lengthOnePatternsAndCounts.map(_._1).collect(), sequences)
92-
var patternsCount: Long = lengthOnePatternsAndCounts.count()
92+
prefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK)
9393
var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => (ArrayBuffer(x._1), x._2))
9494
var (smallPrefixSuffixPairs, largePrefixSuffixPairs) =
9595
splitPrefixSuffixPairs(prefixSuffixPairs)
96-
largePrefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK)
97-
var patternLength: Int = 1
98-
while (patternLength < maxPatternLength &&
99-
largePrefixSuffixPairs.count() != 0) {
96+
while (largePrefixSuffixPairs.count() != 0) {
10097
val (nextPatternAndCounts, nextPrefixSuffixPairs) =
10198
getPatternCountsAndPrefixSuffixPairs(minCount, largePrefixSuffixPairs)
102-
patternsCount = nextPatternAndCounts.count()
10399
largePrefixSuffixPairs.unpersist()
104-
val splitedPrefixSuffixPairs = splitPrefixSuffixPairs(nextPrefixSuffixPairs)
105-
largePrefixSuffixPairs = splitedPrefixSuffixPairs._2
100+
val (smallerPairsPart, largerPairsPart) = splitPrefixSuffixPairs(nextPrefixSuffixPairs)
101+
largePrefixSuffixPairs = largerPairsPart
106102
largePrefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK)
107-
smallPrefixSuffixPairs = smallPrefixSuffixPairs ++ splitedPrefixSuffixPairs._1
108-
allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
109-
patternLength = patternLength + 1
103+
smallPrefixSuffixPairs ++= smallerPairsPart
104+
allPatternAndCounts ++= nextPatternAndCounts
110105
}
111106
if (smallPrefixSuffixPairs.count() > 0) {
112107
val projectedDatabase = smallPrefixSuffixPairs
113108
.map(x => (x._1.toSeq, x._2))
114109
.groupByKey()
115110
.map(x => (x._1.toArray, x._2.toArray))
116111
val nextPatternAndCounts = getPatternsInLocal(minCount, projectedDatabase)
117-
allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
112+
allPatternAndCounts ++= nextPatternAndCounts
118113
}
119114
allPatternAndCounts.map { case (pattern, count) => (pattern.toArray, count) }
120115
}
121116

122117

123118
/**
124119
* Split prefix suffix pairs to two parts:
125-
* suffixes' size less than maxSuffixesBeforeLocalProcessing and
126-
* suffixes' size more than maxSuffixesBeforeLocalProcessing
120+
* Prefixes with projected databases smaller than maxProjectedDBSizeBeforeLocalProcessing and
121+
* Prefixes with projected databases larger than maxProjectedDBSizeBeforeLocalProcessing
127122
* @param prefixSuffixPairs prefix (length n) and suffix pairs,
128123
* @return small size prefix suffix pairs and big size prefix suffix pairs
129124
* (RDD[prefix, suffix], RDD[prefix, suffix ])
@@ -134,7 +129,7 @@ class PrefixSpan private (
134129
val suffixSizeMap = prefixSuffixPairs
135130
.map(x => (x._1, x._2.length))
136131
.reduceByKey(_ + _)
137-
.map(x => (x._2 <= maxSuffixesBeforeLocalProcessing, Set(x._1)))
132+
.map(x => (x._2 <= maxProjectedDBSizeBeforeLocalProcessing, Set(x._1)))
138133
.reduceByKey(_ ++ _)
139134
.collect
140135
.toMap

0 commit comments

Comments
 (0)