Skip to content

Commit af3ddc9

Browse files
committed
Handle port collisions in flume polling test
1 parent 63bdb1f commit af3ddc9

File tree

1 file changed

+30
-1
lines changed

1 file changed

+30
-1
lines changed

external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,37 @@ class FlumePollingStreamSuite extends TestSuiteBase {
4545
val eventsPerBatch = 100
4646
val totalEventsPerChannel = batchCount * eventsPerBatch
4747
val channelCapacity = 5000
48+
val maxAttempts = 5
4849

4950
test("flume polling test") {
51+
testMultipleTimes(testFlumePolling)
52+
}
53+
54+
test("flume polling test multiple hosts") {
55+
testMultipleTimes(testFlumePollingMultipleHost)
56+
}
57+
58+
/**
59+
* Run the given test until no more java.net.BindException's are thrown.
60+
* Do this only up to a certain attempt limit.
61+
*/
62+
private def testMultipleTimes(test: () => Unit): Unit = {
63+
var testPassed = false
64+
var attempt = 0
65+
while (!testPassed && attempt < maxAttempts) {
66+
try {
67+
test()
68+
testPassed = true
69+
} catch {
70+
case e: java.net.BindException =>
71+
logError("Exception when running flume polling test", e)
72+
attempt += 1
73+
}
74+
}
75+
assert(testPassed, s"Test failed after $attempt attempts!")
76+
}
77+
78+
private def testFlumePolling(): Unit = {
5079
val testPort = getTestPort
5180
// Set up the streaming context and input streams
5281
val ssc = new StreamingContext(conf, batchDuration)
@@ -80,7 +109,7 @@ class FlumePollingStreamSuite extends TestSuiteBase {
80109
channel.stop()
81110
}
82111

83-
test("flume polling test multiple hosts") {
112+
private def testFlumePollingMultipleHost(): Unit = {
84113
val testPort = getTestPort
85114
// Set up the streaming context and input streams
86115
val ssc = new StreamingContext(conf, batchDuration)

0 commit comments

Comments
 (0)