@@ -23,7 +23,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.{TaskContext, Partition}
 
 private[spark]
-class SlidedRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[T])
+class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[T])
   extends Partition with Serializable {
   override val index: Int = idx
 }
@@ -41,36 +41,36 @@ class SlidedRDDPartition[T](val idx: Int, val prev: Partition, val tail: Array[T])
  * @see [[org.apache.spark.rdd.RDD#sliding]]
  */
 private[spark]
-class SlidedRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int)
+class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int)
   extends RDD[Array[T]](parent) {
 
-  require(windowSize > 1, "Window size must be greater than 1.")
+  require(windowSize > 1, s"Window size must be greater than 1, but got $windowSize.")
 
   override def compute(split: Partition, context: TaskContext): Iterator[Array[T]] = {
-    val part = split.asInstanceOf[SlidedRDDPartition[T]]
+    val part = split.asInstanceOf[SlidingRDDPartition[T]]
     (firstParent[T].iterator(part.prev, context) ++ part.tail)
       .sliding(windowSize)
       .map(_.toArray)
       .filter(_.size == windowSize)
   }
 
   override def getPreferredLocations(split: Partition): Seq[String] =
-    firstParent[T].preferredLocations(split.asInstanceOf[SlidedRDDPartition[T]].prev)
+    firstParent[T].preferredLocations(split.asInstanceOf[SlidingRDDPartition[T]].prev)
 
   override def getPartitions: Array[Partition] = {
     val parentPartitions = parent.partitions
     val n = parentPartitions.size
     if (n == 0) {
       Array.empty
     } else if (n == 1) {
-      Array(new SlidedRDDPartition[T](0, parentPartitions(0), Array.empty))
+      Array(new SlidingRDDPartition[T](0, parentPartitions(0), Array.empty))
     } else {
       val n1 = n - 1
       val w1 = windowSize - 1
       // Get the first w1 items of each partition, starting from the second partition.
       val nextHeads =
         parent.context.runJob(parent, (iter: Iterator[T]) => iter.take(w1).toArray, 1 until n, true)
-      val partitions = mutable.ArrayBuffer[SlidedRDDPartition[T]]()
+      val partitions = mutable.ArrayBuffer[SlidingRDDPartition[T]]()
       var i = 0
       var partitionIndex = 0
       while (i < n1) {
@@ -85,14 +85,14 @@ class SlidedRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int)
           tail ++= nextHeads(j)
           j += 1
         }
-        partitions += new SlidedRDDPartition[T](partitionIndex, parentPartitions(i), tail.toArray)
+        partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(i), tail.toArray)
         partitionIndex += 1
         // Skip appended heads.
         i = j
       }
       // If the head of last partition has size w1, we also need to add this partition.
       if (nextHeads(n1 - 1).size == w1) {
-        partitions += new SlidedRDDPartition[T](partitionIndex, parentPartitions(n1), Array.empty)
+        partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(n1), Array.empty)
       }
       partitions.toArray
     }
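
For reference, a minimal usage sketch (not part of this commit) of the behavior SlidingRDD implements, assuming `sliding` is exposed on RDD as the `@see` tag above indicates and a SparkContext named `sc` is in scope:

// Hypothetical usage sketch, not part of this commit.
// 1 to 7 spread over 2 partitions: windows that straddle the partition
// boundary are stitched together via the tail arrays built in getPartitions,
// and trailing windows shorter than windowSize are dropped by the filter in compute.
val windows = sc.parallelize(1 to 7, 2).sliding(3).collect()
windows.foreach(w => println(w.mkString("[", ", ", "]")))
// Expected:
// [1, 2, 3]
// [2, 3, 4]
// [3, 4, 5]
// [4, 5, 6]
// [5, 6, 7]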