
Commit 9886f69

Authored and committed by Marcelo Vanzin
[SPARK-6650] [core] Stop ExecutorAllocationManager when context stops.
This fixes the thread leak. I also changed the unit test to keep track of allocated contexts and make sure they're closed after tests are run; this is needed since some tests use this pattern:

    val sc = createContext()
    doSomethingThatMayThrow()
    sc.stop()
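If the middle step throws, the trailing sc.stop() never runs and the context (and its allocation thread) leaks. Below is a minimal, hypothetical ScalaTest sketch of the cleanup pattern the commit adopts: every context handed out by a helper is recorded, and an after hook stops whatever the test body left open. LeakFreeSuite, FakeContext, and createContext are illustrative names, not Spark code; BeforeAndAfter and FunSuite are the real ScalaTest traits used in the diff below.

    import scala.collection.mutable
    import org.scalatest.{BeforeAndAfter, FunSuite}

    class LeakFreeSuite extends FunSuite with BeforeAndAfter {
      // Stand-in for SparkContext; only stop() matters for the pattern.
      private class FakeContext { def stop(): Unit = () }

      // Every context handed out by the helper is remembered here.
      private val contexts = mutable.ListBuffer[FakeContext]()

      private def createContext(): FakeContext = {
        val ctx = new FakeContext
        contexts += ctx
        ctx
      }

      // Runs after each test, even when the body threw before reaching sc.stop().
      after {
        contexts.foreach(_.stop())
        contexts.clear()
      }

      test("allocated context is stopped even if the body fails early") {
        val sc = createContext()
        // If anything here throws, the `after` block still stops sc.
        sc.stop()
      }
    }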
Parent: 305abe1

3 files changed: 46 additions, 34 deletions

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 18 additions & 20 deletions
@@ -17,10 +17,12 @@

 package org.apache.spark

+import java.util.concurrent.{TimeUnit, Executors}
+
 import scala.collection.mutable

 import org.apache.spark.scheduler._
-import org.apache.spark.util.{SystemClock, Clock}
+import org.apache.spark.util.{Clock, SystemClock, Utils}

 /**
  * An agent that dynamically allocates and removes executors based on the workload.
@@ -129,6 +131,10 @@ private[spark] class ExecutorAllocationManager(
   // Listener for Spark events that impact the allocation policy
   private val listener = new ExecutorAllocationListener

+  // Executor that handles the scheduling task.
+  private val executor = Executors.newSingleThreadScheduledExecutor(
+    Utils.namedThreadFactory("spark-dynamic-executor-allocation"))
+
   /**
    * Verify that the settings specified through the config are valid.
    * If not, throw an appropriate exception.
@@ -173,32 +179,24 @@ private[spark] class ExecutorAllocationManager(
   }

   /**
-   * Register for scheduler callbacks to decide when to add and remove executors.
+   * Register for scheduler callbacks to decide when to add and remove executors, and start
+   * the scheduling task.
    */
   def start(): Unit = {
     listenerBus.addListener(listener)
-    startPolling()
+
+    val scheduleTask = new Runnable() {
+      override def run(): Unit = Utils.logUncaughtExceptions(schedule())
+    }
+    executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
   }

   /**
-   * Start the main polling thread that keeps track of when to add and remove executors.
+   * Stop the allocation manager.
    */
-  private def startPolling(): Unit = {
-    val t = new Thread {
-      override def run(): Unit = {
-        while (true) {
-          try {
-            schedule()
-          } catch {
-            case e: Exception => logError("Exception in dynamic executor allocation thread!", e)
-          }
-          Thread.sleep(intervalMillis)
-        }
-      }
-    }
-    t.setName("spark-dynamic-executor-allocation")
-    t.setDaemon(true)
-    t.start()
+  def stop(): Unit = {
+    executor.shutdown()
+    executor.awaitTermination(10, TimeUnit.SECONDS)
   }

   /**
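For reference, here is a self-contained sketch of the lifecycle pattern the new code switches to: a single-threaded ScheduledExecutorService built from a daemon ThreadFactory replaces the hand-rolled polling thread, exceptions are caught inside the task so one failure does not cancel the schedule, and stop() shuts the executor down and waits briefly for termination. Poller, doWork, and the thread name are illustrative stand-ins, not Spark identifiers; in the diff above, Spark's Utils.namedThreadFactory plays the ThreadFactory role and Utils.logUncaughtExceptions does the exception guarding.

    import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

    class Poller(intervalMillis: Long) {
      // Daemon threads with a stable name, so the scheduler never blocks JVM exit.
      private val threadFactory = new ThreadFactory {
        override def newThread(r: Runnable): Thread = {
          val t = new Thread(r, "poller-thread")
          t.setDaemon(true)
          t
        }
      }

      private val executor = Executors.newSingleThreadScheduledExecutor(threadFactory)

      private def doWork(): Unit = println("tick")

      def start(): Unit = {
        val task = new Runnable {
          // Catch and report exceptions so one failure does not kill the schedule.
          override def run(): Unit =
            try doWork() catch { case e: Exception => e.printStackTrace() }
        }
        executor.scheduleAtFixedRate(task, 0, intervalMillis, TimeUnit.MILLISECONDS)
      }

      def stop(): Unit = {
        executor.shutdown()
        executor.awaitTermination(10, TimeUnit.SECONDS)
      }
    }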

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 1 deletion
@@ -1138,7 +1138,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
    * Return whether dynamically adjusting the amount of resources allocated to
    * this application is supported. This is currently only available for YARN.
    */
-  private[spark] def supportDynamicAllocation = 
+  private[spark] def supportDynamicAllocation =
     master.contains("yarn") || dynamicAllocationTesting

   /**
@@ -1406,6 +1406,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
     dagScheduler = null
     listenerBus.stop()
     eventLogger.foreach(_.stop())
+    executorAllocationManager.foreach(_.stop())
     env.actorSystem.stop(heartbeatReceiver)
     progressBar.foreach(_.stop())
     taskScheduler = null
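The added line follows the same shutdown idiom as the surrounding calls: optional subsystems are held as Options and stopped with foreach, so the call is a no-op when the feature was never enabled (dynamic allocation is only created when configured on). A tiny illustrative sketch of that idiom, with Owner and Component as hypothetical names:

    class Component { def stop(): Unit = println("component stopped") }

    class Owner(featureEnabled: Boolean) {
      // Created only when the feature is on, mirroring executorAllocationManager.
      private val component: Option[Component] =
        if (featureEnabled) Some(new Component) else None

      def stop(): Unit = {
        component.foreach(_.stop()) // safe no-op when the Option is None
      }
    }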

core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala

Lines changed: 26 additions & 13 deletions
@@ -19,7 +19,7 @@ package org.apache.spark

 import scala.collection.mutable

-import org.scalatest.{FunSuite, PrivateMethodTester}
+import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -28,10 +28,20 @@ import org.apache.spark.util.ManualClock
 /**
  * Test add and remove behavior of ExecutorAllocationManager.
  */
-class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
+class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAfter {
   import ExecutorAllocationManager._
   import ExecutorAllocationManagerSuite._

+  private val contexts = new mutable.ListBuffer[SparkContext]()
+
+  before {
+    contexts.clear()
+  }
+
+  after {
+    contexts.foreach(_.stop())
+  }
+
   test("verify min/max executors") {
     val conf = new SparkConf()
       .setMaster("local")
@@ -665,16 +675,6 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
     assert(removeTimes(manager).contains("executor-2"))
     assert(!removeTimes(manager).contains("executor-1"))
   }
-}
-
-/**
- * Helper methods for testing ExecutorAllocationManager.
- * This includes methods to access private methods and fields in ExecutorAllocationManager.
- */
-private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
-  private val schedulerBacklogTimeout = 1L
-  private val sustainedSchedulerBacklogTimeout = 2L
-  private val executorIdleTimeout = 3L

   private def createSparkContext(minExecutors: Int = 1, maxExecutors: Int = 5): SparkContext = {
     val conf = new SparkConf()
@@ -688,9 +688,22 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
         sustainedSchedulerBacklogTimeout.toString)
       .set("spark.dynamicAllocation.executorIdleTimeout", executorIdleTimeout.toString)
       .set("spark.dynamicAllocation.testing", "true")
-    new SparkContext(conf)
+    val sc = new SparkContext(conf)
+    contexts += sc
+    sc
   }

+}
+
+/**
+ * Helper methods for testing ExecutorAllocationManager.
+ * This includes methods to access private methods and fields in ExecutorAllocationManager.
+ */
+private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
+  private val schedulerBacklogTimeout = 1L
+  private val sustainedSchedulerBacklogTimeout = 2L
+  private val executorIdleTimeout = 3L
+
   private def createStageInfo(stageId: Int, numTasks: Int): StageInfo = {
     new StageInfo(stageId, 0, "name", numTasks, Seq.empty, "no details")
   }
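The suite's companion object extends PrivateMethodTester so the tests can reach into ExecutorAllocationManager's private state. For readers unfamiliar with that ScalaTest facility, here is a minimal, hypothetical sketch; PrivateAccessDemo, Counter, and bump are made-up names, and the symbol passed to PrivateMethod must match the private method's name.

    import org.scalatest.PrivateMethodTester

    object PrivateAccessDemo extends PrivateMethodTester {
      class Counter { private def bump(x: Int): Int = x + 1 }

      // Handle to the private method, typed with its return type.
      private val bump = PrivateMethod[Int]('bump)

      def main(args: Array[String]): Unit = {
        val c = new Counter
        // invokePrivate reflectively calls Counter.bump on the instance.
        println(c invokePrivate bump(41)) // prints 42
      }
    }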
