1
+ /*
2
+ * Licensed to the Apache Software Foundation (ASF) under one or more
3
+ * contributor license agreements. See the NOTICE file distributed with
4
+ * this work for additional information regarding copyright ownership.
5
+ * The ASF licenses this file to You under the Apache License, Version 2.0
6
+ * (the "License"); you may not use this file except in compliance with
7
+ * the License. You may obtain a copy of the License at
8
+ *
9
+ * http://www.apache.org/licenses/LICENSE-2.0
10
+ *
11
+ * Unless required by applicable law or agreed to in writing, software
12
+ * distributed under the License is distributed on an "AS IS" BASIS,
13
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
+ * See the License for the specific language governing permissions and
15
+ * limitations under the License.
16
+ */
17
+
18
+ package org .apache .spark .mllib .fpm
19
+
20
+ import org .apache .spark .rdd .RDD
21
+
22
+ /**
23
+ *
24
+ * A parallel PrefixSpan algorithm to mine sequential pattern.
25
+ * The PrefixSpan algorithm is described in
26
+ * [[http://web.engr.illinois.edu/~hanj/pdf/span01.pdf ]].
27
+ *
28
+ * @param sequences original sequences data
29
+ * @param minSupport the minimal support level of the sequential pattern, any pattern appears
30
+ * more than minSupport times will be output
31
+ * @param maxPatternLength the maximal length of the sequential pattern, any pattern appears
32
+ * less than maxPatternLength will be output
33
+ *
34
+ * @see [[https://en.wikipedia.org/wiki/Sequential_Pattern_Mining Sequential Pattern Mining
35
+ * (Wikipedia)]]
36
+ */
37
+ class Prefixspan (
38
+ val sequences : RDD [Array [Int ]],
39
+ val minSupport : Int = 2 ,
40
+ val maxPatternLength : Int = 50 ) extends java.io.Serializable {
41
+
42
+ /**
43
+ * Calculate sequential patterns:
44
+ * a) find and collect length-one patterns
45
+ * b) for each length-one patterns and each sequence,
46
+ * emit (pattern (prefix), suffix sequence) as key-value pairs
47
+ * c) group by key and then map value iterator to array
48
+ * d) local PrefixSpan on each prefix
49
+ * @return sequential patterns
50
+ */
51
+ def run (): RDD [(Seq [Int ], Int )] = {
52
+ val (patternsOneLength, prefixAndCandidates) = findPatternsLengthOne()
53
+ val repartitionedRdd = repartitionSequences(prefixAndCandidates)
54
+ val nextPatterns = getPatternsInLocal(repartitionedRdd)
55
+ val allPatterns = patternsOneLength.map(x => (Seq (x._1), x._2)) ++ nextPatterns
56
+ allPatterns
57
+ }
58
+
59
+ /**
60
+ * Find the patterns that it's length is one
61
+ * @return length-one patterns and projection table
62
+ */
63
+ private def findPatternsLengthOne (): (RDD [(Int , Int )], RDD [(Seq [Int ], Array [Int ])]) = {
64
+ val patternsOneLength = sequences
65
+ .map(_.distinct)
66
+ .flatMap(p => p)
67
+ .map((_, 1 ))
68
+ .reduceByKey(_ + _)
69
+
70
+ val removedElements : Array [Int ] = patternsOneLength
71
+ .filter(_._2 < minSupport)
72
+ .map(_._1)
73
+ .collect()
74
+
75
+ val savedElements = patternsOneLength.filter(_._2 >= minSupport)
76
+
77
+ val savedElementsArray = savedElements
78
+ .map(_._1)
79
+ .collect()
80
+
81
+ val filteredSequences =
82
+ if (removedElements.isEmpty) {
83
+ sequences
84
+ } else {
85
+ sequences.map { p =>
86
+ p.filter { x => ! removedElements.contains(x) }
87
+ }
88
+ }
89
+
90
+ val prefixAndCandidates = filteredSequences.flatMap { x =>
91
+ savedElementsArray.map { y =>
92
+ val sub = getSuffix(y, x)
93
+ (Seq (y), sub)
94
+ }
95
+ }
96
+
97
+ (savedElements, prefixAndCandidates)
98
+ }
99
+
100
+ /**
101
+ * Re-partition the RDD data, to get better balance and performance.
102
+ * @param data patterns and projected sequences data before re-partition
103
+ * @return patterns and projected sequences data after re-partition
104
+ */
105
+ private def repartitionSequences (
106
+ data : RDD [(Seq [Int ], Array [Int ])]): RDD [(Seq [Int ], Array [Array [Int ]])] = {
107
+ val dataRemovedEmptyLine = data.filter(x => x._2.nonEmpty)
108
+ val dataMerged = dataRemovedEmptyLine
109
+ .groupByKey()
110
+ .map(x => (x._1, x._2.toArray))
111
+ dataMerged
112
+ }
113
+
114
+ /**
115
+ * calculate the patterns in local.
116
+ * @param data patterns and projected sequences data data
117
+ * @return patterns
118
+ */
119
+ private def getPatternsInLocal (
120
+ data : RDD [(Seq [Int ], Array [Array [Int ]])]): RDD [(Seq [Int ], Int )] = {
121
+ val result = data.flatMap { x =>
122
+ getPatternsWithPrefix(x._1, x._2)
123
+ }
124
+ result
125
+ }
126
+
127
+ /**
128
+ * calculate the patterns with one prefix in local.
129
+ * @param prefix prefix
130
+ * @param data patterns and projected sequences data
131
+ * @return patterns
132
+ */
133
+ private def getPatternsWithPrefix (
134
+ prefix : Seq [Int ],
135
+ data : Array [Array [Int ]]): Array [(Seq [Int ], Int )] = {
136
+ val elements = data
137
+ .map(x => x.distinct)
138
+ .flatMap(x => x)
139
+ .groupBy(x => x)
140
+ .map(x => (x._1, x._2.length))
141
+
142
+ val selectedSingleElements = elements.filter(x => x._2 >= minSupport)
143
+
144
+ val selectedElements = selectedSingleElements
145
+ .map(x => (prefix ++ Seq (x._1), x._2))
146
+ .toArray
147
+
148
+ val cleanedSearchSpace = data
149
+ .map(x => x.filter(y => selectedSingleElements.contains(y)))
150
+
151
+ val newSearchSpace = selectedSingleElements.map { x =>
152
+ val sub = cleanedSearchSpace.map(y => getSuffix(x._1, y)).filter(_.nonEmpty)
153
+ (prefix ++ Seq (x._1), sub)
154
+ }.filter(x => x._2.nonEmpty)
155
+ .toArray
156
+
157
+ val continueProcess = newSearchSpace.nonEmpty && prefix.length + 1 < maxPatternLength
158
+
159
+ if (continueProcess) {
160
+ val nextPatterns = newSearchSpace
161
+ .map(x => getPatternsWithPrefix(x._1, x._2))
162
+ .reduce(_ ++ _)
163
+ selectedElements ++ nextPatterns
164
+ } else {
165
+ selectedElements
166
+ }
167
+ }
168
+
169
+ /**
170
+ * calculate suffix sequence following a prefix in a sequence
171
+ * @param prefix prefix
172
+ * @param sequence original sequence
173
+ * @return suffix sequence
174
+ */
175
+ private def getSuffix (prefix : Int , sequence : Array [Int ]): Array [Int ] = {
176
+ val index = sequence.indexOf(prefix)
177
+ if (index == - 1 ) {
178
+ Array ()
179
+ } else {
180
+ sequence.takeRight(sequence.length - index - 1 )
181
+ }
182
+ }
183
+ }
0 commit comments