Skip to content

Commit 2180c87

Browse files
committed
Stop SparkListenerBus daemon thread when DAGScheduler is stopped.
1 parent ee6e7f9 commit 2180c87

File tree

3 files changed

+17
-5
lines changed

3 files changed

+17
-5
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ class DAGScheduler(
133133

134134
private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]
135135

136-
private[spark] val listenerBus = new SparkListenerBus()
136+
// An async scheduler event bus. The bus should be stopped when DAGSCheduler is stopped.
137+
private[spark] val listenerBus = new SparkListenerBus
137138

138139
// Contains the locations that each RDD's partitions are cached on
139140
private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
@@ -1121,5 +1122,6 @@ class DAGScheduler(
11211122
}
11221123
metadataCleaner.cancel()
11231124
taskSched.stop()
1125+
listenerBus.stop()
11241126
}
11251127
}

core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ case class SparkListenerJobStart(job: ActiveJob, stageIds: Array[Int], propertie
4343
case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
4444
extends SparkListenerEvents
4545

46+
/** An event used in the listener to shutdown the listener daemon thread. */
47+
private[scheduler] case object SparkListenerShutdown extends SparkListenerEvents
48+
4649
trait SparkListener {
4750
/**
4851
* Called when a stage is completed, with information on the completed stage

core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,17 @@ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
2424
import org.apache.spark.Logging
2525

2626
/** Asynchronously passes SparkListenerEvents to registered SparkListeners. */
27-
private[spark] class SparkListenerBus() extends Logging {
28-
private val sparkListeners = new ArrayBuffer[SparkListener]() with SynchronizedBuffer[SparkListener]
27+
private[spark] class SparkListenerBus extends Logging {
28+
private val sparkListeners = new ArrayBuffer[SparkListener] with SynchronizedBuffer[SparkListener]
2929

3030
/* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
3131
* an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
32-
private val EVENT_QUEUE_CAPACITY = 10000
32+
private val EVENT_QUEUE_CAPACITY = 10000
3333
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents](EVENT_QUEUE_CAPACITY)
3434
private var queueFullErrorMessageLogged = false
3535

36+
// Create a new daemon thread to listen for events. This thread is stopped when it receives
37+
// a SparkListenerShutdown event, using the stop method.
3638
new Thread("SparkListenerBus") {
3739
setDaemon(true)
3840
override def run() {
@@ -53,6 +55,9 @@ private[spark] class SparkListenerBus() extends Logging {
5355
sparkListeners.foreach(_.onTaskGettingResult(taskGettingResult))
5456
case taskEnd: SparkListenerTaskEnd =>
5557
sparkListeners.foreach(_.onTaskEnd(taskEnd))
58+
case SparkListenerShutdown =>
59+
// Get out of the while loop and shutdown the daemon thread
60+
return
5661
case _ =>
5762
}
5863
}
@@ -80,7 +85,7 @@ private[spark] class SparkListenerBus() extends Logging {
8085
*/
8186
def waitUntilEmpty(timeoutMillis: Int): Boolean = {
8287
val finishTime = System.currentTimeMillis + timeoutMillis
83-
while (!eventQueue.isEmpty()) {
88+
while (!eventQueue.isEmpty) {
8489
if (System.currentTimeMillis > finishTime) {
8590
return false
8691
}
@@ -90,4 +95,6 @@ private[spark] class SparkListenerBus() extends Logging {
9095
}
9196
return true
9297
}
98+
99+
def stop(): Unit = post(SparkListenerShutdown)
93100
}

0 commit comments

Comments
 (0)