@@ -277,6 +277,11 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
277
277
(" pomatoes" , " eructation" ) // 568647356
278
278
)
279
279
280
+ collisionPairs.foreach { case (w1, w2) =>
281
+ // String.hashCode is documented to use a specific algorithm, but check just in case
282
+ assert(w1.hashCode === w2.hashCode)
283
+ }
284
+
280
285
(1 to 100000 ).map(_.toString).foreach { i => map.insert(i, i) }
281
286
collisionPairs.foreach { case (w1, w2) =>
282
287
map.insert(w1, w2)
@@ -296,7 +301,32 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
296
301
assert(kv._2.equals(expectedValue))
297
302
count += 1
298
303
}
299
- assert(count == 100000 + collisionPairs.size * 2 )
304
+ assert(count === 100000 + collisionPairs.size * 2 )
305
+ }
306
+
307
test("spilling with many hash collisions") {
  // Use a very small shuffle memory fraction; presumably this forces the map to spill to disk
  // long before all keys have been inserted (the scenario this test is named after).
  val conf = new SparkConf(true)
  conf.set("spark.shuffle.memoryFraction", "0.0001")
  sc = new SparkContext("local-cluster[1,1,512]", "test", conf)

  // Every key starts with a combiner of 1 and further values/combiners are summed, so the
  // combined value of a key equals the number of times that key was inserted.
  val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)

  val copiesPerKey = 10
  val distinctKeys = 10000

  // Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes
  // problems if the map fails to group together the objects with the same code (SPARK-2043).
  for (_ <- 1 to copiesPerKey) {
    for (j <- 1 to distinctKeys) {
      map.insert(FixedHashObject(j, j % 2), 1)
    }
  }

  // Each distinct key must come back exactly once, with all of its copies merged together.
  val it = map.iterator
  var count = 0
  while (it.hasNext) {
    val kv = it.next()
    assert(kv._2 === copiesPerKey)
    count += 1
  }
  assert(count === distinctKeys)
}
301
331
302
332
test(" spilling with hash collisions using the Int.MaxValue key" ) {
@@ -317,3 +347,10 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
317
347
}
318
348
}
319
349
}
350
+
351
/**
 * A dummy key class whose hash code is fixed by the caller, to easily test hash collisions.
 *
 * Equality is the usual case-class structural equality over both fields, so two instances
 * with different `v` but the same `h` are distinct keys that collide on their hash code.
 *
 * @param v the payload that distinguishes one key from another
 * @param h the value returned by `hashCode`, regardless of `v`
 */
case class FixedHashObject(v: Int, h: Int) extends Serializable {
  // Ignore `v` entirely so that callers fully control which instances collide.
  override def hashCode(): Int = h
}
0 commit comments