Skip to content

Commit 8a8e9ce

Browse files
srowenzzcclp
authored andcommitted
[SPARK-16193][TESTS] Address flaky ExternalAppendOnlyMapSuite spilling tests
## What changes were proposed in this pull request? Make spill tests wait until job has completed before returning the number of stages that spilled ## How was this patch tested? Existing Jenkins tests. Author: Sean Owen <[email protected]> Closes apache#13896 from srowen/SPARK-16193. (cherry picked from commit e877415) Signed-off-by: Sean Owen <[email protected]> (cherry picked from commit 60e095b)
1 parent 3014c29 commit 8a8e9ce

File tree

1 file changed

+12
-1
lines changed

1 file changed

+12
-1
lines changed

core/src/main/scala/org/apache/spark/TestUtils.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import java.net.{URI, URL}
2222
import java.nio.charset.StandardCharsets
2323
import java.nio.file.Paths
2424
import java.util.Arrays
25+
import java.util.concurrent.{CountDownLatch, TimeUnit}
2526
import java.util.jar.{JarEntry, JarOutputStream}
2627

2728
import scala.collection.JavaConverters._
@@ -190,8 +191,14 @@ private[spark] object TestUtils {
190191
private class SpillListener extends SparkListener {
191192
private val stageIdToTaskMetrics = new mutable.HashMap[Int, ArrayBuffer[TaskMetrics]]
192193
private val spilledStageIds = new mutable.HashSet[Int]
194+
private val stagesDone = new CountDownLatch(1)
193195

194-
def numSpilledStages: Int = spilledStageIds.size
196+
def numSpilledStages: Int = {
197+
// Long timeout, just in case somehow the job end isn't notified.
198+
// Fails if a timeout occurs
199+
assert(stagesDone.await(10, TimeUnit.SECONDS))
200+
spilledStageIds.size
201+
}
195202

196203
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
197204
stageIdToTaskMetrics.getOrElseUpdate(
@@ -206,4 +213,8 @@ private class SpillListener extends SparkListener {
206213
spilledStageIds += stageId
207214
}
208215
}
216+
217+
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
218+
stagesDone.countDown()
219+
}
209220
}

0 commit comments

Comments
 (0)