Skip to content

Commit 892debb

Browse files
committed
SPARK-2043: don't read a key with the next hash code in
ExternalAppendOnlyMap, instead use a buffered iterator to only read values with the current hash code.
1 parent 1765c8d commit 892debb

File tree

1 file changed

+6
-4
lines changed

1 file changed

+6
-4
lines changed

core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.spark.util.collection
2020
import java.io.{InputStream, BufferedInputStream, FileInputStream, File, Serializable, EOFException}
2121
import java.util.Comparator
2222

23+
import scala.collection.BufferedIterator
2324
import scala.collection.mutable
2425
import scala.collection.mutable.ArrayBuffer
2526

@@ -231,7 +232,7 @@ class ExternalAppendOnlyMap[K, V, C](
231232
// Input streams are derived both from the in-memory map and spilled maps on disk
232233
// The in-memory map is sorted in place, while the spilled maps are already in sorted order
233234
private val sortedMap = currentMap.destructiveSortedIterator(comparator)
234-
private val inputStreams = Seq(sortedMap) ++ spilledMaps
235+
private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)
235236

236237
inputStreams.foreach { it =>
237238
val kcPairs = getMorePairs(it)
@@ -246,13 +247,13 @@ class ExternalAppendOnlyMap[K, V, C](
246247
* In the event of key hash collisions, this ensures no pairs are hidden from being merged.
247248
* Assume the given iterator is in sorted order.
248249
*/
249-
private def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = {
250+
private def getMorePairs(it: BufferedIterator[(K, C)]): ArrayBuffer[(K, C)] = {
250251
val kcPairs = new ArrayBuffer[(K, C)]
251252
if (it.hasNext) {
252253
var kc = it.next()
253254
kcPairs += kc
254255
val minHash = kc._1.hashCode()
255-
while (it.hasNext && kc._1.hashCode() == minHash) {
256+
while (it.hasNext && it.head._1.hashCode() == minHash) {
256257
kc = it.next()
257258
kcPairs += kc
258259
}
@@ -325,7 +326,8 @@ class ExternalAppendOnlyMap[K, V, C](
325326
*
326327
* StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
327328
*/
328-
private case class StreamBuffer(iterator: Iterator[(K, C)], pairs: ArrayBuffer[(K, C)])
329+
private class StreamBuffer(
330+
val iterator: BufferedIterator[(K, C)], val pairs: ArrayBuffer[(K, C)])
329331
extends Comparable[StreamBuffer] {
330332

331333
def isEmpty = pairs.length == 0

0 commit comments

Comments
 (0)