Skip to content

SPARK-2043: ExternalAppendOnlyMap doesn't always find matching keys #986

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.util.collection
import java.io.{InputStream, BufferedInputStream, FileInputStream, File, Serializable, EOFException}
import java.util.Comparator

import scala.collection.BufferedIterator
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

Expand Down Expand Up @@ -231,7 +232,7 @@ class ExternalAppendOnlyMap[K, V, C](
// Input streams are derived both from the in-memory map and spilled maps on disk
// The in-memory map is sorted in place, while the spilled maps are already in sorted order
private val sortedMap = currentMap.destructiveSortedIterator(comparator)
private val inputStreams = Seq(sortedMap) ++ spilledMaps
private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)

inputStreams.foreach { it =>
val kcPairs = getMorePairs(it)
Expand All @@ -246,13 +247,13 @@ class ExternalAppendOnlyMap[K, V, C](
* In the event of key hash collisions, this ensures no pairs are hidden from being merged.
* Assume the given iterator is in sorted order.
*/
private def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = {
private def getMorePairs(it: BufferedIterator[(K, C)]): ArrayBuffer[(K, C)] = {
val kcPairs = new ArrayBuffer[(K, C)]
if (it.hasNext) {
var kc = it.next()
kcPairs += kc
val minHash = kc._1.hashCode()
while (it.hasNext && kc._1.hashCode() == minHash) {
while (it.hasNext && it.head._1.hashCode() == minHash) {
kc = it.next()
kcPairs += kc
}
Expand Down Expand Up @@ -325,7 +326,8 @@ class ExternalAppendOnlyMap[K, V, C](
*
* StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
*/
private case class StreamBuffer(iterator: Iterator[(K, C)], pairs: ArrayBuffer[(K, C)])
private class StreamBuffer(
val iterator: BufferedIterator[(K, C)], val pairs: ArrayBuffer[(K, C)])
extends Comparable[StreamBuffer] {

def isEmpty = pairs.length == 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,11 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
("pomatoes", "eructation") // 568647356
)

collisionPairs.foreach { case (w1, w2) =>
// String.hashCode is documented to use a specific algorithm, but check just in case
assert(w1.hashCode === w2.hashCode)
}

(1 to 100000).map(_.toString).foreach { i => map.insert(i, i) }
collisionPairs.foreach { case (w1, w2) =>
map.insert(w1, w2)
Expand All @@ -296,7 +301,32 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
assert(kv._2.equals(expectedValue))
count += 1
}
assert(count == 100000 + collisionPairs.size * 2)
assert(count === 100000 + collisionPairs.size * 2)
}

test("spilling with many hash collisions") {
  // A very small shuffle memory fraction; presumably chosen so the map spills to disk
  // during the inserts below (the test name says "spilling") -- confirm against the map's
  // spill policy.
  val conf = new SparkConf(true).set("spark.shuffle.memoryFraction", "0.0001")
  sc = new SparkContext("local-cluster[1,1,512]", "test", conf)

  // createCombiner yields 1 per first occurrence; both merge functions add counts.
  val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)

  // Ten rounds over 10000 distinct keys whose hash codes are only ever 0 or 1 (j % 2).
  // If the map fails to group equal keys that share a hash code (SPARK-2043), the
  // per-key totals checked below come out wrong.
  for (_ <- 1 to 10; j <- 1 to 10000) {
    map.insert(FixedHashObject(j, j % 2), 1)
  }

  // Every distinct key must have been combined exactly ten times, and each key must
  // appear exactly once in the merged output.
  val count = map.iterator.foldLeft(0) { (seen, kv) =>
    assert(kv._2 === 10)
    seen + 1
  }
  assert(count === 10000)
}

test("spilling with hash collisions using the Int.MaxValue key") {
Expand All @@ -317,3 +347,10 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
}
}
}

/**
 * A dummy key type whose hash code is supplied by the caller instead of being derived
 * from its contents, which makes it easy to construct hash collisions in tests.
 *
 * Equality is the usual case-class equality over both fields, so instances with
 * different `v` but the same `h` are distinct keys that share a hash code.
 *
 * @param v the value that distinguishes one instance from another
 * @param h the hash code reported by `hashCode()`
 */
case class FixedHashObject(v: Int, h: Int) extends Serializable {
  // Report the caller-chosen hash verbatim rather than the generated case-class hash.
  override def hashCode(): Int = {
    h
  }
}