Skip to content

Commit b45c13e

Browse files
committed
SPARK-2043: ExternalAppendOnlyMap doesn't always find matching keys
When the current implementation finishes reading the keys with the current hash code from a stream, it also reads one key with the next hash code and groups it with the current batch, so some matches for that next key may be missed. This can cause operations like join to give wrong results when reduce tasks spill to disk and there are hash collisions, because values for the same key are not matched together. This PR fixes the problem by not consuming that next key: a buffered (peeking) iterator is used instead, so only pairs with the current hash code are read. Author: Matei Zaharia <[email protected]> Closes #986 from mateiz/spark-2043 and squashes the following commits: 0959514 [Matei Zaharia] Added unit test for having many hash collisions 892debb [Matei Zaharia] SPARK-2043: don't read a key with the next hash code in ExternalAppendOnlyMap, instead use a buffered iterator to only read values with the current hash code.
1 parent 9bad0b7 commit b45c13e

File tree

2 files changed

+44
-5
lines changed

2 files changed

+44
-5
lines changed

core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.spark.util.collection
2020
import java.io.{InputStream, BufferedInputStream, FileInputStream, File, Serializable, EOFException}
2121
import java.util.Comparator
2222

23+
import scala.collection.BufferedIterator
2324
import scala.collection.mutable
2425
import scala.collection.mutable.ArrayBuffer
2526

@@ -231,7 +232,7 @@ class ExternalAppendOnlyMap[K, V, C](
231232
// Input streams are derived both from the in-memory map and spilled maps on disk
232233
// The in-memory map is sorted in place, while the spilled maps are already in sorted order
233234
private val sortedMap = currentMap.destructiveSortedIterator(comparator)
234-
private val inputStreams = Seq(sortedMap) ++ spilledMaps
235+
private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)
235236

236237
inputStreams.foreach { it =>
237238
val kcPairs = getMorePairs(it)
@@ -246,13 +247,13 @@ class ExternalAppendOnlyMap[K, V, C](
246247
* In the event of key hash collisions, this ensures no pairs are hidden from being merged.
247248
* Assume the given iterator is in sorted order.
248249
*/
249-
private def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = {
250+
private def getMorePairs(it: BufferedIterator[(K, C)]): ArrayBuffer[(K, C)] = {
250251
val kcPairs = new ArrayBuffer[(K, C)]
251252
if (it.hasNext) {
252253
var kc = it.next()
253254
kcPairs += kc
254255
val minHash = kc._1.hashCode()
255-
while (it.hasNext && kc._1.hashCode() == minHash) {
256+
while (it.hasNext && it.head._1.hashCode() == minHash) {
256257
kc = it.next()
257258
kcPairs += kc
258259
}
@@ -325,7 +326,8 @@ class ExternalAppendOnlyMap[K, V, C](
325326
*
326327
* StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
327328
*/
328-
private case class StreamBuffer(iterator: Iterator[(K, C)], pairs: ArrayBuffer[(K, C)])
329+
private class StreamBuffer(
330+
val iterator: BufferedIterator[(K, C)], val pairs: ArrayBuffer[(K, C)])
329331
extends Comparable[StreamBuffer] {
330332

331333
def isEmpty = pairs.length == 0

core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,11 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
277277
("pomatoes", "eructation") // 568647356
278278
)
279279

280+
collisionPairs.foreach { case (w1, w2) =>
281+
// String.hashCode is documented to use a specific algorithm, but check just in case
282+
assert(w1.hashCode === w2.hashCode)
283+
}
284+
280285
(1 to 100000).map(_.toString).foreach { i => map.insert(i, i) }
281286
collisionPairs.foreach { case (w1, w2) =>
282287
map.insert(w1, w2)
@@ -296,7 +301,32 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
296301
assert(kv._2.equals(expectedValue))
297302
count += 1
298303
}
299-
assert(count == 100000 + collisionPairs.size * 2)
304+
assert(count === 100000 + collisionPairs.size * 2)
305+
}
306+
307+
test("spilling with many hash collisions") {
308+
val conf = new SparkConf(true)
309+
conf.set("spark.shuffle.memoryFraction", "0.0001")
310+
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
311+
312+
val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)
313+
314+
// Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes
315+
// problems if the map fails to group together the objects with the same code (SPARK-2043).
316+
for (i <- 1 to 10) {
317+
for (j <- 1 to 10000) {
318+
map.insert(FixedHashObject(j, j % 2), 1)
319+
}
320+
}
321+
322+
val it = map.iterator
323+
var count = 0
324+
while (it.hasNext) {
325+
val kv = it.next()
326+
assert(kv._2 === 10)
327+
count += 1
328+
}
329+
assert(count === 10000)
300330
}
301331

302332
test("spilling with hash collisions using the Int.MaxValue key") {
@@ -317,3 +347,10 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
317347
}
318348
}
319349
}
350+
351+
/**
352+
* A dummy class whose hash code is fixed to the value `h` given at construction, so that tests
* can easily force hash collisions between distinct objects
353+
*/
354+
case class FixedHashObject(val v: Int, val h: Int) extends Serializable {
355+
override def hashCode(): Int = h
356+
}

0 commit comments

Comments
 (0)