Skip to content

Commit 1e8b539

Browse files
Jesper Lundgren authored and tdas committed
[STREAMING] SPARK-4986 Wait for receivers to deregister and receiver job to terminate
A slow receiver might not have enough time to shutdown cleanly even when graceful shutdown is used. This PR extends graceful waiting to make sure all receivers have deregistered and that the receiver job has terminated. Author: Jesper Lundgren <[email protected]> Closes apache#4338 from cleaton/stopreceivers and squashes the following commits: a9cf223 [Jesper Lundgren] remove cleaner.ttl config f969b6e [Jesper Lundgren] fix inversed logic in unit test 3d0bd35 [Jesper Lundgren] switch boleans to match running status instead of terminated 9a9ff88 [Jesper Lundgren] wait for receivers to shutdown and receiver job to terminate d179372 [Jesper Lundgren] Add graceful shutdown unit test covering slow receiver onStop
1 parent 681f9df commit 1e8b539

File tree

3 files changed

+75
-4
lines changed

3 files changed

+75
-4
lines changed

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
7373
logDebug("Stopping JobScheduler")
7474

7575
// First, stop receiving
76-
receiverTracker.stop()
76+
receiverTracker.stop(processAllReceivedData)
7777

7878
// Second, stop generating jobs. If it has to process all received data,
7979
// then this will wait for all the processing through JobScheduler to be over.

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,10 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
8686
}
8787

8888
/** Stop the receiver execution thread. */
89-
def stop() = synchronized {
89+
def stop(graceful: Boolean) = synchronized {
9090
if (!receiverInputStreams.isEmpty && actor != null) {
9191
// First, stop the receivers
92-
if (!skipReceiverLaunch) receiverExecutor.stop()
92+
if (!skipReceiverLaunch) receiverExecutor.stop(graceful)
9393

9494
// Finally, stop the actor
9595
ssc.env.actorSystem.stop(actor)
@@ -218,6 +218,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
218218
/** This thread class runs all the receivers on the cluster. */
219219
class ReceiverLauncher {
220220
@transient val env = ssc.env
221+
@volatile @transient private var running = false
221222
@transient val thread = new Thread() {
222223
override def run() {
223224
try {
@@ -233,14 +234,24 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
233234
thread.start()
234235
}
235236

236-
def stop() {
237+
def stop(graceful: Boolean) {
237238
// Send the stop signal to all the receivers
238239
stopReceivers()
239240

240241
// Wait for the Spark job that runs the receivers to be over
241242
// That is, for the receivers to quit gracefully.
242243
thread.join(10000)
243244

245+
if (graceful) {
246+
val pollTime = 100
247+
def done = { receiverInfo.isEmpty && !running }
248+
logInfo("Waiting for receiver job to terminate gracefully")
249+
while(!done) {
250+
Thread.sleep(pollTime)
251+
}
252+
logInfo("Waited for receiver job to terminate gracefully")
253+
}
254+
244255
// Check if all the receivers have been deregistered or not
245256
if (!receiverInfo.isEmpty) {
246257
logWarning("All of the receivers have not deregistered, " + receiverInfo)
@@ -295,7 +306,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
295306

296307
// Distribute the receivers and start them
297308
logInfo("Starting " + receivers.length + " receivers")
309+
running = true
298310
ssc.sparkContext.runJob(tempRDD, ssc.sparkContext.clean(startReceiver))
311+
running = false
299312
logInfo("All of the receivers have been terminated")
300313
}
301314

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,32 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
205205
}
206206
}
207207

208+
test("stop slow receiver gracefully") {
209+
val conf = new SparkConf().setMaster(master).setAppName(appName)
210+
conf.set("spark.streaming.gracefulStopTimeout", "20000")
211+
sc = new SparkContext(conf)
212+
logInfo("==================================\n\n\n")
213+
ssc = new StreamingContext(sc, Milliseconds(100))
214+
var runningCount = 0
215+
SlowTestReceiver.receivedAllRecords = false
216+
//Create test receiver that sleeps in onStop()
217+
val totalNumRecords = 15
218+
val recordsPerSecond = 1
219+
val input = ssc.receiverStream(new SlowTestReceiver(totalNumRecords, recordsPerSecond))
220+
input.count().foreachRDD { rdd =>
221+
val count = rdd.first()
222+
runningCount += count.toInt
223+
logInfo("Count = " + count + ", Running count = " + runningCount)
224+
}
225+
ssc.start()
226+
ssc.awaitTermination(500)
227+
ssc.stop(stopSparkContext = false, stopGracefully = true)
228+
logInfo("Running count = " + runningCount)
229+
assert(runningCount > 0)
230+
assert(runningCount == totalNumRecords)
231+
Thread.sleep(100)
232+
}
233+
208234
test("awaitTermination") {
209235
ssc = new StreamingContext(master, appName, batchDuration)
210236
val inputStream = addInputStream(ssc)
@@ -319,6 +345,38 @@ object TestReceiver {
319345
val counter = new AtomicInteger(1)
320346
}
321347

348+
/** Custom receiver for testing whether a slow receiver can be shutdown gracefully or not */
349+
class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int) extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {
350+
351+
var receivingThreadOption: Option[Thread] = None
352+
353+
def onStart() {
354+
val thread = new Thread() {
355+
override def run() {
356+
logInfo("Receiving started")
357+
for(i <- 1 to totalRecords) {
358+
Thread.sleep(1000 / recordsPerSecond)
359+
store(i)
360+
}
361+
SlowTestReceiver.receivedAllRecords = true
362+
logInfo(s"Received all $totalRecords records")
363+
}
364+
}
365+
receivingThreadOption = Some(thread)
366+
thread.start()
367+
}
368+
369+
def onStop() {
370+
// Simulate slow receiver by waiting for all records to be produced
371+
while(!SlowTestReceiver.receivedAllRecords) Thread.sleep(100)
372+
// no cleanup to be done, the receiving thread should stop on it own
373+
}
374+
}
375+
376+
object SlowTestReceiver {
377+
var receivedAllRecords = false
378+
}
379+
322380
/** Streaming application for testing DStream and RDD creation sites */
323381
package object testPackage extends Assertions {
324382
def test() {

0 commit comments

Comments
 (0)